Blob Blame History Raw
From 27465e9f8b567db4a5265b1cfd0f08f300667416 Mon Sep 17 00:00:00 2001
From: Raghavendra Bhat <raghavendra@redhat.com>
Date: Tue, 26 May 2015 19:22:14 +0530
Subject: [PATCH 185/190] features/bit-rot-stub: deny access to bad objects

       Backport of http://review.gluster.org/11126

* Access to bad objects (especially operations such as open, readv, writev)
  should be denied to prevent applications from getting wrong data.

* Do not allow anyone apart from scrubber to set bad object xattr.

* Do not allow bad object xattr to be removed.

Change-Id: Id4e43b8318a7b0822231485c60bbc551b9adf7e8
BUG: 1224227
Signed-off-by: Raghavendra Bhat <raghavendra@redhat.com>
Reviewed-on: https://code.engineering.redhat.com/gerrit/51757
Reviewed-by: Venky Shankar <vshankar@redhat.com>
Tested-by: Venky Shankar <vshankar@redhat.com>
---
 libglusterfs/src/glusterfs.h                       |    3 +
 xlators/features/bit-rot/src/bitd/bit-rot.c        |    7 +-
 xlators/features/bit-rot/src/stub/bit-rot-common.h |   15 +-
 .../bit-rot/src/stub/bit-rot-stub-messages.h       |   28 ++
 xlators/features/bit-rot/src/stub/bit-rot-stub.c   |  310 ++++++++++++++++++--
 xlators/features/bit-rot/src/stub/bit-rot-stub.h   |   94 ++++++
 xlators/performance/quick-read/src/quick-read.c    |    5 +
 7 files changed, 430 insertions(+), 32 deletions(-)

diff --git a/libglusterfs/src/glusterfs.h b/libglusterfs/src/glusterfs.h
index c00bf55..97965ab 100644
--- a/libglusterfs/src/glusterfs.h
+++ b/libglusterfs/src/glusterfs.h
@@ -127,6 +127,9 @@
 #define BITROT_CURRENT_VERSION_KEY  "trusted.bit-rot.version"
 #define BITROT_SIGNING_VERSION_KEY  "trusted.bit-rot.signature"
 
+/* globally usable bad file marker */
+#define GLUSTERFS_BAD_INODE         "glusterfs.bad-inode"
+
 /* on-disk size of signing xattr (not the signature itself) */
 #define BITROT_SIGNING_XATTR_SIZE_KEY  "trusted.glusterfs.bit-rot.size"
 
diff --git a/xlators/features/bit-rot/src/bitd/bit-rot.c b/xlators/features/bit-rot/src/bitd/bit-rot.c
index cf9e8e2..94063cb 100644
--- a/xlators/features/bit-rot/src/bitd/bit-rot.c
+++ b/xlators/features/bit-rot/src/bitd/bit-rot.c
@@ -164,11 +164,10 @@ bitd_is_bad_file (xlator_t *this, br_child_t *child, loc_t *loc, fd_t *fd)
 
         if (fd)
                 ret = syncop_fgetxattr (child->xl, fd, &xattr,
-                                        "trusted.glusterfs.bad-file", NULL,
-                                        NULL);
+                                        BITROT_OBJECT_BAD_KEY, NULL, NULL);
         else if (loc)
-                ret = syncop_getxattr (child->xl, loc, &xattr,
-                                       "trusted.glusterfs.bad-file", NULL,
+                ret = syncop_getxattr (child->xl, loc,
+                                       &xattr, BITROT_OBJECT_BAD_KEY, NULL,
                                        NULL);
 
         if (!ret) {
diff --git a/xlators/features/bit-rot/src/stub/bit-rot-common.h b/xlators/features/bit-rot/src/stub/bit-rot-common.h
index a8285d2..f8d03de 100644
--- a/xlators/features/bit-rot/src/stub/bit-rot-common.h
+++ b/xlators/features/bit-rot/src/stub/bit-rot-common.h
@@ -41,12 +41,23 @@ typedef enum br_sign_state {
 } br_sign_state_t;
 
 static inline br_vxattr_status_t
-br_version_xattr_state (dict_t *xattr,
-                        br_version_t **obuf, br_signature_t **sbuf)
+br_version_xattr_state (dict_t *xattr, br_version_t **obuf,
+                        br_signature_t **sbuf, gf_boolean_t *objbad)
 {
         int32_t             ret    = 0;
         int32_t             vxattr = 0;
         br_vxattr_status_t  status;
+        void               *data   = NULL;
+
+        /**
+         * The key being present in the dict indicates the xattr was set on
+         * disk. The presence of the xattr itself is, as of now, sufficient
+         * to say that the object is bad.
+         */
+        *objbad = _gf_false;
+        ret = dict_get_bin (xattr, BITROT_OBJECT_BAD_KEY, (void **)&data);
+        if (!ret)
+                *objbad = _gf_true;
 
         ret = dict_get_bin (xattr, BITROT_CURRENT_VERSION_KEY, (void **)obuf);
         if (ret)
diff --git a/xlators/features/bit-rot/src/stub/bit-rot-stub-messages.h b/xlators/features/bit-rot/src/stub/bit-rot-stub-messages.h
index d940b65..db5736a 100644
--- a/xlators/features/bit-rot/src/stub/bit-rot-stub-messages.h
+++ b/xlators/features/bit-rot/src/stub/bit-rot-stub-messages.h
@@ -153,6 +153,34 @@
  * @recommendedaction
  *
  */
+#define BRS_MSG_BAD_OBJ_MARK_FAIL           (GLFS_BITROT_STUB_BASE + 16)
+/*!
+ * @messageid
+ * @diagnosis Marking the object as bad in its inode context failed.
+ * @recommendedaction
+ *
+ */
+#define BRS_MSG_NON_SCRUB_BAD_OBJ_MARK      (GLFS_BITROT_STUB_BASE + 17)
+/*!
+ * @messageid
+ * @diagnosis A non-scrubber client tried to set the bad object xattr.
+ * @recommendedaction
+ *
+ */
+#define BRS_MSG_REMOVE_BAD_OBJECT_XATTR     (GLFS_BITROT_STUB_BASE + 18)
+/*!
+ * @messageid
+ * @diagnosis {f}removexattr was called on the bad object xattr.
+ * @recommendedaction
+ *
+ */
+#define BRS_MSG_BAD_OBJECT_ACCESS           (GLFS_BITROT_STUB_BASE + 20)
+/*!
+ * @messageid
+ * @diagnosis A fop was attempted on an object that is marked bad.
+ * @recommendedaction
+ * NOTE(review): offset 19 is skipped above - confirm the gap is intended.
+ */
 /*------------*/
 #define glfs_msg_end_x GLFS_MSGID_END, "Invalid: End of messages"
 #endif /* !_BITROT_STUB_MESSAGES_H_ */
diff --git a/xlators/features/bit-rot/src/stub/bit-rot-stub.c b/xlators/features/bit-rot/src/stub/bit-rot-stub.c
index 600eb80..de81510 100644
--- a/xlators/features/bit-rot/src/stub/bit-rot-stub.c
+++ b/xlators/features/bit-rot/src/stub/bit-rot-stub.c
@@ -237,7 +237,8 @@ br_stub_prepare_signing_request (dict_t *dict,
  */
 static inline int
 br_stub_init_inode_versions (xlator_t *this, fd_t *fd, inode_t *inode,
-                             unsigned long version, gf_boolean_t markdirty)
+                             unsigned long version, gf_boolean_t markdirty,
+                             gf_boolean_t bad_object)
 {
         int32_t ret = 0;
         br_stub_inode_ctx_t *ctx = NULL;
@@ -252,17 +253,21 @@ br_stub_init_inode_versions (xlator_t *this, fd_t *fd, inode_t *inode,
                 : __br_stub_mark_inode_synced (ctx);
         __br_stub_set_ongoing_version (ctx, version);
 
+        if (bad_object)
+                __br_stub_mark_object_bad (ctx);
+
         if (fd) {
                 ret = br_stub_add_fd_to_inode (this, fd, ctx);
                 if (ret)
                         goto free_ctx;
         }
+
         ret = br_stub_set_inode_ctx (this, inode, ctx);
         if (ret)
                 goto free_ctx;
         return 0;
 
- free_ctx:
+free_ctx:
         GF_FREE (ctx);
  error_return:
         return -1;
@@ -290,7 +295,7 @@ br_stub_mod_inode_versions (xlator_t *this,
 
                 ret = 0;
         }
- unblock:
+unblock:
         UNLOCK (&inode->lock);
 
         return ret;
@@ -623,7 +628,7 @@ int32_t
 br_stub_perform_objsign (call_frame_t *frame, xlator_t *this,
                          fd_t *fd, dict_t *dict, int flags, dict_t *xdata)
 {
-        STACK_WIND (frame, default_setxattr_cbk,
+        STACK_WIND (frame, default_fsetxattr_cbk,
                     FIRST_CHILD (this), FIRST_CHILD (this)->fops->fsetxattr, fd,
                     dict, flags, xdata);
 
@@ -900,13 +905,101 @@ br_stub_handle_object_reopen (call_frame_t *frame,
         STACK_UNWIND_STRICT (fsetxattr, frame, op_ret, op_errno, NULL);
 }
 
+/**
+ * This function only handles bad file identification. Instead of checking in
+ * fops like open, readv, writev whether the object is bad or not by doing
+ * getxattr calls, it is better to catch it when the scrubber marks it as bad.
+ * So this callback is called only when the fsetxattr is sent by the scrubber
+ * to mark the object as bad.
+ */
+int
+br_stub_fsetxattr_bad_object_cbk (call_frame_t *frame, void *cookie,
+                                  xlator_t *this, int32_t op_ret,
+                                  int32_t op_errno, dict_t *xdata)
+{
+        br_stub_local_t *local = NULL;
+        int32_t          ret   = -1;
+
+        local = frame->local;
+        frame->local = NULL; /* local is released below, after the unwind */
+
+        if (op_ret < 0)
+                goto unwind;
+
+        /*
+         * What to do if marking the object as bad fails (i.e. the in-memory
+         * marking within the inode context)? If we are here, the fsetxattr
+         * fop has succeeded on disk and the bad object xattr has been set.
+         * We can return failure to the scrubber, but there is nothing the
+         * scrubber can do with it (it might assume that the on-disk setxattr
+         * itself has failed). The main purpose of this operation is to help
+         * identify the bad object by checking the inode context itself (thus
+         * avoiding the necessity of doing a getxattr fop on the disk).
+         *
+         * So as of now, success itself is being returned even though inode
+         * context set operation fails.
+         * In future if there is any change in the policy which can handle this,
+         * then appropriate response should be sent (i.e. success or error).
+         */
+        ret = br_stub_mark_object_bad (this, local->u.context.inode);
+        if (ret)
+                gf_msg (this->name, GF_LOG_ERROR, 0, BRS_MSG_BAD_OBJ_MARK_FAIL,
+                        "failed to mark object %s as bad",
+                        uuid_utoa (local->u.context.inode->gfid));
+
+unwind:
+        STACK_UNWIND_STRICT (fsetxattr, frame, op_ret, op_errno, xdata);
+        br_stub_cleanup_local (local);
+        br_stub_dealloc_local (local);
+        return 0;
+}
+
+static int32_t
+br_stub_handle_bad_object_key (call_frame_t *frame, xlator_t *this, fd_t *fd,
+                               dict_t *dict, int flags, dict_t *xdata)
+{
+        br_stub_local_t *local    = NULL;
+        int32_t          op_ret   = -1;
+        int32_t         op_errno = EINVAL;
+        /* Only the scrubber may set this xattr; others get EINVAL. */
+        if (frame->root->pid != GF_CLIENT_PID_SCRUB) {
+                gf_msg (this->name, GF_LOG_ERROR, 0,
+                        BRS_MSG_NON_SCRUB_BAD_OBJ_MARK, "bad object marking "
+                        "on %s is not from the scrubber",
+                        uuid_utoa (fd->inode->gfid));
+                goto unwind;
+        }
+
+        local = br_stub_alloc_local (this);
+        if (!local) {
+                gf_msg (this->name, GF_LOG_ERROR, 0, BRS_MSG_NO_MEMORY,
+                        "failed to allocate memory for fsetxattr on %s",
+                        uuid_utoa (fd->inode->gfid));
+                op_ret = -1;
+                op_errno = ENOMEM;
+                goto unwind;
+        }
+        /* The callback marks local->u.context.inode bad on success. */
+        br_stub_fill_local (local, NULL, fd, fd->inode,
+                            fd->inode->gfid, BR_STUB_NO_VERSIONING, 0);
+        frame->local = local;
+
+        STACK_WIND (frame, br_stub_fsetxattr_bad_object_cbk, FIRST_CHILD (this),
+                    FIRST_CHILD (this)->fops->fsetxattr, fd, dict, flags,
+                    xdata);
+        return 0;
+unwind:
+        STACK_UNWIND_STRICT (fsetxattr, frame, op_ret, op_errno, NULL);
+        return 0;
+}
+
 int
 br_stub_fsetxattr (call_frame_t *frame, xlator_t *this,
                    fd_t *fd, dict_t *dict, int flags, dict_t *xdata)
 {
-        int32_t          ret  = 0;
-        uint32_t val = 0;
-        br_isignature_t *sign = NULL;
+        int32_t              ret      = 0;
+        uint32_t             val      = 0;
+        br_isignature_t     *sign     = NULL;
 
         if (!IA_ISREG (fd->inode->ia_type))
                 goto wind;
@@ -927,11 +1020,18 @@ br_stub_fsetxattr (call_frame_t *frame, xlator_t *this,
                 goto done;
         }
 
- wind:
-        STACK_WIND (frame, default_setxattr_cbk,
-                    FIRST_CHILD (this), FIRST_CHILD (this)->fops->fsetxattr, fd,
-                    dict, flags, xdata);
- done:
+        /* handle bad object */
+        if (dict_get (dict, BITROT_OBJECT_BAD_KEY)) {
+                br_stub_handle_bad_object_key (frame, this, fd,
+                                               dict, flags, xdata);
+                goto done;
+        }
+
+wind:
+        STACK_WIND (frame, default_fsetxattr_cbk, FIRST_CHILD (this),
+                    FIRST_CHILD (this)->fops->fsetxattr, fd, dict, flags,
+                    xdata);
+done:
         return 0;
 }
 
@@ -940,6 +1040,59 @@ br_stub_fsetxattr (call_frame_t *frame, xlator_t *this,
 
 /** {{{ */
 
+/* {f}removexattr() */
+
+int32_t
+br_stub_removexattr (call_frame_t *frame, xlator_t *this,
+                     loc_t *loc, const char *name, dict_t *xdata)
+{
+        int32_t op_ret    = -1;
+        int32_t op_errno  = EINVAL;
+        /* The bad object xattr must never be removed: fail with EINVAL. */
+        if (!strcmp (BITROT_OBJECT_BAD_KEY, name)) {
+                gf_msg (this->name, GF_LOG_WARNING, 0,
+                        BRS_MSG_REMOVE_BAD_OBJECT_XATTR, "Remove xattr called"
+                        " on bad object xattr for file %s", loc->path);
+                goto unwind;
+        }
+
+        STACK_WIND_TAIL (frame, FIRST_CHILD(this),
+                         FIRST_CHILD(this)->fops->removexattr,
+                         loc, name, xdata);
+        return 0;
+unwind:
+        STACK_UNWIND_STRICT (removexattr, frame, op_ret, op_errno, NULL);
+        return 0;
+}
+
+int32_t
+br_stub_fremovexattr (call_frame_t *frame, xlator_t *this,
+                      fd_t *fd, const char *name, dict_t *xdata)
+{
+        int32_t op_ret    = -1;
+        int32_t op_errno  = EINVAL;
+        /* The bad object xattr must never be removed: fail with EINVAL. */
+        if (!strcmp (BITROT_OBJECT_BAD_KEY, name)) {
+                gf_msg (this->name, GF_LOG_WARNING, 0,
+                        BRS_MSG_REMOVE_BAD_OBJECT_XATTR, "Remove xattr called"
+                        " on bad object xattr for inode %s",
+                        uuid_utoa (fd->inode->gfid));
+                goto unwind;
+        }
+
+        STACK_WIND_TAIL (frame, FIRST_CHILD(this),
+                         FIRST_CHILD(this)->fops->fremovexattr,
+                         fd, name, xdata);
+        return 0;
+unwind:
+        STACK_UNWIND_STRICT (fremovexattr, frame, op_ret, op_errno, NULL);
+        return 0;
+}
+
+/** }}} */
+
+/** {{{ */
+
 /* {f}getxattr() */
 
 int
@@ -1044,6 +1197,7 @@ br_stub_getxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
         br_vxattr_status_t   status;
         br_stub_local_t     *local        = NULL;
         inode_t             *inode        = NULL;
+        gf_boolean_t         bad_object   = _gf_false;
 
         if (op_ret < 0)
                 goto unwind;
@@ -1055,7 +1209,11 @@ br_stub_getxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
         inode = local->u.context.inode;
 
         op_ret   = -1;
-        status = br_version_xattr_state (xattr, &obuf, &sbuf);
+        status = br_version_xattr_state (xattr, &obuf, &sbuf, &bad_object);
+
+        op_errno = EIO;
+        if (bad_object)
+                goto delkeys;
 
         op_errno = EINVAL;
         if (status == BR_VXATTR_STATUS_INVALID)
@@ -1286,6 +1444,31 @@ unwind:
         return 0;
 }
 
+int32_t
+br_stub_readv (call_frame_t *frame, xlator_t *this,
+               fd_t *fd, size_t size, off_t offset, uint32_t flags, dict_t *xdata)
+{
+        int32_t              op_ret   = -1;
+        int32_t              op_errno = EINVAL;
+
+        GF_VALIDATE_OR_GOTO ("bit-rot-stub", this, unwind);
+        GF_VALIDATE_OR_GOTO (this->name, frame, unwind);
+        GF_VALIDATE_OR_GOTO (this->name, fd, unwind);
+        GF_VALIDATE_OR_GOTO (this->name, fd->inode, unwind);
+        /* Deny reads on objects marked bad (unwinds with EIO). */
+        BR_STUB_HANDLE_BAD_OBJECT (this, fd->inode, op_ret, op_errno, unwind);
+
+        STACK_WIND_TAIL (frame, FIRST_CHILD(this),
+                         FIRST_CHILD(this)->fops->readv, fd, size, offset,
+                         flags, xdata);
+        return 0;
+
+unwind:
+        STACK_UNWIND_STRICT (readv, frame, op_ret, op_errno, NULL, 0, NULL,
+                             NULL, NULL);
+        return 0;
+}
+
 /**
  * The first write response on the first fd in the list of fds will set
  * the flag to indicate that the inode is modified. The subsequent write
@@ -1367,6 +1550,8 @@ br_stub_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
         if (ret)
                 goto unwind;
 
+        BR_STUB_HANDLE_BAD_OBJECT (this, fd->inode, op_ret, op_errno, unwind);
+
         /**
          * The inode is not dirty and also witnessed atleast one successful
          * modification operation. Therefore, subsequent operations need not
@@ -1486,6 +1671,8 @@ br_stub_ftruncate (call_frame_t *frame, xlator_t *this, fd_t *fd,
         if (ret)
                 goto unwind;
 
+        BR_STUB_HANDLE_BAD_OBJECT (this, fd->inode, op_ret, op_errno, unwind);
+
         if (!inc_version && modified)
                 goto wind;
 
@@ -1616,6 +1803,8 @@ br_stub_truncate (call_frame_t *frame, xlator_t *this, loc_t *loc,
         if (ret)
                 goto cleanup_fd;
 
+        BR_STUB_HANDLE_BAD_OBJECT (this, fd->inode, op_ret, op_errno, unwind);
+
         if (!inc_version && modified)
                 goto wind;
 
@@ -1689,15 +1878,14 @@ br_stub_open (call_frame_t *frame, xlator_t *this,
         int32_t              ret      = -1;
         br_stub_inode_ctx_t *ctx      = NULL;
         uint64_t             ctx_addr = 0;
+        int32_t              op_ret   = -1;
+        int32_t              op_errno = EINVAL;
 
         GF_VALIDATE_OR_GOTO ("bit-rot-stub", this, unwind);
         GF_VALIDATE_OR_GOTO (this->name, loc, unwind);
         GF_VALIDATE_OR_GOTO (this->name, fd, unwind);
         GF_VALIDATE_OR_GOTO (this->name, fd->inode, unwind);
 
-        if (frame->root->pid == GF_CLIENT_PID_SCRUB)
-                goto wind;
-
         ret = br_stub_get_inode_ctx (this, fd->inode, &ctx_addr);
         if (ret) {
                 gf_msg (this->name, GF_LOG_ERROR, 0,
@@ -1708,6 +1896,12 @@ br_stub_open (call_frame_t *frame, xlator_t *this,
         }
 
         ctx = (br_stub_inode_ctx_t *)(long)ctx_addr;
+
+        BR_STUB_HANDLE_BAD_OBJECT (this, loc->inode, op_ret, op_errno, unwind);
+
+        if (frame->root->pid == GF_CLIENT_PID_SCRUB)
+                goto wind;
+
         if (flags == O_RDONLY)
                 goto wind;
 
@@ -1725,7 +1919,7 @@ wind:
                     FIRST_CHILD (this)->fops->open, loc, flags, fd, xdata);
         return 0;
 unwind:
-        STACK_UNWIND_STRICT (open, frame, -1, EINVAL, NULL, NULL);
+        STACK_UNWIND_STRICT (open, frame, op_ret, op_errno, NULL, NULL);
         return 0;
 }
 
@@ -1784,7 +1978,7 @@ br_stub_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
         ret = br_stub_get_inode_ctx (this, fd->inode, &ctx_addr);
         if (ret < 0) {
                 ret = br_stub_init_inode_versions (this, fd, inode, version,
-                                                   _gf_true);
+                                                   _gf_true, _gf_false);
                 if (ret) {
                         op_ret = -1;
                         op_errno = EINVAL;
@@ -1834,7 +2028,7 @@ br_stub_mknod_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
                 goto unwind;
 
         ret = br_stub_init_inode_versions (this, NULL, inode, version,
-                                           _gf_true);
+                                           _gf_true, _gf_false);
         /**
          * Like lookup, if init_inode_versions fail, return EINVAL
          */
@@ -1869,6 +2063,23 @@ unwind:
 
 /** }}} */
 
+/**
+ * As of now, only lookup searches for bad object xattr and marks the
+ * object as bad in its inode context if the xattr is present. But there
+ * is a possibility that, at the time of the lookup the object was not
+ * marked bad (i.e. bad object xattr was not set), and later it is marked
+ * as bad. In this case, the inode context still says the object is not bad,
+ * so when a fop such as open or readv or writev comes on the object, the fop
+ * will be sent downward instead of being returned upwards as an error.
+ * One solution for this is to do a getxattr for the below list of fops:
+ * lookup, readdirp, open, readv, writev.
+ * But doing getxattr for each of the above fops might be costly.
+ * So another method followed is to catch the bad file marking by the scrubber
+ * and set that info within the object's inode context. In this way getxattr
+ * calls can be avoided and bad objects can be caught instantly. Fetching the
+ * xattr is needed only in lookups when there is a brick restart or inode
+ * forget.
+ */
 static inline int32_t
 br_stub_lookup_version (xlator_t *this,
                         uuid_t gfid, inode_t *inode, dict_t *xattr)
@@ -1877,6 +2088,7 @@ br_stub_lookup_version (xlator_t *this,
         br_version_t       *obuf    = NULL;
         br_signature_t     *sbuf    = NULL;
         br_vxattr_status_t  status;
+        gf_boolean_t        bad_object = _gf_false;
 
         /**
          * versioning xattrs were requested from POSIX. if available, figure
@@ -1886,13 +2098,13 @@ br_stub_lookup_version (xlator_t *this,
          * operation (such as write(), etc..) triggers synchronization to
          * disk.
          */
-        status = br_version_xattr_state (xattr, &obuf, &sbuf);
-
+        status = br_version_xattr_state (xattr, &obuf, &sbuf, &bad_object);
         version = ((status == BR_VXATTR_STATUS_FULL)
                    || (status == BR_VXATTR_STATUS_UNSIGNED))
                         ? obuf->ongoingversion : BITROT_DEFAULT_CURRENT_VERSION;
-        return br_stub_init_inode_versions (this, NULL,
-                                            inode, version, _gf_true);
+
+        return br_stub_init_inode_versions (this, NULL, inode, version,
+                                            _gf_true, bad_object);
 }
 
 
@@ -1975,6 +2187,9 @@ br_stub_readdirp (call_frame_t *frame, xlator_t *this,
         ret = dict_set_uint32 (dict, BITROT_SIGNING_VERSION_KEY, 0);
         if (ret)
                 goto unwind;
+        ret = dict_set_uint32 (dict, BITROT_OBJECT_BAD_KEY, 0);
+        if (ret)
+                goto unwind;
 
         STACK_WIND (frame, br_stub_readdirp_cbk, FIRST_CHILD (this),
                     FIRST_CHILD(this)->fops->readdirp, fd, size,
@@ -2009,18 +2224,51 @@ br_stub_lookup_cbk (call_frame_t *frame, void *cookie,
                 goto unwind;
         if (!IA_ISREG (stbuf->ia_type))
                 goto unwind;
-        if (cookie != (void *) BR_STUB_REQUEST_COOKIE)
+
+        /**
+         * If the object is bad, then "bad inode" marker has to be sent back
+         * in response, for revalidated lookups as well. Some xlators such as
+         * quick-read might cache the data on a revalidated lookup, even though
+         * the fresh lookup would already have sent the "bad inode" marker.
+         * In general send bad inode marker for every lookup operation on the
+         * bad object.
+         */
+        if (cookie != (void *) BR_STUB_REQUEST_COOKIE) {
+                ret =  br_stub_mark_xdata_bad_object (this, inode, xattr);
+                if (ret) {
+                        op_ret = -1;
+                        op_errno = EIO;
+                        goto unwind;
+                }
+
                 goto delkey;
+        }
 
         ret = br_stub_lookup_version (this, stbuf->ia_gfid, inode, xattr);
         if (ret < 0) {
                 op_ret   = -1;
                 op_errno = EINVAL;
+                goto delkey;
+        }
+
+        /**
+         * If the object is bad, send "bad inode" marker back in response
+         * for xlator(s) to act accordingly (such as quick-read, etc..)
+         */
+        ret = br_stub_mark_xdata_bad_object (this, inode, xattr);
+        if (ret) {
+                /**
+                 * aaha! bad object, but sorry we would not
+                 * satisfy the request on allocation failures.
+                 */
+                op_ret = -1;
+                op_errno = EIO;
+                goto unwind;
         }
 
- delkey:
+delkey:
         br_stub_remove_vxattrs (xattr);
- unwind:
+unwind:
         STACK_UNWIND_STRICT (lookup, frame,
                              op_ret, op_errno, inode, stbuf, xattr, postparent);
 
@@ -2037,6 +2285,10 @@ br_stub_lookup (call_frame_t *frame,
         uint64_t ctx_addr = 0;
         gf_boolean_t xref = _gf_false;
 
+        GF_VALIDATE_OR_GOTO ("bit-rot-stub", this, unwind);
+        GF_VALIDATE_OR_GOTO (this->name, loc, unwind);
+        GF_VALIDATE_OR_GOTO (this->name, loc->inode, unwind);
+
         ret = br_stub_get_inode_ctx (this, loc->inode, &ctx_addr);
         if (ret < 0)
                 ctx_addr = 0;
@@ -2069,6 +2321,9 @@ br_stub_lookup (call_frame_t *frame,
         ret = dict_set_uint32 (xdata, BITROT_SIGNING_VERSION_KEY, 0);
         if (ret)
                 goto unwind;
+        ret = dict_set_uint32 (xdata, BITROT_OBJECT_BAD_KEY, 0);
+        if (ret)
+                goto unwind;
         cookie = (void *) BR_STUB_REQUEST_COOKIE;
 
  wind:
@@ -2335,6 +2590,9 @@ struct xlator_fops fops = {
         .truncate  = br_stub_truncate,
         .ftruncate = br_stub_ftruncate,
         .mknod     = br_stub_mknod,
+        .readv     = br_stub_readv,
+        .removexattr = br_stub_removexattr,
+        .fremovexattr = br_stub_fremovexattr,
 };
 
 struct xlator_cbks cbks = {
diff --git a/xlators/features/bit-rot/src/stub/bit-rot-stub.h b/xlators/features/bit-rot/src/stub/bit-rot-stub.h
index 48c7a37..e5649fc 100644
--- a/xlators/features/bit-rot/src/stub/bit-rot-stub.h
+++ b/xlators/features/bit-rot/src/stub/bit-rot-stub.h
@@ -37,6 +37,7 @@ typedef struct br_stub_inode_ctx {
         int            info_sign;
         struct list_head fd_list; /* list of open fds or fds participating in
                                      write operations */
+        gf_boolean_t bad_object;
 } br_stub_inode_ctx_t;
 
 typedef struct br_stub_fd {
@@ -85,6 +86,18 @@ typedef struct br_stub_private {
         struct mem_pool *local_pool;
 } br_stub_private_t;
 
+static inline gf_boolean_t
+__br_stub_is_bad_object (br_stub_inode_ctx_t *ctx)
+{
+        return ctx->bad_object; /* plain read; takes no lock itself */
+}
+
+static inline void
+__br_stub_mark_object_bad (br_stub_inode_ctx_t *ctx)
+{
+        ctx->bad_object = _gf_true; /* plain write; takes no lock itself */
+}
+
 /* inode writeback helpers */
 static inline void
 __br_stub_mark_inode_dirty (br_stub_inode_ctx_t *ctx)
@@ -370,12 +383,93 @@ static inline void
 br_stub_remove_vxattrs (dict_t *xattr)
 {
         if (xattr) {
+                dict_del (xattr, BITROT_OBJECT_BAD_KEY);
                 dict_del (xattr, BITROT_CURRENT_VERSION_KEY);
                 dict_del (xattr, BITROT_SIGNING_VERSION_KEY);
                 dict_del (xattr, BITROT_SIGNING_XATTR_SIZE_KEY);
         }
 }
 
+#define BR_STUB_HANDLE_BAD_OBJECT(this, inode, op_ret, op_errno, label) \
+        do { /* fail the fop with EIO if the inode is marked bad */     \
+                if (br_stub_is_bad_object (this, inode)) {              \
+                        gf_msg ((this)->name, GF_LOG_ERROR, 0,          \
+                                 BRS_MSG_BAD_OBJECT_ACCESS,             \
+                                 "%s is a bad object. Returning",       \
+                                 uuid_utoa ((inode)->gfid));            \
+                        op_ret = -1;                                    \
+                        op_errno = EIO;                                 \
+                        goto label;                                     \
+                }                                                       \
+        } while (0)
+
+static inline gf_boolean_t
+br_stub_is_bad_object (xlator_t *this, inode_t *inode)
+{
+        gf_boolean_t         bad_object = _gf_false;
+        uint64_t             ctx_addr   = 0;
+        br_stub_inode_ctx_t *ctx        = NULL;
+
+        /* In-memory check only: a missing inode context reads as "not bad". */
+        if (br_stub_get_inode_ctx (this, inode, &ctx_addr)) {
+                gf_msg (this->name, GF_LOG_ERROR, 0,
+                        BRS_MSG_GET_INODE_CONTEXT_FAILED, "failed to get the "
+                        "inode context for the inode %s",
+                        uuid_utoa (inode->gfid));
+                goto out;
+        }
+
+        ctx = (br_stub_inode_ctx_t *)(long)ctx_addr;
+
+        LOCK (&inode->lock);
+        {
+                bad_object = __br_stub_is_bad_object (ctx);
+        }
+        UNLOCK (&inode->lock);
+
+out:
+        return bad_object;
+}
+
+static inline int32_t
+br_stub_mark_object_bad (xlator_t *this, inode_t *inode)
+{
+        int32_t  ret = -1;
+        uint64_t ctx_addr = 0;
+        br_stub_inode_ctx_t *ctx = NULL;
+        /* Flag the inode ctx as bad; returns 0, or the ctx lookup error. */
+        ret = br_stub_get_inode_ctx (this, inode, &ctx_addr);
+        if (ret) {
+                gf_msg (this->name, GF_LOG_ERROR, 0,
+                        BRS_MSG_GET_INODE_CONTEXT_FAILED, "failed to get the "
+                        "inode context for the inode %s",
+                        uuid_utoa (inode->gfid));
+                goto out;
+        }
+
+        ctx = (br_stub_inode_ctx_t *)(long)ctx_addr;
+
+        LOCK (&inode->lock);
+        {
+                __br_stub_mark_object_bad (ctx);
+        }
+        UNLOCK (&inode->lock);
+
+out:
+        return ret;
+}
+
+static inline int32_t
+br_stub_mark_xdata_bad_object (xlator_t *this, inode_t *inode, dict_t *xdata)
+{
+        int32_t    ret = 0;
+        /* Only bad objects get the marker; a dict_set failure is returned. */
+        if (br_stub_is_bad_object (this, inode))
+                ret = dict_set_int32 (xdata, GLUSTERFS_BAD_INODE, 1);
+
+        return ret;
+}
+
 int32_t
 br_stub_add_fd_to_inode (xlator_t *this, fd_t *fd, br_stub_inode_ctx_t *ctx);
 
diff --git a/xlators/performance/quick-read/src/quick-read.c b/xlators/performance/quick-read/src/quick-read.c
index c6913ee..1426ae5 100644
--- a/xlators/performance/quick-read/src/quick-read.c
+++ b/xlators/performance/quick-read/src/quick-read.c
@@ -409,6 +409,11 @@ qr_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
                 goto out;
 	}
 
+        if (dict_get (xdata, GLUSTERFS_BAD_INODE)) {
+                qr_inode_prune (this, inode);
+                goto out;
+        }
+
 	if (dict_get (xdata, "sh-failed")) {
 		qr_inode_prune (this, inode);
 		goto out;
-- 
1.7.1