Update core logic and prompts

This commit is contained in:
Liang Jiaqing
2026-01-28 09:29:56 +08:00
parent 3c1525e427
commit 424ddc4ab1
7 changed files with 80 additions and 99 deletions

82
ga.py
View File

@@ -1,15 +1,11 @@
import sys, os, re
import pyperclip, threading
import json, time
import sys, os, re, json, time, pyperclip, threading
from pathlib import Path
import subprocess
import tempfile
import tempfile, traceback, subprocess
if sys.stdout is None: sys.stdout = open(os.devnull, "w")
if sys.stderr is None: sys.stderr = open(os.devnull, "w")
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from sidercall import LLMSession, ToolClient
from agent_loop import BaseHandler, StepOutcome, agent_runner_loop
from agent_loop import BaseHandler, StepOutcome, try_call_generator
def code_run(code: str, code_type: str = "python", timeout: int = 60, cwd: str = None):
"""
@@ -146,7 +142,6 @@ def web_scan(focus_item="", switch_tab_id=None):
except Exception as e:
return {"status": "error", "msg": format_error(e)}
import traceback
def format_error(e):
exc_type, exc_value, exc_traceback = sys.exc_info()
tb = traceback.extract_tb(exc_traceback)
@@ -182,24 +177,18 @@ def web_execute_js(script: str):
return {"status": "error", "msg": format_error(e)}
def file_patch(path: str, old_content: str, new_content: str):
"""
在文件中寻找唯一的 old_content 块并替换为 new_content。
"""在文件中寻找唯一的 old_content 块并替换为 new_content。
"""
path = str(Path(path).resolve())
try:
if not os.path.exists(path):
return {"status": "error", "msg": "文件不存在"}
with open(path, 'r', encoding='utf-8') as f:
full_text = f.read()
if not os.path.exists(path): return {"status": "error", "msg": "文件不存在"}
with open(path, 'r', encoding='utf-8') as f: full_text = f.read()
# 检查唯一性
count = full_text.count(old_content)
if count == 0:
return {"status": "error", "msg": "找到匹配的旧文本块,请检查空格、缩进和换行是否完全一致"}
if count > 1:
return {"status": "error", "msg": f"找到 {count} 处匹配,请提供更长的旧文本块以确保唯一性。"}
if count == 0: return {"status": "error", "msg": "未找到匹配的旧文本块,请检查空格、缩进和换行是否完全一致。"}
if count > 1: return {"status": "error", "msg": f"找到 {count} 处匹配,请提供更长的旧文本块以确保唯一性"}
updated_text = full_text.replace(old_content, new_content)
with open(path, 'w', encoding='utf-8') as f:
f.write(updated_text)
with open(path, 'w', encoding='utf-8') as f: f.write(updated_text)
return {"status": "success", "msg": "文件局部修改成功"}
except Exception as e:
return {"status": "error", "msg": str(e)}
@@ -224,31 +213,40 @@ def smart_format(data, max_depth=2, max_str_len=100):
if isinstance(obj, dict): return {k: truncate(v, depth + 1) for k, v in obj.items()}
if isinstance(obj, list): return [truncate(i, depth + 1) for i in obj]
return obj
if isinstance(data, (str, bytes)): return truncate(data, 0)
return json.dumps(truncate(data, 0), indent=2, ensure_ascii=False, default=str)
class GenericAgentHandler(BaseHandler):
'''
Generic Agent 工具库,包含多种工具的实现。工具函数自动加上了 do_ 前缀。实际工具名没有前缀。
'''
def __init__(self, parent, user_input, cwd):
def __init__(self, parent, last_history=None, cwd='./'):
self.parent = parent
self.user_input = user_input
self.plan = ""
self.focus = ""
self.cwd = cwd
self.history_info = last_history if last_history else []
def _get_abs_path(self, path):
if not path: return ""
return os.path.abspath(os.path.join(self.cwd, path))
def tool_after_callback(self, tool_name, args, response, ret):
rsumm = re.search(r"<summary>(.*?)</summary>", response.content, re.DOTALL)
if rsumm: summary = rsumm.group(1).strip()[:200]
else:
summary = f"调用工具{tool_name}, args: {args}"
if tool_name == 'no_tool': summary = "直接回答了用户问题"
if type(ret.next_prompt) is str:
ret.next_prompt += "\nPROTOCOL_VIOLATION: 上一轮遗漏了<summary>。 我已根据物理动作自动补全。请务必在下次回复中记得<summary>协议。"
self.history_info.append('[Agent] ' + smart_format(summary, max_str_len=100))
def do_code_run(self, args, response):
'''执行代码片段,有长度限制,不允许代码中放大量数据,如有需要应当通过文件读取进行。
'''
code_type = args.get("type", "python")
# 从 response.content 中提取代码块
# 匹配 ```python ... ``` 或 ```powershell ... ```
# 从 response.content 中提取代码块, 匹配 ```python ... ``` 或 ```powershell ... ```
pattern = rf"```{code_type}\n(.*?)\n```"
# 也可以更通用一点不分类型提取最后一个代码块rf"```(?:{code_type})?\n(.*?)\n```"
matches = re.findall(pattern, response.content, re.DOTALL)
if not matches:
return StepOutcome(None, next_prompt=f"【系统错误】:你调用了 code_run但未在回复中提供 ```{code_type} 代码块。请重新输出代码并附带工具调用。")
@@ -257,7 +255,8 @@ class GenericAgentHandler(BaseHandler):
timeout = args.get("timeout", 60)
cwd = args.get("cwd", self.cwd)
result = yield from code_run(code, code_type, timeout, cwd)
return StepOutcome(result, next_prompt=self._get_anchor_prompt())
next_prompt = self._get_anchor_prompt()
return StepOutcome(result, next_prompt=next_prompt)
def do_ask_user(self, args, response):
question = args.get("question", "请提供输入:")
@@ -292,7 +291,8 @@ class GenericAgentHandler(BaseHandler):
result["js_return"] += f"\n\n[已保存以上内容到 {abs_path}]"
print("Web Execute JS Result:", smart_format(result))
yield f"JS 执行结果:\n{smart_format(result)}\n"
return StepOutcome(result, next_prompt=self._get_anchor_prompt())
next_prompt = self._get_anchor_prompt()
return StepOutcome(result, next_prompt=next_prompt)
def do_file_patch(self, args, response):
path = self._get_abs_path(args.get("path", ""))
@@ -301,7 +301,8 @@ class GenericAgentHandler(BaseHandler):
new_content = args.get("new_content", "")
result = file_patch(path, old_content, new_content)
yield f"\n{smart_format(result)}\n"
return StepOutcome(result, next_prompt=self._get_anchor_prompt())
next_prompt = self._get_anchor_prompt()
return StepOutcome(result, next_prompt=next_prompt)
def do_file_write(self, args, response):
'''用于对整个文件的大量处理精细修改要用file_patch。
@@ -330,8 +331,9 @@ class GenericAgentHandler(BaseHandler):
with open(path, write_mode, encoding="utf-8") as f:
f.write(final_content)
yield f"[Status] ✅ {mode.capitalize()} 成功 ({len(new_content)} bytes)\n"
next_prompt = self._get_anchor_prompt()
return StepOutcome({"status": "success", 'writed_bytes': len(new_content)},
next_prompt=self._get_anchor_prompt())
next_prompt=next_prompt)
except Exception as e:
yield f"[Status] ❌ 写入异常: {str(e)}\n"
return StepOutcome({"status": "error", "msg": str(e)}, next_prompt="\n")
@@ -343,7 +345,8 @@ class GenericAgentHandler(BaseHandler):
count = args.get("count", 100)
show_linenos = args.get("show_linenos", True)
result = file_read(path, start, count, show_linenos)
return StepOutcome(result, next_prompt=self._get_anchor_prompt())
next_prompt = self._get_anchor_prompt()
return StepOutcome(result, next_prompt=next_prompt)
def do_update_plan(self, args, response):
'''
@@ -361,8 +364,8 @@ class GenericAgentHandler(BaseHandler):
yield f"[Info] Updated plan and focus.\n"
yield f"New Plan:\n{self.plan}\n\n"
yield f"New Focus:\n{self.focus}\n"
return StepOutcome({"status": "success"},
next_prompt=self._get_anchor_prompt())
next_prompt = self._get_anchor_prompt()
return StepOutcome({"status": "success"}, next_prompt=next_prompt)
def do_no_tool(self, args, response):
'''这是一个特殊工具由引擎自主调用不要包含在TOOLS_SCHEMA里。
@@ -371,12 +374,9 @@ class GenericAgentHandler(BaseHandler):
return StepOutcome(response, next_prompt=None, should_exit=True)
def _get_anchor_prompt(self):
prompt = f"\n提醒: 用户原始输入:\n<user_input>{self.user_input}</user_input>\n"
if self.plan: prompt += f"<plan>\n{self.plan}\n</plan>\n"
if self.focus: prompt += f"<current>\n{self.focus}\n</current>\n"
prompt += "\n请继续执行下一步。"
return prompt
if __name__ == "__main__":
pass
h_str = "\n".join(self.history_info[-20:])
prompt = f"\n### [WORKING MEMORY]\n<history>\n{h_str}\n</history>"
print(prompt)
if self.plan: prompt += f"\n<plan>{self.plan}</plan>"
if self.focus: prompt += f"\n<focus>{self.focus}</focus>"
return prompt + "\n请继续执行下一步。"