From d354eb1abb2160495e205c87e1b2ecd8778c70ed Mon Sep 17 00:00:00 2001
From: Kotresh HR <khiremat@redhat.com>
Date: Thu, 21 Sep 2017 18:11:15 -0400
Subject: [PATCH 292/305] geo-rep: Fix rename of directory in hybrid crawl
In hybrid crawl, renames and unlink can't be
synced but directory renames can be detected.
While syncing the directory on slave, if the
gfid already exists, it should be rename.
Hence if directory gfid already exists, rename
it.
Backport of:
> Patch: https://review.gluster.org/18448
> Change-Id: Ibf9f99e76a3e02795a3c2befd8cac48a5c365bb6
> BUG: 1499566
> Signed-off-by: Kotresh HR <khiremat@redhat.com>
Change-Id: Ibf9f99e76a3e02795a3c2befd8cac48a5c365bb6
BUG: 1582417
Signed-off-by: Kotresh HR <khiremat@redhat.com>
Reviewed-on: https://code.engineering.redhat.com/gerrit/140285
Tested-by: RHGS Build Bot <nigelb@redhat.com>
Reviewed-by: Sunil Kumar Heggodu Gopala Acharya <sheggodu@redhat.com>
---
geo-replication/syncdaemon/gsyncd.py | 4 +-
geo-replication/syncdaemon/monitor.py | 85 +----------
geo-replication/syncdaemon/resource.py | 191 +++++--------------------
geo-replication/syncdaemon/syncdutils.py | 237 +++++++++++++++++++++++++++++++
4 files changed, 276 insertions(+), 241 deletions(-)
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py
index 629e8b7..b0ed0ae 100644
--- a/geo-replication/syncdaemon/gsyncd.py
+++ b/geo-replication/syncdaemon/gsyncd.py
@@ -39,7 +39,7 @@ from changelogagent import agent, Changelog
from gsyncdstatus import set_monitor_status, GeorepStatus, human_time_utc
from libcxattr import Xattr
import struct
-from syncdutils import get_master_and_slave_data_from_args, lf
+from syncdutils import get_master_and_slave_data_from_args, lf, Popen
ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError
@@ -778,7 +778,7 @@ def main_i():
else:
gconf.label = 'slave'
startup(go_daemon=go_daemon, log_file=log_file, label=gconf.label)
- resource.Popen.init_errhandler()
+ Popen.init_errhandler()
if be_agent:
os.setsid()
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
index 0f43c4f..55f8330 100644
--- a/geo-replication/syncdaemon/monitor.py
+++ b/geo-replication/syncdaemon/monitor.py
@@ -16,7 +16,7 @@ import logging
import uuid
import xml.etree.ElementTree as XET
from subprocess import PIPE
-from resource import Popen, FILE, GLUSTER, SSH
+from resource import FILE, GLUSTER, SSH
from threading import Lock
from errno import ECHILD, ESRCH
import re
@@ -24,8 +24,9 @@ import random
from gconf import gconf
from syncdutils import select, waitpid, errno_wrap, lf
from syncdutils import set_term_handler, is_host_local, GsyncdError
-from syncdutils import escape, Thread, finalize, memoize, boolify
+from syncdutils import escape, Thread, finalize, boolify
from syncdutils import gf_event, EVENT_GEOREP_FAULTY
+from syncdutils import Volinfo, Popen
from gsyncdstatus import GeorepStatus, set_monitor_status
@@ -91,86 +92,6 @@ def get_slave_bricks_status(host, vol):
return list(up_hosts)
-class Volinfo(object):
-
- def __init__(self, vol, host='localhost', prelude=[]):
- po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host,
- 'volume', 'info', vol],
- stdout=PIPE, stderr=PIPE)
- vix = po.stdout.read()
- po.wait()
- po.terminate_geterr()
- vi = XET.fromstring(vix)
- if vi.find('opRet').text != '0':
- if prelude:
- via = '(via %s) ' % prelude.join(' ')
- else:
- via = ' '
- raise GsyncdError('getting volume info of %s%s '
- 'failed with errorcode %s' %
- (vol, via, vi.find('opErrno').text))
- self.tree = vi
- self.volume = vol
- self.host = host
-
- def get(self, elem):
- return self.tree.findall('.//' + elem)
-
- def is_tier(self):
- return (self.get('typeStr')[0].text == 'Tier')
-
- def is_hot(self, brickpath):
- logging.debug('brickpath: ' + repr(brickpath))
- return brickpath in self.hot_bricks
-
- @property
- @memoize
- def bricks(self):
- def bparse(b):
- host, dirp = b.find("name").text.split(':', 2)
- return {'host': host, 'dir': dirp, 'uuid': b.find("hostUuid").text}
- return [bparse(b) for b in self.get('brick')]
-
- @property
- @memoize
- def uuid(self):
- ids = self.get('id')
- if len(ids) != 1:
- raise GsyncdError("volume info of %s obtained from %s: "
- "ambiguous uuid" % (self.volume, self.host))
- return ids[0].text
-
- def replica_count(self, tier, hot):
- if (tier and hot):
- return int(self.get('hotBricks/hotreplicaCount')[0].text)
- elif (tier and not hot):
- return int(self.get('coldBricks/coldreplicaCount')[0].text)
- else:
- return int(self.get('replicaCount')[0].text)
-
- def disperse_count(self, tier, hot):
- if (tier and hot):
- # Tiering doesn't support disperse volume as hot brick,
- # hence no xml output, so returning 0. In case, if it's
- # supported later, we should change here.
- return 0
- elif (tier and not hot):
- return int(self.get('coldBricks/colddisperseCount')[0].text)
- else:
- return int(self.get('disperseCount')[0].text)
-
- @property
- @memoize
- def hot_bricks(self):
- return [b.text for b in self.get('hotBricks/brick')]
-
- def get_hot_bricks_count(self, tier):
- if (tier):
- return int(self.get('hotBricks/hotbrickCount')[0].text)
- else:
- return 0
-
-
class Monitor(object):
"""class which spawns and manages gsyncd workers"""
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index d6618c1..c4b5b53 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -13,21 +13,16 @@ import os
import sys
import stat
import time
-import signal
import fcntl
-import errno
import types
import struct
import socket
import logging
import tempfile
-import threading
import subprocess
import errno
from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES
from errno import EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM
-from select import error as SelectError
-import shutil
from gconf import gconf
import repce
@@ -43,7 +38,7 @@ from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION
from syncdutils import GX_GFID_CANONICAL_LEN
from gsyncdstatus import GeorepStatus
from syncdutils import get_master_and_slave_data_from_args
-from syncdutils import lf
+from syncdutils import lf, Popen, sup, Volinfo
from syncdutils import Xattr, matching_disk_gfid, get_gfid_from_mnt
UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')
@@ -52,14 +47,9 @@ UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+")
ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP')
-def sup(x, *a, **kw):
- """a rubyesque "super" for python ;)
-
- invoke caller method in parent class with given args.
- """
- return getattr(super(type(x), x),
- sys._getframe(1).f_code.co_name)(*a, **kw)
-
+slv_volume = None
+slv_host = None
+slv_bricks = None
def desugar(ustr):
"""transform sugared url strings to standard <scheme>://<urlbody> form
@@ -114,149 +104,6 @@ def parse_url(ustr):
return getattr(this, sch.upper())(path)
-class Popen(subprocess.Popen):
-
- """customized subclass of subprocess.Popen with a ring
- buffer for children error output"""
-
- @classmethod
- def init_errhandler(cls):
- """start the thread which handles children's error output"""
- cls.errstore = {}
-
- def tailer():
- while True:
- errstore = cls.errstore.copy()
- try:
- poe, _, _ = select(
- [po.stderr for po in errstore], [], [], 1)
- except (ValueError, SelectError):
- # stderr is already closed wait for some time before
- # checking next error
- time.sleep(0.5)
- continue
- for po in errstore:
- if po.stderr not in poe:
- continue
- po.lock.acquire()
- try:
- if po.on_death_row:
- continue
- la = errstore[po]
- try:
- fd = po.stderr.fileno()
- except ValueError: # file is already closed
- time.sleep(0.5)
- continue
-
- try:
- l = os.read(fd, 1024)
- except OSError:
- time.sleep(0.5)
- continue
-
- if not l:
- continue
- tots = len(l)
- for lx in la:
- tots += len(lx)
- while tots > 1 << 20 and la:
- tots -= len(la.pop(0))
- la.append(l)
- finally:
- po.lock.release()
- t = syncdutils.Thread(target=tailer)
- t.start()
- cls.errhandler = t
-
- @classmethod
- def fork(cls):
- """fork wrapper that restarts errhandler thread in child"""
- pid = os.fork()
- if not pid:
- cls.init_errhandler()
- return pid
-
- def __init__(self, args, *a, **kw):
- """customizations for subprocess.Popen instantiation
-
- - 'close_fds' is taken to be the default
- - if child's stderr is chosen to be managed,
- register it with the error handler thread
- """
- self.args = args
- if 'close_fds' not in kw:
- kw['close_fds'] = True
- self.lock = threading.Lock()
- self.on_death_row = False
- self.elines = []
- try:
- sup(self, args, *a, **kw)
- except:
- ex = sys.exc_info()[1]
- if not isinstance(ex, OSError):
- raise
- raise GsyncdError("""execution of "%s" failed with %s (%s)""" %
- (args[0], errno.errorcode[ex.errno],
- os.strerror(ex.errno)))
- if kw.get('stderr') == subprocess.PIPE:
- assert(getattr(self, 'errhandler', None))
- self.errstore[self] = []
-
- def errlog(self):
- """make a log about child's failure event"""
- logging.error(lf("command returned error",
- cmd=" ".join(self.args),
- error=self.returncode))
- lp = ''
-
- def logerr(l):
- logging.error(self.args[0] + "> " + l)
- for l in self.elines:
- ls = l.split('\n')
- ls[0] = lp + ls[0]
- lp = ls.pop()
- for ll in ls:
- logerr(ll)
- if lp:
- logerr(lp)
-
- def errfail(self):
- """fail nicely if child did not terminate with success"""
- self.errlog()
- syncdutils.finalize(exval=1)
-
- def terminate_geterr(self, fail_on_err=True):
- """kill child, finalize stderr harvesting (unregister
- from errhandler, set up .elines), fail on error if
- asked for
- """
- self.lock.acquire()
- try:
- self.on_death_row = True
- finally:
- self.lock.release()
- elines = self.errstore.pop(self)
- if self.poll() is None:
- self.terminate()
- if self.poll() is None:
- time.sleep(0.1)
- self.kill()
- self.wait()
- while True:
- if not select([self.stderr], [], [], 0.1)[0]:
- break
- b = os.read(self.stderr.fileno(), 1024)
- if b:
- elines.append(b)
- else:
- break
- self.stderr.close()
- self.elines = elines
- if fail_on_err and self.returncode != 0:
- self.errfail()
-
-
class Server(object):
"""singleton implemening those filesystem access primitives
@@ -776,6 +623,31 @@ class Server(object):
if isinstance(st, int):
blob = entry_pack_mkdir(
gfid, bname, e['mode'], e['uid'], e['gid'])
+ else:
+ # If gfid of a directory exists on slave but path based
+ # create is getting EEXIST. This means the directory is
+ # renamed in master but recorded as MKDIR during hybrid
+ # crawl. Get the directory path by reading the backend
+ # symlink and trying to rename to new name as said by
+ # master.
+ global slv_bricks
+ global slv_volume
+ global slv_host
+ if not slv_bricks:
+ slv_info = Volinfo (slv_volume, slv_host)
+ slv_bricks = slv_info.bricks
+ # Result of readlink would be of format as below.
+ # readlink = "../../pgfid[0:2]/pgfid[2:4]/pgfid/basename"
+ realpath = os.readlink(os.path.join(slv_bricks[0]['dir'],
+ ".glusterfs", gfid[0:2],
+ gfid[2:4], gfid))
+ realpath_parts = realpath.split('/')
+ src_pargfid = realpath_parts[-2]
+ src_basename = realpath_parts[-1]
+ src_entry = os.path.join(pfx, src_pargfid, src_basename)
+ logging.info(lf("Special case: rename on mkdir",
+ gfid=gfid, entry=repr(entry)))
+ rename_with_disk_gfid_confirmation(gfid, src_entry, entry)
elif op == 'LINK':
slink = os.path.join(pfx, gfid)
st = lstat(slink)
@@ -1318,6 +1190,11 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
def __init__(self, path):
self.host, self.volume = sup(self, path, '^(%s):(.+)' % HostRX.pattern)
+ global slv_volume
+ global slv_host
+ slv_volume = self.volume
+ slv_host = self.host
+
def canonical_path(self):
return ':'.join([gethostbyname(self.host), self.volume])
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py
index 2b57f83..a493c37 100644
--- a/geo-replication/syncdaemon/syncdutils.py
+++ b/geo-replication/syncdaemon/syncdutils.py
@@ -16,13 +16,18 @@ import fcntl
import shutil
import logging
import socket
+import errno
+import threading
import subprocess
+from subprocess import PIPE
from threading import Lock, Thread as baseThread
from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED
from errno import EINTR, ENOENT, EPERM, ESTALE, EBUSY, errorcode
from signal import signal, SIGTERM
import select as oselect
from os import waitpid as owaitpid
+import xml.etree.ElementTree as XET
+from select import error as SelectError
from conf import GLUSTERFS_LIBEXECDIR, UUID_FILE
sys.path.insert(1, GLUSTERFS_LIBEXECDIR)
@@ -76,6 +81,15 @@ NEWLINE_ESCAPE_CHAR = "%0A"
PERCENTAGE_ESCAPE_CHAR = "%25"
+def sup(x, *a, **kw):
+ """a rubyesque "super" for python ;)
+
+ invoke caller method in parent class with given args.
+ """
+ return getattr(super(type(x), x),
+ sys._getframe(1).f_code.co_name)(*a, **kw)
+
+
def escape(s):
"""the chosen flavor of string escaping, used all over
to turn whatever data to creatable representation"""
@@ -650,3 +664,226 @@ def lf(event, **kwargs):
for k, v in kwargs.items():
msg += "\t{0}={1}".format(k, v)
return msg
+
+
+class Popen(subprocess.Popen):
+
+ """customized subclass of subprocess.Popen with a ring
+ buffer for children error output"""
+
+ @classmethod
+ def init_errhandler(cls):
+ """start the thread which handles children's error output"""
+ cls.errstore = {}
+
+ def tailer():
+ while True:
+ errstore = cls.errstore.copy()
+ try:
+ poe, _, _ = select(
+ [po.stderr for po in errstore], [], [], 1)
+ except (ValueError, SelectError):
+ # stderr is already closed wait for some time before
+ # checking next error
+ time.sleep(0.5)
+ continue
+ for po in errstore:
+ if po.stderr not in poe:
+ continue
+ po.lock.acquire()
+ try:
+ if po.on_death_row:
+ continue
+ la = errstore[po]
+ try:
+ fd = po.stderr.fileno()
+ except ValueError: # file is already closed
+ time.sleep(0.5)
+ continue
+
+ try:
+ l = os.read(fd, 1024)
+ except OSError:
+ time.sleep(0.5)
+ continue
+
+ if not l:
+ continue
+ tots = len(l)
+ for lx in la:
+ tots += len(lx)
+ while tots > 1 << 20 and la:
+ tots -= len(la.pop(0))
+ la.append(l)
+ finally:
+ po.lock.release()
+ t = Thread(target=tailer)
+ t.start()
+ cls.errhandler = t
+
+ @classmethod
+ def fork(cls):
+ """fork wrapper that restarts errhandler thread in child"""
+ pid = os.fork()
+ if not pid:
+ cls.init_errhandler()
+ return pid
+
+ def __init__(self, args, *a, **kw):
+ """customizations for subprocess.Popen instantiation
+
+ - 'close_fds' is taken to be the default
+ - if child's stderr is chosen to be managed,
+ register it with the error handler thread
+ """
+ self.args = args
+ if 'close_fds' not in kw:
+ kw['close_fds'] = True
+ self.lock = threading.Lock()
+ self.on_death_row = False
+ self.elines = []
+ try:
+ sup(self, args, *a, **kw)
+ except:
+ ex = sys.exc_info()[1]
+ if not isinstance(ex, OSError):
+ raise
+ raise GsyncdError("""execution of "%s" failed with %s (%s)""" %
+ (args[0], errno.errorcode[ex.errno],
+ os.strerror(ex.errno)))
+ if kw.get('stderr') == subprocess.PIPE:
+ assert(getattr(self, 'errhandler', None))
+ self.errstore[self] = []
+
+ def errlog(self):
+ """make a log about child's failure event"""
+ logging.error(lf("command returned error",
+ cmd=" ".join(self.args),
+ error=self.returncode))
+ lp = ''
+
+ def logerr(l):
+ logging.error(self.args[0] + "> " + l)
+ for l in self.elines:
+ ls = l.split('\n')
+ ls[0] = lp + ls[0]
+ lp = ls.pop()
+ for ll in ls:
+ logerr(ll)
+ if lp:
+ logerr(lp)
+
+ def errfail(self):
+ """fail nicely if child did not terminate with success"""
+ self.errlog()
+ finalize(exval=1)
+
+ def terminate_geterr(self, fail_on_err=True):
+ """kill child, finalize stderr harvesting (unregister
+ from errhandler, set up .elines), fail on error if
+ asked for
+ """
+ self.lock.acquire()
+ try:
+ self.on_death_row = True
+ finally:
+ self.lock.release()
+ elines = self.errstore.pop(self)
+ if self.poll() is None:
+ self.terminate()
+ if self.poll() is None:
+ time.sleep(0.1)
+ self.kill()
+ self.wait()
+ while True:
+ if not select([self.stderr], [], [], 0.1)[0]:
+ break
+ b = os.read(self.stderr.fileno(), 1024)
+ if b:
+ elines.append(b)
+ else:
+ break
+ self.stderr.close()
+ self.elines = elines
+ if fail_on_err and self.returncode != 0:
+ self.errfail()
+
+
+class Volinfo(object):
+
+ def __init__(self, vol, host='localhost', prelude=[]):
+ po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host,
+ 'volume', 'info', vol],
+ stdout=PIPE, stderr=PIPE)
+ vix = po.stdout.read()
+ po.wait()
+ po.terminate_geterr()
+ vi = XET.fromstring(vix)
+ if vi.find('opRet').text != '0':
+ if prelude:
+ via = '(via %s) ' % prelude.join(' ')
+ else:
+ via = ' '
+ raise GsyncdError('getting volume info of %s%s '
+ 'failed with errorcode %s' %
+ (vol, via, vi.find('opErrno').text))
+ self.tree = vi
+ self.volume = vol
+ self.host = host
+
+ def get(self, elem):
+ return self.tree.findall('.//' + elem)
+
+ def is_tier(self):
+ return (self.get('typeStr')[0].text == 'Tier')
+
+ def is_hot(self, brickpath):
+ logging.debug('brickpath: ' + repr(brickpath))
+ return brickpath in self.hot_bricks
+
+ @property
+ @memoize
+ def bricks(self):
+ def bparse(b):
+ host, dirp = b.find("name").text.split(':', 2)
+ return {'host': host, 'dir': dirp, 'uuid': b.find("hostUuid").text}
+ return [bparse(b) for b in self.get('brick')]
+
+ @property
+ @memoize
+ def uuid(self):
+ ids = self.get('id')
+ if len(ids) != 1:
+ raise GsyncdError("volume info of %s obtained from %s: "
+ "ambiguous uuid" % (self.volume, self.host))
+ return ids[0].text
+
+ def replica_count(self, tier, hot):
+ if (tier and hot):
+ return int(self.get('hotBricks/hotreplicaCount')[0].text)
+ elif (tier and not hot):
+ return int(self.get('coldBricks/coldreplicaCount')[0].text)
+ else:
+ return int(self.get('replicaCount')[0].text)
+
+ def disperse_count(self, tier, hot):
+ if (tier and hot):
+ # Tiering doesn't support disperse volume as hot brick,
+ # hence no xml output, so returning 0. In case, if it's
+ # supported later, we should change here.
+ return 0
+ elif (tier and not hot):
+ return int(self.get('coldBricks/colddisperseCount')[0].text)
+ else:
+ return int(self.get('disperseCount')[0].text)
+
+ @property
+ @memoize
+ def hot_bricks(self):
+ return [b.text for b in self.get('hotBricks/brick')]
+
+ def get_hot_bricks_count(self, tier):
+ if (tier):
+ return int(self.get('hotBricks/hotbrickCount')[0].text)
+ else:
+ return 0
--
1.8.3.1