From edcff1cd4f1a3eb7f44bc260d5f0e48b2d15a6e1 Mon Sep 17 00:00:00 2001 From: Przemyslaw Kadej Date: Tue, 2 Jun 2026 16:12:16 +0200 Subject: [PATCH] script --- scripts/dem_parser.py | 748 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 748 insertions(+) create mode 100644 scripts/dem_parser.py diff --git a/scripts/dem_parser.py b/scripts/dem_parser.py new file mode 100644 index 0000000..e1deb1f --- /dev/null +++ b/scripts/dem_parser.py @@ -0,0 +1,748 @@ +#!/usr/bin/env python3 +""" +Q2Pro demo (.dm2) parser - extracts player stats and generates Markdown reports. + +Usage: + python dem_parser.py demo.dm2 # stdout + python dem_parser.py demo.dm2 -o out.md # output file + python dem_parser.py *.dm2 # batch - foo_report.md per file + python dem_parser.py /path/to/dir/ # batch - all .dm2 in directory + python dem_parser.py *.dm2 -y # batch, always continue on errors +""" + +import struct +import re +import sys +import os +import zlib +import argparse +from pathlib import Path + +# svc_ message types (inc/common/protocol.h) +SVC_MUZZLEFLASH = 1 +SVC_MUZZLEFLASH2 = 2 +SVC_TEMP_ENTITY = 3 +SVC_LAYOUT = 4 +SVC_INVENTORY = 5 +SVC_NOP = 6 +SVC_DISCONNECT = 7 +SVC_RECONNECT = 8 +SVC_SOUND = 9 +SVC_PRINT = 10 +SVC_STUFFTEXT = 11 +SVC_SERVERDATA = 12 +SVC_CONFIGSTRING = 13 +SVC_SPAWNBASELINE = 14 +SVC_CENTERPRINT = 15 +SVC_DOWNLOAD = 16 +SVC_PLAYERINFO = 17 +SVC_PACKETENTITIES = 18 +SVC_DELTAPACKETENTITIES = 19 +SVC_FRAME = 20 +SVC_ZPACKET = 21 +SVC_ZDOWNLOAD = 22 +SVC_GAMESTATE = 23 +SVC_SETTING = 24 + +# Configstring indices - new protocol (34/36, inc/shared/shared.h) +CS_MAXCLIENTS = 60 +CS_MODELS = 62 +CS_SOUNDS = CS_MODELS + 8192 +CS_IMAGES = CS_SOUNDS + 2048 +CS_LIGHTS = CS_IMAGES + 2048 +CS_ITEMS = CS_LIGHTS + 256 +CS_PLAYERSKINS = CS_ITEMS + 256 # = 12862 — "name\model\skin" +CS_GENERAL = CS_PLAYERSKINS + 256 # = 13118 + +# OpenTDM configstring offsets (CS_GENERAL + N, g_local.h:139) +CS_TDM_TEAM_A_NAME = CS_GENERAL + 0 +CS_TDM_TEAM_B_NAME = CS_GENERAL + 1 +CS_TDM_TEAM_A_STATUS = CS_GENERAL + 2 +CS_TDM_TEAM_B_STATUS = CS_GENERAL + 3 + +# Old protocol configstring indices (protocol 26, also used by opentdm/openffa) +CS_PLAYERSKINS_OLD = 1312 +CS_GENERAL_OLD = 1568 + +STAT_FRAGS = 14 # g_local.h:65 (opentdm), q_shared.h:1029 (openffa) + +EOF_MARKER = 0xFFFFFFFF + + +class ParseError(Exception): + pass + + +# --------------------------------------------------------------------------- +# Binary buffer reader +# --------------------------------------------------------------------------- + +class Buffer: + def __init__(self, data): + self.data = data if isinstance(data, (bytes, bytearray)) else bytes(data) + self.pos = 0 + + def remaining(self): + return len(self.data) - self.pos + + def read_byte(self): + if self.pos >= len(self.data): + raise ParseError("Unexpected end of buffer (byte)") + v = self.data[self.pos] + self.pos += 1 + return v + + def read_short(self): + if self.pos + 2 > len(self.data): + raise ParseError("Unexpected end of buffer (short)") + v = struct.unpack_from(' len(self.data): + raise ParseError("Unexpected end of buffer (ushort)") + v = struct.unpack_from(' len(self.data): + raise ParseError("Unexpected end of buffer (long)") + v = struct.unpack_from(' len(self.data): + raise ParseError(f"Cannot skip {n} bytes ({self.remaining()} remaining)") + self.pos += n + + def read_bytes(self, n): + if self.pos + n > len(self.data): + raise ParseError(f"Cannot read {n} bytes") + v = self.data[self.pos:self.pos + n] + self.pos += n + return v + + +# --------------------------------------------------------------------------- +# Demo state +# --------------------------------------------------------------------------- + +class DemoState: + def __init__(self): + self.protocol = 34 + self.gamedir = '' + self.mapname = '' + self.client_num = -1 + self.configstrings = {} + self.cvars = {} # server cvars from svc_stufftext "set" commands + self.layouts = [] # all layout strings found + self.truncated = False # demo ended without svc_disconnect + self.had_disconnect = False + + +# --------------------------------------------------------------------------- +# Message processing +# --------------------------------------------------------------------------- + +def process_messages(buf, state): + """Process messages from buffer until svc_frame or unknown message.""" + while buf.remaining() > 0: + cmd = buf.read_byte() + + if cmd == SVC_NOP: + continue + + elif cmd in (SVC_DISCONNECT, SVC_RECONNECT): + if cmd == SVC_DISCONNECT: + state.had_disconnect = True + break + + elif cmd == SVC_SERVERDATA: + state.protocol = buf.read_long() + buf.read_long() # server count + buf.read_byte() # attract loop + state.gamedir = buf.read_string() + state.client_num = buf.read_short() + state.mapname = buf.read_string() + + elif cmd == SVC_CONFIGSTRING: + index = buf.read_ushort() + value = buf.read_string() + state.configstrings[index] = value + + elif cmd == SVC_LAYOUT: + layout = buf.read_string() + state.layouts.append(layout) + + elif cmd == SVC_PRINT: + buf.read_byte() # print level + buf.read_string() + + elif cmd == SVC_STUFFTEXT: + text = buf.read_string() + # capture "set varname value" server-info cvars + for m in re.finditer(r'\bset\s+(\S+)\s+(\S+)', text): + state.cvars[m.group(1)] = m.group(2) + + elif cmd == SVC_CENTERPRINT: + buf.read_string() + + elif cmd == SVC_SETTING: + buf.skip(8) # 2 × int32 + + elif cmd in (SVC_MUZZLEFLASH, SVC_MUZZLEFLASH2): + buf.skip(3) # entity (short) + weapon (byte) + + elif cmd == SVC_ZPACKET: + len_in = buf.read_ushort() + len_out = buf.read_ushort() # decompressed size hint + compressed = buf.read_bytes(len_in) + try: + decompressed = zlib.decompress(compressed) + inner = Buffer(decompressed) + process_messages(inner, state) + except zlib.error: + break # corrupt compressed data, skip packet + + elif cmd in (SVC_FRAME, SVC_PLAYERINFO, SVC_PACKETENTITIES, + SVC_DELTAPACKETENTITIES, SVC_SPAWNBASELINE, + SVC_GAMESTATE, SVC_DOWNLOAD, SVC_TEMP_ENTITY, + SVC_INVENTORY, SVC_ZDOWNLOAD, SVC_SOUND): + # Frame data or too complex to skip — stop processing this packet + break + + else: + # Unknown message type — stop + break + + +# --------------------------------------------------------------------------- +# File parsing +# --------------------------------------------------------------------------- + +def parse_demo(path): + """Read and parse a .dm2 demo file. Returns DemoState.""" + state = DemoState() + + with open(path, 'rb') as f: + data = f.read() + + pos = 0 + while pos < len(data): + if pos + 4 > len(data): + state.truncated = True + break + + msglen = struct.unpack_from(' len(data): + # Truncated final packet — parse what we have + state.truncated = True + msglen = len(data) - pos + + packet = data[pos:pos + msglen] + pos += msglen + + try: + buf = Buffer(packet) + process_messages(buf, state) + except ParseError: + state.truncated = True + # Continue with next packet + + if not state.had_disconnect: + state.truncated = True + + return state + + +# --------------------------------------------------------------------------- +# Layout string parsing +# --------------------------------------------------------------------------- + +_HEADER_WORDS = {'name', 'frags', 'frag', 'dths', 'deaths', 'ping', + 'net', 'eff', 'fph', 'time', 'player', 'rank'} + + +def _is_header(text): + words = set(text.lower().split()) + return bool(words & _HEADER_WORDS) + + +def _parse_layout_entries(layout): + """Parse layout into list of (yv_position, string_content) tuples.""" + entries = [] + for m in re.finditer(r'yv\s+(\d+)\s+string2?\s+"([^"]*)"', layout): + entries.append((int(m.group(1)), m.group(2))) + return entries + + +def _parse_player_tdm(content): + """ + Parse a single OpenTDM player line (fixed-width format). + Format: "%-15.15s %4d %3d %3d %3d" → name, frags, deaths, net, ping + Returns dict or None. + """ + if len(content) < 20: + return None + name = content[:15].rstrip() + if not name or _is_header(name): + return None + nums = re.findall(r'-?\d+', content[15:]) + if len(nums) < 2: + return None + try: + return {'name': name, 'frags': int(nums[0]), 'deaths': int(nums[1])} + except ValueError: + return None + + +def _parse_player_ffa(content): + """ + Parse a single OpenFFA player line (fixed-width format). + Format: "%2d %-15s %3d %3d %3d %4d %4s %4d" → rank, name, frags, deaths, eff, ... + Offsets: [0:2]=rank, [3:18]=name, [19:22]=frags, [23:26]=deaths, [27:30]=eff + Returns dict or None. + """ + if len(content) < 27: + return None + rank_s = content[0:2].strip() + if not rank_s.isdigit(): + return None + name = content[3:18].rstrip() if len(content) >= 18 else content[3:].rstrip() + if not name: + return None + nums = re.findall(r'-?\d+', content[18:]) + if len(nums) < 3: + return None + try: + return { + 'rank': int(rank_s), + 'name': name, + 'frags': int(nums[0]), + 'deaths': int(nums[1]), + 'eff': int(nums[2]), + } + except ValueError: + return None + + +def parse_layout_tdm(layout): + """ + Parse an OpenTDM scoreboard layout string. + Returns (team_a_players, team_b_players, team_a_score, team_b_score). + Each player: {'name', 'frags', 'deaths'}. + + OpenTDM layout places both teams side by side: + Column A: yv 56, 64, ... (between header A at yv 48 and header B) + Column B: yv 96, 104, ... (below header B at yv 88) + """ + entries = _parse_layout_entries(layout) + + # Team scores appear at small yv positions (yv 8, yv 16) + # Format: "TeamName SCORE" — last token is the score + team_scores = [] + for yv, content in sorted(entries, key=lambda x: x[0]): + if yv > 30 or _is_header(content) or not content.strip(): + continue + parts = content.rsplit(None, 1) + if len(parts) == 2: + try: + score = int(parts[1]) + team_scores.append(score) + except ValueError: + pass + + team_a_score = team_scores[0] if len(team_scores) > 0 else 0 + team_b_score = team_scores[1] if len(team_scores) > 1 else 0 + + # Column header positions (string2 containing " Name " and "Frags") + header_yvs = sorted( + yv for yv, c in entries + if 'Name' in c and 'Frags' in c + ) + + # Team A: entries between first and second header yv + # Team B: entries below second header yv + split_yv = header_yvs[1] if len(header_yvs) >= 2 else 9999 + first_header_yv = header_yvs[0] if header_yvs else 0 + + team_a_players = [] + team_b_players = [] + + for yv, content in entries: + if _is_header(content) or not content.strip(): + continue + p = _parse_player_tdm(content) + if p is None: + continue + if first_header_yv < yv <= split_yv: + team_a_players.append(p) + elif yv > split_yv: + team_b_players.append(p) + + return team_a_players, team_b_players, team_a_score, team_b_score + + +def parse_layout_ffa(layout): + """ + Parse an OpenFFA/DM scoreboard layout string. + Returns list of {'rank', 'name', 'frags', 'deaths', 'eff'}. + """ + entries = _parse_layout_entries(layout) + players = [] + + for _yv, content in entries: + if _is_header(content): + continue + p = _parse_player_ffa(content) + if p: + players.append(p) + + return players + + +# --------------------------------------------------------------------------- +# State helpers +# --------------------------------------------------------------------------- + +def _cs_gen(state): + """Return CS_GENERAL base index — old or new, detected from demo content.""" + if any(state.configstrings.get(CS_GENERAL_OLD + i) for i in range(8)): + return CS_GENERAL_OLD + return CS_GENERAL + + +def _cs_skins(state): + """Return CS_PLAYERSKINS base index — old or new.""" + if any(state.configstrings.get(CS_PLAYERSKINS_OLD + i) for i in range(8)): + return CS_PLAYERSKINS_OLD + return CS_PLAYERSKINS + + +def is_opentdm(state): + cs_gen = _cs_gen(state) + return ( + 'opentdm' in state.gamedir.lower() + or state.configstrings.get(cs_gen + 0, '').strip() != '' + or state.configstrings.get(cs_gen + 1, '').strip() != '' + ) + + +def get_team_names(state): + cs_gen = _cs_gen(state) + team_a = state.configstrings.get(cs_gen + 0, '').strip() or 'Team A' + team_b = state.configstrings.get(cs_gen + 1, '').strip() or 'Team B' + return team_a, team_b + + +def get_team_scores_from_cs(state): + """Return (score_a, score_b) from CS_TDM_TEAM_A/B_STATUS configstrings.""" + cs_gen = _cs_gen(state) + try: + score_a = int(state.configstrings.get(cs_gen + 2, '').strip()) + except ValueError: + score_a = None + try: + score_b = int(state.configstrings.get(cs_gen + 3, '').strip()) + except ValueError: + score_b = None + return score_a, score_b + + +def get_timelimit(state): + """Return timelimit in minutes, or None if not found.""" + # OpenTDM uses g_match_time (seconds); fallback to timelimit (minutes) + if 'g_match_time' in state.cvars: + try: + secs = float(state.cvars['g_match_time']) + if secs > 0: + return int(secs // 60) + except ValueError: + pass + if 'timelimit' in state.cvars: + try: + mins = float(state.cvars['timelimit']) + if mins > 0: + return int(mins) + except ValueError: + pass + return None + + +def get_player_names(state): + """Return {slot_index: name} from CS_PLAYERSKINS configstrings.""" + cs_base = _cs_skins(state) + players = {} + for i in range(256): + val = state.configstrings.get(cs_base + i, '').strip() + if val: + name = val.split('\\')[0].strip() + if name: + players[i] = name + return players + + +# --------------------------------------------------------------------------- +# Markdown report generation +# --------------------------------------------------------------------------- + +_WARN_TRUNCATED = ( + "\n> ⚠ **Demo truncated** — the recording may have ended abruptly (e.g. system reset). " + "Stats are from the last available scoreboard.\n" +) +_WARN_NO_LAYOUT = ( + "\n> ⚠ **Demo truncated** — no scoreboard found in file. " + "Deaths unavailable (`N/A`). Frags from last known frame.\n" +) + + +def _md_table(headers, rows): + lines = [] + lines.append('| ' + ' | '.join(headers) + ' |') + lines.append('|' + '|'.join('-' * (len(h) + 2) for h in headers) + '|') + for row in rows: + lines.append('| ' + ' | '.join(str(c) for c in row) + ' |') + return '\n'.join(lines) + + +def _meta_line(state): + """Return a metadata line with map name and timelimit.""" + mapname = state.mapname or 'unknown' + timelimit = get_timelimit(state) + tl_str = f"{timelimit} min" if timelimit else "N/A" + return f"**Map:** {mapname} | **Timelimit:** {tl_str}" + + +def generate_report_ffa(state, players, truncated, no_layout=False): + mod = state.gamedir or 'DM' + mapname = state.mapname or 'unknown' + lines = [f"# Demo: {mapname} — {mod}", "", _meta_line(state), ""] + + if no_layout: + lines.append(_WARN_NO_LAYOUT) + elif truncated: + lines.append(_WARN_TRUNCATED) + + if not players: + lines.append("*No player data found in demo file.*") + return '\n'.join(lines) + + players.sort(key=lambda p: p.get('frags', 0), reverse=True) + has_eff = any('eff' in p for p in players) + + if has_eff: + headers = ['#', 'Player', 'Frags', 'Deaths', 'Eff%'] + rows = [ + (p.get('rank', i + 1), p['name'], + p['frags'], p.get('deaths', 'N/A'), f"{p.get('eff', 'N/A')}%") + for i, p in enumerate(players) + ] + else: + headers = ['#', 'Player', 'Frags', 'Deaths'] + rows = [ + (i + 1, p['name'], p['frags'], p.get('deaths', 'N/A')) + for i, p in enumerate(players) + ] + + lines.append(_md_table(headers, rows)) + return '\n'.join(lines) + + +def generate_report_tdm(state, team_a_name, team_b_name, + team_a_players, team_b_players, + team_a_score, team_b_score, + truncated): + mapname = state.mapname or 'unknown' + lines = [f"# Demo: {mapname} — OpenTDM", "", _meta_line(state), ""] + + if truncated: + lines.append(_WARN_TRUNCATED) + + lines.append("## Team scores") + lines.append("") + a_deaths = sum(p['deaths'] for p in team_a_players) + b_deaths = sum(p['deaths'] for p in team_b_players) + lines.append(_md_table( + ['Team', 'Frags', 'Deaths'], + [ + (team_a_name, team_a_score, a_deaths), + (team_b_name, team_b_score, b_deaths), + ] + )) + lines.append("") + + for team_name, team_players in [ + (team_a_name, team_a_players), + (team_b_name, team_b_players), + ]: + lines.append(f"## {team_name}") + lines.append("") + if not team_players: + lines.append("*No data.*") + else: + team_players.sort(key=lambda p: p['frags'], reverse=True) + lines.append(_md_table( + ['Player', 'Frags', 'Deaths'], + [(p['name'], p['frags'], p['deaths']) for p in team_players] + )) + lines.append("") + + return '\n'.join(lines) + + +# --------------------------------------------------------------------------- +# Report builder +# --------------------------------------------------------------------------- + +def build_report(state): + last_layout = state.layouts[-1] if state.layouts else None + truncated = state.truncated + no_layout = last_layout is None + + if is_opentdm(state): + team_a_name, team_b_name = get_team_names(state) + if last_layout: + ta_p, tb_p, ta_score, tb_score = parse_layout_tdm(last_layout) + else: + ta_p, tb_p, ta_score, tb_score = [], [], 0, 0 + # Prefer team scores from configstrings (more reliable than layout parsing) + cs_a, cs_b = get_team_scores_from_cs(state) + if cs_a is not None: + ta_score = cs_a + if cs_b is not None: + tb_score = cs_b + return generate_report_tdm( + state, team_a_name, team_b_name, + ta_p, tb_p, ta_score, tb_score, + truncated + ) + else: + if last_layout: + players = parse_layout_ffa(last_layout) + if not players: + # Fallback: try TDM pattern (some mods use similar format) + tdm_all = parse_layout_tdm(last_layout) + players = [ + {'name': p['name'], 'frags': p['frags'], 'deaths': p['deaths']} + for p in tdm_all[0] + tdm_all[1] + ] + else: + # No layout — fall back to player names from configstrings, no deaths + player_names = get_player_names(state) + players = [{'name': name, 'frags': 0} for name in player_names.values()] + return generate_report_ffa(state, players, truncated, no_layout) + + +# --------------------------------------------------------------------------- +# File processing +# --------------------------------------------------------------------------- + +def process_file(path, output_path=None): + """Parse a single demo file and write/return the report.""" + state = parse_demo(path) + report = build_report(state) + if output_path: + with open(output_path, 'w', encoding='utf-8') as f: + f.write(report) + return None + return report + + +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- + +def main(): + ap = argparse.ArgumentParser( + description='Q2Pro .dm2 demo parser — generates Markdown player stats reports.' + ) + ap.add_argument('inputs', nargs='+', + help='.dm2 file(s) or a directory containing .dm2 files') + ap.add_argument('-o', '--output', + help='Output file (single input file only)') + ap.add_argument('-y', '--always-continue', action='store_true', + help='Always continue on errors without prompting') + args = ap.parse_args() + + # Collect .dm2 files + files = [] + for inp in args.inputs: + p = Path(inp) + if p.is_dir(): + files.extend(sorted(p.glob('*.dm2'))) + elif p.suffix.lower() == '.dm2': + if p.exists(): + files.append(p) + else: + print(f"File not found: {inp}", file=sys.stderr) + else: + print(f"Skipping (not .dm2 or directory): {inp}", file=sys.stderr) + + if not files: + print("No .dm2 files found.", file=sys.stderr) + sys.exit(1) + + if len(files) > 1 and args.output: + print("Error: -o/--output can only be used with a single input file.", file=sys.stderr) + sys.exit(1) + + always_continue = args.always_continue + + for demo_path in files: + try: + if len(files) == 1 and not args.output: + # Single file → stdout + report = process_file(demo_path) + print(report) + elif len(files) == 1 and args.output: + # Single file → specified output file + process_file(demo_path, args.output) + print(f"Report saved: {args.output}") + else: + # Batch mode → foo_report.md + out_path = demo_path.parent / (demo_path.stem + '_report.md') + process_file(demo_path, out_path) + print(f"OK {demo_path.name} -> {out_path.name}") + + except Exception as exc: + print(f"\n[ERROR] {demo_path.name}: {exc}", file=sys.stderr) + + if not always_continue and len(files) > 1: + try: + ans = input( + "Continue? [y]es / [n]o / [a]lways continue: " + ).strip().lower() + except (EOFError, KeyboardInterrupt): + print("\nAborted.", file=sys.stderr) + sys.exit(1) + + if ans == 'n': + print("Aborted.", file=sys.stderr) + sys.exit(1) + elif ans == 'a': + always_continue = True + # 'y' or anything else → continue + + +if __name__ == '__main__': + main()