enhance SSE parsing, add stream error/truncation detection, fix tool_call fallback extraction

This commit is contained in:
Liang Jiaqing
2026-03-26 18:42:09 +08:00
parent 650c14dc1a
commit 47ad98239f
2 changed files with 38 additions and 6 deletions

View File

@@ -18,7 +18,7 @@ def get_screen_width():
def start_streamlit(port):
global proc
cmd = [sys.executable, "-m", "streamlit", "run", os.path.join(frontends_dir, "stapp.py"), "--server.port", str(port), "--server.address", "localhost", "--server.headless", "true", "--theme.base", "dark"] # 暗黑模式
cmd = [sys.executable, "-m", "streamlit", "run", os.path.join(frontends_dir, "stapp.py"), "--server.port", str(port), "--server.address", "localhost", "--server.headless", "true"]
proc = subprocess.Popen(cmd)
atexit.register(proc.kill)

View File

@@ -78,6 +78,7 @@ class SiderLLMSession:
def _parse_claude_sse(resp_lines):
"""Parse Anthropic SSE stream. Yields text chunks, returns list[content_block]."""
content_blocks = []; current_block = None; tool_json_buf = ""
stop_reason = None; got_message_stop = False
for line in resp_lines:
if not line: continue
line = line.decode('utf-8') if isinstance(line, bytes) else line
@@ -85,7 +86,9 @@ def _parse_claude_sse(resp_lines):
data_str = line[5:].lstrip()
if data_str == "[DONE]": break
try: evt = json.loads(data_str)
except: continue
except Exception as e:
print(f"[SSE] JSON parse error: {e}, line: {data_str[:200]}")
continue
evt_type = evt.get("type", "")
if evt_type == "message_start":
usage = evt.get("message", {}).get("usage", {})
@@ -111,6 +114,25 @@ def _parse_claude_sse(resp_lines):
except: current_block["input"] = {"_raw": tool_json_buf}
content_blocks.append(current_block)
current_block = None
elif evt_type == "message_delta":
delta = evt.get("delta", {})
stop_reason = delta.get("stop_reason", stop_reason)
out_usage = evt.get("usage", {})
out_tokens = out_usage.get("output_tokens", 0)
if out_tokens: print(f"[Output] tokens={out_tokens} stop_reason={stop_reason}")
elif evt_type == "message_stop": got_message_stop = True
elif evt_type == "error":
err = evt.get("error", {})
emsg = err.get("message", str(err)) if isinstance(err, dict) else str(err)
print(f"[SSE ERROR] {emsg}")
yield f"\n\n[SSE Error: {emsg}]"
break
if not got_message_stop and not stop_reason:
print("[WARN] SSE stream ended without message_stop - possible network interruption")
yield "\n\n[!!! 流异常中断,未收到完整响应 !!!]"
elif stop_reason == "max_tokens":
print(f"[WARN] Response truncated: max_tokens")
yield "\n\n[!!! Response truncated: max_tokens !!!]"
return content_blocks
def _parse_openai_sse(resp_lines, api_mode="chat_completions"):
@@ -557,7 +579,19 @@ class NativeOAISession:
text_parts = [b["text"] for b in content_blocks if b.get("type") == "text"]
content = "\n".join(text_parts).strip()
tool_calls = [MockToolCall(b["name"], b.get("input", {}), id=b.get("id", "")) for b in content_blocks if b.get("type") == "tool_use"]
return MockResponse("", content, tool_calls, str(content_blocks))
if len(tool_calls) == 0 and content.endswith('}]') and '[{"type":"tool_use"' in content:
try:
idx = content.index('[{"type":"tool_use"')
raw = json.loads(content[idx:])
tool_calls = [MockToolCall(b["name"], b.get("input", {}), id=b.get("id", "")) for b in raw if b.get("type") == "tool_use"]
content = content[:idx].strip()
except: pass
think_pattern = r"<thinking>(.*?)</thinking>"; thinking = ''
think_match = re.search(think_pattern, content, re.DOTALL)
if think_match:
thinking = think_match.group(1).strip()
content = re.sub(think_pattern, "", content, flags=re.DOTALL)
return MockResponse(thinking, content, tool_calls, str(content_blocks))
class NativeClaudeSession:
@@ -712,8 +746,7 @@ class ToolClient:
"""
if self.auto_save_tokens and self.last_tools == tools_json:
tool_instruction = "\n### 工具库状态持续有效code_run/file_read等**可正常调用**。调用协议沿用。\n"
else:
self.total_cd_tokens = 0
else: self.total_cd_tokens = 0
self.last_tools = tools_json
return tool_instruction
@@ -843,7 +876,6 @@ class NativeToolClient:
每次回复请遵循:
1. 在 <thinking></thinking> 标签中先分析现状和策略
2. 在 <summary></summary> 中输出极简单行(<30字物理快照上次结果新信息+本次意图。此内容进入长期工作记忆。
3. 如需调用工具,直接使用工具调用能力,然后结束回复。
""".strip()
def __init__(self, backend):
self.backend = backend