diff --git a/scripts/mvd_csv.py b/scripts/mvd_csv.py new file mode 100644 index 0000000..18f7830 --- /dev/null +++ b/scripts/mvd_csv.py @@ -0,0 +1,747 @@ +#!/usr/bin/env python3 +""" +MVD2 demo parser — generates a CSV report from all .mvd2 files in CWD. + +Usage: + cd /path/to/demos && python3 mvd_csv.py + python3 mvd_csv.py /path/to/demos/ # specify dir + python3 mvd_csv.py -o report.csv # specify output file +""" + +import struct +import re +import sys +import csv +import argparse +from pathlib import Path +from datetime import datetime + +# --------------------------------------------------------------------------- +# Protocol constants (q2pro inc/common/protocol.h, inc/shared/shared.h) +# --------------------------------------------------------------------------- + +MVD_MAGIC = b'MVD2' + +MVD_NOP = 1 +MVD_SERVERDATA = 4 +MVD_CONFIGSTRING = 5 +MVD_FRAME = 6 +MVD_UNICAST = 8 +MVD_UNICAST_R = 9 +MVD_MULTICAST_ALL = 10 +MVD_MULTICAST_ALL_R = 13 + +SVCMD_BITS = 5 +SVCMD_MASK = 0x1F + +# SVC opcodes used inside unicast messages +SVC_LAYOUT = 4 +SVC_PRINT = 10 +SVC_STUFFTEXT = 11 +SVC_CONFIGSTRING = 13 + +PROTO_MVD_EXTLIMITS = 2011 +PROTO_MVD_EXTLIMITS_2 = 2012 +MVF_EXTLIMITS = 1 + +# Old configstring remap (default, used for version < 2011) +CS_PLAYERSKINS_OLD = 1312 +CS_GENERAL_OLD = 1568 +MAX_CS_OLD = 2080 + +# Extended configstring remap (version >= 2011 with MVF_EXTLIMITS) +CS_PLAYERSKINS_NEW = 12862 +CS_GENERAL_NEW = 13118 +MAX_CS_NEW = 13630 + +# OpenTDM configstring offsets from CS_GENERAL (opentdm/g_local.h) +# CS_GENERAL+0 team A name +# CS_GENERAL+1 team B name +# CS_GENERAL+2 team A score +# CS_GENERAL+3 team B score +# CS_GENERAL+4 timelimit countdown string (initial value = configured limit) +# CS_GENERAL+6 game status ("Warmup", "Match End", …) +# CS_GENERAL+MAX_CLIENTS(256)+slot "playername (Hometeam|Visitors)" + +MAX_CLIENTS = 256 + +_HEADER_WORDS = {'name', 'frags', 'frag', 'dths', 'deaths', 'ping', + 'net', 'eff', 'fph', 'time', 'player', 'rank'} + + +# --------------------------------------------------------------------------- +# Buffer reader +# --------------------------------------------------------------------------- + +class ParseError(Exception): + pass + + +class Buffer: + def __init__(self, data): + self.data = bytes(data) + self.pos = 0 + + def remaining(self): + return len(self.data) - self.pos + + def read_byte(self): + if self.pos >= len(self.data): + raise ParseError("Unexpected end (byte)") + v = self.data[self.pos]; self.pos += 1 + return v + + def read_short(self): + if self.pos + 2 > len(self.data): + raise ParseError("Unexpected end (short)") + v = struct.unpack_from(' len(self.data): + raise ParseError("Unexpected end (ushort)") + v = struct.unpack_from(' len(self.data): + raise ParseError("Unexpected end (long)") + v = struct.unpack_from(' len(self.data): + raise ParseError(f"Cannot skip {n} bytes") + self.pos += n + + +# --------------------------------------------------------------------------- +# Demo state +# --------------------------------------------------------------------------- + +class MvdState: + def __init__(self): + self.mapname = '' + self.gamedir = '' + self.configstrings = {} + self.configstrings_first = {} # first-seen value per index + self.cvars = {} + self.layouts = [] + self.truncated = False + self.cs_playerskins = CS_PLAYERSKINS_OLD + self.cs_general = CS_GENERAL_OLD + self.cs_end = MAX_CS_OLD + self._in_serverdata = False # True only while processing MVD_SERVERDATA configs + self._match_phase = False # True after CS_GENERAL+6 transitions to non-warmup + self.match_confirmed_slots = {} # slot → raw value, set via MVD_CONFIGSTRING after match starts + + def set_cs(self, idx, val): + if idx not in self.configstrings_first: + self.configstrings_first[idx] = val + self.configstrings[idx] = val + if idx == 0: + self.mapname = val.strip() + + # Detect match phase start via explicit MVD_CONFIGSTRING (not SERVERDATA) + if not self._in_serverdata and idx == self.cs_general + 6: + vl = ''.join(chr(c & 0x7F) for c in val.encode('latin-1')).strip().lower() + if vl not in ('', 'warmup', 'countdown'): + self._match_phase = True + + # Track spectator string updates that happen during match phase + if not self._in_serverdata and self._match_phase: + base = self.cs_general + MAX_CLIENTS + if base <= idx < base + MAX_CLIENTS: + self.match_confirmed_slots[idx - base] = val + + +# --------------------------------------------------------------------------- +# Unicast SVC scanner (layout + stufftext) +# --------------------------------------------------------------------------- + +def _scan_unicast(buf, length, state): + """ + Scan unicast payload for svc_layout and svc_stufftext. + length is the payload size NOT including the clientNum byte. + """ + client_num = buf.read_byte() # consume clientNum (1 byte, outside length) + end = buf.pos + length + if end > len(buf.data): + end = len(buf.data) + + while buf.pos < end: + try: + cmd = buf.read_byte() + except ParseError: + break + + if cmd == SVC_LAYOUT: + try: + layout = buf.read_string() + state.layouts.append(layout) + except ParseError: + break + + elif cmd == SVC_STUFFTEXT: + try: + text = buf.read_string() + for m in re.finditer(r'\bset\s+(\S+)\s+(\S+)', text): + state.cvars[m.group(1)] = m.group(2) + except ParseError: + break + + elif cmd == SVC_PRINT: + try: + buf.read_byte() + buf.read_string() + except ParseError: + break + + elif cmd == SVC_CONFIGSTRING: + try: + buf.read_ushort() + buf.read_string() + except ParseError: + break + + else: + break # unknown/complex SVC — bail on rest of unicast + + buf.pos = end + + +# --------------------------------------------------------------------------- +# MVD message parser +# --------------------------------------------------------------------------- + +def _process_mvd_message(buf, state): + while buf.remaining() > 0: + try: + raw = buf.read_byte() + except ParseError: + break + + extrabits = raw >> SVCMD_BITS + cmd = raw & SVCMD_MASK + + if cmd == MVD_NOP: + continue + + elif cmd == MVD_SERVERDATA: + try: + protocol = buf.read_long() + if protocol != 37: + return + version = buf.read_ushort() + if version >= PROTO_MVD_EXTLIMITS_2: + flags = buf.read_ushort() + else: + flags = extrabits + use_new = (version >= PROTO_MVD_EXTLIMITS and bool(flags & MVF_EXTLIMITS)) + if use_new: + state.cs_playerskins = CS_PLAYERSKINS_NEW + state.cs_general = CS_GENERAL_NEW + state.cs_end = MAX_CS_NEW + else: + state.cs_playerskins = CS_PLAYERSKINS_OLD + state.cs_general = CS_GENERAL_OLD + state.cs_end = MAX_CS_OLD + + buf.read_long() # servercount + state.gamedir = buf.read_string() + buf.read_short() # clientNum + + state._in_serverdata = True + while True: + idx = buf.read_ushort() + if idx == state.cs_end: + break + val = buf.read_string() + state.set_cs(idx, val) + state._in_serverdata = False + except ParseError: + state._in_serverdata = False + state.truncated = True + return # entity baselines follow — bail on rest of packet + + elif cmd == MVD_CONFIGSTRING: + try: + idx = buf.read_ushort() + val = buf.read_string() + state.set_cs(idx, val) + except ParseError: + state.truncated = True + return + + elif cmd in (MVD_UNICAST, MVD_UNICAST_R): + try: + length = buf.read_byte() | (extrabits << 8) + _scan_unicast(buf, length, state) + except ParseError: + state.truncated = True + return + + elif MVD_MULTICAST_ALL <= cmd <= MVD_MULTICAST_ALL_R + 2: + # multicast: length byte, optional leafnum, data payload + try: + length = buf.read_byte() | (extrabits << 8) + to = cmd - MVD_MULTICAST_ALL + if to >= 3: + to -= 3 + if to != 0: + buf.read_ushort() # leafnum + buf.skip(length) + except ParseError: + state.truncated = True + return + + elif cmd == 17: # mvd_print + try: + buf.read_byte() + buf.read_string() + except ParseError: + state.truncated = True + return + + else: + return # mvd_frame (6) or mvd_sound (16) — bail + + +# --------------------------------------------------------------------------- +# File parser +# --------------------------------------------------------------------------- + +def parse_mvd2(path): + state = MvdState() + + with open(path, 'rb') as f: + data = f.read() + + if len(data) < 4 or data[:4] != MVD_MAGIC: + raise ParseError(f"Not an MVD2 file: {path}") + + pos = 4 + had_eof = False + + while pos < len(data): + if pos + 2 > len(data): + state.truncated = True + break + msglen = struct.unpack_from(' len(data): + state.truncated = True + msglen = len(data) - pos + packet = data[pos:pos + msglen] + pos += msglen + try: + _process_mvd_message(Buffer(packet), state) + except ParseError: + state.truncated = True + + if not had_eof: + state.truncated = True + + return state + + +# --------------------------------------------------------------------------- +# Layout parsing (fallback, from dem_parser.py) +# --------------------------------------------------------------------------- + +def _is_header(text): + return bool(set(text.lower().split()) & _HEADER_WORDS) + + +def _parse_layout_entries(layout): + return [(int(m.group(1)), m.group(2)) + for m in re.finditer(r'yv\s+(\d+)\s+string2?\s+"([^"]*)"', layout)] + + +def _parse_player_tdm(content): + if len(content) < 20: + return None + name = content[:15].rstrip() + if not name or _is_header(name): + return None + nums = re.findall(r'-?\d+', content[15:]) + if len(nums) < 2: + return None + try: + return {'name': name, 'frags': int(nums[0]), 'deaths': int(nums[1])} + except ValueError: + return None + + +def _best_scoreboard_layout(layouts): + """Return the last layout that has a proper scoreboard header.""" + for layout in reversed(layouts): + entries = _parse_layout_entries(layout) + if any('Name' in c and 'Frags' in c for _, c in entries): + return layout + return None + + +def parse_layout_tdm(layout): + entries = _parse_layout_entries(layout) + + team_scores = [] + for yv, content in sorted(entries, key=lambda x: x[0]): + if yv > 30 or _is_header(content) or not content.strip(): + continue + parts = content.rsplit(None, 1) + if len(parts) == 2: + try: + team_scores.append(int(parts[1])) + except ValueError: + pass + + ta_score = team_scores[0] if len(team_scores) > 0 else 0 + tb_score = team_scores[1] if len(team_scores) > 1 else 0 + + header_yvs = sorted(yv for yv, c in entries if 'Name' in c and 'Frags' in c) + split_yv = header_yvs[1] if len(header_yvs) >= 2 else 9999 + first_header_yv = header_yvs[0] if header_yvs else 0 + + ta_players, tb_players = [], [] + for yv, content in entries: + if _is_header(content) or not content.strip(): + continue + p = _parse_player_tdm(content) + if p is None: + continue + if first_header_yv < yv <= split_yv: + ta_players.append(p) + elif yv > split_yv: + tb_players.append(p) + + return ta_players, tb_players, ta_score, tb_score + + +# --------------------------------------------------------------------------- +# State helpers +# --------------------------------------------------------------------------- + +def _strip_q2color(s): + """Remove Quake II high-bit color encoding and strip whitespace.""" + return ''.join(chr(c & 0x7F) for c in s.encode('latin-1')).strip() + + +def get_team_names(state): + gen = state.cs_general + a = _strip_q2color(state.configstrings.get(gen, '')) or 'Team A' + b = _strip_q2color(state.configstrings.get(gen + 1, '')) or 'Team B' + return a, b + + +def get_team_scores_from_cs(state): + gen = state.cs_general + try: + sa = int(_strip_q2color(state.configstrings.get(gen + 2, ''))) + except ValueError: + sa = None + try: + sb = int(_strip_q2color(state.configstrings.get(gen + 3, ''))) + except ValueError: + sb = None + return sa, sb + + +def get_timelimit(state): + """ + Extract configured timelimit (minutes) from the initial value of + CS_TDM_TIMELIMIT_STRING (CS_GENERAL+4), which OpenTDM sets to 'MM:SS' + at server startup before the countdown begins. + """ + gen = state.cs_general + raw = state.configstrings_first.get(gen + 4, '') + decoded = _strip_q2color(raw) + m = re.match(r'^(\d+):(\d{2})$', decoded) + if m: + mins = int(m.group(1)) + secs = int(m.group(2)) + # round up to nearest minute + return mins + (1 if secs > 0 else 0) + return None + + +def get_players_from_spectator_strings(state): + """ + Return (team_a_names, team_b_names) using CS_GENERAL+256+slot entries. + Format: "playername ()" where may be the actual + team name from CS_GENERAL+0/1 OR generic "Hometeam"/"Visitors". + Filters out [MVDSPEC], empty names, and duplicates. + """ + gen = state.cs_general + base = gen + MAX_CLIENTS # CS_TDM_SPECTATOR_STRINGS + + name_a = _strip_q2color(state.configstrings.get(gen, '')).lower() + name_b = _strip_q2color(state.configstrings.get(gen + 1, '')).lower() + + def _is_team_a(label): + l = label.lower() + return l == 'hometeam' or (name_a and l == name_a) + + def _is_team_b(label): + l = label.lower() + return l == 'visitors' or (name_b and l == name_b) + + team_a, team_b = [], [] + seen = set() + + for i in range(MAX_CLIENTS): + val = _strip_q2color(state.configstrings.get(base + i, '')) + if not val: + continue + m = re.match(r'^(.+?)\s+\((.+)\)$', val) + if not m: + continue + name = m.group(1).strip() + label = m.group(2).strip() + if not name or '[MVDSPEC]' in name or name in seen: + continue + seen.add(name) + if _is_team_a(label): + team_a.append(name) + elif _is_team_b(label): + team_b.append(name) + # unknown label → skip (spectator or unrecognised) + + return team_a, team_b + + +def get_players_from_skins(state, team_a_name, team_b_name): + """ + Last-resort fallback: assign players from CS_PLAYERSKINS by matching + player name against team names. Works well for 1v1 where team name = player. + """ + base = state.cs_playerskins + team_a, team_b, unknown = [], [], [] + seen = set() + + for i in range(MAX_CLIENTS): + raw = state.configstrings.get(base + i, '').strip() + if not raw: + continue + name = raw.split('\\')[0].strip() + if not name or '[MVDSPEC]' in name or name in seen: + continue + seen.add(name) + if name.lower() == team_a_name.lower(): + team_a.append(name) + elif name.lower() == team_b_name.lower(): + team_b.append(name) + else: + unknown.append(name) + + # distribute unmatched players evenly (best effort) + if unknown and (not team_a or not team_b): + half = len(unknown) // 2 or len(unknown) + if not team_a: + team_a, unknown = unknown[:half], unknown[half:] + if not team_b: + team_b, unknown = unknown[:len(unknown)], [] + + return team_a, team_b + + +def get_players_from_match_phase(state): + """ + Return (team_a, team_b) using spectator string entries that were explicitly + re-sent via MVD_CONFIGSTRING after the game status transitioned from warmup + to a match state. These are the confirmed match participants. + """ + gen = state.cs_general + name_a = _strip_q2color(state.configstrings.get(gen, '')).lower() + name_b = _strip_q2color(state.configstrings.get(gen + 1, '')).lower() + + def _is_team_a(label): + l = label.lower() + return l == 'hometeam' or (name_a and l == name_a) + + def _is_team_b(label): + l = label.lower() + return l == 'visitors' or (name_b and l == name_b) + + team_a, team_b = [], [] + seen = set() + + for slot, val in state.match_confirmed_slots.items(): + stripped = _strip_q2color(val) + m = re.match(r'^(.+?)\s+\((.+)\)$', stripped) + if not m: + continue + name = m.group(1).strip() + label = m.group(2).strip() + if not name or '[MVDSPEC]' in name or name in seen: + continue + seen.add(name) + if _is_team_a(label): + team_a.append(name) + elif _is_team_b(label): + team_b.append(name) + + return team_a, team_b + + +def _date_from_filename(name): + m = re.search(r'(\d{8})-(\d{6})', name) + if m: + try: + dt = datetime.strptime(m.group(1) + m.group(2), '%Y%m%d%H%M%S') + return dt.strftime('%Y-%m-%d %H:%M:%S') + except ValueError: + pass + return '' + + +# --------------------------------------------------------------------------- +# Row builder +# --------------------------------------------------------------------------- + +def build_row(path, state): + filename = path.name + date = _date_from_filename(filename) + # CS_MODELS_OLD+1=33 or CS_MODELS_NEW+1=63 holds "maps/q2dm1.bsp" + cs_bsp_idx = 33 if state.cs_playerskins == CS_PLAYERSKINS_OLD else 63 + bsp = state.configstrings.get(cs_bsp_idx, '').strip() + mapname = Path(bsp).stem if bsp else (state.mapname or path.stem.split('_')[-1]) + + team_a_name, team_b_name = get_team_names(state) + cs_score_a, cs_score_b = get_team_scores_from_cs(state) + + # Primary: spectator strings explicitly re-confirmed after match start. + # OpenTDM re-sends entries for actual match participants when game status + # transitions from Warmup/Countdown to Match. + ta_names, tb_names = get_players_from_match_phase(state) + + # Pre-compute best layout (used as fallback at multiple levels). + layout_ta, layout_tb, layout_sa, layout_sb = [], [], None, None + best_layout = _best_scoreboard_layout(state.layouts) + if best_layout: + ta_p, tb_p, layout_sa, layout_sb = parse_layout_tdm(best_layout) + layout_ta = [p['name'] for p in ta_p] + layout_tb = [p['name'] for p in tb_p] + + # If match-phase gave nothing, prefer layout over all spectator strings. + # (OpenTDM may transition status without re-sending spectator confirmations.) + if not ta_names and not tb_names: + if layout_ta or layout_tb: + ta_names, tb_names = layout_ta, layout_tb + if cs_score_a is None: cs_score_a = layout_sa + if cs_score_b is None: cs_score_b = layout_sb + else: + ta_names, tb_names = get_players_from_spectator_strings(state) + + # Fill any single missing team from layout. + if not ta_names and layout_ta: + ta_names = layout_ta + if cs_score_a is None: cs_score_a = layout_sa + if not tb_names and layout_tb: + tb_names = layout_tb + if cs_score_b is None: cs_score_b = layout_sb + + # If one team is still missing (no layout either), try spectator strings. + if not ta_names or not tb_names: + spec_a, spec_b = get_players_from_spectator_strings(state) + if not ta_names: ta_names = spec_a + if not tb_names: tb_names = spec_b + + # Last resort: player skin configstrings + team name matching. + if not ta_names and not tb_names: + ta_names, tb_names = get_players_from_skins(state, team_a_name, team_b_name) + + players_a = ','.join(ta_names) + players_b = ','.join(tb_names) + + score_a = cs_score_a if cs_score_a is not None else 0 + score_b = cs_score_b if cs_score_b is not None else 0 + + timelimit = get_timelimit(state) + timelimit_str = str(timelimit) if timelimit is not None else '' + + return { + 'file': filename, + 'date': date, + 'map': mapname, + 'players_a': players_a, + 'players_b': players_b, + 'score': f'{score_a}:{score_b}', + 'timelimit_min': timelimit_str, + 'truncated': 'yes' if state.truncated else 'no', + } + + +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- + +FIELDNAMES = [ + 'file', 'date', 'map', + 'players_a', 'score', 'players_b', + 'timelimit_min', 'truncated', +] + + +def main(): + ap = argparse.ArgumentParser( + description='Parse all .mvd2 demos in a directory and output a CSV report.' + ) + ap.add_argument('directory', nargs='?', default='.', + help='Directory containing .mvd2 files (default: current dir)') + ap.add_argument('-o', '--output', default='', + help='Output CSV file (default: mvd_report.csv in the demo dir)') + args = ap.parse_args() + + demo_dir = Path(args.directory).resolve() + if not demo_dir.is_dir(): + print(f"Error: not a directory: {demo_dir}", file=sys.stderr) + sys.exit(1) + + files = sorted(demo_dir.glob('*.mvd2')) + if not files: + print(f"No .mvd2 files found in {demo_dir}", file=sys.stderr) + sys.exit(1) + + out_path = (Path(args.output).resolve() if args.output + else demo_dir / 'mvd_report.csv') + + rows = [] + for demo_path in files: + try: + state = parse_mvd2(demo_path) + row = build_row(demo_path, state) + rows.append(row) + trunc = ' [TRUNCATED]' if state.truncated else '' + print(f"OK {demo_path.name}{trunc}") + except Exception as exc: + print(f"ERR {demo_path.name}: {exc}", file=sys.stderr) + rows.append({ + 'file': demo_path.name, + 'date': _date_from_filename(demo_path.name), + 'map': '', 'players_a': '', 'players_b': '', + 'score': '', 'timelimit_min': '', 'truncated': 'error', + }) + + with open(out_path, 'w', newline='', encoding='utf-8') as f: + writer = csv.DictWriter(f, fieldnames=FIELDNAMES, delimiter=';') + writer.writeheader() + writer.writerows(rows) + + print(f"\nReport saved: {out_path} ({len(rows)} demos)") + + +if __name__ == '__main__': + main()