From f16b5b7214b2c18eb617bff7352be6b5be5950ad Mon Sep 17 00:00:00 2001 From: Xinyi Wang <90703326+ViviqwerAsd@users.noreply.github.com> Date: Fri, 13 Mar 2026 16:46:16 +0800 Subject: [PATCH 1/2] Refactor chat app adapters to reduce duplicated scaffolding --- chatapp_common.py | 178 +++++++++++++++++++++++++++++++++ dingtalkapp.py | 225 +++++------------------------------------ qqapp.py | 250 ++++++---------------------------------------- wecomapp.py | 240 +++++++------------------------------------- 4 files changed, 271 insertions(+), 622 deletions(-) create mode 100644 chatapp_common.py diff --git a/chatapp_common.py b/chatapp_common.py new file mode 100644 index 0000000..2a7b9a6 --- /dev/null +++ b/chatapp_common.py @@ -0,0 +1,178 @@ +import asyncio, glob, os, queue as Q, re, socket, sys, time + +HELP_TEXT = "📖 命令列表:\n/help - 显示帮助\n/status - 查看状态\n/stop - 停止当前任务\n/new - 清空当前上下文\n/restore - 恢复上次对话历史\n/llm [n] - 查看或切换模型" +FILE_HINT = "If you need to show files to user, use [FILE:filepath] in your response." +TAG_PATS = [r"<" + t + r">.*?" for t in ("thinking", "summary", "tool_use", "file_content")] + + +def clean_reply(text): + for pat in TAG_PATS: + text = re.sub(pat, "", text or "", flags=re.DOTALL) + return re.sub(r"\n{3,}", "\n\n", text).strip() or "..." + + +def extract_files(text): + return re.findall(r"\[FILE:([^\]]+)\]", text or "") + + +def strip_files(text): + return re.sub(r"\[FILE:[^\]]+\]", "", text or "").strip() + + +def split_text(text, limit): + text, parts = (text or "").strip() or "...", [] + while len(text) > limit: + cut = text.rfind("\n", 0, limit) + if cut < limit * 0.6: + cut = limit + parts.append(text[:cut].rstrip()) + text = text[cut:].lstrip() + return parts + ([text] if text else []) or ["..."] + + +def format_restore(): + files = glob.glob("./temp/model_responses_*.txt") + if not files: + return None, "❌ 没有找到历史记录" + latest = max(files, key=os.path.getmtime) + with open(latest, "r", encoding="utf-8") as f: + content = f.read() + users = re.findall(r"=== USER ===\n(.+?)(?==== |$)", content, re.DOTALL) + resps = re.findall(r"=== Response ===.*?\n(.+?)(?==== Prompt|$)", content, re.DOTALL) + restored = [] + for u, r in zip(users, resps): + u, r = u.strip(), r.strip()[:500] + if u and r: + restored.extend([f"[USER]: {u}", f"[Agent] {r}"]) + if not restored: + return None, "❌ 历史记录里没有可恢复内容" + return (restored, os.path.basename(latest), len(restored) // 2), None + + +def build_done_text(raw_text): + files = [p for p in extract_files(raw_text) if os.path.exists(p)] + body = strip_files(clean_reply(raw_text)) + if files: + body = (body + "\n\n" if body else "") + "\n".join(f"生成文件: {p}" for p in files) + return body or "..." + + +def public_access(allowed): + return not allowed or "*" in allowed + + +def allowed_label(allowed): + return "public" if public_access(allowed) else sorted(allowed) + + +def ensure_single_instance(port, label): + try: + lock_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + lock_sock.bind(("127.0.0.1", port)) + return lock_sock + except OSError: + print(f"[{label}] Another instance is already running, skipping...") + sys.exit(1) + + +def require_runtime(agent, label, **required): + missing = [k for k, v in required.items() if not v] + if missing: + print(f"[{label}] ERROR: please set {', '.join(missing)} in mykey.py or mykey.json") + sys.exit(1) + if agent.llmclient is None: + print(f"[{label}] ERROR: no usable LLM backend found in mykey.py or mykey.json") + sys.exit(1) + + +def redirect_log(script_file, log_name, label, allowed): + log_dir = os.path.join(os.path.dirname(script_file), "temp") + os.makedirs(log_dir, exist_ok=True) + logf = open(os.path.join(log_dir, log_name), "a", encoding="utf-8", buffering=1) + sys.stdout = sys.stderr = logf + print(f"[NEW] {label} process starting, the above are history infos ...") + print(f"[{label}] allow list: {allowed_label(allowed)}") + + +class AgentChatMixin: + label = "Chat" + source = "chat" + split_limit = 1500 + ping_interval = 20 + + def __init__(self, agent, user_tasks): + self.agent, self.user_tasks = agent, user_tasks + + async def send_text(self, chat_id, content, **ctx): + raise NotImplementedError + + async def send_done(self, chat_id, raw_text, **ctx): + await self.send_text(chat_id, build_done_text(raw_text), **ctx) + + async def handle_command(self, chat_id, cmd, **ctx): + parts = (cmd or "").split() + op = (parts[0] if parts else "").lower() + if op == "/stop": + state = self.user_tasks.get(chat_id) + if state: + state["running"] = False + self.agent.abort() + return await self.send_text(chat_id, "⏹️ 正在停止...", **ctx) + if op == "/status": + llm = self.agent.get_llm_name() if self.agent.llmclient else "未配置" + return await self.send_text(chat_id, f"状态: {'🔴 运行中' if self.agent.is_running else '🟢 空闲'}\nLLM: [{self.agent.llm_no}] {llm}", **ctx) + if op == "/llm": + if not self.agent.llmclient: + return await self.send_text(chat_id, "❌ 当前没有可用的 LLM 配置", **ctx) + if len(parts) > 1: + try: + self.agent.next_llm(int(parts[1])) + return await self.send_text(chat_id, f"✅ 已切换到 [{self.agent.llm_no}] {self.agent.get_llm_name()}", **ctx) + except Exception: + return await self.send_text(chat_id, f"用法: /llm <0-{len(self.agent.list_llms()) - 1}>", **ctx) + lines = [f"{'→' if cur else ' '} [{i}] {name}" for i, name, cur in self.agent.list_llms()] + return await self.send_text(chat_id, "LLMs:\n" + "\n".join(lines), **ctx) + if op == "/restore": + try: + restored_info, err = format_restore() + if err: + return await self.send_text(chat_id, err, **ctx) + restored, fname, count = restored_info + self.agent.abort() + self.agent.history.extend(restored) + return await self.send_text(chat_id, f"✅ 已恢复 {count} 轮对话\n来源: {fname}\n(仅恢复上下文,请输入新问题继续)", **ctx) + except Exception as e: + return await self.send_text(chat_id, f"❌ 恢复失败: {e}", **ctx) + if op == "/new": + self.agent.abort() + self.agent.history = [] + return await self.send_text(chat_id, "🆕 已清空当前共享上下文", **ctx) + return await self.send_text(chat_id, HELP_TEXT, **ctx) + + async def run_agent(self, chat_id, text, **ctx): + state = {"running": True} + self.user_tasks[chat_id] = state + try: + await self.send_text(chat_id, "思考中...", **ctx) + dq = self.agent.put_task(f"{FILE_HINT}\n\n{text}", source=self.source) + last_ping = time.time() + while state["running"]: + try: + item = await asyncio.to_thread(dq.get, True, 3) + except Q.Empty: + if self.agent.is_running and time.time() - last_ping > self.ping_interval: + await self.send_text(chat_id, "⏳ 还在处理中,请稍等...", **ctx) + last_ping = time.time() + continue + if "done" in item: + await self.send_done(chat_id, item.get("done", ""), **ctx) + break + if not state["running"]: + await self.send_text(chat_id, "⏹️ 已停止", **ctx) + except Exception as e: + import traceback + print(f"[{self.label}] run_agent error: {e}") + traceback.print_exc() + await self.send_text(chat_id, f"❌ 错误: {e}", **ctx) + finally: + self.user_tasks.pop(chat_id, None) diff --git a/dingtalkapp.py b/dingtalkapp.py index d4a07dc..bcb2faa 100644 --- a/dingtalkapp.py +++ b/dingtalkapp.py @@ -1,8 +1,9 @@ -import os, sys, re, threading, asyncio, queue as Q, socket, time, glob, json +import asyncio, json, os, sys, threading, time import requests sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from agentmain import GeneraticAgent +from chatapp_common import AgentChatMixin, ensure_single_instance, public_access, redirect_log, require_runtime, split_text from llmcore import mykeys try: @@ -12,82 +13,26 @@ except Exception: print("Please install dingtalk-stream to use DingTalk: pip install dingtalk-stream") sys.exit(1) -agent = GeneraticAgent() -agent.verbose = False - +agent = GeneraticAgent(); agent.verbose = False CLIENT_ID = str(mykeys.get("dingtalk_client_id", "") or "").strip() CLIENT_SECRET = str(mykeys.get("dingtalk_client_secret", "") or "").strip() ALLOWED = {str(x).strip() for x in mykeys.get("dingtalk_allowed_users", []) if str(x).strip()} - -_TAG_PATS = [r"<" + t + r">.*?" for t in ("thinking", "summary", "tool_use", "file_content")] -_USER_TASKS = {} +USER_TASKS = {} -def _clean(text): - for pat in _TAG_PATS: - text = re.sub(pat, "", text, flags=re.DOTALL) - return re.sub(r"\n{3,}", "\n\n", text).strip() or "..." +class DingTalkApp(AgentChatMixin): + label, source, split_limit = "DingTalk", "dingtalk", 1800 - -def _extract_files(text): - return re.findall(r"\[FILE:([^\]]+)\]", text or "") - - -def _strip_files(text): - return re.sub(r"\[FILE:[^\]]+\]", "", text or "").strip() - - -def _split_text(text, limit=1800): - text = (text or "").strip() or "..." - parts = [] - while len(text) > limit: - cut = text.rfind("\n", 0, limit) - if cut < limit * 0.6: - cut = limit - parts.append(text[:cut].rstrip()) - text = text[cut:].lstrip() - if text: - parts.append(text) - return parts or ["..."] - - -def _format_restore(): - files = glob.glob("./temp/model_responses_*.txt") - if not files: - return None, "❌ 没有找到历史记录" - latest = max(files, key=os.path.getmtime) - with open(latest, "r", encoding="utf-8") as f: - content = f.read() - users = re.findall(r"=== USER ===\n(.+?)(?==== |$)", content, re.DOTALL) - resps = re.findall(r"=== Response ===.*?\n(.+?)(?==== Prompt|$)", content, re.DOTALL) - count, restored = 0, [] - for u, r in zip(users, resps): - u, r = u.strip(), r.strip()[:500] - if u and r: - restored.extend([f"[USER]: {u}", f"[Agent] {r}"]) - count += 1 - if not restored: - return None, "❌ 历史记录里没有可恢复内容" - return (restored, os.path.basename(latest), count), None - - -class DingTalkApp: def __init__(self): - self.client = None - self.access_token = None - self.token_expiry = 0 - self.background_tasks = set() + super().__init__(agent, USER_TASKS) + self.client, self.access_token, self.token_expiry, self.background_tasks = None, None, 0, set() async def _get_access_token(self): if self.access_token and time.time() < self.token_expiry: return self.access_token def _fetch(): - resp = requests.post( - "https://api.dingtalk.com/v1.0/oauth2/accessToken", - json={"appKey": CLIENT_ID, "appSecret": CLIENT_SECRET}, - timeout=20, - ) + resp = requests.post("https://api.dingtalk.com/v1.0/oauth2/accessToken", json={"appKey": CLIENT_ID, "appSecret": CLIENT_SECRET}, timeout=20) resp.raise_for_status() return resp.json() @@ -107,30 +52,17 @@ class DingTalkApp: headers = {"x-acs-dingtalk-access-token": token} if chat_id.startswith("group:"): url = "https://api.dingtalk.com/v1.0/robot/groupMessages/send" - payload = { - "robotCode": CLIENT_ID, - "openConversationId": chat_id[6:], - "msgKey": msg_key, - "msgParam": json.dumps(msg_param, ensure_ascii=False), - } + payload = {"robotCode": CLIENT_ID, "openConversationId": chat_id[6:], "msgKey": msg_key, "msgParam": json.dumps(msg_param, ensure_ascii=False)} else: url = "https://api.dingtalk.com/v1.0/robot/oToMessages/batchSend" - payload = { - "robotCode": CLIENT_ID, - "userIds": [chat_id], - "msgKey": msg_key, - "msgParam": json.dumps(msg_param, ensure_ascii=False), - } + payload = {"robotCode": CLIENT_ID, "userIds": [chat_id], "msgKey": msg_key, "msgParam": json.dumps(msg_param, ensure_ascii=False)} def _post(): resp = requests.post(url, json=payload, headers=headers, timeout=20) body = resp.text if resp.status_code != 200: raise RuntimeError(f"HTTP {resp.status_code}: {body[:300]}") - try: - result = resp.json() - except Exception: - result = {} + result = resp.json() if "json" in resp.headers.get("content-type", "") else {} errcode = result.get("errcode") if errcode not in (None, 0): raise RuntimeError(f"API errcode={errcode}: {body[:300]}") @@ -143,119 +75,32 @@ class DingTalkApp: return False async def send_text(self, chat_id, content): - for part in _split_text(content): + for part in split_text(content, self.split_limit): await self._send_batch_message(chat_id, "sampleMarkdown", {"text": part, "title": "Agent Reply"}) - async def send_done(self, chat_id, raw_text): - files = [p for p in _extract_files(raw_text) if os.path.exists(p)] - body = _strip_files(_clean(raw_text)) - if files: - body = (body + "\n\n" if body else "") + "\n".join([f"生成文件: {p}" for p in files]) - await self.send_text(chat_id, body or "...") - - async def handle_command(self, chat_id, cmd): - parts = (cmd or "").split() - op = (parts[0] if parts else "").lower() - if op == "/stop": - state = _USER_TASKS.get(chat_id) - if state: - state["running"] = False - agent.abort() - await self.send_text(chat_id, "⏹️ 正在停止...") - elif op == "/status": - llm = agent.get_llm_name() if agent.llmclient else "未配置" - await self.send_text(chat_id, f"状态: {'🔴 运行中' if agent.is_running else '🟢 空闲'}\nLLM: [{agent.llm_no}] {llm}") - elif op == "/llm": - if not agent.llmclient: - return await self.send_text(chat_id, "❌ 当前没有可用的 LLM 配置") - if len(parts) > 1: - try: - n = int(parts[1]) - agent.next_llm(n) - await self.send_text(chat_id, f"✅ 已切换到 [{agent.llm_no}] {agent.get_llm_name()}") - except Exception: - await self.send_text(chat_id, f"用法: /llm <0-{len(agent.list_llms()) - 1}>") - else: - lines = [f"{'→' if cur else ' '} [{i}] {name}" for i, name, cur in agent.list_llms()] - await self.send_text(chat_id, "LLMs:\n" + "\n".join(lines)) - elif op == "/restore": - try: - restored_info, err = _format_restore() - if err: - return await self.send_text(chat_id, err) - restored, fname, count = restored_info - agent.abort() - agent.history.extend(restored) - await self.send_text(chat_id, f"✅ 已恢复 {count} 轮对话\n来源: {fname}\n(仅恢复上下文,请输入新问题继续)") - except Exception as e: - await self.send_text(chat_id, f"❌ 恢复失败: {e}") - elif op == "/new": - agent.abort() - agent.history = [] - await self.send_text(chat_id, "🆕 已清空当前共享上下文") - else: - await self.send_text( - chat_id, - "📖 命令列表:\n/help - 显示帮助\n/status - 查看状态\n/stop - 停止当前任务\n/new - 清空当前上下文\n/restore - 恢复上次对话历史\n/llm [n] - 查看或切换模型", - ) - - async def run_agent(self, chat_id, text): - state = {"running": True} - _USER_TASKS[chat_id] = state - try: - await self.send_text(chat_id, "思考中...") - prompt = f"If you need to show files to user, use [FILE:filepath] in your response.\n\n{text}" - dq = agent.put_task(prompt, source="dingtalk") - last_ping = time.time() - while state["running"]: - try: - item = await asyncio.to_thread(dq.get, True, 3) - except Q.Empty: - if agent.is_running and time.time() - last_ping > 20: - await self.send_text(chat_id, "⏳ 还在处理中,请稍等...") - last_ping = time.time() - continue - if "done" in item: - await self.send_done(chat_id, item.get("done", "")) - break - if not state["running"]: - await self.send_text(chat_id, "⏹️ 已停止") - except Exception as e: - import traceback - - print(f"[DingTalk] run_agent error: {e}") - traceback.print_exc() - await self.send_text(chat_id, f"❌ 错误: {e}") - finally: - _USER_TASKS.pop(chat_id, None) - async def on_message(self, content, sender_id, sender_name, conversation_type=None, conversation_id=None): try: if not content: return - public_access = not ALLOWED or "*" in ALLOWED - if not public_access and sender_id not in ALLOWED: + if not public_access(ALLOWED) and sender_id not in ALLOWED: print(f"[DingTalk] unauthorized user: {sender_id}") return is_group = conversation_type == "2" and conversation_id chat_id = f"group:{conversation_id}" if is_group else sender_id print(f"[DingTalk] message from {sender_name} ({sender_id}): {content}") if content.startswith("/"): - await self.handle_command(chat_id, content) - return + return await self.handle_command(chat_id, content) task = asyncio.create_task(self.run_agent(chat_id, content)) self.background_tasks.add(task) task.add_done_callback(self.background_tasks.discard) except Exception: import traceback - print("[DingTalk] handle_message error") traceback.print_exc() async def start(self): - handler = _DingTalkHandler(self) self.client = DingTalkStreamClient(Credential(CLIENT_ID, CLIENT_SECRET)) - self.client.register_callback_handler(ChatbotMessage.TOPIC, handler) + self.client.register_callback_handler(ChatbotMessage.TOPIC, _DingTalkHandler(self)) print("[DingTalk] bot starting...") while True: try: @@ -274,44 +119,22 @@ class _DingTalkHandler(CallbackHandler): async def process(self, message): try: chatbot_msg = ChatbotMessage.from_dict(message.data) - text = "" - if getattr(getattr(chatbot_msg, "text", None), "content", None): - text = chatbot_msg.text.content.strip() + text = getattr(getattr(chatbot_msg, "text", None), "content", "") or "" extensions = getattr(chatbot_msg, "extensions", None) or {} recognition = ((extensions.get("content") or {}).get("recognition") or "").strip() if isinstance(extensions, dict) else "" - if not text: + if not (text := text.strip()): text = recognition or str((message.data.get("text", {}) or {}).get("content", "") or "").strip() - sender_id = getattr(chatbot_msg, "sender_staff_id", None) or getattr(chatbot_msg, "sender_id", None) or "unknown" + sender_id = str(getattr(chatbot_msg, "sender_staff_id", None) or getattr(chatbot_msg, "sender_id", None) or "unknown") sender_name = getattr(chatbot_msg, "sender_nick", None) or "Unknown" - conversation_type = message.data.get("conversationType") - conversation_id = message.data.get("conversationId") or message.data.get("openConversationId") - await self.app.on_message(text, str(sender_id), sender_name, conversation_type, conversation_id) - return AckMessage.STATUS_OK, "OK" + await self.app.on_message(text, sender_id, sender_name, message.data.get("conversationType"), message.data.get("conversationId") or message.data.get("openConversationId")) except Exception as e: print(f"[DingTalk] callback error: {e}") - return AckMessage.STATUS_OK, "Error" + return AckMessage.STATUS_OK, "OK" if __name__ == "__main__": - try: - _lock_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - _lock_sock.bind(("127.0.0.1", 19530)) - except OSError: - print("[DingTalk] Another instance is already running, skipping...") - sys.exit(1) - - if not CLIENT_ID or not CLIENT_SECRET: - print("[DingTalk] ERROR: please set dingtalk_client_id and dingtalk_client_secret in mykey.py or mykey.json") - sys.exit(1) - if agent.llmclient is None: - print("[DingTalk] ERROR: no usable LLM backend found in mykey.py or mykey.json") - sys.exit(1) - - log_dir = os.path.join(os.path.dirname(__file__), "temp") - os.makedirs(log_dir, exist_ok=True) - _logf = open(os.path.join(log_dir, "dingtalkapp.log"), "a", encoding="utf-8", buffering=1) - sys.stdout = sys.stderr = _logf - print("[NEW] DingTalk process starting, the above are history infos ...") - print(f"[DingTalk] allow list: {'public' if not ALLOWED or '*' in ALLOWED else sorted(ALLOWED)}") + _LOCK_SOCK = ensure_single_instance(19530, "DingTalk") + require_runtime(agent, "DingTalk", dingtalk_client_id=CLIENT_ID, dingtalk_client_secret=CLIENT_SECRET) + redirect_log(__file__, "dingtalkapp.log", "DingTalk", ALLOWED) threading.Thread(target=agent.run, daemon=True).start() asyncio.run(DingTalkApp().start()) diff --git a/qqapp.py b/qqapp.py index 5e65002..cd4223f 100644 --- a/qqapp.py +++ b/qqapp.py @@ -1,8 +1,9 @@ -import os, sys, re, threading, asyncio, queue as Q, socket, time, glob +import asyncio, os, sys, threading, time from collections import deque sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from agentmain import GeneraticAgent +from chatapp_common import AgentChatMixin, ensure_single_instance, public_access, redirect_log, require_runtime, split_text from llmcore import mykeys try: @@ -12,73 +13,19 @@ except Exception: print("Please install qq-botpy to use QQ module: pip install qq-botpy") sys.exit(1) -agent = GeneraticAgent() -agent.verbose = False - +agent = GeneraticAgent(); agent.verbose = False APP_ID = str(mykeys.get("qq_app_id", "") or "").strip() APP_SECRET = str(mykeys.get("qq_app_secret", "") or "").strip() ALLOWED = {str(x).strip() for x in mykeys.get("qq_allowed_users", []) if str(x).strip()} - -_TAG_PATS = [r"<" + t + r">.*?" for t in ("thinking", "summary", "tool_use", "file_content")] -_PROCESSED_IDS = deque(maxlen=1000) -_USER_TASKS = {} -_SEQ_LOCK = threading.Lock() -_MSG_SEQ = 1 +PROCESSED_IDS, USER_TASKS = deque(maxlen=1000), {} +SEQ_LOCK, MSG_SEQ = threading.Lock(), 1 def _next_msg_seq(): - global _MSG_SEQ - with _SEQ_LOCK: - _MSG_SEQ += 1 - return _MSG_SEQ - - -def _clean(text): - for pat in _TAG_PATS: - text = re.sub(pat, "", text, flags=re.DOTALL) - return re.sub(r"\n{3,}", "\n\n", text).strip() or "..." - - -def _extract_files(text): - return re.findall(r"\[FILE:([^\]]+)\]", text or "") - - -def _strip_files(text): - return re.sub(r"\[FILE:[^\]]+\]", "", text or "").strip() - - -def _split_text(text, limit=1500): - text = (text or "").strip() or "..." - parts = [] - while len(text) > limit: - cut = text.rfind("\n", 0, limit) - if cut < limit * 0.6: - cut = limit - parts.append(text[:cut].rstrip()) - text = text[cut:].lstrip() - if text: - parts.append(text) - return parts or ["..."] - - -def _format_restore(): - files = glob.glob("./temp/model_responses_*.txt") - if not files: - return None, "❌ 没有找到历史记录" - latest = max(files, key=os.path.getmtime) - with open(latest, "r", encoding="utf-8") as f: - content = f.read() - users = re.findall(r"=== USER ===\n(.+?)(?==== |$)", content, re.DOTALL) - resps = re.findall(r"=== Response ===.*?\n(.+?)(?==== Prompt|$)", content, re.DOTALL) - count, restored = 0, [] - for u, r in zip(users, resps): - u, r = u.strip(), r.strip()[:500] - if u and r: - restored.extend([f"[USER]: {u}", f"[Agent] {r}"]) - count += 1 - if not restored: - return None, "❌ 历史记录里没有可恢复内容" - return (restored, os.path.basename(latest), count), None + global MSG_SEQ + with SEQ_LOCK: + MSG_SEQ += 1 + return MSG_SEQ def _build_intents(): @@ -86,16 +33,7 @@ def _build_intents(): return botpy.Intents(public_messages=True, direct_message=True) except Exception: intents = botpy.Intents.none() if hasattr(botpy.Intents, "none") else botpy.Intents() - for attr in ( - "public_messages", - "public_guild_messages", - "direct_message", - "direct_messages", - "c2c_message", - "c2c_messages", - "group_at_message", - "group_at_messages", - ): + for attr in ("public_messages", "public_guild_messages", "direct_message", "direct_messages", "c2c_message", "c2c_messages", "group_at_message", "group_at_messages"): if hasattr(intents, attr): try: setattr(intents, attr, True) @@ -105,15 +43,12 @@ def _build_intents(): def _make_bot_class(app): - intents = _build_intents() - - class _QQBot(botpy.Client): + class QQBot(botpy.Client): def __init__(self): - super().__init__(intents=intents, ext_handlers=False) + super().__init__(intents=_build_intents(), ext_handlers=False) async def on_ready(self): - name = getattr(getattr(self, "robot", None), "name", "QQBot") - print(f"[QQ] bot ready: {name}") + print(f"[QQ] bot ready: {getattr(getattr(self, 'robot', None), 'name', 'QQBot')}") async def on_c2c_message_create(self, message: C2CMessage): await app.on_message(message, is_group=False) @@ -124,154 +59,50 @@ def _make_bot_class(app): async def on_direct_message_create(self, message): await app.on_message(message, is_group=False) - return _QQBot + return QQBot -class QQApp: +class QQApp(AgentChatMixin): + label, source, split_limit = "QQ", "qq", 1500 + def __init__(self): + super().__init__(agent, USER_TASKS) self.client = None async def send_text(self, chat_id, content, *, msg_id=None, is_group=False): if not self.client: return - for part in _split_text(content): - seq = _next_msg_seq() - if is_group: - await self.client.api.post_group_message( - group_openid=chat_id, - msg_type=0, - content=part, - msg_id=msg_id, - msg_seq=seq, - ) - else: - await self.client.api.post_c2c_message( - openid=chat_id, - msg_type=0, - content=part, - msg_id=msg_id, - msg_seq=seq, - ) - - async def send_done(self, chat_id, raw_text, *, msg_id=None, is_group=False): - files = [p for p in _extract_files(raw_text) if os.path.exists(p)] - body = _strip_files(_clean(raw_text)) - if files: - body = (body + "\n\n" if body else "") + "\n".join([f"生成文件: {p}" for p in files]) - await self.send_text(chat_id, body or "...", msg_id=msg_id, is_group=is_group) - - async def handle_command(self, chat_id, cmd, *, msg_id=None, is_group=False): - parts = (cmd or "").split() - op = (parts[0] if parts else "").lower() - if op == "/stop": - state = _USER_TASKS.get(chat_id) - if state: - state["running"] = False - agent.abort() - await self.send_text(chat_id, "⏹️ 正在停止...", msg_id=msg_id, is_group=is_group) - elif op == "/status": - llm = agent.get_llm_name() if agent.llmclient else "未配置" - await self.send_text(chat_id, f"状态: {'🔴 运行中' if agent.is_running else '🟢 空闲'}\nLLM: [{agent.llm_no}] {llm}", msg_id=msg_id, is_group=is_group) - elif op == "/llm": - if not agent.llmclient: - return await self.send_text(chat_id, "❌ 当前没有可用的 LLM 配置", msg_id=msg_id, is_group=is_group) - if len(parts) > 1: - try: - n = int(parts[1]) - agent.next_llm(n) - await self.send_text(chat_id, f"✅ 已切换到 [{agent.llm_no}] {agent.get_llm_name()}", msg_id=msg_id, is_group=is_group) - except Exception: - await self.send_text(chat_id, f"用法: /llm <0-{len(agent.list_llms()) - 1}>", msg_id=msg_id, is_group=is_group) - else: - lines = [f"{'→' if cur else ' '} [{i}] {name}" for i, name, cur in agent.list_llms()] - await self.send_text(chat_id, "LLMs:\n" + "\n".join(lines), msg_id=msg_id, is_group=is_group) - elif op == "/restore": - try: - restored_info, err = _format_restore() - if err: - return await self.send_text(chat_id, err, msg_id=msg_id, is_group=is_group) - restored, fname, count = restored_info - agent.abort() - agent.history.extend(restored) - await self.send_text(chat_id, f"✅ 已恢复 {count} 轮对话\n来源: {fname}\n(仅恢复上下文,请输入新问题继续)", msg_id=msg_id, is_group=is_group) - except Exception as e: - await self.send_text(chat_id, f"❌ 恢复失败: {e}", msg_id=msg_id, is_group=is_group) - elif op == "/new": - agent.abort() - agent.history = [] - await self.send_text(chat_id, "🆕 已清空当前共享上下文", msg_id=msg_id, is_group=is_group) - else: - await self.send_text( - chat_id, - "📖 命令列表:\n/help - 显示帮助\n/status - 查看状态\n/stop - 停止当前任务\n/new - 清空当前上下文\n/restore - 恢复上次对话历史\n/llm [n] - 查看或切换模型", - msg_id=msg_id, - is_group=is_group, - ) - - async def run_agent(self, chat_id, text, *, msg_id=None, is_group=False): - state = {"running": True} - _USER_TASKS[chat_id] = state - try: - await self.send_text(chat_id, "思考中...", msg_id=msg_id, is_group=is_group) - prompt = f"If you need to show files to user, use [FILE:filepath] in your response.\n\n{text}" - dq = agent.put_task(prompt, source="qq") - last_ping = time.time() - while state["running"]: - try: - item = await asyncio.to_thread(dq.get, True, 3) - except Q.Empty: - if agent.is_running and time.time() - last_ping > 20: - await self.send_text(chat_id, "⏳ 还在处理中,请稍等...", msg_id=msg_id, is_group=is_group) - last_ping = time.time() - continue - if "done" in item: - await self.send_done(chat_id, item.get("done", ""), msg_id=msg_id, is_group=is_group) - break - if not state["running"]: - await self.send_text(chat_id, "⏹️ 已停止", msg_id=msg_id, is_group=is_group) - except Exception as e: - import traceback - - print(f"[QQ] run_agent error: {e}") - traceback.print_exc() - await self.send_text(chat_id, f"❌ 错误: {e}", msg_id=msg_id, is_group=is_group) - finally: - _USER_TASKS.pop(chat_id, None) + api = self.client.api.post_group_message if is_group else self.client.api.post_c2c_message + key = "group_openid" if is_group else "openid" + for part in split_text(content, self.split_limit): + await api(**{key: chat_id, "msg_type": 0, "content": part, "msg_id": msg_id, "msg_seq": _next_msg_seq()}) async def on_message(self, data, is_group=False): try: msg_id = getattr(data, "id", None) - if msg_id in _PROCESSED_IDS: + if msg_id in PROCESSED_IDS: return - _PROCESSED_IDS.append(msg_id) + PROCESSED_IDS.append(msg_id) content = (getattr(data, "content", "") or "").strip() if not content: return author = getattr(data, "author", None) - if is_group: - chat_id = str(getattr(data, "group_openid", "") or "") - user_id = str(getattr(author, "member_openid", "") or getattr(author, "id", "") or "unknown") - else: - user_id = str(getattr(author, "user_openid", "") or getattr(author, "id", "") or "unknown") - chat_id = user_id - public_access = not ALLOWED or "*" in ALLOWED - if not public_access and user_id not in ALLOWED: + user_id = str(getattr(author, "member_openid" if is_group else "user_openid", "") or getattr(author, "id", "") or "unknown") + chat_id = str(getattr(data, "group_openid", "") or user_id) if is_group else user_id + if not public_access(ALLOWED) and user_id not in ALLOWED: print(f"[QQ] unauthorized user: {user_id}") return print(f"[QQ] message from {user_id} ({'group' if is_group else 'c2c'}): {content}") if content.startswith("/"): - await self.handle_command(chat_id, content, msg_id=msg_id, is_group=is_group) - return + return await self.handle_command(chat_id, content, msg_id=msg_id, is_group=is_group) asyncio.create_task(self.run_agent(chat_id, content, msg_id=msg_id, is_group=is_group)) except Exception: import traceback - print("[QQ] handle_message error") traceback.print_exc() async def start(self): - BotClass = _make_bot_class(self) - self.client = BotClass() + self.client = _make_bot_class(self)() while True: try: print(f"[QQ] bot starting... {time.strftime('%m-%d %H:%M')}") @@ -283,25 +114,8 @@ class QQApp: if __name__ == "__main__": - try: - _lock_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - _lock_sock.bind(("127.0.0.1", 19528)) - except OSError: - print("[QQ] Another instance is already running, skipping...") - sys.exit(1) - - if not APP_ID or not APP_SECRET: - print("[QQ] ERROR: please set qq_app_id and qq_app_secret in mykey.py or mykey.json") - sys.exit(1) - if agent.llmclient is None: - print("[QQ] ERROR: no usable LLM backend found in mykey.py or mykey.json") - sys.exit(1) - - log_dir = os.path.join(os.path.dirname(__file__), "temp") - os.makedirs(log_dir, exist_ok=True) - _logf = open(os.path.join(log_dir, "qqapp.log"), "a", encoding="utf-8", buffering=1) - sys.stdout = sys.stderr = _logf - print("[NEW] QQ process starting, the above are history infos ...") - print(f"[QQ] allow list: {'public' if not ALLOWED or '*' in ALLOWED else sorted(ALLOWED)}") + _LOCK_SOCK = ensure_single_instance(19528, "QQ") + require_runtime(agent, "QQ", qq_app_id=APP_ID, qq_app_secret=APP_SECRET) + redirect_log(__file__, "qqapp.log", "QQ", ALLOWED) threading.Thread(target=agent.run, daemon=True).start() asyncio.run(QQApp().start()) diff --git a/wecomapp.py b/wecomapp.py index b5ef37b..bd37ddd 100644 --- a/wecomapp.py +++ b/wecomapp.py @@ -1,8 +1,9 @@ -import os, sys, re, threading, asyncio, queue as Q, socket, time, glob +import asyncio, os, sys, threading from collections import deque sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from agentmain import GeneraticAgent +from chatapp_common import AgentChatMixin, ensure_single_instance, public_access, redirect_log, require_runtime, split_text from llmcore import mykeys try: @@ -11,211 +12,64 @@ except Exception: print("Please install wecom_aibot_sdk to use WeCom: pip install wecom_aibot_sdk") sys.exit(1) -agent = GeneraticAgent() -agent.verbose = False - +agent = GeneraticAgent(); agent.verbose = False BOT_ID = str(mykeys.get("wecom_bot_id", "") or "").strip() SECRET = str(mykeys.get("wecom_secret", "") or "").strip() WELCOME = str(mykeys.get("wecom_welcome_message", "") or "").strip() ALLOWED = {str(x).strip() for x in mykeys.get("wecom_allowed_users", []) if str(x).strip()} - -_TAG_PATS = [r"<" + t + r">.*?" for t in ("thinking", "summary", "tool_use", "file_content")] -_PROCESSED_IDS = deque(maxlen=1000) -_USER_TASKS = {} +PROCESSED_IDS, USER_TASKS = deque(maxlen=1000), {} -def _clean(text): - for pat in _TAG_PATS: - text = re.sub(pat, "", text, flags=re.DOTALL) - return re.sub(r"\n{3,}", "\n\n", text).strip() or "..." +class WeComApp(AgentChatMixin): + label, source, split_limit = "WeCom", "wecom", 1200 - -def _extract_files(text): - return re.findall(r"\[FILE:([^\]]+)\]", text or "") - - -def _strip_files(text): - return re.sub(r"\[FILE:[^\]]+\]", "", text or "").strip() - - -def _split_text(text, limit=1200): - text = (text or "").strip() or "..." - parts = [] - while len(text) > limit: - cut = text.rfind("\n", 0, limit) - if cut < limit * 0.6: - cut = limit - parts.append(text[:cut].rstrip()) - text = text[cut:].lstrip() - if text: - parts.append(text) - return parts or ["..."] - - -def _format_restore(): - files = glob.glob("./temp/model_responses_*.txt") - if not files: - return None, "❌ 没有找到历史记录" - latest = max(files, key=os.path.getmtime) - with open(latest, "r", encoding="utf-8") as f: - content = f.read() - users = re.findall(r"=== USER ===\n(.+?)(?==== |$)", content, re.DOTALL) - resps = re.findall(r"=== Response ===.*?\n(.+?)(?==== Prompt|$)", content, re.DOTALL) - count, restored = 0, [] - for u, r in zip(users, resps): - u, r = u.strip(), r.strip()[:500] - if u and r: - restored.extend([f"[USER]: {u}", f"[Agent] {r}"]) - count += 1 - if not restored: - return None, "❌ 历史记录里没有可恢复内容" - return (restored, os.path.basename(latest), count), None - - -class WeComApp: def __init__(self): - self.client = None - self.chat_frames = {} - - def _body(self, frame): - if hasattr(frame, "body"): - return frame.body or {} - if isinstance(frame, dict): - return frame.get("body", frame) - return {} + super().__init__(agent, USER_TASKS) + self.client, self.chat_frames = None, {} async def send_text(self, chat_id, content): - if not self.client: + if not self.client or chat_id not in self.chat_frames: + if chat_id not in self.chat_frames: + print(f"[WeCom] no frame found for chat: {chat_id}") return - frame = self.chat_frames.get(chat_id) - if not frame: - print(f"[WeCom] no frame found for chat: {chat_id}") - return - for part in _split_text(content): - stream_id = generate_req_id("stream") - await self.client.reply_stream(frame, stream_id, part, finish=True) - - async def send_done(self, chat_id, raw_text): - files = [p for p in _extract_files(raw_text) if os.path.exists(p)] - body = _strip_files(_clean(raw_text)) - if files: - body = (body + "\n\n" if body else "") + "\n".join([f"生成文件: {p}" for p in files]) - await self.send_text(chat_id, body or "...") - - async def handle_command(self, chat_id, cmd): - parts = (cmd or "").split() - op = (parts[0] if parts else "").lower() - if op == "/stop": - state = _USER_TASKS.get(chat_id) - if state: - state["running"] = False - agent.abort() - await self.send_text(chat_id, "⏹️ 正在停止...") - elif op == "/status": - llm = agent.get_llm_name() if agent.llmclient else "未配置" - await self.send_text(chat_id, f"状态: {'🔴 运行中' if agent.is_running else '🟢 空闲'}\nLLM: [{agent.llm_no}] {llm}") - elif op == "/llm": - if not agent.llmclient: - return await self.send_text(chat_id, "❌ 当前没有可用的 LLM 配置") - if len(parts) > 1: - try: - n = int(parts[1]) - agent.next_llm(n) - await self.send_text(chat_id, f"✅ 已切换到 [{agent.llm_no}] {agent.get_llm_name()}") - except Exception: - await self.send_text(chat_id, f"用法: /llm <0-{len(agent.list_llms()) - 1}>") - else: - lines = [f"{'→' if cur else ' '} [{i}] {name}" for i, name, cur in agent.list_llms()] - await self.send_text(chat_id, "LLMs:\n" + "\n".join(lines)) - elif op == "/restore": - try: - restored_info, err = _format_restore() - if err: - return await self.send_text(chat_id, err) - restored, fname, count = restored_info - agent.abort() - agent.history.extend(restored) - await self.send_text(chat_id, f"✅ 已恢复 {count} 轮对话\n来源: {fname}\n(仅恢复上下文,请输入新问题继续)") - except Exception as e: - await self.send_text(chat_id, f"❌ 恢复失败: {e}") - elif op == "/new": - agent.abort() - agent.history = [] - await self.send_text(chat_id, "🆕 已清空当前共享上下文") - else: - await self.send_text( - chat_id, - "📖 命令列表:\n/help - 显示帮助\n/status - 查看状态\n/stop - 停止当前任务\n/new - 清空当前上下文\n/restore - 恢复上次对话历史\n/llm [n] - 查看或切换模型", - ) - - async def run_agent(self, chat_id, text): - state = {"running": True} - _USER_TASKS[chat_id] = state - try: - await self.send_text(chat_id, "思考中...") - prompt = f"If you need to show files to user, use [FILE:filepath] in your response.\n\n{text}" - dq = agent.put_task(prompt, source="wecom") - last_ping = time.time() - while state["running"]: - try: - item = await asyncio.to_thread(dq.get, True, 3) - except Q.Empty: - if agent.is_running and time.time() - last_ping > 20: - await self.send_text(chat_id, "⏳ 还在处理中,请稍等...") - last_ping = time.time() - continue - if "done" in item: - await self.send_done(chat_id, item.get("done", "")) - break - if not state["running"]: - await self.send_text(chat_id, "⏹️ 已停止") - except Exception as e: - import traceback - - print(f"[WeCom] run_agent error: {e}") - traceback.print_exc() - await self.send_text(chat_id, f"❌ 错误: {e}") - finally: - _USER_TASKS.pop(chat_id, None) + frame = self.chat_frames[chat_id] + for part in split_text(content, self.split_limit): + await self.client.reply_stream(frame, generate_req_id("stream"), part, finish=True) async def on_text(self, frame): try: - body = self._body(frame) + body = frame.body if hasattr(frame, "body") else frame.get("body", frame) if isinstance(frame, dict) else {} if not isinstance(body, dict): return msg_id = body.get("msgid") or f"{body.get('chatid', '')}_{body.get('sendertime', '')}" - if msg_id in _PROCESSED_IDS: + if msg_id in PROCESSED_IDS: return - _PROCESSED_IDS.append(msg_id) + PROCESSED_IDS.append(msg_id) from_info = body.get("from", {}) if isinstance(body.get("from", {}), dict) else {} sender_id = str(from_info.get("userid", "") or "unknown") chat_id = str(body.get("chatid", "") or sender_id) content = str((body.get("text", {}) or {}).get("content", "") or "").strip() if not content: return - public_access = not ALLOWED or "*" in ALLOWED - if not public_access and sender_id not in ALLOWED: + if not public_access(ALLOWED) and sender_id not in ALLOWED: print(f"[WeCom] unauthorized user: {sender_id}") return self.chat_frames[chat_id] = frame print(f"[WeCom] message from {sender_id}: {content}") if content.startswith("/"): - await self.handle_command(chat_id, content) - return + return await self.handle_command(chat_id, content) asyncio.create_task(self.run_agent(chat_id, content)) except Exception: import traceback - print("[WeCom] handle_message error") traceback.print_exc() async def on_enter_chat(self, frame): - if not WELCOME or not self.client: - return - try: - await self.client.reply_welcome(frame, {"msgtype": "text", "text": {"content": WELCOME}}) - except Exception as e: - print(f"[WeCom] welcome error: {e}") + if WELCOME and self.client: + try: + await self.client.reply_welcome(frame, {"msgtype": "text", "text": {"content": WELCOME}}) + except Exception as e: + print(f"[WeCom] welcome error: {e}") async def on_connected(self, frame): print("[WeCom] connected") @@ -230,19 +84,16 @@ class WeComApp: print(f"[WeCom] error: {frame}") async def start(self): - self.client = WSClient({ - "bot_id": BOT_ID, - "secret": SECRET, - "reconnect_interval": 1000, - "max_reconnect_attempts": -1, - "heartbeat_interval": 30000, - }) - self.client.on("connected", self.on_connected) - self.client.on("authenticated", self.on_authenticated) - self.client.on("disconnected", self.on_disconnected) - self.client.on("error", self.on_error) - self.client.on("message.text", self.on_text) - self.client.on("event.enter_chat", self.on_enter_chat) + self.client = WSClient({"bot_id": BOT_ID, "secret": SECRET, "reconnect_interval": 1000, "max_reconnect_attempts": -1, "heartbeat_interval": 30000}) + for event, handler in { + "connected": self.on_connected, + "authenticated": self.on_authenticated, + "disconnected": self.on_disconnected, + "error": self.on_error, + "message.text": self.on_text, + "event.enter_chat": self.on_enter_chat, + }.items(): + self.client.on(event, handler) print("[WeCom] bot starting...") await self.client.connect_async() while True: @@ -250,25 +101,8 @@ class WeComApp: if __name__ == "__main__": - try: - _lock_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - _lock_sock.bind(("127.0.0.1", 19529)) - except OSError: - print("[WeCom] Another instance is already running, skipping...") - sys.exit(1) - - if not BOT_ID or not SECRET: - print("[WeCom] ERROR: please set wecom_bot_id and wecom_secret in mykey.py or mykey.json") - sys.exit(1) - if agent.llmclient is None: - print("[WeCom] ERROR: no usable LLM backend found in mykey.py or mykey.json") - sys.exit(1) - - log_dir = os.path.join(os.path.dirname(__file__), "temp") - os.makedirs(log_dir, exist_ok=True) - _logf = open(os.path.join(log_dir, "wecomapp.log"), "a", encoding="utf-8", buffering=1) - sys.stdout = sys.stderr = _logf - print("[NEW] WeCom process starting, the above are history infos ...") - print(f"[WeCom] allow list: {'public' if not ALLOWED or '*' in ALLOWED else sorted(ALLOWED)}") + _LOCK_SOCK = ensure_single_instance(19529, "WeCom") + require_runtime(agent, "WeCom", wecom_bot_id=BOT_ID, wecom_secret=SECRET) + redirect_log(__file__, "wecomapp.log", "WeCom", ALLOWED) threading.Thread(target=agent.run, daemon=True).start() asyncio.run(WeComApp().start()) From 44bda94a360364ee3ee4c4533553a07a457a8cc7 Mon Sep 17 00:00:00 2001 From: Xinyi Wang <90703326+ViviqwerAsd@users.noreply.github.com> Date: Fri, 13 Mar 2026 16:51:50 +0800 Subject: [PATCH 2/2] Refactor Feishu adapter to trim duplicated helpers --- chatapp_common.py | 8 +++ fsapp.py | 175 ++++++++++++++-------------------------------- 2 files changed, 59 insertions(+), 124 deletions(-) diff --git a/chatapp_common.py b/chatapp_common.py index 2a7b9a6..88d345a 100644 --- a/chatapp_common.py +++ b/chatapp_common.py @@ -61,6 +61,14 @@ def public_access(allowed): return not allowed or "*" in allowed +def to_allowed_set(value): + if value is None: + return set() + if isinstance(value, str): + value = [value] + return {str(x).strip() for x in value if str(x).strip()} + + def allowed_label(allowed): return "public" if public_access(allowed) else sorted(allowed) diff --git a/fsapp.py b/fsapp.py index 48d592d..ec53b99 100644 --- a/fsapp.py +++ b/fsapp.py @@ -1,11 +1,4 @@ -import glob -import json -import os -import queue as Q -import re -import sys -import threading -import time +import json, os, re, sys, threading, time PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__)) sys.path.insert(0, PROJECT_ROOT) @@ -15,9 +8,9 @@ import lark_oapi as lark from lark_oapi.api.im.v1 import * from agentmain import GeneraticAgent +from chatapp_common import clean_reply, extract_files, format_restore, public_access, strip_files, to_allowed_set from llmcore import mykeys -_TAG_PATS = [r"<" + t + r">.*?" for t in ("thinking", "summary", "tool_use", "file_content")] _IMAGE_EXTS = {".png", ".jpg", ".jpeg", ".gif", ".bmp", ".webp", ".ico", ".tiff", ".tif"} _AUDIO_EXTS = {".opus", ".mp3", ".wav", ".m4a", ".aac"} _VIDEO_EXTS = {".mp4", ".mov", ".avi", ".mkv", ".webm"} @@ -39,37 +32,13 @@ MEDIA_DIR = os.path.join(TEMP_DIR, "feishu_media") os.makedirs(MEDIA_DIR, exist_ok=True) -def _clean(text): - for pat in _TAG_PATS: - text = re.sub(pat, "", text, flags=re.DOTALL) - return re.sub(r"\n{3,}", "\n\n", text).strip() or "..." - - -def _extract_files(text): - return re.findall(r"\[FILE:([^\]]+)\]", text or "") - - -def _strip_files(text): - return re.sub(r"\[FILE:[^\]]+\]", "", text or "").strip() - - def _display_text(text): - return _strip_files(_clean(text)) or "..." - - -def _to_allowed_set(value): - if value is None: - return set() - if isinstance(value, str): - value = [value] - return {str(x).strip() for x in value if str(x).strip()} + return strip_files(clean_reply(text)) or "..." def _parse_json(raw): - if not raw: - return {} try: - return json.loads(raw) + return json.loads(raw) if raw else {} except Exception: return {} @@ -229,20 +198,16 @@ def _extract_post_content(content_json): APP_ID = str(mykeys.get("fs_app_id", "") or "").strip() APP_SECRET = str(mykeys.get("fs_app_secret", "") or "").strip() -ALLOWED_USERS = _to_allowed_set(mykeys.get("fs_allowed_users", [])) -PUBLIC_ACCESS = not ALLOWED_USERS or "*" in ALLOWED_USERS +ALLOWED_USERS = to_allowed_set(mykeys.get("fs_allowed_users", [])) +PUBLIC_ACCESS = public_access(ALLOWED_USERS) agent = GeneraticAgent() threading.Thread(target=agent.run, daemon=True).start() client, user_tasks = None, {} -def create_client(): - return lark.Client.builder().app_id(APP_ID).app_secret(APP_SECRET).log_level(lark.LogLevel.INFO).build() - - -def _card(text): - return json.dumps({"config": {"wide_screen_mode": True}, "elements": [{"tag": "markdown", "content": text}]}, ensure_ascii=False) +def create_client(): return lark.Client.builder().app_id(APP_ID).app_secret(APP_SECRET).log_level(lark.LogLevel.INFO).build() +def _card(text): return json.dumps({"config": {"wide_screen_mode": True}, "elements": [{"tag": "markdown", "content": text}]}, ensure_ascii=False) def send_message(open_id, content, msg_type="text", use_card=False): @@ -272,6 +237,10 @@ def update_message(message_id, content): return response.success() +def _read_file_obj(response): + return response.file.read() if hasattr(response.file, "read") else response.file + + def _upload_image_sync(file_path): try: with open(file_path, "rb") as f: @@ -305,51 +274,29 @@ def _upload_file_sync(file_path): return None -def _download_image_sync(message_id, image_key): - try: - request = GetMessageResourceRequest.builder().message_id(message_id).file_key(image_key).type("image").build() - response = client.im.v1.message_resource.get(request) - if response.success(): - data = response.file.read() if hasattr(response.file, "read") else response.file - return data, response.file_name - print(f"[ERROR] download image failed: {response.code}, {response.msg}") - except Exception as e: - print(f"[ERROR] download image failed {image_key}: {e}") - return None, None - - -def _download_file_sync(message_id, file_key, resource_type="file"): - if resource_type == "audio": - resource_type = "file" +def _download_resource(message_id, file_key, resource_type, label): try: request = GetMessageResourceRequest.builder().message_id(message_id).file_key(file_key).type(resource_type).build() response = client.im.v1.message_resource.get(request) if response.success(): - data = response.file.read() if hasattr(response.file, "read") else response.file - return data, response.file_name - print(f"[ERROR] download {resource_type} failed: {response.code}, {response.msg}") + return _read_file_obj(response), response.file_name + print(f"[ERROR] download {label} failed: {response.code}, {response.msg}") except Exception as e: - print(f"[ERROR] download {resource_type} failed {file_key}: {e}") + print(f"[ERROR] download {label} failed {file_key}: {e}") return None, None def _download_and_save_media(msg_type, content_json, message_id): - data, filename = None, None - if msg_type == "image": - image_key = content_json.get("image_key") - if image_key and message_id: - data, filename = _download_image_sync(message_id, image_key) - if not filename: - filename = f"{image_key[:16]}.jpg" - elif msg_type in ("audio", "file", "media"): - file_key = content_json.get("file_key") - if file_key and message_id: - data, filename = _download_file_sync(message_id, file_key, msg_type) - if not filename: - filename = file_key[:16] - if msg_type == "audio" and filename and not filename.endswith(".opus"): - filename = f"{filename}.opus" - if data and filename: + file_key = content_json.get("image_key") if msg_type == "image" else content_json.get("file_key") + resource_type = "image" if msg_type == "image" else ("file" if msg_type == "audio" else msg_type) + default_ext = ".jpg" if msg_type == "image" else ".opus" if msg_type == "audio" else "" + if not (file_key and message_id): + return None, None + data, filename = _download_resource(message_id, file_key, resource_type, msg_type) + if data: + filename = filename or f"{file_key[:16]}{default_ext}" + if msg_type == "audio" and filename and not filename.endswith(".opus"): + filename += ".opus" file_path = os.path.join(MEDIA_DIR, os.path.basename(filename)) with open(file_path, "wb") as f: f.write(data) @@ -358,13 +305,7 @@ def _download_and_save_media(msg_type, content_json, message_id): def _describe_media(msg_type, file_path, filename): - if msg_type == "image": - return f"[image: {filename}]\n[Image: source: {file_path}]" - if msg_type == "audio": - return f"[audio: {filename}]\n[File: source: {file_path}]" - if msg_type in ("file", "media"): - return f"[{msg_type}: {filename}]\n[File: source: {file_path}]" - return f"[{msg_type}]\n[File: source: {file_path}]" + return f"[{msg_type}: {filename}]\n[{'Image' if msg_type == 'image' else 'File'}: source: {file_path}]" def _send_local_file(open_id, file_path): @@ -372,26 +313,30 @@ def _send_local_file(open_id, file_path): send_message(open_id, f"⚠️ 文件不存在: {file_path}") return False ext = os.path.splitext(file_path)[1].lower() - if ext in _IMAGE_EXTS: - image_key = _upload_image_sync(file_path) - if image_key: - send_message(open_id, json.dumps({"image_key": image_key}, ensure_ascii=False), msg_type="image") - return True - else: - file_key = _upload_file_sync(file_path) - if file_key: - msg_type = "media" if ext in _AUDIO_EXTS or ext in _VIDEO_EXTS else "file" - send_message(open_id, json.dumps({"file_key": file_key}, ensure_ascii=False), msg_type=msg_type) - return True + is_image = ext in _IMAGE_EXTS + file_key = _upload_image_sync(file_path) if is_image else _upload_file_sync(file_path) + if file_key: + key_name, msg_type = ("image_key", "image") if is_image else ("file_key", "media" if ext in _AUDIO_EXTS or ext in _VIDEO_EXTS else "file") + send_message(open_id, json.dumps({key_name: file_key}, ensure_ascii=False), msg_type=msg_type) + return True send_message(open_id, f"⚠️ 文件发送失败: {os.path.basename(file_path)}") return False def _send_generated_files(open_id, raw_text): - for file_path in _extract_files(raw_text): + for file_path in extract_files(raw_text): _send_local_file(open_id, file_path) +def _append_media(parts, image_paths, msg_type, file_path, filename): + if not (file_path and filename): + parts.append(f"[{msg_type}: download failed]") + return + parts.append(_describe_media(msg_type, file_path, filename)) + if msg_type == "image": + image_paths.append(file_path) + + def _build_user_message(message): msg_type = message.message_type message_id = message.message_id @@ -407,19 +352,10 @@ def _build_user_message(message): parts.append(text) for image_key in image_keys: file_path, filename = _download_and_save_media("image", {"image_key": image_key}, message_id) - if file_path and filename: - parts.append(_describe_media("image", file_path, filename)) - image_paths.append(file_path) - else: - parts.append("[image: download failed]") + _append_media(parts, image_paths, "image", file_path, filename) elif msg_type in ("image", "audio", "file", "media"): file_path, filename = _download_and_save_media(msg_type, content_json, message_id) - if file_path and filename: - parts.append(_describe_media(msg_type, file_path, filename)) - if msg_type == "image": - image_paths.append(file_path) - else: - parts.append(f"[{msg_type}: download failed]") + _append_media(parts, image_paths, msg_type, file_path, filename) elif msg_type in ("share_chat", "share_user", "interactive", "share_calendar_event", "system", "merge_forward"): parts.append(_extract_share_card_content(content_json, msg_type)) else: @@ -500,22 +436,13 @@ def handle_command(open_id, cmd): send_message(open_id, f"状态: {'🟢 空闲' if not agent.is_running else '🔴 运行中'}") elif cmd == "/restore": try: - files = glob.glob("./temp/model_responses_*.txt") - if not files: - return send_message(open_id, "❌ 没有找到历史记录") - latest = max(files, key=os.path.getmtime) - with open(latest, "r", encoding="utf-8") as f: - content = f.read() - users = re.findall(r"=== USER ===\n(.+?)(?==== |$)", content, re.DOTALL) - resps = re.findall(r"=== Response ===.*?\n(.+?)(?==== Prompt|$)", content, re.DOTALL) - count = 0 - for u, r in zip(users, resps): - u, r = u.strip(), r.strip()[:500] - if u and r: - agent.history.extend([f"[USER]: {u}", f"[Agent] {r}"]) - count += 1 + restored_info, err = format_restore() + if err: + return send_message(open_id, err) + restored, fname, count = restored_info agent.abort() - send_message(open_id, f"✅ 已恢复 {count} 轮对话\n来源: {os.path.basename(latest)}\n(仅恢复上下文,请输入新问题继续)") + agent.history.extend(restored) + send_message(open_id, f"✅ 已恢复 {count} 轮对话\n来源: {fname}\n(仅恢复上下文,请输入新问题继续)") except Exception as e: send_message(open_id, f"❌ 恢复失败: {e}") else: