import os, json, re, time, requests, sys, threading, urllib3, base64, mimetypes, uuid from datetime import datetime urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) _RESP_CACHE_KEY = str(uuid.uuid4()) def _load_mykeys(): try: import mykey; return {k: v for k, v in vars(mykey).items() if not k.startswith('_')} except ImportError: pass p = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'mykey.json') if not os.path.exists(p): raise Exception('[ERROR] mykey.py or mykey.json not found, please create one from mykey_template.') with open(p, encoding='utf-8') as f: return json.load(f) def __getattr__(name): # once guard in PEP 562 if name in ('mykeys', 'proxies'): mk = _load_mykeys() proxy = mk.get("proxy", 'http://127.0.0.1:2082') px = {"http": proxy, "https": proxy} if proxy else None globals().update(mykeys=mk, proxies=px) if mk.get('langfuse_config'): try: from plugins import langfuse_tracing except Exception: pass return globals()[name] raise AttributeError(f"module 'llmcore' has no attribute {name}") def compress_history_tags(messages, keep_recent=10, max_len=800, force=False): """Compress // tags in older messages to save tokens.""" compress_history_tags._cd = getattr(compress_history_tags, '_cd', 0) + 1 if force: compress_history_tags._cd = 0 if compress_history_tags._cd % 5 != 0: return messages _before = sum(len(json.dumps(m, ensure_ascii=False)) for m in messages) _pats = {tag: re.compile(rf'(<{tag}>)([\s\S]*?)()') for tag in ('thinking', 'think', 'tool_use', 'tool_result')} _hist_pat = re.compile(r'<(history|key_info)>[\s\S]*?') def _trunc_str(s): return s[:max_len//2] + '\n...[Truncated]...\n' + s[-max_len//2:] if isinstance(s, str) and len(s) > max_len else s def _trunc(text): text = _hist_pat.sub(lambda m: f'<{m.group(1)}>[...]', text) for pat in _pats.values(): text = pat.sub(lambda m: m.group(1) + _trunc_str(m.group(2)) + m.group(3), text) return text for i, msg in enumerate(messages): if i >= len(messages) - keep_recent: break c = msg['content'] if isinstance(c, str): msg['content'] = _trunc(c) elif isinstance(c, list): for b in c: if not isinstance(b, dict): continue t = b.get('type') if t == 'text' and isinstance(b.get('text'), str): b['text'] = _trunc(b['text']) elif t == 'tool_result': tc = b.get('content') if isinstance(tc, str): b['content'] = _trunc_str(tc) elif isinstance(tc, list): for sub in tc: if isinstance(sub, dict) and sub.get('type') == 'text': sub['text'] = _trunc_str(sub.get('text')) elif t == 'tool_use' and isinstance(b.get('input'), dict): for k, v in b['input'].items(): b['input'][k] = _trunc_str(v) print(f"[Cut] {_before} -> {sum(len(json.dumps(m, ensure_ascii=False)) for m in messages)}") return messages def _sanitize_leading_user_msg(msg): """把 user 消息里的 tool_result 块改写成纯文本,避免孤立引用。 history 统一使用 Claude content-block 格式:content 是 list of blocks。""" msg = dict(msg) # 浅拷贝外层 dict content = msg.get('content') if not isinstance(content, list): return msg texts = [] for block in content: if not isinstance(block, dict): continue if block.get('type') == 'tool_result': c = block.get('content', '') if isinstance(c, list): # content 本身也可能是 list[{type:text,text:...}] texts.extend(b.get('text', '') for b in c if isinstance(b, dict)) else: texts.append(str(c)) elif block.get('type') == 'text': texts.append(block.get('text', '')) msg['content'] = [{"type": "text", "text": '\n'.join(t for t in texts if t)}] return msg def trim_messages_history(history, context_win): compress_history_tags(history) cost = sum(len(json.dumps(m, ensure_ascii=False)) for m in history) print(f'[Debug] Current context: {cost} chars, {len(history)} messages.') if cost > context_win * 3: compress_history_tags(history, keep_recent=4, force=True) # trim breaks cache, so compress more btw target = context_win * 3 * 0.6 while len(history) > 5 and cost > target: history.pop(0) while history and history[0].get('role') != 'user': history.pop(0) if history and history[0].get('role') == 'user': history[0] = _sanitize_leading_user_msg(history[0]) cost = sum(len(json.dumps(m, ensure_ascii=False)) for m in history) print(f'[Debug] Trimmed context, current: {cost} chars, {len(history)} messages.') def auto_make_url(base, path): b, p = base.rstrip('/'), path.strip('/') if b.endswith('$'): return b[:-1].rstrip('/') if b.endswith(p): return b return f"{b}/{p}" if re.search(r'/v\d+(/|$)', b) else f"{b}/v1/{p}" def _parse_claude_sse(resp_lines): """Parse Anthropic SSE stream. Yields text chunks, returns list[content_block].""" content_blocks = []; current_block = None; tool_json_buf = "" stop_reason = None; got_message_stop = False; warn = None for line in resp_lines: if not line: continue line = line.decode('utf-8') if isinstance(line, bytes) else line if not line.startswith("data:"): continue data_str = line[5:].lstrip() if data_str == "[DONE]": break try: evt = json.loads(data_str) except Exception as e: print(f"[SSE] JSON parse error: {e}, line: {data_str[:200]}") continue evt_type = evt.get("type", "") if evt_type == "message_start": usage = evt.get("message", {}).get("usage", {}) _record_usage(usage, "messages") elif evt_type == "content_block_start": block = evt.get("content_block", {}) if block.get("type") == "text": current_block = {"type": "text", "text": ""} elif block.get("type") == "thinking": current_block = {"type": "thinking", "thinking": "", "signature": ""} elif block.get("type") == "tool_use": current_block = {"type": "tool_use", "id": block.get("id", ""), "name": block.get("name", ""), "input": {}} tool_json_buf = "" elif evt_type == "content_block_delta": delta = evt.get("delta", {}) if delta.get("type") == "text_delta": text = delta.get("text", "") if current_block and current_block.get("type") == "text": current_block["text"] += text if text: yield text elif delta.get("type") == "thinking_delta": if current_block and current_block.get("type") == "thinking": current_block["thinking"] += delta.get("thinking", "") elif delta.get("type") == "signature_delta": if current_block and current_block.get("type") == "thinking": current_block["signature"] = current_block.get("signature", "") + delta.get("signature", "") elif delta.get("type") == "input_json_delta": tool_json_buf += delta.get("partial_json", "") elif evt_type == "content_block_stop": if current_block: if current_block["type"] == "tool_use": try: current_block["input"] = json.loads(tool_json_buf) if tool_json_buf else {} except: current_block["input"] = {"_raw": tool_json_buf} content_blocks.append(current_block) current_block = None elif evt_type == "message_delta": delta = evt.get("delta", {}) stop_reason = delta.get("stop_reason", stop_reason) out_usage = evt.get("usage", {}) out_tokens = out_usage.get("output_tokens", 0) if out_tokens: print(f"[Output] tokens={out_tokens} stop_reason={stop_reason}") elif evt_type == "message_stop": got_message_stop = True elif evt_type == "error": err = evt.get("error", {}) emsg = err.get("message", str(err)) if isinstance(err, dict) else str(err) warn = f"\n\n!!!Error: SSE {emsg}"; break if not warn: if not got_message_stop and not stop_reason: warn = "\n\n[!!! 流异常中断,未收到完整响应 !!!]" elif stop_reason == "max_tokens": warn = "\n\n[!!! Response truncated: max_tokens !!!]" if warn: print(f"[WARN] {warn.strip()}") content_blocks.append({"type": "text", "text": warn}); yield warn return content_blocks def _try_parse_tool_args(raw): """Parse tool args string; split concatenated JSON objects like {..}{..} if needed. Returns list of parsed dicts.""" if not raw: return [{}] try: return [json.loads(raw)] except: pass parts = re.split(r'(?<=\})(?=\{)', raw) if len(parts) > 1: parsed = [] for p in parts: try: parsed.append(json.loads(p)) except: return [{"_raw": raw}] return parsed return [{"_raw": raw}] def _parse_openai_sse(resp_lines, api_mode="chat_completions"): """Parse OpenAI SSE stream (chat_completions or responses API). Yields text chunks, returns list[content_block]. content_block: {type:'text', text:str} | {type:'tool_use', id:str, name:str, input:dict} """ content_text = "" if api_mode == "responses": seen_delta = False; fc_buf = {}; current_fc_idx = None for line in resp_lines: if not line: continue line = line.decode('utf-8', errors='replace') if isinstance(line, bytes) else line if not line.startswith("data:"): continue data_str = line[5:].lstrip() if data_str == "[DONE]": break try: evt = json.loads(data_str) except: continue etype = evt.get("type", "") if etype == "response.output_text.delta": delta = evt.get("delta", "") if delta: seen_delta = True; content_text += delta; yield delta elif etype == "response.output_text.done" and not seen_delta: text = evt.get("text", "") if text: content_text += text; yield text elif etype == "response.output_item.added": item = evt.get("item", {}) if item.get("type") == "function_call": idx = evt.get("output_index", 0) fc_buf[idx] = {"id": item.get("call_id", item.get("id", "")), "name": item.get("name", ""), "args": ""} current_fc_idx = idx elif etype == "response.function_call_arguments.delta": idx = evt.get("output_index", current_fc_idx or 0) if idx in fc_buf: fc_buf[idx]["args"] += evt.get("delta", "") elif etype == "response.function_call_arguments.done": idx = evt.get("output_index", current_fc_idx or 0) if idx in fc_buf: fc_buf[idx]["args"] = evt.get("arguments", fc_buf[idx]["args"]) elif etype == "error": err = evt.get("error", {}) emsg = err.get("message", str(err)) if isinstance(err, dict) else str(err) if emsg: content_text += f"!!!Error: {emsg}"; yield f"!!!Error: {emsg}" break elif etype == "response.completed": usage = evt.get("response", {}).get("usage", {}) _record_usage(usage, api_mode) break blocks = [] if content_text: blocks.append({"type": "text", "text": content_text}) for idx in sorted(fc_buf): fc = fc_buf[idx] inps = _try_parse_tool_args(fc["args"]) for i, inp in enumerate(inps): bid = fc["id"] or '' if len(inps) > 1: bid = f"{bid}_{i}" if bid else f"split_{i}" blocks.append({"type": "tool_use", "id": bid, "name": fc["name"], "input": inp}) return blocks else: tc_buf = {} # index -> {id, name, args} reasoning_text = "" for line in resp_lines: if not line: continue line = line.decode('utf-8', errors='replace') if isinstance(line, bytes) else line if not line.startswith("data:"): continue data_str = line[5:].lstrip() if data_str == "[DONE]": break try: evt = json.loads(data_str) except: continue ch = (evt.get("choices") or [{}])[0] delta = ch.get("delta") or {} if delta.get("reasoning_content"): reasoning_text += delta["reasoning_content"] if delta.get("content"): text = delta["content"]; content_text += text; yield text for tc in (delta.get("tool_calls") or []): idx = tc.get("index", 0) has_name = bool(tc.get("function", {}).get("name")) if idx not in tc_buf: if has_name or not tc_buf: tc_buf[idx] = {"id": tc.get("id") or '', "name": "", "args": ""} else: idx = max(tc_buf) if has_name: tc_buf[idx]["name"] = tc["function"]["name"] if tc.get("function", {}).get("arguments"): tc_buf[idx]["args"] += tc["function"]["arguments"] if tc.get("id") and not tc_buf[idx]["id"]: tc_buf[idx]["id"] = tc["id"] usage = evt.get("usage") if usage: _record_usage(usage, api_mode) blocks = [] if reasoning_text: blocks.append({"type": "thinking", "thinking": reasoning_text}) if content_text: blocks.append({"type": "text", "text": content_text}) for idx in sorted(tc_buf): tc = tc_buf[idx] inps = _try_parse_tool_args(tc["args"]) for i, inp in enumerate(inps): bid = tc["id"] or '' if len(inps) > 1: bid = f"{bid}_{i}" if bid else f"split_{i}" blocks.append({"type": "tool_use", "id": bid, "name": tc["name"], "input": inp}) return blocks def _record_usage(usage, api_mode): if not usage: return if api_mode == 'responses': cached = (usage.get("input_tokens_details") or {}).get("cached_tokens", 0) inp = usage.get("input_tokens", 0) print(f"[Cache] input={inp} cached={cached}") elif api_mode == 'chat_completions': cached = (usage.get("prompt_tokens_details") or {}).get("cached_tokens", 0) inp = usage.get("prompt_tokens", 0) print(f"[Cache] input={inp} cached={cached}") elif api_mode == 'messages': ci, cr, inp = usage.get("cache_creation_input_tokens", 0), usage.get("cache_read_input_tokens", 0), usage.get("input_tokens", 0) print(f"[Cache] input={inp} creation={ci} read={cr}") def _parse_openai_json(data, api_mode="chat_completions"): blocks = [] if api_mode == "responses": _record_usage(data.get("usage") or {}, api_mode) for item in (data.get("output") or []): if item.get("type") == "message": for p in (item.get("content") or []): if p.get("type") in ("output_text", "text") and p.get("text"): blocks.append({"type": "text", "text": p["text"]}); yield p["text"] elif item.get("type") == "function_call": try: args = json.loads(item.get("arguments", "")) if item.get("arguments") else {} except: args = {"_raw": item.get("arguments", "")} blocks.append({"type": "tool_use", "id": item.get("call_id", item.get("id", "")), "name": item.get("name", ""), "input": args}) else: _record_usage(data.get("usage") or {}, api_mode) msg = (data.get("choices") or [{}])[0].get("message", {}) reasoning = msg.get("reasoning_content", "") if reasoning: blocks.append({"type": "thinking", "thinking": reasoning}) content = msg.get("content", "") if content: blocks.append({"type": "text", "text": content}); yield content for tc in (msg.get("tool_calls") or []): fn = tc.get("function", {}) try: args = json.loads(fn.get("arguments", "")) if fn.get("arguments") else {} except: args = {"_raw": fn.get("arguments", "")} blocks.append({"type": "tool_use", "id": tc.get("id", ""), "name": fn.get("name", ""), "input": args}) return blocks def _stamp_oai_cache_markers(messages, model): """Add cache_control to last 2 user messages for Anthropic models via OAI-compatible relay.""" ml = model.lower() if not any(k in ml for k in ('claude', 'anthropic')): return user_idxs = [i for i, m in enumerate(messages) if m.get('role') == 'user'] for idx in user_idxs[-2:]: c = messages[idx].get('content') if isinstance(c, str): messages[idx] = {**messages[idx], 'content': [{'type': 'text', 'text': c, 'cache_control': {'type': 'ephemeral'}}]} elif isinstance(c, list) and c: c = list(c); c[-1] = dict(c[-1], cache_control={'type': 'ephemeral'}) messages[idx] = {**messages[idx], 'content': c} def _openai_stream(api_base, api_key, messages, model, api_mode='chat_completions', *, system=None, temperature=0.5, max_tokens=None, tools=None, reasoning_effort=None, max_retries=0, connect_timeout=10, read_timeout=300, proxies=None, stream=True): """Shared OpenAI-compatible streaming request with retry. Yields text chunks, returns list[content_block].""" ml = model.lower() if 'kimi' in ml or 'moonshot' in ml: temperature = 1 elif 'minimax' in ml: temperature = max(0.01, min(temperature, 1.0)) # MiniMax requires temp in (0, 1] headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json", "Accept": "text/event-stream"} if api_mode == "responses": url = auto_make_url(api_base, "responses") payload = {"model": model, "input": _to_responses_input(messages), "stream": stream, "prompt_cache_key": _RESP_CACHE_KEY, "instructions": system or "You are an Omnipotent Executor."} if reasoning_effort: payload["reasoning"] = {"effort": reasoning_effort} if max_tokens: payload["max_output_tokens"] = max_tokens else: url = auto_make_url(api_base, "chat/completions") if system: messages = [{"role": "system", "content": system}] + messages _stamp_oai_cache_markers(messages, model) payload = {"model": model, "messages": messages, "stream": stream} if stream: payload["stream_options"] = {"include_usage": True} if temperature != 1: payload["temperature"] = temperature if max_tokens: payload["max_completion_tokens" if ml.startswith(("gpt-5", "o1", "o2", "o3", "o4")) else "max_tokens"] = max_tokens if reasoning_effort: payload["reasoning_effort"] = reasoning_effort if tools: payload["tools"] = _prepare_oai_tools(tools, api_mode) RETRYABLE = {408, 409, 425, 429, 500, 502, 503, 504, 529} def _delay(resp, attempt): try: ra = float((resp.headers or {}).get("retry-after")) except: ra = None return max(0.5, ra if ra is not None else min(30.0, 1.5 * (2 ** attempt))) for attempt in range(max_retries + 1): streamed = False try: with requests.post(url, headers=headers, json=payload, stream=stream, timeout=(connect_timeout, read_timeout), proxies=proxies) as r: if r.status_code >= 400: if r.status_code in RETRYABLE and attempt < max_retries: d = _delay(r, attempt) print(f"[LLM Retry] HTTP {r.status_code}, retry in {d:.1f}s ({attempt+1}/{max_retries+1})") time.sleep(d); continue body = "" try: body = r.text.strip()[:500] except: pass err = f"!!!Error: HTTP {r.status_code}" + (f": {body}" if body else "") yield err; return [{"type": "text", "text": err}] gen = _parse_openai_sse(r.iter_lines(), api_mode) if stream else _parse_openai_json(r.json(), api_mode) try: while True: streamed = True; yield next(gen) except StopIteration as e: return e.value or [] except (requests.Timeout, requests.ConnectionError) as e: if attempt < max_retries and not streamed: d = _delay(None, attempt) print(f"[LLM Retry] {type(e).__name__}, retry in {d:.1f}s ({attempt+1}/{max_retries+1})") time.sleep(d); continue err = f"!!!Error: {type(e).__name__}" yield err; return [{"type": "text", "text": err}] except Exception as e: err = f"!!!Error: {type(e).__name__}: {e}" yield err; return [{"type": "text", "text": err}] def _prepare_oai_tools(tools, api_mode="chat_completions"): if api_mode == "responses": resp_tools = [] for t in tools: if t.get("type") == "function" and "function" in t: rt = {"type": "function"}; rt.update(t["function"]) resp_tools.append(rt) else: resp_tools.append(t) return resp_tools return tools def _to_responses_input(messages): result, pending = [], [] for msg in messages: role = str(msg.get("role", "user")).lower() if role == "tool": cid = msg.get("tool_call_id") or (pending.pop(0) if pending else f"call_{uuid.uuid4().hex[:8]}") result.append({"type": "function_call_output", "call_id": cid, "output": msg.get("content", "")}) continue if role not in ["user", "assistant", "system", "developer"]: role = "user" if role == "system": role = "developer" # Responses API uses 'developer' instead of 'system' content = msg.get("content", "") text_type = "output_text" if role == "assistant" else "input_text" parts = [] if isinstance(content, str): if content: parts.append({"type": text_type, "text": content}) elif isinstance(content, list): for part in content: if not isinstance(part, dict): continue ptype = part.get("type") if ptype == "text": text = part.get("text", "") if text: parts.append({"type": text_type, "text": text}) elif ptype == "image_url": url = (part.get("image_url") or {}).get("url", "") if url and role != "assistant": parts.append({"type": "input_image", "image_url": url}) if len(parts) == 0 and not isinstance(content, list): parts = [{"type": text_type, "text": str(content) or '[empty]'}] if parts: result.append({"role": role, "content": parts}) pending = [] for tc in (msg.get("tool_calls") or []): f = tc.get("function", {}) cid = tc.get("id") or f"call_{uuid.uuid4().hex[:8]}" pending.append(cid) result.append({"type": "function_call", "call_id": cid, "name": f.get("name", ""), "arguments": f.get("arguments", "")}) return result def _msgs_claude2oai(messages): result = [] for msg in messages: role = msg.get("role", "user") content = msg.get("content", "") blocks = content if isinstance(content, list) else [{"type": "text", "text": str(content)}] if role == "assistant": text_parts, tool_calls, reasoning = [], [], "" for b in blocks: if not isinstance(b, dict): continue if b.get("type") == "thinking" and b.get("thinking"): reasoning = b["thinking"] elif b.get("type") == "text" and b.get("text"): text_parts.append({"type": "text", "text": b.get("text", "")}) elif b.get("type") == "tool_use": tool_calls.append({ "id": b.get("id") or '', "type": "function", "function": {"name": b.get("name", ""), "arguments": json.dumps(b.get("input", {}), ensure_ascii=False)} }) m = {"role": "assistant"} if reasoning: m["reasoning_content"] = reasoning if text_parts: m["content"] = text_parts else: m["content"] = "" if tool_calls: m["tool_calls"] = tool_calls result.append(m) elif role == "user": text_parts = [] for b in blocks: if not isinstance(b, dict): continue if b.get("type") == "tool_result": if text_parts: result.append({"role": "user", "content": text_parts}) text_parts = [] tr = b.get("content", "") if isinstance(tr, list): tr = "\n".join(x.get("text", "") for x in tr if isinstance(x, dict) and x.get("type") == "text") result.append({"role": "tool", "tool_call_id": b.get("tool_use_id") or '', "content": tr if isinstance(tr, str) else str(tr)}) elif b.get("type") == "image": src = b.get("source") or {} if src.get("type") == "base64" and src.get("data"): text_parts.append({"type": "image_url", "image_url": {"url": f"data:{src.get('media_type', 'image/png')};base64,{src.get('data', '')}"}}) elif b.get("type") == "image_url": text_parts.append(b) elif b.get("type") == "text" and b.get("text"): text_parts.append({"type": "text", "text": b.get("text", "")}) if text_parts: result.append({"role": "user", "content": text_parts}) else: result.append(msg) return result class BaseSession: def __init__(self, cfg): self.api_key = cfg['apikey'] self.api_base = cfg['apibase'].rstrip('/') self.model = cfg.get('model', '') self.context_win = cfg.get('context_win', 24000) self.history = [] self.lock = threading.Lock() self.system = "" self.name = cfg.get('name', self.model) proxy = cfg.get('proxy') self.proxies = {"http": proxy, "https": proxy} if proxy else None self.max_retries = max(0, int(cfg.get('max_retries', 1))) self.stream = cfg.get('stream', True) default_ct, default_rt = (5, 30) if self.stream else (10, 240) self.connect_timeout = max(1, int(cfg.get('timeout', default_ct))) self.read_timeout = max(5, int(cfg.get('read_timeout', default_rt))) def _enum(key, valid): v = cfg.get(key); v = None if v is None else str(v).strip().lower() return v if not v or v in valid else print(f"[WARN] Invalid {key} {v!r}, ignored.") self.reasoning_effort = _enum('reasoning_effort', {'none', 'minimal', 'low', 'medium', 'high', 'xhigh'}) self.thinking_type = _enum('thinking_type', {'adaptive', 'enabled', 'disabled'}) self.thinking_budget_tokens = cfg.get('thinking_budget_tokens') mode = str(cfg.get('api_mode', 'chat_completions')).strip().lower().replace('-', '_') self.api_mode = 'responses' if mode in ('responses', 'response') else 'chat_completions' self.temperature = cfg.get('temperature', 1) self.max_tokens = cfg.get('max_tokens', 8192) def _apply_claude_thinking(self, payload): if self.thinking_type: thinking = {"type": self.thinking_type} if self.thinking_type == 'enabled': if self.thinking_budget_tokens is None: print("[WARN] thinking_type='enabled' requires thinking_budget_tokens, ignored.") else: thinking["budget_tokens"] = self.thinking_budget_tokens; payload["thinking"] = thinking else: payload["thinking"] = thinking if self.reasoning_effort: effort = {'low': 'low', 'medium': 'medium', 'high': 'high', 'xhigh': 'max'}.get(self.reasoning_effort) if effort: payload["output_config"] = {"effort": effort} else: print(f"[WARN] reasoning_effort {self.reasoning_effort!r} is unsupported for Claude output_config.effort, ignored.") def ask(self, prompt, stream=False): def _ask_gen(): with self.lock: self.history.append({"role": "user", "content": [{"type": "text", "text": prompt}]}) trim_messages_history(self.history, self.context_win) messages = self.make_messages(self.history) content_blocks = None; content = '' gen = self.raw_ask(messages) try: while True: chunk = next(gen); content += chunk; yield chunk except StopIteration as e: content_blocks = e.value or [] if len(content_blocks) > 1: print(f"[DEBUG BaseSession.ask] content_blocks: {content_blocks}") for block in (content_blocks or []): if block.get('type', '') == 'tool_use': tu = {'name': block.get('name', ''), 'arguments': block.get('input', {})} yield f'{json.dumps(tu, ensure_ascii=False)}' if not content.startswith("!!!Error:"): self.history.append({"role": "assistant", "content": [{"type": "text", "text": content}]}) return _ask_gen() if stream else ''.join(list(_ask_gen())) def _keep_claude_block(b): return not isinstance(b, dict) or b.get("type") != "thinking" or b.get("signature") def _drop_unsigned_thinking(messages): for m in messages: m["content"] = [b for b in m["content"] if _keep_claude_block(b)] return messages class ClaudeSession(BaseSession): def raw_ask(self, messages): headers = {"x-api-key": self.api_key, "Content-Type": "application/json", "anthropic-version": "2023-06-01", "anthropic-beta": "prompt-caching-2024-07-31"} payload = {"model": self.model, "messages": messages, "max_tokens": self.max_tokens, "stream": True} if self.temperature != 1: payload["temperature"] = self.temperature self._apply_claude_thinking(payload) if self.system: payload["system"] = [{"type": "text", "text": self.system, "cache_control": {"type": "persistent"}}] try: with requests.post(auto_make_url(self.api_base, "messages"), headers=headers, json=payload, stream=True, timeout=(self.connect_timeout, self.read_timeout)) as r: if r.status_code != 200: raise Exception(f"HTTP {r.status_code} {r.content.decode('utf-8', errors='replace')[:500]}") return (yield from _parse_claude_sse(r.iter_lines())) or [] except Exception as e: yield (err := f"!!!Error: {e}") return [{"type": "text", "text": err}] def make_messages(self, raw_list): msgs = _drop_unsigned_thinking([{"role": m['role'], "content": list(m['content'])} for m in raw_list]) user_idxs = [i for i, m in enumerate(msgs) if m['role'] == 'user'] for idx in user_idxs[-2:]: msgs[idx]["content"][-1] = dict(msgs[idx]["content"][-1], cache_control={"type": "ephemeral"}) return msgs class LLMSession(BaseSession): def raw_ask(self, messages): return (yield from _openai_stream(self.api_base, self.api_key, messages, self.model, self.api_mode, temperature=self.temperature, reasoning_effort=self.reasoning_effort, max_tokens=self.max_tokens, max_retries=self.max_retries, stream=self.stream, connect_timeout=self.connect_timeout, read_timeout=self.read_timeout, proxies=self.proxies)) def make_messages(self, raw_list): return _msgs_claude2oai(raw_list) def _fix_messages(messages): """修复 messages 符合 Claude API:交替、tool_use/tool_result 配对""" if not messages: return messages _wrap = lambda c: c if isinstance(c, list) else [{"type": "text", "text": str(c)}] fixed = [] for m in messages: if fixed and m['role'] == fixed[-1]['role']: fixed[-1] = {**fixed[-1], 'content': _wrap(fixed[-1]['content']) + [{"type": "text", "text": "\n"}] + _wrap(m['content'])}; continue if fixed and fixed[-1]['role'] == 'assistant' and m['role'] == 'user': uses = [b.get('id') for b in fixed[-1].get('content', []) if isinstance(b, dict) and b.get('type') == 'tool_use' and b.get('id')] has = {b.get('tool_use_id') for b in _wrap(m['content']) if isinstance(b, dict) and b.get('type') == 'tool_result'} miss = [uid for uid in uses if uid not in has] if miss: m = {**m, 'content': [{"type": "tool_result", "tool_use_id": uid, "content": "(error)"} for uid in miss] + _wrap(m['content'])} fixed.append(m) while fixed and fixed[0]['role'] != 'user': fixed.pop(0) return fixed class NativeClaudeSession(BaseSession): def __init__(self, cfg): super().__init__(cfg) self.context_win = cfg.get("context_win", 28000) self.fake_cc_system_prompt = cfg.get("fake_cc_system_prompt", False) self.user_agent = cfg.get("user_agent", "claude-cli/2.1.113 (external, cli)") self._session_id = str(uuid.uuid4()) self._account_uuid = str(uuid.uuid4()) self._device_id = uuid.uuid4().hex + uuid.uuid4().hex[:32] self.tools = None def raw_ask(self, messages): messages = _drop_unsigned_thinking(_fix_messages(messages)) model = self.model beta_parts = ["claude-code-20250219", "interleaved-thinking-2025-05-14", "redact-thinking-2026-02-12", "prompt-caching-scope-2026-01-05"] if "[1m]" in model.lower(): beta_parts.insert(1, "context-1m-2025-08-07"); model = model.replace("[1m]", "").replace("[1M]", "") headers = {"Content-Type": "application/json", "anthropic-version": "2023-06-01", "anthropic-beta": ",".join(beta_parts), "anthropic-dangerous-direct-browser-access": "true", "user-agent": self.user_agent, "x-app": "cli"} if self.api_key.startswith("sk-ant-"): headers["x-api-key"] = self.api_key else: headers["authorization"] = f"Bearer {self.api_key}" payload = {"model": model, "messages": messages, "max_tokens": self.max_tokens, "stream": self.stream} if self.temperature != 1: payload["temperature"] = self.temperature self._apply_claude_thinking(payload) payload["metadata"] = {"user_id": json.dumps({"device_id": self._device_id, "account_uuid": self._account_uuid, "session_id": self._session_id}, separators=(',', ':'))} if self.tools: claude_tools = openai_tools_to_claude(self.tools) tools = [dict(t) for t in claude_tools]; tools[-1]["cache_control"] = {"type": "ephemeral"} payload["tools"] = tools else: print("[ERROR] No tools provided for this session.") payload['system'] = [{"type": "text", "text": "You are Claude Code, Anthropic's official CLI for Claude.", "cache_control": {"type": "ephemeral"}}] if self.system: if self.fake_cc_system_prompt: messages[0]["content"].insert(0, {"type": "text", "text": self.system}) else: payload["system"] = [{"type": "text", "text": self.system}] user_idxs = [i for i, m in enumerate(messages) if m['role'] == 'user'] for idx in user_idxs[-2:]: messages[idx] = {**messages[idx], "content": list(messages[idx]["content"])} messages[idx]["content"][-1] = dict(messages[idx]["content"][-1], cache_control={"type": "ephemeral"}) try: with requests.post(auto_make_url(self.api_base, "messages")+'?beta=true', headers=headers, json=payload, stream=self.stream, timeout=(self.connect_timeout, self.read_timeout)) as resp: if resp.status_code != 200: raise Exception(f"HTTP {resp.status_code} {resp.content.decode('utf-8', errors='replace')[:500]}") if self.stream: return (yield from _parse_claude_sse(resp.iter_lines())) or [] else: data = resp.json(); content_blocks = data.get("content", []) _record_usage(data.get("usage", {}), "messages") for b in content_blocks: if b.get("type") == "text": yield b.get("text", "") elif b.get("type") == "thinking": yield "" return content_blocks except Exception as e: yield (err := f"!!!Error: {e}") return [{"type": "text", "text": err}] def ask(self, msg): assert type(msg) is dict with self.lock: self.history.append(msg) trim_messages_history(self.history, self.context_win) messages = [{"role": m["role"], "content": list(m["content"])} for m in self.history] content_blocks = None gen = self.raw_ask(messages) try: while True: yield next(gen) except StopIteration as e: content_blocks = e.value or [] if content_blocks and not (len(content_blocks) == 1 and content_blocks[0].get("text", "").startswith("!!!Error:")): self.history.append({"role": "assistant", "content": content_blocks}) text_parts = [b["text"] for b in content_blocks if b.get("type") == "text"] content = "\n".join(text_parts).strip() tool_calls = [MockToolCall(b["name"], b.get("input", {}), id=b.get("id", "")) for b in content_blocks if b.get("type") == "tool_use"] if not tool_calls: tool_calls, content = _parse_text_tool_calls(content) thinking_parts = [b["thinking"] for b in content_blocks if b.get("type") == "thinking"] thinking = "\n".join(thinking_parts).strip() if not thinking: think_pattern = r"(.*?)" think_match = re.search(think_pattern, content, re.DOTALL) if think_match: thinking = think_match.group(1).strip() content = re.sub(think_pattern, "", content, flags=re.DOTALL) return MockResponse(thinking, content, tool_calls, str(content_blocks)) class NativeOAISession(NativeClaudeSession): def raw_ask(self, messages): messages = _fix_messages(messages) return (yield from _openai_stream(self.api_base, self.api_key, _msgs_claude2oai(messages), self.model, self.api_mode, system=self.system, temperature=self.temperature, max_tokens=self.max_tokens, tools=self.tools, reasoning_effort=self.reasoning_effort, max_retries=self.max_retries, connect_timeout=self.connect_timeout, read_timeout=self.read_timeout, proxies=self.proxies, stream=self.stream)) def openai_tools_to_claude(tools): """[{type:'function', function:{name,description,parameters}}] → [{name,description,input_schema}].""" result = [] for t in tools: if 'input_schema' in t: result.append(t); continue # 已是claude格式 fn = t.get('function', t) result.append({'name': fn['name'], 'description': fn.get('description', ''), 'input_schema': fn.get('parameters', {'type': 'object', 'properties': {}})}) return result class MockFunction: def __init__(self, name, arguments): self.name, self.arguments = name, arguments class MockToolCall: def __init__(self, name, args, id=''): arg_str = json.dumps(args, ensure_ascii=False) if isinstance(args, (dict, list)) else (args or '{}') self.function = MockFunction(name, arg_str); self.id = id class MockResponse: def __init__(self, thinking, content, tool_calls, raw, stop_reason='end_turn'): self.thinking = thinking; self.content = content self.tool_calls = tool_calls; self.raw = raw self.stop_reason = 'tool_use' if tool_calls else stop_reason def __repr__(self): return f"" class ToolClient: def __init__(self, backend, auto_save_tokens=True): self.backend = backend self.auto_save_tokens = auto_save_tokens self.last_tools = '' self.name = self.backend.name self.total_cd_tokens = 0 def chat(self, messages, tools=None): full_prompt = self._build_protocol_prompt(messages, tools) print("Full prompt length:", len(full_prompt), 'chars') prompt_log = full_prompt gen = self.backend.ask(full_prompt, stream=True) _write_llm_log('Prompt', prompt_log) raw_text = ''; summarytag = '[NextWillSummary]' for chunk in gen: raw_text += chunk if chunk != summarytag: yield chunk if raw_text.endswith(summarytag): self.last_tools = ''; raw_text = raw_text[:-len(summarytag)] _write_llm_log('Response', raw_text) return self._parse_mixed_response(raw_text) def _estimate_content_len(self, content): if isinstance(content, str): return len(content) if isinstance(content, list): total = 0 for part in content: if not isinstance(part, dict): continue if part.get("type") == "text": total += len(part.get("text", "")) elif part.get("type") == "image_url": total += 1000 return total return len(str(content)) def _prepare_tool_instruction(self, tools): tool_instruction = "" if not tools: return tool_instruction tools_json = json.dumps(tools, ensure_ascii=False, separators=(',', ':')) _en = os.environ.get('GA_LANG') == 'en' if _en: tool_instruction = f""" ### Interaction Protocol (must follow strictly, always in effect) Follow these steps to think and act: 1. **Think**: Analyze the current situation and strategy inside `` tags. 2. **Summarize**: Output a minimal one-line (<30 words) physical snapshot in ``: new info from last tool result + current tool call intent. This goes into long-term working memory. Must contain real information, no filler. 3. **Act**: If you need to call tools, output one or more ** blocks** after your reply, then stop. """ else: tool_instruction = f""" ### 交互协议 (必须严格遵守,持续有效) 请按照以下步骤思考并行动: 1. **思考**: 在 `` 标签中先进行思考,分析现状和策略。 2. **总结**: 在 `` 中输出*极为简短*的高度概括的单行(<30字)物理快照,包括上次工具调用结果产生的新信息+本次工具调用意图。此内容将进入长期工作记忆,记录关键信息,严禁输出无实际信息增量的描述。 3. **行动**: 如需调用工具,请在回复正文之后输出一个(或多个)**块**,然后结束。 """ tool_instruction += f'\nFormat: ```{{"name": "tool_name", "arguments": {{...}}}}```\n\n### Tools (mounted, always in effect):\n{tools_json}\n' if self.auto_save_tokens and self.last_tools == tools_json: tool_instruction = "\n### Tools: still active, **ready to call**. Protocol unchanged.\n" if _en else "\n### 工具库状态:持续有效(code_run/file_read等),**可正常调用**。调用协议沿用。\n" else: self.total_cd_tokens = 0 self.last_tools = tools_json return tool_instruction def _build_protocol_prompt(self, messages, tools): system_content = next((m['content'] for m in messages if m['role'].lower() == 'system'), "") history_msgs = [m for m in messages if m['role'].lower() != 'system'] tool_instruction = self._prepare_tool_instruction(tools) system = ""; user = "" if system_content: system += f"{system_content}\n" system += f"{tool_instruction}" for m in history_msgs: role = "USER" if m['role'] == 'user' else "ASSISTANT" user += f"=== {role} ===\n" for tr in m.get('tool_results', []): user += f'{tr["content"]}\n' user += str(m['content']) + "\n" self.total_cd_tokens += self._estimate_content_len(user) if self.total_cd_tokens > 9000: self.last_tools = '' user += "=== ASSISTANT ===\n" return system + user def _parse_mixed_response(self, text): remaining_text = text; thinking = '' think_pattern = r"(.*?)" think_match = re.search(think_pattern, text, re.DOTALL) if think_match: thinking = think_match.group(1).strip() remaining_text = re.sub(think_pattern, "", remaining_text, flags=re.DOTALL) tool_calls = []; json_strs = []; errors = [] tool_pattern = r"<(?:tool_use|tool_call)>((?:(?!<(?:tool_use|tool_call)>).){15,}?)" tool_all = re.findall(tool_pattern, remaining_text, re.DOTALL) if tool_all: tool_all = [s.strip() for s in tool_all] json_strs.extend([s for s in tool_all if s.startswith('{') and s.endswith('}')]) remaining_text = re.sub(tool_pattern, "", remaining_text, flags=re.DOTALL) elif '' in remaining_text: weaktoolstr = remaining_text.split('')[-1].strip().strip('><') json_str = weaktoolstr if weaktoolstr.endswith('}') else '' if json_str == '' and '```' in weaktoolstr and weaktoolstr.split('```')[0].strip().endswith('}'): json_str = weaktoolstr.split('```')[0].strip() if json_str: json_strs.append(json_str) remaining_text = remaining_text.replace(''+weaktoolstr, "") elif '"name":' in remaining_text and '"arguments":' in remaining_text: json_match = re.search(r'\{.*"name":.*\}', remaining_text, re.DOTALL) if json_match: json_str = json_match.group(0).strip() json_strs.append(json_str) remaining_text = remaining_text.replace(json_str, "").strip() for json_str in json_strs: try: data = tryparse(json_str) func_name = data.get('name') or data.get('function') or data.get('tool') args = data.get('arguments') or data.get('args') or data.get('params') or data.get('parameters') if args is None: args = data if func_name: tool_calls.append(MockToolCall(func_name, args)) except json.JSONDecodeError as e: errors.append({'err': f"[Warn] Failed to parse tool_use JSON: {json_str}", 'bad_json': f'Failed to parse tool_use JSON: {json_str[:200]}'}) self.last_tools = '' # llm肯定忘了tool schema了,再提供下 except Exception as e: errors.append({'err': f'[Warn] Exception during tool_use parsing: {str(e)} {str(data)}'}) if len(tool_calls) == 0: for e in errors: print(e['err']) if 'bad_json' in e: tool_calls.append(MockToolCall('bad_json', {'msg': e['bad_json']})) content = remaining_text.strip() return MockResponse(thinking, content, tool_calls, text) def _parse_text_tool_calls(content): """Fallback: extract tool calls from text when model doesn't use native tool_use blocks.""" tcs = [] # try JSON array: [{"type":"tool_use", "name":..., "input":...}] _jp = next((p for p in ['[{"type":"tool_use"', '[{"type": "tool_use"'] if p in content), None) if _jp and content.endswith('}]'): try: idx = content.index(_jp); raw = json.loads(content[idx:]) tcs = [MockToolCall(b["name"], b.get("input", {}), id=b.get("id", "")) for b in raw if b.get("type") == "tool_use"] return tcs, content[:idx].strip() except: pass # try XML tags: {"name":..., "arguments":...} _xp = r"<(?:tool_use|tool_call)>((?:(?!<(?:tool_use|tool_call)>).){15,}?)" for s in re.findall(_xp, content, re.DOTALL): try: d = tryparse(s.strip()); name = d.get('name') args = d.get('arguments') or d.get('args') or d.get('input') or {} if name: tcs.append(MockToolCall(name, args)) except: pass if tcs: content = re.sub(_xp, "", content, flags=re.DOTALL).strip() return tcs, content def _write_llm_log(label, content): log_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'temp/model_responses') os.makedirs(log_dir, exist_ok=True) log_path = os.path.join(log_dir, f'model_responses_{os.getpid()}.txt') ts = datetime.now().strftime('%Y-%m-%d %H:%M:%S') with open(log_path, 'a', encoding='utf-8', errors='replace') as f: f.write(f"=== {label} === {ts}\n{content}\n\n") def tryparse(json_str): try: return json.loads(json_str) except: pass json_str = json_str.strip().strip('`').replace('json\n', '', 1).strip() try: return json.loads(json_str) except: pass try: return json.loads(json_str[:-1]) except: pass if '}' in json_str: json_str = json_str[:json_str.rfind('}') + 1] return json.loads(json_str) class MixinSession: """Multi-session fallback with spring-back to primary.""" def __init__(self, all_sessions, cfg): self._retries, self._base_delay = cfg.get('max_retries', 3), cfg.get('base_delay', 1.5) self._spring_sec = cfg.get('spring_back', 300) self._sessions = [all_sessions[i].backend if isinstance(i, int) else next(s.backend for s in all_sessions if type(s) is not dict and s.backend.name == i) for i in cfg.get('llm_nos', [])] is_native = lambda s: 'Native' in s.__class__.__name__ groups = {is_native(s) for s in self._sessions} assert len(groups) == 1, f"MixinSession: sessions must be in same group (Native or non-Native), got {[type(s).__name__ for s in self._sessions]}" self.name = '|'.join(s.name for s in self._sessions) import copy; self._sessions[0] = copy.copy(self._sessions[0]) self._orig_raw_asks = [s.raw_ask for s in self._sessions] self._sessions[0].raw_ask = self._raw_ask self.model = getattr(self._sessions[0], 'model', None) self._cur_idx, self._switched_at = 0, 0.0 def __getattr__(self, name): return getattr(self._sessions[0], name) _BROADCAST_ATTRS = frozenset({'system', 'tools', 'temperature', 'max_tokens', 'reasoning_effort', 'history'}) def __setattr__(self, name, value): if name in self._BROADCAST_ATTRS: for s in self._sessions: v = openai_tools_to_claude(value) if name == 'tools' and type(s) is NativeClaudeSession else value setattr(s, name, v) else: object.__setattr__(self, name, value) @property def primary(self): return self._sessions[0] def _pick(self): if self._cur_idx and time.time() - self._switched_at > self._spring_sec: self._cur_idx = 0 return self._cur_idx def _raw_ask(self, *args, **kwargs): base, n = self._pick(), len(self._sessions) test_error = lambda x: isinstance(x, str) and x.lstrip().startswith(('!!!Error:', '[Error:')) for attempt in range(self._retries + 1): idx = (base + attempt) % n gen = self._orig_raw_asks[idx](*args, **kwargs) print(f'[MixinSession] Using session ({self._sessions[idx].name})') last_chunk, return_val, yielded = None, [], False try: while True: chunk = next(gen); last_chunk = chunk if not yielded and test_error(chunk): continue yield chunk; yielded = True except StopIteration as e: return_val = e.value or [] is_err = test_error(last_chunk) if not is_err: if attempt > 0: self._cur_idx = idx; self._switched_at = time.time() return return_val if attempt >= self._retries: yield last_chunk; return return_val nxt = (base + attempt + 1) % n if nxt == base: # full round failed, delay before next rnd = (attempt + 1) // n delay = min(30, self._base_delay * (1.5 ** rnd)) print(f'[MixinSession] {last_chunk[:80]}, round {rnd} exhausted, retry in {delay:.1f}s') time.sleep(delay) else: print(f'[MixinSession] {last_chunk[:80]}, retry {attempt+1}/{self._retries} (s{idx}→s{nxt})') THINKING_PROMPT_ZH = """ ### 行动规范(持续有效) 每次回复请先在回复文字中包含一个 中输出极简单行(<30字)物理快照:上次结果新信息+本次意图。此内容进入长期工作记忆。 \n**若用户需求未完成,必须进行工具调用!** """.strip() THINKING_PROMPT_EN = """ ### Action Protocol (always in effect) The reply body should first include a minimal one-line (<30 words) physical snapshot in : new info from last result + current intent. This goes into long-term working memory. \n**If the user's request is not yet complete, tool calls are required!** """.strip() class NativeToolClient: @staticmethod def _thinking_prompt(): return THINKING_PROMPT_EN if os.environ.get('GA_LANG') == 'en' else THINKING_PROMPT_ZH def __init__(self, backend): self.backend = backend self.backend.system = self._thinking_prompt() self.name = self.backend.name self._pending_tool_ids = [] def set_system(self, extra_system): combined = f"{extra_system}\n\n{self._thinking_prompt()}" if extra_system else self._thinking_prompt() if combined != self.backend.system: print(f"[Debug] Updated system prompt, length {len(combined)} chars.") self.backend.system = combined def chat(self, messages, tools=None): if tools: self.backend.tools = tools combined_content = []; resp = None; tool_results = [] for msg in messages: c = msg.get('content', '') if msg['role'] == 'system': self.set_system(c); continue if isinstance(c, str): combined_content.append({"type": "text", "text": c}) elif isinstance(c, list): combined_content.extend(c) if msg['role'] == 'user' and msg.get('tool_results'): tool_results.extend(msg['tool_results']) tr_id_set = set(); tool_result_blocks = [] for tr in tool_results: tool_use_id, content = tr.get("tool_use_id", ""), tr.get("content", "") tr_id_set.add(tool_use_id) if tool_use_id: tool_result_blocks.append({"type": "tool_result", "tool_use_id": tool_use_id, "content": tr.get("content", "")}) else: combined_content = [{"type": "text", "text": f'{content}'}] + combined_content for tid in self._pending_tool_ids: if tid not in tr_id_set: tool_result_blocks.append({"type": "tool_result", "tool_use_id": tid, "content": ""}) self._pending_tool_ids = [] merged = {"role": "user", "content": tool_result_blocks + combined_content} _write_llm_log('Prompt', json.dumps(merged, ensure_ascii=False, indent=2)) gen = self.backend.ask(merged) try: while True: chunk = next(gen); yield chunk except StopIteration as e: resp = e.value if resp: _write_llm_log('Response', resp.raw) if resp and hasattr(resp, 'tool_calls') and resp.tool_calls: self._pending_tool_ids = [tc.id for tc in resp.tool_calls] return resp