Refactor: 统一消息格式和Session架构重构

核心改动:
- 统一所有Session内部使用Claude content-block格式
- 引入BaseSession基类,简化代码结构
- tool_results从字符串改为结构化字典列表
- NativeClaudeSession增强:支持cr_token、metadata、thinking提取
- ToolClient简化:删除structured分支,统一使用protocol prompt
- MixinSession支持按名称选择session
- ljqCtrl_sop增加DPI坐标陷阱警告
This commit is contained in:
Liang Jiaqing
2026-04-01 22:20:18 +08:00
parent 368d68baa5
commit 629e57ad83
6 changed files with 211 additions and 287 deletions

View File

@@ -64,12 +64,12 @@ def agent_runner_loop(client, system_prompt, user_input, handler, tools_schema,
yield response.content yield response.content
if not response.tool_calls: tool_calls = [{'tool_name': 'no_tool', 'args': {}}] if not response.tool_calls: tool_calls = [{'tool_name': 'no_tool', 'args': {}}]
else: tool_calls = [{'tool_name': tc.function.name, 'args': json.loads(tc.function.arguments)} else: tool_calls = [{'tool_name': tc.function.name, 'args': json.loads(tc.function.arguments), 'id': tc.id}
for tc in response.tool_calls] for tc in response.tool_calls]
next_prompt = ""; should_exit = None tool_results = []; next_prompts = set(); should_exit = None
for ii, tc in enumerate(tool_calls): for ii, tc in enumerate(tool_calls):
tool_name, args = tc['tool_name'], tc['args'] tool_name, args, tid = tc['tool_name'], tc['args'], tc.get('id', '')
if tool_name == 'no_tool': pass if tool_name == 'no_tool': pass
else: else:
showarg = get_pretty_json(args) showarg = get_pretty_json(args)
@@ -82,18 +82,18 @@ def agent_runner_loop(client, system_prompt, user_input, handler, tools_schema,
outcome = yield from gen outcome = yield from gen
yield '`````\n' yield '`````\n'
else: outcome = exhaust(gen) else: outcome = exhaust(gen)
if outcome.should_exit: return {'result': 'EXITED', 'data': outcome.data} # should_exit is only used for immediate exit if outcome.should_exit: return {'result': 'EXITED', 'data': outcome.data} # should_exit is only used for immediate exit
if not outcome.next_prompt: if not outcome.next_prompt:
should_exit = {'result': 'CURRENT_TASK_DONE', 'data': outcome.data}; break should_exit = {'result': 'CURRENT_TASK_DONE', 'data': outcome.data}; break
if outcome.next_prompt.startswith('未知工具'): client.last_tools = '' if outcome.next_prompt.startswith('未知工具'): client.last_tools = ''
if outcome.data is not None: if outcome.data is not None:
datastr = json.dumps(outcome.data, ensure_ascii=False, default=json_default) if type(outcome.data) in [dict, list] else str(outcome.data) datastr = json.dumps(outcome.data, ensure_ascii=False, default=json_default) if type(outcome.data) in [dict, list] else str(outcome.data)
next_prompt += f"<tool_result>\n{datastr}\n</tool_result>\n\n" tool_results.append({'tool_use_id': tid, 'content': datastr})
next_prompt += outcome.next_prompt; next_prompts.add(outcome.next_prompt)
if not next_prompt: if len(next_prompts) == 0:
if len(handler._done_hooks) == 0: return should_exit if len(handler._done_hooks) == 0: return should_exit
next_prompt += handler._done_hooks.pop(0) next_prompts.add(handler._done_hooks.pop(0))
next_prompt = handler.next_prompt_patcher(next_prompt, None, turn) next_prompt = handler.next_prompt_patcher("\n".join(next_prompts), None, turn)
messages = [{"role": "user", "content": next_prompt}] # just new message, history is kept in *Session messages = [{"role": "user", "content": next_prompt, "tool_results": tool_results}] # just new message, history is kept in *Session
return {'result': 'MAX_TURNS_EXCEEDED'} return {'result': 'MAX_TURNS_EXCEEDED'}

View File

@@ -74,10 +74,10 @@ class GeneraticAgent:
name = self.get_llm_name() name = self.get_llm_name()
if 'glm' in name or 'minimax' in name or 'kimi' in name: load_tool_schema('_cn') if 'glm' in name or 'minimax' in name or 'kimi' in name: load_tool_schema('_cn')
else: load_tool_schema() else: load_tool_schema()
def list_llms(self): return [(i, f"{type(b.backend).__name__}/{b.backend.default_model}", i == self.llm_no) for i, b in enumerate(self.llmclients)] def list_llms(self): return [(i, f"{type(b.backend).__name__}/{b.backend.name}", i == self.llm_no) for i, b in enumerate(self.llmclients)]
def get_llm_name(self): def get_llm_name(self):
b = self.llmclient b = self.llmclient
return f"{type(b.backend).__name__}/{b.backend.default_model}" return f"{type(b.backend).__name__}/{b.backend.name}"
def abort(self): def abort(self):
if not self.is_running: return if not self.is_running: return

View File

@@ -46,7 +46,7 @@
"description": "Execute JS to control browser. Use precisely to reduce web_scan calls. Put code in ```javascript blocks in reply body to avoid escaping", "description": "Execute JS to control browser. Use precisely to reduce web_scan calls. Put code in ```javascript blocks in reply body to avoid escaping",
"parameters": {"type": "object", "properties": { "parameters": {"type": "object", "properties": {
"script": {"type": "string", "description": "[Mutually exclusive] JS code or script path. NEVER use this param when use reply code block"}, "script": {"type": "string", "description": "[Mutually exclusive] JS code or script path. NEVER use this param when use reply code block"},
"save_to_file": {"type": "string", "description": "file path; only for long result", "default": ""}, "save_to_file": {"type": "string", "description": "file path; **only** for long result", "default": ""},
"no_monitor": {"type": "boolean", "description": "Skip page change monitoring, saves 2-3s. Only for reads, not for page actions", "default": false}}} "no_monitor": {"type": "boolean", "description": "Skip page change monitoring, saves 2-3s. Only for reads, not for page actions", "default": false}}}
}}, }},
{"type": "function", "function": { {"type": "function", "function": {

19
ga.py
View File

@@ -6,7 +6,7 @@ if sys.stdout is None: sys.stdout = open(os.devnull, "w")
if sys.stderr is None: sys.stderr = open(os.devnull, "w") if sys.stderr is None: sys.stderr = open(os.devnull, "w")
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from agent_loop import BaseHandler, StepOutcome, try_call_generator from agent_loop import BaseHandler, StepOutcome, json_default
def code_run(code, code_type="python", timeout=60, cwd=None, code_cwd=None, stop_signal=[]): def code_run(code, code_type="python", timeout=60, cwd=None, code_cwd=None, stop_signal=[]):
"""代码执行器 """代码执行器
@@ -305,7 +305,7 @@ class GenericAgentHandler(BaseHandler):
except SyntaxError: exec(code, ns); result = ns.get('_r', 'OK') except SyntaxError: exec(code, ns); result = ns.get('_r', 'OK')
except Exception as e: result = f'Error: {e}' except Exception as e: result = f'Error: {e}'
else: result = yield from code_run(code, code_type, timeout, cwd, code_cwd=code_cwd, stop_signal=self.code_stop_signal) else: result = yield from code_run(code, code_type, timeout, cwd, code_cwd=code_cwd, stop_signal=self.code_stop_signal)
next_prompt = self._get_anchor_prompt() next_prompt = self._get_anchor_prompt(skip=args.get('_index', 0) > 0)
return StepOutcome(result, next_prompt=next_prompt) return StepOutcome(result, next_prompt=next_prompt)
def do_ask_user(self, args, response): def do_ask_user(self, args, response):
@@ -326,8 +326,8 @@ class GenericAgentHandler(BaseHandler):
result = web_scan(tabs_only=tabs_only, switch_tab_id=switch_tab_id, text_only=text_only) result = web_scan(tabs_only=tabs_only, switch_tab_id=switch_tab_id, text_only=text_only)
content = result.pop("content", None) content = result.pop("content", None)
yield f'[Info] {str(result)}\n' yield f'[Info] {str(result)}\n'
if content: next_prompt = f"<tool_result>\n```html\n{content}\n```\n</tool_result>" if content: result = json.dumps(result, ensure_ascii=False, default=json_default) + f"\n```html\n{content}\n```"
else: next_prompt = "标签页列表如上\n" # 手动tool_result为了触发历史上下文自动压缩 next_prompt = "\n"
return StepOutcome(result, next_prompt=next_prompt) return StepOutcome(result, next_prompt=next_prompt)
def do_web_execute_js(self, args, response): def do_web_execute_js(self, args, response):
@@ -353,7 +353,7 @@ class GenericAgentHandler(BaseHandler):
try: print("Web Execute JS Result:", smart_format(result)) try: print("Web Execute JS Result:", smart_format(result))
except: pass except: pass
yield f"JS 执行结果:\n{smart_format(result)}\n" yield f"JS 执行结果:\n{smart_format(result)}\n"
next_prompt = self._get_anchor_prompt() next_prompt = self._get_anchor_prompt(skip=args.get('_index', 0) > 0)
return StepOutcome(smart_format(result, max_str_len=5000), next_prompt=next_prompt) return StepOutcome(smart_format(result, max_str_len=5000), next_prompt=next_prompt)
def do_file_patch(self, args, response): def do_file_patch(self, args, response):
@@ -367,7 +367,7 @@ class GenericAgentHandler(BaseHandler):
return StepOutcome({"status": "error", "msg": str(e)}, next_prompt="\n") return StepOutcome({"status": "error", "msg": str(e)}, next_prompt="\n")
result = file_patch(path, old_content, new_content) result = file_patch(path, old_content, new_content)
yield f"\n{smart_format(result)}\n" yield f"\n{smart_format(result)}\n"
next_prompt = self._get_anchor_prompt() next_prompt = self._get_anchor_prompt(skip=args.get('_index', 0) > 0)
return StepOutcome(result, next_prompt=next_prompt) return StepOutcome(result, next_prompt=next_prompt)
def do_file_write(self, args, response): def do_file_write(self, args, response):
@@ -398,7 +398,7 @@ class GenericAgentHandler(BaseHandler):
else: else:
with open(path, 'a' if mode == "append" else 'w', encoding="utf-8") as f: f.write(new_content) with open(path, 'a' if mode == "append" else 'w', encoding="utf-8") as f: f.write(new_content)
yield f"[Status] ✅ {mode.capitalize()} 成功 ({len(new_content)} bytes)\n" yield f"[Status] ✅ {mode.capitalize()} 成功 ({len(new_content)} bytes)\n"
next_prompt = self._get_anchor_prompt() next_prompt = self._get_anchor_prompt(skip=args.get('_index', 0) > 0)
return StepOutcome({"status": "success", 'writed_bytes': len(new_content)}, next_prompt=next_prompt) return StepOutcome({"status": "success", 'writed_bytes': len(new_content)}, next_prompt=next_prompt)
except Exception as e: except Exception as e:
yield f"[Status] ❌ 写入异常: {str(e)}\n" yield f"[Status] ❌ 写入异常: {str(e)}\n"
@@ -434,7 +434,7 @@ class GenericAgentHandler(BaseHandler):
if "related_sop" in args: self.working['related_sop'] = related_sop if "related_sop" in args: self.working['related_sop'] = related_sop
self.working['passed_sessions'] = 0 self.working['passed_sessions'] = 0
yield f"[Info] Updated key_info and related_sop.\n" yield f"[Info] Updated key_info and related_sop.\n"
next_prompt = self._get_anchor_prompt() next_prompt = self._get_anchor_prompt(skip=args.get('_index', 0) > 0)
#next_prompt += '\n[SYSTEM TIPS] 此函数一般在任务开始或中间时调用如果任务已成功完成应该是start_long_term_update用于结算长期记忆。\n' #next_prompt += '\n[SYSTEM TIPS] 此函数一般在任务开始或中间时调用如果任务已成功完成应该是start_long_term_update用于结算长期记忆。\n'
return StepOutcome({"status": "success"}, next_prompt=next_prompt) return StepOutcome({"status": "success"}, next_prompt=next_prompt)
@@ -493,7 +493,8 @@ class GenericAgentHandler(BaseHandler):
else: result = "Memory Management SOP not found. Do not update memory." else: result = "Memory Management SOP not found. Do not update memory."
return StepOutcome(result, next_prompt=prompt) return StepOutcome(result, next_prompt=prompt)
def _get_anchor_prompt(self): def _get_anchor_prompt(self, skip=False):
if skip: return "\n"
h_str = "\n".join(self.history_info[-20:]) h_str = "\n".join(self.history_info[-20:])
prompt = f"\n### [WORKING MEMORY]\n<history>\n{h_str}\n</history>" prompt = f"\n### [WORKING MEMORY]\n<history>\n{h_str}\n</history>"
prompt += f"\nCurrent turn: {self.current_turn}\n" prompt += f"\nCurrent turn: {self.current_turn}\n"

View File

@@ -1,4 +1,4 @@
import os, json, re, time, requests, sys, threading, urllib3, base64, mimetypes import os, json, re, time, requests, sys, threading, urllib3, base64, mimetypes, uuid
from datetime import datetime from datetime import datetime
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
@@ -37,6 +37,38 @@ def compress_history_tags(messages, keep_recent=10, max_len=800):
print(f"[Cut] {_before} -> {sum(len(json.dumps(m, ensure_ascii=False)) for m in messages)}") print(f"[Cut] {_before} -> {sum(len(json.dumps(m, ensure_ascii=False)) for m in messages)}")
return messages return messages
def _sanitize_leading_user_msg(msg):
"""把 user 消息里的 tool_result 块改写成纯文本,避免孤立引用。
history 统一使用 Claude content-block 格式content 是 list of blocks。"""
msg = dict(msg) # 浅拷贝外层 dictcontent 在 L56 整体替换而非原地修改,故原对象的 content 不受影响
content = msg.get('content')
if not isinstance(content, list): return msg
texts = []
for block in content:
if not isinstance(block, dict): continue
if block.get('type') == 'tool_result':
c = block.get('content', '')
if isinstance(c, list): # content 本身也可能是 list[{type:text,text:...}]
texts.extend(b.get('text', '') for b in c if isinstance(b, dict))
else: texts.append(str(c))
elif block.get('type') == 'text':
texts.append(block.get('text', ''))
msg['content'] = [{"type": "text", "text": '\n'.join(t for t in texts if t)}]
return msg
def trim_messages_history(history, context_win):
compress_history_tags(history)
cost = sum(len(json.dumps(m, ensure_ascii=False)) for m in history)
print(f'[Debug] Current context: {cost} chars, {len(history)} messages.')
if cost > context_win * 3:
target = context_win * 3 * 0.6
while len(history) > 4 and cost > target:
history.pop(0)
while history and history[0].get('role') != 'user': history.pop(0)
if history and history[0].get('role') == 'user': history[0] = _sanitize_leading_user_msg(history[0])
cost = sum(len(json.dumps(m, ensure_ascii=False)) for m in history)
print(f'[Debug] Trimmed context, current: {cost} chars, {len(history)} messages.')
def auto_make_url(base, path): def auto_make_url(base, path):
b, p = base.rstrip('/'), path.strip('/') b, p = base.rstrip('/'), path.strip('/')
if b.endswith('$'): return b[:-1].rstrip('/') if b.endswith('$'): return b[:-1].rstrip('/')
@@ -326,25 +358,88 @@ def _to_responses_input(messages):
result.append({"role": role, "content": parts}) result.append({"role": role, "content": parts})
return result return result
class ClaudeSession:
def _msgs_claude2oai(messages):
result = []
for msg in messages:
role = msg.get("role", "user")
content = msg.get("content", "")
blocks = content if isinstance(content, list) else [{"type": "text", "text": str(content)}]
if role == "assistant":
text_parts, tool_calls = [], []
for b in blocks:
if not isinstance(b, dict): continue
if b.get("type") == "text": text_parts.append({"type": "text", "text": b.get("text", "")})
elif b.get("type") == "tool_use":
tool_calls.append({
"id": b.get("id", ""), "type": "function",
"function": {"name": b.get("name", ""), "arguments": json.dumps(b.get("input", {}), ensure_ascii=False)}
})
m = {"role": "assistant"}
if text_parts: m["content"] = text_parts
else: m["content"] = ""
if tool_calls: m["tool_calls"] = tool_calls
result.append(m)
elif role == "user":
text_parts = []
for b in blocks:
if not isinstance(b, dict): continue
if b.get("type") == "tool_result":
if text_parts:
result.append({"role": "user", "content": text_parts})
text_parts = []
tr = b.get("content", "")
if isinstance(tr, list):
tr = "\n".join(x.get("text", "") for x in tr if isinstance(x, dict) and x.get("type") == "text")
result.append({"role": "tool", "tool_call_id": b.get("tool_use_id", ""), "content": tr if isinstance(tr, str) else str(tr)})
elif b.get("type") == "image":
src = b.get("source") or {}
if src.get("type") == "base64" and src.get("data"):
text_parts.append({"type": "image_url", "image_url": {"url": f"data:{src.get('media_type', 'image/png')};base64,{src.get('data', '')}"}})
elif b.get("type") == "image_url":
text_parts.append(b)
elif b.get("type") == "text":
text_parts.append({"type": "text", "text": b.get("text", "")})
if text_parts: result.append({"role": "user", "content": text_parts})
else:
result.append(msg)
return result
class BaseSession:
def __init__(self, cfg): def __init__(self, cfg):
self.api_key = cfg['apikey']; self.api_base = cfg['apibase'].rstrip('/') self.api_key = cfg['apikey']
self.default_model = cfg.get('model', 'claude-opus') self.api_base = cfg['apibase'].rstrip('/')
self.default_model = cfg.get('model', '')
self.context_win = cfg.get('context_win', 20000) self.context_win = cfg.get('context_win', 20000)
self.raw_msgs, self.lock = [], threading.Lock() self.history = []
self.lock = threading.Lock()
self.system = "" self.system = ""
def _trim_messages(self, raw_msgs): self.name = cfg.get('name', self.default_model)
compress_history_tags(raw_msgs) proxy = cfg.get('proxy')
total = sum(len(m['prompt']) for m in raw_msgs) self.proxies = {"http": proxy, "https": proxy} if proxy else None
print(f'[Debug] Current context: {total} chars, {len(raw_msgs)} messages.') self.max_retries = max(0, int(cfg.get('max_retries', 2)))
if total <= self.context_win * 3: return raw_msgs self.connect_timeout = max(1, int(cfg.get('connect_timeout', 10)))
target, current, result = self.context_win * 3 * 0.6, 0, [] self.read_timeout = max(5, int(cfg.get('read_timeout', 120)))
for msg in reversed(raw_msgs): effort = cfg.get('reasoning_effort')
if (msg_len := len(msg['prompt'])) + current <= target: effort = None if effort is None else str(effort).strip().lower()
result.append(msg); current += msg_len self.reasoning_effort = effort if effort in ('none', 'minimal', 'low', 'medium', 'high', 'xhigh') else None
else: break if effort and not self.reasoning_effort: print(f"[WARN] Invalid reasoning_effort {effort!r}, ignored.")
print(f'[Debug] Trimmed context, current: {current} chars, {len(result)} messages.') mode = str(cfg.get('api_mode', 'chat_completions')).strip().lower().replace('-', '_')
return result[::-1] or raw_msgs[-2:] self.api_mode = 'responses' if mode in ('responses', 'response') else 'chat_completions'
def ask(self, prompt, model=None, stream=False):
def _ask_gen():
content = ''
with self.lock:
self.history.append({"role": "user", "content": [{"type": "text", "text": prompt}]})
trim_messages_history(self.history, self.context_win)
messages = self.make_messages(self.history)
for chunk in self.raw_ask(messages, model):
content += chunk; yield chunk
if not content.startswith("Error:"): self.history.append({"role": "assistant", "content": [{"type": "text", "text": content}]})
return _ask_gen() if stream else ''.join(list(_ask_gen()))
class ClaudeSession(BaseSession):
def raw_ask(self, messages, model=None, temperature=0.5, max_tokens=6144): def raw_ask(self, messages, model=None, temperature=0.5, max_tokens=6144):
model = model or self.default_model model = model or self.default_model
ml = model.lower() ml = model.lower()
@@ -359,195 +454,41 @@ class ClaudeSession:
yield from _parse_claude_sse(r.iter_lines()) yield from _parse_claude_sse(r.iter_lines())
except Exception as e: yield f"Error: {str(e)}" except Exception as e: yield f"Error: {str(e)}"
def make_messages(self, raw_list): def make_messages(self, raw_list):
msgs = [{"role": m['role'], "content": [{"type": "text", "text": m['prompt']}]} for m in raw_list] msgs = [{"role": m['role'], "content": list(m['content'])} for m in raw_list]
c = msgs[-1]["content"] c = msgs[-1]["content"]
c[-1] = dict(c[-1], cache_control={"type": "ephemeral"}) c[-1] = dict(c[-1], cache_control={"type": "ephemeral"})
return msgs return msgs
def ask(self, prompt, model=None, stream=False):
def _ask_gen():
content = ''
with self.lock:
self.raw_msgs.append({"role": "user", "prompt": prompt})
self.raw_msgs = self._trim_messages(self.raw_msgs)
messages = self.make_messages(self.raw_msgs)
for chunk in self.raw_ask(messages, model):
content += chunk; yield chunk
if not content.startswith("Error:"): self.raw_msgs.append({"role": "assistant", "prompt": content})
return _ask_gen() if stream else ''.join(list(_ask_gen()))
class LLMSession:
def __init__(self, cfg):
self.api_key = cfg['apikey']; self.api_base = cfg['apibase'].rstrip('/')
self.default_model = cfg['model']
self.context_win = cfg.get('context_win', 20000)
self.raw_msgs, self.messages = [], []
proxy = cfg.get('proxy')
self.proxies = {"http": proxy, "https": proxy} if proxy else None
self.lock = threading.Lock()
self.max_retries = max(0, int(cfg.get('max_retries', 2)))
self.connect_timeout = max(1, int(cfg.get('connect_timeout', 10)))
self.read_timeout = max(5, int(cfg.get('read_timeout', 120)))
effort = cfg.get('reasoning_effort')
effort = None if effort is None else str(effort).strip().lower()
self.reasoning_effort = effort if effort in ['none', 'minimal','low', 'medium', 'high', 'xhigh'] else None
if effort and self.reasoning_effort is None: print(f"[WARN] Invalid reasoning_effort {effort!r}, ignored.")
mode = str(cfg.get('api_mode', 'chat_completions')).strip().lower().replace('-', '_')
if mode in ["responses", "response"]: self.api_mode = "responses"
else: self.api_mode = "chat_completions"
class LLMSession(BaseSession):
def raw_ask(self, messages, model=None, temperature=0.5): def raw_ask(self, messages, model=None, temperature=0.5):
if model is None: model = self.default_model if model is None: model = self.default_model
yield from _openai_stream(self.api_base, self.api_key, messages, model, self.api_mode, yield from _openai_stream(self.api_base, self.api_key, messages, model, self.api_mode,
temperature=temperature, reasoning_effort=self.reasoning_effort, temperature=temperature, reasoning_effort=self.reasoning_effort,
max_retries=self.max_retries, connect_timeout=self.connect_timeout, max_retries=self.max_retries, connect_timeout=self.connect_timeout,
read_timeout=self.read_timeout, proxies=self.proxies) read_timeout=self.read_timeout, proxies=self.proxies)
def make_messages(self, raw_list): return _msgs_claude2oai(raw_list)
def make_messages(self, raw_list, omit_images=True): class NativeClaudeSession(BaseSession):
compress_history_tags(raw_list)
messages = []
for i, msg in enumerate(raw_list):
prompt = msg['prompt']
image = msg.get('image')
if omit_images and image: messages.append({"role": msg['role'], "content": "[Image omitted, if you needed it, ask me]\n" + prompt})
elif not omit_images and image:
messages.append({"role": msg['role'], "content": [
{"type": "image_url", "image_url": {"url": f"data:image/png;base64,{image}"}},
{"type": "text", "text": prompt} ]})
else:
messages.append({"role": msg['role'], "content": prompt})
return messages
def summary_history(self, model=None):
if model is None: model = self.default_model
with self.lock:
keep = 0; tok = 0
for m in reversed(self.raw_msgs):
l = len(str(m))//3
if tok + l > self.context_win*0.2: break
tok += l; keep += 1
keep = max(2, keep)
old, self.raw_msgs = self.raw_msgs[:-keep], self.raw_msgs[-keep:]
if len(old) == 0: old = self.raw_msgs; self.raw_msgs = []
p = "Summarize prev summary and prev conversations into compact memory (facts/decisions/constraints/open questions). Do NOT restate long schemas. The new summary should less than 1000 tokens. Permit dropping non-important things.\n"
messages = self.make_messages(old, omit_images=True)
messages += [{"role":"user", "content":p}]
msg_lens = [1000 if isinstance(m["content"], list) else len(str(m["content"]))//3 for m in messages]
summary = ''.join(list(self.raw_ask(messages, model, temperature=0.1)))
print('[Debug] Summary length:', len(summary)//3, '; Orig context lengths:', str(msg_lens))
if not summary.startswith("Error:"):
self.raw_msgs.insert(0, {"role":"assistant", "prompt":"Prev summary:\n"+summary, "image":None})
else: self.raw_msgs = old + self.raw_msgs # 不做了,下次再做
def ask(self, prompt, model=None, image_base64=None, stream=False):
if model is None: model = self.default_model
def _ask_gen():
content = ''
with self.lock:
self.raw_msgs.append({"role": "user", "prompt": prompt, "image": image_base64})
messages = self.make_messages(self.raw_msgs[:-1], omit_images=True)
messages += self.make_messages([self.raw_msgs[-1]], omit_images=False)
msg_lens = [1000 if isinstance(m["content"], list) else len(str(m["content"]))//3 for m in messages]
total_len = sum(msg_lens) # estimate token count
gen = self.raw_ask(messages, model)
for chunk in gen:
content += chunk; yield chunk
if not content.startswith("Error:"):
self.raw_msgs.append({"role": "assistant", "prompt": content, "image": None})
if total_len > self.context_win // 2: print(f"[Debug] Whole context length {total_len} {str(msg_lens)}.")
if total_len > self.context_win:
yield '[NextWillSummary]'
threading.Thread(target=self.summary_history, daemon=True).start()
if stream: return _ask_gen()
return ''.join(list(_ask_gen()))
class NativeOAISession:
def __init__(self, cfg): def __init__(self, cfg):
self.api_key = cfg['apikey']; self.api_base = cfg['apibase'].rstrip('/') super().__init__(cfg)
self.default_model = cfg.get('model', 'gpt-4o') self.context_win = cfg.get("context_win", 24000)
self.context_win = cfg.get('context_win', 24000) self.no_system_prompt = cfg.get("no_system_prompt", False)
proxy = cfg.get('proxy')
self.proxies = {"http": proxy, "https": proxy} if proxy else None
self.history = []; self.system = ''; self.lock = threading.Lock()
self.max_retries = max(0, int(cfg.get('max_retries', 2)))
self.connect_timeout = max(1, int(cfg.get('connect_timeout', 10)))
self.read_timeout = max(5, int(cfg.get('read_timeout', 120)))
effort = cfg.get('reasoning_effort')
effort = None if effort is None else str(effort).strip().lower()
self.reasoning_effort = effort if effort in ('low', 'medium', 'high') else None
if effort and not self.reasoning_effort: print(f"[WARN] Invalid reasoning_effort {effort!r}, ignored.")
mode = str(cfg.get('api_mode', 'chat_completions')).strip().lower().replace('-', '_')
self.api_mode = 'responses' if mode in ('responses', 'response') else 'chat_completions'
def raw_ask(self, messages, tools=None, system=None, model=None, temperature=0.5, max_tokens=6144, **kw):
"""OpenAI streaming. yields text chunks, generator return = list[content_block]"""
model = model or self.default_model
msgs = ([{"role": "system", "content": system}] if system else []) + messages
return (yield from _openai_stream(self.api_base, self.api_key, msgs, model, self.api_mode,
temperature=temperature, max_tokens=max_tokens, tools=tools,
reasoning_effort=self.reasoning_effort,
max_retries=self.max_retries, connect_timeout=self.connect_timeout,
read_timeout=self.read_timeout, proxies=self.proxies))
def ask(self, msg, tools=None, model=None, **kw):
assert type(msg) is dict
with self.lock:
self.history.append(msg)
compress_history_tags(self.history)
cost = sum(len(json.dumps(m, ensure_ascii=False)) for m in self.history)
print(f'[Debug] Current context: {cost} chars, {len(self.history)} messages.')
if cost > self.context_win * 3:
target = self.context_win * 3 * 0.6
while len(self.history) > 2 and cost > target:
self.history.pop(0); self.history.pop(0)
cost = sum(len(json.dumps(m, ensure_ascii=False)) for m in self.history)
print(f'[Debug] Trimmed context, current: {cost} chars, {len(self.history)} messages.')
messages = list(self.history)
content_blocks = None
gen = self.raw_ask(messages, tools, self.system, model)
try:
while True: yield next(gen)
except StopIteration as e: content_blocks = e.value or []
if content_blocks and not (len(content_blocks) == 1 and content_blocks[0].get("text", "").startswith("Error:")):
hist_texts = [b for b in content_blocks if b.get("type") != "tool_use"]
hist_tools = [b for b in content_blocks if b.get("type") == "tool_use"]
if hist_tools: hist_texts.append({"type": "text", "text": json.dumps(hist_tools, ensure_ascii=False)})
self.history.append({"role": "assistant", "content": hist_texts or content_blocks})
text_parts = [b["text"] for b in content_blocks if b.get("type") == "text"]
content = "\n".join(text_parts).strip()
tool_calls = [MockToolCall(b["name"], b.get("input", {}), id=b.get("id", "")) for b in content_blocks if b.get("type") == "tool_use"]
if len(tool_calls) == 0 and content.endswith('}]'):
_pat = next((p for p in ['[{"type":"tool_use"', '[{"type": "tool_use"'] if p in content), None)
if _pat:
try:
idx = content.index(_pat); raw = json.loads(content[idx:])
tool_calls = [MockToolCall(b["name"], b.get("input", {}), id=b.get("id", "")) for b in raw if b.get("type") == "tool_use"]
content = content[:idx].strip()
except: pass
think_pattern = r"<think(?:ing)?>(.*?)</think(?:ing)?>"; thinking = ''
think_match = re.search(think_pattern, content, re.DOTALL)
if think_match:
thinking = think_match.group(1).strip()
content = re.sub(think_pattern, "", content, flags=re.DOTALL)
return MockResponse(thinking, content, tool_calls, str(content_blocks))
class NativeClaudeSession:
def __init__(self, cfg):
self.api_key = cfg['apikey']; self.api_base = cfg['apibase'].rstrip('/')
self.default_model = cfg.get('model', 'claude-opus')
self.context_win = cfg.get('context_win', 24000)
self.history = []; self.system = ''; self.lock = threading.Lock()
def raw_ask(self, messages, tools=None, system=None, model=None, temperature=0.5, max_tokens=6144): def raw_ask(self, messages, tools=None, system=None, model=None, temperature=0.5, max_tokens=6144):
model = model or self.default_model model = model or self.default_model
headers = {"x-api-key": self.api_key, "Content-Type": "application/json", "anthropic-version": "2023-06-01", "anthropic-beta": "prompt-caching-2024-07-31"} headers = {"Content-Type": "application/json", "anthropic-version": "2023-06-01",
"anthropic-beta": "prompt-caching-2024-07-31", "x-app": "cli", "user-agent": "claude-cli/2.1.80 (external, cli)"}
if self.api_key.startswith("cr_"): headers["authorization"] = f"Bearer {self.api_key}"
else: headers["x-api-key"] = self.api_key
payload = {"model": model, "messages": messages, "temperature": temperature, "max_tokens": max_tokens, "stream": True} payload = {"model": model, "messages": messages, "temperature": temperature, "max_tokens": max_tokens, "stream": True}
payload["metadata"] = {"user_id": json.dumps({"device_id":uuid.uuid4().hex+uuid.uuid4().hex[:32],"account_uuid":"","session_id":str(uuid.uuid4())},separators=(',',':'))}
if tools: if tools:
tools = [dict(t) for t in tools]; tools[-1]["cache_control"] = {"type": "ephemeral"} tools = [dict(t) for t in tools]; tools[-1]["cache_control"] = {"type": "ephemeral"}
payload["tools"] = tools payload["tools"] = tools
if system: payload["system"] = [{"type": "text", "text": system, "cache_control": {"type": "ephemeral"}}] payload['system'] = []
if system:
if self.no_system_prompt: messages[0]["content"].insert(0, {"type": "text", "text": f"{system}\n"})
else: payload["system"] = [{"type": "text", "text": system, "cache_control": {"type": "ephemeral"}}]
messages[-1] = {**messages[-1], "content": list(messages[-1]["content"])} messages[-1] = {**messages[-1], "content": list(messages[-1]["content"])}
messages[-1]["content"][-1] = dict(messages[-1]["content"][-1], cache_control={"type": "ephemeral"}) messages[-1]["content"][-1] = dict(messages[-1]["content"][-1], cache_control={"type": "ephemeral"})
try: try:
@@ -567,15 +508,7 @@ class NativeClaudeSession:
assert type(msg) is dict assert type(msg) is dict
with self.lock: with self.lock:
self.history.append(msg) self.history.append(msg)
compress_history_tags(self.history) trim_messages_history(self.history, self.context_win)
cost = sum(len(json.dumps(m, ensure_ascii=False)) for m in self.history)
print(f'[Debug] Current context: {cost} chars, {len(self.history)} messages.')
if cost > self.context_win * 3:
target = self.context_win * 3 * 0.6
while len(self.history) > 2 and cost > target:
self.history.pop(0); self.history.pop(0)
cost = sum(len(json.dumps(m, ensure_ascii=False)) for m in self.history)
print(f'[Debug] Trimmed context, current: {cost} chars, {len(self.history)} messages.')
messages = list(self.history) messages = list(self.history)
content_blocks = None content_blocks = None
@@ -585,15 +518,35 @@ class NativeClaudeSession:
except StopIteration as e: content_blocks = e.value or [] except StopIteration as e: content_blocks = e.value or []
if content_blocks and not (len(content_blocks) == 1 and content_blocks[0].get("text", "").startswith("Error:")): if content_blocks and not (len(content_blocks) == 1 and content_blocks[0].get("text", "").startswith("Error:")):
self.history.append({"role": "assistant", "content": content_blocks}) self.history.append({"role": "assistant", "content": content_blocks})
thinking = ''
text_parts = [b["text"] for b in content_blocks if b.get("type") == "text"] text_parts = [b["text"] for b in content_blocks if b.get("type") == "text"]
content = "\n".join(text_parts).strip() content = "\n".join(text_parts).strip()
tool_calls = [] tool_calls = [MockToolCall(b["name"], b.get("input", {}), id=b.get("id", "")) for b in content_blocks if b.get("type") == "tool_use"]
for b in content_blocks: if len(tool_calls) == 0 and content.endswith('}]'):
if b.get("type") == "tool_use": _pat = next((p for p in ['[{"type":"tool_use"', '[{"type": "tool_use"'] if p in content), None)
tool_calls.append(MockToolCall(b["name"], b.get("input", {}), id=b.get("id", ""))) if _pat:
try:
idx = content.index(_pat); raw = json.loads(content[idx:])
tool_calls = [MockToolCall(b["name"], b.get("input", {}), id=b.get("id", "")) for b in raw if b.get("type") == "tool_use"]
content = content[:idx].strip()
except: pass
think_pattern = r"<think(?:ing)?>(.*?)</think(?:ing)?>"; thinking = ''
think_match = re.search(think_pattern, content, re.DOTALL)
if think_match:
thinking = think_match.group(1).strip()
content = re.sub(think_pattern, "", content, flags=re.DOTALL)
return MockResponse(thinking, content, tool_calls, str(content_blocks)) return MockResponse(thinking, content, tool_calls, str(content_blocks))
class NativeOAISession(NativeClaudeSession):
def raw_ask(self, messages, tools=None, system=None, model=None, temperature=0.5, max_tokens=6144, **kw):
"""OpenAI streaming. yields text chunks, generator return = list[content_block]"""
model = model or self.default_model
msgs = ([{"role": "system", "content": system}] if system else []) + _msgs_claude2oai(messages)
return (yield from _openai_stream(self.api_base, self.api_key, msgs, model, self.api_mode,
temperature=temperature, max_tokens=max_tokens, tools=tools,
reasoning_effort=self.reasoning_effort,
max_retries=self.max_retries, connect_timeout=self.connect_timeout,
read_timeout=self.read_timeout, proxies=self.proxies))
def openai_tools_to_claude(tools): def openai_tools_to_claude(tools):
"""[{type:'function', function:{name,description,parameters}}] → [{name,description,input_schema}].""" """[{type:'function', function:{name,description,parameters}}] → [{name,description,input_schema}]."""
result = [] result = []
@@ -628,19 +581,14 @@ class ToolClient:
self.backend = backend self.backend = backend
self.auto_save_tokens = auto_save_tokens self.auto_save_tokens = auto_save_tokens
self.last_tools = '' self.last_tools = ''
self.name = self.backend.name
self.total_cd_tokens = 0 self.total_cd_tokens = 0
def chat(self, messages, tools=None): def chat(self, messages, tools=None):
if self._should_use_structured_messages(messages): full_prompt = self._build_protocol_prompt(messages, tools)
backend_messages = self._build_backend_messages(messages, tools) print("Full prompt length:", len(full_prompt), 'chars')
print("Structured prompt length:", sum(self._estimate_content_len(m.get("content")) for m in backend_messages), 'chars') prompt_log = full_prompt
prompt_log = self._serialize_messages_for_log(backend_messages) gen = self.backend.ask(full_prompt, stream=True)
gen = self.backend.raw_ask(backend_messages)
else:
full_prompt = self._build_protocol_prompt(messages, tools)
print("Full prompt length:", len(full_prompt), 'chars')
prompt_log = full_prompt
gen = self.backend.ask(full_prompt, stream=True)
_write_llm_log('Prompt', prompt_log) _write_llm_log('Prompt', prompt_log)
raw_text = ''; summarytag = '[NextWillSummary]' raw_text = ''; summarytag = '[NextWillSummary]'
for chunk in gen: for chunk in gen:
@@ -651,8 +599,7 @@ class ToolClient:
_write_llm_log('Response', raw_text) _write_llm_log('Response', raw_text)
return self._parse_mixed_response(raw_text) return self._parse_mixed_response(raw_text)
def _should_use_structured_messages(self, messages): #def _should_use_structured_messages(self, messages): return isinstance(self.backend, LLMSession) and any(isinstance(m.get("content"), list) for m in messages)
return isinstance(self.backend, LLMSession) and any(isinstance(m.get("content"), list) for m in messages)
def _estimate_content_len(self, content): def _estimate_content_len(self, content):
if isinstance(content, str): return len(content) if isinstance(content, str): return len(content)
@@ -688,53 +635,20 @@ class ToolClient:
self.last_tools = tools_json self.last_tools = tools_json
return tool_instruction return tool_instruction
def _build_backend_messages(self, messages, tools):
system_content = next((m['content'] for m in messages if m['role'].lower() == 'system'), "")
history_msgs = [m for m in messages if m['role'].lower() != 'system']
tool_instruction = self._prepare_tool_instruction(tools)
backend_messages = []
merged_system = f"{system_content}\n{tool_instruction}".strip() if tool_instruction else system_content
if merged_system: backend_messages.append({"role": "system", "content": merged_system})
for m in history_msgs:
backend_messages.append({"role": m['role'], "content": m['content']})
self.total_cd_tokens += self._estimate_content_len(m['content'])
if self.total_cd_tokens > 6000: self.last_tools = ''
return backend_messages
def _serialize_messages_for_log(self, messages):
logged = []
for msg in messages:
content = msg.get("content")
if isinstance(content, list):
parts = []
for part in content:
if not isinstance(part, dict): continue
if part.get("type") == "text":
parts.append({"type": "text", "text": part.get("text", "")})
elif part.get("type") == "image_url":
url = (part.get("image_url") or {}).get("url", "")
prefix = url.split(",", 1)[0] if url else "data:image/unknown;base64"
parts.append({"type": "image_url", "image_url": {"url": prefix + ",<omitted>"}})
else:
parts.append(part)
logged.append({"role": msg.get("role"), "content": parts})
else:
logged.append(msg)
return json.dumps(logged, ensure_ascii=False, indent=2)
def _build_protocol_prompt(self, messages, tools): def _build_protocol_prompt(self, messages, tools):
system_content = next((m['content'] for m in messages if m['role'].lower() == 'system'), "") system_content = next((m['content'] for m in messages if m['role'].lower() == 'system'), "")
history_msgs = [m for m in messages if m['role'].lower() != 'system'] history_msgs = [m for m in messages if m['role'].lower() != 'system']
tool_instruction = self._prepare_tool_instruction(tools) tool_instruction = self._prepare_tool_instruction(tools)
system = "" system = ""; user = ""
if system_content: system += f"{system_content}\n" if system_content: system += f"{system_content}\n"
system += f"{tool_instruction}" system += f"{tool_instruction}"
user = ""
for m in history_msgs: for m in history_msgs:
role = "USER" if m['role'] == 'user' else "ASSISTANT" role = "USER" if m['role'] == 'user' else "ASSISTANT"
user += f"=== {role} ===\n{m['content']}\n\n" user += f"=== {role} ===\n"
self.total_cd_tokens += self._estimate_content_len(m['content']) for tr in m.get('tool_results', []): user += f'<tool_result>{tr["content"]}</tool_result>\n'
if self.total_cd_tokens > 6000: self.last_tools = '' user += str(m['content']) + "\n"
self.total_cd_tokens += self._estimate_content_len(user)
if self.total_cd_tokens > 9000: self.last_tools = ''
user += "=== ASSISTANT ===\n" user += "=== ASSISTANT ===\n"
return system + user return system + user
@@ -808,15 +722,17 @@ def tryparse(json_str):
if '}' in json_str: json_str = json_str[:json_str.rfind('}') + 1] if '}' in json_str: json_str = json_str[:json_str.rfind('}') + 1]
return json.loads(json_str) return json.loads(json_str)
class MixinSession: class MixinSession:
"""Multi-session fallback with spring-back to primary.""" """Multi-session fallback with spring-back to primary."""
def __init__(self, all_sessions, cfg): def __init__(self, all_sessions, cfg):
self._retries, self._base_delay = cfg.get('max_retries', 3), cfg.get('base_delay', 1.5) self._retries, self._base_delay = cfg.get('max_retries', 3), cfg.get('base_delay', 1.5)
self._spring_sec = cfg.get('spring_back', 300) self._spring_sec = cfg.get('spring_back', 300)
self._sessions = [all_sessions[i].backend for i in cfg.get('llm_nos', [])] self._sessions = [all_sessions[i].backend if isinstance(i, int) else
assert 'Native' not in self._sessions[0].__class__.__name__ next(s.backend for s in all_sessions if type(s) is not dict and s.backend.name == i) for i in cfg.get('llm_nos', [])]
assert len(set(type(s) for s in self._sessions)) == 1, f'MixinSession: all sessions must be same type, got {[type(s).__name__ for s in self._sessions]}' is_native = lambda s: 'Native' in s.__class__.__name__
groups = {is_native(s) for s in self._sessions}
assert len(groups) == 1, f"MixinSession: sessions must be in same group (Native or non-Native), got {[type(s).__name__ for s in self._sessions]}"
self.name = '|'.join(s.name for s in self._sessions)
self._orig_raw_asks = [s.raw_ask for s in self._sessions] self._orig_raw_asks = [s.raw_ask for s in self._sessions]
self._sessions[0].raw_ask = self._raw_ask self._sessions[0].raw_ask = self._raw_ask
self.default_model = getattr(self._sessions[0], 'default_model', None) self.default_model = getattr(self._sessions[0], 'default_model', None)
@@ -866,25 +782,32 @@ class NativeToolClient:
self.backend = backend self.backend = backend
self.backend.system = self.THINKING_PROMPT self.backend.system = self.THINKING_PROMPT
self.tools = {} self.tools = {}
self.name = self.backend.name
self._pending_tool_ids = [] self._pending_tool_ids = []
def set_system(self, extra_system): def set_system(self, extra_system):
combined = f"{extra_system}\n\n{self.THINKING_PROMPT}" if extra_system else self.THINKING_PROMPT combined = f"{extra_system}\n\n{self.THINKING_PROMPT}" if extra_system else self.THINKING_PROMPT
if combined != self.backend.system: print(f"[Debug] Updated system prompt, length {len(combined)} chars.") if combined != self.backend.system: print(f"[Debug] Updated system prompt, length {len(combined)} chars.")
self.backend.system = combined self.backend.system = combined
def chat(self, messages, tools=None): def chat(self, messages, tools=None):
if tools: self.tools = openai_tools_to_claude(tools) if isinstance(self.backend, NativeClaudeSession) else tools if tools: self.tools = openai_tools_to_claude(tools) if type(self.backend) is NativeClaudeSession else tools
combined_content = []; resp = None combined_content = []; resp = None; tool_results = []
for msg in messages: for msg in messages:
c = msg.get('content', '') c = msg.get('content', '')
if msg['role'] == 'system': if msg['role'] == 'system':
self.set_system(c); continue self.set_system(c); continue
if isinstance(c, str): combined_content.append({"type": "text", "text": c}) if isinstance(c, str): combined_content.append({"type": "text", "text": c})
elif isinstance(c, list): combined_content.extend(c) elif isinstance(c, list): combined_content.extend(c)
if self._pending_tool_ids and isinstance(self.backend, NativeClaudeSession): if msg['role'] == 'user' and msg.get('tool_results'): tool_results.extend(msg['tool_results'])
tool_result_blocks = [{"type": "tool_result", "tool_use_id": tid, "content": ""} for tid in self._pending_tool_ids] tr_id_set = set(); tool_result_blocks = []
combined_content = tool_result_blocks + combined_content for tr in tool_results:
self._pending_tool_ids = [] tool_use_id, content = tr.get("tool_use_id", ""), tr.get("content", "")
merged = {"role": "user", "content": combined_content} tr_id_set.add(tool_use_id)
if tool_use_id: tool_result_blocks.append({"type": "tool_result", "tool_use_id": tool_use_id, "content": tr.get("content", "")})
else: combined_content = [{"type": "text", "text": f'<tool_result>{content}</tool_result>'}] + combined_content
for tid in self._pending_tool_ids:
if tid not in tr_id_set: tool_result_blocks.append({"type": "tool_result", "tool_use_id": tid, "content": ""})
self._pending_tool_ids = []
merged = {"role": "user", "content": tool_result_blocks + combined_content}
_write_llm_log('Prompt', json.dumps(merged, ensure_ascii=False, indent=2)) _write_llm_log('Prompt', json.dumps(merged, ensure_ascii=False, indent=2))
gen = self.backend.ask(merged, self.tools); gen = self.backend.ask(merged, self.tools);
try: try:
@@ -899,6 +822,5 @@ class NativeToolClient:
resp.thinking = think_match.group(1).strip() resp.thinking = think_match.group(1).strip()
text = re.sub(r'<think(?:ing)?>.*?</think(?:ing)?>', '', text, flags=re.DOTALL) text = re.sub(r'<think(?:ing)?>.*?</think(?:ing)?>', '', text, flags=re.DOTALL)
resp.content = text.strip() resp.content = text.strip()
if resp and hasattr(resp, 'tool_calls') and resp.tool_calls and isinstance(self.backend, NativeClaudeSession): if resp and hasattr(resp, 'tool_calls') and resp.tool_calls: self._pending_tool_ids = [tc.id for tc in resp.tool_calls]
self._pending_tool_ids = [tc.id for tc in resp.tool_calls]
return resp return resp

View File

@@ -42,4 +42,5 @@ ljqCtrl.Click(px, py)
- **偏移量**:所有的相对偏移像素值(如“向右移动 10 像素”)同样需要除以 `dpi_scale` - **偏移量**:所有的相对偏移像素值(如“向右移动 10 像素”)同样需要除以 `dpi_scale`
- **坐标对齐**: 物理坐标 = 截图坐标ljqCtrl 自动处理 DPI 换算,禁止手动重复计算。 - **坐标对齐**: 物理坐标 = 截图坐标ljqCtrl 自动处理 DPI 换算,禁止手动重复计算。
- **⚠️ 窗口坐标转换陷阱**:使用 `win32gui.GetWindowRect(hwnd)` 获取的矩形包含标题栏和边框,而截图内容是客户区。点击截图内元素时,必须用 `win32gui.ClientToScreen(hwnd, (0, 0))` 获取客户区原点的屏幕坐标,再加上截图内坐标。禁止直接用 GetWindowRect 左上角 + 截图坐标。 - **⚠️ 窗口坐标转换陷阱**:使用 `win32gui.GetWindowRect(hwnd)` 获取的矩形包含标题栏和边框,而截图内容是客户区。点击截图内元素时,必须用 `win32gui.ClientToScreen(hwnd, (0, 0))` 获取客户区原点的屏幕坐标,再加上截图内坐标。禁止直接用 GetWindowRect 左上角 + 截图坐标。
- **⚠️ win32 DPI 坐标陷阱**:未调用 `SetProcessDPIAware()` 时,`GetWindowRect/ClientToScreen/GetClientRect` 等拿到的窗口/客户区坐标通常是**逻辑坐标**;若后续截图或 `ljqCtrl` 使用的是物理像素,必须统一做 `坐标 / ljqCtrl.dpi_scale`。等价方案:先 `SetProcessDPIAware()`,之后全流程直接使用 raw 物理坐标,禁止逻辑/物理坐标混用。
- **文本输入**ljqCtrl 无 TypeText/SendKeys。向输入框键入文本先点击/三击选中字段,再 `pyperclip.copy('文本'); ljqCtrl.Press('ctrl+v')` - **文本输入**ljqCtrl 无 TypeText/SendKeys。向输入框键入文本先点击/三击选中字段,再 `pyperclip.copy('文本'); ljqCtrl.Press('ctrl+v')`