a3470f
From d354eb1abb2160495e205c87e1b2ecd8778c70ed Mon Sep 17 00:00:00 2001
a3470f
From: Kotresh HR <khiremat@redhat.com>
a3470f
Date: Thu, 21 Sep 2017 18:11:15 -0400
a3470f
Subject: [PATCH 292/305] geo-rep: Fix rename of directory in hybrid crawl
a3470f
a3470f
In hybrid crawl, renames and unlinks can't be
a3470f
synced but directory renames can be detected.
a3470f
While syncing the directory on slave, if the
a3470f
gfid already exists, it should be renamed.
a3470f
Hence if directory gfid already exists, rename
a3470f
it.
a3470f
a3470f
Backport of:
a3470f
 > Patch: https://review.gluster.org/18448
a3470f
 > Change-Id: Ibf9f99e76a3e02795a3c2befd8cac48a5c365bb6
a3470f
 > BUG: 1499566
a3470f
 > Signed-off-by: Kotresh HR <khiremat@redhat.com>
a3470f
a3470f
Change-Id: Ibf9f99e76a3e02795a3c2befd8cac48a5c365bb6
a3470f
BUG: 1582417
a3470f
Signed-off-by: Kotresh HR <khiremat@redhat.com>
a3470f
Reviewed-on: https://code.engineering.redhat.com/gerrit/140285
a3470f
Tested-by: RHGS Build Bot <nigelb@redhat.com>
a3470f
Reviewed-by: Sunil Kumar Heggodu Gopala Acharya <sheggodu@redhat.com>
a3470f
---
a3470f
 geo-replication/syncdaemon/gsyncd.py     |   4 +-
a3470f
 geo-replication/syncdaemon/monitor.py    |  85 +----------
a3470f
 geo-replication/syncdaemon/resource.py   | 191 +++++--------------------
a3470f
 geo-replication/syncdaemon/syncdutils.py | 237 +++++++++++++++++++++++++++++++
a3470f
 4 files changed, 276 insertions(+), 241 deletions(-)
a3470f
a3470f
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py
a3470f
index 629e8b7..b0ed0ae 100644
a3470f
--- a/geo-replication/syncdaemon/gsyncd.py
a3470f
+++ b/geo-replication/syncdaemon/gsyncd.py
a3470f
@@ -39,7 +39,7 @@ from changelogagent import agent, Changelog
a3470f
 from gsyncdstatus import set_monitor_status, GeorepStatus, human_time_utc
a3470f
 from libcxattr import Xattr
a3470f
 import struct
a3470f
-from syncdutils import get_master_and_slave_data_from_args, lf
a3470f
+from syncdutils import get_master_and_slave_data_from_args, lf, Popen
a3470f
 
a3470f
 ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError
a3470f
 
a3470f
@@ -778,7 +778,7 @@ def main_i():
a3470f
     else:
a3470f
         gconf.label = 'slave'
a3470f
     startup(go_daemon=go_daemon, log_file=log_file, label=gconf.label)
a3470f
-    resource.Popen.init_errhandler()
a3470f
+    Popen.init_errhandler()
a3470f
 
a3470f
     if be_agent:
a3470f
         os.setsid()
a3470f
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
a3470f
index 0f43c4f..55f8330 100644
a3470f
--- a/geo-replication/syncdaemon/monitor.py
a3470f
+++ b/geo-replication/syncdaemon/monitor.py
a3470f
@@ -16,7 +16,7 @@ import logging
a3470f
 import uuid
a3470f
 import xml.etree.ElementTree as XET
a3470f
 from subprocess import PIPE
a3470f
-from resource import Popen, FILE, GLUSTER, SSH
a3470f
+from resource import FILE, GLUSTER, SSH
a3470f
 from threading import Lock
a3470f
 from errno import ECHILD, ESRCH
a3470f
 import re
a3470f
@@ -24,8 +24,9 @@ import random
a3470f
 from gconf import gconf
a3470f
 from syncdutils import select, waitpid, errno_wrap, lf
a3470f
 from syncdutils import set_term_handler, is_host_local, GsyncdError
a3470f
-from syncdutils import escape, Thread, finalize, memoize, boolify
a3470f
+from syncdutils import escape, Thread, finalize, boolify
a3470f
 from syncdutils import gf_event, EVENT_GEOREP_FAULTY
a3470f
+from syncdutils import Volinfo, Popen
a3470f
 
a3470f
 from gsyncdstatus import GeorepStatus, set_monitor_status
a3470f
 
a3470f
@@ -91,86 +92,6 @@ def get_slave_bricks_status(host, vol):
a3470f
     return list(up_hosts)
a3470f
 
a3470f
 
a3470f
-class Volinfo(object):
a3470f
-
a3470f
-    def __init__(self, vol, host='localhost', prelude=[]):
a3470f
-        po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host,
a3470f
-                              'volume', 'info', vol],
a3470f
-                   stdout=PIPE, stderr=PIPE)
a3470f
-        vix = po.stdout.read()
a3470f
-        po.wait()
a3470f
-        po.terminate_geterr()
a3470f
-        vi = XET.fromstring(vix)
a3470f
-        if vi.find('opRet').text != '0':
a3470f
-            if prelude:
a3470f
-                via = '(via %s) ' % prelude.join(' ')
a3470f
-            else:
a3470f
-                via = ' '
a3470f
-            raise GsyncdError('getting volume info of %s%s '
a3470f
-                              'failed with errorcode %s' %
a3470f
-                              (vol, via, vi.find('opErrno').text))
a3470f
-        self.tree = vi
a3470f
-        self.volume = vol
a3470f
-        self.host = host
a3470f
-
a3470f
-    def get(self, elem):
a3470f
-        return self.tree.findall('.//' + elem)
a3470f
-
a3470f
-    def is_tier(self):
a3470f
-        return (self.get('typeStr')[0].text == 'Tier')
a3470f
-
a3470f
-    def is_hot(self, brickpath):
a3470f
-        logging.debug('brickpath: ' + repr(brickpath))
a3470f
-        return brickpath in self.hot_bricks
a3470f
-
a3470f
-    @property
a3470f
-    @memoize
a3470f
-    def bricks(self):
a3470f
-        def bparse(b):
a3470f
-            host, dirp = b.find("name").text.split(':', 2)
a3470f
-            return {'host': host, 'dir': dirp, 'uuid': b.find("hostUuid").text}
a3470f
-        return [bparse(b) for b in self.get('brick')]
a3470f
-
a3470f
-    @property
a3470f
-    @memoize
a3470f
-    def uuid(self):
a3470f
-        ids = self.get('id')
a3470f
-        if len(ids) != 1:
a3470f
-            raise GsyncdError("volume info of %s obtained from %s: "
a3470f
-                              "ambiguous uuid" % (self.volume, self.host))
a3470f
-        return ids[0].text
a3470f
-
a3470f
-    def replica_count(self, tier, hot):
a3470f
-        if (tier and hot):
a3470f
-            return int(self.get('hotBricks/hotreplicaCount')[0].text)
a3470f
-        elif (tier and not hot):
a3470f
-            return int(self.get('coldBricks/coldreplicaCount')[0].text)
a3470f
-        else:
a3470f
-            return int(self.get('replicaCount')[0].text)
a3470f
-
a3470f
-    def disperse_count(self, tier, hot):
a3470f
-        if (tier and hot):
a3470f
-            # Tiering doesn't support disperse volume as hot brick,
a3470f
-            # hence no xml output, so returning 0. In case, if it's
a3470f
-            # supported later, we should change here.
a3470f
-            return 0
a3470f
-        elif (tier and not hot):
a3470f
-            return int(self.get('coldBricks/colddisperseCount')[0].text)
a3470f
-        else:
a3470f
-            return int(self.get('disperseCount')[0].text)
a3470f
-
a3470f
-    @property
a3470f
-    @memoize
a3470f
-    def hot_bricks(self):
a3470f
-        return [b.text for b in self.get('hotBricks/brick')]
a3470f
-
a3470f
-    def get_hot_bricks_count(self, tier):
a3470f
-        if (tier):
a3470f
-            return int(self.get('hotBricks/hotbrickCount')[0].text)
a3470f
-        else:
a3470f
-            return 0
a3470f
-
a3470f
-
a3470f
 class Monitor(object):
a3470f
 
a3470f
     """class which spawns and manages gsyncd workers"""
a3470f
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
a3470f
index d6618c1..c4b5b53 100644
a3470f
--- a/geo-replication/syncdaemon/resource.py
a3470f
+++ b/geo-replication/syncdaemon/resource.py
a3470f
@@ -13,21 +13,16 @@ import os
a3470f
 import sys
a3470f
 import stat
a3470f
 import time
a3470f
-import signal
a3470f
 import fcntl
a3470f
-import errno
a3470f
 import types
a3470f
 import struct
a3470f
 import socket
a3470f
 import logging
a3470f
 import tempfile
a3470f
-import threading
a3470f
 import subprocess
a3470f
 import errno
a3470f
 from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES
a3470f
 from errno import EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM
a3470f
-from select import error as SelectError
a3470f
-import shutil
a3470f
 
a3470f
 from gconf import gconf
a3470f
 import repce
a3470f
@@ -43,7 +38,7 @@ from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION
a3470f
 from syncdutils import GX_GFID_CANONICAL_LEN
a3470f
 from gsyncdstatus import GeorepStatus
a3470f
 from syncdutils import get_master_and_slave_data_from_args
a3470f
-from syncdutils import lf
a3470f
+from syncdutils import lf, Popen, sup, Volinfo
a3470f
 from syncdutils import Xattr, matching_disk_gfid, get_gfid_from_mnt
a3470f
 
a3470f
 UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')
a3470f
@@ -52,14 +47,9 @@ UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+")
a3470f
 
a3470f
 ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP')
a3470f
 
a3470f
-def sup(x, *a, **kw):
a3470f
-    """a rubyesque "super" for python ;)
a3470f
-
a3470f
-    invoke caller method in parent class with given args.
a3470f
-    """
a3470f
-    return getattr(super(type(x), x),
a3470f
-                   sys._getframe(1).f_code.co_name)(*a, **kw)
a3470f
-
a3470f
+slv_volume = None
a3470f
+slv_host = None
a3470f
+slv_bricks = None
a3470f
 
a3470f
 def desugar(ustr):
a3470f
     """transform sugared url strings to standard <scheme>://<urlbody> form
a3470f
@@ -114,149 +104,6 @@ def parse_url(ustr):
a3470f
     return getattr(this, sch.upper())(path)
a3470f
 
a3470f
 
a3470f
-class Popen(subprocess.Popen):
a3470f
-
a3470f
-    """customized subclass of subprocess.Popen with a ring
a3470f
-    buffer for children error output"""
a3470f
-
a3470f
-    @classmethod
a3470f
-    def init_errhandler(cls):
a3470f
-        """start the thread which handles children's error output"""
a3470f
-        cls.errstore = {}
a3470f
-
a3470f
-        def tailer():
a3470f
-            while True:
a3470f
-                errstore = cls.errstore.copy()
a3470f
-                try:
a3470f
-                    poe, _, _ = select(
a3470f
-                        [po.stderr for po in errstore], [], [], 1)
a3470f
-                except (ValueError, SelectError):
a3470f
-                    # stderr is already closed wait for some time before
a3470f
-                    # checking next error
a3470f
-                    time.sleep(0.5)
a3470f
-                    continue
a3470f
-                for po in errstore:
a3470f
-                    if po.stderr not in poe:
a3470f
-                        continue
a3470f
-                    po.lock.acquire()
a3470f
-                    try:
a3470f
-                        if po.on_death_row:
a3470f
-                            continue
a3470f
-                        la = errstore[po]
a3470f
-                        try:
a3470f
-                            fd = po.stderr.fileno()
a3470f
-                        except ValueError:  # file is already closed
a3470f
-                            time.sleep(0.5)
a3470f
-                            continue
a3470f
-
a3470f
-                        try:
a3470f
-                            l = os.read(fd, 1024)
a3470f
-                        except OSError:
a3470f
-                            time.sleep(0.5)
a3470f
-                            continue
a3470f
-
a3470f
-                        if not l:
a3470f
-                            continue
a3470f
-                        tots = len(l)
a3470f
-                        for lx in la:
a3470f
-                            tots += len(lx)
a3470f
-                        while tots > 1 << 20 and la:
a3470f
-                            tots -= len(la.pop(0))
a3470f
-                        la.append(l)
a3470f
-                    finally:
a3470f
-                        po.lock.release()
a3470f
-        t = syncdutils.Thread(target=tailer)
a3470f
-        t.start()
a3470f
-        cls.errhandler = t
a3470f
-
a3470f
-    @classmethod
a3470f
-    def fork(cls):
a3470f
-        """fork wrapper that restarts errhandler thread in child"""
a3470f
-        pid = os.fork()
a3470f
-        if not pid:
a3470f
-            cls.init_errhandler()
a3470f
-        return pid
a3470f
-
a3470f
-    def __init__(self, args, *a, **kw):
a3470f
-        """customizations for subprocess.Popen instantiation
a3470f
-
a3470f
-        - 'close_fds' is taken to be the default
a3470f
-        - if child's stderr is chosen to be managed,
a3470f
-          register it with the error handler thread
a3470f
-        """
a3470f
-        self.args = args
a3470f
-        if 'close_fds' not in kw:
a3470f
-            kw['close_fds'] = True
a3470f
-        self.lock = threading.Lock()
a3470f
-        self.on_death_row = False
a3470f
-        self.elines = []
a3470f
-        try:
a3470f
-            sup(self, args, *a, **kw)
a3470f
-        except:
a3470f
-            ex = sys.exc_info()[1]
a3470f
-            if not isinstance(ex, OSError):
a3470f
-                raise
a3470f
-            raise GsyncdError("""execution of "%s" failed with %s (%s)""" %
a3470f
-                              (args[0], errno.errorcode[ex.errno],
a3470f
-                               os.strerror(ex.errno)))
a3470f
-        if kw.get('stderr') == subprocess.PIPE:
a3470f
-            assert(getattr(self, 'errhandler', None))
a3470f
-            self.errstore[self] = []
a3470f
-
a3470f
-    def errlog(self):
a3470f
-        """make a log about child's failure event"""
a3470f
-        logging.error(lf("command returned error",
a3470f
-                         cmd=" ".join(self.args),
a3470f
-                         error=self.returncode))
a3470f
-        lp = ''
a3470f
-
a3470f
-        def logerr(l):
a3470f
-            logging.error(self.args[0] + "> " + l)
a3470f
-        for l in self.elines:
a3470f
-            ls = l.split('\n')
a3470f
-            ls[0] = lp + ls[0]
a3470f
-            lp = ls.pop()
a3470f
-            for ll in ls:
a3470f
-                logerr(ll)
a3470f
-        if lp:
a3470f
-            logerr(lp)
a3470f
-
a3470f
-    def errfail(self):
a3470f
-        """fail nicely if child did not terminate with success"""
a3470f
-        self.errlog()
a3470f
-        syncdutils.finalize(exval=1)
a3470f
-
a3470f
-    def terminate_geterr(self, fail_on_err=True):
a3470f
-        """kill child, finalize stderr harvesting (unregister
a3470f
-        from errhandler, set up .elines), fail on error if
a3470f
-        asked for
a3470f
-        """
a3470f
-        self.lock.acquire()
a3470f
-        try:
a3470f
-            self.on_death_row = True
a3470f
-        finally:
a3470f
-            self.lock.release()
a3470f
-        elines = self.errstore.pop(self)
a3470f
-        if self.poll() is None:
a3470f
-            self.terminate()
a3470f
-            if self.poll() is None:
a3470f
-                time.sleep(0.1)
a3470f
-                self.kill()
a3470f
-                self.wait()
a3470f
-        while True:
a3470f
-            if not select([self.stderr], [], [], 0.1)[0]:
a3470f
-                break
a3470f
-            b = os.read(self.stderr.fileno(), 1024)
a3470f
-            if b:
a3470f
-                elines.append(b)
a3470f
-            else:
a3470f
-                break
a3470f
-        self.stderr.close()
a3470f
-        self.elines = elines
a3470f
-        if fail_on_err and self.returncode != 0:
a3470f
-            self.errfail()
a3470f
-
a3470f
-
a3470f
 class Server(object):
a3470f
 
a3470f
     """singleton implemening those filesystem access primitives
a3470f
@@ -776,6 +623,31 @@ class Server(object):
a3470f
                 if isinstance(st, int):
a3470f
                     blob = entry_pack_mkdir(
a3470f
                         gfid, bname, e['mode'], e['uid'], e['gid'])
a3470f
+                else:
a3470f
+                    # If gfid of a directory exists on slave but path based
a3470f
+                    # create is getting EEXIST, this means the directory is
a3470f
+                    # renamed in master but recorded as MKDIR during hybrid
a3470f
+                    # crawl. Get the directory path by reading the backend
a3470f
+                    # symlink and try to rename it to the new name given by
a3470f
+                    # master.
a3470f
+                    global slv_bricks
a3470f
+                    global slv_volume
a3470f
+                    global slv_host
a3470f
+                    if not slv_bricks:
a3470f
+                        slv_info = Volinfo (slv_volume, slv_host)
a3470f
+                        slv_bricks = slv_info.bricks
a3470f
+                    # Result of readlink would be of format as below.
a3470f
+                    # readlink = "../../pgfid[0:2]/pgfid[2:4]/pgfid/basename"
a3470f
+                    realpath = os.readlink(os.path.join(slv_bricks[0]['dir'],
a3470f
+                                                        ".glusterfs", gfid[0:2],
a3470f
+                                                        gfid[2:4], gfid))
a3470f
+                    realpath_parts = realpath.split('/')
a3470f
+                    src_pargfid = realpath_parts[-2]
a3470f
+                    src_basename = realpath_parts[-1]
a3470f
+                    src_entry = os.path.join(pfx, src_pargfid, src_basename)
a3470f
+                    logging.info(lf("Special case: rename on mkdir",
a3470f
+                                   gfid=gfid, entry=repr(entry)))
a3470f
+                    rename_with_disk_gfid_confirmation(gfid, src_entry, entry)
a3470f
             elif op == 'LINK':
a3470f
                 slink = os.path.join(pfx, gfid)
a3470f
                 st = lstat(slink)
a3470f
@@ -1318,6 +1190,11 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
a3470f
     def __init__(self, path):
a3470f
         self.host, self.volume = sup(self, path, '^(%s):(.+)' % HostRX.pattern)
a3470f
 
a3470f
+        global slv_volume
a3470f
+        global slv_host
a3470f
+        slv_volume = self.volume
a3470f
+        slv_host = self.host
a3470f
+
a3470f
     def canonical_path(self):
a3470f
         return ':'.join([gethostbyname(self.host), self.volume])
a3470f
 
a3470f
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py
a3470f
index 2b57f83..a493c37 100644
a3470f
--- a/geo-replication/syncdaemon/syncdutils.py
a3470f
+++ b/geo-replication/syncdaemon/syncdutils.py
a3470f
@@ -16,13 +16,18 @@ import fcntl
a3470f
 import shutil
a3470f
 import logging
a3470f
 import socket
a3470f
+import errno
a3470f
+import threading
a3470f
 import subprocess
a3470f
+from subprocess import PIPE
a3470f
 from threading import Lock, Thread as baseThread
a3470f
 from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED
a3470f
 from errno import EINTR, ENOENT, EPERM, ESTALE, EBUSY, errorcode
a3470f
 from signal import signal, SIGTERM
a3470f
 import select as oselect
a3470f
 from os import waitpid as owaitpid
a3470f
+import xml.etree.ElementTree as XET
a3470f
+from select import error as SelectError
a3470f
 
a3470f
 from conf import GLUSTERFS_LIBEXECDIR, UUID_FILE
a3470f
 sys.path.insert(1, GLUSTERFS_LIBEXECDIR)
a3470f
@@ -76,6 +81,15 @@ NEWLINE_ESCAPE_CHAR = "%0A"
a3470f
 PERCENTAGE_ESCAPE_CHAR = "%25"
a3470f
 
a3470f
 
a3470f
+def sup(x, *a, **kw):
a3470f
+    """a rubyesque "super" for python ;)
a3470f
+
a3470f
+    invoke caller method in parent class with given args.
a3470f
+    """
a3470f
+    return getattr(super(type(x), x),
a3470f
+                   sys._getframe(1).f_code.co_name)(*a, **kw)
a3470f
+
a3470f
+
a3470f
 def escape(s):
a3470f
     """the chosen flavor of string escaping, used all over
a3470f
        to turn whatever data to creatable representation"""
a3470f
@@ -650,3 +664,226 @@ def lf(event, **kwargs):
a3470f
     for k, v in kwargs.items():
a3470f
         msg += "\t{0}={1}".format(k, v)
a3470f
     return msg
a3470f
+
a3470f
+
a3470f
+class Popen(subprocess.Popen):
a3470f
+
a3470f
+    """customized subclass of subprocess.Popen with a ring
a3470f
+    buffer for children error output"""
a3470f
+
a3470f
+    @classmethod
a3470f
+    def init_errhandler(cls):
a3470f
+        """start the thread which handles children's error output"""
a3470f
+        cls.errstore = {}
a3470f
+
a3470f
+        def tailer():
a3470f
+            while True:
a3470f
+                errstore = cls.errstore.copy()
a3470f
+                try:
a3470f
+                    poe, _, _ = select(
a3470f
+                        [po.stderr for po in errstore], [], [], 1)
a3470f
+                except (ValueError, SelectError):
a3470f
+                    # stderr is already closed wait for some time before
a3470f
+                    # checking next error
a3470f
+                    time.sleep(0.5)
a3470f
+                    continue
a3470f
+                for po in errstore:
a3470f
+                    if po.stderr not in poe:
a3470f
+                        continue
a3470f
+                    po.lock.acquire()
a3470f
+                    try:
a3470f
+                        if po.on_death_row:
a3470f
+                            continue
a3470f
+                        la = errstore[po]
a3470f
+                        try:
a3470f
+                            fd = po.stderr.fileno()
a3470f
+                        except ValueError:  # file is already closed
a3470f
+                            time.sleep(0.5)
a3470f
+                            continue
a3470f
+
a3470f
+                        try:
a3470f
+                            l = os.read(fd, 1024)
a3470f
+                        except OSError:
a3470f
+                            time.sleep(0.5)
a3470f
+                            continue
a3470f
+
a3470f
+                        if not l:
a3470f
+                            continue
a3470f
+                        tots = len(l)
a3470f
+                        for lx in la:
a3470f
+                            tots += len(lx)
a3470f
+                        while tots > 1 << 20 and la:
a3470f
+                            tots -= len(la.pop(0))
a3470f
+                        la.append(l)
a3470f
+                    finally:
a3470f
+                        po.lock.release()
a3470f
+        t = Thread(target=tailer)
a3470f
+        t.start()
a3470f
+        cls.errhandler = t
a3470f
+
a3470f
+    @classmethod
a3470f
+    def fork(cls):
a3470f
+        """fork wrapper that restarts errhandler thread in child"""
a3470f
+        pid = os.fork()
a3470f
+        if not pid:
a3470f
+            cls.init_errhandler()
a3470f
+        return pid
a3470f
+
a3470f
+    def __init__(self, args, *a, **kw):
a3470f
+        """customizations for subprocess.Popen instantiation
a3470f
+
a3470f
+        - 'close_fds' is taken to be the default
a3470f
+        - if child's stderr is chosen to be managed,
a3470f
+          register it with the error handler thread
a3470f
+        """
a3470f
+        self.args = args
a3470f
+        if 'close_fds' not in kw:
a3470f
+            kw['close_fds'] = True
a3470f
+        self.lock = threading.Lock()
a3470f
+        self.on_death_row = False
a3470f
+        self.elines = []
a3470f
+        try:
a3470f
+            sup(self, args, *a, **kw)
a3470f
+        except:
a3470f
+            ex = sys.exc_info()[1]
a3470f
+            if not isinstance(ex, OSError):
a3470f
+                raise
a3470f
+            raise GsyncdError("""execution of "%s" failed with %s (%s)""" %
a3470f
+                              (args[0], errno.errorcode[ex.errno],
a3470f
+                               os.strerror(ex.errno)))
a3470f
+        if kw.get('stderr') == subprocess.PIPE:
a3470f
+            assert(getattr(self, 'errhandler', None))
a3470f
+            self.errstore[self] = []
a3470f
+
a3470f
+    def errlog(self):
a3470f
+        """make a log about child's failure event"""
a3470f
+        logging.error(lf("command returned error",
a3470f
+                         cmd=" ".join(self.args),
a3470f
+                         error=self.returncode))
a3470f
+        lp = ''
a3470f
+
a3470f
+        def logerr(l):
a3470f
+            logging.error(self.args[0] + "> " + l)
a3470f
+        for l in self.elines:
a3470f
+            ls = l.split('\n')
a3470f
+            ls[0] = lp + ls[0]
a3470f
+            lp = ls.pop()
a3470f
+            for ll in ls:
a3470f
+                logerr(ll)
a3470f
+        if lp:
a3470f
+            logerr(lp)
a3470f
+
a3470f
+    def errfail(self):
a3470f
+        """fail nicely if child did not terminate with success"""
a3470f
+        self.errlog()
a3470f
+        finalize(exval=1)
a3470f
+
a3470f
+    def terminate_geterr(self, fail_on_err=True):
a3470f
+        """kill child, finalize stderr harvesting (unregister
a3470f
+        from errhandler, set up .elines), fail on error if
a3470f
+        asked for
a3470f
+        """
a3470f
+        self.lock.acquire()
a3470f
+        try:
a3470f
+            self.on_death_row = True
a3470f
+        finally:
a3470f
+            self.lock.release()
a3470f
+        elines = self.errstore.pop(self)
a3470f
+        if self.poll() is None:
a3470f
+            self.terminate()
a3470f
+            if self.poll() is None:
a3470f
+                time.sleep(0.1)
a3470f
+                self.kill()
a3470f
+                self.wait()
a3470f
+        while True:
a3470f
+            if not select([self.stderr], [], [], 0.1)[0]:
a3470f
+                break
a3470f
+            b = os.read(self.stderr.fileno(), 1024)
a3470f
+            if b:
a3470f
+                elines.append(b)
a3470f
+            else:
a3470f
+                break
a3470f
+        self.stderr.close()
a3470f
+        self.elines = elines
a3470f
+        if fail_on_err and self.returncode != 0:
a3470f
+            self.errfail()
a3470f
+
a3470f
+
a3470f
+class Volinfo(object):
a3470f
+
a3470f
+    def __init__(self, vol, host='localhost', prelude=[]):
a3470f
+        po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host,
a3470f
+                              'volume', 'info', vol],
a3470f
+                   stdout=PIPE, stderr=PIPE)
a3470f
+        vix = po.stdout.read()
a3470f
+        po.wait()
a3470f
+        po.terminate_geterr()
a3470f
+        vi = XET.fromstring(vix)
a3470f
+        if vi.find('opRet').text != '0':
a3470f
+            if prelude:
a3470f
+                via = '(via %s) ' % prelude.join(' ')
a3470f
+            else:
a3470f
+                via = ' '
a3470f
+            raise GsyncdError('getting volume info of %s%s '
a3470f
+                              'failed with errorcode %s' %
a3470f
+                              (vol, via, vi.find('opErrno').text))
a3470f
+        self.tree = vi
a3470f
+        self.volume = vol
a3470f
+        self.host = host
a3470f
+
a3470f
+    def get(self, elem):
a3470f
+        return self.tree.findall('.//' + elem)
a3470f
+
a3470f
+    def is_tier(self):
a3470f
+        return (self.get('typeStr')[0].text == 'Tier')
a3470f
+
a3470f
+    def is_hot(self, brickpath):
a3470f
+        logging.debug('brickpath: ' + repr(brickpath))
a3470f
+        return brickpath in self.hot_bricks
a3470f
+
a3470f
+    @property
a3470f
+    @memoize
a3470f
+    def bricks(self):
a3470f
+        def bparse(b):
a3470f
+            host, dirp = b.find("name").text.split(':', 2)
a3470f
+            return {'host': host, 'dir': dirp, 'uuid': b.find("hostUuid").text}
a3470f
+        return [bparse(b) for b in self.get('brick')]
a3470f
+
a3470f
+    @property
a3470f
+    @memoize
a3470f
+    def uuid(self):
a3470f
+        ids = self.get('id')
a3470f
+        if len(ids) != 1:
a3470f
+            raise GsyncdError("volume info of %s obtained from %s: "
a3470f
+                              "ambiguous uuid" % (self.volume, self.host))
a3470f
+        return ids[0].text
a3470f
+
a3470f
+    def replica_count(self, tier, hot):
a3470f
+        if (tier and hot):
a3470f
+            return int(self.get('hotBricks/hotreplicaCount')[0].text)
a3470f
+        elif (tier and not hot):
a3470f
+            return int(self.get('coldBricks/coldreplicaCount')[0].text)
a3470f
+        else:
a3470f
+            return int(self.get('replicaCount')[0].text)
a3470f
+
a3470f
+    def disperse_count(self, tier, hot):
a3470f
+        if (tier and hot):
a3470f
+            # Tiering doesn't support disperse volume as hot brick,
a3470f
+            # hence no xml output, so returning 0. In case, if it's
a3470f
+            # supported later, we should change here.
a3470f
+            return 0
a3470f
+        elif (tier and not hot):
a3470f
+            return int(self.get('coldBricks/colddisperseCount')[0].text)
a3470f
+        else:
a3470f
+            return int(self.get('disperseCount')[0].text)
a3470f
+
a3470f
+    @property
a3470f
+    @memoize
a3470f
+    def hot_bricks(self):
a3470f
+        return [b.text for b in self.get('hotBricks/brick')]
a3470f
+
a3470f
+    def get_hot_bricks_count(self, tier):
a3470f
+        if (tier):
a3470f
+            return int(self.get('hotBricks/hotbrickCount')[0].text)
a3470f
+        else:
a3470f
+            return 0
a3470f
-- 
a3470f
1.8.3.1
a3470f