manytags
b01lersc
Task: a service encrypts random one-block AES-GCM messages and corrupts the low 64 bits of each tag with MT19937 output seeded from the AES key. Solution: cancel the affine GHASH term with nullspace relations, solve a large GF(2) system for the MT state, invert CPython seeding, recover the master key, and decrypt the flag.
$ ls tags/ techniques/
manytags — b01lers CTF 2026
Description
too many tags
We are given many_tags.py and a service that reveals the encrypted flag once, then lets us request up to 704 random one-block AES-GCM encryptions. Each query returns (nonce, ciphertext, tag), but the tag is faulty: its lower 64 bits are not the real GCM value.
Goal: recover the AES master key and decrypt the flag.
Analysis
The core bug is in construct_tag:
def construct_tag(real_tag, ghash_value, fault_words): real_tag_int = int.from_bytes(real_tag, "big") fault_value = (fault_words[0] << 32) | fault_words[1] tag_low = (ghash_value ^ fault_value) & MASK64 tag_int = (real_tag_int & (~MASK64)) | tag_low return tag_int.to_bytes(BLOCK_SIZE, "big")
For query messages, the service computes a valid AES-GCM ciphertext, then replaces the low 64 bits of the tag with:
low64(tag) = low64(GHASH_H(C)) XOR fault_value
where fault_value is two consecutive outputs of Python's random.Random.getrandbits(32).
Two details make this exploitable:
- All query plaintexts are exactly one block, so
GHASH_H(C)has a very simple structure. - The fault RNG is seeded as
random.Random(int.from_bytes(master_key, "big")), so recovering the MT19937 state recovers the AES key source.
1. GHASH becomes an affine map on ciphertext bits
For one-block ciphertexts,
GHASH_H(C) = (C · H ⊕ L) · H
where L is the fixed length block for 16-byte messages. Therefore the low 64 bits of GHASH are an affine linear function of the 128 ciphertext bits:
low64(GHASH_H(C)) = A(C) XOR b
for some unknown affine map A and constant b depending only on H.
So every query gives:
tag_low_i = A(C_i) XOR b XOR mt64_i
with mt64_i the 64-bit mask formed from two MT outputs.
2. Nullspace relations cancel the unknown GHASH map
Represent each ciphertext as a 129-bit column [C_i || 1]. Because A(C) XOR b is affine, any XOR relation among those augmented columns also cancels the unknown GHASH contribution.
With 704 one-block samples, we have 704 vectors in a 129-dimensional space, so there are many nullspace relations. For any relation r:
XOR_{i in r} [C_i || 1] = 0
we also get:
XOR_{i in r} tag_low_i = XOR_{i in r} mt64_i
This eliminates AES-GCM entirely and leaves pure linear equations in MT19937 output bits.
3. Symbolic CPython MT19937 over GF(2)
Each query consumes two consecutive getrandbits(32) outputs, so 704 queries give 1408 symbolic 32-bit outputs.
The solver models CPython's MT19937 state transition and tempering over GF(2): each bit of each output word is represented as a linear form in the 19968 unknown state bits (624 * 32). Combining all nullspace relations produces 36832 linear equations.
Those equations are solved with custom bitset Gaussian elimination over GF(2), yielding the seeded MT state.
4. Inverting init_by_array recovers the AES key
CPython does not seed MT with a single 32-bit word here; it seeds from the full big integer derived from the 32-byte AES key. Internally this becomes 8 little-endian 32-bit words passed through the standard init_by_array routine.
Once the seeded state is known, that seeding process can be inverted to recover the 8 seed words, then reassembled into the original 32-byte key:
6497e7d37f36dd962bb195a7ebfd27876053dc4fd1994a32eb2be4c028b12d18
Using that key with the provided flag_nonce, flag_ciphertext, and flag_tag decrypts the flag.
Solution
- Fetch
many_tags.pyand notice that query tags are corrupted only in the low 64 bits. - Observe that for one-block ciphertexts,
low64(GHASH_H(C))is affine in the ciphertext bits. - Collect all 704 query tuples and build nullspace relations on
[ciphertext || 1]. - XOR tags along each relation so the unknown affine GHASH map cancels.
- Model the 1408 MT19937 output words symbolically over GF(2).
- Build and solve the resulting linear system to recover the seeded MT state.
- Invert CPython's long-int
init_by_arrayseeding to recover the 8 seed words. - Reconstruct the 32-byte AES master key and decrypt the flag with AES-GCM.
Recovered master key:
6497e7d37f36dd962bb195a7ebfd27876053dc4fd1994a32eb2be4c028b12d18
Full solver (tasks/b01lersc/manytags/solve.py):
#!/usr/bin/env python3 import argparse import importlib.util import random import re import secrets import socket import ssl import time from cryptography.hazmat.primitives.ciphers.aead import AESGCM MASK32 = 0xFFFFFFFF N = 624 M = 397 MATRIX_A = 0x9908B0DF UPPER_MASK = 0x80000000 LOWER_MASK = 0x7FFFFFFF NVARS = N * 32 QUERY_BUDGET = 704 def recv_until(sock, marker): data = b"" while marker not in data: chunk = sock.recv(4096) if not chunk: raise EOFError(f"connection closed before marker {marker!r}") data += chunk return data def collect_remote(host, port): ctx = ssl.create_default_context() banner_re = re.compile( rb"flag_nonce = ([0-9a-f]+)\s+flag_ciphertext = ([0-9a-f]+)\s+flag_tag = ([0-9a-f]+)", re.S, ) query_re = re.compile( rb"nonce = ([0-9a-f]+)\s+ciphertext = ([0-9a-f]+)\s+tag = ([0-9a-f]+)", re.S ) with socket.create_connection((host, port), timeout=30) as raw: with ctx.wrap_socket(raw, server_hostname=host) as sock: banner = recv_until(sock, b"> ") match = banner_re.search(banner) if not match: raise ValueError("failed to parse banner") flag_nonce = bytes.fromhex(match.group(1).decode()) flag_ciphertext = bytes.fromhex(match.group(2).decode()) flag_tag = bytes.fromhex(match.group(3).decode()) queries = [] for i in range(QUERY_BUDGET): sock.sendall(b"1\n") response = recv_until(sock, b"> ") m = query_re.search(response) if not m: raise ValueError(f"failed to parse query {i}") queries.append( { "nonce": bytes.fromhex(m.group(1).decode()), "ciphertext": bytes.fromhex(m.group(2).decode()), "tag": bytes.fromhex(m.group(3).decode()), } ) if (i + 1) % 100 == 0: print(f"[*] collected {i + 1}/{QUERY_BUDGET} queries") return { "flag_nonce": flag_nonce, "flag_ciphertext": flag_ciphertext, "flag_tag": flag_tag, "queries": queries, } def collect_local(): spec = importlib.util.spec_from_file_location("many_tags", "many_tags.py") mod = importlib.util.module_from_spec(spec) spec.loader.exec_module(mod) queries = [] for _ in range(QUERY_BUDGET): nonce = secrets.token_bytes(12) message = secrets.token_bytes(16) result = mod.encrypt_query_with_rng(mod.master_key, nonce, message, mod.mask_rng) queries.append( { "nonce": nonce, "ciphertext": result["ciphertext"], "tag": result["tag"], } ) flag = b"testflag12345678" flag_nonce = secrets.token_bytes(12) flag_ciphertext, flag_tag = mod.encrypt_flag_message(mod.master_key, flag_nonce, flag) return { "flag_nonce": flag_nonce, "flag_ciphertext": flag_ciphertext, "flag_tag": flag_tag, "queries": queries, "expected_flag": flag, "expected_key": mod.master_key, } def sym_word_var(word_idx): base = word_idx * 32 return [1 << (base + bit) for bit in range(32)] def xorw(a, b): return [x ^ y for x, y in zip(a, b)] def shr(a, n): return [a[i + n] if i + n < 32 else 0 for i in range(32)] def shl(a, n): return [a[i - n] if i >= n else 0 for i in range(32)] def and_const(a, c): return [a[i] if ((c >> i) & 1) else 0 for i in range(32)] def mul_bit_by_const(bit_form, c): return [bit_form if ((c >> i) & 1) else 0 for i in range(32)] def temper_word(x): y = x y = xorw(y, shr(y, 11)) y = xorw(y, and_const(shl(y, 7), 0x9D2C5680)) y = xorw(y, and_const(shl(y, 15), 0xEFC60000)) y = xorw(y, shr(y, 18)) return y def twist_symbolic(mt): mt = mt[:] for i in range(N - M): y = xorw(and_const(mt[i], UPPER_MASK), and_const(mt[i + 1], LOWER_MASK)) mt[i] = xorw(xorw(mt[i + M], shr(y, 1)), mul_bit_by_const(y[0], MATRIX_A)) for i in range(N - M, N - 1): y = xorw(and_const(mt[i], UPPER_MASK), and_const(mt[i + 1], LOWER_MASK)) mt[i] = xorw( xorw(mt[i + (M - N)], shr(y, 1)), mul_bit_by_const(y[0], MATRIX_A), ) y = xorw(and_const(mt[N - 1], UPPER_MASK), and_const(mt[0], LOWER_MASK)) mt[N - 1] = xorw(xorw(mt[M - 1], shr(y, 1)), mul_bit_by_const(y[0], MATRIX_A)) return mt def symbolic_mt_outputs(num_outputs): mt = [sym_word_var(i) for i in range(N)] outputs = [] idx = N twists = 0 for _ in range(num_outputs): if idx >= N: twists += 1 print(f"[*] symbolic twist {twists}") mt = twist_symbolic(mt) idx = 0 outputs.append(temper_word(mt[idx])) idx += 1 return outputs def iter_set_bits(x): while x: lsb = x & -x idx = lsb.bit_length() - 1 yield idx x ^= lsb def nullspace_relations(columns): basis = {} relations = [] for i, value in enumerate(columns): x = value combo = 1 << i while x: pivot = x.bit_length() - 1 if pivot not in basis: basis[pivot] = (x, combo) break x ^= basis[pivot][0] combo ^= basis[pivot][1] else: relations.append(combo) return relations def build_reduced_system(relations, tag_low64, outputs): rows = [] for rel_idx, rel in enumerate(relations): if rel_idx % 50 == 0: print(f"[*] building relation block {rel_idx}/{len(relations)}") rhs64 = 0 acc_hi = [0] * 32 acc_lo = [0] * 32 for i in iter_set_bits(rel): rhs64 ^= tag_low64[i] out_hi = outputs[2 * i] out_lo = outputs[2 * i + 1] for bit in range(32): acc_hi[bit] ^= out_hi[bit] acc_lo[bit] ^= out_lo[bit] for bit in range(32): rows.append(acc_lo[bit] | (((rhs64 >> bit) & 1) << NVARS)) for bit in range(32): rows.append(acc_hi[bit] | (((rhs64 >> (32 + bit)) & 1) << NVARS)) for bit in range(32): rows.append((1 << bit) | (((1 if bit == 31 else 0)) << NVARS)) return rows def gf2_solve(rows, nvars): pivots = {} varmask = (1 << nvars) - 1 for row_idx, row in enumerate(rows): if row_idx % 2000 == 0: print(f"[*] eliminating row {row_idx}/{len(rows)} pivots={len(pivots)}") x = row while x & varmask: pivot = (x & varmask).bit_length() - 1 if pivot not in pivots: pivots[pivot] = x break x ^= pivots[pivot] else: if (x >> nvars) & 1: raise ValueError("inconsistent linear system") print(f"[*] rank = {len(pivots)}") solution = 0 for pivot in sorted(pivots): row = pivots[pivot] rhs = (row >> nvars) & 1 lower = row & ((1 << pivot) - 1) value = rhs ^ ((lower & solution).bit_count() & 1) if value: solution |= 1 << pivot return solution def solution_to_state_words(solution): state = [] for i in range(N): word = 0 for bit in range(32): if (solution >> (32 * i + bit)) & 1: word |= 1 << bit state.append(word) return state def init_genrand_state(seed): mt = [0] * N mt[0] = seed & MASK32 for i in range(1, N): mt[i] = (1812433253 * (mt[i - 1] ^ (mt[i - 1] >> 30)) + i) & MASK32 return mt def f166(x): return ((x ^ (x >> 30)) * 1664525) & MASK32 def f156(x): return ((x ^ (x >> 30)) * 1566083941) & MASK32 def invert_seed_state_to_key_words(state): if state[0] != 0x80000000: raise ValueError("bad seeded MT state") b_state = state[:] a_state = [0] * N a_state[1] = ((b_state[1] + 1) & MASK32) ^ f156(b_state[623]) a_state[2] = ((b_state[2] + 2) & MASK32) ^ f156(a_state[1]) for i in range(3, N): a_state[i] = ((b_state[i] + i) & MASK32) ^ f156(b_state[i - 1]) a_state[0] = a_state[623] g_state = init_genrand_state(19650218) key_words = [] for j in range(8): candidates = set() for i in range(3, N): if (i - 1) % 8 == j: word = (a_state[i] - (g_state[i] ^ f166(a_state[i - 1])) - j) & MASK32 candidates.add(word) if len(candidates) != 1: raise ValueError(f"failed to recover key word {j}: {candidates}") key_words.append(candidates.pop()) return key_words def key_words_to_bytes(key_words): seed_int = 0 for i, word in enumerate(key_words): seed_int |= (word & MASK32) << (32 * i) return seed_int.to_bytes(32, "big") def recover_master_key(queries): columns = [((int.from_bytes(q["ciphertext"], "big")) << 1) | 1 for q in queries] relations = nullspace_relations(columns) print(f"[*] found {len(relations)} nullspace relations") outputs = symbolic_mt_outputs(2 * len(queries)) tag_low64 = [int.from_bytes(q["tag"], "big") & ((1 << 64) - 1) for q in queries] rows = build_reduced_system(relations, tag_low64, outputs) solution = gf2_solve(rows, NVARS) seeded_state = solution_to_state_words(solution) key_words = invert_seed_state_to_key_words(seeded_state) return key_words_to_bytes(key_words) def main(): parser = argparse.ArgumentParser() parser.add_argument("--host", default="manytags.opus4-7.b01le.rs") parser.add_argument("--port", type=int, default=8443) parser.add_argument("--local-test", action="store_true") args = parser.parse_args() start = time.time() if args.local_test: data = collect_local() else: data = collect_remote(args.host, args.port) print(f"[*] collection done in {time.time() - start:.2f}s") master_key = recover_master_key(data["queries"]) print(f"[*] recovered master key: {master_key.hex()}") flag = AESGCM(master_key).decrypt( data["flag_nonce"], data["flag_ciphertext"] + data["flag_tag"], None, ) print(f"[+] flag = {flag.decode(errors='replace')}") if args.local_test: print(f"[*] expected key ok: {master_key == data['expected_key']}") print(f"[*] expected flag ok: {flag == data['expected_flag']}") print(f"[*] total time {time.time() - start:.2f}s") if __name__ == "__main__": main()
$ cat /etc/motd
Liked this one?
Pro unlocks every writeup, every flag, and API access. $9/mo.
$ cat pricing.md