import os, json, socket as _socket, logging from datetime import datetime, timedelta # 端口锁:防止重复启动,bind失败时agentmain会直接崩溃退出 # reload时mod.__dict__保留_lock,跳过重复绑定 try: _lock except NameError: _lock = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM) _lock.bind(('127.0.0.1', 45762)); _lock.listen(1) INTERVAL = 60 ONCE = False _dir = os.path.dirname(os.path.abspath(__file__)) TASKS = os.path.join(_dir, '../sche_tasks') DONE = os.path.join(_dir, '../sche_tasks/done') _LOG = os.path.join(_dir, '../sche_tasks/scheduler.log') # --- 日志 --- _logger = logging.getLogger('scheduler') if not _logger.handlers: _logger.setLevel(logging.INFO) _fh = logging.FileHandler(_LOG, encoding='utf-8') _fh.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s', datefmt='%Y-%m-%d %H:%M')) _logger.addHandler(_fh) # 默认最大延迟窗口(小时),超过此时间不触发 DEFAULT_MAX_DELAY = 6 def _parse_cooldown(repeat): """解析repeat为冷却时间(比实际周期略短,防漂移)""" if repeat == 'once': return timedelta(days=999999) if repeat in ('daily', 'weekday'): return timedelta(hours=20) if repeat == 'weekly': return timedelta(days=6) if repeat == 'monthly': return timedelta(days=27) if repeat.startswith('every_'): parts = repeat.split('_') n = int(parts[1].rstrip('hdm')) u = parts[1][-1] if u == 'h': return timedelta(hours=n) if u == 'm': return timedelta(minutes=n) if u == 'd': return timedelta(days=n) _logger.warning(f'Unknown repeat type: {repeat}, fallback to 20h cooldown') return timedelta(hours=20) def _last_run(tid, done_files): """找最近一次执行时间""" latest = None for df in done_files: if not df.endswith(f'_{tid}.md'): continue try: t = datetime.strptime(df[:15], '%Y-%m-%d_%H%M') if latest is None or t > latest: latest = t except: continue return latest def check(): if not os.path.isdir(TASKS): return None now = datetime.now() os.makedirs(DONE, exist_ok=True) done_files = set(os.listdir(DONE)) for f in sorted(os.listdir(TASKS)): if not f.endswith('.json'): continue tid = f[:-5] try: task = json.loads(open(os.path.join(TASKS, f), encoding='utf-8').read()) except Exception as e: _logger.error(f'JSON parse error for {f}: {e}') continue if not task.get('enabled', False): continue repeat = task.get('repeat', 'daily') sched = task.get('schedule', '00:00') try: h, m = map(int, sched.split(':')) except Exception as e: _logger.error(f'Invalid schedule format in {f}: {sched!r} ({e})') continue # weekday任务:周末跳过 if repeat == 'weekday' and now.weekday() >= 5: continue # 还没到schedule时间就跳过 if now.hour < h or (now.hour == h and now.minute < m): continue # 执行窗口检查:超过max_delay小时则跳过(防止开机太晚触发过时任务) max_delay = task.get('max_delay_hours', DEFAULT_MAX_DELAY) sched_minutes = h * 60 + m now_minutes = now.hour * 60 + now.minute if (now_minutes - sched_minutes) > max_delay * 60: _logger.info(f'SKIP {tid}: {now_minutes - sched_minutes}min past schedule, ' f'exceeds max_delay={max_delay}h') continue # 检查冷却 last = _last_run(tid, done_files) cooldown = _parse_cooldown(repeat) if last and (now - last) < cooldown: continue # 触发 _logger.info(f'TRIGGER {tid} (repeat={repeat}, schedule={sched}, ' f'last_run={last})') ts = now.strftime('%Y-%m-%d_%H%M') rpt = os.path.join(DONE, f'{ts}_{tid}.md') prompt = task.get('prompt', '') return (f'[定时任务] {tid}\n' f'[报告路径] {rpt}\n\n' f'先读 scheduled_task_sop 了解执行流程,然后执行以下任务:\n\n' f'{prompt}\n\n' f'完成后将执行报告写入 {rpt}。') return None def health_check(): """检查所有定时任务的健康状态,返回结构化报告""" if not os.path.isdir(TASKS): return {'error': 'TASKS directory not found'} now = datetime.now() os.makedirs(DONE, exist_ok=True) done_files = set(os.listdir(DONE)) results = [] for f in sorted(os.listdir(TASKS)): if not f.endswith('.json'): continue tid = f[:-5] try: task = json.loads(open(os.path.join(TASKS, f), encoding='utf-8').read()) except Exception as e: results.append({'task': tid, 'status': 'ERROR', 'detail': f'JSON parse: {e}'}) continue enabled = task.get('enabled', False) repeat = task.get('repeat', 'daily') sched = task.get('schedule', '00:00') last = _last_run(tid, done_files) cooldown = _parse_cooldown(repeat) # 判断健康状态 if not enabled: status = 'DISABLED' elif last is None: status = 'NEVER_RUN' elif repeat == 'once': status = 'COMPLETED' if last else 'PENDING' else: # 检查是否超过预期间隔的1.5倍 expected_gap = cooldown * 1.25 # 略大于冷却时间 if (now - last) > expected_gap: status = 'OVERDUE' else: status = 'HEALTHY' results.append({ 'task': tid, 'status': status, 'enabled': enabled, 'repeat': repeat, 'schedule': sched, 'last_run': last.strftime('%Y-%m-%d %H:%M') if last else None, 'cooldown_hours': cooldown.total_seconds() / 3600, }) return results