You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
747 lines
24 KiB
747 lines
24 KiB
#!/usr/bin/env python3
|
|
"""
|
|
MVD2 demo parser — generates a CSV report from all .mvd2 files in CWD.
|
|
|
|
Usage:
|
|
cd /path/to/demos && python3 mvd_csv.py
|
|
python3 mvd_csv.py /path/to/demos/ # specify dir
|
|
python3 mvd_csv.py -o report.csv # specify output file
|
|
"""
|
|
|
|
import struct
|
|
import re
|
|
import sys
|
|
import csv
|
|
import argparse
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Protocol constants (q2pro inc/common/protocol.h, inc/shared/shared.h)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Signature that parse_mvd2() requires in the first four bytes of a demo.
MVD_MAGIC = b'MVD2'

# MVD opcodes (the low SVCMD_BITS bits of every command byte).
MVD_NOP = 1
MVD_SERVERDATA = 4
MVD_CONFIGSTRING = 5
MVD_FRAME = 6
MVD_UNICAST = 8
MVD_UNICAST_R = 9
MVD_MULTICAST_ALL = 10
MVD_MULTICAST_ALL_R = 13

# A command byte is split into opcode (low bits) and "extra bits" (high bits).
SVCMD_BITS = 5
SVCMD_MASK = (1 << SVCMD_BITS) - 1

# SVC opcodes used inside unicast messages
SVC_LAYOUT = 4
SVC_PRINT = 10
SVC_STUFFTEXT = 11
SVC_CONFIGSTRING = 13

# Protocol versions / flag that select the extended configstring layout.
PROTO_MVD_EXTLIMITS = 2011
PROTO_MVD_EXTLIMITS_2 = 2012
MVF_EXTLIMITS = 1

# Old configstring remap (default, used for version < 2011)
CS_PLAYERSKINS_OLD = 1312
CS_GENERAL_OLD = 1568
MAX_CS_OLD = 2080

# Extended configstring remap (version >= 2011 with MVF_EXTLIMITS)
CS_PLAYERSKINS_NEW = 12862
CS_GENERAL_NEW = 13118
MAX_CS_NEW = 13630

# OpenTDM configstring offsets from CS_GENERAL (opentdm/g_local.h)
#   CS_GENERAL+0  team A name
#   CS_GENERAL+1  team B name
#   CS_GENERAL+2  team A score
#   CS_GENERAL+3  team B score
#   CS_GENERAL+4  timelimit countdown string (initial value = configured limit)
#   CS_GENERAL+6  game status ("Warmup", "Match End", …)
#   CS_GENERAL+MAX_CLIENTS(256)+slot  "playername (Hometeam|Visitors)"
MAX_CLIENTS = 256

# Lower-case words that mark a scoreboard line as a column header.
_HEADER_WORDS = {
    'name', 'frags', 'frag', 'dths', 'deaths', 'ping',
    'net', 'eff', 'fph', 'time', 'player', 'rank',
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Buffer reader
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class ParseError(Exception):
    """Raised when demo data ends unexpectedly or is not in MVD2 format."""
|
|
|
|
|
|
class Buffer:
    """Forward-only reader for little-endian values stored in a byte string.

    ``pos`` is the offset of the next unread byte. Every read advances it and
    raises ParseError instead of reading past the end of ``data``.
    """

    def __init__(self, data):
        self.data = bytes(data)
        self.pos = 0

    def remaining(self):
        """Return how many bytes are left after the current position."""
        return len(self.data) - self.pos

    def _unpack(self, fmt, size, what):
        """Decode one *size*-byte value with struct format *fmt* and advance."""
        if self.pos + size > len(self.data):
            raise ParseError(f"Unexpected end ({what})")
        (value,) = struct.unpack_from(fmt, self.data, self.pos)
        self.pos += size
        return value

    def read_byte(self):
        """Return the next byte as an unsigned integer."""
        if self.pos >= len(self.data):
            raise ParseError("Unexpected end (byte)")
        value = self.data[self.pos]
        self.pos += 1
        return value

    def read_short(self):
        """Return the next signed 16-bit integer."""
        return self._unpack('<h', 2, 'short')

    def read_ushort(self):
        """Return the next unsigned 16-bit integer."""
        return self._unpack('<H', 2, 'ushort')

    def read_long(self):
        """Return the next signed 32-bit integer."""
        return self._unpack('<i', 4, 'long')

    def read_string(self):
        """Return the NUL-terminated string at the current position.

        The bytes are decoded as latin-1 and the terminator is consumed.
        Raises ParseError when no NUL byte follows the current position.
        """
        terminator = self.data.find(b'\x00', self.pos)
        if terminator < 0:
            raise ParseError("Unterminated string")
        text = self.data[self.pos:terminator].decode('latin-1', errors='replace')
        self.pos = terminator + 1
        return text

    def skip(self, n):
        """Advance by *n* bytes; raise ParseError if that overruns the data."""
        target = self.pos + n
        if target > len(self.data):
            raise ParseError(f"Cannot skip {n} bytes")
        self.pos = target
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Demo state
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class MvdState:
    """Everything collected from one demo while its messages are parsed."""

    def __init__(self):
        self.mapname = ''
        self.gamedir = ''

        # Latest value and first-seen value of every configstring index.
        self.configstrings = {}
        self.configstrings_first = {}

        self.cvars = {}
        self.layouts = []
        self.truncated = False

        # Configstring layout; starts as the old remap until server data
        # selects otherwise.
        self.cs_playerskins = CS_PLAYERSKINS_OLD
        self.cs_general = CS_GENERAL_OLD
        self.cs_end = MAX_CS_OLD

        # True only while processing MVD_SERVERDATA configs
        self._in_serverdata = False
        # True after CS_GENERAL+6 transitions to non-warmup
        self._match_phase = False
        # slot → raw value, set via MVD_CONFIGSTRING after match starts
        self.match_confirmed_slots = {}

    def set_cs(self, idx, val):
        """Record configstring *idx* = *val* and update derived state.

        Besides storing the value (and remembering the first value seen for
        the index), this keeps ``mapname`` in sync with index 0 and, for
        updates that arrive outside of the server-data block, detects the
        start of the match and records spectator-string slots updated
        afterwards.
        """
        self.configstrings_first.setdefault(idx, val)
        self.configstrings[idx] = val
        if idx == 0:
            self.mapname = val.strip()

        # Everything below only applies to explicit MVD_CONFIGSTRING updates.
        if self._in_serverdata:
            return

        if idx == self.cs_general + 6:
            # Drop the high (colour) bit before comparing the status text.
            status = ''.join(chr(b & 0x7F) for b in val.encode('latin-1'))
            if status.strip().lower() not in ('', 'warmup', 'countdown'):
                self._match_phase = True

        if self._match_phase:
            slot = idx - (self.cs_general + MAX_CLIENTS)
            if 0 <= slot < MAX_CLIENTS:
                self.match_confirmed_slots[slot] = val
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Unicast SVC scanner (layout + stufftext)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _scan_unicast(buf, length, state):
    """Scan a unicast payload for svc_layout and svc_stufftext.

    Layout strings are appended to ``state.layouts``; every ``set <name>
    <value>`` pair found in stufftext is stored in ``state.cvars``. Print and
    configstring commands are parsed only to step over them, and scanning
    stops at the first command of any other kind.

    Args:
        buf: Buffer positioned on the clientNum byte of the unicast.
        length: payload size NOT including the clientNum byte.
        state: MvdState that receives the layouts and cvars.

    Raises:
        ParseError: if the clientNum byte itself is missing. Errors inside
            the payload are not raised; scanning just stops.

    On return ``buf.pos`` is at the end of the payload (clamped to the end of
    the buffer), regardless of how much of it was understood.
    """
    buf.read_byte()  # clientNum: one byte, not counted in length
    end = min(buf.pos + length, len(buf.data))

    try:
        while buf.pos < end:
            cmd = buf.read_byte()

            if cmd == SVC_LAYOUT:
                layout = buf.read_string()
                # read_string() searches the whole buffer; a string that
                # ends beyond this payload is not a real layout.
                if buf.pos > end:
                    break
                state.layouts.append(layout)

            elif cmd == SVC_STUFFTEXT:
                text = buf.read_string()
                if buf.pos > end:
                    break
                for m in re.finditer(r'\bset\s+(\S+)\s+(\S+)', text):
                    state.cvars[m.group(1)] = m.group(2)

            elif cmd == SVC_PRINT:
                buf.read_byte()
                buf.read_string()

            elif cmd == SVC_CONFIGSTRING:
                buf.read_ushort()
                buf.read_string()

            else:
                break  # unknown/complex SVC — bail on rest of unicast
    except ParseError:
        # Best effort: a malformed payload just ends the scan; whatever was
        # collected so far is kept.
        pass

    buf.pos = end
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# MVD message parser
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _read_serverdata(buf, state, extrabits):
    """Parse an MVD_SERVERDATA header plus the configstring list after it.

    Chooses the old or extended configstring layout from the stream version
    and flags, stores the game directory, then feeds every configstring to
    ``state.set_cs`` until the end-of-list index (``state.cs_end``) is read.
    Returns without touching *state* when the protocol number is not 37.
    ParseError is re-raised after clearing the in-serverdata flag.
    """
    try:
        if buf.read_long() != 37:
            return
        version = buf.read_ushort()
        # Newer streams send the flags as a separate word; older ones carry
        # them in the extra bits of the command byte.
        flags = buf.read_ushort() if version >= PROTO_MVD_EXTLIMITS_2 else extrabits
        extended = version >= PROTO_MVD_EXTLIMITS and bool(flags & MVF_EXTLIMITS)
        if extended:
            cs_layout = (CS_PLAYERSKINS_NEW, CS_GENERAL_NEW, MAX_CS_NEW)
        else:
            cs_layout = (CS_PLAYERSKINS_OLD, CS_GENERAL_OLD, MAX_CS_OLD)
        state.cs_playerskins, state.cs_general, state.cs_end = cs_layout

        buf.read_long()  # servercount
        state.gamedir = buf.read_string()
        buf.read_short()  # clientNum

        state._in_serverdata = True
        idx = buf.read_ushort()
        while idx != state.cs_end:
            state.set_cs(idx, buf.read_string())
            idx = buf.read_ushort()
        state._in_serverdata = False
    except ParseError:
        state._in_serverdata = False
        raise


def _process_mvd_message(buf, state):
    """Walk the commands of one MVD message and update *state*.

    Handles nop, serverdata, configstring, unicast, multicast and print
    commands. Parsing of the message stops after serverdata (entity
    baselines follow) and at any other command (e.g. mvd_frame or
    mvd_sound). A ParseError inside a command marks the state as truncated
    and ends the message.
    """
    while buf.remaining() > 0:
        try:
            opbyte = buf.read_byte()
        except ParseError:
            break

        opcode = opbyte & SVCMD_MASK
        extrabits = opbyte >> SVCMD_BITS

        if opcode == MVD_NOP:
            continue

        try:
            if opcode == MVD_SERVERDATA:
                _read_serverdata(buf, state, extrabits)
                return  # entity baselines follow — bail on rest of packet

            if opcode == MVD_CONFIGSTRING:
                idx = buf.read_ushort()
                state.set_cs(idx, buf.read_string())

            elif opcode in (MVD_UNICAST, MVD_UNICAST_R):
                length = buf.read_byte() | (extrabits << 8)
                _scan_unicast(buf, length, state)

            elif MVD_MULTICAST_ALL <= opcode <= MVD_MULTICAST_ALL_R + 2:
                # multicast: length byte, optional leafnum, data payload
                length = buf.read_byte() | (extrabits << 8)
                # Two groups of three opcodes; only the first opcode of each
                # group comes without a leafnum.
                if (opcode - MVD_MULTICAST_ALL) % 3 != 0:
                    buf.read_ushort()  # leafnum
                buf.skip(length)

            elif opcode == 17:  # mvd_print
                buf.read_byte()
                buf.read_string()

            else:
                return  # mvd_frame (6) or mvd_sound (16) — bail
        except ParseError:
            state.truncated = True
            return
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# File parser
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def parse_mvd2(path):
    """Read the demo at *path* and return the MvdState built from it.

    After the magic, the file is treated as a sequence of messages, each
    prefixed by an unsigned 16-bit little-endian length; a zero length marks
    the end. ``state.truncated`` is set when the file ends without that
    marker, when a length prefix or message is cut short, or when a message
    fails to parse.

    Raises:
        ParseError: if the file does not start with the MVD2 magic.
    """
    state = MvdState()

    with open(path, 'rb') as fh:
        blob = fh.read()

    # A file shorter than the magic cannot compare equal to it either.
    if blob[:4] != MVD_MAGIC:
        raise ParseError(f"Not an MVD2 file: {path}")

    total = len(blob)
    offset = 4
    saw_terminator = False

    while offset < total:
        if total - offset < 2:
            state.truncated = True
            break
        (size,) = struct.unpack_from('<H', blob, offset)
        offset += 2
        if size == 0:
            saw_terminator = True
            break

        available = total - offset
        if size > available:
            # Last message is cut short: parse what is there.
            state.truncated = True
            size = available

        chunk = blob[offset:offset + size]
        offset += size
        try:
            _process_mvd_message(Buffer(chunk), state)
        except ParseError:
            state.truncated = True

    if not saw_terminator:
        state.truncated = True

    return state
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Layout parsing (fallback, from dem_parser.py)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _is_header(text):
    """Return True if any whitespace-separated word of *text* (compared in
    lower case) is one of the scoreboard header words."""
    words = text.lower().split()
    return not _HEADER_WORDS.isdisjoint(words)
|
|
|
|
|
|
def _parse_layout_entries(layout):
    """Return ``(yv, text)`` for every ``yv <n> string|string2 "<text>"``
    sequence in *layout*, in order of appearance."""
    pattern = r'yv\s+(\d+)\s+string2?\s+"([^"]*)"'
    found = []
    for match in re.finditer(pattern, layout):
        row, text = match.groups()
        found.append((int(row), text))
    return found
|
|
|
|
|
|
def _parse_player_tdm(content):
    """Parse one scoreboard line into ``{'name', 'frags', 'deaths'}``.

    The first 15 characters are the name column; the first two integers
    found after it are taken as frags and deaths. Returns None for lines
    shorter than 20 characters, lines whose name is empty or a header word,
    and lines with fewer than two numbers.
    """
    if len(content) < 20:
        return None

    name_field, stats_field = content[:15], content[15:]
    name = name_field.rstrip()
    if not name or _is_header(name):
        return None

    numbers = re.findall(r'-?\d+', stats_field)
    if len(numbers) < 2:
        return None

    try:
        frags, deaths = int(numbers[0]), int(numbers[1])
    except ValueError:
        return None
    return {'name': name, 'frags': frags, 'deaths': deaths}
|
|
|
|
|
|
def _best_scoreboard_layout(layouts):
    """Return the most recent layout containing a line with both 'Name' and
    'Frags', or None when no layout has one."""
    for candidate in reversed(layouts):
        for _, text in _parse_layout_entries(candidate):
            if 'Name' in text and 'Frags' in text:
                return candidate
    return None
|
|
|
|
|
|
def parse_layout_tdm(layout):
    """Extract both teams' players and scores from a scoreboard layout.

    Returns ``(team_a_players, team_b_players, team_a_score, team_b_score)``
    where the player lists hold the dicts produced by _parse_player_tdm and
    a missing score is reported as 0.

    Team scores are the trailing integers of the first two non-header lines
    with yv <= 30 (in yv order). Players listed between the first and second
    'Name … Frags' header belong to team A, players below the second header
    to team B; with a single header everything below it is team A.
    """
    entries = _parse_layout_entries(layout)

    # --- team scores: trailing number of the topmost lines ---------------
    scores = []
    for row, text in sorted(entries, key=lambda entry: entry[0]):
        if row > 30 or _is_header(text) or not text.strip():
            continue
        fields = text.rsplit(None, 1)
        if len(fields) != 2:
            continue
        try:
            scores.append(int(fields[1]))
        except ValueError:
            continue
    ta_score, tb_score = (scores + [0, 0])[:2]

    # --- column-header rows delimit the two player tables ----------------
    title_rows = sorted(
        row for row, text in entries if 'Name' in text and 'Frags' in text
    )
    first_title = title_rows[0] if title_rows else 0
    second_title = title_rows[1] if len(title_rows) >= 2 else 9999

    ta_players, tb_players = [], []
    for row, text in entries:
        if _is_header(text) or not text.strip():
            continue
        player = _parse_player_tdm(text)
        if player is None:
            continue
        if row > second_title:
            tb_players.append(player)
        elif row > first_title:
            ta_players.append(player)

    return ta_players, tb_players, ta_score, tb_score
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# State helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _strip_q2color(s):
    """Clear the high bit of every latin-1 byte of *s* (Quake II colour
    encoding) and strip surrounding whitespace."""
    plain = bytes(b & 0x7F for b in s.encode('latin-1'))
    return plain.decode('ascii').strip()
|
|
|
|
|
|
def get_team_names(state):
    """Return ``(team_a, team_b)`` display names from CS_GENERAL+0/+1,
    falling back to 'Team A' / 'Team B' when a name is empty or missing."""
    base = state.cs_general
    names = []
    for offset, fallback in ((0, 'Team A'), (1, 'Team B')):
        raw = state.configstrings.get(base + offset, '')
        names.append(_strip_q2color(raw) or fallback)
    return names[0], names[1]
|
|
|
|
|
|
def get_team_scores_from_cs(state):
    """Return ``(score_a, score_b)`` read from CS_GENERAL+2/+3.

    Each score is an int, or None when the configstring is missing or does
    not hold an integer.
    """
    def _score_at(index):
        try:
            return int(_strip_q2color(state.configstrings.get(index, '')))
        except ValueError:
            return None

    base = state.cs_general
    return _score_at(base + 2), _score_at(base + 3)
|
|
|
|
|
|
def get_timelimit(state):
    """Return the configured timelimit in whole minutes, or None.

    The value is taken from the first-seen contents of
    CS_TDM_TIMELIMIT_STRING (CS_GENERAL+4), which OpenTDM sets to 'MM:SS'
    at server startup before the countdown begins. Any seconds part rounds
    the result up to the next minute. None is returned when the string does
    not have the 'M…:SS' shape.
    """
    initial = state.configstrings_first.get(state.cs_general + 4, '')
    clock = re.match(r'^(\d+):(\d{2})$', _strip_q2color(initial))
    if clock is None:
        return None

    minutes = int(clock.group(1))
    seconds = int(clock.group(2))
    if seconds > 0:
        # round up to nearest minute
        return minutes + 1
    return minutes
|
|
|
|
|
|
def get_players_from_spectator_strings(state):
    """Return ``(team_a_names, team_b_names)`` from the current values of the
    CS_GENERAL+256+slot entries.

    Each entry has the form "playername (<teamlabel>)", where <teamlabel> is
    either the team's own name from CS_GENERAL+0/1 or the generic
    "Hometeam"/"Visitors" (compared case-insensitively). Entries for
    [MVDSPEC], empty names, repeated names and unrecognised labels are
    left out.
    """
    gen = state.cs_general
    first_slot = gen + MAX_CLIENTS  # CS_TDM_SPECTATOR_STRINGS

    # Accepted lower-case labels per team; an empty team name never matches.
    labels_a = {'hometeam'}
    labels_b = {'visitors'}
    own_a = _strip_q2color(state.configstrings.get(gen, '')).lower()
    own_b = _strip_q2color(state.configstrings.get(gen + 1, '')).lower()
    if own_a:
        labels_a.add(own_a)
    if own_b:
        labels_b.add(own_b)

    team_a, team_b = [], []
    seen = set()

    for slot in range(MAX_CLIENTS):
        text = _strip_q2color(state.configstrings.get(first_slot + slot, ''))
        parsed = re.match(r'^(.+?)\s+\((.+)\)$', text) if text else None
        if not parsed:
            continue

        name = parsed.group(1).strip()
        label = parsed.group(2).strip().lower()
        if not name or '[MVDSPEC]' in name or name in seen:
            continue
        seen.add(name)

        if label in labels_a:
            team_a.append(name)
        elif label in labels_b:
            team_b.append(name)
        # unknown label → skip (spectator or unrecognised)

    return team_a, team_b
|
|
|
|
|
|
def get_players_from_skins(state, team_a_name, team_b_name):
    """Last-resort fallback: derive team rosters from CS_PLAYERSKINS.

    The player name is the part of each skin configstring before the first
    backslash. A player whose name equals a team name (case-insensitively)
    is assigned to that team, which works well for 1v1 where team name =
    player. [MVDSPEC] entries, empty names and repeated names are ignored.

    Players matching neither team name are only used to fill a team that
    would otherwise be empty:

    * both teams empty → the unmatched players are split, first half to
      team A and the rest to team B (a single player goes to team A);
    * exactly one team empty → that team receives all unmatched players.

    Args:
        state: MvdState providing ``cs_playerskins`` and ``configstrings``.
        team_a_name: display name of team A.
        team_b_name: display name of team B.

    Returns:
        ``(team_a, team_b)`` lists of player names.
    """
    base = state.cs_playerskins
    wanted_a = team_a_name.lower()
    wanted_b = team_b_name.lower()

    team_a, team_b, unknown = [], [], []
    seen = set()

    for i in range(MAX_CLIENTS):
        raw = state.configstrings.get(base + i, '').strip()
        if not raw:
            continue
        name = raw.split('\\')[0].strip()
        if not name or '[MVDSPEC]' in name or name in seen:
            continue
        seen.add(name)

        lowered = name.lower()
        if lowered == wanted_a:
            team_a.append(name)
        elif lowered == wanted_b:
            team_b.append(name)
        else:
            unknown.append(name)

    # distribute unmatched players (best effort)
    if unknown:
        if not team_a and not team_b:
            half = len(unknown) // 2 or len(unknown)
            team_a, team_b = unknown[:half], unknown[half:]
        elif not team_a:
            # Previously only half of the unmatched players ended up here
            # and the rest were dropped, unlike the team-B case below.
            team_a = unknown
        elif not team_b:
            team_b = unknown

    return team_a, team_b
|
|
|
|
|
|
def get_players_from_match_phase(state):
    """Return ``(team_a, team_b)`` built from ``state.match_confirmed_slots``.

    Those are the spectator-string entries that were explicitly re-sent via
    MVD_CONFIGSTRING after the game status left warmup, i.e. the confirmed
    match participants. Entries are "playername (<teamlabel>)"; the label is
    matched case-insensitively against "Hometeam"/"Visitors" and the team
    names in CS_GENERAL+0/1. [MVDSPEC], empty and repeated names, and
    unrecognised labels are left out.
    """
    gen = state.cs_general

    # Accepted lower-case labels per team; an empty team name never matches.
    labels_a = {'hometeam'}
    labels_b = {'visitors'}
    own_a = _strip_q2color(state.configstrings.get(gen, '')).lower()
    own_b = _strip_q2color(state.configstrings.get(gen + 1, '')).lower()
    if own_a:
        labels_a.add(own_a)
    if own_b:
        labels_b.add(own_b)

    team_a, team_b = [], []
    seen = set()

    for raw in state.match_confirmed_slots.values():
        parsed = re.match(r'^(.+?)\s+\((.+)\)$', _strip_q2color(raw))
        if not parsed:
            continue

        name = parsed.group(1).strip()
        label = parsed.group(2).strip().lower()
        if not name or '[MVDSPEC]' in name or name in seen:
            continue
        seen.add(name)

        if label in labels_a:
            team_a.append(name)
        elif label in labels_b:
            team_b.append(name)

    return team_a, team_b
|
|
|
|
|
|
def _date_from_filename(name):
    """Return the ``YYYYMMDD-HHMMSS`` stamp embedded in *name* formatted as
    'YYYY-MM-DD HH:MM:SS', or '' when there is no stamp or it is not a valid
    date/time."""
    stamp = re.search(r'(\d{8})-(\d{6})', name)
    if stamp is None:
        return ''
    try:
        moment = datetime.strptime(''.join(stamp.groups()), '%Y%m%d%H%M%S')
        return moment.strftime('%Y-%m-%d %H:%M:%S')
    except ValueError:
        return ''
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Row builder
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def build_row(path, state):
    """Build the CSV row (a dict of column name → string) for one demo.

    Args:
        path: pathlib.Path of the demo file.
        state: MvdState returned by parse_mvd2 for that file.

    Player lists are resolved through a chain of fallbacks, each used only
    for what the previous ones left empty: match-phase spectator strings,
    the last scoreboard layout, all spectator strings, and finally the
    player-skin configstrings. Scores come from the configstrings; layout
    scores fill in only where layout players are used and the configstring
    score is missing, and anything still missing is reported as 0.
    """
    filename = path.name
    date = _date_from_filename(filename)

    # CS_MODELS_OLD+1=33 or CS_MODELS_NEW+1=63 holds "maps/q2dm1.bsp"
    bsp_index = 33 if state.cs_playerskins == CS_PLAYERSKINS_OLD else 63
    bsp = state.configstrings.get(bsp_index, '').strip()
    if bsp:
        mapname = Path(bsp).stem
    else:
        mapname = state.mapname or path.stem.split('_')[-1]

    team_a_name, team_b_name = get_team_names(state)
    score_a, score_b = get_team_scores_from_cs(state)

    # Primary: spectator strings explicitly re-confirmed after match start.
    # OpenTDM re-sends entries for actual match participants when game status
    # transitions from Warmup/Countdown to Match.
    roster_a, roster_b = get_players_from_match_phase(state)

    # Pre-compute best layout (used as fallback at multiple levels).
    layout_a, layout_b = [], []
    layout_score_a = layout_score_b = None
    scoreboard = _best_scoreboard_layout(state.layouts)
    if scoreboard:
        parsed_a, parsed_b, layout_score_a, layout_score_b = parse_layout_tdm(scoreboard)
        layout_a = [player['name'] for player in parsed_a]
        layout_b = [player['name'] for player in parsed_b]

    # If match-phase gave nothing, prefer layout over all spectator strings.
    # (OpenTDM may transition status without re-sending spectator confirmations.)
    if not roster_a and not roster_b:
        if layout_a or layout_b:
            roster_a, roster_b = layout_a, layout_b
            if score_a is None:
                score_a = layout_score_a
            if score_b is None:
                score_b = layout_score_b
        else:
            roster_a, roster_b = get_players_from_spectator_strings(state)

    # Fill any single missing team from layout.
    if not roster_a and layout_a:
        roster_a = layout_a
        if score_a is None:
            score_a = layout_score_a
    if not roster_b and layout_b:
        roster_b = layout_b
        if score_b is None:
            score_b = layout_score_b

    # If one team is still missing (no layout either), try spectator strings.
    if not roster_a or not roster_b:
        spec_a, spec_b = get_players_from_spectator_strings(state)
        if not roster_a:
            roster_a = spec_a
        if not roster_b:
            roster_b = spec_b

    # Last resort: player skin configstrings + team name matching.
    if not roster_a and not roster_b:
        roster_a, roster_b = get_players_from_skins(state, team_a_name, team_b_name)

    if score_a is None:
        score_a = 0
    if score_b is None:
        score_b = 0

    timelimit = get_timelimit(state)

    return {
        'file': filename,
        'date': date,
        'map': mapname,
        'players_a': ','.join(roster_a),
        'players_b': ','.join(roster_b),
        'score': f'{score_a}:{score_b}',
        'timelimit_min': '' if timelimit is None else str(timelimit),
        'truncated': 'yes' if state.truncated else 'no',
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# CSV columns, in output order; keys of the row dicts built by build_row().
FIELDNAMES = [
    'file',
    'date',
    'map',
    'players_a',
    'score',
    'players_b',
    'timelimit_min',
    'truncated',
]
|
|
|
|
|
|
def main():
    """Command-line entry point.

    Parses every ``*.mvd2`` file in the chosen directory (sorted by path),
    prints one OK/ERR line per demo and writes a ';'-separated CSV report.
    A demo that fails to parse still gets a row, with 'error' in the
    truncated column. Exits with status 1 when the directory is invalid or
    contains no demos.
    """
    parser = argparse.ArgumentParser(
        description='Parse all .mvd2 demos in a directory and output a CSV report.'
    )
    parser.add_argument(
        'directory', nargs='?', default='.',
        help='Directory containing .mvd2 files (default: current dir)',
    )
    parser.add_argument(
        '-o', '--output', default='',
        help='Output CSV file (default: mvd_report.csv in the demo dir)',
    )
    args = parser.parse_args()

    demo_dir = Path(args.directory).resolve()
    if not demo_dir.is_dir():
        print(f"Error: not a directory: {demo_dir}", file=sys.stderr)
        sys.exit(1)

    demos = sorted(demo_dir.glob('*.mvd2'))
    if not demos:
        print(f"No .mvd2 files found in {demo_dir}", file=sys.stderr)
        sys.exit(1)

    if args.output:
        out_path = Path(args.output).resolve()
    else:
        out_path = demo_dir / 'mvd_report.csv'

    rows = []
    for demo_path in demos:
        try:
            state = parse_mvd2(demo_path)
            rows.append(build_row(demo_path, state))
            suffix = ' [TRUNCATED]' if state.truncated else ''
            print(f"OK {demo_path.name}{suffix}")
        except Exception as exc:
            # Top-level boundary: one bad demo must not abort the report.
            print(f"ERR {demo_path.name}: {exc}", file=sys.stderr)
            failed = dict.fromkeys(
                ('map', 'players_a', 'players_b', 'score', 'timelimit_min'), ''
            )
            failed['file'] = demo_path.name
            failed['date'] = _date_from_filename(demo_path.name)
            failed['truncated'] = 'error'
            rows.append(failed)

    with open(out_path, 'w', newline='', encoding='utf-8') as report:
        writer = csv.DictWriter(report, fieldnames=FIELDNAMES, delimiter=';')
        writer.writeheader()
        writer.writerows(rows)

    print(f"\nReport saved: {out_path} ({len(rows)} demos)")
|
|
|
|
|
|
# Run the CLI only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
|
|
|