add NativeClaudeSession+NativeToolClient, support parallel tool calls, fix tool_calls[-1:], add llm log

This commit is contained in:
Liang Jiaqing
2026-03-21 12:13:46 +08:00
parent 85f17db487
commit fdc24e4dd0
5 changed files with 207 additions and 87 deletions

View File

@@ -397,25 +397,127 @@ class XaiSession:
yield f"[XaiError] {e}"
def reset(self): self._last_response_id = None
class NativeClaudeSession:
def __init__(self, cfg):
self.api_key = cfg['apikey']; self.api_base = cfg['apibase'].rstrip('/')
self.default_model = cfg.get('model', 'claude-opus')
self.context_win = cfg.get('context_win', 24000)
self.history = []
self.system = None
self.lock = threading.Lock()
def set_system(self, system_text): self.system = system_text
def raw_ask(self, messages, tools=None, system=None, model=None, temperature=0.5, max_tokens=6144):
"""底层API调用。yields text chunksgenerator return = list[content_block]"""
model = model or self.default_model
headers = {"x-api-key": self.api_key, "Content-Type": "application/json", "anthropic-version": "2023-06-01"}
payload = {"model": model, "messages": messages, "temperature": temperature, "max_tokens": max_tokens, "stream": True}
if tools: payload["tools"] = tools
if system: payload["system"] = system
try:
resp = requests.post(auto_make_url(self.api_base, "messages"), headers=headers, json=payload, stream=True, timeout=120)
if resp.status_code != 200:
error_msg = f"Error: HTTP {resp.status_code} {resp.text[:500]}"
yield error_msg
return [{"type": "text", "text": error_msg}]
except Exception as e:
error_msg = f"Error: {e}"
yield error_msg
return [{"type": "text", "text": error_msg}]
content_blocks = []; current_block = None; tool_json_buf = ""
for line in resp.iter_lines():
if not line: continue
line = line.decode('utf-8') if isinstance(line, bytes) else line
data_str = line[6:]
if data_str.strip() == "[DONE]": break
try: evt = json.loads(data_str)
except: continue
evt_type = evt.get("type", "")
if evt_type == "content_block_start":
block = evt.get("content_block", {})
if block.get("type") == "text": current_block = {"type": "text", "text": ""}
elif block.get("type") == "tool_use":
current_block = {"type": "tool_use", "id": block.get("id", ""), "name": block.get("name", ""), "input": {}}
tool_json_buf = ""
elif evt_type == "content_block_delta":
delta = evt.get("delta", {})
if delta.get("type") == "text_delta":
text = delta.get("text", "")
if current_block: current_block["text"] += text
yield text
elif delta.get("type") == "input_json_delta": tool_json_buf += delta.get("partial_json", "")
elif evt_type == "content_block_stop":
if current_block:
if current_block["type"] == "tool_use":
try: current_block["input"] = json.loads(tool_json_buf) if tool_json_buf else {}
except: current_block["input"] = {"_raw": tool_json_buf}
content_blocks.append(current_block)
current_block = None
return content_blocks
def ask(self, msg, tools=None, model=None):
"""增量ask。msg: str|list[content_block]|dict。yields text chunks, return MockResponse"""
if isinstance(msg, str): msg = {"role": "user", "content": msg}
elif isinstance(msg, list): msg = {"role": "user", "content": msg}
with self.lock:
self.history.append(msg)
while len(self.history) > 2:
cost = sum(len(json.dumps(m, ensure_ascii=False)) for m in self.history) + len(self.system or '')
if cost <= self.context_win * 4: break
self.history.pop(0); self.history.pop(0) # 砍一对
messages = list(self.history)
content_blocks = None
gen = self.raw_ask(messages, tools, self.system, model)
try:
while True: yield next(gen)
except StopIteration as e: content_blocks = e.value or []
if content_blocks and not (len(content_blocks) == 1 and content_blocks[0].get("text", "").startswith("Error:")):
self.history.append({"role": "assistant", "content": content_blocks})
thinking = ''
text_parts = [b["text"] for b in content_blocks if b.get("type") == "text"]
content = "\n".join(text_parts).strip()
tool_calls = []
for b in content_blocks:
if b.get("type") == "tool_use":
tool_calls.append(MockToolCall(b["name"], b.get("input", {}), id=b.get("id", "")))
return MockResponse(thinking, content, tool_calls, str(content_blocks))
def openai_tools_to_claude(tools):
"""[{type:'function', function:{name,description,parameters}}] → [{name,description,input_schema}]. 幂等"""
result = []
for t in tools:
if 'input_schema' in t: result.append(t); continue # 已是claude格式
fn = t.get('function', t)
result.append({
'name': fn['name'], 'description': fn.get('description', ''),
'input_schema': fn.get('parameters', {'type': 'object', 'properties': {}})
})
return result
class MockFunction:
def __init__(self, name, arguments): self.name, self.arguments = name, arguments
class MockToolCall:
def __init__(self, name, args):
def __init__(self, name, args, id=''):
arg_str = json.dumps(args, ensure_ascii=False) if isinstance(args, dict) else args
self.function = MockFunction(name, arg_str)
self.function = MockFunction(name, arg_str); self.id = id
class MockResponse:
def __init__(self, thinking, content, tool_calls, raw):
self.thinking = thinking # 存放 <thinking> 内部的思维过程
self.content = content # 存放去除标签后的纯文本回复
self.tool_calls = tool_calls # 存放 MockToolCall 列表 或 None
self.raw = raw
def __init__(self, thinking, content, tool_calls, raw, stop_reason='end_turn'):
self.thinking = thinking; self.content = content
self.tool_calls = tool_calls; self.raw = raw
self.stop_reason = 'tool_use' if tool_calls else stop_reason
def __repr__(self):
return f"<MockResponse thinking={bool(self.thinking)}, content='{self.content}', tools={bool(self.tool_calls)}>"
class ToolClient:
def __init__(self, backends, auto_save_tokens=False):
def __init__(self, backends, auto_save_tokens=True):
if isinstance(backends, list): self.backends = backends
else: self.backends = [backends]
self.backend = self.backends[0]
@@ -424,8 +526,6 @@ class ToolClient:
self.total_cd_tokens = 0
def chat(self, messages, tools=None):
script_dir = os.path.dirname(os.path.abspath(__file__))
log_path = os.path.join(script_dir, f'./temp/model_responses_{os.getpid()}.txt')
if self._should_use_structured_messages(messages):
backend_messages = self._build_backend_messages(messages, tools)
print("Structured prompt length:", sum(self._estimate_content_len(m.get("content")) for m in backend_messages), 'chars')
@@ -436,8 +536,7 @@ class ToolClient:
print("Full prompt length:", len(full_prompt), 'chars')
prompt_log = full_prompt
gen = self.backend.ask(full_prompt, stream=True)
with open(log_path, 'a', encoding='utf-8', errors="replace") as f:
f.write(f"=== Prompt === {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n{prompt_log}\n")
_write_llm_log('Prompt', prompt_log)
raw_text = ''; summarytag = '[NextWillSummary]'
for chunk in gen:
raw_text += chunk
@@ -445,8 +544,7 @@ class ToolClient:
print('Complete response received.')
if raw_text.endswith(summarytag):
self.last_tools = ''; raw_text = raw_text[:-len(summarytag)]
with open(log_path, 'a', encoding='utf-8', errors="replace") as f:
f.write(f"=== Response === {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n{raw_text}\n\n")
_write_llm_log('Response', raw_text)
return self._parse_mixed_response(raw_text)
def _should_use_structured_messages(self, messages):
@@ -474,7 +572,7 @@ class ToolClient:
请按照以下步骤思考并行动,标签之间需要回车换行:
1. **思考**: 在 `<thinking>` 标签中先进行思考,分析现状和策略。
2. **总结**: 在 `<summary>` 中输出*极为简短*的高度概括的单行(<30字物理快照包括上次工具调用结果产生的新信息+本次工具调用意图。此内容将进入长期工作记忆,记录关键信息,严禁输出无实际信息增量的描述。
3. **行动**: 如需调用工具,请在回复正文之后输出一个 **<tool_use>块**,然后结束,我会稍后给你返回<tool_result>块。
3. **行动**: 如需调用工具,请在回复正文之后输出一个(或多个)**<tool_use>块**,然后结束,我会稍后给你返回<tool_result>块。
格式: ```<tool_use>\n{{"name": "工具名", "arguments": {{参数}}}}\n</tool_use>\n```
### 可用工具库(已挂载,持续有效)
@@ -589,7 +687,13 @@ class ToolClient:
print(e['err'])
if 'bad_json' in e: tool_calls.append(MockToolCall('bad_json', {'msg': e['bad_json']}))
content = remaining_text.strip()
return MockResponse(thinking, content, tool_calls[-1:], text)
return MockResponse(thinking, content, tool_calls, text)
def _write_llm_log(label, content):
log_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), f'temp/model_responses_{os.getpid()}.txt')
ts = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
with open(log_path, 'a', encoding='utf-8', errors='replace') as f:
f.write(f"=== {label} === {ts}\n{content}\n\n")
def tryparse(json_str):
try: return json.loads(json_str)
@@ -602,30 +706,43 @@ def tryparse(json_str):
if '}' in json_str: json_str = json_str[:json_str.rfind('}') + 1]
return json.loads(json_str)
if __name__ == "__main__":
sider_cookie = mykeys.get("sider_cookie")
oai_configs = {
k: v for k, v in mykeys.items() if k.startswith("oai_config") and v
}
google_api_key = mykeys.get("google_api_key")
cfg = oai_configs.get("oai_config")
llmclient = ToolClient(LLMSession(cfg))
def get_final(gen):
try:
while True: print('mid:', next(gen))
except StopIteration as e:
return e.value
response = get_final(llmclient.chat(
messages=[{"role": "user", "content": "我的IP是多少"}],
tools=[{"name": "get_ip", "parameters": {}}]
))
print(f"思考: {response.thinking}")
if response.tool_calls:
cmd = response.tool_calls[0]
print(f"调用: {cmd.function.name} 参数: {cmd.function.arguments}")
response = get_final(llmclient.chat(
messages=[{"role": "user", "content": "<tool_result>10.176.45.12</tool_result>"}]
))
print(response.content)
class NativeToolClient:
THINKING_PROMPT = """
### 行动规范(持续有效)
每次回复请遵循:
1. 在 <thinking> 标签中先分析现状和策略
2. 在 <summary> 中输出极简单行(<30字物理快照上次结果新信息+本次意图。此内容进入长期工作记忆。
3. 如需调用工具,直接使用工具调用能力,然后结束回复。
""".strip()
def __init__(self, backend):
self.backend = backend
self.backend.system = self.THINKING_PROMPT
self.tools = {}
def set_system(self, extra_system):
combined = f"{extra_system}\n\n{self.THINKING_PROMPT}" if extra_system else self.THINKING_PROMPT
self.backend.system = combined
def chat(self, messages, tools=None):
if tools: self.tools = openai_tools_to_claude(tools) if isinstance(self.backend, NativeClaudeSession) else tools
combined_content = []; resp = None
for msg in messages:
c = msg.get('content', '')
if isinstance(c, str): combined_content.append({"type": "text", "text": c})
elif isinstance(c, list) or isinstance(c, dict): combined_content.extend(c)
merged = {"role": "user", "content": combined_content}
_write_llm_log('Prompt', json.dumps(merged, ensure_ascii=False, indent=2))
gen = self.backend.ask(merged, self.tools);
try:
while True:
chunk = next(gen); yield chunk
except StopIteration as e: resp = e.value
print('Complete response received.')
if resp:
_write_llm_log('Response', resp.raw)
text = resp.content
think_match = re.search(r'<thinking>(.*?)</thinking>', text, re.DOTALL)
if think_match:
resp.thinking = think_match.group(1).strip()
text = re.sub(r'<thinking>.*?</thinking>', '', text, flags=re.DOTALL)
resp.content = text.strip()
return resp