You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
748 lines
24 KiB
748 lines
24 KiB
#!/usr/bin/env python3
|
|
"""
|
|
Q2Pro demo (.dm2) parser - extracts player stats and generates Markdown reports.
|
|
|
|
Usage:
|
|
python dem_parser.py demo.dm2 # stdout
|
|
python dem_parser.py demo.dm2 -o out.md # output file
|
|
python dem_parser.py *.dm2 # batch - foo_report.md per file
|
|
python dem_parser.py /path/to/dir/ # batch - all .dm2 in directory
|
|
python dem_parser.py *.dm2 -y # batch, always continue on errors
|
|
"""
|
|
|
|
import struct
|
|
import re
|
|
import sys
|
|
import os
|
|
import zlib
|
|
import argparse
|
|
from pathlib import Path
|
|
|
|
# Server-to-client (svc_*) message opcodes from inc/common/protocol.h.
# They are numbered consecutively starting at 1, in declaration order.
(
    SVC_MUZZLEFLASH,
    SVC_MUZZLEFLASH2,
    SVC_TEMP_ENTITY,
    SVC_LAYOUT,
    SVC_INVENTORY,
    SVC_NOP,
    SVC_DISCONNECT,
    SVC_RECONNECT,
    SVC_SOUND,
    SVC_PRINT,
    SVC_STUFFTEXT,
    SVC_SERVERDATA,
    SVC_CONFIGSTRING,
    SVC_SPAWNBASELINE,
    SVC_CENTERPRINT,
    SVC_DOWNLOAD,
    SVC_PLAYERINFO,
    SVC_PACKETENTITIES,
    SVC_DELTAPACKETENTITIES,
    SVC_FRAME,
    SVC_ZPACKET,
    SVC_ZDOWNLOAD,
    SVC_GAMESTATE,
    SVC_SETTING,
) = range(1, 25)

# Configstring layout for the new protocol (34/36, inc/shared/shared.h).
# Each section starts where the previous one ends.
CS_MAXCLIENTS = 60
CS_MODELS = 62
CS_SOUNDS = CS_MODELS + 8192
CS_IMAGES = CS_SOUNDS + 2048
CS_LIGHTS = CS_IMAGES + 2048
CS_ITEMS = CS_LIGHTS + 256
CS_PLAYERSKINS = CS_ITEMS + 256  # 12862; entries look like "name\model\skin"
CS_GENERAL = CS_PLAYERSKINS + 256  # 13118

# OpenTDM stores team info in the first four general slots (g_local.h:139).
(
    CS_TDM_TEAM_A_NAME,
    CS_TDM_TEAM_B_NAME,
    CS_TDM_TEAM_A_STATUS,
    CS_TDM_TEAM_B_STATUS,
) = range(CS_GENERAL, CS_GENERAL + 4)

# Configstring layout for the old protocol (26; also used by opentdm/openffa).
CS_PLAYERSKINS_OLD = 1312
CS_GENERAL_OLD = 1568

# Player stat slot holding the frag count
# (g_local.h:65 in opentdm, q_shared.h:1029 in openffa).
STAT_FRAGS = 14

# Packet length value that marks the end of the demo.
EOF_MARKER = 0xFFFFFFFF
|
|
|
|
|
|
class ParseError(Exception):
    """Raised when demo data ends early or cannot be decoded."""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Binary buffer reader
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class Buffer:
    """Forward-only reader over an in-memory byte string.

    ``pos`` is the offset of the next unread byte.  Every read either returns
    a value and advances ``pos`` past it, or raises ``ParseError`` and leaves
    ``pos`` where it was.  Multi-byte integers are little-endian.
    """

    def __init__(self, data):
        # Anything that is not already bytes-like is copied into bytes.
        self.data = data if isinstance(data, (bytes, bytearray)) else bytes(data)
        self.pos = 0

    def remaining(self):
        """Return the number of unread bytes."""
        return len(self.data) - self.pos

    def _unpack(self, fmt, size, label):
        """Read one ``struct`` value of *size* bytes; *label* names it in errors."""
        if self.pos + size > len(self.data):
            raise ParseError(f"Unexpected end of buffer ({label})")
        (value,) = struct.unpack_from(fmt, self.data, self.pos)
        self.pos += size
        return value

    def read_byte(self):
        """Return the next byte as an int in 0..255."""
        if self.pos >= len(self.data):
            raise ParseError("Unexpected end of buffer (byte)")
        value = self.data[self.pos]
        self.pos += 1
        return value

    def read_short(self):
        """Return the next signed 16-bit integer."""
        return self._unpack('<h', 2, 'short')

    def read_ushort(self):
        """Return the next unsigned 16-bit integer."""
        return self._unpack('<H', 2, 'ushort')

    def read_long(self):
        """Return the next signed 32-bit integer."""
        return self._unpack('<i', 4, 'long')

    def read_string(self):
        """Return the NUL-terminated string at ``pos`` and skip its terminator.

        Bytes are decoded as latin-1, so every byte value maps to a character.
        """
        end = self.data.find(b'\x00', self.pos)
        if end < 0:
            raise ParseError("Unterminated string in buffer")
        text = self.data[self.pos:end].decode('latin-1', errors='replace')
        self.pos = end + 1
        return text

    def skip(self, n):
        """Advance past *n* bytes.

        A negative *n* is rejected: it would move the cursor backwards and
        make the caller re-read data it has already consumed.
        """
        if n < 0 or self.pos + n > len(self.data):
            raise ParseError(f"Cannot skip {n} bytes ({self.remaining()} remaining)")
        self.pos += n

    def read_bytes(self, n):
        """Return the next *n* bytes; a negative *n* is rejected like in skip()."""
        if n < 0 or self.pos + n > len(self.data):
            raise ParseError(f"Cannot read {n} bytes")
        chunk = self.data[self.pos:self.pos + n]
        self.pos += n
        return chunk
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Demo state
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class DemoState:
    """Mutable container for everything collected while reading one demo."""

    def __init__(self):
        # Values taken from svc_serverdata; these are the pre-parse defaults.
        self.protocol = 34
        self.client_num = -1
        self.gamedir = ''
        self.mapname = ''

        # Data accumulated message by message.
        self.configstrings = {}   # configstring index -> latest value
        self.cvars = {}           # "set <name> <value>" pairs seen in svc_stufftext
        self.layouts = []         # every svc_layout string, in arrival order

        # End-of-recording bookkeeping.
        self.had_disconnect = False   # an svc_disconnect message was seen
        self.truncated = False        # the recording looks incomplete
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Message processing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def process_messages(buf, state):
    """Decode server messages from *buf* into *state* until parsing must stop.

    Messages that update *state*:
      * svc_serverdata   -> protocol, gamedir, client_num, mapname
      * svc_configstring -> configstrings[index]
      * svc_layout       -> appended to layouts
      * svc_stufftext    -> every ``set <name> <value>`` pair goes into cvars

    svc_print, svc_centerprint, svc_setting and the muzzle-flash messages are
    skipped.  An svc_zpacket payload is inflated and parsed recursively, after
    which parsing of *buf* continues.

    Parsing of *buf* ends at svc_disconnect (which sets
    ``state.had_disconnect``), at an svc_zpacket that cannot be inflated, and
    at any other opcode.  That covers svc_reconnect, frame/entity/sound/
    download data whose size this parser cannot determine, and unknown opcodes.

    Raises:
        ParseError: propagated from *buf* when the data ends mid-message.
    """
    while buf.remaining() > 0:
        cmd = buf.read_byte()

        if cmd == SVC_NOP:
            continue

        if cmd == SVC_DISCONNECT:
            state.had_disconnect = True
            break

        if cmd == SVC_SERVERDATA:
            state.protocol = buf.read_long()
            buf.read_long()   # server count
            buf.read_byte()   # attract loop
            state.gamedir = buf.read_string()
            state.client_num = buf.read_short()
            state.mapname = buf.read_string()

        elif cmd == SVC_CONFIGSTRING:
            index = buf.read_ushort()
            state.configstrings[index] = buf.read_string()

        elif cmd == SVC_LAYOUT:
            state.layouts.append(buf.read_string())

        elif cmd == SVC_PRINT:
            buf.read_byte()   # print level
            buf.read_string()

        elif cmd == SVC_STUFFTEXT:
            text = buf.read_string()
            # Capture "set varname value" server-info cvars.
            for match in re.finditer(r'\bset\s+(\S+)\s+(\S+)', text):
                state.cvars[match.group(1)] = match.group(2)

        elif cmd == SVC_CENTERPRINT:
            buf.read_string()

        elif cmd == SVC_SETTING:
            buf.skip(8)   # 2 x int32

        elif cmd in (SVC_MUZZLEFLASH, SVC_MUZZLEFLASH2):
            buf.skip(3)   # entity (short) + weapon (byte)

        elif cmd == SVC_ZPACKET:
            len_in = buf.read_ushort()
            buf.read_ushort()   # decompressed-size hint, not needed here
            inflated = _inflate_zpacket(buf.read_bytes(len_in))
            if inflated is None:
                break   # corrupt compressed data: give up on this packet
            process_messages(Buffer(inflated), state)

        else:
            # svc_reconnect, frame data, messages too complex to skip, or an
            # unknown opcode: stop processing this packet.
            break


def _inflate_zpacket(payload):
    """Return the inflated bytes of an svc_zpacket payload, or None if corrupt.

    The payload is tried as a zlib-wrapped stream first and as a raw deflate
    stream second.
    NOTE(review): Q2PRO/R1Q2 appear to emit raw deflate (no zlib header) for
    svc_zpacket - confirm against the engine source.
    """
    for wbits in (zlib.MAX_WBITS, -zlib.MAX_WBITS):
        try:
            return zlib.decompress(payload, wbits)
        except zlib.error:
            continue
    return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# File parsing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def parse_demo(path):
    """Load the .dm2 file at *path* and return the DemoState built from it.

    The file is a sequence of packets, each prefixed by an unsigned 32-bit
    little-endian length.  A length equal to EOF_MARKER ends the demo and a
    zero length is ignored.  ``state.truncated`` is set when the file ends
    inside a length prefix or a packet, when a packet fails to parse, or when
    no svc_disconnect was seen.
    """
    state = DemoState()

    with open(path, 'rb') as fh:
        blob = fh.read()

    total = len(blob)
    offset = 0
    while offset < total:
        if total - offset < 4:
            # Not enough bytes left for a length prefix.
            state.truncated = True
            break

        (size,) = struct.unpack_from('<I', blob, offset)
        offset += 4

        if size == EOF_MARKER:
            break   # clean end-of-file marker
        if size == 0:
            continue

        end = offset + size
        if end > total:
            # Truncated final packet: parse whatever is there.
            state.truncated = True
            end = total

        chunk = blob[offset:end]
        offset = end

        try:
            process_messages(Buffer(chunk), state)
        except ParseError:
            # Remember the damage and carry on with the next packet.
            state.truncated = True

    if not state.had_disconnect:
        state.truncated = True

    return state
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Layout string parsing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_HEADER_WORDS = {'name', 'frags', 'frag', 'dths', 'deaths', 'ping',
                 'net', 'eff', 'fph', 'time', 'player', 'rank'}


def _is_header(text):
    """Return True when *text* looks like a scoreboard column-header row.

    A row counts as a header when it contains at least two distinct column
    titles from ``_HEADER_WORDS`` (case-insensitive, whitespace-separated).
    One matching word is not enough: a player or team may simply be called
    "Ping" or "Frag Master", and treating such rows as headers drops them
    from the report.
    """
    titles = _HEADER_WORDS.intersection(text.lower().split())
    return len(titles) >= 2
|
|
|
|
|
|
def _parse_layout_entries(layout):
    """Extract the positioned strings from a layout program.

    Returns a list of ``(yv, text)`` tuples, one per
    ``yv <n> string "<text>"`` or ``yv <n> string2 "<text>"`` occurrence, in
    the order they appear in *layout*.
    """
    pattern = re.compile(r'yv\s+(\d+)\s+string2?\s+"([^"]*)"')
    return [(int(yv), text) for yv, text in pattern.findall(layout)]
|
|
|
|
|
|
def _parse_player_tdm(content):
    """Decode one fixed-width OpenTDM scoreboard row.

    Rows are produced with "%-15.15s %4d %3d %3d %3d" (name, frags, deaths,
    net, ping).  The first 15 columns are the name; the first two integers
    after them are frags and deaths.

    Returns ``{'name', 'frags', 'deaths'}``, or None when *content* is shorter
    than 20 characters, has an empty or header-like name, or has fewer than
    two numbers.
    """
    if len(content) < 20:
        return None

    name_col, stats_col = content[:15], content[15:]
    player = name_col.rstrip()
    if player == '' or _is_header(player):
        return None

    fields = re.findall(r'-?\d+', stats_col)
    if len(fields) < 2:
        return None

    try:
        frags, deaths = (int(token) for token in fields[:2])
    except ValueError:
        return None
    return {'name': player, 'frags': frags, 'deaths': deaths}
|
|
|
|
|
|
def _parse_player_ffa(content):
    """Decode one fixed-width OpenFFA scoreboard row.

    Rows are produced with "%2d %-15s %3d %3d %3d %4d %4s %4d"
    (rank, name, frags, deaths, eff, ...).  Columns [0:2] hold the rank and
    [3:18] the name; the first three integers from column 18 onwards are
    frags, deaths and efficiency.

    Returns ``{'rank', 'name', 'frags', 'deaths', 'eff'}``, or None when
    *content* is shorter than 27 characters, the rank is not numeric, the
    name is empty, or fewer than three numbers follow the name.
    """
    if len(content) < 27:
        return None

    rank_col = content[:2].strip()
    if not rank_col.isdigit():
        return None

    # The length guard above guarantees that columns 3..17 exist.
    player = content[3:18].rstrip()
    if not player:
        return None

    stats = re.findall(r'-?\d+', content[18:])
    if len(stats) < 3:
        return None

    try:
        rank = int(rank_col)
        frags, deaths, eff = map(int, stats[:3])
    except ValueError:
        return None
    return {
        'rank': rank,
        'name': player,
        'frags': frags,
        'deaths': deaths,
        'eff': eff,
    }
|
|
|
|
|
|
def parse_layout_tdm(layout):
    """Split an OpenTDM scoreboard layout into two teams.

    Returns ``(team_a_players, team_b_players, team_a_score, team_b_score)``
    where each player is ``{'name', 'frags', 'deaths'}``.

    The layout lists team A's rows under a first "Name ... Frags" header and
    team B's rows under a second one, e.g. header A at yv 48 with rows at
    yv 56, 64, ... and header B at yv 88 with rows at yv 96, 104, ...
    Team score lines sit near the top (yv <= 30) in the form
    "TeamName SCORE".
    """
    entries = _parse_layout_entries(layout)

    # Team scores: the trailing integer of each non-header line near the top,
    # taken in ascending yv order.
    scores = []
    for yv, text in sorted(entries, key=lambda entry: entry[0]):
        if yv > 30 or _is_header(text) or not text.strip():
            continue
        pieces = text.rsplit(None, 1)
        if len(pieces) != 2:
            continue
        try:
            scores.append(int(pieces[1]))
        except ValueError:
            pass
    team_a_score, team_b_score = (scores + [0, 0])[:2]

    # Header rows delimit the two player columns.
    header_rows = sorted(
        yv for yv, text in entries
        if 'Name' in text and 'Frags' in text
    )
    top = header_rows[0] if header_rows else 0
    split = header_rows[1] if len(header_rows) >= 2 else 9999

    team_a_players, team_b_players = [], []
    for yv, text in entries:
        if _is_header(text) or not text.strip():
            continue
        player = _parse_player_tdm(text)
        if player is None:
            continue
        if top < yv <= split:
            team_a_players.append(player)   # between header A and header B
        elif yv > split:
            team_b_players.append(player)   # below header B

    return team_a_players, team_b_players, team_a_score, team_b_score
|
|
|
|
|
|
def parse_layout_ffa(layout):
    """Collect the player rows of an OpenFFA/DM scoreboard layout.

    Returns a list of ``{'rank', 'name', 'frags', 'deaths', 'eff'}`` dicts in
    layout order; header rows and rows that do not parse are left out.
    """
    players = []
    for _, text in _parse_layout_entries(layout):
        row = None if _is_header(text) else _parse_player_ffa(text)
        if row:
            players.append(row)
    return players
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# State helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _cs_gen(state):
    """Pick the CS_GENERAL base index that matches this demo.

    The old index is chosen when any of its first eight slots holds a
    non-empty configstring; otherwise the new index is returned.
    """
    for slot in range(CS_GENERAL_OLD, CS_GENERAL_OLD + 8):
        if state.configstrings.get(slot):
            return CS_GENERAL_OLD
    return CS_GENERAL
|
|
|
|
|
|
def _cs_skins(state):
    """Pick the CS_PLAYERSKINS base index that matches this demo.

    The old index is chosen when any of its first eight slots holds a
    non-empty configstring; otherwise the new index is returned.
    """
    for slot in range(CS_PLAYERSKINS_OLD, CS_PLAYERSKINS_OLD + 8):
        if state.configstrings.get(slot):
            return CS_PLAYERSKINS_OLD
    return CS_PLAYERSKINS
|
|
|
|
|
|
def is_opentdm(state):
    """Return True when the demo appears to come from an OpenTDM server.

    That is the case when the game directory mentions "opentdm" or when
    either team-name configstring is non-blank.
    """
    if 'opentdm' in state.gamedir.lower():
        return True
    base = _cs_gen(state)
    return any(
        state.configstrings.get(base + offset, '').strip() != ''
        for offset in (0, 1)
    )
|
|
|
|
|
|
def get_team_names(state):
    """Return ``(team_a, team_b)`` names from the team-name configstrings.

    A blank or missing name falls back to 'Team A' / 'Team B'.
    """
    base = _cs_gen(state)
    names = [
        state.configstrings.get(base + offset, '').strip() or fallback
        for offset, fallback in ((0, 'Team A'), (1, 'Team B'))
    ]
    return names[0], names[1]
|
|
|
|
|
|
def get_team_scores_from_cs(state):
    """Return ``(score_a, score_b)`` read from the team status configstrings.

    Each score is an int, or None when its configstring is missing or is not
    a plain integer.
    """
    base = _cs_gen(state)

    def read_score(slot):
        try:
            return int(state.configstrings.get(slot, '').strip())
        except ValueError:
            return None

    return read_score(base + 2), read_score(base + 3)
|
|
|
|
|
|
def get_timelimit(state):
    """Return the match time limit in whole minutes, or None if unknown.

    ``g_match_time`` (seconds, used by OpenTDM) is preferred; ``timelimit``
    (minutes) is the fallback.  A cvar is ignored when it is missing, is not
    a positive finite number, or cannot be parsed at all, and the next
    candidate is tried.  Surrounding double quotes are stripped first, since
    a value captured as a single whitespace-free token may still carry them.
    """
    # (cvar name, how many of its units make one minute)
    candidates = (('g_match_time', 60), ('timelimit', 1))

    for cvar, units_per_minute in candidates:
        raw = state.cvars.get(cvar)
        if raw is None:
            continue
        try:
            amount = float(str(raw).strip('"'))
            if amount > 0:
                # inf // n is nan and int(nan) raises ValueError, so
                # non-finite values fall through to the handler below.
                return int(amount // units_per_minute)
        except (ValueError, OverflowError):
            continue
    return None
|
|
|
|
|
|
def get_player_names(state):
    """Map client slots to player names using the player-skin configstrings.

    Returns ``{slot_index: name}`` for each of the 256 slots whose
    configstring has a non-blank part before the first backslash.
    """
    base = _cs_skins(state)
    found = {}
    for slot in range(256):
        raw = state.configstrings.get(base + slot, '').strip()
        if not raw:
            continue
        nick = raw.partition('\\')[0].strip()
        if nick:
            found[slot] = nick
    return found
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Markdown report generation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Blockquote inserted when the recording looks incomplete but a scoreboard
# was still available.
_WARN_TRUNCATED = (
    "\n> ⚠ **Demo truncated** — "
    "the recording may have ended abruptly (e.g. system reset). "
    "Stats are from the last available scoreboard.\n"
)

# Blockquote inserted when the demo contains no scoreboard at all.
_WARN_NO_LAYOUT = (
    "\n> ⚠ **Demo truncated** — "
    "no scoreboard found in file. "
    "Deaths unavailable (`N/A`). "
    "Frags from last known frame.\n"
)
|
|
|
|
|
|
def _md_table(headers, rows):
    """Render *headers* and *rows* as a Markdown pipe table.

    Args:
        headers: column titles.
        rows: iterable of row sequences; each cell is converted with str().

    Returns:
        The table as a string with no trailing newline.

    Cell text is escaped so it cannot break the table: a literal ``|``
    (common in clan tags such as "clan|player") becomes ``\\|`` and a newline
    becomes a space.
    """
    def cell(value):
        return str(value).replace('|', '\\|').replace('\n', ' ')

    titles = [cell(h) for h in headers]
    lines = [
        '| ' + ' | '.join(titles) + ' |',
        # The separator is as wide as each title plus its padding spaces.
        '|' + '|'.join('-' * (len(t) + 2) for t in titles) + '|',
    ]
    lines.extend(
        '| ' + ' | '.join(cell(c) for c in row) + ' |'
        for row in rows
    )
    return '\n'.join(lines)
|
|
|
|
|
|
def _meta_line(state):
    """Build the "Map / Timelimit" summary line shown under the report title.

    The map falls back to 'unknown' and the time limit to 'N/A' when absent
    (a limit of 0 also shows as 'N/A').
    """
    limit = get_timelimit(state)
    if limit:
        tl_str = f"{limit} min"
    else:
        tl_str = "N/A"
    mapname = state.mapname if state.mapname else 'unknown'
    return f"**Map:** {mapname} | **Timelimit:** {tl_str}"
|
|
|
|
|
|
def generate_report_ffa(state, players, truncated, no_layout=False):
    """Render the Markdown report for a free-for-all / generic DM demo.

    Args:
        state: parsed demo state (supplies map name, game directory, cvars).
        players: list of player dicts with 'name' and 'frags', optionally
            'rank', 'deaths' and 'eff'.  The list is not modified.
        truncated: add the "truncated" warning.
        no_layout: add the "no scoreboard" warning instead; takes precedence
            over *truncated*.

    Returns:
        The report text.  Players are listed by descending frags; an
        efficiency column is included when any player has an 'eff' value.
    """
    mod = state.gamedir or 'DM'
    mapname = state.mapname or 'unknown'
    lines = [f"# Demo: {mapname} — {mod}", "", _meta_line(state), ""]

    if no_layout:
        lines.append(_WARN_NO_LAYOUT)
    elif truncated:
        lines.append(_WARN_TRUNCATED)

    if not players:
        lines.append("*No player data found in demo file.*")
        return '\n'.join(lines)

    # Sort a copy so the caller's list keeps its order.
    ranked = sorted(players, key=lambda p: p.get('frags', 0), reverse=True)

    if any('eff' in p for p in ranked):
        headers = ['#', 'Player', 'Frags', 'Deaths', 'Eff%']
        rows = [
            (
                p.get('rank', position),
                p['name'],
                p['frags'],
                p.get('deaths', 'N/A'),
                # A missing value is shown as "N/A", not "N/A%".
                f"{p['eff']}%" if 'eff' in p else 'N/A',
            )
            for position, p in enumerate(ranked, start=1)
        ]
    else:
        headers = ['#', 'Player', 'Frags', 'Deaths']
        rows = [
            (position, p['name'], p['frags'], p.get('deaths', 'N/A'))
            for position, p in enumerate(ranked, start=1)
        ]

    lines.append(_md_table(headers, rows))
    return '\n'.join(lines)
|
|
|
|
|
|
def generate_report_tdm(state, team_a_name, team_b_name,
                        team_a_players, team_b_players,
                        team_a_score, team_b_score,
                        truncated):
    """Render the Markdown report for an OpenTDM demo.

    Args:
        state: parsed demo state (supplies map name and cvars).
        team_a_name, team_b_name: display names of the two teams.
        team_a_players, team_b_players: lists of
            ``{'name', 'frags', 'deaths'}`` dicts.  The lists are not
            modified.
        team_a_score, team_b_score: team frag totals.
        truncated: add the "truncated" warning.

    Returns:
        The report text: a team summary table followed by one section per
        team with its players listed by descending frags.
    """
    mapname = state.mapname or 'unknown'
    lines = [f"# Demo: {mapname} — OpenTDM", "", _meta_line(state), ""]

    if truncated:
        lines.append(_WARN_TRUNCATED)

    teams = (
        (team_a_name, team_a_score, team_a_players),
        (team_b_name, team_b_score, team_b_players),
    )

    lines.append("## Team scores")
    lines.append("")
    lines.append(_md_table(
        ['Team', 'Frags', 'Deaths'],
        [
            (name, score, sum(p['deaths'] for p in roster))
            for name, score, roster in teams
        ]
    ))
    lines.append("")

    for name, _score, roster in teams:
        lines.append(f"## {name}")
        lines.append("")
        if not roster:
            lines.append("*No data.*")
        else:
            # Sort a copy so the caller's list keeps its order.
            ranked = sorted(roster, key=lambda p: p['frags'], reverse=True)
            lines.append(_md_table(
                ['Player', 'Frags', 'Deaths'],
                [(p['name'], p['frags'], p['deaths']) for p in ranked]
            ))
        lines.append("")

    return '\n'.join(lines)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Report builder
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def build_report(state):
    """Turn a parsed DemoState into the final Markdown report.

    The scoreboard is taken from the most recent layout that yields player
    rows.  Layouts are examined newest first because the last layout in a
    demo may be empty or may not be a scoreboard, in which case an earlier
    one still holds usable stats.

    OpenTDM demos produce a team report; team scores from the status
    configstrings override scores parsed from the layout.  Other demos
    produce a flat player table, parsed with the FFA format first and the
    TDM row format as a fallback.  When the demo has no non-empty layout at
    all, player names come from the configstrings and the "no scoreboard"
    warning is shown.
    """
    truncated = state.truncated
    newest_first = [layout for layout in reversed(state.layouts) if layout]

    if is_opentdm(state):
        team_a_name, team_b_name = get_team_names(state)
        ta_p, tb_p, ta_score, tb_score = [], [], 0, 0
        for index, layout in enumerate(newest_first):
            parsed = parse_layout_tdm(layout)
            has_players = bool(parsed[0] or parsed[1])
            # The newest layout supplies the scores even without player rows;
            # an older layout replaces it only if it has players.
            if index == 0 or has_players:
                ta_p, tb_p, ta_score, tb_score = parsed
            if has_players:
                break

        # Prefer team scores from configstrings (more reliable than layout parsing).
        cs_a, cs_b = get_team_scores_from_cs(state)
        if cs_a is not None:
            ta_score = cs_a
        if cs_b is not None:
            tb_score = cs_b
        return generate_report_tdm(
            state, team_a_name, team_b_name,
            ta_p, tb_p, ta_score, tb_score,
            truncated
        )

    no_layout = not newest_first
    players = []
    if no_layout:
        # No scoreboard: list the known player names, without deaths.
        players = [
            {'name': name, 'frags': 0}
            for name in get_player_names(state).values()
        ]
    else:
        for layout in newest_first:
            players = parse_layout_ffa(layout)
            if not players:
                # Fallback: try the TDM row pattern (some mods use a similar format).
                team_a, team_b, _score_a, _score_b = parse_layout_tdm(layout)
                players = [
                    {'name': p['name'], 'frags': p['frags'], 'deaths': p['deaths']}
                    for p in team_a + team_b
                ]
            if players:
                break
    return generate_report_ffa(state, players, truncated, no_layout)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# File processing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def process_file(path, output_path=None):
    """Parse the demo at *path* and produce its Markdown report.

    With *output_path* the report is written there as UTF-8 and None is
    returned; without it the report text itself is returned.
    """
    report = build_report(parse_demo(path))
    if not output_path:
        return report

    with open(output_path, 'w', encoding='utf-8') as out:
        out.write(report)
    return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def main():
    """Command-line entry point.

    Collects .dm2 files from the arguments (files, or directories scanned
    non-recursively), then writes one report per demo: to stdout for a single
    input, to ``-o`` when given, or to ``<stem>_report.md`` next to each demo
    in batch mode.

    Exits with status 1 when no demo is found, when ``-o`` is combined with
    several inputs, when the user aborts a batch, or when at least one demo
    failed to process.
    """
    ap = argparse.ArgumentParser(
        description='Q2Pro .dm2 demo parser — generates Markdown player stats reports.'
    )
    ap.add_argument('inputs', nargs='+',
                    help='.dm2 file(s) or a directory containing .dm2 files')
    ap.add_argument('-o', '--output',
                    help='Output file (single input file only)')
    ap.add_argument('-y', '--always-continue', action='store_true',
                    help='Always continue on errors without prompting')
    args = ap.parse_args()

    # Collect .dm2 files.
    files = []
    for inp in args.inputs:
        p = Path(inp)
        if p.is_dir():
            # Match the extension case-insensitively, like the file branch below.
            files.extend(sorted(
                f for f in p.iterdir()
                if f.is_file() and f.suffix.lower() == '.dm2'
            ))
        elif p.suffix.lower() == '.dm2':
            if p.exists():
                files.append(p)
            else:
                print(f"File not found: {inp}", file=sys.stderr)
        else:
            print(f"Skipping (not .dm2 or directory): {inp}", file=sys.stderr)

    if not files:
        print("No .dm2 files found.", file=sys.stderr)
        sys.exit(1)

    if len(files) > 1 and args.output:
        print("Error: -o/--output can only be used with a single input file.", file=sys.stderr)
        sys.exit(1)

    always_continue = args.always_continue
    single = len(files) == 1
    last_index = len(files) - 1
    failures = 0

    for index, demo_path in enumerate(files):
        try:
            if single and not args.output:
                # Single file -> stdout
                print(process_file(demo_path))
            elif single:
                # Single file -> specified output file
                process_file(demo_path, args.output)
                print(f"Report saved: {args.output}")
            else:
                # Batch mode -> foo_report.md
                out_path = demo_path.parent / (demo_path.stem + '_report.md')
                process_file(demo_path, out_path)
                print(f"OK {demo_path.name} -> {out_path.name}")

        except Exception as exc:
            # Top-level boundary: report the failure and carry on with the batch.
            failures += 1
            print(f"\n[ERROR] {demo_path.name}: {exc}", file=sys.stderr)

            # Asking is pointless once nothing is left to process.
            if always_continue or index == last_index:
                continue

            try:
                ans = input(
                    "Continue? [y]es / [n]o / [a]lways continue: "
                ).strip().lower()
            except (EOFError, KeyboardInterrupt):
                print("\nAborted.", file=sys.stderr)
                sys.exit(1)

            if ans == 'n':
                print("Aborted.", file=sys.stderr)
                sys.exit(1)
            elif ans == 'a':
                always_continue = True
            # 'y' or anything else -> continue

    if failures:
        # Let calling scripts detect that some reports are missing.
        sys.exit(1)


if __name__ == '__main__':
    main()
|
|
|