diff --git a/frontends/tgapp.py b/frontends/tgapp.py index c222c6b..24dec1b 100644 --- a/frontends/tgapp.py +++ b/frontends/tgapp.py @@ -102,7 +102,7 @@ class _TelegramStreamSession: def __init__(self, root_msg): self.root_msg = root_msg self.private_chat = getattr(getattr(root_msg, "chat", None), "type", "") == ChatType.PRIVATE - self.can_use_draft = self.private_chat + self.can_use_draft = False # can not use or streaming dead self.draft_id = _make_draft_id() self.live_msg = None self.raw_text = "" @@ -244,16 +244,12 @@ async def _stream(dq, msg): await session.prime() try: while True: - try: - first = await asyncio.to_thread(dq.get, True, _QUEUE_WAIT_SECONDS) - except Q.Empty: - continue + try: first = await asyncio.to_thread(dq.get, True, _QUEUE_WAIT_SECONDS) + except Q.Empty: continue items = [first] try: - while True: - items.append(dq.get_nowait()) - except Q.Empty: - pass + while True: items.append(dq.get_nowait()) + except Q.Empty: pass done_item = next((item for item in items if "done" in item), None) if done_item is not None: await session.finalize(done_item.get("done", "")) @@ -267,20 +263,16 @@ async def _stream(dq, msg): print(f"[TG stream error] {type(exc).__name__}: {exc}", flush=True) await session.finish_with_notice(f"❌ 输出失败: {exc}") - def _normalized_command(text): parts = (text or "").strip().split(None, 1) - if not parts: - return '' + if not parts: return '' head = parts[0].lower() - if head.startswith('/'): - head = '/' + head[1:].split('@', 1)[0] + if head.startswith('/'): head = '/' + head[1:].split('@', 1)[0] return head + (f" {parts[1].strip()}" if len(parts) > 1 and parts[1].strip() else '') def _cancel_stream_task(ctx): task = ctx.user_data.pop('stream_task', None) - if task and not task.done(): - task.cancel() + if task and not task.done(): task.cancel() async def _sync_commands(application): await application.bot.set_my_commands([BotCommand(command, description) for command, description in TELEGRAM_MENU_COMMANDS]) @@ -314,14 +306,22 @@ async def cmd_llm(update, ctx): async def handle_photo(update, ctx): uid = update.effective_user.id - if ALLOWED and uid not in ALLOWED: - return await update.message.reply_text("no") - photo = update.message.photo[-1] - file = await photo.get_file() - fpath = f"tg_{photo.file_unique_id}.jpg" + if ALLOWED and uid not in ALLOWED: return await update.message.reply_text("no") + if update.message.photo: + photo = update.message.photo[-1] + file = await photo.get_file() + fpath = f"tg_{photo.file_unique_id}.jpg" + kind = "图片" + elif update.message.document: + doc = update.message.document + file = await doc.get_file() + ext = os.path.splitext(doc.file_name or '')[1] or '' + fpath = f"tg_{doc.file_unique_id}{ext}" + kind = "文件" + else: return await file.download_to_drive(os.path.join(_TEMP_DIR, fpath)) caption = update.message.caption - prompt = f"[TIPS] 收到图片temp/{fpath}\n{caption}" if caption else f"[TIPS] 收到图片temp/{fpath},请等待下一步指令" + prompt = f"[TIPS] 收到{kind}temp/{fpath}\n{caption}" if caption else f"[TIPS] 收到{kind}temp/{fpath},请等待下一步指令" dq = agent.put_task(prompt, source="telegram") task = asyncio.create_task(_stream(dq, update.message)) ctx.user_data['stream_task'] = task @@ -386,6 +386,7 @@ if __name__ == '__main__': .request(request).get_updates_request(request).post_init(_sync_commands).build()) app.add_handler(MessageHandler(filters.COMMAND, handle_command)) app.add_handler(MessageHandler(filters.PHOTO, handle_photo)) + app.add_handler(MessageHandler(filters.Document.ALL, handle_photo)) app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_msg)) app.add_error_handler(_error_handler) app.run_polling(drop_pending_updates=True, poll_interval=1.0, timeout=30) diff --git a/frontends/wechatapp.py b/frontends/wechatapp.py index 1cbf3c4..3c75007 100644 --- a/frontends/wechatapp.py +++ b/frontends/wechatapp.py @@ -289,26 +289,33 @@ def on_message(bot, msg): dq = agent.put_task(prompt, source="wechat") try: bot.send_typing(uid) except: pass - # Wait for completion - result = '' + result = ''; sent = 0; mi = 0; last_turns = 0; last_send = 0 + def _wx_send(text): + try: bot.send_text(uid, text.strip(), context_token=ctx); return True + except Exception as e: + print(f'[WX] send maybe-ok: {e}', file=sys.__stdout__); return True + def _flush(show, final=False): + nonlocal sent, mi, last_send + now = time.time() + if mi < 9 and sent < len(show) and (mi == 0 or now - last_send >= 6): + chunk = show[sent:sent+900]; sent += len(chunk); mi += 1 + if chunk.strip() and _wx_send(chunk): last_send = time.time() + if final: + rest = (show[sent:] + '\n\n[Info] 任务完成')[-1800:] + if rest.strip(): _wx_send(rest) try: while True: item = dq.get(timeout=300) if 'done' in item: result = item['done']; break + raw = item.get('next', '') + turns = raw.count('LLM Running') + if turns > last_turns: + last_turns = turns; _flush(_clean(raw)) except queue.Empty: result = '[超时]' + show = _clean(result); _flush(show, final=True) files = re.findall(r'\[FILE:([^\]]+)\]', result) bad = {'filepath', '', 'path', '', 'file_path', '', '...'} files = [f for f in files if f.strip().lower() not in bad and (f if os.path.isabs(f) else os.path.join(_TEMP_DIR, f)) not in media_paths] - show = _clean(result) - chunks = _split(show) - _MAX_MSGS = 6 - if len(chunks) > _MAX_MSGS: - keep = chunks[:3] + [f'...(省略{len(chunks) - 5}条)...'] + chunks[-2:] - chunks = keep - for chunk in chunks: - try: bot.send_text(uid, chunk, context_token=ctx) - except Exception as e: print(f'[WX] send err: {e}', file=sys.__stdout__) - time.sleep(0.3) for fpath in set(files): if not os.path.isabs(fpath): fpath = os.path.join(_TEMP_DIR, fpath) try: diff --git a/llmcore.py b/llmcore.py index ab1d3b6..2cd5e4d 100644 --- a/llmcore.py +++ b/llmcore.py @@ -74,6 +74,12 @@ def _sanitize_leading_user_msg(msg): msg['content'] = [{"type": "text", "text": '\n'.join(t for t in texts if t)}] return msg +_oldprint = print +def safeprint(*argv): + try: _oldprint(*argv) + except OSError: pass +print = safeprint + def trim_messages_history(history, context_win): compress_history_tags(history) cost = sum(len(json.dumps(m, ensure_ascii=False)) for m in history) @@ -337,6 +343,7 @@ def _openai_stream(api_base, api_key, messages, model, api_mode='chat_completion payload = {"model": model, "input": _to_responses_input(messages), "stream": stream, "prompt_cache_key": _RESP_CACHE_KEY, "instructions": system or "You are an Omnipotent Executor."} if reasoning_effort: payload["reasoning"] = {"effort": reasoning_effort} + if max_tokens: payload["max_output_tokens"] = max_tokens else: url = auto_make_url(api_base, "chat/completions") if system: messages = [{"role": "system", "content": system}] + messages @@ -502,7 +509,7 @@ class BaseSession: mode = str(cfg.get('api_mode', 'chat_completions')).strip().lower().replace('-', '_') self.api_mode = 'responses' if mode in ('responses', 'response') else 'chat_completions' self.temperature = cfg.get('temperature', 1) - self.max_tokens = cfg.get('max_tokens', 8192) + self.max_tokens = cfg.get('max_tokens') def _apply_claude_thinking(self, payload): if self.thinking_type: thinking = {"type": self.thinking_type} @@ -543,6 +550,7 @@ def _drop_unsigned_thinking(messages): class ClaudeSession(BaseSession): def raw_ask(self, messages): + if self.max_tokens is None: self.max_tokens = 8192 headers = {"x-api-key": self.api_key, "Content-Type": "application/json", "anthropic-version": "2023-06-01", "anthropic-beta": "prompt-caching-2024-07-31"} payload = {"model": self.model, "messages": messages, "max_tokens": self.max_tokens, "stream": True} if self.temperature != 1: payload["temperature"] = self.temperature @@ -599,6 +607,7 @@ class NativeClaudeSession(BaseSession): self.tools = None def raw_ask(self, messages): messages = _drop_unsigned_thinking(_fix_messages(messages)) + if self.max_tokens is None: self.max_tokens = 8192 model = self.model beta_parts = ["claude-code-20250219", "interleaved-thinking-2025-05-14", "redact-thinking-2026-02-12", "prompt-caching-scope-2026-01-05"] if "[1m]" in model.lower():