How to Deploy AI Agents to Production: Complete Guide (2026)
You've built an AI agent that works on your laptop. It handles tasks, calls tools, and gives great answers. Now comes the hard part: making it work reliably for real users, 24/7, without you watching.
Most AI agents die in the gap between "works locally" and "runs in production." The agent that responds in 2 seconds on your machine times out under load. The one that costs $0.50/day in testing costs $200/day with real traffic. The one that handles 10 requests perfectly crashes at 100.
This guide covers everything you need to deploy AI agents that actually stay running — from containerization to monitoring to the scaling patterns that separate toy projects from production systems.
The Production Readiness Checklist
Before you deploy anything, your agent needs to pass these gates. Skip any of them and you're shipping a time bomb.
Gate 1: Reliability
- Retry logic — Exponential backoff on API failures (LLM providers go down more than you think); see the sketch after this list
- Timeout handling — Hard timeouts on every external call (LLM, tools, APIs)
- Graceful degradation — What happens when your primary LLM is down? Fallback model? Cached responses?
- Idempotency — Can you safely retry any operation without side effects?
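To make the first three items concrete, here is a minimal sketch of a call wrapper with exponential backoff, a hard per-attempt timeout, and a cheaper fallback model. It assumes the official anthropic SDK with the API key in the environment; the model names, delays, and limits are illustrative, not prescriptive.
# retry_fallback.py: sketch of backoff, hard timeouts, and a fallback model (names and limits illustrative)
import asyncio
import anthropic

client = anthropic.AsyncAnthropic()  # reads ANTHROPIC_API_KEY from the environment

async def call_with_fallback(messages, model="claude-sonnet-4-20250514",
                             fallback_model="claude-3-5-haiku-20241022",
                             max_retries=3, timeout=60):
    """Retry the primary model with exponential backoff, then degrade to a fallback."""
    delay = 1.0
    for attempt in range(max_retries):
        try:
            return await asyncio.wait_for(
                client.messages.create(model=model, max_tokens=1024, messages=messages),
                timeout=timeout,
            )
        except (anthropic.APIError, asyncio.TimeoutError):
            if attempt == max_retries - 1:
                break  # primary model exhausted, fall through to the fallback
            await asyncio.sleep(delay)
            delay = min(delay * 2, 30)  # exponential backoff, capped at 30s
    # Graceful degradation: cheaper fallback model (could also be a cached response)
    return await asyncio.wait_for(
        client.messages.create(model=fallback_model, max_tokens=1024, messages=messages),
        timeout=timeout,
    )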
Gate 2: Cost Controls
- Token budgets — Max tokens per request, per user, per day; see the budget sketch after this list
- Loop detection — Agents can get stuck in infinite tool-call loops (set max iterations)
- Model routing — Use cheap models for simple tasks, expensive ones only when needed
- Caching — Cache identical or near-identical requests
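And a sketch of a per-user daily token budget backed by Redis. It assumes redis.asyncio (redis-py) and an illustrative limit; the key layout is an assumption.
# token_budget.py: per-user daily token budget sketch (limit and key layout are illustrative)
import datetime
import redis.asyncio as redis

r = redis.from_url("redis://localhost:6379")
DAILY_TOKEN_LIMIT = 200_000  # tune per pricing tier / user plan

async def within_budget(user_id: str, tokens_used: int) -> bool:
    """Record usage and return False once the user exceeds today's token budget."""
    key = f"budget:{user_id}:{datetime.date.today().isoformat()}"
    total = await r.incrby(key, tokens_used)
    await r.expire(key, 86400)  # window resets after 24 hours
    return total <= DAILY_TOKEN_LIMIT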
Gate 3: Security
- Input sanitization — Users will try prompt injection. Assume it.
- Tool permissions — Principle of least privilege. Read-only where possible.
- Secrets management — No API keys in code. Use vault or environment variables.
- Audit logging — Log every tool call, every decision, every output.
Containerization: Docker for AI Agents
Docker is the standard way to package AI agents for deployment. Here's a production-ready Dockerfile — not the tutorial version, the one that actually works.
# Production Dockerfile for AI Agent
FROM python:3.12-slim AS base
# Security: non-root user
RUN groupadd -r agent && useradd -r -g agent -d /app agent
WORKDIR /app
# Dependencies first (cache layer)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Application code
COPY --chown=agent:agent . .
# Health check endpoint (python:3.12-slim ships without curl, so use Python's stdlib)
HEALTHCHECK --interval=30s --timeout=10s --retries=3 \
  CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8080/health')" || exit 1
USER agent
# Don't use CMD with shell form — use exec form
ENTRYPOINT ["python", "-m", "agent.main"]
Key decisions in this Dockerfile:
- Non-root user — Your agent shouldn't run as root. If it gets compromised, damage is limited.
- Slim base image — 150MB vs 1.2GB. Faster deploys, smaller attack surface.
- Layer caching — Dependencies change less than code. Install them first.
- Health check — The orchestrator needs to know if your agent is alive.
Docker Compose for Local Testing
# docker-compose.yml
version: '3.8'
services:
agent:
build: .
ports:
- "8080:8080"
environment:
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- LOG_LEVEL=info
- MAX_TOKENS_PER_REQUEST=4096
- MAX_TOOL_CALLS=10
- REDIS_URL=redis://redis:6379
depends_on:
redis:
condition: service_healthy
restart: unless-stopped
deploy:
resources:
limits:
memory: 512M
cpus: '0.5'
redis:
image: redis:7-alpine
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
volumes:
- redis_data:/data
volumes:
redis_data:
Notice the resource limits. Without them, a runaway agent can consume your entire server. The memory limit means a leaking container gets OOM-killed instead of taking down the host; the CPU limit prevents one agent from starving the others.
Cloud Deployment Options Compared
Where you deploy depends on your traffic pattern, budget, and team size. Here's an honest comparison:
| Platform | Best For | Cost (100 req/day) | Scaling | Complexity |
|---|---|---|---|---|
| Railway / Render | Solo devs, MVPs | $5-15/mo | Auto | Low |
| AWS ECS / Fargate | Teams, complex apps | $20-50/mo | Auto | High |
| Google Cloud Run | Event-driven agents | $3-10/mo | Auto (to zero) | Medium |
| Fly.io | Global low-latency | $5-15/mo | Auto | Low |
| VPS (Hetzner/DO) | Budget, full control | $4-10/mo | Manual | Medium |
| Kubernetes (EKS/GKE) | Enterprise, multi-agent | $50-200/mo | Auto | Very High |
Railway Deployment (5 Minutes)
# Install Railway CLI
npm install -g @railway/cli
# Login and initialize
railway login
railway init
# Set environment variables
railway variables set ANTHROPIC_API_KEY=sk-ant-...
railway variables set MAX_TOKENS_PER_REQUEST=4096
railway variables set LOG_LEVEL=info
# Deploy
railway up
That's it. Railway detects your Dockerfile, builds it, deploys it, gives you a URL, and handles SSL. For most agents doing under 10,000 requests per day, this is all you need.
Google Cloud Run (Scale to Zero)
If your agent is event-driven (webhook-triggered, scheduled tasks), Cloud Run is ideal because it scales to zero — you pay nothing when there's no traffic.
# Build and push the container image (gcr.io paths are now backed by Artifact Registry)
gcloud builds submit --tag gcr.io/PROJECT_ID/my-agent
# Deploy with concurrency and resource limits
gcloud run deploy my-agent \
--image gcr.io/PROJECT_ID/my-agent \
--platform managed \
--region us-central1 \
--memory 512Mi \
--cpu 1 \
--timeout 300 \
--concurrency 10 \
--min-instances 0 \
--max-instances 5 \
--set-env-vars "ANTHROPIC_API_KEY=sk-ant-..." \
--set-env-vars "MAX_TOOL_CALLS=10"
Critical setting: --timeout. Cloud Run's default request timeout is 300 seconds (5 minutes) and the maximum is 3600, so set it deliberately for multi-step agent runs instead of discovering the limit in production. For real deployments, also prefer injecting the API key with --set-secrets (backed by Secret Manager) rather than a plain environment variable.
The Production Agent Architecture
A local agent is just a loop. A production agent is a system. Here's the architecture that handles real traffic:
┌─────────────────────────────────────────────┐
│ Load Balancer │
│ (CloudFlare / ALB) │
└──────────────────┬──────────────────────────┘
│
┌─────────┼─────────┐
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Agent 1 │ │ Agent 2 │ │ Agent 3 │
│ (Pod) │ │ (Pod) │ │ (Pod) │
└────┬────┘ └────┬────┘ └────┬────┘
│ │ │
┌────┴───────────┴───────────┴────┐
│ Shared Services │
│ ┌───────┐ ┌───────┐ ┌────────┐ │
│ │ Redis │ │ DB │ │ Queue │ │
│ │(cache)│ │(state)│ │(tasks) │ │
│ └───────┘ └───────┘ └────────┘ │
└─────────────────────────────────┘
Each component has a specific job:
- Load Balancer — Distributes requests, handles SSL termination, DDoS protection
- Agent Pods — Stateless containers running your agent code (scale horizontally)
- Redis — Response caching, rate limiting counters, session state
- Database — Conversation history, user data, audit logs (Postgres recommended)
- Queue — Async task processing for long-running agent jobs (Redis/BullMQ or SQS)
Making Agents Stateless
The key to horizontal scaling is statelessness. Your agent container should be disposable — kill it, spin up a new one, no data lost.
# BAD: State in memory (dies with the container)
class Agent:
def __init__(self):
self.conversations = {} # Lost on restart!
# GOOD: State in external store (needs json for (de)serialization)
import json
class Agent:
def __init__(self, redis_client, db_client):
self.redis = redis_client
self.db = db_client
async def get_conversation(self, session_id: str):
# Check cache first
cached = await self.redis.get(f"conv:{session_id}")
if cached:
return json.loads(cached)
# Fall back to database
return await self.db.get_conversation(session_id)
async def save_message(self, session_id: str, message: dict):
# Write to both
await self.db.append_message(session_id, message)
await self.redis.set(
f"conv:{session_id}",
json.dumps(await self.db.get_conversation(session_id)),
ex=3600 # 1 hour cache
)
🚀 Want the Complete Production Blueprint?
The AI Employee Playbook includes deployment templates, Docker configs, monitoring dashboards, and CI/CD pipelines — ready to copy-paste.
Get the Playbook — €29
Monitoring: The Non-Negotiable Layer
If you're not monitoring your agent, you don't know if it's working. Simple as that. Here are the metrics that actually matter:
The 5 Essential Metrics
| Metric | Target | Red Flag | Why It Matters |
|---|---|---|---|
| Success Rate | > 95% | < 90% | Core reliability indicator |
| Latency (p95) | < 5s | > 15s | User experience |
| Cost per Request | < $0.05 | > $0.20 | Financial sustainability |
| Tool Error Rate | < 2% | > 5% | Integration health |
| Hallucination Rate | < 3% | > 8% | Output quality |
Production Monitoring Stack
# monitoring.py — Production agent monitoring
import time
import logging
from prometheus_client import Counter, Histogram, Gauge
# Prometheus metrics
REQUEST_COUNT = Counter(
'agent_requests_total',
'Total agent requests',
['status', 'model']
)
REQUEST_LATENCY = Histogram(
'agent_request_duration_seconds',
'Request latency',
buckets=[0.5, 1, 2, 5, 10, 30, 60]
)
ACTIVE_REQUESTS = Gauge(
'agent_active_requests',
'Currently processing requests'
)
TOKEN_USAGE = Counter(
'agent_tokens_total',
'Token usage',
['type', 'model'] # type: input/output
)
TOOL_CALLS = Counter(
'agent_tool_calls_total',
'Tool invocations',
['tool', 'status']
)
COST_TOTAL = Counter(
'agent_cost_dollars_total',
'Estimated cost in dollars',
['model']
)
class MonitoredAgent:
"""Wraps your agent with production monitoring."""
def __init__(self, agent, cost_tracker):
self.agent = agent
self.cost_tracker = cost_tracker
async def run(self, request):
ACTIVE_REQUESTS.inc()
start = time.time()
try:
result = await self.agent.run(request)
# Record success metrics
duration = time.time() - start
REQUEST_COUNT.labels(
status='success',
model=result.model_used
).inc()
REQUEST_LATENCY.observe(duration)
TOKEN_USAGE.labels(
type='input', model=result.model_used
).inc(result.input_tokens)
TOKEN_USAGE.labels(
type='output', model=result.model_used
).inc(result.output_tokens)
# Track cost
cost = self.cost_tracker.calculate(result)
COST_TOTAL.labels(model=result.model_used).inc(cost)
return result
except Exception as e:
REQUEST_COUNT.labels(
status='error',
model='unknown'
).inc()
logging.error(f"Agent error: {e}", exc_info=True)
raise
finally:
ACTIVE_REQUESTS.dec()
Alerting Rules
Metrics without alerts are just pretty dashboards. Set these up on day one:
# alerting-rules.yml (Prometheus/Grafana)
groups:
- name: agent_alerts
rules:
- alert: HighErrorRate
expr: |
rate(agent_requests_total{status="error"}[5m])
/ rate(agent_requests_total[5m]) > 0.1
for: 5m
labels:
severity: critical
annotations:
summary: "Agent error rate above 10%"
- alert: HighLatency
expr: |
histogram_quantile(0.95,
rate(agent_request_duration_seconds_bucket[5m])
) > 15
for: 5m
labels:
severity: warning
annotations:
summary: "Agent p95 latency above 15 seconds"
- alert: CostSpike
expr: |
rate(agent_cost_dollars_total[1h]) > 10
for: 15m
labels:
severity: critical
annotations:
summary: "Agent cost exceeding $10/hour"
- alert: HighToolErrorRate
expr: |
rate(agent_tool_calls_total{status="error"}[5m])
/ rate(agent_tool_calls_total[5m]) > 0.05
for: 5m
labels:
severity: warning
annotations:
summary: "Tool error rate above 5%"
CI/CD: Deploying Agent Updates Safely
AI agents are different from normal software. A code change might make the agent behave completely differently — even dangerously. Your CI/CD pipeline needs extra safety gates.
# .github/workflows/deploy-agent.yml
name: Deploy Agent
on:
push:
branches: [main]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Unit tests (tools + utilities)
run: pytest tests/unit/ -v
- name: Prompt regression tests
run: pytest tests/prompts/ -v --timeout=60
env:
ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
- name: Integration tests (tool calls)
run: pytest tests/integration/ -v --timeout=120
env:
ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
- name: Eval suite (LLM-as-judge)
run: python evals/run_evals.py --threshold 0.85
env:
ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
deploy-staging:
needs: test
runs-on: ubuntu-latest
steps:
- name: Deploy to staging
run: railway up --environment staging
- name: Smoke test staging
run: |
curl -f https://staging.myagent.com/health
python tests/smoke.py --url https://staging.myagent.com
deploy-production:
needs: deploy-staging
runs-on: ubuntu-latest
environment: production # Requires manual approval
steps:
- name: Deploy with canary
run: |
# Deploy to 10% of traffic first
railway up --environment production
# Wait and check metrics
sleep 300
python scripts/check_canary.py --threshold 0.95
- name: Full rollout
run: railway up --environment production --scale 100%
The critical addition here is the eval suite. Before every deploy, you run your agent against a set of test cases and verify the output quality hasn't regressed. This catches issues that unit tests never will.
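What could evals/run_evals.py look like? Here is a minimal LLM-as-judge sketch: it replays a fixed case file, asks a cheap model to grade each answer, and exits non-zero below the threshold so CI blocks the deploy. The case format, grading prompt, and run_agent_for_eval helper are assumptions to adapt to your own harness.
# evals/run_evals.py: minimal LLM-as-judge sketch (case format and grading prompt are assumptions)
import argparse
import asyncio
import json
import anthropic

client = anthropic.AsyncAnthropic()

async def grade(case: dict) -> bool:
    """Ask a cheap model whether the agent's answer satisfies the expected behaviour."""
    answer = await run_agent_for_eval(case["input"])  # your agent's eval entry point
    verdict = await client.messages.create(
        model="claude-3-5-haiku-20241022",
        max_tokens=5,
        messages=[{
            "role": "user",
            "content": f"Expected: {case['expected']}\nActual: {answer}\n"
                       "Does the actual answer satisfy the expectation? Reply PASS or FAIL."
        }],
    )
    return "PASS" in verdict.content[0].text.upper()

async def main(threshold: float):
    with open("evals/cases.jsonl") as f:
        cases = [json.loads(line) for line in f]
    results = await asyncio.gather(*(grade(c) for c in cases))
    score = sum(results) / len(results)
    print(f"Eval score: {score:.2f} ({sum(results)}/{len(results)} passed)")
    if score < threshold:
        raise SystemExit(1)  # non-zero exit blocks the deploy

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--threshold", type=float, default=0.85)
    asyncio.run(main(parser.parse_args().threshold))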
Scaling Patterns for AI Agents
Pattern 1: Request Queue with Workers
For agents that take 10-60 seconds per request, synchronous handling doesn't scale. Use a queue:
# queue_worker.py
import asyncio
import json
from bullmq import Queue
# `agent`, `redis`, `app`, and `AgentRequest` come from the rest of the service;
# a bullmq Worker consuming 'agent-tasks' must also be running to call process_job.
queue = Queue('agent-tasks')
async def process_job(job):
    """Process an agent task from the queue."""
    request = job.data
    # Run agent with timeout
    try:
        result = await asyncio.wait_for(
            agent.run(request),
            timeout=120  # 2 minute max
        )
        # Store result for client polling
        await redis.set(
            f"result:{job.id}",
            json.dumps(result),
            ex=3600
        )
        return result
    except asyncio.TimeoutError:
        await redis.set(
            f"result:{job.id}",
            json.dumps({"error": "Agent timed out"}),
            ex=3600
        )
# API endpoint: submit task
@app.post("/agent/task")
async def submit_task(request: AgentRequest):
    job = await queue.add('run-agent', request.model_dump())
    return {"task_id": job.id, "status": "queued"}
# API endpoint: check result
@app.get("/agent/task/{task_id}")
async def get_result(task_id: str):
    result = await redis.get(f"result:{task_id}")
    if result:
        return {"status": "complete", "result": json.loads(result)}
    return {"status": "processing"}
Pattern 2: Model Routing for Cost Control
# model_router.py
class ModelRouter:
"""Route requests to appropriate model based on complexity."""
MODELS = {
"simple": {"model": "claude-3-5-haiku-20241022", "cost_per_1k": 0.001},
"medium": {"model": "claude-sonnet-4-20250514", "cost_per_1k": 0.003},
"complex": {"model": "claude-opus-4-20250514", "cost_per_1k": 0.015},
}
async def route(self, request: str) -> str:
"""Classify request complexity and pick model."""
# Simple heuristics first (free)
if len(request) < 100 and '?' in request:
return "simple"
# Use cheap model to classify if needed
classification = await classify_with_haiku(request)
if classification.tool_calls_needed == 0:
return "simple"
elif classification.tool_calls_needed <= 3:
return "medium"
else:
return "complex"
def get_model(self, tier: str) -> str:
return self.MODELS[tier]["model"]
This pattern alone can reduce your LLM costs by 40-60%. Most requests don't need your most expensive model.
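Wiring the router into the request path is then a few lines. In this sketch, call_model is a stand-in for however you invoke the LLM elsewhere in your service:
# Usage sketch: classify first, then call the selected model
router = ModelRouter()

async def handle(request: str):
    tier = await router.route(request)
    model = router.get_model(tier)
    return await call_model(model, request)  # call_model: your existing LLM call wrapper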
Pattern 3: Circuit Breaker for External Services
# circuit_breaker.py
import time
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject fast
    HALF_OPEN = "half_open"  # Testing recovery

class CircuitBreakerOpen(Exception):
    """Raised when the breaker rejects a call without attempting it."""

class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.state = CircuitState.CLOSED
        self.last_failure_time = 0

    async def call(self, func, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitBreakerOpen("Service unavailable")
        try:
            result = await func(*args, **kwargs)
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
            return result
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()
            # A failed recovery probe or too many failures (re)opens the circuit
            if self.state == CircuitState.HALF_OPEN or self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
            raise

# Usage: one breaker per external service
anthropic_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)
slack_breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=60)

async def call_llm(messages):
    # `client` is the AsyncAnthropic client configured elsewhere
    return await anthropic_breaker.call(
        client.messages.create,
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=messages
    )
📋 Free: Production Deployment Checklist
Get our 47-point checklist for deploying AI agents. Covers security, monitoring, scaling, and cost control.
Download Free Checklist →
Secrets and Configuration Management
Your agent needs API keys, database credentials, and config values. Here's the hierarchy that works:
# config.py — Production configuration pattern
from pydantic_settings import BaseSettings, SettingsConfigDict

class AgentConfig(BaseSettings):
    """Type-safe configuration with validation."""
    model_config = SettingsConfigDict(env_prefix="AGENT_")  # AGENT_ANTHROPIC_API_KEY, etc.
    # LLM Settings
    anthropic_api_key: str
    default_model: str = "claude-sonnet-4-20250514"
    max_tokens: int = 4096
    max_tool_calls: int = 10
    request_timeout: int = 120
    # Cost Controls
    max_cost_per_request: float = 0.50
    max_daily_cost: float = 50.0
    cost_alert_threshold: float = 0.80  # Alert at 80% of daily limit
    # Retry Settings
    max_retries: int = 3
    retry_base_delay: float = 1.0
    retry_max_delay: float = 30.0
    # Monitoring
    log_level: str = "info"
    enable_tracing: bool = True
    metrics_port: int = 9090

config = AgentConfig()
Never hardcode secrets. Use environment variables for simple setups, or a secrets manager (AWS Secrets Manager, GCP Secret Manager, HashiCorp Vault) for teams.
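For example, fetching the key from AWS Secrets Manager at startup can look like this sketch using boto3; the secret name is illustrative. GCP Secret Manager and Vault have equivalent one-call SDKs.
# secrets.py: fetch the API key at startup instead of baking it into the image (secret name illustrative)
import boto3

def load_anthropic_key(secret_id: str = "prod/agent/anthropic-api-key") -> str:
    client = boto3.client("secretsmanager")
    return client.get_secret_value(SecretId=secret_id)["SecretString"]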
Logging That Actually Helps
When your agent does something weird at 3 AM, your logs are the only thing that can tell you why. Structure them properly:
# logging_setup.py
import logging
import structlog

def setup_logging(level: str = "info"):
    """Configure structured JSON logging for production."""
    logging.basicConfig(format="%(message)s", level=getattr(logging, level.upper()))
    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.JSONRenderer()
        ],
        logger_factory=structlog.stdlib.LoggerFactory(),
    )
logger = structlog.get_logger()
# Usage in your agent
async def run_agent(request):
log = logger.bind(
request_id=request.id,
user_id=request.user_id,
session_id=request.session_id
)
log.info("agent.request.started", input_length=len(request.message))
for i, step in enumerate(agent_steps):
log.info("agent.step", step=i, tool=step.tool_name)
if step.tool_name:
log.info("agent.tool_call",
tool=step.tool_name,
args=sanitize(step.tool_args), # Don't log sensitive data!
duration_ms=step.duration_ms
)
log.info("agent.request.completed",
duration_ms=total_duration,
tokens_used=total_tokens,
tool_calls=num_tool_calls,
model=model_used,
cost=estimated_cost
)
Key logging rules:
- Always use structured logging (JSON) — grep and dashboards will thank you
- Include request_id in every log line — trace a single request through the system
- Log tool calls with duration — find slow integrations fast
- Sanitize sensitive data — don't log API keys, user PII, or full prompts in production (a minimal sanitize() helper is sketched after this list)
- Log costs — catch cost spikes before the bill arrives
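The sanitize() call used in the logging example above can start as small as this sketch; the key list is an assumption, so extend it for your own tools and data.
# sanitize.py: redact obviously sensitive fields before logging tool arguments
SENSITIVE_KEYS = {"api_key", "authorization", "password", "token", "email"}

def sanitize(args: dict) -> dict:
    """Replace sensitive values with placeholders and truncate very long strings."""
    cleaned = {}
    for key, value in args.items():
        if key.lower() in SENSITIVE_KEYS:
            cleaned[key] = "[REDACTED]"
        elif isinstance(value, str) and len(value) > 200:
            cleaned[key] = value[:200] + "...[truncated]"
        else:
            cleaned[key] = value
    return cleaned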
Health Checks and Readiness Probes
# health.py
from fastapi import FastAPI
from datetime import datetime, timezone
app = FastAPI()
@app.get("/health")
async def health():
"""Liveness probe — is the process running?"""
return {"status": "ok", "timestamp": datetime.utcnow().isoformat()}
@app.get("/ready")
async def ready():
"""Readiness probe — can we serve requests?"""
checks = {}
# Check LLM provider
try:
await anthropic_client.messages.create(
model="claude-3-5-haiku-20241022",
max_tokens=5,
messages=[{"role": "user", "content": "hi"}]
)
checks["llm"] = "ok"
except Exception as e:
checks["llm"] = f"error: {str(e)}"
# Check Redis
try:
await redis.ping()
checks["redis"] = "ok"
except Exception:
checks["redis"] = "error"
# Check Database
try:
await db.execute("SELECT 1")
checks["database"] = "ok"
except Exception:
checks["database"] = "error"
all_ok = all(v == "ok" for v in checks.values())
return {
"status": "ready" if all_ok else "degraded",
"checks": checks
}
Use /health for liveness (restart if dead) and /ready for readiness (don't send traffic if dependencies are down).
7 Production Mistakes That Kill AI Agents
Mistake 1: No Token Limits
What happens: A single user sends a 100K-token document. Your agent processes it with multiple tool calls. Bill: $15 for one request.
Fix: Hard limits on input tokens, output tokens, and total tool calls per request.
Mistake 2: Synchronous Everything
What happens: 10 users hit your agent simultaneously. Each takes 30 seconds. User #10 waits 5 minutes.
Fix: Async processing with a task queue. Return a task ID, let the client poll for results.
Mistake 3: No Fallback Model
What happens: Claude API has an outage. Your entire agent goes down.
Fix: Configure a fallback (e.g., Claude → GPT-4 → cached response). Degrade gracefully.
Mistake 4: Logging Full Prompts
What happens: Your system prompt (the crown jewels) ends up in log aggregators that 20 people have access to.
Fix: Log metadata (lengths, token counts) not content. Hash sensitive inputs if you need to correlate.
Mistake 5: No Circuit Breakers
What happens: A tool API goes down. Your agent retries 3 times per request × 100 requests = 300 failing API calls hammering the dead service.
Fix: Circuit breaker pattern. After N failures, stop trying for M seconds.
Mistake 6: Deploying Without Eval Baselines
What happens: You push a "small prompt change." Agent quality drops 20%. You don't notice for a week.
Fix: Run evals before every deploy. Set quality thresholds. Block deploys that regress.
Mistake 7: Trusting the Agent Unconditionally
What happens: Agent executes a tool call that deletes data, sends an email, or charges a customer — without human approval.
Fix: Confirmation gates for destructive actions. Read operations: auto-approve. Write operations: require confirmation (or at least logging + undo capability).
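One way to enforce this is a gate in the tool dispatcher: read-only tools run immediately, write tools are parked until a human approves them. A sketch, where the tool names and the pending_actions store are assumptions and execute_tool is the dispatcher from your tools module:
# approval_gate.py: confirmation gate sketch (tool names and pending_actions store are assumptions)
READ_ONLY_TOOLS = {"search_docs", "get_order_status"}          # auto-approve
WRITE_TOOLS = {"send_email", "issue_refund", "delete_record"}  # require confirmation

async def execute_with_gate(tool_name: str, tool_input: dict, session_id: str):
    if tool_name in READ_ONLY_TOOLS:
        return await execute_tool(tool_name, tool_input)
    if tool_name in WRITE_TOOLS:
        # Park the action and report back so the agent can tell the user it needs approval
        await pending_actions.add(session_id, tool_name, tool_input)
        return {"status": "pending_approval", "tool": tool_name}
    raise ValueError(f"Unknown tool: {tool_name}")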
60-Minute Production Deploy Quickstart
Let's deploy a real agent to production in under an hour. We'll use Railway (simplest path) with proper monitoring.
1 Project Structure (5 min)
my-agent/
├── agent/
│ ├── __init__.py
│ ├── main.py # Entry point
│ ├── core.py # Agent logic
│ ├── tools.py # Tool definitions
│ ├── config.py # Pydantic settings
│ ├── monitoring.py # Prometheus metrics
│ └── health.py # Health endpoints
├── tests/
│ ├── test_tools.py
│ └── test_evals.py
├── Dockerfile
├── requirements.txt
└── railway.json
2 Core Agent with Safety (15 min)
# agent/core.py
import anthropic
import asyncio
from .config import config
from .tools import TOOLS, execute_tool  # tool schemas and dispatcher live in tools.py

client = anthropic.AsyncAnthropic(api_key=config.anthropic_api_key)

async def run_agent(message: str, session_id: str) -> dict:
    """Production agent with safety guardrails."""
    # Guard: input length
    if len(message) > 10000:
        return {"error": "Input too long", "max_chars": 10000}
    messages = [{"role": "user", "content": message}]
    tool_calls = 0
    total_tokens = 0
    while tool_calls < config.max_tool_calls:
        response = await asyncio.wait_for(
            client.messages.create(
                model=config.default_model,
                max_tokens=config.max_tokens,
                messages=messages,
                tools=TOOLS,
            ),
            timeout=config.request_timeout
        )
        total_tokens += response.usage.input_tokens + response.usage.output_tokens
        # Cost guard (rough blended rate of ~$3 per 1M tokens)
        estimated_cost = total_tokens * 0.000003
        if estimated_cost > config.max_cost_per_request:
            return {"error": "Cost limit reached", "partial": messages[-1]}
        if response.stop_reason == "end_turn":
            return {
                "response": response.content[0].text,
                "tokens": total_tokens,
                "tool_calls": tool_calls,
                "cost": estimated_cost
            }
        # Tool use: append the assistant turn once, then return all
        # tool results together in a single user turn
        messages.append({"role": "assistant", "content": response.content})
        tool_results = []
        for block in response.content:
            if block.type == "tool_use":
                tool_calls += 1
                result = await execute_tool(block.name, block.input)
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": str(result)
                })
        if not tool_results:
            # Stopped for another reason (e.g. max_tokens) without calling a tool
            return {"error": f"Unexpected stop reason: {response.stop_reason}"}
        messages.append({"role": "user", "content": tool_results})
    return {"error": "Max tool calls reached", "tool_calls": tool_calls}
3 API Server (10 min)
# agent/main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from .core import run_agent
from .health import health_router  # the quickstart's health.py exposes an APIRouter

app = FastAPI(title="My AI Agent")
app.include_router(health_router)

class AgentRequest(BaseModel):
    message: str
    session_id: str = "default"

@app.post("/agent/run")
async def agent_endpoint(req: AgentRequest):
    try:
        result = await run_agent(req.message, req.session_id)
    except TimeoutError:
        raise HTTPException(status_code=504, detail="Agent timed out")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    # Raised outside the try block so the 400 isn't swallowed by the generic 500 handler
    if "error" in result:
        raise HTTPException(status_code=400, detail=result["error"])
    return result
4 Deploy (10 min)
# requirements.txt
anthropic>=0.40.0
fastapi>=0.115.0
uvicorn>=0.32.0
pydantic-settings>=2.6.0
structlog>=24.4.0
prometheus-client>=0.21.0
# Dockerfile — same as earlier in this guide
# railway.json
{
"build": { "builder": "DOCKERFILE" },
"deploy": {
"startCommand": "uvicorn agent.main:app --host 0.0.0.0 --port $PORT",
"healthcheckPath": "/health",
"restartPolicyType": "ON_FAILURE"
}
}
# Deploy
railway login
railway init
railway variables set AGENT_ANTHROPIC_API_KEY=sk-ant-...
railway up
5 Verify (5 min)
# Health check
curl https://your-agent.up.railway.app/health
# Test request
curl -X POST https://your-agent.up.railway.app/agent/run \
-H "Content-Type: application/json" \
-d '{"message": "What is the weather in Amsterdam?", "session_id": "test-1"}'
# Check metrics (assumes you expose prometheus_client metrics, e.g. via make_asgi_app(), at /metrics)
curl https://your-agent.up.railway.app/metrics
Congratulations — you have a production AI agent running. From here, add monitoring dashboards (Grafana), set up alerting, and implement the CI/CD pipeline from earlier in this guide.
What's Next
You've got your agent running in production. Here's the upgrade path:
- Week 1: Monitor baseline metrics. Identify your most expensive requests and slowest tools.
- Week 2: Add model routing. Route simple requests to Haiku, complex ones to Sonnet.
- Week 3: Set up the eval pipeline. Build a test set from real production requests.
- Week 4: Add caching. Identical requests should hit cache, not the LLM (see the sketch below).
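A response cache can be as small as hashing the normalized request and checking Redis before calling the LLM. A sketch, assuming redis.asyncio and the run_agent function from the quickstart; only cache requests that do not depend on per-session state, or fold that context into the key.
# response_cache.py: cache identical requests for an hour (assumes redis.asyncio and run_agent)
import hashlib
import json
import redis.asyncio as redis

r = redis.from_url("redis://localhost:6379")

async def cached_run_agent(message: str, session_id: str) -> dict:
    key = "cache:" + hashlib.sha256(message.strip().lower().encode()).hexdigest()
    hit = await r.get(key)
    if hit:
        return json.loads(hit)
    result = await run_agent(message, session_id)
    await r.set(key, json.dumps(result), ex=3600)  # 1 hour TTL
    return result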
Production deployment isn't a one-time event — it's an ongoing practice. But with the patterns in this guide, you have a solid foundation that won't fall apart at 3 AM.
🚀 Ready to Ship Your AI Agent?
The AI Employee Playbook includes production Docker templates, CI/CD configs, monitoring dashboards, and the exact patterns used by teams running agents at scale.
Get the Playbook — €29