feat: optional Langfuse tracing for agent execution (#115)
Self-activating langfuse tracing via monkey-patch: independent module, zero impact when langfuse_config unset. Captures LLM generation, tool calls, token usage from SSE streams. Co-authored-by: totoyang
This commit is contained in:
122
langfuse_tracing.py
Normal file
122
langfuse_tracing.py
Normal file
@@ -0,0 +1,122 @@
|
||||
"""Opt-in Langfuse tracing. Self-activates on import if langfuse_config exists in mykey.
|
||||
|
||||
Hooks only via monkey-patch so core files stay untouched:
|
||||
- agent_loop.agent_runner_loop -> outer agent trace (parent of all below)
|
||||
- llmcore._write_llm_log -> generation span (Prompt=start, Response=end)
|
||||
- BaseHandler.tool_before/after -> tool span
|
||||
"""
|
||||
import threading, sys
|
||||
|
||||
try:
|
||||
from llmcore import _load_mykeys
|
||||
_cfg = _load_mykeys().get('langfuse_config')
|
||||
from langfuse import Langfuse
|
||||
_lf = Langfuse(**_cfg) if _cfg else None
|
||||
except Exception:
|
||||
_lf = None
|
||||
|
||||
if _lf:
|
||||
import llmcore, agent_loop
|
||||
_tls = threading.local()
|
||||
|
||||
_orig_log = llmcore._write_llm_log
|
||||
def _patched_log(label, content):
|
||||
try:
|
||||
if label == 'Prompt':
|
||||
_tls.gen = _lf.start_observation(name='llm.chat', as_type='generation', input=content[:20000])
|
||||
_tls.usage = None
|
||||
elif label == 'Response' and getattr(_tls, 'gen', None) is not None:
|
||||
_tls.gen.update(output=content[:20000], usage_details=getattr(_tls, 'usage', None))
|
||||
_tls.gen.end(); _tls.gen = None
|
||||
except Exception: pass
|
||||
return _orig_log(label, content)
|
||||
llmcore._write_llm_log = _patched_log
|
||||
|
||||
def _extract_usage(buf):
|
||||
u = {}
|
||||
import json as _j
|
||||
for line in buf:
|
||||
s = line.decode('utf-8', 'replace') if isinstance(line, (bytes, bytearray)) else line
|
||||
if not s or not s.startswith('data:'): continue
|
||||
ds = s[5:].lstrip()
|
||||
if ds == '[DONE]': continue
|
||||
try: evt = _j.loads(ds)
|
||||
except: continue
|
||||
if evt.get('type') == 'message_start':
|
||||
us = evt.get('message', {}).get('usage', {}) or {}
|
||||
u['input'] = us.get('input_tokens', u.get('input', 0))
|
||||
if us.get('cache_creation_input_tokens'): u['cache_creation_input_tokens'] = us['cache_creation_input_tokens']
|
||||
if us.get('cache_read_input_tokens'): u['cache_read_input_tokens'] = us['cache_read_input_tokens']
|
||||
elif evt.get('type') == 'message_delta':
|
||||
ot = (evt.get('usage') or {}).get('output_tokens')
|
||||
if ot: u['output'] = ot
|
||||
elif evt.get('type') == 'response.completed':
|
||||
us = evt.get('response', {}).get('usage', {}) or {}
|
||||
if us.get('input_tokens'): u['input'] = us['input_tokens']
|
||||
if us.get('output_tokens'): u['output'] = us['output_tokens']
|
||||
cr = (us.get('input_tokens_details') or {}).get('cached_tokens')
|
||||
if cr: u['cache_read_input_tokens'] = cr
|
||||
else:
|
||||
us = evt.get('usage')
|
||||
if us:
|
||||
if us.get('prompt_tokens'): u['input'] = us['prompt_tokens']
|
||||
if us.get('completion_tokens'): u['output'] = us['completion_tokens']
|
||||
cr = (us.get('prompt_tokens_details') or {}).get('cached_tokens')
|
||||
if cr: u['cache_read_input_tokens'] = cr
|
||||
return u or None
|
||||
|
||||
def _wrap_parser(orig):
|
||||
def wrapped(resp_lines, *a, **kw):
|
||||
buf = []
|
||||
def tee():
|
||||
for ln in resp_lines:
|
||||
buf.append(ln); yield ln
|
||||
ret = yield from orig(tee(), *a, **kw)
|
||||
try: _tls.usage = _extract_usage(buf)
|
||||
except Exception: pass
|
||||
return ret
|
||||
return wrapped
|
||||
llmcore._parse_claude_sse = _wrap_parser(llmcore._parse_claude_sse)
|
||||
llmcore._parse_openai_sse = _wrap_parser(llmcore._parse_openai_sse)
|
||||
|
||||
_orig_before = agent_loop.BaseHandler.tool_before_callback
|
||||
_orig_after = agent_loop.BaseHandler.tool_after_callback
|
||||
|
||||
def _patched_before(self, tool_name, args, response):
|
||||
try:
|
||||
if not hasattr(_tls, 'tstack'): _tls.tstack = []
|
||||
a = {k: v for k, v in args.items() if k != '_index'}
|
||||
_tls.tstack.append(_lf.start_observation(name=tool_name, as_type='tool', input=a))
|
||||
except Exception: pass
|
||||
return _orig_before(self, tool_name, args, response)
|
||||
|
||||
def _patched_after(self, tool_name, args, response, ret):
|
||||
try:
|
||||
if getattr(_tls, 'tstack', None):
|
||||
sp = _tls.tstack.pop()
|
||||
out = {'data': ret.data, 'next_prompt': ret.next_prompt, 'should_exit': ret.should_exit} if ret else None
|
||||
sp.update(output=out); sp.end()
|
||||
except Exception: pass
|
||||
return _orig_after(self, tool_name, args, response, ret)
|
||||
|
||||
agent_loop.BaseHandler.tool_before_callback = _patched_before
|
||||
agent_loop.BaseHandler.tool_after_callback = _patched_after
|
||||
|
||||
_orig_loop = agent_loop.agent_runner_loop
|
||||
def _patched_loop(client, system_prompt, user_input, handler, tools_schema, *a, **kw):
|
||||
try: cm = _lf.start_as_current_observation(name='agent.task', as_type='agent', input={'user_input': user_input})
|
||||
except Exception: cm = None
|
||||
if cm is None:
|
||||
ret = yield from _orig_loop(client, system_prompt, user_input, handler, tools_schema, *a, **kw); return ret
|
||||
with cm as sp:
|
||||
ret = yield from _orig_loop(client, system_prompt, user_input, handler, tools_schema, *a, **kw)
|
||||
try: sp.update(output=ret)
|
||||
except Exception: pass
|
||||
try: _lf.flush()
|
||||
except Exception: pass
|
||||
return ret
|
||||
agent_loop.agent_runner_loop = _patched_loop
|
||||
for _m in list(sys.modules.values()):
|
||||
if _m and getattr(_m, 'agent_runner_loop', None) is _orig_loop:
|
||||
try: setattr(_m, 'agent_runner_loop', _patched_loop)
|
||||
except Exception: pass
|
||||
Reference in New Issue
Block a user