refactor: scheduler to JSON+cooldown, update SOP

This commit is contained in:
Jiaqing Liang
2026-03-11 12:11:22 +08:00
parent 5dc44ba637
commit 006b915533
4 changed files with 83 additions and 38 deletions

View File

@@ -190,23 +190,7 @@ if __name__ == '__main__':
except Exception as e: print(f'[Reflect] on_done error: {e}')
if once: print('[Reflect] ONCE=True, exiting.'); break
elif args.scheduled:
script_dir = os.path.dirname(os.path.abspath(__file__))
def drain(dq, tag):
while 'done' not in (item := dq.get()): pass
open(os.path.join(script_dir, './temp/scheduler.log'), 'a', encoding='utf-8').write(f'[{datetime.now():%m-%d %H:%M}] {tag}\n{item["done"]}\n\n')
while True:
time.sleep(55 + random.random() * 10)
now = datetime.now()
script_dir = os.path.dirname(os.path.abspath(__file__))
sche_tasks_dir = os.path.join(script_dir, './sche_tasks/pending')
if not os.path.isdir(sche_tasks_dir): continue
for f in os.listdir(sche_tasks_dir):
m = re.match(r'(\d{4}-\d{2}-\d{2})_(\d{4})_', f)
if m and now >= datetime.strptime(f'{m[1]} {m[2]}', '%Y-%m-%d %H%M'):
raw = open(os.path.join(sche_tasks_dir, f), encoding='utf-8').read()
dq = agent.put_task(f'按scheduled_task_sop执行任务文件 ../sche_tasks/pending/{f}立刻移到running\n内容:\n{raw}', source='scheduler')
threading.Thread(target=drain, args=(dq, f), daemon=True).start()
break
print('moved to reflect mode')
else:
agent.inc_out = True
while True:

View File

@@ -83,7 +83,7 @@ if __name__ == '__main__':
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM); sock.bind(('127.0.0.1', 45762)); sock.listen(1)
script_dir = os.path.dirname(os.path.abspath(__file__))
scheduler_proc = subprocess.Popen([sys.executable, os.path.join(script_dir, "agentmain.py"), "--scheduled", "--llm_no", str(args.llm_no)], creationflags=subprocess.CREATE_NO_WINDOW if os.name=='nt' else 0);
scheduler_proc = subprocess.Popen([sys.executable, os.path.join(script_dir, "agentmain.py"), "--reflect", os.path.join(script_dir, "reflect", "scheduler.py"), "--llm_no", str(args.llm_no)], creationflags=subprocess.CREATE_NO_WINDOW if os.name=='nt' else 0);
atexit.register(lambda: (scheduler_proc.kill(), sock.close()))
print('[Launch] Task Scheduler started')
except OSError:

View File

@@ -1,13 +1,21 @@
# 定时任务 SOP
目录:`../sche_tasks/{pending,running,done}/`
文件名:`YYYY-MM-DD_HHMM_描述.md`内容含prompt和schedule
目录:`../sche_tasks/` 放任务定义JSON`../sche_tasks/done/` 放执行报告
## 流程
1. [AUTO]唤醒 → `datetime.now()`取当前时间,`ls ../sche_tasks/pending/`,文件名时间≤当前→到期,选择一个
2. **立即rename到running/**(先占再读,防多进程重复领)
3. 读文件执行
4. 完成→移到done/**在文件内追加执行报告**供用户查阅
5. schedule非once→算下次时间新建文件到pending/
## 任务JSON格式*.json
```json
{"schedule":"08:00", "repeat":"daily", "enabled":true, "prompt":"..."}
```
repeat可选daily | weekly | monthly | once | every_Nh每N小时| every_Nd每N天
注意sche_tasks目录在../即你的code root下
## 触发流程
1. scheduler.pyreflect/每60秒轮询 sche_tasks/*.json
2. 条件全满足才触发enabled=true + 当前时间≥schedule + 冷却时间已过基于done/最新报告时间戳)
3. 触发时拼prompt含报告路径 `../sche_tasks/done/YYYY-MM-DD_任务名.md`
4. **收到任务后第一件事**:用 update_working_checkpoint 记录报告目标文件路径,防止长任务执行中遗忘
5. 执行完毕后将报告写入上述路径scheduler靠此文件判断今天已执行
## 注意
- once类型执行一次后冷却100年实际效果为永久跳过
- 任务文件只管"干什么"报告路径由scheduler自动生成注入prompt
- sche_tasks目录在../即code root下

View File

@@ -1,18 +1,71 @@
import os, re
from datetime import datetime
import os, json
from datetime import datetime, timedelta
INTERVAL = 60 # 原版 55+random*10
INTERVAL = 60
ONCE = False
script_dir = os.path.dirname(os.path.abspath(__file__))
PENDING = os.path.join(script_dir, '../sche_tasks/pending')
_dir = os.path.dirname(os.path.abspath(__file__))
TASKS = os.path.join(_dir, '../sche_tasks')
DONE = os.path.join(_dir, '../sche_tasks/done')
def _parse_cooldown(repeat):
"""解析repeat为冷却时间(比实际周期略短,防漂移)"""
if repeat == 'once': return timedelta(days=999999)
if repeat == 'daily': return timedelta(hours=20)
if repeat == 'weekly': return timedelta(days=6)
if repeat == 'monthly': return timedelta(days=27)
if repeat.startswith('every_'):
parts = repeat.split('_')
n = int(parts[1].rstrip('hdm'))
u = parts[1][-1]
if u == 'h': return timedelta(hours=n)
if u == 'm': return timedelta(minutes=n)
if u == 'd': return timedelta(days=n)
return timedelta(hours=20)
def _last_run(tid, done_files):
"""找最近一次执行时间"""
latest = None
for df in done_files:
if not df.endswith(f'_{tid}.md'): continue
try:
t = datetime.strptime(df[:15], '%Y-%m-%d_%H%M')
if latest is None or t > latest: latest = t
except: continue
return latest
def check():
if not os.path.isdir(PENDING): return None
if not os.path.isdir(TASKS): return None
now = datetime.now()
for f in os.listdir(PENDING):
m = re.match(r'(\d{4}-\d{2}-\d{2})_(\d{4})_', f)
if m and now >= datetime.strptime(f'{m[1]} {m[2]}', '%Y-%m-%d %H%M'):
raw = open(os.path.join(PENDING, f), encoding='utf-8').read()
return f'按scheduled_task_sop执行任务文件 ../sche_tasks/pending/{f}立刻移到running\n内容:\n{raw}'
os.makedirs(DONE, exist_ok=True)
done_files = set(os.listdir(DONE))
for f in sorted(os.listdir(TASKS)):
if not f.endswith('.json'): continue
tid = f[:-5]
try:
task = json.loads(open(os.path.join(TASKS, f), encoding='utf-8').read())
except: continue
if not task.get('enabled', False): continue
repeat = task.get('repeat', 'daily')
sched = task.get('schedule', '00:00')
h, m = map(int, sched.split(':'))
# 还没到schedule时间就跳过
if now.hour < h or (now.hour == h and now.minute < m): continue
# 检查冷却
last = _last_run(tid, done_files)
cooldown = _parse_cooldown(repeat)
if last and (now - last) < cooldown: continue
# 触发
ts = now.strftime('%Y-%m-%d_%H%M')
rpt = os.path.join(DONE, f'{ts}_{tid}.md')
prompt = task.get('prompt', '')
return (f'[定时任务] {tid}\n'
f'[报告路径] {rpt}\n\n'
f'先读 scheduled_task_sop 了解执行流程,然后执行以下任务:\n\n'
f'{prompt}\n\n'
f'完成后将执行报告写入 {rpt}')
return None