|

Build a Complete Langfuse Observability and Evaluation Pipeline for Tracing, Prompt Management, Scoring, and Experiments

✅

In this tutorial, we implement the Langfuse (an open-source LLM engineering platform) pipeline for tracing, immediate administration, scoring, datasets, and experiments. We construct a full workflow that works with both a actual OpenAI key or a deterministic mock LLM, so we are able to perceive each main Langfuse characteristic with out relying on paid mannequin entry. We begin by organising credentials and connecting to Langfuse. We hint easy operate calls, instrument a small RAG pipeline, handle prompts centrally, connect analysis scores, and run dataset-based experiments. Also, we see how Langfuse helps us observe, consider, and enhance LLM purposes in a structured and production-ready method.

import subprocess, sys
def pip_install(*pkgs):
   subprocess.run([sys.executable, "-m", "pip", "install", "-qU", *pkgs], verify=True)
pip_install("langfuse", "openai")
import os
from getpass import getpass
def _ask(var, immediate, secret=True, default=None):
   if os.environ.get(var):
       return os.environ[var]
   val = (getpass(immediate) if secret else enter(immediate)).strip()
   if not val and default isn't None:
       val = default
   os.environ[var] = val
   return val
print("Enter your Langfuse credentials (enter is hidden):")
_ask("LANGFUSE_PUBLIC_KEY", "  Langfuse PUBLIC key (pk-lf-...): ")
_ask("LANGFUSE_SECRET_KEY", "  Langfuse SECRET key (sk-lf-...): ")
area = (enter("  Region — EU (default) / US / or paste a self-hosted URL: ")
         .strip().decrease())
if area.startswith("http"):
   HOST = area
elif area in ("2", "us"):
   HOST = "https://us.cloud.langfuse.com"
else:
   HOST = "https://cloud.langfuse.com"
os.environ["LANGFUSE_HOST"] = HOST
OPENAI_API_KEY = (getpass("  OpenAI key (elective, press Enter to skip): ").strip())
if OPENAI_API_KEY:
   os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
USE_OPENAI = bool(OPENAI_API_KEY)
DEFAULT_MODEL = "gpt-4o-mini" if USE_OPENAI else "mock-llm-v1"
from langfuse import get_client, observe, propagate_attributes, Evaluation
langfuse = get_client()
assert langfuse.auth_check(), "Auth failed — double-check keys/area."
print(f"n✅ Connected to Langfuse at {HOST}")
print(f"   LLM backend: {'OpenAI (' + DEFAULT_MODEL + ')' if USE_OPENAI else 'built-in mock'}n")

We start by putting in the required Langfuse and OpenAI packages contained in the Colab setting. We then gather Langfuse credentials, select the right Langfuse area or self-hosted URL, and optionally settle for an OpenAI API key. We lastly initialize the Langfuse consumer, confirm authentication, and affirm whether or not we’re utilizing OpenAI or the built-in mock LLM.

if USE_OPENAI:
   from langfuse.openai import openai
_MOCK_FACTS = {
   "france": "Paris", "germany": "Berlin", "japan": "Tokyo",
   "italy": "Rome", "spain": "Madrid", "india": "New Delhi",
}
def _mock_answer(user_text: str) -> str:
   t = user_text.decrease()
   for nation, capital in _MOCK_FACTS.objects():
       if nation in t:
           return capital
   if "langfuse" in t:
       return ("Langfuse is an open-source LLM engineering platform for "
               "observability, immediate administration, analysis and datasets.")
   return "This is a mock response. Provide an OpenAI key for actual generations."
def llm_chat(messages, *, mannequin=DEFAULT_MODEL, temperature=0.3,
            identify=None, langfuse_prompt=None) -> str:
   """Return assistant textual content; the decision is traced as a Langfuse era."""
   if USE_OPENAI:
       kwargs = dict(mannequin=mannequin, messages=messages, temperature=temperature)
       if identify:            kwargs["name"] = identify
       if langfuse_prompt: kwargs["langfuse_prompt"] = langfuse_prompt
       resp = openai.chat.completions.create(**kwargs)
       return resp.selections[0].message.content material
   last_user = subsequent((m["content"] for m in reversed(messages)
                     if m["role"] == "consumer"), "")
   reply = _mock_answer(last_user)
   gen_kwargs = dict(as_type="era", identify=identify or "mock-llm",
                     mannequin=mannequin, enter=messages)
   if langfuse_prompt isn't None:
       gen_kwargs["prompt"] = langfuse_prompt
   with langfuse.start_as_current_observation(**gen_kwargs) as gen:
       gen.replace(output=reply,
                  usage_details={"input_tokens": 24, "output_tokens": 12})
   return reply
print("PART 1 ── Decorator tracing -------------------------------------------")
@observe()
def write_story(subject: str) -> str:
   return llm_chat(
       [{"role": "user", "content": f"Write a one-sentence story about {topic}."}],
       identify="story-generation",
   )
@observe()
def story_pipeline(subject: str) -> str:
   return write_story(subject)
print("  →", story_pipeline("a debugging robotic"))

We outline the LLM helper that helps each actual OpenAI generations and deterministic mock responses. We additionally make it possible for even the mock path creates a correct Langfuse era statement, so the tutorial stays absolutely traceable with out an OpenAI key. We then exhibit primary decorator-based tracing by wrapping a easy story-generation pipeline with @observe.

print("nPART 2 ── Manual RAG hint --------------------------------------------")
_KB = {
   "refund": "Refunds are processed inside 5–7 enterprise days to the unique methodology.",
   "guarantee": "All merchandise carry a 1-year restricted producer guarantee.",
}
@observe(identify="retrieve")
def retrieve(query: str):
   q = query.decrease()
   hits = [v for k, v in _KB.items() if k in q] or record(_KB.values())
   return hits[:2]
@observe(identify="rag-pipeline")
def rag_pipeline(query: str, user_id="user-42", session_id="sess-001") -> str:
   with propagate_attributes(user_id=user_id, session_id=session_id,
                             tags=["rag", "support-bot", "tutorial"]):
       context = "n".be part of(retrieve(query))
       return llm_chat(
           [{"role": "system",
             "content": "Answer the question using ONLY the provided context."},
            {"role": "user",
             "content": f"Context:n{context}nnQuestion: {question}"}],
           identify="rag-answer",
       )
rag_answer = rag_pipeline("How lengthy do refunds take?")
rag_trace_id = langfuse.get_current_trace_id()
print("  →", rag_answer)

We construct a small handbook RAG pipeline utilizing a easy in-memory information base for refunds, transport, and guarantee info. We hint the retrieval step individually and use propagate_attributes to connect consumer ID, session ID, and tags throughout the complete hint. We then run a refund-related query and seize the hint ID so we are able to connect scores to it later.

print("nPART 3 ── Prompt administration -------------------------------------------")
langfuse.create_prompt(
   identify="support-agent",
   kind="chat",
   immediate=[
       {"role": "system",
        "content": "You are a {{tone}} customer-support agent for {{company}}. "
                   "Be concise."},
       {"role": "user", "content": "{{question}}"},
   ],
   labels=["production"],
   config={"mannequin": DEFAULT_MODEL, "temperature": 0.2},
)
immediate = langfuse.get_prompt("support-agent", kind="chat")
compiled = immediate.compile(tone="pleasant", firm="Acme",
                         query="Do you supply categorical transport?")
print("  compiled immediate:", compiled)
@observe(identify="prompt-managed-call")
def answer_with_managed_prompt():
   return llm_chat(compiled, identify="support-reply", langfuse_prompt=immediate)
print("  →", answer_with_managed_prompt())
print("nPART 4 ── Scoring -----------------------------------------------------")
def keyword_overlap(reply: str, expected_keyword: str) -> float:
   return 1.0 if expected_keyword.decrease() in (reply or "").decrease() else 0.0
langfuse.create_score(
   identify="groundedness",
   worth=keyword_overlap(rag_answer, "5"),
   trace_id=rag_trace_id,
   data_type="NUMERIC",
   remark="Heuristic: mentions the documented refund window.",
)
langfuse.create_score(identify="user_feedback", worth="useful",
                     trace_id=rag_trace_id, data_type="CATEGORICAL")
langfuse.create_score(identify="resolved", worth=1,
                     trace_id=rag_trace_id, data_type="BOOLEAN")
@observe(identify="scored-call")
def scored_call():
   out = llm_chat([{"role": "user", "content": "What is the capital of Japan?"}],
                  identify="capital-q")
   with langfuse.start_as_current_observation(as_type="span", identify="grade") as span:
       span.rating(identify="appropriate", worth=keyword_overlap(out, "Tokyo"),
                  data_type="NUMERIC")
       span.score_trace(identify="trace_quality", worth=0.9, data_type="NUMERIC")
   return out
print("  →", scored_call(), "(scores hooked up)")

We create a managed Langfuse chat immediate, compile it with runtime variables, and hyperlink the immediate model to a traced era. We then add totally different rating varieties to the sooner RAG hint, together with numeric, categorical, and boolean scores. We additionally exhibit inline scoring by grading a capital-city reply inside the present noticed span and hint.

print("nPART 5 ── Datasets & experiments --------------------------------------")
DATASET = "capital-cities-tutorial"
langfuse.create_dataset(identify=DATASET, description="Capital-city QA benchmark")
_items = [
   ("What is the capital of France?",  "Paris"),
   ("What is the capital of Germany?", "Berlin"),
   ("What is the capital of Japan?",   "Tokyo"),
   ("What is the capital of Italy?",   "Rome"),
]
for i, (q, a) in enumerate(_items):
   langfuse.create_dataset_item(dataset_name=DATASET, id=f"cap-{i}",
                                enter={"query": q}, expected_output=a)
def capital_task(*, merchandise, **kwargs):
   query = merchandise.enter["question"] if isinstance(merchandise.enter, dict) else merchandise.enter
   return llm_chat([{"role": "user", "content": question}], identify="experiment-answer")
def accuracy(*, enter, output, expected_output, metadata=None, **kwargs):
   hit = bool(expected_output) and expected_output.decrease() in (output or "").decrease()
   return Evaluation(identify="accuracy", worth=1.0 if hit else 0.0,
                     remark="exact-match comprises verify")
def conciseness(*, enter, output, **kwargs):
   return Evaluation(identify="char_length", worth=float(len(output or "")))
def mean_accuracy(*, item_results, **kwargs):
   vals = [e.value for r in item_results for e in r.evaluations if e.name == "accuracy"]
   avg = sum(vals) / len(vals) if vals else 0.0
   return Evaluation(identify="mean_accuracy", worth=avg, remark=f"{avg:.0%} appropriate")
dataset = langfuse.get_dataset(DATASET)
outcome = dataset.run_experiment(
   identify="capitals-baseline",
   description="Baseline run from the Colab tutorial",
   activity=capital_task,
   evaluators=[accuracy, conciseness],
   run_evaluators=[mean_accuracy],
   max_concurrency=4,
)
print(outcome.format())

We create a Langfuse dataset for capital-city questions and add deterministic objects to make sure repeated runs stay idempotent. We outline a activity operate that solutions every merchandise, together with item-level evaluators for accuracy and response size. We then run an experiment on the dataset and print a formatted abstract of item-level and combination outcomes.

if USE_OPENAI:
   print("nPART 6 ── LangChain integration ---------------------------------------")
   pip_install("langchain-core", "langchain-openai")
   from langchain_openai import ChatOpenAI
   from langchain_core.prompts import ChatPromptTemplate
   from langfuse.langchain import CallbackHandler
   handler = CallbackHandler()
   chain = (ChatPromptTemplate.from_template("Explain {idea} in a single sentence.")
            | ChatOpenAI(mannequin="gpt-4o-mini", temperature=0))
   lc_out = chain.invoke({"idea": "observability"},
                         config={"callbacks": [handler]})
   print("  →", lc_out.content material)
else:
   print("nPART 6 ── LangChain integration skipped (no OpenAI key).")
langfuse.flush()
print("Open your mission at", HOST)
print("   • Tracing tab .... Parts 1–4 traces (incl. consumer/session/tags)")
print("   • Prompts tab .... the versioned 'support-agent' immediate")
print("   • Scores ......... groundedness / user_feedback / resolved / accuracy")
print("   • Datasets tab ... '%s' with the 'capitals-baseline' experiment run" % DATASET)

We optionally exhibit the LangChain integration when an OpenAI secret is accessible, utilizing the Langfuse callback handler to hint chain execution. If no OpenAI secret is offered, we skip this part whereas protecting the remainder of the tutorial absolutely practical. We lastly flush all buffered occasions to Langfuse and print the place to examine traces, prompts, scores, and dataset experiment outcomes.

In conclusion, we created a sensible end-to-end Langfuse workflow that covers a very powerful components of LLM observability and analysis. We realized easy methods to hint each computerized and handbook operations, hyperlink immediate variations to generations, rating outputs, and benchmark an utility utilizing datasets and experiments. We additionally saved the tutorial versatile by supporting each OpenAI-powered era and a mock LLM path, making it simpler to check the complete pipeline in any setting. Also, we gained an understanding of how Langfuse helps us monitor LLM habits, evaluate experiment runs, and construct extra dependable AI purposes.


Check out the Full Codes with Notebook hereAlso, be at liberty to observe us on Twitter and don’t neglect to hitch our 150k+ ML SubReddit and Subscribe to our Newsletter. Wait! are you on telegram? now you can join us on telegram as well.

Need to associate with us for selling your GitHub Repo OR Hugging Face Page OR Product Release OR Webinar and so forth.? Connect with us

The publish Build a Complete Langfuse Observability and Evaluation Pipeline for Tracing, Prompt Management, Scoring, and Experiments appeared first on MarkTechPost.

Similar Posts