From 59c4a3b7445a464f99dcd38995a4e687a80be632 Mon Sep 17 00:00:00 2001 From: Aravinda VK Date: Wed, 12 Oct 2016 23:32:39 +0530 Subject: [PATCH 108/141] geo-rep/eventsapi: Additional Events Added following events EVENT_GEOREP_ACTIVE { "nodeid": NODEID, "ts": TIMESTAMP, "event": "GEOREP_ACTIVE", "message": { "master_volume": MASTER_VOLUME_NAME, "slave_host": SLAVE_HOST, "slave_volume": SLAVE_VOLUME, "brick_path": BRICK_PATH } } EVENT_GEOREP_PASSIVE { "nodeid": NODEID, "ts": TIMESTAMP, "event": "GEOREP_PASSIVE", "message": { "master_volume": MASTER_VOLUME_NAME, "slave_host": SLAVE_HOST, "slave_volume": SLAVE_VOLUME, "brick_path": BRICK_PATH } } EVENT_GEOREP_CHECKPOINT_COMPLETED { "nodeid": NODEID, "ts": TIMESTAMP, "event": "GEOREP_ACTIVE", "message": { "master_volume": MASTER_VOLUME_NAME, "slave_host": SLAVE_HOST, "slave_volume": SLAVE_VOLUME, "brick_path": BRICK_PATH, "checkpoint_time": CHECKPOINT_TIME, "checkpoint_completion_time": CHECKPOINT_COMPLETION_TIME } } > Reviewed-on: http://review.gluster.org/15630 > Smoke: Gluster Build System > NetBSD-regression: NetBSD Build System > CentOS-regression: Gluster Build System > Reviewed-by: Kotresh HR > Tested-by: Kotresh HR BUG: 1380257 Change-Id: I90716175868c59dd65c8d202e73e0ede90347b6a Signed-off-by: Aravinda VK Reviewed-on: https://code.engineering.redhat.com/gerrit/87745 Reviewed-by: Atin Mukherjee --- events/eventskeygen.py | 4 ++ geo-replication/syncdaemon/gsyncd.py | 4 ++ geo-replication/syncdaemon/gsyncdstatus.py | 50 +++++++++++++++++++++++++--- geo-replication/syncdaemon/monitor.py | 5 ++- geo-replication/syncdaemon/resource.py | 7 +++- geo-replication/syncdaemon/syncdutils.py | 19 ++++++++++ 6 files changed, 81 insertions(+), 8 deletions(-) diff --git a/events/eventskeygen.py b/events/eventskeygen.py index 6165b24..1b1c691 100644 --- a/events/eventskeygen.py +++ b/events/eventskeygen.py @@ -128,6 +128,10 @@ keys = ( "EVENT_EC_MIN_BRICKS_UP", #georep async events "EVENT_GEOREP_FAULTY", + "EVENT_GEOREP_CHECKPOINT_COMPLETED", + "EVENT_GEOREP_ACTIVE", + "EVENT_GEOREP_PASSIVE", + #quota async events "EVENT_QUOTA_CROSSED_SOFT_LIMIT", diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py index b459abc..2c80319 100644 --- a/geo-replication/syncdaemon/gsyncd.py +++ b/geo-replication/syncdaemon/gsyncd.py @@ -39,6 +39,7 @@ from changelogagent import agent, Changelog from gsyncdstatus import set_monitor_status, GeorepStatus from libcxattr import Xattr import struct +from syncdutils import get_master_and_slave_data_from_args ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError @@ -697,8 +698,11 @@ def main_i(): status_get = rconf.get('status_get') if status_get: + master_name, slave_data = get_master_and_slave_data_from_args(args) for brick in gconf.path: brick_status = GeorepStatus(gconf.state_file, brick, + master_name, + slave_data, getattr(gconf, "pid_file", None)) checkpoint_time = int(getattr(gconf, "checkpoint", "0")) brick_status.print_status(checkpoint_time=checkpoint_time) diff --git a/geo-replication/syncdaemon/gsyncdstatus.py b/geo-replication/syncdaemon/gsyncdstatus.py index f4d50c1..6575fcd 100644 --- a/geo-replication/syncdaemon/gsyncdstatus.py +++ b/geo-replication/syncdaemon/gsyncdstatus.py @@ -17,6 +17,8 @@ import json import time from datetime import datetime from errno import EACCES, EAGAIN, ENOENT +from syncdutils import EVENT_GEOREP_ACTIVE, EVENT_GEOREP_PASSIVE, gf_event +from syncdutils import EVENT_GEOREP_CHECKPOINT_COMPLETED DEFAULT_STATUS = "N/A" MONITOR_STATUS = ("Created", "Started", "Paused", "Stopped") @@ -114,7 +116,12 @@ def set_monitor_status(status_file, status): class GeorepStatus(object): - def __init__(self, monitor_status_file, brick, monitor_pid_file=None): + def __init__(self, monitor_status_file, brick, master, slave, + monitor_pid_file=None): + self.master = master + slv_data = slave.split("::") + self.slave_host = slv_data[0] + self.slave_volume = slv_data[1].split(":")[0] # Remove Slave UUID self.work_dir = os.path.dirname(monitor_status_file) self.monitor_status_file = monitor_status_file self.filename = os.path.join(self.work_dir, @@ -137,6 +144,10 @@ class GeorepStatus(object): data = self.default_values data = mergerfunc(data) + # If Data is not changed by merger func + if not data: + return False + with tempfile.NamedTemporaryFile( 'w', dir=os.path.dirname(self.filename), @@ -149,6 +160,7 @@ class GeorepStatus(object): os.O_DIRECTORY) os.fsync(dirfd) os.close(dirfd) + return True def reset_on_worker_start(self): def merger(data): @@ -163,10 +175,24 @@ class GeorepStatus(object): def set_field(self, key, value): def merger(data): + # Current data and prev data is same + if data[key] == value: + return {} + data[key] = value return json.dumps(data) - self._update(merger) + return self._update(merger) + + def trigger_gf_event_checkpoint_completion(self, checkpoint_time, + checkpoint_completion_time): + gf_event(EVENT_GEOREP_CHECKPOINT_COMPLETED, + master_volume=self.master, + slave_host=self.slave_host, + slave_volume=self.slave_volume, + brick_path=self.brick, + checkpoint_time=checkpoint_time, + checkpoint_completion_time=checkpoint_completion_time) def set_last_synced(self, value, checkpoint_time): def merger(data): @@ -184,9 +210,13 @@ class GeorepStatus(object): # previously then update the checkpoint completed time if checkpoint_time > 0 and checkpoint_time <= value[0]: if data["checkpoint_completed"] == "No": + curr_time = int(time.time()) data["checkpoint_time"] = checkpoint_time - data["checkpoint_completion_time"] = int(time.time()) + data["checkpoint_completion_time"] = curr_time data["checkpoint_completed"] = "Yes" + self.trigger_gf_event_checkpoint_completion( + checkpoint_time, curr_time) + return json.dumps(data) self._update(merger) @@ -221,10 +251,20 @@ class GeorepStatus(object): self._update(merger) def set_active(self): - self.set_field("worker_status", "Active") + if self.set_field("worker_status", "Active"): + gf_event(EVENT_GEOREP_ACTIVE, + master_volume=self.master, + slave_host=self.slave_host, + slave_volume=self.slave_volume, + brick_path=self.brick) def set_passive(self): - self.set_field("worker_status", "Passive") + if self.set_field("worker_status", "Passive"): + gf_event(EVENT_GEOREP_PASSIVE, + master_volume=self.master, + slave_host=self.slave_host, + slave_volume=self.slave_volume, + brick_path=self.brick) def get_monitor_status(self): data = "" diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py index 22cd1cc..7eddd26 100644 --- a/geo-replication/syncdaemon/monitor.py +++ b/geo-replication/syncdaemon/monitor.py @@ -212,7 +212,10 @@ class Monitor(object): """ if not self.status.get(w[0]['dir'], None): self.status[w[0]['dir']] = GeorepStatus(gconf.state_file, - w[0]['dir']) + w[0]['dir'], + master, + "%s::%s" % (slave_host, + slave_vol)) set_monitor_status(gconf.state_file, self.ST_STARTED) self.status[w[0]['dir']].set_worker_status(self.ST_INIT) diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 3a9aa33..5cf5eb0 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -40,7 +40,7 @@ from syncdutils import ChangelogException, ChangelogHistoryNotAvailable from syncdutils import get_changelog_log_level from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION from gsyncdstatus import GeorepStatus - +from syncdutils import get_master_and_slave_data_from_args UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z') HostRX = re.compile('[a-zA-Z\d](?:[a-zA-Z\d.-]*[a-zA-Z\d])?', re.I) @@ -1474,7 +1474,10 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): changelog_register_failed = False (inf, ouf, ra, wa) = gconf.rpc_fd.split(',') changelog_agent = RepceClient(int(inf), int(ouf)) - status = GeorepStatus(gconf.state_file, gconf.local_path) + master_name, slave_data = get_master_and_slave_data_from_args( + sys.argv) + status = GeorepStatus(gconf.state_file, gconf.local_path, + master_name, slave_data) status.reset_on_worker_start() rv = changelog_agent.version() if int(rv) != CHANGELOG_AGENT_CLIENT_VERSION: diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index 0bcf5ec..b8e1e6a 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -28,10 +28,17 @@ sys.path.insert(1, GLUSTERFS_LIBEXECDIR) EVENTS_ENABLED = True try: from events.eventtypes import GEOREP_FAULTY as EVENT_GEOREP_FAULTY + from events.eventtypes import GEOREP_ACTIVE as EVENT_GEOREP_ACTIVE + from events.eventtypes import GEOREP_PASSIVE as EVENT_GEOREP_PASSIVE + from events.eventtypes import GEOREP_CHECKPOINT_COMPLETED \ + as EVENT_GEOREP_CHECKPOINT_COMPLETED except ImportError: # Events APIs not installed, dummy eventtypes with None EVENTS_ENABLED = False EVENT_GEOREP_FAULTY = None + EVENT_GEOREP_ACTIVE = None + EVENT_GEOREP_PASSIVE = None + EVENT_GEOREP_CHECKPOINT_COMPLETED = None try: from cPickle import PickleError @@ -539,3 +546,15 @@ def gf_event(event_type, **kwargs): if EVENTS_ENABLED: from events.gf_event import gf_event as gfevent gfevent(event_type, **kwargs) + + +def get_master_and_slave_data_from_args(args): + master_name = None + slave_data = None + for arg in args: + if arg.startswith(":"): + master_name = arg.replace(":", "") + if "::" in arg: + slave_data = arg.replace("ssh://", "") + + return (master_name, slave_data) -- 1.7.1