Add chat app adapters for QQ, Feishu, WeCom, and DingTalk
This commit is contained in:
317
dingtalkapp.py
Normal file
317
dingtalkapp.py
Normal file
@@ -0,0 +1,317 @@
|
||||
import os, sys, re, threading, asyncio, queue as Q, socket, time, glob, json
|
||||
import requests
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||
from agentmain import GeneraticAgent
|
||||
from llmcore import mykeys
|
||||
|
||||
try:
|
||||
from dingtalk_stream import AckMessage, CallbackHandler, Credential, DingTalkStreamClient
|
||||
from dingtalk_stream.chatbot import ChatbotMessage
|
||||
except Exception:
|
||||
print("Please install dingtalk-stream to use DingTalk: pip install dingtalk-stream")
|
||||
sys.exit(1)
|
||||
|
||||
agent = GeneraticAgent()
|
||||
agent.verbose = False
|
||||
|
||||
CLIENT_ID = str(mykeys.get("dingtalk_client_id", "") or "").strip()
|
||||
CLIENT_SECRET = str(mykeys.get("dingtalk_client_secret", "") or "").strip()
|
||||
ALLOWED = {str(x).strip() for x in mykeys.get("dingtalk_allowed_users", []) if str(x).strip()}
|
||||
|
||||
_TAG_PATS = [r"<" + t + r">.*?</" + t + r">" for t in ("thinking", "summary", "tool_use", "file_content")]
|
||||
_USER_TASKS = {}
|
||||
|
||||
|
||||
def _clean(text):
|
||||
for pat in _TAG_PATS:
|
||||
text = re.sub(pat, "", text, flags=re.DOTALL)
|
||||
return re.sub(r"\n{3,}", "\n\n", text).strip() or "..."
|
||||
|
||||
|
||||
def _extract_files(text):
|
||||
return re.findall(r"\[FILE:([^\]]+)\]", text or "")
|
||||
|
||||
|
||||
def _strip_files(text):
|
||||
return re.sub(r"\[FILE:[^\]]+\]", "", text or "").strip()
|
||||
|
||||
|
||||
def _split_text(text, limit=1800):
|
||||
text = (text or "").strip() or "..."
|
||||
parts = []
|
||||
while len(text) > limit:
|
||||
cut = text.rfind("\n", 0, limit)
|
||||
if cut < limit * 0.6:
|
||||
cut = limit
|
||||
parts.append(text[:cut].rstrip())
|
||||
text = text[cut:].lstrip()
|
||||
if text:
|
||||
parts.append(text)
|
||||
return parts or ["..."]
|
||||
|
||||
|
||||
def _format_restore():
|
||||
files = glob.glob("./temp/model_responses_*.txt")
|
||||
if not files:
|
||||
return None, "❌ 没有找到历史记录"
|
||||
latest = max(files, key=os.path.getmtime)
|
||||
with open(latest, "r", encoding="utf-8") as f:
|
||||
content = f.read()
|
||||
users = re.findall(r"=== USER ===\n(.+?)(?==== |$)", content, re.DOTALL)
|
||||
resps = re.findall(r"=== Response ===.*?\n(.+?)(?==== Prompt|$)", content, re.DOTALL)
|
||||
count, restored = 0, []
|
||||
for u, r in zip(users, resps):
|
||||
u, r = u.strip(), r.strip()[:500]
|
||||
if u and r:
|
||||
restored.extend([f"[USER]: {u}", f"[Agent] {r}"])
|
||||
count += 1
|
||||
if not restored:
|
||||
return None, "❌ 历史记录里没有可恢复内容"
|
||||
return (restored, os.path.basename(latest), count), None
|
||||
|
||||
|
||||
class DingTalkApp:
|
||||
def __init__(self):
|
||||
self.client = None
|
||||
self.access_token = None
|
||||
self.token_expiry = 0
|
||||
self.background_tasks = set()
|
||||
|
||||
async def _get_access_token(self):
|
||||
if self.access_token and time.time() < self.token_expiry:
|
||||
return self.access_token
|
||||
|
||||
def _fetch():
|
||||
resp = requests.post(
|
||||
"https://api.dingtalk.com/v1.0/oauth2/accessToken",
|
||||
json={"appKey": CLIENT_ID, "appSecret": CLIENT_SECRET},
|
||||
timeout=20,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
return resp.json()
|
||||
|
||||
try:
|
||||
data = await asyncio.to_thread(_fetch)
|
||||
self.access_token = data.get("accessToken")
|
||||
self.token_expiry = time.time() + int(data.get("expireIn", 7200)) - 60
|
||||
return self.access_token
|
||||
except Exception as e:
|
||||
print(f"[DingTalk] token error: {e}")
|
||||
return None
|
||||
|
||||
async def _send_batch_message(self, chat_id, msg_key, msg_param):
|
||||
token = await self._get_access_token()
|
||||
if not token:
|
||||
return False
|
||||
headers = {"x-acs-dingtalk-access-token": token}
|
||||
if chat_id.startswith("group:"):
|
||||
url = "https://api.dingtalk.com/v1.0/robot/groupMessages/send"
|
||||
payload = {
|
||||
"robotCode": CLIENT_ID,
|
||||
"openConversationId": chat_id[6:],
|
||||
"msgKey": msg_key,
|
||||
"msgParam": json.dumps(msg_param, ensure_ascii=False),
|
||||
}
|
||||
else:
|
||||
url = "https://api.dingtalk.com/v1.0/robot/oToMessages/batchSend"
|
||||
payload = {
|
||||
"robotCode": CLIENT_ID,
|
||||
"userIds": [chat_id],
|
||||
"msgKey": msg_key,
|
||||
"msgParam": json.dumps(msg_param, ensure_ascii=False),
|
||||
}
|
||||
|
||||
def _post():
|
||||
resp = requests.post(url, json=payload, headers=headers, timeout=20)
|
||||
body = resp.text
|
||||
if resp.status_code != 200:
|
||||
raise RuntimeError(f"HTTP {resp.status_code}: {body[:300]}")
|
||||
try:
|
||||
result = resp.json()
|
||||
except Exception:
|
||||
result = {}
|
||||
errcode = result.get("errcode")
|
||||
if errcode not in (None, 0):
|
||||
raise RuntimeError(f"API errcode={errcode}: {body[:300]}")
|
||||
return True
|
||||
|
||||
try:
|
||||
return await asyncio.to_thread(_post)
|
||||
except Exception as e:
|
||||
print(f"[DingTalk] send error: {e}")
|
||||
return False
|
||||
|
||||
async def send_text(self, chat_id, content):
|
||||
for part in _split_text(content):
|
||||
await self._send_batch_message(chat_id, "sampleMarkdown", {"text": part, "title": "Agent Reply"})
|
||||
|
||||
async def send_done(self, chat_id, raw_text):
|
||||
files = [p for p in _extract_files(raw_text) if os.path.exists(p)]
|
||||
body = _strip_files(_clean(raw_text))
|
||||
if files:
|
||||
body = (body + "\n\n" if body else "") + "\n".join([f"生成文件: {p}" for p in files])
|
||||
await self.send_text(chat_id, body or "...")
|
||||
|
||||
async def handle_command(self, chat_id, cmd):
|
||||
parts = (cmd or "").split()
|
||||
op = (parts[0] if parts else "").lower()
|
||||
if op == "/stop":
|
||||
state = _USER_TASKS.get(chat_id)
|
||||
if state:
|
||||
state["running"] = False
|
||||
agent.abort()
|
||||
await self.send_text(chat_id, "⏹️ 正在停止...")
|
||||
elif op == "/status":
|
||||
llm = agent.get_llm_name() if agent.llmclient else "未配置"
|
||||
await self.send_text(chat_id, f"状态: {'🔴 运行中' if agent.is_running else '🟢 空闲'}\nLLM: [{agent.llm_no}] {llm}")
|
||||
elif op == "/llm":
|
||||
if not agent.llmclient:
|
||||
return await self.send_text(chat_id, "❌ 当前没有可用的 LLM 配置")
|
||||
if len(parts) > 1:
|
||||
try:
|
||||
n = int(parts[1])
|
||||
agent.next_llm(n)
|
||||
await self.send_text(chat_id, f"✅ 已切换到 [{agent.llm_no}] {agent.get_llm_name()}")
|
||||
except Exception:
|
||||
await self.send_text(chat_id, f"用法: /llm <0-{len(agent.list_llms()) - 1}>")
|
||||
else:
|
||||
lines = [f"{'→' if cur else ' '} [{i}] {name}" for i, name, cur in agent.list_llms()]
|
||||
await self.send_text(chat_id, "LLMs:\n" + "\n".join(lines))
|
||||
elif op == "/restore":
|
||||
try:
|
||||
restored_info, err = _format_restore()
|
||||
if err:
|
||||
return await self.send_text(chat_id, err)
|
||||
restored, fname, count = restored_info
|
||||
agent.abort()
|
||||
agent.history.extend(restored)
|
||||
await self.send_text(chat_id, f"✅ 已恢复 {count} 轮对话\n来源: {fname}\n(仅恢复上下文,请输入新问题继续)")
|
||||
except Exception as e:
|
||||
await self.send_text(chat_id, f"❌ 恢复失败: {e}")
|
||||
elif op == "/new":
|
||||
agent.abort()
|
||||
agent.history = []
|
||||
await self.send_text(chat_id, "🆕 已清空当前共享上下文")
|
||||
else:
|
||||
await self.send_text(
|
||||
chat_id,
|
||||
"📖 命令列表:\n/help - 显示帮助\n/status - 查看状态\n/stop - 停止当前任务\n/new - 清空当前上下文\n/restore - 恢复上次对话历史\n/llm [n] - 查看或切换模型",
|
||||
)
|
||||
|
||||
async def run_agent(self, chat_id, text):
|
||||
state = {"running": True}
|
||||
_USER_TASKS[chat_id] = state
|
||||
try:
|
||||
await self.send_text(chat_id, "思考中...")
|
||||
prompt = f"If you need to show files to user, use [FILE:filepath] in your response.\n\n{text}"
|
||||
dq = agent.put_task(prompt, source="dingtalk")
|
||||
last_ping = time.time()
|
||||
while state["running"]:
|
||||
try:
|
||||
item = await asyncio.to_thread(dq.get, True, 3)
|
||||
except Q.Empty:
|
||||
if agent.is_running and time.time() - last_ping > 20:
|
||||
await self.send_text(chat_id, "⏳ 还在处理中,请稍等...")
|
||||
last_ping = time.time()
|
||||
continue
|
||||
if "done" in item:
|
||||
await self.send_done(chat_id, item.get("done", ""))
|
||||
break
|
||||
if not state["running"]:
|
||||
await self.send_text(chat_id, "⏹️ 已停止")
|
||||
except Exception as e:
|
||||
import traceback
|
||||
|
||||
print(f"[DingTalk] run_agent error: {e}")
|
||||
traceback.print_exc()
|
||||
await self.send_text(chat_id, f"❌ 错误: {e}")
|
||||
finally:
|
||||
_USER_TASKS.pop(chat_id, None)
|
||||
|
||||
async def on_message(self, content, sender_id, sender_name, conversation_type=None, conversation_id=None):
|
||||
try:
|
||||
if not content:
|
||||
return
|
||||
public_access = not ALLOWED or "*" in ALLOWED
|
||||
if not public_access and sender_id not in ALLOWED:
|
||||
print(f"[DingTalk] unauthorized user: {sender_id}")
|
||||
return
|
||||
is_group = conversation_type == "2" and conversation_id
|
||||
chat_id = f"group:{conversation_id}" if is_group else sender_id
|
||||
print(f"[DingTalk] message from {sender_name} ({sender_id}): {content}")
|
||||
if content.startswith("/"):
|
||||
await self.handle_command(chat_id, content)
|
||||
return
|
||||
task = asyncio.create_task(self.run_agent(chat_id, content))
|
||||
self.background_tasks.add(task)
|
||||
task.add_done_callback(self.background_tasks.discard)
|
||||
except Exception:
|
||||
import traceback
|
||||
|
||||
print("[DingTalk] handle_message error")
|
||||
traceback.print_exc()
|
||||
|
||||
async def start(self):
|
||||
handler = _DingTalkHandler(self)
|
||||
self.client = DingTalkStreamClient(Credential(CLIENT_ID, CLIENT_SECRET))
|
||||
self.client.register_callback_handler(ChatbotMessage.TOPIC, handler)
|
||||
print("[DingTalk] bot starting...")
|
||||
while True:
|
||||
try:
|
||||
await self.client.start()
|
||||
except Exception as e:
|
||||
print(f"[DingTalk] stream error: {e}")
|
||||
print("[DingTalk] reconnect in 5s...")
|
||||
await asyncio.sleep(5)
|
||||
|
||||
|
||||
class _DingTalkHandler(CallbackHandler):
|
||||
def __init__(self, app):
|
||||
super().__init__()
|
||||
self.app = app
|
||||
|
||||
async def process(self, message):
|
||||
try:
|
||||
chatbot_msg = ChatbotMessage.from_dict(message.data)
|
||||
text = ""
|
||||
if getattr(getattr(chatbot_msg, "text", None), "content", None):
|
||||
text = chatbot_msg.text.content.strip()
|
||||
extensions = getattr(chatbot_msg, "extensions", None) or {}
|
||||
recognition = ((extensions.get("content") or {}).get("recognition") or "").strip() if isinstance(extensions, dict) else ""
|
||||
if not text:
|
||||
text = recognition or str((message.data.get("text", {}) or {}).get("content", "") or "").strip()
|
||||
sender_id = getattr(chatbot_msg, "sender_staff_id", None) or getattr(chatbot_msg, "sender_id", None) or "unknown"
|
||||
sender_name = getattr(chatbot_msg, "sender_nick", None) or "Unknown"
|
||||
conversation_type = message.data.get("conversationType")
|
||||
conversation_id = message.data.get("conversationId") or message.data.get("openConversationId")
|
||||
await self.app.on_message(text, str(sender_id), sender_name, conversation_type, conversation_id)
|
||||
return AckMessage.STATUS_OK, "OK"
|
||||
except Exception as e:
|
||||
print(f"[DingTalk] callback error: {e}")
|
||||
return AckMessage.STATUS_OK, "Error"
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
_lock_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
_lock_sock.bind(("127.0.0.1", 19530))
|
||||
except OSError:
|
||||
print("[DingTalk] Another instance is already running, skipping...")
|
||||
sys.exit(1)
|
||||
|
||||
if not CLIENT_ID or not CLIENT_SECRET:
|
||||
print("[DingTalk] ERROR: please set dingtalk_client_id and dingtalk_client_secret in mykey.py or mykey.json")
|
||||
sys.exit(1)
|
||||
if agent.llmclient is None:
|
||||
print("[DingTalk] ERROR: no usable LLM backend found in mykey.py or mykey.json")
|
||||
sys.exit(1)
|
||||
|
||||
log_dir = os.path.join(os.path.dirname(__file__), "temp")
|
||||
os.makedirs(log_dir, exist_ok=True)
|
||||
_logf = open(os.path.join(log_dir, "dingtalkapp.log"), "a", encoding="utf-8", buffering=1)
|
||||
sys.stdout = sys.stderr = _logf
|
||||
print("[NEW] DingTalk process starting, the above are history infos ...")
|
||||
print(f"[DingTalk] allow list: {'public' if not ALLOWED or '*' in ALLOWED else sorted(ALLOWED)}")
|
||||
threading.Thread(target=agent.run, daemon=True).start()
|
||||
asyncio.run(DingTalkApp().start())
|
||||
Reference in New Issue
Block a user