diff --git a/agentmain.py b/agentmain.py index 371d8f2..c9cb40c 100644 --- a/agentmain.py +++ b/agentmain.py @@ -190,23 +190,7 @@ if __name__ == '__main__': except Exception as e: print(f'[Reflect] on_done error: {e}') if once: print('[Reflect] ONCE=True, exiting.'); break elif args.scheduled: - script_dir = os.path.dirname(os.path.abspath(__file__)) - def drain(dq, tag): - while 'done' not in (item := dq.get()): pass - open(os.path.join(script_dir, './temp/scheduler.log'), 'a', encoding='utf-8').write(f'[{datetime.now():%m-%d %H:%M}] {tag}\n{item["done"]}\n\n') - while True: - time.sleep(55 + random.random() * 10) - now = datetime.now() - script_dir = os.path.dirname(os.path.abspath(__file__)) - sche_tasks_dir = os.path.join(script_dir, './sche_tasks/pending') - if not os.path.isdir(sche_tasks_dir): continue - for f in os.listdir(sche_tasks_dir): - m = re.match(r'(\d{4}-\d{2}-\d{2})_(\d{4})_', f) - if m and now >= datetime.strptime(f'{m[1]} {m[2]}', '%Y-%m-%d %H%M'): - raw = open(os.path.join(sche_tasks_dir, f), encoding='utf-8').read() - dq = agent.put_task(f'按scheduled_task_sop执行任务文件 ../sche_tasks/pending/{f}(立刻移到running)\n内容:\n{raw}', source='scheduler') - threading.Thread(target=drain, args=(dq, f), daemon=True).start() - break + print('moved to reflect mode') else: agent.inc_out = True while True: diff --git a/launch.pyw b/launch.pyw index 272c0d3..78d4894 100644 --- a/launch.pyw +++ b/launch.pyw @@ -83,7 +83,7 @@ if __name__ == '__main__': try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM); sock.bind(('127.0.0.1', 45762)); sock.listen(1) script_dir = os.path.dirname(os.path.abspath(__file__)) - scheduler_proc = subprocess.Popen([sys.executable, os.path.join(script_dir, "agentmain.py"), "--scheduled", "--llm_no", str(args.llm_no)], creationflags=subprocess.CREATE_NO_WINDOW if os.name=='nt' else 0); + scheduler_proc = subprocess.Popen([sys.executable, os.path.join(script_dir, "agentmain.py"), "--reflect", os.path.join(script_dir, "reflect", "scheduler.py"), "--llm_no", str(args.llm_no)], creationflags=subprocess.CREATE_NO_WINDOW if os.name=='nt' else 0); atexit.register(lambda: (scheduler_proc.kill(), sock.close())) print('[Launch] Task Scheduler started') except OSError: diff --git a/memory/scheduled_task_sop.md b/memory/scheduled_task_sop.md index a2396cc..49a269c 100644 --- a/memory/scheduled_task_sop.md +++ b/memory/scheduled_task_sop.md @@ -1,13 +1,21 @@ # 定时任务 SOP -目录:`../sche_tasks/{pending,running,done}/` -文件名:`YYYY-MM-DD_HHMM_描述.md`,内容含prompt和schedule +目录:`../sche_tasks/` 放任务定义JSON,`../sche_tasks/done/` 放执行报告 -## 流程 -1. [AUTO]唤醒 → `datetime.now()`取当前时间,`ls ../sche_tasks/pending/`,文件名时间≤当前→到期,选择一个 -2. **立即rename到running/**(先占再读,防多进程重复领) -3. 读文件执行 -4. 完成→移到done/,**在文件内追加执行报告**供用户查阅 -5. schedule非once→算下次时间,新建文件到pending/ +## 任务JSON格式(*.json) +```json +{"schedule":"08:00", "repeat":"daily", "enabled":true, "prompt":"..."} +``` +repeat可选:daily | weekly | monthly | once | every_Nh(每N小时)| every_Nd(每N天) -注意sche_tasks目录在../,即你的code root下 +## 触发流程 +1. scheduler.py(reflect/)每60秒轮询 sche_tasks/*.json +2. 条件全满足才触发:enabled=true + 当前时间≥schedule + 冷却时间已过(基于done/最新报告时间戳) +3. 触发时拼prompt,含报告路径 `../sche_tasks/done/YYYY-MM-DD_任务名.md` +4. **收到任务后第一件事**:用 update_working_checkpoint 记录报告目标文件路径,防止长任务执行中遗忘 +5. 执行完毕后将报告写入上述路径(scheduler靠此文件判断今天已执行) + +## 注意 +- once类型:执行一次后冷却100年(实际效果为永久跳过) +- 任务文件只管"干什么",报告路径由scheduler自动生成注入prompt +- sche_tasks目录在../,即code root下 \ No newline at end of file diff --git a/reflect/scheduler.py b/reflect/scheduler.py index d9d5f11..a485627 100644 --- a/reflect/scheduler.py +++ b/reflect/scheduler.py @@ -1,18 +1,71 @@ -import os, re -from datetime import datetime +import os, json +from datetime import datetime, timedelta -INTERVAL = 60 # 原版 55+random*10 +INTERVAL = 60 ONCE = False -script_dir = os.path.dirname(os.path.abspath(__file__)) -PENDING = os.path.join(script_dir, '../sche_tasks/pending') +_dir = os.path.dirname(os.path.abspath(__file__)) +TASKS = os.path.join(_dir, '../sche_tasks') +DONE = os.path.join(_dir, '../sche_tasks/done') + +def _parse_cooldown(repeat): + """解析repeat为冷却时间(比实际周期略短,防漂移)""" + if repeat == 'once': return timedelta(days=999999) + if repeat == 'daily': return timedelta(hours=20) + if repeat == 'weekly': return timedelta(days=6) + if repeat == 'monthly': return timedelta(days=27) + if repeat.startswith('every_'): + parts = repeat.split('_') + n = int(parts[1].rstrip('hdm')) + u = parts[1][-1] + if u == 'h': return timedelta(hours=n) + if u == 'm': return timedelta(minutes=n) + if u == 'd': return timedelta(days=n) + return timedelta(hours=20) + +def _last_run(tid, done_files): + """找最近一次执行时间""" + latest = None + for df in done_files: + if not df.endswith(f'_{tid}.md'): continue + try: + t = datetime.strptime(df[:15], '%Y-%m-%d_%H%M') + if latest is None or t > latest: latest = t + except: continue + return latest def check(): - if not os.path.isdir(PENDING): return None + if not os.path.isdir(TASKS): return None now = datetime.now() - for f in os.listdir(PENDING): - m = re.match(r'(\d{4}-\d{2}-\d{2})_(\d{4})_', f) - if m and now >= datetime.strptime(f'{m[1]} {m[2]}', '%Y-%m-%d %H%M'): - raw = open(os.path.join(PENDING, f), encoding='utf-8').read() - return f'按scheduled_task_sop执行任务文件 ../sche_tasks/pending/{f}(立刻移到running)\n内容:\n{raw}' + os.makedirs(DONE, exist_ok=True) + done_files = set(os.listdir(DONE)) + for f in sorted(os.listdir(TASKS)): + if not f.endswith('.json'): continue + tid = f[:-5] + try: + task = json.loads(open(os.path.join(TASKS, f), encoding='utf-8').read()) + except: continue + if not task.get('enabled', False): continue + + repeat = task.get('repeat', 'daily') + sched = task.get('schedule', '00:00') + h, m = map(int, sched.split(':')) + + # 还没到schedule时间就跳过 + if now.hour < h or (now.hour == h and now.minute < m): continue + + # 检查冷却 + last = _last_run(tid, done_files) + cooldown = _parse_cooldown(repeat) + if last and (now - last) < cooldown: continue + + # 触发 + ts = now.strftime('%Y-%m-%d_%H%M') + rpt = os.path.join(DONE, f'{ts}_{tid}.md') + prompt = task.get('prompt', '') + return (f'[定时任务] {tid}\n' + f'[报告路径] {rpt}\n\n' + f'先读 scheduled_task_sop 了解执行流程,然后执行以下任务:\n\n' + f'{prompt}\n\n' + f'完成后将执行报告写入 {rpt}。') return None \ No newline at end of file