e7a346
From d354eb1abb2160495e205c87e1b2ecd8778c70ed Mon Sep 17 00:00:00 2001
e7a346
From: Kotresh HR <khiremat@redhat.com>
e7a346
Date: Thu, 21 Sep 2017 18:11:15 -0400
e7a346
Subject: [PATCH 292/305] geo-rep: Fix rename of directory in hybrid crawl
e7a346
e7a346
In hybrid crawl, renames and unlink can't be
e7a346
synced but directory renames can be detected.
e7a346
While syncing the directory on slave, if the
e7a346
gfid already exists, it should be rename.
e7a346
Hence if directory gfid already exists, rename
e7a346
it.
e7a346
e7a346
Backport of:
e7a346
 > Patch: https://review.gluster.org/18448
e7a346
 > Change-Id: Ibf9f99e76a3e02795a3c2befd8cac48a5c365bb6
e7a346
 > BUG: 1499566
e7a346
 > Signed-off-by: Kotresh HR <khiremat@redhat.com>
e7a346
e7a346
Change-Id: Ibf9f99e76a3e02795a3c2befd8cac48a5c365bb6
e7a346
BUG: 1582417
e7a346
Signed-off-by: Kotresh HR <khiremat@redhat.com>
e7a346
Reviewed-on: https://code.engineering.redhat.com/gerrit/140285
e7a346
Tested-by: RHGS Build Bot <nigelb@redhat.com>
e7a346
Reviewed-by: Sunil Kumar Heggodu Gopala Acharya <sheggodu@redhat.com>
e7a346
---
e7a346
 geo-replication/syncdaemon/gsyncd.py     |   4 +-
e7a346
 geo-replication/syncdaemon/monitor.py    |  85 +----------
e7a346
 geo-replication/syncdaemon/resource.py   | 191 +++++--------------------
e7a346
 geo-replication/syncdaemon/syncdutils.py | 237 +++++++++++++++++++++++++++++++
e7a346
 4 files changed, 276 insertions(+), 241 deletions(-)
e7a346
e7a346
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py
e7a346
index 629e8b7..b0ed0ae 100644
e7a346
--- a/geo-replication/syncdaemon/gsyncd.py
e7a346
+++ b/geo-replication/syncdaemon/gsyncd.py
e7a346
@@ -39,7 +39,7 @@ from changelogagent import agent, Changelog
e7a346
 from gsyncdstatus import set_monitor_status, GeorepStatus, human_time_utc
e7a346
 from libcxattr import Xattr
e7a346
 import struct
e7a346
-from syncdutils import get_master_and_slave_data_from_args, lf
e7a346
+from syncdutils import get_master_and_slave_data_from_args, lf, Popen
e7a346
 
e7a346
 ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError
e7a346
 
e7a346
@@ -778,7 +778,7 @@ def main_i():
e7a346
     else:
e7a346
         gconf.label = 'slave'
e7a346
     startup(go_daemon=go_daemon, log_file=log_file, label=gconf.label)
e7a346
-    resource.Popen.init_errhandler()
e7a346
+    Popen.init_errhandler()
e7a346
 
e7a346
     if be_agent:
e7a346
         os.setsid()
e7a346
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
e7a346
index 0f43c4f..55f8330 100644
e7a346
--- a/geo-replication/syncdaemon/monitor.py
e7a346
+++ b/geo-replication/syncdaemon/monitor.py
e7a346
@@ -16,7 +16,7 @@ import logging
e7a346
 import uuid
e7a346
 import xml.etree.ElementTree as XET
e7a346
 from subprocess import PIPE
e7a346
-from resource import Popen, FILE, GLUSTER, SSH
e7a346
+from resource import FILE, GLUSTER, SSH
e7a346
 from threading import Lock
e7a346
 from errno import ECHILD, ESRCH
e7a346
 import re
e7a346
@@ -24,8 +24,9 @@ import random
e7a346
 from gconf import gconf
e7a346
 from syncdutils import select, waitpid, errno_wrap, lf
e7a346
 from syncdutils import set_term_handler, is_host_local, GsyncdError
e7a346
-from syncdutils import escape, Thread, finalize, memoize, boolify
e7a346
+from syncdutils import escape, Thread, finalize, boolify
e7a346
 from syncdutils import gf_event, EVENT_GEOREP_FAULTY
e7a346
+from syncdutils import Volinfo, Popen
e7a346
 
e7a346
 from gsyncdstatus import GeorepStatus, set_monitor_status
e7a346
 
e7a346
@@ -91,86 +92,6 @@ def get_slave_bricks_status(host, vol):
e7a346
     return list(up_hosts)
e7a346
 
e7a346
 
e7a346
-class Volinfo(object):
e7a346
-
e7a346
-    def __init__(self, vol, host='localhost', prelude=[]):
e7a346
-        po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host,
e7a346
-                              'volume', 'info', vol],
e7a346
-                   stdout=PIPE, stderr=PIPE)
e7a346
-        vix = po.stdout.read()
e7a346
-        po.wait()
e7a346
-        po.terminate_geterr()
e7a346
-        vi = XET.fromstring(vix)
e7a346
-        if vi.find('opRet').text != '0':
e7a346
-            if prelude:
e7a346
-                via = '(via %s) ' % prelude.join(' ')
e7a346
-            else:
e7a346
-                via = ' '
e7a346
-            raise GsyncdError('getting volume info of %s%s '
e7a346
-                              'failed with errorcode %s' %
e7a346
-                              (vol, via, vi.find('opErrno').text))
e7a346
-        self.tree = vi
e7a346
-        self.volume = vol
e7a346
-        self.host = host
e7a346
-
e7a346
-    def get(self, elem):
e7a346
-        return self.tree.findall('.//' + elem)
e7a346
-
e7a346
-    def is_tier(self):
e7a346
-        return (self.get('typeStr')[0].text == 'Tier')
e7a346
-
e7a346
-    def is_hot(self, brickpath):
e7a346
-        logging.debug('brickpath: ' + repr(brickpath))
e7a346
-        return brickpath in self.hot_bricks
e7a346
-
e7a346
-    @property
e7a346
-    @memoize
e7a346
-    def bricks(self):
e7a346
-        def bparse(b):
e7a346
-            host, dirp = b.find("name").text.split(':', 2)
e7a346
-            return {'host': host, 'dir': dirp, 'uuid': b.find("hostUuid").text}
e7a346
-        return [bparse(b) for b in self.get('brick')]
e7a346
-
e7a346
-    @property
e7a346
-    @memoize
e7a346
-    def uuid(self):
e7a346
-        ids = self.get('id')
e7a346
-        if len(ids) != 1:
e7a346
-            raise GsyncdError("volume info of %s obtained from %s: "
e7a346
-                              "ambiguous uuid" % (self.volume, self.host))
e7a346
-        return ids[0].text
e7a346
-
e7a346
-    def replica_count(self, tier, hot):
e7a346
-        if (tier and hot):
e7a346
-            return int(self.get('hotBricks/hotreplicaCount')[0].text)
e7a346
-        elif (tier and not hot):
e7a346
-            return int(self.get('coldBricks/coldreplicaCount')[0].text)
e7a346
-        else:
e7a346
-            return int(self.get('replicaCount')[0].text)
e7a346
-
e7a346
-    def disperse_count(self, tier, hot):
e7a346
-        if (tier and hot):
e7a346
-            # Tiering doesn't support disperse volume as hot brick,
e7a346
-            # hence no xml output, so returning 0. In case, if it's
e7a346
-            # supported later, we should change here.
e7a346
-            return 0
e7a346
-        elif (tier and not hot):
e7a346
-            return int(self.get('coldBricks/colddisperseCount')[0].text)
e7a346
-        else:
e7a346
-            return int(self.get('disperseCount')[0].text)
e7a346
-
e7a346
-    @property
e7a346
-    @memoize
e7a346
-    def hot_bricks(self):
e7a346
-        return [b.text for b in self.get('hotBricks/brick')]
e7a346
-
e7a346
-    def get_hot_bricks_count(self, tier):
e7a346
-        if (tier):
e7a346
-            return int(self.get('hotBricks/hotbrickCount')[0].text)
e7a346
-        else:
e7a346
-            return 0
e7a346
-
e7a346
-
e7a346
 class Monitor(object):
e7a346
 
e7a346
     """class which spawns and manages gsyncd workers"""
e7a346
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
e7a346
index d6618c1..c4b5b53 100644
e7a346
--- a/geo-replication/syncdaemon/resource.py
e7a346
+++ b/geo-replication/syncdaemon/resource.py
e7a346
@@ -13,21 +13,16 @@ import os
e7a346
 import sys
e7a346
 import stat
e7a346
 import time
e7a346
-import signal
e7a346
 import fcntl
e7a346
-import errno
e7a346
 import types
e7a346
 import struct
e7a346
 import socket
e7a346
 import logging
e7a346
 import tempfile
e7a346
-import threading
e7a346
 import subprocess
e7a346
 import errno
e7a346
 from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES
e7a346
 from errno import EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM
e7a346
-from select import error as SelectError
e7a346
-import shutil
e7a346
 
e7a346
 from gconf import gconf
e7a346
 import repce
e7a346
@@ -43,7 +38,7 @@ from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION
e7a346
 from syncdutils import GX_GFID_CANONICAL_LEN
e7a346
 from gsyncdstatus import GeorepStatus
e7a346
 from syncdutils import get_master_and_slave_data_from_args
e7a346
-from syncdutils import lf
e7a346
+from syncdutils import lf, Popen, sup, Volinfo
e7a346
 from syncdutils import Xattr, matching_disk_gfid, get_gfid_from_mnt
e7a346
 
e7a346
 UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')
e7a346
@@ -52,14 +47,9 @@ UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+")
e7a346
 
e7a346
 ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP')
e7a346
 
e7a346
-def sup(x, *a, **kw):
e7a346
-    """a rubyesque "super" for python ;)
e7a346
-
e7a346
-    invoke caller method in parent class with given args.
e7a346
-    """
e7a346
-    return getattr(super(type(x), x),
e7a346
-                   sys._getframe(1).f_code.co_name)(*a, **kw)
e7a346
-
e7a346
+slv_volume = None
e7a346
+slv_host = None
e7a346
+slv_bricks = None
e7a346
 
e7a346
 def desugar(ustr):
e7a346
     """transform sugared url strings to standard <scheme>://<urlbody> form
e7a346
@@ -114,149 +104,6 @@ def parse_url(ustr):
e7a346
     return getattr(this, sch.upper())(path)
e7a346
 
e7a346
 
e7a346
-class Popen(subprocess.Popen):
e7a346
-
e7a346
-    """customized subclass of subprocess.Popen with a ring
e7a346
-    buffer for children error output"""
e7a346
-
e7a346
-    @classmethod
e7a346
-    def init_errhandler(cls):
e7a346
-        """start the thread which handles children's error output"""
e7a346
-        cls.errstore = {}
e7a346
-
e7a346
-        def tailer():
e7a346
-            while True:
e7a346
-                errstore = cls.errstore.copy()
e7a346
-                try:
e7a346
-                    poe, _, _ = select(
e7a346
-                        [po.stderr for po in errstore], [], [], 1)
e7a346
-                except (ValueError, SelectError):
e7a346
-                    # stderr is already closed wait for some time before
e7a346
-                    # checking next error
e7a346
-                    time.sleep(0.5)
e7a346
-                    continue
e7a346
-                for po in errstore:
e7a346
-                    if po.stderr not in poe:
e7a346
-                        continue
e7a346
-                    po.lock.acquire()
e7a346
-                    try:
e7a346
-                        if po.on_death_row:
e7a346
-                            continue
e7a346
-                        la = errstore[po]
e7a346
-                        try:
e7a346
-                            fd = po.stderr.fileno()
e7a346
-                        except ValueError:  # file is already closed
e7a346
-                            time.sleep(0.5)
e7a346
-                            continue
e7a346
-
e7a346
-                        try:
e7a346
-                            l = os.read(fd, 1024)
e7a346
-                        except OSError:
e7a346
-                            time.sleep(0.5)
e7a346
-                            continue
e7a346
-
e7a346
-                        if not l:
e7a346
-                            continue
e7a346
-                        tots = len(l)
e7a346
-                        for lx in la:
e7a346
-                            tots += len(lx)
e7a346
-                        while tots > 1 << 20 and la:
e7a346
-                            tots -= len(la.pop(0))
e7a346
-                        la.append(l)
e7a346
-                    finally:
e7a346
-                        po.lock.release()
e7a346
-        t = syncdutils.Thread(target=tailer)
e7a346
-        t.start()
e7a346
-        cls.errhandler = t
e7a346
-
e7a346
-    @classmethod
e7a346
-    def fork(cls):
e7a346
-        """fork wrapper that restarts errhandler thread in child"""
e7a346
-        pid = os.fork()
e7a346
-        if not pid:
e7a346
-            cls.init_errhandler()
e7a346
-        return pid
e7a346
-
e7a346
-    def __init__(self, args, *a, **kw):
e7a346
-        """customizations for subprocess.Popen instantiation
e7a346
-
e7a346
-        - 'close_fds' is taken to be the default
e7a346
-        - if child's stderr is chosen to be managed,
e7a346
-          register it with the error handler thread
e7a346
-        """
e7a346
-        self.args = args
e7a346
-        if 'close_fds' not in kw:
e7a346
-            kw['close_fds'] = True
e7a346
-        self.lock = threading.Lock()
e7a346
-        self.on_death_row = False
e7a346
-        self.elines = []
e7a346
-        try:
e7a346
-            sup(self, args, *a, **kw)
e7a346
-        except:
e7a346
-            ex = sys.exc_info()[1]
e7a346
-            if not isinstance(ex, OSError):
e7a346
-                raise
e7a346
-            raise GsyncdError("""execution of "%s" failed with %s (%s)""" %
e7a346
-                              (args[0], errno.errorcode[ex.errno],
e7a346
-                               os.strerror(ex.errno)))
e7a346
-        if kw.get('stderr') == subprocess.PIPE:
e7a346
-            assert(getattr(self, 'errhandler', None))
e7a346
-            self.errstore[self] = []
e7a346
-
e7a346
-    def errlog(self):
e7a346
-        """make a log about child's failure event"""
e7a346
-        logging.error(lf("command returned error",
e7a346
-                         cmd=" ".join(self.args),
e7a346
-                         error=self.returncode))
e7a346
-        lp = ''
e7a346
-
e7a346
-        def logerr(l):
e7a346
-            logging.error(self.args[0] + "> " + l)
e7a346
-        for l in self.elines:
e7a346
-            ls = l.split('\n')
e7a346
-            ls[0] = lp + ls[0]
e7a346
-            lp = ls.pop()
e7a346
-            for ll in ls:
e7a346
-                logerr(ll)
e7a346
-        if lp:
e7a346
-            logerr(lp)
e7a346
-
e7a346
-    def errfail(self):
e7a346
-        """fail nicely if child did not terminate with success"""
e7a346
-        self.errlog()
e7a346
-        syncdutils.finalize(exval=1)
e7a346
-
e7a346
-    def terminate_geterr(self, fail_on_err=True):
e7a346
-        """kill child, finalize stderr harvesting (unregister
e7a346
-        from errhandler, set up .elines), fail on error if
e7a346
-        asked for
e7a346
-        """
e7a346
-        self.lock.acquire()
e7a346
-        try:
e7a346
-            self.on_death_row = True
e7a346
-        finally:
e7a346
-            self.lock.release()
e7a346
-        elines = self.errstore.pop(self)
e7a346
-        if self.poll() is None:
e7a346
-            self.terminate()
e7a346
-            if self.poll() is None:
e7a346
-                time.sleep(0.1)
e7a346
-                self.kill()
e7a346
-                self.wait()
e7a346
-        while True:
e7a346
-            if not select([self.stderr], [], [], 0.1)[0]:
e7a346
-                break
e7a346
-            b = os.read(self.stderr.fileno(), 1024)
e7a346
-            if b:
e7a346
-                elines.append(b)
e7a346
-            else:
e7a346
-                break
e7a346
-        self.stderr.close()
e7a346
-        self.elines = elines
e7a346
-        if fail_on_err and self.returncode != 0:
e7a346
-            self.errfail()
e7a346
-
e7a346
-
e7a346
 class Server(object):
e7a346
 
e7a346
     """singleton implemening those filesystem access primitives
e7a346
@@ -776,6 +623,31 @@ class Server(object):
e7a346
                 if isinstance(st, int):
e7a346
                     blob = entry_pack_mkdir(
e7a346
                         gfid, bname, e['mode'], e['uid'], e['gid'])
e7a346
+                else:
e7a346
+                    # If gfid of a directory exists on slave but path based
e7a346
+                    # create is getting EEXIST. This means the directory is
e7a346
+                    # renamed in master but recorded as MKDIR during hybrid
e7a346
+                    # crawl. Get the directory path by reading the backend
e7a346
+                    # symlink and trying to rename to new name as said by
e7a346
+                    # master.
e7a346
+                    global slv_bricks
e7a346
+                    global slv_volume
e7a346
+                    global slv_host
e7a346
+                    if not slv_bricks:
e7a346
+                        slv_info = Volinfo (slv_volume, slv_host)
e7a346
+                        slv_bricks = slv_info.bricks
e7a346
+                    # Result of readlink would be of format as below.
e7a346
+                    # readlink = "../../pgfid[0:2]/pgfid[2:4]/pgfid/basename"
e7a346
+                    realpath = os.readlink(os.path.join(slv_bricks[0]['dir'],
e7a346
+                                                        ".glusterfs", gfid[0:2],
e7a346
+                                                        gfid[2:4], gfid))
e7a346
+                    realpath_parts = realpath.split('/')
e7a346
+                    src_pargfid = realpath_parts[-2]
e7a346
+                    src_basename = realpath_parts[-1]
e7a346
+                    src_entry = os.path.join(pfx, src_pargfid, src_basename)
e7a346
+                    logging.info(lf("Special case: rename on mkdir",
e7a346
+                                   gfid=gfid, entry=repr(entry)))
e7a346
+                    rename_with_disk_gfid_confirmation(gfid, src_entry, entry)
e7a346
             elif op == 'LINK':
e7a346
                 slink = os.path.join(pfx, gfid)
e7a346
                 st = lstat(slink)
e7a346
@@ -1318,6 +1190,11 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
e7a346
     def __init__(self, path):
e7a346
         self.host, self.volume = sup(self, path, '^(%s):(.+)' % HostRX.pattern)
e7a346
 
e7a346
+        global slv_volume
e7a346
+        global slv_host
e7a346
+        slv_volume = self.volume
e7a346
+        slv_host = self.host
e7a346
+
e7a346
     def canonical_path(self):
e7a346
         return ':'.join([gethostbyname(self.host), self.volume])
e7a346
 
e7a346
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py
e7a346
index 2b57f83..a493c37 100644
e7a346
--- a/geo-replication/syncdaemon/syncdutils.py
e7a346
+++ b/geo-replication/syncdaemon/syncdutils.py
e7a346
@@ -16,13 +16,18 @@ import fcntl
e7a346
 import shutil
e7a346
 import logging
e7a346
 import socket
e7a346
+import errno
e7a346
+import threading
e7a346
 import subprocess
e7a346
+from subprocess import PIPE
e7a346
 from threading import Lock, Thread as baseThread
e7a346
 from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED
e7a346
 from errno import EINTR, ENOENT, EPERM, ESTALE, EBUSY, errorcode
e7a346
 from signal import signal, SIGTERM
e7a346
 import select as oselect
e7a346
 from os import waitpid as owaitpid
e7a346
+import xml.etree.ElementTree as XET
e7a346
+from select import error as SelectError
e7a346
 
e7a346
 from conf import GLUSTERFS_LIBEXECDIR, UUID_FILE
e7a346
 sys.path.insert(1, GLUSTERFS_LIBEXECDIR)
e7a346
@@ -76,6 +81,15 @@ NEWLINE_ESCAPE_CHAR = "%0A"
e7a346
 PERCENTAGE_ESCAPE_CHAR = "%25"
e7a346
 
e7a346
 
e7a346
+def sup(x, *a, **kw):
e7a346
+    """a rubyesque "super" for python ;)
e7a346
+
e7a346
+    invoke caller method in parent class with given args.
e7a346
+    """
e7a346
+    return getattr(super(type(x), x),
e7a346
+                   sys._getframe(1).f_code.co_name)(*a, **kw)
e7a346
+
e7a346
+
e7a346
 def escape(s):
e7a346
     """the chosen flavor of string escaping, used all over
e7a346
        to turn whatever data to creatable representation"""
e7a346
@@ -650,3 +664,226 @@ def lf(event, **kwargs):
e7a346
     for k, v in kwargs.items():
e7a346
         msg += "\t{0}={1}".format(k, v)
e7a346
     return msg
e7a346
+
e7a346
+
e7a346
+class Popen(subprocess.Popen):
e7a346
+
e7a346
+    """customized subclass of subprocess.Popen with a ring
e7a346
+    buffer for children error output"""
e7a346
+
e7a346
+    @classmethod
e7a346
+    def init_errhandler(cls):
e7a346
+        """start the thread which handles children's error output"""
e7a346
+        cls.errstore = {}
e7a346
+
e7a346
+        def tailer():
e7a346
+            while True:
e7a346
+                errstore = cls.errstore.copy()
e7a346
+                try:
e7a346
+                    poe, _, _ = select(
e7a346
+                        [po.stderr for po in errstore], [], [], 1)
e7a346
+                except (ValueError, SelectError):
e7a346
+                    # stderr is already closed wait for some time before
e7a346
+                    # checking next error
e7a346
+                    time.sleep(0.5)
e7a346
+                    continue
e7a346
+                for po in errstore:
e7a346
+                    if po.stderr not in poe:
e7a346
+                        continue
e7a346
+                    po.lock.acquire()
e7a346
+                    try:
e7a346
+                        if po.on_death_row:
e7a346
+                            continue
e7a346
+                        la = errstore[po]
e7a346
+                        try:
e7a346
+                            fd = po.stderr.fileno()
e7a346
+                        except ValueError:  # file is already closed
e7a346
+                            time.sleep(0.5)
e7a346
+                            continue
e7a346
+
e7a346
+                        try:
e7a346
+                            l = os.read(fd, 1024)
e7a346
+                        except OSError:
e7a346
+                            time.sleep(0.5)
e7a346
+                            continue
e7a346
+
e7a346
+                        if not l:
e7a346
+                            continue
e7a346
+                        tots = len(l)
e7a346
+                        for lx in la:
e7a346
+                            tots += len(lx)
e7a346
+                        while tots > 1 << 20 and la:
e7a346
+                            tots -= len(la.pop(0))
e7a346
+                        la.append(l)
e7a346
+                    finally:
e7a346
+                        po.lock.release()
e7a346
+        t = Thread(target=tailer)
e7a346
+        t.start()
e7a346
+        cls.errhandler = t
e7a346
+
e7a346
+    @classmethod
e7a346
+    def fork(cls):
e7a346
+        """fork wrapper that restarts errhandler thread in child"""
e7a346
+        pid = os.fork()
e7a346
+        if not pid:
e7a346
+            cls.init_errhandler()
e7a346
+        return pid
e7a346
+
e7a346
+    def __init__(self, args, *a, **kw):
e7a346
+        """customizations for subprocess.Popen instantiation
e7a346
+
e7a346
+        - 'close_fds' is taken to be the default
e7a346
+        - if child's stderr is chosen to be managed,
e7a346
+          register it with the error handler thread
e7a346
+        """
e7a346
+        self.args = args
e7a346
+        if 'close_fds' not in kw:
e7a346
+            kw['close_fds'] = True
e7a346
+        self.lock = threading.Lock()
e7a346
+        self.on_death_row = False
e7a346
+        self.elines = []
e7a346
+        try:
e7a346
+            sup(self, args, *a, **kw)
e7a346
+        except:
e7a346
+            ex = sys.exc_info()[1]
e7a346
+            if not isinstance(ex, OSError):
e7a346
+                raise
e7a346
+            raise GsyncdError("""execution of "%s" failed with %s (%s)""" %
e7a346
+                              (args[0], errno.errorcode[ex.errno],
e7a346
+                               os.strerror(ex.errno)))
e7a346
+        if kw.get('stderr') == subprocess.PIPE:
e7a346
+            assert(getattr(self, 'errhandler', None))
e7a346
+            self.errstore[self] = []
e7a346
+
e7a346
+    def errlog(self):
e7a346
+        """make a log about child's failure event"""
e7a346
+        logging.error(lf("command returned error",
e7a346
+                         cmd=" ".join(self.args),
e7a346
+                         error=self.returncode))
e7a346
+        lp = ''
e7a346
+
e7a346
+        def logerr(l):
e7a346
+            logging.error(self.args[0] + "> " + l)
e7a346
+        for l in self.elines:
e7a346
+            ls = l.split('\n')
e7a346
+            ls[0] = lp + ls[0]
e7a346
+            lp = ls.pop()
e7a346
+            for ll in ls:
e7a346
+                logerr(ll)
e7a346
+        if lp:
e7a346
+            logerr(lp)
e7a346
+
e7a346
+    def errfail(self):
e7a346
+        """fail nicely if child did not terminate with success"""
e7a346
+        self.errlog()
e7a346
+        finalize(exval=1)
e7a346
+
e7a346
+    def terminate_geterr(self, fail_on_err=True):
e7a346
+        """kill child, finalize stderr harvesting (unregister
e7a346
+        from errhandler, set up .elines), fail on error if
e7a346
+        asked for
e7a346
+        """
e7a346
+        self.lock.acquire()
e7a346
+        try:
e7a346
+            self.on_death_row = True
e7a346
+        finally:
e7a346
+            self.lock.release()
e7a346
+        elines = self.errstore.pop(self)
e7a346
+        if self.poll() is None:
e7a346
+            self.terminate()
e7a346
+            if self.poll() is None:
e7a346
+                time.sleep(0.1)
e7a346
+                self.kill()
e7a346
+                self.wait()
e7a346
+        while True:
e7a346
+            if not select([self.stderr], [], [], 0.1)[0]:
e7a346
+                break
e7a346
+            b = os.read(self.stderr.fileno(), 1024)
e7a346
+            if b:
e7a346
+                elines.append(b)
e7a346
+            else:
e7a346
+                break
e7a346
+        self.stderr.close()
e7a346
+        self.elines = elines
e7a346
+        if fail_on_err and self.returncode != 0:
e7a346
+            self.errfail()
e7a346
+
e7a346
+
e7a346
+class Volinfo(object):
e7a346
+
e7a346
+    def __init__(self, vol, host='localhost', prelude=[]):
e7a346
+        po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host,
e7a346
+                              'volume', 'info', vol],
e7a346
+                   stdout=PIPE, stderr=PIPE)
e7a346
+        vix = po.stdout.read()
e7a346
+        po.wait()
e7a346
+        po.terminate_geterr()
e7a346
+        vi = XET.fromstring(vix)
e7a346
+        if vi.find('opRet').text != '0':
e7a346
+            if prelude:
e7a346
+                via = '(via %s) ' % prelude.join(' ')
e7a346
+            else:
e7a346
+                via = ' '
e7a346
+            raise GsyncdError('getting volume info of %s%s '
e7a346
+                              'failed with errorcode %s' %
e7a346
+                              (vol, via, vi.find('opErrno').text))
e7a346
+        self.tree = vi
e7a346
+        self.volume = vol
e7a346
+        self.host = host
e7a346
+
e7a346
+    def get(self, elem):
e7a346
+        return self.tree.findall('.//' + elem)
e7a346
+
e7a346
+    def is_tier(self):
e7a346
+        return (self.get('typeStr')[0].text == 'Tier')
e7a346
+
e7a346
+    def is_hot(self, brickpath):
e7a346
+        logging.debug('brickpath: ' + repr(brickpath))
e7a346
+        return brickpath in self.hot_bricks
e7a346
+
e7a346
+    @property
e7a346
+    @memoize
e7a346
+    def bricks(self):
e7a346
+        def bparse(b):
e7a346
+            host, dirp = b.find("name").text.split(':', 2)
e7a346
+            return {'host': host, 'dir': dirp, 'uuid': b.find("hostUuid").text}
e7a346
+        return [bparse(b) for b in self.get('brick')]
e7a346
+
e7a346
+    @property
e7a346
+    @memoize
e7a346
+    def uuid(self):
e7a346
+        ids = self.get('id')
e7a346
+        if len(ids) != 1:
e7a346
+            raise GsyncdError("volume info of %s obtained from %s: "
e7a346
+                              "ambiguous uuid" % (self.volume, self.host))
e7a346
+        return ids[0].text
e7a346
+
e7a346
+    def replica_count(self, tier, hot):
e7a346
+        if (tier and hot):
e7a346
+            return int(self.get('hotBricks/hotreplicaCount')[0].text)
e7a346
+        elif (tier and not hot):
e7a346
+            return int(self.get('coldBricks/coldreplicaCount')[0].text)
e7a346
+        else:
e7a346
+            return int(self.get('replicaCount')[0].text)
e7a346
+
e7a346
+    def disperse_count(self, tier, hot):
e7a346
+        if (tier and hot):
e7a346
+            # Tiering doesn't support disperse volume as hot brick,
e7a346
+            # hence no xml output, so returning 0. In case, if it's
e7a346
+            # supported later, we should change here.
e7a346
+            return 0
e7a346
+        elif (tier and not hot):
e7a346
+            return int(self.get('coldBricks/colddisperseCount')[0].text)
e7a346
+        else:
e7a346
+            return int(self.get('disperseCount')[0].text)
e7a346
+
e7a346
+    @property
e7a346
+    @memoize
e7a346
+    def hot_bricks(self):
e7a346
+        return [b.text for b in self.get('hotBricks/brick')]
e7a346
+
e7a346
+    def get_hot_bricks_count(self, tier):
e7a346
+        if (tier):
e7a346
+            return int(self.get('hotBricks/hotbrickCount')[0].text)
e7a346
+        else:
e7a346
+            return 0
e7a346
-- 
e7a346
1.8.3.1
e7a346