dcavalca / rpms / rpm

Forked from rpms/rpm 2 years ago
Clone
Blob Blame History Raw
From c705a6287f8c7fb5e37dad0ac87257731a41fa69 Mon Sep 17 00:00:00 2001
From: chantra <chantr4@gmail.com>
Date: Sat, 29 Jan 2022 07:00:27 +0000
Subject: [PATCH 08/30] [rpm2extents] Add script to troubleshoot transcoded
 file content This script is essentially dumping the metadata written at the
 end of the transcoded files, it will also be used as part of the end-to-end
 tests.

---
 scripts/rpm2extents_dump | 94 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 94 insertions(+)
 create mode 100755 scripts/rpm2extents_dump

diff --git a/scripts/rpm2extents_dump b/scripts/rpm2extents_dump
new file mode 100755
index 000000000..596a59a49
--- /dev/null
+++ b/scripts/rpm2extents_dump
@@ -0,0 +1,94 @@
+#!/usr/bin/env python3
+
+import argparse
+import binascii
+import os
+import struct
+import sys
+
+MAGIC_SIZE = 8
+MAGIC_STR = b'KWTSH100'
+
+POS_SIZE = 8
+
+def keep_position(func):
+    def wrapper(*args, **kwargs):
+        curr = args[0].tell()
+        res = func(*args, **kwargs)
+        f.seek(curr, os.SEEK_SET)
+        return res
+    return wrapper
+
+def read_validation_digest(f, validation_offset):
+	digests = []
+    # validation
+	f.seek(validation_offset, os.SEEK_SET)
+	val_content_len, val_digests_num = struct.unpack('=QI', f.read(8+4))
+	for i in range(val_digests_num):
+		algo_name_len, digest_len = struct.unpack('=II', f.read(8))
+		algo_name, digest = struct.unpack(f'{algo_name_len}s{digest_len}s', f.read(algo_name_len+digest_len))
+		digests.append((algo_name, binascii.hexlify(digest)))
+	return digests
+
+
+def read_digests_table(f, digest_offset):
+	digests = []
+    # validation
+	f.seek(digest_offset, os.SEEK_SET)
+	table_len, digest_len = struct.unpack('=II', f.read(8))
+
+	for i in range(table_len):
+		digest, pos = struct.unpack(f'{digest_len}sQ', f.read(digest_len + 8))
+		digests.append((pos, binascii.hexlify(digest)))
+	return digests
+
+def read_signature_output(f, signature_offset):
+    f.seek(signature_offset, os.SEEK_SET)
+    signature_rc, signature_output_len = struct.unpack('=IQ', f.read(12))
+    return signature_rc, f.read(signature_output_len)
+
+@keep_position
+def parse_file(f):
+	digests = []
+	pos_table_offset = f.seek(-8 - 3*POS_SIZE, os.SEEK_END)
+	signature_offset, digest_offset, validation_offset = struct.unpack('=QQQ', f.read(3*POS_SIZE))
+
+	validation_digests = read_validation_digest(f, validation_offset)
+	digests_table = read_digests_table(f, digest_offset)
+	signature_ouput = read_signature_output(f, signature_offset)
+
+	return validation_digests, digests_table, signature_ouput
+
+@keep_position
+def is_transcoded(f):
+    f.seek(-MAGIC_SIZE, os.SEEK_END)
+    magic = f.read(MAGIC_SIZE)
+    return magic == MAGIC_STR
+
+def arg_parse():
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--dump-signature', action='store_true')
+    parser.add_argument('--dump-file-digest-table', action='store_true')
+    parser.add_argument('--dump-digests', action='store_true')
+    parser.add_argument('file')
+
+    return parser.parse_args()
+
+if __name__ == '__main__':
+    args = arg_parse()
+    f = open(args.file, 'rb')
+    if not is_transcoded(f):
+        sys.exit(1)
+
+    validation_digests, digests_table, signature_output = parse_file(f)
+    if(args.dump_file_digest_table):
+        for digest in digests_table:
+            print(f"FileDigest {hex(digest[0])}: {digest[1]}")
+
+    if(args.dump_digests):
+        for validation_digest in validation_digests:
+            print(f"HeaderDigest {validation_digest[0]} {validation_digest[1]}")
+
+    if(args.dump_signature):
+        print(f"RPMSignOutput RC {signature_output[0]}\nRPMSignOutput Content {signature_output[1].decode()}")
+
-- 
2.35.1