3604df
From 59c4a3b7445a464f99dcd38995a4e687a80be632 Mon Sep 17 00:00:00 2001
3604df
From: Aravinda VK <avishwan@redhat.com>
3604df
Date: Wed, 12 Oct 2016 23:32:39 +0530
3604df
Subject: [PATCH 108/141] geo-rep/eventsapi: Additional Events
3604df
3604df
Added following events
3604df
EVENT_GEOREP_ACTIVE
3604df
    {
3604df
        "nodeid": NODEID,
3604df
        "ts": TIMESTAMP,
3604df
        "event": "GEOREP_ACTIVE",
3604df
        "message": {
3604df
            "master_volume": MASTER_VOLUME_NAME,
3604df
            "slave_host": SLAVE_HOST,
3604df
            "slave_volume": SLAVE_VOLUME,
3604df
            "brick_path": BRICK_PATH
3604df
        }
3604df
    }
3604df
3604df
EVENT_GEOREP_PASSIVE
3604df
    {
3604df
        "nodeid": NODEID,
3604df
        "ts": TIMESTAMP,
3604df
        "event": "GEOREP_PASSIVE",
3604df
        "message": {
3604df
            "master_volume": MASTER_VOLUME_NAME,
3604df
            "slave_host": SLAVE_HOST,
3604df
            "slave_volume": SLAVE_VOLUME,
3604df
            "brick_path": BRICK_PATH
3604df
        }
3604df
    }
3604df
EVENT_GEOREP_CHECKPOINT_COMPLETED
3604df
    {
3604df
        "nodeid": NODEID,
3604df
        "ts": TIMESTAMP,
3604df
        "event": "GEOREP_ACTIVE",
3604df
        "message": {
3604df
            "master_volume": MASTER_VOLUME_NAME,
3604df
            "slave_host": SLAVE_HOST,
3604df
            "slave_volume": SLAVE_VOLUME,
3604df
            "brick_path": BRICK_PATH,
3604df
            "checkpoint_time": CHECKPOINT_TIME,
3604df
            "checkpoint_completion_time": CHECKPOINT_COMPLETION_TIME
3604df
        }
3604df
    }
3604df
3604df
> Reviewed-on: http://review.gluster.org/15630
3604df
> Smoke: Gluster Build System <jenkins@build.gluster.org>
3604df
> NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
3604df
> CentOS-regression: Gluster Build System <jenkins@build.gluster.org>
3604df
> Reviewed-by: Kotresh HR <khiremat@redhat.com>
3604df
> Tested-by: Kotresh HR <khiremat@redhat.com>
3604df
3604df
BUG: 1380257
3604df
Change-Id: I90716175868c59dd65c8d202e73e0ede90347b6a
3604df
Signed-off-by: Aravinda VK <avishwan@redhat.com>
3604df
Reviewed-on: https://code.engineering.redhat.com/gerrit/87745
3604df
Reviewed-by: Atin Mukherjee <amukherj@redhat.com>
3604df
---
3604df
 events/eventskeygen.py                     |    4 ++
3604df
 geo-replication/syncdaemon/gsyncd.py       |    4 ++
3604df
 geo-replication/syncdaemon/gsyncdstatus.py |   50 +++++++++++++++++++++++++---
3604df
 geo-replication/syncdaemon/monitor.py      |    5 ++-
3604df
 geo-replication/syncdaemon/resource.py     |    7 +++-
3604df
 geo-replication/syncdaemon/syncdutils.py   |   19 ++++++++++
3604df
 6 files changed, 81 insertions(+), 8 deletions(-)
3604df
3604df
diff --git a/events/eventskeygen.py b/events/eventskeygen.py
3604df
index 6165b24..1b1c691 100644
3604df
--- a/events/eventskeygen.py
3604df
+++ b/events/eventskeygen.py
3604df
@@ -128,6 +128,10 @@ keys = (
3604df
     "EVENT_EC_MIN_BRICKS_UP",
3604df
     #georep async events
3604df
     "EVENT_GEOREP_FAULTY",
3604df
+    "EVENT_GEOREP_CHECKPOINT_COMPLETED",
3604df
+    "EVENT_GEOREP_ACTIVE",
3604df
+    "EVENT_GEOREP_PASSIVE",
3604df
+
3604df
     #quota async events
3604df
     "EVENT_QUOTA_CROSSED_SOFT_LIMIT",
3604df
 
3604df
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py
3604df
index b459abc..2c80319 100644
3604df
--- a/geo-replication/syncdaemon/gsyncd.py
3604df
+++ b/geo-replication/syncdaemon/gsyncd.py
3604df
@@ -39,6 +39,7 @@ from changelogagent import agent, Changelog
3604df
 from gsyncdstatus import set_monitor_status, GeorepStatus
3604df
 from libcxattr import Xattr
3604df
 import struct
3604df
+from syncdutils import get_master_and_slave_data_from_args
3604df
 
3604df
 ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError
3604df
 
3604df
@@ -697,8 +698,11 @@ def main_i():
3604df
 
3604df
     status_get = rconf.get('status_get')
3604df
     if status_get:
3604df
+        master_name, slave_data = get_master_and_slave_data_from_args(args)
3604df
         for brick in gconf.path:
3604df
             brick_status = GeorepStatus(gconf.state_file, brick,
3604df
+                                        master_name,
3604df
+                                        slave_data,
3604df
                                         getattr(gconf, "pid_file", None))
3604df
             checkpoint_time = int(getattr(gconf, "checkpoint", "0"))
3604df
             brick_status.print_status(checkpoint_time=checkpoint_time)
3604df
diff --git a/geo-replication/syncdaemon/gsyncdstatus.py b/geo-replication/syncdaemon/gsyncdstatus.py
3604df
index f4d50c1..6575fcd 100644
3604df
--- a/geo-replication/syncdaemon/gsyncdstatus.py
3604df
+++ b/geo-replication/syncdaemon/gsyncdstatus.py
3604df
@@ -17,6 +17,8 @@ import json
3604df
 import time
3604df
 from datetime import datetime
3604df
 from errno import EACCES, EAGAIN, ENOENT
3604df
+from syncdutils import EVENT_GEOREP_ACTIVE, EVENT_GEOREP_PASSIVE, gf_event
3604df
+from syncdutils import EVENT_GEOREP_CHECKPOINT_COMPLETED
3604df
 
3604df
 DEFAULT_STATUS = "N/A"
3604df
 MONITOR_STATUS = ("Created", "Started", "Paused", "Stopped")
3604df
@@ -114,7 +116,12 @@ def set_monitor_status(status_file, status):
3604df
 
3604df
 
3604df
 class GeorepStatus(object):
3604df
-    def __init__(self, monitor_status_file, brick, monitor_pid_file=None):
3604df
+    def __init__(self, monitor_status_file, brick, master, slave,
3604df
+                 monitor_pid_file=None):
3604df
+        self.master = master
3604df
+        slv_data = slave.split("::")
3604df
+        self.slave_host = slv_data[0]
3604df
+        self.slave_volume = slv_data[1].split(":")[0]  # Remove Slave UUID
3604df
         self.work_dir = os.path.dirname(monitor_status_file)
3604df
         self.monitor_status_file = monitor_status_file
3604df
         self.filename = os.path.join(self.work_dir,
3604df
@@ -137,6 +144,10 @@ class GeorepStatus(object):
3604df
                 data = self.default_values
3604df
 
3604df
             data = mergerfunc(data)
3604df
+            # If Data is not changed by merger func
3604df
+            if not data:
3604df
+                return False
3604df
+
3604df
             with tempfile.NamedTemporaryFile(
3604df
                     'w',
3604df
                     dir=os.path.dirname(self.filename),
3604df
@@ -149,6 +160,7 @@ class GeorepStatus(object):
3604df
                             os.O_DIRECTORY)
3604df
             os.fsync(dirfd)
3604df
             os.close(dirfd)
3604df
+            return True
3604df
 
3604df
     def reset_on_worker_start(self):
3604df
         def merger(data):
3604df
@@ -163,10 +175,24 @@ class GeorepStatus(object):
3604df
 
3604df
     def set_field(self, key, value):
3604df
         def merger(data):
3604df
+            # Current data and prev data is same
3604df
+            if data[key] == value:
3604df
+                return {}
3604df
+
3604df
             data[key] = value
3604df
             return json.dumps(data)
3604df
 
3604df
-        self._update(merger)
3604df
+        return self._update(merger)
3604df
+
3604df
+    def trigger_gf_event_checkpoint_completion(self, checkpoint_time,
3604df
+                                               checkpoint_completion_time):
3604df
+        gf_event(EVENT_GEOREP_CHECKPOINT_COMPLETED,
3604df
+                 master_volume=self.master,
3604df
+                 slave_host=self.slave_host,
3604df
+                 slave_volume=self.slave_volume,
3604df
+                 brick_path=self.brick,
3604df
+                 checkpoint_time=checkpoint_time,
3604df
+                 checkpoint_completion_time=checkpoint_completion_time)
3604df
 
3604df
     def set_last_synced(self, value, checkpoint_time):
3604df
         def merger(data):
3604df
@@ -184,9 +210,13 @@ class GeorepStatus(object):
3604df
             # previously then update the checkpoint completed time
3604df
             if checkpoint_time > 0 and checkpoint_time <= value[0]:
3604df
                 if data["checkpoint_completed"] == "No":
3604df
+                    curr_time = int(time.time())
3604df
                     data["checkpoint_time"] = checkpoint_time
3604df
-                    data["checkpoint_completion_time"] = int(time.time())
3604df
+                    data["checkpoint_completion_time"] = curr_time
3604df
                     data["checkpoint_completed"] = "Yes"
3604df
+                    self.trigger_gf_event_checkpoint_completion(
3604df
+                        checkpoint_time, curr_time)
3604df
+
3604df
             return json.dumps(data)
3604df
 
3604df
         self._update(merger)
3604df
@@ -221,10 +251,20 @@ class GeorepStatus(object):
3604df
         self._update(merger)
3604df
 
3604df
     def set_active(self):
3604df
-        self.set_field("worker_status", "Active")
3604df
+        if self.set_field("worker_status", "Active"):
3604df
+            gf_event(EVENT_GEOREP_ACTIVE,
3604df
+                     master_volume=self.master,
3604df
+                     slave_host=self.slave_host,
3604df
+                     slave_volume=self.slave_volume,
3604df
+                     brick_path=self.brick)
3604df
 
3604df
     def set_passive(self):
3604df
-        self.set_field("worker_status", "Passive")
3604df
+        if self.set_field("worker_status", "Passive"):
3604df
+            gf_event(EVENT_GEOREP_PASSIVE,
3604df
+                     master_volume=self.master,
3604df
+                     slave_host=self.slave_host,
3604df
+                     slave_volume=self.slave_volume,
3604df
+                     brick_path=self.brick)
3604df
 
3604df
     def get_monitor_status(self):
3604df
         data = ""
3604df
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
3604df
index 22cd1cc..7eddd26 100644
3604df
--- a/geo-replication/syncdaemon/monitor.py
3604df
+++ b/geo-replication/syncdaemon/monitor.py
3604df
@@ -212,7 +212,10 @@ class Monitor(object):
3604df
         """
3604df
         if not self.status.get(w[0]['dir'], None):
3604df
             self.status[w[0]['dir']] = GeorepStatus(gconf.state_file,
3604df
-                                                    w[0]['dir'])
3604df
+                                                    w[0]['dir'],
3604df
+                                                    master,
3604df
+                                                    "%s::%s" % (slave_host,
3604df
+                                                                slave_vol))
3604df
 
3604df
         set_monitor_status(gconf.state_file, self.ST_STARTED)
3604df
         self.status[w[0]['dir']].set_worker_status(self.ST_INIT)
3604df
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
3604df
index 3a9aa33..5cf5eb0 100644
3604df
--- a/geo-replication/syncdaemon/resource.py
3604df
+++ b/geo-replication/syncdaemon/resource.py
3604df
@@ -40,7 +40,7 @@ from syncdutils import ChangelogException, ChangelogHistoryNotAvailable
3604df
 from syncdutils import get_changelog_log_level
3604df
 from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION
3604df
 from gsyncdstatus import GeorepStatus
3604df
-
3604df
+from syncdutils import get_master_and_slave_data_from_args
3604df
 
3604df
 UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')
3604df
 HostRX = re.compile('[a-zA-Z\d](?:[a-zA-Z\d.-]*[a-zA-Z\d])?', re.I)
3604df
@@ -1474,7 +1474,10 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
3604df
             changelog_register_failed = False
3604df
             (inf, ouf, ra, wa) = gconf.rpc_fd.split(',')
3604df
             changelog_agent = RepceClient(int(inf), int(ouf))
3604df
-            status = GeorepStatus(gconf.state_file, gconf.local_path)
3604df
+            master_name, slave_data = get_master_and_slave_data_from_args(
3604df
+                sys.argv)
3604df
+            status = GeorepStatus(gconf.state_file, gconf.local_path,
3604df
+                                  master_name, slave_data)
3604df
             status.reset_on_worker_start()
3604df
             rv = changelog_agent.version()
3604df
             if int(rv) != CHANGELOG_AGENT_CLIENT_VERSION:
3604df
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py
3604df
index 0bcf5ec..b8e1e6a 100644
3604df
--- a/geo-replication/syncdaemon/syncdutils.py
3604df
+++ b/geo-replication/syncdaemon/syncdutils.py
3604df
@@ -28,10 +28,17 @@ sys.path.insert(1, GLUSTERFS_LIBEXECDIR)
3604df
 EVENTS_ENABLED = True
3604df
 try:
3604df
     from events.eventtypes import GEOREP_FAULTY as EVENT_GEOREP_FAULTY
3604df
+    from events.eventtypes import GEOREP_ACTIVE as EVENT_GEOREP_ACTIVE
3604df
+    from events.eventtypes import GEOREP_PASSIVE as EVENT_GEOREP_PASSIVE
3604df
+    from events.eventtypes import GEOREP_CHECKPOINT_COMPLETED \
3604df
+        as EVENT_GEOREP_CHECKPOINT_COMPLETED
3604df
 except ImportError:
3604df
     # Events APIs not installed, dummy eventtypes with None
3604df
     EVENTS_ENABLED = False
3604df
     EVENT_GEOREP_FAULTY = None
3604df
+    EVENT_GEOREP_ACTIVE = None
3604df
+    EVENT_GEOREP_PASSIVE = None
3604df
+    EVENT_GEOREP_CHECKPOINT_COMPLETED = None
3604df
 
3604df
 try:
3604df
     from cPickle import PickleError
3604df
@@ -539,3 +546,15 @@ def gf_event(event_type, **kwargs):
3604df
     if EVENTS_ENABLED:
3604df
         from events.gf_event import gf_event as gfevent
3604df
         gfevent(event_type, **kwargs)
3604df
+
3604df
+
3604df
+def get_master_and_slave_data_from_args(args):
3604df
+    master_name = None
3604df
+    slave_data = None
3604df
+    for arg in args:
3604df
+        if arg.startswith(":"):
3604df
+            master_name = arg.replace(":", "")
3604df
+        if "::" in arg:
3604df
+            slave_data = arg.replace("ssh://", "")
3604df
+
3604df
+    return (master_name, slave_data)
3604df
-- 
3604df
1.7.1
3604df