3604df
From 432870b71d90e80f5a3d66d222c169171a44ec4e Mon Sep 17 00:00:00 2001
3604df
From: Aravinda VK <avishwan@redhat.com>
3604df
Date: Wed, 17 Aug 2016 13:46:00 +0530
3604df
Subject: [PATCH 41/86] eventsapi: Add support for Client side Events
3604df
3604df
Client side gf_event uses ctx->cmd_args.volfile_server to push
3604df
notifications to the eventsd.
3604df
3604df
Socket server changed from Unix domain socket to UDP to support
3604df
external events.
3604df
3604df
Following to be addressed in different patch
3604df
- Port used for eventsd is 24009. Make it configurable
3604df
  Already configurable in Server side. Configurable in gf_event API
3604df
  is required.
3604df
- Auth Token yet to be added as discussed in
3604df
  https://www.gluster.org/pipermail/gluster-devel/2016-August/050324.html
3604df
3604df
> Reviewed-on: http://review.gluster.org/15189
3604df
> Smoke: Gluster Build System <jenkins@build.gluster.org>
3604df
> Reviewed-by: Prashanth Pai <ppai@redhat.com>
3604df
> Reviewed-by: Atin Mukherjee <amukherj@redhat.com>
3604df
> CentOS-regression: Gluster Build System <jenkins@build.gluster.org>
3604df
> NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
3604df
3604df
Change-Id: I159acf80b681d10b82d52cfb3ffdf85cb896542d
3604df
BUG: 1351589
3604df
Signed-off-by: Aravinda VK <avishwan@redhat.com>
3604df
Reviewed-on: https://code.engineering.redhat.com/gerrit/84745
3604df
Reviewed-by: Milind Changire <mchangir@redhat.com>
3604df
Reviewed-by: Atin Mukherjee <amukherj@redhat.com>
3604df
---
3604df
 events/eventskeygen.py         |    4 +-
3604df
 events/src/eventsapiconf.py.in |    8 ++-
3604df
 events/src/eventsconfig.json   |    3 +-
3604df
 events/src/gf_event.py         |   20 ++++++--
3604df
 events/src/glustereventsd.py   |   95 ++++++++++------------------------------
3604df
 events/src/peer_eventsapi.py   |    4 ++
3604df
 events/src/utils.py            |   28 ++++++++++--
3604df
 extras/firewalld/glusterfs.xml |    1 +
3604df
 libglusterfs/src/events.c      |   90 ++++++++++++++++++++++++++------------
3604df
 9 files changed, 139 insertions(+), 114 deletions(-)
3604df
3604df
diff --git a/events/eventskeygen.py b/events/eventskeygen.py
3604df
index 7357d8e..468c795 100644
3604df
--- a/events/eventskeygen.py
3604df
+++ b/events/eventskeygen.py
3604df
@@ -48,7 +48,9 @@ ERRORS = (
3604df
     "EVENT_ERROR_INVALID_INPUTS",
3604df
     "EVENT_ERROR_SOCKET",
3604df
     "EVENT_ERROR_CONNECT",
3604df
-    "EVENT_ERROR_SEND"
3604df
+    "EVENT_ERROR_SEND",
3604df
+    "EVENT_ERROR_RESOLVE",
3604df
+    "EVENT_ERROR_MSG_FORMAT",
3604df
 )
3604df
 
3604df
 if gen_header_type == "C_HEADER":
3604df
diff --git a/events/src/eventsapiconf.py.in b/events/src/eventsapiconf.py.in
3604df
index 03dd0e8..fad96ca 100644
3604df
--- a/events/src/eventsapiconf.py.in
3604df
+++ b/events/src/eventsapiconf.py.in
3604df
@@ -9,7 +9,7 @@
3604df
 #  cases as published by the Free Software Foundation.
3604df
 #
3604df
 
3604df
-SERVER_ADDRESS = "@localstatedir@/run/gluster/events.sock"
3604df
+SERVER_ADDRESS = "0.0.0.0"
3604df
 DEFAULT_CONFIG_FILE = "@SYSCONF_DIR@/glusterfs/eventsconfig.json"
3604df
 CUSTOM_CONFIG_FILE_TO_SYNC = "/events/config.json"
3604df
 CUSTOM_CONFIG_FILE = "@GLUSTERD_WORKDIR@" + CUSTOM_CONFIG_FILE_TO_SYNC
3604df
@@ -17,7 +17,9 @@ WEBHOOKS_FILE_TO_SYNC = "/events/webhooks.json"
3604df
 WEBHOOKS_FILE = "@GLUSTERD_WORKDIR@" + WEBHOOKS_FILE_TO_SYNC
3604df
 LOG_FILE = "@localstatedir@/log/glusterfs/events.log"
3604df
 EVENTSD = "glustereventsd"
3604df
-CONFIG_KEYS = ["log_level"]
3604df
+CONFIG_KEYS = ["log_level", "port"]
3604df
 BOOL_CONFIGS = []
3604df
-RESTART_CONFIGS = []
3604df
+INT_CONFIGS = ["port"]
3604df
+RESTART_CONFIGS = ["port"]
3604df
 EVENTS_ENABLED = @EVENTS_ENABLED@
3604df
+UUID_FILE = "@GLUSTERD_WORKDIR@/glusterd.info"
3604df
diff --git a/events/src/eventsconfig.json b/events/src/eventsconfig.json
3604df
index ce2c775..45730f9 100644
3604df
--- a/events/src/eventsconfig.json
3604df
+++ b/events/src/eventsconfig.json
3604df
@@ -1,3 +1,4 @@
3604df
 {
3604df
-    "log_level": "INFO"
3604df
+    "log_level": "INFO",
3604df
+    "port": 24009
3604df
 }
3604df
diff --git a/events/src/gf_event.py b/events/src/gf_event.py
3604df
index 20dfc8a..f9ece6a 100644
3604df
--- a/events/src/gf_event.py
3604df
+++ b/events/src/gf_event.py
3604df
@@ -16,7 +16,7 @@ import time
3604df
 from eventsapiconf import SERVER_ADDRESS, EVENTS_ENABLED
3604df
 from eventtypes import all_events
3604df
 
3604df
-from utils import logger, setup_logger
3604df
+from utils import logger, setup_logger, get_config
3604df
 
3604df
 # Run this when this lib loads
3604df
 setup_logger()
3604df
@@ -31,10 +31,9 @@ def gf_event(event_type, **kwargs):
3604df
         return
3604df
 
3604df
     try:
3604df
-        client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
3604df
-        client.connect(SERVER_ADDRESS)
3604df
+        client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
3604df
     except socket.error as e:
3604df
-        logger.error("Unable to connect to events.sock: {0}".format(e))
3604df
+        logger.error("Unable to connect to events Server: {0}".format(e))
3604df
         return
3604df
 
3604df
     # Convert key value args into KEY1=VALUE1;KEY2=VALUE2;..
3604df
@@ -45,7 +44,18 @@ def gf_event(event_type, **kwargs):
3604df
     # <TIMESTAMP> <EVENT_TYPE> <MSG>
3604df
     msg = "{0} {1} {2}".format(int(time.time()), event_type, msg.strip(";"))
3604df
 
3604df
+    port = get_config("port")
3604df
+    if port is None:
3604df
+        logger.error("Unable to get eventsd port details")
3604df
+        return
3604df
+
3604df
     try:
3604df
-        client.sendall(msg)
3604df
+        sent = client.sendto(msg, (SERVER_ADDRESS, port))
3604df
+        assert sent == len(msg)
3604df
     except socket.error as e:
3604df
         logger.error("Unable to Send message: {0}".format(e))
3604df
+    except AssertionError:
3604df
+        logger.error("Unable to send message. Sent: {0}, Actual: {1}".format(
3604df
+            sent, len(msg)))
3604df
+    finally:
3604df
+        client.close()
3604df
diff --git a/events/src/glustereventsd.py b/events/src/glustereventsd.py
3604df
index 3fa5768..91a0743 100644
3604df
--- a/events/src/glustereventsd.py
3604df
+++ b/events/src/glustereventsd.py
3604df
@@ -11,12 +11,10 @@
3604df
 #
3604df
 
3604df
 from __future__ import print_function
3604df
-import asyncore
3604df
-import socket
3604df
-import os
3604df
-from multiprocessing import Process, Queue
3604df
 import sys
3604df
 import signal
3604df
+import SocketServer
3604df
+import socket
3604df
 
3604df
 from eventtypes import all_events
3604df
 import handlers
3604df
@@ -24,26 +22,19 @@ import utils
3604df
 from eventsapiconf import SERVER_ADDRESS
3604df
 from utils import logger
3604df
 
3604df
-# Global Queue, EventsHandler will add items to the queue
3604df
-# and process_event will gets each item and handles it
3604df
-events_queue = Queue()
3604df
-events_server_pid = None
3604df
 
3604df
+class GlusterEventsRequestHandler(SocketServer.BaseRequestHandler):
3604df
 
3604df
-def process_event():
3604df
-    """
3604df
-    Seperate process which handles all the incoming events from Gluster
3604df
-    processes.
3604df
-    """
3604df
-    while True:
3604df
-        data = events_queue.get()
3604df
-        logger.debug("EVENT: {0}".format(repr(data)))
3604df
+    def handle(self):
3604df
+        data = self.request[0].strip()
3604df
+        logger.debug("EVENT: {0} from {1}".format(repr(data),
3604df
+                                                  self.client_address[0]))
3604df
         try:
3604df
             # Event Format <TIMESTAMP> <TYPE> <DETAIL>
3604df
             ts, key, value = data.split(" ", 2)
3604df
         except ValueError:
3604df
             logger.warn("Invalid Event Format {0}".format(data))
3604df
-            continue
3604df
+            return
3604df
 
3604df
         data_dict = {}
3604df
         try:
3604df
@@ -51,7 +42,7 @@ def process_event():
3604df
             data_dict = dict(x.split('=') for x in value.split(';'))
3604df
         except ValueError:
3604df
             logger.warn("Unable to parse Event {0}".format(data))
3604df
-            continue
3604df
+            return
3604df
 
3604df
         try:
3604df
             # Event Type to Function Map, Recieved event data will be in
3604df
@@ -75,68 +66,28 @@ def process_event():
3604df
                 handlers.generic_handler(ts, int(key), data_dict)
3604df
 
3604df
 
3604df
-def process_event_wrapper():
3604df
-    try:
3604df
-        process_event()
3604df
-    except KeyboardInterrupt:
3604df
-        return
3604df
-
3604df
-
3604df
-class GlusterEventsHandler(asyncore.dispatcher_with_send):
3604df
-
3604df
-    def handle_read(self):
3604df
-        data = self.recv(8192)
3604df
-        if data:
3604df
-            events_queue.put(data)
3604df
-            self.send(data)
3604df
-
3604df
-
3604df
-class GlusterEventsServer(asyncore.dispatcher):
3604df
-
3604df
-    def __init__(self):
3604df
-        global events_server_pid
3604df
-        asyncore.dispatcher.__init__(self)
3604df
-        # Start the Events listener process which listens to
3604df
-        # the global queue
3604df
-        p = Process(target=process_event_wrapper)
3604df
-        p.start()
3604df
-        events_server_pid = p.pid
3604df
-
3604df
-        # Create UNIX Domain Socket, bind to path
3604df
-        self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
3604df
-        self.bind(SERVER_ADDRESS)
3604df
-        self.listen(5)
3604df
-
3604df
-    def handle_accept(self):
3604df
-        pair = self.accept()
3604df
-        if pair is not None:
3604df
-            sock, addr = pair
3604df
-            GlusterEventsHandler(sock)
3604df
-
3604df
-
3604df
 def signal_handler_sigusr2(sig, frame):
3604df
-    if events_server_pid is not None:
3604df
-        os.kill(events_server_pid, signal.SIGUSR2)
3604df
     utils.load_all()
3604df
 
3604df
 
3604df
 def init_event_server():
3604df
     utils.setup_logger()
3604df
-
3604df
-    # Delete Socket file if Exists
3604df
-    try:
3604df
-        os.unlink(SERVER_ADDRESS)
3604df
-    except OSError:
3604df
-        if os.path.exists(SERVER_ADDRESS):
3604df
-            print ("Failed to cleanup socket file {0}".format(SERVER_ADDRESS),
3604df
-                   file=sys.stderr)
3604df
-            sys.exit(1)
3604df
-
3604df
     utils.load_all()
3604df
 
3604df
-    # Start the Eventing Server, UNIX DOMAIN SOCKET Server
3604df
-    GlusterEventsServer()
3604df
-    asyncore.loop()
3604df
+    port = utils.get_config("port")
3604df
+    if port is None:
3604df
+        sys.stderr.write("Unable to get Port details from Config\n")
3604df
+        sys.exit(1)
3604df
+
3604df
+    # Start the Eventing Server, UDP Server
3604df
+    try:
3604df
+        server = SocketServer.ThreadingUDPServer(
3604df
+            (SERVER_ADDRESS, port),
3604df
+            GlusterEventsRequestHandler)
3604df
+    except socket.error as e:
3604df
+        sys.stderr.write("Failed to start Eventsd: {0}\n".format(e))
3604df
+        sys.exit(1)
3604df
+    server.serve_forever()
3604df
 
3604df
 
3604df
 def main():
3604df
diff --git a/events/src/peer_eventsapi.py b/events/src/peer_eventsapi.py
3604df
index 7887d77..f444778 100644
3604df
--- a/events/src/peer_eventsapi.py
3604df
+++ b/events/src/peer_eventsapi.py
3604df
@@ -31,6 +31,7 @@ from events.eventsapiconf import (WEBHOOKS_FILE_TO_SYNC,
3604df
                                   EVENTSD,
3604df
                                   CONFIG_KEYS,
3604df
                                   BOOL_CONFIGS,
3604df
+                                  INT_CONFIGS,
3604df
                                   RESTART_CONFIGS)
3604df
 
3604df
 
3604df
@@ -462,6 +463,9 @@ class ConfigSetCmd(Cmd):
3604df
             if args.name in BOOL_CONFIGS:
3604df
                 v = boolify(args.value)
3604df
 
3604df
+            if args.name in INT_CONFIGS:
3604df
+                v = int(args.value)
3604df
+
3604df
             new_data[args.name] = v
3604df
             file_content_overwrite(CUSTOM_CONFIG_FILE, new_data)
3604df
 
3604df
diff --git a/events/src/utils.py b/events/src/utils.py
3604df
index 772221a..386e8f2 100644
3604df
--- a/events/src/utils.py
3604df
+++ b/events/src/utils.py
3604df
@@ -17,11 +17,10 @@ import requests
3604df
 from eventsapiconf import (LOG_FILE,
3604df
                            WEBHOOKS_FILE,
3604df
                            DEFAULT_CONFIG_FILE,
3604df
-                           CUSTOM_CONFIG_FILE)
3604df
+                           CUSTOM_CONFIG_FILE,
3604df
+                           UUID_FILE)
3604df
 import eventtypes
3604df
 
3604df
-from gluster.cliutils import get_node_uuid
3604df
-
3604df
 
3604df
 # Webhooks list
3604df
 _webhooks = {}
3604df
@@ -32,6 +31,23 @@ _config = {}
3604df
 
3604df
 # Init Logger instance
3604df
 logger = logging.getLogger(__name__)
3604df
+NodeID = None
3604df
+
3604df
+
3604df
+def get_node_uuid():
3604df
+    val = None
3604df
+    with open(UUID_FILE) as f:
3604df
+        for line in f:
3604df
+            if line.startswith("UUID="):
3604df
+                val = line.strip().split("=")[-1]
3604df
+                break
3604df
+    return val
3604df
+
3604df
+
3604df
+def get_config(key):
3604df
+    if not _config:
3604df
+        load_config()
3604df
+    return _config.get(key, None)
3604df
 
3604df
 
3604df
 def get_event_type_name(idx):
3604df
@@ -109,8 +125,12 @@ def load_all():
3604df
 
3604df
 
3604df
 def publish(ts, event_key, data):
3604df
+    global NodeID
3604df
+    if NodeID is None:
3604df
+        NodeID = get_node_uuid()
3604df
+
3604df
     message = {
3604df
-        "nodeid": get_node_uuid(),
3604df
+        "nodeid": NodeID,
3604df
         "ts": int(ts),
3604df
         "event": get_event_type_name(event_key),
3604df
         "message": data
3604df
diff --git a/extras/firewalld/glusterfs.xml b/extras/firewalld/glusterfs.xml
3604df
index f8efd90..7e17644 100644
3604df
--- a/extras/firewalld/glusterfs.xml
3604df
+++ b/extras/firewalld/glusterfs.xml
3604df
@@ -4,6 +4,7 @@
3604df
 <description>Default ports for gluster-distributed storage</description>
3604df
 <port protocol="tcp" port="24007"/>    
3604df
 <port protocol="tcp" port="24008"/>    
3604df
+<port protocol="tcp" port="24009"/>    
3604df
 <port protocol="tcp" port="38465"/>    
3604df
 <port protocol="tcp" port="38466"/>    
3604df
 <port protocol="tcp" port="38467"/>    
3604df
diff --git a/libglusterfs/src/events.c b/libglusterfs/src/events.c
3604df
index f93934d..b7b513e 100644
3604df
--- a/libglusterfs/src/events.c
3604df
+++ b/libglusterfs/src/events.c
3604df
@@ -16,73 +16,107 @@
3604df
 #include <time.h>
3604df
 #include <stdarg.h>
3604df
 #include <string.h>
3604df
+#include <netinet/in.h>
3604df
+#include <arpa/inet.h>
3604df
+#include <netdb.h>
3604df
+
3604df
 #include "syscall.h"
3604df
 #include "mem-pool.h"
3604df
+#include "glusterfs.h"
3604df
+#include "globals.h"
3604df
 #include "events.h"
3604df
 
3604df
 
3604df
-#define EVENT_PATH DATADIR "/run/gluster/events.sock"
3604df
-#define EVENTS_MSG_MAX 2048
3604df
+#define EVENT_HOST "127.0.0.1"
3604df
+#define EVENT_PORT 24009
3604df
 
3604df
 
3604df
 int
3604df
 gf_event (eventtypes_t event, char *fmt, ...)
3604df
 {
3604df
-        int      ret                      = 0;
3604df
-        int      sock                     = -1;
3604df
-        char     eventstr[EVENTS_MSG_MAX] = "";
3604df
-        struct   sockaddr_un server;
3604df
-        va_list  arguments;
3604df
-        char     *msg                     = NULL;
3604df
-        size_t   eventstr_size            = 0;
3604df
+        int                ret                   = 0;
3604df
+        int                sock                  = -1;
3604df
+        char              *eventstr              = NULL;
3604df
+        struct             sockaddr_in server;
3604df
+        va_list            arguments;
3604df
+        char              *msg                   = NULL;
3604df
+        glusterfs_ctx_t   *ctx                   = NULL;
3604df
+        struct hostent    *host_data;
3604df
+        char              *host                  = NULL;
3604df
+
3604df
+        /* Global context */
3604df
+        ctx = THIS->ctx;
3604df
 
3604df
         if (event < 0 || event >= EVENT_LAST) {
3604df
                 ret = EVENT_ERROR_INVALID_INPUTS;
3604df
                 goto out;
3604df
         }
3604df
 
3604df
-        sock = socket(AF_UNIX, SOCK_STREAM, 0);
3604df
+        /* Initialize UDP socket */
3604df
+        sock = socket (AF_INET, SOCK_DGRAM, 0);
3604df
         if (sock < 0) {
3604df
                 ret = EVENT_ERROR_SOCKET;
3604df
                 goto out;
3604df
         }
3604df
-        server.sun_family = AF_UNIX;
3604df
-        strcpy(server.sun_path, EVENT_PATH);
3604df
 
3604df
-        if (connect(sock,
3604df
-                    (struct sockaddr *) &server,
3604df
-                    sizeof(struct sockaddr_un)) < 0) {
3604df
-                ret = EVENT_ERROR_CONNECT;
3604df
-                goto out;
3604df
+        /* Get Host name to send message */
3604df
+        if (ctx && ctx->cmd_args.volfile_server) {
3604df
+                /* If it is client code then volfile_server is set
3604df
+                   use that information to push the events. */
3604df
+                host_data = gethostbyname (ctx->cmd_args.volfile_server);
3604df
+                if (host_data == NULL) {
3604df
+                        ret = EVENT_ERROR_RESOLVE;
3604df
+                        goto out;
3604df
+                }
3604df
+                host = inet_ntoa (*(struct in_addr *)(host_data->h_addr));
3604df
+        } else {
3604df
+                /* Localhost, Use the defined IP for localhost */
3604df
+                host = EVENT_HOST;
3604df
         }
3604df
 
3604df
+        /* Socket Configurations */
3604df
+        server.sin_family = AF_INET;
3604df
+        server.sin_port = htons (EVENT_PORT);
3604df
+        server.sin_addr.s_addr = inet_addr (host);
3604df
+        memset (&server.sin_zero, '\0', sizeof (server.sin_zero));
3604df
+
3604df
         va_start (arguments, fmt);
3604df
         ret = gf_vasprintf (&msg, fmt, arguments);
3604df
         va_end (arguments);
3604df
+
3604df
         if (ret < 0) {
3604df
                 ret = EVENT_ERROR_INVALID_INPUTS;
3604df
                 goto out;
3604df
         }
3604df
 
3604df
-        eventstr_size = snprintf(NULL, 0, "%u %d %s", (unsigned)time(NULL),
3604df
-                                 event, msg);
3604df
+        ret = gf_asprintf (&eventstr, "%u %d %s",
3604df
+                            (unsigned)time(NULL), event, msg);
3604df
 
3604df
-        if (eventstr_size + 1 > EVENTS_MSG_MAX) {
3604df
-                eventstr_size = EVENTS_MSG_MAX - 1;
3604df
+        if (ret <= 0) {
3604df
+                ret = EVENT_ERROR_MSG_FORMAT;
3604df
+                goto out;
3604df
         }
3604df
 
3604df
-        snprintf(eventstr, eventstr_size+1, "%u %d %s",
3604df
-                 (unsigned)time(NULL), event, msg);
3604df
-
3604df
-        if (sys_write(sock, eventstr, strlen(eventstr)) <= 0) {
3604df
+        /* Send Message */
3604df
+        if (sendto (sock, eventstr, strlen (eventstr),
3604df
+                    0, (struct sockaddr *)&server, sizeof (server)) <= 0) {
3604df
                 ret = EVENT_ERROR_SEND;
3604df
-                goto out;
3604df
         }
3604df
 
3604df
         ret = EVENT_SEND_OK;
3604df
 
3604df
  out:
3604df
-        sys_close(sock);
3604df
-        GF_FREE(msg);
3604df
+        if (sock >= 0) {
3604df
+                sys_close (sock);
3604df
+        }
3604df
+
3604df
+        /* Allocated by gf_vasprintf */
3604df
+        if (msg)
3604df
+                GF_FREE (msg);
3604df
+
3604df
+        /* Allocated by gf_asprintf */
3604df
+        if (eventstr)
3604df
+                GF_FREE (eventstr);
3604df
+
3604df
         return ret;
3604df
 }
3604df
-- 
3604df
1.7.1
3604df