|
|
3604df |
From 2827f7ad9ecfb9d6ad7f90b5cffbfb59fbd5fb8c Mon Sep 17 00:00:00 2001
|
|
|
3604df |
From: Aravinda VK <avishwan@redhat.com>
|
|
|
3604df |
Date: Tue, 29 Nov 2016 16:32:54 +0530
|
|
|
3604df |
Subject: [PATCH 218/227] eventsapi: Push Messages to Webhooks in parallel
|
|
|
3604df |
|
|
|
3604df |
With this patch, glustereventsd will maintain one thread per
|
|
|
3604df |
webhook. If a webhook is slow, then all the events to that worker
|
|
|
3604df |
will be delayed but it will not affect the other webhooks.
|
|
|
3604df |
|
|
|
3604df |
Note: Webhook in transit may get missed if glustereventsd reloads due to
|
|
|
3604df |
new Webhook addition or due configuration changes.
|
|
|
3604df |
|
|
|
3604df |
>Reviewed-on: http://review.gluster.org/15966
|
|
|
3604df |
>Smoke: Gluster Build System <jenkins@build.gluster.org>
|
|
|
3604df |
>Reviewed-by: Prashanth Pai <ppai@redhat.com>
|
|
|
3604df |
>NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
|
|
|
3604df |
>CentOS-regression: Gluster Build System <jenkins@build.gluster.org>
|
|
|
3604df |
|
|
|
3604df |
BUG: 1395613
|
|
|
3604df |
Change-Id: I2d11e01c7ac434355bc356ff75396252f51b339b
|
|
|
3604df |
Signed-off-by: Aravinda VK <avishwan@redhat.com>
|
|
|
3604df |
Reviewed-on: https://code.engineering.redhat.com/gerrit/92047
|
|
|
3604df |
Reviewed-by: Atin Mukherjee <amukherj@redhat.com>
|
|
|
3604df |
Tested-by: Atin Mukherjee <amukherj@redhat.com>
|
|
|
3604df |
---
|
|
|
3604df |
events/src/glustereventsd.py | 2 ++
|
|
|
3604df |
events/src/utils.py | 70 ++++++++++++++++++++++++++++++++++++++++----
|
|
|
3604df |
2 files changed, 67 insertions(+), 5 deletions(-)
|
|
|
3604df |
|
|
|
3604df |
diff --git a/events/src/glustereventsd.py b/events/src/glustereventsd.py
|
|
|
3604df |
index d057e09..1d01054 100644
|
|
|
3604df |
--- a/events/src/glustereventsd.py
|
|
|
3604df |
+++ b/events/src/glustereventsd.py
|
|
|
3604df |
@@ -69,11 +69,13 @@ class GlusterEventsRequestHandler(SocketServer.BaseRequestHandler):
|
|
|
3604df |
|
|
|
3604df |
def signal_handler_sigusr2(sig, frame):
|
|
|
3604df |
utils.load_all()
|
|
|
3604df |
+ utils.restart_webhook_pool()
|
|
|
3604df |
|
|
|
3604df |
|
|
|
3604df |
def init_event_server():
|
|
|
3604df |
utils.setup_logger()
|
|
|
3604df |
utils.load_all()
|
|
|
3604df |
+ utils.init_webhook_pool()
|
|
|
3604df |
|
|
|
3604df |
port = utils.get_config("port")
|
|
|
3604df |
if port is None:
|
|
|
3604df |
diff --git a/events/src/utils.py b/events/src/utils.py
|
|
|
3604df |
index 2e7bf0e..686fd3a 100644
|
|
|
3604df |
--- a/events/src/utils.py
|
|
|
3604df |
+++ b/events/src/utils.py
|
|
|
3604df |
@@ -14,6 +14,9 @@ import os
|
|
|
3604df |
import logging
|
|
|
3604df |
import fcntl
|
|
|
3604df |
from errno import ESRCH, EBADF
|
|
|
3604df |
+from threading import Thread
|
|
|
3604df |
+import multiprocessing
|
|
|
3604df |
+from Queue import Queue
|
|
|
3604df |
|
|
|
3604df |
from eventsapiconf import (LOG_FILE,
|
|
|
3604df |
WEBHOOKS_FILE,
|
|
|
3604df |
@@ -34,6 +37,7 @@ _config = {}
|
|
|
3604df |
# Init Logger instance
|
|
|
3604df |
logger = logging.getLogger(__name__)
|
|
|
3604df |
NodeID = None
|
|
|
3604df |
+webhooks_pool = None
|
|
|
3604df |
|
|
|
3604df |
|
|
|
3604df |
def get_node_uuid():
|
|
|
3604df |
@@ -162,14 +166,13 @@ def autoload_webhooks():
|
|
|
3604df |
load_webhooks()
|
|
|
3604df |
|
|
|
3604df |
|
|
|
3604df |
-def plugin_webhook(message):
|
|
|
3604df |
+def publish_to_webhook(url, token, message_queue):
|
|
|
3604df |
# Import requests here since not used in any other place
|
|
|
3604df |
import requests
|
|
|
3604df |
|
|
|
3604df |
- message_json = json.dumps(message, sort_keys=True)
|
|
|
3604df |
- logger.debug("EVENT: {0}".format(message_json))
|
|
|
3604df |
- for url, token in _webhooks.items():
|
|
|
3604df |
- http_headers = {"Content-Type": "application/json"}
|
|
|
3604df |
+ http_headers = {"Content-Type": "application/json"}
|
|
|
3604df |
+ while True:
|
|
|
3604df |
+ message_json = message_queue.get()
|
|
|
3604df |
if token != "" and token is not None:
|
|
|
3604df |
http_headers["Authorization"] = "Bearer " + token
|
|
|
3604df |
|
|
|
3604df |
@@ -183,6 +186,8 @@ def plugin_webhook(message):
|
|
|
3604df |
event=message_json,
|
|
|
3604df |
error=e))
|
|
|
3604df |
continue
|
|
|
3604df |
+ finally:
|
|
|
3604df |
+ message_queue.task_done()
|
|
|
3604df |
|
|
|
3604df |
if resp.status_code != 200:
|
|
|
3604df |
logger.warn("Event push failed to URL: {url}, "
|
|
|
3604df |
@@ -193,6 +198,12 @@ def plugin_webhook(message):
|
|
|
3604df |
status_code=resp.status_code))
|
|
|
3604df |
|
|
|
3604df |
|
|
|
3604df |
+def plugin_webhook(message):
|
|
|
3604df |
+ message_json = json.dumps(message, sort_keys=True)
|
|
|
3604df |
+ logger.debug("EVENT: {0}".format(message_json))
|
|
|
3604df |
+ webhooks_pool.send(message_json)
|
|
|
3604df |
+
|
|
|
3604df |
+
|
|
|
3604df |
class LockedOpen(object):
|
|
|
3604df |
|
|
|
3604df |
def __init__(self, filename, *args, **kwargs):
|
|
|
3604df |
@@ -266,3 +277,52 @@ class PidFile(object):
|
|
|
3604df |
|
|
|
3604df |
def __exit__(self, _exc_type, _exc_value, _traceback):
|
|
|
3604df |
self.cleanup()
|
|
|
3604df |
+
|
|
|
3604df |
+
|
|
|
3604df |
+def webhook_monitor(proc_queue, webhooks):
|
|
|
3604df |
+ queues = {}
|
|
|
3604df |
+ for url, token in webhooks.items():
|
|
|
3604df |
+ queues[url] = Queue()
|
|
|
3604df |
+ t = Thread(target=publish_to_webhook, args=(url, token, queues[url]))
|
|
|
3604df |
+ t.start()
|
|
|
3604df |
+
|
|
|
3604df |
+ # Get the message sent to Process queue and distribute to all thread queues
|
|
|
3604df |
+ while True:
|
|
|
3604df |
+ message = proc_queue.get()
|
|
|
3604df |
+ for _, q in queues.items():
|
|
|
3604df |
+ q.put(message)
|
|
|
3604df |
+
|
|
|
3604df |
+
|
|
|
3604df |
+class WebhookThreadPool(object):
|
|
|
3604df |
+ def start(self):
|
|
|
3604df |
+ # Seperate process to emit messages to webhooks
|
|
|
3604df |
+ # which maintains one thread per webhook. Seperate
|
|
|
3604df |
+ # process is required since on reload we need to stop
|
|
|
3604df |
+ # and start the thread pool. In Python Threads can't be stopped
|
|
|
3604df |
+ # so terminate the process and start again. Note: In transit
|
|
|
3604df |
+ # events will be missed during reload
|
|
|
3604df |
+ self.queue = multiprocessing.Queue()
|
|
|
3604df |
+ self.proc = multiprocessing.Process(target=webhook_monitor,
|
|
|
3604df |
+ args=(self.queue, _webhooks))
|
|
|
3604df |
+ self.proc.start()
|
|
|
3604df |
+
|
|
|
3604df |
+ def restart(self):
|
|
|
3604df |
+ # In transit messages are skipped, since webhooks monitor process
|
|
|
3604df |
+ # is terminated.
|
|
|
3604df |
+ self.proc.terminate()
|
|
|
3604df |
+ self.start()
|
|
|
3604df |
+
|
|
|
3604df |
+ def send(self, message):
|
|
|
3604df |
+ self.queue.put(message)
|
|
|
3604df |
+
|
|
|
3604df |
+
|
|
|
3604df |
+def init_webhook_pool():
|
|
|
3604df |
+ global webhooks_pool
|
|
|
3604df |
+ webhooks_pool = WebhookThreadPool()
|
|
|
3604df |
+ webhooks_pool.start()
|
|
|
3604df |
+
|
|
|
3604df |
+
|
|
|
3604df |
+def restart_webhook_pool():
|
|
|
3604df |
+ global webhooks_pool
|
|
|
3604df |
+ if webhooks_pool is not None:
|
|
|
3604df |
+ webhooks_pool.restart()
|
|
|
3604df |
--
|
|
|
3604df |
2.9.3
|
|
|
3604df |
|