From d354eb1abb2160495e205c87e1b2ecd8778c70ed Mon Sep 17 00:00:00 2001
From: Kotresh HR <khiremat@redhat.com>
Date: Thu, 21 Sep 2017 18:11:15 -0400
Subject: [PATCH 292/305] geo-rep: Fix rename of directory in hybrid crawl

In hybrid crawl, renames and unlinks can't be
synced, but a directory rename can be detected:
while syncing a directory to the slave, if its
gfid already exists there, the directory was
renamed on the master. Hence, if the directory's
gfid already exists on the slave, rename it
instead of creating it again.
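
A minimal sketch of the idea (illustrative only, not gsyncd code;
brick_path, new_entry and the aux-gfid ".gfid" prefix are assumptions
made for the example):

    import os

    def mkdir_or_rename(brick_path, gfid, new_entry):
        # Backend handle for this gfid on one slave brick.
        backend = os.path.join(brick_path, ".glusterfs",
                               gfid[0:2], gfid[2:4], gfid)
        if not os.path.lexists(backend):
            # gfid unknown on the slave: plain MKDIR.
            os.mkdir(new_entry)
            return
        # gfid already present: the directory was renamed on the master
        # but recorded as MKDIR by hybrid crawl. The backend symlink reads
        # "../../<pgfid[0:2]>/<pgfid[2:4]>/<pgfid>/<basename>", which gives
        # the current (old) parent gfid and basename on the slave.
        parts = os.readlink(backend).split('/')
        src = os.path.join(".gfid", parts[-2], parts[-1])
        os.rename(src, new_entry)

In the actual change below, the brick path is looked up via Volinfo on
the slave volume and the rename is verified against the on-disk gfid
(rename_with_disk_gfid_confirmation).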

Backport of:
 > Patch: https://review.gluster.org/18448
 > Change-Id: Ibf9f99e76a3e02795a3c2befd8cac48a5c365bb6
 > BUG: 1499566
 > Signed-off-by: Kotresh HR <khiremat@redhat.com>

Change-Id: Ibf9f99e76a3e02795a3c2befd8cac48a5c365bb6
BUG: 1582417
Signed-off-by: Kotresh HR <khiremat@redhat.com>
Reviewed-on: https://code.engineering.redhat.com/gerrit/140285
Tested-by: RHGS Build Bot <nigelb@redhat.com>
Reviewed-by: Sunil Kumar Heggodu Gopala Acharya <sheggodu@redhat.com>
---
 geo-replication/syncdaemon/gsyncd.py     |   4 +-
 geo-replication/syncdaemon/monitor.py    |  85 +----------
 geo-replication/syncdaemon/resource.py   | 191 +++++--------------------
 geo-replication/syncdaemon/syncdutils.py | 237 +++++++++++++++++++++++++++++++
 4 files changed, 276 insertions(+), 241 deletions(-)

diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py
index 629e8b7..b0ed0ae 100644
--- a/geo-replication/syncdaemon/gsyncd.py
+++ b/geo-replication/syncdaemon/gsyncd.py
@@ -39,7 +39,7 @@ from changelogagent import agent, Changelog
 from gsyncdstatus import set_monitor_status, GeorepStatus, human_time_utc
 from libcxattr import Xattr
 import struct
-from syncdutils import get_master_and_slave_data_from_args, lf
+from syncdutils import get_master_and_slave_data_from_args, lf, Popen
 
 ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError
 
@@ -778,7 +778,7 @@ def main_i():
     else:
         gconf.label = 'slave'
     startup(go_daemon=go_daemon, log_file=log_file, label=gconf.label)
-    resource.Popen.init_errhandler()
+    Popen.init_errhandler()
 
     if be_agent:
         os.setsid()
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
index 0f43c4f..55f8330 100644
--- a/geo-replication/syncdaemon/monitor.py
+++ b/geo-replication/syncdaemon/monitor.py
@@ -16,7 +16,7 @@ import logging
 import uuid
 import xml.etree.ElementTree as XET
 from subprocess import PIPE
-from resource import Popen, FILE, GLUSTER, SSH
+from resource import FILE, GLUSTER, SSH
 from threading import Lock
 from errno import ECHILD, ESRCH
 import re
@@ -24,8 +24,9 @@ import random
 from gconf import gconf
 from syncdutils import select, waitpid, errno_wrap, lf
 from syncdutils import set_term_handler, is_host_local, GsyncdError
-from syncdutils import escape, Thread, finalize, memoize, boolify
+from syncdutils import escape, Thread, finalize, boolify
 from syncdutils import gf_event, EVENT_GEOREP_FAULTY
+from syncdutils import Volinfo, Popen
 
 from gsyncdstatus import GeorepStatus, set_monitor_status
 
@@ -91,86 +92,6 @@ def get_slave_bricks_status(host, vol):
     return list(up_hosts)
 
 
-class Volinfo(object):
-
-    def __init__(self, vol, host='localhost', prelude=[]):
-        po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host,
-                              'volume', 'info', vol],
-                   stdout=PIPE, stderr=PIPE)
-        vix = po.stdout.read()
-        po.wait()
-        po.terminate_geterr()
-        vi = XET.fromstring(vix)
-        if vi.find('opRet').text != '0':
-            if prelude:
-                via = '(via %s) ' % prelude.join(' ')
-            else:
-                via = ' '
-            raise GsyncdError('getting volume info of %s%s '
-                              'failed with errorcode %s' %
-                              (vol, via, vi.find('opErrno').text))
-        self.tree = vi
-        self.volume = vol
-        self.host = host
-
-    def get(self, elem):
-        return self.tree.findall('.//' + elem)
-
-    def is_tier(self):
-        return (self.get('typeStr')[0].text == 'Tier')
-
-    def is_hot(self, brickpath):
-        logging.debug('brickpath: ' + repr(brickpath))
-        return brickpath in self.hot_bricks
-
-    @property
-    @memoize
-    def bricks(self):
-        def bparse(b):
-            host, dirp = b.find("name").text.split(':', 2)
-            return {'host': host, 'dir': dirp, 'uuid': b.find("hostUuid").text}
-        return [bparse(b) for b in self.get('brick')]
-
-    @property
-    @memoize
-    def uuid(self):
-        ids = self.get('id')
-        if len(ids) != 1:
-            raise GsyncdError("volume info of %s obtained from %s: "
-                              "ambiguous uuid" % (self.volume, self.host))
-        return ids[0].text
-
-    def replica_count(self, tier, hot):
-        if (tier and hot):
-            return int(self.get('hotBricks/hotreplicaCount')[0].text)
-        elif (tier and not hot):
-            return int(self.get('coldBricks/coldreplicaCount')[0].text)
-        else:
-            return int(self.get('replicaCount')[0].text)
-
-    def disperse_count(self, tier, hot):
-        if (tier and hot):
-            # Tiering doesn't support disperse volume as hot brick,
-            # hence no xml output, so returning 0. In case, if it's
-            # supported later, we should change here.
-            return 0
-        elif (tier and not hot):
-            return int(self.get('coldBricks/colddisperseCount')[0].text)
-        else:
-            return int(self.get('disperseCount')[0].text)
-
-    @property
-    @memoize
-    def hot_bricks(self):
-        return [b.text for b in self.get('hotBricks/brick')]
-
-    def get_hot_bricks_count(self, tier):
-        if (tier):
-            return int(self.get('hotBricks/hotbrickCount')[0].text)
-        else:
-            return 0
-
-
 class Monitor(object):
 
     """class which spawns and manages gsyncd workers"""
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index d6618c1..c4b5b53 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -13,21 +13,16 @@ import os
 import sys
 import stat
 import time
-import signal
 import fcntl
-import errno
 import types
 import struct
 import socket
 import logging
 import tempfile
-import threading
 import subprocess
 import errno
 from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES
 from errno import EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM
-from select import error as SelectError
-import shutil
 
 from gconf import gconf
 import repce
@@ -43,7 +38,7 @@ from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION
 from syncdutils import GX_GFID_CANONICAL_LEN
 from gsyncdstatus import GeorepStatus
 from syncdutils import get_master_and_slave_data_from_args
-from syncdutils import lf
+from syncdutils import lf, Popen, sup, Volinfo
 from syncdutils import Xattr, matching_disk_gfid, get_gfid_from_mnt
 
 UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')
@@ -52,14 +47,9 @@ UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+")
 
 ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP')
 
-def sup(x, *a, **kw):
-    """a rubyesque "super" for python ;)
-
-    invoke caller method in parent class with given args.
-    """
-    return getattr(super(type(x), x),
-                   sys._getframe(1).f_code.co_name)(*a, **kw)
-
+slv_volume = None
+slv_host = None
+slv_bricks = None
 
 def desugar(ustr):
     """transform sugared url strings to standard <scheme>://<urlbody> form
@@ -114,149 +104,6 @@ def parse_url(ustr):
     return getattr(this, sch.upper())(path)
 
 
-class Popen(subprocess.Popen):
-
-    """customized subclass of subprocess.Popen with a ring
-    buffer for children error output"""
-
-    @classmethod
-    def init_errhandler(cls):
-        """start the thread which handles children's error output"""
-        cls.errstore = {}
-
-        def tailer():
-            while True:
-                errstore = cls.errstore.copy()
-                try:
-                    poe, _, _ = select(
-                        [po.stderr for po in errstore], [], [], 1)
-                except (ValueError, SelectError):
-                    # stderr is already closed wait for some time before
-                    # checking next error
-                    time.sleep(0.5)
-                    continue
-                for po in errstore:
-                    if po.stderr not in poe:
-                        continue
-                    po.lock.acquire()
-                    try:
-                        if po.on_death_row:
-                            continue
-                        la = errstore[po]
-                        try:
-                            fd = po.stderr.fileno()
-                        except ValueError:  # file is already closed
-                            time.sleep(0.5)
-                            continue
-
-                        try:
-                            l = os.read(fd, 1024)
-                        except OSError:
-                            time.sleep(0.5)
-                            continue
-
-                        if not l:
-                            continue
-                        tots = len(l)
-                        for lx in la:
-                            tots += len(lx)
-                        while tots > 1 << 20 and la:
-                            tots -= len(la.pop(0))
-                        la.append(l)
-                    finally:
-                        po.lock.release()
-        t = syncdutils.Thread(target=tailer)
-        t.start()
-        cls.errhandler = t
-
-    @classmethod
-    def fork(cls):
-        """fork wrapper that restarts errhandler thread in child"""
-        pid = os.fork()
-        if not pid:
-            cls.init_errhandler()
-        return pid
-
-    def __init__(self, args, *a, **kw):
-        """customizations for subprocess.Popen instantiation
-
-        - 'close_fds' is taken to be the default
-        - if child's stderr is chosen to be managed,
-          register it with the error handler thread
-        """
-        self.args = args
-        if 'close_fds' not in kw:
-            kw['close_fds'] = True
-        self.lock = threading.Lock()
-        self.on_death_row = False
-        self.elines = []
-        try:
-            sup(self, args, *a, **kw)
-        except:
-            ex = sys.exc_info()[1]
-            if not isinstance(ex, OSError):
-                raise
-            raise GsyncdError("""execution of "%s" failed with %s (%s)""" %
-                              (args[0], errno.errorcode[ex.errno],
-                               os.strerror(ex.errno)))
-        if kw.get('stderr') == subprocess.PIPE:
-            assert(getattr(self, 'errhandler', None))
-            self.errstore[self] = []
-
-    def errlog(self):
-        """make a log about child's failure event"""
-        logging.error(lf("command returned error",
-                         cmd=" ".join(self.args),
-                         error=self.returncode))
-        lp = ''
-
-        def logerr(l):
-            logging.error(self.args[0] + "> " + l)
-        for l in self.elines:
-            ls = l.split('\n')
-            ls[0] = lp + ls[0]
-            lp = ls.pop()
-            for ll in ls:
-                logerr(ll)
-        if lp:
-            logerr(lp)
-
-    def errfail(self):
-        """fail nicely if child did not terminate with success"""
-        self.errlog()
-        syncdutils.finalize(exval=1)
-
-    def terminate_geterr(self, fail_on_err=True):
-        """kill child, finalize stderr harvesting (unregister
-        from errhandler, set up .elines), fail on error if
-        asked for
-        """
-        self.lock.acquire()
-        try:
-            self.on_death_row = True
-        finally:
-            self.lock.release()
-        elines = self.errstore.pop(self)
-        if self.poll() is None:
-            self.terminate()
-            if self.poll() is None:
-                time.sleep(0.1)
-                self.kill()
-                self.wait()
-        while True:
-            if not select([self.stderr], [], [], 0.1)[0]:
-                break
-            b = os.read(self.stderr.fileno(), 1024)
-            if b:
-                elines.append(b)
-            else:
-                break
-        self.stderr.close()
-        self.elines = elines
-        if fail_on_err and self.returncode != 0:
-            self.errfail()
-
-
 class Server(object):
 
     """singleton implemening those filesystem access primitives
@@ -776,6 +623,31 @@ class Server(object):
                 if isinstance(st, int):
                     blob = entry_pack_mkdir(
                         gfid, bname, e['mode'], e['uid'], e['gid'])
+                else:
+                    # If gfid of a directory exists on slave but path based
+                    # create is getting EEXIST. This means the directory is
+                    # renamed in master but recorded as MKDIR during hybrid
+                    # crawl. Get the directory path by reading the backend
+                    # symlink and trying to rename to new name as said by
+                    # master.
+                    global slv_bricks
+                    global slv_volume
+                    global slv_host
+                    if not slv_bricks:
+                        slv_info = Volinfo (slv_volume, slv_host)
+                        slv_bricks = slv_info.bricks
+                    # Result of readlink would be of format as below.
+                    # readlink = "../../pgfid[0:2]/pgfid[2:4]/pgfid/basename"
+                    realpath = os.readlink(os.path.join(slv_bricks[0]['dir'],
+                                                        ".glusterfs", gfid[0:2],
+                                                        gfid[2:4], gfid))
+                    realpath_parts = realpath.split('/')
+                    src_pargfid = realpath_parts[-2]
+                    src_basename = realpath_parts[-1]
+                    src_entry = os.path.join(pfx, src_pargfid, src_basename)
+                    logging.info(lf("Special case: rename on mkdir",
+                                   gfid=gfid, entry=repr(entry)))
+                    rename_with_disk_gfid_confirmation(gfid, src_entry, entry)
             elif op == 'LINK':
                 slink = os.path.join(pfx, gfid)
                 st = lstat(slink)
@@ -1318,6 +1190,11 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
     def __init__(self, path):
         self.host, self.volume = sup(self, path, '^(%s):(.+)' % HostRX.pattern)
 
+        global slv_volume
+        global slv_host
+        slv_volume = self.volume
+        slv_host = self.host
+
     def canonical_path(self):
         return ':'.join([gethostbyname(self.host), self.volume])
 
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py
index 2b57f83..a493c37 100644
--- a/geo-replication/syncdaemon/syncdutils.py
+++ b/geo-replication/syncdaemon/syncdutils.py
@@ -16,13 +16,18 @@ import fcntl
 import shutil
 import logging
 import socket
+import errno
+import threading
 import subprocess
+from subprocess import PIPE
 from threading import Lock, Thread as baseThread
 from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED
 from errno import EINTR, ENOENT, EPERM, ESTALE, EBUSY, errorcode
 from signal import signal, SIGTERM
 import select as oselect
 from os import waitpid as owaitpid
+import xml.etree.ElementTree as XET
+from select import error as SelectError
 
 from conf import GLUSTERFS_LIBEXECDIR, UUID_FILE
 sys.path.insert(1, GLUSTERFS_LIBEXECDIR)
@@ -76,6 +81,15 @@ NEWLINE_ESCAPE_CHAR = "%0A"
 PERCENTAGE_ESCAPE_CHAR = "%25"
 
 
+def sup(x, *a, **kw):
+    """a rubyesque "super" for python ;)
+
+    invoke caller method in parent class with given args.
+    """
+    return getattr(super(type(x), x),
+                   sys._getframe(1).f_code.co_name)(*a, **kw)
+
+
 def escape(s):
     """the chosen flavor of string escaping, used all over
        to turn whatever data to creatable representation"""
@@ -650,3 +664,226 @@ def lf(event, **kwargs):
     for k, v in kwargs.items():
         msg += "\t{0}={1}".format(k, v)
     return msg
+
+
+class Popen(subprocess.Popen):
+
+    """customized subclass of subprocess.Popen with a ring
+    buffer for children error output"""
+
+    @classmethod
+    def init_errhandler(cls):
+        """start the thread which handles children's error output"""
+        cls.errstore = {}
+
+        def tailer():
+            while True:
+                errstore = cls.errstore.copy()
+                try:
+                    poe, _, _ = select(
+                        [po.stderr for po in errstore], [], [], 1)
+                except (ValueError, SelectError):
+                    # stderr is already closed wait for some time before
+                    # checking next error
+                    time.sleep(0.5)
+                    continue
+                for po in errstore:
+                    if po.stderr not in poe:
+                        continue
+                    po.lock.acquire()
+                    try:
+                        if po.on_death_row:
+                            continue
+                        la = errstore[po]
+                        try:
+                            fd = po.stderr.fileno()
+                        except ValueError:  # file is already closed
+                            time.sleep(0.5)
+                            continue
+
+                        try:
+                            l = os.read(fd, 1024)
+                        except OSError:
+                            time.sleep(0.5)
+                            continue
+
+                        if not l:
+                            continue
+                        tots = len(l)
+                        for lx in la:
+                            tots += len(lx)
+                        while tots > 1 << 20 and la:
+                            tots -= len(la.pop(0))
+                        la.append(l)
+                    finally:
+                        po.lock.release()
+        t = Thread(target=tailer)
+        t.start()
+        cls.errhandler = t
+
+    @classmethod
+    def fork(cls):
+        """fork wrapper that restarts errhandler thread in child"""
+        pid = os.fork()
+        if not pid:
+            cls.init_errhandler()
+        return pid
+
+    def __init__(self, args, *a, **kw):
+        """customizations for subprocess.Popen instantiation
+
+        - 'close_fds' is taken to be the default
+        - if child's stderr is chosen to be managed,
+          register it with the error handler thread
+        """
+        self.args = args
+        if 'close_fds' not in kw:
+            kw['close_fds'] = True
+        self.lock = threading.Lock()
+        self.on_death_row = False
+        self.elines = []
+        try:
+            sup(self, args, *a, **kw)
+        except:
+            ex = sys.exc_info()[1]
+            if not isinstance(ex, OSError):
+                raise
+            raise GsyncdError("""execution of "%s" failed with %s (%s)""" %
+                              (args[0], errno.errorcode[ex.errno],
+                               os.strerror(ex.errno)))
+        if kw.get('stderr') == subprocess.PIPE:
+            assert(getattr(self, 'errhandler', None))
+            self.errstore[self] = []
+
+    def errlog(self):
+        """make a log about child's failure event"""
+        logging.error(lf("command returned error",
+                         cmd=" ".join(self.args),
+                         error=self.returncode))
+        lp = ''
+
+        def logerr(l):
+            logging.error(self.args[0] + "> " + l)
+        for l in self.elines:
+            ls = l.split('\n')
+            ls[0] = lp + ls[0]
+            lp = ls.pop()
+            for ll in ls:
+                logerr(ll)
+        if lp:
+            logerr(lp)
+
+    def errfail(self):
+        """fail nicely if child did not terminate with success"""
+        self.errlog()
+        finalize(exval=1)
+
+    def terminate_geterr(self, fail_on_err=True):
+        """kill child, finalize stderr harvesting (unregister
+        from errhandler, set up .elines), fail on error if
+        asked for
+        """
+        self.lock.acquire()
+        try:
+            self.on_death_row = True
+        finally:
+            self.lock.release()
+        elines = self.errstore.pop(self)
+        if self.poll() is None:
+            self.terminate()
+            if self.poll() is None:
+                time.sleep(0.1)
+                self.kill()
+                self.wait()
+        while True:
+            if not select([self.stderr], [], [], 0.1)[0]:
+                break
+            b = os.read(self.stderr.fileno(), 1024)
+            if b:
+                elines.append(b)
+            else:
+                break
+        self.stderr.close()
+        self.elines = elines
+        if fail_on_err and self.returncode != 0:
+            self.errfail()
+
+
+class Volinfo(object):
+
+    def __init__(self, vol, host='localhost', prelude=[]):
+        po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host,
+                              'volume', 'info', vol],
+                   stdout=PIPE, stderr=PIPE)
+        vix = po.stdout.read()
+        po.wait()
+        po.terminate_geterr()
+        vi = XET.fromstring(vix)
+        if vi.find('opRet').text != '0':
+            if prelude:
+                via = '(via %s) ' % prelude.join(' ')
+            else:
+                via = ' '
+            raise GsyncdError('getting volume info of %s%s '
+                              'failed with errorcode %s' %
+                              (vol, via, vi.find('opErrno').text))
+        self.tree = vi
+        self.volume = vol
+        self.host = host
+
+    def get(self, elem):
+        return self.tree.findall('.//' + elem)
+
+    def is_tier(self):
+        return (self.get('typeStr')[0].text == 'Tier')
+
+    def is_hot(self, brickpath):
+        logging.debug('brickpath: ' + repr(brickpath))
+        return brickpath in self.hot_bricks
+
+    @property
+    @memoize
+    def bricks(self):
+        def bparse(b):
+            host, dirp = b.find("name").text.split(':', 2)
+            return {'host': host, 'dir': dirp, 'uuid': b.find("hostUuid").text}
+        return [bparse(b) for b in self.get('brick')]
+
+    @property
+    @memoize
+    def uuid(self):
+        ids = self.get('id')
+        if len(ids) != 1:
+            raise GsyncdError("volume info of %s obtained from %s: "
+                              "ambiguous uuid" % (self.volume, self.host))
+        return ids[0].text
+
+    def replica_count(self, tier, hot):
+        if (tier and hot):
+            return int(self.get('hotBricks/hotreplicaCount')[0].text)
+        elif (tier and not hot):
+            return int(self.get('coldBricks/coldreplicaCount')[0].text)
+        else:
+            return int(self.get('replicaCount')[0].text)
+
+    def disperse_count(self, tier, hot):
+        if (tier and hot):
+            # Tiering doesn't support disperse volume as hot brick,
+            # hence no xml output, so returning 0. In case, if it's
+            # supported later, we should change here.
+            return 0
+        elif (tier and not hot):
+            return int(self.get('coldBricks/colddisperseCount')[0].text)
+        else:
+            return int(self.get('disperseCount')[0].text)
+
+    @property
+    @memoize
+    def hot_bricks(self):
+        return [b.text for b in self.get('hotBricks/brick')]
+
+    def get_hot_bricks_count(self, tier):
+        if (tier):
+            return int(self.get('hotBricks/hotbrickCount')[0].text)
+        else:
+            return 0
-- 
1.8.3.1