From de8adf76a90f81f7467318ff79a7aa278e9e3b7f Mon Sep 17 00:00:00 2001 From: Liang Jiaqing Date: Sat, 11 Apr 2026 15:55:35 +0800 Subject: [PATCH] feat: L4 session archiver + scheduler cron integration - Add compress_session.py: compress raw model_responses into L4 archives - Integrate 12h silent cron into scheduler.check() (runs before TASKS dir check) - Whitelist compress_session.py in .gitignore (archives excluded) - llmcore: refactor SSE warn handling, max_retries default 2->1 - scheduler: remove unused health_check(), INTERVAL 60->120 --- .gitignore | 5 + llmcore.py | 20 +- memory/L4_raw_sessions/compress_session.py | 246 +++++++++++++++++++++ reflect/scheduler.py | 64 ++---- 4 files changed, 276 insertions(+), 59 deletions(-) create mode 100644 memory/L4_raw_sessions/compress_session.py diff --git a/.gitignore b/.gitignore index 0371600..8d8c455 100644 --- a/.gitignore +++ b/.gitignore @@ -43,6 +43,11 @@ memory/* !memory/autonomous_operation_sop/** !memory/scheduled_task_sop.md +# L4 session archiver (only the script, not archives) +!memory/L4_raw_sessions/ +memory/L4_raw_sessions/* +!memory/L4_raw_sessions/compress_session.py + # ljqCtrl related tools !memory/ljqCtrl.py !memory/ljqCtrl_sop.md diff --git a/llmcore.py b/llmcore.py index 4868922..d1a9413 100644 --- a/llmcore.py +++ b/llmcore.py @@ -121,7 +121,7 @@ class SiderLLMSession: def _parse_claude_sse(resp_lines): """Parse Anthropic SSE stream. Yields text chunks, returns list[content_block].""" content_blocks = []; current_block = None; tool_json_buf = "" - stop_reason = None; got_message_stop = False + stop_reason = None; got_message_stop = False; warn = None for line in resp_lines: if not line: continue line = line.decode('utf-8') if isinstance(line, bytes) else line @@ -167,15 +167,13 @@ def _parse_claude_sse(resp_lines): elif evt_type == "error": err = evt.get("error", {}) emsg = err.get("message", str(err)) if isinstance(err, dict) else str(err) - print(f"[SSE ERROR] {emsg}") - yield f"\n\n[SSE Error: {emsg}]" - break - if not got_message_stop and not stop_reason: - print("[WARN] SSE stream ended without message_stop - possible network interruption") - yield "\n\n[!!! 流异常中断,未收到完整响应 !!!]" - elif stop_reason == "max_tokens": - print(f"[WARN] Response truncated: max_tokens") - yield "\n\n[!!! Response truncated: max_tokens !!!]" + warn = f"\n\n[SSE Error: {emsg}]"; break + if not warn: + if not got_message_stop and not stop_reason: warn = "\n\n[!!! 流异常中断,未收到完整响应 !!!]" + elif stop_reason == "max_tokens": warn = "\n\n[!!! Response truncated: max_tokens !!!]" + if warn: + print(f"[WARN] {warn.strip()}") + content_blocks.append({"type": "text", "text": warn}); yield warn return content_blocks def _parse_openai_sse(resp_lines, api_mode="chat_completions"): @@ -432,7 +430,7 @@ class BaseSession: self.name = cfg.get('name', self.default_model) proxy = cfg.get('proxy') self.proxies = {"http": proxy, "https": proxy} if proxy else None - self.max_retries = max(0, int(cfg.get('max_retries', 2))) + self.max_retries = max(0, int(cfg.get('max_retries', 1))) self.connect_timeout = max(1, int(cfg.get('timeout', 5))) self.read_timeout = max(5, int(cfg.get('read_timeout', 30))) effort = cfg.get('reasoning_effort') diff --git a/memory/L4_raw_sessions/compress_session.py b/memory/L4_raw_sessions/compress_session.py new file mode 100644 index 0000000..093e052 --- /dev/null +++ b/memory/L4_raw_sessions/compress_session.py @@ -0,0 +1,246 @@ +"""L4 Session Log Processor — compress & extract history. +Format A (JSON): kept as-is. Format B (Raw): strip sys prompt & assistant echo. +""" +import re, os, json, ast +from datetime import datetime + +L4_DIR = os.path.dirname(os.path.abspath(__file__)) + +_RE_PROMPT = re.compile(r'^=== Prompt ===(?: (\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}))?', re.M) +_RE_RESPONSE = re.compile(r'^=== Response ===(?: (\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}))?', re.M) +_RE_USER = re.compile(r'^=== USER ===$', re.M) +_RE_ASST = re.compile(r'^=== ASSISTANT ===$', re.M) +_RE_ANY_MARKER = re.compile(r'^=== (?:Prompt|Response|USER|ASSISTANT) ===(?:.*)?$', re.M) + +def _ts_fmt(ts_str): + """'2026-04-03 20:13:06' → '0403_2013'""" + try: return datetime.strptime(ts_str.strip(), '%Y-%m-%d %H:%M:%S').strftime('%m%d_%H%M') + except Exception: return None + +def _detect_format(text): + """Detect A (json) vs B (raw) by checking content after first Prompt marker.""" + m = _RE_PROMPT.search(text) + if not m: return 'unknown' + return 'json' if re.match(r'\s*\{', text[m.end():m.end()+200]) else 'raw' + +def _parse_sections(text): + """Split text into (type, marker_line, body) tuples.""" + markers = list(_RE_ANY_MARKER.finditer(text)) + if not markers: + return [('preamble', '', text)] + _MAP = {'Prompt': 'prompt', 'Response': 'response', 'USER': 'user', 'ASSISTANT': 'assistant'} + sections = [] + if markers[0].start() > 0: + sections.append(('preamble', '', text[:markers[0].start()])) + for i, m in enumerate(markers): + line = m.group() + end = markers[i+1].start() if i+1 < len(markers) else len(text) + typ = next((v for k, v in _MAP.items() if line.startswith(f'=== {k}')), None) + if typ: + sections.append((typ, line, text[m.end():end])) + return sections + +def compress_session(src, dst_dir=None): + """Compress model_responses_xxx.txt → MMDD_HHMM-MMDD_HHMM.txt. Returns (dst, stats) or (None, reason).""" + dst_dir = dst_dir or L4_DIR + with open(src, 'r', encoding='utf-8', errors='replace') as f: + text = f.read() + timestamps = [m.group(1) for m in _RE_PROMPT.finditer(text) if m.group(1)] + if not timestamps: # fallback to Response timestamps + timestamps = [m.group(1) for m in _RE_RESPONSE.finditer(text) if m.group(1)] + if not timestamps: + return None, 'no timestamps found' + ts_first, ts_last = _ts_fmt(timestamps[0]), _ts_fmt(timestamps[-1]) + if not ts_first: + return None, 'bad timestamp format' + name = f"{ts_first}-{ts_last or ts_first}.txt" + fmt = _detect_format(text) + compressed = _compress_raw(text) if fmt == 'raw' else text + if len(compressed.encode('utf-8')) < 4500: + return None, f'too small after compress ({len(compressed)}B)' + dst = os.path.join(dst_dir, name) + with open(dst, 'w', encoding='utf-8', newline='') as f: + f.write(compressed) + orig_kb, new_kb = os.path.getsize(src) // 1024, os.path.getsize(dst) // 1024 + ratio = (1 - new_kb / max(orig_kb, 1)) * 100 + return dst, {'src': os.path.basename(src), 'dst': name, 'fmt': fmt, + 'orig_kb': orig_kb, 'new_kb': new_kb, 'ratio': f'{ratio:.0f}%', + 'year': timestamps[0][:4]} + +def _compress_raw(text): + """Format B: strip system prompt (Prompt→USER) and assistant echo (ASSISTANT→Response).""" + sections = _parse_sections(text) + out = [] + for i, (typ, line, body) in enumerate(sections): + if typ == 'prompt': + out.append(line + '\n') + if not (i+1 < len(sections) and sections[i+1][0] == 'user'): + out.append(body) # no USER follows → keep body + elif typ in ('user', 'response'): + out.append(line + '\n') + out.append(body) + elif typ == 'preamble': + out.append(body) + # assistant → skip (redundant echo) + return ''.join(out) + +_RE_HISTORY = re.compile(r'(.*?)', re.S) + +def _parse_history_block(raw): + """Parse block into ['[USER]...', '[Agent]...'] lines.""" + lines = [l.strip() for l in raw.split('\n') if l.strip()] + parsed = [l for l in lines if l.startswith('[USER]') or l.startswith('[Agent]')] + if len(parsed) >= 2: + return parsed + # JSON format: literal \\n separators + joined = raw.strip() + if '\\n[USER]' in joined or '\\n[Agent]' in joined: + parts = joined.replace('\\n', '\n').split('\n') + parsed = [p.strip() for p in parts if p.strip() and (p.strip().startswith('[USER]') or p.strip().startswith('[Agent]'))] + if parsed: return parsed + return parsed or [] + +def _merge_history_blocks(all_blocks): + """Merge sliding-window history blocks into one deduplicated list.""" + if not all_blocks: return [] + acc = list(all_blocks[0]) + for block in all_blocks[1:]: + if not block: continue + if not acc: + acc = list(block); continue + best = 0 + for k in range(1, min(len(acc), len(block)) + 1): + if acc[-k:] == block[:k]: best = k + if best > 0: + acc.extend(block[best:]) + elif block[0] in acc: + idx = len(acc) - 1 - acc[::-1].index(block[0]) + match_len = 0 + for j in range(min(len(block), len(acc) - idx)): + if acc[idx + j] == block[j]: match_len = j + 1 + else: break + acc.extend(block[match_len:]) + else: + acc.extend(block) + return acc + +def extract_history(src, session_name=None): + """Extract [USER]/[Agent] history from session file.""" + with open(src, 'r', encoding='utf-8', errors='replace') as f: + text = f.read() + if session_name is None: + session_name = os.path.splitext(os.path.basename(src))[0] + all_blocks = [parsed for m in _RE_HISTORY.finditer(text) + if (parsed := _parse_history_block(m.group(1)))] + if all_blocks: + return _merge_history_blocks(all_blocks) + return [] + +def format_history_block(session_name, history_lines): + """Format history lines into all_histories.txt block format.""" + sep = '=' * 60 + return f"{sep}\nSESSION: {session_name}\n{sep}\n" + '\n'.join(history_lines) + '\n' + +import tempfile, shutil, zipfile, glob +from collections import defaultdict + +def _existing_sessions(l4_dir): + """Read session names already in all_histories.txt.""" + hist_path = os.path.join(l4_dir, 'all_histories.txt') + if not os.path.exists(hist_path): return set() + with open(hist_path, 'r', encoding='utf-8') as f: + return {l.strip().replace('SESSION: ', '') for l in f if l.startswith('SESSION: ')} + +def batch_process(src, l4_dir=None, dry_run=True): + """Batch compress + extract history + archive. dry_run=True is safe default.""" + l4_dir = os.path.normpath(l4_dir or L4_DIR) + raw_files = sorted(src) if isinstance(src, (list, tuple)) else \ + sorted(glob.glob(os.path.join(src, 'model_responses_*.txt'))) + if not raw_files: + print("No raw files found"); return {'processed': 0, 'skipped': 0, 'errors': 0, 'new_sessions': 0} + + existing = _existing_sessions(l4_dir) + print(f"Found {len(raw_files)} raw, {len(existing)} existing in L4") + + tmp_dir = tempfile.mkdtemp(prefix='cs_batch_') + results, skipped, errors = [], [], [] + + import time + cutoff = time.time() - 7200 # skip files modified within 2h + + # Phase 1: Compress + Extract (to temp dir) + for fp in raw_files: + fname = os.path.basename(fp) + if os.path.getmtime(fp) > cutoff: + skipped.append((fname, 'recent(<2h)')); continue + try: + dst, info = compress_session(fp, tmp_dir) + if dst is None: + skipped.append((fname, info)); continue + sn = os.path.splitext(os.path.basename(dst))[0] + if sn in existing: + skipped.append((fname, f'dup:{sn}')); os.remove(dst); continue + results.append((sn, dst, extract_history(dst), info, fp)) + except Exception as e: + errors.append((fname, str(e))) + results.sort(key=lambda x: x[0]) + + print(f"\nP1: {len(results)} new, {len(skipped)} skip, {len(errors)} err") + for f, r in skipped[:5]: print(f" SKIP {f}: {r}") + for f, e in errors[:5]: print(f" ERR {f}: {e}") + if results: print(f" Range: {results[0][0]} → {results[-1][0]}") + + if dry_run: + print("\n[DRY RUN] Pass dry_run=False to execute.") + shutil.rmtree(tmp_dir, ignore_errors=True) + return {'processed': len(results), 'skipped': len(skipped), + 'errors': len(errors), 'new_sessions': len(results), + 'sessions': [r[0] for r in results]} + + # Phase 2: Append history + with open(os.path.join(l4_dir, 'all_histories.txt'), 'a', encoding='utf-8') as f: + for sn, _, hist, _, _ in results: + if hist: f.write('\n' + format_history_block(sn, hist)) + print(f"Appended {len(results)} sessions to all_histories.txt") + + # Phase 3: Archive to monthly zips + by_month = defaultdict(list) + for sn, cpath, _, info, _ in results: + year = info.get('year', '2026') if isinstance(info, dict) else '2026' + by_month[f"{year}-{sn[:2]}"].append((sn, cpath)) + for mk, items in sorted(by_month.items()): + zpath = os.path.join(l4_dir, f"{mk}.zip") + mode = 'a' if os.path.exists(zpath) else 'w' + with zipfile.ZipFile(zpath, mode, zipfile.ZIP_DEFLATED) as zf: + names = set(zf.namelist()) if mode == 'a' else set() + for sn, cp in items: + if f"{sn}.txt" not in names: zf.write(cp, f"{sn}.txt") + print(f" {mk}.zip: +{len(items)}") + + # Phase 4: Delete raw files + to_del = [rp for *_, rp in results] + for fname, _ in skipped: + m = [f for f in raw_files if os.path.basename(f) == fname] + if m: to_del.append(m[0]) + deleted = 0 + for rp in to_del: + try: os.remove(rp); deleted += 1 + except Exception: pass + print(f"Deleted {deleted}/{len(to_del)} raw files") + + shutil.rmtree(tmp_dir, ignore_errors=True) + report = {'processed': len(results), 'skipped': len(skipped), + 'errors': len(errors), 'new_sessions': len(results), 'deleted_raw': deleted} + print(f"\nDone: {report}") + return report + +# ── CLI ── +RAW_DIR = os.path.join(os.path.dirname(os.path.dirname(L4_DIR)), 'temp', 'model_responses') + +if __name__ == '__main__': + import argparse + ap = argparse.ArgumentParser(description='L4 session archiver') + ap.add_argument('src', nargs='?', default=RAW_DIR, help='raw files dir') + ap.add_argument('--run', action='store_true', help='actually execute (default: dry run)') + args = ap.parse_args() + batch_process(args.src, dry_run=not args.run) \ No newline at end of file diff --git a/reflect/scheduler.py b/reflect/scheduler.py index 3082708..f5fcd9f 100644 --- a/reflect/scheduler.py +++ b/reflect/scheduler.py @@ -1,4 +1,4 @@ -import os, json, socket as _socket, logging +import os, json, time as _time, socket as _socket, logging from datetime import datetime, timedelta # 端口锁:防止重复启动,bind失败时agentmain会直接崩溃退出 @@ -8,7 +8,7 @@ except NameError: _lock = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM) _lock.bind(('127.0.0.1', 45762)); _lock.listen(1) -INTERVAL = 60 +INTERVAL = 120 ONCE = False _dir = os.path.dirname(os.path.abspath(__file__)) @@ -27,6 +27,7 @@ if not _logger.handlers: # 默认最大延迟窗口(小时),超过此时间不触发 DEFAULT_MAX_DELAY = 6 +_l4_t = 0 # last L4 archive time def _parse_cooldown(repeat): """解析repeat为冷却时间(比实际周期略短,防漂移)""" @@ -56,6 +57,18 @@ def _last_run(tid, done_files): return latest def check(): + # L4 archive cron (silent, every 12h) + global _l4_t + if _time.time() - _l4_t > 43200: + _l4_t = _time.time() + try: + import sys; sys.path.insert(0, os.path.join(_dir, '../memory/L4_raw_sessions')) + from compress_session import batch_process + r = batch_process(dry_run=False) + print(f'[L4 cron] {r}') + except Exception as e: + _logger.error(f'L4 archive failed: {e}') + if not os.path.isdir(TASKS): return None now = datetime.now() os.makedirs(DONE, exist_ok=True) @@ -109,50 +122,5 @@ def check(): f'先读 scheduled_task_sop 了解执行流程,然后执行以下任务:\n\n' f'{prompt}\n\n' f'完成后将执行报告写入 {rpt}。') - return None -def health_check(): - """检查所有定时任务的健康状态,返回结构化报告""" - if not os.path.isdir(TASKS): - return {'error': 'TASKS directory not found'} - now = datetime.now() - os.makedirs(DONE, exist_ok=True) - done_files = set(os.listdir(DONE)) - results = [] - for f in sorted(os.listdir(TASKS)): - if not f.endswith('.json'): continue - tid = f[:-5] - try: - task = json.loads(open(os.path.join(TASKS, f), encoding='utf-8').read()) - except Exception as e: - results.append({'task': tid, 'status': 'ERROR', 'detail': f'JSON parse: {e}'}) - continue - - enabled = task.get('enabled', False) - repeat = task.get('repeat', 'daily') - sched = task.get('schedule', '00:00') - last = _last_run(tid, done_files) - cooldown = _parse_cooldown(repeat) - - # 判断健康状态 - if not enabled: - status = 'DISABLED' - elif last is None: - status = 'NEVER_RUN' - elif repeat == 'once': - status = 'COMPLETED' if last else 'PENDING' - else: - # 检查是否超过预期间隔的1.5倍 - expected_gap = cooldown * 1.25 # 略大于冷却时间 - if (now - last) > expected_gap: - status = 'OVERDUE' - else: - status = 'HEALTHY' - - results.append({ - 'task': tid, 'status': status, 'enabled': enabled, - 'repeat': repeat, 'schedule': sched, - 'last_run': last.strftime('%Y-%m-%d %H:%M') if last else None, - 'cooldown_hours': cooldown.total_seconds() / 3600, - }) - return results + return None