From a9db68fc1f05639cb79defef6ed7da58572113ea Mon Sep 17 00:00:00 2001
From: Kotresh HR <khiremat@redhat.com>
Date: Thu, 5 Jul 2018 07:07:38 -0400
Subject: [PATCH 328/333] geo-rep: Fix issues with gfid conflict handling

1. MKDIR/RMDIR is recorded on all bricks, so if one brick
   succeeds in creating the directory, the other bricks
   should ignore the entry. This was not happening: the
   earlier fix for renaming directories during hybrid crawl
   tried to rename the directory to itself and crashed with
   ENOENT if the directory had already been removed.

2. If a file is created, deleted, and a directory is then
   created with the same name, it was failing to sync. The
   issue again lies in the fix for renaming directories
   during hybrid crawl; this is fixed as well.

   The same scenario with a hardlink present for the file
   was also failing. This patch fixes that too.
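
For reference, the conflict-resolution branches introduced in
fix_possible_entry_failures() can be summarised by the rough
standalone sketch below. It is not the gsyncd code: the three
helpers are hypothetical stand-ins for the aux-gfid-mount lookups
(lstat of .gfid/<gfid>, matching_disk_gfid, backend symlink
readlink), and the retry-count and op-type checks of the ENOENT
branch are omitted.

    ENOENT = 2

    def master_has_gfid(gfid):
        # Stand-in: does <aux-mount>/.gfid/<gfid> exist on the master?
        return False

    def master_gfid_matches(gfid, entry):
        # Stand-in: does 'entry' on the master carry this gfid on disk?
        return False

    def master_dir_path(gfid):
        # Stand-in: current '.gfid/<pargfid>/<name>' path of gfid on master.
        return None

    def resolve(failure):
        # failure = (entry_op, errno, slave_side_info), as collected on slave
        info = failure[2]
        entry = failure[0]['entry']      # '.gfid/<pargfid>/<basename>'
        gfid = info['slave_gfid']

        if info['gfid_mismatch'] or info['name_mismatch']:
            if not master_has_gfid(gfid):
                # Gone on master: delete the stale slave entry.
                return 'RMDIR' if info['slave_isdir'] else 'UNLINK'
            if master_gfid_matches(gfid, entry):
                # Same name, same gfid on both sides: nothing to fix.
                return 'IGNORE'
            if info['slave_isdir']:
                dst = master_dir_path(gfid)
                # Directory renamed on master: follow the rename unless
                # it already resolves to the same name.
                return 'IGNORE' if dst == entry else 'RENAME'
            # Hardlinked/renamed file: safe to unlink this slave name.
            return 'UNLINK'

        if failure[1] == ENOENT and not master_has_gfid(entry.split('/')[1]):
            # Parent directory no longer exists on master: drop the op.
            return 'IGNORE'

        return 'RETRY'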

Backport of
  > Patch: https://review.gluster.org/#/c/20473/
  > fixes: bz#1598884
  > Change-Id: I6f3bca44e194e415a3d4de3b9d03cc8976439284
  > Signed-off-by: Kotresh HR <khiremat@redhat.com>

BUG: 1598384
Change-Id: I6f3bca44e194e415a3d4de3b9d03cc8976439284
Signed-off-by: Kotresh HR <khiremat@redhat.com>
Reviewed-on: https://code.engineering.redhat.com/gerrit/143400
Tested-by: RHGS Build Bot <nigelb@redhat.com>
Reviewed-by: Aravinda Vishwanathapura Krishna Murthy <avishwan@redhat.com>
Reviewed-by: Sunil Kumar Heggodu Gopala Acharya <sheggodu@redhat.com>
---
 geo-replication/syncdaemon/master.py     | 157 ++++++++++++++++++++++---------
 geo-replication/syncdaemon/resource.py   |  57 ++++++-----
 geo-replication/syncdaemon/syncdutils.py |  35 +++++++
 3 files changed, 180 insertions(+), 69 deletions(-)

diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 64e9836..1399378 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -692,7 +692,8 @@ class GMasterChangelogMixin(GMasterCommon):
     TYPE_GFID = "D "
     TYPE_ENTRY = "E "
 
-    MAX_EF_RETRIES = 15
+    MAX_EF_RETRIES = 10
+    MAX_OE_RETRIES = 5
 
     # flat directory hierarchy for gfid based access
     FLAT_DIR_HIERARCHY = '.'
@@ -788,38 +789,53 @@ class GMasterChangelogMixin(GMasterCommon):
 
         self.status.inc_value("failures", num_failures)
 
-    def fix_possible_entry_failures(self, failures, retry_count):
+    def fix_possible_entry_failures(self, failures, retry_count, entries):
         pfx = gauxpfx()
         fix_entry_ops = []
         failures1 = []
         for failure in failures:
-            if failure[2]['dst']:
+            if failure[2]['name_mismatch']:
+                pbname = failure[2]['slave_entry']
+            elif failure[2]['dst']:
                 pbname = failure[0]['entry1']
             else:
                 pbname = failure[0]['entry']
-            if failure[2]['gfid_mismatch']:
+
+            op = failure[0]['op']
+            # name exists but gfid is different
+            if failure[2]['gfid_mismatch'] or failure[2]['name_mismatch']:
                 slave_gfid = failure[2]['slave_gfid']
                 st = lstat(os.path.join(pfx, slave_gfid))
+                # Takes care of scenarios with no hardlinks
                 if isinstance(st, int) and st == ENOENT:
-                    logging.info(lf('Fixing gfid mismatch in slave. Deleting'
-                                    ' the entry', retry_count=retry_count,
+                    logging.info(lf('Entry not present on master. Fixing gfid '
+                                    'mismatch in slave. Deleting the entry',
+                                    retry_count=retry_count,
                                     entry=repr(failure)))
-                    #Add deletion to fix_entry_ops list
+                    # Add deletion to fix_entry_ops list
                     if failure[2]['slave_isdir']:
-                        fix_entry_ops.append(edct('RMDIR',
-                                                  gfid=failure[2]['slave_gfid'],
-                                                  entry=pbname))
+                        fix_entry_ops.append(
+                            edct('RMDIR',
+                                 gfid=failure[2]['slave_gfid'],
+                                 entry=pbname))
                     else:
-                        fix_entry_ops.append(edct('UNLINK',
-                                                  gfid=failure[2]['slave_gfid'],
-                                                  entry=pbname))
+                        fix_entry_ops.append(
+                            edct('UNLINK',
+                                 gfid=failure[2]['slave_gfid'],
+                                 entry=pbname))
+                # Takes care of scenarios of hardlinks/renames on master
                 elif not isinstance(st, int):
-                    #The file exists on master but with different name.
-                    #Probabaly renamed and got missed during xsync crawl.
-                    if failure[2]['slave_isdir']:
-                        logging.info(lf('Fixing gfid mismatch in slave',
+                    if matching_disk_gfid(slave_gfid, pbname):
+                        # Safe to ignore the failure as master contains same
+                        # file with same gfid. Remove entry from entries list
+                        logging.info(lf('Fixing gfid mismatch in slave. '
+                                        ' Safe to ignore, take out entry',
                                         retry_count=retry_count,
                                         entry=repr(failure)))
+                        entries.remove(failure[0])
+                    # The file exists on master but with different name.
+                    # Probably renamed and got missed during xsync crawl.
+                    elif failure[2]['slave_isdir']:
                         realpath = os.readlink(os.path.join(gconf.local_path,
                                                             ".glusterfs",
                                                             slave_gfid[0:2],
@@ -827,64 +843,99 @@ class GMasterChangelogMixin(GMasterCommon):
                                                             slave_gfid))
                         dst_entry = os.path.join(pfx, realpath.split('/')[-2],
                                                  realpath.split('/')[-1])
-                        rename_dict = edct('RENAME', gfid=slave_gfid,
-                                           entry=failure[0]['entry'],
-                                           entry1=dst_entry, stat=st,
-                                           link=None)
-                        logging.info(lf('Fixing gfid mismatch in slave. '
-                                        'Renaming', retry_count=retry_count,
-                                        entry=repr(rename_dict)))
-                        fix_entry_ops.append(rename_dict)
+                        src_entry = pbname
+                        logging.info(lf('Fixing dir name/gfid mismatch in '
+                                        'slave', retry_count=retry_count,
+                                        entry=repr(failure)))
+                        if src_entry == dst_entry:
+                            # Safe to ignore the failure as master contains
+                            # same directory as in slave with same gfid.
+                            # Remove the failure entry from entries list
+                            logging.info(lf('Fixing dir name/gfid mismatch'
+                                            ' in slave. Safe to ignore, '
+                                            'take out entry',
+                                            retry_count=retry_count,
+                                            entry=repr(failure)))
+                            entries.remove(failure[0])
+                        else:
+                            rename_dict = edct('RENAME', gfid=slave_gfid,
+                                               entry=src_entry,
+                                               entry1=dst_entry, stat=st,
+                                               link=None)
+                            logging.info(lf('Fixing dir name/gfid mismatch'
+                                            ' in slave. Renaming',
+                                            retry_count=retry_count,
+                                            entry=repr(rename_dict)))
+                            fix_entry_ops.append(rename_dict)
                     else:
-                        logging.info(lf('Fixing gfid mismatch in slave. '
-                                        ' Deleting the entry',
+                        # A hardlink file exists with different name or
+                        # renamed file exists and we are sure from
+                        # matching_disk_gfid check that the entry doesn't
+                        # exist with same gfid so we can safely delete on slave
+                        logging.info(lf('Fixing file gfid mismatch in slave. '
+                                        'Hardlink/Rename Case. Deleting entry',
+                                        retry_count=retry_count,
+                                        entry=repr(failure)))
+                        fix_entry_ops.append(
+                            edct('UNLINK',
+                                 gfid=failure[2]['slave_gfid'],
+                                 entry=pbname))
+            elif failure[1] == ENOENT:
+                # Ignore ENOENT error for fix_entry_ops aka retry_count > 1
+                if retry_count > 1:
+                    logging.info(lf('ENOENT error while fixing entry ops. '
+                                    'Safe to ignore, take out entry',
+                                    retry_count=retry_count,
+                                    entry=repr(failure)))
+                    entries.remove(failure[0])
+                elif op in ('MKNOD', 'CREATE', 'MKDIR'):
+                    pargfid = pbname.split('/')[1]
+                    st = lstat(os.path.join(pfx, pargfid))
+                    # Safe to ignore the failure as master doesn't contain
+                    # parent directory.
+                    if isinstance(st, int):
+                        logging.info(lf('Fixing ENOENT error in slave. Parent '
+                                        'does not exist on master. Safe to '
+                                        'ignore, take out entry',
                                         retry_count=retry_count,
                                         entry=repr(failure)))
-                        fix_entry_ops.append(edct('UNLINK',
-                                                  gfid=failure[2]['slave_gfid'],
-                                                  entry=pbname))
-                        logging.error(lf('Entry cannot be fixed in slave due '
-                                         'to GFID mismatch, find respective '
-                                         'path for the GFID and trigger sync',
-                                         gfid=slave_gfid))
+                        entries.remove(failure[0])
 
         if fix_entry_ops:
-            #Process deletions of entries whose gfids are mismatched
+            # Process deletions of entries whose gfids are mismatched
             failures1 = self.slave.server.entry_ops(fix_entry_ops)
-            if not failures1:
-                logging.info ("Sucessfully fixed entry ops with gfid mismatch")
 
-        return failures1
+        return (failures1, fix_entry_ops)
 
     def handle_entry_failures(self, failures, entries):
         retries = 0
         pending_failures = False
         failures1 = []
         failures2 = []
+        entry_ops1 = []
+        entry_ops2 = []
 
         if failures:
             pending_failures = True
             failures1 = failures
+            entry_ops1 = entries
 
             while pending_failures and retries < self.MAX_EF_RETRIES:
                 retries += 1
-                failures2 = self.fix_possible_entry_failures(failures1,
-                                                             retries)
+                (failures2, entry_ops2) = self.fix_possible_entry_failures(
+                    failures1, retries, entry_ops1)
                 if not failures2:
                     pending_failures = False
+                    logging.info(lf('Sucessfully fixed entry ops with gfid '
+                                 'mismatch', retry_count=retries))
                 else:
                     pending_failures = True
                     failures1 = failures2
+                    entry_ops1 = entry_ops2
 
             if pending_failures:
                 for failure in failures1:
                     logging.error("Failed to fix entry ops %s", repr(failure))
-            else:
-                #Retry original entry list 5 times
-                failures = self.slave.server.entry_ops(entries)
-
-            self.log_failures(failures, 'gfid', gauxpfx(), 'ENTRY')
-
 
     def process_change(self, change, done, retry):
         pfx = gauxpfx()
@@ -1112,7 +1163,19 @@ class GMasterChangelogMixin(GMasterCommon):
             self.status.inc_value("entry", len(entries))
 
             failures = self.slave.server.entry_ops(entries)
-            self.handle_entry_failures(failures, entries)
+            count = 0
+            while failures and count < self.MAX_OE_RETRIES:
+                count += 1
+                self.handle_entry_failures(failures, entries)
+                logging.info("Retry original entries. count = %s" % count)
+                failures = self.slave.server.entry_ops(entries)
+                if not failures:
+                    logging.info("Sucessfully fixed all entry ops with gfid "
+                                 "mismatch")
+                    break
+
+            self.log_failures(failures, 'gfid', gauxpfx(), 'ENTRY')
+
             self.status.dec_value("entry", len(entries))
 
             # Update Entry stime in Brick Root only in case of Changelog mode
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index 0d5462a..eb696f3 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -38,9 +38,9 @@ from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION
 from syncdutils import GX_GFID_CANONICAL_LEN
 from gsyncdstatus import GeorepStatus
 from syncdutils import get_master_and_slave_data_from_args
-from syncdutils import lf, Popen, sup, Volinfo
+from syncdutils import lf, Popen, sup
 from syncdutils import Xattr, matching_disk_gfid, get_gfid_from_mnt
-from syncdutils import unshare_propagation_supported
+from syncdutils import unshare_propagation_supported, get_slv_dir_path
 
 UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')
 HostRX = re.compile('[a-zA-Z\d](?:[a-zA-Z\d.-]*[a-zA-Z\d])?', re.I)
@@ -50,7 +50,6 @@ ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP')
 
 slv_volume = None
 slv_host = None
-slv_bricks = None
 
 def desugar(ustr):
     """transform sugared url strings to standard <scheme>://<urlbody> form
@@ -463,13 +462,23 @@ class Server(object):
             # to be purged is the GFID gotten from the changelog.
             # (a stat(changelog_gfid) would also be valid here)
             # The race here is between the GFID check and the purge.
+
+            # If the entry or the gfid of the file to be deleted is not present
+            # on slave, we can ignore the unlink/rmdir
+            if isinstance(lstat(entry), int) or \
+               isinstance(lstat(os.path.join(pfx, gfid)), int):
+                return
+
             if not matching_disk_gfid(gfid, entry):
                 collect_failure(e, EEXIST)
                 return
 
             if op == 'UNLINK':
                 er = errno_wrap(os.unlink, [entry], [ENOENT, ESTALE], [EBUSY])
-                return er
+                # EISDIR is safe error, ignore. This can only happen when
+                # unlink is sent from master while fixing gfid conflicts.
+                if er != EISDIR:
+                    return er
 
             elif op == 'RMDIR':
                 er = errno_wrap(os.rmdir, [entry], [ENOENT, ESTALE,
@@ -480,7 +489,11 @@ class Server(object):
         def collect_failure(e, cmd_ret, dst=False):
             slv_entry_info = {}
             slv_entry_info['gfid_mismatch'] = False
+            slv_entry_info['name_mismatch'] = False
             slv_entry_info['dst'] = dst
+            slv_entry_info['slave_isdir'] = False
+            slv_entry_info['slave_name'] = None
+            slv_entry_info['slave_gfid'] = None
             # We do this for failing fops on Slave
             # Master should be logging this
             if cmd_ret is None:
@@ -498,6 +511,9 @@ class Server(object):
                     if not isinstance(st, int):
                         if st and stat.S_ISDIR(st.st_mode):
                             slv_entry_info['slave_isdir'] = True
+                            dir_name = get_slv_dir_path(slv_host, slv_volume,
+                                                        disk_gfid)
+                            slv_entry_info['slave_name'] = dir_name
                         else:
                             slv_entry_info['slave_isdir'] = False
                     slv_entry_info['slave_gfid'] = disk_gfid
@@ -618,37 +634,34 @@ class Server(object):
                                          [ENOENT, EEXIST], [ESTALE])
                     collect_failure(e, cmd_ret)
             elif op == 'MKDIR':
+                en = e['entry']
                 slink = os.path.join(pfx, gfid)
                 st = lstat(slink)
                 # don't create multiple entries with same gfid
                 if isinstance(st, int):
                     blob = entry_pack_mkdir(
                         gfid, bname, e['mode'], e['uid'], e['gid'])
-                else:
+                elif (isinstance(lstat(en), int) or
+                      not matching_disk_gfid(gfid, en)):
                     # If gfid of a directory exists on slave but path based
                     # create is getting EEXIST. This means the directory is
                     # renamed in master but recorded as MKDIR during hybrid
                     # crawl. Get the directory path by reading the backend
                     # symlink and trying to rename to new name as said by
                     # master.
-                    global slv_bricks
-                    global slv_volume
-                    global slv_host
-                    if not slv_bricks:
-                        slv_info = Volinfo (slv_volume, slv_host)
-                        slv_bricks = slv_info.bricks
-                    # Result of readlink would be of format as below.
-                    # readlink = "../../pgfid[0:2]/pgfid[2:4]/pgfid/basename"
-                    realpath = os.readlink(os.path.join(slv_bricks[0]['dir'],
-                                                        ".glusterfs", gfid[0:2],
-                                                        gfid[2:4], gfid))
-                    realpath_parts = realpath.split('/')
-                    src_pargfid = realpath_parts[-2]
-                    src_basename = realpath_parts[-1]
-                    src_entry = os.path.join(pfx, src_pargfid, src_basename)
                     logging.info(lf("Special case: rename on mkdir",
-                                   gfid=gfid, entry=repr(entry)))
-                    rename_with_disk_gfid_confirmation(gfid, src_entry, entry)
+                                    gfid=gfid, entry=repr(entry)))
+                    src_entry = get_slv_dir_path(slv_host, slv_volume, gfid)
+                    if src_entry is not None and src_entry != entry:
+                        slv_entry_info = {}
+                        slv_entry_info['gfid_mismatch'] = False
+                        slv_entry_info['name_mismatch'] = True
+                        slv_entry_info['dst'] = False
+                        slv_entry_info['slave_isdir'] = True
+                        slv_entry_info['slave_gfid'] = gfid
+                        slv_entry_info['slave_entry'] = src_entry
+
+                        failures.append((e, EEXIST, slv_entry_info))
             elif op == 'LINK':
                 slink = os.path.join(pfx, gfid)
                 st = lstat(slink)
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py
index 6dafb0a..d798356 100644
--- a/geo-replication/syncdaemon/syncdutils.py
+++ b/geo-replication/syncdaemon/syncdutils.py
@@ -77,6 +77,7 @@ CHANGELOG_AGENT_CLIENT_VERSION = 1.0
 NodeID = None
 rsync_version = None
 unshare_mnt_propagation = None
+slv_bricks = None
 SPACE_ESCAPE_CHAR = "%20"
 NEWLINE_ESCAPE_CHAR = "%0A"
 PERCENTAGE_ESCAPE_CHAR = "%25"
@@ -671,6 +672,40 @@ def get_rsync_version(rsync_cmd):
     return rsync_version
 
 
+def get_slv_dir_path(slv_host, slv_volume, gfid):
+    global slv_bricks
+
+    dir_path = ENOENT
+
+    if not slv_bricks:
+        slv_info = Volinfo(slv_volume, slv_host)
+        slv_bricks = slv_info.bricks
+    # Result of readlink would be of format as below.
+    # readlink = "../../pgfid[0:2]/pgfid[2:4]/pgfid/basename"
+    for brick in slv_bricks:
+        dir_path = errno_wrap(os.path.join,
+                              [brick['dir'],
+                               ".glusterfs", gfid[0:2],
+                               gfid[2:4],
+                               gfid], [ENOENT], [ESTALE])
+        if dir_path != ENOENT:
+            break
+
+    if not isinstance(dir_path, int):
+        realpath = errno_wrap(os.readlink, [dir_path],
+                              [ENOENT], [ESTALE])
+
+        if not isinstance(realpath, int):
+            realpath_parts = realpath.split('/')
+            pargfid = realpath_parts[-2]
+            basename = realpath_parts[-1]
+            pfx = gauxpfx()
+            dir_entry = os.path.join(pfx, pargfid, basename)
+            return dir_entry
+
+    return None
+
+
 def lf(event, **kwargs):
     """
     Log Format helper function, log messages can be
-- 
1.8.3.1