refactor: unify OpenAI streaming into _openai_stream(), add Responses API support
- Extract shared _openai_stream() from LLMSession and NativeOAISession - Add Responses API compatibility: system->developer role, flat tools format, no temperature - Improve error handling: read body before raise_for_status in stream mode - Add retry mechanism to NativeOAISession (was missing) - Add api_mode, connect_timeout, read_timeout config to NativeOAISession
This commit is contained in:
230
llmcore.py
230
llmcore.py
@@ -199,6 +199,109 @@ def _parse_openai_sse(resp_lines, api_mode="chat_completions"):
|
|||||||
blocks.append({"type": "tool_use", "id": tc["id"], "name": tc["name"], "input": inp})
|
blocks.append({"type": "tool_use", "id": tc["id"], "name": tc["name"], "input": inp})
|
||||||
return blocks
|
return blocks
|
||||||
|
|
||||||
|
def _openai_stream(api_base, api_key, messages, model, api_mode='chat_completions', *,
|
||||||
|
temperature=0.5, max_tokens=None, tools=None, reasoning_effort=None,
|
||||||
|
max_retries=0, connect_timeout=10, read_timeout=300, proxies=None):
|
||||||
|
"""Shared OpenAI-compatible streaming request with retry. Yields text chunks, returns list[content_block]."""
|
||||||
|
if 'kimi' in model.lower() or 'moonshot' in model.lower(): temperature = 1.0
|
||||||
|
headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json", "Accept": "text/event-stream"}
|
||||||
|
if api_mode == "responses":
|
||||||
|
url = auto_make_url(api_base, "responses")
|
||||||
|
payload = {"model": model, "input": _to_responses_input(messages), "stream": True}
|
||||||
|
if reasoning_effort: payload["reasoning"] = {"effort": reasoning_effort}
|
||||||
|
else:
|
||||||
|
url = auto_make_url(api_base, "chat/completions")
|
||||||
|
payload = {"model": model, "messages": messages, "temperature": temperature, "stream": True, "stream_options": {"include_usage": True}}
|
||||||
|
if max_tokens: payload["max_tokens"] = max_tokens
|
||||||
|
if reasoning_effort: payload["reasoning_effort"] = reasoning_effort
|
||||||
|
if tools:
|
||||||
|
if api_mode == "responses":
|
||||||
|
# Responses API: flatten {type, function: {name, ...}} -> {type, name, ...}
|
||||||
|
resp_tools = []
|
||||||
|
for t in tools:
|
||||||
|
if t.get("type") == "function" and "function" in t:
|
||||||
|
rt = {"type": "function"}
|
||||||
|
rt.update(t["function"])
|
||||||
|
resp_tools.append(rt)
|
||||||
|
else: resp_tools.append(t)
|
||||||
|
payload["tools"] = resp_tools
|
||||||
|
else: payload["tools"] = tools
|
||||||
|
RETRYABLE = {408, 409, 425, 429, 500, 502, 503, 504}
|
||||||
|
def _delay(resp, attempt):
|
||||||
|
try: ra = float((resp.headers or {}).get("retry-after"))
|
||||||
|
except: ra = None
|
||||||
|
return max(0.5, ra if ra is not None else min(30.0, 1.5 * (2 ** attempt)))
|
||||||
|
for attempt in range(max_retries + 1):
|
||||||
|
streamed = False
|
||||||
|
try:
|
||||||
|
with requests.post(url, headers=headers, json=payload, stream=True,
|
||||||
|
timeout=(connect_timeout, read_timeout), proxies=proxies) as r:
|
||||||
|
if r.status_code >= 400:
|
||||||
|
if r.status_code in RETRYABLE and attempt < max_retries:
|
||||||
|
d = _delay(r, attempt)
|
||||||
|
print(f"[LLM Retry] HTTP {r.status_code}, retry in {d:.1f}s ({attempt+1}/{max_retries+1})")
|
||||||
|
time.sleep(d); continue
|
||||||
|
# Read error body before raise (stream mode closes connection after raise)
|
||||||
|
err_body = ""
|
||||||
|
try: err_body = r.text.strip()[:1200]
|
||||||
|
except: pass
|
||||||
|
try: r.raise_for_status()
|
||||||
|
except requests.HTTPError as e:
|
||||||
|
e._err_body = err_body; raise
|
||||||
|
gen = _parse_openai_sse(r.iter_lines(), api_mode)
|
||||||
|
try:
|
||||||
|
while True: streamed = True; yield next(gen)
|
||||||
|
except StopIteration as e:
|
||||||
|
return e.value or []
|
||||||
|
except requests.HTTPError as e:
|
||||||
|
resp = getattr(e, "response", None); status = getattr(resp, "status_code", None)
|
||||||
|
if status in RETRYABLE and attempt < max_retries and not streamed:
|
||||||
|
d = _delay(resp, attempt)
|
||||||
|
print(f"[LLM Retry] HTTP {status}, retry in {d:.1f}s ({attempt+1}/{max_retries+1})")
|
||||||
|
time.sleep(d); continue
|
||||||
|
body = ""; rid = ""; ra = ""; ct = ""
|
||||||
|
try: body = getattr(e, '_err_body', '') or (resp.text or "").strip()[:1200]
|
||||||
|
except: pass
|
||||||
|
try: h = resp.headers or {}; rid = h.get("x-request-id","") or h.get("request-id",""); ra = h.get("retry-after",""); ct = h.get("content-type","")
|
||||||
|
except: pass
|
||||||
|
err = f"Error: HTTP {status} {e}; content_type: {ct or '<empty>'}; retry_after: {ra or '<empty>'}; request_id: {rid or '<empty>'}; body: {body or '<empty>'}"
|
||||||
|
yield err; return [{"type": "text", "text": err}]
|
||||||
|
except (requests.Timeout, requests.ConnectionError) as e:
|
||||||
|
if attempt < max_retries and not streamed:
|
||||||
|
d = _delay(None, attempt)
|
||||||
|
print(f"[LLM Retry] {type(e).__name__}, retry in {d:.1f}s ({attempt+1}/{max_retries+1})")
|
||||||
|
time.sleep(d); continue
|
||||||
|
err = f"Error: {type(e).__name__}: {e}"
|
||||||
|
yield err; return [{"type": "text", "text": err}]
|
||||||
|
except Exception as e:
|
||||||
|
err = f"Error: {e}"
|
||||||
|
yield err; return [{"type": "text", "text": err}]
|
||||||
|
|
||||||
|
def _to_responses_input(messages):
|
||||||
|
result = []
|
||||||
|
for msg in messages:
|
||||||
|
role = str(msg.get("role", "user")).lower()
|
||||||
|
if role not in ["user", "assistant", "system", "developer"]: role = "user"
|
||||||
|
if role == "system": role = "developer" # Responses API uses 'developer' instead of 'system'
|
||||||
|
content = msg.get("content", "")
|
||||||
|
text_type = "output_text" if role == "assistant" else "input_text"
|
||||||
|
parts = []
|
||||||
|
if isinstance(content, str):
|
||||||
|
if content: parts.append({"type": text_type, "text": content})
|
||||||
|
elif isinstance(content, list):
|
||||||
|
for part in content:
|
||||||
|
if not isinstance(part, dict): continue
|
||||||
|
ptype = part.get("type")
|
||||||
|
if ptype == "text":
|
||||||
|
text = part.get("text", "")
|
||||||
|
if text: parts.append({"type": text_type, "text": text})
|
||||||
|
elif ptype == "image_url":
|
||||||
|
url = (part.get("image_url") or {}).get("url", "")
|
||||||
|
if url and role != "assistant": parts.append({"type": "input_image", "image_url": url})
|
||||||
|
if len(parts) == 0: parts = [{"type": text_type, "text": str(content)}]
|
||||||
|
result.append({"role": role, "content": parts})
|
||||||
|
return result
|
||||||
|
|
||||||
class ClaudeSession:
|
class ClaudeSession:
|
||||||
def __init__(self, cfg):
|
def __init__(self, cfg):
|
||||||
self.api_key = cfg['apikey']; self.api_base = cfg['apibase'].rstrip('/')
|
self.api_key = cfg['apikey']; self.api_base = cfg['apibase'].rstrip('/')
|
||||||
@@ -266,101 +369,12 @@ class LLMSession:
|
|||||||
if mode in ["responses", "response"]: self.api_mode = "responses"
|
if mode in ["responses", "response"]: self.api_mode = "responses"
|
||||||
else: self.api_mode = "chat_completions"
|
else: self.api_mode = "chat_completions"
|
||||||
|
|
||||||
def _retry_delay(self, resp, attempt):
|
|
||||||
retry_after = None
|
|
||||||
try:
|
|
||||||
if resp is not None: retry_after = (resp.headers or {}).get("retry-after")
|
|
||||||
if retry_after is not None: retry_after = float(retry_after)
|
|
||||||
except: retry_after = None
|
|
||||||
if retry_after is None: retry_after = min(30.0, 1.5 * (2 ** attempt))
|
|
||||||
return max(0.5, float(retry_after))
|
|
||||||
|
|
||||||
def _to_responses_input(self, messages):
|
|
||||||
result = []
|
|
||||||
for msg in messages:
|
|
||||||
role = str(msg.get("role", "user")).lower()
|
|
||||||
if role not in ["user", "assistant", "system", "developer"]: role = "user"
|
|
||||||
content = msg.get("content", "")
|
|
||||||
text_type = "output_text" if role == "assistant" else "input_text"
|
|
||||||
parts = []
|
|
||||||
if isinstance(content, str):
|
|
||||||
if content: parts.append({"type": text_type, "text": content})
|
|
||||||
elif isinstance(content, list):
|
|
||||||
for part in content:
|
|
||||||
if not isinstance(part, dict): continue
|
|
||||||
ptype = part.get("type")
|
|
||||||
if ptype == "text":
|
|
||||||
text = part.get("text", "")
|
|
||||||
if text: parts.append({"type": text_type, "text": text})
|
|
||||||
elif ptype == "image_url":
|
|
||||||
url = (part.get("image_url") or {}).get("url", "")
|
|
||||||
if url and role != "assistant": parts.append({"type": "input_image", "image_url": url})
|
|
||||||
if len(parts) == 0: parts = [{"type": text_type, "text": str(content)}]
|
|
||||||
result.append({"role": role, "content": parts})
|
|
||||||
return result
|
|
||||||
|
|
||||||
def raw_ask(self, messages, model=None, temperature=0.5):
|
def raw_ask(self, messages, model=None, temperature=0.5):
|
||||||
if model is None: model = self.default_model
|
if model is None: model = self.default_model
|
||||||
if 'kimi' in model.lower() or 'moonshot' in model.lower(): temperature = 1.0 # kimi/moonshot only accepts temp 1.0
|
yield from _openai_stream(self.api_base, self.api_key, messages, model, self.api_mode,
|
||||||
headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json", "Accept": "text/event-stream"}
|
temperature=temperature, reasoning_effort=self.reasoning_effort,
|
||||||
if self.api_mode == "responses":
|
max_retries=self.max_retries, connect_timeout=self.connect_timeout,
|
||||||
url = auto_make_url(self.api_base, "responses")
|
read_timeout=self.read_timeout, proxies=self.proxies)
|
||||||
payload = {"model": model, "input": self._to_responses_input(messages), "temperature": temperature, "stream": True}
|
|
||||||
if self.reasoning_effort: payload["reasoning"] = {"effort": self.reasoning_effort}
|
|
||||||
else:
|
|
||||||
url = auto_make_url(self.api_base, "chat/completions")
|
|
||||||
payload = {"model": model, "messages": messages, "temperature": temperature, "stream": True, "stream_options": {"include_usage": True}}
|
|
||||||
if self.reasoning_effort: payload["reasoning_effort"] = self.reasoning_effort
|
|
||||||
for attempt in range(self.max_retries + 1):
|
|
||||||
streamed_any = False
|
|
||||||
try:
|
|
||||||
with requests.post(url, headers=headers, json=payload, stream=True,
|
|
||||||
timeout=(self.connect_timeout, self.read_timeout), proxies=self.proxies) as r:
|
|
||||||
if r.status_code >= 400:
|
|
||||||
retryable = r.status_code in [408, 409, 425, 429, 500, 502, 503, 504]
|
|
||||||
if retryable and attempt < self.max_retries:
|
|
||||||
delay = self._retry_delay(r, attempt)
|
|
||||||
print(f"[LLM Retry] HTTP {r.status_code}, retry in {delay:.1f}s ({attempt+1}/{self.max_retries+1})")
|
|
||||||
time.sleep(delay)
|
|
||||||
continue
|
|
||||||
r.raise_for_status()
|
|
||||||
for chunk in _parse_openai_sse(r.iter_lines(), self.api_mode):
|
|
||||||
streamed_any = True
|
|
||||||
yield chunk
|
|
||||||
return
|
|
||||||
except requests.HTTPError as e:
|
|
||||||
resp = getattr(e, "response", None)
|
|
||||||
status = getattr(resp, "status_code", "unknown")
|
|
||||||
retryable = isinstance(status, int) and status in [408, 409, 425, 429, 500, 502, 503, 504]
|
|
||||||
if retryable and attempt < self.max_retries and not streamed_any:
|
|
||||||
delay = self._retry_delay(resp, attempt)
|
|
||||||
print(f"[LLM Retry] HTTP {status}, retry in {delay:.1f}s ({attempt+1}/{self.max_retries+1})")
|
|
||||||
time.sleep(delay)
|
|
||||||
continue
|
|
||||||
body = ""
|
|
||||||
try: body = (resp.text or "").strip()
|
|
||||||
except: body = ""
|
|
||||||
body = body[:1200] if body else "<empty>"
|
|
||||||
rid = ""; retry_after = ""; ct = ""
|
|
||||||
try:
|
|
||||||
h = resp.headers or {}
|
|
||||||
rid = h.get("x-request-id") or h.get("request-id") or ""
|
|
||||||
retry_after = h.get("retry-after") or ""
|
|
||||||
ct = h.get("content-type") or ""
|
|
||||||
except: pass
|
|
||||||
yield f"Error: HTTP {status} {str(e)}; content_type: {ct or '<empty>'}; retry_after: {retry_after or '<empty>'}; request_id: {rid or '<empty>'}; body: {body}"
|
|
||||||
return
|
|
||||||
except (requests.Timeout, requests.ConnectionError) as e:
|
|
||||||
if attempt < self.max_retries and not streamed_any:
|
|
||||||
delay = self._retry_delay(None, attempt)
|
|
||||||
print(f"[LLM Retry] {type(e).__name__}, retry in {delay:.1f}s ({attempt+1}/{self.max_retries+1})")
|
|
||||||
time.sleep(delay)
|
|
||||||
continue
|
|
||||||
yield f"Error: {type(e).__name__}: {str(e)}"
|
|
||||||
return
|
|
||||||
except Exception as e:
|
|
||||||
yield f"Error: {str(e)}"
|
|
||||||
return
|
|
||||||
|
|
||||||
def make_messages(self, raw_list, omit_images=True):
|
def make_messages(self, raw_list, omit_images=True):
|
||||||
compress_history_tags(raw_list)
|
compress_history_tags(raw_list)
|
||||||
@@ -495,30 +509,28 @@ class NativeOAISession:
|
|||||||
self.api_key = cfg['apikey']; self.api_base = cfg['apibase'].rstrip('/')
|
self.api_key = cfg['apikey']; self.api_base = cfg['apibase'].rstrip('/')
|
||||||
self.default_model = cfg.get('model', 'gpt-4o')
|
self.default_model = cfg.get('model', 'gpt-4o')
|
||||||
self.context_win = cfg.get('context_win', 28000)
|
self.context_win = cfg.get('context_win', 28000)
|
||||||
self.reasoning_effort = cfg.get('reasoning_effort')
|
|
||||||
proxy = cfg.get('proxy')
|
proxy = cfg.get('proxy')
|
||||||
self.proxies = {"http": proxy, "https": proxy} if proxy else None
|
self.proxies = {"http": proxy, "https": proxy} if proxy else None
|
||||||
self.history = []; self.system = ''; self.lock = threading.Lock()
|
self.history = []; self.system = ''; self.lock = threading.Lock()
|
||||||
|
self.max_retries = max(0, int(cfg.get('max_retries', 2)))
|
||||||
|
self.connect_timeout = max(1, int(cfg.get('connect_timeout', 10)))
|
||||||
|
self.read_timeout = max(5, int(cfg.get('read_timeout', 120)))
|
||||||
|
effort = cfg.get('reasoning_effort')
|
||||||
|
effort = None if effort is None else str(effort).strip().lower()
|
||||||
|
self.reasoning_effort = effort if effort in ('low', 'medium', 'high') else None
|
||||||
|
if effort and not self.reasoning_effort: print(f"[WARN] Invalid reasoning_effort {effort!r}, ignored.")
|
||||||
|
mode = str(cfg.get('api_mode', 'chat_completions')).strip().lower().replace('-', '_')
|
||||||
|
self.api_mode = 'responses' if mode in ('responses', 'response') else 'chat_completions'
|
||||||
|
|
||||||
def raw_ask(self, messages, tools=None, system=None, model=None, temperature=0.5, max_tokens=6144, **kw):
|
def raw_ask(self, messages, tools=None, system=None, model=None, temperature=0.5, max_tokens=6144, **kw):
|
||||||
"""OpenAI streaming. yields text chunks, generator return = list[content_block]"""
|
"""OpenAI streaming. yields text chunks, generator return = list[content_block]"""
|
||||||
model = model or self.default_model
|
model = model or self.default_model
|
||||||
headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"}
|
|
||||||
msgs = ([{"role": "system", "content": system}] if system else []) + messages
|
msgs = ([{"role": "system", "content": system}] if system else []) + messages
|
||||||
payload = {"model": model, "messages": msgs, "temperature": temperature, "max_tokens": max_tokens, "stream": True, "stream_options": {"include_usage": True}}
|
return (yield from _openai_stream(self.api_base, self.api_key, msgs, model, self.api_mode,
|
||||||
if tools: payload["tools"] = tools
|
temperature=temperature, max_tokens=max_tokens, tools=tools,
|
||||||
if self.reasoning_effort: payload["reasoning_effort"] = self.reasoning_effort
|
reasoning_effort=self.reasoning_effort,
|
||||||
try:
|
max_retries=self.max_retries, connect_timeout=self.connect_timeout,
|
||||||
resp = requests.post(auto_make_url(self.api_base, "chat/completions"), headers=headers, json=payload, stream=True, timeout=120, proxies=self.proxies)
|
read_timeout=self.read_timeout, proxies=self.proxies))
|
||||||
if resp.status_code != 200:
|
|
||||||
err = f"Error: HTTP {resp.status_code} {resp.text[:500]}"; yield err; return [{"type": "text", "text": err}]
|
|
||||||
except Exception as e:
|
|
||||||
err = f"Error: {e}"; yield err; return [{"type": "text", "text": err}]
|
|
||||||
gen = _parse_openai_sse(resp.iter_lines(), "chat_completions")
|
|
||||||
try:
|
|
||||||
while True: yield next(gen)
|
|
||||||
except StopIteration as e:
|
|
||||||
return e.value or []
|
|
||||||
|
|
||||||
def ask(self, msg, tools=None, model=None, **kw):
|
def ask(self, msg, tools=None, model=None, **kw):
|
||||||
assert type(msg) is dict
|
assert type(msg) is dict
|
||||||
|
|||||||
Reference in New Issue
Block a user