import os, sys, re, threading, asyncio, queue as Q, socket, time, random sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) _TEMP_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'temp') from agentmain import GeneraticAgent try: from telegram import BotCommand from telegram.constants import ChatType, MessageLimit, ParseMode from telegram.ext import ApplicationBuilder, MessageHandler, filters, ContextTypes from telegram.helpers import escape_markdown from telegram.request import HTTPXRequest except: print("Please ask the agent install python-telegram-bot to use telegram module.") sys.exit(1) from chatapp_common import ( FILE_HINT, HELP_TEXT, TELEGRAM_MENU_COMMANDS, clean_reply, extract_files, format_restore, split_text, strip_files, ) from continue_cmd import handle_frontend_command, reset_conversation from llmcore import mykeys agent = GeneraticAgent() agent.verbose = False agent.inc_out = True ALLOWED = set(mykeys.get('tg_allowed_users', [])) _DRAFT_HINT = "thinking..." _STREAM_SUFFIX = " ⏳" _STREAM_SEGMENT_LIMIT = max(1200, MessageLimit.MAX_TEXT_LENGTH - 256) _QUEUE_WAIT_SECONDS = 1 _MD_TOKEN_RE = re.compile( r"(`{3,})([A-Za-z0-9_+-]*)\n([\s\S]*?)\1" r"|\[([^\]]+)\]\(([^)\n]+)\)" r"|`([^`\n]+)`" r"|\*\*([^\n]+?)\*\*" r"|(? 1 and parts[1].strip() else '') def _cancel_stream_task(ctx): task = ctx.user_data.pop('stream_task', None) if task and not task.done(): task.cancel() async def _sync_commands(application): await application.bot.set_my_commands([BotCommand(command, description) for command, description in TELEGRAM_MENU_COMMANDS]) async def handle_msg(update, ctx): uid = update.effective_user.id if ALLOWED and uid not in ALLOWED: return await update.message.reply_text("no") prompt = f"{FILE_HINT}\n\n{update.message.text}" dq = agent.put_task(prompt, source="telegram") task = asyncio.create_task(_stream(dq, update.message)) ctx.user_data['stream_task'] = task async def cmd_abort(update, ctx): _cancel_stream_task(ctx) agent.abort() await update.message.reply_text("⏹️ 正在停止...") async def cmd_llm(update, ctx): args = (update.message.text or '').split() if len(args) > 1: try: n = int(args[1]) agent.next_llm(n) await update.message.reply_text(f"✅ 已切换到 [{agent.llm_no}] {agent.get_llm_name()}") except (ValueError, IndexError): await update.message.reply_text(f"用法: /llm <0-{len(agent.list_llms())-1}>") else: lines = [f"{'→' if cur else ' '} [{i}] {name}" for i, name, cur in agent.list_llms()] await update.message.reply_text("LLMs:\n" + "\n".join(lines)) async def handle_photo(update, ctx): uid = update.effective_user.id if ALLOWED and uid not in ALLOWED: return await update.message.reply_text("no") if update.message.photo: photo = update.message.photo[-1] file = await photo.get_file() fpath = f"tg_{photo.file_unique_id}.jpg" kind = "图片" elif update.message.document: doc = update.message.document file = await doc.get_file() ext = os.path.splitext(doc.file_name or '')[1] or '' fpath = f"tg_{doc.file_unique_id}{ext}" kind = "文件" else: return await file.download_to_drive(os.path.join(_TEMP_DIR, fpath)) caption = update.message.caption prompt = f"[TIPS] 收到{kind}temp/{fpath}\n{caption}" if caption else f"[TIPS] 收到{kind}temp/{fpath},请等待下一步指令" dq = agent.put_task(prompt, source="telegram") task = asyncio.create_task(_stream(dq, update.message)) ctx.user_data['stream_task'] = task async def handle_command(update, ctx): uid = update.effective_user.id if ALLOWED and uid not in ALLOWED: return await update.message.reply_text("no") cmd = _normalized_command(update.message.text) op = cmd.split()[0] if cmd else '' if op == '/help': return await update.message.reply_text(HELP_TEXT) if op == '/status': llm = agent.get_llm_name() if agent.llmclient else '未配置' return await update.message.reply_text(f"状态: {'🔴 运行中' if agent.is_running else '🟢 空闲'}\nLLM: [{agent.llm_no}] {llm}") if op == '/stop': return await cmd_abort(update, ctx) if op == '/llm': return await cmd_llm(update, ctx) if op == '/new': _cancel_stream_task(ctx) return await update.message.reply_text(reset_conversation(agent)) if op == '/restore': _cancel_stream_task(ctx) try: restored_info, err = format_restore() if err: return await update.message.reply_text(err) restored, fname, count = restored_info agent.abort() agent.history.extend(restored) return await update.message.reply_text(f"✅ 已恢复 {count} 轮对话\n来源: {fname}\n(仅恢复上下文,请输入新问题继续)") except Exception as e: return await update.message.reply_text(f"❌ 恢复失败: {e}") if op == '/continue': if cmd != '/continue': _cancel_stream_task(ctx) return await update.message.reply_text(handle_frontend_command(agent, cmd)) return await update.message.reply_text(HELP_TEXT) if __name__ == '__main__': try: # Single instance lock using socket _lock_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM); _lock_sock.bind(('127.0.0.1', 19527)) except OSError: print('[Telegram] Another instance is already running, skiping...') sys.exit(1) if not ALLOWED: print('[Telegram] ERROR: tg_allowed_users in mykey.py is empty or missing. Set it to avoid unauthorized access.') sys.exit(1) _logf = open(os.path.join(os.path.dirname(os.path.dirname(__file__)), 'temp', 'tgapp.log'), 'a', encoding='utf-8', buffering=1) sys.stdout = sys.stderr = _logf print('[NEW] New process starting, the above are history infos ...') threading.Thread(target=agent.run, daemon=True).start() proxy = mykeys.get('proxy') if proxy: print('proxy:', proxy) else: print('proxy: ') async def _error_handler(update, context: ContextTypes.DEFAULT_TYPE): print(f"[{time.strftime('%m-%d %H:%M')}] TG error: {context.error}", flush=True) while True: try: print(f"TG bot starting... {time.strftime('%m-%d %H:%M')}") # Recreate request and app objects on each restart to avoid stale connections request_kwargs = dict(read_timeout=30, write_timeout=30, connect_timeout=30, pool_timeout=30) if proxy: request_kwargs['proxy'] = proxy request = HTTPXRequest(**request_kwargs) app = (ApplicationBuilder().token(mykeys['tg_bot_token']) .request(request).get_updates_request(request).post_init(_sync_commands).build()) app.add_handler(MessageHandler(filters.COMMAND, handle_command)) app.add_handler(MessageHandler(filters.PHOTO, handle_photo)) app.add_handler(MessageHandler(filters.Document.ALL, handle_photo)) app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_msg)) app.add_error_handler(_error_handler) app.run_polling(drop_pending_updates=True, poll_interval=1.0, timeout=30) except Exception as e: print(f"[{time.strftime('%m-%d %H:%M')}] polling crashed: {e}", flush=True) time.sleep(10) asyncio.set_event_loop(asyncio.new_event_loop())