agentmain: add timeout+sampling snapshot for task queue consumer
This commit is contained in:
13
agentmain.py
13
agentmain.py
@@ -125,18 +125,21 @@ if __name__ == '__main__':
|
|||||||
threading.Thread(target=agent.run, daemon=True).start()
|
threading.Thread(target=agent.run, daemon=True).start()
|
||||||
|
|
||||||
if args.task:
|
if args.task:
|
||||||
d = f'temp/{args.task}'; rp = f'{d}/reply.txt'
|
d = f'temp/{args.task}'; rp = f'{d}/reply.txt'; nround = ''
|
||||||
with open(f'{d}/input.txt', encoding='utf-8') as f: raw = f.read()
|
with open(f'{d}/input.txt', encoding='utf-8') as f: raw = f.read()
|
||||||
while True:
|
while True:
|
||||||
dq = agent.put_task(raw, source='task')
|
dq = agent.put_task(raw, source='task')
|
||||||
while 'done' not in (item := dq.get()): pass
|
while 'done' not in (item := dq.get(timeout=120)):
|
||||||
with open(f'{d}/output.txt', 'w', encoding='utf-8') as f: f.write(item['done'])
|
if 'next' in item and random.random() < 0.01: # 1/100的概率写一次中间结果
|
||||||
for _ in range(300): # 等reply.txt,5分钟超时
|
with open(f'{d}/output{nround}.txt', 'w', encoding='utf-8') as f: f.write(item.get('next', ''))
|
||||||
time.sleep(1)
|
with open(f'{d}/output{nround}.txt', 'w', encoding='utf-8') as f: f.write(item['done'])
|
||||||
|
for _ in range(150): # 等reply.txt,5分钟超时
|
||||||
|
time.sleep(2)
|
||||||
if os.path.exists(rp):
|
if os.path.exists(rp):
|
||||||
with open(rp, encoding='utf-8') as f: raw = f.read()
|
with open(rp, encoding='utf-8') as f: raw = f.read()
|
||||||
os.remove(rp); break
|
os.remove(rp); break
|
||||||
else: break
|
else: break
|
||||||
|
nround = int(nround) + 1 if nround.isdigit() else 1
|
||||||
elif args.scheduled:
|
elif args.scheduled:
|
||||||
def drain(dq, tag):
|
def drain(dq, tag):
|
||||||
while 'done' not in (item := dq.get()): pass
|
while 'done' not in (item := dq.get()): pass
|
||||||
|
|||||||
Reference in New Issue
Block a user