diff --git a/agent_loop.py b/agent_loop.py index 7eab55d..07a42da 100644 --- a/agent_loop.py +++ b/agent_loop.py @@ -46,14 +46,15 @@ def get_pretty_json(data): data["script"] = data["script"].replace("; ", ";\n ") return json.dumps(data, indent=2, ensure_ascii=False).replace('\\n', '\n') -def agent_runner_loop(client, system_prompt, user_input, handler, tools_schema, max_turns=15, verbose=True, initial_user_content=None): +def agent_runner_loop(client, system_prompt, user_input, handler, tools_schema, max_turns=40, verbose=True, initial_user_content=None): messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": initial_user_content if initial_user_content is not None else user_input} ] - for turn in range(max_turns): - yield f"**LLM Running (Turn {turn+1}) ...**\n\n" - if (turn+1) % 10 == 0: client.last_tools = '' # 每10轮重置一次工具描述,避免上下文过大导致的模型性能下降 + turn = 0; handler._done_hooks = []; handler.max_turns = max_turns + while turn < handler.max_turns: + turn += 1; yield f"**LLM Running (Turn {turn}) ...**\n\n" + if turn%10 == 0: client.last_tools = '' # 每10轮重置一次工具描述,避免上下文过大导致的模型性能下降 response_gen = client.chat(messages=messages, tools=tools_schema) if verbose: response = yield from response_gen @@ -66,7 +67,7 @@ def agent_runner_loop(client, system_prompt, user_input, handler, tools_schema, else: tool_calls = [{'tool_name': tc.function.name, 'args': json.loads(tc.function.arguments)} for tc in response.tool_calls] - next_prompt = "" + next_prompt = ""; should_exit = None for ii, tc in enumerate(tool_calls): tool_name, args = tc['tool_name'], tc['args'] if tool_name == 'no_tool': pass @@ -74,22 +75,25 @@ def agent_runner_loop(client, system_prompt, user_input, handler, tools_schema, showarg = get_pretty_json(args) if not verbose and len(showarg) > 200: showarg = showarg[:200] + ' ...' yield f"🛠️ **正在调用工具:** `{tool_name}` 📥**参数:**\n````text\n{showarg}\n````\n" - handler.current_turn = turn + 1 + handler.current_turn = turn gen = handler.dispatch(tool_name, args, response, index=ii) if verbose: yield '`````\n' outcome = yield from gen yield '`````\n' else: outcome = exhaust(gen) - - if outcome.next_prompt is None: return {'result': 'CURRENT_TASK_DONE', 'data': outcome.data} - if outcome.should_exit: return {'result': 'EXITED', 'data': outcome.data} + + if outcome.should_exit: return {'result': 'EXITED', 'data': outcome.data} # should_exit is only used for immediate exit + if not outcome.next_prompt: + should_exit = {'result': 'CURRENT_TASK_DONE', 'data': outcome.data}; break if outcome.next_prompt.startswith('未知工具'): client.last_tools = '' - if outcome.data is not None: datastr = json.dumps(outcome.data, ensure_ascii=False, default=json_default) if type(outcome.data) in [dict, list] else str(outcome.data) next_prompt += f"\n{datastr}\n\n\n" - next_prompt += outcome.next_prompt - next_prompt = handler.next_prompt_patcher(next_prompt, None, turn+1) + next_prompt += outcome.next_prompt; + if not next_prompt: + if len(handler._done_hooks) == 0: return should_exit + next_prompt += handler._done_hooks.pop(0) + next_prompt = handler.next_prompt_patcher(next_prompt, None, turn) messages = [{"role": "user", "content": next_prompt}] # just new message, history is kept in *Session return {'result': 'MAX_TURNS_EXCEEDED'} diff --git a/agentmain.py b/agentmain.py index 211749c..9df3a5d 100644 --- a/agentmain.py +++ b/agentmain.py @@ -100,7 +100,7 @@ class GeneraticAgent: sys_prompt = get_system_prompt() script_dir = os.path.dirname(os.path.abspath(__file__)) - handler = GenericAgentHandler(None, self.history, os.path.join(script_dir, 'temp')) + handler = GenericAgentHandler(self, self.history, os.path.join(script_dir, 'temp')) if self.handler and 'key_info' in self.handler.working: ki = re.sub(r'\n\[SYSTEM\] 此为.*?工作记忆[。\n]*', '', self.handler.working['key_info']) # 去旧 handler.working['key_info'] = ki @@ -115,6 +115,7 @@ class GeneraticAgent: initial_user_content = build_multimodal_content(user_input, images) elif images: print(f"[INFO] backend {type(self.llmclient.backend).__name__} does not support direct multimodal input, fallback to text attachment hints.") + # although new handler, the **full** history is in llmclient, so it is full history! gen = agent_runner_loop(self.llmclient, sys_prompt, user_input, handler, TOOLS_SCHEMA, max_turns=40, verbose=self.verbose, initial_user_content=initial_user_content) @@ -135,6 +136,9 @@ class GeneraticAgent: print(f"Backend Error: {format_error(e)}") display_queue.put({'done': full_resp + f'\n```\n{format_error(e)}\n```', 'source': source}) finally: + if self.stop_sig: + print('User aborted the task.') + with self.task_queue.mutex: self.task_queue.queue.clear() self.is_running = self.stop_sig = False self.task_queue.task_done() if self.handler is not None: self.handler.code_stop_signal.append(1) diff --git a/frontends/stapp.py b/frontends/stapp.py index a59a8ce..7ccc179 100644 --- a/frontends/stapp.py +++ b/frontends/stapp.py @@ -8,7 +8,7 @@ except: pass sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) import streamlit as st -import time, json, re, threading +import time, json, re, threading, queue from agentmain import GeneraticAgent st.set_page_config(page_title="Cowork", layout="wide") @@ -69,14 +69,18 @@ with st.sidebar: render_sidebar() def agent_backend_stream(prompt): display_queue = agent.put_task(prompt, source="user") + response = '' try: while True: - item = display_queue.get() - if 'next' in item: yield item['next'] - if 'done' in item: + try: item = display_queue.get(timeout=1) + except queue.Empty: + yield response # heartbeat: let outer st.markdown() run → Streamlit checks StopException + continue + if 'next' in item: + response = item['next']; yield response + if 'done' in item: yield item['done']; break - finally: - agent.abort() + finally: agent.abort() if "messages" not in st.session_state: st.session_state.messages = [] for msg in st.session_state.messages: diff --git a/ga.py b/ga.py index a97db08..5df48c5 100644 --- a/ga.py +++ b/ga.py @@ -299,7 +299,12 @@ class GenericAgentHandler(BaseHandler): raw_path = os.path.join(self.cwd, args.get("cwd", './')) cwd = os.path.normpath(os.path.abspath(raw_path)) code_cwd = os.path.normpath(self.cwd) - result = yield from code_run(code, code_type, timeout, cwd, code_cwd=code_cwd, stop_signal=self.code_stop_signal) + if args.get("_inline_eval"): + ns = {'handler': self, 'parent': self.parent} + try: result = repr(eval(code, ns)) + except SyntaxError: exec(code, ns); result = ns.get('_r', 'OK') + except Exception as e: result = f'Error: {e}' + else: result = yield from code_run(code, code_type, timeout, cwd, code_cwd=code_cwd, stop_signal=self.code_stop_signal) next_prompt = self._get_anchor_prompt() return StepOutcome(result, next_prompt=next_prompt) @@ -394,8 +399,7 @@ class GenericAgentHandler(BaseHandler): with open(path, 'a' if mode == "append" else 'w', encoding="utf-8") as f: f.write(new_content) yield f"[Status] ✅ {mode.capitalize()} 成功 ({len(new_content)} bytes)\n" next_prompt = self._get_anchor_prompt() - return StepOutcome({"status": "success", 'writed_bytes': len(new_content)}, - next_prompt=next_prompt) + return StepOutcome({"status": "success", 'writed_bytes': len(new_content)}, next_prompt=next_prompt) except Exception as e: yield f"[Status] ❌ 写入异常: {str(e)}\n" return StepOutcome({"status": "error", "msg": str(e)}, next_prompt="\n") @@ -437,31 +441,29 @@ class GenericAgentHandler(BaseHandler): def do_no_tool(self, args, response): '''这是一个特殊工具,由引擎自主调用,不要包含在TOOLS_SCHEMA里。 当模型在一轮中未显式调用任何工具时,由引擎自动触发。 - 二次确认仅在回复几乎只包含/和一段大代码块时触发。 - ''' + 二次确认仅在回复几乎只包含/和一段大代码块时触发。''' content = getattr(response, 'content', '') or "" if not response or not content.strip(): yield "[Warn] LLM returned an empty response. Retrying...\n" - return StepOutcome({}, next_prompt="[System] Blank response, regenerate and tooluse", should_exit=False) + return StepOutcome({}, next_prompt="[System] Blank response, regenerate and tooluse") if '流异常中断,未收到完整响应 !!!]' in content: - return StepOutcome({}, next_prompt="[System] Incomplete response. Regenerate and tooluse.", should_exit=False) + return StepOutcome({}, next_prompt="[System] Incomplete response. Regenerate and tooluse.") if 'max_tokens !!!]' in content: - return StepOutcome({}, next_prompt="[System] max_tokens limit reached. Use multi small steps to do it.", should_exit=False) + return StepOutcome({}, next_prompt="[System] max_tokens limit reached. Use multi small steps to do it.") # 2. 检测“包含较大代码块但未调用工具”的情况 # 这里通过三引号代码块 + 最少字符数的方式粗略判断“大段代码” - code_block_pattern = r"```[a-zA-Z0-9_]*\n[\s\S]{100,}?```" + code_block_pattern = r"```[a-zA-Z0-9_]*\n[\s\S]{300,}?```" m = re.search(code_block_pattern, content) if m: # 仅当 content 由 / 和该代码块构成时才触发二次确认 residual = content - # 去掉代码块本身 residual = residual.replace(m.group(0), "") # 去掉块(大小写不敏感) residual = re.sub(r"[\s\S]*?", "", residual, flags=re.IGNORECASE) residual = re.sub(r"[\s\S]*?", "", residual, flags=re.IGNORECASE) # 如果去除上述结构后的非空白字符很少,说明没有额外自然语言说明 clean_residual = re.sub(r"\s+", "", residual) - if len(clean_residual) <= 50: + if len(clean_residual) <= 20: yield "[Info] Detected large code block without tool call and no extra natural language. Requesting clarification.\n" next_prompt = ( "[System] 检测到你在上一轮回复中主要内容是较大代码块(仅配有/),且本轮未调用任何工具。\n" @@ -470,10 +472,10 @@ class GenericAgentHandler(BaseHandler): "如果只是向用户展示或讲解代码片段,请在回复中补充自然语言说明," "并明确是否还需要额外的实际操作。" ) - return StepOutcome({}, next_prompt=next_prompt, should_exit=False) + return StepOutcome({}, next_prompt=next_prompt) # 3. 正常情况:直接将回复返回给用户并结束循环 yield "[Info] Final response to user.\n" - return StepOutcome(response, next_prompt=None, should_exit=True) + return StepOutcome(response, next_prompt=None) def do_start_long_term_update(self, args, response): '''Agent觉得当前任务完成后有重要信息需要记忆时调用此工具。''' diff --git a/reflect/scheduler.py b/reflect/scheduler.py index 11d77f8..3082708 100644 --- a/reflect/scheduler.py +++ b/reflect/scheduler.py @@ -1,4 +1,4 @@ -import os, json, socket as _socket +import os, json, socket as _socket, logging from datetime import datetime, timedelta # 端口锁:防止重复启动,bind失败时agentmain会直接崩溃退出 @@ -14,11 +14,24 @@ ONCE = False _dir = os.path.dirname(os.path.abspath(__file__)) TASKS = os.path.join(_dir, '../sche_tasks') DONE = os.path.join(_dir, '../sche_tasks/done') +_LOG = os.path.join(_dir, '../sche_tasks/scheduler.log') + +# --- 日志 --- +_logger = logging.getLogger('scheduler') +if not _logger.handlers: + _logger.setLevel(logging.INFO) + _fh = logging.FileHandler(_LOG, encoding='utf-8') + _fh.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s', + datefmt='%Y-%m-%d %H:%M')) + _logger.addHandler(_fh) + +# 默认最大延迟窗口(小时),超过此时间不触发 +DEFAULT_MAX_DELAY = 6 def _parse_cooldown(repeat): """解析repeat为冷却时间(比实际周期略短,防漂移)""" if repeat == 'once': return timedelta(days=999999) - if repeat == 'daily': return timedelta(hours=20) + if repeat in ('daily', 'weekday'): return timedelta(hours=20) if repeat == 'weekly': return timedelta(days=6) if repeat == 'monthly': return timedelta(days=27) if repeat.startswith('every_'): @@ -28,6 +41,7 @@ def _parse_cooldown(repeat): if u == 'h': return timedelta(hours=n) if u == 'm': return timedelta(minutes=n) if u == 'd': return timedelta(days=n) + _logger.warning(f'Unknown repeat type: {repeat}, fallback to 20h cooldown') return timedelta(hours=20) def _last_run(tid, done_files): @@ -51,22 +65,42 @@ def check(): tid = f[:-5] try: task = json.loads(open(os.path.join(TASKS, f), encoding='utf-8').read()) - except: continue + except Exception as e: + _logger.error(f'JSON parse error for {f}: {e}') + continue if not task.get('enabled', False): continue repeat = task.get('repeat', 'daily') sched = task.get('schedule', '00:00') - h, m = map(int, sched.split(':')) + try: + h, m = map(int, sched.split(':')) + except Exception as e: + _logger.error(f'Invalid schedule format in {f}: {sched!r} ({e})') + continue + + # weekday任务:周末跳过 + if repeat == 'weekday' and now.weekday() >= 5: continue # 还没到schedule时间就跳过 if now.hour < h or (now.hour == h and now.minute < m): continue + # 执行窗口检查:超过max_delay小时则跳过(防止开机太晚触发过时任务) + max_delay = task.get('max_delay_hours', DEFAULT_MAX_DELAY) + sched_minutes = h * 60 + m + now_minutes = now.hour * 60 + now.minute + if (now_minutes - sched_minutes) > max_delay * 60: + _logger.info(f'SKIP {tid}: {now_minutes - sched_minutes}min past schedule, ' + f'exceeds max_delay={max_delay}h') + continue + # 检查冷却 last = _last_run(tid, done_files) cooldown = _parse_cooldown(repeat) if last and (now - last) < cooldown: continue # 触发 + _logger.info(f'TRIGGER {tid} (repeat={repeat}, schedule={sched}, ' + f'last_run={last})') ts = now.strftime('%Y-%m-%d_%H%M') rpt = os.path.join(DONE, f'{ts}_{tid}.md') prompt = task.get('prompt', '') @@ -75,4 +109,50 @@ def check(): f'先读 scheduled_task_sop 了解执行流程,然后执行以下任务:\n\n' f'{prompt}\n\n' f'完成后将执行报告写入 {rpt}。') - return None \ No newline at end of file + return None + +def health_check(): + """检查所有定时任务的健康状态,返回结构化报告""" + if not os.path.isdir(TASKS): + return {'error': 'TASKS directory not found'} + now = datetime.now() + os.makedirs(DONE, exist_ok=True) + done_files = set(os.listdir(DONE)) + results = [] + for f in sorted(os.listdir(TASKS)): + if not f.endswith('.json'): continue + tid = f[:-5] + try: + task = json.loads(open(os.path.join(TASKS, f), encoding='utf-8').read()) + except Exception as e: + results.append({'task': tid, 'status': 'ERROR', 'detail': f'JSON parse: {e}'}) + continue + + enabled = task.get('enabled', False) + repeat = task.get('repeat', 'daily') + sched = task.get('schedule', '00:00') + last = _last_run(tid, done_files) + cooldown = _parse_cooldown(repeat) + + # 判断健康状态 + if not enabled: + status = 'DISABLED' + elif last is None: + status = 'NEVER_RUN' + elif repeat == 'once': + status = 'COMPLETED' if last else 'PENDING' + else: + # 检查是否超过预期间隔的1.5倍 + expected_gap = cooldown * 1.25 # 略大于冷却时间 + if (now - last) > expected_gap: + status = 'OVERDUE' + else: + status = 'HEALTHY' + + results.append({ + 'task': tid, 'status': status, 'enabled': enabled, + 'repeat': repeat, 'schedule': sched, + 'last_run': last.strftime('%Y-%m-%d %H:%M') if last else None, + 'cooldown_hours': cooldown.total_seconds() / 3600, + }) + return results