AI Agent API Integration: The Complete Guide (2026)
Your AI agent is only as powerful as the APIs it can call. Here's how to connect agents to any service — from Slack to Stripe — with production-grade patterns for auth, retries, and safety.
Why API Integration Is the #1 Agent Skill
Most AI agent tutorials stop at "call an LLM and get a response." But real-world agents need to do things: create Jira tickets, send Slack messages, process payments, update CRMs, query databases.
The difference between a demo agent and a production agent is almost entirely about API integration quality. Here's what that looks like:
- Demo agent: "I'd recommend creating a ticket for this bug" (text output)
- Production agent: Creates the Jira ticket, assigns it, links the PR, and notifies the team on Slack (actions)
Every tool your agent can call is a multiplier on its capabilities. Five well-integrated APIs turn a mediocre agent into something that genuinely replaces manual work.
The 3-Layer Integration Architecture
Don't just slap API calls into your agent. Use this architecture to keep things maintainable:
Layer 1: Tool Definition (What the LLM sees)
This is the schema you pass to the model. Keep it clean, descriptive, and constrained:
{
"name": "create_jira_ticket",
"description": "Create a new Jira ticket in the specified project. Use when a bug is reported or a task needs tracking.",
"parameters": {
"type": "object",
"properties": {
"project": {
"type": "string",
"description": "Jira project key (e.g., 'ENG', 'PROD')",
"enum": ["ENG", "PROD", "DESIGN", "OPS"]
},
"title": {
"type": "string",
"description": "Brief ticket title (max 100 chars)"
},
"description": {
"type": "string",
"description": "Detailed description with context"
},
"priority": {
"type": "string",
"enum": ["critical", "high", "medium", "low"],
"default": "medium"
},
"assignee": {
"type": "string",
"description": "Email of the assignee (optional)"
}
},
"required": ["project", "title", "description"]
}
}
The description field matters more than the parameter names: it is what the LLM reads to decide when and how to use the tool. Be specific about when to use it, not just what it does.
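Because the schema is the contract between the model and your middleware, the same constraints can be checked at runtime before any API call goes out. The sketch below is a minimal, stdlib-only check for the `required` and `enum` rules used in the tool definition above. `validate_arguments` and the trimmed `JIRA_SCHEMA` are illustrative names, not part of any SDK, and a production system would more likely lean on a full JSON Schema library.

```python
# Hypothetical helper: checks tool-call arguments against the small subset of
# JSON Schema used in the tool definition (required, enum, type=string).
JIRA_SCHEMA = {
    "type": "object",
    "properties": {
        "project": {"type": "string", "enum": ["ENG", "PROD", "DESIGN", "OPS"]},
        "title": {"type": "string"},
        "description": {"type": "string"},
        "priority": {"type": "string",
                     "enum": ["critical", "high", "medium", "low"]},
    },
    "required": ["project", "title", "description"],
}


def validate_arguments(schema: dict, arguments: dict) -> list[str]:
    """Return a list of human-readable problems; an empty list means valid."""
    problems = []
    properties = schema.get("properties", {})
    for name in schema.get("required", []):
        if name not in arguments:
            problems.append(f"missing required parameter '{name}'")
    for name, value in arguments.items():
        spec = properties.get(name)
        if spec is None:
            problems.append(f"unknown parameter '{name}'")
            continue
        if spec.get("type") == "string" and not isinstance(value, str):
            problems.append(f"parameter '{name}' must be a string")
        elif "enum" in spec and value not in spec["enum"]:
            problems.append(f"parameter '{name}' must be one of {spec['enum']}")
    return problems
```

Returning the problem list to the model as the tool result (instead of raising) gives it a chance to correct its own arguments on the next turn.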
Layer 2: Integration Logic (Your middleware)
This is where auth, validation, rate limiting, and retries live. The LLM never sees this layer:
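import httpx
from tenacity import retry, stop_after_attempt, wait_exponential


class JiraIntegration:
    def __init__(self, base_url: str, email: str, api_token: str):
        self.base_url = base_url.rstrip("/")
        self.client = httpx.AsyncClient(
            base_url=self.base_url,
            auth=(email, api_token),  # basic auth: account email + API token
            headers={"Content-Type": "application/json"},
            timeout=30.0,
        )

    # reraise=True surfaces the original exception (e.g. httpx.HTTPStatusError)
    # after the last attempt instead of tenacity's RetryError.
    # As written this retries on any exception, including 4xx responses.
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        reraise=True,
    )
    async def create_ticket(self, project: str, title: str,
                            description: str, priority: str = "medium",
                            assignee: str | None = None) -> dict:
        # Truncate over-long titles before hitting the API
        if len(title) > 100:
            title = title[:97] + "..."

        # Friendly names to Jira priority IDs (IDs can differ per instance)
        priority_map = {
            "critical": "1", "high": "2",
            "medium": "3", "low": "4",
        }
        payload = {
            "fields": {
                "project": {"key": project},
                "summary": title,
                # API v3 expects the description in Atlassian Document Format
                "description": {
                    "type": "doc",
                    "version": 1,
                    "content": [{"type": "paragraph",
                                 "content": [{"type": "text", "text": description}]}],
                },
                "priority": {"id": priority_map.get(priority, "3")},
                "issuetype": {"name": "Task"},
            }
        }
        if assignee:
            # Jira Cloud generally identifies users by accountId; if this field
            # is rejected, resolve the email to an account ID first.
            payload["fields"]["assignee"] = {"emailAddress": assignee}

        response = await self.client.post("/rest/api/3/issue", json=payload)
        response.raise_for_status()
        data = response.json()
        return {
            "ticket_id": data["key"],
            # Use the stored string: httpx normalizes client.base_url with a
            # trailing slash, which would produce "//browse/..."
            "url": f"{self.base_url}/browse/{data['key']}",
            "status": "created",
        }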
Layer 3: Tool Router (Connects LLM to integrations)
import json

import httpx


class ToolRouter:
    def __init__(self):
        self.tools = {}

    def register(self, name: str, handler, schema: dict):
        self.tools[name] = {"handler": handler, "schema": schema}

    async def execute(self, tool_name: str, arguments: dict) -> str:
        if tool_name not in self.tools:
            return f"Error: Unknown tool '{tool_name}'"
        try:
            result = await self.tools[tool_name]["handler"](**arguments)
            return json.dumps(result, indent=2)
        except httpx.HTTPStatusError as e:
            # Translate HTTP failures into something the LLM can read
            return json.dumps({
                "error": f"API returned {e.response.status_code}",
                "message": e.response.text[:200],
            })
        except Exception as e:
            return json.dumps({"error": str(e)})


# Usage (JIRA_URL, JIRA_EMAIL, JIRA_TOKEN come from your config;
# jira_schema is the Layer 1 tool definition shown earlier)
router = ToolRouter()
jira = JiraIntegration(JIRA_URL, JIRA_EMAIL, JIRA_TOKEN)
router.register("create_jira_ticket", jira.create_ticket, jira_schema)
Function Calling: How It Actually Works
Every major LLM provider now supports function calling (tool use). Here's how the flow works under the hood:
1. You send the tool schemas alongside the user message to the LLM
2. The LLM decides whether to call a tool (and which one) based on the conversation
3. You receive a tool call with the function name and JSON arguments
4. You execute the function against the real API
5. You send the result back to the LLM as a tool response
6. The LLM generates a final response incorporating the result
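The flow above can be sketched without any provider SDK. In this minimal example the model is replaced by a stub (`fake_llm`) that first requests a tool and then answers, so the loop runs offline. The message and tool-call shapes are simplified assumptions, not any provider's wire format, and `search_crm` is a stand-in for a real API call.

```python
import json


def fake_llm(messages: list[dict], tools: list[dict]) -> dict:
    """Stub model: asks for a tool first, then answers from the tool result."""
    last = messages[-1]
    if last["role"] == "tool":
        return {"type": "text", "text": f"Found: {last['content']}"}
    return {"type": "tool_call", "id": "call_1",
            "name": "search_crm", "arguments": {"query": "Acme"}}


def search_crm(query: str) -> list[dict]:
    # Stand-in for the real API request
    return [{"name": "Acme Corp", "plan": "enterprise"}]


TOOL_HANDLERS = {"search_crm": search_crm}
TOOL_SCHEMAS = [{"name": "search_crm", "description": "Search the CRM"}]


def run_turn(user_message: str, max_steps: int = 5) -> str:
    # Send the schemas together with the user message
    messages = [{"role": "user", "content": user_message}]
    for _ in range(max_steps):
        # The model decides: answer directly or request a tool
        reply = fake_llm(messages, TOOL_SCHEMAS)
        if reply["type"] == "text":
            return reply["text"]  # final response
        # Execute the requested function with the model's JSON arguments
        result = TOOL_HANDLERS[reply["name"]](**reply["arguments"])
        # Feed the result back as a tool response and loop again
        messages.append({"role": "assistant", "content": reply})
        messages.append({"role": "tool", "tool_call_id": reply["id"],
                         "content": json.dumps(result)})
    return "Stopped: too many tool calls"
```

The `max_steps` bound is a design choice worth keeping when the stub is swapped for a real model: it stops a confused model from calling tools indefinitely.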
Claude (Anthropic) — Tool Use
import json

import anthropic

client = anthropic.Anthropic()
tools = [
{
"name": "search_crm",
"description": "Search the CRM for customer records by name, email, or company. Returns matching contacts with recent activity.",
"input_schema": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search term"},
"field": {"type": "string", "enum": ["name", "email", "company"]},
"limit": {"type": "integer", "default": 5}
},
"required": ["query"]
}
}
]
response = client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=1024,
tools=tools,
messages=[{"role": "user", "content": "Find the account details for Acme Corp"}]
)
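# Handle tool calls in the response. `crm` is assumed to be your own async
# CRM client, so this must run inside a coroutine (e.g. via asyncio.run).
async def handle_tool_calls(response):
    tool_results = []
    for block in response.content:
        if block.type == "tool_use":
            # Execute the actual API call
            result = await crm.search(
                query=block.input["query"],
                field=block.input.get("field", "name"),
                limit=block.input.get("limit", 5),
            )
            tool_results.append({"type": "tool_result",
                                 "tool_use_id": block.id,
                                 "content": json.dumps(result)})
    if not tool_results:
        return response  # the model answered without calling a tool

    # Send every result back to Claude in a single user turn; each tool_use
    # block in the assistant turn needs a matching tool_result
    return client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        tools=tools,
        messages=[
            {"role": "user", "content": "Find the account details for Acme Corp"},
            {"role": "assistant", "content": response.content},
            {"role": "user", "content": tool_results},
        ],
    )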
OpenAI — Function Calling
import json

from openai import OpenAI

client = OpenAI()
tools = [
{
"type": "function",
"function": {
"name": "search_crm",
"description": "Search CRM for customer records",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string"},
"field": {"type": "string", "enum": ["name", "email", "company"]}
},
"required": ["query"]
}
}
}
]
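messages = [{"role": "user", "content": "Find Acme Corp"}]
response = client.chat.completions.create(
    model="gpt-4o",
    messages=messages,
    tools=tools,
)


# Process tool calls. `crm` is assumed to be your own async CRM client.
async def handle_tool_calls(response, messages):
    message = response.choices[0].message
    if not message.tool_calls:
        return response
    # The assistant turn that requested the tools must precede their results
    messages.append(message)
    for call in message.tool_calls:
        args = json.loads(call.function.arguments)
        result = await crm.search(**args)
        # Send back as tool message
        messages.append({"role": "tool", "tool_call_id": call.id,
                         "content": json.dumps(result)})
    # A second request lets the model turn the tool output into an answer
    return client.chat.completions.create(
        model="gpt-4o", messages=messages, tools=tools
    )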
The 10 Most Valuable API Integrations
Not all integrations are equal. Here are the ones that deliver the most value per hour invested:
| Integration | Use Case | Difficulty | ROI |
|---|---|---|---|
| Slack / Teams | Notifications, reports, approvals | Easy | 🔥🔥🔥 |
| Google Calendar | Scheduling, availability, reminders | Medium | 🔥🔥🔥 |
| Gmail / SMTP | Email drafts, sends, triage | Medium | 🔥🔥🔥 |
| Jira / Linear | Ticket creation, status updates | Easy | 🔥🔥 |
| Stripe / Payments | Invoice creation, payment status | Medium | 🔥🔥🔥 |
| HubSpot / CRM | Contact management, deal tracking | Medium | 🔥🔥🔥 |
| GitHub | PR reviews, issue management, deploys | Easy | 🔥🔥 |
| Notion / Docs | Knowledge base, documentation | Easy | 🔥🔥 |
| Database (SQL) | Read/write business data | Hard | 🔥🔥🔥 |
| Web Search | Real-time research, fact-checking | Easy | 🔥🔥 |
Authentication Patterns That Don't Break
Auth is where most API integration headaches live. These are the patterns that hold up in production:
Pattern 1: API Key (Simple & Common)
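import os


class APIKeyAuth:
    """For services that take a static key in a header, such as Stripe or SendGrid."""

    def __init__(self, api_key: str, header_name: str = "Authorization",
                 prefix: str = "Bearer"):
        # An empty prefix sends the bare key, for APIs that expect the key
        # alone in a custom header (e.g. Brave Search's X-Subscription-Token)
        value = f"{prefix} {api_key}" if prefix else api_key
        self.headers = {header_name: value}

    # async so callers can await apply() on any auth object, the same way
    # they do for OAuth2Auth
    async def apply(self, request_headers: dict) -> dict:
        return {**request_headers, **self.headers}


# Usage
stripe_auth = APIKeyAuth(os.environ["STRIPE_KEY"])
sendgrid_auth = APIKeyAuth(os.environ["SENDGRID_KEY"])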
Pattern 2: OAuth2 with Auto-Refresh
import os
import time

import httpx


class OAuth2Auth:
    """For Google, Microsoft, HubSpot, Salesforce."""

    def __init__(self, client_id, client_secret, refresh_token, token_url):
        self.client_id = client_id
        self.client_secret = client_secret
        self.refresh_token = refresh_token
        self.token_url = token_url
        self.access_token = None
        self.expires_at = 0

    async def get_token(self) -> str:
        # Reuse the cached token until 60s before it expires
        if self.access_token and time.time() < self.expires_at - 60:
            return self.access_token
        async with httpx.AsyncClient() as client:
            response = await client.post(self.token_url, data={
                "grant_type": "refresh_token",
                "refresh_token": self.refresh_token,
                "client_id": self.client_id,
                "client_secret": self.client_secret,
            })
            # Fail loudly on a revoked or invalid refresh token instead of
            # hitting a KeyError on the missing access_token below
            response.raise_for_status()
            data = response.json()
        self.access_token = data["access_token"]
        self.expires_at = time.time() + data.get("expires_in", 3600)
        # Some providers rotate the refresh token; keep the new one if present
        self.refresh_token = data.get("refresh_token", self.refresh_token)
        return self.access_token

    async def apply(self, headers: dict) -> dict:
        token = await self.get_token()
        return {**headers, "Authorization": f"Bearer {token}"}


# Usage
google_auth = OAuth2Auth(
    client_id=os.environ["GOOGLE_CLIENT_ID"],
    client_secret=os.environ["GOOGLE_CLIENT_SECRET"],
    refresh_token=os.environ["GOOGLE_REFRESH_TOKEN"],
    token_url="https://oauth2.googleapis.com/token",
)
Pattern 3: Credential Vault
import json
import os

from cryptography.fernet import Fernet

# APIKeyAuth and OAuth2Auth are the classes from Patterns 1 and 2


class CredentialVault:
    """Centralized credential management for all integrations.

    Credentials are held encrypted in memory only; nothing is persisted.
    """

    def __init__(self, encryption_key: str):
        self._credentials = {}
        # Expects a Fernet key, e.g. one produced by Fernet.generate_key()
        self._fernet = Fernet(encryption_key)

    def store(self, service: str, credentials: dict):
        encrypted = self._fernet.encrypt(json.dumps(credentials).encode())
        self._credentials[service] = encrypted

    def get(self, service: str) -> dict:
        encrypted = self._credentials.get(service)
        if not encrypted:
            raise ValueError(f"No credentials for {service}")
        return json.loads(self._fernet.decrypt(encrypted))

    def get_auth(self, service: str) -> APIKeyAuth | OAuth2Auth:
        creds = self.get(service)
        if creds.get("type") == "oauth2":
            return OAuth2Auth(**creds["config"])
        return APIKeyAuth(creds["api_key"])


vault = CredentialVault(os.environ["VAULT_KEY"])
vault.store("stripe", {"type": "api_key", "api_key": os.environ["STRIPE_KEY"]})
vault.store("google", {"type": "oauth2", "config": {...}})
Error Handling That Keeps Agents Running
API calls fail. A lot. Your agent needs to handle every failure mode gracefully and communicate results back to the LLM in a way it can understand:
import asyncio

import httpx


class ResilientAPIClient:
    """Wraps any API call with retry, timeout, and error translation."""

    ERROR_MESSAGES = {
        400: "Bad request: check your parameters",
        401: "Authentication failed: token may be expired",
        403: "Permission denied: insufficient API scope",
        404: "Resource not found",
        500: "Server error: try again later",
    }

    def __init__(self, auth, base_url: str, rate_limit: int = 100):
        self.auth = auth
        self.base_url = base_url
        # Caps concurrent in-flight requests; it does not limit requests per second
        self.semaphore = asyncio.Semaphore(rate_limit)
        self.client = httpx.AsyncClient(base_url=base_url, timeout=30.0)

    async def call(self, method: str, path: str, **kwargs) -> dict:
        # Pop once, outside the loop, so retries keep the caller's headers
        caller_headers = kwargs.pop("headers", {})
        async with self.semaphore:
            for attempt in range(3):
                try:
                    headers = await self.auth.apply(caller_headers)
                    response = await self.client.request(
                        method, path, headers=headers, **kwargs
                    )
                    if response.status_code == 429:  # Rate limited
                        # Retry-After may also be an HTTP date; fall back to 5s
                        retry_after = response.headers.get("Retry-After", "5")
                        delay = int(retry_after) if retry_after.isdigit() else 5
                        await asyncio.sleep(delay)
                        continue
                    response.raise_for_status()
                    return {"success": True, "data": response.json()}
                except httpx.TimeoutException:
                    if attempt == 2:
                        return {"success": False,
                                "error": "API timed out after 3 attempts"}
                    await asyncio.sleep(2 ** attempt)  # 1s, then 2s
                except httpx.HTTPStatusError as e:
                    status = e.response.status_code
                    return {
                        "success": False,
                        "error": self.ERROR_MESSAGES.get(status, f"HTTP {status}"),
                        "details": e.response.text[:200],
                    }
        # Every attempt was rate limited; without this the method returned None
        return {"success": False, "error": "Rate limited after 3 attempts"}
The key insight: return structured error messages that help the LLM self-correct. Don't just crash — tell the model what went wrong so it can try a different approach or inform the user.
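One caveat on the client above: an `asyncio.Semaphore` bounds how many requests are in flight at once, not how many are sent per second. If an API enforces a per-second quota, a token bucket is one common way to throttle. The sketch below is a minimal asyncio version; `TokenBucket`, `rate`, and `capacity` are illustrative names rather than part of any library.

```python
import asyncio
import time


class TokenBucket:
    """Allows roughly `rate` calls per second, with bursts up to `capacity`."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.updated = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        # Holding the lock while waiting serves callers in arrival order
        async with self._lock:
            while True:
                now = time.monotonic()
                # Refill in proportion to elapsed time, capped at capacity
                self.tokens = min(self.capacity,
                                  self.tokens + (now - self.updated) * self.rate)
                self.updated = now
                if self.tokens >= 1:
                    self.tokens -= 1
                    return
                # Sleep just long enough for one token to accumulate
                await asyncio.sleep((1 - self.tokens) / self.rate)


async def demo() -> float:
    """Four calls through a 20/s bucket with burst 2; returns elapsed seconds."""
    bucket = TokenBucket(rate=20, capacity=2)
    start = time.monotonic()
    for _ in range(4):  # the first two pass immediately, the next two wait
        await bucket.acquire()
    return time.monotonic() - start
```

In a client like the one above, `await bucket.acquire()` would sit just before the request is sent, alongside the semaphore rather than instead of it.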
5 Production Patterns You Need
Pattern 1: Confirmation Before Destructive Actions
import json

DESTRUCTIVE_TOOLS = {"delete_customer", "send_email", "process_payment",
                     "deploy_production", "cancel_subscription"}


async def execute_with_confirmation(tool_name: str, args: dict) -> str:
    # Destructive tools are never executed directly; the caller gets a
    # confirmation request to show the user instead
    if tool_name in DESTRUCTIVE_TOOLS:
        return json.dumps({
            "status": "confirmation_required",
            "action": tool_name,
            "details": args,
            "message": f"This will {tool_name.replace('_', ' ')}. "
                       f"Please confirm by saying 'yes, proceed'.",
        })
    # `router` is the ToolRouter instance from Layer 3
    return await router.execute(tool_name, args)
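The gate above stops at asking for confirmation; it does not show how an approved action eventually runs. One possible approach, assuming your application (not the model) records the user's approval, is to park the pending call under a one-time token and execute it only when that token is presented. `PENDING`, `request_action`, `confirm_action`, and `run_tool` are hypothetical names for this sketch, and `run_tool` stands in for `router.execute`.

```python
import asyncio
import json
import secrets

DESTRUCTIVE_TOOLS = {"delete_customer", "send_email", "process_payment"}
PENDING: dict[str, tuple[str, dict]] = {}  # token -> (tool_name, args)


async def run_tool(tool_name: str, args: dict) -> str:
    # Stand-in for router.execute(); a real handler would call the API
    return json.dumps({"status": "executed", "tool": tool_name})


async def request_action(tool_name: str, args: dict) -> str:
    """Called when the model asks for a tool. Safe tools run immediately."""
    if tool_name not in DESTRUCTIVE_TOOLS:
        return await run_tool(tool_name, args)
    token = secrets.token_urlsafe(8)
    PENDING[token] = (tool_name, args)
    return json.dumps({"status": "confirmation_required",
                       "action": tool_name, "details": args,
                       "confirmation_token": token})


async def confirm_action(token: str) -> str:
    """Called by the application after the user approves, never by the model."""
    pending = PENDING.pop(token, None)  # pop makes the token single-use
    if pending is None:
        return json.dumps({"status": "error",
                           "error": "unknown or already used token"})
    tool_name, args = pending
    return await run_tool(tool_name, args)
```

Because the stored arguments are replayed exactly, the user approves the same call the model proposed, and the model cannot alter it between the request and the confirmation.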
Pattern 2: Read-Before-Write
Always fetch current state before modifying. This prevents the agent from overwriting data based on stale context:
async def update_crm_contact(contact_id: str, updates: dict) -> dict:
    # `crm` is assumed to be your own async CRM client
    # Step 1: Read current state
    current = await crm.get_contact(contact_id)

    # Step 2: Keep only the fields that actually change, so untouched
    # fields are never overwritten
    changes = {k: v for k, v in updates.items() if current.get(k) != v}
    if not changes:
        return {"status": "no_changes", "message": "Contact already up to date"}

    # Step 3: Write with version check (optimistic locking). The client is
    # expected to reject the write if the record changed since Step 1.
    await crm.update_contact(
        contact_id, changes,
        expected_version=current["version"],
    )
    return {"status": "updated", "changes": changes}
Pattern 3: Batch Operations
import asyncio


async def batch_create_tickets(tickets: list[dict]) -> dict:
    """Create multiple tickets efficiently."""
    results = []
    errors = []
    # Run in parallel, but cap concurrency to respect rate limits
    semaphore = asyncio.Semaphore(5)  # Max 5 concurrent

    async def create_one(ticket):
        async with semaphore:
            try:
                # `jira` is the JiraIntegration instance from Layer 2
                result = await jira.create_ticket(**ticket)
                results.append(result)
            except Exception as e:
                # One failed ticket should not abort the rest of the batch
                errors.append({"ticket": ticket["title"], "error": str(e)})

    await asyncio.gather(*[create_one(t) for t in tickets])
    return {
        "created": len(results),
        "failed": len(errors),
        "tickets": results,
        "errors": errors,
    }
Pattern 4: Webhook Receivers (Two-Way Integration)
import stripe
from fastapi import FastAPI, HTTPException, Request

app = FastAPI()
# WEBHOOK_SECRET and `agent` are assumed to be defined by your application


@app.post("/webhooks/stripe")
async def stripe_webhook(request: Request):
    payload = await request.body()
    sig = request.headers.get("stripe-signature")
    # Verify webhook signature; reject anything that fails the check
    try:
        event = stripe.Webhook.construct_event(payload, sig, WEBHOOK_SECRET)
    except Exception as exc:
        raise HTTPException(status_code=400,
                            detail="Invalid payload or signature") from exc

    if event["type"] == "payment_intent.succeeded":
        intent = event["data"]["object"]
        # Trigger agent action
        await agent.process_event({
            "type": "payment_received",
            "customer": intent["customer"],
            # Amounts arrive in the smallest currency unit; dividing by 100
            # is wrong for zero-decimal currencies such as JPY
            "amount": intent["amount"] / 100,
            "currency": intent["currency"],
        })
    return {"status": "ok"}


@app.post("/webhooks/github")
async def github_webhook(request: Request):
    # In production, verify the X-Hub-Signature-256 header before trusting
    # the payload
    event_type = request.headers.get("X-GitHub-Event")
    payload = await request.json()
    if event_type == "pull_request" and payload["action"] == "opened":
        await agent.process_event({
            "type": "pr_opened",
            "repo": payload["repository"]["full_name"],
            "title": payload["pull_request"]["title"],
            "author": payload["pull_request"]["user"]["login"],
            "url": payload["pull_request"]["html_url"],
        })
    return {"status": "ok"}
Pattern 5: Result Caching
import hashlib
import json
import time


class CachedIntegration:
    # Method-name prefixes treated as read operations
    READ_PREFIXES = ("search", "get", "list", "fetch")

    def __init__(self, integration, ttl_seconds: int = 300):
        self.integration = integration
        self.cache = {}
        self.ttl = ttl_seconds

    async def call(self, method: str, **kwargs):
        # Only cache read operations (search, get_contact, list_deals, ...)
        if method.startswith(self.READ_PREFIXES):
            # md5 is used only to build a compact cache key, not for security
            cache_key = hashlib.md5(
                f"{method}:{json.dumps(kwargs, sort_keys=True)}".encode()
            ).hexdigest()
            cached = self.cache.get(cache_key)
            if cached and time.time() - cached["time"] < self.ttl:
                data = cached["data"]
                # Flag dict results as cached; lists are returned unchanged
                return {**data, "_cached": True} if isinstance(data, dict) else data
            result = await getattr(self.integration, method)(**kwargs)
            self.cache[cache_key] = {"data": result, "time": time.time()}
            return result

        # Write operations bypass cache and invalidate
        result = await getattr(self.integration, method)(**kwargs)
        self.cache.clear()  # Simple invalidation
        return result
MCP: The Universal Integration Protocol
If you're building agents that need to talk to many services, look at the Model Context Protocol (MCP). It standardizes how agents discover and use tools:
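// MCP server exposing CRM tools (TypeScript SDK; `crm` is your own client)
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { z } from "zod";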
const server = new McpServer({
name: "crm-integration",
version: "1.0.0"
});
server.tool(
"search_contacts",
"Search CRM contacts by name, email, or company",
{
query: z.string().describe("Search term"),
field: z.enum(["name", "email", "company"]).optional(),
limit: z.number().default(10)
},
async ({ query, field, limit }) => {
const results = await crm.search(query, field, limit);
return {
content: [{
type: "text",
text: JSON.stringify(results, null, 2)
}]
};
}
);
server.tool(
"create_deal",
"Create a new deal/opportunity in the CRM pipeline",
{
contact_id: z.string(),
deal_name: z.string(),
value: z.number(),
stage: z.enum(["lead", "qualified", "proposal", "negotiation", "closed"])
},
async ({ contact_id, deal_name, value, stage }) => {
const deal = await crm.createDeal({ contact_id, deal_name, value, stage });
return {
content: [{ type: "text", text: `Deal created: ${deal.id} — ${deal.url}` }]
};
}
);
MCP gives you one protocol for all integrations instead of building custom adapters for every LLM provider. Write once, use with Claude, GPT, Gemini, or any MCP-compatible client.
Real-World Integration: Customer Support Agent
Let's build a complete customer support agent that integrates with 4 APIs. This is what production actually looks like:
import json

import anthropic

client = anthropic.Anthropic()

# Initialize integrations. These wrapper classes are your own Layer 2 code,
# built like JiraIntegration earlier; they are not library imports.
zendesk = ZendeskIntegration(ZENDESK_URL, ZENDESK_TOKEN)
stripe_client = StripeIntegration(STRIPE_KEY)
slack = SlackIntegration(SLACK_TOKEN)
knowledge_base = RAGPipeline(PINECONE_KEY, OPENAI_KEY)
TOOLS = [
{
"name": "search_knowledge_base",
"description": "Search internal documentation and help articles. Use FIRST before escalating.",
"input_schema": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "Natural language search query"}
},
"required": ["query"]
}
},
{
"name": "get_customer_info",
"description": "Look up customer account details including subscription, payment history, and previous tickets.",
"input_schema": {
"type": "object",
"properties": {
"email": {"type": "string", "description": "Customer email address"}
},
"required": ["email"]
}
},
{
"name": "create_ticket",
"description": "Escalate to human support. Only use after attempting to resolve via knowledge base.",
"input_schema": {
"type": "object",
"properties": {
"subject": {"type": "string"},
"description": {"type": "string"},
"priority": {"type": "string", "enum": ["low", "normal", "high", "urgent"]},
"customer_email": {"type": "string"}
},
"required": ["subject", "description", "customer_email"]
}
},
{
"name": "issue_refund",
"description": "Process a refund for a specific payment. Requires confirmation. Max $500 auto-approved.",
"input_schema": {
"type": "object",
"properties": {
"payment_id": {"type": "string"},
"amount_cents": {"type": "integer"},
"reason": {"type": "string"}
},
"required": ["payment_id", "reason"]
}
},
{
"name": "notify_team",
"description": "Send a notification to the support team Slack channel. Use for urgent issues or patterns.",
"input_schema": {
"type": "object",
"properties": {
"message": {"type": "string"},
"channel": {"type": "string", "default": "#support-alerts"},
"urgency": {"type": "string", "enum": ["info", "warning", "critical"]}
},
"required": ["message"]
}
}
]
SYSTEM_PROMPT = """You are a Tier 1 support agent for SaaS Corp.
## Rules
1. ALWAYS search the knowledge base first
2. Look up customer info to personalize responses
3. Only escalate (create ticket) after attempting resolution
4. Refunds up to $500 — auto-approved. Over $500 — escalate
5. Notify the team on Slack for: outage reports, security issues, payment failures affecting multiple users
6. Be empathetic but efficient. No fluff.
## Tone
Professional, warm, concise. First-name basis. Acknowledge frustration before solving.
"""
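async def handle_support_request(customer_message: str, customer_email: str,
                                 max_turns: int = 10) -> str:
    messages = [
        {"role": "user", "content": f"[Customer: {customer_email}]\n{customer_message}"}
    ]
    # Bound the loop so a confused model cannot call tools forever
    for _ in range(max_turns):
        # Note: this synchronous call blocks the event loop while it waits;
        # anthropic.AsyncAnthropic is the non-blocking alternative
        response = client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=1024,
            system=SYSTEM_PROMPT,
            tools=TOOLS,
            messages=messages,
        )
        # Any stop reason other than "tool_use" (end_turn, max_tokens, ...)
        # means there are no tool calls left to run
        if response.stop_reason != "tool_use":
            return "".join(b.text for b in response.content if b.type == "text")

        # Process tool calls; execute_tool is your dispatcher that maps a
        # tool name to the matching integration
        tool_results = []
        for block in response.content:
            if block.type == "tool_use":
                result = await execute_tool(block.name, block.input)
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": json.dumps(result),
                })
        messages.append({"role": "assistant", "content": response.content})
        messages.append({"role": "user", "content": tool_results})

    raise RuntimeError(f"Agent did not finish within {max_turns} turns")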
Testing API Integrations
Don't test in production. Here's a testing setup that catches integration bugs before they hit real APIs:
import httpx
import pytest
import respx
from unittest.mock import AsyncMock

# ToolRouter and ResilientAPIClient are the classes defined earlier in this guide


@pytest.fixture
def mock_router():
    router = ToolRouter()
    # Mock Jira
    mock_jira = AsyncMock()
    mock_jira.create_ticket.return_value = {
        "ticket_id": "ENG-123",
        "url": "https://jira.example.com/ENG-123",
        "status": "created",
    }
    router.register("create_jira_ticket", mock_jira.create_ticket, {})
    # Mock CRM
    mock_crm = AsyncMock()
    mock_crm.search.return_value = [
        {"name": "Acme Corp", "email": "billing@acme.com", "plan": "enterprise"}
    ]
    router.register("search_crm", mock_crm.search, {})
    return router


@pytest.fixture
def api_client():
    # Auth stub whose apply() adds no headers
    mock_auth = AsyncMock()
    mock_auth.apply.return_value = {}
    return ResilientAPIClient(auth=mock_auth, base_url="https://api.example.com")


@pytest.mark.asyncio
async def test_router_dispatches_to_handler(mock_router):
    """Verify the router calls the registered handler and serializes its result."""
    result = await mock_router.execute("search_crm", {"query": "Acme"})
    assert "Acme Corp" in result


@pytest.mark.asyncio
async def test_error_handling(api_client):
    """Verify graceful handling of API failures."""
    # Simulate 500 error
    with respx.mock:
        respx.get("https://api.example.com/users").respond(500)
        result = await api_client.call("GET", "/users")
    assert result["success"] is False
    assert "Server error" in result["error"]


@pytest.mark.asyncio
async def test_rate_limit_handling(api_client):
    """Verify 429 retry logic."""
    with respx.mock:
        route = respx.get("https://api.example.com/data")
        # First response is rate limited, the retry succeeds
        route.side_effect = [
            httpx.Response(429, headers={"Retry-After": "1"}),
            httpx.Response(200, json={"data": "success"}),
        ]
        result = await api_client.call("GET", "/data")
    assert result["success"] is True
Common Mistakes (And How to Fix Them)
- Giving the agent write access on day one. Start read-only. Let the agent search, fetch, and analyze before you let it create, update, or delete. Trust is earned.
- Vague tool descriptions. "Manages calendar" tells the LLM nothing. "Creates a new Google Calendar event with title, time, duration, and optional attendees. Use when the user wants to schedule a meeting" is what works.
- Not handling partial failures. If your agent calls 3 APIs and the second one fails, do you roll back the first? You need to decide — and implement — this upfront.
- Exposing internal IDs to users. Return human-readable responses. "Ticket ENG-423 created — I've assigned it to Sarah" beats "Created: {id: 10423, assignee_id: 8821}".
- Ignoring rate limits. Most free API tiers have tight limits. One overeager agent can burn through your monthly quota in an hour. Always implement throttling.
- No audit trail. Log every tool call with timestamp, inputs, outputs, and latency. When something goes wrong (and it will), you need to replay exactly what happened.
- Too many tools at once. Start with 3-5 tools. LLMs get worse at tool selection as the number of options grows. Add incrementally.
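For the audit-trail point in the list above, a decorator is one low-friction way to log every tool call. This sketch writes one JSON line per call with inputs, output or error, and latency. The logger name `agent.audit`, the `audited` decorator, and the stub `search_crm` are illustrative; where the log lines end up (file, stdout, a log service) is left to your logging configuration.

```python
import asyncio
import functools
import json
import logging
import time

audit_log = logging.getLogger("agent.audit")


def audited(tool_name: str):
    """Decorator that logs one JSON line per tool call."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(**kwargs):
            started = time.time()
            record = {"tool": tool_name, "inputs": kwargs, "timestamp": started}
            try:
                result = await func(**kwargs)
                record["output"] = result
                return result
            except Exception as exc:
                record["error"] = repr(exc)
                raise
            finally:
                # Runs on success and failure, so every call leaves a record
                record["latency_ms"] = round((time.time() - started) * 1000, 1)
                audit_log.info(json.dumps(record, default=str))
        return wrapper
    return decorator


@audited("search_crm")
async def search_crm(query: str) -> list[dict]:
    return [{"name": "Acme Corp"}]  # stub standing in for a real API call
```

Inputs are logged verbatim here, so fields such as tokens or card data would need redacting before this is pointed at real traffic.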
60-Minute Quickstart
Get a working agent with 3 API integrations in one hour:
1. Minutes 0-10: Set up project
mkdir my-agent && cd my-agent
python -m venv venv && source venv/bin/activate
pip install anthropic httpx python-dotenv
# .env
ANTHROPIC_API_KEY=sk-ant-...
SLACK_TOKEN=xoxb-...
GITHUB_TOKEN=ghp_...
BRAVE_KEY=...
2. Minutes 10-25: Build the tool layer
# tools.py
import os

import httpx
from dotenv import load_dotenv

load_dotenv()  # read the API keys from .env into os.environ


async def search_web(query: str) -> list[dict]:
    """Search the web using Brave Search API."""
    async with httpx.AsyncClient() as client:
        resp = await client.get(
            "https://api.search.brave.com/res/v1/web/search",
            params={"q": query, "count": 5},
            headers={"X-Subscription-Token": os.environ["BRAVE_KEY"]},
        )
        resp.raise_for_status()
        results = resp.json().get("web", {}).get("results", [])
        return [{"title": item["title"], "url": item["url"],
                 "snippet": item.get("description", "")} for item in results[:5]]


async def send_slack_message(channel: str, text: str) -> dict:
    """Send a message to a Slack channel."""
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            "https://slack.com/api/chat.postMessage",
            headers={"Authorization": f"Bearer {os.environ['SLACK_TOKEN']}"},
            json={"channel": channel, "text": text},
        )
        # Slack reports failures as {"ok": false, "error": ...} with HTTP 200,
        # so the body is returned as-is for the model to inspect
        return resp.json()


async def create_github_issue(repo: str, title: str, body: str) -> dict:
    """Create a GitHub issue. `repo` is in "owner/name" form."""
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            f"https://api.github.com/repos/{repo}/issues",
            headers={"Authorization": f"Bearer {os.environ['GITHUB_TOKEN']}"},
            json={"title": title, "body": body},
        )
        resp.raise_for_status()  # otherwise an error body raises KeyError below
        data = resp.json()
        return {"issue_number": data["number"], "url": data["html_url"]}
3. Minutes 25-45: Build the agent loop
# agent.py
import asyncio
import json

import anthropic
from dotenv import load_dotenv

from tools import search_web, send_slack_message, create_github_issue

load_dotenv()  # make ANTHROPIC_API_KEY from .env visible to the SDK
client = anthropic.Anthropic()

TOOL_MAP = {
    "search_web": search_web,
    "send_slack_message": send_slack_message,
    "create_github_issue": create_github_issue,
}
TOOLS = [
    {"name": "search_web", "description": "Search the web for current information",
     "input_schema": {"type": "object", "properties": {"query": {"type": "string"}}, "required": ["query"]}},
    {"name": "send_slack_message", "description": "Send a message to a Slack channel",
     "input_schema": {"type": "object", "properties": {"channel": {"type": "string"}, "text": {"type": "string"}}, "required": ["channel", "text"]}},
    {"name": "create_github_issue", "description": "Create a GitHub issue for bug tracking",
     "input_schema": {"type": "object", "properties": {"repo": {"type": "string"}, "title": {"type": "string"}, "body": {"type": "string"}}, "required": ["repo", "title", "body"]}},
]


async def run_agent(user_message: str):
    messages = [{"role": "user", "content": user_message}]
    while True:
        response = client.messages.create(
            model="claude-sonnet-4-20250514", max_tokens=1024,
            tools=TOOLS, messages=messages,
        )
        # Any stop reason other than "tool_use" means there is nothing to execute
        if response.stop_reason != "tool_use":
            for block in response.content:
                if hasattr(block, "text"):
                    print(f"\nAgent: {block.text}")
            return

        # Execute tool calls
        tool_results = []
        for block in response.content:
            if block.type == "tool_use":
                print(f" 🔧 Calling {block.name}...")
                entry = {"type": "tool_result", "tool_use_id": block.id}
                try:
                    result = await TOOL_MAP[block.name](**block.input)
                    entry["content"] = json.dumps(result)
                except Exception as exc:
                    # Report the failure to the model instead of crashing
                    entry["content"] = f"Tool failed: {exc}"
                    entry["is_error"] = True
                tool_results.append(entry)
        messages.append({"role": "assistant", "content": response.content})
        messages.append({"role": "user", "content": tool_results})


if __name__ == "__main__":
    asyncio.run(run_agent("Search for the latest Claude API updates and post a summary to #engineering on Slack"))
4. Minutes 45-60: Test and iterate
# Test each tool individually
python -c "import asyncio; from tools import search_web; print(asyncio.run(search_web('Claude API 2026')))"
# Run the full agent
python agent.py
# Keep secrets and the virtualenv out of the repo before committing
printf '.env\nvenv/\n' > .gitignore
# Add error handling, then commit
git init && git add -A && git commit -m "Working agent with 3 API integrations"
What's Next
You now have a complete playbook for connecting AI agents to any API. The patterns here — 3-layer architecture, resilient auth, confirmation gates, error translation — will carry you from prototype to production.
Start with 3 integrations. Get them rock-solid. Then expand. Every new API you add is a capability multiplier that makes your agent more valuable.
The agents that win aren't the smartest — they're the most connected.