Aleksandr Kazakov 3b19d9
From 8c0d382a8c62ec6f4f5d69920b76cd4f6bb24e9b Mon Sep 17 00:00:00 2001
Aleksandr Kazakov 3b19d9
From: Aleksandr Kazakov <alexkazakov@meta.com>
Aleksandr Kazakov 3b19d9
Date: Mon, 6 Feb 2023 19:15:45 +0000
Aleksandr Kazakov 3b19d9
Subject: [PATCH] Backport multi-threaded zstd to 4.16.x
Aleksandr Kazakov 3b19d9
Aleksandr Kazakov 3b19d9
Signed-off-by: Aleksandr Kazakov <alexkazakov@meta.com>
Aleksandr Kazakov 3b19d9
---
Aleksandr Kazakov 3b19d9
 configure.ac  |  2 +-
Aleksandr Kazakov 3b19d9
 macros.in     | 15 ++++-----
Aleksandr Kazakov 3b19d9
 rpmio/rpmio.c | 84 ++++++++++++++++++++++++++++++++++-----------------
Aleksandr Kazakov 3b19d9
 3 files changed, 66 insertions(+), 35 deletions(-)
Aleksandr Kazakov 3b19d9
Aleksandr Kazakov 3b19d9
diff --git a/configure.ac b/configure.ac
Aleksandr Kazakov 3b19d9
index 71e9009..28979af 100644
Aleksandr Kazakov 3b19d9
--- a/configure.ac
Aleksandr Kazakov 3b19d9
+++ b/configure.ac
Aleksandr Kazakov 3b19d9
@@ -215,7 +215,7 @@ AC_ARG_ENABLE([zstd],
Aleksandr Kazakov 3b19d9
               [enable_zstd=auto])
Aleksandr Kazakov 3b19d9
 
Aleksandr Kazakov 3b19d9
 AS_IF([test "x$enable_zstd" != "xno"], [
Aleksandr Kazakov 3b19d9
-  PKG_CHECK_MODULES([ZSTD], [libzstd], [have_zstd=yes], [have_zstd=no])
Aleksandr Kazakov 3b19d9
+  PKG_CHECK_MODULES([ZSTD], [libzstd >= 1.3.8], [have_zstd=yes], [have_zstd=no])
Aleksandr Kazakov 3b19d9
   AS_IF([test "$enable_zstd" = "yes"], [
Aleksandr Kazakov 3b19d9
     if test "$have_zstd" = "no"; then
Aleksandr Kazakov 3b19d9
       AC_MSG_ERROR([--enable-zstd specified, but not available])
Aleksandr Kazakov 3b19d9
diff --git a/macros.in b/macros.in
Aleksandr Kazakov 3b19d9
index 99eb034..fa85fdb 100644
Aleksandr Kazakov 3b19d9
--- a/macros.in
Aleksandr Kazakov 3b19d9
+++ b/macros.in
Aleksandr Kazakov 3b19d9
@@ -365,13 +365,14 @@ package or when debugging this package.\
Aleksandr Kazakov 3b19d9
 #%packager
Aleksandr Kazakov 3b19d9
 
Aleksandr Kazakov 3b19d9
 #	Compression type and level for source/binary package payloads.
Aleksandr Kazakov 3b19d9
-#		"w9.gzdio"	gzip level 9 (default).
Aleksandr Kazakov 3b19d9
-#		"w9.bzdio"	bzip2 level 9.
Aleksandr Kazakov 3b19d9
-#		"w6.xzdio"	xz level 6, xz's default.
Aleksandr Kazakov 3b19d9
-#		"w7T16.xzdio"	xz level 7 using 16 thread (xz only)
Aleksandr Kazakov 3b19d9
-#		"w6.lzdio"	lzma-alone level 6, lzma's default
Aleksandr Kazakov 3b19d9
-#		"w3.zstdio"	zstd level 3, zstd's default
Aleksandr Kazakov 3b19d9
-#		"w.ufdio"	uncompressed
Aleksandr Kazakov 3b19d9
+#		"w9.gzdio"		gzip level 9 (default).
Aleksandr Kazakov 3b19d9
+#		"w9.bzdio"		bzip2 level 9.
Aleksandr Kazakov 3b19d9
+#		"w6.xzdio"		xz level 6, xz's default.
Aleksandr Kazakov 3b19d9
+#		"w7T16.xzdio"		xz level 7 using 16 threads
Aleksandr Kazakov 3b19d9
+#		"w19T8.zstdio"		zstd level 19 using 8 threads
Aleksandr Kazakov 3b19d9
+#		"w6.lzdio"		lzma-alone level 6, lzma's default
Aleksandr Kazakov 3b19d9
+#		"w3.zstdio"		zstd level 3, zstd's default
Aleksandr Kazakov 3b19d9
+#		"w.ufdio"		uncompressed
Aleksandr Kazakov 3b19d9
 #
Aleksandr Kazakov 3b19d9
 #%_source_payload	w9.gzdio
Aleksandr Kazakov 3b19d9
 #%_binary_payload	w9.gzdio
Aleksandr Kazakov 3b19d9
diff --git a/rpmio/rpmio.c b/rpmio/rpmio.c
Aleksandr Kazakov 3b19d9
index 52be6a3..b5c76a8 100644
Aleksandr Kazakov 3b19d9
--- a/rpmio/rpmio.c
Aleksandr Kazakov 3b19d9
+++ b/rpmio/rpmio.c
Aleksandr Kazakov 3b19d9
@@ -1073,6 +1073,7 @@ static rpmzstd rpmzstdNew(int fdno, const char *fmode)
Aleksandr Kazakov 3b19d9
     char *t = stdio;
Aleksandr Kazakov 3b19d9
     char *te = t + sizeof(stdio) - 2;
Aleksandr Kazakov 3b19d9
     int c;
Aleksandr Kazakov 3b19d9
+    int threads = 0;
Aleksandr Kazakov 3b19d9
 
Aleksandr Kazakov 3b19d9
     switch ((c = *s++)) {
Aleksandr Kazakov 3b19d9
     case 'a':
Aleksandr Kazakov 3b19d9
@@ -1101,7 +1102,14 @@ static rpmzstd rpmzstdNew(int fdno, const char *fmode)
Aleksandr Kazakov 3b19d9
 	    flags &= ~O_ACCMODE;
Aleksandr Kazakov 3b19d9
 	    flags |= O_RDWR;
Aleksandr Kazakov 3b19d9
 	    continue;
Aleksandr Kazakov 3b19d9
-	    break;
Aleksandr Kazakov 3b19d9
+	case 'T':
Aleksandr Kazakov 3b19d9
+	    if (*s >= '0' && *s <= '9') {
Aleksandr Kazakov 3b19d9
+		threads = strtol(s, (char **)&s, 10);
Aleksandr Kazakov 3b19d9
+		/* T0 means automatic detection */
Aleksandr Kazakov 3b19d9
+		if (threads == 0)
Aleksandr Kazakov 3b19d9
+		    threads = sysconf(_SC_NPROCESSORS_ONLN);
Aleksandr Kazakov 3b19d9
+	    }
Aleksandr Kazakov 3b19d9
+	    continue;
Aleksandr Kazakov 3b19d9
 	default:
Aleksandr Kazakov 3b19d9
 	    if (c >= (int)'0' && c <= (int)'9') {
Aleksandr Kazakov 3b19d9
 		level = strtol(s-1, (char **)&s, 10);
Aleksandr Kazakov 3b19d9
@@ -1135,11 +1143,17 @@ static rpmzstd rpmzstdNew(int fdno, const char *fmode)
Aleksandr Kazakov 3b19d9
 	}
Aleksandr Kazakov 3b19d9
 	nb = ZSTD_DStreamInSize();
Aleksandr Kazakov 3b19d9
     } else {					/* compressing */
Aleksandr Kazakov 3b19d9
-	if ((_stream = (void *) ZSTD_createCStream()) == NULL
Aleksandr Kazakov 3b19d9
-	 || ZSTD_isError(ZSTD_initCStream(_stream, level))) {
Aleksandr Kazakov 3b19d9
+	if ((_stream = (void *) ZSTD_createCCtx()) == NULL
Aleksandr Kazakov 3b19d9
+	 || ZSTD_isError(ZSTD_CCtx_setParameter(_stream, ZSTD_c_compressionLevel, level))) {
Aleksandr Kazakov 3b19d9
 	    goto err;
Aleksandr Kazakov 3b19d9
 	}
Aleksandr Kazakov 3b19d9
-	nb = ZSTD_CStreamOutSize();
Aleksandr Kazakov 3b19d9
+
Aleksandr Kazakov 3b19d9
+        rpmlog(RPMLOG_DEBUG, "using %i threads in zstd compression\n", threads);
Aleksandr Kazakov 3b19d9
+        if (threads > 0) {
Aleksandr Kazakov 3b19d9
+            if (ZSTD_isError (ZSTD_CCtx_setParameter(_stream, ZSTD_c_nbWorkers, threads)))
Aleksandr Kazakov 3b19d9
+            rpmlog(RPMLOG_WARNING, "zstd library does not support multi-threading\n");
Aleksandr Kazakov 3b19d9
+	}
Aleksandr Kazakov 3b19d9
+ 	nb = ZSTD_CStreamOutSize();
Aleksandr Kazakov 3b19d9
     }
Aleksandr Kazakov 3b19d9
 
Aleksandr Kazakov 3b19d9
     rpmzstd zstd = (rpmzstd) xcalloc(1, sizeof(*zstd));
Aleksandr Kazakov 3b19d9
@@ -1158,7 +1172,7 @@ err:
Aleksandr Kazakov 3b19d9
     if ((flags & O_ACCMODE) == O_RDONLY)
Aleksandr Kazakov 3b19d9
 	ZSTD_freeDStream(_stream);
Aleksandr Kazakov 3b19d9
     else
Aleksandr Kazakov 3b19d9
-	ZSTD_freeCStream(_stream);
Aleksandr Kazakov 3b19d9
+	ZSTD_freeCCtx(_stream);
Aleksandr Kazakov 3b19d9
     return NULL;
Aleksandr Kazakov 3b19d9
 }
Aleksandr Kazakov 3b19d9
 
Aleksandr Kazakov 3b19d9
@@ -1184,16 +1198,24 @@ assert(zstd);
Aleksandr Kazakov 3b19d9
 	rc = 0;
Aleksandr Kazakov 3b19d9
     } else {					/* compressing */
Aleksandr Kazakov 3b19d9
 	/* close frame */
Aleksandr Kazakov 3b19d9
-	zstd->zob.dst  = zstd->b;
Aleksandr Kazakov 3b19d9
-	zstd->zob.size = zstd->nb;
Aleksandr Kazakov 3b19d9
-	zstd->zob.pos  = 0;
Aleksandr Kazakov 3b19d9
-	int xx = ZSTD_flushStream(zstd->_stream, &zstd->zob);
Aleksandr Kazakov 3b19d9
-	if (ZSTD_isError(xx))
Aleksandr Kazakov 3b19d9
-	    fps->errcookie = ZSTD_getErrorName(xx);
Aleksandr Kazakov 3b19d9
-	else if (zstd->zob.pos != fwrite(zstd->b, 1, zstd->zob.pos, zstd->fp))
Aleksandr Kazakov 3b19d9
-	    fps->errcookie = "zstdFlush fwrite failed.";
Aleksandr Kazakov 3b19d9
-	else
Aleksandr Kazakov 3b19d9
-	    rc = 0;
Aleksandr Kazakov 3b19d9
+	int xx;
Aleksandr Kazakov 3b19d9
+	do {
Aleksandr Kazakov 3b19d9
+	  ZSTD_inBuffer zib = { NULL, 0, 0 };
Aleksandr Kazakov 3b19d9
+	  zstd->zob.dst  = zstd->b;
Aleksandr Kazakov 3b19d9
+	  zstd->zob.size = zstd->nb;
Aleksandr Kazakov 3b19d9
+	  zstd->zob.pos  = 0;
Aleksandr Kazakov 3b19d9
+	  xx = ZSTD_compressStream2(zstd->_stream, &zstd->zob, &zib, ZSTD_e_flush);
Aleksandr Kazakov 3b19d9
+	  if (ZSTD_isError(xx)) {
Aleksandr Kazakov 3b19d9
+	      fps->errcookie = ZSTD_getErrorName(xx);
Aleksandr Kazakov 3b19d9
+	      break;
Aleksandr Kazakov 3b19d9
+	  }
Aleksandr Kazakov 3b19d9
+	  else if (zstd->zob.pos != fwrite(zstd->b, 1, zstd->zob.pos, zstd->fp)) {
Aleksandr Kazakov 3b19d9
+	      fps->errcookie = "zstdClose fwrite failed.";
Aleksandr Kazakov 3b19d9
+	      break;
Aleksandr Kazakov 3b19d9
+	  }
Aleksandr Kazakov 3b19d9
+	  else
Aleksandr Kazakov 3b19d9
+	      rc = 0;
Aleksandr Kazakov 3b19d9
+	} while (xx != 0);
Aleksandr Kazakov 3b19d9
     }
Aleksandr Kazakov 3b19d9
     return rc;
Aleksandr Kazakov 3b19d9
 }
Aleksandr Kazakov 3b19d9
@@ -1238,7 +1260,7 @@ assert(zstd);
Aleksandr Kazakov 3b19d9
 	zstd->zob.pos  = 0;
Aleksandr Kazakov 3b19d9
 
Aleksandr Kazakov 3b19d9
 	/* Compress next chunk. */
Aleksandr Kazakov 3b19d9
-        int xx = ZSTD_compressStream(zstd->_stream, &zstd->zob, &zib;;
Aleksandr Kazakov 3b19d9
+        int xx = ZSTD_compressStream2(zstd->_stream, &zstd->zob, &zib, ZSTD_e_continue);
Aleksandr Kazakov 3b19d9
         if (ZSTD_isError(xx)) {
Aleksandr Kazakov 3b19d9
 	    fps->errcookie = ZSTD_getErrorName(xx);
Aleksandr Kazakov 3b19d9
 	    return -1;
Aleksandr Kazakov 3b19d9
@@ -1267,17 +1289,25 @@ assert(zstd);
Aleksandr Kazakov 3b19d9
 	ZSTD_freeDStream(zstd->_stream);
Aleksandr Kazakov 3b19d9
     } else {					/* compressing */
Aleksandr Kazakov 3b19d9
 	/* close frame */
Aleksandr Kazakov 3b19d9
-	zstd->zob.dst  = zstd->b;
Aleksandr Kazakov 3b19d9
-	zstd->zob.size = zstd->nb;
Aleksandr Kazakov 3b19d9
-	zstd->zob.pos  = 0;
Aleksandr Kazakov 3b19d9
-	int xx = ZSTD_endStream(zstd->_stream, &zstd->zob);
Aleksandr Kazakov 3b19d9
-	if (ZSTD_isError(xx))
Aleksandr Kazakov 3b19d9
-	    fps->errcookie = ZSTD_getErrorName(xx);
Aleksandr Kazakov 3b19d9
-	else if (zstd->zob.pos != fwrite(zstd->b, 1, zstd->zob.pos, zstd->fp))
Aleksandr Kazakov 3b19d9
-	    fps->errcookie = "zstdClose fwrite failed.";
Aleksandr Kazakov 3b19d9
-	else
Aleksandr Kazakov 3b19d9
-	    rc = 0;
Aleksandr Kazakov 3b19d9
-	ZSTD_freeCStream(zstd->_stream);
Aleksandr Kazakov 3b19d9
+	int xx;
Aleksandr Kazakov 3b19d9
+	do {
Aleksandr Kazakov 3b19d9
+	  ZSTD_inBuffer zib = { NULL, 0, 0 };
Aleksandr Kazakov 3b19d9
+	  zstd->zob.dst  = zstd->b;
Aleksandr Kazakov 3b19d9
+	  zstd->zob.size = zstd->nb;
Aleksandr Kazakov 3b19d9
+	  zstd->zob.pos  = 0;
Aleksandr Kazakov 3b19d9
+	  xx = ZSTD_compressStream2(zstd->_stream, &zstd->zob, &zib, ZSTD_e_end);
Aleksandr Kazakov 3b19d9
+	  if (ZSTD_isError(xx)) {
Aleksandr Kazakov 3b19d9
+	      fps->errcookie = ZSTD_getErrorName(xx);
Aleksandr Kazakov 3b19d9
+	      break;
Aleksandr Kazakov 3b19d9
+	  }
Aleksandr Kazakov 3b19d9
+	  else if (zstd->zob.pos != fwrite(zstd->b, 1, zstd->zob.pos, zstd->fp)) {
Aleksandr Kazakov 3b19d9
+	      fps->errcookie = "zstdClose fwrite failed.";
Aleksandr Kazakov 3b19d9
+	      break;
Aleksandr Kazakov 3b19d9
+	  }
Aleksandr Kazakov 3b19d9
+	  else
Aleksandr Kazakov 3b19d9
+	      rc = 0;
Aleksandr Kazakov 3b19d9
+	} while (xx != 0);
Aleksandr Kazakov 3b19d9
+	ZSTD_freeCCtx(zstd->_stream);
Aleksandr Kazakov 3b19d9
     }
Aleksandr Kazakov 3b19d9
 
Aleksandr Kazakov 3b19d9
     if (zstd->fp && fileno(zstd->fp) > 2)
Aleksandr Kazakov 3b19d9
-- 
Aleksandr Kazakov 3b19d9
2.38.1
Aleksandr Kazakov 3b19d9