Browse Source

adding script

master
Przemyslaw Kadej 3 weeks ago
parent
commit
9a93143f14
  1. 747
      scripts/mvd_csv.py

747
scripts/mvd_csv.py

@ -0,0 +1,747 @@
#!/usr/bin/env python3
"""
MVD2 demo parser generates a CSV report from all .mvd2 files in CWD.
Usage:
cd /path/to/demos && python3 mvd_csv.py
python3 mvd_csv.py /path/to/demos/ # specify dir
python3 mvd_csv.py -o report.csv # specify output file
"""
import struct
import re
import sys
import csv
import argparse
from pathlib import Path
from datetime import datetime
# ---------------------------------------------------------------------------
# Protocol constants (q2pro inc/common/protocol.h, inc/shared/shared.h)
# ---------------------------------------------------------------------------
MVD_MAGIC = b'MVD2'
MVD_NOP = 1
MVD_SERVERDATA = 4
MVD_CONFIGSTRING = 5
MVD_FRAME = 6
MVD_UNICAST = 8
MVD_UNICAST_R = 9
MVD_MULTICAST_ALL = 10
MVD_MULTICAST_ALL_R = 13
SVCMD_BITS = 5
SVCMD_MASK = 0x1F
# SVC opcodes used inside unicast messages
SVC_LAYOUT = 4
SVC_PRINT = 10
SVC_STUFFTEXT = 11
SVC_CONFIGSTRING = 13
PROTO_MVD_EXTLIMITS = 2011
PROTO_MVD_EXTLIMITS_2 = 2012
MVF_EXTLIMITS = 1
# Old configstring remap (default, used for version < 2011)
CS_PLAYERSKINS_OLD = 1312
CS_GENERAL_OLD = 1568
MAX_CS_OLD = 2080
# Extended configstring remap (version >= 2011 with MVF_EXTLIMITS)
CS_PLAYERSKINS_NEW = 12862
CS_GENERAL_NEW = 13118
MAX_CS_NEW = 13630
# OpenTDM configstring offsets from CS_GENERAL (opentdm/g_local.h)
# CS_GENERAL+0 team A name
# CS_GENERAL+1 team B name
# CS_GENERAL+2 team A score
# CS_GENERAL+3 team B score
# CS_GENERAL+4 timelimit countdown string (initial value = configured limit)
# CS_GENERAL+6 game status ("Warmup", "Match End", …)
# CS_GENERAL+MAX_CLIENTS(256)+slot "playername (Hometeam|Visitors)"
MAX_CLIENTS = 256
_HEADER_WORDS = {'name', 'frags', 'frag', 'dths', 'deaths', 'ping',
'net', 'eff', 'fph', 'time', 'player', 'rank'}
# ---------------------------------------------------------------------------
# Buffer reader
# ---------------------------------------------------------------------------
class ParseError(Exception):
pass
class Buffer:
def __init__(self, data):
self.data = bytes(data)
self.pos = 0
def remaining(self):
return len(self.data) - self.pos
def read_byte(self):
if self.pos >= len(self.data):
raise ParseError("Unexpected end (byte)")
v = self.data[self.pos]; self.pos += 1
return v
def read_short(self):
if self.pos + 2 > len(self.data):
raise ParseError("Unexpected end (short)")
v = struct.unpack_from('<h', self.data, self.pos)[0]; self.pos += 2
return v
def read_ushort(self):
if self.pos + 2 > len(self.data):
raise ParseError("Unexpected end (ushort)")
v = struct.unpack_from('<H', self.data, self.pos)[0]; self.pos += 2
return v
def read_long(self):
if self.pos + 4 > len(self.data):
raise ParseError("Unexpected end (long)")
v = struct.unpack_from('<i', self.data, self.pos)[0]; self.pos += 4
return v
def read_string(self):
try:
end = self.data.index(b'\x00', self.pos)
except ValueError:
raise ParseError("Unterminated string")
s = self.data[self.pos:end].decode('latin-1', errors='replace')
self.pos = end + 1
return s
def skip(self, n):
if self.pos + n > len(self.data):
raise ParseError(f"Cannot skip {n} bytes")
self.pos += n
# ---------------------------------------------------------------------------
# Demo state
# ---------------------------------------------------------------------------
class MvdState:
def __init__(self):
self.mapname = ''
self.gamedir = ''
self.configstrings = {}
self.configstrings_first = {} # first-seen value per index
self.cvars = {}
self.layouts = []
self.truncated = False
self.cs_playerskins = CS_PLAYERSKINS_OLD
self.cs_general = CS_GENERAL_OLD
self.cs_end = MAX_CS_OLD
self._in_serverdata = False # True only while processing MVD_SERVERDATA configs
self._match_phase = False # True after CS_GENERAL+6 transitions to non-warmup
self.match_confirmed_slots = {} # slot → raw value, set via MVD_CONFIGSTRING after match starts
def set_cs(self, idx, val):
if idx not in self.configstrings_first:
self.configstrings_first[idx] = val
self.configstrings[idx] = val
if idx == 0:
self.mapname = val.strip()
# Detect match phase start via explicit MVD_CONFIGSTRING (not SERVERDATA)
if not self._in_serverdata and idx == self.cs_general + 6:
vl = ''.join(chr(c & 0x7F) for c in val.encode('latin-1')).strip().lower()
if vl not in ('', 'warmup', 'countdown'):
self._match_phase = True
# Track spectator string updates that happen during match phase
if not self._in_serverdata and self._match_phase:
base = self.cs_general + MAX_CLIENTS
if base <= idx < base + MAX_CLIENTS:
self.match_confirmed_slots[idx - base] = val
# ---------------------------------------------------------------------------
# Unicast SVC scanner (layout + stufftext)
# ---------------------------------------------------------------------------
def _scan_unicast(buf, length, state):
"""
Scan unicast payload for svc_layout and svc_stufftext.
length is the payload size NOT including the clientNum byte.
"""
client_num = buf.read_byte() # consume clientNum (1 byte, outside length)
end = buf.pos + length
if end > len(buf.data):
end = len(buf.data)
while buf.pos < end:
try:
cmd = buf.read_byte()
except ParseError:
break
if cmd == SVC_LAYOUT:
try:
layout = buf.read_string()
state.layouts.append(layout)
except ParseError:
break
elif cmd == SVC_STUFFTEXT:
try:
text = buf.read_string()
for m in re.finditer(r'\bset\s+(\S+)\s+(\S+)', text):
state.cvars[m.group(1)] = m.group(2)
except ParseError:
break
elif cmd == SVC_PRINT:
try:
buf.read_byte()
buf.read_string()
except ParseError:
break
elif cmd == SVC_CONFIGSTRING:
try:
buf.read_ushort()
buf.read_string()
except ParseError:
break
else:
break # unknown/complex SVC — bail on rest of unicast
buf.pos = end
# ---------------------------------------------------------------------------
# MVD message parser
# ---------------------------------------------------------------------------
def _process_mvd_message(buf, state):
while buf.remaining() > 0:
try:
raw = buf.read_byte()
except ParseError:
break
extrabits = raw >> SVCMD_BITS
cmd = raw & SVCMD_MASK
if cmd == MVD_NOP:
continue
elif cmd == MVD_SERVERDATA:
try:
protocol = buf.read_long()
if protocol != 37:
return
version = buf.read_ushort()
if version >= PROTO_MVD_EXTLIMITS_2:
flags = buf.read_ushort()
else:
flags = extrabits
use_new = (version >= PROTO_MVD_EXTLIMITS and bool(flags & MVF_EXTLIMITS))
if use_new:
state.cs_playerskins = CS_PLAYERSKINS_NEW
state.cs_general = CS_GENERAL_NEW
state.cs_end = MAX_CS_NEW
else:
state.cs_playerskins = CS_PLAYERSKINS_OLD
state.cs_general = CS_GENERAL_OLD
state.cs_end = MAX_CS_OLD
buf.read_long() # servercount
state.gamedir = buf.read_string()
buf.read_short() # clientNum
state._in_serverdata = True
while True:
idx = buf.read_ushort()
if idx == state.cs_end:
break
val = buf.read_string()
state.set_cs(idx, val)
state._in_serverdata = False
except ParseError:
state._in_serverdata = False
state.truncated = True
return # entity baselines follow — bail on rest of packet
elif cmd == MVD_CONFIGSTRING:
try:
idx = buf.read_ushort()
val = buf.read_string()
state.set_cs(idx, val)
except ParseError:
state.truncated = True
return
elif cmd in (MVD_UNICAST, MVD_UNICAST_R):
try:
length = buf.read_byte() | (extrabits << 8)
_scan_unicast(buf, length, state)
except ParseError:
state.truncated = True
return
elif MVD_MULTICAST_ALL <= cmd <= MVD_MULTICAST_ALL_R + 2:
# multicast: length byte, optional leafnum, data payload
try:
length = buf.read_byte() | (extrabits << 8)
to = cmd - MVD_MULTICAST_ALL
if to >= 3:
to -= 3
if to != 0:
buf.read_ushort() # leafnum
buf.skip(length)
except ParseError:
state.truncated = True
return
elif cmd == 17: # mvd_print
try:
buf.read_byte()
buf.read_string()
except ParseError:
state.truncated = True
return
else:
return # mvd_frame (6) or mvd_sound (16) — bail
# ---------------------------------------------------------------------------
# File parser
# ---------------------------------------------------------------------------
def parse_mvd2(path):
state = MvdState()
with open(path, 'rb') as f:
data = f.read()
if len(data) < 4 or data[:4] != MVD_MAGIC:
raise ParseError(f"Not an MVD2 file: {path}")
pos = 4
had_eof = False
while pos < len(data):
if pos + 2 > len(data):
state.truncated = True
break
msglen = struct.unpack_from('<H', data, pos)[0]
pos += 2
if msglen == 0:
had_eof = True
break
if pos + msglen > len(data):
state.truncated = True
msglen = len(data) - pos
packet = data[pos:pos + msglen]
pos += msglen
try:
_process_mvd_message(Buffer(packet), state)
except ParseError:
state.truncated = True
if not had_eof:
state.truncated = True
return state
# ---------------------------------------------------------------------------
# Layout parsing (fallback, from dem_parser.py)
# ---------------------------------------------------------------------------
def _is_header(text):
return bool(set(text.lower().split()) & _HEADER_WORDS)
def _parse_layout_entries(layout):
return [(int(m.group(1)), m.group(2))
for m in re.finditer(r'yv\s+(\d+)\s+string2?\s+"([^"]*)"', layout)]
def _parse_player_tdm(content):
if len(content) < 20:
return None
name = content[:15].rstrip()
if not name or _is_header(name):
return None
nums = re.findall(r'-?\d+', content[15:])
if len(nums) < 2:
return None
try:
return {'name': name, 'frags': int(nums[0]), 'deaths': int(nums[1])}
except ValueError:
return None
def _best_scoreboard_layout(layouts):
"""Return the last layout that has a proper scoreboard header."""
for layout in reversed(layouts):
entries = _parse_layout_entries(layout)
if any('Name' in c and 'Frags' in c for _, c in entries):
return layout
return None
def parse_layout_tdm(layout):
entries = _parse_layout_entries(layout)
team_scores = []
for yv, content in sorted(entries, key=lambda x: x[0]):
if yv > 30 or _is_header(content) or not content.strip():
continue
parts = content.rsplit(None, 1)
if len(parts) == 2:
try:
team_scores.append(int(parts[1]))
except ValueError:
pass
ta_score = team_scores[0] if len(team_scores) > 0 else 0
tb_score = team_scores[1] if len(team_scores) > 1 else 0
header_yvs = sorted(yv for yv, c in entries if 'Name' in c and 'Frags' in c)
split_yv = header_yvs[1] if len(header_yvs) >= 2 else 9999
first_header_yv = header_yvs[0] if header_yvs else 0
ta_players, tb_players = [], []
for yv, content in entries:
if _is_header(content) or not content.strip():
continue
p = _parse_player_tdm(content)
if p is None:
continue
if first_header_yv < yv <= split_yv:
ta_players.append(p)
elif yv > split_yv:
tb_players.append(p)
return ta_players, tb_players, ta_score, tb_score
# ---------------------------------------------------------------------------
# State helpers
# ---------------------------------------------------------------------------
def _strip_q2color(s):
"""Remove Quake II high-bit color encoding and strip whitespace."""
return ''.join(chr(c & 0x7F) for c in s.encode('latin-1')).strip()
def get_team_names(state):
gen = state.cs_general
a = _strip_q2color(state.configstrings.get(gen, '')) or 'Team A'
b = _strip_q2color(state.configstrings.get(gen + 1, '')) or 'Team B'
return a, b
def get_team_scores_from_cs(state):
gen = state.cs_general
try:
sa = int(_strip_q2color(state.configstrings.get(gen + 2, '')))
except ValueError:
sa = None
try:
sb = int(_strip_q2color(state.configstrings.get(gen + 3, '')))
except ValueError:
sb = None
return sa, sb
def get_timelimit(state):
"""
Extract configured timelimit (minutes) from the initial value of
CS_TDM_TIMELIMIT_STRING (CS_GENERAL+4), which OpenTDM sets to 'MM:SS'
at server startup before the countdown begins.
"""
gen = state.cs_general
raw = state.configstrings_first.get(gen + 4, '')
decoded = _strip_q2color(raw)
m = re.match(r'^(\d+):(\d{2})$', decoded)
if m:
mins = int(m.group(1))
secs = int(m.group(2))
# round up to nearest minute
return mins + (1 if secs > 0 else 0)
return None
def get_players_from_spectator_strings(state):
"""
Return (team_a_names, team_b_names) using CS_GENERAL+256+slot entries.
Format: "playername (<teamlabel>)" where <teamlabel> may be the actual
team name from CS_GENERAL+0/1 OR generic "Hometeam"/"Visitors".
Filters out [MVDSPEC], empty names, and duplicates.
"""
gen = state.cs_general
base = gen + MAX_CLIENTS # CS_TDM_SPECTATOR_STRINGS
name_a = _strip_q2color(state.configstrings.get(gen, '')).lower()
name_b = _strip_q2color(state.configstrings.get(gen + 1, '')).lower()
def _is_team_a(label):
l = label.lower()
return l == 'hometeam' or (name_a and l == name_a)
def _is_team_b(label):
l = label.lower()
return l == 'visitors' or (name_b and l == name_b)
team_a, team_b = [], []
seen = set()
for i in range(MAX_CLIENTS):
val = _strip_q2color(state.configstrings.get(base + i, ''))
if not val:
continue
m = re.match(r'^(.+?)\s+\((.+)\)$', val)
if not m:
continue
name = m.group(1).strip()
label = m.group(2).strip()
if not name or '[MVDSPEC]' in name or name in seen:
continue
seen.add(name)
if _is_team_a(label):
team_a.append(name)
elif _is_team_b(label):
team_b.append(name)
# unknown label → skip (spectator or unrecognised)
return team_a, team_b
def get_players_from_skins(state, team_a_name, team_b_name):
"""
Last-resort fallback: assign players from CS_PLAYERSKINS by matching
player name against team names. Works well for 1v1 where team name = player.
"""
base = state.cs_playerskins
team_a, team_b, unknown = [], [], []
seen = set()
for i in range(MAX_CLIENTS):
raw = state.configstrings.get(base + i, '').strip()
if not raw:
continue
name = raw.split('\\')[0].strip()
if not name or '[MVDSPEC]' in name or name in seen:
continue
seen.add(name)
if name.lower() == team_a_name.lower():
team_a.append(name)
elif name.lower() == team_b_name.lower():
team_b.append(name)
else:
unknown.append(name)
# distribute unmatched players evenly (best effort)
if unknown and (not team_a or not team_b):
half = len(unknown) // 2 or len(unknown)
if not team_a:
team_a, unknown = unknown[:half], unknown[half:]
if not team_b:
team_b, unknown = unknown[:len(unknown)], []
return team_a, team_b
def get_players_from_match_phase(state):
"""
Return (team_a, team_b) using spectator string entries that were explicitly
re-sent via MVD_CONFIGSTRING after the game status transitioned from warmup
to a match state. These are the confirmed match participants.
"""
gen = state.cs_general
name_a = _strip_q2color(state.configstrings.get(gen, '')).lower()
name_b = _strip_q2color(state.configstrings.get(gen + 1, '')).lower()
def _is_team_a(label):
l = label.lower()
return l == 'hometeam' or (name_a and l == name_a)
def _is_team_b(label):
l = label.lower()
return l == 'visitors' or (name_b and l == name_b)
team_a, team_b = [], []
seen = set()
for slot, val in state.match_confirmed_slots.items():
stripped = _strip_q2color(val)
m = re.match(r'^(.+?)\s+\((.+)\)$', stripped)
if not m:
continue
name = m.group(1).strip()
label = m.group(2).strip()
if not name or '[MVDSPEC]' in name or name in seen:
continue
seen.add(name)
if _is_team_a(label):
team_a.append(name)
elif _is_team_b(label):
team_b.append(name)
return team_a, team_b
def _date_from_filename(name):
m = re.search(r'(\d{8})-(\d{6})', name)
if m:
try:
dt = datetime.strptime(m.group(1) + m.group(2), '%Y%m%d%H%M%S')
return dt.strftime('%Y-%m-%d %H:%M:%S')
except ValueError:
pass
return ''
# ---------------------------------------------------------------------------
# Row builder
# ---------------------------------------------------------------------------
def build_row(path, state):
filename = path.name
date = _date_from_filename(filename)
# CS_MODELS_OLD+1=33 or CS_MODELS_NEW+1=63 holds "maps/q2dm1.bsp"
cs_bsp_idx = 33 if state.cs_playerskins == CS_PLAYERSKINS_OLD else 63
bsp = state.configstrings.get(cs_bsp_idx, '').strip()
mapname = Path(bsp).stem if bsp else (state.mapname or path.stem.split('_')[-1])
team_a_name, team_b_name = get_team_names(state)
cs_score_a, cs_score_b = get_team_scores_from_cs(state)
# Primary: spectator strings explicitly re-confirmed after match start.
# OpenTDM re-sends entries for actual match participants when game status
# transitions from Warmup/Countdown to Match.
ta_names, tb_names = get_players_from_match_phase(state)
# Pre-compute best layout (used as fallback at multiple levels).
layout_ta, layout_tb, layout_sa, layout_sb = [], [], None, None
best_layout = _best_scoreboard_layout(state.layouts)
if best_layout:
ta_p, tb_p, layout_sa, layout_sb = parse_layout_tdm(best_layout)
layout_ta = [p['name'] for p in ta_p]
layout_tb = [p['name'] for p in tb_p]
# If match-phase gave nothing, prefer layout over all spectator strings.
# (OpenTDM may transition status without re-sending spectator confirmations.)
if not ta_names and not tb_names:
if layout_ta or layout_tb:
ta_names, tb_names = layout_ta, layout_tb
if cs_score_a is None: cs_score_a = layout_sa
if cs_score_b is None: cs_score_b = layout_sb
else:
ta_names, tb_names = get_players_from_spectator_strings(state)
# Fill any single missing team from layout.
if not ta_names and layout_ta:
ta_names = layout_ta
if cs_score_a is None: cs_score_a = layout_sa
if not tb_names and layout_tb:
tb_names = layout_tb
if cs_score_b is None: cs_score_b = layout_sb
# If one team is still missing (no layout either), try spectator strings.
if not ta_names or not tb_names:
spec_a, spec_b = get_players_from_spectator_strings(state)
if not ta_names: ta_names = spec_a
if not tb_names: tb_names = spec_b
# Last resort: player skin configstrings + team name matching.
if not ta_names and not tb_names:
ta_names, tb_names = get_players_from_skins(state, team_a_name, team_b_name)
players_a = ','.join(ta_names)
players_b = ','.join(tb_names)
score_a = cs_score_a if cs_score_a is not None else 0
score_b = cs_score_b if cs_score_b is not None else 0
timelimit = get_timelimit(state)
timelimit_str = str(timelimit) if timelimit is not None else ''
return {
'file': filename,
'date': date,
'map': mapname,
'players_a': players_a,
'players_b': players_b,
'score': f'{score_a}:{score_b}',
'timelimit_min': timelimit_str,
'truncated': 'yes' if state.truncated else 'no',
}
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
FIELDNAMES = [
'file', 'date', 'map',
'players_a', 'score', 'players_b',
'timelimit_min', 'truncated',
]
def main():
ap = argparse.ArgumentParser(
description='Parse all .mvd2 demos in a directory and output a CSV report.'
)
ap.add_argument('directory', nargs='?', default='.',
help='Directory containing .mvd2 files (default: current dir)')
ap.add_argument('-o', '--output', default='',
help='Output CSV file (default: mvd_report.csv in the demo dir)')
args = ap.parse_args()
demo_dir = Path(args.directory).resolve()
if not demo_dir.is_dir():
print(f"Error: not a directory: {demo_dir}", file=sys.stderr)
sys.exit(1)
files = sorted(demo_dir.glob('*.mvd2'))
if not files:
print(f"No .mvd2 files found in {demo_dir}", file=sys.stderr)
sys.exit(1)
out_path = (Path(args.output).resolve() if args.output
else demo_dir / 'mvd_report.csv')
rows = []
for demo_path in files:
try:
state = parse_mvd2(demo_path)
row = build_row(demo_path, state)
rows.append(row)
trunc = ' [TRUNCATED]' if state.truncated else ''
print(f"OK {demo_path.name}{trunc}")
except Exception as exc:
print(f"ERR {demo_path.name}: {exc}", file=sys.stderr)
rows.append({
'file': demo_path.name,
'date': _date_from_filename(demo_path.name),
'map': '', 'players_a': '', 'players_b': '',
'score': '', 'timelimit_min': '', 'truncated': 'error',
})
with open(out_path, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=FIELDNAMES, delimiter=';')
writer.writeheader()
writer.writerows(rows)
print(f"\nReport saved: {out_path} ({len(rows)} demos)")
if __name__ == '__main__':
main()
Loading…
Cancel
Save