AI Agent API Integration: The Complete Guide (2026)

Your AI agent is only as powerful as the APIs it can call. Here's how to connect agents to any service — from Slack to Stripe — with production-grade patterns for auth, retries, and safety.

- 74% of agent failures are integration bugs
- 3x more capable with 5+ API integrations
- $0 to start — most APIs have free tiers
⚡ The core insight: An AI agent without API integrations is just a chatbot. With them, it becomes an operator — reading data, triggering actions, and closing loops across your entire stack.

Why API Integration Is the #1 Agent Skill

Most AI agent tutorials stop at "call an LLM and get a response." But real-world agents need to do things: create Jira tickets, send Slack messages, process payments, update CRMs, query databases.

The difference between a demo agent and a production agent is almost entirely about API integration quality.

Every tool your agent can call is a multiplier on its capabilities. Five well-integrated APIs turn a mediocre agent into something that genuinely replaces manual work.

The 3-Layer Integration Architecture

Don't just slap API calls into your agent. Use this architecture to keep things maintainable:

Layer 1: Tool Definition (What the LLM sees)

This is the schema you pass to the model. Keep it clean, descriptive, and constrained:

{
  "name": "create_jira_ticket",
  "description": "Create a new Jira ticket in the specified project. Use when a bug is reported or a task needs tracking.",
  "parameters": {
    "type": "object",
    "properties": {
      "project": {
        "type": "string",
        "description": "Jira project key (e.g., 'ENG', 'PROD')",
        "enum": ["ENG", "PROD", "DESIGN", "OPS"]
      },
      "title": {
        "type": "string",
        "description": "Brief ticket title (max 100 chars)"
      },
      "description": {
        "type": "string",
        "description": "Detailed description with context"
      },
      "priority": {
        "type": "string",
        "enum": ["critical", "high", "medium", "low"],
        "default": "medium"
      },
      "assignee": {
        "type": "string",
        "description": "Email of the assignee (optional)"
      }
    },
    "required": ["project", "title", "description"]
  }
}
✅ Pro tip: The description field is 10x more important than the parameter names. This is what the LLM reads to decide when and how to use the tool. Be specific about when to use it, not just what it does.
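To make the tip concrete, here are two versions of the same tool entry. Only the wording differs, but the second tells the model when to reach for the tool and when not to (both dicts are illustrative fragments, not a full provider schema):

```python
# Weak: describes what the tool is, not when the model should use it.
weak = {
    "name": "create_jira_ticket",
    "description": "Creates a Jira ticket.",
}

# Strong: states the trigger condition and an explicit non-use case.
strong = {
    "name": "create_jira_ticket",
    "description": (
        "Create a new Jira ticket in the specified project. "
        "Use when a bug is reported or a task needs tracking. "
        "Do NOT use for status questions; answer those directly."
    ),
}
```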

Layer 2: Integration Logic (Your middleware)

This is where auth, validation, rate limiting, and retries live. The LLM never sees this layer:

import httpx
from tenacity import retry, stop_after_attempt, wait_exponential

class JiraIntegration:
    def __init__(self, base_url: str, email: str, api_token: str):
        self.client = httpx.AsyncClient(
            base_url=base_url,
            auth=(email, api_token),
            headers={"Content-Type": "application/json"},
            timeout=30.0
        )

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10)
    )
    async def create_ticket(self, project: str, title: str,
                            description: str, priority: str = "medium",
                            assignee: str | None = None) -> dict:
        # Validate inputs before hitting the API
        if len(title) > 100:
            title = title[:97] + "..."

        priority_map = {
            "critical": "1", "high": "2",
            "medium": "3", "low": "4"
        }

        payload = {
            "fields": {
                "project": {"key": project},
                "summary": title,
                "description": {
                    "type": "doc",
                    "version": 1,
                    "content": [{"type": "paragraph",
                                "content": [{"type": "text", "text": description}]}]
                },
                "priority": {"id": priority_map.get(priority, "3")},
                "issuetype": {"name": "Task"}
            }
        }

        if assignee:
            payload["fields"]["assignee"] = {"emailAddress": assignee}

        response = await self.client.post("/rest/api/3/issue", json=payload)
        response.raise_for_status()
        data = response.json()

        return {
            "ticket_id": data["key"],
            "url": f"{self.client.base_url}/browse/{data['key']}",
            "status": "created"
        }

Layer 3: Tool Router (Connects LLM to integrations)

import json

class ToolRouter:
    def __init__(self):
        self.tools = {}
        self.integrations = {}

    def register(self, name: str, handler, schema: dict):
        self.tools[name] = {"handler": handler, "schema": schema}

    async def execute(self, tool_name: str, arguments: dict) -> str:
        if tool_name not in self.tools:
            return f"Error: Unknown tool '{tool_name}'"

        try:
            result = await self.tools[tool_name]["handler"](**arguments)
            return json.dumps(result, indent=2)
        except httpx.HTTPStatusError as e:
            return json.dumps({
                "error": f"API returned {e.response.status_code}",
                "message": e.response.text[:200]
            })
        except Exception as e:
            return json.dumps({"error": str(e)})

# Usage
router = ToolRouter()
jira = JiraIntegration(JIRA_URL, JIRA_EMAIL, JIRA_TOKEN)
router.register("create_jira_ticket", jira.create_ticket, jira_schema)

Function Calling: How It Actually Works

Every major LLM provider now supports function calling (tool use). Here's how the flow works under the hood:

  1. You send the tool schemas alongside the user message to the LLM
  2. The LLM decides whether to call a tool (and which one) based on the conversation
  3. You receive a tool call with the function name and JSON arguments
  4. You execute the function against the real API
  5. You send the result back to the LLM as a tool response
  6. The LLM generates a final response incorporating the result

Claude (Anthropic) — Tool Use

import anthropic
import json

client = anthropic.Anthropic()

tools = [
    {
        "name": "search_crm",
        "description": "Search the CRM for customer records by name, email, or company. Returns matching contacts with recent activity.",
        "input_schema": {
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "Search term"},
                "field": {"type": "string", "enum": ["name", "email", "company"]},
                "limit": {"type": "integer", "default": 5}
            },
            "required": ["query"]
        }
    }
]

response = client.messages.create(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    tools=tools,
    messages=[{"role": "user", "content": "Find the account details for Acme Corp"}]
)

# Handle tool calls in the response
for block in response.content:
    if block.type == "tool_use":
        # Execute the actual API call
        result = await crm.search(
            query=block.input["query"],
            field=block.input.get("field", "name"),
            limit=block.input.get("limit", 5)
        )
        # Send result back to Claude
        followup = client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=1024,
            tools=tools,
            messages=[
                {"role": "user", "content": "Find the account details for Acme Corp"},
                {"role": "assistant", "content": response.content},
                {"role": "user", "content": [
                    {"type": "tool_result", "tool_use_id": block.id,
                     "content": json.dumps(result)}
                ]}
            ]
        )

OpenAI — Function Calling

import json

from openai import OpenAI

client = OpenAI()

tools = [
    {
        "type": "function",
        "function": {
            "name": "search_crm",
            "description": "Search CRM for customer records",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {"type": "string"},
                    "field": {"type": "string", "enum": ["name", "email", "company"]}
                },
                "required": ["query"]
            }
        }
    }
]

messages = [{"role": "user", "content": "Find Acme Corp"}]

response = client.chat.completions.create(
    model="gpt-4o",
    messages=messages,
    tools=tools
)

# Process tool calls
if response.choices[0].message.tool_calls:
    messages.append(response.choices[0].message)  # echo the assistant turn first
    for call in response.choices[0].message.tool_calls:
        args = json.loads(call.function.arguments)
        result = await crm.search(**args)
        # Send each result back as a tool message
        messages.append({"role": "tool", "tool_call_id": call.id,
                         "content": json.dumps(result)})

🚀 Want 50+ Pre-Built Agent Prompts?

The AI Employee Playbook includes production-ready system prompts with tool integrations for every department.

Get the Playbook — €29

The 10 Most Valuable API Integrations

Not all integrations are equal. Here are the ones that deliver the most value per hour invested:

| Integration       | Use Case                              | Difficulty | ROI    |
|-------------------|---------------------------------------|------------|--------|
| Slack / Teams     | Notifications, reports, approvals     | Easy       | 🔥🔥🔥 |
| Google Calendar   | Scheduling, availability, reminders   | Medium     | 🔥🔥🔥 |
| Gmail / SMTP      | Email drafts, sends, triage           | Medium     | 🔥🔥🔥 |
| Jira / Linear     | Ticket creation, status updates       | Easy       | 🔥🔥   |
| Stripe / Payments | Invoice creation, payment status      | Medium     | 🔥🔥🔥 |
| HubSpot / CRM     | Contact management, deal tracking     | Medium     | 🔥🔥🔥 |
| GitHub            | PR reviews, issue management, deploys | Easy       | 🔥🔥   |
| Notion / Docs     | Knowledge base, documentation         | Easy       | 🔥🔥   |
| Database (SQL)    | Read/write business data              | Hard       | 🔥🔥🔥 |
| Web Search        | Real-time research, fact-checking     | Easy       | 🔥🔥   |

Authentication Patterns That Don't Break

Auth is where 90% of API integration headaches live. Here are the patterns that actually work in production:

Pattern 1: API Key (Simple & Common)

import os

class APIKeyAuth:
    """For services like Stripe, SendGrid, Brave Search."""

    def __init__(self, api_key: str, header_name: str = "Authorization",
                 prefix: str = "Bearer"):
        self.headers = {header_name: f"{prefix} {api_key}"}

    async def apply(self, request_headers: dict) -> dict:
        # Async to match OAuth2Auth, so callers can always `await auth.apply(...)`
        return {**request_headers, **self.headers}

# Usage
stripe_auth = APIKeyAuth(os.environ["STRIPE_KEY"])
sendgrid_auth = APIKeyAuth(os.environ["SENDGRID_KEY"])

Pattern 2: OAuth2 with Auto-Refresh

import time

import httpx

class OAuth2Auth:
    """For Google, Microsoft, HubSpot, Salesforce."""

    def __init__(self, client_id, client_secret, refresh_token, token_url):
        self.client_id = client_id
        self.client_secret = client_secret
        self.refresh_token = refresh_token
        self.token_url = token_url
        self.access_token = None
        self.expires_at = 0

    async def get_token(self) -> str:
        if time.time() < self.expires_at - 60:  # 60s buffer
            return self.access_token

        async with httpx.AsyncClient() as client:
            response = await client.post(self.token_url, data={
                "grant_type": "refresh_token",
                "refresh_token": self.refresh_token,
                "client_id": self.client_id,
                "client_secret": self.client_secret,
            })
            data = response.json()
            self.access_token = data["access_token"]
            self.expires_at = time.time() + data.get("expires_in", 3600)
            return self.access_token

    async def apply(self, headers: dict) -> dict:
        token = await self.get_token()
        return {**headers, "Authorization": f"Bearer {token}"}

# Usage
google_auth = OAuth2Auth(
    client_id=os.environ["GOOGLE_CLIENT_ID"],
    client_secret=os.environ["GOOGLE_CLIENT_SECRET"],
    refresh_token=os.environ["GOOGLE_REFRESH_TOKEN"],
    token_url="https://oauth2.googleapis.com/token"
)

Pattern 3: Credential Vault

import json

from cryptography.fernet import Fernet

class CredentialVault:
    """Centralized credential management for all integrations."""

    def __init__(self, encryption_key: str):
        self._credentials = {}
        self._fernet = Fernet(encryption_key)

    def store(self, service: str, credentials: dict):
        encrypted = self._fernet.encrypt(json.dumps(credentials).encode())
        self._credentials[service] = encrypted

    def get(self, service: str) -> dict:
        encrypted = self._credentials.get(service)
        if not encrypted:
            raise ValueError(f"No credentials for {service}")
        return json.loads(self._fernet.decrypt(encrypted))

    def get_auth(self, service: str) -> APIKeyAuth | OAuth2Auth:
        creds = self.get(service)
        if creds.get("type") == "oauth2":
            return OAuth2Auth(**creds["config"])
        return APIKeyAuth(creds["api_key"])

vault = CredentialVault(os.environ["VAULT_KEY"])
vault.store("stripe", {"type": "api_key", "api_key": os.environ["STRIPE_KEY"]})
vault.store("google", {"type": "oauth2", "config": {...}})
⚠️ Security rules: Never let the LLM see raw API keys. Never store credentials in tool descriptions. Always use environment variables or a vault. Rotate tokens regularly. Log which tools are called but never log the auth headers.
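That last rule (log the calls, never the auth headers) is worth a helper. A minimal sketch; the header names in `SENSITIVE_HEADERS` are assumptions, so extend the set for your stack:

```python
import json
import logging

# Header names treated as secrets (lowercase for case-insensitive matching)
SENSITIVE_HEADERS = {"authorization", "x-api-key", "x-subscription-token", "cookie"}

def redact_headers(headers: dict) -> dict:
    """Return a copy of the headers with credential values masked."""
    return {k: ("***" if k.lower() in SENSITIVE_HEADERS else v)
            for k, v in headers.items()}

def log_tool_call(tool_name: str, args: dict, headers: dict) -> None:
    # The call and its arguments are logged; the auth material never is.
    logging.info("tool_call name=%s args=%s headers=%s",
                 tool_name, json.dumps(args), redact_headers(headers))
```

Drop `redact_headers` in front of every logging call site: non-sensitive headers pass through untouched, anything in the sensitive set is masked.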

Error Handling That Keeps Agents Running

API calls fail. A lot. Your agent needs to handle every failure mode gracefully and communicate results back to the LLM in a way it can understand:

import asyncio

import httpx

class ResilientAPIClient:
    """Wraps any API call with retry, timeout, and error translation."""

    def __init__(self, auth, base_url: str, max_concurrent: int = 100):
        self.auth = auth
        self.base_url = base_url
        # Caps concurrent in-flight requests; actual rate limiting is
        # handled by the 429 backoff in call()
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.client = httpx.AsyncClient(base_url=base_url, timeout=30.0)

    async def call(self, method: str, path: str, **kwargs) -> dict:
        async with self.semaphore:
            for attempt in range(3):
                try:
                    headers = await self.auth.apply(kwargs.pop("headers", {}))
                    response = await self.client.request(
                        method, path, headers=headers, **kwargs
                    )

                    if response.status_code == 429:  # Rate limited
                        retry_after = int(response.headers.get("Retry-After", 5))
                        await asyncio.sleep(retry_after)
                        continue

                    response.raise_for_status()
                    return {"success": True, "data": response.json()}

                except httpx.TimeoutException:
                    if attempt == 2:
                        return {"success": False, "error": "API timed out after 3 attempts"}
                    await asyncio.sleep(2 ** attempt)

                except httpx.HTTPStatusError as e:
                    error_map = {
                        400: "Bad request — check your parameters",
                        401: "Authentication failed — token may be expired",
                        403: "Permission denied — insufficient API scope",
                        404: "Resource not found",
                        500: "Server error — try again later",
                    }
                    return {
                        "success": False,
                        "error": error_map.get(e.response.status_code,
                                              f"HTTP {e.response.status_code}"),
                        "details": e.response.text[:200]
                    }

            # Retries exhausted (e.g. repeated 429 responses)
            return {"success": False, "error": "Rate limited: retries exhausted"}

The key insight: return structured error messages that help the LLM self-correct. Don't just crash — tell the model what went wrong so it can try a different approach or inform the user.

5 Production Patterns You Need

Pattern 1: Confirmation Before Destructive Actions

DESTRUCTIVE_TOOLS = {"delete_customer", "send_email", "process_payment",
                      "deploy_production", "cancel_subscription"}

async def execute_with_confirmation(tool_name: str, args: dict) -> str:
    if tool_name in DESTRUCTIVE_TOOLS:
        return json.dumps({
            "status": "confirmation_required",
            "action": tool_name,
            "details": args,
            "message": f"This will {tool_name.replace('_', ' ')}. "
                      f"Please confirm by saying 'yes, proceed'."
        })
    return await router.execute(tool_name, args)

Pattern 2: Read-Before-Write

Always fetch current state before modifying. This prevents the agent from overwriting data based on stale context:

async def update_crm_contact(contact_id: str, updates: dict) -> dict:
    # Step 1: Read current state
    current = await crm.get_contact(contact_id)

    # Step 2: Merge (don't overwrite)
    merged = {**current, **updates}

    # Step 3: Diff check
    changes = {k: v for k, v in updates.items() if current.get(k) != v}
    if not changes:
        return {"status": "no_changes", "message": "Contact already up to date"}

    # Step 4: Write with version check (optimistic locking)
    result = await crm.update_contact(
        contact_id, changes,
        expected_version=current["version"]
    )
    return {"status": "updated", "changes": changes}

Pattern 3: Batch Operations

async def batch_create_tickets(tickets: list[dict]) -> dict:
    """Create multiple tickets efficiently."""
    results = []
    errors = []

    # Use asyncio.gather for parallel execution (respect rate limits)
    semaphore = asyncio.Semaphore(5)  # Max 5 concurrent

    async def create_one(ticket):
        async with semaphore:
            try:
                result = await jira.create_ticket(**ticket)
                results.append(result)
            except Exception as e:
                errors.append({"ticket": ticket["title"], "error": str(e)})

    await asyncio.gather(*[create_one(t) for t in tickets])

    return {
        "created": len(results),
        "failed": len(errors),
        "tickets": results,
        "errors": errors
    }

Pattern 4: Webhook Receivers (Two-Way Integration)

from fastapi import FastAPI, Request

app = FastAPI()

@app.post("/webhooks/stripe")
async def stripe_webhook(request: Request):
    payload = await request.body()
    sig = request.headers.get("stripe-signature")

    # Verify webhook signature
    event = stripe.Webhook.construct_event(payload, sig, WEBHOOK_SECRET)

    if event["type"] == "payment_intent.succeeded":
        # Trigger agent action
        await agent.process_event({
            "type": "payment_received",
            "customer": event["data"]["object"]["customer"],
            "amount": event["data"]["object"]["amount"] / 100,
            "currency": event["data"]["object"]["currency"]
        })

    return {"status": "ok"}

@app.post("/webhooks/github")
async def github_webhook(request: Request):
    event_type = request.headers.get("X-GitHub-Event")
    payload = await request.json()

    if event_type == "pull_request" and payload["action"] == "opened":
        await agent.process_event({
            "type": "pr_opened",
            "repo": payload["repository"]["full_name"],
            "title": payload["pull_request"]["title"],
            "author": payload["pull_request"]["user"]["login"],
            "url": payload["pull_request"]["html_url"]
        })

    return {"status": "ok"}

Pattern 5: Result Caching

import hashlib
import json
import time

class CachedIntegration:
    def __init__(self, integration, ttl_seconds: int = 300):
        self.integration = integration
        self.cache = {}
        self.ttl = ttl_seconds

    async def call(self, method: str, **kwargs) -> dict:
        # Only cache GET/read operations
        if method in ("search", "get", "list", "fetch"):
            cache_key = hashlib.md5(
                f"{method}:{json.dumps(kwargs, sort_keys=True)}".encode()
            ).hexdigest()

            cached = self.cache.get(cache_key)
            if cached and time.time() - cached["time"] < self.ttl:
                return {**cached["data"], "_cached": True}

            result = await getattr(self.integration, method)(**kwargs)
            self.cache[cache_key] = {"data": result, "time": time.time()}
            return result

        # Write operations bypass cache and invalidate
        result = await getattr(self.integration, method)(**kwargs)
        self.cache.clear()  # Simple invalidation
        return result

MCP: The Universal Integration Protocol

If you're building agents that need to talk to many services, look at the Model Context Protocol (MCP). It standardizes how agents discover and use tools:

// MCP server exposing CRM tools
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { z } from "zod";

const server = new McpServer({
  name: "crm-integration",
  version: "1.0.0"
});

server.tool(
  "search_contacts",
  "Search CRM contacts by name, email, or company",
  {
    query: z.string().describe("Search term"),
    field: z.enum(["name", "email", "company"]).optional(),
    limit: z.number().default(10)
  },
  async ({ query, field, limit }) => {
    const results = await crm.search(query, field, limit);
    return {
      content: [{
        type: "text",
        text: JSON.stringify(results, null, 2)
      }]
    };
  }
);

server.tool(
  "create_deal",
  "Create a new deal/opportunity in the CRM pipeline",
  {
    contact_id: z.string(),
    deal_name: z.string(),
    value: z.number(),
    stage: z.enum(["lead", "qualified", "proposal", "negotiation", "closed"])
  },
  async ({ contact_id, deal_name, value, stage }) => {
    const deal = await crm.createDeal({ contact_id, deal_name, value, stage });
    return {
      content: [{ type: "text", text: `Deal created: ${deal.id} — ${deal.url}` }]
    };
  }
);

MCP gives you one protocol for all integrations instead of building custom adapters for every LLM provider. Write once, use with Claude, GPT, Gemini, or any MCP-compatible client.

📦 50+ Production Agent Templates

Complete system prompts with tool definitions, integration patterns, and deployment configs for every business function.

Get the Playbook — €29

Real-World Integration: Customer Support Agent

Let's build a complete customer support agent that integrates with 4 APIs. This is what production actually looks like:

import anthropic
import json
from datetime import datetime

# Initialize client and integrations
client = anthropic.Anthropic()
zendesk = ZendeskIntegration(ZENDESK_URL, ZENDESK_TOKEN)
stripe_client = StripeIntegration(STRIPE_KEY)
slack = SlackIntegration(SLACK_TOKEN)
knowledge_base = RAGPipeline(PINECONE_KEY, OPENAI_KEY)

TOOLS = [
    {
        "name": "search_knowledge_base",
        "description": "Search internal documentation and help articles. Use FIRST before escalating.",
        "input_schema": {
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "Natural language search query"}
            },
            "required": ["query"]
        }
    },
    {
        "name": "get_customer_info",
        "description": "Look up customer account details including subscription, payment history, and previous tickets.",
        "input_schema": {
            "type": "object",
            "properties": {
                "email": {"type": "string", "description": "Customer email address"}
            },
            "required": ["email"]
        }
    },
    {
        "name": "create_ticket",
        "description": "Escalate to human support. Only use after attempting to resolve via knowledge base.",
        "input_schema": {
            "type": "object",
            "properties": {
                "subject": {"type": "string"},
                "description": {"type": "string"},
                "priority": {"type": "string", "enum": ["low", "normal", "high", "urgent"]},
                "customer_email": {"type": "string"}
            },
            "required": ["subject", "description", "customer_email"]
        }
    },
    {
        "name": "issue_refund",
        "description": "Process a refund for a specific payment. Requires confirmation. Max $500 auto-approved.",
        "input_schema": {
            "type": "object",
            "properties": {
                "payment_id": {"type": "string"},
                "amount_cents": {"type": "integer"},
                "reason": {"type": "string"}
            },
            "required": ["payment_id", "reason"]
        }
    },
    {
        "name": "notify_team",
        "description": "Send a notification to the support team Slack channel. Use for urgent issues or patterns.",
        "input_schema": {
            "type": "object",
            "properties": {
                "message": {"type": "string"},
                "channel": {"type": "string", "default": "#support-alerts"},
                "urgency": {"type": "string", "enum": ["info", "warning", "critical"]}
            },
            "required": ["message"]
        }
    }
]

SYSTEM_PROMPT = """You are a Tier 1 support agent for SaaS Corp.

## Rules
1. ALWAYS search the knowledge base first
2. Look up customer info to personalize responses
3. Only escalate (create ticket) after attempting resolution
4. Refunds up to $500 — auto-approved. Over $500 — escalate
5. Notify the team on Slack for: outage reports, security issues, payment failures affecting multiple users
6. Be empathetic but efficient. No fluff.

## Tone
Professional, warm, concise. First-name basis. Acknowledge frustration before solving.
"""

async def handle_support_request(customer_message: str, customer_email: str):
    messages = [
        {"role": "user", "content": f"[Customer: {customer_email}]\n{customer_message}"}
    ]

    while True:
        response = client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=1024,
            system=SYSTEM_PROMPT,
            tools=TOOLS,
            messages=messages
        )

        # Check if we're done (no more tool calls)
        if response.stop_reason == "end_turn":
            return extract_text(response.content)

        # Process tool calls
        tool_results = []
        for block in response.content:
            if block.type == "tool_use":
                result = await execute_tool(block.name, block.input)
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": json.dumps(result)
                })

        messages.append({"role": "assistant", "content": response.content})
        messages.append({"role": "user", "content": tool_results})

Testing API Integrations

Don't test in production. Here's a testing setup that catches integration bugs before they hit real APIs:

import httpx
import pytest
import respx
from unittest.mock import AsyncMock

@pytest.fixture
def mock_router():
    router = ToolRouter()

    # Mock Jira
    mock_jira = AsyncMock()
    mock_jira.create_ticket.return_value = {
        "ticket_id": "ENG-123",
        "url": "https://jira.example.com/ENG-123",
        "status": "created"
    }
    router.register("create_jira_ticket", mock_jira.create_ticket, {})

    # Mock CRM
    mock_crm = AsyncMock()
    mock_crm.search.return_value = [
        {"name": "Acme Corp", "email": "billing@acme.com", "plan": "enterprise"}
    ]
    router.register("search_crm", mock_crm.search, {})

    return router

@pytest.mark.asyncio
async def test_agent_uses_correct_tool(mock_router):
    """Verify the agent picks the right tool for the task."""
    result = await mock_router.execute("search_crm", {"query": "Acme"})
    assert "Acme Corp" in result

@pytest.mark.asyncio
async def test_error_handling():
    """Verify graceful handling of API failures."""
    mock_auth = AsyncMock()
    mock_auth.apply.return_value = {}
    client = ResilientAPIClient(auth=mock_auth, base_url="https://api.example.com")

    # Simulate 500 error
    with respx.mock:
        respx.get("https://api.example.com/users").respond(500)
        result = await client.call("GET", "/users")
        assert result["success"] is False
        assert "Server error" in result["error"]

@pytest.mark.asyncio
async def test_rate_limit_handling():
    """Verify 429 retry logic."""
    mock_auth = AsyncMock()
    mock_auth.apply.return_value = {}
    client = ResilientAPIClient(auth=mock_auth, base_url="https://api.example.com")
    with respx.mock:
        route = respx.get("https://api.example.com/data")
        route.side_effect = [
            httpx.Response(429, headers={"Retry-After": "1"}),
            httpx.Response(200, json={"data": "success"})
        ]
        result = await client.call("GET", "/data")
        assert result["success"] is True

Common Mistakes (And How to Fix Them)

  1. Giving the agent write access on day one. Start read-only. Let the agent search, fetch, and analyze before you let it create, update, or delete. Trust is earned.
  2. Vague tool descriptions. "Manages calendar" tells the LLM nothing. "Creates a new Google Calendar event with title, time, duration, and optional attendees. Use when the user wants to schedule a meeting" is what works.
  3. Not handling partial failures. If your agent calls 3 APIs and the second one fails, do you roll back the first? You need to decide — and implement — this upfront.
  4. Exposing internal IDs to users. Return human-readable responses. "Ticket ENG-423 created — I've assigned it to Sarah" beats "Created: {id: 10423, assignee_id: 8821}".
  5. Ignoring rate limits. Most free API tiers have tight limits. One overeager agent can burn through your monthly quota in an hour. Always implement throttling.
  6. No audit trail. Log every tool call with timestamp, inputs, outputs, and latency. When something goes wrong (and it will), you need to replay exactly what happened.
  7. Too many tools at once. Start with 3-5 tools. LLMs get worse at tool selection as the number of options grows. Add incrementally.
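Mistake 6 is the cheapest to fix. One sketch of an audit trail: a decorator that records every tool call with inputs, outcome, and latency (the log format and the `create_ticket` example are illustrative, not a fixed convention):

```python
import functools
import json
import logging
import time

def audited(tool_name: str):
    """Decorator that logs every tool call with inputs, outcome, and latency."""
    def wrap(fn):
        @functools.wraps(fn)
        def inner(**kwargs):
            start = time.monotonic()
            status = "error"
            try:
                result = fn(**kwargs)
                status = "ok"
                return result
            finally:
                # Runs on success and failure alike, so every call is recorded
                logging.info("tool=%s status=%s latency_ms=%.0f args=%s",
                             tool_name, status,
                             (time.monotonic() - start) * 1000,
                             json.dumps(kwargs, default=str))
        return inner
    return wrap

# Illustrative tool: in a real agent this would hit the Jira API
@audited("create_jira_ticket")
def create_ticket(title: str) -> dict:
    return {"ticket_id": "ENG-1", "title": title}
```

Handlers take keyword arguments only, matching how a router spreads LLM-provided arguments (`handler(**arguments)`), so the decorator can serialize exactly what the model sent.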

60-Minute Quickstart

Get a working agent with 3 API integrations in one hour:

Step 1 (minutes 0-10): Set up project

mkdir my-agent && cd my-agent
python -m venv venv && source venv/bin/activate
pip install anthropic httpx python-dotenv

# .env
ANTHROPIC_API_KEY=sk-ant-...
BRAVE_KEY=...
SLACK_TOKEN=xoxb-...
GITHUB_TOKEN=ghp_...

Step 2 (minutes 10-25): Build the tool layer

# tools.py
import httpx, os

async def search_web(query: str) -> list[dict]:
    """Search the web using Brave Search API."""
    async with httpx.AsyncClient() as client:
        resp = await client.get("https://api.search.brave.com/res/v1/web/search",
            params={"q": query, "count": 5},
            headers={"X-Subscription-Token": os.environ["BRAVE_KEY"]})
        results = resp.json().get("web", {}).get("results", [])
        return [{"title": r["title"], "url": r["url"],
                 "snippet": r.get("description", "")} for r in results[:5]]

async def send_slack_message(channel: str, text: str) -> dict:
    """Send a message to a Slack channel."""
    async with httpx.AsyncClient() as client:
        r = await client.post("https://slack.com/api/chat.postMessage",
            headers={"Authorization": f"Bearer {os.environ['SLACK_TOKEN']}"},
            json={"channel": channel, "text": text})
        return r.json()

async def create_github_issue(repo: str, title: str, body: str) -> dict:
    """Create a GitHub issue."""
    async with httpx.AsyncClient() as client:
        r = await client.post(f"https://api.github.com/repos/{repo}/issues",
            headers={"Authorization": f"Bearer {os.environ['GITHUB_TOKEN']}"},
            json={"title": title, "body": body})
        r.raise_for_status()  # surface auth/permission errors early
        data = r.json()
        return {"issue_number": data["number"], "url": data["html_url"]}

Step 3 (minutes 25-45): Build the agent loop

# agent.py
import anthropic, json, asyncio
from tools import search_web, send_slack_message, create_github_issue

client = anthropic.Anthropic()
TOOL_MAP = {
    "search_web": search_web,
    "send_slack_message": send_slack_message,
    "create_github_issue": create_github_issue,
}

TOOLS = [
    {"name": "search_web", "description": "Search the web for current information",
     "input_schema": {"type": "object", "properties": {"query": {"type": "string"}}, "required": ["query"]}},
    {"name": "send_slack_message", "description": "Send a message to a Slack channel",
     "input_schema": {"type": "object", "properties": {"channel": {"type": "string"}, "text": {"type": "string"}}, "required": ["channel", "text"]}},
    {"name": "create_github_issue", "description": "Create a GitHub issue for bug tracking",
     "input_schema": {"type": "object", "properties": {"repo": {"type": "string"}, "title": {"type": "string"}, "body": {"type": "string"}}, "required": ["repo", "title", "body"]}}
]

async def run_agent(user_message: str):
    messages = [{"role": "user", "content": user_message}]

    while True:
        response = client.messages.create(
            model="claude-sonnet-4-20250514", max_tokens=1024,
            tools=TOOLS, messages=messages
        )

        if response.stop_reason == "end_turn":
            for block in response.content:
                if hasattr(block, "text"):
                    print(f"\nAgent: {block.text}")
            return

        # Execute tool calls
        tool_results = []
        for block in response.content:
            if block.type == "tool_use":
                print(f"  🔧 Calling {block.name}...")
                result = await TOOL_MAP[block.name](**block.input)
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": json.dumps(result)
                })

        messages.append({"role": "assistant", "content": response.content})
        messages.append({"role": "user", "content": tool_results})

if __name__ == "__main__":
    asyncio.run(run_agent("Search for the latest Claude API updates and post a summary to #engineering on Slack"))

Step 4 (minutes 45-60): Test and iterate

# Test each tool individually
python -c "import asyncio; from tools import search_web; print(asyncio.run(search_web('Claude API 2026')))"

# Run the full agent
python agent.py

# Add error handling, then commit
git init && git add -A && git commit -m "Working agent with 3 API integrations"

What's Next

You now have a complete playbook for connecting AI agents to any API. The patterns here — 3-layer architecture, resilient auth, confirmation gates, error translation — will carry you from prototype to production.

Start with 3 integrations. Get them rock-solid. Then expand. Every new API you add is a capability multiplier that makes your agent more valuable.

The agents that win aren't the smartest — they're the most connected.

⚡ Get the AI Employee Playbook — 50+ production agent templates for €29