diff --git a/agentmain.py b/agentmain.py index 42d7a57..e3ca5a1 100644 --- a/agentmain.py +++ b/agentmain.py @@ -166,12 +166,13 @@ if __name__ == '__main__': import importlib.util spec = importlib.util.spec_from_file_location('reflect_script', args.reflect) mod = importlib.util.module_from_spec(spec); spec.loader.exec_module(mod) - interval = getattr(mod, 'INTERVAL', 5) - once = getattr(mod, 'ONCE', False) - on_done = getattr(mod, 'on_done', None) - print(f'[Reflect] loaded {args.reflect}, interval={interval}s, once={once}') + _mt = os.path.getmtime(args.reflect) + print(f'[Reflect] loaded {args.reflect}') while True: - time.sleep(interval) + if os.path.getmtime(args.reflect) != _mt: + try: spec.loader.exec_module(mod); _mt = os.path.getmtime(args.reflect); print('[Reflect] reloaded') + except Exception as e: print(f'[Reflect] reload error: {e}') + time.sleep(getattr(mod, 'INTERVAL', 5)) try: task = mod.check() except Exception as e: print(f'[Reflect] check() error: {e}'); continue @@ -183,17 +184,16 @@ if __name__ == '__main__': result = item['done'] print(result) except Exception as e: - if once: raise + if getattr(mod, 'ONCE', False): raise print(f'[Reflect] drain error: {e}'); result = f'[ERROR] {e}' log_dir = os.path.join(script_dir, 'temp/reflect_logs'); os.makedirs(log_dir, exist_ok=True) script_name = os.path.splitext(os.path.basename(args.reflect))[0] open(os.path.join(log_dir, f'{script_name}_{datetime.now():%Y-%m-%d}.log'), 'a', encoding='utf-8').write(f'[{datetime.now():%m-%d %H:%M}]\n{result}\n\n') - if on_done: + if (on_done := getattr(mod, 'on_done', None)): try: on_done(result) except Exception as e: print(f'[Reflect] on_done error: {e}') - if once: print('[Reflect] ONCE=True, exiting.'); break - elif args.scheduled: - print('moved to reflect mode') + if getattr(mod, 'ONCE', False): print('[Reflect] ONCE=True, exiting.'); break + elif args.scheduled: print('moved to reflect mode') else: agent.inc_out = True while True: diff --git a/llmcore.py b/llmcore.py index 555dc1d..9b7cdca 100644 --- a/llmcore.py +++ b/llmcore.py @@ -14,19 +14,27 @@ mykeys = _load_mykeys() proxy = mykeys.get("proxy", 'http://127.0.0.1:2082') proxies = {"http": proxy, "https": proxy} if proxy else None -def compress_history_tags(messages, keep_recent=10, max_len=500): - """Compress // tags in older messages to save tokens.""" +def compress_history_tags(messages, keep_recent=12, max_len=1000): + """Compress // tags in older messages to save tokens. + Supports both prompt-style (ClaudeSession/LLMSession) and content-style (NativeClaudeSession) messages.""" compress_history_tags._cd = getattr(compress_history_tags, '_cd', 0) + 1 if compress_history_tags._cd % 5 != 0: return messages + _pats = {tag: re.compile(rf'(<{tag}>)([\s\S]*?)()') for tag in ('thinking', 'tool_use', 'tool_result')} + def _trunc(text): + for pat in _pats.values(): text = pat.sub(lambda m: m.group(1) + m.group(2)[:max_len] + '...' + m.group(3) if len(m.group(2)) > max_len else m.group(0), text) + return text for i, msg in enumerate(messages): - if i < len(messages) - keep_recent and 'orig' not in msg: + if i >= len(messages) - keep_recent: break + if 'prompt' in msg and 'orig' not in msg: msg['orig'] = msg['prompt'] - for tag in ('thinking', 'tool_use', 'tool_result'): - msg['prompt'] = re.sub( - rf'(<{tag}>)([\s\S]*?)()', - lambda m, _ml=max_len: m.group(1) + (m.group(2)[:_ml] + '...') + m.group(3) if len(m.group(2)) > _ml else m.group(0), - msg['prompt'] - ) + msg['prompt'] = _trunc(msg['prompt']) + elif 'content' in msg and 'prompt' not in msg: + c = msg['content'] + if isinstance(c, str): msg['content'] = _trunc(c) + elif isinstance(c, list): + for block in c: + if isinstance(block, dict) and block.get('type') == 'text' and isinstance(block.get('text'), str): + block['text'] = _trunc(block['text']) return messages def auto_make_url(base, path): @@ -71,24 +79,24 @@ class ClaudeSession: def __init__(self, cfg): self.api_key = cfg['apikey']; self.api_base = cfg['apibase'].rstrip('/') self.default_model = cfg.get('model', 'claude-opus') - self.context_win = cfg.get('context_win', 12000) + self.context_win = cfg.get('context_win', 18000) self.raw_msgs, self.lock = [], threading.Lock() self.prompt_cache = cfg.get('prompt_cache', False) def _trim_messages(self, messages): if not self.prompt_cache: compress_history_tags(messages) total = sum(len(m['prompt']) for m in messages) - if total <= self.context_win * 4: return messages - target, current, result = self.context_win * 4 * 0.9, 0, [] + if total <= self.context_win * 3: return messages + target, current, result = self.context_win * 3 * 0.6, 0, [] for msg in reversed(messages): if (msg_len := len(msg['prompt'])) + current <= target: result.append(msg); current += msg_len else: break - if current > self.context_win * 3.6: print(f'[DEBUG] {len(result)} contexts, whole length {current//4} tokens.') + if current > self.context_win * 2.7: print(f'[DEBUG] {len(result)} contexts, whole length {current//3} tokens.') return result[::-1] or messages[-2:] def raw_ask(self, messages, model=None, temperature=0.5, max_tokens=6144): model = model or self.default_model if 'kimi' in model.lower() or 'moonshot' in model.lower(): temperature = 1.0 # kimi/moonshot only accepts temp 1.0 - headers = {"x-api-key": self.api_key, "Content-Type": "application/json", "anthropic-version": "2023-06-01"} + headers = {"x-api-key": self.api_key, "Content-Type": "application/json", "anthropic-version": "2023-06-01", "anthropic-beta": "prompt-caching-2024-07-31"} payload = {"model": model, "messages": messages, "temperature": temperature, "max_tokens": max_tokens, "stream": True} try: with requests.post(auto_make_url(self.api_base, "messages"), headers=headers, json=payload, stream=True, timeout=(5,30)) as r: @@ -101,14 +109,23 @@ class ClaudeSession: if data == "[DONE]": break try: obj = json.loads(data) - if obj.get("type") == "content_block_delta" and obj.get("delta", {}).get("type") == "text_delta": + if obj.get("type") == "message_start": + usage = obj.get("message", {}).get("usage", {}) + ci, cr, inp = usage.get("cache_creation_input_tokens", 0), usage.get("cache_read_input_tokens", 0), usage.get("input_tokens", 0) + print(f"[Cache] input={inp} creation={ci} read={cr}") + elif obj.get("type") == "content_block_delta" and obj.get("delta", {}).get("type") == "text_delta": text = obj["delta"].get("text", "") if text: yield text except: pass except Exception as e: yield f"Error: {str(e)}" def make_messages(self, raw_list): trimmed = self._trim_messages(raw_list) - return [{"role": m['role'], "content": m['prompt']} for m in trimmed] + msgs = [{"role": m['role'], "content": m['prompt']} for m in trimmed] + for i in range(len(msgs)-1, -1, -1): + if msgs[i]["role"] == "assistant": + msgs[i]["content"] = [{"type": "text", "text": msgs[i]["content"], "cache_control": {"type": "ephemeral"}}] + break + return msgs def ask(self, prompt, model=None, stream=False): def _ask_gen(): content = '' @@ -124,7 +141,7 @@ class LLMSession: def __init__(self, cfg): self.api_key = cfg['apikey']; self.api_base = cfg['apibase'].rstrip('/') self.default_model = cfg['model'] - self.context_win = cfg.get('context_win', 16000) + self.context_win = cfg.get('context_win', 18000) self.raw_msgs, self.messages = [], [] proxy = cfg.get('proxy') self.proxies = {"http": proxy, "https": proxy} if proxy else None @@ -290,7 +307,7 @@ class LLMSession: with self.lock: keep = 0; tok = 0 for m in reversed(self.raw_msgs): - l = len(str(m))//4 + l = len(str(m))//3 if tok + l > self.context_win*0.2: break tok += l; keep += 1 keep = max(2, keep) @@ -299,9 +316,9 @@ class LLMSession: p = "Summarize prev summary and prev conversations into compact memory (facts/decisions/constraints/open questions). Do NOT restate long schemas. The new summary should less than 1000 tokens. Permit dropping non-important things.\n" messages = self.make_messages(old, omit_images=True) messages += [{"role":"user", "content":p}] - msg_lens = [1000 if isinstance(m["content"], list) else len(str(m["content"]))//4 for m in messages] + msg_lens = [1000 if isinstance(m["content"], list) else len(str(m["content"]))//3 for m in messages] summary = ''.join(list(self.raw_ask(messages, model, temperature=0.1))) - print('[Debug] Summary length:', len(summary)//4, '; Orig context lengths:', str(msg_lens)) + print('[Debug] Summary length:', len(summary)//3, '; Orig context lengths:', str(msg_lens)) if not summary.startswith("Error:"): self.raw_msgs.insert(0, {"role":"assistant", "prompt":"Prev summary:\n"+summary, "image":None}) else: self.raw_msgs = old + self.raw_msgs # 不做了,下次再做 @@ -314,7 +331,7 @@ class LLMSession: self.raw_msgs.append({"role": "user", "prompt": prompt, "image": image_base64}) messages = self.make_messages(self.raw_msgs[:-1], omit_images=True) messages += self.make_messages([self.raw_msgs[-1]], omit_images=False) - msg_lens = [1000 if isinstance(m["content"], list) else len(str(m["content"]))//4 for m in messages] + msg_lens = [1000 if isinstance(m["content"], list) else len(str(m["content"]))//3 for m in messages] total_len = sum(msg_lens) # estimate token count gen = self.raw_ask(messages, model) for chunk in gen: @@ -475,7 +492,7 @@ class NativeClaudeSession: def __init__(self, cfg): self.api_key = cfg['apikey']; self.api_base = cfg['apibase'].rstrip('/') self.default_model = cfg.get('model', 'claude-opus') - self.context_win = cfg.get('context_win', 24000) + self.context_win = cfg.get('context_win', 32000) self.history = [] self.system = None self.lock = threading.Lock() @@ -487,11 +504,15 @@ class NativeClaudeSession: headers = {"x-api-key": self.api_key, "Content-Type": "application/json", "anthropic-version": "2023-06-01", "anthropic-beta": "prompt-caching-2024-07-31"} payload = {"model": model, "messages": messages, "temperature": temperature, "max_tokens": max_tokens, "stream": True} if tools: - tools = [dict(t) for t in tools] - tools[-1]["cache_control"] = {"type": "ephemeral"} + tools = [dict(t) for t in tools]; tools[-1]["cache_control"] = {"type": "ephemeral"} payload["tools"] = tools - if system: - payload["system"] = [{"type": "text", "text": system, "cache_control": {"type": "ephemeral"}}] + if system: payload["system"] = [{"type": "text", "text": system, "cache_control": {"type": "ephemeral"}}] + # 历史消息缓存:最后一个assistant消息加cache_control + for i in range(len(messages) - 1, -1, -1): + if messages[i]["role"] == "assistant": + c = messages[i].get("content", []) + if isinstance(c, list) and c: messages[i] = {**messages[i], "content": [*c[:-1], {**c[-1], "cache_control": {"type": "ephemeral"}}]} + break try: resp = requests.post(auto_make_url(self.api_base, "messages"), headers=headers, json=payload, stream=True, timeout=120) if resp.status_code != 200: @@ -540,10 +561,13 @@ class NativeClaudeSession: elif isinstance(msg, list): msg = {"role": "user", "content": msg} with self.lock: self.history.append(msg) - while len(self.history) > 2: - cost = sum(len(json.dumps(m, ensure_ascii=False)) for m in self.history) + len(self.system or '') - if cost <= self.context_win * 4: break - self.history.pop(0); self.history.pop(0) # 砍一对 + compress_history_tags(self.history) + cost = sum(len(json.dumps(m, ensure_ascii=False)) for m in self.history) + if cost > self.context_win * 4: + target = self.context_win * 4 * 0.6 + while len(self.history) > 2 and cost > target: + self.history.pop(0); self.history.pop(0) + cost = sum(len(json.dumps(m, ensure_ascii=False)) for m in self.history) messages = list(self.history) content_blocks = None @@ -563,7 +587,7 @@ class NativeClaudeSession: return MockResponse(thinking, content, tool_calls, str(content_blocks)) def openai_tools_to_claude(tools): - """[{type:'function', function:{name,description,parameters}}] → [{name,description,input_schema}]. 幂等""" + """[{type:'function', function:{name,description,parameters}}] → [{name,description,input_schema}].""" result = [] for t in tools: if 'input_schema' in t: result.append(t); continue # 已是claude格式