#2 Provide sample MQTT scripts for the new infrastructure
Merged 5 years ago by hughesjr. Opened 5 years ago by jcpunk.
jcpunk/centos-git-common mqtt-tools  into  master

file added
+46
@@ -0,0 +1,46 @@ 

+ # MQTT scripts

+ 

+ The mqtt.git.centos.org server requires authentication.  As a result, we've provided some client/server scripts that let you protect your keys.

+ 

+ These can also be used as a basis for building your own MQTT automation scripts.

+ 

+ ## Scripts:

+ 

+ * send-mqtt-to-dbus.py - Connects the MQTT messages to a dbus interface.

+     To fully protect your keys you can set up the system bus (a config is provided by --dbus-config)

+     Then you can have this run as a dedicated user that has access to your keys.

+     See the `on_mqtt_connect` and `on_mqtt_message` functions for customizing the behavior.

+ 

+ * listen-on-dbus-for-mqtt-signals.py - Listens to messages sent to dbus and performs an action.

+     You can set this to run a generic command or customize it to fit your needs.

+     See the `signal_recieved` function for customizing the behavior.

+ 

+ * example-safe-command.py - It is an example of how to run a command from listen-on-dbus-for-mqtt-signals.py

+ 

+ * send-mqtt-to-irc.py - An untested IRC bot that will (in theory) chat out the messages.

+ 

+ ## Systemd Unit:

+ 

+ Some sample systemd unit files are provided to work with the example scripts.

+ 

+ NOTE: They require customization before use.

+       You must at minimum set the User= to a trusted user.

+ 

+ * listen-on-dbus-for-mqtt-signals.service

+     You should adjust the path of commands and select a safe command to execute.

+ 

+ * send-mqtt-to-dbus.service

+     You should set up the system dbus profile with --dbus-config

+ 

+ ## Container notes:

+ 

+ It is _not_ considered safe to share the host dbus (system or session) with a container.  This can permit the container to escape into the host and violate the security of your system.

+ 

+ For example, here is how you can reboot a host from dbus if you've got rights.

+ ```

+ DBUS_SYSTEM_BUS_ADDRESS=unix:path=/run/dbus/system_bus_socket \

+   dbus-send --system --print-reply    \

+   --dest=org.freedesktop.systemd1     \

+   /org/freedesktop/systemd1           \

+   org.freedesktop.systemd1.Manager.Reboot

+ ```

@@ -0,0 +1,55 @@ 

+ #!/usr/bin/env python3.6

+ #pylint: disable=line-too-long

+ #

+ #  Copyright (2019).  Fermi Research Alliance, LLC.

+ #  Initial Author: Pat Riehecky <riehecky@fnal.gov>

+ #

+ '''

+     Example command to run from listen-on-dbus-for-mqtt-signals.py

+ '''

+ 

+ ## Uncomment these for python2 support

+ #from __future__ import unicode_literals

+ #from __future__ import absolute_import

+ #from __future__ import print_function

+ 

+ import json

+ import sys

+ import textwrap

+ 

+ from pprint import pprint

+ 

+ try:

+     from argparse import ArgumentParser

+ except ImportError:  # pragma: no cover

+     print("Please install argparse - rpm: python-argparse", file=sys.stderr)

+     raise

+ 

+ ##########################################

def setup_args():
    '''
        Build the command line parser for this example command.

        The parser takes exactly one positional argument, `signal`.

        Returns the configured ArgumentParser; nothing is parsed here.
    '''
    # __doc__ is None when docstrings are stripped (python -OO) and
    #  textwrap.dedent(None) raises TypeError, so fall back to ''
    parser = ArgumentParser(description=textwrap.dedent(__doc__ or ''))

    parser.add_argument('signal', help='The dbus signal is set here')

    return parser

+ 

+ ##########################################

+ 

+ ##########################################

+ ##########################################

if __name__ == '__main__':

    PARSER = setup_args()
    ARGS = PARSER.parse_args()

    # The payload is read from STDIN as one JSON document
    MESSAGE = json.load(sys.stdin)

    print("Your dbus-signal was %s" % ARGS.signal)
    print("Your message was decoded as %s (between the lines)" % type(MESSAGE))

    # Frame the pretty-printed payload between two ruler lines
    print("------------------------------------------------")
    pprint(MESSAGE)
    print("------------------------------------------------")

@@ -0,0 +1,128 @@ 

+ #!/usr/bin/env python3.6

+ #pylint: disable=line-too-long

+ #

+ #  Copyright (2019).  Fermi Research Alliance, LLC.

+ #  Initial Author: Pat Riehecky <riehecky@fnal.gov>

+ #

+ '''

+     Listen to dbus events

+ '''

+ 

+ ## Uncomment these for python2 support

+ #from __future__ import unicode_literals

+ #from __future__ import absolute_import

+ #from __future__ import print_function

+ 

+ import json

+ import logging

+ import os.path

+ import sys

+ import textwrap

+ 

+ from subprocess import Popen, PIPE

+ 

+ DBUS_INTERFACE = 'org.centos.git.mqtt'

+ 

+ try:

+     from pydbus import SystemBus, SessionBus

+     from pydbus.generic import signal

+ except ImportError:  # pragma: no cover

+     print("Please install pydbus - rpm: python-pydbus", file=sys.stderr)

+     raise

+ 

+ try:

+     from gi.repository.GLib import MainLoop

+ except ImportError:  # pragma: no cover

+     print("Please install pygobject - rpm: python-gobject", file=sys.stderr)

+     raise

+ 

+ try:

+     from argparse import ArgumentParser

+ except ImportError:  # pragma: no cover

+     print("Please install argparse - rpm: python-argparse", file=sys.stderr)

+     raise

+ 

+ ##########################################

def setup_args():
    '''
        Build the command line parser for the dbus listener.

        Every option has a default, so the parsed result can be used
        as a plain settings object:
          --debug                 (bool, default False)
          --dbus-use-system-bus   (bool, default False)
          --run-command           (str,  default '')

        Returns the configured ArgumentParser; nothing is parsed here.
    '''
    # __doc__ is None when docstrings are stripped (python -OO) and
    #  textwrap.dedent(None) raises TypeError, so fall back to ''
    parser = ArgumentParser(description=textwrap.dedent(__doc__ or ''))

    parser.add_argument('--debug', action='store_true',
                        help='Print out all debugging actions',
                        default=False)
    parser.add_argument('--dbus-use-system-bus', action='store_true',
                        help='Should we use the global SystemBus or the user SessionBus. The SystemBus requires settings in /etc/dbus-1/system.d/myservice.conf',
                        default=False)
    parser.add_argument('--run-command', metavar='<ABSOLUTE_PATH>',
                        help='Command to run with message payload. sys.argv[1] will be the DBUS signal name, STDIN will be the payload as json. If no run command, simply print the results to STDOUT.',
                        default='', type=str)

    return parser

+ 

+ ##########################################

+ 

+ ##########################################

+ ##########################################

if __name__ == '__main__':
    # Imported here because the file-level import only provides Popen and PIPE
    from subprocess import TimeoutExpired

    # Upper bound (seconds) on how long one --run-command invocation may take
    COMMAND_TIMEOUT = 300

    PARSER = setup_args()
    ARGS = PARSER.parse_args()

    if ARGS.run_command != '' and not os.path.exists(ARGS.run_command):
        # Interpolate the path; passing it as a second argument would leave
        #  the '%s' placeholder unformatted in the exception message
        raise ValueError('No such file %s' % ARGS.run_command)

    MYLOGGER = logging.getLogger()

    if ARGS.debug:
        MYLOGGER.setLevel(logging.DEBUG)
    else:
        MYLOGGER.setLevel(logging.WARNING)

    handler = logging.StreamHandler(sys.stderr)
    handler.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    MYLOGGER.addHandler(handler)

    PROGRAM_NAME = os.path.basename(sys.argv[0])
    MYLOGGER.debug('Running:%s args:%s', PROGRAM_NAME, sys.argv[1:])

    if ARGS.dbus_use_system_bus:
        BUS = SystemBus()
    else:
        BUS = SessionBus()

    def signal_recieved(sender, obj, iface, signal, params):
        '''
            Callback handed to BUS.subscribe (defined in scope so it can read ARGS).

            params[0] is expected to hold the payload as a JSON string.
            Without --run-command the payload is printed to STDERR; otherwise
            the command is started with the signal name as its only argument
            and the payload on its STDIN.
        '''
        try:
            # Round trip through json so the child always gets well formed JSON
            signal_msg = json.dumps(json.loads(params[0]))
        except (IndexError, TypeError, ValueError) as err:
            # A bad payload must not raise out of the callback
            logging.error('Ignoring signal %s with undecodable payload: %s', signal, err)
            return

        logging.debug("sender:%s object:%s iface:%s signal:%s all_params:%s signal_msg=%s", sender, obj, iface, signal, params, signal_msg)

        if ARGS.run_command == '':
            print("signal:%s signal_msg:%s" % (signal, signal_msg), file=sys.stderr)
            return

        logging.debug("Running %s %s < %s", ARGS.run_command, signal, signal_msg)

        # Or you can customize this to fit your needs
        try:
            proc = Popen([ARGS.run_command, signal], stdin=PIPE, cwd='/tmp', start_new_session=True, universal_newlines=True)
        except OSError as err:
            logging.error('Unable to start %s: %s', ARGS.run_command, err)
            return

        try:
            # communicate() waits for the command to exit, so the timeout has
            #  to be given here; a later wait(timeout=...) would never trigger
            proc.communicate(input=signal_msg, timeout=COMMAND_TIMEOUT)
        except TimeoutExpired:
            logging.error('%s did not finish within %s seconds, killing it', ARGS.run_command, COMMAND_TIMEOUT)
            proc.kill()
            proc.communicate()

        if proc.returncode:
            logging.warning('%s exited with status %s', ARGS.run_command, proc.returncode)

    if ARGS.dbus_use_system_bus:
        MYLOGGER.debug('Subscribing to system bus %s', DBUS_INTERFACE)
    else:
        MYLOGGER.debug('Subscribing to session bus %s', DBUS_INTERFACE)

    BUS.subscribe(iface=DBUS_INTERFACE, signal_fired=signal_recieved)

    # loop forever, until CTRL+C, or something goes wrong
    try:
        MainLoop().run()
    except KeyboardInterrupt:
        logging.debug('Got CTRL+C, exiting cleanly')
        raise SystemExit

@@ -0,0 +1,16 @@ 

+ [Unit]

+ Description=Listen to org.centos.git.mqtt for actions

+ After=network.target

+ 

+ [Service]

+ Type=simple

+ PrivateTmp=yes

+ Restart=on-failure

+ RestartSec=2s

+ 

+ #User=<TRUSTEDUSERNAME>

+ Group=nobody

+ ExecStart=/usr/local/bin/listen-on-dbus-for-mqtt-signals.py --dbus-use-system-bus --run-command=/usr/local/bin/my_safe_command.py

+ 

+ [Install]

+ WantedBy=multi-user.target

@@ -0,0 +1,288 @@ 

+ #!/usr/bin/env python3.6

+ #pylint: disable=line-too-long

+ #

+ #  Copyright (2019).  Fermi Research Alliance, LLC.

+ #  Initial Author: Pat Riehecky <riehecky@fnal.gov>

+ #

+ '''

+     Connect to the MQTT server and convert messages into dbus signals.

+ '''

+ 

+ ## Uncomment these for python2 support

+ #from __future__ import unicode_literals

+ #from __future__ import absolute_import

+ #from __future__ import print_function

+ 

+ import datetime

+ import logging

+ import json

+ import os.path

+ import sys

+ import random

+ import textwrap

+ 

+ DBUS_INTERFACE = 'org.centos.git.mqtt'

+ 

+ try:

+     import paho.mqtt.client

+ except ImportError:  # pragma: no cover

+     print("Please install paho.mqtt.client - rpm: python-paho-mqtt", file=sys.stderr)

+     raise

+ 

+ try:

+     from pydbus import SystemBus, SessionBus

+     from pydbus.generic import signal

+ except ImportError:  # pragma: no cover

+     print("Please install pydbus - rpm: python-pydbus", file=sys.stderr)

+     raise

+ 

+ try:

+     from gi.repository.GLib import MainLoop

+ except ImportError:  # pragma: no cover

+     print("Please install pygobject - rpm: python-gobject", file=sys.stderr)

+     raise

+ 

+ try:

+     from argparse import ArgumentParser

+ except ImportError:  # pragma: no cover

+     print("Please install argparse - rpm: python-argparse", file=sys.stderr)

+     raise

+ 

+ ##########################################

def setup_args():
    '''
        Build the command line parser for the MQTT to dbus bridge.

        Every option except --mqtt-topic has a default, so the parsed
        result can be used as a plain settings object.  --mqtt-topic is
        None when not given; because it combines action='append' with
        nargs='+', a given value is a list of lists (one inner list per
        use of the option).

        Returns the configured ArgumentParser; nothing is parsed here.
    '''
    home = os.path.expanduser('~')
    ca_cert = os.path.join(home, '.centos-server-ca.cert')
    user_pubkey = os.path.join(home, '.centos.cert')
    # NOTE(review): the key default is the same file as the cert default;
    #  presumably a combined PEM file - confirm
    user_privkey = os.path.join(home, '.centos.cert')

    # use a pseudo random number for keepalive to help spread out the load
    #  some time between 1m 30s and 2m 10s
    keep_alive = random.randint(90, 130)

    # __doc__ is None when docstrings are stripped (python -OO) and
    #  textwrap.dedent(None) raises TypeError, so fall back to ''
    parser = ArgumentParser(description=textwrap.dedent(__doc__ or ''))

    parser.add_argument('--debug', action='store_true',
                        help='Print out all debugging actions',
                        default=False)
    parser.add_argument('--client-connection-name', metavar='<UNIQUESTRING>',
                        help='Use this specific name when connecting. Default is a pseudo-random string.',
                        default='', type=str)
    parser.add_argument('--mqtt-server', metavar='<HOSTNAME>',
                        help='Connect to this MQTT server',
                        default='mqtt.git.centos.org', type=str)
    parser.add_argument('--mqtt-port', metavar='<PORTNUMBER>',
                        help='Connect to MQTT server on this port',
                        default=8883, type=int)
    parser.add_argument('--mqtt-source-ip', metavar='<SOURCE_IP>',
                        help='Connect to MQTT server from this address. Default is any.',
                        default='', type=str)
    parser.add_argument('--mqtt-topic', metavar='<TOPIC_ID>',
                        action='append', nargs='+', type=str,
                        help='Which MQTT topic should we watch. You may set multiple times.')
    parser.add_argument('--mqtt-keepalive', metavar='<SECONDS>',
                        help='Seconds between MQTT keepalive packets.',
                        default=keep_alive, type=int)
    parser.add_argument('--mqtt-no-ssl', action='store_false', dest='mqtt_ssl',
                        help='Should MQTT use SSL? Default is to use SSL (and the SSL port).')
    parser.add_argument('--mqtt-server-ca', metavar='<ABSOLUTE_PATH>',
                        help='Use this CA cert to validate the MQTT Server.',
                        default=ca_cert, type=str)
    parser.add_argument('--mqtt-client-cert', metavar='<ABSOLUTE_PATH>',
                        help='Use this public key to identify yourself.',
                        default=user_pubkey, type=str)
    parser.add_argument('--mqtt-client-key', metavar='<ABSOLUTE_PATH>',
                        help='The private key that matches with --mqtt-client-cert .',
                        default=user_privkey, type=str)
    parser.add_argument('--dbus-use-system-bus', action='store_true',
                        help='Should we use the global SystemBus or the user SessionBus. The SystemBus requires settings in /etc/dbus-1/system.d/myservice.conf',
                        default=False)
    parser.add_argument('--dbus-config', action='store_true',
                        help='Just output the SystemBus permissions file and exit',
                        default=False)

    return parser

+ 

+ ##########################################

class BusMessage(object):
    '''
    Object published on the bus by the main section of this script.

    It describes one interface, named after DBUS_INTERFACE, with a
    single signal called "message" that carries one string argument.
    '''
    # Interface description (XML) with the interface name filled in
    dbus = """<?xml version="1.0" encoding="UTF-8" ?>
    <node>
        <interface name="{}">
            <signal name="message">
                <arg type='s'/>
            </signal>
        </interface>
    </node>
    """.format(DBUS_INTERFACE)

    # The MQTT callbacks call this attribute with a JSON string to emit it
    message = signal()

+ 

def DbusPermissionsConf(interface_name):
    '''
        Render a SystemBus policy file for `interface_name`.

        For the SystemBus you need permission to create endpoints.  The
        policy lets root and the user running this function own the name
        and send on the interface, and lets anyone receive from it.

        interface_name: the dbus interface name to grant access to.

        Returns the policy as a string of XML.
    '''
    import getpass
    whoami = getpass.getuser()

    # '--' may not appear inside an XML comment, hence the &dash; spelling
    #  in the example commands.  Literal braces are doubled for str.format.
    xml = '''<?xml version="1.0" encoding="UTF-8" ?>
<!-- Copy me into /etc/dbus-1/system.d/{interface_name}.conf and reload dbus -->
<!--  You can send the signal with: dbus-send &dash;&dash;system &dash;&dash;type=signal / {interface_name}.message 'string:{{"test": ["1", "2"]}}' -->
<!--  You can watch bus with dbus-monitor &dash;&dash;system 'interface={interface_name}' -->
<!DOCTYPE busconfig PUBLIC
          "-//freedesktop//DTD D-BUS Bus Configuration 1.0//EN"
          "http://www.freedesktop.org/standards/dbus/1.0/busconfig.dtd">
<busconfig>
    <!-- Always allow root to do anything -->
    <policy user="root">
        <allow own="{interface_name}" />
        <allow send_interface="{interface_name}" />
    </policy>
    <!-- Allow a single non-root user to setup interface and send to endpoint -->
    <!--  You can change this to group='somegroup' if you desire -->
    <policy user="{whoami}">
        <allow own="{interface_name}" />
        <allow send_interface="{interface_name}" send_destination="{interface_name}.message" />
    </policy>
    <!-- Always allow anyone to listen  -->
    <policy context="default">
        <allow receive_interface="{interface_name}" />
        <allow receive_sender="{interface_name}.message" />
    </policy>
</busconfig>
'''
    return xml.format(interface_name=interface_name, whoami=whoami)

+ 

+ ##########################################

def on_mqtt_message(client, userdata, message):
    '''
        What should I do if I get a message?

        Wraps the message as {topic: payload}, serialises it to JSON and
        emits it through userdata['emit'].message().

        client:   unused here
        userdata: dict holding the emitter under the 'emit' key
        message:  object providing .topic and .payload (bytes)
    '''
    # Decode once; undecodable bytes are replaced instead of raising so a
    #  single bad payload does not raise out of the callback
    payload = message.payload.decode("utf-8", "replace")
    logging.debug('Message received topic:%s payload:%s', message.topic, payload)

    # Or you can customize this to fit your needs
    signal_json = json.dumps({message.topic: payload})

    logging.debug('Sending signal: %s', signal_json)
    userdata['emit'].message(signal_json)

+ 

def on_mqtt_disconnect(client, userdata, rc):
    '''
        Report a disconnect; a non-zero rc is logged and raises SystemExit.

        The meaning of rc is ambiguous here, so both the generic error
        text and the connack text are logged for it.
    '''
    if not rc:
        logging.error('Disconnected from MQTT Server')
        return

    logging.error('Disconnected with error ErrCode:%s', rc)
    for describe in (paho.mqtt.client.error_string, paho.mqtt.client.connack_string):
        logging.error('ErrCode:%s might be - %s', rc, describe(rc))

    # NOTE(review): if this runs in the thread started by loop_start(),
    #  SystemExit presumably ends only that thread - confirm
    raise SystemExit

+ 

def on_mqtt_connect(client, userdata, flags, rc):
    '''
        Automatically subscribe to all topics once connected.

        client:   object providing .subscribe(topic)
        userdata: dict with 'topics' (topic strings, or lists of topic
                  strings as produced by --mqtt-topic) and 'emit'
        flags:    unused here
        rc:       connection result, 0 meaning success

        One 'mqtt.setup' signal is emitted per subscribed topic.
    '''
    logging.debug('Connected with status code : %s', rc)

    if rc != 0:
        # Connection was refused: do not subscribe or announce anything
        logging.error('Connection refused ErrCode:%s - %s', rc, paho.mqtt.client.connack_string(rc))
        return

    # --mqtt-topic (append + nargs='+') yields a list of lists while the
    #  default is a flat list; accept both and subscribe one topic at a time
    topics = []
    for entry in userdata['topics']:
        if isinstance(entry, (list, tuple)):
            topics.extend(entry)
        else:
            topics.append(entry)

    for topic in topics:
        client.subscribe(topic)
        logging.info('Subscribing to topic %s', topic)
        setup_msg = {'mqtt.setup': 'Subscribing to topic {} at {}'.format(topic, datetime.datetime.now())}
        userdata['emit'].message(json.dumps(setup_msg))

+ 

+ ##########################################

+ ##########################################

if __name__ == '__main__':

    PARSER = setup_args()
    ARGS = PARSER.parse_args()

    if ARGS.dbus_config:
        print(DbusPermissionsConf(DBUS_INTERFACE))
        raise SystemExit

    MYLOGGER = logging.getLogger()

    if ARGS.debug:
        MYLOGGER.setLevel(logging.DEBUG)
    else:
        MYLOGGER.setLevel(logging.WARNING)

    handler = logging.StreamHandler(sys.stderr)
    handler.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    MYLOGGER.addHandler(handler)

    PROGRAM_NAME = os.path.basename(sys.argv[0])
    MYLOGGER.debug('Running:%s args:%s', PROGRAM_NAME, sys.argv[1:])

    if ARGS.client_connection_name:
        MYLOGGER.info('Attempting to connect as %s to %s:%s', ARGS.client_connection_name, ARGS.mqtt_server, ARGS.mqtt_port)
    else:
        MYLOGGER.info('Attempting to connect with random name to %s:%s', ARGS.mqtt_server, ARGS.mqtt_port)

    CLIENT = paho.mqtt.client.Client(client_id=ARGS.client_connection_name, clean_session=True)

    if ARGS.mqtt_ssl:
        # Expand '~' and verify each credential file before handing it to TLS
        for option in ('mqtt_server_ca', 'mqtt_client_cert', 'mqtt_client_key'):
            path = os.path.expanduser(getattr(ARGS, option))
            if not os.path.exists(path):
                # Interpolate the path; passing it as a second argument would
                #  leave the '%s' placeholder unformatted in the message
                raise ValueError('No such file %s' % path)
            setattr(ARGS, option, path)

        MYLOGGER.info('SSL enabled CA=%s PUBKEY=%s PRIVKEY=%s', ARGS.mqtt_server_ca, ARGS.mqtt_client_cert, ARGS.mqtt_client_key)
        CLIENT.tls_set(ca_certs=ARGS.mqtt_server_ca, certfile=ARGS.mqtt_client_cert, keyfile=ARGS.mqtt_client_key)

    try:
        CLIENT.enable_logger(logger=MYLOGGER)
    except AttributeError:
        # Added in 1.2.x of mqtt library
        pass

    CLIENT.on_connect = on_mqtt_connect
    CLIENT.on_message = on_mqtt_message
    CLIENT.on_disconnect = on_mqtt_disconnect

    CLIENT.connect_async(host=ARGS.mqtt_server, port=ARGS.mqtt_port, keepalive=ARGS.mqtt_keepalive, bind_address=ARGS.mqtt_source_ip)

    DBUS_MESSAGE = BusMessage()

    if ARGS.mqtt_topic:
        # --mqtt-topic (append + nargs='+') yields one list per use of the
        #  option; flatten so every entry is a single topic string
        ARGS.mqtt_topic = [topic for group in ARGS.mqtt_topic for topic in group]
    else:
        ARGS.mqtt_topic = ['git.centos.org/#',]

    CLIENT.user_data_set({'topics': ARGS.mqtt_topic, 'emit': DBUS_MESSAGE})

    if ARGS.dbus_use_system_bus:
        BUS = SystemBus()
        MYLOGGER.debug('Publishing to system bus %s', DBUS_INTERFACE)
    else:
        BUS = SessionBus()
        MYLOGGER.debug('Publishing to session bus %s', DBUS_INTERFACE)

    # Publish before starting the MQTT loop so the object is already on the
    #  bus when the first MQTT callback fires
    BUS.publish(DBUS_INTERFACE, DBUS_MESSAGE)

    # loop_start will run in background async
    CLIENT.loop_start()

    # loop forever, until CTRL+C, or something goes wrong
    try:
        MainLoop().run()
    except KeyboardInterrupt:
        logging.debug('Got CTRL+C, exiting cleanly')
        raise SystemExit
    finally:
        # Runs on every exit path, so disconnect exactly once here
        CLIENT.disconnect()
        CLIENT.loop_stop()

@@ -0,0 +1,25 @@ 

+ [Unit]

+ Description=Bridge mqtt.git.centos.org to dbus

+ After=network.target

+ 

+ [Service]

+ Type=dbus

+ BusName=org.centos.git.mqtt

+ NoNewPrivileges=yes

+ PrivateTmp=yes

+ PrivateDevices=yes

+ DevicePolicy=closed

+ ProtectSystem=full

+ ProtectHome=read-only

+ Restart=on-failure

+ RestartSec=5s

+ 

+ # NOTE: Run the following command as the user set in User= below (or edit the generated file to name that user)

+ #  /usr/local/bin/send-mqtt-to-dbus.py --dbus-config > /etc/dbus-1/system.d/org.centos.git.mqtt.conf

+ 

+ #User=<TRUSTEDUSERNAME>

+ Group=nobody

+ ExecStart=/usr/local/bin/send-mqtt-to-dbus.py --dbus-use-system-bus --mqtt-server-ca=~/.centos/ca.pem --mqtt-client-cert=~/.centos/client.pem --mqtt-client-key=~/.centos/client.key

+ 

+ [Install]

+ WantedBy=multi-user.target

@@ -0,0 +1,246 @@ 

+ #!/usr/bin/env python3.6

+ #pylint: disable=line-too-long

+ #

+ #  Copyright (2019).  Fermi Research Alliance, LLC.

+ #  Initial Author: Pat Riehecky <riehecky@fnal.gov>

+ #

+ '''

+     Connect to the MQTT server and convert messages into irc messages.

+ '''

+ 

+ ## Uncomment these for python2 support

+ #from __future__ import unicode_literals

+ #from __future__ import absolute_import

+ #from __future__ import print_function

+ 

+ import datetime

+ import logging

+ import os.path

+ import sys

+ import random

+ import textwrap

+ 

+ try:

+     import paho.mqtt.client

+ except ImportError:  # pragma: no cover

+     print("Please install paho.mqtt.client - rpm: python-paho-mqtt", file=sys.stderr)

+     raise

+ 

+ try:

+     import irc.client

+ except ImportError:  # pragma: no cover

+     print("Please install irc - pip install --user irc", file=sys.stderr)

+     raise

+ 

+ try:

+     from gi.repository.GLib import MainLoop

+ except ImportError:  # pragma: no cover

+     print("Please install pygobject - rpm: python-gobject", file=sys.stderr)

+     raise

+ 

+ try:

+     from argparse import ArgumentParser

+ except ImportError:  # pragma: no cover

+     print("Please install argparse - rpm: python-argparse", file=sys.stderr)

+     raise

+ 

+ ##########################################

def setup_args():
    '''
        Build the command line parser for the MQTT to IRC bridge.

        Every option except --mqtt-topic and --irc-bot-password has a
        default.  --mqtt-topic is None when not given; because it combines
        action='append' with nargs='+', a given value is a list of lists
        (one inner list per use of the option).

        Returns the configured ArgumentParser; nothing is parsed here.
    '''
    home = os.path.expanduser('~')
    ca_cert = os.path.join(home, '.centos-server-ca.cert')
    user_pubkey = os.path.join(home, '.centos.cert')
    # NOTE(review): the key default is the same file as the cert default;
    #  presumably a combined PEM file - confirm
    user_privkey = os.path.join(home, '.centos.cert')

    # use a pseudo random number for keepalive to help spread out the load
    #  some time between 1m 30s and 2m 10s
    keep_alive = random.randint(90, 130)

    # __doc__ is None when docstrings are stripped (python -OO) and
    #  textwrap.dedent(None) raises TypeError, so fall back to ''
    parser = ArgumentParser(description=textwrap.dedent(__doc__ or ''))

    parser.add_argument('--debug', action='store_true',
                        help='Print out all debugging actions',
                        default=False)
    parser.add_argument('--client-connection-name', metavar='<UNIQUESTRING>',
                        help='Use this specific name when connecting. Default is a pseudo-random string.',
                        default='', type=str)
    parser.add_argument('--mqtt-server', metavar='<HOSTNAME>',
                        help='Connect to this MQTT server',
                        default='mqtt.git.centos.org', type=str)
    parser.add_argument('--mqtt-port', metavar='<PORTNUMBER>',
                        help='Connect to MQTT server on this port',
                        default=8883, type=int)
    parser.add_argument('--mqtt-source-ip', metavar='<SOURCE_IP>',
                        help='Connect to MQTT server from this address. Default is any.',
                        default='', type=str)
    parser.add_argument('--mqtt-topic', metavar='<TOPIC_ID>',
                        action='append', nargs='+', type=str,
                        help='Which MQTT topic should we watch. You may set multiple times.')
    parser.add_argument('--mqtt-keepalive', metavar='<SECONDS>',
                        help='Seconds between MQTT keepalive packets.',
                        default=keep_alive, type=int)
    parser.add_argument('--mqtt-no-ssl', action='store_false', dest='mqtt_ssl',
                        help='Should MQTT use SSL? Default is to use SSL (and the SSL port).')
    parser.add_argument('--mqtt-server-ca', metavar='<ABSOLUTE_PATH>',
                        help='Use this CA cert to validate the MQTT Server.',
                        default=ca_cert, type=str)
    parser.add_argument('--mqtt-client-cert', metavar='<ABSOLUTE_PATH>',
                        help='Use this public key to identify yourself.',
                        default=user_pubkey, type=str)
    parser.add_argument('--mqtt-client-key', metavar='<ABSOLUTE_PATH>',
                        help='The private key that matches with --mqtt-client-cert .',
                        default=user_privkey, type=str)
    parser.add_argument('--irc-server', metavar='<HOSTNAME>',
                        help='The hostname of your irc server.',
                        default='chat.freenode.net', type=str)
    # The original call was missing its closing parenthesis (syntax error)
    parser.add_argument('--irc-port', default=6667, type=int,
                        help='IRC port number.')
    parser.add_argument('--irc-bot-name', metavar='<BOT_NIC>',
                        help='The name of your IRC bot.',
                        default='testbot__', type=str)
    parser.add_argument('--irc-bot-password', metavar='<PASSWORD>',
                        help='The password for your IRC bot.',
                        type=str)
    parser.add_argument('--irc-bot-admin', metavar='<YOUR_NIC>',
                        help="The name of your IRC bot's owner.",
                        default='', type=str)
    parser.add_argument('--irc-channel', metavar='<WHERE_TO_POST>',
                        help="The name of an IRC channel where your bot will post.",
                        default='#bot-testing', type=str)

    return parser

+ 

+ ##########################################

class IRCBot(irc.client.SimpleIRCClient):
    '''
        An IRC bot as an object.

        The bot joins `channel` after the server welcome and posts
        notices there.  Connecting is left to the caller through the
        inherited connect() method.
    '''
    # Byte budget for one notice, kept below the 512 byte IRC line limit
    #  so the command and channel name still fit
    max_message_bytes = 400

    def __init__(self, channel, admin_nic):
        '''
            channel:   channel the bot joins and posts to
            admin_nic: nick of the bot owner; must not be empty

            Raises ValueError when no owner is given.
        '''
        if admin_nic == '':
            raise ValueError("You must set an owner for your bot")

        # Initialise the real base class (the original called an
        #  unrelated class from a module that is not imported)
        irc.client.SimpleIRCClient.__init__(self)
        self.target_channel = channel
        self.admin_nic = admin_nic

    def on_welcome(self, connection, event):
        ''' Join the target channel and announce who owns the bot '''
        if irc.client.is_channel(self.target_channel):
            connection.join(self.target_channel)
            connection.notice(self.target_channel, "Hello, I am a bot owned by " + self.admin_nic)

    def on_disconnect(self, connection, event):
        ''' Leave the program when the IRC server drops the connection '''
        sys.exit(0)

    def send_message(self, message):
        '''
            Post `message` to the target channel as a single notice.

            Line breaks are collapsed into spaces and the text is cut to
            max_message_bytes.  Messages are dropped (and logged) while
            the bot is not connected.
        '''
        if not self.connection.is_connected():
            logging.warning('IRC not connected, dropping message: %s', message)
            return

        # One IRC message is one line: collapse all whitespace runs
        text = ' '.join(str(message).split())
        # Cut on bytes and drop a trailing partial character, if any
        text = text.encode('utf-8')[:self.max_message_bytes].decode('utf-8', 'ignore')

        # NOTE(review): this is called from the MQTT callbacks, presumably
        #  on the thread started by loop_start() - confirm that sending
        #  from a second thread is acceptable for the irc library
        self.connection.notice(self.target_channel, text)

##########################################
def on_mqtt_message(client, userdata, message):
    '''
        What should I do if I get a message?

        Forwards "<topic>: <payload>" to the bot stored in userdata['emit'].
    '''
    # Undecodable bytes are replaced so a bad payload does not raise
    payload = message.payload.decode("utf-8", "replace")
    logging.debug('Message received topic:%s payload:%s', message.topic, payload)

    # Or you can customize this to fit your needs
    text = '{}: {}'.format(message.topic, payload)

    logging.debug('Sending signal: %s', text)
    userdata['emit'].send_message(text)

def on_mqtt_disconnect(client, userdata, rc):
    ''' If you get a connection error, print it out '''
    if rc:
        logging.error('Disconnected with error ErrCode:%s', rc)
        logging.error('ErrCode:%s might be - %s', rc, paho.mqtt.client.error_string(rc))
        logging.error('ErrCode:%s might be - %s', rc, paho.mqtt.client.connack_string(rc))
        raise SystemExit

    logging.error('Disconnected from MQTT Server')

def on_mqtt_connect(client, userdata, flags, rc):
    '''
        Automatically subscribe to all topics.

        userdata['topics'] may hold topic strings or lists of topic
        strings; userdata['emit'] is the bot used to announce each
        subscription.
    '''
    logging.debug('Connected with status code : %s', rc)

    # --mqtt-topic (append + nargs='+') yields a list of lists while the
    #  default is a flat list; accept both and subscribe one topic at a time
    topics = []
    for entry in userdata['topics']:
        if isinstance(entry, (list, tuple)):
            topics.extend(entry)
        else:
            topics.append(entry)

    for topic in topics:
        client.subscribe(topic)
        logging.info('Subscribing to topic %s', topic)
        userdata['emit'].send_message('Subscribing to topic {} at {}'.format(topic, datetime.datetime.now()))

##########################################
##########################################
if __name__ == '__main__':

    PARSER = setup_args()
    ARGS = PARSER.parse_args()

    MYLOGGER = logging.getLogger()

    if ARGS.debug:
        MYLOGGER.setLevel(logging.DEBUG)
    else:
        MYLOGGER.setLevel(logging.WARNING)

    handler = logging.StreamHandler(sys.stderr)
    handler.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    MYLOGGER.addHandler(handler)

    PROGRAM_NAME = os.path.basename(sys.argv[0])
    MYLOGGER.debug('Running:%s args:%s', PROGRAM_NAME, sys.argv[1:])

    # The constructor takes only the channel and the owner; server, port
    #  and nick go to connect()
    MYBOT = IRCBot(ARGS.irc_channel, ARGS.irc_bot_admin)
    MYLOGGER.info('Connecting to IRC %s:%s as %s', ARGS.irc_server, ARGS.irc_port, ARGS.irc_bot_name)
    MYBOT.connect(ARGS.irc_server, ARGS.irc_port, ARGS.irc_bot_name, password=ARGS.irc_bot_password)

    if ARGS.client_connection_name:
        MYLOGGER.info('Attempting to connect as %s to %s:%s', ARGS.client_connection_name, ARGS.mqtt_server, ARGS.mqtt_port)
    else:
        MYLOGGER.info('Attempting to connect with random name to %s:%s', ARGS.mqtt_server, ARGS.mqtt_port)

    CLIENT = paho.mqtt.client.Client(client_id=ARGS.client_connection_name, clean_session=True)

    if ARGS.mqtt_ssl:
        # Expand '~' and verify each credential file before handing it to TLS
        for option in ('mqtt_server_ca', 'mqtt_client_cert', 'mqtt_client_key'):
            path = os.path.expanduser(getattr(ARGS, option))
            if not os.path.exists(path):
                # Interpolate the path; passing it as a second argument would
                #  leave the '%s' placeholder unformatted in the message
                raise ValueError('No such file %s' % path)
            setattr(ARGS, option, path)

        MYLOGGER.info('SSL enabled CA=%s PUBKEY=%s PRIVKEY=%s', ARGS.mqtt_server_ca, ARGS.mqtt_client_cert, ARGS.mqtt_client_key)
        CLIENT.tls_set(ca_certs=ARGS.mqtt_server_ca, certfile=ARGS.mqtt_client_cert, keyfile=ARGS.mqtt_client_key)

    try:
        CLIENT.enable_logger(logger=MYLOGGER)
    except AttributeError:
        # Added in 1.2.x of mqtt library
        pass

    CLIENT.on_connect = on_mqtt_connect
    CLIENT.on_message = on_mqtt_message
    CLIENT.on_disconnect = on_mqtt_disconnect

    CLIENT.connect_async(host=ARGS.mqtt_server, port=ARGS.mqtt_port, keepalive=ARGS.mqtt_keepalive, bind_address=ARGS.mqtt_source_ip)

    if not ARGS.mqtt_topic:
        ARGS.mqtt_topic = ['git.centos.org/#',]

    # The MQTT callbacks post through the bot (the original referenced a
    #  dbus object that does not exist in this script)
    CLIENT.user_data_set({'topics': ARGS.mqtt_topic, 'emit': MYBOT})

    # loop_start will run in background async
    CLIENT.loop_start()

    # loop forever, until CTRL+C, or something goes wrong.  The IRC event
    #  loop runs here so the bot's on_welcome / on_disconnect handlers
    #  actually fire
    try:
        MYBOT.start()
    except KeyboardInterrupt:
        logging.debug('Got CTRL+C, exiting cleanly')
        raise SystemExit
    finally:
        # Runs on every exit path, so disconnect exactly once here
        CLIENT.disconnect()
        CLIENT.loop_stop()

This is a handful of various scripts for interacting with the MQTT server. I've provided an example of a client/server model and an all-in-one tool.

These are targeted at python3.6 from EPEL7 but generally run fine on python2.7 (with the `__future__` imports enabled).

Pull-Request has been merged by hughesjr

5 years ago