From fdc24e4dd08bb040a62ca42755437fc6c585e618 Mon Sep 17 00:00:00 2001 From: Liang Jiaqing Date: Sat, 21 Mar 2026 12:13:46 +0800 Subject: [PATCH] add NativeClaudeSession+NativeToolClient, support parallel tool calls, fix tool_calls[-1:], add llm log --- agent_loop.py | 60 ++++++------ agentmain.py | 27 +++--- assets/tools_schema.json | 2 +- ga.py | 4 +- llmcore.py | 201 +++++++++++++++++++++++++++++++-------- 5 files changed, 207 insertions(+), 87 deletions(-) diff --git a/agent_loop.py b/agent_loop.py index 2eb87bd..23af155 100644 --- a/agent_loop.py +++ b/agent_loop.py @@ -17,9 +17,10 @@ class BaseHandler: def tool_before_callback(self, tool_name, args, response): pass def tool_after_callback(self, tool_name, args, response, ret): pass def next_prompt_patcher(self, next_prompt, outcome, turn): return next_prompt - def dispatch(self, tool_name, args, response): + def dispatch(self, tool_name, args, response, index=0): method_name = f"do_{tool_name}" if hasattr(self, method_name): + args['_index'] = index prer = yield from try_call_generator(self.tool_before_callback, tool_name, args, response) ret = yield from try_call_generator(getattr(self, method_name), args, response) _ = yield from try_call_generator(self.tool_after_callback, tool_name, args, response, ret) @@ -61,35 +62,34 @@ def agent_runner_loop(client, system_prompt, user_input, handler, tools_schema, response = exhaust(response_gen) yield response.content - if not response.tool_calls: - tool_name, args = 'no_tool', {} - else: - tool_call = response.tool_calls[0] - tool_name = tool_call.function.name - args = json.loads(tool_call.function.arguments) - - if tool_name == 'no_tool': pass - else: - showarg = get_pretty_json(args) - if not verbose and len(showarg) > 200: showarg = showarg[:200] + ' ...' - yield f"🛠️ **正在调用工具:** `{tool_name}` 📥**参数:**\n````text\n{showarg}\n````\n" - handler.current_turn = turn + 1 - gen = handler.dispatch(tool_name, args, response) - if verbose: - yield '`````\n' - outcome = yield from gen - yield '`````\n' - else: outcome = exhaust(gen) - - if outcome.next_prompt is None: return {'result': 'CURRENT_TASK_DONE', 'data': outcome.data} - if outcome.should_exit: return {'result': 'EXITED', 'data': outcome.data} - if outcome.next_prompt.startswith('未知工具'): client.last_tools = '' - + if not response.tool_calls: tool_calls = [{'tool_name': 'no_tool', 'args': {}}] + else: tool_calls = [{'tool_name': tc.function.name, 'args': json.loads(tc.function.arguments)} + for tc in response.tool_calls] + next_prompt = "" - if outcome.data is not None: - datastr = json.dumps(outcome.data, ensure_ascii=False, default=json_default) if type(outcome.data) in [dict, list] else str(outcome.data) - next_prompt += f"\n{datastr}\n\n\n" - next_prompt += outcome.next_prompt - next_prompt = handler.next_prompt_patcher(next_prompt, outcome, turn+1) + for ii, tc in enumerate(tool_calls): + tool_name, args = tc['tool_name'], tc['args'] + if tool_name == 'no_tool': pass + else: + showarg = get_pretty_json(args) + if not verbose and len(showarg) > 200: showarg = showarg[:200] + ' ...' + yield f"🛠️ **正在调用工具:** `{tool_name}` 📥**参数:**\n````text\n{showarg}\n````\n" + handler.current_turn = turn + 1 + gen = handler.dispatch(tool_name, args, response, index=ii) + if verbose: + yield '`````\n' + outcome = yield from gen + yield '`````\n' + else: outcome = exhaust(gen) + + if outcome.next_prompt is None: return {'result': 'CURRENT_TASK_DONE', 'data': outcome.data} + if outcome.should_exit: return {'result': 'EXITED', 'data': outcome.data} + if outcome.next_prompt.startswith('未知工具'): client.last_tools = '' + + if outcome.data is not None: + datastr = json.dumps(outcome.data, ensure_ascii=False, default=json_default) if type(outcome.data) in [dict, list] else str(outcome.data) + next_prompt += f"\n{datastr}\n\n\n" + next_prompt += outcome.next_prompt + next_prompt = handler.next_prompt_patcher(next_prompt, None, turn+1) messages = [{"role": "user", "content": next_prompt}] return {'result': 'MAX_TURNS_EXCEEDED'} diff --git a/agentmain.py b/agentmain.py index a1025c7..ff6e3d6 100644 --- a/agentmain.py +++ b/agentmain.py @@ -5,8 +5,8 @@ if sys.stderr is None: sys.stderr = open(os.devnull, "w") elif hasattr(sys.stderr, 'reconfigure'): sys.stderr.reconfigure(errors='replace') sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) -from llmcore import SiderLLMSession, LLMSession, ToolClient, ClaudeSession, XaiSession, build_multimodal_content -from agent_loop import agent_runner_loop, StepOutcome, BaseHandler +from llmcore import SiderLLMSession, LLMSession, ToolClient, ClaudeSession, XaiSession, NativeToolClient, NativeClaudeSession, build_multimodal_content +from agent_loop import agent_runner_loop from ga import GenericAgentHandler, smart_format, get_global_memory, format_error script_dir = os.path.dirname(os.path.abspath(__file__)) @@ -44,28 +44,30 @@ class GeneraticAgent: for k, cfg in mykeys.items(): if not any(x in k for x in ['api', 'config', 'cookie']): continue try: - if 'claude' in k: llm_sessions += [ClaudeSession(cfg=cfg)] - if 'oai' in k: llm_sessions += [LLMSession(cfg=cfg)] - if 'xai' in k: llm_sessions += [XaiSession(cfg=cfg)] - if 'sider' in k: llm_sessions += [SiderLLMSession(cfg={'apikey': cfg, 'model': x}) for x in \ + if 'native' in k and 'claude' in k: llm_sessions += [NativeToolClient(NativeClaudeSession(cfg=cfg))] + elif 'claude' in k: llm_sessions += [ToolClient(ClaudeSession(cfg=cfg))] + elif 'oai' in k: llm_sessions += [ToolClient(LLMSession(cfg=cfg))] + elif 'xai' in k: llm_sessions += [ToolClient(XaiSession(cfg=cfg))] + elif 'sider' in k: llm_sessions += [ToolClient(SiderLLMSession(cfg={'apikey': cfg, 'model': x})) for x in \ ["gemini-3.0-flash", "gpt-5.4"]] except: pass - if len(llm_sessions) > 0: self.llmclient = ToolClient(llm_sessions, auto_save_tokens=True) - else: self.llmclient = None + self.llmclients = llm_sessions self.lock = threading.Lock() self.history = [] self.task_queue = queue.Queue() self.is_running, self.stop_sig = False, False self.llm_no = 0; self.inc_out = False self.handler = None; self.verbose = True + self.llmclient = self.llmclients[self.llm_no] def next_llm(self, n=-1): - self.llm_no = ((self.llm_no + 1) if n < 0 else n) % len(self.llmclient.backends) + self.llm_no = ((self.llm_no + 1) if n < 0 else n) % len(self.llmclients) + self.llmclient = self.llmclients[self.llm_no] self.llmclient.last_tools = '' - def list_llms(self): return [(i, f"{type(b).__name__}/{b.default_model}", i == self.llm_no) for i, b in enumerate(self.llmclient.backends)] + def list_llms(self): return [(i, f"{type(b.backend).__name__}/{b.backend.default_model}", i == self.llm_no) for i, b in enumerate(self.llmclients)] def get_llm_name(self): - b = self.llmclient.backends[self.llm_no] - return f"{type(b).__name__}/{b.default_model}" + b = self.llmclient + return f"{type(b.backend).__name__}/{b.backend.default_model}" def abort(self): print('Abort current task...') @@ -95,7 +97,6 @@ class GeneraticAgent: handler.working['passed_sessions'] = ps = self.handler.working.get('passed_sessions', 0) + 1 if ps > 0: handler.working['key_info'] += f'\n[SYSTEM] 此为 {ps} 个对话前设置的key_info,若已在新任务,先更新或清除工作记忆。\n' self.handler = handler - self.llmclient.backend = self.llmclient.backends[self.llm_no] user_input = raw_query if source == 'feishu' and len(self.history) > 1: # 如果有历史记录且来自飞书,注入到首轮 user_input 中(支持/restore恢复上下文) user_input = handler._get_anchor_prompt() + f"\n\n### 用户当前消息\n{raw_query}" diff --git a/assets/tools_schema.json b/assets/tools_schema.json index 0d34f01..c35ace8 100644 --- a/assets/tools_schema.json +++ b/assets/tools_schema.json @@ -1,7 +1,7 @@ [ {"type": "function", "function": { "name": "code_run", - "description": "代码执行器。优先使用python,仅在必要系统操作时使用 powershell。注意:执行的代码必须放在在回复正文中,以 ```python 或 ```powershell 代码块的形式。严禁在代码中硬编码大量数据,如有需要应通过文件读取。", + "description": "代码执行器。优先使用python,仅在必要系统操作时使用 powershell。注意:不能同时调用多个,执行的代码放在回复正文中,以 ```python 或 ```powershell 代码块的形式。严禁在代码中硬编码大量数据,如有需要应通过文件读取。", "parameters": {"type": "object", "properties": { "type": {"type": "string", "enum": ["python", "powershell"], "description": "执行环境类型,默认为 python。", "default": "python"}, "timeout": {"type": "integer", "description": "执行超时时间(秒),默认 60。", "default": 60}, diff --git a/ga.py b/ga.py index e5b64c1..56b4e32 100644 --- a/ga.py +++ b/ga.py @@ -256,6 +256,7 @@ class GenericAgentHandler(BaseHandler): return os.path.abspath(os.path.join(self.cwd, path)) def tool_after_callback(self, tool_name, args, response, ret): + if args.get('_index', 0) > 0: return rsumm = re.search(r"(.*?)", response.content, re.DOTALL) if rsumm: summary = rsumm.group(1).strip()[:200] else: @@ -268,6 +269,7 @@ class GenericAgentHandler(BaseHandler): def do_code_run(self, args, response): '''执行代码片段,有长度限制,不允许代码中放大量数据,如有需要应当通过文件读取进行。 ''' + if args.get('_index', 0) > 0: return StepOutcome("[BLANK]", next_prompt="no multi code_run in one round!") code_type = args.get("type", "python") # 从 response.content 中提取代码块, 匹配 ```python ... ``` 或 ```powershell ... ``` pattern = rf"```{code_type}\n(.*?)\n```" @@ -502,4 +504,4 @@ def get_global_memory(): prompt += structure + '\n../memory/global_mem_insight.txt:\n' prompt += insight + "\n" except FileNotFoundError: pass - return prompt \ No newline at end of file + return prompt diff --git a/llmcore.py b/llmcore.py index 5a081f6..1d85a8e 100644 --- a/llmcore.py +++ b/llmcore.py @@ -397,25 +397,127 @@ class XaiSession: yield f"[XaiError] {e}" def reset(self): self._last_response_id = None + + + +class NativeClaudeSession: + def __init__(self, cfg): + self.api_key = cfg['apikey']; self.api_base = cfg['apibase'].rstrip('/') + self.default_model = cfg.get('model', 'claude-opus') + self.context_win = cfg.get('context_win', 24000) + self.history = [] + self.system = None + self.lock = threading.Lock() + def set_system(self, system_text): self.system = system_text + + def raw_ask(self, messages, tools=None, system=None, model=None, temperature=0.5, max_tokens=6144): + """底层API调用。yields text chunks,generator return = list[content_block]""" + model = model or self.default_model + headers = {"x-api-key": self.api_key, "Content-Type": "application/json", "anthropic-version": "2023-06-01"} + payload = {"model": model, "messages": messages, "temperature": temperature, "max_tokens": max_tokens, "stream": True} + if tools: payload["tools"] = tools + if system: payload["system"] = system + try: + resp = requests.post(auto_make_url(self.api_base, "messages"), headers=headers, json=payload, stream=True, timeout=120) + if resp.status_code != 200: + error_msg = f"Error: HTTP {resp.status_code} {resp.text[:500]}" + yield error_msg + return [{"type": "text", "text": error_msg}] + except Exception as e: + error_msg = f"Error: {e}" + yield error_msg + return [{"type": "text", "text": error_msg}] + + content_blocks = []; current_block = None; tool_json_buf = "" + for line in resp.iter_lines(): + if not line: continue + line = line.decode('utf-8') if isinstance(line, bytes) else line + data_str = line[6:] + if data_str.strip() == "[DONE]": break + try: evt = json.loads(data_str) + except: continue + evt_type = evt.get("type", "") + if evt_type == "content_block_start": + block = evt.get("content_block", {}) + if block.get("type") == "text": current_block = {"type": "text", "text": ""} + elif block.get("type") == "tool_use": + current_block = {"type": "tool_use", "id": block.get("id", ""), "name": block.get("name", ""), "input": {}} + tool_json_buf = "" + elif evt_type == "content_block_delta": + delta = evt.get("delta", {}) + if delta.get("type") == "text_delta": + text = delta.get("text", "") + if current_block: current_block["text"] += text + yield text + elif delta.get("type") == "input_json_delta": tool_json_buf += delta.get("partial_json", "") + elif evt_type == "content_block_stop": + if current_block: + if current_block["type"] == "tool_use": + try: current_block["input"] = json.loads(tool_json_buf) if tool_json_buf else {} + except: current_block["input"] = {"_raw": tool_json_buf} + content_blocks.append(current_block) + current_block = None + return content_blocks + + def ask(self, msg, tools=None, model=None): + """增量ask。msg: str|list[content_block]|dict。yields text chunks, return MockResponse""" + if isinstance(msg, str): msg = {"role": "user", "content": msg} + elif isinstance(msg, list): msg = {"role": "user", "content": msg} + with self.lock: + self.history.append(msg) + while len(self.history) > 2: + cost = sum(len(json.dumps(m, ensure_ascii=False)) for m in self.history) + len(self.system or '') + if cost <= self.context_win * 4: break + self.history.pop(0); self.history.pop(0) # 砍一对 + messages = list(self.history) + + content_blocks = None + gen = self.raw_ask(messages, tools, self.system, model) + try: + while True: yield next(gen) + except StopIteration as e: content_blocks = e.value or [] + if content_blocks and not (len(content_blocks) == 1 and content_blocks[0].get("text", "").startswith("Error:")): + self.history.append({"role": "assistant", "content": content_blocks}) + thinking = '' + text_parts = [b["text"] for b in content_blocks if b.get("type") == "text"] + content = "\n".join(text_parts).strip() + tool_calls = [] + for b in content_blocks: + if b.get("type") == "tool_use": + tool_calls.append(MockToolCall(b["name"], b.get("input", {}), id=b.get("id", ""))) + return MockResponse(thinking, content, tool_calls, str(content_blocks)) + +def openai_tools_to_claude(tools): + """[{type:'function', function:{name,description,parameters}}] → [{name,description,input_schema}]. 幂等""" + result = [] + for t in tools: + if 'input_schema' in t: result.append(t); continue # 已是claude格式 + fn = t.get('function', t) + result.append({ + 'name': fn['name'], 'description': fn.get('description', ''), + 'input_schema': fn.get('parameters', {'type': 'object', 'properties': {}}) + }) + return result + + class MockFunction: def __init__(self, name, arguments): self.name, self.arguments = name, arguments class MockToolCall: - def __init__(self, name, args): + def __init__(self, name, args, id=''): arg_str = json.dumps(args, ensure_ascii=False) if isinstance(args, dict) else args - self.function = MockFunction(name, arg_str) + self.function = MockFunction(name, arg_str); self.id = id class MockResponse: - def __init__(self, thinking, content, tool_calls, raw): - self.thinking = thinking # 存放 内部的思维过程 - self.content = content # 存放去除标签后的纯文本回复 - self.tool_calls = tool_calls # 存放 MockToolCall 列表 或 None - self.raw = raw + def __init__(self, thinking, content, tool_calls, raw, stop_reason='end_turn'): + self.thinking = thinking; self.content = content + self.tool_calls = tool_calls; self.raw = raw + self.stop_reason = 'tool_use' if tool_calls else stop_reason def __repr__(self): return f"" class ToolClient: - def __init__(self, backends, auto_save_tokens=False): + def __init__(self, backends, auto_save_tokens=True): if isinstance(backends, list): self.backends = backends else: self.backends = [backends] self.backend = self.backends[0] @@ -424,8 +526,6 @@ class ToolClient: self.total_cd_tokens = 0 def chat(self, messages, tools=None): - script_dir = os.path.dirname(os.path.abspath(__file__)) - log_path = os.path.join(script_dir, f'./temp/model_responses_{os.getpid()}.txt') if self._should_use_structured_messages(messages): backend_messages = self._build_backend_messages(messages, tools) print("Structured prompt length:", sum(self._estimate_content_len(m.get("content")) for m in backend_messages), 'chars') @@ -436,8 +536,7 @@ class ToolClient: print("Full prompt length:", len(full_prompt), 'chars') prompt_log = full_prompt gen = self.backend.ask(full_prompt, stream=True) - with open(log_path, 'a', encoding='utf-8', errors="replace") as f: - f.write(f"=== Prompt === {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n{prompt_log}\n") + _write_llm_log('Prompt', prompt_log) raw_text = ''; summarytag = '[NextWillSummary]' for chunk in gen: raw_text += chunk @@ -445,8 +544,7 @@ class ToolClient: print('Complete response received.') if raw_text.endswith(summarytag): self.last_tools = ''; raw_text = raw_text[:-len(summarytag)] - with open(log_path, 'a', encoding='utf-8', errors="replace") as f: - f.write(f"=== Response === {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n{raw_text}\n\n") + _write_llm_log('Response', raw_text) return self._parse_mixed_response(raw_text) def _should_use_structured_messages(self, messages): @@ -474,7 +572,7 @@ class ToolClient: 请按照以下步骤思考并行动,标签之间需要回车换行: 1. **思考**: 在 `` 标签中先进行思考,分析现状和策略。 2. **总结**: 在 `` 中输出*极为简短*的高度概括的单行(<30字)物理快照,包括上次工具调用结果产生的新信息+本次工具调用意图。此内容将进入长期工作记忆,记录关键信息,严禁输出无实际信息增量的描述。 -3. **行动**: 如需调用工具,请在回复正文之后输出一个 **块**,然后结束,我会稍后给你返回块。 +3. **行动**: 如需调用工具,请在回复正文之后输出一个(或多个)**块**,然后结束,我会稍后给你返回块。 格式: ```\n{{"name": "工具名", "arguments": {{参数}}}}\n\n``` ### 可用工具库(已挂载,持续有效) @@ -589,7 +687,13 @@ class ToolClient: print(e['err']) if 'bad_json' in e: tool_calls.append(MockToolCall('bad_json', {'msg': e['bad_json']})) content = remaining_text.strip() - return MockResponse(thinking, content, tool_calls[-1:], text) + return MockResponse(thinking, content, tool_calls, text) + +def _write_llm_log(label, content): + log_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), f'temp/model_responses_{os.getpid()}.txt') + ts = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + with open(log_path, 'a', encoding='utf-8', errors='replace') as f: + f.write(f"=== {label} === {ts}\n{content}\n\n") def tryparse(json_str): try: return json.loads(json_str) @@ -602,30 +706,43 @@ def tryparse(json_str): if '}' in json_str: json_str = json_str[:json_str.rfind('}') + 1] return json.loads(json_str) -if __name__ == "__main__": - sider_cookie = mykeys.get("sider_cookie") - oai_configs = { - k: v for k, v in mykeys.items() if k.startswith("oai_config") and v - } - google_api_key = mykeys.get("google_api_key") - cfg = oai_configs.get("oai_config") - llmclient = ToolClient(LLMSession(cfg)) - def get_final(gen): - try: - while True: print('mid:', next(gen)) - except StopIteration as e: - return e.value - - response = get_final(llmclient.chat( - messages=[{"role": "user", "content": "我的IP是多少"}], - tools=[{"name": "get_ip", "parameters": {}}] - )) - print(f"思考: {response.thinking}") - if response.tool_calls: - cmd = response.tool_calls[0] - print(f"调用: {cmd.function.name} 参数: {cmd.function.arguments}") - response = get_final(llmclient.chat( - messages=[{"role": "user", "content": "10.176.45.12"}] - )) - print(response.content) +class NativeToolClient: + THINKING_PROMPT = """ +### 行动规范(持续有效) +每次回复请遵循: +1. 在 标签中先分析现状和策略 +2. 在 中输出极简单行(<30字)物理快照:上次结果新信息+本次意图。此内容进入长期工作记忆。 +3. 如需调用工具,直接使用工具调用能力,然后结束回复。 +""".strip() + def __init__(self, backend): + self.backend = backend + self.backend.system = self.THINKING_PROMPT + self.tools = {} + def set_system(self, extra_system): + combined = f"{extra_system}\n\n{self.THINKING_PROMPT}" if extra_system else self.THINKING_PROMPT + self.backend.system = combined + def chat(self, messages, tools=None): + if tools: self.tools = openai_tools_to_claude(tools) if isinstance(self.backend, NativeClaudeSession) else tools + combined_content = []; resp = None + for msg in messages: + c = msg.get('content', '') + if isinstance(c, str): combined_content.append({"type": "text", "text": c}) + elif isinstance(c, list) or isinstance(c, dict): combined_content.extend(c) + merged = {"role": "user", "content": combined_content} + _write_llm_log('Prompt', json.dumps(merged, ensure_ascii=False, indent=2)) + gen = self.backend.ask(merged, self.tools); + try: + while True: + chunk = next(gen); yield chunk + except StopIteration as e: resp = e.value + print('Complete response received.') + if resp: + _write_llm_log('Response', resp.raw) + text = resp.content + think_match = re.search(r'(.*?)', text, re.DOTALL) + if think_match: + resp.thinking = think_match.group(1).strip() + text = re.sub(r'.*?', '', text, flags=re.DOTALL) + resp.content = text.strip() + return resp \ No newline at end of file