refactor: agent loop done_hooks, inline_eval, scheduler logging & health_check
- agent_loop: for→while, _done_hooks callback mechanism, max_turns=40
- agentmain: pass self to handler, abort clears task queue
- stapp: heartbeat yield for Streamlit StopException detection
- ga: _inline_eval param for in-process eval, no_tool thresholds tuned
- scheduler: logging, max_delay_hours, weekday repeat, health_check()
This commit is contained in:
@@ -46,14 +46,15 @@ def get_pretty_json(data):
|
||||
data["script"] = data["script"].replace("; ", ";\n ")
|
||||
return json.dumps(data, indent=2, ensure_ascii=False).replace('\\n', '\n')
|
||||
|
||||
def agent_runner_loop(client, system_prompt, user_input, handler, tools_schema, max_turns=15, verbose=True, initial_user_content=None):
|
||||
def agent_runner_loop(client, system_prompt, user_input, handler, tools_schema, max_turns=40, verbose=True, initial_user_content=None):
|
||||
messages = [
|
||||
{"role": "system", "content": system_prompt},
|
||||
{"role": "user", "content": initial_user_content if initial_user_content is not None else user_input}
|
||||
]
|
||||
for turn in range(max_turns):
|
||||
yield f"**LLM Running (Turn {turn+1}) ...**\n\n"
|
||||
if (turn+1) % 10 == 0: client.last_tools = '' # 每10轮重置一次工具描述,避免上下文过大导致的模型性能下降
|
||||
turn = 0; handler._done_hooks = []; handler.max_turns = max_turns
|
||||
while turn < handler.max_turns:
|
||||
turn += 1; yield f"**LLM Running (Turn {turn}) ...**\n\n"
|
||||
if turn%10 == 0: client.last_tools = '' # 每10轮重置一次工具描述,避免上下文过大导致的模型性能下降
|
||||
response_gen = client.chat(messages=messages, tools=tools_schema)
|
||||
if verbose:
|
||||
response = yield from response_gen
|
||||
@@ -66,7 +67,7 @@ def agent_runner_loop(client, system_prompt, user_input, handler, tools_schema,
|
||||
else: tool_calls = [{'tool_name': tc.function.name, 'args': json.loads(tc.function.arguments)}
|
||||
for tc in response.tool_calls]
|
||||
|
||||
next_prompt = ""
|
||||
next_prompt = ""; should_exit = None
|
||||
for ii, tc in enumerate(tool_calls):
|
||||
tool_name, args = tc['tool_name'], tc['args']
|
||||
if tool_name == 'no_tool': pass
|
||||
@@ -74,7 +75,7 @@ def agent_runner_loop(client, system_prompt, user_input, handler, tools_schema,
|
||||
showarg = get_pretty_json(args)
|
||||
if not verbose and len(showarg) > 200: showarg = showarg[:200] + ' ...'
|
||||
yield f"🛠️ **正在调用工具:** `{tool_name}` 📥**参数:**\n````text\n{showarg}\n````\n"
|
||||
handler.current_turn = turn + 1
|
||||
handler.current_turn = turn
|
||||
gen = handler.dispatch(tool_name, args, response, index=ii)
|
||||
if verbose:
|
||||
yield '`````\n'
|
||||
@@ -82,14 +83,17 @@ def agent_runner_loop(client, system_prompt, user_input, handler, tools_schema,
|
||||
yield '`````\n'
|
||||
else: outcome = exhaust(gen)
|
||||
|
||||
if outcome.next_prompt is None: return {'result': 'CURRENT_TASK_DONE', 'data': outcome.data}
|
||||
if outcome.should_exit: return {'result': 'EXITED', 'data': outcome.data}
|
||||
if outcome.should_exit: return {'result': 'EXITED', 'data': outcome.data} # should_exit is only used for immediate exit
|
||||
if not outcome.next_prompt:
|
||||
should_exit = {'result': 'CURRENT_TASK_DONE', 'data': outcome.data}; break
|
||||
if outcome.next_prompt.startswith('未知工具'): client.last_tools = ''
|
||||
|
||||
if outcome.data is not None:
|
||||
datastr = json.dumps(outcome.data, ensure_ascii=False, default=json_default) if type(outcome.data) in [dict, list] else str(outcome.data)
|
||||
next_prompt += f"<tool_result>\n{datastr}\n</tool_result>\n\n"
|
||||
next_prompt += outcome.next_prompt
|
||||
next_prompt = handler.next_prompt_patcher(next_prompt, None, turn+1)
|
||||
next_prompt += outcome.next_prompt;
|
||||
if not next_prompt:
|
||||
if len(handler._done_hooks) == 0: return should_exit
|
||||
next_prompt += handler._done_hooks.pop(0)
|
||||
next_prompt = handler.next_prompt_patcher(next_prompt, None, turn)
|
||||
messages = [{"role": "user", "content": next_prompt}] # just new message, history is kept in *Session
|
||||
return {'result': 'MAX_TURNS_EXCEEDED'}
|
||||
|
||||
@@ -100,7 +100,7 @@ class GeneraticAgent:
|
||||
|
||||
sys_prompt = get_system_prompt()
|
||||
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
handler = GenericAgentHandler(None, self.history, os.path.join(script_dir, 'temp'))
|
||||
handler = GenericAgentHandler(self, self.history, os.path.join(script_dir, 'temp'))
|
||||
if self.handler and 'key_info' in self.handler.working:
|
||||
ki = re.sub(r'\n\[SYSTEM\] 此为.*?工作记忆[。\n]*', '', self.handler.working['key_info']) # 去旧
|
||||
handler.working['key_info'] = ki
|
||||
@@ -115,6 +115,7 @@ class GeneraticAgent:
|
||||
initial_user_content = build_multimodal_content(user_input, images)
|
||||
elif images:
|
||||
print(f"[INFO] backend {type(self.llmclient.backend).__name__} does not support direct multimodal input, fallback to text attachment hints.")
|
||||
# although new handler, the **full** history is in llmclient, so it is full history!
|
||||
gen = agent_runner_loop(self.llmclient, sys_prompt, user_input,
|
||||
handler, TOOLS_SCHEMA, max_turns=40, verbose=self.verbose,
|
||||
initial_user_content=initial_user_content)
|
||||
@@ -135,6 +136,9 @@ class GeneraticAgent:
|
||||
print(f"Backend Error: {format_error(e)}")
|
||||
display_queue.put({'done': full_resp + f'\n```\n{format_error(e)}\n```', 'source': source})
|
||||
finally:
|
||||
if self.stop_sig:
|
||||
print('User aborted the task.')
|
||||
with self.task_queue.mutex: self.task_queue.queue.clear()
|
||||
self.is_running = self.stop_sig = False
|
||||
self.task_queue.task_done()
|
||||
if self.handler is not None: self.handler.code_stop_signal.append(1)
|
||||
|
||||
@@ -8,7 +8,7 @@ except: pass
|
||||
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
||||
|
||||
import streamlit as st
|
||||
import time, json, re, threading
|
||||
import time, json, re, threading, queue
|
||||
from agentmain import GeneraticAgent
|
||||
|
||||
st.set_page_config(page_title="Cowork", layout="wide")
|
||||
@@ -69,14 +69,18 @@ with st.sidebar: render_sidebar()
|
||||
|
||||
def agent_backend_stream(prompt):
|
||||
display_queue = agent.put_task(prompt, source="user")
|
||||
response = ''
|
||||
try:
|
||||
while True:
|
||||
item = display_queue.get()
|
||||
if 'next' in item: yield item['next']
|
||||
try: item = display_queue.get(timeout=1)
|
||||
except queue.Empty:
|
||||
yield response # heartbeat: let outer st.markdown() run → Streamlit checks StopException
|
||||
continue
|
||||
if 'next' in item:
|
||||
response = item['next']; yield response
|
||||
if 'done' in item:
|
||||
yield item['done']; break
|
||||
finally:
|
||||
agent.abort()
|
||||
finally: agent.abort()
|
||||
|
||||
if "messages" not in st.session_state: st.session_state.messages = []
|
||||
for msg in st.session_state.messages:
|
||||
|
||||
28
ga.py
28
ga.py
@@ -299,7 +299,12 @@ class GenericAgentHandler(BaseHandler):
|
||||
raw_path = os.path.join(self.cwd, args.get("cwd", './'))
|
||||
cwd = os.path.normpath(os.path.abspath(raw_path))
|
||||
code_cwd = os.path.normpath(self.cwd)
|
||||
result = yield from code_run(code, code_type, timeout, cwd, code_cwd=code_cwd, stop_signal=self.code_stop_signal)
|
||||
if args.get("_inline_eval"):
|
||||
ns = {'handler': self, 'parent': self.parent}
|
||||
try: result = repr(eval(code, ns))
|
||||
except SyntaxError: exec(code, ns); result = ns.get('_r', 'OK')
|
||||
except Exception as e: result = f'Error: {e}'
|
||||
else: result = yield from code_run(code, code_type, timeout, cwd, code_cwd=code_cwd, stop_signal=self.code_stop_signal)
|
||||
next_prompt = self._get_anchor_prompt()
|
||||
return StepOutcome(result, next_prompt=next_prompt)
|
||||
|
||||
@@ -394,8 +399,7 @@ class GenericAgentHandler(BaseHandler):
|
||||
with open(path, 'a' if mode == "append" else 'w', encoding="utf-8") as f: f.write(new_content)
|
||||
yield f"[Status] ✅ {mode.capitalize()} 成功 ({len(new_content)} bytes)\n"
|
||||
next_prompt = self._get_anchor_prompt()
|
||||
return StepOutcome({"status": "success", 'writed_bytes': len(new_content)},
|
||||
next_prompt=next_prompt)
|
||||
return StepOutcome({"status": "success", 'writed_bytes': len(new_content)}, next_prompt=next_prompt)
|
||||
except Exception as e:
|
||||
yield f"[Status] ❌ 写入异常: {str(e)}\n"
|
||||
return StepOutcome({"status": "error", "msg": str(e)}, next_prompt="\n")
|
||||
@@ -437,31 +441,29 @@ class GenericAgentHandler(BaseHandler):
|
||||
def do_no_tool(self, args, response):
|
||||
'''这是一个特殊工具,由引擎自主调用,不要包含在TOOLS_SCHEMA里。
|
||||
当模型在一轮中未显式调用任何工具时,由引擎自动触发。
|
||||
二次确认仅在回复几乎只包含<thinking>/<summary>和一段大代码块时触发。
|
||||
'''
|
||||
二次确认仅在回复几乎只包含<thinking>/<summary>和一段大代码块时触发。'''
|
||||
content = getattr(response, 'content', '') or ""
|
||||
if not response or not content.strip():
|
||||
yield "[Warn] LLM returned an empty response. Retrying...\n"
|
||||
return StepOutcome({}, next_prompt="[System] Blank response, regenerate and tooluse", should_exit=False)
|
||||
return StepOutcome({}, next_prompt="[System] Blank response, regenerate and tooluse")
|
||||
if '流异常中断,未收到完整响应 !!!]' in content:
|
||||
return StepOutcome({}, next_prompt="[System] Incomplete response. Regenerate and tooluse.", should_exit=False)
|
||||
return StepOutcome({}, next_prompt="[System] Incomplete response. Regenerate and tooluse.")
|
||||
if 'max_tokens !!!]' in content:
|
||||
return StepOutcome({}, next_prompt="[System] max_tokens limit reached. Use multi small steps to do it.", should_exit=False)
|
||||
return StepOutcome({}, next_prompt="[System] max_tokens limit reached. Use multi small steps to do it.")
|
||||
# 2. 检测“包含较大代码块但未调用工具”的情况
|
||||
# 这里通过三引号代码块 + 最少字符数的方式粗略判断“大段代码”
|
||||
code_block_pattern = r"```[a-zA-Z0-9_]*\n[\s\S]{100,}?```"
|
||||
code_block_pattern = r"```[a-zA-Z0-9_]*\n[\s\S]{300,}?```"
|
||||
m = re.search(code_block_pattern, content)
|
||||
if m:
|
||||
# 仅当 content 由 <thinking> / <summary> 和该代码块构成时才触发二次确认
|
||||
residual = content
|
||||
# 去掉代码块本身
|
||||
residual = residual.replace(m.group(0), "")
|
||||
# 去掉<thinking>和<summary>块(大小写不敏感)
|
||||
residual = re.sub(r"<thinking>[\s\S]*?</thinking>", "", residual, flags=re.IGNORECASE)
|
||||
residual = re.sub(r"<summary>[\s\S]*?</summary>", "", residual, flags=re.IGNORECASE)
|
||||
# 如果去除上述结构后的非空白字符很少,说明没有额外自然语言说明
|
||||
clean_residual = re.sub(r"\s+", "", residual)
|
||||
if len(clean_residual) <= 50:
|
||||
if len(clean_residual) <= 20:
|
||||
yield "[Info] Detected large code block without tool call and no extra natural language. Requesting clarification.\n"
|
||||
next_prompt = (
|
||||
"[System] 检测到你在上一轮回复中主要内容是较大代码块(仅配有<thinking>/<summary>),且本轮未调用任何工具。\n"
|
||||
@@ -470,10 +472,10 @@ class GenericAgentHandler(BaseHandler):
|
||||
"如果只是向用户展示或讲解代码片段,请在回复中补充自然语言说明,"
|
||||
"并明确是否还需要额外的实际操作。"
|
||||
)
|
||||
return StepOutcome({}, next_prompt=next_prompt, should_exit=False)
|
||||
return StepOutcome({}, next_prompt=next_prompt)
|
||||
# 3. 正常情况:直接将回复返回给用户并结束循环
|
||||
yield "[Info] Final response to user.\n"
|
||||
return StepOutcome(response, next_prompt=None, should_exit=True)
|
||||
return StepOutcome(response, next_prompt=None)
|
||||
|
||||
def do_start_long_term_update(self, args, response):
|
||||
'''Agent觉得当前任务完成后有重要信息需要记忆时调用此工具。'''
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import os, json, socket as _socket
|
||||
import os, json, socket as _socket, logging
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
# 端口锁:防止重复启动,bind失败时agentmain会直接崩溃退出
|
||||
@@ -14,11 +14,24 @@ ONCE = False
|
||||
_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
TASKS = os.path.join(_dir, '../sche_tasks')
|
||||
DONE = os.path.join(_dir, '../sche_tasks/done')
|
||||
_LOG = os.path.join(_dir, '../sche_tasks/scheduler.log')
|
||||
|
||||
# --- 日志 ---
|
||||
_logger = logging.getLogger('scheduler')
|
||||
if not _logger.handlers:
|
||||
_logger.setLevel(logging.INFO)
|
||||
_fh = logging.FileHandler(_LOG, encoding='utf-8')
|
||||
_fh.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s',
|
||||
datefmt='%Y-%m-%d %H:%M'))
|
||||
_logger.addHandler(_fh)
|
||||
|
||||
# 默认最大延迟窗口(小时),超过此时间不触发
|
||||
DEFAULT_MAX_DELAY = 6
|
||||
|
||||
def _parse_cooldown(repeat):
|
||||
"""解析repeat为冷却时间(比实际周期略短,防漂移)"""
|
||||
if repeat == 'once': return timedelta(days=999999)
|
||||
if repeat == 'daily': return timedelta(hours=20)
|
||||
if repeat in ('daily', 'weekday'): return timedelta(hours=20)
|
||||
if repeat == 'weekly': return timedelta(days=6)
|
||||
if repeat == 'monthly': return timedelta(days=27)
|
||||
if repeat.startswith('every_'):
|
||||
@@ -28,6 +41,7 @@ def _parse_cooldown(repeat):
|
||||
if u == 'h': return timedelta(hours=n)
|
||||
if u == 'm': return timedelta(minutes=n)
|
||||
if u == 'd': return timedelta(days=n)
|
||||
_logger.warning(f'Unknown repeat type: {repeat}, fallback to 20h cooldown')
|
||||
return timedelta(hours=20)
|
||||
|
||||
def _last_run(tid, done_files):
|
||||
@@ -51,22 +65,42 @@ def check():
|
||||
tid = f[:-5]
|
||||
try:
|
||||
task = json.loads(open(os.path.join(TASKS, f), encoding='utf-8').read())
|
||||
except: continue
|
||||
except Exception as e:
|
||||
_logger.error(f'JSON parse error for {f}: {e}')
|
||||
continue
|
||||
if not task.get('enabled', False): continue
|
||||
|
||||
repeat = task.get('repeat', 'daily')
|
||||
sched = task.get('schedule', '00:00')
|
||||
try:
|
||||
h, m = map(int, sched.split(':'))
|
||||
except Exception as e:
|
||||
_logger.error(f'Invalid schedule format in {f}: {sched!r} ({e})')
|
||||
continue
|
||||
|
||||
# weekday任务:周末跳过
|
||||
if repeat == 'weekday' and now.weekday() >= 5: continue
|
||||
|
||||
# 还没到schedule时间就跳过
|
||||
if now.hour < h or (now.hour == h and now.minute < m): continue
|
||||
|
||||
# 执行窗口检查:超过max_delay小时则跳过(防止开机太晚触发过时任务)
|
||||
max_delay = task.get('max_delay_hours', DEFAULT_MAX_DELAY)
|
||||
sched_minutes = h * 60 + m
|
||||
now_minutes = now.hour * 60 + now.minute
|
||||
if (now_minutes - sched_minutes) > max_delay * 60:
|
||||
_logger.info(f'SKIP {tid}: {now_minutes - sched_minutes}min past schedule, '
|
||||
f'exceeds max_delay={max_delay}h')
|
||||
continue
|
||||
|
||||
# 检查冷却
|
||||
last = _last_run(tid, done_files)
|
||||
cooldown = _parse_cooldown(repeat)
|
||||
if last and (now - last) < cooldown: continue
|
||||
|
||||
# 触发
|
||||
_logger.info(f'TRIGGER {tid} (repeat={repeat}, schedule={sched}, '
|
||||
f'last_run={last})')
|
||||
ts = now.strftime('%Y-%m-%d_%H%M')
|
||||
rpt = os.path.join(DONE, f'{ts}_{tid}.md')
|
||||
prompt = task.get('prompt', '')
|
||||
@@ -76,3 +110,49 @@ def check():
|
||||
f'{prompt}\n\n'
|
||||
f'完成后将执行报告写入 {rpt}。')
|
||||
return None
|
||||
|
||||
def health_check():
    """Check the health of every scheduled task and return a structured report.

    Returns:
        ``{'error': 'TASKS directory not found'}`` when the TASKS directory
        does not exist. Otherwise a list with one dict per ``*.json`` file in
        TASKS, ordered by file name:

        * unreadable / malformed task files ->
          ``{'task', 'status': 'ERROR', 'detail'}``
        * everything else ->
          ``{'task', 'status', 'enabled', 'repeat', 'schedule', 'last_run',
          'cooldown_hours'}`` with ``status`` one of ``DISABLED``,
          ``NEVER_RUN``, ``COMPLETED``, ``OVERDUE`` or ``HEALTHY``.

    Side effects:
        Creates the DONE directory if it is missing.
    """
    if not os.path.isdir(TASKS):
        return {'error': 'TASKS directory not found'}
    now = datetime.now()
    os.makedirs(DONE, exist_ok=True)
    done_files = set(os.listdir(DONE))
    results = []
    for f in sorted(os.listdir(TASKS)):
        if not f.endswith('.json'):
            continue
        tid = f[:-5]  # strip the '.json' suffix to get the task id
        try:
            # Context manager so the file handle is closed deterministically.
            with open(os.path.join(TASKS, f), encoding='utf-8') as fh:
                task = json.loads(fh.read())
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            results.append({'task': tid, 'status': 'ERROR', 'detail': f'JSON parse: {e}'})
            continue
        if not isinstance(task, dict):
            # Valid JSON but not an object: task.get() below would raise and
            # abort the whole report, so record it as a per-task error instead.
            results.append({
                'task': tid, 'status': 'ERROR',
                'detail': f'JSON parse: expected an object, got {type(task).__name__}',
            })
            continue

        enabled = task.get('enabled', False)
        repeat = task.get('repeat', 'daily')
        sched = task.get('schedule', '00:00')
        last = _last_run(tid, done_files)
        cooldown = _parse_cooldown(repeat)

        # Classify the task's health.
        if not enabled:
            status = 'DISABLED'
        elif last is None:
            status = 'NEVER_RUN'
        elif repeat == 'once':
            # `last` is known to be set here (the None case is handled above),
            # so a one-shot task that reaches this branch has already run.
            status = 'COMPLETED'
        elif (now - last) > cooldown * 1.25:
            # More than 1.25x the cooldown (slightly longer than the cooldown
            # itself) has elapsed since the last run.
            status = 'OVERDUE'
        else:
            status = 'HEALTHY'

        results.append({
            'task': tid, 'status': status, 'enabled': enabled,
            'repeat': repeat, 'schedule': sched,
            'last_run': last.strftime('%Y-%m-%d %H:%M') if last else None,
            'cooldown_hours': cooldown.total_seconds() / 3600,
        })
    return results
|
||||
|
||||
Reference in New Issue
Block a user