From 8c0d382a8c62ec6f4f5d69920b76cd4f6bb24e9b Mon Sep 17 00:00:00 2001 From: Aleksandr Kazakov Date: Mon, 6 Feb 2023 19:15:45 +0000 Subject: [PATCH] Backport multi-threaded zstd to 4.16.x Signed-off-by: Aleksandr Kazakov --- configure.ac | 2 +- macros.in | 15 ++++----- rpmio/rpmio.c | 84 ++++++++++++++++++++++++++++++++++----------------- 3 files changed, 66 insertions(+), 35 deletions(-) diff --git a/configure.ac b/configure.ac index 71e9009..28979af 100644 --- a/configure.ac +++ b/configure.ac @@ -215,7 +215,7 @@ AC_ARG_ENABLE([zstd], [enable_zstd=auto]) AS_IF([test "x$enable_zstd" != "xno"], [ - PKG_CHECK_MODULES([ZSTD], [libzstd], [have_zstd=yes], [have_zstd=no]) + PKG_CHECK_MODULES([ZSTD], [libzstd >= 1.3.8], [have_zstd=yes], [have_zstd=no]) AS_IF([test "$enable_zstd" = "yes"], [ if test "$have_zstd" = "no"; then AC_MSG_ERROR([--enable-zstd specified, but not available]) diff --git a/macros.in b/macros.in index 99eb034..fa85fdb 100644 --- a/macros.in +++ b/macros.in @@ -365,13 +365,14 @@ package or when debugging this package.\ #%packager # Compression type and level for source/binary package payloads. -# "w9.gzdio" gzip level 9 (default). -# "w9.bzdio" bzip2 level 9. -# "w6.xzdio" xz level 6, xz's default. -# "w7T16.xzdio" xz level 7 using 16 thread (xz only) -# "w6.lzdio" lzma-alone level 6, lzma's default -# "w3.zstdio" zstd level 3, zstd's default -# "w.ufdio" uncompressed +# "w9.gzdio" gzip level 9 (default). +# "w9.bzdio" bzip2 level 9. +# "w6.xzdio" xz level 6, xz's default. +# "w7T16.xzdio" xz level 7 using 16 threads +# "w19T8.zstdio" zstd level 19 using 8 threads +# "w6.lzdio" lzma-alone level 6, lzma's default +# "w3.zstdio" zstd level 3, zstd's default +# "w.ufdio" uncompressed # #%_source_payload w9.gzdio #%_binary_payload w9.gzdio diff --git a/rpmio/rpmio.c b/rpmio/rpmio.c index 52be6a3..b5c76a8 100644 --- a/rpmio/rpmio.c +++ b/rpmio/rpmio.c @@ -1073,6 +1073,7 @@ static rpmzstd rpmzstdNew(int fdno, const char *fmode) char *t = stdio; char *te = t + sizeof(stdio) - 2; int c; + int threads = 0; switch ((c = *s++)) { case 'a': @@ -1101,7 +1102,14 @@ static rpmzstd rpmzstdNew(int fdno, const char *fmode) flags &= ~O_ACCMODE; flags |= O_RDWR; continue; - break; + case 'T': + if (*s >= '0' && *s <= '9') { + threads = strtol(s, (char **)&s, 10); + /* T0 means automatic detection */ + if (threads == 0) + threads = sysconf(_SC_NPROCESSORS_ONLN); + } + continue; default: if (c >= (int)'0' && c <= (int)'9') { level = strtol(s-1, (char **)&s, 10); @@ -1135,11 +1143,17 @@ static rpmzstd rpmzstdNew(int fdno, const char *fmode) } nb = ZSTD_DStreamInSize(); } else { /* compressing */ - if ((_stream = (void *) ZSTD_createCStream()) == NULL - || ZSTD_isError(ZSTD_initCStream(_stream, level))) { + if ((_stream = (void *) ZSTD_createCCtx()) == NULL + || ZSTD_isError(ZSTD_CCtx_setParameter(_stream, ZSTD_c_compressionLevel, level))) { goto err; } - nb = ZSTD_CStreamOutSize(); + + rpmlog(RPMLOG_DEBUG, "using %i threads in zstd compression\n", threads); + if (threads > 0) { + if (ZSTD_isError (ZSTD_CCtx_setParameter(_stream, ZSTD_c_nbWorkers, threads))) + rpmlog(RPMLOG_WARNING, "zstd library does not support multi-threading\n"); + } + nb = ZSTD_CStreamOutSize(); } rpmzstd zstd = (rpmzstd) xcalloc(1, sizeof(*zstd)); @@ -1158,7 +1172,7 @@ err: if ((flags & O_ACCMODE) == O_RDONLY) ZSTD_freeDStream(_stream); else - ZSTD_freeCStream(_stream); + ZSTD_freeCCtx(_stream); return NULL; } @@ -1184,16 +1198,24 @@ assert(zstd); rc = 0; } else { /* compressing */ /* close frame */ - zstd->zob.dst = zstd->b; - zstd->zob.size = zstd->nb; - zstd->zob.pos = 0; - int xx = ZSTD_flushStream(zstd->_stream, &zstd->zob); - if (ZSTD_isError(xx)) - fps->errcookie = ZSTD_getErrorName(xx); - else if (zstd->zob.pos != fwrite(zstd->b, 1, zstd->zob.pos, zstd->fp)) - fps->errcookie = "zstdFlush fwrite failed."; - else - rc = 0; + int xx; + do { + ZSTD_inBuffer zib = { NULL, 0, 0 }; + zstd->zob.dst = zstd->b; + zstd->zob.size = zstd->nb; + zstd->zob.pos = 0; + xx = ZSTD_compressStream2(zstd->_stream, &zstd->zob, &zib, ZSTD_e_flush); + if (ZSTD_isError(xx)) { + fps->errcookie = ZSTD_getErrorName(xx); + break; + } + else if (zstd->zob.pos != fwrite(zstd->b, 1, zstd->zob.pos, zstd->fp)) { + fps->errcookie = "zstdClose fwrite failed."; + break; + } + else + rc = 0; + } while (xx != 0); } return rc; } @@ -1238,7 +1260,7 @@ assert(zstd); zstd->zob.pos = 0; /* Compress next chunk. */ - int xx = ZSTD_compressStream(zstd->_stream, &zstd->zob, &zib); + int xx = ZSTD_compressStream2(zstd->_stream, &zstd->zob, &zib, ZSTD_e_continue); if (ZSTD_isError(xx)) { fps->errcookie = ZSTD_getErrorName(xx); return -1; @@ -1267,17 +1289,25 @@ assert(zstd); ZSTD_freeDStream(zstd->_stream); } else { /* compressing */ /* close frame */ - zstd->zob.dst = zstd->b; - zstd->zob.size = zstd->nb; - zstd->zob.pos = 0; - int xx = ZSTD_endStream(zstd->_stream, &zstd->zob); - if (ZSTD_isError(xx)) - fps->errcookie = ZSTD_getErrorName(xx); - else if (zstd->zob.pos != fwrite(zstd->b, 1, zstd->zob.pos, zstd->fp)) - fps->errcookie = "zstdClose fwrite failed."; - else - rc = 0; - ZSTD_freeCStream(zstd->_stream); + int xx; + do { + ZSTD_inBuffer zib = { NULL, 0, 0 }; + zstd->zob.dst = zstd->b; + zstd->zob.size = zstd->nb; + zstd->zob.pos = 0; + xx = ZSTD_compressStream2(zstd->_stream, &zstd->zob, &zib, ZSTD_e_end); + if (ZSTD_isError(xx)) { + fps->errcookie = ZSTD_getErrorName(xx); + break; + } + else if (zstd->zob.pos != fwrite(zstd->b, 1, zstd->zob.pos, zstd->fp)) { + fps->errcookie = "zstdClose fwrite failed."; + break; + } + else + rc = 0; + } while (xx != 0); + ZSTD_freeCCtx(zstd->_stream); } if (zstd->fp && fileno(zstd->fp) > 2) -- 2.38.1