128 lines
4.8 KiB
Python
128 lines
4.8 KiB
Python
import os, json, time as _time, socket as _socket, logging
|
||
from datetime import datetime, timedelta
|
||
|
||
# 端口锁:防止重复启动,bind失败时agentmain会直接崩溃退出
|
||
# reload时mod.__dict__保留_lock,跳过重复绑定
|
||
try: _lock
|
||
except NameError:
|
||
_lock = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM)
|
||
_lock.bind(('127.0.0.1', 45762)); _lock.listen(1)
|
||
|
||
INTERVAL = 120
|
||
ONCE = False
|
||
|
||
_dir = os.path.dirname(os.path.abspath(__file__))
|
||
TASKS = os.path.join(_dir, '../sche_tasks')
|
||
DONE = os.path.join(_dir, '../sche_tasks/done')
|
||
_LOG = os.path.join(_dir, '../sche_tasks/scheduler.log')
|
||
|
||
# --- 日志 ---
|
||
_logger = logging.getLogger('scheduler')
|
||
if not _logger.handlers:
|
||
_logger.setLevel(logging.INFO)
|
||
_fh = logging.FileHandler(_LOG, encoding='utf-8')
|
||
_fh.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s',
|
||
datefmt='%Y-%m-%d %H:%M'))
|
||
_logger.addHandler(_fh)
|
||
|
||
# 默认最大延迟窗口(小时),超过此时间不触发
|
||
DEFAULT_MAX_DELAY = 6
|
||
_l4_t = 0 # last L4 archive time
|
||
|
||
def _parse_cooldown(repeat):
|
||
"""解析repeat为冷却时间(比实际周期略短,防漂移)"""
|
||
if repeat == 'once': return timedelta(days=999999)
|
||
if repeat in ('daily', 'weekday'): return timedelta(hours=20)
|
||
if repeat == 'weekly': return timedelta(days=6)
|
||
if repeat == 'monthly': return timedelta(days=27)
|
||
if repeat.startswith('every_'):
|
||
parts = repeat.split('_')
|
||
n = int(parts[1].rstrip('hdm'))
|
||
u = parts[1][-1]
|
||
if u == 'h': return timedelta(hours=n)
|
||
if u == 'm': return timedelta(minutes=n)
|
||
if u == 'd': return timedelta(days=n)
|
||
_logger.warning(f'Unknown repeat type: {repeat}, fallback to 20h cooldown')
|
||
return timedelta(hours=20)
|
||
|
||
def _last_run(tid, done_files):
|
||
"""找最近一次执行时间"""
|
||
latest = None
|
||
for df in done_files:
|
||
if not df.endswith(f'_{tid}.md'): continue
|
||
try:
|
||
t = datetime.strptime(df[:15], '%Y-%m-%d_%H%M')
|
||
if latest is None or t > latest: latest = t
|
||
except: continue
|
||
return latest
|
||
|
||
def check():
|
||
# L4 archive cron (silent, every 12h)
|
||
global _l4_t
|
||
if _time.time() - _l4_t > 43200:
|
||
_l4_t = _time.time()
|
||
try:
|
||
import sys; sys.path.insert(0, os.path.join(_dir, '../memory/L4_raw_sessions'))
|
||
from compress_session import batch_process
|
||
raw_dir = os.path.join(_dir, '../temp/model_responses')
|
||
r = batch_process(raw_dir, dry_run=False)
|
||
print(f'[L4 cron] {r}')
|
||
except Exception as e:
|
||
_logger.error(f'L4 archive failed: {e}')
|
||
|
||
if not os.path.isdir(TASKS): return None
|
||
now = datetime.now()
|
||
os.makedirs(DONE, exist_ok=True)
|
||
done_files = set(os.listdir(DONE))
|
||
for f in sorted(os.listdir(TASKS)):
|
||
if not f.endswith('.json'): continue
|
||
tid = f[:-5]
|
||
try:
|
||
task = json.loads(open(os.path.join(TASKS, f), encoding='utf-8').read())
|
||
except Exception as e:
|
||
_logger.error(f'JSON parse error for {f}: {e}')
|
||
continue
|
||
if not task.get('enabled', False): continue
|
||
|
||
repeat = task.get('repeat', 'daily')
|
||
sched = task.get('schedule', '00:00')
|
||
try:
|
||
h, m = map(int, sched.split(':'))
|
||
except Exception as e:
|
||
_logger.error(f'Invalid schedule format in {f}: {sched!r} ({e})')
|
||
continue
|
||
|
||
# weekday任务:周末跳过
|
||
if repeat == 'weekday' and now.weekday() >= 5: continue
|
||
|
||
# 还没到schedule时间就跳过
|
||
if now.hour < h or (now.hour == h and now.minute < m): continue
|
||
|
||
# 执行窗口检查:超过max_delay小时则跳过(防止开机太晚触发过时任务)
|
||
max_delay = task.get('max_delay_hours', DEFAULT_MAX_DELAY)
|
||
sched_minutes = h * 60 + m
|
||
now_minutes = now.hour * 60 + now.minute
|
||
if (now_minutes - sched_minutes) > max_delay * 60:
|
||
_logger.info(f'SKIP {tid}: {now_minutes - sched_minutes}min past schedule, '
|
||
f'exceeds max_delay={max_delay}h')
|
||
continue
|
||
|
||
# 检查冷却
|
||
last = _last_run(tid, done_files)
|
||
cooldown = _parse_cooldown(repeat)
|
||
if last and (now - last) < cooldown: continue
|
||
|
||
# 触发
|
||
_logger.info(f'TRIGGER {tid} (repeat={repeat}, schedule={sched}, '
|
||
f'last_run={last})')
|
||
ts = now.strftime('%Y-%m-%d_%H%M')
|
||
rpt = os.path.join(DONE, f'{ts}_{tid}.md')
|
||
prompt = task.get('prompt', '')
|
||
return (f'[定时任务] {tid}\n'
|
||
f'[报告路径] {rpt}\n\n'
|
||
f'先读 scheduled_task_sop 了解执行流程,然后执行以下任务:\n\n'
|
||
f'{prompt}\n\n'
|
||
f'完成后将执行报告写入 {rpt}。')
|
||
|
||
return None
|