| |
| |
| |
| |
| |
| |
| ''' |
| Connect to the MQTT server and convert messages into dbus signals. |
| ''' |
| |
| |
| |
| |
| |
| |
| import datetime |
| import logging |
| import json |
| import os.path |
| import sys |
| import random |
| import textwrap |
| |
| DBUS_INTERFACE = 'org.centos.git.mqtt' |
| |
| try: |
| import paho.mqtt.client |
| except ImportError: |
| print("Please install paho.mqtt.client - rpm: python-paho-mqtt", file=sys.stderr) |
| raise |
| |
| try: |
| from pydbus import SystemBus, SessionBus |
| from pydbus.generic import signal |
| except ImportError: |
| print("Please install pydbus - rpm: python-pydbus", file=sys.stderr) |
| raise |
| |
| try: |
| from gi.repository.GLib import MainLoop |
| except ImportError: |
| print("Please install pygobject - rpm: python-gobject", file=sys.stderr) |
| raise |
| |
| try: |
| from argparse import ArgumentParser |
| except ImportError: |
| print("Please install argparse - rpm: python-argparse", file=sys.stderr) |
| raise |
| |
| |
def setup_args():
    '''
    Build the command line parser for the MQTT to D-Bus bridge.

    Every option except --mqtt-topic carries a default, so the namespace
    returned by ``parse_args([])`` can be used directly as a settings
    object.  --mqtt-topic stays ``None`` when it is not given; because it
    combines ``action='append'`` with ``nargs='+'`` it parses to one list
    of topics per occurrence of the flag (a list of lists).

    Returns:
        argparse.ArgumentParser: the configured, not yet parsed, parser.
    '''
    home = os.path.expanduser('~/')
    ca_cert = home + '.centos-server-ca.cert'
    # The client certificate and key default to the same file.
    # NOTE(review): presumably a combined PEM bundle - confirm.
    user_pubkey = home + '.centos.cert'
    user_privkey = home + '.centos.cert'

    # The default keepalive is drawn once per call from 90-130 seconds.
    keep_alive = random.randint(90, 130)

    # __doc__ is None when the module docstring is missing or stripped
    # (python -OO); textwrap.dedent() would raise on None.
    parser = ArgumentParser(description=textwrap.dedent(__doc__ or ''))

    parser.add_argument('--debug', action='store_true',
                        help='Print out all debugging actions',
                        default=False)
    parser.add_argument('--client-connection-name', metavar='<UNIQUESTRING>',
                        help='Use this specific name when connecting. Default is a psudo-random string.',
                        default='', type=str)
    parser.add_argument('--mqtt-server', metavar='<HOSTNAME>',
                        help='Connect to this MQTT server',
                        default='mqtt.git.centos.org', type=str)
    parser.add_argument('--mqtt-port', metavar='<PORTNUMBER>',
                        help='Connect to MQTT server on this port',
                        default=8883, type=int)
    parser.add_argument('--mqtt-source-ip', metavar='<SOURCE_IP>',
                        help='Connect to MQTT server from this address. Default is any.',
                        default='', type=str)
    parser.add_argument('--mqtt-topic', metavar='<TOPIC_ID>',
                        action='append', nargs='+', type=str,
                        help='Which MQTT topic should we watch. You may set multiple times.')
    parser.add_argument('--mqtt-keepalive', metavar='<SECONDS>',
                        help='Seconds between MQTT keepalive packets.',
                        default=keep_alive, type=int)
    # store_false: mqtt_ssl is True unless --mqtt-no-ssl is given.
    parser.add_argument('--mqtt-no-ssl', action='store_false', dest='mqtt_ssl',
                        help='Should MQTT use SSL? Default is to use SSL (and the SSL port).')
    parser.add_argument('--mqtt-server-ca', metavar='<ABSOLUTE_PATH>',
                        help='Use this CA cert to validate the MQTT Server.',
                        default=ca_cert, type=str)
    parser.add_argument('--mqtt-client-cert', metavar='<ABSOLUTE_PATH>',
                        help='Use this public key to identify yourself.',
                        default=user_pubkey, type=str)
    parser.add_argument('--mqtt-client-key', metavar='<ABSOLUTE_PATH>',
                        help='The private key that matches with --mqtt-client-cert .',
                        default=user_privkey, type=str)
    parser.add_argument('--dbus-use-system-bus', action='store_true',
                        help='Should we use the global SystemBus or the user SessionBus. The SystemBus requires settings in /etc/dbus-1/system.d/myservice.conf',
                        default=False)
    parser.add_argument('--dbus-config', action='store_true',
                        help='Just output the SystemBus permissions file and exit',
                        default=False)

    return parser
| |
| |
class BusMessage(object):
    """
    Object published on the bus to carry MQTT messages.

    ``dbus`` is the introspection XML: a single interface, named by
    DBUS_INTERFACE, declaring one signal ``message`` with one string
    argument.  ``message`` is the pydbus signal object; the MQTT
    callbacks in this file call it with a JSON string.
    """

    # Introspection data; the "{}" placeholder receives the interface name.
    dbus = """<?xml version="1.0" encoding="UTF-8" ?>
<node>
<interface name="{}">
<signal name="message">
<arg type='s'/>
</signal>
</interface>
</node>
""".format(DBUS_INTERFACE)

    # Signal declared as <signal name="message"> in the XML above.
    message = signal()
| |
def DbusPermissionsConf(interface_name):
    '''
    Render the D-Bus policy file needed to use the SystemBus.

    The generated policy lets root and the user running this program own
    ``interface_name`` and send on it, and lets everyone receive from it.

    Args:
        interface_name: D-Bus name to grant permissions for; also used in
            the usage hints at the top of the generated file.

    Returns:
        str: the XML text, meant for
        /etc/dbus-1/system.d/<interface_name>.conf
    '''
    import getpass
    whoami = getpass.getuser()

    # The usage hints spell the command line dashes with U+2010 hyphens,
    # presumably because XML does not allow "--" inside a comment.  Keep
    # them: ASCII double dashes would make the file malformed.
    # '{{' and '}}' are literal braces escaped for str.format().
    xml = '''<?xml version="1.0" encoding="UTF-8" ?>
<!-- Copy me into /etc/dbus-1/system.d/{interface_name}.conf and reload dbus -->
<!-- You can send the signal with: dbus-send ‐‐system ‐‐type=signal / {interface_name}.message 'string:{{"test": ["1", "2"]}}' -->
<!-- You can watch bus with dbus-monitor ‐‐system 'interface={interface_name}' -->
<!DOCTYPE busconfig PUBLIC
"-//freedesktop//DTD D-BUS Bus Configuration 1.0//EN"
"http://www.freedesktop.org/standards/dbus/1.0/busconfig.dtd">
<busconfig>
<!-- Always allow root to do anything -->
<policy user="root">
<allow own="{interface_name}" />
<allow send_interface="{interface_name}" />
</policy>
<!-- Allow a single non-root user to setup interface and send to endpoint -->
<!-- You can change this to group='somegroup' if you desire -->
<policy user="{whoami}">
<allow own="{interface_name}" />
<allow send_interface="{interface_name}" send_destination="{interface_name}.message" />
</policy>
<!-- Always allow anyone to listen -->
<policy context="default">
<allow receive_interface="{interface_name}" />
<allow receive_sender="{interface_name}.message" />
</policy>
</busconfig>
'''
    return xml.format(interface_name=interface_name, whoami=whoami)
| |
| |
def on_mqtt_message(client, userdata, message):
    '''
    Forward one MQTT message to D-Bus as a JSON string.

    The payload is decoded as UTF-8 JSON and sent on as the JSON object
    ``{topic: payload}`` through ``userdata['emit'].message``.  A payload
    that is not valid UTF-8 or not valid JSON is logged and dropped
    instead of raising out of the callback.

    Args:
        client: MQTT client that received the message (unused).
        userdata: dict whose 'emit' entry exposes the ``message`` signal.
        message: received message; ``topic`` and ``payload`` are read.
    '''
    try:
        payload = message.payload.decode('utf-8')
        content = json.loads(payload)
    except ValueError as err:
        # Both UnicodeDecodeError and the JSON decode error are ValueErrors.
        logging.warning('Dropping undecodable message topic:%s error:%s',
                        message.topic, err)
        return

    logging.debug('Message received topic:%s payload:%s', message.topic, payload)

    # Serialise once and reuse the text for both the signal and the log.
    outgoing = json.dumps({message.topic: content})
    userdata['emit'].message(outgoing)

    logging.debug('Sending signal: %s', outgoing)
| |
def on_mqtt_disconnect(client, userdata, rc):
    '''
    Log why the MQTT connection went away.

    A falsy ``rc`` is logged as a plain disconnect.  Any other value is
    logged together with both candidate descriptions paho offers for it
    (error string and CONNACK string), after which SystemExit is raised.

    Args:
        client: MQTT client that was disconnected (unused).
        userdata: user data set on the client (unused).
        rc: disconnect result code handed over by paho.
    '''
    if not rc:
        logging.error('Disconnected from MQTT Server')
        return

    logging.error('Disconnected with error ErrCode:%s', rc)
    # The code could come from either table, so print both readings.
    for describe in (paho.mqtt.client.error_string, paho.mqtt.client.connack_string):
        logging.error('ErrCode:%s might be - %s', rc, describe(rc))

    # NOTE(review): if paho runs this callback on its network thread, this
    # SystemExit may only end that thread, not the process - confirm.
    raise SystemExit
| |
def on_mqtt_connect(client, userdata, flags, rc):
    '''
    Subscribe to every configured topic once the server accepts us.

    For each entry of ``userdata['topics']`` the client subscribes and a
    ``{'mqtt.setup': ...}`` JSON notice is sent through
    ``userdata['emit'].message``.  When ``rc`` is non-zero the connection
    was not accepted, so nothing is subscribed or announced.

    Args:
        client: MQTT client that just connected.
        userdata: dict with 'topics' (iterable of topic strings) and
            'emit' (object exposing the ``message`` signal).
        flags: connection flags handed over by paho (unused).
        rc: connection result code; 0 means success.
    '''
    if rc != 0:
        # Do not announce subscriptions that cannot have happened.
        logging.error('Connection refused with status code : %s', rc)
        return

    logging.debug('Connected with status code : %s', rc)

    emitter = userdata['emit']
    for topic in userdata['topics']:
        client.subscribe(topic)
        logging.info('Subscribing to topic %s', topic)
        notice = {'mqtt.setup': 'Subscribing to topic {} at {}'.format(topic, datetime.datetime.now())}
        emitter.message(json.dumps(notice))
| |
| |
| |
if __name__ == '__main__':
    # Script entry point: parse the options, configure logging and the
    # MQTT client, publish the D-Bus object, then run the GLib main loop
    # until interrupted.

    PARSER = setup_args()
    ARGS = PARSER.parse_args()

    # --dbus-config only prints the SystemBus policy file and exits.
    if ARGS.dbus_config:
        print(DbusPermissionsConf(DBUS_INTERFACE))
        raise SystemExit

    MYLOGGER = logging.getLogger()
    MYLOGGER.setLevel(logging.DEBUG if ARGS.debug else logging.WARNING)

    # The handler passes everything; the logger level above does the filtering.
    handler = logging.StreamHandler(sys.stderr)
    handler.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    MYLOGGER.addHandler(handler)

    PROGRAM_NAME = os.path.basename(sys.argv[0])
    MYLOGGER.debug('Running:%s args:%s', PROGRAM_NAME, sys.argv[1:])

    if ARGS.client_connection_name:
        MYLOGGER.info('Attempting to connect as %s to %s:%s', ARGS.client_connection_name, ARGS.mqtt_server, ARGS.mqtt_port)
    else:
        MYLOGGER.info('Attempting to connect with random name to %s:%s', ARGS.mqtt_server, ARGS.mqtt_port)

    CLIENT = paho.mqtt.client.Client(client_id=ARGS.client_connection_name, clean_session=True)

    if ARGS.mqtt_ssl:
        # Expand "~" in each TLS file option and make sure the file exists.
        for option in ('mqtt_server_ca', 'mqtt_client_cert', 'mqtt_client_key'):
            tls_path = os.path.expanduser(getattr(ARGS, option))
            if not os.path.exists(tls_path):
                # Format the message here; ValueError does not apply %-args.
                raise ValueError('No such file %s' % tls_path)
            setattr(ARGS, option, tls_path)

        MYLOGGER.info('SSL enabled CA=%s PUBKEY=%s PRIVKEY=%s', ARGS.mqtt_server_ca, ARGS.mqtt_client_cert, ARGS.mqtt_client_key)
        CLIENT.tls_set(ca_certs=ARGS.mqtt_server_ca, certfile=ARGS.mqtt_client_cert, keyfile=ARGS.mqtt_client_key)

    try:
        CLIENT.enable_logger(logger=MYLOGGER)
    except AttributeError:
        # This client has no enable_logger() (presumably an older paho
        # release); carry on without the client's own log output.
        pass

    CLIENT.on_connect = on_mqtt_connect
    CLIENT.on_message = on_mqtt_message
    CLIENT.on_disconnect = on_mqtt_disconnect

    CLIENT.connect_async(host=ARGS.mqtt_server, port=ARGS.mqtt_port, keepalive=ARGS.mqtt_keepalive, bind_address=ARGS.mqtt_source_ip)

    DBUS_MESSAGE = BusMessage()

    # --mqtt-topic uses action='append' with nargs='+', so the parser hands
    # back one list per occurrence of the flag.  Flatten it so that every
    # entry is a single topic string, which is what subscribe() is given.
    TOPICS = []
    for entry in ARGS.mqtt_topic or []:
        if isinstance(entry, (list, tuple)):
            TOPICS.extend(entry)
        else:
            TOPICS.append(entry)
    if not TOPICS:
        TOPICS = ['git.centos.org/#']
    ARGS.mqtt_topic = TOPICS

    CLIENT.user_data_set({'topics': ARGS.mqtt_topic, 'emit': DBUS_MESSAGE})

    if ARGS.dbus_use_system_bus:
        BUS = SystemBus()
        MYLOGGER.debug('Publishing to system bus %s', DBUS_INTERFACE)
    else:
        BUS = SessionBus()
        MYLOGGER.debug('Publishing to session bus %s', DBUS_INTERFACE)

    # Publish before starting the MQTT loop so that signals sent from the
    # first on_connect callback already have a bus to go out on.
    BUS.publish(DBUS_INTERFACE, DBUS_MESSAGE)

    CLIENT.loop_start()

    try:
        MainLoop().run()
    except KeyboardInterrupt:
        logging.debug('Got CTRL+C, exiting cleanly')
        raise SystemExit
    finally:
        # Runs on every exit path: disconnect once, then stop the
        # background loop that loop_start() began.
        CLIENT.disconnect()
        CLIENT.loop_stop()