#!/usr/bin/env python3
"""
MVD2 demo parser — generates a CSV report from all .mvd2 files in CWD.

Usage:
    cd /path/to/demos && python3 mvd_csv.py
    python3 mvd_csv.py /path/to/demos/        # specify dir
    python3 mvd_csv.py -o report.csv          # specify output file
"""

import struct
import re
import sys
import csv
import argparse
from pathlib import Path
from datetime import datetime

# ---------------------------------------------------------------------------
# Protocol constants (q2pro inc/common/protocol.h, inc/shared/shared.h)
# ---------------------------------------------------------------------------

MVD_MAGIC = b'MVD2'

# The only serverdata protocol number this parser accepts.
PROTOCOL_VERSION_MVD = 37

MVD_NOP = 1
MVD_SERVERDATA = 4
MVD_CONFIGSTRING = 5
MVD_FRAME = 6
MVD_UNICAST = 8
MVD_UNICAST_R = 9
MVD_MULTICAST_ALL = 10
MVD_MULTICAST_ALL_R = 13
MVD_PRINT = 17

# Each command byte carries the opcode in its low SVCMD_BITS bits and
# "extra bits" (flags, or the high bits of a length) above them.
SVCMD_BITS = 5
SVCMD_MASK = 0x1F

# SVC opcodes used inside unicast messages
SVC_LAYOUT = 4
SVC_PRINT = 10
SVC_STUFFTEXT = 11
SVC_CONFIGSTRING = 13

PROTO_MVD_EXTLIMITS = 2011
PROTO_MVD_EXTLIMITS_2 = 2012
MVF_EXTLIMITS = 1

# Old configstring remap (default, used for version < 2011)
CS_PLAYERSKINS_OLD = 1312
CS_GENERAL_OLD = 1568
MAX_CS_OLD = 2080

# Extended configstring remap (version >= 2011 with MVF_EXTLIMITS)
CS_PLAYERSKINS_NEW = 12862
CS_GENERAL_NEW = 13118
MAX_CS_NEW = 13630

# OpenTDM configstring offsets from CS_GENERAL (opentdm/g_local.h)
#   CS_GENERAL+0   team A name
#   CS_GENERAL+1   team B name
#   CS_GENERAL+2   team A score
#   CS_GENERAL+3   team B score
#   CS_GENERAL+4   timelimit countdown string (initial value = configured limit)
#   CS_GENERAL+6   game status ("Warmup", "Match End", …)
#   CS_GENERAL+MAX_CLIENTS(256)+slot   "playername (Hometeam|Visitors)"
MAX_CLIENTS = 256

_HEADER_WORDS = {'name', 'frags', 'frag', 'dths', 'deaths', 'ping', 'net',
                 'eff', 'fph', 'time', 'player', 'rank'}


# ---------------------------------------------------------------------------
# Buffer reader
# ---------------------------------------------------------------------------

class ParseError(Exception):
    """Raised when demo data is malformed or ends unexpectedly."""


class Buffer:
    """Bounds-checked sequential reader over a byte string.

    Every read advances ``pos``; a read that would run past the end of the
    data raises ParseError and leaves ``pos`` unchanged.
    """

    def __init__(self, data):
        self.data = bytes(data)
        self.pos = 0

    def remaining(self):
        """Return the number of unread bytes."""
        return len(self.data) - self.pos

    def _unpack(self, fmt, size, what):
        """Read one fixed-size value with struct format *fmt* and advance."""
        if self.pos + size > len(self.data):
            raise ParseError(f"Unexpected end ({what})")
        value = struct.unpack_from(fmt, self.data, self.pos)[0]
        self.pos += size
        return value

    def read_byte(self):
        """Read one unsigned byte."""
        if self.pos >= len(self.data):
            raise ParseError("Unexpected end (byte)")
        v = self.data[self.pos]
        self.pos += 1
        return v

    def read_short(self):
        """Read a signed little-endian 16-bit integer."""
        return self._unpack('<h', 2, 'short')

    def read_ushort(self):
        """Read an unsigned little-endian 16-bit integer."""
        return self._unpack('<H', 2, 'ushort')

    def read_long(self):
        """Read a signed little-endian 32-bit integer."""
        return self._unpack('<i', 4, 'long')

    def read_string(self):
        """Read a NUL-terminated string and consume the terminator.

        NOTE(review): Latin-1 decoding is inferred from callers that turn the
        value back into bytes with ``.encode('latin-1')``, and raising on a
        missing terminator from callers that catch ParseError around this
        call — confirm both against the intended behaviour.
        """
        end = self.data.find(b'\x00', self.pos)
        if end == -1:
            raise ParseError("Unexpected end (string)")
        text = self.data[self.pos:end].decode('latin-1')
        self.pos = end + 1
        return text

    def skip(self, n):
        """Advance past *n* bytes."""
        if self.pos + n > len(self.data):
            raise ParseError(f"Cannot skip {n} bytes")
        self.pos += n


# ---------------------------------------------------------------------------
# Demo state
# ---------------------------------------------------------------------------

class MvdState:
    """Everything collected from one demo while its messages are parsed."""

    def __init__(self):
        self.mapname = ''
        self.gamedir = ''
        self.configstrings = {}            # index → latest value
        self.configstrings_first = {}      # first-seen value per index
        self.cvars = {}                    # from "set <name> <value>" stufftext
        self.layouts = []                  # every svc_layout string, in order
        self.truncated = False
        self.cs_playerskins = CS_PLAYERSKINS_OLD
        self.cs_general = CS_GENERAL_OLD
        self.cs_end = MAX_CS_OLD
        self._in_serverdata = False        # True only while processing MVD_SERVERDATA configs
        self._match_phase = False          # True after CS_GENERAL+6 transitions to non-warmup
        self.match_confirmed_slots = {}    # slot → raw value, set via MVD_CONFIGSTRING after match starts

    def set_cs(self, idx, val):
        """Record configstring *idx* and update the state derived from it."""
        self.configstrings_first.setdefault(idx, val)
        self.configstrings[idx] = val
        if idx == 0:
            self.mapname = val.strip()

        # Values delivered as part of MVD_SERVERDATA never start the match
        # phase and never count as match confirmations.
        if self._in_serverdata:
            return

        # Detect match phase start via explicit MVD_CONFIGSTRING (not SERVERDATA)
        if idx == self.cs_general + 6:
            status = ''.join(chr(c & 0x7F) for c in val.encode('latin-1'))
            if status.strip().lower() not in ('', 'warmup', 'countdown'):
                self._match_phase = True

        # Track spectator string updates that happen during match phase
        if self._match_phase:
            base = self.cs_general + MAX_CLIENTS
            if base <= idx < base + MAX_CLIENTS:
                self.match_confirmed_slots[idx - base] = val


# ---------------------------------------------------------------------------
# Unicast SVC scanner (layout + stufftext)
# ---------------------------------------------------------------------------

def _scan_unicast(buf, length, state):
    """
    Scan unicast payload for svc_layout and svc_stufftext.
    length is the payload size NOT including the clientNum byte.

    Layout strings are appended to state.layouts and "set <name> <value>"
    pairs found in stufftext are stored in state.cvars. On return buf.pos is
    at the end of the payload (clamped to the end of the buffer).

    Raises ParseError only if the clientNum byte itself is missing.
    """
    buf.read_byte()  # consume clientNum (1 byte, outside length)
    end = min(buf.pos + length, len(buf.data))
    try:
        while buf.pos < end:
            cmd = buf.read_byte()
            if cmd == SVC_LAYOUT:
                state.layouts.append(buf.read_string())
            elif cmd == SVC_STUFFTEXT:
                text = buf.read_string()
                for m in re.finditer(r'\bset\s+(\S+)\s+(\S+)', text):
                    state.cvars[m.group(1)] = m.group(2)
            elif cmd == SVC_PRINT:
                buf.read_byte()
                buf.read_string()
            elif cmd == SVC_CONFIGSTRING:
                buf.read_ushort()
                buf.read_string()
            else:
                break  # unknown/complex SVC — bail on rest of unicast
    except ParseError:
        # Best effort: a damaged unicast only loses the rest of this payload.
        pass
    buf.pos = end


# ---------------------------------------------------------------------------
# MVD message parser
# ---------------------------------------------------------------------------

def _parse_serverdata(buf, extrabits, state):
    """Parse an MVD_SERVERDATA header and the configstring list after it."""
    protocol = buf.read_long()
    if protocol != PROTOCOL_VERSION_MVD:
        return
    version = buf.read_ushort()
    # From PROTO_MVD_EXTLIMITS_2 on the flags are an explicit field; before
    # that they ride in the command byte's extra bits.
    flags = buf.read_ushort() if version >= PROTO_MVD_EXTLIMITS_2 else extrabits

    if version >= PROTO_MVD_EXTLIMITS and flags & MVF_EXTLIMITS:
        state.cs_playerskins = CS_PLAYERSKINS_NEW
        state.cs_general = CS_GENERAL_NEW
        state.cs_end = MAX_CS_NEW
    else:
        state.cs_playerskins = CS_PLAYERSKINS_OLD
        state.cs_general = CS_GENERAL_OLD
        state.cs_end = MAX_CS_OLD

    buf.read_long()   # servercount
    state.gamedir = buf.read_string()
    buf.read_short()  # clientNum

    state._in_serverdata = True
    try:
        while True:
            idx = buf.read_ushort()
            if idx == state.cs_end:  # list terminator
                break
            state.set_cs(idx, buf.read_string())
    finally:
        state._in_serverdata = False


def _process_mvd_message(buf, state):
    """Process the commands in one MVD message, updating *state*.

    Parsing stops at the first command this parser cannot skip over. Running
    out of data in the middle of a command sets state.truncated.
    """
    try:
        while buf.remaining() > 0:
            raw = buf.read_byte()
            extrabits = raw >> SVCMD_BITS
            cmd = raw & SVCMD_MASK

            if cmd == MVD_NOP:
                continue
            if cmd == MVD_SERVERDATA:
                _parse_serverdata(buf, extrabits, state)
                return  # entity baselines follow — bail on rest of packet
            if cmd == MVD_CONFIGSTRING:
                idx = buf.read_ushort()
                state.set_cs(idx, buf.read_string())
            elif cmd in (MVD_UNICAST, MVD_UNICAST_R):
                length = buf.read_byte() | (extrabits << 8)
                _scan_unicast(buf, length, state)
            elif MVD_MULTICAST_ALL <= cmd <= MVD_MULTICAST_ALL_R + 2:
                # multicast: length byte, optional leafnum, data payload
                length = buf.read_byte() | (extrabits << 8)
                to = (cmd - MVD_MULTICAST_ALL) % 3  # 0 = all; otherwise a leafnum follows
                if to != 0:
                    buf.read_ushort()  # leafnum
                buf.skip(length)
            elif cmd == MVD_PRINT:
                buf.read_byte()
                buf.read_string()
            else:
                return  # mvd_frame (6) or mvd_sound (16) — bail
    except ParseError:
        state.truncated = True


# ---------------------------------------------------------------------------
# File parser
# ---------------------------------------------------------------------------

def parse_mvd2(path):
    """Parse the MVD2 demo at *path* and return the collected MvdState.

    After the 4-byte magic the file is read as a sequence of messages, each
    prefixed by a little-endian 16-bit length; a zero length marks the end of
    the demo. state.truncated is set when that marker is missing, when a
    message is cut short, or when a message's contents could not be read.

    Raises ParseError if the file does not start with the MVD2 magic.
    """
    state = MvdState()
    with open(path, 'rb') as f:
        data = f.read()

    if len(data) < 4 or data[:4] != MVD_MAGIC:
        raise ParseError(f"Not an MVD2 file: {path}")

    pos = 4
    had_eof = False
    while pos < len(data):
        if pos + 2 > len(data):
            state.truncated = True
            break
        msglen = struct.unpack_from('<H', data, pos)[0]
        pos += 2
        if msglen == 0:
            had_eof = True
            break
        if pos + msglen > len(data):
            # Last message is cut short: parse what is there.
            state.truncated = True
            msglen = len(data) - pos
        packet = data[pos:pos + msglen]
        pos += msglen
        try:
            _process_mvd_message(Buffer(packet), state)
        except ParseError:
            state.truncated = True

    if not had_eof:
        state.truncated = True
    return state
# ---------------------------------------------------------------------------
# Layout parsing (fallback, from dem_parser.py)
# ---------------------------------------------------------------------------

# One positioned text token of a layout: `yv <row> string "<text>"`
# (or `string2`). Captures the row and the quoted text.
_LAYOUT_ENTRY_RE = re.compile(r'yv\s+(\d+)\s+string2?\s+"([^"]*)"')
_SIGNED_INT_RE = re.compile(r'-?\d+')


def _is_header(text):
    """Return True if *text* contains any scoreboard column-header word."""
    return not _HEADER_WORDS.isdisjoint(text.lower().split())


def _is_scoreboard_header(text):
    """Return True for the column-header row that opens a team's player list."""
    return 'Name' in text and 'Frags' in text


def _parse_layout_entries(layout):
    """Return [(yv, text), ...] for every string/string2 token in *layout*."""
    return [(int(m.group(1)), m.group(2))
            for m in _LAYOUT_ENTRY_RE.finditer(layout)]


def _parse_player_tdm(content):
    """Parse one scoreboard row into {'name', 'frags', 'deaths'}.

    The name is taken from the first 15 columns; frags and deaths are the
    first two integers after it. Returns None for rows shorter than 20
    characters, rows with an empty or header-like name, and rows with fewer
    than two integers.
    """
    if len(content) < 20:
        return None
    name = content[:15].rstrip()
    if not name or _is_header(name):
        return None
    nums = _SIGNED_INT_RE.findall(content[15:])
    if len(nums) < 2:
        return None
    # Every match is an optionally signed run of digits, so int() cannot fail.
    return {'name': name, 'frags': int(nums[0]), 'deaths': int(nums[1])}


def _best_scoreboard_layout(layouts):
    """Return the last layout that has a proper scoreboard header."""
    for layout in reversed(layouts):
        if any(_is_scoreboard_header(text)
               for _, text in _parse_layout_entries(layout)):
            return layout
    return None


def parse_layout_tdm(layout):
    """Extract both teams' players and scores from a scoreboard layout.

    Returns (team_a_players, team_b_players, team_a_score, team_b_score).
    Players are dicts as produced by _parse_player_tdm; a score that cannot
    be found is reported as 0.
    """
    entries = _parse_layout_entries(layout)

    # Team totals: the trailing integer of the first two non-header rows
    # (in row order) at yv <= 30 that end in one.
    team_scores = []
    for yv, content in sorted(entries, key=lambda entry: entry[0]):
        if yv > 30 or _is_header(content) or not content.strip():
            continue
        parts = content.rsplit(None, 1)
        if len(parts) == 2:
            try:
                team_scores.append(int(parts[1]))
            except ValueError:
                pass  # row does not end in a number — not a score line
    ta_score = team_scores[0] if len(team_scores) > 0 else 0
    tb_score = team_scores[1] if len(team_scores) > 1 else 0

    # Rows between the first and second header belong to team A, rows below
    # the second header to team B. With a single header every row below it
    # is team A.
    header_yvs = sorted(yv for yv, text in entries if _is_scoreboard_header(text))
    split_yv = header_yvs[1] if len(header_yvs) >= 2 else 9999
    first_header_yv = header_yvs[0] if header_yvs else 0

    ta_players, tb_players = [], []
    for yv, content in entries:
        if _is_header(content) or not content.strip():
            continue
        player = _parse_player_tdm(content)
        if player is None:
            continue
        if first_header_yv < yv <= split_yv:
            ta_players.append(player)
        elif yv > split_yv:
            tb_players.append(player)
    return ta_players, tb_players, ta_score, tb_score
# ---------------------------------------------------------------------------
# State helpers
# ---------------------------------------------------------------------------

_TIMELIMIT_RE = re.compile(r'^(\d+):(\d{2})$')
# "playername (label)" as used by the spectator-string configstrings.
_SPECTATOR_ENTRY_RE = re.compile(r'^(.+?)\s+\((.+)\)$')
_FILENAME_STAMP_RE = re.compile(r'(\d{8})-(\d{6})')


def _strip_q2color(s):
    """Remove Quake II high-bit color encoding and strip whitespace.

    Characters that do not fit in Latin-1 are replaced with '?' instead of
    raising UnicodeEncodeError.
    """
    raw = s.encode('latin-1', errors='replace')
    return ''.join(chr(byte & 0x7F) for byte in raw).strip()


def get_team_names(state):
    """Return (team_a, team_b) names, defaulting to 'Team A' / 'Team B'."""
    gen = state.cs_general
    a = _strip_q2color(state.configstrings.get(gen, '')) or 'Team A'
    b = _strip_q2color(state.configstrings.get(gen + 1, '')) or 'Team B'
    return a, b


def _score_from_cs(state, idx):
    """Return configstring *idx* as an int, or None if it is not a number."""
    try:
        return int(_strip_q2color(state.configstrings.get(idx, '')))
    except ValueError:
        return None


def get_team_scores_from_cs(state):
    """Return (score_a, score_b) from CS_GENERAL+2/+3; None where unparsable."""
    gen = state.cs_general
    return _score_from_cs(state, gen + 2), _score_from_cs(state, gen + 3)


def get_timelimit(state):
    """
    Extract configured timelimit (minutes) from the initial value of
    CS_TDM_TIMELIMIT_STRING (CS_GENERAL+4), which OpenTDM sets to 'MM:SS'
    at server startup before the countdown begins.

    Returns None when the first-seen value is not of the form 'MM:SS'.
    """
    raw = state.configstrings_first.get(state.cs_general + 4, '')
    m = _TIMELIMIT_RE.match(_strip_q2color(raw))
    if not m:
        return None
    mins, secs = int(m.group(1)), int(m.group(2))
    # round up to nearest minute
    return mins + (1 if secs > 0 else 0)


def _split_by_team_label(state, raw_values):
    """Sort "playername (label)" strings into (team_a, team_b) name lists.

    A label selects team A when it is "hometeam" or equals the team A name
    from CS_GENERAL+0, and team B when it is "visitors" or equals the team B
    name from CS_GENERAL+1 (all compared case-insensitively). Entries that do
    not match the pattern, [MVDSPEC] entries, repeated names and entries with
    any other label are dropped.
    """
    gen = state.cs_general
    name_a = _strip_q2color(state.configstrings.get(gen, '')).lower()
    name_b = _strip_q2color(state.configstrings.get(gen + 1, '')).lower()

    team_a, team_b = [], []
    seen = set()
    for raw in raw_values:
        m = _SPECTATOR_ENTRY_RE.match(_strip_q2color(raw))
        if not m:
            continue
        name = m.group(1).strip()
        label = m.group(2).strip().lower()
        if not name or '[MVDSPEC]' in name or name in seen:
            continue
        # A name counts as seen even if its label turns out to be unknown.
        seen.add(name)
        if label == 'hometeam' or (name_a and label == name_a):
            team_a.append(name)
        elif label == 'visitors' or (name_b and label == name_b):
            team_b.append(name)
        # unknown label → skip (spectator or unrecognised)
    return team_a, team_b


def get_players_from_spectator_strings(state):
    """
    Return (team_a_names, team_b_names) using CS_GENERAL+256+slot entries.
    Format: "playername (<label>)" where <label> may be the actual team name
    from CS_GENERAL+0/1 OR generic "Hometeam"/"Visitors".
    Filters out [MVDSPEC], empty names, and duplicates.
    """
    base = state.cs_general + MAX_CLIENTS  # CS_TDM_SPECTATOR_STRINGS
    values = (state.configstrings.get(base + slot, '')
              for slot in range(MAX_CLIENTS))
    return _split_by_team_label(state, values)


def get_players_from_skins(state, team_a_name, team_b_name):
    """
    Last-resort fallback: assign players from CS_PLAYERSKINS by matching
    player name against team names. Works well for 1v1 where team name = player.

    Players whose name matches neither team are used only to fill a team that
    would otherwise be empty: if both teams are empty they are split in half
    (first half to team A), and if exactly one team is empty it receives all
    of them. If both teams already have a member they are left out.
    """
    base = state.cs_playerskins
    wanted_a = team_a_name.lower()
    wanted_b = team_b_name.lower()

    team_a, team_b, unknown = [], [], []
    seen = set()
    for i in range(MAX_CLIENTS):
        raw = state.configstrings.get(base + i, '').strip()
        if not raw:
            continue
        # Value is "name\<rest>"; only the part before the first backslash is used.
        name = raw.split('\\')[0].strip()
        if not name or '[MVDSPEC]' in name or name in seen:
            continue
        seen.add(name)
        if name.lower() == wanted_a:
            team_a.append(name)
        elif name.lower() == wanted_b:
            team_b.append(name)
        else:
            unknown.append(name)

    # distribute unmatched players (best effort)
    if unknown:
        if not team_a and not team_b:
            half = len(unknown) // 2 or len(unknown)
            team_a, team_b = unknown[:half], unknown[half:]
        elif not team_a:
            team_a = unknown
        elif not team_b:
            team_b = unknown
    return team_a, team_b


def get_players_from_match_phase(state):
    """
    Return (team_a, team_b) using spectator string entries that were
    explicitly re-sent via MVD_CONFIGSTRING after the game status transitioned
    from warmup to a match state. These are the confirmed match participants.
    """
    return _split_by_team_label(state, state.match_confirmed_slots.values())


def _date_from_filename(name):
    """Return 'YYYY-MM-DD HH:MM:SS' from a YYYYMMDD-HHMMSS stamp in *name*.

    Returns '' when there is no such stamp or it is not a valid date/time.
    """
    m = _FILENAME_STAMP_RE.search(name)
    if not m:
        return ''
    try:
        stamp = datetime.strptime(m.group(1) + m.group(2), '%Y%m%d%H%M%S')
    except ValueError:
        return ''
    return stamp.strftime('%Y-%m-%d %H:%M:%S')
# ---------------------------------------------------------------------------
# Row builder
# ---------------------------------------------------------------------------

def build_row(path, state):
    """Build the CSV row (a dict keyed by column name) for one parsed demo.

    Player lists are taken from the first source that yields something:
    match-phase spectator strings, the last scoreboard layout, all spectator
    strings, and finally the player-skin configstrings. Scores come from the
    score configstrings, falling back to the layout, then to 0.
    """
    demo_name = path.name

    # CS_MODELS_OLD+1=33 or CS_MODELS_NEW+1=63 holds "maps/q2dm1.bsp"
    if state.cs_playerskins == CS_PLAYERSKINS_OLD:
        bsp_index = 33
    else:
        bsp_index = 63
    bsp_path = state.configstrings.get(bsp_index, '').strip()
    if bsp_path:
        map_label = Path(bsp_path).stem
    else:
        map_label = state.mapname or path.stem.split('_')[-1]

    name_a, name_b = get_team_names(state)
    total_a, total_b = get_team_scores_from_cs(state)

    # Primary: spectator strings explicitly re-confirmed after match start.
    # OpenTDM re-sends entries for actual match participants when game status
    # transitions from Warmup/Countdown to Match.
    roster_a, roster_b = get_players_from_match_phase(state)

    # Pre-compute best layout (used as fallback at multiple levels).
    board_a, board_b = [], []
    board_total_a = board_total_b = None
    scoreboard = _best_scoreboard_layout(state.layouts)
    if scoreboard:
        rows_a, rows_b, board_total_a, board_total_b = parse_layout_tdm(scoreboard)
        board_a = [entry['name'] for entry in rows_a]
        board_b = [entry['name'] for entry in rows_b]

    # If match-phase gave nothing, prefer layout over all spectator strings.
    # (OpenTDM may transition status without re-sending spectator confirmations.)
    if not (roster_a or roster_b):
        if board_a or board_b:
            roster_a, roster_b = board_a, board_b
            if total_a is None:
                total_a = board_total_a
            if total_b is None:
                total_b = board_total_b
        else:
            roster_a, roster_b = get_players_from_spectator_strings(state)

    # Fill any single missing team from layout.
    if board_a and not roster_a:
        roster_a = board_a
        if total_a is None:
            total_a = board_total_a
    if board_b and not roster_b:
        roster_b = board_b
        if total_b is None:
            total_b = board_total_b

    # If one team is still missing (no layout either), try spectator strings.
    if not (roster_a and roster_b):
        spec_a, spec_b = get_players_from_spectator_strings(state)
        if not roster_a:
            roster_a = spec_a
        if not roster_b:
            roster_b = spec_b

    # Last resort: player skin configstrings + team name matching.
    if not (roster_a or roster_b):
        roster_a, roster_b = get_players_from_skins(state, name_a, name_b)

    if total_a is None:
        total_a = 0
    if total_b is None:
        total_b = 0

    limit = get_timelimit(state)

    row = {}
    row['file'] = demo_name
    row['date'] = _date_from_filename(demo_name)
    row['map'] = map_label
    row['players_a'] = ','.join(roster_a)
    row['players_b'] = ','.join(roster_b)
    row['score'] = f'{total_a}:{total_b}'
    row['timelimit_min'] = '' if limit is None else str(limit)
    row['truncated'] = 'yes' if state.truncated else 'no'
    return row


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

FIELDNAMES = [
    'file',
    'date',
    'map',
    'players_a',
    'score',
    'players_b',
    'timelimit_min',
    'truncated',
]


def main():
    """Parse every .mvd2 file in the chosen directory and write the CSV report.

    Exits with status 1 if the directory does not exist or holds no .mvd2
    files. A demo that fails to parse is reported on stderr and still gets a
    row, with 'error' in the 'truncated' column.
    """
    parser = argparse.ArgumentParser(
        description='Parse all .mvd2 demos in a directory and output a CSV report.'
    )
    parser.add_argument(
        'directory',
        nargs='?',
        default='.',
        help='Directory containing .mvd2 files (default: current dir)',
    )
    parser.add_argument(
        '-o', '--output',
        default='',
        help='Output CSV file (default: mvd_report.csv in the demo dir)',
    )
    opts = parser.parse_args()

    demo_dir = Path(opts.directory).resolve()
    if not demo_dir.is_dir():
        print(f"Error: not a directory: {demo_dir}", file=sys.stderr)
        sys.exit(1)

    demos = sorted(demo_dir.glob('*.mvd2'))
    if not demos:
        print(f"No .mvd2 files found in {demo_dir}", file=sys.stderr)
        sys.exit(1)

    if opts.output:
        out_path = Path(opts.output).resolve()
    else:
        out_path = demo_dir / 'mvd_report.csv'

    rows = []
    for demo in demos:
        try:
            parsed = parse_mvd2(demo)
            rows.append(build_row(demo, parsed))
            suffix = ' [TRUNCATED]' if parsed.truncated else ''
            print(f"OK {demo.name}{suffix}")
        except Exception as exc:
            # One unreadable demo must not abort the whole report.
            print(f"ERR {demo.name}: {exc}", file=sys.stderr)
            failed = {
                'file': demo.name,
                'date': _date_from_filename(demo.name),
                'truncated': 'error',
            }
            for column in ('map', 'players_a', 'players_b', 'score', 'timelimit_min'):
                failed[column] = ''
            rows.append(failed)

    with open(out_path, 'w', newline='', encoding='utf-8') as out_file:
        report = csv.DictWriter(out_file, fieldnames=FIELDNAMES, delimiter=';')
        report.writeheader()
        for row in rows:
            report.writerow(row)

    print(f"\nReport saved: {out_path} ({len(rows)} demos)")


if __name__ == '__main__':
    main()