Python Client SDK¶
Overview¶
membrain-client is a lightweight Python client for the Membrain AI Safety Gateway. It acts as a drop-in replacement for the OpenAI Python SDK, letting you route all LLM traffic through Membrain with a one-line change. Every request passes through Membrain's full middleware pipeline -- PII detection, rate limiting, budget enforcement, caching, and audit logging -- without requiring any changes to your application logic.
The SDK provides both synchronous (MembrainClient) and asynchronous (AsyncMembrainClient) clients, mirroring the openai.OpenAI and openai.AsyncOpenAI interfaces respectively.
Key characteristics:
- Mirrors the client.chat.completions.create() calling convention from the OpenAI SDK
- Built on httpx with no dependency on the openai package
- Supports streaming (SSE) and non-streaming completions
- Supports multi-tenant isolation via project and user headers
- Pure dataclass response models (no Pydantic dependency in the client)
- Python 3.10+
Installation¶
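The package can be installed with pip (assuming it is published under its import name, membrain-client; point pip at your internal index otherwise):
pip install membrain-client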
Or install from source:
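From a local checkout of the repository (repository URL not listed here), an editable install works:
# run from the repository root
pip install -e .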
For development (includes pytest and pytest-asyncio):
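Assuming the project exposes its test dependencies under a dev extra (an assumption about the package metadata):
# editable install with development extras (extra name assumed)
pip install -e ".[dev]"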
Dependencies¶
| Package | Version |
|---|---|
| httpx | >= 0.27 |
Quick Start¶
Migrating from the OpenAI SDK requires changing only your import and client constructor:
# Before -- calling OpenAI directly
from openai import OpenAI
client = OpenAI()
response = client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": "Hello!"}],
)
print(response.choices[0].message.content)
# After -- routed through Membrain
from membrain_client import MembrainClient
client = MembrainClient(base_url="http://localhost:8001")
response = client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": "Hello!"}],
)
print(response.choices[0].message.content)
The interface is intentionally identical: client.chat.completions.create(...) works the same way in both cases.
Client Initialization¶
MembrainClient (synchronous)¶
from membrain_client import MembrainClient
client = MembrainClient(
base_url="http://localhost:8001", # Membrain gateway URL
api_key="ck_live_abc123", # Membrain API key (optional)
user_id="user-42", # User identifier for audit/rate-limit (optional)
project="my-project", # Project scope for multi-tenancy (optional)
timeout=120.0, # Request timeout in seconds (default: 120)
)
AsyncMembrainClient (asynchronous)¶
from membrain_client import AsyncMembrainClient
client = AsyncMembrainClient(
base_url="http://localhost:8001",
api_key="ck_live_abc123",
user_id="user-42",
project="my-project",
timeout=120.0,
)
Constructor Parameters¶
| Parameter | Type | Default | Description |
|---|---|---|---|
| base_url | str | "http://localhost:8001" | URL of the Membrain gateway. Trailing slashes are stripped automatically. |
| api_key | str \| None | None | Membrain API key. Sent as the x-membrain-api-key header. Required when auth is enabled on the gateway. |
| user_id | str \| None | None | User identifier. Sent as x-membrain-user-id. Used for per-user rate limiting, budget tracking, and audit logs. |
| project | str \| None | None | Project name. Sent as x-membrain-project. Scopes requests to a specific project for multi-tenant deployments. |
| timeout | float | 120.0 | HTTP request timeout in seconds. Applies to both connection and read timeouts. |
When api_key, user_id, or project are None, the corresponding header is omitted entirely from the request.
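To make that behavior concrete, the conditional header assembly can be pictured roughly as follows (an illustrative sketch, not the client's actual internals):
# Illustrative sketch: only headers with a value are sent; None means the header is omitted
def build_membrain_headers(api_key=None, user_id=None, project=None):
    headers = {}
    if api_key is not None:
        headers["x-membrain-api-key"] = api_key
    if user_id is not None:
        headers["x-membrain-user-id"] = user_id
    if project is not None:
        headers["x-membrain-project"] = project
    return headers

# Only the API key header is included here; user and project headers are omitted
print(build_membrain_headers(api_key="ck_live_abc123"))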
Chat Completions¶
Basic Usage¶
from membrain_client import MembrainClient
client = MembrainClient(base_url="http://localhost:8001", api_key="ck_live_abc123")
response = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Explain PII detection in two sentences."},
],
)
# Access the response
print(response.choices[0].message.content)
print(response.choices[0].message.role) # "assistant"
print(response.choices[0].finish_reason) # "stop"
# Token usage
print(response.usage.prompt_tokens)
print(response.usage.completion_tokens)
print(response.usage.total_tokens)
With Optional Parameters¶
Any extra keyword arguments are forwarded in the request body, allowing you to pass model-specific parameters like temperature and max_tokens:
response = client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": "Write a haiku about security."}],
temperature=0.7,
max_tokens=100,
)
Response Objects¶
The SDK returns Python dataclasses (not Pydantic models), keeping the client dependency-free:
ChatCompletion
| Field | Type | Description |
|---|---|---|
| id | str | Unique completion identifier (e.g., "chatcmpl-abc123") |
| model | str | Model that generated the response |
| choices | list[Choice] | List of completion choices |
| usage | Usage | Token usage statistics |
Choice
| Field | Type | Description |
|---|---|---|
| index | int | Index of this choice |
| message | ChatMessage | The assistant's response message |
| finish_reason | str \| None | Why generation stopped ("stop", "length", etc.) |
ChatMessage
| Field | Type | Description |
|---|---|---|
| role | str | Message role ("assistant", "user", "system") |
| content | str \| None | Message text content |
Usage
| Field | Type | Description |
|---|---|---|
| prompt_tokens | int | Tokens in the prompt |
| completion_tokens | int | Tokens in the completion |
| total_tokens | int | Sum of prompt and completion tokens |
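For orientation, the tables above correspond roughly to dataclasses of the following shape (a sketch inferred from this documentation, not the package's exact source):
from dataclasses import dataclass

@dataclass
class Usage:
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int

@dataclass
class ChatMessage:
    role: str
    content: str | None = None

@dataclass
class Choice:
    index: int
    message: ChatMessage
    finish_reason: str | None = None

@dataclass
class ChatCompletion:
    id: str
    model: str
    choices: list[Choice]
    usage: Usage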
Streaming¶
Pass stream=True to receive an incremental generator of ChatCompletionChunk objects. The SDK handles SSE parsing internally.
Synchronous Streaming¶
from membrain_client import MembrainClient
client = MembrainClient(base_url="http://localhost:8001")
stream = client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": "Tell me a story."}],
stream=True,
)
for chunk in stream:
delta = chunk.choices[0].delta
if delta.content:
print(delta.content, end="", flush=True)
# Detect when streaming is complete
if chunk.choices[0].finish_reason == "stop":
print() # newline after final chunk
Asynchronous Streaming¶
import asyncio
from membrain_client import AsyncMembrainClient
async def main():
client = AsyncMembrainClient(base_url="http://localhost:8001")
stream = await client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": "Tell me a story."}],
stream=True,
)
async for chunk in stream:
delta = chunk.choices[0].delta
if delta.content:
print(delta.content, end="", flush=True)
await client.aclose()
asyncio.run(main())
Streaming Response Objects¶
ChatCompletionChunk
| Field | Type | Description |
|---|---|---|
| id | str | Completion identifier (same across all chunks) |
| model | str | Model name |
| choices | list[StreamChoice] | Chunk choices |
StreamChoice
| Field | Type | Description |
|---|---|---|
| index | int | Choice index |
| delta | StreamDelta | Incremental content for this chunk |
| finish_reason | str \| None | Set on the final chunk (e.g., "stop") |
StreamDelta
| Field | Type | Description |
|---|---|---|
| role | str \| None | Present only in the first chunk ("assistant") |
| content | str \| None | Incremental text content |
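Because the role arrives only in the first chunk and content arrives as incremental deltas, reconstructing the full assistant message is a matter of concatenation (a usage sketch built on the streaming example above):
from membrain_client import MembrainClient

client = MembrainClient(base_url="http://localhost:8001")

stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Tell me a story."}],
    stream=True,
)

# Accumulate the streamed deltas into a single message
role = None
parts = []
for chunk in stream:
    choice = chunk.choices[0]
    if choice.delta.role:        # present only on the first chunk
        role = choice.delta.role
    if choice.delta.content:     # incremental text
        parts.append(choice.delta.content)

full_message = "".join(parts)
print(f"{role}: {full_message}")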
PII Badge Handling¶
Membrain's PII detection runs transparently in the middleware pipeline. The gateway intercepts requests, detects and replaces PII tokens (e.g., email addresses, phone numbers, SSNs) with placeholders before sending to the LLM provider, then restores original values in the response before returning to the client.
From the SDK consumer's perspective, this process is invisible -- responses arrive with original PII values intact. The PII detection happens server-side, and the SDK itself does not expose PII metadata in the response dataclasses.
To inspect PII activity for your requests, use the Membrain gateway's audit and reporting APIs:
import httpx
# Query the audit log for PII findings
response = httpx.get(
"http://localhost:8001/api/audit",
headers={"x-membrain-api-key": "ck_live_abc123"},
)
audit_entries = response.json()
# Each entry includes pii_findings_count and pii_labels
for entry in audit_entries:
if entry.get("pii_findings_count", 0) > 0:
print(f"Model: {entry['model']}, PII detected: {entry['pii_findings_count']}")
The gateway also returns rate limit information in response headers (X-RateLimit-Limit, X-RateLimit-Remaining, X-RateLimit-Reset) when rate limiting is active. These headers are available on the raw HTTP response but are not currently surfaced on the SDK's dataclass objects.
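If you need those rate limit values, one option is to call the gateway's OpenAI-compatible endpoint directly with httpx and read the response headers (a sketch; the /v1/chat/completions path and header names follow the request flow and rate limiting notes in this document):
import httpx

# Call the gateway endpoint directly so the raw response headers are visible
response = httpx.post(
    "http://localhost:8001/v1/chat/completions",
    headers={"x-membrain-api-key": "ck_live_abc123"},
    json={
        "model": "gpt-4",
        "messages": [{"role": "user", "content": "Hello"}],
    },
    timeout=120.0,
)
response.raise_for_status()

# Present only when rate limiting is active on the gateway
print(response.headers.get("X-RateLimit-Limit"))
print(response.headers.get("X-RateLimit-Remaining"))
print(response.headers.get("X-RateLimit-Reset"))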
Error Handling¶
The SDK uses httpx for HTTP communication. Errors propagate as httpx exceptions, which you can catch at the appropriate granularity.
HTTP Errors¶
import httpx
from membrain_client import MembrainClient
client = MembrainClient(base_url="http://localhost:8001", api_key="ck_live_abc123")
try:
response = client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": "Hello"}],
)
except httpx.HTTPStatusError as e:
# Raised for 4xx and 5xx responses (via raise_for_status())
print(f"HTTP {e.response.status_code}: {e.response.text}")
except httpx.ConnectError:
# Gateway is unreachable
print("Cannot connect to Membrain gateway")
except httpx.TimeoutException:
# Request exceeded the configured timeout
print("Request timed out")
Common HTTP Status Codes¶
| Status | Meaning | Cause |
|---|---|---|
| 401 | Unauthorized | Missing or invalid api_key when auth is enabled |
| 402 | Payment Required | Budget limit exceeded for the user/project |
| 429 | Too Many Requests | Rate limit exceeded. Check the Retry-After header |
| 502 | Bad Gateway | All upstream LLM providers failed |
Retry Behavior¶
The SDK does not implement automatic retries. You are responsible for implementing retry logic appropriate for your use case:
import time
import httpx
from membrain_client import MembrainClient
client = MembrainClient(base_url="http://localhost:8001")
def chat_with_retry(messages, max_retries=3):
    for attempt in range(max_retries):
        try:
            return client.chat.completions.create(
                model="gpt-4",
                messages=messages,
            )
        except httpx.HTTPStatusError as e:
            # Retry only on 429; re-raise other HTTP errors (and the final 429) immediately
            if e.response.status_code == 429 and attempt < max_retries - 1:
                retry_after = int(e.response.headers.get("Retry-After", 5))
                time.sleep(retry_after)
                continue
            raise
        except (httpx.ConnectError, httpx.TimeoutException):
            # Exponential backoff for transient connection and timeout errors
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
                continue
            raise
Advanced Usage¶
Context Managers¶
Both clients support context manager protocols to ensure the underlying HTTP connection is properly closed:
# Synchronous
with MembrainClient(base_url="http://localhost:8001") as client:
response = client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": "Hello"}],
)
# Connection is closed automatically
# Asynchronous
async with AsyncMembrainClient(base_url="http://localhost:8001") as client:
response = await client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": "Hello"}],
)
# Connection is closed automatically
Async Support¶
The AsyncMembrainClient is a fully asynchronous client built on httpx.AsyncClient. It shares the same constructor signature and API surface as MembrainClient, but all completion methods are async:
import asyncio
from membrain_client import AsyncMembrainClient
async def main():
client = AsyncMembrainClient(
base_url="http://localhost:8001",
api_key="ck_live_abc123",
user_id="user-42",
)
response = await client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": "Hello"}],
)
print(response.choices[0].message.content)
await client.aclose()
asyncio.run(main())
| Sync method | Async equivalent |
|---|---|
| client.close() | await client.aclose() |
| with client: | async with client: |
| for chunk in stream: | async for chunk in stream: |
Routing Headers¶
Membrain supports intelligent routing via request headers. While the SDK does not have dedicated parameters for these, you can influence routing by setting headers on the underlying httpx client:
from membrain_client import MembrainClient
client = MembrainClient(base_url="http://localhost:8001")
# Access the underlying httpx.Client to set routing headers
client._client.headers.update({
"x-membrain-tier": "quality", # Routing tier: "quality", "balanced", "speed"
"x-membrain-private": "true", # Force private/local model routing
"x-membrain-max-cost": "0.01", # Max cost per 1K tokens
})
response = client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": "Hello"}],
)
Passing Extra Model Parameters¶
Any keyword arguments beyond model, messages, and stream are forwarded directly in the JSON request body. This lets you use provider-specific parameters:
response = client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": "Hello"}],
temperature=0.5,
max_tokens=200,
top_p=0.9,
frequency_penalty=0.5,
presence_penalty=0.5,
stop=["\n"],
)
API Reference¶
Exports¶
The membrain_client package exports the following names:
from membrain_client import (
MembrainClient, # Synchronous client
AsyncMembrainClient, # Asynchronous client
ChatCompletion, # Non-streaming response dataclass
ChatMessage, # Message dataclass (role + content)
)
Additional dataclasses available from membrain_client.client:
from membrain_client.client import (
Choice, # Non-streaming choice (index, message, finish_reason)
Usage, # Token usage (prompt_tokens, completion_tokens, total_tokens)
ChatCompletionChunk, # Streaming response chunk dataclass
StreamChoice, # Streaming choice (index, delta, finish_reason)
StreamDelta, # Streaming delta (role, content)
)
Request Flow¶
Your Application
|
v
MembrainClient.chat.completions.create()
|
| POST {base_url}/v1/chat/completions
| Headers: x-membrain-api-key, x-membrain-user-id, x-membrain-project
| Body: { model, messages, stream, ...kwargs }
|
v
Membrain Gateway
| PII Detection -> Rate Limiting -> Budget Check -> Cache -> Knowledge
|
v
LLM Provider (OpenAI, Anthropic, Ollama, LiteLLM, etc.)
|
v
Membrain Gateway
| Cache Store -> PII Restoration -> Audit Log
|
v
MembrainClient -> ChatCompletion / Generator[ChatCompletionChunk]