def _openai_stream(api_base, api_key, messages, model, api_mode='chat_completions', *,
                   temperature=0.5, max_tokens=None, tools=None, reasoning_effort=None,
                   max_retries=0, connect_timeout=10, read_timeout=300, proxies=None):
    """Stream one OpenAI-compatible request, retrying transient failures.

    This is a generator. It yields whatever ``_parse_openai_sse`` yields
    (text chunks) and returns, via ``StopIteration.value``, the parser's list
    of content blocks (``[]`` when the parser returns nothing). HTTP,
    timeout, connection and other request errors are not raised: a single
    ``"Error: ..."`` string is yielded and returned as one text block.

    Args:
        api_base: Base URL; expanded with ``auto_make_url``.
        api_key: Sent as a ``Bearer`` token.
        messages: Chat-completions style message dicts. Converted with
            ``_to_responses_input`` when ``api_mode == "responses"``.
        model: Model name. Names containing ``kimi`` or ``moonshot`` force
            ``temperature`` to 1.0.
        api_mode: ``"responses"`` targets the ``responses`` endpoint; any
            other value targets ``chat/completions``.
        temperature: Sent in chat-completions mode only.
        max_tokens: Optional output cap. Sent as ``max_tokens`` for chat
            completions and as ``max_output_tokens`` for responses.
        tools: Chat-completions style tool definitions; flattened for the
            responses endpoint.
        reasoning_effort: Optional effort hint, placed where each endpoint
            expects it.
        max_retries: Extra attempts allowed after the first one.
        connect_timeout: Connect timeout in seconds.
        read_timeout: Read timeout in seconds.
        proxies: Passed straight to ``requests.post``.
    """
    # kimi/moonshot only accepts temp 1.0
    if 'kimi' in model.lower() or 'moonshot' in model.lower():
        temperature = 1.0
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json", "Accept": "text/event-stream"}
    if api_mode == "responses":
        url = auto_make_url(api_base, "responses")
        # NOTE(review): temperature is not sent in this mode, so the
        # kimi/moonshot override above only affects chat completions.
        payload = {"model": model, "input": _to_responses_input(messages), "stream": True}
        # The caller's cap used to be dropped silently in this branch.
        if max_tokens:
            payload["max_output_tokens"] = max_tokens
        if reasoning_effort:
            payload["reasoning"] = {"effort": reasoning_effort}
    else:
        url = auto_make_url(api_base, "chat/completions")
        payload = {"model": model, "messages": messages, "temperature": temperature, "stream": True, "stream_options": {"include_usage": True}}
        if max_tokens:
            payload["max_tokens"] = max_tokens
        if reasoning_effort:
            payload["reasoning_effort"] = reasoning_effort
    if tools:
        if api_mode == "responses":
            # Responses API: flatten {type, function: {name, ...}} -> {type, name, ...}
            payload["tools"] = [
                {"type": "function", **t["function"]}
                if t.get("type") == "function" and "function" in t else t
                for t in tools
            ]
        else:
            payload["tools"] = tools

    RETRYABLE = {408, 409, 425, 429, 500, 502, 503, 504}

    def _delay(resp, attempt):
        """Return the wait in seconds: numeric Retry-After, else capped exponential backoff."""
        try:
            ra = float((resp.headers or {}).get("retry-after"))
        except (AttributeError, TypeError, ValueError):
            # No response object, no header, or a non-numeric header value.
            ra = None
        return max(0.5, ra if ra is not None else min(30.0, 1.5 * (2 ** attempt)))

    for attempt in range(max_retries + 1):
        # True only once a chunk has really been handed to the caller;
        # after that a retry would duplicate output, so it is not attempted.
        streamed = False
        try:
            with requests.post(url, headers=headers, json=payload, stream=True,
                               timeout=(connect_timeout, read_timeout), proxies=proxies) as r:
                if r.status_code >= 400:
                    if r.status_code in RETRYABLE and attempt < max_retries:
                        d = _delay(r, attempt)
                        print(f"[LLM Retry] HTTP {r.status_code}, retry in {d:.1f}s ({attempt+1}/{max_retries+1})")
                        time.sleep(d)
                        continue
                    # Read error body before raise (stream mode closes connection after raise)
                    err_body = ""
                    try:
                        err_body = r.text.strip()[:1200]
                    except Exception:
                        pass  # best effort: the body is only used for diagnostics
                    try:
                        r.raise_for_status()
                    except requests.HTTPError as e:
                        e._err_body = err_body
                        raise
                gen = _parse_openai_sse(r.iter_lines(), api_mode)
                try:
                    while True:
                        # Fetch first, flag second: a failure while reading
                        # the very first event must stay retryable.
                        chunk = next(gen)
                        streamed = True
                        yield chunk
                except StopIteration as done:
                    return done.value or []
        except requests.HTTPError as e:
            resp = getattr(e, "response", None)
            status = getattr(resp, "status_code", None)
            if status in RETRYABLE and attempt < max_retries and not streamed:
                d = _delay(resp, attempt)
                print(f"[LLM Retry] HTTP {status}, retry in {d:.1f}s ({attempt+1}/{max_retries+1})")
                time.sleep(d)
                continue
            body = ""
            rid = ""
            ra = ""
            ct = ""
            try:
                body = getattr(e, '_err_body', '') or (resp.text or "").strip()[:1200]
            except Exception:
                pass  # resp may be None or unreadable; report without a body
            try:
                h = resp.headers or {}
                rid = h.get("x-request-id", "") or h.get("request-id", "")
                ra = h.get("retry-after", "")
                ct = h.get("content-type", "")
            except Exception:
                pass  # headers are optional diagnostics
            err = f"Error: HTTP {status} {e}; content_type: {ct or ''}; retry_after: {ra or ''}; request_id: {rid or ''}; body: {body or ''}"
            yield err
            return [{"type": "text", "text": err}]
        except (requests.Timeout, requests.ConnectionError) as e:
            if attempt < max_retries and not streamed:
                d = _delay(None, attempt)
                print(f"[LLM Retry] {type(e).__name__}, retry in {d:.1f}s ({attempt+1}/{max_retries+1})")
                time.sleep(d)
                continue
            err = f"Error: {type(e).__name__}: {e}"
            yield err
            return [{"type": "text", "text": err}]
        except Exception as e:
            err = f"Error: {e}"
            yield err
            return [{"type": "text", "text": err}]


def _to_responses_input(messages):
    """Convert chat-completions style messages into Responses API input items.

    Roles outside user/assistant/system/developer become ``user``, and
    ``system`` becomes ``developer``. Text is emitted as ``output_text`` for
    assistant messages and ``input_text`` otherwise. ``image_url`` parts
    become ``input_image`` parts, except on assistant messages where they are
    skipped. Only the ``role`` and ``content`` keys of each message are read.

    Returns:
        A list of ``{"role": ..., "content": [parts]}`` dicts, one per input
        message. Every item has at least one part.
    """
    result = []
    for msg in messages:
        role = str(msg.get("role", "user")).lower()
        if role not in ("user", "assistant", "system", "developer"):
            role = "user"
        if role == "system":
            role = "developer"  # Responses API uses 'developer' instead of 'system'
        content = msg.get("content", "")
        text_type = "output_text" if role == "assistant" else "input_text"
        parts = []
        if isinstance(content, str):
            if content:
                parts.append({"type": text_type, "text": content})
        elif isinstance(content, list):
            for part in content:
                if not isinstance(part, dict):
                    continue
                ptype = part.get("type")
                if ptype == "text":
                    text = part.get("text", "")
                    if text:
                        parts.append({"type": text_type, "text": text})
                elif ptype == "image_url":
                    url = (part.get("image_url") or {}).get("url", "")
                    if url and role != "assistant":
                        parts.append({"type": "input_image", "image_url": url})
        if not parts:
            # Keep one text part per message. A missing body (None) becomes
            # an empty string rather than the literal text "None".
            parts = [{"type": text_type, "text": "" if content is None else str(content)}]
        result.append({"role": role, "content": parts})
    return result
def raw_ask(self, messages, model=None, temperature=0.5):
    """LLMSession.raw_ask: stream a completion for ``messages``.

    Delegates to ``_openai_stream`` using this session's ``api_base``,
    ``api_key``, ``api_mode``, ``reasoning_effort``, ``max_retries``,
    ``connect_timeout``, ``read_timeout`` and ``proxies``.

    Args:
        messages: Message dicts forwarded unchanged.
        model: Model name; ``self.default_model`` when ``None``.
        temperature: Sampling temperature forwarded to the request.

    Yields:
        The chunks produced by ``_openai_stream``.

    Returns:
        The value returned by ``_openai_stream`` (its list of content
        blocks), available through ``StopIteration.value``. Plain iteration
        is unaffected.
    """
    if model is None:
        model = self.default_model
    # Propagate the delegate's return value instead of discarding it, in line
    # with NativeOAISession.raw_ask.
    return (yield from _openai_stream(
        self.api_base, self.api_key, messages, model, self.api_mode,
        temperature=temperature, reasoning_effort=self.reasoning_effort,
        max_retries=self.max_retries, connect_timeout=self.connect_timeout,
        read_timeout=self.read_timeout, proxies=self.proxies))
def raw_ask(self, messages, tools=None, system=None, model=None, temperature=0.5, max_tokens=6144, **kw):
    """NativeOAISession.raw_ask: OpenAI streaming with optional tools.

    Yields text chunks; the generator's return value is the list of content
    blocks returned by ``_openai_stream``.

    Args:
        messages: Sequence of message dicts. It is copied, never mutated,
            and may be any iterable (list, tuple, ...).
        tools: Optional tool definitions forwarded to the request.
        system: Optional system prompt; when truthy it is prepended as a
            ``{"role": "system"}`` message.
        model: Model name; ``self.default_model`` when falsy.
        temperature: Sampling temperature forwarded to the request.
        max_tokens: Output cap forwarded to the request.
        **kw: Accepted and ignored.
    """
    model = model or self.default_model
    msgs = [{"role": "system", "content": system}] if system else []
    # extend() instead of list + messages, so a tuple or other iterable no
    # longer raises TypeError inside the generator.
    msgs.extend(messages)
    return (yield from _openai_stream(
        self.api_base, self.api_key, msgs, model, self.api_mode,
        temperature=temperature, max_tokens=max_tokens, tools=tools,
        reasoning_effort=self.reasoning_effort,
        max_retries=self.max_retries, connect_timeout=self.connect_timeout,
        read_timeout=self.read_timeout, proxies=self.proxies))