Files
GenericAgent/ga.py

559 lines
32 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import sys, os, re, json, time, threading, importlib
from datetime import datetime
from pathlib import Path
import tempfile, traceback, subprocess, itertools, collections, difflib
if sys.stdout is None: sys.stdout = open(os.devnull, "w")
if sys.stderr is None: sys.stderr = open(os.devnull, "w")
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from agent_loop import BaseHandler, StepOutcome, json_default
def code_run(code, code_type="python", timeout=60, cwd=None, code_cwd=None, stop_signal=[]):
"""代码执行器
python: 运行复杂的 .py 脚本(文件模式)
powershell/bash: 运行单行指令(命令模式)
优先使用python仅在必要系统操作时使用powershell"""
preview = (code[:60].replace('\n', ' ') + '...') if len(code) > 60 else code.strip()
yield f"[Action] Running {code_type} in {os.path.basename(cwd)}: {preview}\n"
script_dir = os.path.dirname(os.path.abspath(__file__))
cwd = cwd or os.path.join(script_dir, 'temp'); tmp_path = None
if code_type == "python":
tmp_file = tempfile.NamedTemporaryFile(suffix=".ai.py", delete=False, mode='w', encoding='utf-8', dir=code_cwd)
cr_header = os.path.join(script_dir, 'assets', 'code_run_header.py')
if os.path.exists(cr_header): tmp_file.write(open(cr_header, encoding='utf-8').read())
tmp_file.write(code)
tmp_path = tmp_file.name
tmp_file.close()
cmd = [sys.executable, "-X", "utf8", "-u", tmp_path]
elif code_type in ["powershell", "bash"]:
if os.name == 'nt': cmd = ["powershell", "-NoProfile", "-NonInteractive", "-Command", code]
else: cmd = ["bash", "-c", code]
else:
return {"status": "error", "msg": f"不支持的类型: {code_type}"}
print("code run output:")
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
startupinfo.wShowWindow = 0 # SW_HIDE
full_stdout = []
def stream_reader(proc, logs):
try:
for line_bytes in iter(proc.stdout.readline, b''):
try: line = line_bytes.decode('utf-8')
except UnicodeDecodeError: line = line_bytes.decode('gbk', errors='ignore')
logs.append(line)
try: print(line, end="")
except: pass
except: pass
try:
process = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
bufsize=0, cwd=cwd, startupinfo=startupinfo
)
start_t = time.time()
t = threading.Thread(target=stream_reader, args=(process, full_stdout), daemon=True)
t.start()
while t.is_alive():
istimeout = time.time() - start_t > timeout
if istimeout or len(stop_signal) > 0:
process.kill()
print("[Debug] Process killed due to timeout or stop signal.")
if istimeout: full_stdout.append("\n[Timeout Error] 超时强制终止")
else: full_stdout.append("\n[Stopped] 用户强制终止")
break
time.sleep(1)
t.join(timeout=1)
exit_code = process.poll()
stdout_str = "".join(full_stdout)
status = "success" if exit_code == 0 else "error"
status_icon = "" if exit_code == 0 else ""
if exit_code is None: status_icon = ""
output_snippet = smart_format(stdout_str, max_str_len=600, omit_str='\n\n[omitted long output]\n\n')
yield f"[Status] {status_icon} Exit Code: {exit_code}\n[Stdout]\n{output_snippet}\n"
if process.stdout: threading.Thread(target=process.stdout.close, daemon=True).start()
return {
"status": status,
"stdout": smart_format(stdout_str, max_str_len=10000, omit_str='\n\n[omitted long output]\n\n'),
"exit_code": exit_code
}
except Exception as e:
if 'process' in locals(): process.kill()
return {"status": "error", "msg": str(e)}
finally:
if code_type == "python" and tmp_path and os.path.exists(tmp_path): os.remove(tmp_path)
def ask_user(question, candidates=None):
"""question: 向用户提出的问题。candidates: 可选的候选项列表"""
return {"status": "INTERRUPT", "intent": "HUMAN_INTERVENTION",
"data": {"question": question, "candidates": candidates or []}}
import simphtml
driver = None
def first_init_driver():
global driver
from TMWebDriver import TMWebDriver
driver = TMWebDriver()
for i in range(20):
time.sleep(1)
sess = driver.get_all_sessions()
if len(sess) > 0: break
if len(sess) == 0: return
if len(sess) == 1:
#driver.newtab()
time.sleep(3)
def web_scan(tabs_only=False, switch_tab_id=None, text_only=False):
"""
获取当前页面的简化HTML内容和标签页列表。注意简化过程会过滤边栏、浮动元素等非主体内容。
tabs_only: 仅返回标签页列表不获取HTML内容节省token
switch_tab_id: 可选参数,如果提供,则在扫描前切换到该标签页。
应当多用execute_js少全量观察html。
"""
global driver
try:
if driver is None: first_init_driver()
if len(driver.get_all_sessions()) == 0:
return {"status": "error", "msg": "没有可用的浏览器标签页查L3记忆分析原因。"}
tabs = []
for sess in driver.get_all_sessions():
sess.pop('connected_at', None)
sess.pop('type', None)
sess['url'] = sess.get('url', '')[:50] + ("..." if len(sess.get('url', '')) > 50 else "")
tabs.append(sess)
if switch_tab_id: driver.default_session_id = switch_tab_id
result = {
"status": "success",
"metadata": {
"tabs_count": len(tabs), "tabs": tabs,
"active_tab": driver.default_session_id
}
}
if not tabs_only:
importlib.reload(simphtml); result["content"] = simphtml.get_html(driver, cutlist=True, maxchars=35000, text_only=text_only)
if text_only: result['content'] = smart_format(result['content'], max_str_len=10000, omit_str='\n\n[omitted long content]\n\n')
return result
except Exception as e:
return {"status": "error", "msg": format_error(e)}
def format_error(e):
exc_type, exc_value, exc_traceback = sys.exc_info()
tb = traceback.extract_tb(exc_traceback)
if tb:
f = tb[-1]
fname = os.path.basename(f.filename)
return f"{exc_type.__name__}: {str(e)} @ {fname}:{f.lineno}, {f.name} -> `{f.line}`"
return f"{exc_type.__name__}: {str(e)}"
def log_memory_access(path):
if 'memory' not in path: return
script_dir = os.path.dirname(os.path.abspath(__file__))
stats_file = os.path.join(script_dir, 'memory/file_access_stats.json')
try:
with open(stats_file, 'r', encoding='utf-8') as f: stats = json.load(f)
except: stats = {}
fname = os.path.basename(path)
stats[fname] = {'count': stats.get(fname, {}).get('count', 0) + 1, 'last': datetime.now().strftime('%Y-%m-%d')}
with open(stats_file, 'w', encoding='utf-8') as f: json.dump(stats, f, indent=2, ensure_ascii=False)
def web_execute_js(script, switch_tab_id=None, no_monitor=False):
"""执行 JS 脚本来控制浏览器,并捕获结果和页面变化"""
global driver
try:
if driver is None: first_init_driver()
if len(driver.get_all_sessions()) == 0: return {"status": "error", "msg": "没有可用的浏览器标签页查L3记忆分析原因。"}
if switch_tab_id: driver.default_session_id = switch_tab_id
result = simphtml.execute_js_rich(script, driver, no_monitor=no_monitor)
return result
except Exception as e: return {"status": "error", "msg": format_error(e)}
def expand_file_refs(text, base_dir=None):
"""展开文本中的 {{file:路径:起始行:结束行}} 引用为实际文件内容。
可与普通文本混排。展开失败抛 ValueError。
base_dir: 相对路径的基准目录,默认为进程 cwd"""
pattern = r'\{\{file:(.+?):(\d+):(\d+)\}\}'
def replacer(match):
path, start, end = match.group(1), int(match.group(2)), int(match.group(3))
path = os.path.abspath(os.path.join(base_dir or '.', path))
if not os.path.isfile(path): raise ValueError(f"引用文件不存在: {path}")
with open(path, 'r', encoding='utf-8') as f: lines = f.readlines()
if start < 1 or end > len(lines) or start > end: raise ValueError(f"行号越界: {path}{len(lines)}行, 请求{start}-{end}")
return ''.join(lines[start-1:end])
return re.sub(pattern, replacer, text)
def file_patch(path: str, old_content: str, new_content: str):
"""在文件中寻找唯一的 old_content 块并替换为 new_content"""
path = str(Path(path).resolve())
try:
if not os.path.exists(path): return {"status": "error", "msg": "文件不存在"}
with open(path, 'r', encoding='utf-8') as f: full_text = f.read()
if not old_content: return {"status": "error", "msg": "old_content 为空,请确认 arguments"}
count = full_text.count(old_content)
if count == 0: return {"status": "error", "msg": "未找到匹配的旧文本块,建议:先用 file_read 确认当前内容,再分小段进行 patch。若多次失败则询问用户严禁自行使用 overwrite 或代码替换。"}
if count > 1: return {"status": "error", "msg": f"找到 {count} 处匹配,无法确定唯一位置。请提供更长、更具体的旧文本块以确保唯一性。建议:包含上下文行来增强特征,或分小段逐个修改。"}
updated_text = full_text.replace(old_content, new_content)
with open(path, 'w', encoding='utf-8') as f: f.write(updated_text)
return {"status": "success", "msg": "文件局部修改成功"}
except Exception as e: return {"status": "error", "msg": str(e)}
_read_dirs = set()
def _scan_files(base, depth=2):
try:
for e in os.scandir(base):
if e.is_file(): yield (e.name, e.path)
elif depth > 0 and e.is_dir(follow_symlinks=False): yield from _scan_files(e.path, depth - 1)
except (PermissionError, OSError): pass
def file_read(path, start=1, keyword=None, count=200, show_linenos=True):
try:
with open(path, 'r', encoding='utf-8', errors='replace') as f:
stream = ((i, l.rstrip('\r\n')) for i, l in enumerate(f, 1))
stream = itertools.dropwhile(lambda x: x[0] < start, stream)
if keyword:
before = collections.deque(maxlen=count//3)
for i, l in stream:
if keyword.lower() in l.lower():
res = list(before) + [(i, l)] + list(itertools.islice(stream, count - len(before) - 1))
break
before.append((i, l))
else: return f"Keyword '{keyword}' not found after line {start}. Falling back to content from line {start}:\n\n" \
+ file_read(path, start, None, count, show_linenos)
else: res = list(itertools.islice(stream, count))
realcnt = len(res); L_MAX = min(max(100, 256000//max(realcnt,1)), 8000); TAG = " ... [TRUNCATED]"
remaining = sum(1 for _ in itertools.islice(stream, 5000))
total_lines = (res[0][0] - 1 if res else start - 1) + realcnt + remaining
total_tag = "[FILE] Total " + (f"{total_lines}+" if remaining >= 5000 else str(total_lines)) + ' lines\n'
res = [(i, l if len(l) <= L_MAX else l[:L_MAX] + TAG) for i, l in res]
result = "\n".join(f"{i}|{l}" if show_linenos else l for i, l in res)
if show_linenos: result = total_tag + result
_read_dirs.add(os.path.dirname(os.path.abspath(path)))
return result
except FileNotFoundError:
msg = f"Error: File not found: {path}"
try:
tgt = os.path.basename(path); scan = os.path.dirname(os.path.dirname(os.path.abspath(path)))
roots = [scan] + [d for d in _read_dirs if not d.startswith(scan)]
cands = list(itertools.islice((c for base in roots for c in _scan_files(base)), 2000))
top = sorted([(difflib.SequenceMatcher(None, tgt.lower(), c[0].lower()).ratio(), c) for c in cands[:2000]], key=lambda x: -x[0])[:5]
top = [(s, c) for s, c in top if s > 0.3]
if top: msg += "\n\nDid you mean:\n" + "\n".join(f" {c[1]} ({s:.0%})" for s, c in top)
except Exception: pass
return msg
except Exception as e: return f"Error: {str(e)}"
def smart_format(data, max_str_len=100, omit_str=' ... '):
if not isinstance(data, str): data = str(data)
if len(data) < max_str_len + len(omit_str)*2: return data
return f"{data[:max_str_len//2]}{omit_str}{data[-max_str_len//2:]}"
def consume_file(dr, file):
if dr and os.path.exists(os.path.join(dr, file)):
with open(os.path.join(dr, file), encoding='utf-8', errors='replace') as f: content = f.read()
os.remove(os.path.join(dr, file))
return content
class GenericAgentHandler(BaseHandler):
'''Generic Agent 工具库,包含多种工具的实现。工具函数自动加上了 do_ 前缀。实际工具名没有前缀。'''
def __init__(self, parent, last_history=None, cwd='./temp'):
self.parent = parent
self.working = {}
self.cwd = cwd; self.current_turn = 0
self.history_info = last_history if last_history else []
self.code_stop_signal = []
def _get_abs_path(self, path):
if not path: return ""
return os.path.abspath(os.path.join(self.cwd, path))
def _extract_code_block(self, response, code_type):
matches = re.findall(rf"```{code_type}\n(.*?)\n```", response.content, re.DOTALL)
return matches[-1].strip() if matches else None
def do_code_run(self, args, response):
'''执行代码片段,有长度限制,不允许代码中放大量数据,如有需要应当通过文件读取进行。'''
code_type = args.get("type", "python")
code = args.get("code") or args.get("script")
if not code:
code = self._extract_code_block(response, code_type)
if not code: return StepOutcome("[Error] Code missing. Use ```{code_type} block or 'script' arg.", next_prompt="\n")
timeout = args.get("timeout", 60)
raw_path = os.path.join(self.cwd, args.get("cwd", './'))
cwd = os.path.normpath(os.path.abspath(raw_path))
code_cwd = os.path.normpath(self.cwd)
if code_type == 'python' and args.get("inline_eval"):
ns = {'handler': self, 'parent': self.parent}
old_cwd = os.getcwd()
try:
os.chdir(cwd)
try: result = repr(eval(code, ns))
except SyntaxError: exec(code, ns); result = ns.get('_r', 'OK')
except Exception as e: result = f'Error: {e}'
finally: os.chdir(old_cwd)
else: result = yield from code_run(code, code_type, timeout, cwd, code_cwd=code_cwd, stop_signal=self.code_stop_signal)
next_prompt = self._get_anchor_prompt(skip=args.get('_index', 0) > 0)
return StepOutcome(result, next_prompt=next_prompt)
def do_ask_user(self, args, response):
question = args.get("question", "请提供输入:")
candidates = args.get("candidates", [])
result = ask_user(question, candidates)
yield f"Waiting for your answer ...\n"
return StepOutcome(result, next_prompt="", should_exit=True)
def do_web_scan(self, args, response):
'''获取当前页面内容和标签页列表。也可用于切换标签页。
注意HTML经过简化边栏/浮动元素等可能被过滤。如需查看被过滤的内容请用execute_js。
tabs_only=true时仅返回标签页列表不获取HTML省token
'''
tabs_only = args.get("tabs_only", False)
switch_tab_id = args.get("switch_tab_id", None)
text_only = args.get("text_only", False)
result = web_scan(tabs_only=tabs_only, switch_tab_id=switch_tab_id, text_only=text_only)
content = result.pop("content", None)
yield f'[Info] {str(result)}\n'
if content: result = json.dumps(result, ensure_ascii=False, default=json_default) + f"\n```html\n{content}\n```"
next_prompt = "\n"
return StepOutcome(result, next_prompt=next_prompt)
def do_web_execute_js(self, args, response):
'''web情况下的优先使用工具执行任何js达成对浏览器的*完全*控制。支持将结果保存到文件供后续读取分析。'''
script = args.get("script", "") or self._extract_code_block(response, "javascript")
if not script: return StepOutcome("[Error] Script missing. Use ```javascript block or 'script' arg.", next_prompt="\n")
abs_path = self._get_abs_path(script.strip())
if os.path.isfile(abs_path):
with open(abs_path, 'r', encoding='utf-8') as f: script = f.read()
save_to_file = args.get("save_to_file", "")
switch_tab_id = args.get("switch_tab_id") or args.get("tab_id")
no_monitor = args.get("no_monitor", False)
result = web_execute_js(script, switch_tab_id=switch_tab_id, no_monitor=no_monitor)
if save_to_file and "js_return" in result:
content = str(result["js_return"] or '')
abs_path = self._get_abs_path(save_to_file)
result["js_return"] = smart_format(content, max_str_len=170)
try:
with open(abs_path, 'w', encoding='utf-8') as f: f.write(str(content))
result["js_return"] += f"\n\n[已保存完整内容到 {abs_path}]"
except:
result['js_return'] += f"\n\n[保存失败,无法写入文件 {abs_path}]"
show = smart_format(json.dumps(result, ensure_ascii=False, indent=2, default=json_default), max_str_len=300)
try: print("Web Execute JS Result:", show)
except: pass
yield f"JS 执行结果:\n{show}\n"
next_prompt = self._get_anchor_prompt(skip=args.get('_index', 0) > 0)
result = json.dumps(result, ensure_ascii=False, default=json_default)
return StepOutcome(smart_format(result, max_str_len=8000), next_prompt=next_prompt)
def do_file_patch(self, args, response):
path = self._get_abs_path(args.get("path", ""))
yield f"[Action] Patching file: {path}\n"
old_content = args.get("old_content", "")
new_content = args.get("new_content", "")
try: new_content = expand_file_refs(new_content, base_dir=self.cwd)
except ValueError as e:
yield f"[Status] ❌ 引用展开失败: {e}\n"
return StepOutcome({"status": "error", "msg": str(e)}, next_prompt="\n")
result = file_patch(path, old_content, new_content)
yield f"\n{str(result)}\n"
next_prompt = self._get_anchor_prompt(skip=args.get('_index', 0) > 0)
return StepOutcome(result, next_prompt=next_prompt)
def do_file_write(self, args, response):
'''用于对整个文件的大量处理精细修改要用file_patch。
需要将要写入的内容放在<file_content>标签内,或者放在代码块中。
'''
path = self._get_abs_path(args.get("path", ""))
mode = args.get("mode", "overwrite") # overwrite/append/prepend
action_str = {"prepend": "Prepending to", "append": "Appending to"}.get(mode, "Overwriting")
yield f"[Action] {action_str} file: {os.path.basename(path)}\n"
def extract_robust_content(text):
tag = re.search(r"<file_content[^>]*>(.*)</file_content>", text, re.DOTALL)
if tag: return tag.group(1).strip()
s, e = text.find("```"), text.rfind("```")
if -1 < s < e: return text[text.find("\n", s)+1 : e].strip()
return None
blocks = extract_robust_content(response.content)
if not blocks:
yield f"[Status] ❌ 失败: 未在回复中找到<file_content>代码块内容\n"
return StepOutcome({"status": "error", "msg": "No content found. Put content inside <file_content>...</file_content> tags in your reply body and call file_write."}, next_prompt="\n")
try:
new_content = expand_file_refs(blocks, base_dir=self.cwd)
if mode == "prepend":
old = open(path, 'r', encoding="utf-8").read() if os.path.exists(path) else ""
open(path, 'w', encoding="utf-8").write(new_content + old)
else:
with open(path, 'a' if mode == "append" else 'w', encoding="utf-8") as f: f.write(new_content)
yield f"[Status] ✅ {mode.capitalize()} 成功 ({len(new_content)} bytes)\n"
next_prompt = self._get_anchor_prompt(skip=args.get('_index', 0) > 0)
return StepOutcome({"status": "success", 'writed_bytes': len(new_content)}, next_prompt=next_prompt)
except Exception as e:
yield f"[Status] ❌ 写入异常: {str(e)}\n"
return StepOutcome({"status": "error", "msg": str(e)}, next_prompt="\n")
def do_file_read(self, args, response):
'''读取文件内容。从第start行开始读取。如有keyword则返回第一个keyword(忽略大小写)周边内容'''
path = self._get_abs_path(args.get("path", ""))
yield f"\n[Action] Reading file: {path}\n"
start = args.get("start", 1)
count = args.get("count", 200)
keyword = args.get("keyword")
show_linenos = args.get("show_linenos", True)
result = file_read(path, start=start, keyword=keyword,
count=count, show_linenos=show_linenos)
if show_linenos and not result.startswith("Error:"): result = '由于设置了show_linenos以下返回信息为(行号|)内容 。\n' + result
if ' ... [TRUNCATED]' in result: result += '\n\n(某些行被截断,如需完整内容可改用 code_run 读取)'
result = smart_format(result, max_str_len=20000, omit_str='\n\n[omitted long content]\n\n')
next_prompt = self._get_anchor_prompt(skip=args.get('_index', 0) > 0)
log_memory_access(path)
if 'memory' in path or 'sop' in path:
next_prompt += "\n[SYSTEM TIPS] 正在读取记忆或SOP文件若决定按sop执行请提取sop中的关键点特别是靠后的update working memory."
return StepOutcome(result, next_prompt=next_prompt)
def _in_plan_mode(self): return self.working.get('in_plan_mode')
def _exit_plan_mode(self): self.working.pop('in_plan_mode', None)
def enter_plan_mode(self, plan_path):
self.working['in_plan_mode'] = plan_path; self.max_turns = 80
print(f"[Info] Entered plan mode with plan file: {plan_path}"); return plan_path
def _check_plan_completion(self):
if not os.path.isfile(p:=self._in_plan_mode() or ''): return None
try: return len(re.findall(r'\[ \]', open(p, encoding='utf-8', errors='replace').read()))
except: return None
def do_update_working_checkpoint(self, args, response):
'''为整个任务设定后续需要临时记忆的重点。'''
key_info = args.get("key_info", "")
related_sop = args.get("related_sop", "")
if "key_info" in args: self.working['key_info'] = key_info
if "related_sop" in args: self.working['related_sop'] = related_sop
self.working['passed_sessions'] = 0
yield f"[Info] Updated key_info and related_sop.\n"
next_prompt = self._get_anchor_prompt(skip=args.get('_index', 0) > 0)
#next_prompt += '\n[SYSTEM TIPS] 此函数一般在任务开始或中间时调用如果任务已成功完成应该是start_long_term_update用于结算长期记忆。\n'
return StepOutcome({"result": "working key_info updated"}, next_prompt=next_prompt)
def do_no_tool(self, args, response):
'''这是一个特殊工具由引擎自主调用不要包含在TOOLS_SCHEMA里。
当模型在一轮中未显式调用任何工具时,由引擎自动触发。
二次确认仅在回复几乎只包含<thinking>/<summary>和一段大代码块时触发。'''
content = getattr(response, 'content', '') or ""
if not response or not content.strip():
yield "[Warn] LLM returned an empty response. Retrying...\n"
return StepOutcome({}, next_prompt="[System] Blank response, regenerate and tooluse")
if '未收到完整响应 !!!]' in content[-100:]:
return StepOutcome({}, next_prompt="[System] Incomplete response. Regenerate and tooluse.")
if 'max_tokens !!!]' in content[-100:]:
return StepOutcome({}, next_prompt="[System] max_tokens limit reached. Use multi small steps to do it.")
if self._in_plan_mode() and any(kw in content for kw in ['任务完成', '全部完成', '已完成所有', '🏁']):
if 'VERDICT' not in content and '[VERIFY]' not in content and '验证subagent' not in content:
yield "[Warn] Plan模式完成声明拦截。\n"
return StepOutcome({}, next_prompt="⛔ [验证拦截] 检测到你在plan模式下声称完成但未执行[VERIFY]验证步骤。请先按plan_sop §四启动验证subagent获得VERDICT后才能声称完成。")
# 2. 检测"包含较大代码块但未调用工具"的情况
# 关键特征恰好1个大代码块 + 代码块直接结尾(后面只有空白)
code_block_pattern = r"```[a-zA-Z0-9_]*\n[\s\S]{50,}?```"
blocks = re.findall(code_block_pattern, content)
if len(blocks) == 1:
m = re.search(code_block_pattern, content)
after_block = content[m.end():]
if not after_block.strip():
residual = content.replace(m.group(0), "")
residual = re.sub(r"<thinking>[\s\S]*?</thinking>", "", residual, flags=re.IGNORECASE)
residual = re.sub(r"<summary>[\s\S]*?</summary>", "", residual, flags=re.IGNORECASE)
clean_residual = re.sub(r"\s+", "", residual)
if len(clean_residual) <= 30:
yield "[Info] Detected large code block without tool call and no extra natural language. Requesting clarification.\n"
next_prompt = (
"[System] 检测到你在上一轮回复中主要内容是较大代码块,且本轮未调用任何工具。\n"
"如果这些代码需要执行、写入文件或进一步分析,请重新组织回复并显式调用相应工具"
"例如code_run、file_write、file_patch 等);\n"
"如果只是向用户展示或讲解代码片段,请在回复中补充自然语言说明,"
"并明确是否还需要额外的实际操作。"
)
return StepOutcome({}, next_prompt=next_prompt)
if self._in_plan_mode():
remaining = self._check_plan_completion()
if remaining == 0:
self._exit_plan_mode(); yield "[Info] Plan完成plan.md中0个[ ]残留退出plan模式。\n"
yield "[Info] Final response to user.\n"
return StepOutcome(response, next_prompt=None)
def do_start_long_term_update(self, args, response):
'''Agent觉得当前任务完成后有重要信息需要记忆时调用此工具。'''
prompt = '''### [总结提炼经验] 既然你觉得当前任务有重要信息需要记忆,请提取最近一次任务中【事实验证成功且长期有效】的环境事实、用户偏好、重要步骤,更新记忆。
本工具是标记开启结算过程,若已在更新记忆过程或没有值得记忆的点,忽略本次调用。
**提取行动验证成功的信息**
- **环境事实**(路径/凭证/配置)→ `file_patch` 更新 L2同步 L1
- **复杂任务经验**(关键坑点/前置条件/重要步骤)→ L3 精简 SOP只记你被坑得多次重试的核心要点
**禁止**:临时变量、具体推理过程、未验证信息、通用常识、你可以轻松复现的细节。
**操作**严格遵循提供的L0的记忆更新SOP。先 `file_read` 看现有 → 判断类型 → 最小化更新 → 无新内容跳过,保证对记忆库最小局部修改。\n
''' + get_global_memory()
yield "[Info] Start distilling good memory for long-term storage.\n"
path = './memory/memory_management_sop.md'
if os.path.exists(path): result = file_read(path, show_linenos=False)
else: result = "Memory Management SOP not found. Do not update memory."
return StepOutcome(result, next_prompt=prompt)
def _get_anchor_prompt(self, skip=False):
if skip: return "\n"
h_str = "\n".join(self.history_info[-20:])
prompt = f"\n### [WORKING MEMORY]\n<history>\n{h_str}\n</history>"
prompt += f"\nCurrent turn: {self.current_turn}\n"
if self.working.get('key_info'): prompt += f"\n<key_info>{self.working.get('key_info')}</key_info>"
if self.working.get('related_sop'): prompt += f"\n有不清晰的地方请再次读取{self.working.get('related_sop')}"
if getattr(self.parent, 'verbose', False):
try: print(prompt)
except: pass
return prompt
def turn_end_callback(self, response, tool_calls, tool_results, turn, next_prompt, exit_reason):
_c = re.sub(r'```.*?```|<thinking>.*?</thinking>', '', response.content, flags=re.DOTALL)
rsumm = re.search(r"<summary>(.*?)</summary>", _c, re.DOTALL)
if rsumm: summary = rsumm.group(1).strip()
else:
tc = tool_calls[0]; tool_name, args = tc['tool_name'], tc['args'] # at least one because no_tool
clean_args = {k: v for k, v in args.items() if not k.startswith('_')}
summary = f"调用工具{tool_name}, args: {clean_args}"
if tool_name == 'no_tool': summary = "直接回答了用户问题"
next_prompt += "\n[DANGER] 上一轮遗漏了<summary>,已根据物理动作自动补全。在下次回复中记得<summary>协议。"
summary = smart_format(summary, max_str_len=100)
self.history_info.append(f'[Agent] {summary}')
if turn % 35 == 0 and 'plan' not in str(self.working.get('related_sop')):
next_prompt += f"\n\n[DANGER] 已连续执行第 {turn} 轮。你必须总结情况进行ask_user不允许继续重试。"
elif turn % 7 == 0:
next_prompt += f"\n\n[DANGER] 已连续执行第 {turn} 轮。禁止无效重试。若无有效进展必须切换策略1. 探测物理边界 2. 请求用户协助。如有需要,可调用 update_working_checkpoint 保存关键上下文。"
elif turn % 10 == 0: next_prompt += get_global_memory()
if (_plan := self._in_plan_mode()) and turn >= 10 and turn % 5 == 0:
next_prompt = f"[Plan Hint] 你正在计划模式。必须 file_read({_plan}) 确认当前步骤,回复开头引用:📌 当前步骤:...\n\n" + next_prompt
if _plan and turn >= 70: next_prompt += f"\n\n[DANGER] Plan模式已运行 {turn} 轮,已达上限。必须 ask_user 汇报进度并确认是否继续。"
injkeyinfo = consume_file(self.parent.task_dir, '_keyinfo')
injprompt = consume_file(self.parent.task_dir, '_intervene')
if injkeyinfo: self.working['key_info'] = self.working.get('key_info', '') + f"\n[MASTER] {injkeyinfo}"
if injprompt: next_prompt += f"\n\n[MASTER] {injprompt}\n"
for hook in getattr(self.parent, '_turn_end_hooks', {}).values(): hook(locals()) # current readonly
return next_prompt
def get_global_memory():
prompt = "\n"
try:
script_dir = os.path.dirname(os.path.abspath(__file__))
suffix = '_en' if os.environ.get('GA_LANG', '') == 'en' else ''
with open(os.path.join(script_dir, 'memory/global_mem_insight.txt'), 'r', encoding='utf-8', errors='replace') as f: insight = f.read()
with open(os.path.join(script_dir, f'assets/insight_fixed_structure{suffix}.txt'), 'r', encoding='utf-8') as f: structure = f.read()
prompt += f'cwd = {os.path.join(script_dir, "temp")} (./)\n'
prompt += f"\n[Memory] (../memory)\n"
prompt += structure + '\n../memory/global_mem_insight.txt:\n'
prompt += insight + "\n"
except FileNotFoundError: pass
return prompt