From 2827f7ad9ecfb9d6ad7f90b5cffbfb59fbd5fb8c Mon Sep 17 00:00:00 2001 From: Aravinda VK Date: Tue, 29 Nov 2016 16:32:54 +0530 Subject: [PATCH 218/227] eventsapi: Push Messages to Webhooks in parallel With this patch, glustereventsd will maintain one thread per webhook. If a webhook is slow, then all the events to that worker will be delayed but it will not affect the other webhooks. Note: Webhook in transit may get missed if glustereventsd reloads due to new Webhook addition or due configuration changes. >Reviewed-on: http://review.gluster.org/15966 >Smoke: Gluster Build System >Reviewed-by: Prashanth Pai >NetBSD-regression: NetBSD Build System >CentOS-regression: Gluster Build System BUG: 1395613 Change-Id: I2d11e01c7ac434355bc356ff75396252f51b339b Signed-off-by: Aravinda VK Reviewed-on: https://code.engineering.redhat.com/gerrit/92047 Reviewed-by: Atin Mukherjee Tested-by: Atin Mukherjee --- events/src/glustereventsd.py | 2 ++ events/src/utils.py | 70 ++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 67 insertions(+), 5 deletions(-) diff --git a/events/src/glustereventsd.py b/events/src/glustereventsd.py index d057e09..1d01054 100644 --- a/events/src/glustereventsd.py +++ b/events/src/glustereventsd.py @@ -69,11 +69,13 @@ class GlusterEventsRequestHandler(SocketServer.BaseRequestHandler): def signal_handler_sigusr2(sig, frame): utils.load_all() + utils.restart_webhook_pool() def init_event_server(): utils.setup_logger() utils.load_all() + utils.init_webhook_pool() port = utils.get_config("port") if port is None: diff --git a/events/src/utils.py b/events/src/utils.py index 2e7bf0e..686fd3a 100644 --- a/events/src/utils.py +++ b/events/src/utils.py @@ -14,6 +14,9 @@ import os import logging import fcntl from errno import ESRCH, EBADF +from threading import Thread +import multiprocessing +from Queue import Queue from eventsapiconf import (LOG_FILE, WEBHOOKS_FILE, @@ -34,6 +37,7 @@ _config = {} # Init Logger instance logger = logging.getLogger(__name__) NodeID = None +webhooks_pool = None def get_node_uuid(): @@ -162,14 +166,13 @@ def autoload_webhooks(): load_webhooks() -def plugin_webhook(message): +def publish_to_webhook(url, token, message_queue): # Import requests here since not used in any other place import requests - message_json = json.dumps(message, sort_keys=True) - logger.debug("EVENT: {0}".format(message_json)) - for url, token in _webhooks.items(): - http_headers = {"Content-Type": "application/json"} + http_headers = {"Content-Type": "application/json"} + while True: + message_json = message_queue.get() if token != "" and token is not None: http_headers["Authorization"] = "Bearer " + token @@ -183,6 +186,8 @@ def plugin_webhook(message): event=message_json, error=e)) continue + finally: + message_queue.task_done() if resp.status_code != 200: logger.warn("Event push failed to URL: {url}, " @@ -193,6 +198,12 @@ def plugin_webhook(message): status_code=resp.status_code)) +def plugin_webhook(message): + message_json = json.dumps(message, sort_keys=True) + logger.debug("EVENT: {0}".format(message_json)) + webhooks_pool.send(message_json) + + class LockedOpen(object): def __init__(self, filename, *args, **kwargs): @@ -266,3 +277,52 @@ class PidFile(object): def __exit__(self, _exc_type, _exc_value, _traceback): self.cleanup() + + +def webhook_monitor(proc_queue, webhooks): + queues = {} + for url, token in webhooks.items(): + queues[url] = Queue() + t = Thread(target=publish_to_webhook, args=(url, token, queues[url])) + t.start() + + # Get the message sent to Process queue and distribute to all thread queues + while True: + message = proc_queue.get() + for _, q in queues.items(): + q.put(message) + + +class WebhookThreadPool(object): + def start(self): + # Seperate process to emit messages to webhooks + # which maintains one thread per webhook. Seperate + # process is required since on reload we need to stop + # and start the thread pool. In Python Threads can't be stopped + # so terminate the process and start again. Note: In transit + # events will be missed during reload + self.queue = multiprocessing.Queue() + self.proc = multiprocessing.Process(target=webhook_monitor, + args=(self.queue, _webhooks)) + self.proc.start() + + def restart(self): + # In transit messages are skipped, since webhooks monitor process + # is terminated. + self.proc.terminate() + self.start() + + def send(self, message): + self.queue.put(message) + + +def init_webhook_pool(): + global webhooks_pool + webhooks_pool = WebhookThreadPool() + webhooks_pool.start() + + +def restart_webhook_pool(): + global webhooks_pool + if webhooks_pool is not None: + webhooks_pool.restart() -- 2.9.3