From c705a6287f8c7fb5e37dad0ac87257731a41fa69 Mon Sep 17 00:00:00 2001
From: chantra <chantr4@gmail.com>
Date: Sat, 29 Jan 2022 07:00:27 +0000
Subject: [PATCH 08/30] [rpm2extents] Add script to troubleshoot transcoded
file content This script is essentially dumping the metadata written at the
end of the transcoded files, it will also be used as part of the end-to-end
tests.
---
scripts/rpm2extents_dump | 94 ++++++++++++++++++++++++++++++++++++++++
1 file changed, 94 insertions(+)
create mode 100755 scripts/rpm2extents_dump
diff --git a/scripts/rpm2extents_dump b/scripts/rpm2extents_dump
new file mode 100755
index 000000000..596a59a49
--- /dev/null
+++ b/scripts/rpm2extents_dump
@@ -0,0 +1,94 @@
+#!/usr/bin/env python3
+
+import argparse
+import binascii
+import os
+import struct
+import sys
+
+MAGIC_SIZE = 8
+MAGIC_STR = b'KWTSH100'
+
+POS_SIZE = 8
+
+def keep_position(func):
+ def wrapper(*args, **kwargs):
+ curr = args[0].tell()
+ res = func(*args, **kwargs)
+ f.seek(curr, os.SEEK_SET)
+ return res
+ return wrapper
+
+def read_validation_digest(f, validation_offset):
+ digests = []
+ # validation
+ f.seek(validation_offset, os.SEEK_SET)
+ val_content_len, val_digests_num = struct.unpack('=QI', f.read(8+4))
+ for i in range(val_digests_num):
+ algo_name_len, digest_len = struct.unpack('=II', f.read(8))
+ algo_name, digest = struct.unpack(f'{algo_name_len}s{digest_len}s', f.read(algo_name_len+digest_len))
+ digests.append((algo_name, binascii.hexlify(digest)))
+ return digests
+
+
+def read_digests_table(f, digest_offset):
+ digests = []
+ # validation
+ f.seek(digest_offset, os.SEEK_SET)
+ table_len, digest_len = struct.unpack('=II', f.read(8))
+
+ for i in range(table_len):
+ digest, pos = struct.unpack(f'{digest_len}sQ', f.read(digest_len + 8))
+ digests.append((pos, binascii.hexlify(digest)))
+ return digests
+
+def read_signature_output(f, signature_offset):
+ f.seek(signature_offset, os.SEEK_SET)
+ signature_rc, signature_output_len = struct.unpack('=IQ', f.read(12))
+ return signature_rc, f.read(signature_output_len)
+
+@keep_position
+def parse_file(f):
+ digests = []
+ pos_table_offset = f.seek(-8 - 3*POS_SIZE, os.SEEK_END)
+ signature_offset, digest_offset, validation_offset = struct.unpack('=QQQ', f.read(3*POS_SIZE))
+
+ validation_digests = read_validation_digest(f, validation_offset)
+ digests_table = read_digests_table(f, digest_offset)
+ signature_ouput = read_signature_output(f, signature_offset)
+
+ return validation_digests, digests_table, signature_ouput
+
+@keep_position
+def is_transcoded(f):
+ f.seek(-MAGIC_SIZE, os.SEEK_END)
+ magic = f.read(MAGIC_SIZE)
+ return magic == MAGIC_STR
+
+def arg_parse():
+ parser = argparse.ArgumentParser()
+ parser.add_argument('--dump-signature', action='store_true')
+ parser.add_argument('--dump-file-digest-table', action='store_true')
+ parser.add_argument('--dump-digests', action='store_true')
+ parser.add_argument('file')
+
+ return parser.parse_args()
+
+if __name__ == '__main__':
+ args = arg_parse()
+ f = open(args.file, 'rb')
+ if not is_transcoded(f):
+ sys.exit(1)
+
+ validation_digests, digests_table, signature_output = parse_file(f)
+ if(args.dump_file_digest_table):
+ for digest in digests_table:
+ print(f"FileDigest {hex(digest[0])}: {digest[1]}")
+
+ if(args.dump_digests):
+ for validation_digest in validation_digests:
+ print(f"HeaderDigest {validation_digest[0]} {validation_digest[1]}")
+
+ if(args.dump_signature):
+ print(f"RPMSignOutput RC {signature_output[0]}\nRPMSignOutput Content {signature_output[1].decode()}")
+
--
2.35.1