Blob Blame History Raw
From dfe7218f07ffa70b73c51c71b0f051be926b6d92 Mon Sep 17 00:00:00 2001
From: Aleš Matěj <amatej@redhat.com>
Date: Tue, 14 May 2019 16:48:13 +0200
Subject: [PATCH] Correct pkg count in headers if there were invalid pkgs (RhBug:1596211)

---
 src/createrepo_c.c  | 103 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------------
 src/dumper_thread.c |   4 +++-
 src/dumper_thread.h |   3 ++-
 src/threads.c       |  23 +++++++++++++++++++++++
 src/threads.h       |   5 +++++
 src/xml_file.c      | 123 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 src/xml_file.h      |  15 +++++++++++++++
 7 files changed, 260 insertions(+), 16 deletions(-)

diff --git a/src/createrepo_c.c b/src/createrepo_c.c
index e16ae34..67c2752 100644
--- a/src/createrepo_c.c
+++ b/src/createrepo_c.c
@@ -124,7 +124,7 @@ fill_pool(GThreadPool *pool,
           struct CmdOptions *cmd_options,
           GSList **current_pkglist,
           FILE *output_pkg_list,
-          long *package_count,
+          long *task_count,
           int  media_id)
 {
     GQueue queue = G_QUEUE_INIT;
@@ -259,13 +259,13 @@ fill_pool(GThreadPool *pool,
 
     // Push sorted tasks into the thread pool
     while ((task = g_queue_pop_head(&queue)) != NULL) {
-        task->id = *package_count;
+        task->id = *task_count;
         task->media_id = media_id;
         g_thread_pool_push(pool, task, NULL);
-        ++*package_count;
+        ++*task_count;
     }
 
-    return *package_count;
+    return *task_count;
 }
 
 
@@ -321,6 +321,27 @@ prepare_cache_dir(struct CmdOptions *cmd_options,
     return TRUE;
 }
 
+/** Inspect a finished rewrite task: log its error and flag the failure,
+ *  or adopt the content stats the task holds for the new file.
+ *
+ * @param task          Rewrite pkg count task
+ * @param filename      Name of file with wrong package count
+ * @param exit_val      Set to 2 if the task finished with an error
+ * @param content_stat  Content stats for filename, replaced on success
+ */
+static void
+error_check_and_set_content_stat(cr_CompressionTask *task, char *filename, int *exit_val, cr_ContentStat **content_stat){
+    if (task->err != NULL) {
+        g_critical("Cannot rewrite pkg count in %s: %s", filename, task->err->message);
+        *exit_val = 2;
+        return;
+    }
+
+    // Release the stale stats, then point at the ones held by the task
+    cr_contentstat_free(*content_stat, NULL);
+    *content_stat = task->stat;
+    task->stat = NULL;
+}
 
 int
 main(int argc, char **argv)
@@ -478,7 +499,7 @@ main(int argc, char **argv)
                                           NULL);
     g_debug("Thread pool ready");
 
-    long package_count = 0;
+    long task_count = 0;
     GSList *current_pkglist = NULL;
     /* ^^^ List with basenames of files which will be processed */
 
@@ -490,26 +511,26 @@ main(int argc, char **argv)
                   cmd_options,
                   &current_pkglist,
                   output_pkg_list,
-                  &package_count,
+                  &task_count,
                   media_id);
         g_free(tmp_in_dir);
     }
 
-    g_debug("Package count: %ld", package_count);
-    g_message("Directory walk done - %ld packages", package_count);
+    g_debug("Package count: %ld", task_count);
+    g_message("Directory walk done - %ld packages", task_count);
 
     if (output_pkg_list)
         fclose(output_pkg_list);
 
 
     // Load old metadata if --update
     cr_Metadata *old_metadata = NULL;
     struct cr_MetadataLocation *old_metadata_location = NULL;
 
-    if (!package_count)
+    if (!task_count)
         g_debug("No packages found - skipping metadata loading");
 
-    if (package_count && cmd_options->update) {
+    if (task_count && cmd_options->update) {
         int ret;
         old_metadata = cr_metadata_new(CR_HT_KEY_FILENAME, 1, current_pkglist);
         cr_metadata_set_dupaction(old_metadata, CR_HT_DUPACT_REMOVEALL);
@@ -741,9 +762,9 @@ main(int argc, char **argv)
 
     // Set number of packages
     g_debug("Setting number of packages");
-    cr_xmlfile_set_num_of_pkgs(pri_cr_file, package_count, NULL);
-    cr_xmlfile_set_num_of_pkgs(fil_cr_file, package_count, NULL);
-    cr_xmlfile_set_num_of_pkgs(oth_cr_file, package_count, NULL);
+    cr_xmlfile_set_num_of_pkgs(pri_cr_file, task_count, NULL);
+    cr_xmlfile_set_num_of_pkgs(fil_cr_file, task_count, NULL);
+    cr_xmlfile_set_num_of_pkgs(oth_cr_file, task_count, NULL);
 
     // Open sqlite databases
     gchar *pri_db_filename = NULL;
@@ -832,7 +853,8 @@ main(int argc, char **argv)
     user_data.checksum_cachedir = cmd_options->checksum_cachedir;
     user_data.skip_symlinks     = cmd_options->skip_symlinks;
     user_data.repodir_name_len  = strlen(in_dir);
-    user_data.package_count     = package_count;
+    user_data.task_count        = task_count;
+    user_data.package_count     = 0;
     user_data.skip_stat         = cmd_options->skip_stat;
     user_data.old_metadata      = old_metadata;
     user_data.mutex_pri         = g_mutex_new();
@@ -876,6 +898,59 @@ main(int argc, char **argv)
     cr_xmlfile_close(fil_cr_file, NULL);
     cr_xmlfile_close(oth_cr_file, NULL);
 
+
+    /* At the time of writing xml metadata headers we haven't yet parsed all
+     * the packages and we don't know whether there were some invalid ones,
+     * therefore we write the task count into the headers instead of the actual
+     * package count. If there actually were some invalid packages, we have to
+     * correct this value. Unfortunately, that means we have to decompress the
+     * metadata files, change the package count value and compress them again.
+     */
+    if (user_data.package_count != user_data.task_count){
+        g_message("Warning: There were some invalid packages: we have to recompress other, filelists and primary xml metadata files in order to have correct package counts");
+
+        GThreadPool *rewrite_pkg_count_pool = g_thread_pool_new(cr_rewrite_pkg_count_thread,
+                                                                &user_data, 3, FALSE, NULL);
+
+        cr_CompressionTask *pri_rewrite_pkg_count_task;
+        cr_CompressionTask *fil_rewrite_pkg_count_task;
+        cr_CompressionTask *oth_rewrite_pkg_count_task;
+
+        pri_rewrite_pkg_count_task = cr_compressiontask_new(pri_xml_filename,
+                                                            NULL,
+                                                            xml_compression,
+                                                            cmd_options->repomd_checksum_type,
+                                                            1,
+                                                            &tmp_err);
+        g_thread_pool_push(rewrite_pkg_count_pool, pri_rewrite_pkg_count_task, NULL);
+
+        fil_rewrite_pkg_count_task = cr_compressiontask_new(fil_xml_filename,
+                                                            NULL,
+                                                            xml_compression,
+                                                            cmd_options->repomd_checksum_type,
+                                                            1,
+                                                            &tmp_err);
+        g_thread_pool_push(rewrite_pkg_count_pool, fil_rewrite_pkg_count_task, NULL);
+
+        oth_rewrite_pkg_count_task = cr_compressiontask_new(oth_xml_filename,
+                                                            NULL,
+                                                            xml_compression,
+                                                            cmd_options->repomd_checksum_type,
+                                                            1,
+                                                            &tmp_err);
+        g_thread_pool_push(rewrite_pkg_count_pool, oth_rewrite_pkg_count_task, NULL);
+
+        g_thread_pool_free(rewrite_pkg_count_pool, FALSE, TRUE);
+
+        error_check_and_set_content_stat(pri_rewrite_pkg_count_task, pri_xml_filename, &exit_val, &pri_stat);
+        error_check_and_set_content_stat(fil_rewrite_pkg_count_task, fil_xml_filename, &exit_val, &fil_stat);
+        error_check_and_set_content_stat(oth_rewrite_pkg_count_task, oth_xml_filename, &exit_val, &oth_stat);
+
+        cr_compressiontask_free(pri_rewrite_pkg_count_task, NULL);
+        cr_compressiontask_free(fil_rewrite_pkg_count_task, NULL);
+        cr_compressiontask_free(oth_rewrite_pkg_count_task, NULL);
+    }
+
     g_queue_free(user_data.buffer);
     g_mutex_free(user_data.mutex_buffer);
     g_cond_free(user_data.cond_pri);
diff --git a/src/dumper_thread.c b/src/dumper_thread.c
index fbaa5be..e282f96 100644
--- a/src/dumper_thread.c
+++ b/src/dumper_thread.c
@@ -74,6 +74,8 @@ write_pkg(long id,
     g_mutex_lock(udata->mutex_pri);
     while (udata->id_pri != id)
         g_cond_wait (udata->cond_pri, udata->mutex_pri);
+
+    udata->package_count++;
     ++udata->id_pri;
     cr_xmlfile_add_chunk(udata->pri_f, (const char *) res.primary, &tmp_err);
     if (tmp_err) {
@@ -476,7 +478,7 @@ cr_dumper_thread(gpointer data, gpointer user_data)
 
     if (g_queue_get_length(udata->buffer) < MAX_TASK_BUFFER_LEN
         && udata->id_pri != task->id
-        && udata->package_count > (task->id + 1))
+        && udata->task_count > (task->id + 1))
     {
         // If:
         //  * this isn't our turn
diff --git a/src/dumper_thread.h b/src/dumper_thread.h
index ed21053..4e18869 100644
--- a/src/dumper_thread.h
+++ b/src/dumper_thread.h
@@ -61,7 +61,8 @@ struct UserData {
     cr_ChecksumType checksum_type;  // Constant representing selected checksum
     const char *checksum_cachedir;  // Dir with cached checksums
     gboolean skip_symlinks;         // Skip symlinks
-    long package_count;             // Total number of packages to process
+    long task_count;                // Total number of tasks to process
+    long package_count;             // Total number of packages processed
 
     // Update stuff
     gboolean skip_stat;             // Skip stat() while updating
diff --git a/src/threads.c b/src/threads.c
index aee07d1..844e900 100644
--- a/src/threads.c
+++ b/src/threads.c
@@ -21,6 +21,7 @@
 #include "threads.h"
 #include "error.h"
 #include "misc.h"
+#include "dumper_thread.h"
 
 #define ERR_DOMAIN      CREATEREPO_C_ERROR
 
@@ -108,6 +109,28 @@ cr_compressing_thread(gpointer data, G_GNUC_UNUSED gpointer user_data)
     }
 }
 
+/* GThreadPool worker that corrects the package count stored in the XML
+ * header of the file named by the task (data is a cr_CompressionTask,
+ * user_data a struct UserData holding both counts). Any failure is left
+ * in task->err for the code that pushed the task. */
+void
+cr_rewrite_pkg_count_thread(gpointer data, gpointer user_data)
+{
+    cr_CompressionTask *rewrite_task = data;
+    struct UserData *counts = user_data;
+    GError *rewrite_err = NULL;
+
+    assert(rewrite_task);
+
+    cr_rewrite_header_package_count(rewrite_task->src, rewrite_task->type,
+                                    counts->package_count, counts->task_count,
+                                    rewrite_task->stat, &rewrite_err);
+
+    // Hand the failure over to the task
+    if (rewrite_err != NULL)
+        g_propagate_error(&rewrite_task->err, rewrite_err);
+}
+
 /** Parallel Repomd Record Fill */
 
 cr_RepomdRecordFillTask *
diff --git a/src/threads.h b/src/threads.h
index 2d554cd..19ba917 100644
--- a/src/threads.h
+++ b/src/threads.h
@@ -150,6 +150,11 @@ cr_repomdrecordfilltask_free(cr_RepomdRecordFillTask *task, GError **err);
 void
 cr_repomd_record_fill_thread(gpointer data, gpointer user_data);
 
+/** Function for GThread Pool; data is a cr_CompressionTask and user_data
+ * a struct UserData. Rewrites the package count in header of the task src. */
+void
+cr_rewrite_pkg_count_thread(gpointer data, gpointer user_data);
+
 /** @} */
 
 #ifdef __cplusplus
diff --git a/src/xml_file.c b/src/xml_file.c
index 65fb945..1d670ae 100644
--- a/src/xml_file.c
+++ b/src/xml_file.c
@@ -18,8 +18,10 @@
  */
 
 #include <glib.h>
+#include <glib/gstdio.h>
 #include <assert.h>
 #include "xml_file.h"
+#include <errno.h>
 #include "error.h"
 #include "xml_dump.h"
 #include "compression_wrapper.h"
@@ -40,6 +42,9 @@
 #define XML_PRESTODELTA_HEADER  XML_HEADER"<prestodelta>\n"
 #define XML_UPDATEINFO_HEADER   XML_HEADER"<updates>\n"
 
+#define XML_MAX_HEADER_SIZE     300
+#define XML_RECOMPRESS_BUFFER_SIZE   8192
+
 #define XML_PRIMARY_FOOTER      "</metadata>"
 #define XML_FILELISTS_FOOTER    "</filelists>"
 #define XML_OTHER_FOOTER        "</otherdata>"
@@ -317,3 +322,121 @@ cr_xmlfile_close(cr_XmlFile *f, GError **err)
 
     return CRE_OK;
 }
+
+/* Copy header_buf to cr_file with packages="<task_count>" replaced by
+ * packages="<package_count>". Returns the number of bytes written; sets
+ * err and returns 0 if the attribute is not found or a write fails. */
+static int
+write_modified_header(int task_count,
+                      int package_count,
+                      cr_XmlFile *cr_file,
+                      gchar *header_buf,
+                      int header_len,
+                      GError **err)
+{
+    GError *tmp_err = NULL;
+    gchar *package_count_string;
+    gchar *task_count_string;
+    int bytes_written = 0;
+    int package_count_string_len = rasprintf(&package_count_string, "packages=\"%i\"", package_count);
+    int task_count_string_len = rasprintf(&task_count_string, "packages=\"%i\"", task_count);
+    // header_buf is raw read data without a NUL terminator: bound the search
+    gchar *pointer_to_pkgs = g_strstr_len(header_buf, header_len, task_count_string);
+    g_free(task_count_string);
+    if (!pointer_to_pkgs){
+        g_set_error(err, CREATEREPO_C_ERROR, CRE_XMLDATA, "Cannot find packages=\"%i\" in header", task_count);
+        g_free(package_count_string);
+        return 0;
+    }
+    gchar *pointer_to_pkgs_end = pointer_to_pkgs + task_count_string_len;
+
+    bytes_written += cr_write(cr_file->f, header_buf, pointer_to_pkgs - header_buf, &tmp_err);
+    if (!tmp_err)
+        bytes_written += cr_write(cr_file->f, package_count_string, package_count_string_len, &tmp_err);
+    if (!tmp_err)
+        bytes_written += cr_write(cr_file->f, pointer_to_pkgs_end, header_len - (pointer_to_pkgs_end - header_buf), &tmp_err);
+    if (tmp_err) {
+        g_propagate_prefixed_error(err, tmp_err, "Error encountered while writing header part:");
+        bytes_written = 0;
+    }
+    g_free(package_count_string);
+    return bytes_written;
+}
+
+/* Recompress original_filename into "<original_filename>.tmp", replacing the
+ * packages="<task_count>" attribute of the XML header with package_count,
+ * and rename the temporary file over the original. On failure err is set,
+ * the temporary file is removed and the original file is not replaced. */
+void
+cr_rewrite_header_package_count(gchar *original_filename,
+                                cr_CompressionType xml_compression,
+                                int package_count,
+                                int task_count,
+                                cr_ContentStat *file_stat,
+                                GError **err)
+{
+    GError *tmp_err = NULL;
+    const char *err_prefix = NULL;
+    gchar *tmp_xml_filename = NULL;
+    cr_XmlFile *new_file = NULL;
+    gchar header_buf[XML_MAX_HEADER_SIZE];
+    gchar copy_buf[XML_RECOMPRESS_BUFFER_SIZE];
+    int len_read;
+    CR_FILE *original_file = cr_open(original_filename, CR_CW_MODE_READ, CR_CW_AUTO_DETECT_COMPRESSION, &tmp_err);
+    if (tmp_err) {
+        g_propagate_prefixed_error(err, tmp_err, "Error encountered while reopening for reading:");
+        return;
+    }
+
+    tmp_xml_filename = g_strconcat(original_filename, ".tmp", NULL);
+    new_file = cr_xmlfile_sopen_primary(tmp_xml_filename, xml_compression, file_stat, &tmp_err);
+    if (tmp_err) {
+        err_prefix = "Error encountered while opening for writing:";
+        goto fail;
+    }
+
+    // The first chunk is expected to hold the XML header: fix the count in it
+    len_read = cr_read(original_file, header_buf, XML_MAX_HEADER_SIZE, &tmp_err);
+    if (!tmp_err)
+        write_modified_header(task_count, package_count, new_file, header_buf, len_read, &tmp_err);
+    // Copy the rest of the file
+    while (!tmp_err && len_read) {
+        len_read = cr_read(original_file, copy_buf, XML_RECOMPRESS_BUFFER_SIZE, &tmp_err);
+        if (!tmp_err)
+            cr_write(new_file->f, copy_buf, len_read, &tmp_err);
+    }
+    if (tmp_err) {
+        err_prefix = "Error encountered while recompressing:";
+        goto fail;
+    }
+
+    // NOTE(review): presumably marks header and footer as already written so
+    // that cr_xmlfile_close() does not add them to the copied data - confirm.
+    new_file->header = 1;
+    new_file->footer = 1;
+    cr_xmlfile_close(new_file, &tmp_err);
+    new_file = NULL;    // a close was attempted, never try it a second time
+    if (!tmp_err) {
+        cr_close(original_file, &tmp_err);
+        original_file = NULL;
+    }
+    if (tmp_err) {
+        err_prefix = "Error encountered while writing:";
+        goto fail;
+    }
+
+    if (g_rename(tmp_xml_filename, original_filename) == -1) {
+        // g_rename() reports failure through errno only: build the GError here
+        g_set_error(&tmp_err, CREATEREPO_C_ERROR, CRE_IO, "%s", g_strerror(errno));
+        err_prefix = "Error encountered while renaming:";
+        goto fail;
+    }
+    g_free(tmp_xml_filename);
+    return;
+
+fail:
+    g_propagate_prefixed_error(err, tmp_err, "%s", err_prefix);
+    if (new_file) cr_xmlfile_close(new_file, NULL);
+    if (original_file) cr_close(original_file, NULL);
+    g_remove(tmp_xml_filename);
+    g_free(tmp_xml_filename);
+}
diff --git a/src/xml_file.h b/src/xml_file.h
index 96ef5e3..6ac4c97 100644
--- a/src/xml_file.h
+++ b/src/xml_file.h
@@ -221,6 +221,21 @@ int cr_xmlfile_add_chunk(cr_XmlFile *f, const char *chunk, GError **err);
  */
 int cr_xmlfile_close(cr_XmlFile *f, GError **err);
 
+/** Rewrite package count field in repodata header in xml file.
+ * The whole file is decompressed and compressed again into a new file.
+ * @param original_filename     Current file with wrong value in header
+ * @param xml_compression       Compression type used for the new file
+ * @param package_count         Actual package count (desired value in header)
+ * @param task_count            Task count (current value in header)
+ * @param file_stat             cr_ContentStat for stats of the new file, it will be modified
+ * @param err                   **GError
+ */
+void cr_rewrite_header_package_count(gchar *original_filename,
+                                     cr_CompressionType xml_compression,
+                                     int package_count,
+                                     int task_count,
+                                     cr_ContentStat *file_stat,
+                                     GError **err);
 
 /** @} */
 
--
libgit2 0.27.8