From 8e6270e3a34b1e1b966039b3e81c7e19b45d99fb Mon Sep 17 00:00:00 2001
From: totoyang <34332614+totoyang@users.noreply.github.com>
Date: Mon, 20 Apr 2026 15:27:55 +0800
Subject: [PATCH] feat: optional Langfuse tracing for agent execution (#115)

Self-activating langfuse tracing via monkey-patch: independent module,
zero impact when langfuse_config unset. Captures LLM generation, tool
calls, token usage from SSE streams.

Co-authored-by: totoyang
---
 langfuse_tracing.py | 122 ++++++++++++++++++++++++++++++++++++++++++++
 llmcore.py          |   4 ++
 mykey_template.py   |   7 +++
 3 files changed, 133 insertions(+)
 create mode 100644 langfuse_tracing.py

diff --git a/langfuse_tracing.py b/langfuse_tracing.py
new file mode 100644
index 0000000..07a4aa2
--- /dev/null
+++ b/langfuse_tracing.py
@@ -0,0 +1,122 @@
+"""Opt-in Langfuse tracing. Self-activates on import if langfuse_config exists in mykey.
+
+Hooks only via monkey-patch so core files stay untouched:
+- agent_loop.agent_runner_loop -> outer agent trace (parent of all below)
+- llmcore._write_llm_log -> generation span (Prompt=start, Response=end)
+- BaseHandler.tool_before/after -> tool span
+"""
+import threading, sys
+
+try:
+    from llmcore import _load_mykeys
+    _cfg = _load_mykeys().get('langfuse_config')
+    from langfuse import Langfuse
+    _lf = Langfuse(**_cfg) if _cfg else None
+except Exception:
+    _lf = None
+
+if _lf:
+    import llmcore, agent_loop
+    _tls = threading.local()
+
+    _orig_log = llmcore._write_llm_log
+    def _patched_log(label, content):
+        try:
+            if label == 'Prompt':
+                _tls.gen = _lf.start_observation(name='llm.chat', as_type='generation', input=content[:20000])
+                _tls.usage = None
+            elif label == 'Response' and getattr(_tls, 'gen', None) is not None:
+                _tls.gen.update(output=content[:20000], usage_details=getattr(_tls, 'usage', None))
+                _tls.gen.end(); _tls.gen = None
+        except Exception: pass
+        return _orig_log(label, content)
+    llmcore._write_llm_log = _patched_log
+
+    def _extract_usage(buf):
+        u = {}
+        import json as _j
+        for line in buf:
+            s = line.decode('utf-8', 'replace') if isinstance(line, (bytes, bytearray)) else line
+            if not s or not s.startswith('data:'): continue
+            ds = s[5:].lstrip()
+            if ds == '[DONE]': continue
+            try: evt = _j.loads(ds)
+            except: continue
+            if evt.get('type') == 'message_start':
+                us = evt.get('message', {}).get('usage', {}) or {}
+                u['input'] = us.get('input_tokens', u.get('input', 0))
+                if us.get('cache_creation_input_tokens'): u['cache_creation_input_tokens'] = us['cache_creation_input_tokens']
+                if us.get('cache_read_input_tokens'): u['cache_read_input_tokens'] = us['cache_read_input_tokens']
+            elif evt.get('type') == 'message_delta':
+                ot = (evt.get('usage') or {}).get('output_tokens')
+                if ot: u['output'] = ot
+            elif evt.get('type') == 'response.completed':
+                us = evt.get('response', {}).get('usage', {}) or {}
+                if us.get('input_tokens'): u['input'] = us['input_tokens']
+                if us.get('output_tokens'): u['output'] = us['output_tokens']
+                cr = (us.get('input_tokens_details') or {}).get('cached_tokens')
+                if cr: u['cache_read_input_tokens'] = cr
+            else:
+                us = evt.get('usage')
+                if us:
+                    if us.get('prompt_tokens'): u['input'] = us['prompt_tokens']
+                    if us.get('completion_tokens'): u['output'] = us['completion_tokens']
+                    cr = (us.get('prompt_tokens_details') or {}).get('cached_tokens')
+                    if cr: u['cache_read_input_tokens'] = cr
+        return u or None
+
+    def _wrap_parser(orig):
+        def wrapped(resp_lines, *a, **kw):
+            buf = []
+            def tee():
+                for ln in resp_lines:
+                    buf.append(ln); yield ln
+            ret = yield from orig(tee(), *a, **kw)
+            try: _tls.usage = _extract_usage(buf)
+            except Exception: pass
+            return ret
+        return wrapped
+    llmcore._parse_claude_sse = _wrap_parser(llmcore._parse_claude_sse)
+    llmcore._parse_openai_sse = _wrap_parser(llmcore._parse_openai_sse)
+
+    _orig_before = agent_loop.BaseHandler.tool_before_callback
+    _orig_after = agent_loop.BaseHandler.tool_after_callback
+
+    def _patched_before(self, tool_name, args, response):
+        try:
+            if not hasattr(_tls, 'tstack'): _tls.tstack = []
+            a = {k: v for k, v in args.items() if k != '_index'}
+            _tls.tstack.append(_lf.start_observation(name=tool_name, as_type='tool', input=a))
+        except Exception: pass
+        return _orig_before(self, tool_name, args, response)
+
+    def _patched_after(self, tool_name, args, response, ret):
+        try:
+            if getattr(_tls, 'tstack', None):
+                sp = _tls.tstack.pop()
+                out = {'data': ret.data, 'next_prompt': ret.next_prompt, 'should_exit': ret.should_exit} if ret else None
+                sp.update(output=out); sp.end()
+        except Exception: pass
+        return _orig_after(self, tool_name, args, response, ret)
+
+    agent_loop.BaseHandler.tool_before_callback = _patched_before
+    agent_loop.BaseHandler.tool_after_callback = _patched_after
+
+    _orig_loop = agent_loop.agent_runner_loop
+    def _patched_loop(client, system_prompt, user_input, handler, tools_schema, *a, **kw):
+        try: cm = _lf.start_as_current_observation(name='agent.task', as_type='agent', input={'user_input': user_input})
+        except Exception: cm = None
+        if cm is None:
+            ret = yield from _orig_loop(client, system_prompt, user_input, handler, tools_schema, *a, **kw); return ret
+        with cm as sp:
+            ret = yield from _orig_loop(client, system_prompt, user_input, handler, tools_schema, *a, **kw)
+            try: sp.update(output=ret)
+            except Exception: pass
+        try: _lf.flush()
+        except Exception: pass
+        return ret
+    agent_loop.agent_runner_loop = _patched_loop
+    for _m in list(sys.modules.values()):
+        if _m and getattr(_m, 'agent_runner_loop', None) is _orig_loop:
+            try: setattr(_m, 'agent_runner_loop', _patched_loop)
+            except Exception: pass
diff --git a/llmcore.py b/llmcore.py
index dfdc0d3..768d83b 100644
--- a/llmcore.py
+++ b/llmcore.py
@@ -939,3 +939,7 @@ class NativeToolClient:
         if resp: _write_llm_log('Response', resp.raw)
         if resp and hasattr(resp, 'tool_calls') and resp.tool_calls: self._pending_tool_ids = [tc.id for tc in resp.tool_calls]
         return resp
+
+try: import langfuse_tracing # opt-in observability; noop if langfuse_config not set in mykey
+except Exception: pass
+
diff --git a/mykey_template.py b/mykey_template.py
index 1900694..a6688a6 100644
--- a/mykey_template.py
+++ b/mykey_template.py
@@ -403,3 +403,10 @@ native_oai_config = {
 # dingtalk_client_id = 'your_app_key'
 # dingtalk_client_secret = 'your_app_secret'
 # dingtalk_allowed_users = ['your_staff_id'] # 留空或 ['*'] 表示允许所有钉钉用户
+
+# 可选:Langfuse 追踪。不设此项则不 import langfuse,零影响
+# langfuse_config = {
+#     'public_key': 'pk-lf-...',
+#     'secret_key': 'sk-lf-...',
+#     'host': 'https://cloud.langfuse.com', # 或自托管地址
+# }