Blob Blame History Raw
From 2827f7ad9ecfb9d6ad7f90b5cffbfb59fbd5fb8c Mon Sep 17 00:00:00 2001
From: Aravinda VK <avishwan@redhat.com>
Date: Tue, 29 Nov 2016 16:32:54 +0530
Subject: [PATCH 218/227] eventsapi: Push Messages to Webhooks in parallel

With this patch, glustereventsd will maintain one thread per
webhook. If a webhook is slow, then all the events to that worker
will be delayed but it will not affect the other webhooks.

Note: Webhook events in transit may be missed if glustereventsd reloads due to
new Webhook addition or due to configuration changes.

>Reviewed-on: http://review.gluster.org/15966
>Smoke: Gluster Build System <jenkins@build.gluster.org>
>Reviewed-by: Prashanth Pai <ppai@redhat.com>
>NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
>CentOS-regression: Gluster Build System <jenkins@build.gluster.org>

BUG: 1395613
Change-Id: I2d11e01c7ac434355bc356ff75396252f51b339b
Signed-off-by: Aravinda VK <avishwan@redhat.com>
Reviewed-on: https://code.engineering.redhat.com/gerrit/92047
Reviewed-by: Atin Mukherjee <amukherj@redhat.com>
Tested-by: Atin Mukherjee <amukherj@redhat.com>
---
 events/src/glustereventsd.py |  2 ++
 events/src/utils.py          | 70 ++++++++++++++++++++++++++++++++++++++++----
 2 files changed, 67 insertions(+), 5 deletions(-)

diff --git a/events/src/glustereventsd.py b/events/src/glustereventsd.py
index d057e09..1d01054 100644
--- a/events/src/glustereventsd.py
+++ b/events/src/glustereventsd.py
@@ -69,11 +69,13 @@ class GlusterEventsRequestHandler(SocketServer.BaseRequestHandler):
 
 def signal_handler_sigusr2(sig, frame):
     utils.load_all()
+    utils.restart_webhook_pool()
 
 
 def init_event_server():
     utils.setup_logger()
     utils.load_all()
+    utils.init_webhook_pool()
 
     port = utils.get_config("port")
     if port is None:
diff --git a/events/src/utils.py b/events/src/utils.py
index 2e7bf0e..686fd3a 100644
--- a/events/src/utils.py
+++ b/events/src/utils.py
@@ -14,6 +14,9 @@ import os
 import logging
 import fcntl
 from errno import ESRCH, EBADF
+from threading import Thread
+import multiprocessing
+from Queue import Queue
 
 from eventsapiconf import (LOG_FILE,
                            WEBHOOKS_FILE,
@@ -34,6 +37,7 @@ _config = {}
 # Init Logger instance
 logger = logging.getLogger(__name__)
 NodeID = None
+webhooks_pool = None
 
 
 def get_node_uuid():
@@ -162,14 +166,13 @@ def autoload_webhooks():
             load_webhooks()
 
 
-def plugin_webhook(message):
+def publish_to_webhook(url, token, message_queue):
     # Import requests here since not used in any other place
     import requests
 
-    message_json = json.dumps(message, sort_keys=True)
-    logger.debug("EVENT: {0}".format(message_json))
-    for url, token in _webhooks.items():
-        http_headers = {"Content-Type": "application/json"}
+    http_headers = {"Content-Type": "application/json"}
+    while True:
+        message_json = message_queue.get()
         if token != "" and token is not None:
             http_headers["Authorization"] = "Bearer " + token
 
@@ -183,6 +186,8 @@ def plugin_webhook(message):
                             event=message_json,
                             error=e))
             continue
+        finally:
+            message_queue.task_done()
 
         if resp.status_code != 200:
             logger.warn("Event push failed to URL: {url}, "
@@ -193,6 +198,12 @@ def plugin_webhook(message):
                             status_code=resp.status_code))
 
 
+def plugin_webhook(message):
+    message_json = json.dumps(message, sort_keys=True)
+    logger.debug("EVENT: {0}".format(message_json))
+    webhooks_pool.send(message_json)
+
+
 class LockedOpen(object):
 
     def __init__(self, filename, *args, **kwargs):
@@ -266,3 +277,52 @@ class PidFile(object):
 
     def __exit__(self, _exc_type, _exc_value, _traceback):
         self.cleanup()
+
+
+def webhook_monitor(proc_queue, webhooks):
+    queues = {}
+    for url, token in webhooks.items():
+        queues[url] = Queue()
+        t = Thread(target=publish_to_webhook, args=(url, token, queues[url]))
+        t.start()
+
+    # Get the message sent to Process queue and distribute to all thread queues
+    while True:
+        message = proc_queue.get()
+        for _, q in queues.items():
+            q.put(message)
+
+
+class WebhookThreadPool(object):
+    def start(self):
+        # Separate process to emit messages to webhooks
+        # which maintains one thread per webhook. A separate
+        # process is required since on reload we need to stop
+        # and start the thread pool. In Python, threads can't be stopped,
+        # so terminate the process and start again. Note: in-transit
+        # events will be missed during reload
+        self.queue = multiprocessing.Queue()
+        self.proc = multiprocessing.Process(target=webhook_monitor,
+                                            args=(self.queue, _webhooks))
+        self.proc.start()
+
+    def restart(self):
+        # In transit messages are skipped, since webhooks monitor process
+        # is terminated.
+        self.proc.terminate()
+        self.start()
+
+    def send(self, message):
+        self.queue.put(message)
+
+
+def init_webhook_pool():
+    global webhooks_pool
+    webhooks_pool = WebhookThreadPool()
+    webhooks_pool.start()
+
+
+def restart_webhook_pool():
+    global webhooks_pool
+    if webhooks_pool is not None:
+        webhooks_pool.restart()
-- 
2.9.3