dcavalca / rpms / rpm

Forked from rpms/rpm 2 years ago
Clone
Blob Blame History Raw
From a5803faa083690526b96484c7e6a4cc915ca3921 Mon Nov 28 16:35:09 2022
From: Aleksandr Kazakov <alexkazakov@meta.com>
Date: Mon, 28 Nov 2022 16:35:09 +0000
Subject: [PATCH] Backport multi-threaded zstd to 4.14.x to support
 multi-threaded zstd compression on centos 8

Signed-off-by: Aleksandr Kazakov <alexkazakov@meta.com>
---
 configure.ac  |  2 +-
 macros.in     |  1 +
 rpmio/rpmio.c | 82 +++++++++++++++++++++++++++++++++++----------------
 3 files changed, 58 insertions(+), 27 deletions(-)

diff --git a/configure.ac b/configure.ac
index 47327bd..b1213ae 100644
--- a/configure.ac
+++ b/configure.ac
@@ -214,7 +214,7 @@ AC_ARG_ENABLE([zstd],
               [enable_zstd=auto])
 
 AS_IF([test "x$enable_zstd" != "xno"], [
-  PKG_CHECK_MODULES([ZSTD], [libzstd], [have_zstd=yes], [have_zstd=no])
+  PKG_CHECK_MODULES([ZSTD], [libzstd >= 1.3.8], [have_zstd=yes], [have_zstd=no])
   AS_IF([test "$enable_zstd" = "yes"], [
     if test "$have_zstd" = "no"; then
       AC_MSG_ERROR([--enable-zstd specified, but not available])
diff --git a/macros.in b/macros.in
index 9b9fe23..832b60a 100644
--- a/macros.in
+++ b/macros.in
@@ -394,6 +394,7 @@ package or when debugging this package.\
 #		"w9.bzdio"	bzip2 level 9.
 #		"w6.xzdio"	xz level 6, xz's default.
 #		"w7T16.xzdio"	xz level 7 using 16 thread (xz only)
+#		"w19T8.zstdio"	zstd level 19 using 8 threads
 #		"w6.lzdio"	lzma-alone level 6, lzma's default
 #
 #%_source_payload	w9.gzdio
diff --git a/rpmio/rpmio.c b/rpmio/rpmio.c
index 09b5d02..d030a9c 100644
--- a/rpmio/rpmio.c
+++ b/rpmio/rpmio.c
@@ -1070,6 +1070,7 @@ static rpmzstd rpmzstdNew(int fdno, const char *fmode)
     char *t = stdio;
     char *te = t + sizeof(stdio) - 2;
     int c;
+    int threads = 0;
 
     switch ((c = *s++)) {
     case 'a':
@@ -1098,7 +1099,14 @@ static rpmzstd rpmzstdNew(int fdno, const char *fmode)
 	    flags &= ~O_ACCMODE;
 	    flags |= O_RDWR;
 	    continue;
-	    break;
+	case 'T':
+	    if (*s >= '0' && *s <= '9') {
+		threads = strtol(s, (char **)&s, 10);
+		/* T0 means automatic detection */
+		if (threads == 0)
+		    threads = sysconf(_SC_NPROCESSORS_ONLN);
+	    }
+	    continue;
 	default:
 	    if (c >= (int)'0' && c <= (int)'9') {
 		level = strtol(s-1, (char **)&s, 10);
@@ -1132,10 +1140,16 @@ static rpmzstd rpmzstdNew(int fdno, const char *fmode)
 	}
 	nb = ZSTD_DStreamInSize();
     } else {					/* compressing */
-	if ((_stream = (void *) ZSTD_createCStream()) == NULL
-	 || ZSTD_isError(ZSTD_initCStream(_stream, level))) {
+	if ((_stream = (void *) ZSTD_createCCtx()) == NULL
+	 || ZSTD_isError(ZSTD_CCtx_setParameter(_stream, ZSTD_c_compressionLevel, level))) {
 	    goto err;
 	}
+
+	rpmlog(RPMLOG_DEBUG, "using %i threads in zstd compression\n", threads);
+	if (threads > 0) {
+	    if (ZSTD_isError (ZSTD_CCtx_setParameter(_stream, ZSTD_c_nbWorkers, threads)))
+		rpmlog(RPMLOG_WARNING, "zstd library does not support multi-threading\n");
+	}
 	nb = ZSTD_CStreamOutSize();
     }
 
@@ -1155,7 +1169,7 @@ err:
     if ((flags & O_ACCMODE) == O_RDONLY)
 	ZSTD_freeDStream(_stream);
     else
-	ZSTD_freeCStream(_stream);
+	ZSTD_freeCCtx(_stream);
     return NULL;
 }
 
@@ -1181,16 +1195,24 @@ assert(zstd);
 	rc = 0;
     } else {					/* compressing */
 	/* close frame */
-	zstd->zob.dst  = zstd->b;
-	zstd->zob.size = zstd->nb;
-	zstd->zob.pos  = 0;
-	int xx = ZSTD_flushStream(zstd->_stream, &zstd->zob);
-	if (ZSTD_isError(xx))
-	    fps->errcookie = ZSTD_getErrorName(xx);
-	else if (zstd->zob.pos != fwrite(zstd->b, 1, zstd->zob.pos, zstd->fp))
-	    fps->errcookie = "zstdFlush fwrite failed.";
-	else
-	    rc = 0;
+	int xx;
+	do {
+	  ZSTD_inBuffer zib = { NULL, 0, 0 };
+	  zstd->zob.dst  = zstd->b;
+	  zstd->zob.size = zstd->nb;
+	  zstd->zob.pos  = 0;
+	  xx = ZSTD_compressStream2(zstd->_stream, &zstd->zob, &zib, ZSTD_e_flush);
+	  if (ZSTD_isError(xx)) {
+	      fps->errcookie = ZSTD_getErrorName(xx);
+	      break;
+	  }
+	  else if (zstd->zob.pos != fwrite(zstd->b, 1, zstd->zob.pos, zstd->fp)) {
+	      fps->errcookie = "zstdClose fwrite failed.";
+	      break;
+	  }
+	  else
+	      rc = 0;
+	} while (xx != 0);
     }
     return rc;
 }
@@ -1235,7 +1257,7 @@ assert(zstd);
 	zstd->zob.pos  = 0;
 
 	/* Compress next chunk. */
-        int xx = ZSTD_compressStream(zstd->_stream, &zstd->zob, &zib);
+        int xx = ZSTD_compressStream2(zstd->_stream, &zstd->zob, &zib, ZSTD_e_continue);
         if (ZSTD_isError(xx)) {
 	    fps->errcookie = ZSTD_getErrorName(xx);
 	    return -1;
@@ -1264,17 +1286,25 @@ assert(zstd);
 	ZSTD_freeDStream(zstd->_stream);
     } else {					/* compressing */
 	/* close frame */
-	zstd->zob.dst  = zstd->b;
-	zstd->zob.size = zstd->nb;
-	zstd->zob.pos  = 0;
-	int xx = ZSTD_endStream(zstd->_stream, &zstd->zob);
-	if (ZSTD_isError(xx))
-	    fps->errcookie = ZSTD_getErrorName(xx);
-	else if (zstd->zob.pos != fwrite(zstd->b, 1, zstd->zob.pos, zstd->fp))
-	    fps->errcookie = "zstdClose fwrite failed.";
-	else
-	    rc = 0;
-	ZSTD_freeCStream(zstd->_stream);
+	int xx;
+	do {
+	  ZSTD_inBuffer zib = { NULL, 0, 0 };
+	  zstd->zob.dst  = zstd->b;
+	  zstd->zob.size = zstd->nb;
+	  zstd->zob.pos  = 0;
+	  xx = ZSTD_compressStream2(zstd->_stream, &zstd->zob, &zib, ZSTD_e_end);
+	  if (ZSTD_isError(xx)) {
+	      fps->errcookie = ZSTD_getErrorName(xx);
+	      break;
+	  }
+	  else if (zstd->zob.pos != fwrite(zstd->b, 1, zstd->zob.pos, zstd->fp)) {
+	      fps->errcookie = "zstdClose fwrite failed.";
+	      break;
+	  }
+	  else
+	      rc = 0;
+	} while (xx != 0);
+	ZSTD_freeCCtx(zstd->_stream);
     }
 
     if (zstd->fp && fileno(zstd->fp) > 2)
-- 
2.38.1