From a59b2f4129565dbfa1b63899dd550e9c22b02923 Mon Sep 17 00:00:00 2001 From: Mohammad Nweider Date: Wed, 18 Oct 2017 13:02:15 +0000 Subject: [PATCH] Ticket 49401 - improve valueset sorted performance on delete Bug Description: valueset sorted maintains a list of syntax sorted references to the attributes of the entry. During addition these are modified and added properly, so they stay sorted. However, in the past to maintain the sorted property, during a delete we would need to remove the vs->sorted array, and recreate it via qsort, While this was an improvement from past (where we would removed vs->sorted during an attr delete), it still has performance implications on very very large datasets, IE 50,000 member groups with addition/deletion, large entry caches and replication. Fix Description: Implement a new algorithm that is able to maintain existing sort data in a near linear time. https://pagure.io/389-ds-base/issue/49401 Author: nweiderm, wibrown Review by: wibrown, lkrispen, tbordaz (Thanks nweiderm!) (cherry picked from commit a43a8efc7907116146b505ac40f18fac71f474b0) --- ldap/servers/slapd/valueset.c | 171 +++++++++++++++++++++++++----------------- 1 file changed, 103 insertions(+), 68 deletions(-) diff --git a/ldap/servers/slapd/valueset.c b/ldap/servers/slapd/valueset.c index d2c67d2fb..1c1bc150a 100644 --- a/ldap/servers/slapd/valueset.c +++ b/ldap/servers/slapd/valueset.c @@ -677,100 +677,136 @@ valueset_array_purge(const Slapi_Attr *a, Slapi_ValueSet *vs, const CSN *csn) size_t i = 0; size_t j = 0; int nextValue = 0; + int nv = 0; int numValues = 0; + Slapi_Value **va2 = NULL; + int *sorted2 = NULL; /* Loop over all the values freeing the old ones. */ - for (i = 0; i < vs->num; i++) { + for(i = 0; i < vs->num; i++) + { /* If we have the sorted array, find the va array ref by it. */ if (vs->sorted) { j = vs->sorted[i]; } else { j = i; } - csnset_purge(&(vs->va[j]->v_csnset), csn); - if (vs->va[j]->v_csnset == NULL) { - slapi_value_free(&vs->va[j]); - vs->va[j] = NULL; - } else if (vs->va[j] != NULL) { - /* This value survived, we should count it. */ - numValues++; + if (vs->va[j]) { + csnset_purge(&(vs->va[j]->v_csnset),csn); + if (vs->va[j]->v_csnset == NULL) { + slapi_value_free(&vs->va[j]); + /* Set the removed value to NULL so we know later to skip it */ + vs->va[j] = NULL; + if (vs->sorted) { + /* Mark the value in sorted for removal */ + vs->sorted[i] = -1; + } + } else { + /* This value survived, we should count it. */ + numValues++; + } } } - /* Now compact the value/sorted list. */ + /* Compact vs->va and vs->sorted only when there're + * remaining values ie: numValues is greater than 0 */ /* - * Because we want to preserve the sorted array, this is complicated. + * Algorithm explination: We start with a pair of arrays - the attrs, and the sorted array that provides + * a lookup into it: + * + * va: [d e a c b] sorted: [2 4 3 0 1] + * + * When we remove the element b, we NULL it, and we have to mark the place where it "was" as a -1 to + * flag it's removal. + * + * va: [d e a c NULL] sorted: [2 -1 3 0 1] + * + * Now a second va is created with the reduced allocation, + * + * va2: [ X X X X ] .... + * + * Now we loop over sorted, skipping -1 that we find. In a new counter we create new sorted + * references, and move the values compacting them in the process. + * va: [d e a c NULL] + * va2: [a x x x] + * sorted: [_0 -1 3 0 1] + * + * Looping a few more times would yield: * - * We have an array of values: - * [ b, a, c, NULL, e, NULL, NULL, d] - * And an array of indicies that are sorted. - * [ 1, 0, 2, 7, 4, 3, 5, 6 ] - * Were we to iterate over the sorted array, we get refs to the values in - * some order. - * The issue is now we must *remove* from both the values *and* the sorted. + * va2: [a c x x] + * sorted: [_0 _1 3 0 1] * - * Previously, we just discarded this, because too hard. Now we try to keep - * it. The issue is that this is surprisingly hard to actually keep in - * sync. + * va2: [a c d x] + * sorted: [_0 _1 _2 0 1] * - * We can't just blindly move the values down: That breaks the sorted array - * and we would need to iterate over the sorted array multiple times to - * achieve this. + * va2: [a c d e] + * sorted: [_0 _1 _2 _3 1] + * + * Not only does this sort va, but with sorted, we have a faster lookup, and it will benefit cache + * lookup. * - * It's actually going to be easier to just ditch the sorted, compact vs - * and then qsort the array. */ + if (numValues > 0) { + if(vs->sorted) { + /* Let's allocate va2 and sorted2 */ + va2 = (Slapi_Value **) slapi_ch_malloc( (numValues + 1) * sizeof(Slapi_Value *)); + sorted2 = (int *) slapi_ch_malloc( (numValues + 1)* sizeof(int)); + } - j = 0; - while (nextValue < numValues && j < vs->num) { - /* nextValue is what we are looking at now - * j tracks along the array getting next elements. - * - * [ b, a, c, NULL, e, NULL, NULL, d] - * ^nv ^j - * [ b, a, c, e, NULL, NULL, NULL, d] - * ^nv ^j - * [ b, a, c, e, NULL, NULL, NULL, d] - * ^nv ^j - * [ b, a, c, e, NULL, NULL, NULL, d] - * ^nv ^j - * [ b, a, c, e, NULL, NULL, NULL, d] - * ^nv ^j - * [ b, a, c, e, d, NULL, NULL, NULL] - * ^nv ^j - */ - if (vs->va[nextValue] == NULL) { - /* Advance j till we find something */ - while (vs->va[j] == NULL) { - j++; + /* I is the index for the *new* va2 array */ + for(i=0; inum; i++) { + if (vs->sorted) { + /* Skip any removed values from the index */ + while((nv < vs->num) && (-1 == vs->sorted[nv])) { + nv++; + } + /* We have a remaining value, add it to the va */ + if(nv < vs->num) { + va2[i] = vs->va[vs->sorted[nv]]; + sorted2[i] = i; + nv++; + } + } else { + while((nextValue < vs->num) && (NULL == vs->va[nextValue])) { + nextValue++; + } + + if(nextValue < vs->num) { + vs->va[i] = vs->va[nextValue]; + nextValue++; + } else { + break; + } } - /* We have something! */ - vs->va[nextValue] = vs->va[j]; - vs->va[j] = NULL; } - nextValue++; - } - /* Fix up the number of values */ - vs->num = numValues; - /* Should we re-alloc values to be smaller? */ - /* Other parts of DS are lazy. Lets clean our list */ - for (j = vs->num; j < vs->max; j++) { - vs->va[j] = NULL; - } - /* All the values were deleted, we can discard the whole array. */ - if (vs->num == 0) { if (vs->sorted) { + /* Finally replace the valuearray and adjust num, max */ + slapi_ch_free((void **)&vs->va); slapi_ch_free((void **)&vs->sorted); + vs->va = va2; + vs->sorted = sorted2; + vs->num = numValues; + vs->max = vs->num + 1; + } else { + vs->num = numValues; } - slapi_ch_free((void **)&vs->va); - vs->va = NULL; - vs->max = 0; - } else if (vs->sorted != NULL) { - /* We still have values! rebuild the sorted array */ - valueset_array_to_sorted(a, vs); + + for (j = vs->num; j < vs->max; j++) { + vs->va[j] = NULL; + if (vs->sorted) { + vs->sorted[j] = -1; + } + } + } else { + slapi_valueset_done(vs); } + /* We still have values but not sorted array! rebuild it */ + if(vs->num > VALUESET_ARRAY_SORT_THRESHOLD && vs->sorted == NULL) { + vs->sorted = (int *) slapi_ch_malloc( vs->max* sizeof(int)); + valueset_array_to_sorted(a, vs); + } #ifdef DEBUG PR_ASSERT(vs->num == 0 || (vs->num > 0 && vs->va[0] != NULL)); size_t index = 0; @@ -781,7 +817,6 @@ valueset_array_purge(const Slapi_Attr *a, Slapi_ValueSet *vs, const CSN *csn) PR_ASSERT(vs->va[index] == NULL); } #endif - /* return the number of remaining values */ return numValues; } -- 2.13.6