Skip to main content
Using the requests library for the TalkPilot API.

Setup

pip install requests
import requests
import os
import time

# Endpoint root and API key are read from the environment. The fallback
# values are placeholders and must be replaced before making real requests.
BASE_URL = os.getenv(
    "TALKPILOT_BASE_URL",
    "https://{project_ref}.supabase.co/functions/v1/api/v1",
)
API_KEY = os.getenv("TALKPILOT_API_KEY", "tp_live_YOUR_KEY_HERE")

# Headers attached to every API request.
HEADERS = {"X-API-Key": API_KEY, "Content-Type": "application/json"}


def talkpilot(path, method="GET", data=None, max_retries=5, timeout=30):
    """Make a request to the TalkPilot API with automatic retry on rate limiting.

    Args:
        path: Path appended to BASE_URL, e.g. "/agents".
        method: HTTP method name passed to requests.
        data: Optional object sent as the JSON request body.
        max_retries: How many times to retry after an HTTP 429 before giving up.
        timeout: Seconds requests waits on the server before raising.

    Returns:
        The decoded JSON body, or None when the API answers 204 No Content.

    Raises:
        Exception: If the final response is not successful. The message has
            the form "[code] message (request_id: ...)".
    """
    url = f"{BASE_URL}{path}"
    attempts = max(0, max_retries) + 1

    # Loop instead of recursing so a persistently rate-limited endpoint cannot
    # retry forever or exhaust the call stack.
    for attempt in range(attempts):
        response = requests.request(
            method, url, headers=HEADERS, json=data, timeout=timeout
        )
        if response.status_code != 429 or attempt == attempts - 1:
            break

        # Retry-After may be missing or an HTTP date rather than a number of
        # seconds; fall back to 60 seconds in that case.
        try:
            retry_after = max(0, int(response.headers.get("Retry-After", 60)))
        except (TypeError, ValueError):
            retry_after = 60
        print(f"Rate limited. Retrying in {retry_after}s...")
        time.sleep(retry_after)

    if response.status_code == 204:
        return None

    try:
        body = response.json()
    except ValueError:
        # An error response without a JSON body (e.g. from a proxy) should
        # surface as an API error below, not as a JSON decode error.
        if response.ok:
            raise
        body = None

    if not response.ok:
        error = body.get("error") if isinstance(body, dict) else None
        if not isinstance(error, dict):
            error = {}
        # Fall back to the HTTP status line when the body carries no details.
        code = error.get("code") or response.status_code
        message = error.get("message") or response.reason
        raise Exception(
            f"[{code}] {message} "
            f"(request_id: {error.get('request_id')})"
        )

    return body

List agents

# Fetch the agent list and print a one-line summary per agent.
result = talkpilot("/agents")

total = result["pagination"]["total"]
print(f"Found {total} agents:")
for agent in result["data"]:
    if agent["is_active"]:
        status = "active"
    else:
        status = "inactive"
    print(f"  - {agent['name']} ({status})")

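Get agent details

The examples from here on assume that `agent_id` (and, where used, `employee_id`) already holds the ID of the resource you want to work with.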

agent = talkpilot(f"/agents/{agent_id}")

# Print the main configuration fields, one per line.
for template in (
    "Agent: {name}",
    "Model: {llm_provider}/{llm_model}",
    "Voice: {voice_id}",
    "Active: {is_active}",
):
    print(template.format_map(agent))

Update agent settings

# Update prompt and greeting: only the fields listed here are sent.
changes = {
    "prompt": "Du bist ein professioneller Kundenservice-Agent fuer Firma X.",
    "greeting": "Willkommen bei Firma X, wie kann ich Ihnen helfen?",
}
updated = talkpilot(f"/agents/{agent_id}", method="PATCH", data=changes)

updated_name = updated["name"]
print(f"Agent updated: {updated_name}")

Toggle vacation mode

# Both requests PATCH the same agent resource.
agent_path = f"/agents/{agent_id}"

# Enable vacation mode
vacation_on = {
    "vacation_mode": True,
    "vacation_end": "2026-04-01T00:00:00Z",
    "vacation_notdienst": True,
}
talkpilot(agent_path, method="PATCH", data=vacation_on)

# Disable vacation mode
talkpilot(agent_path, method="PATCH", data={"vacation_mode": False})

Manage employees

# All employee endpoints of this agent share one base path.
employees_path = f"/agents/{agent_id}/employees"

# List present employees
result = talkpilot(f"{employees_path}?status=anwesend")
employees = result["data"]

# Create a new employee
employee_payload = {
    "name": "Max Mustermann",
    "phone_number": "+491701234567",
    "email": "max@example.com",
    "status": "anwesend",
    "active": True,
    "get_mail": True,
}
new_employee = talkpilot(employees_path, method="POST", data=employee_payload)

created_name, created_id = new_employee["name"], new_employee["id"]
print(f"Created employee: {created_name} (ID: {created_id})")

# Update employee status
employee_path = f"{employees_path}/{employee_id}"
talkpilot(employee_path, method="PATCH", data={"status": "urlaub"})

# Delete an employee
talkpilot(employee_path, method="DELETE")

Sync forwarding slots

# Replace all forwarding slots in one request
billing_slot = {
    "slotnummer": "01",
    "cases": "Rechnungsfragen, Zahlungsprobleme",
    "slot_belegung": "Max Mustermann",
    "prioritaet_1": "+491701234567",
    "prioritaet_2": "+491709876543",
}
support_slot = {
    "slotnummer": "02",
    "cases": "Technischer Support",
    "slot_belegung": "Erika Musterfrau",
    "prioritaet_1": "+491705555555",
}
result = talkpilot(
    f"/agents/{agent_id}/forwarding-slots",
    method="PUT",
    data={"slots": [billing_slot, support_slot]},
)

replaced = result["replaced_count"]
created = len(result["data"])
print(f"Replaced {replaced} slots with {created} new slots")

Fetch calls with filtering

from datetime import datetime, timedelta, timezone

# Calls from the last 7 days.
# Use an aware UTC datetime (datetime.utcnow() is naive and deprecated) and
# format it to whole seconds with a "Z" suffix, matching the other timestamps
# used with this API.
seven_days_ago = (
    datetime.now(timezone.utc) - timedelta(days=7)
).strftime("%Y-%m-%dT%H:%M:%SZ")

result = talkpilot(f"/agents/{agent_id}/calls?from={seven_days_ago}&limit=50")

print(f"{result['pagination']['total']} calls in the last 7 days")

for call in result["data"]:
    # `or` also covers fields that are present but null; dict.get()'s default
    # only applies when the key is missing, and None[:60] would raise.
    customer = call.get("customer_name") or "Unknown"
    summary = call.get("call_summary") or "No summary"
    print(f"  [{call['created_at'][:10]}] {customer} "
          f"- {summary[:60]}...")

Fetch all pages

def fetch_all(path):
    """Collect the "data" items of every page of a paginated endpoint.

    Requests pages of 100 items, starting at page 1, until the page count
    reported in the response's "pagination" object has been reached.
    """
    # Append to an existing query string if the path already has one.
    joiner = "?" if "?" not in path else "&"
    collected = []
    current = 1

    while True:
        payload = talkpilot(f"{path}{joiner}page={current}&limit=100")
        collected += payload["data"]
        last_page = payload["pagination"]["pages"]
        current += 1
        if current > last_page:
            break

    return collected


# Usage: gather every call since the given timestamp, across all pages.
calls_path = f"/agents/{agent_id}/calls?from=2026-03-01T00:00:00Z"
all_calls = fetch_all(calls_path)
call_count = len(all_calls)
print(f"Fetched {call_count} calls total")

Add knowledge base content

# The document body is plain text, one price per line.
price_list = "\n".join([
    "Produkt A: 29,99 EUR/Monat",
    "Produkt B: 49,99 EUR/Monat",
    "Produkt C: 99,99 EUR/Monat",
    "Alle Preise zzgl. MwSt.",
])
doc = talkpilot(
    f"/agents/{agent_id}/knowledge-base",
    method="POST",
    data={"title": "Preisliste 2026", "content": price_list},
)

doc_title, doc_status = doc["title"], doc["status"]
print(f"Document created: {doc_title} (status: {doc_status})")

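Upload pre-processed chunks with embeddings

This example additionally requires the `openai` package (`pip install openai`) and valid OpenAI credentials.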

import openai

# Module-level client, created once and reused by get_embeddings().
openai_client = openai.OpenAI()


def get_embeddings(texts):
    """Embed the given texts with OpenAI's text-embedding-3-small model.

    Returns a list with the embedding of each item in the API response,
    in the order the response lists them.
    """
    reply = openai_client.embeddings.create(
        input=texts,
        model="text-embedding-3-small",
    )
    vectors = []
    for entry in reply.data:
        vectors.append(entry.embedding)
    return vectors


def upload_chunks(agent_id, title, chunks_data):
    """
    Upload pre-processed chunks to a TalkPilot agent's knowledge base.

    Args:
        agent_id: ID of the agent whose knowledge base receives the document.
        title: Title of the parent document.
        chunks_data: list of dicts with 'content' and optional 'metadata'

    Returns:
        The ID of the created parent document.

    Raises:
        ValueError: If chunks_data is empty, or the number of embeddings
            returned does not match the number of chunks.
    """
    chunks_data = list(chunks_data)
    if not chunks_data:
        raise ValueError("chunks_data must contain at least one chunk")

    # 1. Generate embeddings first, so that a failure here does not leave an
    #    empty parent document behind.
    texts = [c["content"] for c in chunks_data]
    embeddings = get_embeddings(texts)
    if len(embeddings) != len(texts):
        # zip() below would otherwise silently drop the unmatched chunks.
        raise ValueError(
            f"Expected {len(texts)} embeddings, got {len(embeddings)}"
        )

    # 2. Create parent document
    doc = talkpilot(f"/agents/{agent_id}/knowledge-base", method="POST", data={
        "title": title,
        "content": "",
    })
    doc_id = doc["id"]

    # 3. Build chunk payloads
    chunks = [
        {
            "content": chunk["content"],
            "embedding": embedding,
            "chunk_index": i,
            # Whitespace-separated word count, used as a rough token count.
            "token_count": len(chunk["content"].split()),
            "metadata": chunk.get("metadata", {}),
        }
        for i, (chunk, embedding) in enumerate(zip(chunks_data, embeddings))
    ]

    # 4. Upload chunks
    result = talkpilot(
        f"/agents/{agent_id}/knowledge-base/{doc_id}/chunks",
        method="POST",
        data={"chunks": chunks},
    )
    print(f"Uploaded {result['inserted_count']} chunks for '{title}'")

    # 5. Mark document as completed
    talkpilot(
        f"/agents/{agent_id}/knowledge-base/{doc_id}",
        method="PATCH",
        data={"status": "completed"},
    )

    return doc_id


# Usage: three pricing chunks, each tagged with the same category.
catalog_chunks = [
    {"content": "Produkt A kostet 29,99 EUR pro Monat...", "metadata": dict(category="pricing")},
    {"content": "Produkt B kostet 49,99 EUR pro Monat...", "metadata": dict(category="pricing")},
    {"content": "Produkt C kostet 99,99 EUR pro Monat...", "metadata": dict(category="pricing")},
]
upload_chunks(agent_id, "Product Catalog", catalog_chunks)

Complete script: HR status sync

#!/usr/bin/env python3
"""
Sync employee statuses from your HR system to TalkPilot.
Run as a cron job or scheduled task.
"""

import os
import requests
import time

# Required settings: a missing variable raises KeyError as soon as the
# script starts.
BASE_URL, API_KEY, AGENT_ID = [
    os.environ[name]
    for name in ("TALKPILOT_BASE_URL", "TALKPILOT_API_KEY", "TALKPILOT_AGENT_ID")
]

# Headers attached to every API request.
HEADERS = {
    "X-API-Key": API_KEY,
    "Content-Type": "application/json",
}


def talkpilot(path, method="GET", data=None, max_retries=5, timeout=30):
    """Call the TalkPilot API and return the decoded JSON body.

    Retries up to max_retries times when the API answers 429, waiting for
    the number of seconds given in the Retry-After header (60 if the header
    is missing or not a number). Returns None for 204 No Content.

    Raises:
        Exception: "API error: <message>" if the final response is not
            successful.
    """
    url = f"{BASE_URL}{path}"
    attempts = max(0, max_retries) + 1

    # Iterate rather than recurse so a persistently rate-limited endpoint
    # cannot keep a cron job retrying forever.
    for attempt in range(attempts):
        response = requests.request(
            method, url, headers=HEADERS, json=data, timeout=timeout
        )
        if response.status_code != 429 or attempt == attempts - 1:
            break
        try:
            delay = max(0, int(response.headers.get("Retry-After", 60)))
        except (TypeError, ValueError):
            delay = 60
        time.sleep(delay)

    if response.status_code == 204:
        return None

    try:
        body = response.json()
    except ValueError:
        # Keep decode errors for successful responses; for failures, report
        # the HTTP status below instead.
        if response.ok:
            raise
        body = None

    if not response.ok:
        error = body.get("error") if isinstance(body, dict) else None
        message = error.get("message") if isinstance(error, dict) else None
        if not message:
            message = f"HTTP {response.status_code} {response.reason}"
        raise Exception(f"API error: {message}")
    return body


def get_hr_statuses():
    """Return sample HR records; swap in your real HR system lookup here.

    Each record is a dict with the employee's "name" and "status".
    """
    sample = (
        ("Max Mustermann", "anwesend"),
        ("Erika Musterfrau", "urlaub"),
    )
    return [{"name": name, "status": status} for name, status in sample]


def sync():
    """Push status changes from the HR system to the agent's employees.

    Employees are matched by exact name. For every HR record whose status
    differs from the matching employee, a PATCH with the new status is sent.
    HR records without a matching employee are reported and skipped.
    """
    hr_employees = get_hr_statuses()

    # Read every page of the employee list; looking only at the first
    # response would miss employees beyond the first page.
    tp_employees = []
    page = 1
    while True:
        tp_result = talkpilot(
            f"/agents/{AGENT_ID}/employees?page={page}&limit=100"
        )
        tp_employees.extend(tp_result["data"])
        # NOTE(review): assumes the employees list reports "pagination.pages"
        # like other list endpoints; treated as a single page if absent.
        total_pages = (tp_result.get("pagination") or {}).get("pages") or 1
        if page >= total_pages:
            break
        page += 1

    # Index by name once; setdefault keeps the first employee per name,
    # matching the previous first-match lookup.
    by_name = {}
    for employee in tp_employees:
        by_name.setdefault(employee["name"], employee)

    updated = 0
    for hr_emp in hr_employees:
        match = by_name.get(hr_emp["name"])
        if match is None:
            print(f"Skipped {hr_emp['name']}: no matching employee found")
            continue
        if match["status"] == hr_emp["status"]:
            continue
        talkpilot(
            f"/agents/{AGENT_ID}/employees/{match['id']}",
            method="PATCH",
            data={"status": hr_emp["status"]},
        )
        print(f"Updated {match['name']}: {match['status']} -> {hr_emp['status']}")
        updated += 1

    print(f"Sync complete. {updated} employee(s) updated.")


# Run the sync only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    sync()