From b34cffe8019d82041d2962ece3cf5bdba97fc815 Mon Sep 17 00:00:00 2001 From: weijia <126332148+wjl2023@users.noreply.github.com> Date: Wed, 22 Apr 2026 14:12:51 +0800 Subject: [PATCH] feat(fsapp): render agent run as one continuously-updating Feishu card (#131) Rework the Feishu frontend so each user turn renders as a single collapsible task card that patches itself in place, replacing the dq-based streaming path that produced many fragmented messages. - One _TaskCard per turn; hook reacts to summary / exit_reason events from the agent loop and patches the same card. - Each step is a foldable panel: header shows the summary, expanding reveals three sections (auto-hidden when empty): * Thinking - from response.thinking (separate field, not content) * Tool Calls - tool name + truncated JSON args * Output - response.content, with protocol tags stripped so the header summary is not duplicated inside - Final reply rendered as a schema 2.0 markdown card for consistency. - Code-review pass per code_review_principles.md: * _TaskCard owns only stateful card lifecycle (start/step/done/fail) * Pure formatting extracted to module-level _build_step_detail and _fmt_tool_call (no more reaching into card._private from the hook) * Hook is a ~10-line dispatcher * Flattened a 4-level nested lambda into a named function --- frontends/fsapp.py | 219 +++++++++++++++++++++++++++++++-------------- 1 file changed, 152 insertions(+), 67 deletions(-) diff --git a/frontends/fsapp.py b/frontends/fsapp.py index 659881a..515b24e 100644 --- a/frontends/fsapp.py +++ b/frontends/fsapp.py @@ -8,6 +8,7 @@ from frontends.chatapp_common import format_restore from frontends.continue_cmd import handle_frontend_command as handle_continue_frontend, reset_conversation from llmcore import mykeys +import traceback import lark_oapi as lark from lark_oapi.api.im.v1 import * @@ -225,6 +226,7 @@ APP_ID = str(mykeys.get("fs_app_id", "") or "").strip() APP_SECRET = str(mykeys.get("fs_app_secret", "") or "").strip() ALLOWED_USERS = _to_allowed_set(mykeys.get("fs_allowed_users", [])) PUBLIC_ACCESS = not ALLOWED_USERS or "*" in ALLOWED_USERS +AGENT_TIMEOUT_SEC = 900 agent = GeneraticAgent() threading.Thread(target=agent.run, daemon=True).start() @@ -235,35 +237,49 @@ def create_client(): return lark.Client.builder().app_id(APP_ID).app_secret(APP_SECRET).log_level(lark.LogLevel.INFO).build() +def _card_raw(elements): + return json.dumps({ + "schema": "2.0", + "config": {"streaming_mode": False, "width_mode": "fill"}, + "body": {"elements": elements}, + }, ensure_ascii=False) + + def _card(text): - return json.dumps({"config": {"wide_screen_mode": True}, "elements": [{"tag": "markdown", "content": text}]}, ensure_ascii=False) + return _card_raw([{"tag": "markdown", "content": text}]) + + +def _send_raw(receive_id, payload, msg_type, rtype): + body = CreateMessageRequest.builder().receive_id_type(rtype).request_body( + CreateMessageRequestBody.builder().receive_id(receive_id).msg_type(msg_type).content(payload).build() + ).build() + r = client.im.v1.message.create(body) + if r.success(): + return r.data.message_id if r.data else None + print(f"发送失败: {r.code}, {r.msg}") + return None + + +def _patch_card(message_id, card_json): + body = PatchMessageRequest.builder().message_id(message_id).request_body( + PatchMessageRequestBody.builder().content(card_json).build() + ).build() + r = client.im.v1.message.patch(body) + if not r.success(): + print(f"[ERROR] patch_card 失败: {r.code}, {r.msg}") + return r.success() def send_message(receive_id, content, msg_type="text", use_card=False, receive_id_type="open_id"): if use_card: - payload, real_type = _card(content), "interactive" - elif msg_type == "text": - payload, real_type = json.dumps({"text": content}, ensure_ascii=False), "text" - else: - payload, real_type = content, msg_type - body = CreateMessageRequest.builder().receive_id_type(receive_id_type).request_body( - CreateMessageRequestBody.builder().receive_id(receive_id).msg_type(real_type).content(payload).build() - ).build() - response = client.im.v1.message.create(body) - if response.success(): - return response.data.message_id if response.data else None - print(f"发送失败: {response.code}, {response.msg}") - return None + return _send_raw(receive_id, _card(content), "interactive", receive_id_type) + if msg_type == "text": + return _send_raw(receive_id, json.dumps({"text": content}, ensure_ascii=False), "text", receive_id_type) + return _send_raw(receive_id, content, msg_type, receive_id_type) def update_message(message_id, content): - body = PatchMessageRequest.builder().message_id(message_id).request_body( - PatchMessageRequestBody.builder().content(_card(content)).build() - ).build() - response = client.im.v1.message.patch(body) - if not response.success(): - print(f"[ERROR] update_message 失败: {response.code}, {response.msg}") - return response.success() + return _patch_card(message_id, _card(content)) def _upload_image_sync(file_path): @@ -421,6 +437,100 @@ def _build_user_message(message): return "\n".join([p for p in parts if p]).strip(), image_paths +def _fmt_tool_call(tc): + name = tc.get('tool_name', '?') + args = {k: v for k, v in (tc.get('args') or {}).items() if not k.startswith('_')} + return f"- `{name}`({json.dumps(args, ensure_ascii=False)[:200]})" + + +def _build_step_detail(resp, tool_calls): + """从 LLM response + tool_calls 组装单步展开详情(纯函数)。""" + parts = [] + thinking = (getattr(resp, 'thinking', '') or '').strip() if resp else '' + if thinking: + parts.append(f"### 💭 Thinking\n{thinking}") + if tool_calls: + parts.append("### 🛠 Tool Calls\n" + "\n".join(_fmt_tool_call(tc) for tc in tool_calls)) + content = _display_text((getattr(resp, 'content', '') or '')).strip() if resp else '' + if content and content != '...': + parts.append(f"### 📝 Output\n{content}") + return "\n\n".join(parts) + + +class _TaskCard: + """飞书任务卡片:单卡片持续 patch;每步一个独立折叠面板(header 显示 summary,展开看详情)。""" + _DETAIL_LIMIT = 8000 + + def __init__(self, receive_id, rid_type): + self.rid, self.rtype = receive_id, rid_type + self.steps = [] # [(summary, detail), ...] + self.status = "🤔 思考中..." + self.final = None + self.msg_id = None + + def _step_panel(self, idx, summary, detail): + detail = detail or "_(无输出)_" + if len(detail) > self._DETAIL_LIMIT: + detail = detail[:self._DETAIL_LIMIT] + f"\n\n…(已截断,共 {len(detail)} 字符)" + return { + "tag": "collapsible_panel", "expanded": False, + "header": {"title": {"tag": "plain_text", "content": f"Turn {idx} · {summary}"}}, + "elements": [{"tag": "markdown", "content": detail}], + } + + def _build(self): + els = [{"tag": "markdown", "content": f"**{self.status}**"}] + for i, (s, d) in enumerate(self.steps, 1): + els.append(self._step_panel(i, s, d)) + if self.final: + els += [{"tag": "hr"}, {"tag": "markdown", "content": self.final}] + return _card_raw(els) + + def _push(self): + card = self._build() + if self.msg_id: + _patch_card(self.msg_id, card) + else: + self.msg_id = _send_raw(self.rid, card, "interactive", self.rtype) + + # ── 公开接口 ── + + def start(self): + self._push() + + def step(self, summary, detail=""): + self.steps.append((summary, detail)) + self.status = f"⏳ 工作中 · Turn {len(self.steps)}" + self._push() + + def done(self, text): + self.status = "✅ 已完成" + self.final = text or "_(无文本输出)_" + self._push() + + def fail(self, msg): + self.status = f"❌ {msg}" + self._push() + + +def _make_task_hook(card, done_event, on_final): + """飞书任务 hook:每轮 patch 卡片状态;结束触发 on_final(raw) 处理附件。""" + def hook(ctx): + try: + if ctx.get('exit_reason'): + resp = ctx.get('response') + raw = resp.content if hasattr(resp, 'content') else str(resp) + card.done(_display_text(raw)) + on_final(raw) + done_event.set() + elif ctx.get('summary'): + detail = _build_step_detail(ctx.get('response'), ctx.get('tool_calls') or []) + card.step(ctx['summary'], detail) + except Exception as e: + print(f"[fs hook] error: {e}") + return hook + + def handle_message(data): event, message, sender = data.event, data.event.message, data.event.sender open_id = sender.sender_id.open_id @@ -441,57 +551,32 @@ def handle_message(data): def run_agent(): user_tasks[open_id] = {"running": True} + receive_id = chat_id or open_id + rid_type = "chat_id" if chat_id else "open_id" + done_event = threading.Event() + hook_key = f"fs_{open_id}" + card = _TaskCard(receive_id, rid_type) + card.start() + on_final = lambda raw: _send_generated_files(receive_id, raw, receive_id_type=rid_type) + if not hasattr(agent, '_turn_end_hooks'): agent._turn_end_hooks = {} + agent._turn_end_hooks[hook_key] = _make_task_hook(card, done_event, on_final) try: - if chat_id: - msg_id, dq, last_text = send_message(chat_id, "思考中...", use_card=True, receive_id_type="chat_id"), agent.put_task(user_input, source="feishu", images=image_paths), "" - else: - msg_id, dq, last_text = send_message(open_id, "思考中...", use_card=True), agent.put_task(user_input, source="feishu", images=image_paths), "" - while user_tasks.get(open_id, {}).get("running", False): - time.sleep(3) - item = None - try: - while True: - item = dq.get_nowait() - except Exception: - pass - if item is None: - continue - raw = item.get("done") or item.get("next", "") - done = "done" in item - show = _display_text(raw) - if len(show) > 3500: - cut = show[-3000:] - if cut.count("```") % 2 == 1: - cut = "```\n" + cut - if chat_id: - msg_id, last_text, show = send_message(chat_id, "(继续...)", use_card=True, receive_id_type="chat_id"), "", cut - else: - msg_id, last_text, show = send_message(open_id, "(继续...)", use_card=True), "", cut - display = show if done else show + " ⏳" - if display != last_text and msg_id: - update_message(msg_id, display) - last_text = display - if done: - if chat_id: - _send_generated_files(chat_id, raw, receive_id_type="chat_id") - else: - _send_generated_files(open_id, raw) + agent.put_task(user_input, source="feishu", images=image_paths) + start = time.time() + while not done_event.wait(timeout=3): + if not user_tasks.get(open_id, {}).get("running", True): + agent.abort() + card.fail("已停止") + break + if time.time() - start > AGENT_TIMEOUT_SEC: + agent.abort() + card.fail("任务超时") break - if not user_tasks.get(open_id, {}).get("running", True): - if chat_id: - send_message(chat_id, "已停止", receive_id_type="chat_id") - else: - send_message(open_id, "已停止") except Exception as e: - import traceback - - print(f"[ERROR] run_agent 异常: {e}") traceback.print_exc() - if chat_id: - send_message(chat_id, f"错误: {str(e)}", receive_id_type="chat_id") - else: - send_message(open_id, f"错误: {str(e)}") + card.fail(f"错误: {e}") finally: + agent._turn_end_hooks.pop(hook_key, None) user_tasks.pop(open_id, None) threading.Thread(target=run_agent, daemon=True).start()