feat: add ACK mechanism for WS execute_js + timeout diagnostics

- TMWebDriver: add self.acks dict, HTTP implicit ACK on longpoll dequeue, WS explicit ACK handler, enhanced timeout diagnostics (4 cases)
- Fix: data.get('type') in 'result' -> == 'result'
- JS userscript: send ACK on script receive
- tgapp: fix proxy URL prefix
- mykey_template: add tg/proxy fields
This commit is contained in:
Liang Jiaqing
2026-02-13 15:30:49 +08:00
parent b06cd52cf2
commit 8d3582e3b7
5 changed files with 27 additions and 18 deletions

View File

@@ -34,11 +34,8 @@ class Session:
class TMWebDriver: class TMWebDriver:
def __init__(self, host: str = 'localhost', port: int = 18765): def __init__(self, host: str = 'localhost', port: int = 18765):
self.host = host self.host, self.port = host, port
self.port = port self.sessions, self.results, self.acks = {}, {}, {}
self.sessions = {}
self.results = {}
self.default_session_id = None self.default_session_id = None
self.latest_session_id = None self.latest_session_id = None
self.last_cmd_time = 0 self.last_cmd_time = 0
@@ -67,7 +64,11 @@ class TMWebDriver:
else: return json.dumps({"id": "", "ret": "use ws"}) else: return json.dumps({"id": "", "ret": "use ws"})
start_time = time.time() start_time = time.time()
while time.time() - start_time < 5: while time.time() - start_time < 5:
try: return msgQ.get(timeout=0.2) try:
msg = msgQ.get(timeout=0.2)
try: self.acks[json.loads(msg).get('id','')] = True
except: pass
return msg
except queue.Empty: continue except queue.Empty: continue
return json.dumps({"id": "", "ret": "next long-poll"}) return json.dumps({"id": "", "ret": "next long-poll"})
@@ -100,13 +101,11 @@ class TMWebDriver:
except Exception as e: except Exception as e:
return json.dumps({'error': str(e)}, ensure_ascii=False) return json.dumps({'error': str(e)}, ensure_ascii=False)
return 'ok' return 'ok'
def run(): def run():
import asyncio import asyncio
loop = asyncio.new_event_loop() loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
bottle.run(app, host=self.host, port=self.port+1, server='tornado', threads=20) bottle.run(app, host=self.host, port=self.port+1, server='tornado', threads=20)
http_thread = threading.Thread(target=run) http_thread = threading.Thread(target=run)
http_thread.daemon = True http_thread.daemon = True
http_thread.start() http_thread.start()
@@ -129,7 +128,8 @@ class TMWebDriver:
session_info = {'url': data.get('url'), 'title': data.get('title', ''), session_info = {'url': data.get('url'), 'title': data.get('title', ''),
'connected_at': time.time(), 'type': 'ws'} 'connected_at': time.time(), 'type': 'ws'}
driver._register_client(session_id, self, session_info) driver._register_client(session_id, self, session_info)
elif data.get('type') in 'result': elif data.get('type') == 'ack': driver.acks[data.get('id','')] = True
elif data.get('type') == 'result':
driver.results[data.get('id')] = {'success': True, 'data': data.get('result'), 'newTabs': data.get('newTabs', [])} driver.results[data.get('id')] = {'success': True, 'data': data.get('result'), 'newTabs': data.get('newTabs', [])}
elif data.get('type') == 'error': elif data.get('type') == 'error':
driver.results[data.get('id')] = {'success': False, 'data': data.get('error')} driver.results[data.get('id')] = {'success': False, 'data': data.get('error')}
@@ -199,7 +199,6 @@ class TMWebDriver:
print(f"会话 {session_id} 未连接,自动切换到最新活动会话: {session.id}") print(f"会话 {session_id} 未连接,自动切换到最新活动会话: {session.id}")
session_id = self.default_session_id = session.id session_id = self.default_session_id = session.id
if not session or not session.is_active(): if not session or not session.is_active():
#breakpoint()
raise ValueError(f"会话ID {session_id} 未连接") raise ValueError(f"会话ID {session_id} 未连接")
tp = session.type tp = session.type
@@ -214,10 +213,12 @@ class TMWebDriver:
start_time = time.time() start_time = time.time()
self.clean_sessions() self.clean_sessions()
hasjump = False hasjump = acked = False
while exec_id not in self.results: while exec_id not in self.results:
time.sleep(0.1) time.sleep(0.1)
if not acked and exec_id in self.acks:
acked = True; start_time = time.time()
if tp == 'ws': if tp == 'ws':
if not session.is_active(): hasjump = True if not session.is_active(): hasjump = True
if hasjump and session.is_active(): if hasjump and session.is_active():
@@ -226,11 +227,14 @@ class TMWebDriver:
if time.time() - start_time > timeout: if time.time() - start_time > timeout:
if tp == 'ws': if tp == 'ws':
if hasjump: return {"result": f"Session {session_id} reloaded and new page is loading...", "closed":1} if hasjump: return {"result": f"Session {session_id} reloaded and new page is loading...", "closed":1}
return {"result": f"No response data in {timeout}s"} if acked: return {"result": f"No response data in {timeout}s (ACK received, script may still be running)"}
return {"result": f"No response data in {timeout}s (no ACK, script may not have been delivered)"}
elif tp == 'http': elif tp == 'http':
return {"result": f"Session {session_id} no response."} if acked: return {"result": f"Session {session_id} no response in {timeout}s (delivered but no result)"}
return {"result": f"Session {session_id} no response in {timeout}s (script not polled)"}
result = self.results.pop(exec_id) result = self.results.pop(exec_id)
if exec_id in self.acks: self.acks.pop(exec_id)
if not result['success']: raise Exception(result['data']) if not result['success']: raise Exception(result['data'])
if not self.is_remote and auto_switch_newtab: if not self.is_remote and auto_switch_newtab:
newtabs = result.get('newTabs', []) newtabs = result.get('newTabs', [])

View File

@@ -366,6 +366,7 @@
ws.onmessage = async function(e) { ws.onmessage = async function(e) {
try { try {
let data = JSON.parse(e.data); let data = JSON.parse(e.data);
ws.send(JSON.stringify({type: 'ack',id: data.id}));
let startTime = Date.now(); let startTime = Date.now();
let newTabs = []; let newTabs = [];
let checkNewTab = data.auto_switch_newtab === true; let checkNewTab = data.auto_switch_newtab === true;

View File

@@ -22,3 +22,9 @@ claude_config = {
'apibase':"http://233.145.139.147:3001/", 'apibase':"http://233.145.139.147:3001/",
'model':"claude-opus" 'model':"claude-opus"
} }
# If you need them
# tg_bot_token = '84102K2gYZ...'
# tg_allowed_users = [6806...]
# proxy = "http://127.0.0.1:2082"

View File

@@ -23,8 +23,7 @@ agent = init()
st.title("🖥️ Cowork") st.title("🖥️ Cowork")
if 'autonomous_enabled' not in st.session_state: if 'autonomous_enabled' not in st.session_state: st.session_state.autonomous_enabled = False
st.session_state.autonomous_enabled = False
@st.fragment @st.fragment
def render_sidebar(): def render_sidebar():

View File

@@ -89,10 +89,9 @@ if __name__ == '__main__':
try: try:
_lock_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM); _lock_sock.bind(('127.0.0.1', 19527)) _lock_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM); _lock_sock.bind(('127.0.0.1', 19527))
except OSError: sys.exit('Another instance is already running.') except OSError: sys.exit('Another instance is already running.')
if not ALLOWED: if not ALLOWED: sys.exit('ERROR: tg_allowed_users in mykey.py is empty or missing. Set it to avoid unauthorized access.')
sys.exit('ERROR: tg_allowed_users in mykey.py is empty or missing. Set it to avoid unauthorized access.')
threading.Thread(target=agent.run, daemon=True).start() threading.Thread(target=agent.run, daemon=True).start()
proxy = vars(mykey).get('proxy', '127.0.0.1:2082') proxy = vars(mykey).get('proxy', 'http://127.0.0.1:2082')
app = ApplicationBuilder().token(mykey.tg_bot_token).proxy(proxy).get_updates_proxy(proxy).build() app = ApplicationBuilder().token(mykey.tg_bot_token).proxy(proxy).get_updates_proxy(proxy).build()
app.add_handler(CommandHandler("stop", cmd_abort)) app.add_handler(CommandHandler("stop", cmd_abort))
app.add_handler(CommandHandler("llm", cmd_llm)) app.add_handler(CommandHandler("llm", cmd_llm))