From 489f5a7024646596723879a0d77ce10ff20ad38e Mon Sep 17 00:00:00 2001 From: Johnny Hughes Date: Apr 11 2019 18:23:17 +0000 Subject: Merge #2 `Provide sample MQTT scripts for the new infrastructure` --- diff --git a/mqtt/README.md b/mqtt/README.md new file mode 100644 index 0000000..d47d81d --- /dev/null +++ b/mqtt/README.md @@ -0,0 +1,46 @@ +# MQTT scripts + +The mqtt.git.centos.org server requires authentication. As a result we've provided some client server scripts that will let you protect your keys. + +These can also be used as a basis for building your own MQTT automation scripts. + +## Scripts: + +* send-mqtt-to-dbus.py - Connects the MQTT messages to a dbus interface. + To fully protect your keys you can setup the system bus (a config is provided by --dbus-config) + Then you can have this run as a dedicated user that has access to your keys. + See the `on_mqtt_connect` and `on_mqtt_message` functions for customizing the behavior. + +* listen-on-dbus-for-mqtt-signals.py - Listens to messages sent to dbus and performs an action. + You can set this to run a generic command or customize it to fit your needs. + See the `signal_recieved` function for customizing the behavior. + +* example-safe-command.py - It is an example of how to run a command from listen-on-dbus-for-mqtt-signals.py + +* send-mqtt-to-irc.py - An untested IRC bot that will (in theory) chat out the messages. + +## Systemd Unit: + +Some sample systemd unit files are provided to work with the example scripts. + +NOTE: They require customization before use. + You must at minimum set the User= to a trusted user. + +* listen-on-dbus-for-mqtt-signals.service + You should adjust the path of commands and select a safe command to execute. + +* send-mqtt-to-dbus.service + You should setup the system dbus profile with --dbus-config + +## Container notes: + +It is _not_ considered safe to share the host dbus (system or session) with a container. This can permit the container to escape into the host and violate the security of your system. + +For example, here is how you can reboot a host from dbus if you've got rights. +``` +DBUS_SYSTEM_BUS_ADDRESS=unix:path=/run/dbus/system_bus_socket \ + dbus-send --system --print-reply \ + --dest=org.freedesktop.systemd1 \ + /org/freedesktop/systemd1 \ + org.freedesktop.systemd1.Manager.Reboot +``` diff --git a/mqtt/example-safe-command.py b/mqtt/example-safe-command.py new file mode 100755 index 0000000..cb0fc25 --- /dev/null +++ b/mqtt/example-safe-command.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python3.6 +#pylint: disable=line-too-long +# +# Copyright (2019). Fermi Research Alliance, LLC. +# Initial Author: Pat Riehecky +# +''' + Example command to run from listen-on-dbus-for-mqtt-signals.py +''' + +## Uncomment these for python2 support +#from __future__ import unicode_literals +#from __future__ import absolute_import +#from __future__ import print_function + +import json +import sys +import textwrap + +from pprint import pprint + +try: + from argparse import ArgumentParser +except ImportError: # pragma: no cover + print("Please install argparse - rpm: python-argparse", file=sys.stderr) + raise + +########################################## +def setup_args(): + ''' + Setup the argparse object. + + Make sure all fields have defaults so we could use this as an object + ''' + parser = ArgumentParser(description=textwrap.dedent(__doc__)) + + parser.add_argument('signal', help='The dbus signal is set here') + + return parser + +########################################## + +########################################## +########################################## +if __name__ == '__main__': + + PARSER = setup_args() + ARGS = PARSER.parse_args() + + MESSAGE = json.loads(sys.stdin.read()) + print("Your dbus-signal was %s" % ARGS.signal) + print("Your message was decoded as %s (between the lines)" % type(MESSAGE)) + print("------------------------------------------------") + pprint(MESSAGE) + print("------------------------------------------------") diff --git a/mqtt/listen-on-dbus-for-mqtt-signals.py b/mqtt/listen-on-dbus-for-mqtt-signals.py new file mode 100755 index 0000000..79fac93 --- /dev/null +++ b/mqtt/listen-on-dbus-for-mqtt-signals.py @@ -0,0 +1,128 @@ +#!/usr/bin/env python3.6 +#pylint: disable=line-too-long +# +# Copyright (2019). Fermi Research Alliance, LLC. +# Initial Author: Pat Riehecky +# +''' + Listen to dbus events +''' + +## Uncomment these for python2 support +#from __future__ import unicode_literals +#from __future__ import absolute_import +#from __future__ import print_function + +import json +import logging +import os.path +import sys +import textwrap + +from subprocess import Popen, PIPE + +DBUS_INTERFACE = 'org.centos.git.mqtt' + +try: + from pydbus import SystemBus, SessionBus + from pydbus.generic import signal +except ImportError: # pragma: no cover + print("Please install pydbus - rpm: python-pydbus", file=sys.stderr) + raise + +try: + from gi.repository.GLib import MainLoop +except ImportError: # pragma: no cover + print("Please install pygobject - rpm: python-gobject", file=sys.stderr) + raise + +try: + from argparse import ArgumentParser +except ImportError: # pragma: no cover + print("Please install argparse - rpm: python-argparse", file=sys.stderr) + raise + +########################################## +def setup_args(): + ''' + Setup the argparse object. + + Make sure all fields have defaults so we could use this as an object + ''' + parser = ArgumentParser(description=textwrap.dedent(__doc__)) + + parser.add_argument('--debug',action='store_true', + help='Print out all debugging actions', + default=False) + parser.add_argument('--dbus-use-system-bus',action='store_true', + help='Should we use the global SystemBus or the user SessionBus. The SystemBus requires settings in /etc/dbus-1/system.d/myservice.conf', + default=False) + parser.add_argument('--run-command', metavar='', + help='Command to run with message payload. sys.argv[1] will be the DBUS signal name, STDIN will be the payload as json. If no run command, simply print the results to STDOUT.', + default='', type=str) + + return parser + +########################################## + +########################################## +########################################## +if __name__ == '__main__': + + PARSER = setup_args() + ARGS = PARSER.parse_args() + + if ARGS.run_command != '': + if not os.path.exists(ARGS.run_command): + raise ValueError('No such file %s', ARGS.run_command) + + MYLOGGER = logging.getLogger() + + if ARGS.debug: + MYLOGGER.setLevel(logging.DEBUG) + else: + MYLOGGER.setLevel(logging.WARNING) + + handler = logging.StreamHandler(sys.stderr) + handler.setLevel(logging.DEBUG) + formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + handler.setFormatter(formatter) + MYLOGGER.addHandler(handler) + + PROGRAM_NAME = os.path.basename(sys.argv[0]) + MYLOGGER.debug('Running:%s args:%s', PROGRAM_NAME, sys.argv[1:]) + + if ARGS.dbus_use_system_bus: + BUS = SystemBus() + else: + BUS = SessionBus() + + def signal_recieved(sender, obj, iface, signal, params): + ''' Define in scope so I can read ARGS ''' + # sanitize all my single quotes + signal_msg = json.dumps(json.loads(params[0])) + + logging.debug("sender:%s object:%s iface:%s signal:%s all_params:%s signal_msg=%s", sender, obj, iface, signal, params, signal_msg) + + logging.debug("Running %s %s < %s", ARGS.run_command, signal, signal_msg) + if ARGS.run_command == '': + print("signal:%s signal_msg:%s" % (signal, signal_msg), file=sys.stderr) + else: + # Or you can customize this to fit your needs + proc = Popen([ARGS.run_command, signal], stdin=PIPE, cwd='/tmp', start_new_session=True, universal_newlines=True) + proc.communicate(input=signal_msg) + proc.wait(timeout=300) + + if ARGS.dbus_use_system_bus: + MYLOGGER.debug('Subscribing to system bus %s', DBUS_INTERFACE) + else: + MYLOGGER.debug('Subscribing to session bus %s', DBUS_INTERFACE) + + BUS.subscribe(iface=DBUS_INTERFACE, signal_fired=signal_recieved) + + # loop forever, until CTRL+C, or something goes wrong + try: + MainLoop().run() + except KeyboardInterrupt: + logging.debug('Got CTRL+C, exiting cleanly') + raise SystemExit diff --git a/mqtt/listen-on-dbus-for-mqtt-signals.service b/mqtt/listen-on-dbus-for-mqtt-signals.service new file mode 100644 index 0000000..4c4094c --- /dev/null +++ b/mqtt/listen-on-dbus-for-mqtt-signals.service @@ -0,0 +1,16 @@ +[Unit] +Description=Listen to org.centos.git.mqtt for actions +After=network.target + +[Service] +Type=simple +PrivateTmp=yes +Restart=on-failure +RestartSec=2s + +#User= +Group=nobody +ExecStart=/usr/local/bin/listen-on-dbus-for-mqtt-signals.py --dbus-use-system-bus --run-command=/usr/local/bin/my_safe_command.py + +[Install] +WantedBy=multi-user.target diff --git a/mqtt/send-mqtt-to-dbus.py b/mqtt/send-mqtt-to-dbus.py new file mode 100755 index 0000000..604cf22 --- /dev/null +++ b/mqtt/send-mqtt-to-dbus.py @@ -0,0 +1,288 @@ +#!/usr/bin/env python3.6 +#pylint: disable=line-too-long +# +# Copyright (2019). Fermi Research Alliance, LLC. +# Initial Author: Pat Riehecky +# +''' + Connect to the MQTT server and convert messages into dbus signals. +''' + +## Uncomment these for python2 support +#from __future__ import unicode_literals +#from __future__ import absolute_import +#from __future__ import print_function + +import datetime +import logging +import json +import os.path +import sys +import random +import textwrap + +DBUS_INTERFACE = 'org.centos.git.mqtt' + +try: + import paho.mqtt.client +except ImportError: # pragma: no cover + print("Please install paho.mqtt.client - rpm: python-paho-mqtt", file=sys.stderr) + raise + +try: + from pydbus import SystemBus, SessionBus + from pydbus.generic import signal +except ImportError: # pragma: no cover + print("Please install pydbus - rpm: python-pydbus", file=sys.stderr) + raise + +try: + from gi.repository.GLib import MainLoop +except ImportError: # pragma: no cover + print("Please install pygobject - rpm: python-gobject", file=sys.stderr) + raise + +try: + from argparse import ArgumentParser +except ImportError: # pragma: no cover + print("Please install argparse - rpm: python-argparse", file=sys.stderr) + raise + +########################################## +def setup_args(): + ''' + Setup the argparse object. + + Make sure all fields have defaults so we could use this as an object + ''' + ca_cert = str(os.path.expanduser('~/')) + '.centos-server-ca.cert' + user_pubkey = str(os.path.expanduser('~/')) + '.centos.cert' + user_privkey = str(os.path.expanduser('~/')) + '.centos.cert' + + # use a psudo random number for keepalive to help spread out the load + # some time between 1m 30s and 2m 10s + keep_alive = random.randint(90, 130) + + parser = ArgumentParser(description=textwrap.dedent(__doc__)) + + parser.add_argument('--debug',action='store_true', + help='Print out all debugging actions', + default=False) + parser.add_argument('--client-connection-name', metavar='', + help='Use this specific name when connecting. Default is a psudo-random string.', + default='', type=str) + parser.add_argument('--mqtt-server', metavar='', + help='Connect to this MQTT server', + default='mqtt.git.centos.org', type=str) + parser.add_argument('--mqtt-port', metavar='', + help='Connect to MQTT server on this port', + default='8883', type=int) + parser.add_argument('--mqtt-source-ip', metavar='', + help='Connect to MQTT server from this address. Default is any.', + default='', type=str) + parser.add_argument('--mqtt-topic', metavar='', + action='append', nargs='+', type=str, + help='Which MQTT topic should we watch. You may set multiple times.') + parser.add_argument('--mqtt-keepalive', metavar='', + help='Seconds between MQTT keepalive packets.', + default=keep_alive, type=int) + parser.add_argument('--mqtt-no-ssl', action='store_false', dest='mqtt_ssl', + help='Should MQTT use SSL? Default is to use SSL (and the SSL port).') + parser.add_argument('--mqtt-server-ca', metavar='', + help='Use this CA cert to validate the MQTT Server.', + default=ca_cert, type=str) + parser.add_argument('--mqtt-client-cert', metavar='', + help='Use this public key to identify yourself.', + default=user_pubkey, type=str) + parser.add_argument('--mqtt-client-key', metavar='', + help='The private key that matches with --mqtt-client-cert .', + default=user_privkey, type=str) + parser.add_argument('--dbus-use-system-bus',action='store_true', + help='Should we use the global SystemBus or the user SessionBus. The SystemBus requires settings in /etc/dbus-1/system.d/myservice.conf', + default=False) + parser.add_argument('--dbus-config',action='store_true', + help='Just output the SystemBus permissions file and exit', + default=False) + + return parser + +########################################## +class BusMessage(object): + """ + Server_XML definition. + """ + dbus = """ + + + + + + + + """.format(DBUS_INTERFACE) + + # Function does all the work already + message = signal() + +def DbusPermissionsConf(interface_name): + ''' + For the SystemBus you need permission to create endpoints + ''' + import getpass + whoami = getpass.getuser() + + xml = ''' + + + + + + + + + + + + + + + + + + + + + + +''' + return xml.format(interface_name=interface_name, whoami=whoami) + +########################################## +def on_mqtt_message(client, userdata, message): + ''' What should I do if I get a message? ''' + logging.debug('Message received topic:%s payload:%s', message.topic, message.payload.decode("utf-8")) + + # Or you can customize this to fit your needs + signal = {message.topic: message.payload.decode("utf-8")} + userdata['emit'].message(json.dumps((signal))) + + logging.debug('Sending signal: %s', json.dumps(signal)) + +def on_mqtt_disconnect(client, userdata, rc): + ''' If you get a connection error, print it out ''' + if rc: + logging.error('Disconnected with error ErrCode:%s', rc) + logging.error('ErrCode:%s might be - %s', rc, paho.mqtt.client.error_string(rc)) + logging.error('ErrCode:%s might be - %s', rc, paho.mqtt.client.connack_string(rc)) + raise SystemExit + + logging.error('Disconnected from MQTT Server') + +def on_mqtt_connect(client, userdata, flags, rc): + ''' Automatically subscribe to all topics ''' + logging.debug('Connected with status code : %s', rc) + + for topic in userdata['topics']: + client.subscribe(topic) + logging.info('Subscribing to topic %s', topic) + signal = {'mqtt.setup': 'Subscribing to topic {} at {}'.format(topic, datetime.datetime.now())} + userdata['emit'].message(json.dumps(signal)) + +########################################## +########################################## +if __name__ == '__main__': + + PARSER = setup_args() + ARGS = PARSER.parse_args() + + if ARGS.dbus_config: + print(DbusPermissionsConf(DBUS_INTERFACE)) + raise SystemExit + + MYLOGGER = logging.getLogger() + + if ARGS.debug: + MYLOGGER.setLevel(logging.DEBUG) + else: + MYLOGGER.setLevel(logging.WARNING) + + handler = logging.StreamHandler(sys.stderr) + handler.setLevel(logging.DEBUG) + formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + handler.setFormatter(formatter) + MYLOGGER.addHandler(handler) + + PROGRAM_NAME = os.path.basename(sys.argv[0]) + MYLOGGER.debug('Running:%s args:%s', PROGRAM_NAME, sys.argv[1:]) + + if ARGS.client_connection_name: + MYLOGGER.info('Attempting to connect as %s to %s:%s', ARGS.client_connection_name, ARGS.mqtt_server, ARGS.mqtt_port) + else: + MYLOGGER.info('Attempting to connect with random name to %s:%s', ARGS.mqtt_server, ARGS.mqtt_port) + + CLIENT = paho.mqtt.client.Client(client_id=ARGS.client_connection_name, clean_session=True) + + if ARGS.mqtt_ssl: + ARGS.mqtt_server_ca = os.path.expanduser(ARGS.mqtt_server_ca) + if not os.path.exists(ARGS.mqtt_server_ca): + raise ValueError('No such file %s', ARGS.mqtt_server_ca) + + ARGS.mqtt_client_cert = os.path.expanduser(ARGS.mqtt_client_cert) + if not os.path.exists(ARGS.mqtt_client_cert): + raise ValueError('No such file %s', ARGS.mqtt_client_cert) + + ARGS.mqtt_client_key = os.path.expanduser(ARGS.mqtt_client_key) + if not os.path.exists(ARGS.mqtt_client_key): + raise ValueError('No such file %s', ARGS.mqtt_client_key) + + MYLOGGER.info('SSL enabled CA=%s PUBKEY=%s PRIVKEY=%s', ARGS.mqtt_server_ca, ARGS.mqtt_client_cert, ARGS.mqtt_client_key) + CLIENT.tls_set(ca_certs=ARGS.mqtt_server_ca, certfile=ARGS.mqtt_client_cert, keyfile=ARGS.mqtt_client_key) + + try: + CLIENT.enable_logger(logger=MYLOGGER) + except AttributeError: + # Added in 1.2.x of mqtt library + pass + + CLIENT.on_connect = on_mqtt_connect + CLIENT.on_message = on_mqtt_message + CLIENT.on_disconnect = on_mqtt_disconnect + + CLIENT.connect_async(host=ARGS.mqtt_server, port=ARGS.mqtt_port, keepalive=ARGS.mqtt_keepalive, bind_address=ARGS.mqtt_source_ip) + + DBUS_MESSAGE = BusMessage() + + if not ARGS.mqtt_topic: + ARGS.mqtt_topic = ['git.centos.org/#',] + + CLIENT.user_data_set({'topics': ARGS.mqtt_topic, 'emit': DBUS_MESSAGE}) + + # loop_start will run in background async + CLIENT.loop_start() + + if ARGS.dbus_use_system_bus: + BUS = SystemBus() + else: + BUS = SessionBus() + + if ARGS.dbus_use_system_bus: + MYLOGGER.debug('Publishing to system bus %s', DBUS_INTERFACE) + else: + MYLOGGER.debug('Publishing to session bus %s', DBUS_INTERFACE) + + BUS.publish(DBUS_INTERFACE, DBUS_MESSAGE) + + # loop forever, until CTRL+C, or something goes wrong + try: + MainLoop().run() + except KeyboardInterrupt: + CLIENT.disconnect() + logging.debug('Got CTRL+C, exiting cleanly') + raise SystemExit + except: + CLIENT.disconnect() + raise + finally: + CLIENT.disconnect() diff --git a/mqtt/send-mqtt-to-dbus.service b/mqtt/send-mqtt-to-dbus.service new file mode 100644 index 0000000..813f0a9 --- /dev/null +++ b/mqtt/send-mqtt-to-dbus.service @@ -0,0 +1,25 @@ +[Unit] +Description=Bridge mqtt.git.centos.org to dbus +After=network.target + +[Service] +Type=dbus +BusName=org.centos.git.mqtt +NoNewPrivileges=yes +PrivateTmp=yes +PrivateDevices=yes +DevicePolicy=closed +ProtectSystem=full +ProtectHome=read-only +Restart=on-failure +RestartSec=5s + +# NOTE: You should run this as the user (or edit the file) +# /usr/local/bin/send-mqtt-to-dbus.py --dbus-config > /etc/dbus-1/system.d/org.centos.git.mqtt.conf + +#User= +Group=nobody +ExecStart=/usr/local/bin/send-mqtt-to-dbus.py --dbus-use-system-bus --mqtt-server-ca=~/.centos/ca.pem --mqtt-client-cert=~/.centos/client.pem --mqtt-client-key=~/.centos/client.key + +[Install] +WantedBy=multi-user.target diff --git a/mqtt/send-mqtt-to-irc.py b/mqtt/send-mqtt-to-irc.py new file mode 100755 index 0000000..e0a31c0 --- /dev/null +++ b/mqtt/send-mqtt-to-irc.py @@ -0,0 +1,246 @@ +#!/usr/bin/env python3.6 +#pylint: disable=line-too-long +# +# Copyright (2019). Fermi Research Alliance, LLC. +# Initial Author: Pat Riehecky +# +''' + Connect to the MQTT server and convert messages into irc messages. +''' + +## Uncomment these for python2 support +#from __future__ import unicode_literals +#from __future__ import absolute_import +#from __future__ import print_function + +import datetime +import logging +import os.path +import sys +import random +import textwrap + +try: + import paho.mqtt.client +except ImportError: # pragma: no cover + print("Please install paho.mqtt.client - rpm: python-paho-mqtt", file=sys.stderr) + raise + +try: + import irc.client +except ImportError: # pragma: no cover + print("Please install irc - pip install --user irc", file=sys.stderr) + raise + +try: + from gi.repository.GLib import MainLoop +except ImportError: # pragma: no cover + print("Please install pygobject - rpm: python-gobject", file=sys.stderr) + raise + +try: + from argparse import ArgumentParser +except ImportError: # pragma: no cover + print("Please install argparse - rpm: python-argparse", file=sys.stderr) + raise + +########################################## +def setup_args(): + ''' + Setup the argparse object. + + Make sure all fields have defaults so we could use this as an object + ''' + ca_cert = str(os.path.expanduser('~/')) + '.centos-server-ca.cert' + user_pubkey = str(os.path.expanduser('~/')) + '.centos.cert' + user_privkey = str(os.path.expanduser('~/')) + '.centos.cert' + + # use a psudo random number for keepalive to help spread out the load + # some time between 1m 30s and 2m 10s + keep_alive = random.randint(90, 130) + + parser = ArgumentParser(description=textwrap.dedent(__doc__)) + + parser.add_argument('--debug',action='store_true', + help='Print out all debugging actions', + default=False) + parser.add_argument('--client-connection-name', metavar='', + help='Use this specific name when connecting. Default is a psudo-random string.', + default='', type=str) + parser.add_argument('--mqtt-server', metavar='', + help='Connect to this MQTT server', + default='mqtt.git.centos.org', type=str) + parser.add_argument('--mqtt-port', metavar='', + help='Connect to MQTT server on this port', + default='8883', type=int) + parser.add_argument('--mqtt-source-ip', metavar='', + help='Connect to MQTT server from this address. Default is any.', + default='', type=str) + parser.add_argument('--mqtt-topic', metavar='', + action='append', nargs='+', type=str, + help='Which MQTT topic should we watch. You may set multiple times.') + parser.add_argument('--mqtt-keepalive', metavar='', + help='Seconds between MQTT keepalive packets.', + default=keep_alive, type=int) + parser.add_argument('--mqtt-no-ssl', action='store_false', dest='mqtt_ssl', + help='Should MQTT use SSL? Default is to use SSL (and the SSL port).') + parser.add_argument('--mqtt-server-ca', metavar='', + help='Use this CA cert to validate the MQTT Server.', + default=ca_cert, type=str) + parser.add_argument('--mqtt-client-cert', metavar='', + help='Use this public key to identify yourself.', + default=user_pubkey, type=str) + parser.add_argument('--mqtt-client-key', metavar='', + help='The private key that matches with --mqtt-client-cert .', + default=user_privkey, type=str) + parser.add_argument('--irc-server', metavar='', + help='The hostname of your irc server.', + default='chat.freenode.net', type=str) + parser.add_argument('--irc-port', default=6667, type=int, + help='IRC port number.', + parser.add_argument('--irc-bot-name', metavar='', + help='The name of your IRC bot.', + default='testbot__', type=str) + parser.add_argument('--irc-bot-password', metavar='', + help='The password for your IRC bot.', + type=str) + parser.add_argument('--irc-bot-admin', metavar='', + help="The name of your IRC bot's owner.", + default='', type=str) + parser.add_argument('--irc-channel', metavar='', + help="The name of an IRC channel where your bot will psot.", + default='#bot-testing', type=str) + + return parser + +########################################## +class IRCBot(irc.client.SimpleIRCClient): + ''' An IRC bot as an object ''' + def __init__(self, channel, admin_nic): + irc.bot.SingleServerIRCBot.__init__(self, [(server, port)], nickname, nickname) + self.target_channel = channel + self.admin_nic = admin_nic + + if self.admin_nic == '': + raise ValueError("You must set an owner for your bot") + + def on_welcome(self, connection, event): + if irc.client.is_channel(self.target_channel): + connection.join(self.target_channel) + self.connection.notice(self.target_channel, "Hello, I am a bot owned by " + self.admin_nic) + + def on_disconnect(self, connection, event): + sys.exit(0) + + def send_message(self, message): + self.connection.notice(self.target, message) + +########################################## +def on_mqtt_message(client, userdata, message): + ''' What should I do if I get a message? ''' + logging.debug('Message received topic:%s payload:%s', message.topic, message.payload.decode("utf-8")) + + # Or you can customize this to fit your needs + + logging.debug('Sending signal: %s', signal) + +def on_mqtt_disconnect(client, userdata, rc): + ''' If you get a connection error, print it out ''' + if rc: + logging.error('Disconnected with error ErrCode:%s', rc) + logging.error('ErrCode:%s might be - %s', rc, paho.mqtt.client.error_string(rc)) + logging.error('ErrCode:%s might be - %s', rc, paho.mqtt.client.connack_string(rc)) + raise SystemExit + + logging.error('Disconnected from MQTT Server') + +def on_mqtt_connect(client, userdata, flags, rc): + ''' Automatically subscribe to all topics ''' + logging.debug('Connected with status code : %s', rc) + + for topic in userdata['topics']: + client.subscribe(topic) + logging.info('Subscribing to topic %s', topic) + signal = {'mqtt.setup': 'Subscribing to topic {} at {}'.format(topic, datetime.datetime.now())} + userdata['emit'].message(str(signal)) + +########################################## +########################################## +if __name__ == '__main__': + + PARSER = setup_args() + ARGS = PARSER.parse_args() + + MYLOGGER = logging.getLogger() + + if ARGS.debug: + MYLOGGER.setLevel(logging.DEBUG) + else: + MYLOGGER.setLevel(logging.WARNING) + + handler = logging.StreamHandler(sys.stderr) + handler.setLevel(logging.DEBUG) + formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + handler.setFormatter(formatter) + MYLOGGER.addHandler(handler) + + PROGRAM_NAME = os.path.basename(sys.argv[0]) + MYLOGGER.debug('Running:%s args:%s', PROGRAM_NAME, sys.argv[1:]) + + MYBOT = IRCBot(ARGS.irc_channel, ARGS.irc_bot_name, ARGS.irc_server, ARGS.irc_port, ARGS.irc_channel, ARGS.irc_bot_admin) + + if ARGS.client_connection_name: + MYLOGGER.info('Attempting to connect as %s to %s:%s', ARGS.client_connection_name, ARGS.mqtt_server, ARGS.mqtt_port) + else: + MYLOGGER.info('Attempting to connect with random name to %s:%s', ARGS.mqtt_server, ARGS.mqtt_port) + + CLIENT = paho.mqtt.client.Client(client_id=ARGS.client_connection_name, clean_session=True) + + if ARGS.mqtt_ssl: + ARGS.mqtt_server_ca = os.path.expanduser(ARGS.mqtt_server_ca) + if not os.path.exists(ARGS.mqtt_server_ca): + raise ValueError('No such file %s', ARGS.mqtt_server_ca) + + ARGS.mqtt_client_cert = os.path.expanduser(ARGS.mqtt_client_cert) + if not os.path.exists(ARGS.mqtt_client_cert): + raise ValueError('No such file %s', ARGS.mqtt_client_cert) + + ARGS.mqtt_client_key = os.path.expanduser(ARGS.mqtt_client_key) + if not os.path.exists(ARGS.mqtt_client_key): + raise ValueError('No such file %s', ARGS.mqtt_client_key) + + MYLOGGER.info('SSL enabled CA=%s PUBKEY=%s PRIVKEY=%s', ARGS.mqtt_server_ca, ARGS.mqtt_client_cert, ARGS.mqtt_client_key) + CLIENT.tls_set(ca_certs=ARGS.mqtt_server_ca, certfile=ARGS.mqtt_client_cert, keyfile=ARGS.mqtt_client_key) + + try: + CLIENT.enable_logger(logger=MYLOGGER) + except AttributeError: + # Added in 1.2.x of mqtt library + pass + + CLIENT.on_connect = on_mqtt_connect + CLIENT.on_message = on_mqtt_message + CLIENT.on_disconnect = on_mqtt_disconnect + + CLIENT.connect_async(host=ARGS.mqtt_server, port=ARGS.mqtt_port, keepalive=ARGS.mqtt_keepalive, bind_address=ARGS.mqtt_source_ip) + + if not ARGS.mqtt_topic: + ARGS.mqtt_topic = ['git.centos.org/#',] + + CLIENT.user_data_set({'topics': ARGS.mqtt_topic, 'emit': DBUS_MESSAGE}) + + # loop_start will run in background async + CLIENT.loop_start() + + # loop forever, until CTRL+C, or something goes wrong + try: + MainLoop().run() + except KeyboardInterrupt: + CLIENT.disconnect() + logging.debug('Got CTRL+C, exiting cleanly') + raise SystemExit + except: + CLIENT.disconnect() + raise + finally: + CLIENT.disconnect()