Files
GenericAgent/fsapp.py
2026-03-13 16:51:50 +08:00

466 lines
18 KiB
Python

import json, os, re, sys, threading, time
PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, PROJECT_ROOT)
os.chdir(PROJECT_ROOT)
import lark_oapi as lark
from lark_oapi.api.im.v1 import *
from agentmain import GeneraticAgent
from chatapp_common import clean_reply, extract_files, format_restore, public_access, strip_files, to_allowed_set
from llmcore import mykeys
_IMAGE_EXTS = {".png", ".jpg", ".jpeg", ".gif", ".bmp", ".webp", ".ico", ".tiff", ".tif"}
_AUDIO_EXTS = {".opus", ".mp3", ".wav", ".m4a", ".aac"}
_VIDEO_EXTS = {".mp4", ".mov", ".avi", ".mkv", ".webm"}
_FILE_TYPE_MAP = {
".opus": "opus",
".mp4": "mp4",
".pdf": "pdf",
".doc": "doc",
".docx": "doc",
".xls": "xls",
".xlsx": "xls",
".ppt": "ppt",
".pptx": "ppt",
}
_MSG_TYPE_MAP = {"image": "[image]", "audio": "[audio]", "file": "[file]", "media": "[media]", "sticker": "[sticker]"}
TEMP_DIR = os.path.join(PROJECT_ROOT, "temp")
MEDIA_DIR = os.path.join(TEMP_DIR, "feishu_media")
os.makedirs(MEDIA_DIR, exist_ok=True)
def _display_text(text):
return strip_files(clean_reply(text)) or "..."
def _parse_json(raw):
try:
return json.loads(raw) if raw else {}
except Exception:
return {}
def _extract_share_card_content(content_json, msg_type):
parts = []
if msg_type == "share_chat":
parts.append(f"[shared chat: {content_json.get('chat_id', '')}]")
elif msg_type == "share_user":
parts.append(f"[shared user: {content_json.get('user_id', '')}]")
elif msg_type == "interactive":
parts.extend(_extract_interactive_content(content_json))
elif msg_type == "share_calendar_event":
parts.append(f"[shared calendar event: {content_json.get('event_key', '')}]")
elif msg_type == "system":
parts.append("[system message]")
elif msg_type == "merge_forward":
parts.append("[merged forward messages]")
return "\n".join([p for p in parts if p]).strip() or f"[{msg_type}]"
def _extract_interactive_content(content):
parts = []
if isinstance(content, str):
try:
content = json.loads(content)
except Exception:
return [content] if content.strip() else []
if not isinstance(content, dict):
return parts
title = content.get("title")
if isinstance(title, dict):
title_text = title.get("content", "") or title.get("text", "")
if title_text:
parts.append(f"title: {title_text}")
elif isinstance(title, str) and title:
parts.append(f"title: {title}")
elements = content.get("elements", [])
if isinstance(elements, list):
for row in elements:
if isinstance(row, dict):
parts.extend(_extract_element_content(row))
elif isinstance(row, list):
for el in row:
parts.extend(_extract_element_content(el))
card = content.get("card", {})
if card:
parts.extend(_extract_interactive_content(card))
header = content.get("header", {})
if isinstance(header, dict):
header_title = header.get("title", {})
if isinstance(header_title, dict):
header_text = header_title.get("content", "") or header_title.get("text", "")
if header_text:
parts.append(f"title: {header_text}")
return [p for p in parts if p]
def _extract_element_content(element):
parts = []
if not isinstance(element, dict):
return parts
tag = element.get("tag", "")
if tag in ("markdown", "lark_md"):
content = element.get("content", "")
if content:
parts.append(content)
elif tag == "div":
text = element.get("text", {})
if isinstance(text, dict):
text_content = text.get("content", "") or text.get("text", "")
if text_content:
parts.append(text_content)
elif isinstance(text, str) and text:
parts.append(text)
for field in element.get("fields", []) or []:
if isinstance(field, dict):
field_text = field.get("text", {})
if isinstance(field_text, dict):
content = field_text.get("content", "") or field_text.get("text", "")
if content:
parts.append(content)
elif tag == "a":
href = element.get("href", "")
text = element.get("text", "")
if href:
parts.append(f"link: {href}")
if text:
parts.append(text)
elif tag == "button":
text = element.get("text", {})
if isinstance(text, dict):
content = text.get("content", "") or text.get("text", "")
if content:
parts.append(content)
url = element.get("url", "") or (element.get("multi_url", {}) or {}).get("url", "")
if url:
parts.append(f"link: {url}")
elif tag == "img":
alt = element.get("alt", {})
if isinstance(alt, dict):
parts.append(alt.get("content", "[image]") or "[image]")
else:
parts.append("[image]")
for child in element.get("elements", []) or []:
parts.extend(_extract_element_content(child))
for col in element.get("columns", []) or []:
for child in (col.get("elements", []) if isinstance(col, dict) else []):
parts.extend(_extract_element_content(child))
return parts
def _extract_post_content(content_json):
def _parse_block(block):
if not isinstance(block, dict) or not isinstance(block.get("content"), list):
return None, []
texts, images = [], []
if block.get("title"):
texts.append(block.get("title"))
for row in block["content"]:
if not isinstance(row, list):
continue
for el in row:
if not isinstance(el, dict):
continue
tag = el.get("tag")
if tag in ("text", "a"):
texts.append(el.get("text", ""))
elif tag == "at":
texts.append(f"@{el.get('user_name', 'user')}")
elif tag == "img" and el.get("image_key"):
images.append(el["image_key"])
text = " ".join([t for t in texts if t]).strip()
return text or None, images
root = content_json
if isinstance(root, dict) and isinstance(root.get("post"), dict):
root = root["post"]
if not isinstance(root, dict):
return "", []
if "content" in root:
text, imgs = _parse_block(root)
if text or imgs:
return text or "", imgs
for key in ("zh_cn", "en_us", "ja_jp"):
if key in root:
text, imgs = _parse_block(root[key])
if text or imgs:
return text or "", imgs
for val in root.values():
if isinstance(val, dict):
text, imgs = _parse_block(val)
if text or imgs:
return text or "", imgs
return "", []
APP_ID = str(mykeys.get("fs_app_id", "") or "").strip()
APP_SECRET = str(mykeys.get("fs_app_secret", "") or "").strip()
ALLOWED_USERS = to_allowed_set(mykeys.get("fs_allowed_users", []))
PUBLIC_ACCESS = public_access(ALLOWED_USERS)
agent = GeneraticAgent()
threading.Thread(target=agent.run, daemon=True).start()
client, user_tasks = None, {}
def create_client(): return lark.Client.builder().app_id(APP_ID).app_secret(APP_SECRET).log_level(lark.LogLevel.INFO).build()
def _card(text): return json.dumps({"config": {"wide_screen_mode": True}, "elements": [{"tag": "markdown", "content": text}]}, ensure_ascii=False)
def send_message(open_id, content, msg_type="text", use_card=False):
if use_card:
payload, real_type = _card(content), "interactive"
elif msg_type == "text":
payload, real_type = json.dumps({"text": content}, ensure_ascii=False), "text"
else:
payload, real_type = content, msg_type
body = CreateMessageRequest.builder().receive_id_type("open_id").request_body(
CreateMessageRequestBody.builder().receive_id(open_id).msg_type(real_type).content(payload).build()
).build()
response = client.im.v1.message.create(body)
if response.success():
return response.data.message_id if response.data else None
print(f"发送失败: {response.code}, {response.msg}")
return None
def update_message(message_id, content):
body = PatchMessageRequest.builder().message_id(message_id).request_body(
PatchMessageRequestBody.builder().content(_card(content)).build()
).build()
response = client.im.v1.message.patch(body)
if not response.success():
print(f"[ERROR] update_message 失败: {response.code}, {response.msg}")
return response.success()
def _read_file_obj(response):
return response.file.read() if hasattr(response.file, "read") else response.file
def _upload_image_sync(file_path):
try:
with open(file_path, "rb") as f:
request = CreateImageRequest.builder().request_body(
CreateImageRequestBody.builder().image_type("message").image(f).build()
).build()
response = client.im.v1.image.create(request)
if response.success():
return response.data.image_key
print(f"[ERROR] upload image failed: {response.code}, {response.msg}")
except Exception as e:
print(f"[ERROR] upload image failed {file_path}: {e}")
return None
def _upload_file_sync(file_path):
ext = os.path.splitext(file_path)[1].lower()
file_type = _FILE_TYPE_MAP.get(ext, "stream")
file_name = os.path.basename(file_path)
try:
with open(file_path, "rb") as f:
request = CreateFileRequest.builder().request_body(
CreateFileRequestBody.builder().file_type(file_type).file_name(file_name).file(f).build()
).build()
response = client.im.v1.file.create(request)
if response.success():
return response.data.file_key
print(f"[ERROR] upload file failed: {response.code}, {response.msg}")
except Exception as e:
print(f"[ERROR] upload file failed {file_path}: {e}")
return None
def _download_resource(message_id, file_key, resource_type, label):
try:
request = GetMessageResourceRequest.builder().message_id(message_id).file_key(file_key).type(resource_type).build()
response = client.im.v1.message_resource.get(request)
if response.success():
return _read_file_obj(response), response.file_name
print(f"[ERROR] download {label} failed: {response.code}, {response.msg}")
except Exception as e:
print(f"[ERROR] download {label} failed {file_key}: {e}")
return None, None
def _download_and_save_media(msg_type, content_json, message_id):
file_key = content_json.get("image_key") if msg_type == "image" else content_json.get("file_key")
resource_type = "image" if msg_type == "image" else ("file" if msg_type == "audio" else msg_type)
default_ext = ".jpg" if msg_type == "image" else ".opus" if msg_type == "audio" else ""
if not (file_key and message_id):
return None, None
data, filename = _download_resource(message_id, file_key, resource_type, msg_type)
if data:
filename = filename or f"{file_key[:16]}{default_ext}"
if msg_type == "audio" and filename and not filename.endswith(".opus"):
filename += ".opus"
file_path = os.path.join(MEDIA_DIR, os.path.basename(filename))
with open(file_path, "wb") as f:
f.write(data)
return file_path, filename
return None, None
def _describe_media(msg_type, file_path, filename):
return f"[{msg_type}: {filename}]\n[{'Image' if msg_type == 'image' else 'File'}: source: {file_path}]"
def _send_local_file(open_id, file_path):
if not os.path.isfile(file_path):
send_message(open_id, f"⚠️ 文件不存在: {file_path}")
return False
ext = os.path.splitext(file_path)[1].lower()
is_image = ext in _IMAGE_EXTS
file_key = _upload_image_sync(file_path) if is_image else _upload_file_sync(file_path)
if file_key:
key_name, msg_type = ("image_key", "image") if is_image else ("file_key", "media" if ext in _AUDIO_EXTS or ext in _VIDEO_EXTS else "file")
send_message(open_id, json.dumps({key_name: file_key}, ensure_ascii=False), msg_type=msg_type)
return True
send_message(open_id, f"⚠️ 文件发送失败: {os.path.basename(file_path)}")
return False
def _send_generated_files(open_id, raw_text):
for file_path in extract_files(raw_text):
_send_local_file(open_id, file_path)
def _append_media(parts, image_paths, msg_type, file_path, filename):
if not (file_path and filename):
parts.append(f"[{msg_type}: download failed]")
return
parts.append(_describe_media(msg_type, file_path, filename))
if msg_type == "image":
image_paths.append(file_path)
def _build_user_message(message):
msg_type = message.message_type
message_id = message.message_id
content_json = _parse_json(message.content)
parts, image_paths = [], []
if msg_type == "text":
text = str(content_json.get("text", "") or "").strip()
if text:
parts.append(text)
elif msg_type == "post":
text, image_keys = _extract_post_content(content_json)
if text:
parts.append(text)
for image_key in image_keys:
file_path, filename = _download_and_save_media("image", {"image_key": image_key}, message_id)
_append_media(parts, image_paths, "image", file_path, filename)
elif msg_type in ("image", "audio", "file", "media"):
file_path, filename = _download_and_save_media(msg_type, content_json, message_id)
_append_media(parts, image_paths, msg_type, file_path, filename)
elif msg_type in ("share_chat", "share_user", "interactive", "share_calendar_event", "system", "merge_forward"):
parts.append(_extract_share_card_content(content_json, msg_type))
else:
parts.append(_MSG_TYPE_MAP.get(msg_type, f"[{msg_type}]"))
return "\n".join([p for p in parts if p]).strip(), image_paths
def handle_message(data):
event, message, sender = data.event, data.event.message, data.event.sender
open_id = sender.sender_id.open_id
if not PUBLIC_ACCESS and open_id not in ALLOWED_USERS:
print(f"未授权用户: {open_id}")
return
user_input, image_paths = _build_user_message(message)
if not user_input:
send_message(open_id, f"⚠️ 暂不支持处理此类飞书消息:{message.message_type}")
return
print(f"收到消息 [{open_id}] ({message.message_type}, {len(image_paths)} images): {user_input[:200]}")
if message.message_type == "text" and user_input.startswith("/"):
return handle_command(open_id, user_input)
def run_agent():
user_tasks[open_id] = {"running": True}
try:
msg_id, dq, last_text = send_message(open_id, "思考中...", use_card=True), agent.put_task(user_input, source="feishu", images=image_paths), ""
while user_tasks.get(open_id, {}).get("running", False):
time.sleep(3)
item = None
try:
while True:
item = dq.get_nowait()
except Exception:
pass
if item is None:
continue
raw = item.get("done") or item.get("next", "")
done = "done" in item
show = _display_text(raw)
if len(show) > 3500:
cut = show[-3000:]
if cut.count("```") % 2 == 1:
cut = "```\n" + cut
msg_id, last_text, show = send_message(open_id, "(继续...)", use_card=True), "", cut
display = show if done else show + ""
if display != last_text and msg_id:
update_message(msg_id, display)
last_text = display
if done:
_send_generated_files(open_id, raw)
break
if not user_tasks.get(open_id, {}).get("running", True):
send_message(open_id, "⏹️ 已停止")
except Exception as e:
import traceback
print(f"[ERROR] run_agent 异常: {e}")
traceback.print_exc()
send_message(open_id, f"❌ 错误: {str(e)}")
finally:
user_tasks.pop(open_id, None)
threading.Thread(target=run_agent, daemon=True).start()
def handle_command(open_id, cmd):
if cmd == "/stop":
if open_id in user_tasks:
user_tasks[open_id]["running"] = False
agent.abort()
send_message(open_id, "⏹️ 正在停止...")
elif cmd == "/new":
agent.abort()
agent.history = []
send_message(open_id, "🆕 已清空当前共享上下文")
elif cmd == "/help":
send_message(open_id, "📖 命令列表:\n/stop - 停止当前任务\n/status - 查看状态\n/restore - 恢复上次对话历史\n/new - 开启新对话\n/help - 显示帮助")
elif cmd == "/status":
send_message(open_id, f"状态: {'🟢 空闲' if not agent.is_running else '🔴 运行中'}")
elif cmd == "/restore":
try:
restored_info, err = format_restore()
if err:
return send_message(open_id, err)
restored, fname, count = restored_info
agent.abort()
agent.history.extend(restored)
send_message(open_id, f"✅ 已恢复 {count} 轮对话\n来源: {fname}\n(仅恢复上下文,请输入新问题继续)")
except Exception as e:
send_message(open_id, f"❌ 恢复失败: {e}")
else:
send_message(open_id, f"❓ 未知命令: {cmd}")
def main():
global client
if not APP_ID or not APP_SECRET:
print("错误: 请在 mykey.py 或 mykey.json 中配置 fs_app_id 和 fs_app_secret")
sys.exit(1)
client = create_client()
handler = lark.EventDispatcherHandler.builder("", "").register_p2_im_message_receive_v1(handle_message).build()
cli = lark.ws.Client(APP_ID, APP_SECRET, event_handler=handler, log_level=lark.LogLevel.INFO)
print("=" * 50 + "\n飞书 Agent 已启动(长连接模式)\n" + f"App ID: {APP_ID}\n等待消息...\n" + "=" * 50)
cli.start()
if __name__ == "__main__":
main()