#!/usr/bin/env python3
"""
Q2Pro demo (.dm2) parser - extracts player stats and generates Markdown reports.

Usage:
    python dem_parser.py demo.dm2                  # stdout
    python dem_parser.py demo.dm2 -o out.md        # output file
    python dem_parser.py *.dm2                     # batch - foo_report.md per file
    python dem_parser.py /path/to/dir/             # batch - all .dm2 in directory
    python dem_parser.py *.dm2 -y                  # batch, always continue on errors
"""

import struct
import re
import sys
import os
import zlib
import argparse
from pathlib import Path

# svc_ message types (inc/common/protocol.h)
SVC_MUZZLEFLASH = 1
SVC_MUZZLEFLASH2 = 2
SVC_TEMP_ENTITY = 3
SVC_LAYOUT = 4
SVC_INVENTORY = 5
SVC_NOP = 6
SVC_DISCONNECT = 7
SVC_RECONNECT = 8
SVC_SOUND = 9
SVC_PRINT = 10
SVC_STUFFTEXT = 11
SVC_SERVERDATA = 12
SVC_CONFIGSTRING = 13
SVC_SPAWNBASELINE = 14
SVC_CENTERPRINT = 15
SVC_DOWNLOAD = 16
SVC_PLAYERINFO = 17
SVC_PACKETENTITIES = 18
SVC_DELTAPACKETENTITIES = 19
SVC_FRAME = 20
SVC_ZPACKET = 21
SVC_ZDOWNLOAD = 22
SVC_GAMESTATE = 23
SVC_SETTING = 24

# Configstring indices - new protocol (34/36, inc/shared/shared.h)
CS_MAXCLIENTS = 60
CS_MODELS = 62
CS_SOUNDS = CS_MODELS + 8192
CS_IMAGES = CS_SOUNDS + 2048
CS_LIGHTS = CS_IMAGES + 2048
CS_ITEMS = CS_LIGHTS + 256
CS_PLAYERSKINS = CS_ITEMS + 256      # = 12862 — "name\model\skin"
CS_GENERAL = CS_PLAYERSKINS + 256    # = 13118

# OpenTDM configstring offsets (CS_GENERAL + N, g_local.h:139)
CS_TDM_TEAM_A_NAME = CS_GENERAL + 0
CS_TDM_TEAM_B_NAME = CS_GENERAL + 1
CS_TDM_TEAM_A_STATUS = CS_GENERAL + 2
CS_TDM_TEAM_B_STATUS = CS_GENERAL + 3

# Old protocol configstring indices (protocol 26, also used by opentdm/openffa)
CS_PLAYERSKINS_OLD = 1312
CS_GENERAL_OLD = 1568

STAT_FRAGS = 14  # g_local.h:65 (opentdm), q_shared.h:1029 (openffa)

EOF_MARKER = 0xFFFFFFFF


class ParseError(Exception):
    """Raised when a read would run past the end of a packet buffer."""


# ---------------------------------------------------------------------------
# Binary buffer reader
# ---------------------------------------------------------------------------

class Buffer:
    """Sequential little-endian reader over an in-memory byte string.

    Every ``read_*``/``skip`` call advances ``pos`` and raises ``ParseError``
    instead of reading past the end of ``data``.
    """

    def __init__(self, data):
        # Accept any bytes-like/iterable input but store real bytes so that
        # indexing yields ints and struct.unpack_from works.
        self.data = data if isinstance(data, (bytes, bytearray)) else bytes(data)
        self.pos = 0

    def remaining(self):
        """Return the number of unread bytes."""
        return len(self.data) - self.pos

    def read_byte(self):
        """Read one unsigned byte."""
        if self.pos >= len(self.data):
            raise ParseError("Unexpected end of buffer (byte)")
        v = self.data[self.pos]
        self.pos += 1
        return v

    def read_short(self):
        """Read a signed 16-bit little-endian integer."""
        if self.pos + 2 > len(self.data):
            raise ParseError("Unexpected end of buffer (short)")
        v = struct.unpack_from('<h', self.data, self.pos)[0]
        self.pos += 2
        return v

    def read_ushort(self):
        """Read an unsigned 16-bit little-endian integer."""
        if self.pos + 2 > len(self.data):
            raise ParseError("Unexpected end of buffer (ushort)")
        v = struct.unpack_from('<H', self.data, self.pos)[0]
        self.pos += 2
        return v

    def read_long(self):
        """Read a signed 32-bit little-endian integer."""
        if self.pos + 4 > len(self.data):
            raise ParseError("Unexpected end of buffer (long)")
        v = struct.unpack_from('<i', self.data, self.pos)[0]
        self.pos += 4
        return v

    def read_string(self):
        """Read a NUL-terminated string and consume the terminator.

        NOTE(review): the original body of this method was lost when the file
        was mangled; this is a reconstruction. It assumes strings are
        NUL-terminated and decodes them as latin-1 (one byte -> one char, so
        decoding can never fail) — confirm against the original.
        """
        end = self.data.find(b'\x00', self.pos)
        if end == -1:
            # Same policy as the fixed-width readers: never return a partial
            # value from an overrun buffer.
            raise ParseError("Unexpected end of buffer (string)")
        raw = self.data[self.pos:end]
        self.pos = end + 1
        return raw.decode('latin-1')

    def skip(self, n):
        """Advance the read position by ``n`` bytes."""
        if self.pos + n > len(self.data):
            raise ParseError(f"Cannot skip {n} bytes ({self.remaining()} remaining)")
        self.pos += n

    def read_bytes(self, n):
        """Read and return exactly ``n`` raw bytes."""
        if self.pos + n > len(self.data):
            raise ParseError(f"Cannot read {n} bytes")
        v = self.data[self.pos:self.pos + n]
        self.pos += n
        return v


# ---------------------------------------------------------------------------
# Demo state
# ---------------------------------------------------------------------------

class DemoState:
    """Accumulates everything extracted from a demo while it is parsed."""

    def __init__(self):
        self.protocol = 34
        self.gamedir = ''
        self.mapname = ''
        self.client_num = -1
        self.configstrings = {}
        self.cvars = {}          # server cvars from svc_stufftext "set" commands
        self.layouts = []        # all layout strings found
        self.truncated = False   # demo ended without svc_disconnect
        self.had_disconnect = False


# ---------------------------------------------------------------------------
# Message processing
# ---------------------------------------------------------------------------

# Commands whose payload this parser does not decode; the rest of the packet
# is abandoned when one of them is met.
_STOP_COMMANDS = frozenset((
    SVC_FRAME, SVC_PLAYERINFO, SVC_PACKETENTITIES,
    SVC_DELTAPACKETENTITIES, SVC_SPAWNBASELINE, SVC_GAMESTATE,
    SVC_DOWNLOAD, SVC_TEMP_ENTITY, SVC_INVENTORY, SVC_ZDOWNLOAD,
    SVC_SOUND,
))

_SET_CVAR_RE = re.compile(r'\bset\s+(\S+)\s+(\S+)')


def process_messages(buf, state):
    """Process messages from buffer until svc_frame or unknown message.

    Recognised messages update ``state`` (server data, configstrings,
    layouts, "set" cvars, disconnect flag). Processing of the current buffer
    stops at a disconnect/reconnect, at any command in ``_STOP_COMMANDS``,
    or at an unknown command byte.

    Raises:
        ParseError: if a message runs past the end of ``buf``.
    """
    while buf.remaining() > 0:
        cmd = buf.read_byte()

        if cmd == SVC_NOP:
            continue

        elif cmd in (SVC_DISCONNECT, SVC_RECONNECT):
            if cmd == SVC_DISCONNECT:
                state.had_disconnect = True
            break

        elif cmd == SVC_SERVERDATA:
            state.protocol = buf.read_long()
            buf.read_long()   # server count
            buf.read_byte()   # attract loop
            state.gamedir = buf.read_string()
            state.client_num = buf.read_short()
            state.mapname = buf.read_string()

        elif cmd == SVC_CONFIGSTRING:
            index = buf.read_ushort()
            value = buf.read_string()
            state.configstrings[index] = value

        elif cmd == SVC_LAYOUT:
            state.layouts.append(buf.read_string())

        elif cmd == SVC_PRINT:
            buf.read_byte()   # print level
            buf.read_string()

        elif cmd == SVC_STUFFTEXT:
            text = buf.read_string()
            # capture "set varname value" server-info cvars
            for m in _SET_CVAR_RE.finditer(text):
                state.cvars[m.group(1)] = m.group(2)

        elif cmd == SVC_CENTERPRINT:
            buf.read_string()

        elif cmd == SVC_SETTING:
            buf.skip(8)  # 2 × int32

        elif cmd in (SVC_MUZZLEFLASH, SVC_MUZZLEFLASH2):
            buf.skip(3)  # entity (short) + weapon (byte)

        elif cmd == SVC_ZPACKET:
            len_in = buf.read_ushort()
            buf.read_ushort()  # decompressed size hint (unused)
            compressed = buf.read_bytes(len_in)
            try:
                decompressed = zlib.decompress(compressed)
            except zlib.error:
                break  # corrupt compressed data, skip packet
            # The inflated payload is itself a stream of svc_ messages.
            process_messages(Buffer(decompressed), state)

        elif cmd in _STOP_COMMANDS:
            # Frame data or too complex to skip — stop processing this packet
            break

        else:
            # Unknown message type — stop
            break


# ---------------------------------------------------------------------------
# File parsing
# ---------------------------------------------------------------------------

def parse_demo(path):
    """Read and parse a .dm2 demo file. Returns DemoState.

    The file is read as a sequence of packets, each prefixed by a 32-bit
    little-endian length; a length equal to ``EOF_MARKER`` ends the stream.
    ``state.truncated`` is set when the file ends mid-header or mid-packet,
    when a packet fails to parse, or when no svc_disconnect was seen.
    """
    state = DemoState()

    with open(path, 'rb') as f:
        data = f.read()

    pos = 0
    while pos < len(data):
        if pos + 4 > len(data):
            state.truncated = True
            break

        # NOTE(review): this length read and the EOF check were lost when the
        # file was mangled and are reconstructed from the surviving code and
        # the otherwise-unused EOF_MARKER constant.
        msglen = struct.unpack_from('<I', data, pos)[0]
        pos += 4

        if msglen == EOF_MARKER:
            break

        if pos + msglen > len(data):
            # Truncated final packet — parse what we have
            state.truncated = True
            msglen = len(data) - pos

        packet = data[pos:pos + msglen]
        pos += msglen

        try:
            process_messages(Buffer(packet), state)
        except ParseError:
            state.truncated = True
            # Continue with next packet

    if not state.had_disconnect:
        state.truncated = True

    return state


# ---------------------------------------------------------------------------
# Layout string parsing
# ---------------------------------------------------------------------------

_HEADER_WORDS = {'name', 'frags', 'frag', 'dths', 'deaths', 'ping', 'net',
                 'eff', 'fph', 'time', 'player', 'rank'}

_LAYOUT_ENTRY_RE = re.compile(r'yv\s+(\d+)\s+string2?\s+"([^"]*)"')
_INT_RE = re.compile(r'-?\d+')


def _is_header(text):
    """Return True if any whitespace-separated word of ``text`` is a column title."""
    words = set(text.lower().split())
    return bool(words & _HEADER_WORDS)


def _parse_layout_entries(layout):
    """Parse layout into list of (yv_position, string_content) tuples."""
    return [(int(m.group(1)), m.group(2))
            for m in _LAYOUT_ENTRY_RE.finditer(layout)]


def _parse_player_tdm(content):
    """
    Parse a single OpenTDM player line (fixed-width format).
    Format: "%-15.15s %4d %3d %3d %3d"  →  name, frags, deaths, net, ping
    Returns dict or None.
    """
    if len(content) < 20:
        return None
    name = content[:15].rstrip()
    if not name or _is_header(name):
        return None
    nums = _INT_RE.findall(content[15:])
    if len(nums) < 2:
        return None
    try:
        return {'name': name, 'frags': int(nums[0]), 'deaths': int(nums[1])}
    except ValueError:
        return None


def _parse_player_ffa(content):
    """
    Parse a single OpenFFA player line (fixed-width format).
    Format: "%2d %-15s %3d %3d %3d %4d %4s %4d"  →  rank, name, frags, deaths, eff, ...
    Offsets: [0:2]=rank, [3:18]=name, [19:22]=frags, [23:26]=deaths, [27:30]=eff
    Returns dict or None.
    """
    if len(content) < 27:
        return None
    rank_s = content[0:2].strip()
    if not rank_s.isdigit():
        return None
    name = content[3:18].rstrip()
    if not name:
        return None
    nums = _INT_RE.findall(content[18:])
    if len(nums) < 3:
        return None
    try:
        return {
            'rank': int(rank_s),
            'name': name,
            'frags': int(nums[0]),
            'deaths': int(nums[1]),
            'eff': int(nums[2]),
        }
    except ValueError:
        return None


def parse_layout_tdm(layout):
    """
    Parse an OpenTDM scoreboard layout string.
    Returns (team_a_players, team_b_players, team_a_score, team_b_score).
    Each player: {'name', 'frags', 'deaths'}.

    OpenTDM layout places both teams side by side:
      Column A: yv 56, 64, ...  (between header A at yv 48 and header B)
      Column B: yv 96, 104, ... (below header B at yv 88)
    """
    entries = _parse_layout_entries(layout)

    # Team scores appear at small yv positions (yv 8, yv 16)
    # Format: "TeamName   SCORE" — last token is the score
    team_scores = []
    for yv, content in sorted(entries, key=lambda x: x[0]):
        if yv > 30 or _is_header(content) or not content.strip():
            continue
        parts = content.rsplit(None, 1)
        if len(parts) == 2:
            try:
                team_scores.append(int(parts[1]))
            except ValueError:
                pass

    team_a_score = team_scores[0] if len(team_scores) > 0 else 0
    team_b_score = team_scores[1] if len(team_scores) > 1 else 0

    # Column header positions (string2 containing " Name " and "Frags")
    header_yvs = sorted(
        yv for yv, c in entries
        if 'Name' in c and 'Frags' in c
    )

    # Team A: entries between first and second header yv
    # Team B: entries below second header yv
    split_yv = header_yvs[1] if len(header_yvs) >= 2 else 9999
    first_header_yv = header_yvs[0] if header_yvs else 0

    team_a_players = []
    team_b_players = []

    for yv, content in entries:
        if _is_header(content) or not content.strip():
            continue
        p = _parse_player_tdm(content)
        if p is None:
            continue
        if first_header_yv < yv <= split_yv:
            team_a_players.append(p)
        elif yv > split_yv:
            team_b_players.append(p)

    return team_a_players, team_b_players, team_a_score, team_b_score


def parse_layout_ffa(layout):
    """
    Parse an OpenFFA/DM scoreboard layout string.
    Returns list of {'rank', 'name', 'frags', 'deaths', 'eff'}.
    """
    players = []
    for _yv, content in _parse_layout_entries(layout):
        if _is_header(content):
            continue
        p = _parse_player_ffa(content)
        if p:
            players.append(p)
    return players


# ---------------------------------------------------------------------------
# State helpers
# ---------------------------------------------------------------------------

def _cs_gen(state):
    """Return CS_GENERAL base index — old or new, detected from demo content."""
    if any(state.configstrings.get(CS_GENERAL_OLD + i) for i in range(8)):
        return CS_GENERAL_OLD
    return CS_GENERAL


def _cs_skins(state):
    """Return CS_PLAYERSKINS base index — old or new."""
    if any(state.configstrings.get(CS_PLAYERSKINS_OLD + i) for i in range(8)):
        return CS_PLAYERSKINS_OLD
    return CS_PLAYERSKINS


def is_opentdm(state):
    """Return True if the gamedir mentions opentdm or a team-name configstring is set."""
    cs_gen = _cs_gen(state)
    return (
        'opentdm' in state.gamedir.lower()
        or state.configstrings.get(cs_gen + 0, '').strip() != ''
        or state.configstrings.get(cs_gen + 1, '').strip() != ''
    )


def get_team_names(state):
    """Return (team_a, team_b) names, defaulting to 'Team A' / 'Team B'."""
    cs_gen = _cs_gen(state)
    team_a = state.configstrings.get(cs_gen + 0, '').strip() or 'Team A'
    team_b = state.configstrings.get(cs_gen + 1, '').strip() or 'Team B'
    return team_a, team_b


def get_team_scores_from_cs(state):
    """Return (score_a, score_b) from CS_TDM_TEAM_A/B_STATUS configstrings.

    A score is None when its configstring is missing or not an integer.
    """
    cs_gen = _cs_gen(state)
    try:
        score_a = int(state.configstrings.get(cs_gen + 2, '').strip())
    except ValueError:
        score_a = None
    try:
        score_b = int(state.configstrings.get(cs_gen + 3, '').strip())
    except ValueError:
        score_b = None
    return score_a, score_b


def _cvar_float(state, name):
    """Return cvar ``name`` as a positive finite float, or None.

    The stufftext regex captures values verbatim, so a quoted value such as
    ``"20"`` arrives with its quotes; they are stripped before conversion.
    """
    raw = state.cvars.get(name)
    if raw is None:
        return None
    try:
        value = float(raw.strip().strip('"'))
    except ValueError:
        return None
    # Rejects zero/negative, NaN (all comparisons false) and infinity, which
    # would otherwise blow up in int().
    if 0 < value < float('inf'):
        return value
    return None


def get_timelimit(state):
    """Return timelimit in minutes, or None if not found."""
    # OpenTDM uses g_match_time (seconds); fallback to timelimit (minutes)
    secs = _cvar_float(state, 'g_match_time')
    if secs is not None:
        return int(secs // 60)
    mins = _cvar_float(state, 'timelimit')
    if mins is not None:
        return int(mins)
    return None


def get_player_names(state):
    """Return {slot_index: name} from CS_PLAYERSKINS configstrings."""
    cs_base = _cs_skins(state)
    players = {}
    for i in range(256):
        val = state.configstrings.get(cs_base + i, '').strip()
        if val:
            # Value is "name\model\skin"; keep only the name part.
            name = val.split('\\')[0].strip()
            if name:
                players[i] = name
    return players


# ---------------------------------------------------------------------------
# Markdown report generation
# ---------------------------------------------------------------------------

_WARN_TRUNCATED = (
    "\n> ⚠ **Demo truncated** — the recording may have ended abruptly (e.g. system reset). "
    "Stats are from the last available scoreboard.\n"
)
_WARN_NO_LAYOUT = (
    "\n> ⚠ **Demo truncated** — no scoreboard found in file. "
    "Deaths unavailable (`N/A`). Frags from last known frame.\n"
)


def _md_cell(value):
    """Render one table cell, escaping '|' so it cannot split the column."""
    return str(value).replace('|', '\\|')


def _md_table(headers, rows):
    """Return a Markdown table (header, separator, one line per row) as a string."""
    lines = [
        '| ' + ' | '.join(headers) + ' |',
        '|' + '|'.join('-' * (len(h) + 2) for h in headers) + '|',
    ]
    for row in rows:
        lines.append('| ' + ' | '.join(_md_cell(c) for c in row) + ' |')
    return '\n'.join(lines)


def _meta_line(state):
    """Return a metadata line with map name and timelimit."""
    mapname = state.mapname or 'unknown'
    timelimit = get_timelimit(state)
    tl_str = f"{timelimit} min" if timelimit else "N/A"
    return f"**Map:** {mapname} | **Timelimit:** {tl_str}"


def generate_report_ffa(state, players, truncated, no_layout=False):
    """Return the Markdown report for a free-for-all demo.

    ``players`` is sorted in place by frags (descending). ``no_layout``
    takes precedence over ``truncated`` when choosing the warning banner.
    """
    mod = state.gamedir or 'DM'
    mapname = state.mapname or 'unknown'

    lines = [f"# Demo: {mapname} — {mod}", "", _meta_line(state), ""]

    if no_layout:
        lines.append(_WARN_NO_LAYOUT)
    elif truncated:
        lines.append(_WARN_TRUNCATED)

    if not players:
        lines.append("*No player data found in demo file.*")
        return '\n'.join(lines)

    players.sort(key=lambda p: p.get('frags', 0), reverse=True)

    has_eff = any('eff' in p for p in players)
    if has_eff:
        headers = ['#', 'Player', 'Frags', 'Deaths', 'Eff%']
        rows = [
            (p.get('rank', i + 1), p['name'], p['frags'],
             p.get('deaths', 'N/A'),
             # Only append '%' to a real value, never to the N/A placeholder.
             f"{p['eff']}%" if 'eff' in p else 'N/A')
            for i, p in enumerate(players)
        ]
    else:
        headers = ['#', 'Player', 'Frags', 'Deaths']
        rows = [
            (i + 1, p['name'], p['frags'], p.get('deaths', 'N/A'))
            for i, p in enumerate(players)
        ]

    lines.append(_md_table(headers, rows))
    return '\n'.join(lines)


def generate_report_tdm(state, team_a_name, team_b_name,
                        team_a_players, team_b_players,
                        team_a_score, team_b_score, truncated):
    """Return the Markdown report for an OpenTDM demo.

    Emits a team summary table followed by one section per team. Each
    non-empty player list is sorted in place by frags (descending).
    """
    mapname = state.mapname or 'unknown'

    lines = [f"# Demo: {mapname} — OpenTDM", "", _meta_line(state), ""]

    if truncated:
        lines.append(_WARN_TRUNCATED)

    lines.append("## Team scores")
    lines.append("")
    a_deaths = sum(p['deaths'] for p in team_a_players)
    b_deaths = sum(p['deaths'] for p in team_b_players)
    lines.append(_md_table(
        ['Team', 'Frags', 'Deaths'],
        [
            (team_a_name, team_a_score, a_deaths),
            (team_b_name, team_b_score, b_deaths),
        ]
    ))
    lines.append("")

    for team_name, team_players in [
        (team_a_name, team_a_players),
        (team_b_name, team_b_players),
    ]:
        lines.append(f"## {team_name}")
        lines.append("")
        if not team_players:
            lines.append("*No data.*")
        else:
            team_players.sort(key=lambda p: p['frags'], reverse=True)
            lines.append(_md_table(
                ['Player', 'Frags', 'Deaths'],
                [(p['name'], p['frags'], p['deaths']) for p in team_players]
            ))
        lines.append("")

    return '\n'.join(lines)


# ---------------------------------------------------------------------------
# Report builder
# ---------------------------------------------------------------------------

def build_report(state):
    """Build the Markdown report for a parsed demo from its last layout."""
    last_layout = state.layouts[-1] if state.layouts else None
    truncated = state.truncated
    no_layout = last_layout is None

    if is_opentdm(state):
        team_a_name, team_b_name = get_team_names(state)

        if last_layout:
            ta_p, tb_p, ta_score, tb_score = parse_layout_tdm(last_layout)
        else:
            ta_p, tb_p, ta_score, tb_score = [], [], 0, 0

        # Prefer team scores from configstrings (more reliable than layout parsing)
        cs_a, cs_b = get_team_scores_from_cs(state)
        if cs_a is not None:
            ta_score = cs_a
        if cs_b is not None:
            tb_score = cs_b

        return generate_report_tdm(
            state, team_a_name, team_b_name,
            ta_p, tb_p, ta_score, tb_score, truncated
        )

    if last_layout:
        players = parse_layout_ffa(last_layout)
        if not players:
            # Fallback: try TDM pattern (some mods use similar format)
            tdm_all = parse_layout_tdm(last_layout)
            players = [
                {'name': p['name'], 'frags': p['frags'], 'deaths': p['deaths']}
                for p in tdm_all[0] + tdm_all[1]
            ]
    else:
        # No layout — fall back to player names from configstrings, no deaths
        player_names = get_player_names(state)
        players = [{'name': name, 'frags': 0} for name in player_names.values()]

    return generate_report_ffa(state, players, truncated, no_layout)


# ---------------------------------------------------------------------------
# File processing
# ---------------------------------------------------------------------------

def process_file(path, output_path=None):
    """Parse a single demo file and write/return the report.

    Returns the report text, or None when it was written to ``output_path``.
    """
    state = parse_demo(path)
    report = build_report(state)

    if output_path:
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(report)
        return None
    return report


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

def main():
    """Command-line entry point: collect inputs, then report on each demo."""
    ap = argparse.ArgumentParser(
        description='Q2Pro .dm2 demo parser — generates Markdown player stats reports.'
    )
    ap.add_argument('inputs', nargs='+',
                    help='.dm2 file(s) or a directory containing .dm2 files')
    ap.add_argument('-o', '--output',
                    help='Output file (single input file only)')
    ap.add_argument('-y', '--always-continue', action='store_true',
                    help='Always continue on errors without prompting')
    args = ap.parse_args()

    # Collect .dm2 files
    files = []
    for inp in args.inputs:
        p = Path(inp)
        if p.is_dir():
            files.extend(sorted(p.glob('*.dm2')))
        elif p.suffix.lower() == '.dm2':
            if p.exists():
                files.append(p)
            else:
                print(f"File not found: {inp}", file=sys.stderr)
        else:
            print(f"Skipping (not .dm2 or directory): {inp}", file=sys.stderr)

    if not files:
        print("No .dm2 files found.", file=sys.stderr)
        sys.exit(1)

    if len(files) > 1 and args.output:
        print("Error: -o/--output can only be used with a single input file.",
              file=sys.stderr)
        sys.exit(1)

    always_continue = args.always_continue
    single = len(files) == 1

    for demo_path in files:
        try:
            if single and not args.output:
                # Single file → stdout
                report = process_file(demo_path)
                print(report)
            elif single and args.output:
                # Single file → specified output file
                process_file(demo_path, args.output)
                print(f"Report saved: {args.output}")
            else:
                # Batch mode → foo_report.md
                out_path = demo_path.parent / (demo_path.stem + '_report.md')
                process_file(demo_path, out_path)
                print(f"OK  {demo_path.name} -> {out_path.name}")

        except Exception as exc:
            # Top-level boundary: report the failure and let the user decide
            # whether the remaining files should still be processed.
            print(f"\n[ERROR] {demo_path.name}: {exc}", file=sys.stderr)

            if not always_continue and not single:
                try:
                    ans = input(
                        "Continue? [y]es / [n]o / [a]lways continue: "
                    ).strip().lower()
                except (EOFError, KeyboardInterrupt):
                    print("\nAborted.", file=sys.stderr)
                    sys.exit(1)

                if ans == 'n':
                    print("Aborted.", file=sys.stderr)
                    sys.exit(1)
                elif ans == 'a':
                    always_continue = True
                # 'y' or anything else → continue


if __name__ == '__main__':
    main()