Lesson 6 / 6

06. Choosing Models & Building Production AI Apps

TL;DR

There's no single best model — match model to task. Use small models for simple tasks, big models for complex reasoning. Cache aggressively. Add guardrails (input validation, output filtering). Evaluate with automated evals, not vibes. Monitor cost per request. Route between models based on complexity. Ship incrementally.

You have built prototypes. You have called APIs, written prompts, wired up RAG pipelines, and maybe orchestrated agents with LangChain. Now the question changes from “can I make this work?” to “can I ship this?” Production AI is a different game. You need to pick the right model, keep costs from spiraling, stop the model from saying something catastrophic, measure whether it is actually working, and do all of that while handling real traffic. This lesson is the playbook.

[Figure: LLM model comparison matrix — quality, speed, cost, and best use cases]

The Model Landscape

The LLM market moves fast, but the tiers stay remarkably stable. Understanding the tiers matters more than memorizing specific model names.

Frontier models — best quality, highest cost, slowest:

  • OpenAI GPT-4o, o1, o3
  • Anthropic Claude Opus
  • Google Gemini Ultra / 2.5 Pro

Mid-tier models — strong quality, reasonable cost, good speed:

  • OpenAI GPT-4o-mini
  • Anthropic Claude Sonnet
  • Google Gemini Flash
  • Meta Llama 3.3 70B (open-weight, self-hostable)

Small/fast models — limited reasoning, very cheap, very fast:

  • Anthropic Claude Haiku
  • Google Gemini Flash 8B
  • Meta Llama 3.2 8B (runs on a laptop)
  • Mistral 7B, Phi-3

Open-source / self-hosted — no API costs, full control, you handle infra:

  • Llama 3 family (Meta)
  • Mistral / Mixtral
  • Qwen 2.5 (Alibaba)
  • DeepSeek V3

The right model is rarely the biggest one. It is the cheapest model that meets your quality bar.

How to Choose a Model

Model selection is an engineering decision, not a brand loyalty exercise. Here is the decision framework:

Step 1: Define your task complexity.

# Simple classification, extraction, formatting
# → Small model (Haiku, GPT-4o-mini, Llama 8B)
SIMPLE_TASKS = [
    "Classify this support ticket as billing/technical/general",
    "Extract the date and amount from this invoice",
    "Reformat this JSON into a markdown table",
]

# Multi-step reasoning, nuanced generation, complex analysis
# → Frontier model (Opus, GPT-4o, Gemini Pro)
COMPLEX_TASKS = [
    "Analyze this contract for risks and suggest amendments",
    "Debug this distributed system trace and identify the root cause",
    "Write a migration plan for this database schema change",
]

Step 2: Run a benchmark on YOUR data. Public benchmarks (MMLU, HumanEval) tell you general capability. They do not tell you how a model performs on your specific prompts with your specific data.

import json
import time
from openai import OpenAI

client = OpenAI()

# Build a test set from real examples
test_cases = [
    {
        "input": "Customer says: I was charged twice for order #4821",
        "expected_category": "billing",
        "expected_sentiment": "negative",
    },
    # ... 50-100 real examples with known-good answers
]

def benchmark_model(client, model, test_cases):
    results = []
    for case in test_cases:
        start = time.time()
        response = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": "Classify the ticket. Return JSON: {category, sentiment}"},
                {"role": "user", "content": case["input"]},
            ],
            temperature=0,
            response_format={"type": "json_object"},  # Guarantees parseable JSON output
        )
        latency = time.time() - start
        output = json.loads(response.choices[0].message.content)
        results.append({
            "correct": output["category"] == case["expected_category"],
            "latency_ms": latency * 1000,
            "input_tokens": response.usage.prompt_tokens,
            "output_tokens": response.usage.completion_tokens,
        })

    accuracy = sum(r["correct"] for r in results) / len(results)
    avg_latency = sum(r["latency_ms"] for r in results) / len(results)
    total_tokens = sum(r["input_tokens"] + r["output_tokens"] for r in results)
    print(f"Model: {model}")
    print(f"  Accuracy:    {accuracy:.1%}")
    print(f"  Avg latency: {avg_latency:.0f}ms")
    print(f"  Total tokens: {total_tokens}")
    return {"accuracy": accuracy, "avg_latency": avg_latency, "total_tokens": total_tokens}

Step 3: Calculate cost per request, not cost per token. Per-token price is only half the equation: a model that costs 3x more per token but needs 10x fewer tokens (shorter prompts, no few-shot examples, terser output) is cheaper per request — and failed outputs that have to be retried count against the cheap model too.
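A sketch of that arithmetic. The token counts, rates, and success rates below are placeholders, not benchmark results; plug in your own numbers from Step 2.

```python
def cost_per_success(input_tokens: int, output_tokens: int,
                     input_rate: float, output_rate: float,
                     success_rate: float) -> float:
    """Expected cost per *successful* request, assuming failed outputs are retried.
    Rates are dollars per 1M tokens (input, output); all numbers here are illustrative."""
    per_attempt = (input_tokens * input_rate + output_tokens * output_rate) / 1_000_000
    return per_attempt / success_rate  # 1/success_rate expected attempts

# Small model: long few-shot prompt, verbose output, more failures
small = cost_per_success(3000, 600, 0.15, 0.60, success_rate=0.70)
# Frontier model: zero-shot, terse, reliable
big = cost_per_success(400, 150, 2.50, 10.00, success_rate=0.98)
print(f"small: ${small:.6f}/success  big: ${big:.6f}/success")
```

With these particular numbers the small model still wins; the point is that the ranking can flip, and only the per-request math on your own data tells you which way.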

[Figure: Model routing pattern — smart cost optimization by routing to small vs large models]

Cost Optimization

LLM costs scale with traffic. A prototype that costs $5/day can become $5,000/day at scale. Here are the levers.

Prompt Caching

Most providers now support prompt caching — if the same system prompt prefix is sent repeatedly, you pay reduced rates for the cached portion.

from anthropic import Anthropic

client = Anthropic()

# Long system prompt — cached after first request
SYSTEM_PROMPT = """You are a customer support classifier for Acme Corp.
[... 2000 tokens of classification rules, examples, edge cases ...]
"""

def classify_ticket(ticket_text: str) -> str:
    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=200,
        system=[{
            "type": "text",
            "text": SYSTEM_PROMPT,
            "cache_control": {"type": "ephemeral"},  # Enable prompt caching
        }],
        messages=[{"role": "user", "content": ticket_text}],
    )
    # First call: base price plus a cache-write premium. Later calls: cached tokens at ~90% discount
    return response.content[0].text

Model Routing

The single biggest cost saver. Route simple requests to cheap models and complex requests to expensive ones.

import re
from openai import OpenAI

client = OpenAI()

def estimate_complexity(user_input: str) -> str:
    """Cheap heuristic to classify task complexity."""
    word_count = len(user_input.split())
    has_code = bool(re.search(r"```|def |class |function ", user_input))
    has_analysis_words = bool(re.search(
        r"analyze|compare|explain why|trade-?offs|architect", user_input, re.I
    ))

    if word_count < 30 and not has_code and not has_analysis_words:
        return "simple"
    elif has_code or has_analysis_words or word_count > 200:
        return "complex"
    return "medium"

MODEL_MAP = {
    "simple": "gpt-4o-mini",       # $0.15 / 1M input tokens
    "medium": "gpt-4o-mini",       # Same model, still cheap enough
    "complex": "gpt-4o",           # $2.50 / 1M input tokens
}

def routed_completion(user_input: str, system_prompt: str) -> str:
    complexity = estimate_complexity(user_input)
    model = MODEL_MAP[complexity]
    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_input},
        ],
    )
    return response.choices[0].message.content

For more sophisticated routing, use a small classifier model to decide which model handles the request:

def smart_route(user_input: str) -> str:
    """Use a cheap model to decide which model should answer."""
    routing_response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{
            "role": "user",
            "content": f"Rate this query's complexity as SIMPLE or COMPLEX. "
                       f"SIMPLE = factual lookup, formatting, classification. "
                       f"COMPLEX = reasoning, analysis, code generation, creative writing.\n\n"
                       f"Query: {user_input}\n\nRating:",
        }],
        max_tokens=10,
        temperature=0,
    )
    rating = routing_response.choices[0].message.content.strip().upper()
    return "gpt-4o" if "COMPLEX" in rating else "gpt-4o-mini"

Batching

If responses are not time-sensitive, batch requests for lower costs. OpenAI’s Batch API gives a 50% discount.

import json

# Prepare batch file (JSONL format)
requests = []
for i, ticket in enumerate(tickets):
    requests.append({
        "custom_id": f"ticket-{i}",
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": "gpt-4o-mini",
            "messages": [
                {"role": "system", "content": "Classify this ticket."},
                {"role": "user", "content": ticket},
            ],
            "temperature": 0,
        },
    })

with open("/tmp/batch_input.jsonl", "w") as f:
    for req in requests:
        f.write(json.dumps(req) + "\n")

# Submit batch — results arrive within 24 hours at 50% cost
with open("/tmp/batch_input.jsonl", "rb") as f:
    batch_file = client.files.create(file=f, purpose="batch")
batch_job = client.batches.create(
    input_file_id=batch_file.id,
    endpoint="/v1/chat/completions",
    completion_window="24h",
)
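When the job finishes, you download an output file in the same JSONL shape, one result per line. A parsing sketch; the field names follow OpenAI's Batch API output format as I understand it, so verify against the current docs:

```python
import json

def parse_batch_output(jsonl_text: str) -> dict[str, str]:
    """Map custom_id -> model output for successful results in a batch output file."""
    results = {}
    for line in jsonl_text.splitlines():
        if not line.strip():
            continue
        record = json.loads(line)
        response = record.get("response")
        if response and response.get("status_code") == 200:
            results[record["custom_id"]] = (
                response["body"]["choices"][0]["message"]["content"]
            )
    return results

# A line shaped like the Batch API's output format
sample = json.dumps({
    "custom_id": "ticket-0",
    "response": {"status_code": 200,
                 "body": {"choices": [{"message": {"content": "billing"}}]}},
})
print(parse_batch_output(sample))  # → {'ticket-0': 'billing'}
```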

Guardrails

An LLM in production without guardrails is a liability. You need defenses on both input and output.

Input Validation

import re

def validate_input(user_input: str) -> tuple[bool, str]:
    """Validate user input before sending to LLM."""
    # Length check — prevent token bombs
    if len(user_input) > 10_000:
        return False, "Input too long. Maximum 10,000 characters."

    # Basic prompt injection detection
    injection_patterns = [
        r"ignore (?:all )?(?:previous|above|prior) instructions",
        r"you are now",
        r"new system prompt",
        r"disregard (?:your|the) (?:rules|instructions|guidelines)",
        r"pretend you",
    ]
    for pattern in injection_patterns:
        if re.search(pattern, user_input, re.IGNORECASE):
            return False, "Input contains disallowed patterns."

    return True, "ok"

Output Filtering

import json
import re

def filter_output(llm_response: str, context: dict) -> str:
    """Post-process LLM output before returning to user."""
    # PII detection — crude but effective first pass

    # Mask SSNs
    cleaned = re.sub(r"\b\d{3}-\d{2}-\d{4}\b", "[SSN REDACTED]", llm_response)
    # Mask credit card numbers
    cleaned = re.sub(r"\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b", "[CC REDACTED]", cleaned)
    # Mask email addresses if not expected
    if not context.get("allow_emails"):
        cleaned = re.sub(r"\b[\w.+-]+@[\w-]+\.[\w.-]+\b", "[EMAIL REDACTED]", cleaned)

    return cleaned


def enforce_format(llm_response: str, expected_format: str = "json") -> dict | str | None:
    """Ensure LLM output matches expected format."""
    if expected_format == "json":
        # Strip markdown code fences if present
        text = llm_response.strip()
        if text.startswith("```"):
            text = text.split("\n", 1)[1].rsplit("```", 1)[0]
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            return None
    return llm_response

Content Moderation

For user-facing applications, run a moderation check before and after the LLM call:

def check_moderation(text: str) -> bool:
    """Returns True if content is safe."""
    response = client.moderations.create(
        model="omni-moderation-latest",
        input=text,
    )
    return not response.results[0].flagged

def safe_completion(user_input: str, system_prompt: str) -> str:
    # Check input
    if not check_moderation(user_input):
        return "I can't process that request."

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_input},
        ],
    )
    result = response.choices[0].message.content

    # Check output
    if not check_moderation(result):
        return "I'm unable to provide a response to that query."

    return result

[Figure: AI evaluation pipeline — test datasets, grading methods, and metrics dashboard]

Evaluation

“It seems to work” is not an evaluation strategy. You need automated evals that run on every prompt change, every model switch, and every code deploy.

Building an Eval Suite

import json
from dataclasses import dataclass

@dataclass
class EvalCase:
    input_text: str
    expected_output: str   # Or expected properties
    category: str          # Group evals by type
    grade_fn: str          # "exact_match", "contains", "llm_judge"

# Define your eval set
eval_suite = [
    EvalCase(
        input_text="What is our refund policy?",
        expected_output="30-day money-back guarantee",
        category="factual",
        grade_fn="contains",
    ),
    EvalCase(
        input_text="I hate your product, give me my money back NOW",
        expected_output="empathetic and professional tone",
        category="tone",
        grade_fn="llm_judge",
    ),
]

def grade_exact_match(actual: str, expected: str) -> bool:
    return actual.strip().lower() == expected.strip().lower()

def grade_contains(actual: str, expected: str) -> bool:
    return expected.lower() in actual.lower()

def grade_llm_judge(actual: str, expected_criteria: str) -> bool:
    """Use a strong model to judge the output."""
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{
            "role": "user",
            "content": f"Judge whether this AI response meets the criteria.\n\n"
                       f"Criteria: {expected_criteria}\n\n"
                       f"Response: {actual}\n\n"
                       f"Return ONLY 'PASS' or 'FAIL'.",
        }],
        temperature=0,
        max_tokens=10,
    )
    return "PASS" in response.choices[0].message.content.upper()

GRADERS = {
    "exact_match": grade_exact_match,
    "contains": grade_contains,
    "llm_judge": grade_llm_judge,
}

def run_eval(system_prompt: str, model: str, suite: list[EvalCase]) -> dict:
    results = {"total": 0, "passed": 0, "by_category": {}}
    for case in suite:
        response = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": case.input_text},
            ],
            temperature=0,
        )
        actual = response.choices[0].message.content
        grader = GRADERS[case.grade_fn]
        passed = grader(actual, case.expected_output)

        results["total"] += 1
        results["passed"] += int(passed)
        cat = results["by_category"].setdefault(case.category, {"total": 0, "passed": 0})
        cat["total"] += 1
        cat["passed"] += int(passed)

    results["score"] = results["passed"] / results["total"] if results["total"] else 0
    return results

Run this in CI. Set a threshold (e.g., 90% pass rate). Block deploys that drop below it.
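The CI gate itself is a few lines. A sketch that consumes the result dict `run_eval` returns above (the 90% threshold is an example, not a standard):

```python
import sys

THRESHOLD = 0.90  # Minimum overall pass rate to allow a deploy (example value)

def gate(results: dict, threshold: float = THRESHOLD) -> int:
    """Turn an eval result dict into a CI exit code: 0 passes, 1 blocks the deploy."""
    score = results.get("score", 0.0)
    print(f"Eval score: {score:.1%} (threshold {threshold:.0%})")
    for category, counts in results.get("by_category", {}).items():
        print(f"  {category}: {counts['passed']}/{counts['total']}")
    return 0 if score >= threshold else 1

# In CI, after running the suite:
#   results = run_eval(SYSTEM_PROMPT, "gpt-4o-mini", eval_suite)
#   sys.exit(gate(results))
```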

Observability

You cannot improve what you do not measure. Log every LLM interaction.

import time
import uuid
import logging
from dataclasses import dataclass, asdict

logger = logging.getLogger("llm_obs")

@dataclass
class LLMCall:
    request_id: str
    model: str
    prompt_tokens: int
    completion_tokens: int
    latency_ms: float
    cost_usd: float
    status: str          # "success", "error", "filtered"
    user_id: str | None

# Cost per 1M tokens (input, output) — update as pricing changes
PRICING = {
    "gpt-4o":       (2.50, 10.00),
    "gpt-4o-mini":  (0.15, 0.60),
    "claude-sonnet-4-20250514": (3.00, 15.00),
    "claude-haiku-4-20250414":   (0.25, 1.25),
}

def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    input_rate, output_rate = PRICING.get(model, (0, 0))
    return (input_tokens * input_rate + output_tokens * output_rate) / 1_000_000

def tracked_completion(model: str, messages: list, user_id: str | None = None, **kwargs) -> str:
    request_id = str(uuid.uuid4())
    start = time.time()
    try:
        response = client.chat.completions.create(model=model, messages=messages, **kwargs)
        latency_ms = (time.time() - start) * 1000
        usage = response.usage
        cost = calculate_cost(model, usage.prompt_tokens, usage.completion_tokens)

        call = LLMCall(
            request_id=request_id, model=model,
            prompt_tokens=usage.prompt_tokens,
            completion_tokens=usage.completion_tokens,
            latency_ms=latency_ms, cost_usd=cost,
            status="success", user_id=user_id,
        )
        logger.info("llm_call", extra=asdict(call))
        return response.choices[0].message.content

    except Exception as e:
        latency_ms = (time.time() - start) * 1000
        call = LLMCall(
            request_id=request_id, model=model,
            prompt_tokens=0, completion_tokens=0,
            latency_ms=latency_ms, cost_usd=0,
            status=f"error:{type(e).__name__}", user_id=user_id,
        )
        logger.error("llm_call_error", extra=asdict(call))
        raise

For production systems, consider dedicated observability tools like LangSmith (by LangChain) or Langfuse (open-source). They provide trace visualization, cost dashboards, prompt versioning, and eval integration out of the box.

Caching Strategies

LLM calls are slow and expensive. Caching identical or near-identical requests can cut costs dramatically.

Exact-Match Cache

import hashlib
import json
import redis

redis_client = redis.Redis(host="localhost", port=6379, db=0)
CACHE_TTL = 3600  # 1 hour

def cached_completion(model: str, messages: list, temperature: float = 0, **kwargs) -> str:
    # Only cache deterministic requests
    if temperature > 0:
        return tracked_completion(model, messages, temperature=temperature, **kwargs)

    # Build cache key from model + messages
    cache_key = hashlib.sha256(
        json.dumps({"model": model, "messages": messages}, sort_keys=True).encode()
    ).hexdigest()

    # Check cache
    cached = redis_client.get(f"llm:{cache_key}")
    if cached:
        return cached.decode("utf-8")

    # Cache miss — call LLM (forward temperature=0 so the response really is deterministic)
    result = tracked_completion(model, messages, temperature=temperature, **kwargs)
    redis_client.setex(f"llm:{cache_key}", CACHE_TTL, result)
    return result

Semantic Cache

For cases where users ask the same question with different wording, use embedding similarity to find cache hits:

import numpy as np

def get_embedding(text: str) -> list[float]:
    response = client.embeddings.create(model="text-embedding-3-small", input=text)
    return response.data[0].embedding

def cosine_similarity(a: list[float], b: list[float]) -> float:
    a, b = np.array(a), np.array(b)
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

class SemanticCache:
    def __init__(self, similarity_threshold: float = 0.95):
        self.threshold = similarity_threshold
        self.entries: list[dict] = []  # In production, use a vector DB

    def get(self, query: str) -> str | None:
        query_embedding = get_embedding(query)
        for entry in self.entries:
            similarity = cosine_similarity(query_embedding, entry["embedding"])
            if similarity >= self.threshold:
                return entry["response"]
        return None

    def set(self, query: str, response: str):
        self.entries.append({
            "query": query,
            "embedding": get_embedding(query),
            "response": response,
        })

Set the similarity threshold high (0.95+). A false cache hit — returning the wrong answer because two different questions looked similar — is worse than a cache miss.
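Wired in front of generation, the cache becomes a get-or-generate step. The sketch below injects the embedding function (stubbed here with toy vectors) so the flow is visible without an API call; in production you would pass `get_embedding` and the real completion call instead:

```python
import math

def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))

class CachedAnswerer:
    """Get-or-generate wrapper around a semantic cache. embed_fn and generate_fn
    are injected; in production, pass get_embedding and your LLM call."""

    def __init__(self, embed_fn, generate_fn, threshold: float = 0.95):
        self.embed_fn = embed_fn
        self.generate_fn = generate_fn
        self.threshold = threshold
        self.entries: list[dict] = []

    def answer(self, query: str) -> str:
        embedding = self.embed_fn(query)
        for entry in self.entries:
            if cosine(embedding, entry["embedding"]) >= self.threshold:
                return entry["response"]              # Hit: no model call
        response = self.generate_fn(query)            # Miss: call the model
        self.entries.append({"embedding": embedding, "response": response})
        return response

# Stub embeddings for illustration only
STUB = {
    "what is the refund policy?": [1.0, 0.0],
    "refund policy?":             [0.99, 0.14],   # ~0.99 similarity to the first
    "reset my password":          [0.0, 1.0],
}
answerer = CachedAnswerer(STUB.__getitem__, lambda q: f"answer({q})")
first = answerer.answer("what is the refund policy?")   # miss: generates
second = answerer.answer("refund policy?")              # hit: cached response
print(first == second)  # → True
```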

Error Handling in Production

LLM APIs fail. Rate limits hit. Models go down. Your application needs to handle all of it gracefully.

import time
import random
from openai import (
    APIError, RateLimitError, APIConnectionError, APITimeoutError
)

FALLBACK_CHAIN = ["gpt-4o", "gpt-4o-mini", "claude-sonnet-4-20250514"]

def resilient_completion(
    messages: list,
    model: str = "gpt-4o",
    max_retries: int = 3,
    use_fallbacks: bool = True,
) -> str:
    """LLM call with retries, exponential backoff, and model fallback."""
    models_to_try = [model]
    if use_fallbacks:
        models_to_try += [m for m in FALLBACK_CHAIN if m != model]

    last_error = None
    for current_model in models_to_try:
        for attempt in range(max_retries):
            try:
                # Pick client based on model name
                if "claude" in current_model:
                    return _call_anthropic(current_model, messages)
                else:
                    return _call_openai(current_model, messages)

            except RateLimitError as e:
                wait = (2 ** attempt) + random.uniform(0, 1)
                time.sleep(wait)
                last_error = e

            except APITimeoutError as e:
                wait = 2 ** attempt
                time.sleep(wait)
                last_error = e

            except (APIError, APIConnectionError) as e:
                last_error = e
                break  # Don't retry server errors on same model — try fallback

            except Exception as e:
                last_error = e
                break  # Non-OpenAI errors (e.g. Anthropic SDK exceptions) — try fallback

    # All models and retries exhausted
    raise RuntimeError(f"All LLM calls failed. Last error: {last_error}")


def _call_openai(model: str, messages: list) -> str:
    response = client.chat.completions.create(model=model, messages=messages, timeout=30)
    return response.choices[0].message.content

def _call_anthropic(model: str, messages: list) -> str:
    anthropic_client = Anthropic()
    # Convert OpenAI message format to Anthropic format
    system = next((m["content"] for m in messages if m["role"] == "system"), "")
    user_msgs = [m for m in messages if m["role"] != "system"]
    response = anthropic_client.messages.create(
        model=model, max_tokens=1024, system=system, messages=user_msgs,
    )
    return response.content[0].text

Rate Limiting and Queue Management

When your AI feature goes viral (or gets hit by a bot), you need traffic management.

import asyncio
import time
from collections import deque

class TokenBucketRateLimiter:
    """Token-bucket limiter for API provider requests-per-minute limits."""

    def __init__(self, requests_per_minute: int):
        self.rpm = requests_per_minute
        self.tokens = requests_per_minute
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            now = time.monotonic()
            elapsed = now - self.last_refill
            self.tokens = min(self.rpm, self.tokens + elapsed * (self.rpm / 60))
            self.last_refill = now

            if self.tokens < 1:
                wait_time = (1 - self.tokens) / (self.rpm / 60)
                await asyncio.sleep(wait_time)
                self.tokens = 0
            else:
                self.tokens -= 1


class PriorityRequestQueue:
    """Queue that processes high-priority requests first."""

    def __init__(self, rate_limiter: TokenBucketRateLimiter):
        self.high_priority: asyncio.Queue = asyncio.Queue()
        self.low_priority: asyncio.Queue = asyncio.Queue()
        self.rate_limiter = rate_limiter

    async def enqueue(self, request: dict, priority: str = "low"):
        queue = self.high_priority if priority == "high" else self.low_priority
        future = asyncio.get_running_loop().create_future()
        await queue.put((request, future))
        return await future  # Caller awaits the result

    async def process_loop(self):
        while True:
            # High priority first
            if not self.high_priority.empty():
                request, future = await self.high_priority.get()
            elif not self.low_priority.empty():
                request, future = await self.low_priority.get()
            else:
                await asyncio.sleep(0.1)
                continue

            await self.rate_limiter.acquire()
            try:
                result = await asyncio.to_thread(
                    tracked_completion, request["model"], request["messages"]
                )
                future.set_result(result)
            except Exception as e:
                future.set_exception(e)

Deployment Patterns

How you deploy your AI feature depends on latency requirements and scale.

Pattern 1: Synchronous API wrapper — simplest, works for low-latency tasks.

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()

class CompletionRequest(BaseModel):
    prompt: str
    user_id: str

class CompletionResponse(BaseModel):
    result: str
    model_used: str

@app.post("/api/ai/classify", response_model=CompletionResponse)
def classify(req: CompletionRequest):  # Sync def — FastAPI runs it in a threadpool, so blocking LLM calls are fine
    valid, msg = validate_input(req.prompt)
    if not valid:
        raise HTTPException(status_code=400, detail=msg)

    model = smart_route(req.prompt)
    result = cached_completion(
        model=model,
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": req.prompt},
        ],
    )
    result = filter_output(result, {"allow_emails": False})
    return CompletionResponse(result=result, model_used=model)

Pattern 2: Async processing with webhooks — for tasks that take more than a few seconds.

import uuid
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
import httpx

app = FastAPI()
job_store = {}  # In production, use Redis or a database

class AsyncRequest(BaseModel):
    prompt: str
    webhook_url: str
    user_id: str

@app.post("/api/ai/analyze")
async def analyze(req: AsyncRequest, background_tasks: BackgroundTasks):
    job_id = str(uuid.uuid4())
    job_store[job_id] = {"status": "processing"}
    background_tasks.add_task(process_and_notify, job_id, req)
    return {"job_id": job_id, "status": "processing"}

@app.get("/api/ai/jobs/{job_id}")
async def get_job(job_id: str):
    return job_store.get(job_id, {"status": "not_found"})

def process_and_notify(job_id: str, req: AsyncRequest):
    # Sync function — BackgroundTasks runs it in a threadpool, keeping the event loop free
    try:
        result = resilient_completion(
            messages=[
                {"role": "system", "content": "Perform deep analysis."},
                {"role": "user", "content": req.prompt},
            ],
            model="gpt-4o",
        )
        job_store[job_id] = {"status": "complete", "result": result}
        httpx.post(req.webhook_url, json={"job_id": job_id, "result": result})
    except Exception as e:
        job_store[job_id] = {"status": "failed", "error": str(e)}

Scaling considerations:

  • Horizontal scaling is straightforward — LLM calls are I/O-bound, not CPU-bound
  • Use connection pooling for your LLM client
  • Set aggressive timeouts (15-30s for most tasks)
  • Monitor queue depth as a scaling signal
  • Consider separate worker pools for different model tiers (cheap/fast vs expensive/slow)
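The last point — separate pools per tier — can start as nothing more than per-tier semaphores. A sketch with a sleep standing in for the model call, just to show the gating (the caps are arbitrary examples):

```python
import asyncio

TIER_CONCURRENCY = {"cheap": 8, "expensive": 2}  # Hypothetical caps per model tier

async def run_demo() -> dict:
    semaphores = {tier: asyncio.Semaphore(n) for tier, n in TIER_CONCURRENCY.items()}
    in_flight = {tier: 0 for tier in TIER_CONCURRENCY}
    peak = {tier: 0 for tier in TIER_CONCURRENCY}

    async def call_model(tier: str, prompt: str) -> str:
        async with semaphores[tier]:               # Gate on the tier's worker pool
            in_flight[tier] += 1
            peak[tier] = max(peak[tier], in_flight[tier])
            await asyncio.sleep(0.01)              # Stand-in for the real LLM call
            in_flight[tier] -= 1
            return f"{tier}:{prompt}"

    await asyncio.gather(*(call_model("expensive", str(i)) for i in range(10)))
    return peak

peak = asyncio.run(run_demo())
print(peak)  # "expensive" concurrency never exceeds its cap of 2
```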

Key Takeaways

  • There is no best model. Match the model to the task. Use the cheapest model that meets your quality bar. Benchmark on YOUR data, not public leaderboards.
  • Model routing is the highest-leverage cost optimization. Route simple tasks to cheap models, complex tasks to expensive ones. This alone can cut costs 60-80%.
  • Cache aggressively. Exact-match cache for deterministic requests, semantic cache for fuzzy matches. Set temperature to 0 for cacheable requests.
  • Guardrails are not optional. Validate inputs (length, injection patterns), filter outputs (PII, format enforcement), and run content moderation for user-facing apps.
  • Evaluate with automated evals, not vibes. Build a test suite from real examples. Run it in CI. Block deploys that drop below your quality threshold.
  • Log everything. Every LLM call should record model, tokens, latency, cost, and status. You cannot optimize what you do not measure.
  • Build for failure. Retries with exponential backoff, fallback model chains, and graceful degradation. LLM APIs will go down.
  • Ship incrementally. Start with a single model, a simple prompt, and exact-match caching. Add routing, evals, and guardrails as you learn where the problems are. Over-engineering on day one is a waste — the model landscape will change before you finish.