Refactor: simplify smart_format to string-only and enhance tool output truncation limits
This commit is contained in:
37
ga.py
37
ga.py
@@ -137,6 +137,7 @@ def web_scan(tabs_only=False, switch_tab_id=None, text_only=False):
|
||||
}
|
||||
if not tabs_only:
|
||||
importlib.reload(simphtml); result["content"] = simphtml.get_html(driver, cutlist=True, maxchars=35000, text_only=text_only)
|
||||
if text_only: result['content'] = smart_format(result['content'], max_str_len=10000, omit_str='\n\n[omitted long content]\n\n')
|
||||
return result
|
||||
except Exception as e:
|
||||
return {"status": "error", "msg": format_error(e)}
|
||||
@@ -233,7 +234,7 @@ def file_read(path, start=1, keyword=None, count=200, show_linenos=True):
|
||||
else: return f"Keyword '{keyword}' not found after line {start}. Falling back to content from line {start}:\n\n" \
|
||||
+ file_read(path, start, None, count, show_linenos)
|
||||
else: res = list(itertools.islice(stream, count))
|
||||
realcnt = len(res); L_MAX = max(100, 512000//realcnt); TAG = " ... [TRUNCATED]"
|
||||
realcnt = len(res); L_MAX = min(max(100, 256000//realcnt), 8000); TAG = " ... [TRUNCATED]"
|
||||
remaining = sum(1 for _ in itertools.islice(stream, 5000))
|
||||
total_lines = (res[0][0] - 1 if res else start - 1) + realcnt + remaining
|
||||
total_tag = "[FILE] Total " + (f"{total_lines}+" if remaining >= 5000 else str(total_lines)) + ' lines\n'
|
||||
@@ -243,17 +244,10 @@ def file_read(path, start=1, keyword=None, count=200, show_linenos=True):
|
||||
return result
|
||||
except Exception as e: return f"Error: {str(e)}"
|
||||
|
||||
def smart_format(data, max_depth=2, max_str_len=100, omit_str=' ... '):
|
||||
def truncate(obj, depth):
|
||||
if isinstance(obj, str):
|
||||
if len(obj) < max_str_len+len(omit_str)*2: return obj
|
||||
return f"{obj[:max_str_len//2]}{omit_str}{obj[-max_str_len//2:]}"
|
||||
if depth >= max_depth: return truncate(str(obj), depth + 1)
|
||||
if isinstance(obj, dict): return {k: truncate(v, depth + 1) for k, v in obj.items()}
|
||||
if isinstance(obj, list): return [truncate(i, depth + 1) for i in obj]
|
||||
return obj
|
||||
if isinstance(data, (str, bytes)): return truncate(data, 0)
|
||||
return json.dumps(truncate(data, 0), indent=2, ensure_ascii=False, default=str)
|
||||
def smart_format(data, max_str_len=100, omit_str=' ... '):
|
||||
if not isinstance(data, str): data = str(data)
|
||||
if len(data) < max_str_len + len(omit_str)*2: return data
|
||||
return f"{data[:max_str_len//2]}{omit_str}{data[-max_str_len//2:]}"
|
||||
|
||||
class GenericAgentHandler(BaseHandler):
|
||||
'''Generic Agent 工具库,包含多种工具的实现。工具函数自动加上了 do_ 前缀。实际工具名没有前缀。'''
|
||||
@@ -348,10 +342,12 @@ class GenericAgentHandler(BaseHandler):
|
||||
result["js_return"] += f"\n\n[已保存完整内容到 {abs_path}]"
|
||||
except:
|
||||
result['js_return'] += f"\n\n[保存失败,无法写入文件 {abs_path}]"
|
||||
try: print("Web Execute JS Result:", smart_format(result))
|
||||
show = smart_format(json.dumps(result, ensure_ascii=False, indent=2, default=json_default), max_str_len=300)
|
||||
try: print("Web Execute JS Result:", show)
|
||||
except: pass
|
||||
yield f"JS 执行结果:\n{smart_format(result)}\n"
|
||||
yield f"JS 执行结果:\n{show}\n"
|
||||
next_prompt = self._get_anchor_prompt(skip=args.get('_index', 0) > 0)
|
||||
result = json.dumps(result, ensure_ascii=False, default=json_default)
|
||||
return StepOutcome(smart_format(result, max_str_len=8000), next_prompt=next_prompt)
|
||||
|
||||
def do_file_patch(self, args, response):
|
||||
@@ -364,7 +360,7 @@ class GenericAgentHandler(BaseHandler):
|
||||
yield f"[Status] ❌ 引用展开失败: {e}\n"
|
||||
return StepOutcome({"status": "error", "msg": str(e)}, next_prompt="\n")
|
||||
result = file_patch(path, old_content, new_content)
|
||||
yield f"\n{smart_format(result)}\n"
|
||||
yield f"\n{str(result)}\n"
|
||||
next_prompt = self._get_anchor_prompt(skip=args.get('_index', 0) > 0)
|
||||
return StepOutcome(result, next_prompt=next_prompt)
|
||||
|
||||
@@ -415,8 +411,8 @@ class GenericAgentHandler(BaseHandler):
|
||||
if show_linenos:
|
||||
tips = '由于设置了show_linenos,以下返回信息为:(行号|)内容 。\n'
|
||||
result = tips + result
|
||||
if ' ... [TRUNCATED]' in result:
|
||||
result += '\n\n(某些行被截断,如需完整内容可改用 code_run 读取)'
|
||||
if ' ... [TRUNCATED]' in result: result += '\n\n(某些行被截断,如需完整内容可改用 code_run 读取)'
|
||||
result = smart_format(result, max_str_len=20000, omit_str='\n\n[omitted long content]\n\n')
|
||||
next_prompt = self._get_anchor_prompt()
|
||||
log_memory_access(path)
|
||||
if 'memory' in path or 'sop' in path:
|
||||
@@ -424,8 +420,7 @@ class GenericAgentHandler(BaseHandler):
|
||||
return StepOutcome(result, next_prompt=next_prompt)
|
||||
|
||||
def do_update_working_checkpoint(self, args, response):
|
||||
'''为整个任务设定后续需要临时记忆的重点。
|
||||
'''
|
||||
'''为整个任务设定后续需要临时记忆的重点。'''
|
||||
key_info = args.get("key_info", "")
|
||||
related_sop = args.get("related_sop", "")
|
||||
if "key_info" in args: self.working['key_info'] = key_info
|
||||
@@ -444,9 +439,9 @@ class GenericAgentHandler(BaseHandler):
|
||||
if not response or not content.strip():
|
||||
yield "[Warn] LLM returned an empty response. Retrying...\n"
|
||||
return StepOutcome({}, next_prompt="[System] Blank response, regenerate and tooluse")
|
||||
if '流异常中断,未收到完整响应 !!!]' in content:
|
||||
if '未收到完整响应 !!!]' in content[-100:]:
|
||||
return StepOutcome({}, next_prompt="[System] Incomplete response. Regenerate and tooluse.")
|
||||
if 'max_tokens !!!]' in content:
|
||||
if 'max_tokens !!!]' in content[-100:]:
|
||||
return StepOutcome({}, next_prompt="[System] max_tokens limit reached. Use multi small steps to do it.")
|
||||
# 2. 检测“包含较大代码块但未调用工具”的情况
|
||||
# 这里通过三引号代码块 + 最少字符数的方式粗略判断“大段代码”
|
||||
|
||||
Reference in New Issue
Block a user