Blob Blame History Raw
From 432870b71d90e80f5a3d66d222c169171a44ec4e Mon Sep 17 00:00:00 2001
From: Aravinda VK <avishwan@redhat.com>
Date: Wed, 17 Aug 2016 13:46:00 +0530
Subject: [PATCH 41/86] eventsapi: Add support for Client side Events

Client side gf_event uses ctx->cmd_args.volfile_server to push
notifications to the eventsd.

Socket server changed from Unix domain socket to UDP to support
external events.

Following to be addressed in different patch
- Port used for eventsd is 24009. Make it configurable
  Already configurable in Server side. Configurable in gf_event API
  is required.
- Auth Token yet to be added as discussed in
  https://www.gluster.org/pipermail/gluster-devel/2016-August/050324.html

> Reviewed-on: http://review.gluster.org/15189
> Smoke: Gluster Build System <jenkins@build.gluster.org>
> Reviewed-by: Prashanth Pai <ppai@redhat.com>
> Reviewed-by: Atin Mukherjee <amukherj@redhat.com>
> CentOS-regression: Gluster Build System <jenkins@build.gluster.org>
> NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>

Change-Id: I159acf80b681d10b82d52cfb3ffdf85cb896542d
BUG: 1351589
Signed-off-by: Aravinda VK <avishwan@redhat.com>
Reviewed-on: https://code.engineering.redhat.com/gerrit/84745
Reviewed-by: Milind Changire <mchangir@redhat.com>
Reviewed-by: Atin Mukherjee <amukherj@redhat.com>
---
 events/eventskeygen.py         |    4 +-
 events/src/eventsapiconf.py.in |    8 ++-
 events/src/eventsconfig.json   |    3 +-
 events/src/gf_event.py         |   20 ++++++--
 events/src/glustereventsd.py   |   95 ++++++++++------------------------------
 events/src/peer_eventsapi.py   |    4 ++
 events/src/utils.py            |   28 ++++++++++--
 extras/firewalld/glusterfs.xml |    1 +
 libglusterfs/src/events.c      |   90 ++++++++++++++++++++++++++------------
 9 files changed, 139 insertions(+), 114 deletions(-)

diff --git a/events/eventskeygen.py b/events/eventskeygen.py
index 7357d8e..468c795 100644
--- a/events/eventskeygen.py
+++ b/events/eventskeygen.py
@@ -48,7 +48,9 @@ ERRORS = (
     "EVENT_ERROR_INVALID_INPUTS",
     "EVENT_ERROR_SOCKET",
     "EVENT_ERROR_CONNECT",
-    "EVENT_ERROR_SEND"
+    "EVENT_ERROR_SEND",
+    "EVENT_ERROR_RESOLVE",
+    "EVENT_ERROR_MSG_FORMAT",
 )
 
 if gen_header_type == "C_HEADER":
diff --git a/events/src/eventsapiconf.py.in b/events/src/eventsapiconf.py.in
index 03dd0e8..fad96ca 100644
--- a/events/src/eventsapiconf.py.in
+++ b/events/src/eventsapiconf.py.in
@@ -9,7 +9,7 @@
 #  cases as published by the Free Software Foundation.
 #
 
-SERVER_ADDRESS = "@localstatedir@/run/gluster/events.sock"
+SERVER_ADDRESS = "0.0.0.0"
 DEFAULT_CONFIG_FILE = "@SYSCONF_DIR@/glusterfs/eventsconfig.json"
 CUSTOM_CONFIG_FILE_TO_SYNC = "/events/config.json"
 CUSTOM_CONFIG_FILE = "@GLUSTERD_WORKDIR@" + CUSTOM_CONFIG_FILE_TO_SYNC
@@ -17,7 +17,9 @@ WEBHOOKS_FILE_TO_SYNC = "/events/webhooks.json"
 WEBHOOKS_FILE = "@GLUSTERD_WORKDIR@" + WEBHOOKS_FILE_TO_SYNC
 LOG_FILE = "@localstatedir@/log/glusterfs/events.log"
 EVENTSD = "glustereventsd"
-CONFIG_KEYS = ["log_level"]
+CONFIG_KEYS = ["log_level", "port"]
 BOOL_CONFIGS = []
-RESTART_CONFIGS = []
+INT_CONFIGS = ["port"]
+RESTART_CONFIGS = ["port"]
 EVENTS_ENABLED = @EVENTS_ENABLED@
+UUID_FILE = "@GLUSTERD_WORKDIR@/glusterd.info"
diff --git a/events/src/eventsconfig.json b/events/src/eventsconfig.json
index ce2c775..45730f9 100644
--- a/events/src/eventsconfig.json
+++ b/events/src/eventsconfig.json
@@ -1,3 +1,4 @@
 {
-    "log_level": "INFO"
+    "log_level": "INFO",
+    "port": 24009
 }
diff --git a/events/src/gf_event.py b/events/src/gf_event.py
index 20dfc8a..f9ece6a 100644
--- a/events/src/gf_event.py
+++ b/events/src/gf_event.py
@@ -16,7 +16,7 @@ import time
 from eventsapiconf import SERVER_ADDRESS, EVENTS_ENABLED
 from eventtypes import all_events
 
-from utils import logger, setup_logger
+from utils import logger, setup_logger, get_config
 
 # Run this when this lib loads
 setup_logger()
@@ -31,10 +31,9 @@ def gf_event(event_type, **kwargs):
         return
 
     try:
-        client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
-        client.connect(SERVER_ADDRESS)
+        client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
     except socket.error as e:
-        logger.error("Unable to connect to events.sock: {0}".format(e))
+        logger.error("Unable to connect to events Server: {0}".format(e))
         return
 
     # Convert key value args into KEY1=VALUE1;KEY2=VALUE2;..
@@ -45,7 +44,18 @@ def gf_event(event_type, **kwargs):
     # <TIMESTAMP> <EVENT_TYPE> <MSG>
     msg = "{0} {1} {2}".format(int(time.time()), event_type, msg.strip(";"))
 
+    port = get_config("port")
+    if port is None:
+        logger.error("Unable to get eventsd port details")
+        return
+
     try:
-        client.sendall(msg)
+        sent = client.sendto(msg, (SERVER_ADDRESS, port))
+        assert sent == len(msg)
     except socket.error as e:
         logger.error("Unable to Send message: {0}".format(e))
+    except AssertionError:
+        logger.error("Unable to send message. Sent: {0}, Actual: {1}".format(
+            sent, len(msg)))
+    finally:
+        client.close()
diff --git a/events/src/glustereventsd.py b/events/src/glustereventsd.py
index 3fa5768..91a0743 100644
--- a/events/src/glustereventsd.py
+++ b/events/src/glustereventsd.py
@@ -11,12 +11,10 @@
 #
 
 from __future__ import print_function
-import asyncore
-import socket
-import os
-from multiprocessing import Process, Queue
 import sys
 import signal
+import SocketServer
+import socket
 
 from eventtypes import all_events
 import handlers
@@ -24,26 +22,19 @@ import utils
 from eventsapiconf import SERVER_ADDRESS
 from utils import logger
 
-# Global Queue, EventsHandler will add items to the queue
-# and process_event will gets each item and handles it
-events_queue = Queue()
-events_server_pid = None
 
+class GlusterEventsRequestHandler(SocketServer.BaseRequestHandler):
 
-def process_event():
-    """
-    Seperate process which handles all the incoming events from Gluster
-    processes.
-    """
-    while True:
-        data = events_queue.get()
-        logger.debug("EVENT: {0}".format(repr(data)))
+    def handle(self):
+        data = self.request[0].strip()
+        logger.debug("EVENT: {0} from {1}".format(repr(data),
+                                                  self.client_address[0]))
         try:
             # Event Format <TIMESTAMP> <TYPE> <DETAIL>
             ts, key, value = data.split(" ", 2)
         except ValueError:
             logger.warn("Invalid Event Format {0}".format(data))
-            continue
+            return
 
         data_dict = {}
         try:
@@ -51,7 +42,7 @@ def process_event():
             data_dict = dict(x.split('=') for x in value.split(';'))
         except ValueError:
             logger.warn("Unable to parse Event {0}".format(data))
-            continue
+            return
 
         try:
             # Event Type to Function Map, Recieved event data will be in
@@ -75,68 +66,28 @@ def process_event():
                 handlers.generic_handler(ts, int(key), data_dict)
 
 
-def process_event_wrapper():
-    try:
-        process_event()
-    except KeyboardInterrupt:
-        return
-
-
-class GlusterEventsHandler(asyncore.dispatcher_with_send):
-
-    def handle_read(self):
-        data = self.recv(8192)
-        if data:
-            events_queue.put(data)
-            self.send(data)
-
-
-class GlusterEventsServer(asyncore.dispatcher):
-
-    def __init__(self):
-        global events_server_pid
-        asyncore.dispatcher.__init__(self)
-        # Start the Events listener process which listens to
-        # the global queue
-        p = Process(target=process_event_wrapper)
-        p.start()
-        events_server_pid = p.pid
-
-        # Create UNIX Domain Socket, bind to path
-        self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
-        self.bind(SERVER_ADDRESS)
-        self.listen(5)
-
-    def handle_accept(self):
-        pair = self.accept()
-        if pair is not None:
-            sock, addr = pair
-            GlusterEventsHandler(sock)
-
-
 def signal_handler_sigusr2(sig, frame):
-    if events_server_pid is not None:
-        os.kill(events_server_pid, signal.SIGUSR2)
     utils.load_all()
 
 
 def init_event_server():
     utils.setup_logger()
-
-    # Delete Socket file if Exists
-    try:
-        os.unlink(SERVER_ADDRESS)
-    except OSError:
-        if os.path.exists(SERVER_ADDRESS):
-            print ("Failed to cleanup socket file {0}".format(SERVER_ADDRESS),
-                   file=sys.stderr)
-            sys.exit(1)
-
     utils.load_all()
 
-    # Start the Eventing Server, UNIX DOMAIN SOCKET Server
-    GlusterEventsServer()
-    asyncore.loop()
+    port = utils.get_config("port")
+    if port is None:
+        sys.stderr.write("Unable to get Port details from Config\n")
+        sys.exit(1)
+
+    # Start the Eventing Server, UDP Server
+    try:
+        server = SocketServer.ThreadingUDPServer(
+            (SERVER_ADDRESS, port),
+            GlusterEventsRequestHandler)
+    except socket.error as e:
+        sys.stderr.write("Failed to start Eventsd: {0}\n".format(e))
+        sys.exit(1)
+    server.serve_forever()
 
 
 def main():
diff --git a/events/src/peer_eventsapi.py b/events/src/peer_eventsapi.py
index 7887d77..f444778 100644
--- a/events/src/peer_eventsapi.py
+++ b/events/src/peer_eventsapi.py
@@ -31,6 +31,7 @@ from events.eventsapiconf import (WEBHOOKS_FILE_TO_SYNC,
                                   EVENTSD,
                                   CONFIG_KEYS,
                                   BOOL_CONFIGS,
+                                  INT_CONFIGS,
                                   RESTART_CONFIGS)
 
 
@@ -462,6 +463,9 @@ class ConfigSetCmd(Cmd):
             if args.name in BOOL_CONFIGS:
                 v = boolify(args.value)
 
+            if args.name in INT_CONFIGS:
+                v = int(args.value)
+
             new_data[args.name] = v
             file_content_overwrite(CUSTOM_CONFIG_FILE, new_data)
 
diff --git a/events/src/utils.py b/events/src/utils.py
index 772221a..386e8f2 100644
--- a/events/src/utils.py
+++ b/events/src/utils.py
@@ -17,11 +17,10 @@ import requests
 from eventsapiconf import (LOG_FILE,
                            WEBHOOKS_FILE,
                            DEFAULT_CONFIG_FILE,
-                           CUSTOM_CONFIG_FILE)
+                           CUSTOM_CONFIG_FILE,
+                           UUID_FILE)
 import eventtypes
 
-from gluster.cliutils import get_node_uuid
-
 
 # Webhooks list
 _webhooks = {}
@@ -32,6 +31,23 @@ _config = {}
 
 # Init Logger instance
 logger = logging.getLogger(__name__)
+NodeID = None
+
+
+def get_node_uuid():
+    val = None
+    with open(UUID_FILE) as f:
+        for line in f:
+            if line.startswith("UUID="):
+                val = line.strip().split("=")[-1]
+                break
+    return val
+
+
+def get_config(key):
+    if not _config:
+        load_config()
+    return _config.get(key, None)
 
 
 def get_event_type_name(idx):
@@ -109,8 +125,12 @@ def load_all():
 
 
 def publish(ts, event_key, data):
+    global NodeID
+    if NodeID is None:
+        NodeID = get_node_uuid()
+
     message = {
-        "nodeid": get_node_uuid(),
+        "nodeid": NodeID,
         "ts": int(ts),
         "event": get_event_type_name(event_key),
         "message": data
diff --git a/extras/firewalld/glusterfs.xml b/extras/firewalld/glusterfs.xml
index f8efd90..7e17644 100644
--- a/extras/firewalld/glusterfs.xml
+++ b/extras/firewalld/glusterfs.xml
@@ -4,6 +4,7 @@
 <description>Default ports for gluster-distributed storage</description>
 <port protocol="tcp" port="24007"/>    <!--For glusterd -->
 <port protocol="tcp" port="24008"/>    <!--For glusterd RDMA port management -->
+<port protocol="tcp" port="24009"/>    <!--For glustereventsd -->
 <port protocol="tcp" port="38465"/>    <!--Gluster NFS service -->
 <port protocol="tcp" port="38466"/>    <!--Gluster NFS service -->
 <port protocol="tcp" port="38467"/>    <!--Gluster NFS service -->
diff --git a/libglusterfs/src/events.c b/libglusterfs/src/events.c
index f93934d..b7b513e 100644
--- a/libglusterfs/src/events.c
+++ b/libglusterfs/src/events.c
@@ -16,73 +16,107 @@
 #include <time.h>
 #include <stdarg.h>
 #include <string.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+
 #include "syscall.h"
 #include "mem-pool.h"
+#include "glusterfs.h"
+#include "globals.h"
 #include "events.h"
 
 
-#define EVENT_PATH DATADIR "/run/gluster/events.sock"
-#define EVENTS_MSG_MAX 2048
+#define EVENT_HOST "127.0.0.1"
+#define EVENT_PORT 24009
 
 
 int
 gf_event (eventtypes_t event, char *fmt, ...)
 {
-        int      ret                      = 0;
-        int      sock                     = -1;
-        char     eventstr[EVENTS_MSG_MAX] = "";
-        struct   sockaddr_un server;
-        va_list  arguments;
-        char     *msg                     = NULL;
-        size_t   eventstr_size            = 0;
+        int                ret                   = 0;
+        int                sock                  = -1;
+        char              *eventstr              = NULL;
+        struct             sockaddr_in server;
+        va_list            arguments;
+        char              *msg                   = NULL;
+        glusterfs_ctx_t   *ctx                   = NULL;
+        struct hostent    *host_data;
+        char              *host                  = NULL;
+
+        /* Global context */
+        ctx = THIS->ctx;
 
         if (event < 0 || event >= EVENT_LAST) {
                 ret = EVENT_ERROR_INVALID_INPUTS;
                 goto out;
         }
 
-        sock = socket(AF_UNIX, SOCK_STREAM, 0);
+        /* Initialize UDP socket */
+        sock = socket (AF_INET, SOCK_DGRAM, 0);
         if (sock < 0) {
                 ret = EVENT_ERROR_SOCKET;
                 goto out;
         }
-        server.sun_family = AF_UNIX;
-        strcpy(server.sun_path, EVENT_PATH);
 
-        if (connect(sock,
-                    (struct sockaddr *) &server,
-                    sizeof(struct sockaddr_un)) < 0) {
-                ret = EVENT_ERROR_CONNECT;
-                goto out;
+        /* Get Host name to send message */
+        if (ctx && ctx->cmd_args.volfile_server) {
+                /* If it is client code then volfile_server is set
+                   use that information to push the events. */
+                host_data = gethostbyname (ctx->cmd_args.volfile_server);
+                if (host_data == NULL) {
+                        ret = EVENT_ERROR_RESOLVE;
+                        goto out;
+                }
+                host = inet_ntoa (*(struct in_addr *)(host_data->h_addr));
+        } else {
+                /* Localhost, Use the defined IP for localhost */
+                host = EVENT_HOST;
         }
 
+        /* Socket Configurations */
+        server.sin_family = AF_INET;
+        server.sin_port = htons (EVENT_PORT);
+        server.sin_addr.s_addr = inet_addr (host);
+        memset (&server.sin_zero, '\0', sizeof (server.sin_zero));
+
         va_start (arguments, fmt);
         ret = gf_vasprintf (&msg, fmt, arguments);
         va_end (arguments);
+
         if (ret < 0) {
                 ret = EVENT_ERROR_INVALID_INPUTS;
                 goto out;
         }
 
-        eventstr_size = snprintf(NULL, 0, "%u %d %s", (unsigned)time(NULL),
-                                 event, msg);
+        ret = gf_asprintf (&eventstr, "%u %d %s",
+                            (unsigned)time(NULL), event, msg);
 
-        if (eventstr_size + 1 > EVENTS_MSG_MAX) {
-                eventstr_size = EVENTS_MSG_MAX - 1;
+        if (ret <= 0) {
+                ret = EVENT_ERROR_MSG_FORMAT;
+                goto out;
         }
 
-        snprintf(eventstr, eventstr_size+1, "%u %d %s",
-                 (unsigned)time(NULL), event, msg);
-
-        if (sys_write(sock, eventstr, strlen(eventstr)) <= 0) {
+        /* Send Message */
+        if (sendto (sock, eventstr, strlen (eventstr),
+                    0, (struct sockaddr *)&server, sizeof (server)) <= 0) {
                 ret = EVENT_ERROR_SEND;
-                goto out;
         }
 
         ret = EVENT_SEND_OK;
 
  out:
-        sys_close(sock);
-        GF_FREE(msg);
+        if (sock >= 0) {
+                sys_close (sock);
+        }
+
+        /* Allocated by gf_vasprintf */
+        if (msg)
+                GF_FREE (msg);
+
+        /* Allocated by gf_asprintf */
+        if (eventstr)
+                GF_FREE (eventstr);
+
         return ret;
 }
-- 
1.7.1