diff --git a/agent_loop.py b/agent_loop.py index e30eecb..cfd2702 100644 --- a/agent_loop.py +++ b/agent_loop.py @@ -32,17 +32,23 @@ def json_default(o): if isinstance(o, set): return list(o) return str(o) +def get_pretty_json(data): + if isinstance(data, dict) and "script" in data: + data = data.copy() + data["script"] = data["script"].replace("; ", ";\n ") + return json.dumps(data, indent=2, ensure_ascii=False).replace('\\n', '\n') + def agent_runner_loop(client, system_prompt, user_input, handler, tools_schema, max_turns=15): messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_input} ] for turn in range(max_turns): - yield f"\n[🤖 LLM Thinking (Turn {turn+1})] ..." + yield f"**LLM Running (Turn {turn+1}) ...**\n\n" response = client.chat(messages=messages, tools=tools_schema) - if response.thinking: yield '' + response.thinking + '\n' - yield response.content + if response.thinking: yield '' + response.thinking + '\n\n' + yield response.content + '\n\n' if not response.tool_calls: tool_name, args = 'no_tool', {} @@ -52,8 +58,12 @@ def agent_runner_loop(client, system_prompt, user_input, handler, tools_schema, args = json.loads(tool_call.function.arguments) if tool_name == 'no_tool': pass - else: yield f"\n\n正在调用工具: {tool_name},参数: {args}\n" + else: + yield f"🛠️ **正在调用工具:** `{tool_name}` 📥**参数:**\n" + yield f"````text\n{get_pretty_json(args)}\n````\n" + yield '`````\n' outcome = yield from handler.dispatch(tool_name, args, response) + yield '`````\n' if outcome.next_prompt is None: return {'result': 'CURRENT_TASK_DONE', 'data': outcome.data} if outcome.should_exit: return {'result': 'EXITED', 'data': outcome.data} diff --git a/ga.py b/ga.py index 0e40171..6e140ab 100644 --- a/ga.py +++ b/ga.py @@ -1,5 +1,5 @@ import sys, os, re -import pyperclip +import pyperclip, threading import json, time from pathlib import Path import subprocess @@ -18,12 +18,10 @@ def code_run(code: str, code_type: str = "python", timeout: int = 60, cwd: str = powershell: 运行单行指令(命令模式) 优先使用python,仅在必要系统操作时使用powershell。 """ - # 统一路径处理 preview = (code[:60].replace('\n', ' ') + '...') if len(code) > 60 else code.strip() - yield f"\n[Action] Running {code_type} in {os.path.basename(cwd)}: {preview}\n" + yield f"[Action] Running {code_type} in {os.path.basename(cwd)}: {preview}\n" cwd = cwd or os.getcwd() if code_type == "python": - # Python 依然建议走文件,因为模型生成的逻辑通常包含多行、import 和类定义 tmp_file = tempfile.NamedTemporaryFile(suffix=".py", delete=False, mode='w', encoding='utf-8') tmp_file.write(code) tmp_path = tmp_file.name @@ -41,49 +39,47 @@ def code_run(code: str, code_type: str = "python", timeout: int = 60, cwd: str = startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW startupinfo.wShowWindow = 0 # SW_HIDE full_stdout = [] - full_stderr = [] + + def stream_reader(proc, logs): + for line_bytes in iter(proc.stdout.readline, b''): + try: line = line_bytes.decode('utf-8') + except UnicodeDecodeError: line = line_bytes.decode('gbk', errors='ignore') + logs.append(line) + print(line, end="") + try: process = subprocess.Popen( - cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, + cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=0, cwd=cwd, startupinfo=startupinfo ) - for line_bytes in iter(process.stdout.readline, b''): - try: - line = line_bytes.decode('utf-8') - except UnicodeDecodeError: - line = line_bytes.decode('gbk', errors='ignore') - print(line, end="") - full_stdout.append(line) + start_t = time.time() + t = threading.Thread(target=stream_reader, args=(process, full_stdout), daemon=True) + t.start() + + while t.is_alive(): + if time.time() - start_t > timeout: + process.kill() + full_stdout.append("\n[Timeout Error] 超时强制终止") + break + time.sleep(0.2) + + t.join(timeout=1) + exit_code = process.poll() - stdout_rem, stderr_raw = process.communicate(timeout=timeout) - if stdout_rem: - try: rem_str = stdout_rem.decode('utf-8') - except UnicodeDecodeError: - rem_str = stdout_rem.decode('gbk', errors='ignore') - full_stdout.append(rem_str) - - if stderr_raw: - try: stderr_str = stderr_raw.decode('utf-8') - except UnicodeDecodeError: - stderr_str = stderr_raw.decode('gbk', errors='ignore') - full_stderr.append(stderr_str) - print(f"Error: {stderr_str}") - - status = "success" if process.returncode == 0 else "error" stdout_str = "".join(full_stdout) - stderr_str = "".join(full_stderr) - status_icon = "✅" if process.returncode == 0 else "❌" - output_snippet = (stdout_str[:200] + '...') if len(stdout_str) > 200 else stdout_str - yield f"[Status] {status_icon} Exit Code: {process.returncode}\n[Stdout] {output_snippet}\n" + status = "success" if exit_code == 0 else "error" + status_icon = "✅" if exit_code == 0 else "❌" + if exit_code is None: status_icon = "⏳" + output_snippet = (stdout_str[:100] + '...' + stdout_str[-100:]) if len(stdout_str) > 300 else stdout_str + yield f"[Status] {status_icon} Exit Code: {exit_code}\n[Stdout]\n{output_snippet}\n" + if process.stdout: process.stdout.close() return { "status": status, "stdout": stdout_str[-2000:], - "stderr": stderr_str[-2000:], - "exit_code": process.returncode + "exit_code": exit_code } - except subprocess.TimeoutExpired: - return {"status": "error", "msg": "Timeout"} except Exception as e: + if 'process' in locals(): process.kill() return {"status": "error", "msg": str(e)} finally: if code_type == "python" and tmp_path and os.path.exists(tmp_path): os.remove(tmp_path) @@ -123,8 +119,7 @@ def first_init_driver(): def web_scan(focus_item="", switch_tab_id=None): """ 利用 get_html 获取清洗后的网页内容。 - focus_item: 语义过滤指令。如果用户在找特定内容(如“小米汽车”), - 算法会优先保留包含该关键词的列表项。 + focus_item: 语义过滤指令。如果用户在找特定内容(如“小米汽车”),算法会优先保留包含该关键词的列表项。 switch_tab_id: 可选参数,如果提供,则在扫描前切换到该标签页。 应当多用execute_js,少全量观察html。 """ @@ -220,6 +215,17 @@ def file_read(path, start=1, count=100, show_linenos=True): except Exception as e: return f"Error: {str(e)}" +def smart_format(data, max_depth=2, max_str_len=100): + def truncate(obj, depth): + if isinstance(obj, str): + if len(obj) > max_str_len: return f"{obj[:max_str_len//2]} ... {obj[-max_str_len//2:]}" + return obj + if depth >= max_depth: return truncate(str(obj), depth + 1) + if isinstance(obj, dict): return {k: truncate(v, depth + 1) for k, v in obj.items()} + if isinstance(obj, list): return [truncate(i, depth + 1) for i in obj] + return obj + return json.dumps(truncate(data, 0), indent=2, ensure_ascii=False, default=str) + class GenericAgentHandler(BaseHandler): ''' Generic Agent 工具库,包含多种工具的实现。工具函数自动加上了 do_ 前缀。实际工具名没有前缀。 @@ -267,7 +273,7 @@ class GenericAgentHandler(BaseHandler): switch_tab_id = args.get("switch_tab_id", None) result = web_scan(focus_item, switch_tab_id=switch_tab_id) content = result.pop("content", None) - yield f'\n{str(result)}\n' + yield f'[Info] {str(result)}\n' next_prompt = f"```html\n{content}\n```" return StepOutcome(result, next_prompt=next_prompt) @@ -284,16 +290,17 @@ class GenericAgentHandler(BaseHandler): with open(abs_path, 'w', encoding='utf-8') as f: f.write(str(content)) result["js_return"] = content[:200] + ("..." if len(content) > 200 else "") result["js_return"] += f"\n\n[已保存以上内容到 {abs_path}]" - print("Web Execute JS Result:", result) + print("Web Execute JS Result:", smart_format(result)) + yield f"JS 执行结果:\n{smart_format(result)}\n" return StepOutcome(result, next_prompt=self._get_anchor_prompt()) def do_file_patch(self, args, response): path = self._get_abs_path(args.get("path", "")) - yield f"\n[Action] Patching file: {path}\n" + yield f"[Action] Patching file: {path}\n" old_content = args.get("old_content", "") new_content = args.get("new_content", "") result = file_patch(path, old_content, new_content) - yield str(result) + "\n" + yield f"\n{smart_format(result)}\n" return StepOutcome(result, next_prompt=self._get_anchor_prompt()) def do_file_write(self, args, response): @@ -303,7 +310,7 @@ class GenericAgentHandler(BaseHandler): path = self._get_abs_path(args.get("path", "")) mode = args.get("mode", "overwrite") action_str = "Appending to" if mode == "append" else "Writing" - yield f"\n[Action] {action_str} file: {os.path.basename(path)}\n" + yield f"[Action] {action_str} file: {os.path.basename(path)}\n" def extract_robust_content(text): tag = re.search(r"(.*?)", text, re.DOTALL) @@ -351,7 +358,7 @@ class GenericAgentHandler(BaseHandler): new_focus = args.get("focus", "") if new_plan: self.plan = new_plan if new_focus: self.focus = new_focus - yield f"\n[Info] Updated plan and focus.\n" + yield f"[Info] Updated plan and focus.\n" yield f"New Plan:\n{self.plan}\n\n" yield f"New Focus:\n{self.focus}\n" return StepOutcome({"status": "success"}, @@ -360,11 +367,11 @@ class GenericAgentHandler(BaseHandler): def do_no_tool(self, args, response): '''这是一个特殊工具,由引擎自主调用,不要包含在TOOLS_SCHEMA里。 ''' - yield "\n\n[Info] No tool called. Final response to user.\n" + yield "[Info] No tool called. Final response to user.\n" return StepOutcome(response, next_prompt=None, should_exit=True) def _get_anchor_prompt(self): - prompt = f"\n提醒: \n{self.user_input}\n" + prompt = f"\n提醒: 用户原始输入:\n{self.user_input}\n" if self.plan: prompt += f"\n{self.plan}\n\n" if self.focus: prompt += f"\n{self.focus}\n\n" prompt += "\n请继续执行下一步。"