fix: file_patch空值校验 + tool_use解析健壮性提升

This commit is contained in:
Liang Jiaqing
2026-02-24 19:08:02 +08:00
parent 07a53889fb
commit 3570489ba6
2 changed files with 20 additions and 17 deletions

5
ga.py
View File

@@ -183,7 +183,7 @@ def file_patch(path: str, old_content: str, new_content: str):
try: try:
if not os.path.exists(path): return {"status": "error", "msg": "文件不存在"} if not os.path.exists(path): return {"status": "error", "msg": "文件不存在"}
with open(path, 'r', encoding='utf-8') as f: full_text = f.read() with open(path, 'r', encoding='utf-8') as f: full_text = f.read()
# 检查唯一性 if not old_content: return {"status": "error", "msg": "old_content 为空,请确认 arguments 参数"}
count = full_text.count(old_content) count = full_text.count(old_content)
if count == 0: return {"status": "error", "msg": "未找到匹配的旧文本块,建议:先用 file_read 确认当前内容,再分小段进行 patch。若多次失败则询问用户严禁自行使用 overwrite 或代码替换。"} if count == 0: return {"status": "error", "msg": "未找到匹配的旧文本块,建议:先用 file_read 确认当前内容,再分小段进行 patch。若多次失败则询问用户严禁自行使用 overwrite 或代码替换。"}
if count > 1: return {"status": "error", "msg": f"找到 {count} 处匹配,无法确定唯一位置。请提供更长、更具体的旧文本块以确保唯一性。建议:包含上下文行来增强特征,或分小段逐个修改。"} if count > 1: return {"status": "error", "msg": f"找到 {count} 处匹配,无法确定唯一位置。请提供更长、更具体的旧文本块以确保唯一性。建议:包含上下文行来增强特征,或分小段逐个修改。"}
@@ -385,7 +385,7 @@ class GenericAgentHandler(BaseHandler):
result += '\n\n(某些行被截断,如需完整内容可改用 code_run 读取)' result += '\n\n(某些行被截断,如需完整内容可改用 code_run 读取)'
next_prompt = self._get_anchor_prompt() next_prompt = self._get_anchor_prompt()
if 'memory' in path or 'sop' in path: if 'memory' in path or 'sop' in path:
next_prompt += "\nPROTOCOL: 你正在读取记忆或SOP文件若决定按sop执行请先调用相关工具提取sop中的关键点特别是靠后的进入工作记忆。" next_prompt += "\n[SYSTEM TIPS] 正在读取记忆或SOP文件若决定按sop执行请提取sop中的关键点特别是靠后的update working memory."
return StepOutcome(result, next_prompt=next_prompt) return StepOutcome(result, next_prompt=next_prompt)
def do_update_working_mem(self, args, response): def do_update_working_mem(self, args, response):
@@ -407,7 +407,6 @@ class GenericAgentHandler(BaseHandler):
二次确认仅在回复几乎只包含<thinking>/<summary>和一段大代码块时触发。 二次确认仅在回复几乎只包含<thinking>/<summary>和一段大代码块时触发。
''' '''
content = getattr(response, 'content', '') or "" content = getattr(response, 'content', '') or ""
# 1. 空回复保护:要求模型重新生成内容或调用工具 # 1. 空回复保护:要求模型重新生成内容或调用工具
if not response or not content.strip(): if not response or not content.strip():
yield "[Warn] LLM returned an empty response. Retrying...\n" yield "[Warn] LLM returned an empty response. Retrying...\n"

View File

@@ -320,8 +320,7 @@ class ToolClient:
return prompt return prompt
def _parse_mixed_response(self, text): def _parse_mixed_response(self, text):
remaining_text = text remaining_text = text; thinking = ''
thinking = ''
think_pattern = r"<thinking>(.*?)</thinking>" think_pattern = r"<thinking>(.*?)</thinking>"
think_match = re.search(think_pattern, text, re.DOTALL) think_match = re.search(think_pattern, text, re.DOTALL)
@@ -329,42 +328,47 @@ class ToolClient:
thinking = think_match.group(1).strip() thinking = think_match.group(1).strip()
remaining_text = re.sub(think_pattern, "", remaining_text, flags=re.DOTALL) remaining_text = re.sub(think_pattern, "", remaining_text, flags=re.DOTALL)
tool_calls = None tool_calls = []; json_strs = []; errors = []
tool_pattern = r"<tool_use>(.*?)</tool_use>" tool_pattern = r"<tool_use>(.{15,}?)</tool_use>"
tool_all = re.findall(tool_pattern, remaining_text, re.DOTALL) tool_all = re.findall(tool_pattern, remaining_text, re.DOTALL)
json_str = ""
if tool_all: if tool_all:
json_str = tool_all[-1].strip() tool_all = [s.strip() for s in tool_all]
json_strs.extend([s for s in tool_all if s.startswith('{') and s.endswith('}')])
remaining_text = re.sub(tool_pattern, "", remaining_text, flags=re.DOTALL) remaining_text = re.sub(tool_pattern, "", remaining_text, flags=re.DOTALL)
elif '<tool_use>' in remaining_text: elif '<tool_use>' in remaining_text:
weaktoolstr = remaining_text.split('<tool_use>')[-1].strip() weaktoolstr = remaining_text.split('<tool_use>')[-1].strip()
json_str = weaktoolstr if weaktoolstr.endswith('}') else '' json_str = weaktoolstr if weaktoolstr.endswith('}') else ''
if json_str == '' and '```' in weaktoolstr and weaktoolstr.split('```')[0].strip().endswith('}'): if json_str == '' and '```' in weaktoolstr and weaktoolstr.split('```')[0].strip().endswith('}'):
json_str = weaktoolstr.split('```')[0].strip() json_str = weaktoolstr.split('```')[0].strip()
if json_str:
json_strs.append(json_str)
remaining_text = remaining_text.replace('<tool_use>'+weaktoolstr, "") remaining_text = remaining_text.replace('<tool_use>'+weaktoolstr, "")
elif '"name":' in remaining_text and '"arguments":' in remaining_text: elif '"name":' in remaining_text and '"arguments":' in remaining_text:
json_match = re.search(r"(\{.*\"name\":.*?\})", remaining_text, re.DOTALL | re.MULTILINE) json_match = re.search(r"(\{.*\"name\":.*?\})", remaining_text, re.DOTALL | re.MULTILINE)
if json_match: if json_match:
json_str = json_match.group(1).strip() json_str = json_match.group(1).strip()
json_strs.append(json_str)
remaining_text = remaining_text.replace(json_str, "").strip() remaining_text = remaining_text.replace(json_str, "").strip()
if json_str:
for json_str in json_strs:
try: try:
data = tryparse(json_str) data = tryparse(json_str)
func_name = data.get('name') or data.get('function') or data.get('tool') func_name = data.get('name') or data.get('function') or data.get('tool')
args = data.get('arguments') or data.get('args') or data.get('params') or data.get('parameters') args = data.get('arguments') or data.get('args') or data.get('params') or data.get('parameters')
if args is None: args = data if args is None: args = data
if func_name: tool_calls = [MockToolCall(func_name, args)] if func_name: tool_calls.append(MockToolCall(func_name, args))
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
print("[Warn] Failed to parse tool_use JSON:", json_str) errors.append({'err': f"[Warn] Failed to parse tool_use JSON: {json_str}", 'bad_json': f'Failed to parse tool_use JSON: {json_str[:200]}'})
tool_calls = [MockToolCall('bad_json', {'msg': f'Failed to parse tool_use JSON: {json_str[:200]}'})]
self.last_tools = '' # llm肯定忘了tool schema了再提供下 self.last_tools = '' # llm肯定忘了tool schema了再提供下
except Exception as e: except Exception as e:
print("[Error] Exception during tool_use parsing:", str(e), data) errors.append({'err': f'[Warn] Exception during tool_use parsing: {str(e)} {str(data)}'})
if len(tool_calls) == 0:
for e in errors:
print(e['err'])
if 'bad_json' in e: tool_calls.append(MockToolCall('bad_json', {'msg': e['bad_json']}))
content = remaining_text.strip() content = remaining_text.strip()
if not content: content = "" return MockResponse(thinking, content, tool_calls[-1:], text)
return MockResponse(thinking, content, tool_calls, text)
def tryparse(json_str): def tryparse(json_str):
try: return json.loads(json_str) try: return json.loads(json_str)