|
|
d1681e |
From d354eb1abb2160495e205c87e1b2ecd8778c70ed Mon Sep 17 00:00:00 2001
|
|
|
d1681e |
From: Kotresh HR <khiremat@redhat.com>
|
|
|
d1681e |
Date: Thu, 21 Sep 2017 18:11:15 -0400
|
|
|
d1681e |
Subject: [PATCH 292/305] geo-rep: Fix rename of directory in hybrid crawl
|
|
|
d1681e |
|
|
|
d1681e |
In hybrid crawl, renames and unlink can't be
|
|
|
d1681e |
synced but directory renames can be detected.
|
|
|
d1681e |
While syncing the directory on slave, if the
|
|
|
d1681e |
gfid already exists, it should be rename.
|
|
|
d1681e |
Hence if directory gfid already exists, rename
|
|
|
d1681e |
it.
|
|
|
d1681e |
|
|
|
d1681e |
Backport of:
|
|
|
d1681e |
> Patch: https://review.gluster.org/18448
|
|
|
d1681e |
> Change-Id: Ibf9f99e76a3e02795a3c2befd8cac48a5c365bb6
|
|
|
d1681e |
> BUG: 1499566
|
|
|
d1681e |
> Signed-off-by: Kotresh HR <khiremat@redhat.com>
|
|
|
d1681e |
|
|
|
d1681e |
Change-Id: Ibf9f99e76a3e02795a3c2befd8cac48a5c365bb6
|
|
|
d1681e |
BUG: 1582417
|
|
|
d1681e |
Signed-off-by: Kotresh HR <khiremat@redhat.com>
|
|
|
d1681e |
Reviewed-on: https://code.engineering.redhat.com/gerrit/140285
|
|
|
d1681e |
Tested-by: RHGS Build Bot <nigelb@redhat.com>
|
|
|
d1681e |
Reviewed-by: Sunil Kumar Heggodu Gopala Acharya <sheggodu@redhat.com>
|
|
|
d1681e |
---
|
|
|
d1681e |
geo-replication/syncdaemon/gsyncd.py | 4 +-
|
|
|
d1681e |
geo-replication/syncdaemon/monitor.py | 85 +----------
|
|
|
d1681e |
geo-replication/syncdaemon/resource.py | 191 +++++--------------------
|
|
|
d1681e |
geo-replication/syncdaemon/syncdutils.py | 237 +++++++++++++++++++++++++++++++
|
|
|
d1681e |
4 files changed, 276 insertions(+), 241 deletions(-)
|
|
|
d1681e |
|
|
|
d1681e |
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py
|
|
|
d1681e |
index 629e8b7..b0ed0ae 100644
|
|
|
d1681e |
--- a/geo-replication/syncdaemon/gsyncd.py
|
|
|
d1681e |
+++ b/geo-replication/syncdaemon/gsyncd.py
|
|
|
d1681e |
@@ -39,7 +39,7 @@ from changelogagent import agent, Changelog
|
|
|
d1681e |
from gsyncdstatus import set_monitor_status, GeorepStatus, human_time_utc
|
|
|
d1681e |
from libcxattr import Xattr
|
|
|
d1681e |
import struct
|
|
|
d1681e |
-from syncdutils import get_master_and_slave_data_from_args, lf
|
|
|
d1681e |
+from syncdutils import get_master_and_slave_data_from_args, lf, Popen
|
|
|
d1681e |
|
|
|
d1681e |
ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError
|
|
|
d1681e |
|
|
|
d1681e |
@@ -778,7 +778,7 @@ def main_i():
|
|
|
d1681e |
else:
|
|
|
d1681e |
gconf.label = 'slave'
|
|
|
d1681e |
startup(go_daemon=go_daemon, log_file=log_file, label=gconf.label)
|
|
|
d1681e |
- resource.Popen.init_errhandler()
|
|
|
d1681e |
+ Popen.init_errhandler()
|
|
|
d1681e |
|
|
|
d1681e |
if be_agent:
|
|
|
d1681e |
os.setsid()
|
|
|
d1681e |
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
|
|
|
d1681e |
index 0f43c4f..55f8330 100644
|
|
|
d1681e |
--- a/geo-replication/syncdaemon/monitor.py
|
|
|
d1681e |
+++ b/geo-replication/syncdaemon/monitor.py
|
|
|
d1681e |
@@ -16,7 +16,7 @@ import logging
|
|
|
d1681e |
import uuid
|
|
|
d1681e |
import xml.etree.ElementTree as XET
|
|
|
d1681e |
from subprocess import PIPE
|
|
|
d1681e |
-from resource import Popen, FILE, GLUSTER, SSH
|
|
|
d1681e |
+from resource import FILE, GLUSTER, SSH
|
|
|
d1681e |
from threading import Lock
|
|
|
d1681e |
from errno import ECHILD, ESRCH
|
|
|
d1681e |
import re
|
|
|
d1681e |
@@ -24,8 +24,9 @@ import random
|
|
|
d1681e |
from gconf import gconf
|
|
|
d1681e |
from syncdutils import select, waitpid, errno_wrap, lf
|
|
|
d1681e |
from syncdutils import set_term_handler, is_host_local, GsyncdError
|
|
|
d1681e |
-from syncdutils import escape, Thread, finalize, memoize, boolify
|
|
|
d1681e |
+from syncdutils import escape, Thread, finalize, boolify
|
|
|
d1681e |
from syncdutils import gf_event, EVENT_GEOREP_FAULTY
|
|
|
d1681e |
+from syncdutils import Volinfo, Popen
|
|
|
d1681e |
|
|
|
d1681e |
from gsyncdstatus import GeorepStatus, set_monitor_status
|
|
|
d1681e |
|
|
|
d1681e |
@@ -91,86 +92,6 @@ def get_slave_bricks_status(host, vol):
|
|
|
d1681e |
return list(up_hosts)
|
|
|
d1681e |
|
|
|
d1681e |
|
|
|
d1681e |
-class Volinfo(object):
|
|
|
d1681e |
-
|
|
|
d1681e |
- def __init__(self, vol, host='localhost', prelude=[]):
|
|
|
d1681e |
- po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host,
|
|
|
d1681e |
- 'volume', 'info', vol],
|
|
|
d1681e |
- stdout=PIPE, stderr=PIPE)
|
|
|
d1681e |
- vix = po.stdout.read()
|
|
|
d1681e |
- po.wait()
|
|
|
d1681e |
- po.terminate_geterr()
|
|
|
d1681e |
- vi = XET.fromstring(vix)
|
|
|
d1681e |
- if vi.find('opRet').text != '0':
|
|
|
d1681e |
- if prelude:
|
|
|
d1681e |
- via = '(via %s) ' % prelude.join(' ')
|
|
|
d1681e |
- else:
|
|
|
d1681e |
- via = ' '
|
|
|
d1681e |
- raise GsyncdError('getting volume info of %s%s '
|
|
|
d1681e |
- 'failed with errorcode %s' %
|
|
|
d1681e |
- (vol, via, vi.find('opErrno').text))
|
|
|
d1681e |
- self.tree = vi
|
|
|
d1681e |
- self.volume = vol
|
|
|
d1681e |
- self.host = host
|
|
|
d1681e |
-
|
|
|
d1681e |
- def get(self, elem):
|
|
|
d1681e |
- return self.tree.findall('.//' + elem)
|
|
|
d1681e |
-
|
|
|
d1681e |
- def is_tier(self):
|
|
|
d1681e |
- return (self.get('typeStr')[0].text == 'Tier')
|
|
|
d1681e |
-
|
|
|
d1681e |
- def is_hot(self, brickpath):
|
|
|
d1681e |
- logging.debug('brickpath: ' + repr(brickpath))
|
|
|
d1681e |
- return brickpath in self.hot_bricks
|
|
|
d1681e |
-
|
|
|
d1681e |
- @property
|
|
|
d1681e |
- @memoize
|
|
|
d1681e |
- def bricks(self):
|
|
|
d1681e |
- def bparse(b):
|
|
|
d1681e |
- host, dirp = b.find("name").text.split(':', 2)
|
|
|
d1681e |
- return {'host': host, 'dir': dirp, 'uuid': b.find("hostUuid").text}
|
|
|
d1681e |
- return [bparse(b) for b in self.get('brick')]
|
|
|
d1681e |
-
|
|
|
d1681e |
- @property
|
|
|
d1681e |
- @memoize
|
|
|
d1681e |
- def uuid(self):
|
|
|
d1681e |
- ids = self.get('id')
|
|
|
d1681e |
- if len(ids) != 1:
|
|
|
d1681e |
- raise GsyncdError("volume info of %s obtained from %s: "
|
|
|
d1681e |
- "ambiguous uuid" % (self.volume, self.host))
|
|
|
d1681e |
- return ids[0].text
|
|
|
d1681e |
-
|
|
|
d1681e |
- def replica_count(self, tier, hot):
|
|
|
d1681e |
- if (tier and hot):
|
|
|
d1681e |
- return int(self.get('hotBricks/hotreplicaCount')[0].text)
|
|
|
d1681e |
- elif (tier and not hot):
|
|
|
d1681e |
- return int(self.get('coldBricks/coldreplicaCount')[0].text)
|
|
|
d1681e |
- else:
|
|
|
d1681e |
- return int(self.get('replicaCount')[0].text)
|
|
|
d1681e |
-
|
|
|
d1681e |
- def disperse_count(self, tier, hot):
|
|
|
d1681e |
- if (tier and hot):
|
|
|
d1681e |
- # Tiering doesn't support disperse volume as hot brick,
|
|
|
d1681e |
- # hence no xml output, so returning 0. In case, if it's
|
|
|
d1681e |
- # supported later, we should change here.
|
|
|
d1681e |
- return 0
|
|
|
d1681e |
- elif (tier and not hot):
|
|
|
d1681e |
- return int(self.get('coldBricks/colddisperseCount')[0].text)
|
|
|
d1681e |
- else:
|
|
|
d1681e |
- return int(self.get('disperseCount')[0].text)
|
|
|
d1681e |
-
|
|
|
d1681e |
- @property
|
|
|
d1681e |
- @memoize
|
|
|
d1681e |
- def hot_bricks(self):
|
|
|
d1681e |
- return [b.text for b in self.get('hotBricks/brick')]
|
|
|
d1681e |
-
|
|
|
d1681e |
- def get_hot_bricks_count(self, tier):
|
|
|
d1681e |
- if (tier):
|
|
|
d1681e |
- return int(self.get('hotBricks/hotbrickCount')[0].text)
|
|
|
d1681e |
- else:
|
|
|
d1681e |
- return 0
|
|
|
d1681e |
-
|
|
|
d1681e |
-
|
|
|
d1681e |
class Monitor(object):
|
|
|
d1681e |
|
|
|
d1681e |
"""class which spawns and manages gsyncd workers"""
|
|
|
d1681e |
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
|
|
|
d1681e |
index d6618c1..c4b5b53 100644
|
|
|
d1681e |
--- a/geo-replication/syncdaemon/resource.py
|
|
|
d1681e |
+++ b/geo-replication/syncdaemon/resource.py
|
|
|
d1681e |
@@ -13,21 +13,16 @@ import os
|
|
|
d1681e |
import sys
|
|
|
d1681e |
import stat
|
|
|
d1681e |
import time
|
|
|
d1681e |
-import signal
|
|
|
d1681e |
import fcntl
|
|
|
d1681e |
-import errno
|
|
|
d1681e |
import types
|
|
|
d1681e |
import struct
|
|
|
d1681e |
import socket
|
|
|
d1681e |
import logging
|
|
|
d1681e |
import tempfile
|
|
|
d1681e |
-import threading
|
|
|
d1681e |
import subprocess
|
|
|
d1681e |
import errno
|
|
|
d1681e |
from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES
|
|
|
d1681e |
from errno import EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM
|
|
|
d1681e |
-from select import error as SelectError
|
|
|
d1681e |
-import shutil
|
|
|
d1681e |
|
|
|
d1681e |
from gconf import gconf
|
|
|
d1681e |
import repce
|
|
|
d1681e |
@@ -43,7 +38,7 @@ from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION
|
|
|
d1681e |
from syncdutils import GX_GFID_CANONICAL_LEN
|
|
|
d1681e |
from gsyncdstatus import GeorepStatus
|
|
|
d1681e |
from syncdutils import get_master_and_slave_data_from_args
|
|
|
d1681e |
-from syncdutils import lf
|
|
|
d1681e |
+from syncdutils import lf, Popen, sup, Volinfo
|
|
|
d1681e |
from syncdutils import Xattr, matching_disk_gfid, get_gfid_from_mnt
|
|
|
d1681e |
|
|
|
d1681e |
UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')
|
|
|
d1681e |
@@ -52,14 +47,9 @@ UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+")
|
|
|
d1681e |
|
|
|
d1681e |
ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP')
|
|
|
d1681e |
|
|
|
d1681e |
-def sup(x, *a, **kw):
|
|
|
d1681e |
- """a rubyesque "super" for python ;)
|
|
|
d1681e |
-
|
|
|
d1681e |
- invoke caller method in parent class with given args.
|
|
|
d1681e |
- """
|
|
|
d1681e |
- return getattr(super(type(x), x),
|
|
|
d1681e |
- sys._getframe(1).f_code.co_name)(*a, **kw)
|
|
|
d1681e |
-
|
|
|
d1681e |
+slv_volume = None
|
|
|
d1681e |
+slv_host = None
|
|
|
d1681e |
+slv_bricks = None
|
|
|
d1681e |
|
|
|
d1681e |
def desugar(ustr):
|
|
|
d1681e |
"""transform sugared url strings to standard <scheme>://<urlbody> form
|
|
|
d1681e |
@@ -114,149 +104,6 @@ def parse_url(ustr):
|
|
|
d1681e |
return getattr(this, sch.upper())(path)
|
|
|
d1681e |
|
|
|
d1681e |
|
|
|
d1681e |
-class Popen(subprocess.Popen):
|
|
|
d1681e |
-
|
|
|
d1681e |
- """customized subclass of subprocess.Popen with a ring
|
|
|
d1681e |
- buffer for children error output"""
|
|
|
d1681e |
-
|
|
|
d1681e |
- @classmethod
|
|
|
d1681e |
- def init_errhandler(cls):
|
|
|
d1681e |
- """start the thread which handles children's error output"""
|
|
|
d1681e |
- cls.errstore = {}
|
|
|
d1681e |
-
|
|
|
d1681e |
- def tailer():
|
|
|
d1681e |
- while True:
|
|
|
d1681e |
- errstore = cls.errstore.copy()
|
|
|
d1681e |
- try:
|
|
|
d1681e |
- poe, _, _ = select(
|
|
|
d1681e |
- [po.stderr for po in errstore], [], [], 1)
|
|
|
d1681e |
- except (ValueError, SelectError):
|
|
|
d1681e |
- # stderr is already closed wait for some time before
|
|
|
d1681e |
- # checking next error
|
|
|
d1681e |
- time.sleep(0.5)
|
|
|
d1681e |
- continue
|
|
|
d1681e |
- for po in errstore:
|
|
|
d1681e |
- if po.stderr not in poe:
|
|
|
d1681e |
- continue
|
|
|
d1681e |
- po.lock.acquire()
|
|
|
d1681e |
- try:
|
|
|
d1681e |
- if po.on_death_row:
|
|
|
d1681e |
- continue
|
|
|
d1681e |
- la = errstore[po]
|
|
|
d1681e |
- try:
|
|
|
d1681e |
- fd = po.stderr.fileno()
|
|
|
d1681e |
- except ValueError: # file is already closed
|
|
|
d1681e |
- time.sleep(0.5)
|
|
|
d1681e |
- continue
|
|
|
d1681e |
-
|
|
|
d1681e |
- try:
|
|
|
d1681e |
- l = os.read(fd, 1024)
|
|
|
d1681e |
- except OSError:
|
|
|
d1681e |
- time.sleep(0.5)
|
|
|
d1681e |
- continue
|
|
|
d1681e |
-
|
|
|
d1681e |
- if not l:
|
|
|
d1681e |
- continue
|
|
|
d1681e |
- tots = len(l)
|
|
|
d1681e |
- for lx in la:
|
|
|
d1681e |
- tots += len(lx)
|
|
|
d1681e |
- while tots > 1 << 20 and la:
|
|
|
d1681e |
- tots -= len(la.pop(0))
|
|
|
d1681e |
- la.append(l)
|
|
|
d1681e |
- finally:
|
|
|
d1681e |
- po.lock.release()
|
|
|
d1681e |
- t = syncdutils.Thread(target=tailer)
|
|
|
d1681e |
- t.start()
|
|
|
d1681e |
- cls.errhandler = t
|
|
|
d1681e |
-
|
|
|
d1681e |
- @classmethod
|
|
|
d1681e |
- def fork(cls):
|
|
|
d1681e |
- """fork wrapper that restarts errhandler thread in child"""
|
|
|
d1681e |
- pid = os.fork()
|
|
|
d1681e |
- if not pid:
|
|
|
d1681e |
- cls.init_errhandler()
|
|
|
d1681e |
- return pid
|
|
|
d1681e |
-
|
|
|
d1681e |
- def __init__(self, args, *a, **kw):
|
|
|
d1681e |
- """customizations for subprocess.Popen instantiation
|
|
|
d1681e |
-
|
|
|
d1681e |
- - 'close_fds' is taken to be the default
|
|
|
d1681e |
- - if child's stderr is chosen to be managed,
|
|
|
d1681e |
- register it with the error handler thread
|
|
|
d1681e |
- """
|
|
|
d1681e |
- self.args = args
|
|
|
d1681e |
- if 'close_fds' not in kw:
|
|
|
d1681e |
- kw['close_fds'] = True
|
|
|
d1681e |
- self.lock = threading.Lock()
|
|
|
d1681e |
- self.on_death_row = False
|
|
|
d1681e |
- self.elines = []
|
|
|
d1681e |
- try:
|
|
|
d1681e |
- sup(self, args, *a, **kw)
|
|
|
d1681e |
- except:
|
|
|
d1681e |
- ex = sys.exc_info()[1]
|
|
|
d1681e |
- if not isinstance(ex, OSError):
|
|
|
d1681e |
- raise
|
|
|
d1681e |
- raise GsyncdError("""execution of "%s" failed with %s (%s)""" %
|
|
|
d1681e |
- (args[0], errno.errorcode[ex.errno],
|
|
|
d1681e |
- os.strerror(ex.errno)))
|
|
|
d1681e |
- if kw.get('stderr') == subprocess.PIPE:
|
|
|
d1681e |
- assert(getattr(self, 'errhandler', None))
|
|
|
d1681e |
- self.errstore[self] = []
|
|
|
d1681e |
-
|
|
|
d1681e |
- def errlog(self):
|
|
|
d1681e |
- """make a log about child's failure event"""
|
|
|
d1681e |
- logging.error(lf("command returned error",
|
|
|
d1681e |
- cmd=" ".join(self.args),
|
|
|
d1681e |
- error=self.returncode))
|
|
|
d1681e |
- lp = ''
|
|
|
d1681e |
-
|
|
|
d1681e |
- def logerr(l):
|
|
|
d1681e |
- logging.error(self.args[0] + "> " + l)
|
|
|
d1681e |
- for l in self.elines:
|
|
|
d1681e |
- ls = l.split('\n')
|
|
|
d1681e |
- ls[0] = lp + ls[0]
|
|
|
d1681e |
- lp = ls.pop()
|
|
|
d1681e |
- for ll in ls:
|
|
|
d1681e |
- logerr(ll)
|
|
|
d1681e |
- if lp:
|
|
|
d1681e |
- logerr(lp)
|
|
|
d1681e |
-
|
|
|
d1681e |
- def errfail(self):
|
|
|
d1681e |
- """fail nicely if child did not terminate with success"""
|
|
|
d1681e |
- self.errlog()
|
|
|
d1681e |
- syncdutils.finalize(exval=1)
|
|
|
d1681e |
-
|
|
|
d1681e |
- def terminate_geterr(self, fail_on_err=True):
|
|
|
d1681e |
- """kill child, finalize stderr harvesting (unregister
|
|
|
d1681e |
- from errhandler, set up .elines), fail on error if
|
|
|
d1681e |
- asked for
|
|
|
d1681e |
- """
|
|
|
d1681e |
- self.lock.acquire()
|
|
|
d1681e |
- try:
|
|
|
d1681e |
- self.on_death_row = True
|
|
|
d1681e |
- finally:
|
|
|
d1681e |
- self.lock.release()
|
|
|
d1681e |
- elines = self.errstore.pop(self)
|
|
|
d1681e |
- if self.poll() is None:
|
|
|
d1681e |
- self.terminate()
|
|
|
d1681e |
- if self.poll() is None:
|
|
|
d1681e |
- time.sleep(0.1)
|
|
|
d1681e |
- self.kill()
|
|
|
d1681e |
- self.wait()
|
|
|
d1681e |
- while True:
|
|
|
d1681e |
- if not select([self.stderr], [], [], 0.1)[0]:
|
|
|
d1681e |
- break
|
|
|
d1681e |
- b = os.read(self.stderr.fileno(), 1024)
|
|
|
d1681e |
- if b:
|
|
|
d1681e |
- elines.append(b)
|
|
|
d1681e |
- else:
|
|
|
d1681e |
- break
|
|
|
d1681e |
- self.stderr.close()
|
|
|
d1681e |
- self.elines = elines
|
|
|
d1681e |
- if fail_on_err and self.returncode != 0:
|
|
|
d1681e |
- self.errfail()
|
|
|
d1681e |
-
|
|
|
d1681e |
-
|
|
|
d1681e |
class Server(object):
|
|
|
d1681e |
|
|
|
d1681e |
"""singleton implemening those filesystem access primitives
|
|
|
d1681e |
@@ -776,6 +623,31 @@ class Server(object):
|
|
|
d1681e |
if isinstance(st, int):
|
|
|
d1681e |
blob = entry_pack_mkdir(
|
|
|
d1681e |
gfid, bname, e['mode'], e['uid'], e['gid'])
|
|
|
d1681e |
+ else:
|
|
|
d1681e |
+ # If gfid of a directory exists on slave but path based
|
|
|
d1681e |
+ # create is getting EEXIST. This means the directory is
|
|
|
d1681e |
+ # renamed in master but recorded as MKDIR during hybrid
|
|
|
d1681e |
+ # crawl. Get the directory path by reading the backend
|
|
|
d1681e |
+ # symlink and trying to rename to new name as said by
|
|
|
d1681e |
+ # master.
|
|
|
d1681e |
+ global slv_bricks
|
|
|
d1681e |
+ global slv_volume
|
|
|
d1681e |
+ global slv_host
|
|
|
d1681e |
+ if not slv_bricks:
|
|
|
d1681e |
+ slv_info = Volinfo (slv_volume, slv_host)
|
|
|
d1681e |
+ slv_bricks = slv_info.bricks
|
|
|
d1681e |
+ # Result of readlink would be of format as below.
|
|
|
d1681e |
+ # readlink = "../../pgfid[0:2]/pgfid[2:4]/pgfid/basename"
|
|
|
d1681e |
+ realpath = os.readlink(os.path.join(slv_bricks[0]['dir'],
|
|
|
d1681e |
+ ".glusterfs", gfid[0:2],
|
|
|
d1681e |
+ gfid[2:4], gfid))
|
|
|
d1681e |
+ realpath_parts = realpath.split('/')
|
|
|
d1681e |
+ src_pargfid = realpath_parts[-2]
|
|
|
d1681e |
+ src_basename = realpath_parts[-1]
|
|
|
d1681e |
+ src_entry = os.path.join(pfx, src_pargfid, src_basename)
|
|
|
d1681e |
+ logging.info(lf("Special case: rename on mkdir",
|
|
|
d1681e |
+ gfid=gfid, entry=repr(entry)))
|
|
|
d1681e |
+ rename_with_disk_gfid_confirmation(gfid, src_entry, entry)
|
|
|
d1681e |
elif op == 'LINK':
|
|
|
d1681e |
slink = os.path.join(pfx, gfid)
|
|
|
d1681e |
st = lstat(slink)
|
|
|
d1681e |
@@ -1318,6 +1190,11 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
|
|
|
d1681e |
def __init__(self, path):
|
|
|
d1681e |
self.host, self.volume = sup(self, path, '^(%s):(.+)' % HostRX.pattern)
|
|
|
d1681e |
|
|
|
d1681e |
+ global slv_volume
|
|
|
d1681e |
+ global slv_host
|
|
|
d1681e |
+ slv_volume = self.volume
|
|
|
d1681e |
+ slv_host = self.host
|
|
|
d1681e |
+
|
|
|
d1681e |
def canonical_path(self):
|
|
|
d1681e |
return ':'.join([gethostbyname(self.host), self.volume])
|
|
|
d1681e |
|
|
|
d1681e |
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py
|
|
|
d1681e |
index 2b57f83..a493c37 100644
|
|
|
d1681e |
--- a/geo-replication/syncdaemon/syncdutils.py
|
|
|
d1681e |
+++ b/geo-replication/syncdaemon/syncdutils.py
|
|
|
d1681e |
@@ -16,13 +16,18 @@ import fcntl
|
|
|
d1681e |
import shutil
|
|
|
d1681e |
import logging
|
|
|
d1681e |
import socket
|
|
|
d1681e |
+import errno
|
|
|
d1681e |
+import threading
|
|
|
d1681e |
import subprocess
|
|
|
d1681e |
+from subprocess import PIPE
|
|
|
d1681e |
from threading import Lock, Thread as baseThread
|
|
|
d1681e |
from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED
|
|
|
d1681e |
from errno import EINTR, ENOENT, EPERM, ESTALE, EBUSY, errorcode
|
|
|
d1681e |
from signal import signal, SIGTERM
|
|
|
d1681e |
import select as oselect
|
|
|
d1681e |
from os import waitpid as owaitpid
|
|
|
d1681e |
+import xml.etree.ElementTree as XET
|
|
|
d1681e |
+from select import error as SelectError
|
|
|
d1681e |
|
|
|
d1681e |
from conf import GLUSTERFS_LIBEXECDIR, UUID_FILE
|
|
|
d1681e |
sys.path.insert(1, GLUSTERFS_LIBEXECDIR)
|
|
|
d1681e |
@@ -76,6 +81,15 @@ NEWLINE_ESCAPE_CHAR = "%0A"
|
|
|
d1681e |
PERCENTAGE_ESCAPE_CHAR = "%25"
|
|
|
d1681e |
|
|
|
d1681e |
|
|
|
d1681e |
+def sup(x, *a, **kw):
|
|
|
d1681e |
+ """a rubyesque "super" for python ;)
|
|
|
d1681e |
+
|
|
|
d1681e |
+ invoke caller method in parent class with given args.
|
|
|
d1681e |
+ """
|
|
|
d1681e |
+ return getattr(super(type(x), x),
|
|
|
d1681e |
+ sys._getframe(1).f_code.co_name)(*a, **kw)
|
|
|
d1681e |
+
|
|
|
d1681e |
+
|
|
|
d1681e |
def escape(s):
|
|
|
d1681e |
"""the chosen flavor of string escaping, used all over
|
|
|
d1681e |
to turn whatever data to creatable representation"""
|
|
|
d1681e |
@@ -650,3 +664,226 @@ def lf(event, **kwargs):
|
|
|
d1681e |
for k, v in kwargs.items():
|
|
|
d1681e |
msg += "\t{0}={1}".format(k, v)
|
|
|
d1681e |
return msg
|
|
|
d1681e |
+
|
|
|
d1681e |
+
|
|
|
d1681e |
+class Popen(subprocess.Popen):
|
|
|
d1681e |
+
|
|
|
d1681e |
+ """customized subclass of subprocess.Popen with a ring
|
|
|
d1681e |
+ buffer for children error output"""
|
|
|
d1681e |
+
|
|
|
d1681e |
+ @classmethod
|
|
|
d1681e |
+ def init_errhandler(cls):
|
|
|
d1681e |
+ """start the thread which handles children's error output"""
|
|
|
d1681e |
+ cls.errstore = {}
|
|
|
d1681e |
+
|
|
|
d1681e |
+ def tailer():
|
|
|
d1681e |
+ while True:
|
|
|
d1681e |
+ errstore = cls.errstore.copy()
|
|
|
d1681e |
+ try:
|
|
|
d1681e |
+ poe, _, _ = select(
|
|
|
d1681e |
+ [po.stderr for po in errstore], [], [], 1)
|
|
|
d1681e |
+ except (ValueError, SelectError):
|
|
|
d1681e |
+ # stderr is already closed wait for some time before
|
|
|
d1681e |
+ # checking next error
|
|
|
d1681e |
+ time.sleep(0.5)
|
|
|
d1681e |
+ continue
|
|
|
d1681e |
+ for po in errstore:
|
|
|
d1681e |
+ if po.stderr not in poe:
|
|
|
d1681e |
+ continue
|
|
|
d1681e |
+ po.lock.acquire()
|
|
|
d1681e |
+ try:
|
|
|
d1681e |
+ if po.on_death_row:
|
|
|
d1681e |
+ continue
|
|
|
d1681e |
+ la = errstore[po]
|
|
|
d1681e |
+ try:
|
|
|
d1681e |
+ fd = po.stderr.fileno()
|
|
|
d1681e |
+ except ValueError: # file is already closed
|
|
|
d1681e |
+ time.sleep(0.5)
|
|
|
d1681e |
+ continue
|
|
|
d1681e |
+
|
|
|
d1681e |
+ try:
|
|
|
d1681e |
+ l = os.read(fd, 1024)
|
|
|
d1681e |
+ except OSError:
|
|
|
d1681e |
+ time.sleep(0.5)
|
|
|
d1681e |
+ continue
|
|
|
d1681e |
+
|
|
|
d1681e |
+ if not l:
|
|
|
d1681e |
+ continue
|
|
|
d1681e |
+ tots = len(l)
|
|
|
d1681e |
+ for lx in la:
|
|
|
d1681e |
+ tots += len(lx)
|
|
|
d1681e |
+ while tots > 1 << 20 and la:
|
|
|
d1681e |
+ tots -= len(la.pop(0))
|
|
|
d1681e |
+ la.append(l)
|
|
|
d1681e |
+ finally:
|
|
|
d1681e |
+ po.lock.release()
|
|
|
d1681e |
+ t = Thread(target=tailer)
|
|
|
d1681e |
+ t.start()
|
|
|
d1681e |
+ cls.errhandler = t
|
|
|
d1681e |
+
|
|
|
d1681e |
+ @classmethod
|
|
|
d1681e |
+ def fork(cls):
|
|
|
d1681e |
+ """fork wrapper that restarts errhandler thread in child"""
|
|
|
d1681e |
+ pid = os.fork()
|
|
|
d1681e |
+ if not pid:
|
|
|
d1681e |
+ cls.init_errhandler()
|
|
|
d1681e |
+ return pid
|
|
|
d1681e |
+
|
|
|
d1681e |
+ def __init__(self, args, *a, **kw):
|
|
|
d1681e |
+ """customizations for subprocess.Popen instantiation
|
|
|
d1681e |
+
|
|
|
d1681e |
+ - 'close_fds' is taken to be the default
|
|
|
d1681e |
+ - if child's stderr is chosen to be managed,
|
|
|
d1681e |
+ register it with the error handler thread
|
|
|
d1681e |
+ """
|
|
|
d1681e |
+ self.args = args
|
|
|
d1681e |
+ if 'close_fds' not in kw:
|
|
|
d1681e |
+ kw['close_fds'] = True
|
|
|
d1681e |
+ self.lock = threading.Lock()
|
|
|
d1681e |
+ self.on_death_row = False
|
|
|
d1681e |
+ self.elines = []
|
|
|
d1681e |
+ try:
|
|
|
d1681e |
+ sup(self, args, *a, **kw)
|
|
|
d1681e |
+ except:
|
|
|
d1681e |
+ ex = sys.exc_info()[1]
|
|
|
d1681e |
+ if not isinstance(ex, OSError):
|
|
|
d1681e |
+ raise
|
|
|
d1681e |
+ raise GsyncdError("""execution of "%s" failed with %s (%s)""" %
|
|
|
d1681e |
+ (args[0], errno.errorcode[ex.errno],
|
|
|
d1681e |
+ os.strerror(ex.errno)))
|
|
|
d1681e |
+ if kw.get('stderr') == subprocess.PIPE:
|
|
|
d1681e |
+ assert(getattr(self, 'errhandler', None))
|
|
|
d1681e |
+ self.errstore[self] = []
|
|
|
d1681e |
+
|
|
|
d1681e |
+ def errlog(self):
|
|
|
d1681e |
+ """make a log about child's failure event"""
|
|
|
d1681e |
+ logging.error(lf("command returned error",
|
|
|
d1681e |
+ cmd=" ".join(self.args),
|
|
|
d1681e |
+ error=self.returncode))
|
|
|
d1681e |
+ lp = ''
|
|
|
d1681e |
+
|
|
|
d1681e |
+ def logerr(l):
|
|
|
d1681e |
+ logging.error(self.args[0] + "> " + l)
|
|
|
d1681e |
+ for l in self.elines:
|
|
|
d1681e |
+ ls = l.split('\n')
|
|
|
d1681e |
+ ls[0] = lp + ls[0]
|
|
|
d1681e |
+ lp = ls.pop()
|
|
|
d1681e |
+ for ll in ls:
|
|
|
d1681e |
+ logerr(ll)
|
|
|
d1681e |
+ if lp:
|
|
|
d1681e |
+ logerr(lp)
|
|
|
d1681e |
+
|
|
|
d1681e |
+ def errfail(self):
|
|
|
d1681e |
+ """fail nicely if child did not terminate with success"""
|
|
|
d1681e |
+ self.errlog()
|
|
|
d1681e |
+ finalize(exval=1)
|
|
|
d1681e |
+
|
|
|
d1681e |
+ def terminate_geterr(self, fail_on_err=True):
|
|
|
d1681e |
+ """kill child, finalize stderr harvesting (unregister
|
|
|
d1681e |
+ from errhandler, set up .elines), fail on error if
|
|
|
d1681e |
+ asked for
|
|
|
d1681e |
+ """
|
|
|
d1681e |
+ self.lock.acquire()
|
|
|
d1681e |
+ try:
|
|
|
d1681e |
+ self.on_death_row = True
|
|
|
d1681e |
+ finally:
|
|
|
d1681e |
+ self.lock.release()
|
|
|
d1681e |
+ elines = self.errstore.pop(self)
|
|
|
d1681e |
+ if self.poll() is None:
|
|
|
d1681e |
+ self.terminate()
|
|
|
d1681e |
+ if self.poll() is None:
|
|
|
d1681e |
+ time.sleep(0.1)
|
|
|
d1681e |
+ self.kill()
|
|
|
d1681e |
+ self.wait()
|
|
|
d1681e |
+ while True:
|
|
|
d1681e |
+ if not select([self.stderr], [], [], 0.1)[0]:
|
|
|
d1681e |
+ break
|
|
|
d1681e |
+ b = os.read(self.stderr.fileno(), 1024)
|
|
|
d1681e |
+ if b:
|
|
|
d1681e |
+ elines.append(b)
|
|
|
d1681e |
+ else:
|
|
|
d1681e |
+ break
|
|
|
d1681e |
+ self.stderr.close()
|
|
|
d1681e |
+ self.elines = elines
|
|
|
d1681e |
+ if fail_on_err and self.returncode != 0:
|
|
|
d1681e |
+ self.errfail()
|
|
|
d1681e |
+
|
|
|
d1681e |
+
|
|
|
d1681e |
+class Volinfo(object):
|
|
|
d1681e |
+
|
|
|
d1681e |
+ def __init__(self, vol, host='localhost', prelude=[]):
|
|
|
d1681e |
+ po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host,
|
|
|
d1681e |
+ 'volume', 'info', vol],
|
|
|
d1681e |
+ stdout=PIPE, stderr=PIPE)
|
|
|
d1681e |
+ vix = po.stdout.read()
|
|
|
d1681e |
+ po.wait()
|
|
|
d1681e |
+ po.terminate_geterr()
|
|
|
d1681e |
+ vi = XET.fromstring(vix)
|
|
|
d1681e |
+ if vi.find('opRet').text != '0':
|
|
|
d1681e |
+ if prelude:
|
|
|
d1681e |
+ via = '(via %s) ' % prelude.join(' ')
|
|
|
d1681e |
+ else:
|
|
|
d1681e |
+ via = ' '
|
|
|
d1681e |
+ raise GsyncdError('getting volume info of %s%s '
|
|
|
d1681e |
+ 'failed with errorcode %s' %
|
|
|
d1681e |
+ (vol, via, vi.find('opErrno').text))
|
|
|
d1681e |
+ self.tree = vi
|
|
|
d1681e |
+ self.volume = vol
|
|
|
d1681e |
+ self.host = host
|
|
|
d1681e |
+
|
|
|
d1681e |
+ def get(self, elem):
|
|
|
d1681e |
+ return self.tree.findall('.//' + elem)
|
|
|
d1681e |
+
|
|
|
d1681e |
+ def is_tier(self):
|
|
|
d1681e |
+ return (self.get('typeStr')[0].text == 'Tier')
|
|
|
d1681e |
+
|
|
|
d1681e |
+ def is_hot(self, brickpath):
|
|
|
d1681e |
+ logging.debug('brickpath: ' + repr(brickpath))
|
|
|
d1681e |
+ return brickpath in self.hot_bricks
|
|
|
d1681e |
+
|
|
|
d1681e |
+ @property
|
|
|
d1681e |
+ @memoize
|
|
|
d1681e |
+ def bricks(self):
|
|
|
d1681e |
+ def bparse(b):
|
|
|
d1681e |
+ host, dirp = b.find("name").text.split(':', 2)
|
|
|
d1681e |
+ return {'host': host, 'dir': dirp, 'uuid': b.find("hostUuid").text}
|
|
|
d1681e |
+ return [bparse(b) for b in self.get('brick')]
|
|
|
d1681e |
+
|
|
|
d1681e |
+ @property
|
|
|
d1681e |
+ @memoize
|
|
|
d1681e |
+ def uuid(self):
|
|
|
d1681e |
+ ids = self.get('id')
|
|
|
d1681e |
+ if len(ids) != 1:
|
|
|
d1681e |
+ raise GsyncdError("volume info of %s obtained from %s: "
|
|
|
d1681e |
+ "ambiguous uuid" % (self.volume, self.host))
|
|
|
d1681e |
+ return ids[0].text
|
|
|
d1681e |
+
|
|
|
d1681e |
+ def replica_count(self, tier, hot):
|
|
|
d1681e |
+ if (tier and hot):
|
|
|
d1681e |
+ return int(self.get('hotBricks/hotreplicaCount')[0].text)
|
|
|
d1681e |
+ elif (tier and not hot):
|
|
|
d1681e |
+ return int(self.get('coldBricks/coldreplicaCount')[0].text)
|
|
|
d1681e |
+ else:
|
|
|
d1681e |
+ return int(self.get('replicaCount')[0].text)
|
|
|
d1681e |
+
|
|
|
d1681e |
+ def disperse_count(self, tier, hot):
|
|
|
d1681e |
+ if (tier and hot):
|
|
|
d1681e |
+ # Tiering doesn't support disperse volume as hot brick,
|
|
|
d1681e |
+ # hence no xml output, so returning 0. In case, if it's
|
|
|
d1681e |
+ # supported later, we should change here.
|
|
|
d1681e |
+ return 0
|
|
|
d1681e |
+ elif (tier and not hot):
|
|
|
d1681e |
+ return int(self.get('coldBricks/colddisperseCount')[0].text)
|
|
|
d1681e |
+ else:
|
|
|
d1681e |
+ return int(self.get('disperseCount')[0].text)
|
|
|
d1681e |
+
|
|
|
d1681e |
+ @property
|
|
|
d1681e |
+ @memoize
|
|
|
d1681e |
+ def hot_bricks(self):
|
|
|
d1681e |
+ return [b.text for b in self.get('hotBricks/brick')]
|
|
|
d1681e |
+
|
|
|
d1681e |
+ def get_hot_bricks_count(self, tier):
|
|
|
d1681e |
+ if (tier):
|
|
|
d1681e |
+ return int(self.get('hotBricks/hotbrickCount')[0].text)
|
|
|
d1681e |
+ else:
|
|
|
d1681e |
+ return 0
|
|
|
d1681e |
--
|
|
|
d1681e |
1.8.3.1
|
|
|
d1681e |
|