From 2827f7ad9ecfb9d6ad7f90b5cffbfb59fbd5fb8c Mon Sep 17 00:00:00 2001
From: Aravinda VK <avishwan@redhat.com>
Date: Tue, 29 Nov 2016 16:32:54 +0530
Subject: [PATCH 218/227] eventsapi: Push Messages to Webhooks in parallel
With this patch, glustereventsd will maintain one thread per
webhook. If a webhook is slow, then all the events to that worker
will be delayed but it will not affect the other webhooks.
Note: Webhook in transit may get missed if glustereventsd reloads due to
new Webhook addition or due configuration changes.
>Reviewed-on: http://review.gluster.org/15966
>Smoke: Gluster Build System <jenkins@build.gluster.org>
>Reviewed-by: Prashanth Pai <ppai@redhat.com>
>NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
>CentOS-regression: Gluster Build System <jenkins@build.gluster.org>
BUG: 1395613
Change-Id: I2d11e01c7ac434355bc356ff75396252f51b339b
Signed-off-by: Aravinda VK <avishwan@redhat.com>
Reviewed-on: https://code.engineering.redhat.com/gerrit/92047
Reviewed-by: Atin Mukherjee <amukherj@redhat.com>
Tested-by: Atin Mukherjee <amukherj@redhat.com>
---
events/src/glustereventsd.py | 2 ++
events/src/utils.py | 70 ++++++++++++++++++++++++++++++++++++++++----
2 files changed, 67 insertions(+), 5 deletions(-)
diff --git a/events/src/glustereventsd.py b/events/src/glustereventsd.py
index d057e09..1d01054 100644
--- a/events/src/glustereventsd.py
+++ b/events/src/glustereventsd.py
@@ -69,11 +69,13 @@ class GlusterEventsRequestHandler(SocketServer.BaseRequestHandler):
def signal_handler_sigusr2(sig, frame):
utils.load_all()
+ utils.restart_webhook_pool()
def init_event_server():
utils.setup_logger()
utils.load_all()
+ utils.init_webhook_pool()
port = utils.get_config("port")
if port is None:
diff --git a/events/src/utils.py b/events/src/utils.py
index 2e7bf0e..686fd3a 100644
--- a/events/src/utils.py
+++ b/events/src/utils.py
@@ -14,6 +14,9 @@ import os
import logging
import fcntl
from errno import ESRCH, EBADF
+from threading import Thread
+import multiprocessing
+from Queue import Queue
from eventsapiconf import (LOG_FILE,
WEBHOOKS_FILE,
@@ -34,6 +37,7 @@ _config = {}
# Init Logger instance
logger = logging.getLogger(__name__)
NodeID = None
+webhooks_pool = None
def get_node_uuid():
@@ -162,14 +166,13 @@ def autoload_webhooks():
load_webhooks()
-def plugin_webhook(message):
+def publish_to_webhook(url, token, message_queue):
# Import requests here since not used in any other place
import requests
- message_json = json.dumps(message, sort_keys=True)
- logger.debug("EVENT: {0}".format(message_json))
- for url, token in _webhooks.items():
- http_headers = {"Content-Type": "application/json"}
+ http_headers = {"Content-Type": "application/json"}
+ while True:
+ message_json = message_queue.get()
if token != "" and token is not None:
http_headers["Authorization"] = "Bearer " + token
@@ -183,6 +186,8 @@ def plugin_webhook(message):
event=message_json,
error=e))
continue
+ finally:
+ message_queue.task_done()
if resp.status_code != 200:
logger.warn("Event push failed to URL: {url}, "
@@ -193,6 +198,12 @@ def plugin_webhook(message):
status_code=resp.status_code))
+def plugin_webhook(message):
+ message_json = json.dumps(message, sort_keys=True)
+ logger.debug("EVENT: {0}".format(message_json))
+ webhooks_pool.send(message_json)
+
+
class LockedOpen(object):
def __init__(self, filename, *args, **kwargs):
@@ -266,3 +277,52 @@ class PidFile(object):
def __exit__(self, _exc_type, _exc_value, _traceback):
self.cleanup()
+
+
+def webhook_monitor(proc_queue, webhooks):
+ queues = {}
+ for url, token in webhooks.items():
+ queues[url] = Queue()
+ t = Thread(target=publish_to_webhook, args=(url, token, queues[url]))
+ t.start()
+
+ # Get the message sent to Process queue and distribute to all thread queues
+ while True:
+ message = proc_queue.get()
+ for _, q in queues.items():
+ q.put(message)
+
+
+class WebhookThreadPool(object):
+ def start(self):
+ # Seperate process to emit messages to webhooks
+ # which maintains one thread per webhook. Seperate
+ # process is required since on reload we need to stop
+ # and start the thread pool. In Python Threads can't be stopped
+ # so terminate the process and start again. Note: In transit
+ # events will be missed during reload
+ self.queue = multiprocessing.Queue()
+ self.proc = multiprocessing.Process(target=webhook_monitor,
+ args=(self.queue, _webhooks))
+ self.proc.start()
+
+ def restart(self):
+ # In transit messages are skipped, since webhooks monitor process
+ # is terminated.
+ self.proc.terminate()
+ self.start()
+
+ def send(self, message):
+ self.queue.put(message)
+
+
+def init_webhook_pool():
+ global webhooks_pool
+ webhooks_pool = WebhookThreadPool()
+ webhooks_pool.start()
+
+
+def restart_webhook_pool():
+ global webhooks_pool
+ if webhooks_pool is not None:
+ webhooks_pool.restart()
--
2.9.3