|
|
0b0750 |
#!/usr/bin/env python3.6
|
|
|
0b0750 |
#pylint: disable=line-too-long
|
|
|
0b0750 |
#
|
|
|
0b0750 |
# Copyright (2019). Fermi Research Alliance, LLC.
|
|
|
0b0750 |
# Initial Author: Pat Riehecky <riehecky@fnal.gov>
|
|
|
0b0750 |
#
|
|
|
0b0750 |
'''
|
|
|
0b0750 |
Connect to the MQTT server and convert messages into dbus signals.
|
|
|
0b0750 |
'''
|
|
|
0b0750 |
|
|
|
0b0750 |
## Uncomment these for python2 support
|
|
|
0b0750 |
#from __future__ import unicode_literals
|
|
|
0b0750 |
#from __future__ import absolute_import
|
|
|
0b0750 |
#from __future__ import print_function
|
|
|
0b0750 |
|
|
|
0b0750 |
import datetime
|
|
|
0b0750 |
import logging
|
|
|
0b0750 |
import json
|
|
|
0b0750 |
import os.path
|
|
|
0b0750 |
import sys
|
|
|
0b0750 |
import random
|
|
|
0b0750 |
import textwrap
|
|
|
0b0750 |
|
|
|
0b0750 |
DBUS_INTERFACE = 'org.centos.git.mqtt'
|
|
|
0b0750 |
|
|
|
0b0750 |
try:
|
|
|
0b0750 |
import paho.mqtt.client
|
|
|
0b0750 |
except ImportError: # pragma: no cover
|
|
|
0b0750 |
print("Please install paho.mqtt.client - rpm: python-paho-mqtt", file=sys.stderr)
|
|
|
0b0750 |
raise
|
|
|
0b0750 |
|
|
|
0b0750 |
try:
|
|
|
0b0750 |
from pydbus import SystemBus, SessionBus
|
|
|
0b0750 |
from pydbus.generic import signal
|
|
|
0b0750 |
except ImportError: # pragma: no cover
|
|
|
0b0750 |
print("Please install pydbus - rpm: python-pydbus", file=sys.stderr)
|
|
|
0b0750 |
raise
|
|
|
0b0750 |
|
|
|
0b0750 |
try:
|
|
|
0b0750 |
from gi.repository.GLib import MainLoop
|
|
|
0b0750 |
except ImportError: # pragma: no cover
|
|
|
0b0750 |
print("Please install pygobject - rpm: python-gobject", file=sys.stderr)
|
|
|
0b0750 |
raise
|
|
|
0b0750 |
|
|
|
0b0750 |
try:
|
|
|
0b0750 |
from argparse import ArgumentParser
|
|
|
0b0750 |
except ImportError: # pragma: no cover
|
|
|
0b0750 |
print("Please install argparse - rpm: python-argparse", file=sys.stderr)
|
|
|
0b0750 |
raise
|
|
|
0b0750 |
|
|
|
0b0750 |
##########################################
|
|
|
0b0750 |
def setup_args():
|
|
|
0b0750 |
'''
|
|
|
0b0750 |
Setup the argparse object.
|
|
|
0b0750 |
|
|
|
0b0750 |
Make sure all fields have defaults so we could use this as an object
|
|
|
0b0750 |
'''
|
|
|
0b0750 |
ca_cert = str(os.path.expanduser('~/')) + '.centos-server-ca.cert'
|
|
|
0b0750 |
user_pubkey = str(os.path.expanduser('~/')) + '.centos.cert'
|
|
|
0b0750 |
user_privkey = str(os.path.expanduser('~/')) + '.centos.cert'
|
|
|
0b0750 |
|
|
|
0b0750 |
# use a psudo random number for keepalive to help spread out the load
|
|
|
0b0750 |
# some time between 1m 30s and 2m 10s
|
|
|
0b0750 |
keep_alive = random.randint(90, 130)
|
|
|
0b0750 |
|
|
|
0b0750 |
parser = ArgumentParser(description=textwrap.dedent(__doc__))
|
|
|
0b0750 |
|
|
|
0b0750 |
parser.add_argument('--debug',action='store_true',
|
|
|
0b0750 |
help='Print out all debugging actions',
|
|
|
0b0750 |
default=False)
|
|
|
0b0750 |
parser.add_argument('--client-connection-name', metavar='<UNIQUESTRING>',
|
|
|
0b0750 |
help='Use this specific name when connecting. Default is a psudo-random string.',
|
|
|
0b0750 |
default='', type=str)
|
|
|
0b0750 |
parser.add_argument('--mqtt-server', metavar='<HOSTNAME>',
|
|
|
0b0750 |
help='Connect to this MQTT server',
|
|
|
0b0750 |
default='mqtt.git.centos.org', type=str)
|
|
|
0b0750 |
parser.add_argument('--mqtt-port', metavar='<PORTNUMBER>',
|
|
|
0b0750 |
help='Connect to MQTT server on this port',
|
|
|
0b0750 |
default='8883', type=int)
|
|
|
0b0750 |
parser.add_argument('--mqtt-source-ip', metavar='<SOURCE_IP>',
|
|
|
0b0750 |
help='Connect to MQTT server from this address. Default is any.',
|
|
|
0b0750 |
default='', type=str)
|
|
|
0b0750 |
parser.add_argument('--mqtt-topic', metavar='<TOPIC_ID>',
|
|
|
0b0750 |
action='append', nargs='+', type=str,
|
|
|
0b0750 |
help='Which MQTT topic should we watch. You may set multiple times.')
|
|
|
0b0750 |
parser.add_argument('--mqtt-keepalive', metavar='<SECONDS>',
|
|
|
0b0750 |
help='Seconds between MQTT keepalive packets.',
|
|
|
0b0750 |
default=keep_alive, type=int)
|
|
|
0b0750 |
parser.add_argument('--mqtt-no-ssl', action='store_false', dest='mqtt_ssl',
|
|
|
0b0750 |
help='Should MQTT use SSL? Default is to use SSL (and the SSL port).')
|
|
|
0b0750 |
parser.add_argument('--mqtt-server-ca', metavar='<ABSOLUTE_PATH>',
|
|
|
0b0750 |
help='Use this CA cert to validate the MQTT Server.',
|
|
|
0b0750 |
default=ca_cert, type=str)
|
|
|
0b0750 |
parser.add_argument('--mqtt-client-cert', metavar='<ABSOLUTE_PATH>',
|
|
|
0b0750 |
help='Use this public key to identify yourself.',
|
|
|
0b0750 |
default=user_pubkey, type=str)
|
|
|
0b0750 |
parser.add_argument('--mqtt-client-key', metavar='<ABSOLUTE_PATH>',
|
|
|
0b0750 |
help='The private key that matches with --mqtt-client-cert .',
|
|
|
0b0750 |
default=user_privkey, type=str)
|
|
|
0b0750 |
parser.add_argument('--dbus-use-system-bus',action='store_true',
|
|
|
0b0750 |
help='Should we use the global SystemBus or the user SessionBus. The SystemBus requires settings in /etc/dbus-1/system.d/myservice.conf',
|
|
|
0b0750 |
default=False)
|
|
|
0b0750 |
parser.add_argument('--dbus-config',action='store_true',
|
|
|
0b0750 |
help='Just output the SystemBus permissions file and exit',
|
|
|
0b0750 |
default=False)
|
|
|
0b0750 |
|
|
|
0b0750 |
return parser
|
|
|
0b0750 |
|
|
|
0b0750 |
##########################################
|
|
|
0b0750 |
class BusMessage(object):
|
|
|
0b0750 |
"""
|
|
|
0b0750 |
Server_XML definition.
|
|
|
0b0750 |
"""
|
|
|
0b0750 |
dbus = """
|
|
|
0b0750 |
<node>
|
|
|
0b0750 |
<interface name="{}">
|
|
|
0b0750 |
<signal name="message">
|
|
|
0b0750 |
<arg type='s'/>
|
|
|
0b0750 |
</signal>
|
|
|
0b0750 |
</interface>
|
|
|
0b0750 |
</node>
|
|
|
0b0750 |
""".format(DBUS_INTERFACE)
|
|
|
0b0750 |
|
|
|
0b0750 |
# Function does all the work already
|
|
|
0b0750 |
message = signal()
|
|
|
0b0750 |
|
|
|
0b0750 |
def DbusPermissionsConf(interface_name):
|
|
|
0b0750 |
'''
|
|
|
0b0750 |
For the SystemBus you need permission to create endpoints
|
|
|
0b0750 |
'''
|
|
|
0b0750 |
import getpass
|
|
|
0b0750 |
whoami = getpass.getuser()
|
|
|
0b0750 |
|
|
|
0b0750 |
xml = '''
|
|
|
0b0750 |
|
|
|
0b0750 |
|
|
|
0b0750 |
|
|
|
0b0750 |
|
|
|
0b0750 |
"-//freedesktop//DTD D-BUS Bus Configuration 1.0//EN"
|
|
|
0b0750 |
"http://www.freedesktop.org/standards/dbus/1.0/busconfig.dtd">
|
|
|
0b0750 |
<busconfig>
|
|
|
0b0750 |
|
|
|
0b0750 |
<policy user="root">
|
|
|
0b0750 |
<allow own="{interface_name}" />
|
|
|
0b0750 |
<allow send_interface="{interface_name}/" />
|
|
|
0b0750 |
</policy>
|
|
|
0b0750 |
|
|
|
0b0750 |
|
|
|
0b0750 |
<policy user="{whoami}">
|
|
|
0b0750 |
<allow own="{interface_name}" />
|
|
|
0b0750 |
<allow send_interface="{interface_name}" send_destination="{interface_name}.message" />
|
|
|
0b0750 |
</policy>
|
|
|
0b0750 |
|
|
|
0b0750 |
<policy context="default">
|
|
|
0b0750 |
<allow receive_interface="{interface_name}" />
|
|
|
0b0750 |
<allow receive_sender="{interface_name}.message" />
|
|
|
0b0750 |
</policy>
|
|
|
0b0750 |
</busconfig>
|
|
|
0b0750 |
'''
|
|
|
0b0750 |
return xml.format(interface_name=interface_name, whoami=whoami)
|
|
|
0b0750 |
|
|
|
0b0750 |
##########################################
|
|
|
0b0750 |
def on_mqtt_message(client, userdata, message):
|
|
|
0b0750 |
''' What should I do if I get a message? '''
|
|
|
0b0750 |
logging.debug('Message received topic:%s payload:%s', message.topic, message.payload.decode("utf-8"))
|
|
|
0b0750 |
|
|
|
0b0750 |
# Or you can customize this to fit your needs
|
|
|
dc7af1 |
signal = {message.topic: json.loads(message.payload.decode("utf-8"))}
|
|
|
0b0750 |
userdata['emit'].message(json.dumps((signal)))
|
|
|
0b0750 |
|
|
|
0b0750 |
logging.debug('Sending signal: %s', json.dumps(signal))
|
|
|
0b0750 |
|
|
|
0b0750 |
def on_mqtt_disconnect(client, userdata, rc):
|
|
|
0b0750 |
''' If you get a connection error, print it out '''
|
|
|
0b0750 |
if rc:
|
|
|
0b0750 |
logging.error('Disconnected with error ErrCode:%s', rc)
|
|
|
0b0750 |
logging.error('ErrCode:%s might be - %s', rc, paho.mqtt.client.error_string(rc))
|
|
|
0b0750 |
logging.error('ErrCode:%s might be - %s', rc, paho.mqtt.client.connack_string(rc))
|
|
|
0b0750 |
raise SystemExit
|
|
|
0b0750 |
|
|
|
0b0750 |
logging.error('Disconnected from MQTT Server')
|
|
|
0b0750 |
|
|
|
0b0750 |
def on_mqtt_connect(client, userdata, flags, rc):
|
|
|
0b0750 |
''' Automatically subscribe to all topics '''
|
|
|
0b0750 |
logging.debug('Connected with status code : %s', rc)
|
|
|
0b0750 |
|
|
|
0b0750 |
for topic in userdata['topics']:
|
|
|
0b0750 |
client.subscribe(topic)
|
|
|
0b0750 |
logging.info('Subscribing to topic %s', topic)
|
|
|
0b0750 |
signal = {'mqtt.setup': 'Subscribing to topic {} at {}'.format(topic, datetime.datetime.now())}
|
|
|
0b0750 |
userdata['emit'].message(json.dumps(signal))
|
|
|
0b0750 |
|
|
|
0b0750 |
##########################################
|
|
|
0b0750 |
##########################################
|
|
|
0b0750 |
if __name__ == '__main__':
|
|
|
0b0750 |
|
|
|
0b0750 |
PARSER = setup_args()
|
|
|
0b0750 |
ARGS = PARSER.parse_args()
|
|
|
0b0750 |
|
|
|
0b0750 |
if ARGS.dbus_config:
|
|
|
0b0750 |
print(DbusPermissionsConf(DBUS_INTERFACE))
|
|
|
0b0750 |
raise SystemExit
|
|
|
0b0750 |
|
|
|
0b0750 |
MYLOGGER = logging.getLogger()
|
|
|
0b0750 |
|
|
|
0b0750 |
if ARGS.debug:
|
|
|
0b0750 |
MYLOGGER.setLevel(logging.DEBUG)
|
|
|
0b0750 |
else:
|
|
|
0b0750 |
MYLOGGER.setLevel(logging.WARNING)
|
|
|
0b0750 |
|
|
|
0b0750 |
handler = logging.StreamHandler(sys.stderr)
|
|
|
0b0750 |
handler.setLevel(logging.DEBUG)
|
|
|
0b0750 |
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
|
|
|
0b0750 |
handler.setFormatter(formatter)
|
|
|
0b0750 |
MYLOGGER.addHandler(handler)
|
|
|
0b0750 |
|
|
|
0b0750 |
PROGRAM_NAME = os.path.basename(sys.argv[0])
|
|
|
0b0750 |
MYLOGGER.debug('Running:%s args:%s', PROGRAM_NAME, sys.argv[1:])
|
|
|
0b0750 |
|
|
|
0b0750 |
if ARGS.client_connection_name:
|
|
|
0b0750 |
MYLOGGER.info('Attempting to connect as %s to %s:%s', ARGS.client_connection_name, ARGS.mqtt_server, ARGS.mqtt_port)
|
|
|
0b0750 |
else:
|
|
|
0b0750 |
MYLOGGER.info('Attempting to connect with random name to %s:%s', ARGS.mqtt_server, ARGS.mqtt_port)
|
|
|
0b0750 |
|
|
|
0b0750 |
CLIENT = paho.mqtt.client.Client(client_id=ARGS.client_connection_name, clean_session=True)
|
|
|
0b0750 |
|
|
|
0b0750 |
if ARGS.mqtt_ssl:
|
|
|
0b0750 |
ARGS.mqtt_server_ca = os.path.expanduser(ARGS.mqtt_server_ca)
|
|
|
0b0750 |
if not os.path.exists(ARGS.mqtt_server_ca):
|
|
|
0b0750 |
raise ValueError('No such file %s', ARGS.mqtt_server_ca)
|
|
|
0b0750 |
|
|
|
0b0750 |
ARGS.mqtt_client_cert = os.path.expanduser(ARGS.mqtt_client_cert)
|
|
|
0b0750 |
if not os.path.exists(ARGS.mqtt_client_cert):
|
|
|
0b0750 |
raise ValueError('No such file %s', ARGS.mqtt_client_cert)
|
|
|
0b0750 |
|
|
|
0b0750 |
ARGS.mqtt_client_key = os.path.expanduser(ARGS.mqtt_client_key)
|
|
|
0b0750 |
if not os.path.exists(ARGS.mqtt_client_key):
|
|
|
0b0750 |
raise ValueError('No such file %s', ARGS.mqtt_client_key)
|
|
|
0b0750 |
|
|
|
0b0750 |
MYLOGGER.info('SSL enabled CA=%s PUBKEY=%s PRIVKEY=%s', ARGS.mqtt_server_ca, ARGS.mqtt_client_cert, ARGS.mqtt_client_key)
|
|
|
0b0750 |
CLIENT.tls_set(ca_certs=ARGS.mqtt_server_ca, certfile=ARGS.mqtt_client_cert, keyfile=ARGS.mqtt_client_key)
|
|
|
0b0750 |
|
|
|
0b0750 |
try:
|
|
|
0b0750 |
CLIENT.enable_logger(logger=MYLOGGER)
|
|
|
0b0750 |
except AttributeError:
|
|
|
0b0750 |
# Added in 1.2.x of mqtt library
|
|
|
0b0750 |
pass
|
|
|
0b0750 |
|
|
|
0b0750 |
CLIENT.on_connect = on_mqtt_connect
|
|
|
0b0750 |
CLIENT.on_message = on_mqtt_message
|
|
|
0b0750 |
CLIENT.on_disconnect = on_mqtt_disconnect
|
|
|
0b0750 |
|
|
|
0b0750 |
CLIENT.connect_async(host=ARGS.mqtt_server, port=ARGS.mqtt_port, keepalive=ARGS.mqtt_keepalive, bind_address=ARGS.mqtt_source_ip)
|
|
|
0b0750 |
|
|
|
0b0750 |
DBUS_MESSAGE = BusMessage()
|
|
|
0b0750 |
|
|
|
0b0750 |
if not ARGS.mqtt_topic:
|
|
|
0b0750 |
ARGS.mqtt_topic = ['git.centos.org/#',]
|
|
|
0b0750 |
|
|
|
0b0750 |
CLIENT.user_data_set({'topics': ARGS.mqtt_topic, 'emit': DBUS_MESSAGE})
|
|
|
0b0750 |
|
|
|
0b0750 |
# loop_start will run in background async
|
|
|
0b0750 |
CLIENT.loop_start()
|
|
|
0b0750 |
|
|
|
0b0750 |
if ARGS.dbus_use_system_bus:
|
|
|
0b0750 |
BUS = SystemBus()
|
|
|
0b0750 |
else:
|
|
|
0b0750 |
BUS = SessionBus()
|
|
|
0b0750 |
|
|
|
0b0750 |
if ARGS.dbus_use_system_bus:
|
|
|
0b0750 |
MYLOGGER.debug('Publishing to system bus %s', DBUS_INTERFACE)
|
|
|
0b0750 |
else:
|
|
|
0b0750 |
MYLOGGER.debug('Publishing to session bus %s', DBUS_INTERFACE)
|
|
|
0b0750 |
|
|
|
0b0750 |
BUS.publish(DBUS_INTERFACE, DBUS_MESSAGE)
|
|
|
0b0750 |
|
|
|
0b0750 |
# loop forever, until CTRL+C, or something goes wrong
|
|
|
0b0750 |
try:
|
|
|
0b0750 |
MainLoop().run()
|
|
|
0b0750 |
except KeyboardInterrupt:
|
|
|
0b0750 |
CLIENT.disconnect()
|
|
|
0b0750 |
logging.debug('Got CTRL+C, exiting cleanly')
|
|
|
0b0750 |
raise SystemExit
|
|
|
0b0750 |
except:
|
|
|
0b0750 |
CLIENT.disconnect()
|
|
|
0b0750 |
raise
|
|
|
0b0750 |
finally:
|
|
|
0b0750 |
CLIENT.disconnect()
|