3604df
From 2827f7ad9ecfb9d6ad7f90b5cffbfb59fbd5fb8c Mon Sep 17 00:00:00 2001
3604df
From: Aravinda VK <avishwan@redhat.com>
3604df
Date: Tue, 29 Nov 2016 16:32:54 +0530
3604df
Subject: [PATCH 218/227] eventsapi: Push Messages to Webhooks in parallel
3604df
3604df
With this patch, glustereventsd will maintain one thread per
3604df
webhook. If a webhook is slow, then all the events to that worker
3604df
will be delayed but it will not affect the other webhooks.
3604df
3604df
Note: Webhook in transit may get missed if glustereventsd reloads due to
3604df
new Webhook addition or due configuration changes.
3604df
3604df
>Reviewed-on: http://review.gluster.org/15966
3604df
>Smoke: Gluster Build System <jenkins@build.gluster.org>
3604df
>Reviewed-by: Prashanth Pai <ppai@redhat.com>
3604df
>NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
3604df
>CentOS-regression: Gluster Build System <jenkins@build.gluster.org>
3604df
3604df
BUG: 1395613
3604df
Change-Id: I2d11e01c7ac434355bc356ff75396252f51b339b
3604df
Signed-off-by: Aravinda VK <avishwan@redhat.com>
3604df
Reviewed-on: https://code.engineering.redhat.com/gerrit/92047
3604df
Reviewed-by: Atin Mukherjee <amukherj@redhat.com>
3604df
Tested-by: Atin Mukherjee <amukherj@redhat.com>
3604df
---
3604df
 events/src/glustereventsd.py |  2 ++
3604df
 events/src/utils.py          | 70 ++++++++++++++++++++++++++++++++++++++++----
3604df
 2 files changed, 67 insertions(+), 5 deletions(-)
3604df
3604df
diff --git a/events/src/glustereventsd.py b/events/src/glustereventsd.py
3604df
index d057e09..1d01054 100644
3604df
--- a/events/src/glustereventsd.py
3604df
+++ b/events/src/glustereventsd.py
3604df
@@ -69,11 +69,13 @@ class GlusterEventsRequestHandler(SocketServer.BaseRequestHandler):
3604df
 
3604df
 def signal_handler_sigusr2(sig, frame):
3604df
     utils.load_all()
3604df
+    utils.restart_webhook_pool()
3604df
 
3604df
 
3604df
 def init_event_server():
3604df
     utils.setup_logger()
3604df
     utils.load_all()
3604df
+    utils.init_webhook_pool()
3604df
 
3604df
     port = utils.get_config("port")
3604df
     if port is None:
3604df
diff --git a/events/src/utils.py b/events/src/utils.py
3604df
index 2e7bf0e..686fd3a 100644
3604df
--- a/events/src/utils.py
3604df
+++ b/events/src/utils.py
3604df
@@ -14,6 +14,9 @@ import os
3604df
 import logging
3604df
 import fcntl
3604df
 from errno import ESRCH, EBADF
3604df
+from threading import Thread
3604df
+import multiprocessing
3604df
+from Queue import Queue
3604df
 
3604df
 from eventsapiconf import (LOG_FILE,
3604df
                            WEBHOOKS_FILE,
3604df
@@ -34,6 +37,7 @@ _config = {}
3604df
 # Init Logger instance
3604df
 logger = logging.getLogger(__name__)
3604df
 NodeID = None
3604df
+webhooks_pool = None
3604df
 
3604df
 
3604df
 def get_node_uuid():
3604df
@@ -162,14 +166,13 @@ def autoload_webhooks():
3604df
             load_webhooks()
3604df
 
3604df
 
3604df
-def plugin_webhook(message):
3604df
+def publish_to_webhook(url, token, message_queue):
3604df
     # Import requests here since not used in any other place
3604df
     import requests
3604df
 
3604df
-    message_json = json.dumps(message, sort_keys=True)
3604df
-    logger.debug("EVENT: {0}".format(message_json))
3604df
-    for url, token in _webhooks.items():
3604df
-        http_headers = {"Content-Type": "application/json"}
3604df
+    http_headers = {"Content-Type": "application/json"}
3604df
+    while True:
3604df
+        message_json = message_queue.get()
3604df
         if token != "" and token is not None:
3604df
             http_headers["Authorization"] = "Bearer " + token
3604df
 
3604df
@@ -183,6 +186,8 @@ def plugin_webhook(message):
3604df
                             event=message_json,
3604df
                             error=e))
3604df
             continue
3604df
+        finally:
3604df
+            message_queue.task_done()
3604df
 
3604df
         if resp.status_code != 200:
3604df
             logger.warn("Event push failed to URL: {url}, "
3604df
@@ -193,6 +198,12 @@ def plugin_webhook(message):
3604df
                             status_code=resp.status_code))
3604df
 
3604df
 
3604df
+def plugin_webhook(message):
3604df
+    message_json = json.dumps(message, sort_keys=True)
3604df
+    logger.debug("EVENT: {0}".format(message_json))
3604df
+    webhooks_pool.send(message_json)
3604df
+
3604df
+
3604df
 class LockedOpen(object):
3604df
 
3604df
     def __init__(self, filename, *args, **kwargs):
3604df
@@ -266,3 +277,52 @@ class PidFile(object):
3604df
 
3604df
     def __exit__(self, _exc_type, _exc_value, _traceback):
3604df
         self.cleanup()
3604df
+
3604df
+
3604df
+def webhook_monitor(proc_queue, webhooks):
3604df
+    queues = {}
3604df
+    for url, token in webhooks.items():
3604df
+        queues[url] = Queue()
3604df
+        t = Thread(target=publish_to_webhook, args=(url, token, queues[url]))
3604df
+        t.start()
3604df
+
3604df
+    # Get the message sent to Process queue and distribute to all thread queues
3604df
+    while True:
3604df
+        message = proc_queue.get()
3604df
+        for _, q in queues.items():
3604df
+            q.put(message)
3604df
+
3604df
+
3604df
+class WebhookThreadPool(object):
3604df
+    def start(self):
3604df
+        # Seperate process to emit messages to webhooks
3604df
+        # which maintains one thread per webhook. Seperate
3604df
+        # process is required since on reload we need to stop
3604df
+        # and start the thread pool. In Python Threads can't be stopped
3604df
+        # so terminate the process and start again. Note: In transit
3604df
+        # events will be missed during reload
3604df
+        self.queue = multiprocessing.Queue()
3604df
+        self.proc = multiprocessing.Process(target=webhook_monitor,
3604df
+                                            args=(self.queue, _webhooks))
3604df
+        self.proc.start()
3604df
+
3604df
+    def restart(self):
3604df
+        # In transit messages are skipped, since webhooks monitor process
3604df
+        # is terminated.
3604df
+        self.proc.terminate()
3604df
+        self.start()
3604df
+
3604df
+    def send(self, message):
3604df
+        self.queue.put(message)
3604df
+
3604df
+
3604df
+def init_webhook_pool():
3604df
+    global webhooks_pool
3604df
+    webhooks_pool = WebhookThreadPool()
3604df
+    webhooks_pool.start()
3604df
+
3604df
+
3604df
+def restart_webhook_pool():
3604df
+    global webhooks_pool
3604df
+    if webhooks_pool is not None:
3604df
+        webhooks_pool.restart()
3604df
-- 
3604df
2.9.3
3604df