Blob Blame History Raw
From c4b8c629f738a8d9e020adccd70a4893c9e42fb8 Mon Sep 17 00:00:00 2001
From: Poornima G <pgurusid@redhat.com>
Date: Wed, 12 Apr 2017 15:24:14 +0530
Subject: [PATCH 392/393] Implement negative lookup cache

Before creating any file negative lookups(1 in Fuse, 4 in SMB etc.)
are sent to verify if the file already exists. By serving these
lookups from the cache when possible, increases the create
performance by multiple folds in SMB access and some percentage
in Fuse/NFS access.

Feature page: https://review.gluster.org/#/c/16436

>Signed-off-by: Poornima G <pgurusid@redhat.com>
>Reviewed-on: https://review.gluster.org/16952
>Reviewed-by: Pranith Kumar Karampuri <pkarampu@redhat.com>
>Tested-by: Pranith Kumar Karampuri <pkarampu@redhat.com>
>Smoke: Gluster Build System <jenkins@build.gluster.org>
>NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
>CentOS-regression: Gluster Build System <jenkins@build.gluster.org>

Updates #82
Change-Id: Ib1c0e7ac7a386f943d84f6398c27f9a03665b2a4
BUG: 1427099
Signed-off-by: Poornima G <pgurusid@redhat.com>
Reviewed-on: https://code.engineering.redhat.com/gerrit/103884
Reviewed-by: Milind Changire <mchangir@redhat.com>
Reviewed-by: Atin Mukherjee <amukherj@redhat.com>
---
 configure.ac                                       |    2 +
 glusterfs.spec.in                                  |    1 +
 libglusterfs/src/glfs-message-id.h                 |    4 +
 libglusterfs/src/globals.h                         |    4 +-
 tests/basic/nl-cache.t                             |   64 ++
 xlators/mgmt/glusterd/src/glusterd-volume-set.c    |   31 +
 xlators/performance/Makefile.am                    |    2 +-
 xlators/performance/nl-cache/Makefile.am           |    3 +
 xlators/performance/nl-cache/src/Makefile.am       |   12 +
 xlators/performance/nl-cache/src/nl-cache-helper.c | 1142 ++++++++++++++++++++
 .../performance/nl-cache/src/nl-cache-mem-types.h  |   29 +
 .../performance/nl-cache/src/nl-cache-messages.h   |   34 +
 xlators/performance/nl-cache/src/nl-cache.c        |  775 +++++++++++++
 xlators/performance/nl-cache/src/nl-cache.h        |  173 +++
 14 files changed, 2274 insertions(+), 2 deletions(-)
 create mode 100755 tests/basic/nl-cache.t
 create mode 100644 xlators/performance/nl-cache/Makefile.am
 create mode 100644 xlators/performance/nl-cache/src/Makefile.am
 create mode 100644 xlators/performance/nl-cache/src/nl-cache-helper.c
 create mode 100644 xlators/performance/nl-cache/src/nl-cache-mem-types.h
 create mode 100644 xlators/performance/nl-cache/src/nl-cache-messages.h
 create mode 100644 xlators/performance/nl-cache/src/nl-cache.c
 create mode 100644 xlators/performance/nl-cache/src/nl-cache.h

diff --git a/configure.ac b/configure.ac
index 2c25be6..b5357dc 100644
--- a/configure.ac
+++ b/configure.ac
@@ -101,6 +101,8 @@ AC_CONFIG_FILES([Makefile
                 xlators/performance/md-cache/src/Makefile
                 xlators/performance/decompounder/Makefile
                 xlators/performance/decompounder/src/Makefile
+                xlators/performance/nl-cache/Makefile
+                xlators/performance/nl-cache/src/Makefile
                 xlators/debug/Makefile
                 xlators/debug/trace/Makefile
                 xlators/debug/trace/src/Makefile
diff --git a/glusterfs.spec.in b/glusterfs.spec.in
index c313d2f..da334d5 100644
--- a/glusterfs.spec.in
+++ b/glusterfs.spec.in
@@ -1128,6 +1128,7 @@ exit 0
 %{_libdir}/glusterfs/%{version}%{?prereltag}/xlator/performance/readdir-ahead.so
 %{_libdir}/glusterfs/%{version}%{?prereltag}/xlator/performance/stat-prefetch.so
 %{_libdir}/glusterfs/%{version}%{?prereltag}/xlator/performance/write-behind.so
+%{_libdir}/glusterfs/%{version}%{?prereltag}/xlator/performance/nl-cache.so
 %{_libdir}/glusterfs/%{version}%{?prereltag}/xlator/system/posix-acl.so
 %dir %{_localstatedir}/run/gluster
 %if 0%{?_tmpfilesdir:1}
diff --git a/libglusterfs/src/glfs-message-id.h b/libglusterfs/src/glfs-message-id.h
index de7c494..a511f0d 100644
--- a/libglusterfs/src/glfs-message-id.h
+++ b/libglusterfs/src/glfs-message-id.h
@@ -179,6 +179,10 @@ GLFS_MSGID_COMP_SYMLINK_CACHE_END
 #define GLFS_MSGID_COMP_LEASES_END         (GLFS_MSGID_COMP_LEASES +\
                                            GLFS_MSGID_SEGMENT)
 
+#define GLFS_MSGID_COMP_NLC                GLFS_MSGID_COMP_LEASES_END
+#define GLFS_MSGID_COMP_NLC_END            (GLFS_MSGID_COMP_NLC +\
+                                           GLFS_MSGID_SEGMENT)
+
 /* --- new segments for messages goes above this line --- */
 
 #endif /* !_GLFS_MESSAGE_ID_H_ */
diff --git a/libglusterfs/src/globals.h b/libglusterfs/src/globals.h
index 5d57a42..a1ec8f0 100644
--- a/libglusterfs/src/globals.h
+++ b/libglusterfs/src/globals.h
@@ -43,7 +43,7 @@
  */
 #define GD_OP_VERSION_MIN  1 /* MIN is the fresh start op-version, mostly
                                 should not change */
-#define GD_OP_VERSION_MAX  GD_OP_VERSION_3_10_1 /* MAX VERSION is the maximum
+#define GD_OP_VERSION_MAX  GD_OP_VERSION_3_11_0 /* MAX VERSION is the maximum
                                                   count in VME table, should
                                                   keep changing with
                                                   introduction of newer
@@ -89,6 +89,8 @@
 
 #define GD_OP_VERSION_3_10_1   31001 /* Op-version for GlusterFS 3.10.1 */
 
+#define GD_OP_VERSION_3_11_0   31100 /* Op-version for GlusterFS 3.11.0 */
+
 #include "xlator.h"
 
 /* THIS */
diff --git a/tests/basic/nl-cache.t b/tests/basic/nl-cache.t
new file mode 100755
index 0000000..ddd4e25
--- /dev/null
+++ b/tests/basic/nl-cache.t
@@ -0,0 +1,64 @@
+#!/bin/bash
+
+. $(dirname $0)/../include.rc
+. $(dirname $0)/../volume.rc
+
+cleanup;
+
+TEST glusterd
+
+TEST $CLI volume create $V0 $H0:$B0/${V0}{0..4}
+EXPECT 'Created' volinfo_field $V0 'Status'
+
+TEST $CLI volume set $V0 performance.nl-cache on
+TEST $CLI volume set $V0 features.cache-invalidation on
+TEST $CLI volume set $V0 features.cache-invalidation-timeout 600
+
+TEST $CLI volume start $V0;
+EXPECT 'Started' volinfo_field $V0 'Status';
+
+TEST glusterfs --volfile-id=/$V0 --volfile-server=$H0 $M0
+TEST glusterfs --volfile-id=/$V0 --volfile-server=$H0 $M1
+
+TEST ! ls $M0/file2
+TEST touch $M0/file1
+TEST ! ls $M0/file2
+TEST touch $M0/file2
+TEST ls $M0/file2
+TEST rm $M0/file2
+TEST rm $M0/file1
+
+TEST mkdir $M0/dir1
+TEST ! ls -l $M0/dir1/file
+TEST mkdir $M0/dir1/dir2
+TEST ! ls -l $M0/dir1/file
+TEST ! ls -l $M0/dir1/dir2/file
+TEST ls -l $M0/dir1/dir2
+TEST rmdir $M0/dir1/dir2
+TEST rmdir $M0/dir1
+
+TEST ! ls -l $M0/file2
+TEST touch $M1/file2
+TEST ls -l $M0/file2
+TEST rm $M1/file2
+
+TEST ! ls -l $M0/dir1
+TEST mkdir $M1/dir1
+TEST ls -l $M0/dir1
+TEST ! ls -l $M0/dir1/file1
+TEST mkdir $M1/dir1/dir2
+TEST ! ls -l $M0/dir1/file1
+TEST ls -l $M0/dir1/dir2
+TEST ! ls -l $M1/dir1/file1
+
+TEST touch $M0/dir1/file
+TEST ln $M0/dir1/file $M0/dir1/file_link
+TEST ls -l $M1/dir1/file
+TEST ls -l $M1/dir1/file_link
+TEST rm $M0/dir1/file
+TEST rm $M0/dir1/file_link
+TEST rmdir $M0/dir1/dir2
+TEST rmdir $M0/dir1
+
+cleanup;
+#G_TESTDEF_TEST_STATUS_NETBSD7=BAD_TEST,BUG=000000
diff --git a/xlators/mgmt/glusterd/src/glusterd-volume-set.c b/xlators/mgmt/glusterd/src/glusterd-volume-set.c
index bbb07b0..4ee27b4 100644
--- a/xlators/mgmt/glusterd/src/glusterd-volume-set.c
+++ b/xlators/mgmt/glusterd/src/glusterd-volume-set.c
@@ -1941,6 +1941,16 @@ struct volopt_map_entry glusterd_volopt_map[] = {
           .flags       = OPT_FLAG_CLIENT_OPT | OPT_FLAG_XLATOR_OPT
 
         },
+        { .key         = "performance.nl-cache",
+          .voltype     = "performance/nl-cache",
+          .option      = "!perf",
+          .value       = "off",
+          .op_version  = GD_OP_VERSION_3_11_0,
+          .description = "enable/disable negative entry caching translator in "
+                         "the volume. Enabling this option improves performance"
+                         " of 'create file/directory' workload",
+          .flags       = OPT_FLAG_CLIENT_OPT | OPT_FLAG_XLATOR_OPT
+        },
         { .key         = "performance.stat-prefetch",
           .voltype     = "performance/md-cache",
           .option      = "!perf",
@@ -3104,6 +3114,27 @@ struct volopt_map_entry glusterd_volopt_map[] = {
           .flags       = OPT_FLAG_CLIENT_OPT,
           .op_version  = GD_OP_VERSION_3_9_1,
         },
+        { .key         = "performance.nl-cache-positive-entry",
+          .voltype     = "performance/nl-cache",
+          .value       = "on",
+          .type        = DOC,
+          .flags       = OPT_FLAG_CLIENT_OPT,
+          .op_version  = GD_OP_VERSION_3_11_0,
+          .description = "enable/disable storing of entries that were lookedup"
+                         " and found to be present in the volume, thus lookup"
+                         " on non existant file is served from the cache",
+        },
+        { .key         = "performance.nl-cache-limit",
+          .voltype     = "performance/nl-cache",
+          .value       = "10MB",
+          .flags       = OPT_FLAG_CLIENT_OPT,
+          .op_version  = GD_OP_VERSION_3_11_0,
+        },
+        { .key         = "performance.nl-cache-timeout",
+          .voltype     = "performance/nl-cache",
+          .flags       = OPT_FLAG_CLIENT_OPT,
+          .op_version  = GD_OP_VERSION_3_11_0,
+        },
         { .key        = "disperse.optimistic-change-log",
           .voltype    = "cluster/disperse",
           .type       = NO_DOC,
diff --git a/xlators/performance/Makefile.am b/xlators/performance/Makefile.am
index eb4e32c..e162734 100644
--- a/xlators/performance/Makefile.am
+++ b/xlators/performance/Makefile.am
@@ -1,3 +1,3 @@
-SUBDIRS = write-behind read-ahead readdir-ahead io-threads io-cache symlink-cache quick-read md-cache open-behind decompounder
+SUBDIRS = write-behind read-ahead readdir-ahead io-threads io-cache symlink-cache quick-read md-cache open-behind decompounder nl-cache
 
 CLEANFILES = 
diff --git a/xlators/performance/nl-cache/Makefile.am b/xlators/performance/nl-cache/Makefile.am
new file mode 100644
index 0000000..a985f42
--- /dev/null
+++ b/xlators/performance/nl-cache/Makefile.am
@@ -0,0 +1,3 @@
+SUBDIRS = src
+
+CLEANFILES =
diff --git a/xlators/performance/nl-cache/src/Makefile.am b/xlators/performance/nl-cache/src/Makefile.am
new file mode 100644
index 0000000..f45e8be
--- /dev/null
+++ b/xlators/performance/nl-cache/src/Makefile.am
@@ -0,0 +1,12 @@
+xlator_LTLIBRARIES = nl-cache.la
+xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/performance
+nl_cache_la_LDFLAGS = -module -avoid-version
+nl_cache_la_SOURCES = nl-cache.c nl-cache-helper.c
+nl_cache_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la
+noinst_HEADERS = nl-cache.h nl-cache-mem-types.h nl-cache-messages.h
+AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src \
+        -I$(top_srcdir)/rpc/xdr/src -I$(top_builddir)/rpc/xdr/src \
+        -I$(CONTRIBDIR)/timer-wheel
+
+AM_CFLAGS = -Wall -fno-strict-aliasing $(GF_CFLAGS)
+CLEANFILES =
diff --git a/xlators/performance/nl-cache/src/nl-cache-helper.c b/xlators/performance/nl-cache/src/nl-cache-helper.c
new file mode 100644
index 0000000..34438ed
--- /dev/null
+++ b/xlators/performance/nl-cache/src/nl-cache-helper.c
@@ -0,0 +1,1142 @@
+/*
+ *   Copyright (c) 2017 Red Hat, Inc. <http://www.redhat.com>
+ *   This file is part of GlusterFS.
+ *
+ *   This file is licensed to you under your choice of the GNU Lesser
+ *   General Public License, version 3 or any later version (LGPLv3 or
+ *   later), or the GNU General Public License, version 2 (GPLv2), in all
+ *   cases as published by the Free Software Foundation.
+ */
+
+#include "nl-cache.h"
+#include "timer-wheel.h"
+#include "statedump.h"
+
+/* Caching guidelines:
+ * This xlator serves negative lookup(ENOENT lookups) from the cache,
+ * there by making create faster.
+ *   What is cached?
+ *      Negative lookup cache is stored for each directory, and has 2 entries:
+ *      - Negative entries: Populated only when lookup/stat returns ENOENT.
+ *        Fuse mostly sends only one lookup before create, hence negative entry
+ *        cache is almost useless. But for SMB access, multiple lookups/stats
+ *        are sent before creating the file. Hence the negative entry cache.
+ *        It can exist even when the positive entry cache is invalid. It also
+ *        has the entries that were deleted from this directory.
+ *        Freed on recieving upcall(with dentry change flag) or on expiring
+ *        timeout of the cache.
+ *
+ *      - Positive entries: Populated as a part of readdirp, and as a part of
+ *        mkdir followed by creates inside that directory. Lookups and other
+ *        fops do not populate the positive entry (as it can grow long and is
+ *        of no value add)
+ *        Freed on recieving upcall(with dentry change flag) or on expiring
+ *        timeout of the cache.
+ *
+ *   Data structures to store cache?
+ *      The cache of any directory is stored in the inode_ctx of the directory.
+ *      Negative entries are stored as list of strings.
+ *             Search - O(n)
+ *             Add    - O(1)
+ *             Delete - O(n) - as it has to be searched before deleting
+ *      Positive entries are stored as a list, each list node has a pointer
+ *          to the inode of the positive entry or the name of the entry.
+ *          Since the client side inode table already will have inodes for
+ *          positive entries, we just take a ref of that inode and store as
+ *          positive entry cache. In cases like hardlinks and readdirp where
+ *          inode is NULL, we store the names.
+ *          Name Search - O(n)
+ *          Inode Search - O(1) - Actually complexity of inode_find()
+ *          Name/inode Add - O(1)
+ *          Name Delete - O(n)
+ *          Inode Delete - O(1)
+ *
+ * Locking order:
+ *
+ * TODO:
+ * - Fill Positive entries on readdir/p, after which in lookup_cbk check if the
+ *   name is in PE and replace it with inode.
+ * - fini, PARENET_DOWN, disable caching
+ * - Virtual setxattr to dump the inode_ctx, to ease debugging
+ * - Handle dht_nuke xattr: clear all cache
+ * - Special handling for .meta and .trashcan?
+ */
+
+int __nlc_inode_ctx_timer_start (xlator_t *this, inode_t *inode, nlc_ctx_t *nlc_ctx);
+int __nlc_add_to_lru (xlator_t *this, inode_t *inode, nlc_ctx_t *nlc_ctx);
+void nlc_remove_from_lru (xlator_t *this, inode_t *inode);
+void __nlc_inode_ctx_timer_delete (xlator_t *this, nlc_ctx_t *nlc_ctx);
+gf_boolean_t __nlc_search_ne (nlc_ctx_t *nlc_ctx, const char *name);
+
+static int32_t
+nlc_get_cache_timeout (xlator_t *this)
+{
+        nlc_conf_t *conf = NULL;
+
+        conf = this->private;
+
+        /* Cache timeout is generally not meant to be changed often,
+         * once set, hence not within locks */
+        return conf->cache_timeout;
+}
+
+
+static gf_boolean_t
+__nlc_is_cache_valid (xlator_t *this, nlc_ctx_t *nlc_ctx)
+{
+        nlc_conf_t   *conf         = NULL;
+        time_t       last_val_time;
+        gf_boolean_t ret           = _gf_false;
+
+        GF_VALIDATE_OR_GOTO (this->name, nlc_ctx, out);
+
+        conf = this->private;
+
+        LOCK (&conf->lock);
+        {
+                last_val_time = conf->last_child_down;
+        }
+        UNLOCK (&conf->lock);
+
+        if (last_val_time <= nlc_ctx->cache_time)
+                ret = _gf_true;
+out:
+        return ret;
+}
+
+
+void
+nlc_update_child_down_time (xlator_t *this, time_t *now)
+{
+        nlc_conf_t *conf = NULL;
+
+        conf = this->private;
+
+        LOCK (&conf->lock);
+        {
+                conf->last_child_down = *now;
+        }
+        UNLOCK (&conf->lock);
+
+        return;
+}
+
+
+void
+nlc_disable_cache (xlator_t *this)
+{
+        nlc_conf_t *conf = NULL;
+
+        conf = this->private;
+
+        LOCK (&conf->lock);
+        {
+                conf->disable_cache = _gf_true;
+        }
+        UNLOCK (&conf->lock);
+
+        return;
+}
+
+
+static int
+__nlc_inode_ctx_get (xlator_t *this, inode_t *inode, nlc_ctx_t **nlc_ctx_p,
+                     nlc_pe_t **nlc_pe_p)
+{
+        int              ret = 0;
+        nlc_ctx_t        *nlc_ctx = NULL;
+        nlc_pe_t         *nlc_pe = NULL;
+        uint64_t         nlc_ctx_int = 0;
+        uint64_t         nlc_pe_int = 0;
+
+        ret = __inode_ctx_get2 (inode, this, &nlc_ctx_int, &nlc_pe_int);
+        if (ret == 0 && nlc_ctx_p) {
+                nlc_ctx = (void *) (long) (nlc_ctx_int);
+                *nlc_ctx_p = nlc_ctx;
+        }
+        if (ret == 0 && nlc_pe_p) {
+                nlc_pe = (void *) (long) (&nlc_pe_int);
+                *nlc_pe_p = nlc_pe;
+        }
+        return ret;
+}
+
+
+static int
+nlc_inode_ctx_set (xlator_t *this, inode_t *inode, nlc_ctx_t *nlc_ctx,
+                   nlc_pe_t *nlc_pe_p)
+{
+        int   ret = -1;
+
+        /* The caller may choose to set one of the ctxs, hence check
+         * if the ctx1/2 is non zero and then send the adress. If we
+         * blindly send the address of both the ctxs, it may reset the
+         * ctx the caller had sent NULL(intended as leave untouched) for.*/
+        LOCK(&inode->lock);
+        {
+                ret = __inode_ctx_set2 (inode, this,
+                                        nlc_ctx ? (uint64_t *) &nlc_ctx : 0,
+                                        nlc_pe_p ? (uint64_t *) &nlc_pe_p : 0);
+        }
+        UNLOCK(&inode->lock);
+        return ret;
+}
+
+
+static void
+nlc_inode_ctx_get (xlator_t *this, inode_t *inode, nlc_ctx_t **nlc_ctx_p,
+                   nlc_pe_t **nlc_pe_p)
+{
+        int ret = 0;
+
+        LOCK (&inode->lock);
+        {
+                ret = __nlc_inode_ctx_get (this, inode, nlc_ctx_p, nlc_pe_p);
+                if (ret < 0)
+                        gf_msg_debug (this->name, 0, "inode ctx get failed for "
+                                      "inode:%p", inode);
+        }
+        UNLOCK (&inode->lock);
+
+        return;
+}
+
+
+static nlc_ctx_t *
+nlc_inode_ctx_get_set (xlator_t *this, inode_t *inode, nlc_ctx_t **nlc_ctx_p,
+                       nlc_pe_t **nlc_pe_p)
+{
+        int                         ret    = 0;
+        nlc_ctx_t                  *nlc_ctx = NULL;
+        nlc_conf_t                 *conf   = NULL;
+
+        conf = this->private;
+
+        LOCK (&inode->lock);
+        {
+                ret = __nlc_inode_ctx_get (this, inode, &nlc_ctx, nlc_pe_p);
+                if (nlc_ctx)
+                        goto unlock;
+
+                nlc_ctx = GF_CALLOC (sizeof (*nlc_ctx), 1, gf_nlc_mt_nlc_ctx_t);
+                if (!nlc_ctx)
+                        goto unlock;
+
+                LOCK_INIT (&nlc_ctx->lock);
+                INIT_LIST_HEAD (&nlc_ctx->pe);
+                INIT_LIST_HEAD (&nlc_ctx->ne);
+
+                ret = __nlc_inode_ctx_timer_start (this, inode, nlc_ctx);
+                if (ret < 0)
+                        goto unlock;
+
+                ret = __nlc_add_to_lru (this, inode, nlc_ctx);
+                if (ret < 0) {
+                        __nlc_inode_ctx_timer_delete (this, nlc_ctx);
+                        goto unlock;
+                }
+
+                ret = __inode_ctx_set2 (inode, this, (uint64_t *) &nlc_ctx, NULL);
+                if (ret) {
+                        gf_msg (this->name, GF_LOG_ERROR, ENOMEM,
+                                NLC_MSG_NO_MEMORY, "inode ctx set failed");
+                        __nlc_inode_ctx_timer_delete (this, nlc_ctx);
+                        nlc_remove_from_lru (this, inode);
+                        goto unlock;
+                }
+
+                /*TODO: also sizeof (gf_tw_timer_list) + nlc_timer_data_t ?*/
+                nlc_ctx->cache_size = sizeof (*nlc_ctx);
+                GF_ATOMIC_ADD (conf->current_cache_size, nlc_ctx->cache_size);
+        }
+unlock:
+        UNLOCK (&inode->lock);
+
+        if (ret == 0 && nlc_ctx_p)
+                *nlc_ctx_p = nlc_ctx;
+
+        if (ret < 0 && nlc_ctx) {
+                LOCK_DESTROY (&nlc_ctx->lock);
+                GF_FREE (nlc_ctx);
+                nlc_ctx = NULL;
+                goto out;
+        }
+out:
+        return nlc_ctx;
+}
+
+
+nlc_local_t *
+nlc_local_init (call_frame_t *frame, xlator_t *this, glusterfs_fop_t fop,
+                loc_t *loc, loc_t *loc2)
+{
+        nlc_local_t *local = NULL;
+
+        local = GF_CALLOC (sizeof (*local), 1, gf_nlc_mt_nlc_local_t);
+        if (!local)
+                goto out;
+
+        if (loc)
+                loc_copy (&local->loc, loc);
+        if (loc2)
+                loc_copy (&local->loc2, loc2);
+
+        local->fop = fop;
+        frame->local = local;
+out:
+        return local;
+}
+
+
+void
+nlc_local_wipe (xlator_t *this, nlc_local_t *local)
+{
+        if (!local)
+                goto out;
+
+        loc_wipe (&local->loc);
+
+        loc_wipe (&local->loc2);
+
+        GF_FREE (local);
+out:
+        return;
+}
+
+
+static void
+__nlc_set_dir_state (nlc_ctx_t *nlc_ctx, uint64_t new_state)
+{
+        nlc_ctx->state |= new_state;
+
+        return;
+}
+
+
+void
+nlc_set_dir_state (xlator_t *this, inode_t *inode, uint64_t state)
+{
+        nlc_ctx_t        *nlc_ctx = NULL;
+
+        if (inode->ia_type != IA_IFDIR) {
+                gf_msg_callingfn (this->name, GF_LOG_ERROR, EINVAL,
+                                  NLC_MSG_EINVAL, "inode is not of type dir");
+                goto out;
+        }
+
+        nlc_inode_ctx_get_set (this, inode, &nlc_ctx, NULL);
+        if (!nlc_ctx)
+                goto out;
+
+        LOCK (&nlc_ctx->lock);
+        {
+                __nlc_set_dir_state (nlc_ctx, state);
+        }
+        UNLOCK (&nlc_ctx->lock);
+out:
+        return;
+}
+
+
+static void
+nlc_cache_timeout_handler (struct gf_tw_timer_list *timer,
+                           void *data, unsigned long calltime)
+{
+        nlc_timer_data_t *tmp = data;
+
+        nlc_inode_clear_cache (tmp->this, tmp->inode, NLC_TIMER_EXPIRED);
+        inode_unref (tmp->inode);
+
+        GF_FREE (tmp);
+        GF_FREE (timer);
+
+        return;
+}
+
+
+void
+__nlc_inode_ctx_timer_delete (xlator_t *this, nlc_ctx_t *nlc_ctx)
+{
+        nlc_conf_t                  *conf   = NULL;
+
+        conf = this->private;
+
+        gf_tw_del_timer (conf->timer_wheel, nlc_ctx->timer);
+
+        inode_unref (nlc_ctx->timer_data->inode);
+        GF_FREE (nlc_ctx->timer_data);
+
+        GF_FREE (nlc_ctx->timer);
+        nlc_ctx->timer = NULL;
+
+        return;
+}
+
+
+int
+__nlc_inode_ctx_timer_start (xlator_t *this, inode_t *inode, nlc_ctx_t *nlc_ctx)
+{
+        struct gf_tw_timer_list    *timer  = NULL;
+        nlc_timer_data_t           *tmp    = NULL;
+        nlc_conf_t                 *conf   = NULL;
+        int                         ret    = -1;
+
+        conf = this->private;
+
+        /* We are taking inode_table->lock within inode->lock
+         * as the only other caller which takes inode->lock within
+         * inode_table->lock and cause deadlock is inode_table_destroy.
+         * Hopefully, there can be no fop when inode_table_destroy is
+         * being called. */
+        tmp = GF_CALLOC (1, sizeof (*tmp), gf_nlc_mt_nlc_timer_data_t);
+        if (!tmp)
+                goto out;
+        tmp->inode = inode_ref (inode);
+        tmp->this = this;
+
+        timer = GF_CALLOC (1, sizeof (*timer),
+                           gf_common_mt_tw_timer_list);
+        if (!timer)
+                goto out;
+
+        INIT_LIST_HEAD (&timer->entry);
+        timer->expires = nlc_get_cache_timeout (this);
+        timer->function = nlc_cache_timeout_handler;
+        timer->data = tmp;
+        nlc_ctx->timer = timer;
+        nlc_ctx->timer_data = tmp;
+        gf_tw_add_timer (conf->timer_wheel, timer);
+
+        time (&nlc_ctx->cache_time);
+        gf_msg_trace (this->name, 0, "Registering timer:%p, inode:%p, "
+                      "gfid:%s", timer, inode, uuid_utoa (inode->gfid));
+
+        ret = 0;
+
+out:
+        if (ret < 0) {
+                if (tmp && tmp->inode)
+                        inode_unref (tmp->inode);
+                GF_FREE (tmp);
+                GF_FREE (timer);
+        }
+
+        return ret;
+}
+
+
+int
+__nlc_add_to_lru (xlator_t *this, inode_t *inode, nlc_ctx_t *nlc_ctx)
+{
+        nlc_lru_node_t              *lru_ino   = NULL;
+        uint64_t                    nlc_pe_int = 0;
+        nlc_conf_t                  *conf      = NULL;
+        int                          ret       = -1;
+
+        conf = this->private;
+
+        lru_ino = GF_CALLOC (1, sizeof (*lru_ino), gf_nlc_mt_nlc_lru_node);
+        if (!lru_ino)
+                goto out;
+
+        INIT_LIST_HEAD (&lru_ino->list);
+        lru_ino->inode = inode_ref (inode);
+        LOCK (&conf->lock);
+        {
+                list_add_tail (&lru_ino->list, &conf->lru);
+        }
+        UNLOCK (&conf->lock);
+
+        nlc_ctx->refd_inodes = 0;
+        ret = __inode_ctx_get2 (inode, this, NULL, &nlc_pe_int);
+        if (nlc_pe_int == 0)
+                GF_ATOMIC_ADD (conf->refd_inodes, 1);
+
+        ret = 0;
+
+out:
+        return ret;
+}
+
+
+void
+nlc_remove_from_lru (xlator_t *this, inode_t *inode)
+{
+        nlc_lru_node_t              *lru_node   = NULL;
+        nlc_lru_node_t              *tmp        = NULL;
+        nlc_lru_node_t              *tmp1       = NULL;
+        nlc_conf_t                  *conf       = NULL;
+
+        conf = this->private;
+
+        LOCK (&conf->lock);
+        {
+                list_for_each_entry_safe (lru_node, tmp, &conf->lru, list) {
+                        if (inode == lru_node->inode) {
+                                list_del (&lru_node->list);
+                                tmp1 = lru_node;
+                                break;
+                        }
+                }
+        }
+        UNLOCK (&conf->lock);
+
+        if (tmp1) {
+                inode_unref (tmp1->inode);
+                GF_FREE (tmp1);
+        }
+
+        return;
+}
+
+
+void
+nlc_lru_prune (xlator_t *this, inode_t *inode)
+{
+        nlc_lru_node_t              *lru_node   = NULL;
+        nlc_lru_node_t              *prune_node = NULL;
+        nlc_lru_node_t              *tmp        = NULL;
+        nlc_conf_t                  *conf       = NULL;
+
+        conf = this->private;
+
+        LOCK (&conf->lock);
+        {
+                if ((conf->current_cache_size.cnt < conf->cache_size) &&
+                    (conf->refd_inodes.cnt < conf->inode_limit))
+                        goto unlock;
+
+                list_for_each_entry_safe (lru_node, tmp, &conf->lru, list) {
+                        list_del (&lru_node->list);
+                        prune_node = lru_node;
+                        goto unlock;
+                }
+        }
+unlock:
+        UNLOCK (&conf->lock);
+
+        if (prune_node) {
+                nlc_inode_clear_cache (this, prune_node->inode, NLC_LRU_PRUNE);
+                inode_unref (prune_node->inode);
+                GF_FREE (prune_node);
+        }
+        return;
+}
+
+
+void
+nlc_clear_all_cache (xlator_t *this)
+{
+        nlc_conf_t                  *conf       = NULL;
+        struct list_head            clear_list;
+        nlc_lru_node_t              *prune_node = NULL;
+        nlc_lru_node_t              *tmp        = NULL;
+
+        conf = this->private;
+
+        INIT_LIST_HEAD (&clear_list);
+
+        LOCK (&conf->lock);
+        {
+                list_replace_init (&conf->lru, &clear_list);
+        }
+        UNLOCK (&conf->lock);
+
+        list_for_each_entry_safe (prune_node, tmp, &clear_list, list) {
+                list_del (&prune_node->list);
+                nlc_inode_clear_cache (this, prune_node->inode, NLC_LRU_PRUNE);
+                inode_unref (prune_node->inode);
+                GF_FREE (prune_node);
+        }
+
+        return;
+}
+
+
+static void
+__nlc_free_pe (xlator_t *this, nlc_ctx_t *nlc_ctx, nlc_pe_t *pe)
+{
+        uint64_t          pe_int      = 0;
+        nlc_conf_t       *conf        = NULL;
+        uint64_t         *nlc_ctx_int = NULL;
+
+        conf = this->private;
+
+        if (pe->inode) {
+                inode_ctx_reset1 (pe->inode, this, &pe_int);
+                inode_ctx_get2 (pe->inode, this, nlc_ctx_int, NULL);
+                inode_unref (pe->inode);
+        }
+        list_del (&pe->list);
+
+        nlc_ctx->cache_size -= sizeof (*pe) + sizeof (pe->name);
+        GF_ATOMIC_SUB (conf->current_cache_size,
+                       (sizeof (*pe) + sizeof (pe->name)));
+
+        nlc_ctx->refd_inodes -= 1;
+        if (nlc_ctx_int == 0)
+                GF_ATOMIC_SUB (conf->refd_inodes, 1);
+
+        GF_FREE (pe->name);
+        GF_FREE (pe);
+
+        return;
+}
+
+
+static void
+__nlc_free_ne (xlator_t *this, nlc_ctx_t *nlc_ctx, nlc_ne_t *ne)
+{
+        nlc_conf_t                  *conf   = NULL;
+
+        conf = this->private;
+
+        list_del (&ne->list);
+        GF_FREE (ne->name);
+        GF_FREE (ne);
+
+        nlc_ctx->cache_size -= sizeof (*ne) + sizeof (ne->name);
+        GF_ATOMIC_SUB (conf->current_cache_size,
+                       (sizeof (*ne) + sizeof (ne->name)));
+
+        return;
+}
+
+
+void
+nlc_inode_clear_cache (xlator_t *this, inode_t *inode, int reason)
+{
+        uint64_t         nlc_ctx_int = 0;
+        nlc_ctx_t        *nlc_ctx    = NULL;
+        nlc_pe_t         *pe         = NULL;
+        nlc_pe_t         *tmp        = NULL;
+        nlc_ne_t         *ne         = NULL;
+        nlc_ne_t         *tmp1       = NULL;
+        nlc_conf_t       *conf       = NULL;
+
+        conf = this->private;
+
+        inode_ctx_reset0 (inode, this, &nlc_ctx_int);
+        if (nlc_ctx_int == 0)
+                goto out;
+
+        nlc_ctx = (void *) (long) nlc_ctx_int;
+
+        if (reason != NLC_LRU_PRUNE)
+                nlc_remove_from_lru (this, inode);
+
+        LOCK (&nlc_ctx->lock);
+        {
+                if (reason != NLC_TIMER_EXPIRED)
+                        __nlc_inode_ctx_timer_delete (this, nlc_ctx);
+
+                if (IS_PE_VALID (nlc_ctx->state))
+                        list_for_each_entry_safe (pe, tmp, &nlc_ctx->pe, list) {
+                                __nlc_free_pe (this, nlc_ctx, pe);
+                        }
+
+                if (IS_NE_VALID (nlc_ctx->state))
+                        list_for_each_entry_safe (ne, tmp1, &nlc_ctx->ne, list) {
+                                __nlc_free_ne (this, nlc_ctx, ne);
+                        }
+        }
+        UNLOCK (&nlc_ctx->lock);
+
+        LOCK_DESTROY (&nlc_ctx->lock);
+
+        nlc_ctx->cache_size -= sizeof (*nlc_ctx);
+        GF_ASSERT (nlc_ctx->cache_size == 0);
+        GF_FREE (nlc_ctx);
+
+        GF_ATOMIC_SUB (conf->current_cache_size, sizeof (*nlc_ctx));
+
+out:
+        return;
+}
+
+
+static void
+__nlc_del_pe (xlator_t *this, nlc_ctx_t *nlc_ctx, inode_t *entry_ino,
+              const char *name, gf_boolean_t multilink)
+{
+        nlc_pe_t         *pe     = NULL;
+        nlc_pe_t         *tmp    = NULL;
+        gf_boolean_t     found  = _gf_false;
+        uint64_t         pe_int = 0;
+
+        if (!IS_PE_VALID (nlc_ctx->state))
+                goto out;
+
+        if (!entry_ino)
+                goto name_search;
+
+        /* If there are hardlinks first search names, followed by inodes */
+        if (multilink) {
+                list_for_each_entry_safe (pe, tmp, &nlc_ctx->pe, list) {
+                        if (pe->name && (strcmp (pe->name, name) == 0)) {
+                                found = _gf_true;
+                                goto out;
+                        }
+                }
+                inode_ctx_reset1 (entry_ino, this, &pe_int);
+                if (pe_int) {
+                        pe = (void *) (long) (pe_int);
+                        found = _gf_true;
+                        goto out;
+                }
+                goto out;
+        }
+
+        inode_ctx_reset1 (entry_ino, this, &pe_int);
+        if (pe_int) {
+                pe = (void *) (long) (pe_int);
+                found = _gf_true;
+                goto out;
+        }
+
+name_search:
+        list_for_each_entry_safe (pe, tmp, &nlc_ctx->pe, list) {
+                if (pe->name && (strcmp (pe->name, name) == 0)) {
+                        found = _gf_true;
+                        break;
+                        /* TODO: can there be duplicates? */
+                }
+        }
+
+out:
+        if (found)
+                __nlc_free_pe (this, nlc_ctx, pe);
+
+        return;
+}
+
+
+static void
+__nlc_del_ne (xlator_t *this, nlc_ctx_t *nlc_ctx, const char *name)
+{
+        nlc_ne_t  *ne     = NULL;
+        nlc_ne_t  *tmp    = NULL;
+
+        if (!IS_NE_VALID (nlc_ctx->state))
+                goto out;
+
+        list_for_each_entry_safe (ne, tmp, &nlc_ctx->ne, list) {
+                if (strcmp (ne->name, name) == 0) {
+                        __nlc_free_ne (this, nlc_ctx, ne);
+                        break;
+                }
+        }
+out:
+        return;
+}
+
+
+static void
+__nlc_add_pe (xlator_t *this, nlc_ctx_t *nlc_ctx, inode_t *entry_ino,
+              const char *name)
+{
+        nlc_pe_t             *pe         = NULL;
+        int                  ret         = -1;
+        nlc_conf_t           *conf       = NULL;
+        uint64_t             nlc_ctx_int = 0;
+
+        conf = this->private;
+
+        /* TODO: There can be no duplicate entries, as it is added only
+        during create. In case there arises duplicate entries, search PE
+        found = __nlc_search (entries, name, _gf_false);
+        can use bit vector to have simple search than sequential search */
+
+        pe = GF_CALLOC (sizeof (*pe), 1, gf_nlc_mt_nlc_pe_t);
+        if (!pe)
+                goto out;
+
+        if (entry_ino) {
+                pe->inode = inode_ref (entry_ino);
+                nlc_inode_ctx_set (this, entry_ino, NULL, pe);
+        } else if (name) {
+                pe->name = gf_strdup (name);
+                if (!pe->name)
+                        goto out;
+        }
+
+        list_add (&pe->list, &nlc_ctx->pe);
+
+        nlc_ctx->cache_size += sizeof (*pe) + sizeof (pe->name);
+        GF_ATOMIC_ADD (conf->current_cache_size,
+                       (sizeof (*pe) + sizeof (pe->name)));
+
+        nlc_ctx->refd_inodes += 1;
+        inode_ctx_get2 (entry_ino, this, &nlc_ctx_int, NULL);
+        if (nlc_ctx_int == 0)
+                GF_ATOMIC_ADD (conf->refd_inodes, 1);
+
+        ret = 0;
+out:
+        if (ret)
+                GF_FREE (pe);
+
+        return;
+}
+
+
+static void
+__nlc_add_ne (xlator_t *this, nlc_ctx_t *nlc_ctx, const char *name)
+{
+        nlc_ne_t                    *ne     = NULL;
+        int                         ret     = -1;
+        nlc_conf_t                  *conf   = NULL;
+
+        conf = this->private;
+
+        /* TODO: search ne before adding to get rid of duplicate entries
+        found = __nlc_search (entries, name, _gf_false);
+        can use bit vector to have faster search than sequential search */
+
+        ne = GF_CALLOC (sizeof (*ne), 1, gf_nlc_mt_nlc_ne_t);
+        if (!ne)
+                goto out;
+
+        ne->name = gf_strdup (name);
+        if (!ne->name)
+                goto out;
+
+        list_add (&ne->list, &nlc_ctx->ne);
+
+        nlc_ctx->cache_size += sizeof (*ne) + sizeof (ne->name);
+        GF_ATOMIC_ADD (conf->current_cache_size,
+                       (sizeof (*ne) + sizeof (ne->name)));
+        ret = 0;
+out:
+        if (ret)
+                GF_FREE (ne);
+
+        return;
+}
+
+
+void
+nlc_dir_add_ne (xlator_t *this, inode_t *inode, const char *name)
+{
+        nlc_ctx_t        *nlc_ctx = NULL;
+
+        if (inode->ia_type != IA_IFDIR) {
+                gf_msg_callingfn (this->name, GF_LOG_ERROR, EINVAL,
+                                  NLC_MSG_EINVAL, "inode is not of type dir");
+                goto out;
+        }
+
+        nlc_inode_ctx_get_set (this, inode, &nlc_ctx, NULL);
+        if (!nlc_ctx)
+                goto out;
+
+        LOCK (&nlc_ctx->lock);
+        {
+                /* There is one possiblility where we need to search before
+                 * adding NE: when there are two parallel lookups on a non
+                 * existant file */
+                if (!__nlc_search_ne (nlc_ctx, name)) {
+                        __nlc_add_ne (this, nlc_ctx, name);
+                        __nlc_set_dir_state (nlc_ctx, NLC_NE_VALID);
+                }
+        }
+        UNLOCK (&nlc_ctx->lock);
+out:
+        return;
+}
+
+
+void
+nlc_dir_remove_pe (xlator_t *this, inode_t *parent, inode_t *entry_ino,
+                   const char *name, gf_boolean_t multilink)
+{
+        nlc_ctx_t        *nlc_ctx = NULL;
+
+        if (parent->ia_type != IA_IFDIR) {
+                gf_msg_callingfn (this->name, GF_LOG_ERROR, EINVAL,
+                                  NLC_MSG_EINVAL, "inode is not of type dir");
+                goto out;
+        }
+
+        nlc_inode_ctx_get (this, parent, &nlc_ctx, NULL);
+        if (!nlc_ctx)
+                goto out;
+
+        LOCK (&nlc_ctx->lock);
+        {
+                __nlc_del_pe (this, nlc_ctx, entry_ino, name, multilink);
+                __nlc_add_ne (this, nlc_ctx, name);
+                __nlc_set_dir_state (nlc_ctx, NLC_NE_VALID);
+        }
+        UNLOCK (&nlc_ctx->lock);
+out:
+        return;
+}
+
+
+void
+nlc_dir_add_pe (xlator_t *this, inode_t *inode, inode_t *entry_ino,
+                const char *name)
+{
+        nlc_ctx_t        *nlc_ctx = NULL;
+
+        if (inode->ia_type != IA_IFDIR) {
+                gf_msg_callingfn (this->name, GF_LOG_ERROR, EINVAL,
+                                  NLC_MSG_EINVAL, "inode is not of type dir");
+                goto out;
+        }
+
+        nlc_inode_ctx_get_set (this, inode, &nlc_ctx, NULL);
+        if (!nlc_ctx)
+                goto out;
+
+        LOCK (&nlc_ctx->lock);
+        {
+                __nlc_del_ne (this, nlc_ctx, name);
+                __nlc_add_pe (this, nlc_ctx, entry_ino, name);
+                if (!IS_PE_VALID (nlc_ctx->state))
+                        __nlc_set_dir_state (nlc_ctx, NLC_PE_PARTIAL);
+        }
+        UNLOCK (&nlc_ctx->lock);
+out:
+        return;
+}
+
+
+gf_boolean_t
+__nlc_search_ne (nlc_ctx_t *nlc_ctx, const char *name)
+{
+        gf_boolean_t  found = _gf_false;
+        nlc_ne_t     *ne    = NULL;
+        nlc_ne_t     *tmp   = NULL;
+
+        if (!IS_NE_VALID (nlc_ctx->state))
+                goto out;
+
+        list_for_each_entry_safe (ne, tmp, &nlc_ctx->ne, list) {
+                if (strcmp (ne->name, name) == 0) {
+                        found = _gf_true;
+                        break;
+                }
+        }
+out:
+        return found;
+}
+
+
+static gf_boolean_t
+__nlc_search_pe (nlc_ctx_t *nlc_ctx, const char *name)
+{
+        gf_boolean_t   found = _gf_false;
+        nlc_pe_t      *pe    = NULL;
+        nlc_pe_t      *tmp   = NULL;
+
+        if (!IS_PE_VALID (nlc_ctx->state))
+                goto out;
+
+        list_for_each_entry_safe (pe, tmp, &nlc_ctx->pe, list) {
+               if (pe->name && (strcmp (pe->name, name) == 0)) {
+                        found = _gf_true;
+                        break;
+               }
+        }
+out:
+        return found;
+}
+
+
+static char *
+__nlc_get_pe (nlc_ctx_t *nlc_ctx, const char *name, gf_boolean_t case_insensitive)
+{
+        char          *found = NULL;
+        nlc_pe_t      *pe    = NULL;
+        nlc_pe_t      *tmp   = NULL;
+
+        if (!IS_PE_VALID (nlc_ctx->state))
+                goto out;
+
+        if (case_insensitive) {
+                list_for_each_entry_safe (pe, tmp, &nlc_ctx->pe, list) {
+                        if (pe->name &&
+                            (strcasecmp (pe->name, name) == 0)) {
+                                found = pe->name;
+                                break;
+                        }
+                }
+        } else {
+                list_for_each_entry_safe (pe, tmp, &nlc_ctx->pe, list) {
+                        if (pe->name &&
+                            (strcmp (pe->name, name) == 0)) {
+                                found = pe->name;
+                                break;
+                        }
+               }
+        }
+out:
+        return found;
+}
+
+
+gf_boolean_t
+nlc_is_negative_lookup (xlator_t *this, loc_t *loc)
+{
+        nlc_ctx_t       *nlc_ctx   = NULL;
+        inode_t         *inode     = NULL;
+        gf_boolean_t     neg_entry = _gf_false;
+
+        inode = loc->parent;
+        GF_VALIDATE_OR_GOTO (this->name, inode, out);
+
+        if (inode->ia_type != IA_IFDIR) {
+                gf_msg_callingfn (this->name, GF_LOG_ERROR, EINVAL,
+                                  NLC_MSG_EINVAL, "inode is not of type dir");
+                goto out;
+        }
+
+        nlc_inode_ctx_get (this, inode, &nlc_ctx, NULL);
+        if (!nlc_ctx)
+                goto out;
+
+        LOCK (&nlc_ctx->lock);
+        {
+                if (!__nlc_is_cache_valid (this, nlc_ctx))
+                        goto unlock;
+
+                if (__nlc_search_ne (nlc_ctx, loc->name)) {
+                        neg_entry = _gf_true;
+                        goto unlock;
+                }
+                if ((nlc_ctx->state & NLC_PE_FULL) &&
+                    !__nlc_search_pe (nlc_ctx, loc->name)) {
+                        neg_entry = _gf_true;
+                        goto unlock;
+                }
+        }
+unlock:
+        UNLOCK (&nlc_ctx->lock);
+
+out:
+        return neg_entry;
+}
+
+
+gf_boolean_t
+nlc_get_real_file_name (xlator_t *this, loc_t *loc, const char *fname,
+                        int32_t *op_ret, int32_t *op_errno, dict_t *dict)
+{
+        nlc_ctx_t        *nlc_ctx     = NULL;
+        inode_t         *inode      = NULL;
+        gf_boolean_t     hit        = _gf_false;
+        char            *found_file = NULL;
+        int              ret        = 0;
+
+        GF_VALIDATE_OR_GOTO (this->name, loc, out);
+        GF_VALIDATE_OR_GOTO (this->name, fname, out);
+        GF_VALIDATE_OR_GOTO (this->name, op_ret, out);
+        GF_VALIDATE_OR_GOTO (this->name, op_errno, out);
+        GF_VALIDATE_OR_GOTO (this->name, dict, out);
+
+        inode = loc->inode;
+        GF_VALIDATE_OR_GOTO (this->name, inode, out);
+
+        if (inode->ia_type != IA_IFDIR) {
+                gf_msg_callingfn (this->name, GF_LOG_ERROR, EINVAL,
+                                  NLC_MSG_EINVAL, "inode is not of type dir");
+                goto out;
+        }
+
+        nlc_inode_ctx_get (this, inode, &nlc_ctx, NULL);
+        if (!nlc_ctx)
+                goto out;
+
+        LOCK (&nlc_ctx->lock);
+        {
+                if (!__nlc_is_cache_valid (this, nlc_ctx))
+                        goto unlock;
+
+                found_file = __nlc_get_pe (nlc_ctx, fname, _gf_true);
+                if (found_file) {
+                        ret = dict_set_dynstr (dict, GF_XATTR_GET_REAL_FILENAME_KEY,
+                                               gf_strdup (found_file));
+                        if (ret < 0)
+                                goto unlock;
+                        *op_ret = strlen (found_file) + 1;
+                        hit = _gf_true;
+                        goto unlock;
+                }
+                if (!found_file && (nlc_ctx->state & NLC_PE_FULL)) {
+                        *op_ret = -1;
+                        *op_errno = ENOENT;
+                        hit = _gf_true;
+                        goto unlock;
+                }
+        }
+unlock:
+        UNLOCK (&nlc_ctx->lock);
+
+out:
+        return hit;
+}
+
+
+void
+nlc_dump_inodectx (xlator_t *this, inode_t *inode)
+{
+        int32_t     ret                            = -1;
+        char       *path                           = NULL;
+        char       key_prefix[GF_DUMP_MAX_BUF_LEN] = {0, };
+        char       uuid_str[64]                    = {0,};
+        nlc_ctx_t   *nlc_ctx                         = NULL;
+        nlc_pe_t    *pe                             = NULL;
+        nlc_pe_t    *tmp                            = NULL;
+        nlc_ne_t    *ne                             = NULL;
+        nlc_ne_t    *tmp1                           = NULL;
+
+        nlc_inode_ctx_get (this, inode, &nlc_ctx, NULL);
+
+        if (!nlc_ctx)
+                goto out;
+
+        ret = TRY_LOCK (&nlc_ctx->lock);
+        if (!ret) {
+                gf_proc_dump_build_key (key_prefix,
+                                        "xlator.performance.nl-cache",
+                                        "nlc_inode");
+                gf_proc_dump_add_section (key_prefix);
+
+                __inode_path (inode, NULL, &path);
+                if (path != NULL) {
+                        gf_proc_dump_write ("path", "%s", path);
+                        GF_FREE (path);
+                }
+
+                uuid_utoa_r (inode->gfid, uuid_str);
+
+                gf_proc_dump_write ("inode", "%p", inode);
+                gf_proc_dump_write ("gfid", "%s", uuid_str);
+
+                gf_proc_dump_write ("state", "%"PRIu64, nlc_ctx->state);
+                gf_proc_dump_write ("timer", "%p", nlc_ctx->timer);
+                gf_proc_dump_write ("cache-time", "%lld", nlc_ctx->cache_time);
+
+                if (IS_PE_VALID (nlc_ctx->state))
+                        list_for_each_entry_safe (pe, tmp, &nlc_ctx->pe, list) {
+                                gf_proc_dump_write ("pe", "%p, %s", pe,
+                                                    pe->inode, pe->name);
+                        }
+
+                if (IS_NE_VALID (nlc_ctx->state))
+                        list_for_each_entry_safe (ne, tmp1, &nlc_ctx->ne, list) {
+                                gf_proc_dump_write ("ne", "%s", ne->name);
+                        }
+
+                 UNLOCK (&nlc_ctx->lock);
+        }
+
+        if (ret && nlc_ctx)
+                gf_proc_dump_write ("Unable to dump the inode information",
+                                    "(Lock acquisition failed) %p (gfid: %s)",
+                                    nlc_ctx, uuid_str);
+out:
+        return;
+}
diff --git a/xlators/performance/nl-cache/src/nl-cache-mem-types.h b/xlators/performance/nl-cache/src/nl-cache-mem-types.h
new file mode 100644
index 0000000..20fc030
--- /dev/null
+++ b/xlators/performance/nl-cache/src/nl-cache-mem-types.h
@@ -0,0 +1,29 @@
+/*
+ *   Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
+ *   This file is part of GlusterFS.
+ *
+ *   This file is licensed to you under your choice of the GNU Lesser
+ *   General Public License, version 3 or any later version (LGPLv3 or
+ *   later), or the GNU General Public License, version 2 (GPLv2), in all
+ *   cases as published by the Free Software Foundation.
+ */
+
+
+#ifndef __NL_CACHe_MEM_TYPES_H__
+#define __NL_CACHE_MEM_TYPES_H__
+
+#include "mem-types.h"
+
+enum gf_nlc_mem_types_ {
+        gf_nlc_mt_conf_t = gf_common_mt_end + 1,
+        gf_nlc_mt_nlc_conf_t,
+        gf_nlc_mt_nlc_ctx_t,
+        gf_nlc_mt_nlc_local_t,
+        gf_nlc_mt_nlc_pe_t,
+        gf_nlc_mt_nlc_ne_t,
+        gf_nlc_mt_nlc_timer_data_t,
+        gf_nlc_mt_nlc_lru_node,
+        gf_nlc_mt_end
+};
+
+#endif /* __NL_CACHE_MEM_TYPES_H__ */
diff --git a/xlators/performance/nl-cache/src/nl-cache-messages.h b/xlators/performance/nl-cache/src/nl-cache-messages.h
new file mode 100644
index 0000000..2e3b894
--- /dev/null
+++ b/xlators/performance/nl-cache/src/nl-cache-messages.h
@@ -0,0 +1,34 @@
+/*
+ *   Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
+ *   This file is part of GlusterFS.
+ *
+ *   This file is licensed to you under your choice of the GNU Lesser
+ *   General Public License, version 3 or any later version (LGPLv3 or
+ *   later), or the GNU General Public License, version 2 (GPLv2), in all
+ *   cases as published by the Free Software Foundation.
+ */
+
+
+#ifndef __NL_CACHE_MESSAGES_H__
+#define __NL_CACHE_MESSAGES_H__
+
+
+#define GLFS_COMP_BASE_NLC GLFS_MSGID_COMP_NLC
+#define GLFS_NUM_MESSAGES 4
+#define GLFS_MSGID_END (GLFS_COMP_BASE_NLC + GLFS_NUM_MESSAGES + 1)
+
+#define glfs_msg_start_x GLFS_COMP_BASE_NLC, "Invalid: Start of messages"
+
+/*!
+ * @messageid 110001
+ * @diagnosis Out of Memory
+ * @recommendedaction None
+ */
+#define NLC_MSG_NO_MEMORY             (GLFS_COMP_BASE_NLC + 1)
+#define NLC_MSG_EINVAL                (GLFS_COMP_BASE_NLC + 2)
+#define NLC_MSG_NO_TIMER_WHEEL        (GLFS_COMP_BASE_NLC + 3)
+#define NLC_MSG_DICT_FAILURE          (GLFS_COMP_BASE_NLC + 4)
+#define glfs_msg_end_x GLFS_MSGID_END, "Invalid: End of messages"
+
+
+#endif /* __NL_CACHE_MESSAGES_H__ */
diff --git a/xlators/performance/nl-cache/src/nl-cache.c b/xlators/performance/nl-cache/src/nl-cache.c
new file mode 100644
index 0000000..a34b752
--- /dev/null
+++ b/xlators/performance/nl-cache/src/nl-cache.c
@@ -0,0 +1,775 @@
+/*
+ *   Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
+ *   This file is part of GlusterFS.
+ *
+ *   This file is licensed to you under your choice of the GNU Lesser
+ *   General Public License, version 3 or any later version (LGPLv3 or
+ *   later), or the GNU General Public License, version 2 (GPLv2), in all
+ *   cases as published by the Free Software Foundation.
+ */
+
+
+#include "nl-cache.h"
+#include "statedump.h"
+#include "upcall-utils.h"
+#include "tw.h"
+
+static void
+nlc_dentry_op (call_frame_t *frame, xlator_t *this, gf_boolean_t multilink)
+{
+        nlc_local_t *local = frame->local;
+
+        GF_VALIDATE_OR_GOTO (this->name, local, out);
+
+        switch (local->fop) {
+        case GF_FOP_MKDIR:
+                nlc_set_dir_state (this, local->loc.inode, NLC_PE_FULL);
+                /*fall-through*/
+        case GF_FOP_MKNOD:
+        case GF_FOP_CREATE:
+        case GF_FOP_SYMLINK:
+                nlc_dir_add_pe (this, local->loc.parent, local->loc.inode,
+                                local->loc.name);
+                break;
+        case GF_FOP_LINK:
+                nlc_dir_add_pe (this, local->loc2.parent, NULL,
+                                local->loc2.name);
+                break;
+        case GF_FOP_RMDIR:
+                nlc_inode_clear_cache (this, local->loc.inode, _gf_false);
+                /*fall-through*/
+        case GF_FOP_UNLINK:
+                nlc_dir_remove_pe (this, local->loc.parent, local->loc.inode,
+                                   local->loc.name, multilink);
+                break;
+        case GF_FOP_RENAME:
+                /* TBD: Should these be atomic ?  In case of rename, the
+                 * newloc->inode can be NULL, and hence use oldloc->inode */
+                nlc_dir_remove_pe (this, local->loc2.parent, local->loc2.inode,
+                                   local->loc2.name, _gf_false);
+
+                /*TODO: Remove old dentry from destination before adding this pe*/
+                nlc_dir_add_pe (this, local->loc.parent, local->loc2.inode,
+                                local->loc.name);
+
+        default:
+                return;
+        }
+out:
+        return;
+}
+
+#define NLC_FOP(_name, _op, loc1, loc2, frame, this, args ...)  do {    \
+        nlc_local_t      *__local   = NULL;                             \
+        nlc_conf_t       *conf      = NULL;                             \
+                                                                        \
+        conf = this->private;                                           \
+                                                                        \
+        if (!IS_PEC_ENABLED (conf))                                     \
+                goto disabled;                                          \
+                                                                        \
+        __local = nlc_local_init (frame, this, _op, loc1, loc2);        \
+        GF_VALIDATE_OR_GOTO (this->name, __local, err);                 \
+                                                                        \
+        STACK_WIND (frame, nlc_##_name##_cbk,                           \
+                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->_name,  \
+                    args);                                              \
+        break;                                                          \
+disabled:                                                               \
+        default_##_name##_resume (frame, this, args);                   \
+        break;                                                          \
+err:                                                                    \
+        default_##_name##_failure_cbk (frame, ENOMEM);                  \
+        break;                                                          \
+} while (0)
+
+#define NLC_FOP_CBK(_name, multilink, frame, cookie, this, op_ret, op_errno, \
+                    args ...) do {                                      \
+        nlc_conf_t  *conf  = NULL;                                      \
+                                                                        \
+        if (op_ret != 0)                                                \
+                goto out;                                               \
+                                                                        \
+        conf = this->private;                                           \
+                                                                        \
+        if (op_ret < 0 || !IS_PEC_ENABLED (conf))                       \
+                goto out;                                               \
+        nlc_dentry_op (frame, this, multilink);                         \
+out:                                                                    \
+        NLC_STACK_UNWIND (_name, frame, op_ret, op_errno, args);        \
+} while (0)
+
+static int32_t
+nlc_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+                int32_t op_ret, int32_t op_errno, struct iatt *buf,
+                struct iatt *preoldparent, struct iatt *postoldparent,
+                struct iatt *prenewparent, struct iatt *postnewparent,
+                dict_t *xdata)
+{
+        NLC_FOP_CBK (rename, _gf_false, frame, cookie, this, op_ret, op_errno,
+                     buf, preoldparent, postoldparent, prenewparent,
+                     postnewparent, xdata);
+        return 0;
+}
+
+
+static int32_t
+nlc_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,
+            dict_t *xdata)
+{
+        NLC_FOP (rename, GF_FOP_RENAME, newloc, oldloc, frame, this, oldloc,
+                 newloc, xdata);
+        return 0;
+}
+
+
+static int32_t
+nlc_mknod_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
+               int32_t op_errno, inode_t *inode, struct iatt *buf,
+               struct iatt *preparent, struct iatt *postparent, dict_t *xdata)
+{
+        NLC_FOP_CBK(mknod, _gf_false, frame, cookie, this, op_ret, op_errno,
+                    inode, buf, preparent, postparent, xdata);
+        return 0;
+}
+
+
+static int32_t
+nlc_mknod (call_frame_t *frame, xlator_t *this, loc_t *loc,
+           mode_t mode, dev_t rdev, mode_t umask,
+           dict_t *xdata)
+{
+        NLC_FOP(mknod, GF_FOP_MKNOD, loc, NULL, frame, this, loc, mode, rdev,
+                umask, xdata);
+        return 0;
+}
+
+static int32_t
+nlc_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+                int32_t op_ret, int32_t op_errno, fd_t *fd, inode_t *inode,
+                struct iatt *buf, struct iatt *preparent,
+                struct iatt *postparent, dict_t *xdata)
+{
+        NLC_FOP_CBK (create, _gf_false, frame, cookie, this, op_ret, op_errno,
+                     fd, inode, buf, preparent, postparent, xdata);
+        return 0;
+}
+
+
+static int32_t
+nlc_create (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,
+            mode_t mode, mode_t umask, fd_t *fd, dict_t *xdata)
+{
+        NLC_FOP (create, GF_FOP_CREATE, loc, NULL, frame, this, loc, flags,
+                 mode, umask, fd, xdata);
+        return 0;
+}
+
+static int32_t
+nlc_mkdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
+               int32_t op_errno, inode_t *inode, struct iatt *buf,
+               struct iatt *preparent, struct iatt *postparent, dict_t *xdata)
+{
+        NLC_FOP_CBK (mkdir, _gf_false, frame, cookie, this, op_ret, op_errno,
+                     inode, buf, preparent, postparent, xdata);
+        return 0;
+}
+
+
+static int32_t
+nlc_mkdir (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
+           mode_t umask, dict_t *xdata)
+{
+        NLC_FOP (mkdir, GF_FOP_MKDIR, loc, NULL, frame, this, loc, mode,
+                 umask, xdata);
+        return 0;
+}
+
+
+static int32_t
+nlc_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+                int32_t op_ret, int32_t op_errno, inode_t *inode,
+                struct iatt *buf, dict_t *xdata, struct iatt *postparent)
+{
+        nlc_local_t *local = NULL;
+        nlc_conf_t  *conf  = NULL;
+
+        local = frame->local;
+        conf = this->private;
+
+        /* Donot add to pe, this may lead to duplicate entry and
+         * requires search before adding if list of strings */
+        if (op_ret < 0 && op_errno == ENOENT) {
+                nlc_dir_add_ne (this, local->loc.parent, local->loc.name);
+                GF_ATOMIC_INC (conf->nlc_counter.nlc_miss);
+        }
+
+        NLC_STACK_UNWIND (lookup, frame, op_ret, op_errno, inode, buf, xdata,
+                         postparent);
+        return 0;
+}
+
+
+static int32_t
+nlc_lookup (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)
+{
+        nlc_local_t *local = NULL;
+        nlc_conf_t  *conf  = NULL;
+        inode_t     *inode = NULL;
+
+        local = nlc_local_init (frame, this, GF_FOP_LOOKUP, loc, NULL);
+        if (!local)
+                goto err;
+
+        conf = this->private;
+
+        if ((!loc->parent && gf_uuid_is_null (loc->pargfid)) || !loc->name)
+                goto wind;
+
+        inode = inode_grep (loc->inode->table, loc->parent, loc->name);
+        if (inode) {
+                inode_unref (inode);
+                goto wind;
+        }
+
+        if (nlc_is_negative_lookup (this, loc)) {
+                GF_ATOMIC_INC (conf->nlc_counter.nlc_hit);
+                gf_msg_trace (this->name, 0, "Serving negative lookup from "
+                              "cache:%s", loc->name);
+                goto unwind;
+        }
+
+wind:
+        STACK_WIND (frame, nlc_lookup_cbk,
+                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->lookup,
+                    loc, xdata);
+        return 0;
+unwind:
+        NLC_STACK_UNWIND (lookup, frame, -1, ENOENT, NULL, NULL, NULL, NULL);
+        return 0;
+err:
+        NLC_STACK_UNWIND (lookup, frame, -1, ENOMEM, NULL, NULL, NULL, NULL);
+        return 0;
+}
+
+static int32_t
+nlc_rmdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+               int32_t op_ret, int32_t op_errno, struct iatt *preparent,
+               struct iatt *postparent, dict_t *xdata)
+{
+        NLC_FOP_CBK (rmdir, _gf_false, frame, cookie, this, op_ret, op_errno,
+                     preparent, postparent, xdata);
+        return 0;
+}
+
+
+static int32_t
+nlc_rmdir (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,
+           dict_t *xdata)
+{
+        NLC_FOP (rmdir, GF_FOP_RMDIR, loc, NULL, frame, this, loc, flags,
+                 xdata);
+        return 0;
+}
+
+
+static int32_t
+nlc_getxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+                  int32_t op_ret, int32_t op_errno, dict_t *dict,
+                  dict_t *xdata)
+{
+        nlc_conf_t  *conf  = NULL;
+
+        conf = this->private;
+        GF_VALIDATE_OR_GOTO (this->name, conf, out);
+
+        if (!IS_PEC_ENABLED (conf))
+                goto out;
+
+        if (op_ret < 0 && op_errno == ENOENT) {
+                GF_ATOMIC_INC (conf->nlc_counter.getrealfilename_miss);
+        }
+
+out:
+        NLC_STACK_UNWIND (getxattr, frame, op_ret, op_errno, dict, xdata);
+        return 0;
+}
+
+
+static int32_t
+nlc_getxattr (call_frame_t *frame, xlator_t *this, loc_t *loc, const char *key,
+              dict_t *xdata)
+{
+        int32_t               op_ret                = -1;
+        int32_t               op_errno              = 0;
+        dict_t               *dict                  = NULL;
+        nlc_local_t           *local                 = NULL;
+        gf_boolean_t          hit                   = _gf_false;
+        const char           *fname                 = NULL;
+        nlc_conf_t            *conf                  = NULL;
+
+        conf = this->private;
+
+        if (!IS_PEC_ENABLED (conf))
+                goto wind;
+
+        if (!key || (strncmp (key, GF_XATTR_GET_REAL_FILENAME_KEY,
+                     strlen (GF_XATTR_GET_REAL_FILENAME_KEY)) != 0))
+                goto wind;
+
+        local = nlc_local_init (frame, this, GF_FOP_GETXATTR, loc, NULL);
+        if (!local)
+                goto err;
+
+        if (loc->inode && key) {
+                dict = dict_new ();
+                if (!dict)
+                        goto err;
+
+                fname = key + strlen (GF_XATTR_GET_REAL_FILENAME_KEY);
+                hit = nlc_get_real_file_name (this, loc, fname, &op_ret,
+                                             &op_errno, dict);
+                if (hit)
+                        goto unwind;
+                else
+                        dict_unref (dict);
+        }
+
+        STACK_WIND (frame, nlc_getxattr_cbk,
+                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->getxattr,
+                    loc, key, xdata);
+        return 0;
+wind:
+        STACK_WIND (frame, default_getxattr_cbk,
+                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->getxattr,
+                    loc, key, xdata);
+        return 0;
+unwind:
+        GF_ATOMIC_INC (conf->nlc_counter.getrealfilename_hit);
+        NLC_STACK_UNWIND (getxattr, frame, op_ret, op_errno, dict, NULL);
+        dict_unref (dict);
+        return 0;
+err:
+        NLC_STACK_UNWIND (getxattr, frame, -1, ENOMEM, NULL, NULL);
+        return 0;
+}
+
+
+static int32_t
+nlc_symlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+                 int32_t op_ret, int32_t op_errno, inode_t *inode,
+                 struct iatt *buf, struct iatt *preparent,
+                 struct iatt *postparent, dict_t *xdata)
+{
+        NLC_FOP_CBK (symlink, _gf_false, frame, cookie, this, op_ret, op_errno,
+                     inode, buf, preparent, postparent, xdata);
+        return 0;
+}
+
+
+static int32_t
+nlc_symlink (call_frame_t *frame, xlator_t *this, const char *linkpath,
+             loc_t *loc, mode_t umask, dict_t *xdata)
+{
+        NLC_FOP (symlink, GF_FOP_SYMLINK, loc, NULL, frame, this, linkpath,
+                 loc, umask, xdata);
+        return 0;
+}
+
+
+static int32_t
+nlc_link_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
+              int32_t op_errno, inode_t *inode, struct iatt *buf,
+              struct iatt *preparent, struct iatt *postparent, dict_t *xdata)
+{
+        NLC_FOP_CBK (link, _gf_false, frame, cookie, this, op_ret, op_errno,
+                     inode, buf, preparent, postparent, xdata);
+        return 0;
+}
+
+
+static int32_t
+nlc_link (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,
+          dict_t *xdata)
+{
+        NLC_FOP (link, GF_FOP_LINK, oldloc, newloc, frame, this, oldloc,
+                 newloc, xdata);
+        return 0;
+}
+
+
+static int32_t
+nlc_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+                int32_t op_ret, int32_t op_errno, struct iatt *preparent,
+                struct iatt *postparent, dict_t *xdata)
+{
+        uint32_t    link_count = 0;
+        gf_boolean_t multilink = _gf_false;
+
+        if (xdata && !dict_get_uint32 (xdata, GET_LINK_COUNT, &link_count)) {
+                if (link_count > 1)
+                        multilink = _gf_true;
+        } else {
+                /* Don't touch cache if we don't know enough */
+                gf_msg (this->name, GF_LOG_WARNING, 0, NLC_MSG_DICT_FAILURE,
+                        "Failed to get GET_LINK_COUNT from dict");
+                NLC_STACK_UNWIND (unlink, frame, op_ret, op_errno, preparent,
+                                  postparent, xdata);
+                return 0;
+        }
+
+        NLC_FOP_CBK (unlink, multilink, frame, cookie, this, op_ret, op_errno,
+                     preparent, postparent, xdata);
+        return 0;
+}
+
+
+static int32_t
+nlc_unlink (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,
+            dict_t *xdata)
+{
+        nlc_conf_t   *conf     = NULL;
+        gf_boolean_t new_dict = _gf_false;
+
+        conf = this->private;
+
+        if (!IS_PEC_ENABLED (conf))
+                goto do_fop;
+
+        if (!xdata) {
+                xdata = dict_new ();
+                if (xdata)
+                        new_dict = _gf_true;
+        }
+
+        if (xdata && dict_set_uint32 (xdata, GET_LINK_COUNT, 0)) {
+                gf_msg (this->name, GF_LOG_WARNING, 0, NLC_MSG_DICT_FAILURE,
+                        "Failed to set GET_LINK_COUNT in dict");
+                goto err;
+        }
+
+do_fop:
+        NLC_FOP (unlink, GF_FOP_UNLINK, loc, NULL, frame, this, loc, flags,
+                 xdata);
+
+        if (new_dict)
+                dict_unref (xdata);
+        return 0;
+}
+
+
+static int32_t
+nlc_invalidate (xlator_t *this, void *data)
+{
+        struct gf_upcall                    *up_data    = NULL;
+        struct gf_upcall_cache_invalidation *up_ci      = NULL;
+        inode_t                             *inode      = NULL;
+        inode_t                             *parent1    = NULL;
+        inode_t                             *parent2    = NULL;
+        int                                  ret        = 0;
+        inode_table_t                       *itable     = NULL;
+
+        up_data = (struct gf_upcall *)data;
+
+        if (up_data->event_type != GF_UPCALL_CACHE_INVALIDATION)
+                goto out;
+
+        up_ci = (struct gf_upcall_cache_invalidation *)up_data->data;
+
+        /*TODO: Add he inodes found as a member in gf_upcall_cache_invalidation
+         * so that it prevents subsequent xlators from doing inode_find again
+         */
+        itable = ((xlator_t *)this->graph->top)->itable;
+        inode = inode_find (itable, up_data->gfid);
+        if (!inode) {
+                ret = -1;
+                goto out;
+        }
+
+        if ((!((up_ci->flags & UP_TIMES) && inode->ia_type == IA_IFDIR)) &&
+            (!(up_ci->flags & UP_PARENT_DENTRY_FLAGS))) {
+                goto out;
+        }
+
+        if (!gf_uuid_is_null (up_ci->p_stat.ia_gfid)) {
+                parent1 = inode_find (itable, up_ci->p_stat.ia_gfid);
+                if (!parent1) {
+                        ret = -1;
+                        goto out;
+                }
+        }
+
+        if (!gf_uuid_is_null (up_ci->oldp_stat.ia_gfid)) {
+                parent2 = inode_find (itable, up_ci->oldp_stat.ia_gfid);
+                if (!parent2) {
+                        ret = -1;
+                        goto out;
+                }
+        }
+
+        /* TODO: get enough data in upcall so that we do not invalidate but
+         * update */
+        if (inode && inode->ia_type == IA_IFDIR)
+                nlc_inode_clear_cache (this, inode, NLC_NONE);
+        if (parent1)
+                nlc_inode_clear_cache (this, parent1, NLC_NONE);
+        if (parent2)
+                nlc_inode_clear_cache (this, parent2, NLC_NONE);
+out:
+        if (inode)
+                inode_unref (inode);
+        if (parent1)
+                inode_unref (parent1);
+        if (parent2)
+                inode_unref (parent2);
+
+        return ret;
+}
+
+
+int
+notify (xlator_t *this, int event, void *data, ...)
+{
+        int        ret  = 0;
+        time_t     now  = 0;
+
+        switch (event) {
+        case GF_EVENT_CHILD_DOWN:
+        case GF_EVENT_SOME_DESCENDENT_DOWN:
+        case GF_EVENT_CHILD_UP:
+        case GF_EVENT_SOME_DESCENDENT_UP:
+                time (&now);
+                nlc_update_child_down_time (this, &now);
+                /* TODO: nlc_clear_all_cache (this); else
+                 lru prune will lazily clear it*/
+                break;
+        case GF_EVENT_UPCALL:
+                ret = nlc_invalidate (this, data);
+                break;
+        case GF_EVENT_PARENT_DOWN:
+                nlc_disable_cache (this);
+                nlc_clear_all_cache (this);
+        default:
+                break;
+        }
+
+        if (default_notify (this, event, data) != 0)
+                ret = -1;
+
+        return ret;
+}
+
+
+static int32_t
+nlc_forget (xlator_t *this, inode_t *inode)
+{
+        uint64_t pe_int = 0;
+
+        inode_ctx_reset1 (inode, this, &pe_int);
+        GF_ASSERT (pe_int == 0);
+
+        nlc_inode_clear_cache (this, inode, NLC_NONE);
+
+        return 0;
+}
+
+
+static int32_t
+nlc_inodectx (xlator_t *this, inode_t *inode)
+{
+        nlc_dump_inodectx (this, inode);
+        return 0;
+}
+
+
+static int32_t
+nlc_priv_dump (xlator_t *this)
+{
+        nlc_conf_t *conf = NULL;
+        char  key_prefix[GF_DUMP_MAX_BUF_LEN];
+
+        conf = this->private;
+
+        snprintf(key_prefix, GF_DUMP_MAX_BUF_LEN, "%s.%s", this->type, this->name);
+        gf_proc_dump_add_section(key_prefix);
+
+        gf_proc_dump_write("negative_lookup_hit_count", "%"PRId64,
+                           conf->nlc_counter.nlc_hit.cnt);
+        gf_proc_dump_write("negative_lookup_miss_count", "%"PRId64,
+                           conf->nlc_counter.nlc_miss.cnt);
+        gf_proc_dump_write("get_real_filename_hit_count", "%"PRId64,
+                           conf->nlc_counter.getrealfilename_hit.cnt);
+        gf_proc_dump_write("get_real_filename_miss_count", "%"PRId64,
+                           conf->nlc_counter.getrealfilename_miss.cnt);
+        gf_proc_dump_write("nameless_lookup_count", "%"PRId64,
+                           conf->nlc_counter.nameless_lookup.cnt);
+        gf_proc_dump_write("inodes_with_positive_dentry_cache", "%"PRId64,
+                           conf->nlc_counter.pe_inode_cnt.cnt);
+        gf_proc_dump_write("inodes_with_negative_dentry_cache", "%"PRId64,
+                           conf->nlc_counter.ne_inode_cnt.cnt);
+        gf_proc_dump_write("dentry_invalidations_recieved", "%"PRId64,
+                           conf->nlc_counter.nlc_invals.cnt);
+        gf_proc_dump_write("cache_limit", "%"PRIu64,
+                           conf->cache_size);
+        gf_proc_dump_write("consumed_cache_size", "%"PRId64,
+                           conf->current_cache_size.cnt);
+        gf_proc_dump_write("inode_limit", "%"PRIu64,
+                           conf->inode_limit);
+        gf_proc_dump_write("consumed_inodes", "%"PRId64,
+                           conf->refd_inodes.cnt);
+
+        return 0;
+}
+
+
+void
+fini (xlator_t *this)
+{
+        return;
+}
+
+
+int32_t
+mem_acct_init (xlator_t *this)
+{
+        int     ret = -1;
+
+        ret = xlator_mem_acct_init (this, gf_nlc_mt_end + 1);
+        return ret;
+}
+
+
+int32_t
+reconfigure (xlator_t *this, dict_t *options)
+{
+        nlc_conf_t *conf = NULL;
+
+        conf = this->private;
+
+        GF_OPTION_RECONF ("nl-cache-timeout", conf->cache_timeout, options,
+                          int32, out);
+        GF_OPTION_RECONF ("nl-cache-positive-entry", conf->positive_entry_cache,
+                          options, bool, out);
+        GF_OPTION_RECONF ("nl-cache-limit", conf->cache_size, options,
+                          size_uint64, out);
+
+out:
+        return 0;
+}
+
+
+int32_t
+init (xlator_t *this)
+{
+        nlc_conf_t      *conf       = NULL;
+        int              ret        = -1;
+        inode_table_t   *itable     = NULL;
+
+        conf = GF_CALLOC (sizeof (*conf), 1, gf_nlc_mt_nlc_conf_t);
+        if (!conf)
+                goto out;
+
+        GF_OPTION_INIT ("nl-cache-timeout", conf->cache_timeout, int32, out);
+        GF_OPTION_INIT ("nl-cache-positive-entry", conf->positive_entry_cache,
+                        bool, out);
+        GF_OPTION_INIT ("nl-cache-limit", conf->cache_size, size_uint64, out);
+
+        /* Since the positive entries are stored as list of refs on
+         * existing inodes, we should not overflow the inode lru_limit.
+         * Hence keep the limit of inodes that are refed by this xlator,
+         * to 80% of inode_table->lru_limit. In fuse where the limit is
+         * infinite, take 131072 as lru limit (as in gfapi). */
+        itable = ((xlator_t *)this->graph->top)->itable;
+        if (itable && itable->lru_limit)
+                conf->inode_limit = itable->lru_limit * 80 / 100;
+        else
+                conf->inode_limit = 131072 * 80 / 100;
+
+        LOCK_INIT (&conf->lock);
+        GF_ATOMIC_INIT (conf->current_cache_size, 0);
+        GF_ATOMIC_INIT (conf->refd_inodes, 0);
+        GF_ATOMIC_INIT (conf->nlc_counter.nlc_hit, 0);
+        GF_ATOMIC_INIT (conf->nlc_counter.nlc_miss, 0);
+        GF_ATOMIC_INIT (conf->nlc_counter.nameless_lookup, 0);
+        GF_ATOMIC_INIT (conf->nlc_counter.getrealfilename_hit, 0);
+        GF_ATOMIC_INIT (conf->nlc_counter.getrealfilename_miss, 0);
+        GF_ATOMIC_INIT (conf->nlc_counter.pe_inode_cnt, 0);
+        GF_ATOMIC_INIT (conf->nlc_counter.ne_inode_cnt, 0);
+        GF_ATOMIC_INIT (conf->nlc_counter.nlc_invals, 0);
+
+        INIT_LIST_HEAD (&conf->lru);
+        time (&conf->last_child_down);
+
+        if (!glusterfs_global_timer_wheel (this)) {
+                gf_msg_debug (this->name, 0, "Initing the global timer wheel");
+                ret = glusterfs_global_timer_wheel_init (this->ctx);
+                if (ret) {
+                        gf_msg (this->name, GF_LOG_ERROR, 0,
+                                NLC_MSG_NO_TIMER_WHEEL,
+                                "Initing the global timer wheel failed");
+                                goto out;
+                }
+        }
+        conf->timer_wheel = glusterfs_global_timer_wheel (this);
+
+        this->private = conf;
+
+        ret = 0;
+out:
+        return ret;
+}
+
+
+struct xlator_fops fops = {
+        .rename               = nlc_rename,
+        .mknod                = nlc_mknod,
+        .create               = nlc_create,
+        .mkdir                = nlc_mkdir,
+        .lookup               = nlc_lookup,
+        .rmdir                = nlc_rmdir,
+        .getxattr             = nlc_getxattr,
+        .symlink              = nlc_symlink,
+        .link                 = nlc_link,
+        .unlink               = nlc_unlink,
+        /* TODO:
+        .readdir              = nlc_readdir,
+        .readdirp             = nlc_readdirp,
+        .seek                 = nlc_seek,
+        .opendir              = nlc_opendir, */
+};
+
+
+struct xlator_cbks cbks = {
+        .forget               = nlc_forget,
+};
+
+
+struct xlator_dumpops dumpops = {
+        .inodectx             = nlc_inodectx,
+        .priv                 = nlc_priv_dump,
+};
+
+struct volume_options options[] = {
+        { .key = {"nl-cache-positive-entry"},
+          .type = GF_OPTION_TYPE_BOOL,
+          .default_value = "false",
+          .description = "Cache the name of the files/directories that was"
+                         " looked up and are present in a directory",
+        },
+        { .key = {"nl-cache-limit"},
+          .type = GF_OPTION_TYPE_SIZET,
+          .min = 0,
+          .max = 100 * GF_UNIT_MB,
+          .default_value = "131072",
+          .description = "the value over which caching will be disabled for"
+                         "a while and the cache is cleared based on LRU",
+        },
+        { .key = {"nl-cache-timeout"},
+          .type = GF_OPTION_TYPE_INT,
+          .min = 0,
+          .max = 600,
+          .default_value = "600",
+          .description = "Time period after which cache has to be refreshed",
+        },
+        { .key = {NULL} },
+};
diff --git a/xlators/performance/nl-cache/src/nl-cache.h b/xlators/performance/nl-cache/src/nl-cache.h
new file mode 100644
index 0000000..e94641c
--- /dev/null
+++ b/xlators/performance/nl-cache/src/nl-cache.h
@@ -0,0 +1,173 @@
+/*
+ *   Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
+ *   This file is part of GlusterFS.
+ *
+ *   This file is licensed to you under your choice of the GNU Lesser
+ *   General Public License, version 3 or any later version (LGPLv3 or
+ *   later), or the GNU General Public License, version 2 (GPLv2), in all
+ *   cases as published by the Free Software Foundation.
+ */
+
+#ifndef __NL_CACHE_H__
+#define __NL_CACHE_H__
+
+#include "nl-cache-mem-types.h"
+#include "nl-cache-messages.h"
+#include "glusterfs.h"
+#include "xlator.h"
+#include "defaults.h"
+#include "atomic.h"
+
+#define NLC_INVALID 0x0000
+#define NLC_PE_FULL 0x0001
+#define NLC_PE_PARTIAL 0x0002
+#define NLC_NE_VALID 0x0004
+
+#define IS_PE_VALID(state) ((state != NLC_INVALID) && \
+                            (state & (NLC_PE_FULL | NLC_PE_PARTIAL)))
+#define IS_NE_VALID(state) ((state != NLC_INVALID) && (state & NLC_NE_VALID))
+
+#define IS_PEC_ENABLED(conf) (conf->positive_entry_cache)
+#define IS_CACHE_ENABLED(conf) ((!conf->cache_disabled))
+
+#define NLC_STACK_UNWIND(fop, frame, params ...) do {       \
+        nlc_local_t *__local = NULL;                        \
+        xlator_t *__xl      = NULL;                         \
+        if (frame) {                                        \
+                __xl = frame->this;                         \
+                __local = frame->local;                     \
+                frame->local = NULL;                        \
+        }                                                   \
+        STACK_UNWIND_STRICT (fop, frame, params);           \
+        nlc_local_wipe (__xl, __local);                     \
+} while (0)
+
+enum nlc_cache_clear_reason {
+        NLC_NONE = 0,
+        NLC_TIMER_EXPIRED,
+        NLC_LRU_PRUNE,
+};
+
+struct nlc_ne {
+        struct list_head  list;
+        char             *name;
+};
+typedef struct nlc_ne nlc_ne_t;
+
+struct nlc_pe {
+        struct list_head  list;
+        inode_t          *inode;
+        char             *name;
+};
+typedef struct nlc_pe nlc_pe_t;
+
+struct nlc_timer_data {
+        inode_t          *inode;
+        xlator_t         *this;
+};
+typedef struct nlc_timer_data nlc_timer_data_t;
+
+struct nlc_lru_node {
+        inode_t          *inode;
+        struct list_head  list;
+};
+typedef struct nlc_lru_node nlc_lru_node_t;
+
+struct nlc_ctx {
+        struct list_head         pe;   /* list of positive entries */
+        struct list_head         ne;   /* list of negative entries */
+        uint64_t                 state;
+        time_t                   cache_time;
+        struct gf_tw_timer_list *timer;
+        nlc_timer_data_t         *timer_data;
+        size_t                   cache_size;
+        uint64_t                 refd_inodes;
+        gf_lock_t                lock;
+};
+typedef struct nlc_ctx nlc_ctx_t;
+
+struct nlc_local {
+        loc_t    loc;
+        loc_t    loc2;
+        inode_t *inode;
+        inode_t *parent;
+        fd_t    *fd;
+        char    *linkname;
+        glusterfs_fop_t fop;
+};
+typedef struct nlc_local nlc_local_t;
+
+struct nlc_statistics {
+        gf_atomic_t nlc_hit; /* No. of times lookup/stat was served from this xl */
+        gf_atomic_t nlc_miss; /* No. of times negative lookups were sent to disk */
+        /* More granular counters */
+        gf_atomic_t nameless_lookup;
+        gf_atomic_t getrealfilename_hit;
+        gf_atomic_t getrealfilename_miss;
+        gf_atomic_t pe_inode_cnt;
+        gf_atomic_t ne_inode_cnt;
+        gf_atomic_t nlc_invals; /* No. of invalidates recieved from upcall*/
+};
+
+struct nlc_conf {
+        int32_t              cache_timeout;
+        gf_boolean_t         positive_entry_cache;
+        gf_boolean_t         negative_entry_cache;
+        gf_boolean_t         disable_cache;
+        uint64_t             cache_size;
+        gf_atomic_t          current_cache_size;
+        uint64_t             inode_limit;
+        gf_atomic_t          refd_inodes;
+        struct tvec_base    *timer_wheel;
+        time_t               last_child_down;
+        struct list_head     lru;
+        gf_lock_t            lock;
+        struct nlc_statistics nlc_counter;
+};
+typedef struct nlc_conf nlc_conf_t;
+
+gf_boolean_t
+nlc_get_real_file_name (xlator_t *this, loc_t *loc, const char *fname,
+                        int32_t *op_ret, int32_t *op_errno, dict_t *dict);
+
+gf_boolean_t
+nlc_is_negative_lookup (xlator_t *this, loc_t *loc);
+
+void
+nlc_set_dir_state (xlator_t *this, inode_t *inode, uint64_t state);
+
+void
+nlc_dir_add_pe (xlator_t *this, inode_t *inode, inode_t *entry_ino,
+                const char *name);
+
+void
+nlc_dir_remove_pe (xlator_t *this, inode_t *inode, inode_t *entry_ino,
+                   const char *name, gf_boolean_t multilink);
+
+void
+nlc_dir_add_ne (xlator_t *this, inode_t *inode, const char *name);
+
+void
+nlc_local_wipe (xlator_t *this, nlc_local_t *local);
+
+nlc_local_t *
+nlc_local_init (call_frame_t *frame, xlator_t *this, glusterfs_fop_t fop,
+                loc_t *loc, loc_t *loc2);
+
+void
+nlc_update_child_down_time (xlator_t *this, time_t *now);
+
+void
+nlc_inode_clear_cache (xlator_t *this, inode_t *inode,
+                      int reason);
+
+void
+nlc_dump_inodectx (xlator_t *this, inode_t *inode);
+
+void
+nlc_clear_all_cache (xlator_t *this);
+
+void
+nlc_disable_cache (xlator_t *this);
+
+#endif /* __NL_CACHE_H__ */
-- 
1.8.3.1