diff --git a/.gitignore b/.gitignore
index 0371600..8d8c455 100644
--- a/.gitignore
+++ b/.gitignore
@@ -43,6 +43,11 @@ memory/*
!memory/autonomous_operation_sop/**
!memory/scheduled_task_sop.md
+# L4 session archiver (only the script, not archives)
+!memory/L4_raw_sessions/
+memory/L4_raw_sessions/*
+!memory/L4_raw_sessions/compress_session.py
+
# ljqCtrl related tools
!memory/ljqCtrl.py
!memory/ljqCtrl_sop.md
diff --git a/llmcore.py b/llmcore.py
index 4868922..d1a9413 100644
--- a/llmcore.py
+++ b/llmcore.py
@@ -121,7 +121,7 @@ class SiderLLMSession:
def _parse_claude_sse(resp_lines):
"""Parse Anthropic SSE stream. Yields text chunks, returns list[content_block]."""
content_blocks = []; current_block = None; tool_json_buf = ""
- stop_reason = None; got_message_stop = False
+ stop_reason = None; got_message_stop = False; warn = None
for line in resp_lines:
if not line: continue
line = line.decode('utf-8') if isinstance(line, bytes) else line
@@ -167,15 +167,13 @@ def _parse_claude_sse(resp_lines):
elif evt_type == "error":
err = evt.get("error", {})
emsg = err.get("message", str(err)) if isinstance(err, dict) else str(err)
- print(f"[SSE ERROR] {emsg}")
- yield f"\n\n[SSE Error: {emsg}]"
- break
- if not got_message_stop and not stop_reason:
- print("[WARN] SSE stream ended without message_stop - possible network interruption")
- yield "\n\n[!!! 流异常中断,未收到完整响应 !!!]"
- elif stop_reason == "max_tokens":
- print(f"[WARN] Response truncated: max_tokens")
- yield "\n\n[!!! Response truncated: max_tokens !!!]"
+ warn = f"\n\n[SSE Error: {emsg}]"; break
+ if not warn:
+ if not got_message_stop and not stop_reason: warn = "\n\n[!!! 流异常中断,未收到完整响应 !!!]"
+ elif stop_reason == "max_tokens": warn = "\n\n[!!! Response truncated: max_tokens !!!]"
+ if warn:
+ print(f"[WARN] {warn.strip()}")
+ content_blocks.append({"type": "text", "text": warn}); yield warn
return content_blocks
def _parse_openai_sse(resp_lines, api_mode="chat_completions"):
@@ -432,7 +430,7 @@ class BaseSession:
self.name = cfg.get('name', self.default_model)
proxy = cfg.get('proxy')
self.proxies = {"http": proxy, "https": proxy} if proxy else None
- self.max_retries = max(0, int(cfg.get('max_retries', 2)))
+ self.max_retries = max(0, int(cfg.get('max_retries', 1)))
self.connect_timeout = max(1, int(cfg.get('timeout', 5)))
self.read_timeout = max(5, int(cfg.get('read_timeout', 30)))
effort = cfg.get('reasoning_effort')
diff --git a/memory/L4_raw_sessions/compress_session.py b/memory/L4_raw_sessions/compress_session.py
new file mode 100644
index 0000000..093e052
--- /dev/null
+++ b/memory/L4_raw_sessions/compress_session.py
@@ -0,0 +1,246 @@
+"""L4 Session Log Processor — compress & extract history.
+Format A (JSON): kept as-is. Format B (Raw): strip sys prompt & assistant echo.
+"""
+import re, os, json, ast
+from datetime import datetime
+
+L4_DIR = os.path.dirname(os.path.abspath(__file__))
+
+_RE_PROMPT = re.compile(r'^=== Prompt ===(?: (\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}))?', re.M)
+_RE_RESPONSE = re.compile(r'^=== Response ===(?: (\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}))?', re.M)
+_RE_USER = re.compile(r'^=== USER ===$', re.M)
+_RE_ASST = re.compile(r'^=== ASSISTANT ===$', re.M)
+_RE_ANY_MARKER = re.compile(r'^=== (?:Prompt|Response|USER|ASSISTANT) ===(?:.*)?$', re.M)
+
+def _ts_fmt(ts_str):
+ """'2026-04-03 20:13:06' → '0403_2013'"""
+ try: return datetime.strptime(ts_str.strip(), '%Y-%m-%d %H:%M:%S').strftime('%m%d_%H%M')
+ except Exception: return None
+
+def _detect_format(text):
+ """Detect A (json) vs B (raw) by checking content after first Prompt marker."""
+ m = _RE_PROMPT.search(text)
+ if not m: return 'unknown'
+ return 'json' if re.match(r'\s*\{', text[m.end():m.end()+200]) else 'raw'
+
+def _parse_sections(text):
+ """Split text into (type, marker_line, body) tuples."""
+ markers = list(_RE_ANY_MARKER.finditer(text))
+ if not markers:
+ return [('preamble', '', text)]
+ _MAP = {'Prompt': 'prompt', 'Response': 'response', 'USER': 'user', 'ASSISTANT': 'assistant'}
+ sections = []
+ if markers[0].start() > 0:
+ sections.append(('preamble', '', text[:markers[0].start()]))
+ for i, m in enumerate(markers):
+ line = m.group()
+ end = markers[i+1].start() if i+1 < len(markers) else len(text)
+ typ = next((v for k, v in _MAP.items() if line.startswith(f'=== {k}')), None)
+ if typ:
+ sections.append((typ, line, text[m.end():end]))
+ return sections
+
+def compress_session(src, dst_dir=None):
+ """Compress model_responses_xxx.txt → MMDD_HHMM-MMDD_HHMM.txt. Returns (dst, stats) or (None, reason)."""
+ dst_dir = dst_dir or L4_DIR
+ with open(src, 'r', encoding='utf-8', errors='replace') as f:
+ text = f.read()
+ timestamps = [m.group(1) for m in _RE_PROMPT.finditer(text) if m.group(1)]
+ if not timestamps: # fallback to Response timestamps
+ timestamps = [m.group(1) for m in _RE_RESPONSE.finditer(text) if m.group(1)]
+ if not timestamps:
+ return None, 'no timestamps found'
+ ts_first, ts_last = _ts_fmt(timestamps[0]), _ts_fmt(timestamps[-1])
+ if not ts_first:
+ return None, 'bad timestamp format'
+ name = f"{ts_first}-{ts_last or ts_first}.txt"
+ fmt = _detect_format(text)
+ compressed = _compress_raw(text) if fmt == 'raw' else text
+ if len(compressed.encode('utf-8')) < 4500:
+ return None, f'too small after compress ({len(compressed)}B)'
+ dst = os.path.join(dst_dir, name)
+ with open(dst, 'w', encoding='utf-8', newline='') as f:
+ f.write(compressed)
+ orig_kb, new_kb = os.path.getsize(src) // 1024, os.path.getsize(dst) // 1024
+ ratio = (1 - new_kb / max(orig_kb, 1)) * 100
+ return dst, {'src': os.path.basename(src), 'dst': name, 'fmt': fmt,
+ 'orig_kb': orig_kb, 'new_kb': new_kb, 'ratio': f'{ratio:.0f}%',
+ 'year': timestamps[0][:4]}
+
+def _compress_raw(text):
+ """Format B: strip system prompt (Prompt→USER) and assistant echo (ASSISTANT→Response)."""
+ sections = _parse_sections(text)
+ out = []
+ for i, (typ, line, body) in enumerate(sections):
+ if typ == 'prompt':
+ out.append(line + '\n')
+ if not (i+1 < len(sections) and sections[i+1][0] == 'user'):
+ out.append(body) # no USER follows → keep body
+ elif typ in ('user', 'response'):
+ out.append(line + '\n')
+ out.append(body)
+ elif typ == 'preamble':
+ out.append(body)
+ # assistant → skip (redundant echo)
+ return ''.join(out)
+
+_RE_HISTORY = re.compile(r'(.*?)', re.S)
+
+def _parse_history_block(raw):
+ """Parse block into ['[USER]...', '[Agent]...'] lines."""
+ lines = [l.strip() for l in raw.split('\n') if l.strip()]
+ parsed = [l for l in lines if l.startswith('[USER]') or l.startswith('[Agent]')]
+ if len(parsed) >= 2:
+ return parsed
+ # JSON format: literal \\n separators
+ joined = raw.strip()
+ if '\\n[USER]' in joined or '\\n[Agent]' in joined:
+ parts = joined.replace('\\n', '\n').split('\n')
+ parsed = [p.strip() for p in parts if p.strip() and (p.strip().startswith('[USER]') or p.strip().startswith('[Agent]'))]
+ if parsed: return parsed
+ return parsed or []
+
+def _merge_history_blocks(all_blocks):
+ """Merge sliding-window history blocks into one deduplicated list."""
+ if not all_blocks: return []
+ acc = list(all_blocks[0])
+ for block in all_blocks[1:]:
+ if not block: continue
+ if not acc:
+ acc = list(block); continue
+ best = 0
+ for k in range(1, min(len(acc), len(block)) + 1):
+ if acc[-k:] == block[:k]: best = k
+ if best > 0:
+ acc.extend(block[best:])
+ elif block[0] in acc:
+ idx = len(acc) - 1 - acc[::-1].index(block[0])
+ match_len = 0
+ for j in range(min(len(block), len(acc) - idx)):
+ if acc[idx + j] == block[j]: match_len = j + 1
+ else: break
+ acc.extend(block[match_len:])
+ else:
+ acc.extend(block)
+ return acc
+
+def extract_history(src, session_name=None):
+ """Extract [USER]/[Agent] history from session file."""
+ with open(src, 'r', encoding='utf-8', errors='replace') as f:
+ text = f.read()
+ if session_name is None:
+ session_name = os.path.splitext(os.path.basename(src))[0]
+ all_blocks = [parsed for m in _RE_HISTORY.finditer(text)
+ if (parsed := _parse_history_block(m.group(1)))]
+ if all_blocks:
+ return _merge_history_blocks(all_blocks)
+ return []
+
+def format_history_block(session_name, history_lines):
+ """Format history lines into all_histories.txt block format."""
+ sep = '=' * 60
+ return f"{sep}\nSESSION: {session_name}\n{sep}\n" + '\n'.join(history_lines) + '\n'
+
+import tempfile, shutil, zipfile, glob
+from collections import defaultdict
+
+def _existing_sessions(l4_dir):
+ """Read session names already in all_histories.txt."""
+ hist_path = os.path.join(l4_dir, 'all_histories.txt')
+ if not os.path.exists(hist_path): return set()
+ with open(hist_path, 'r', encoding='utf-8') as f:
+ return {l.strip().replace('SESSION: ', '') for l in f if l.startswith('SESSION: ')}
+
+def batch_process(src, l4_dir=None, dry_run=True):
+ """Batch compress + extract history + archive. dry_run=True is safe default."""
+ l4_dir = os.path.normpath(l4_dir or L4_DIR)
+ raw_files = sorted(src) if isinstance(src, (list, tuple)) else \
+ sorted(glob.glob(os.path.join(src, 'model_responses_*.txt')))
+ if not raw_files:
+ print("No raw files found"); return {'processed': 0, 'skipped': 0, 'errors': 0, 'new_sessions': 0}
+
+ existing = _existing_sessions(l4_dir)
+ print(f"Found {len(raw_files)} raw, {len(existing)} existing in L4")
+
+ tmp_dir = tempfile.mkdtemp(prefix='cs_batch_')
+ results, skipped, errors = [], [], []
+
+ import time
+ cutoff = time.time() - 7200 # skip files modified within 2h
+
+ # Phase 1: Compress + Extract (to temp dir)
+ for fp in raw_files:
+ fname = os.path.basename(fp)
+ if os.path.getmtime(fp) > cutoff:
+ skipped.append((fname, 'recent(<2h)')); continue
+ try:
+ dst, info = compress_session(fp, tmp_dir)
+ if dst is None:
+ skipped.append((fname, info)); continue
+ sn = os.path.splitext(os.path.basename(dst))[0]
+ if sn in existing:
+ skipped.append((fname, f'dup:{sn}')); os.remove(dst); continue
+ results.append((sn, dst, extract_history(dst), info, fp))
+ except Exception as e:
+ errors.append((fname, str(e)))
+ results.sort(key=lambda x: x[0])
+
+ print(f"\nP1: {len(results)} new, {len(skipped)} skip, {len(errors)} err")
+ for f, r in skipped[:5]: print(f" SKIP {f}: {r}")
+ for f, e in errors[:5]: print(f" ERR {f}: {e}")
+ if results: print(f" Range: {results[0][0]} → {results[-1][0]}")
+
+ if dry_run:
+ print("\n[DRY RUN] Pass dry_run=False to execute.")
+ shutil.rmtree(tmp_dir, ignore_errors=True)
+ return {'processed': len(results), 'skipped': len(skipped),
+ 'errors': len(errors), 'new_sessions': len(results),
+ 'sessions': [r[0] for r in results]}
+
+ # Phase 2: Append history
+ with open(os.path.join(l4_dir, 'all_histories.txt'), 'a', encoding='utf-8') as f:
+ for sn, _, hist, _, _ in results:
+ if hist: f.write('\n' + format_history_block(sn, hist))
+ print(f"Appended {len(results)} sessions to all_histories.txt")
+
+ # Phase 3: Archive to monthly zips
+ by_month = defaultdict(list)
+ for sn, cpath, _, info, _ in results:
+ year = info.get('year', '2026') if isinstance(info, dict) else '2026'
+ by_month[f"{year}-{sn[:2]}"].append((sn, cpath))
+ for mk, items in sorted(by_month.items()):
+ zpath = os.path.join(l4_dir, f"{mk}.zip")
+ mode = 'a' if os.path.exists(zpath) else 'w'
+ with zipfile.ZipFile(zpath, mode, zipfile.ZIP_DEFLATED) as zf:
+ names = set(zf.namelist()) if mode == 'a' else set()
+ for sn, cp in items:
+ if f"{sn}.txt" not in names: zf.write(cp, f"{sn}.txt")
+ print(f" {mk}.zip: +{len(items)}")
+
+ # Phase 4: Delete raw files
+ to_del = [rp for *_, rp in results]
+ for fname, _ in skipped:
+ m = [f for f in raw_files if os.path.basename(f) == fname]
+ if m: to_del.append(m[0])
+ deleted = 0
+ for rp in to_del:
+ try: os.remove(rp); deleted += 1
+ except Exception: pass
+ print(f"Deleted {deleted}/{len(to_del)} raw files")
+
+ shutil.rmtree(tmp_dir, ignore_errors=True)
+ report = {'processed': len(results), 'skipped': len(skipped),
+ 'errors': len(errors), 'new_sessions': len(results), 'deleted_raw': deleted}
+ print(f"\nDone: {report}")
+ return report
+
+# ── CLI ──
+RAW_DIR = os.path.join(os.path.dirname(os.path.dirname(L4_DIR)), 'temp', 'model_responses')
+
+if __name__ == '__main__':
+ import argparse
+ ap = argparse.ArgumentParser(description='L4 session archiver')
+ ap.add_argument('src', nargs='?', default=RAW_DIR, help='raw files dir')
+ ap.add_argument('--run', action='store_true', help='actually execute (default: dry run)')
+ args = ap.parse_args()
+ batch_process(args.src, dry_run=not args.run)
\ No newline at end of file
diff --git a/reflect/scheduler.py b/reflect/scheduler.py
index 3082708..f5fcd9f 100644
--- a/reflect/scheduler.py
+++ b/reflect/scheduler.py
@@ -1,4 +1,4 @@
-import os, json, socket as _socket, logging
+import os, json, time as _time, socket as _socket, logging
from datetime import datetime, timedelta
# 端口锁:防止重复启动,bind失败时agentmain会直接崩溃退出
@@ -8,7 +8,7 @@ except NameError:
_lock = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM)
_lock.bind(('127.0.0.1', 45762)); _lock.listen(1)
-INTERVAL = 60
+INTERVAL = 120
ONCE = False
_dir = os.path.dirname(os.path.abspath(__file__))
@@ -27,6 +27,7 @@ if not _logger.handlers:
# 默认最大延迟窗口(小时),超过此时间不触发
DEFAULT_MAX_DELAY = 6
+_l4_t = 0 # last L4 archive time
def _parse_cooldown(repeat):
"""解析repeat为冷却时间(比实际周期略短,防漂移)"""
@@ -56,6 +57,18 @@ def _last_run(tid, done_files):
return latest
def check():
+ # L4 archive cron (silent, every 12h)
+ global _l4_t
+ if _time.time() - _l4_t > 43200:
+ _l4_t = _time.time()
+ try:
+ import sys; sys.path.insert(0, os.path.join(_dir, '../memory/L4_raw_sessions'))
+ from compress_session import batch_process
+ r = batch_process(dry_run=False)
+ print(f'[L4 cron] {r}')
+ except Exception as e:
+ _logger.error(f'L4 archive failed: {e}')
+
if not os.path.isdir(TASKS): return None
now = datetime.now()
os.makedirs(DONE, exist_ok=True)
@@ -109,50 +122,5 @@ def check():
f'先读 scheduled_task_sop 了解执行流程,然后执行以下任务:\n\n'
f'{prompt}\n\n'
f'完成后将执行报告写入 {rpt}。')
- return None
-def health_check():
- """检查所有定时任务的健康状态,返回结构化报告"""
- if not os.path.isdir(TASKS):
- return {'error': 'TASKS directory not found'}
- now = datetime.now()
- os.makedirs(DONE, exist_ok=True)
- done_files = set(os.listdir(DONE))
- results = []
- for f in sorted(os.listdir(TASKS)):
- if not f.endswith('.json'): continue
- tid = f[:-5]
- try:
- task = json.loads(open(os.path.join(TASKS, f), encoding='utf-8').read())
- except Exception as e:
- results.append({'task': tid, 'status': 'ERROR', 'detail': f'JSON parse: {e}'})
- continue
-
- enabled = task.get('enabled', False)
- repeat = task.get('repeat', 'daily')
- sched = task.get('schedule', '00:00')
- last = _last_run(tid, done_files)
- cooldown = _parse_cooldown(repeat)
-
- # 判断健康状态
- if not enabled:
- status = 'DISABLED'
- elif last is None:
- status = 'NEVER_RUN'
- elif repeat == 'once':
- status = 'COMPLETED' if last else 'PENDING'
- else:
- # 检查是否超过预期间隔的1.5倍
- expected_gap = cooldown * 1.25 # 略大于冷却时间
- if (now - last) > expected_gap:
- status = 'OVERDUE'
- else:
- status = 'HEALTHY'
-
- results.append({
- 'task': tid, 'status': status, 'enabled': enabled,
- 'repeat': repeat, 'schedule': sched,
- 'last_run': last.strftime('%Y-%m-%d %H:%M') if last else None,
- 'cooldown_hours': cooldown.total_seconds() / 3600,
- })
- return results
+ return None