Browse Source

script

master
Przemyslaw Kadej 3 weeks ago
parent
commit
edcff1cd4f
  1. 748
      scripts/dem_parser.py

748
scripts/dem_parser.py

@ -0,0 +1,748 @@
#!/usr/bin/env python3
"""
Q2Pro demo (.dm2) parser - extracts player stats and generates Markdown reports.
Usage:
python dem_parser.py demo.dm2 # stdout
python dem_parser.py demo.dm2 -o out.md # output file
python dem_parser.py *.dm2 # batch - foo_report.md per file
python dem_parser.py /path/to/dir/ # batch - all .dm2 in directory
python dem_parser.py *.dm2 -y # batch, always continue on errors
"""
import struct
import re
import sys
import os
import zlib
import argparse
from pathlib import Path
# svc_ message types (inc/common/protocol.h)
SVC_MUZZLEFLASH = 1
SVC_MUZZLEFLASH2 = 2
SVC_TEMP_ENTITY = 3
SVC_LAYOUT = 4
SVC_INVENTORY = 5
SVC_NOP = 6
SVC_DISCONNECT = 7
SVC_RECONNECT = 8
SVC_SOUND = 9
SVC_PRINT = 10
SVC_STUFFTEXT = 11
SVC_SERVERDATA = 12
SVC_CONFIGSTRING = 13
SVC_SPAWNBASELINE = 14
SVC_CENTERPRINT = 15
SVC_DOWNLOAD = 16
SVC_PLAYERINFO = 17
SVC_PACKETENTITIES = 18
SVC_DELTAPACKETENTITIES = 19
SVC_FRAME = 20
SVC_ZPACKET = 21
SVC_ZDOWNLOAD = 22
SVC_GAMESTATE = 23
SVC_SETTING = 24
# Configstring indices - new protocol (34/36, inc/shared/shared.h)
CS_MAXCLIENTS = 60
CS_MODELS = 62
CS_SOUNDS = CS_MODELS + 8192
CS_IMAGES = CS_SOUNDS + 2048
CS_LIGHTS = CS_IMAGES + 2048
CS_ITEMS = CS_LIGHTS + 256
CS_PLAYERSKINS = CS_ITEMS + 256 # = 12862 — "name\model\skin"
CS_GENERAL = CS_PLAYERSKINS + 256 # = 13118
# OpenTDM configstring offsets (CS_GENERAL + N, g_local.h:139)
CS_TDM_TEAM_A_NAME = CS_GENERAL + 0
CS_TDM_TEAM_B_NAME = CS_GENERAL + 1
CS_TDM_TEAM_A_STATUS = CS_GENERAL + 2
CS_TDM_TEAM_B_STATUS = CS_GENERAL + 3
# Old protocol configstring indices (protocol 26, also used by opentdm/openffa)
CS_PLAYERSKINS_OLD = 1312
CS_GENERAL_OLD = 1568
STAT_FRAGS = 14 # g_local.h:65 (opentdm), q_shared.h:1029 (openffa)
EOF_MARKER = 0xFFFFFFFF
class ParseError(Exception):
pass
# ---------------------------------------------------------------------------
# Binary buffer reader
# ---------------------------------------------------------------------------
class Buffer:
def __init__(self, data):
self.data = data if isinstance(data, (bytes, bytearray)) else bytes(data)
self.pos = 0
def remaining(self):
return len(self.data) - self.pos
def read_byte(self):
if self.pos >= len(self.data):
raise ParseError("Unexpected end of buffer (byte)")
v = self.data[self.pos]
self.pos += 1
return v
def read_short(self):
if self.pos + 2 > len(self.data):
raise ParseError("Unexpected end of buffer (short)")
v = struct.unpack_from('<h', self.data, self.pos)[0]
self.pos += 2
return v
def read_ushort(self):
if self.pos + 2 > len(self.data):
raise ParseError("Unexpected end of buffer (ushort)")
v = struct.unpack_from('<H', self.data, self.pos)[0]
self.pos += 2
return v
def read_long(self):
if self.pos + 4 > len(self.data):
raise ParseError("Unexpected end of buffer (long)")
v = struct.unpack_from('<i', self.data, self.pos)[0]
self.pos += 4
return v
def read_string(self):
try:
end = self.data.index(b'\x00', self.pos)
except ValueError:
raise ParseError("Unterminated string in buffer")
s = self.data[self.pos:end].decode('latin-1', errors='replace')
self.pos = end + 1
return s
def skip(self, n):
if self.pos + n > len(self.data):
raise ParseError(f"Cannot skip {n} bytes ({self.remaining()} remaining)")
self.pos += n
def read_bytes(self, n):
if self.pos + n > len(self.data):
raise ParseError(f"Cannot read {n} bytes")
v = self.data[self.pos:self.pos + n]
self.pos += n
return v
# ---------------------------------------------------------------------------
# Demo state
# ---------------------------------------------------------------------------
class DemoState:
def __init__(self):
self.protocol = 34
self.gamedir = ''
self.mapname = ''
self.client_num = -1
self.configstrings = {}
self.cvars = {} # server cvars from svc_stufftext "set" commands
self.layouts = [] # all layout strings found
self.truncated = False # demo ended without svc_disconnect
self.had_disconnect = False
# ---------------------------------------------------------------------------
# Message processing
# ---------------------------------------------------------------------------
def process_messages(buf, state):
"""Process messages from buffer until svc_frame or unknown message."""
while buf.remaining() > 0:
cmd = buf.read_byte()
if cmd == SVC_NOP:
continue
elif cmd in (SVC_DISCONNECT, SVC_RECONNECT):
if cmd == SVC_DISCONNECT:
state.had_disconnect = True
break
elif cmd == SVC_SERVERDATA:
state.protocol = buf.read_long()
buf.read_long() # server count
buf.read_byte() # attract loop
state.gamedir = buf.read_string()
state.client_num = buf.read_short()
state.mapname = buf.read_string()
elif cmd == SVC_CONFIGSTRING:
index = buf.read_ushort()
value = buf.read_string()
state.configstrings[index] = value
elif cmd == SVC_LAYOUT:
layout = buf.read_string()
state.layouts.append(layout)
elif cmd == SVC_PRINT:
buf.read_byte() # print level
buf.read_string()
elif cmd == SVC_STUFFTEXT:
text = buf.read_string()
# capture "set varname value" server-info cvars
for m in re.finditer(r'\bset\s+(\S+)\s+(\S+)', text):
state.cvars[m.group(1)] = m.group(2)
elif cmd == SVC_CENTERPRINT:
buf.read_string()
elif cmd == SVC_SETTING:
buf.skip(8) # 2 × int32
elif cmd in (SVC_MUZZLEFLASH, SVC_MUZZLEFLASH2):
buf.skip(3) # entity (short) + weapon (byte)
elif cmd == SVC_ZPACKET:
len_in = buf.read_ushort()
len_out = buf.read_ushort() # decompressed size hint
compressed = buf.read_bytes(len_in)
try:
decompressed = zlib.decompress(compressed)
inner = Buffer(decompressed)
process_messages(inner, state)
except zlib.error:
break # corrupt compressed data, skip packet
elif cmd in (SVC_FRAME, SVC_PLAYERINFO, SVC_PACKETENTITIES,
SVC_DELTAPACKETENTITIES, SVC_SPAWNBASELINE,
SVC_GAMESTATE, SVC_DOWNLOAD, SVC_TEMP_ENTITY,
SVC_INVENTORY, SVC_ZDOWNLOAD, SVC_SOUND):
# Frame data or too complex to skip — stop processing this packet
break
else:
# Unknown message type — stop
break
# ---------------------------------------------------------------------------
# File parsing
# ---------------------------------------------------------------------------
def parse_demo(path):
"""Read and parse a .dm2 demo file. Returns DemoState."""
state = DemoState()
with open(path, 'rb') as f:
data = f.read()
pos = 0
while pos < len(data):
if pos + 4 > len(data):
state.truncated = True
break
msglen = struct.unpack_from('<I', data, pos)[0]
pos += 4
if msglen == EOF_MARKER:
break # clean end of file marker
if msglen == 0:
continue
if pos + msglen > len(data):
# Truncated final packet — parse what we have
state.truncated = True
msglen = len(data) - pos
packet = data[pos:pos + msglen]
pos += msglen
try:
buf = Buffer(packet)
process_messages(buf, state)
except ParseError:
state.truncated = True
# Continue with next packet
if not state.had_disconnect:
state.truncated = True
return state
# ---------------------------------------------------------------------------
# Layout string parsing
# ---------------------------------------------------------------------------
_HEADER_WORDS = {'name', 'frags', 'frag', 'dths', 'deaths', 'ping',
'net', 'eff', 'fph', 'time', 'player', 'rank'}
def _is_header(text):
words = set(text.lower().split())
return bool(words & _HEADER_WORDS)
def _parse_layout_entries(layout):
"""Parse layout into list of (yv_position, string_content) tuples."""
entries = []
for m in re.finditer(r'yv\s+(\d+)\s+string2?\s+"([^"]*)"', layout):
entries.append((int(m.group(1)), m.group(2)))
return entries
def _parse_player_tdm(content):
"""
Parse a single OpenTDM player line (fixed-width format).
Format: "%-15.15s %4d %3d %3d %3d" name, frags, deaths, net, ping
Returns dict or None.
"""
if len(content) < 20:
return None
name = content[:15].rstrip()
if not name or _is_header(name):
return None
nums = re.findall(r'-?\d+', content[15:])
if len(nums) < 2:
return None
try:
return {'name': name, 'frags': int(nums[0]), 'deaths': int(nums[1])}
except ValueError:
return None
def _parse_player_ffa(content):
"""
Parse a single OpenFFA player line (fixed-width format).
Format: "%2d %-15s %3d %3d %3d %4d %4s %4d" rank, name, frags, deaths, eff, ...
Offsets: [0:2]=rank, [3:18]=name, [19:22]=frags, [23:26]=deaths, [27:30]=eff
Returns dict or None.
"""
if len(content) < 27:
return None
rank_s = content[0:2].strip()
if not rank_s.isdigit():
return None
name = content[3:18].rstrip() if len(content) >= 18 else content[3:].rstrip()
if not name:
return None
nums = re.findall(r'-?\d+', content[18:])
if len(nums) < 3:
return None
try:
return {
'rank': int(rank_s),
'name': name,
'frags': int(nums[0]),
'deaths': int(nums[1]),
'eff': int(nums[2]),
}
except ValueError:
return None
def parse_layout_tdm(layout):
"""
Parse an OpenTDM scoreboard layout string.
Returns (team_a_players, team_b_players, team_a_score, team_b_score).
Each player: {'name', 'frags', 'deaths'}.
OpenTDM layout places both teams side by side:
Column A: yv 56, 64, ... (between header A at yv 48 and header B)
Column B: yv 96, 104, ... (below header B at yv 88)
"""
entries = _parse_layout_entries(layout)
# Team scores appear at small yv positions (yv 8, yv 16)
# Format: "TeamName SCORE" — last token is the score
team_scores = []
for yv, content in sorted(entries, key=lambda x: x[0]):
if yv > 30 or _is_header(content) or not content.strip():
continue
parts = content.rsplit(None, 1)
if len(parts) == 2:
try:
score = int(parts[1])
team_scores.append(score)
except ValueError:
pass
team_a_score = team_scores[0] if len(team_scores) > 0 else 0
team_b_score = team_scores[1] if len(team_scores) > 1 else 0
# Column header positions (string2 containing " Name " and "Frags")
header_yvs = sorted(
yv for yv, c in entries
if 'Name' in c and 'Frags' in c
)
# Team A: entries between first and second header yv
# Team B: entries below second header yv
split_yv = header_yvs[1] if len(header_yvs) >= 2 else 9999
first_header_yv = header_yvs[0] if header_yvs else 0
team_a_players = []
team_b_players = []
for yv, content in entries:
if _is_header(content) or not content.strip():
continue
p = _parse_player_tdm(content)
if p is None:
continue
if first_header_yv < yv <= split_yv:
team_a_players.append(p)
elif yv > split_yv:
team_b_players.append(p)
return team_a_players, team_b_players, team_a_score, team_b_score
def parse_layout_ffa(layout):
"""
Parse an OpenFFA/DM scoreboard layout string.
Returns list of {'rank', 'name', 'frags', 'deaths', 'eff'}.
"""
entries = _parse_layout_entries(layout)
players = []
for _yv, content in entries:
if _is_header(content):
continue
p = _parse_player_ffa(content)
if p:
players.append(p)
return players
# ---------------------------------------------------------------------------
# State helpers
# ---------------------------------------------------------------------------
def _cs_gen(state):
"""Return CS_GENERAL base index — old or new, detected from demo content."""
if any(state.configstrings.get(CS_GENERAL_OLD + i) for i in range(8)):
return CS_GENERAL_OLD
return CS_GENERAL
def _cs_skins(state):
"""Return CS_PLAYERSKINS base index — old or new."""
if any(state.configstrings.get(CS_PLAYERSKINS_OLD + i) for i in range(8)):
return CS_PLAYERSKINS_OLD
return CS_PLAYERSKINS
def is_opentdm(state):
cs_gen = _cs_gen(state)
return (
'opentdm' in state.gamedir.lower()
or state.configstrings.get(cs_gen + 0, '').strip() != ''
or state.configstrings.get(cs_gen + 1, '').strip() != ''
)
def get_team_names(state):
cs_gen = _cs_gen(state)
team_a = state.configstrings.get(cs_gen + 0, '').strip() or 'Team A'
team_b = state.configstrings.get(cs_gen + 1, '').strip() or 'Team B'
return team_a, team_b
def get_team_scores_from_cs(state):
"""Return (score_a, score_b) from CS_TDM_TEAM_A/B_STATUS configstrings."""
cs_gen = _cs_gen(state)
try:
score_a = int(state.configstrings.get(cs_gen + 2, '').strip())
except ValueError:
score_a = None
try:
score_b = int(state.configstrings.get(cs_gen + 3, '').strip())
except ValueError:
score_b = None
return score_a, score_b
def get_timelimit(state):
"""Return timelimit in minutes, or None if not found."""
# OpenTDM uses g_match_time (seconds); fallback to timelimit (minutes)
if 'g_match_time' in state.cvars:
try:
secs = float(state.cvars['g_match_time'])
if secs > 0:
return int(secs // 60)
except ValueError:
pass
if 'timelimit' in state.cvars:
try:
mins = float(state.cvars['timelimit'])
if mins > 0:
return int(mins)
except ValueError:
pass
return None
def get_player_names(state):
"""Return {slot_index: name} from CS_PLAYERSKINS configstrings."""
cs_base = _cs_skins(state)
players = {}
for i in range(256):
val = state.configstrings.get(cs_base + i, '').strip()
if val:
name = val.split('\\')[0].strip()
if name:
players[i] = name
return players
# ---------------------------------------------------------------------------
# Markdown report generation
# ---------------------------------------------------------------------------
_WARN_TRUNCATED = (
"\n> ⚠ **Demo truncated** — the recording may have ended abruptly (e.g. system reset). "
"Stats are from the last available scoreboard.\n"
)
_WARN_NO_LAYOUT = (
"\n> ⚠ **Demo truncated** — no scoreboard found in file. "
"Deaths unavailable (`N/A`). Frags from last known frame.\n"
)
def _md_table(headers, rows):
lines = []
lines.append('| ' + ' | '.join(headers) + ' |')
lines.append('|' + '|'.join('-' * (len(h) + 2) for h in headers) + '|')
for row in rows:
lines.append('| ' + ' | '.join(str(c) for c in row) + ' |')
return '\n'.join(lines)
def _meta_line(state):
"""Return a metadata line with map name and timelimit."""
mapname = state.mapname or 'unknown'
timelimit = get_timelimit(state)
tl_str = f"{timelimit} min" if timelimit else "N/A"
return f"**Map:** {mapname} | **Timelimit:** {tl_str}"
def generate_report_ffa(state, players, truncated, no_layout=False):
mod = state.gamedir or 'DM'
mapname = state.mapname or 'unknown'
lines = [f"# Demo: {mapname}{mod}", "", _meta_line(state), ""]
if no_layout:
lines.append(_WARN_NO_LAYOUT)
elif truncated:
lines.append(_WARN_TRUNCATED)
if not players:
lines.append("*No player data found in demo file.*")
return '\n'.join(lines)
players.sort(key=lambda p: p.get('frags', 0), reverse=True)
has_eff = any('eff' in p for p in players)
if has_eff:
headers = ['#', 'Player', 'Frags', 'Deaths', 'Eff%']
rows = [
(p.get('rank', i + 1), p['name'],
p['frags'], p.get('deaths', 'N/A'), f"{p.get('eff', 'N/A')}%")
for i, p in enumerate(players)
]
else:
headers = ['#', 'Player', 'Frags', 'Deaths']
rows = [
(i + 1, p['name'], p['frags'], p.get('deaths', 'N/A'))
for i, p in enumerate(players)
]
lines.append(_md_table(headers, rows))
return '\n'.join(lines)
def generate_report_tdm(state, team_a_name, team_b_name,
team_a_players, team_b_players,
team_a_score, team_b_score,
truncated):
mapname = state.mapname or 'unknown'
lines = [f"# Demo: {mapname} — OpenTDM", "", _meta_line(state), ""]
if truncated:
lines.append(_WARN_TRUNCATED)
lines.append("## Team scores")
lines.append("")
a_deaths = sum(p['deaths'] for p in team_a_players)
b_deaths = sum(p['deaths'] for p in team_b_players)
lines.append(_md_table(
['Team', 'Frags', 'Deaths'],
[
(team_a_name, team_a_score, a_deaths),
(team_b_name, team_b_score, b_deaths),
]
))
lines.append("")
for team_name, team_players in [
(team_a_name, team_a_players),
(team_b_name, team_b_players),
]:
lines.append(f"## {team_name}")
lines.append("")
if not team_players:
lines.append("*No data.*")
else:
team_players.sort(key=lambda p: p['frags'], reverse=True)
lines.append(_md_table(
['Player', 'Frags', 'Deaths'],
[(p['name'], p['frags'], p['deaths']) for p in team_players]
))
lines.append("")
return '\n'.join(lines)
# ---------------------------------------------------------------------------
# Report builder
# ---------------------------------------------------------------------------
def build_report(state):
last_layout = state.layouts[-1] if state.layouts else None
truncated = state.truncated
no_layout = last_layout is None
if is_opentdm(state):
team_a_name, team_b_name = get_team_names(state)
if last_layout:
ta_p, tb_p, ta_score, tb_score = parse_layout_tdm(last_layout)
else:
ta_p, tb_p, ta_score, tb_score = [], [], 0, 0
# Prefer team scores from configstrings (more reliable than layout parsing)
cs_a, cs_b = get_team_scores_from_cs(state)
if cs_a is not None:
ta_score = cs_a
if cs_b is not None:
tb_score = cs_b
return generate_report_tdm(
state, team_a_name, team_b_name,
ta_p, tb_p, ta_score, tb_score,
truncated
)
else:
if last_layout:
players = parse_layout_ffa(last_layout)
if not players:
# Fallback: try TDM pattern (some mods use similar format)
tdm_all = parse_layout_tdm(last_layout)
players = [
{'name': p['name'], 'frags': p['frags'], 'deaths': p['deaths']}
for p in tdm_all[0] + tdm_all[1]
]
else:
# No layout — fall back to player names from configstrings, no deaths
player_names = get_player_names(state)
players = [{'name': name, 'frags': 0} for name in player_names.values()]
return generate_report_ffa(state, players, truncated, no_layout)
# ---------------------------------------------------------------------------
# File processing
# ---------------------------------------------------------------------------
def process_file(path, output_path=None):
"""Parse a single demo file and write/return the report."""
state = parse_demo(path)
report = build_report(state)
if output_path:
with open(output_path, 'w', encoding='utf-8') as f:
f.write(report)
return None
return report
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
ap = argparse.ArgumentParser(
description='Q2Pro .dm2 demo parser — generates Markdown player stats reports.'
)
ap.add_argument('inputs', nargs='+',
help='.dm2 file(s) or a directory containing .dm2 files')
ap.add_argument('-o', '--output',
help='Output file (single input file only)')
ap.add_argument('-y', '--always-continue', action='store_true',
help='Always continue on errors without prompting')
args = ap.parse_args()
# Collect .dm2 files
files = []
for inp in args.inputs:
p = Path(inp)
if p.is_dir():
files.extend(sorted(p.glob('*.dm2')))
elif p.suffix.lower() == '.dm2':
if p.exists():
files.append(p)
else:
print(f"File not found: {inp}", file=sys.stderr)
else:
print(f"Skipping (not .dm2 or directory): {inp}", file=sys.stderr)
if not files:
print("No .dm2 files found.", file=sys.stderr)
sys.exit(1)
if len(files) > 1 and args.output:
print("Error: -o/--output can only be used with a single input file.", file=sys.stderr)
sys.exit(1)
always_continue = args.always_continue
for demo_path in files:
try:
if len(files) == 1 and not args.output:
# Single file → stdout
report = process_file(demo_path)
print(report)
elif len(files) == 1 and args.output:
# Single file → specified output file
process_file(demo_path, args.output)
print(f"Report saved: {args.output}")
else:
# Batch mode → foo_report.md
out_path = demo_path.parent / (demo_path.stem + '_report.md')
process_file(demo_path, out_path)
print(f"OK {demo_path.name} -> {out_path.name}")
except Exception as exc:
print(f"\n[ERROR] {demo_path.name}: {exc}", file=sys.stderr)
if not always_continue and len(files) > 1:
try:
ans = input(
"Continue? [y]es / [n]o / [a]lways continue: "
).strip().lower()
except (EOFError, KeyboardInterrupt):
print("\nAborted.", file=sys.stderr)
sys.exit(1)
if ans == 'n':
print("Aborted.", file=sys.stderr)
sys.exit(1)
elif ans == 'a':
always_continue = True
# 'y' or anything else → continue
if __name__ == '__main__':
main()
Loading…
Cancel
Save