Python examples using the requests library for the TalkPilot API.
Setup
pip install requests
import requests
import os
import time
# Connection settings are read from the environment. The fallback values are
# placeholders ({project_ref}, YOUR_KEY_HERE) taken verbatim from the docs.
BASE_URL = os.getenv(
    "TALKPILOT_BASE_URL",
    "https://{project_ref}.supabase.co/functions/v1/api/v1",
)
API_KEY = os.getenv("TALKPILOT_API_KEY", "tp_live_YOUR_KEY_HERE")

# Headers attached to every request made through talkpilot().
HEADERS = {"X-API-Key": API_KEY, "Content-Type": "application/json"}
def talkpilot(path, method="GET", data=None, max_retries=5, timeout=30):
    """Make a request to the TalkPilot API with automatic retry on rate limiting.

    Args:
        path: Endpoint path appended to ``BASE_URL`` (e.g. ``"/agents"``).
        method: HTTP method name.
        data: Optional JSON-serializable request body.
        max_retries: How many times a 429 response is retried. Once exhausted,
            the 429 is reported like any other API error instead of retrying
            forever.
        timeout: Seconds passed to ``requests`` as the request timeout, so a
            stalled connection cannot block indefinitely.

    Returns:
        The decoded JSON body, or ``None`` for a 204 response.

    Raises:
        Exception: If the API answers with a non-success status.
    """
    url = f"{BASE_URL}{path}"
    retries = 0
    while True:
        response = requests.request(
            method, url, headers=HEADERS, json=data, timeout=timeout
        )
        # Auto-retry on rate limiting, but only a bounded number of times.
        if response.status_code != 429 or retries >= max_retries:
            break
        try:
            retry_after = max(0, int(response.headers.get("Retry-After", 60)))
        except (TypeError, ValueError):
            # Retry-After may also be an HTTP-date; fall back to the default wait.
            retry_after = 60
        print(f"Rate limited. Retrying in {retry_after}s...")
        time.sleep(retry_after)
        retries += 1

    if response.status_code == 204:
        return None

    if not response.ok:
        # Error responses are not guaranteed to carry the JSON error envelope
        # (e.g. an HTML page from a gateway), so decode defensively.
        try:
            body = response.json()
        except ValueError:
            body = None
        error = body.get("error") if isinstance(body, dict) else None
        if not isinstance(error, dict):
            error = {"code": response.status_code, "message": response.reason}
        raise Exception(
            f"[{error.get('code')}] {error.get('message')} "
            f"(request_id: {error.get('request_id')})"
        )
    return response.json()
List agents
# Fetch the agent list and print one line per agent with its activation state.
result = talkpilot("/agents")
print(f"Found {result['pagination']['total']} agents:")
for agent in result["data"]:
    if agent["is_active"]:
        status = "active"
    else:
        status = "inactive"
    print(f" - {agent['name']} ({status})")
Get agent details
# Fetch a single agent; agent_id is not defined in this snippet and must be
# set by the caller beforehand.
agent = talkpilot(f"/agents/{agent_id}")
# Print the fields read from the returned agent object.
print(f"Agent: {agent['name']}")
print(f"Model: {agent['llm_provider']}/{agent['llm_model']}")
print(f"Voice: {agent['voice_id']}")
print(f"Active: {agent['is_active']}")
Update agent settings
# Update prompt and greeting
# Only the two listed fields are sent in the PATCH body; the response is then
# read as the agent object (its 'name' is printed below).
updated = talkpilot(f"/agents/{agent_id}", method="PATCH", data={
    "prompt": "Du bist ein professioneller Kundenservice-Agent fuer Firma X.",
    "greeting": "Willkommen bei Firma X, wie kann ich Ihnen helfen?",
})
print(f"Agent updated: {updated['name']}")
Toggle vacation mode
# Enable vacation mode
# Sends the flag together with an end timestamp (ISO 8601, "Z" suffix) and the
# vacation_notdienst flag.
talkpilot(f"/agents/{agent_id}", method="PATCH", data={
    "vacation_mode": True,
    "vacation_end": "2026-04-01T00:00:00Z",
    "vacation_notdienst": True,
})
# Disable vacation mode
# Only the flag itself is sent; the other vacation fields are left untouched
# by this request body.
talkpilot(f"/agents/{agent_id}", method="PATCH", data={
    "vacation_mode": False,
})
Manage employees
# List present employees
# The status filter is passed as a query parameter ("anwesend").
result = talkpilot(f"/agents/{agent_id}/employees?status=anwesend")
employees = result["data"]
# Create a new employee
# The response is read as the created employee (its 'name' and 'id' are printed).
new_employee = talkpilot(f"/agents/{agent_id}/employees", method="POST", data={
    "name": "Max Mustermann",
    "phone_number": "+491701234567",
    "email": "max@example.com",
    "status": "anwesend",
    "active": True,
    "get_mail": True,
})
print(f"Created employee: {new_employee['name']} (ID: {new_employee['id']})")
# Update employee status
# NOTE(review): employee_id is not defined anywhere in this snippet; the caller
# must set it (e.g. from an employee's 'id') before running these two calls.
talkpilot(f"/agents/{agent_id}/employees/{employee_id}", method="PATCH", data={
    "status": "urlaub",
})
# Delete an employee
talkpilot(f"/agents/{agent_id}/employees/{employee_id}", method="DELETE")
Sync forwarding slots
# Replace all forwarding slots in one request
# The PUT body carries the full list of slots. The second slot omits
# "prioritaet_2", so that key is evidently optional in this example.
result = talkpilot(f"/agents/{agent_id}/forwarding-slots", method="PUT", data={
    "slots": [
        {
            "slotnummer": "01",
            "cases": "Rechnungsfragen, Zahlungsprobleme",
            "slot_belegung": "Max Mustermann",
            "prioritaet_1": "+491701234567",
            "prioritaet_2": "+491709876543",
        },
        {
            "slotnummer": "02",
            "cases": "Technischer Support",
            "slot_belegung": "Erika Musterfrau",
            "prioritaet_1": "+491705555555",
        },
    ],
})
# The response exposes how many slots were replaced and the new slot list.
print(f"Replaced {result['replaced_count']} slots with {len(result['data'])} new slots")
Fetch calls with filtering
from datetime import datetime, timedelta, timezone

# Calls from the last 7 days, as an ISO 8601 UTC timestamp with a "Z" suffix.
# datetime.utcnow() is deprecated, so start from a timezone-aware "now" and
# rewrite the "+00:00" offset to "Z" to keep the query string free of "+".
seven_days_ago = (
    (datetime.now(timezone.utc) - timedelta(days=7))
    .isoformat()
    .replace("+00:00", "Z")
)
result = talkpilot(f"/agents/{agent_id}/calls?from={seven_days_ago}&limit=50")
print(f"{result['pagination']['total']} calls in the last 7 days")
for call in result["data"]:
    # `or` also covers keys that are present but null: .get()'s default only
    # applies to missing keys, and slicing None would raise TypeError.
    customer = call.get("customer_name") or "Unknown"
    summary = call.get("call_summary") or "No summary"
    print(f" [{call['created_at'][:10]}] {customer} - {summary[:60]}...")
Fetch all pages
def fetch_all(path):
    """Collect the ``data`` entries from every page of a paginated endpoint.

    ``path`` may already contain a query string; the paging parameters are
    appended with the matching separator. Pages are requested 100 items at a
    time until the page count reported by the API has been reached.
    """
    joiner = "&" if "?" in path else "?"
    collected = []
    current = 1
    while True:
        page_result = talkpilot(f"{path}{joiner}page={current}&limit=100")
        collected += page_result["data"]
        # The page count is re-read from every response before deciding to stop.
        if current >= page_result["pagination"]["pages"]:
            break
        current += 1
    return collected
# Usage
# The path already carries a query string, so fetch_all() appends its paging
# parameters with "&".
all_calls = fetch_all(f"/agents/{agent_id}/calls?from=2026-03-01T00:00:00Z")
print(f"Fetched {len(all_calls)} calls total")
Add knowledge base content
# Create a knowledge-base document from plain text. The content is a
# multi-line string, so its continuation lines must stay unindented.
doc = talkpilot(f"/agents/{agent_id}/knowledge-base", method="POST", data={
    "title": "Preisliste 2026",
    "content": """Produkt A: 29,99 EUR/Monat
Produkt B: 49,99 EUR/Monat
Produkt C: 99,99 EUR/Monat
Alle Preise zzgl. MwSt.""",
})
# The response is read as the created document (title and status are printed).
print(f"Document created: {doc['title']} (status: {doc['status']})")
Upload pre-processed chunks with embeddings
import openai

# Module-level OpenAI client used by get_embeddings(), built with no arguments.
# NOTE(review): presumably credentials come from the environment — confirm.
openai_client = openai.OpenAI()
def get_embeddings(texts):
    """Generate embeddings using OpenAI text-embedding-3-small.

    Returns the ``embedding`` of each item in the response's ``data``, in the
    order the response lists them.
    """
    request_args = {"model": "text-embedding-3-small", "input": texts}
    reply = openai_client.embeddings.create(**request_args)
    vectors = []
    for entry in reply.data:
        vectors.append(entry.embedding)
    return vectors
def upload_chunks(agent_id, title, chunks_data):
    """
    Upload pre-processed chunks to a TalkPilot agent's knowledge base.

    chunks_data: non-empty list of dicts with 'content' and optional 'metadata'.

    Returns the ID of the created parent document.

    Raises ValueError if chunks_data is empty, or if the number of embeddings
    returned does not match the number of chunks.
    """
    if not chunks_data:
        # Fail before any remote call so no empty parent document is created.
        raise ValueError("chunks_data must contain at least one chunk")

    # 1. Generate embeddings first: if this step fails, nothing has been
    #    created on the TalkPilot side yet.
    texts = [chunk["content"] for chunk in chunks_data]
    embeddings = get_embeddings(texts)
    if len(embeddings) != len(texts):
        # zip() below would otherwise silently drop the unmatched chunks.
        raise ValueError(
            f"Expected {len(texts)} embeddings, got {len(embeddings)}"
        )

    # 2. Create parent document
    doc = talkpilot(f"/agents/{agent_id}/knowledge-base", method="POST", data={
        "title": title,
        "content": "",
    })
    doc_id = doc["id"]

    # 3. Build chunk payloads
    chunks = [
        {
            "content": chunk["content"],
            "embedding": embedding,
            "chunk_index": index,
            # Whitespace-separated word count, used as a rough token count.
            "token_count": len(chunk["content"].split()),
            "metadata": chunk.get("metadata", {}),
        }
        for index, (chunk, embedding) in enumerate(zip(chunks_data, embeddings))
    ]

    # 4. Upload chunks
    result = talkpilot(
        f"/agents/{agent_id}/knowledge-base/{doc_id}/chunks",
        method="POST",
        data={"chunks": chunks},
    )
    print(f"Uploaded {result['inserted_count']} chunks for '{title}'")

    # 5. Mark document as completed
    talkpilot(
        f"/agents/{agent_id}/knowledge-base/{doc_id}",
        method="PATCH",
        data={"status": "completed"},
    )
    return doc_id
# Usage
# Each chunk dict supplies 'content' plus a 'metadata' dict, which
# upload_chunks() forwards in the chunk payload.
upload_chunks(agent_id, "Product Catalog", [
    {"content": "Produkt A kostet 29,99 EUR pro Monat...", "metadata": {"category": "pricing"}},
    {"content": "Produkt B kostet 49,99 EUR pro Monat...", "metadata": {"category": "pricing"}},
    {"content": "Produkt C kostet 99,99 EUR pro Monat...", "metadata": {"category": "pricing"}},
])
Complete script: HR status sync
#!/usr/bin/env python3
"""
Sync employee statuses from your HR system to TalkPilot.
Run as a cron job or scheduled task.
"""
import os
import requests
import time
# Required configuration. Indexing os.environ raises KeyError when a variable
# is unset, so the script stops at startup instead of running with defaults.
BASE_URL = os.environ["TALKPILOT_BASE_URL"]
API_KEY = os.environ["TALKPILOT_API_KEY"]
AGENT_ID = os.environ["TALKPILOT_AGENT_ID"]
# Headers attached to every request made through talkpilot().
HEADERS = {"X-API-Key": API_KEY, "Content-Type": "application/json"}
def talkpilot(path, method="GET", data=None, max_retries=5, timeout=30):
    """Call the TalkPilot API and return the decoded JSON body.

    A 429 response is retried after the advertised ``Retry-After`` delay, at
    most ``max_retries`` times; after that it is raised like any other error.
    ``timeout`` is passed to ``requests`` so a stalled connection cannot hang
    a scheduled run forever.

    Returns ``None`` for a 204 response.
    Raises ``Exception`` when the API answers with a non-success status.
    """
    url = f"{BASE_URL}{path}"
    retries = 0
    while True:
        response = requests.request(
            method, url, headers=HEADERS, json=data, timeout=timeout
        )
        if response.status_code != 429 or retries >= max_retries:
            break
        try:
            delay = max(0, int(response.headers.get("Retry-After", 60)))
        except (TypeError, ValueError):
            # Retry-After may also be an HTTP-date; fall back to the default wait.
            delay = 60
        time.sleep(delay)
        retries += 1

    if response.status_code == 204:
        return None

    if not response.ok:
        # The error body may be non-JSON or lack the expected envelope, so do
        # not let a KeyError or decode error mask the real failure.
        try:
            body = response.json()
        except ValueError:
            body = None
        error = body.get("error") if isinstance(body, dict) else None
        message = error.get("message") if isinstance(error, dict) else None
        if not message:
            message = f"HTTP {response.status_code}"
        raise Exception(f"API error: {message}")
    return response.json()
def get_hr_statuses():
    """Replace this with your actual HR system integration.

    Returns a list of dicts, each with a 'name' and a 'status' key.
    """
    roster = (
        ("Max Mustermann", "anwesend"),
        ("Erika Musterfrau", "urlaub"),
    )
    return [{"name": name, "status": status} for name, status in roster]
def sync():
    """Push status changes from the HR system to the matching TalkPilot employees.

    Employees are matched by exact name. Only employees whose status differs
    are patched. HR entries with no TalkPilot counterpart are reported and
    skipped rather than silently ignored.
    """
    hr_employees = get_hr_statuses()
    # NOTE(review): only the first response is read; if this endpoint is
    # paginated, employees on later pages will never match — confirm.
    tp_employees = talkpilot(f"/agents/{AGENT_ID}/employees")["data"]

    # Index once by name instead of scanning the list for every HR entry.
    # setdefault keeps the first employee when names are duplicated.
    by_name = {}
    for employee in tp_employees:
        by_name.setdefault(employee["name"], employee)

    updated = 0
    for hr_emp in hr_employees:
        match = by_name.get(hr_emp["name"])
        if match is None:
            print(f"No TalkPilot employee named {hr_emp['name']}; skipped.")
            continue
        if match["status"] == hr_emp["status"]:
            continue
        talkpilot(
            f"/agents/{AGENT_ID}/employees/{match['id']}",
            method="PATCH",
            data={"status": hr_emp["status"]},
        )
        print(f"Updated {match['name']}: {match['status']} -> {hr_emp['status']}")
        updated += 1
    print(f"Sync complete. {updated} employee(s) updated.")
# Run the sync only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    sync()