From dfe7218f07ffa70b73c51c71b0f051be926b6d92 Mon Sep 17 00:00:00 2001
From: Aleš Matěj
Date: Tue, 14 May 2019 16:48:13 +0200
Subject: [PATCH] Correct pkg count in headers if there were invalid pkgs (RhBug:1596211)

---
Notes (below the "---" marker, not part of the commit message):

Changes relative to the originally posted patch:
 - write_modified_header(): search the header with g_strstr_len() bounded by
   header_len; the buffer filled by cr_read() is not NUL-terminated, so
   strstr() could read past its end.
 - write_modified_header(): set an error when the expected packages="N"
   attribute is not found instead of returning 0 with no error, which made
   the caller continue and drop the header bytes from the new file.
 - cr_rewrite_header_package_count(): a failed g_rename() propagated a NULL
   GError, so the failure was lost; report it from errno instead.
 - cr_rewrite_header_package_count(): remove the ".tmp" file on failure.
 - Document the xml_compression parameter, fix comment typos.

The line structure of this patch was rebuilt from a whitespace-collapsed
copy: indentation and the angle-bracket text in src/xml_file.c context lines
(#include targets, XML header/footer literals) are reconstructed, the
author's e-mail address is missing, and the blob ids on the "index" lines
predate the changes above. Apply with "git apply --ignore-whitespace".

 src/createrepo_c.c  | 103 ++++++++++++++++++++++++++++++-----
 src/dumper_thread.c |   4 +-
 src/dumper_thread.h |   3 +-
 src/threads.c       |  23 ++++++++
 src/threads.h       |   5 ++
 src/xml_file.c      | 147 ++++++++++++++++++++++++++++++++++++++++++++++++++
 src/xml_file.h      |  16 +++++
 7 files changed, 285 insertions(+), 16 deletions(-)

diff --git a/src/createrepo_c.c b/src/createrepo_c.c
index e16ae34..67c2752 100644
--- a/src/createrepo_c.c
+++ b/src/createrepo_c.c
@@ -124,7 +124,7 @@ fill_pool(GThreadPool *pool,
           struct CmdOptions *cmd_options,
           GSList **current_pkglist,
           FILE *output_pkg_list,
-          long *package_count,
+          long *task_count,
           int media_id)
 {
     GQueue queue = G_QUEUE_INIT;
@@ -259,13 +259,13 @@ fill_pool(GThreadPool *pool,
 
     // Push sorted tasks into the thread pool
     while ((task = g_queue_pop_head(&queue)) != NULL) {
-        task->id = *package_count;
+        task->id = *task_count;
         task->media_id = media_id;
         g_thread_pool_push(pool, task, NULL);
-        ++*package_count;
+        ++*task_count;
     }
 
-    return *package_count;
+    return *task_count;
 }
 
 
@@ -321,6 +321,27 @@ prepare_cache_dir(struct CmdOptions *cmd_options,
     return TRUE;
 }
 
+/** Check if task finished without error, if yes
+ * use content stats of the new file
+ *
+ * @param task          Rewrite pkg count task
+ * @param filename      Name of file with wrong package count
+ * @param exit_val      If errors occurred set createrepo_c exit value
+ * @param content_stat  Content stats for filename
+ *
+ */
+static void
+error_check_and_set_content_stat(cr_CompressionTask *task, char *filename, int *exit_val, cr_ContentStat **content_stat){
+    if (task->err) {
+        g_critical("Cannot rewrite pkg count in %s: %s",
+                   filename, task->err->message);
+        *exit_val = 2;
+    }else{
+        cr_contentstat_free(*content_stat, NULL);
+        *content_stat = task->stat;
+        task->stat = NULL;
+    }
+}
 
 int
 main(int argc, char **argv)
@@ -478,7 +499,7 @@ main(int argc, char **argv)
                                           NULL);
     g_debug("Thread pool ready");
 
-    long package_count = 0;
+    long task_count = 0;
     GSList *current_pkglist = NULL;
     /* ^^^ List with basenames of files which will be processed */
 
@@ -490,26 +511,26 @@ main(int argc, char **argv)
                   cmd_options,
                   &current_pkglist,
                   output_pkg_list,
-                  &package_count,
+                  &task_count,
                   media_id);
         g_free(tmp_in_dir);
     }
 
-    g_debug("Package count: %ld", package_count);
-    g_message("Directory walk done - %ld packages", package_count);
+    g_debug("Package count: %ld", task_count);
+    g_message("Directory walk done - %ld packages", task_count);
 
     if (output_pkg_list)
         fclose(output_pkg_list);
 
 
     // Load old metadata if --update
     cr_Metadata *old_metadata = NULL;
     struct cr_MetadataLocation *old_metadata_location = NULL;
 
-    if (!package_count)
+    if (!task_count)
         g_debug("No packages found - skipping metadata loading");
 
-    if (package_count && cmd_options->update) {
+    if (task_count && cmd_options->update) {
         int ret;
         old_metadata = cr_metadata_new(CR_HT_KEY_FILENAME, 1, current_pkglist);
         cr_metadata_set_dupaction(old_metadata, CR_HT_DUPACT_REMOVEALL);
@@ -741,9 +762,9 @@ main(int argc, char **argv)
 
     // Set number of packages
     g_debug("Setting number of packages");
-    cr_xmlfile_set_num_of_pkgs(pri_cr_file, package_count, NULL);
-    cr_xmlfile_set_num_of_pkgs(fil_cr_file, package_count, NULL);
-    cr_xmlfile_set_num_of_pkgs(oth_cr_file, package_count, NULL);
+    cr_xmlfile_set_num_of_pkgs(pri_cr_file, task_count, NULL);
+    cr_xmlfile_set_num_of_pkgs(fil_cr_file, task_count, NULL);
+    cr_xmlfile_set_num_of_pkgs(oth_cr_file, task_count, NULL);
 
     // Open sqlite databases
     gchar *pri_db_filename = NULL;
@@ -832,7 +853,8 @@ main(int argc, char **argv)
     user_data.checksum_cachedir = cmd_options->checksum_cachedir;
     user_data.skip_symlinks     = cmd_options->skip_symlinks;
     user_data.repodir_name_len  = strlen(in_dir);
-    user_data.package_count     = package_count;
+    user_data.task_count        = task_count;
+    user_data.package_count     = 0;
     user_data.skip_stat         = cmd_options->skip_stat;
     user_data.old_metadata      = old_metadata;
     user_data.mutex_pri         = g_mutex_new();
@@ -876,6 +898,59 @@ main(int argc, char **argv)
     cr_xmlfile_close(fil_cr_file, NULL);
     cr_xmlfile_close(oth_cr_file, NULL);
 
+
+    /* At the time of writing xml metadata headers we haven't yet parsed all
+     * the packages and we don't know whether there were some invalid ones,
+     * therefore we write the task count into the headers instead of the actual package count.
+     * If there actually were some invalid packages we have to correct this value;
+     * that unfortunately means we have to decompress the metadata files, change the
+     * package count value and compress them again.
+     */
+    if (user_data.package_count != user_data.task_count){
+        g_message("Warning: There were some invalid packages: we have to recompress other, filelists and primary xml metadata files in order to have correct package counts");
+
+        GThreadPool *rewrite_pkg_count_pool = g_thread_pool_new(cr_rewrite_pkg_count_thread,
+                                                                &user_data, 3, FALSE, NULL);
+
+        cr_CompressionTask *pri_rewrite_pkg_count_task;
+        cr_CompressionTask *fil_rewrite_pkg_count_task;
+        cr_CompressionTask *oth_rewrite_pkg_count_task;
+
+        pri_rewrite_pkg_count_task = cr_compressiontask_new(pri_xml_filename,
+                                                            NULL,
+                                                            xml_compression,
+                                                            cmd_options->repomd_checksum_type,
+                                                            1,
+                                                            &tmp_err);
+        g_thread_pool_push(rewrite_pkg_count_pool, pri_rewrite_pkg_count_task, NULL);
+
+        fil_rewrite_pkg_count_task = cr_compressiontask_new(fil_xml_filename,
+                                                            NULL,
+                                                            xml_compression,
+                                                            cmd_options->repomd_checksum_type,
+                                                            1,
+                                                            &tmp_err);
+        g_thread_pool_push(rewrite_pkg_count_pool, fil_rewrite_pkg_count_task, NULL);
+
+        oth_rewrite_pkg_count_task = cr_compressiontask_new(oth_xml_filename,
+                                                            NULL,
+                                                            xml_compression,
+                                                            cmd_options->repomd_checksum_type,
+                                                            1,
+                                                            &tmp_err);
+        g_thread_pool_push(rewrite_pkg_count_pool, oth_rewrite_pkg_count_task, NULL);
+
+        g_thread_pool_free(rewrite_pkg_count_pool, FALSE, TRUE);
+
+        error_check_and_set_content_stat(pri_rewrite_pkg_count_task, pri_xml_filename, &exit_val, &pri_stat);
+        error_check_and_set_content_stat(fil_rewrite_pkg_count_task, fil_xml_filename, &exit_val, &fil_stat);
+        error_check_and_set_content_stat(oth_rewrite_pkg_count_task, oth_xml_filename, &exit_val, &oth_stat);
+
+        cr_compressiontask_free(pri_rewrite_pkg_count_task, NULL);
+        cr_compressiontask_free(fil_rewrite_pkg_count_task, NULL);
+        cr_compressiontask_free(oth_rewrite_pkg_count_task, NULL);
+    }
+
     g_queue_free(user_data.buffer);
     g_mutex_free(user_data.mutex_buffer);
     g_cond_free(user_data.cond_pri);
diff --git a/src/dumper_thread.c b/src/dumper_thread.c
index fbaa5be..e282f96 100644
--- a/src/dumper_thread.c
+++ b/src/dumper_thread.c
@@ -74,6 +74,8 @@ write_pkg(long id,
     g_mutex_lock(udata->mutex_pri);
     while (udata->id_pri != id)
         g_cond_wait (udata->cond_pri, udata->mutex_pri);
+
+    udata->package_count++;
     ++udata->id_pri;
     cr_xmlfile_add_chunk(udata->pri_f, (const char *) res.primary, &tmp_err);
     if (tmp_err) {
@@ -476,7 +478,7 @@ cr_dumper_thread(gpointer data, gpointer user_data)
 
     if (g_queue_get_length(udata->buffer) < MAX_TASK_BUFFER_LEN
         && udata->id_pri != task->id
-        && udata->package_count > (task->id + 1))
+        && udata->task_count > (task->id + 1))
     {
         // If:
         //  * this isn't our turn
diff --git a/src/dumper_thread.h b/src/dumper_thread.h
index ed21053..4e18869 100644
--- a/src/dumper_thread.h
+++ b/src/dumper_thread.h
@@ -61,7 +61,8 @@ struct UserData {
     cr_ChecksumType checksum_type;  // Constant representing selected checksum
     const char *checksum_cachedir;  // Dir with cached checksums
     gboolean skip_symlinks;         // Skip symlinks
-    long package_count;             // Total number of packages to process
+    long task_count;                // Total number of tasks to process
+    long package_count;             // Total number of packages processed
 
     // Update stuff
     gboolean skip_stat;             // Skip stat() while updating
diff --git a/src/threads.c b/src/threads.c
index aee07d1..844e900 100644
--- a/src/threads.c
+++ b/src/threads.c
@@ -21,6 +21,7 @@
 #include "threads.h"
 #include "error.h"
 #include "misc.h"
+#include "dumper_thread.h"
 
 #define ERR_DOMAIN      CREATEREPO_C_ERROR
 
@@ -108,6 +109,28 @@ cr_compressing_thread(gpointer data, G_GNUC_UNUSED gpointer user_data)
     }
 }
 
+void
+cr_rewrite_pkg_count_thread(gpointer data, gpointer user_data)
+{
+    cr_CompressionTask *task = data;
+    struct UserData *ud = user_data;
+    GError *tmp_err = NULL;
+
+    assert(task);
+
+    cr_rewrite_header_package_count(task->src,
+                                    task->type,
+                                    ud->package_count,
+                                    ud->task_count,
+                                    task->stat,
+                                    &tmp_err);
+
+    if (tmp_err) {
+        // Error encountered
+        g_propagate_error(&task->err, tmp_err);
+    }
+}
+
 /** Parallel Repomd Record Fill */
 
 cr_RepomdRecordFillTask *
diff --git a/src/threads.h b/src/threads.h
index 2d554cd..19ba917 100644
--- a/src/threads.h
+++ b/src/threads.h
@@ -150,6 +150,11 @@ cr_repomdrecordfilltask_free(cr_RepomdRecordFillTask *task, GError **err);
 void
 cr_repomd_record_fill_thread(gpointer data, gpointer user_data);
 
+/** Function for GThread Pool.
+ */
+void
+cr_rewrite_pkg_count_thread(gpointer data, gpointer user_data);
+
 /** @} */
 
 #ifdef __cplusplus
diff --git a/src/xml_file.c b/src/xml_file.c
index 65fb945..1d670ae 100644
--- a/src/xml_file.c
+++ b/src/xml_file.c
@@ -18,8 +18,10 @@
  */
 
 #include <glib.h>
+#include <glib/gstdio.h>
 #include <assert.h>
 #include "xml_file.h"
+#include <errno.h>
 #include "error.h"
 #include "xml_dump.h"
 #include "compression_wrapper.h"
@@ -40,6 +42,9 @@
 #define XML_PRESTODELTA_HEADER  XML_HEADER"<prestodelta>\n"
 #define XML_UPDATEINFO_HEADER   XML_HEADER"<updates>\n"
 
+#define XML_MAX_HEADER_SIZE     300
+#define XML_RECOMPRESS_BUFFER_SIZE 8192
+
 #define XML_PRIMARY_FOOTER      "</metadata>"
 #define XML_FILELISTS_FOOTER    "</filelists>"
 #define XML_OTHER_FOOTER        "</otherdata>"
@@ -317,3 +322,145 @@ cr_xmlfile_close(cr_XmlFile *f, GError **err)
 
     return CRE_OK;
 }
+
+/** Write the header of the original file into cr_file with the
+ * packages="N" attribute changed from task_count to package_count.
+ *
+ * @param task_count        Package count currently stored in the header
+ * @param package_count     Package count to store in the header
+ * @param cr_file           File to write the modified header into
+ * @param header_buf        Beginning of the original file (not NUL-terminated)
+ * @param header_len        Number of valid bytes in header_buf
+ * @param err               **GError
+ * @return                  Number of bytes written, 0 on error
+ */
+static int
+write_modified_header(int task_count,
+                      int package_count,
+                      cr_XmlFile *cr_file,
+                      gchar *header_buf,
+                      int header_len,
+                      GError **err)
+{
+    GError *tmp_err = NULL;
+    gchar *package_count_string;
+    gchar *task_count_string;
+    int bytes_written = 0;
+    int package_count_string_len = rasprintf(&package_count_string, "packages=\"%i\"", package_count);
+    int task_count_string_len = rasprintf(&task_count_string, "packages=\"%i\"", task_count);
+
+    // header_buf comes straight from cr_read() and is not NUL-terminated,
+    // so the search must be bounded by header_len
+    gchar *pointer_to_pkgs = g_strstr_len(header_buf, header_len, task_count_string);
+    if (!pointer_to_pkgs){
+        // Report the failure, otherwise the header would be silently dropped
+        g_set_error(err, CREATEREPO_C_ERROR, CRE_ERROR,
+                    "Cannot find %s in the header", task_count_string);
+        g_free(package_count_string);
+        g_free(task_count_string);
+        return 0;
+    }
+    gchar *pointer_to_pkgs_end = pointer_to_pkgs + task_count_string_len;
+
+    bytes_written += cr_write(cr_file->f, header_buf, pointer_to_pkgs - header_buf, &tmp_err);
+    if (!tmp_err)
+        bytes_written += cr_write(cr_file->f, package_count_string, package_count_string_len, &tmp_err);
+    if (!tmp_err)
+        bytes_written += cr_write(cr_file->f, pointer_to_pkgs_end, header_len - (pointer_to_pkgs_end - header_buf), &tmp_err);
+    if (tmp_err) {
+        g_propagate_prefixed_error(err, tmp_err, "Error encountered while writing header part:");
+        g_free(package_count_string);
+        g_free(task_count_string);
+        return 0;
+    }
+    g_free(package_count_string);
+    g_free(task_count_string);
+    return bytes_written;
+}
+
+void
+cr_rewrite_header_package_count(gchar *original_filename,
+                                cr_CompressionType xml_compression,
+                                int package_count,
+                                int task_count,
+                                cr_ContentStat *file_stat,
+                                GError **err)
+{
+    GError *tmp_err = NULL;
+    CR_FILE *original_file = cr_open(original_filename, CR_CW_MODE_READ, CR_CW_AUTO_DETECT_COMPRESSION, &tmp_err);
+    if (tmp_err) {
+        g_propagate_prefixed_error(err, tmp_err, "Error encountered while reopening for reading:");
+        return;
+    }
+
+    gchar *tmp_xml_filename = g_strconcat(original_filename, ".tmp", NULL);
+    cr_XmlFile *new_file = cr_xmlfile_sopen_primary(tmp_xml_filename,
+                                                    xml_compression,
+                                                    file_stat,
+                                                    &tmp_err);
+    if (tmp_err) {
+        g_propagate_prefixed_error(err, tmp_err, "Error encountered while opening for writing:");
+        cr_close(original_file, NULL);
+        g_free(tmp_xml_filename);
+        return;
+    }
+
+    gchar header_buf[XML_MAX_HEADER_SIZE];
+    int len_read = cr_read(original_file, header_buf, XML_MAX_HEADER_SIZE, &tmp_err);
+    if (!tmp_err)
+        write_modified_header(task_count, package_count, new_file, header_buf, len_read, &tmp_err);
+    if (tmp_err) {
+        g_propagate_prefixed_error(err, tmp_err, "Error encountered while recompressing:");
+        cr_xmlfile_close(new_file, NULL);
+        cr_close(original_file, NULL);
+        g_remove(tmp_xml_filename);
+        g_free(tmp_xml_filename);
+        return;
+    }
+    //Copy the rest of the file
+    gchar copy_buf[XML_RECOMPRESS_BUFFER_SIZE];
+    while(len_read)
+    {
+        len_read = cr_read(original_file, copy_buf, XML_RECOMPRESS_BUFFER_SIZE, &tmp_err);
+        if (!tmp_err)
+            cr_write(new_file->f, copy_buf, len_read, &tmp_err);
+        if (tmp_err) {
+            g_propagate_prefixed_error(err, tmp_err, "Error encountered while recompressing:");
+            cr_xmlfile_close(new_file, NULL);
+            cr_close(original_file, NULL);
+            g_remove(tmp_xml_filename);
+            g_free(tmp_xml_filename);
+            return;
+        }
+    }
+
+    new_file->header = 1;
+    new_file->footer = 1;
+
+    cr_xmlfile_close(new_file, &tmp_err);
+    if (tmp_err) {
+        g_propagate_prefixed_error(err, tmp_err, "Error encountered while writing:");
+        cr_close(original_file, NULL);
+        g_remove(tmp_xml_filename);
+        g_free(tmp_xml_filename);
+        return;
+    }
+    cr_close(original_file, &tmp_err);
+    if (tmp_err) {
+        g_propagate_prefixed_error(err, tmp_err, "Error encountered while writing:");
+        g_remove(tmp_xml_filename);
+        g_free(tmp_xml_filename);
+        return;
+    }
+
+    if (g_rename(tmp_xml_filename, original_filename) == -1) {
+        // g_rename() reports failure via errno, tmp_err is still NULL here
+        int saved_errno = errno;
+        g_set_error(err, CREATEREPO_C_ERROR, CRE_IO,
+                    "Error encountered while renaming: %s", g_strerror(saved_errno));
+        g_remove(tmp_xml_filename);
+        g_free(tmp_xml_filename);
+        return;
+    }
+    g_free(tmp_xml_filename);
+}
diff --git a/src/xml_file.h b/src/xml_file.h
index 96ef5e3..6ac4c97 100644
--- a/src/xml_file.h
+++ b/src/xml_file.h
@@ -221,6 +221,22 @@ int cr_xmlfile_add_chunk(cr_XmlFile *f, const char *chunk, GError **err);
  */
 int cr_xmlfile_close(cr_XmlFile *f, GError **err);
 
+/** Rewrite package count field in repodata header in xml file.
+ * In order to do this we have to decompress and after the change
+ * compress the whole file again, so entirely new file is created.
+ * @param original_filename     Current file with wrong value in header
+ * @param xml_compression       Compression type used for the new file
+ * @param package_count         Actual package count (desired value in header)
+ * @param task_count            Task count (current value in header)
+ * @param file_stat             cr_ContentStat for stats of the new file, it will be modified
+ * @param err                   **GError
+ */
+void cr_rewrite_header_package_count(gchar *original_filename,
+                                     cr_CompressionType xml_compression,
+                                     int package_count,
+                                     int task_count,
+                                     cr_ContentStat *file_stat,
+                                     GError **err);
 
 /** @} */
 
-- 
libgit2 0.27.8