feat(tgapp): stream MarkdownV2 replies with draft fallback and file segments (#135)

This commit is contained in:
YooooEX
2026-04-24 09:01:49 +08:00
committed by GitHub
parent fd46d61543
commit f516c52a72

View File

@@ -1,91 +1,275 @@
import os, sys, re, threading, asyncio, queue as Q, socket, time import os, sys, re, threading, asyncio, queue as Q, socket, time, random
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
_TEMP_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'temp') _TEMP_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'temp')
from agentmain import GeneraticAgent from agentmain import GeneraticAgent
try: try:
from telegram import BotCommand from telegram import BotCommand
from telegram.constants import ChatType, MessageLimit, ParseMode
from telegram.ext import ApplicationBuilder, MessageHandler, filters, ContextTypes from telegram.ext import ApplicationBuilder, MessageHandler, filters, ContextTypes
from telegram.helpers import escape_markdown
from telegram.request import HTTPXRequest from telegram.request import HTTPXRequest
except: except:
print("Please ask the agent install python-telegram-bot to use telegram module.") print("Please ask the agent install python-telegram-bot to use telegram module.")
sys.exit(1) sys.exit(1)
from chatapp_common import HELP_TEXT, TELEGRAM_MENU_COMMANDS, format_restore from chatapp_common import (
FILE_HINT,
HELP_TEXT,
TELEGRAM_MENU_COMMANDS,
clean_reply,
extract_files,
format_restore,
split_text,
strip_files,
)
from continue_cmd import handle_frontend_command, reset_conversation from continue_cmd import handle_frontend_command, reset_conversation
from llmcore import mykeys from llmcore import mykeys
agent = GeneraticAgent() agent = GeneraticAgent()
agent.verbose = False agent.verbose = False
agent.inc_out = True
ALLOWED = set(mykeys.get('tg_allowed_users', [])) ALLOWED = set(mykeys.get('tg_allowed_users', []))
_TAG_PATS = [r'<' + t + r'>.*?</' + t + r'>' for t in ('thinking', 'summary', 'tool_use')] _DRAFT_HINT = "thinking..."
_TAG_PATS.append(r'<file_content>.*?</file_content>') _STREAM_SUFFIX = ""
_STREAM_SEGMENT_LIMIT = max(1200, MessageLimit.MAX_TEXT_LENGTH - 256)
_QUEUE_WAIT_SECONDS = 1
_MD_TOKEN_RE = re.compile(
r"(`{3,})([A-Za-z0-9_+-]*)\n([\s\S]*?)\1"
r"|\[([^\]]+)\]\(([^)\n]+)\)"
r"|`([^`\n]+)`"
r"|\*\*([^\n]+?)\*\*"
r"|(?<!\*)\*(?!\*)([^\n]+?)(?<!\*)\*(?!\*)",
re.DOTALL,
)
def _clean(t): def _make_draft_id():
for p in _TAG_PATS: return random.randint(1, 2**31 - 1)
t = re.sub(p, '', t, flags=re.DOTALL)
return re.sub(r'\n{3,}', '\n\n', t).strip() or '...'
import html as _html def _visible_segments(text):
def _inline_md(s): text = (text or "").strip()
s = re.sub(r'\*\*(.+?)\*\*', r'<b>\1</b>', s) return split_text(text, _STREAM_SEGMENT_LIMIT) if text else []
s = re.sub(r'(?<!\*)\*(?!\*)(.+?)(?<!\*)\*(?!\*)', r'<i>\1</i>', s)
s = re.sub(r'`([^`]+)`', r'<code>\1</code>', s) def _resolve_files(paths):
return s files, seen = [], set()
def _to_html(t): for fpath in paths:
if not os.path.isabs(fpath):
fpath = os.path.join(_TEMP_DIR, fpath)
if fpath in seen or not os.path.exists(fpath):
continue
files.append(fpath)
seen.add(fpath)
return files
def _escape_pre(text):
return escape_markdown(text or "", version=2, entity_type="pre")
def _escape_code(text):
return escape_markdown(text or "", version=2, entity_type="code")
def _escape_link_target(text):
return escape_markdown(text or "", version=2, entity_type="text_link")
def _to_markdown_v2(text):
if not text:
return ""
parts, pos = [], 0 parts, pos = [], 0
for m in re.finditer(r'(`{3,})(?:\w*\n)?([\s\S]*?)\1', t): for match in _MD_TOKEN_RE.finditer(text):
parts.append(_inline_md(_html.escape(t[pos:m.start()]))) parts.append(escape_markdown(text[pos:match.start()], version=2))
parts.append('<pre><code>' + _html.escape(m.group(2)) + '</code></pre>') if match.group(1):
pos = m.end() lang = re.sub(r"[^A-Za-z0-9_+-]", "", match.group(2) or "")
parts.append(_inline_md(_html.escape(t[pos:]))) code = _escape_pre(match.group(3) or "")
return ''.join(parts) header = f"```{lang}\n" if lang else "```\n"
parts.append(f"{header}{code}\n```")
elif match.group(4) is not None:
label = escape_markdown(match.group(4), version=2)
target = _escape_link_target(match.group(5))
parts.append(f"[{label}]({target})")
elif match.group(6) is not None:
parts.append(f"`{_escape_code(match.group(6))}`")
elif match.group(7) is not None:
parts.append(f"*{escape_markdown(match.group(7), version=2)}*")
elif match.group(8) is not None:
parts.append(f"_{escape_markdown(match.group(8), version=2)}_")
pos = match.end()
parts.append(escape_markdown(text[pos:], version=2))
return "".join(parts)
def _is_not_modified_error(exc):
return "not modified" in str(exc).lower()
class _TelegramStreamSession:
def __init__(self, root_msg):
self.root_msg = root_msg
self.private_chat = getattr(getattr(root_msg, "chat", None), "type", "") == ChatType.PRIVATE
self.can_use_draft = self.private_chat
self.draft_id = _make_draft_id()
self.live_msg = None
self.raw_text = ""
self.files = []
self.sent_segments = 0
self.active_display = ""
async def prime(self):
if self.can_use_draft and await self._send_draft(_DRAFT_HINT):
self.active_display = _DRAFT_HINT
return
await self._upsert_live_message(_DRAFT_HINT)
self.active_display = _DRAFT_HINT
async def add_chunk(self, chunk):
if not chunk:
return
self.raw_text += chunk
await self._refresh(done=False, send_files=False)
async def finalize(self, full_text=None, send_files=True):
if full_text is not None:
self.raw_text = full_text
await self._refresh(done=True, send_files=send_files)
async def finish_with_notice(self, notice):
if self.raw_text.strip():
await self.finalize(send_files=False)
await self._reply_text(notice)
return
if self.live_msg is not None:
await self._edit_text(self.live_msg, notice)
self.live_msg = None
self.active_display = ""
return
await self._reply_text(notice)
self.active_display = ""
async def _refresh(self, done, send_files):
cleaned = clean_reply(self.raw_text) if self.raw_text.strip() else ""
self.files = _resolve_files(extract_files(cleaned))
body = strip_files(cleaned)
if done and not body and self.files:
body = "已生成附件"
elif done and not body:
body = "..."
segments = _visible_segments(body)
finalized_target = len(segments) if done else max(len(segments) - 1, 0)
while self.sent_segments < finalized_target:
await self._finalize_segment(segments[self.sent_segments])
self.sent_segments += 1
if done:
if send_files:
await self._send_files()
return
active_text = segments[-1] if segments else _DRAFT_HINT
await self._stream_active(active_text)
async def _stream_active(self, text):
display = (text or _DRAFT_HINT).strip() or _DRAFT_HINT
if display != _DRAFT_HINT:
display = display + _STREAM_SUFFIX
if display == self.active_display:
return
if self.can_use_draft and await self._send_draft(display):
self.active_display = display
return
await self._upsert_live_message(display)
self.active_display = display
async def _finalize_segment(self, text):
final_text = (text or "").strip() or "..."
if self.live_msg is not None:
await self._edit_text(self.live_msg, final_text)
self.live_msg = None
else:
await self._reply_text(final_text)
self.active_display = ""
if self.can_use_draft:
self.draft_id = _make_draft_id()
async def _send_files(self):
for fpath in self.files:
if fpath.lower().endswith((".png", ".jpg", ".jpeg", ".gif", ".webp")):
try:
with open(fpath, "rb") as fp:
await self.root_msg.reply_photo(fp)
except Exception: pass
else:
try:
with open(fpath, "rb") as fp:
await self.root_msg.reply_document(fp)
except Exception: pass
async def _send_draft(self, text):
try:
await self.root_msg.reply_text_draft(
self.draft_id,
_to_markdown_v2(text),
parse_mode=ParseMode.MARKDOWN_V2,
)
return True
except Exception as exc:
if _is_not_modified_error(exc):
return True
print(f"[TG draft fallback] {type(exc).__name__}: {exc}", flush=True)
self.can_use_draft = False
self.draft_id = _make_draft_id()
return False
async def _reply_text(self, text):
markdown = _to_markdown_v2(text)
try:
return await self.root_msg.reply_text(markdown, parse_mode=ParseMode.MARKDOWN_V2)
except Exception as exc:
if _is_not_modified_error(exc):
return None
return await self.root_msg.reply_text(text)
async def _edit_text(self, msg, text):
markdown = _to_markdown_v2(text)
try:
updated = await msg.edit_text(markdown, parse_mode=ParseMode.MARKDOWN_V2)
except Exception as exc:
if _is_not_modified_error(exc):
return msg
updated = await msg.edit_text(text)
return updated if hasattr(updated, "edit_text") else msg
async def _upsert_live_message(self, text):
if self.live_msg is None:
self.live_msg = await self._reply_text(text)
else:
self.live_msg = await self._edit_text(self.live_msg, text)
async def _stream(dq, msg): async def _stream(dq, msg):
last_text = "" session = _TelegramStreamSession(msg)
while True: await session.prime()
await asyncio.sleep(3) try:
item = None while True:
try: try:
while True: item = dq.get_nowait() first = await asyncio.to_thread(dq.get, True, _QUEUE_WAIT_SECONDS)
except Q.Empty: pass except Q.Empty:
if item is None: continue continue
raw = item.get("done") or item.get("next", "") items = [first]
done = "done" in item try:
show = _clean(raw) while True:
if len(show) > 4000: items.append(dq.get_nowait())
# freeze current msg, start a new one except Q.Empty:
try: msg = await msg.reply_text("(continued...)") pass
except Exception: pass done_item = next((item for item in items if "done" in item), None)
last_text = "" if done_item is not None:
show = show[-3900:] await session.finalize(done_item.get("done", ""))
display = show if done else show + "" break
if display != last_text: chunk = "".join(item.get("next", "") for item in items if item.get("next"))
try: await msg.edit_text(_to_html(display), parse_mode='HTML') if chunk:
except Exception: await session.add_chunk(chunk)
try: await msg.edit_text(display) except asyncio.CancelledError:
except Exception: pass await session.finish_with_notice("⏹️ 已停止")
last_text = display except Exception as exc:
if done: print(f"[TG stream error] {type(exc).__name__}: {exc}", flush=True)
files = re.findall(r'\[FILE:([^\]]+)\]', show[-1000:]) await session.finish_with_notice(f"❌ 输出失败: {exc}")
for fpath in files:
if not os.path.isabs(fpath): fpath = os.path.join(_TEMP_DIR, fpath)
if os.path.exists(fpath):
if fpath.lower().endswith(('.png','.jpg','.jpeg','.gif','.webp')):
try: await msg.reply_photo(open(fpath,'rb'))
except Exception: pass
else:
try: await msg.reply_document(open(fpath,'rb'))
except Exception: pass
show = re.sub(r'\[FILE:[^\]]+\]', '', show)
if show.strip():
try: await msg.edit_text(_to_html(show), parse_mode='HTML')
except Exception:
try: await msg.edit_text(show)
except Exception: pass
break
def _normalized_command(text): def _normalized_command(text):
parts = (text or '').strip().split(None, 1) parts = (text or "").strip().split(None, 1)
if not parts: if not parts:
return '' return ''
head = parts[0].lower() head = parts[0].lower()
@@ -105,10 +289,9 @@ async def handle_msg(update, ctx):
uid = update.effective_user.id uid = update.effective_user.id
if ALLOWED and uid not in ALLOWED: if ALLOWED and uid not in ALLOWED:
return await update.message.reply_text("no") return await update.message.reply_text("no")
msg = await update.message.reply_text("thinking...") prompt = f"{FILE_HINT}\n\n{update.message.text}"
prompt = f"If you need to show files to user, use [FILE:filepath] in your response.\n\n{update.message.text}"
dq = agent.put_task(prompt, source="telegram") dq = agent.put_task(prompt, source="telegram")
task = asyncio.create_task(_stream(dq, msg)) task = asyncio.create_task(_stream(dq, update.message))
ctx.user_data['stream_task'] = task ctx.user_data['stream_task'] = task
async def cmd_abort(update, ctx): async def cmd_abort(update, ctx):
@@ -139,9 +322,8 @@ async def handle_photo(update, ctx):
await file.download_to_drive(os.path.join(_TEMP_DIR, fpath)) await file.download_to_drive(os.path.join(_TEMP_DIR, fpath))
caption = update.message.caption caption = update.message.caption
prompt = f"[TIPS] 收到图片temp/{fpath}\n{caption}" if caption else f"[TIPS] 收到图片temp/{fpath},请等待下一步指令" prompt = f"[TIPS] 收到图片temp/{fpath}\n{caption}" if caption else f"[TIPS] 收到图片temp/{fpath},请等待下一步指令"
msg = await update.message.reply_text("thinking...")
dq = agent.put_task(prompt, source="telegram") dq = agent.put_task(prompt, source="telegram")
task = asyncio.create_task(_stream(dq, msg)) task = asyncio.create_task(_stream(dq, update.message))
ctx.user_data['stream_task'] = task ctx.user_data['stream_task'] = task
async def handle_command(update, ctx): async def handle_command(update, ctx):