Python Client SDK

Overview

membrain-client is a lightweight Python client for the Membrain AI Safety Gateway. It acts as a drop-in replacement for the OpenAI Python SDK, letting you route all LLM traffic through Membrain by changing only your import and client constructor. Every request passes through Membrain's full middleware pipeline -- PII detection, rate limiting, budget enforcement, caching, and audit logging -- without requiring any changes to your application logic.

The SDK provides both synchronous (MembrainClient) and asynchronous (AsyncMembrainClient) clients, mirroring the openai.OpenAI and openai.AsyncOpenAI interfaces respectively.

Key characteristics:

  • Mirrors the client.chat.completions.create() calling convention from the OpenAI SDK
  • Built on httpx with no dependency on the openai package
  • Supports streaming (SSE) and non-streaming completions
  • Supports multi-tenant isolation via project and user headers
  • Pure dataclass response models (no Pydantic dependency in the client)
  • Python 3.10+

Installation

pip install membrain-client

Or install from source:

cd clients/python
pip install .

For development (includes pytest and pytest-asyncio):

pip install ".[dev]"

Dependencies

Package   Version
-------   -------
httpx     >= 0.27

Quick Start

Migrating from the OpenAI SDK requires changing only your import and client constructor:

# Before -- calling OpenAI directly
from openai import OpenAI

client = OpenAI()
response = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello!"}],
)
print(response.choices[0].message.content)

# After -- routed through Membrain
from membrain_client import MembrainClient

client = MembrainClient(base_url="http://localhost:8001")
response = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello!"}],
)
print(response.choices[0].message.content)

The interface is intentionally identical: client.chat.completions.create(...) works the same way in both cases.


Client Initialization

MembrainClient (synchronous)

from membrain_client import MembrainClient

client = MembrainClient(
    base_url="http://localhost:8001",  # Membrain gateway URL
    api_key="ck_live_abc123",          # Membrain API key (optional)
    user_id="user-42",                 # User identifier for audit/rate-limit (optional)
    project="my-project",              # Project scope for multi-tenancy (optional)
    timeout=120.0,                     # Request timeout in seconds (default: 120)
)

AsyncMembrainClient (asynchronous)

from membrain_client import AsyncMembrainClient

client = AsyncMembrainClient(
    base_url="http://localhost:8001",
    api_key="ck_live_abc123",
    user_id="user-42",
    project="my-project",
    timeout=120.0,
)

Constructor Parameters

Parameter   Type         Default                    Description
---------   ----------   ------------------------   -----------
base_url    str          "http://localhost:8001"    URL of the Membrain gateway. Trailing slashes are stripped automatically.
api_key     str | None   None                       Membrain API key. Sent as the x-membrain-api-key header. Required when auth is enabled on the gateway.
user_id     str | None   None                       User identifier. Sent as x-membrain-user-id. Used for per-user rate limiting, budget tracking, and audit logs.
project     str | None   None                       Project name. Sent as x-membrain-project. Scopes requests to a specific project for multi-tenant deployments.
timeout     float        120.0                      HTTP request timeout in seconds. Applies to both connection and read timeouts.

When api_key, user_id, or project are None, the corresponding header is omitted entirely from the request.
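
For reference, the header mapping behaves like the following sketch (illustrative only -- the actual client internals may differ):

def build_headers(api_key=None, user_id=None, project=None):
    """Map constructor arguments to Membrain request headers,
    skipping any header whose value is None."""
    headers = {}
    if api_key is not None:
        headers["x-membrain-api-key"] = api_key
    if user_id is not None:
        headers["x-membrain-user-id"] = user_id
    if project is not None:
        headers["x-membrain-project"] = project
    return headers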


Chat Completions

Basic Usage

from membrain_client import MembrainClient

client = MembrainClient(base_url="http://localhost:8001", api_key="ck_live_abc123")

response = client.chat.completions.create(
    model="gpt-4",
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Explain PII detection in two sentences."},
    ],
)

# Access the response
print(response.choices[0].message.content)
print(response.choices[0].message.role)     # "assistant"
print(response.choices[0].finish_reason)     # "stop"

# Token usage
print(response.usage.prompt_tokens)
print(response.usage.completion_tokens)
print(response.usage.total_tokens)

With Optional Parameters

Any extra keyword arguments are forwarded in the request body, allowing you to pass model-specific parameters like temperature and max_tokens:

response = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Write a haiku about security."}],
    temperature=0.7,
    max_tokens=100,
)

Response Objects

The SDK returns Python dataclasses (not Pydantic models), keeping the client dependency-free:

ChatCompletion

Field     Type           Description
-----     ----           -----------
id        str            Unique completion identifier (e.g., "chatcmpl-abc123")
model     str            Model that generated the response
choices   list[Choice]   List of completion choices
usage     Usage          Token usage statistics

Choice

Field           Type          Description
-----           ----          -----------
index           int           Index of this choice
message         ChatMessage   The assistant's response message
finish_reason   str | None    Why generation stopped ("stop", "length", etc.)

ChatMessage

Field     Type         Description
-----     ----         -----------
role      str          Message role ("assistant", "user", "system")
content   str | None   Message text content

Usage

Field               Type   Description
-----               ----   -----------
prompt_tokens       int    Tokens in the prompt
completion_tokens   int    Tokens in the completion
total_tokens        int    Sum of prompt and completion tokens
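
Together, these correspond to dataclasses shaped roughly as follows (an illustrative reconstruction from the tables above; the actual definitions in membrain_client.client may differ in detail):

from dataclasses import dataclass

# Sketch of the non-streaming response models, inferred from the
# field tables above. Requires Python 3.10+ for the `X | None` syntax.

@dataclass
class Usage:
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int

@dataclass
class ChatMessage:
    role: str
    content: str | None

@dataclass
class Choice:
    index: int
    message: ChatMessage
    finish_reason: str | None

@dataclass
class ChatCompletion:
    id: str
    model: str
    choices: list[Choice]
    usage: Usage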

Streaming

Pass stream=True to receive a generator that yields ChatCompletionChunk objects incrementally. The SDK handles SSE parsing internally.
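
Conceptually, that parsing is equivalent to the sketch below (an assumption based on the OpenAI-compatible SSE convention of "data:" events terminated by a "data: [DONE]" sentinel; the SDK's actual implementation may differ):

import json

def iter_sse_payloads(lines):
    # Parse OpenAI-style SSE lines ("data: {...}") into dicts,
    # stopping at the "data: [DONE]" sentinel.
    for line in lines:
        if not line.startswith("data: "):
            continue
        payload = line[len("data: "):]
        if payload.strip() == "[DONE]":
            break
        yield json.loads(payload)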

Synchronous Streaming

from membrain_client import MembrainClient

client = MembrainClient(base_url="http://localhost:8001")

stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Tell me a story."}],
    stream=True,
)

for chunk in stream:
    delta = chunk.choices[0].delta
    if delta.content:
        print(delta.content, end="", flush=True)

    # Detect when streaming is complete
    if chunk.choices[0].finish_reason == "stop":
        print()  # newline after final chunk

Asynchronous Streaming

import asyncio
from membrain_client import AsyncMembrainClient

async def main():
    client = AsyncMembrainClient(base_url="http://localhost:8001")

    stream = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "Tell me a story."}],
        stream=True,
    )

    async for chunk in stream:
        delta = chunk.choices[0].delta
        if delta.content:
            print(delta.content, end="", flush=True)

    await client.aclose()

asyncio.run(main())

Streaming Response Objects

ChatCompletionChunk

Field     Type                 Description
-----     ----                 -----------
id        str                  Completion identifier (same across all chunks)
model     str                  Model name
choices   list[StreamChoice]   Chunk choices

StreamChoice

Field           Type          Description
-----           ----          -----------
index           int           Choice index
delta           StreamDelta   Incremental content for this chunk
finish_reason   str | None    Set on the final chunk (e.g., "stop")

StreamDelta

Field     Type         Description
-----     ----         -----------
role      str | None   Present only in the first chunk ("assistant")
content   str | None   Incremental text content
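
The chunk fields compose naturally into a full message. For example, a small helper (not part of the SDK) can accumulate the streamed deltas:

def collect_stream(stream) -> str:
    """Concatenate the incremental content of every chunk
    into the complete assistant message."""
    parts = []
    for chunk in stream:
        delta = chunk.choices[0].delta
        if delta.content:
            parts.append(delta.content)
    return "".join(parts)

text = collect_stream(
    client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "Tell me a story."}],
        stream=True,
    )
)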

PII Badge Handling

Membrain's PII detection runs transparently in the middleware pipeline. The gateway intercepts requests, detects and replaces PII tokens (e.g., email addresses, phone numbers, SSNs) with placeholders before sending to the LLM provider, then restores original values in the response before returning to the client.

From the SDK consumer's perspective, this process is invisible -- responses arrive with original PII values intact. The PII detection happens server-side, and the SDK itself does not expose PII metadata in the response dataclasses.

To inspect PII activity for your requests, use the Membrain gateway's audit and reporting APIs:

import httpx

# Query the audit log for PII findings
response = httpx.get(
    "http://localhost:8001/api/audit",
    headers={"x-membrain-api-key": "ck_live_abc123"},
)
audit_entries = response.json()

# Each entry includes pii_findings_count and pii_labels
for entry in audit_entries:
    if entry.get("pii_findings_count", 0) > 0:
        print(f"Model: {entry['model']}, PII detected: {entry['pii_findings_count']}")

The gateway also returns rate limit information in response headers (X-RateLimit-Limit, X-RateLimit-Remaining, X-RateLimit-Reset) when rate limiting is active. These headers are available on the raw HTTP response but are not currently surfaced on the SDK's dataclass objects.
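
If you need those headers today, one option is to issue the request with httpx directly and read them from the raw response (a sketch; it assumes rate limiting is enabled on your gateway):

import httpx

# Call the gateway's OpenAI-compatible endpoint directly so the
# raw HTTP response (and its headers) is available.
raw = httpx.post(
    "http://localhost:8001/v1/chat/completions",
    headers={"x-membrain-api-key": "ck_live_abc123"},
    json={
        "model": "gpt-4",
        "messages": [{"role": "user", "content": "Hello"}],
    },
    timeout=120.0,
)
print(raw.headers.get("X-RateLimit-Remaining"))
print(raw.headers.get("X-RateLimit-Reset"))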


Error Handling

The SDK uses httpx for HTTP communication. Errors propagate as httpx exceptions, which you can catch at the appropriate granularity.

HTTP Errors

import httpx
from membrain_client import MembrainClient

client = MembrainClient(base_url="http://localhost:8001", api_key="ck_live_abc123")

try:
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "Hello"}],
    )
except httpx.HTTPStatusError as e:
    # Raised for 4xx and 5xx responses (via raise_for_status())
    print(f"HTTP {e.response.status_code}: {e.response.text}")
except httpx.ConnectError:
    # Gateway is unreachable
    print("Cannot connect to Membrain gateway")
except httpx.TimeoutException:
    # Request exceeded the configured timeout
    print("Request timed out")

Common HTTP Status Codes

Status   Meaning             Cause
------   -------             -----
401      Unauthorized        Missing or invalid api_key when auth is enabled
402      Payment Required    Budget limit exceeded for the user/project
429      Too Many Requests   Rate limit exceeded; check the Retry-After header
502      Bad Gateway         All upstream LLM providers failed

Retry Behavior

The SDK does not implement automatic retries. You are responsible for implementing retry logic appropriate for your use case:

import time
import httpx
from membrain_client import MembrainClient

client = MembrainClient(base_url="http://localhost:8001")

def chat_with_retry(messages, max_retries=3):
    for attempt in range(max_retries):
        try:
            return client.chat.completions.create(
                model="gpt-4",
                messages=messages,
            )
        except httpx.HTTPStatusError as e:
            # Honor Retry-After on 429; re-raise everything else,
            # including a 429 on the final attempt.
            if e.response.status_code == 429 and attempt < max_retries - 1:
                retry_after = int(e.response.headers.get("Retry-After", 5))
                time.sleep(retry_after)
                continue
            raise
        except (httpx.ConnectError, httpx.TimeoutException):
            # Exponential backoff for transport errors.
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
                continue
            raise
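
For example:

response = chat_with_retry([{"role": "user", "content": "Hello"}])
print(response.choices[0].message.content)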

Advanced Usage

Context Managers

Both clients support the context manager protocol, which ensures the underlying HTTP connection is properly closed:

# Synchronous
with MembrainClient(base_url="http://localhost:8001") as client:
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "Hello"}],
    )
# Connection is closed automatically

# Asynchronous
async with AsyncMembrainClient(base_url="http://localhost:8001") as client:
    response = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "Hello"}],
    )
# Connection is closed automatically

Async Support

The AsyncMembrainClient is a fully asynchronous client built on httpx.AsyncClient. It shares the same constructor signature and API surface as MembrainClient, but all completion methods are async:

import asyncio
from membrain_client import AsyncMembrainClient

async def main():
    client = AsyncMembrainClient(
        base_url="http://localhost:8001",
        api_key="ck_live_abc123",
        user_id="user-42",
    )

    response = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "Hello"}],
    )
    print(response.choices[0].message.content)

    await client.aclose()

asyncio.run(main())

Sync method            Async equivalent
-----------            ----------------
client.close()         await client.aclose()
with client:           async with client:
for chunk in stream:   async for chunk in stream:

Routing Headers

Membrain supports intelligent routing via request headers. The SDK does not expose dedicated parameters for these, but you can influence routing by setting headers on the underlying httpx client. Note that _client is a private attribute, so this access pattern may change between releases:

from membrain_client import MembrainClient

client = MembrainClient(base_url="http://localhost:8001")

# Access the underlying httpx.Client to set routing headers
client._client.headers.update({
    "x-membrain-tier": "quality",       # Routing tier: "quality", "balanced", "speed"
    "x-membrain-private": "true",       # Force private/local model routing
    "x-membrain-max-cost": "0.01",      # Max cost per 1K tokens
})

response = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello"}],
)

Passing Extra Model Parameters

Any keyword arguments beyond model, messages, and stream are forwarded directly in the JSON request body. This lets you use provider-specific parameters:

response = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello"}],
    temperature=0.5,
    max_tokens=200,
    top_p=0.9,
    frequency_penalty=0.5,
    presence_penalty=0.5,
    stop=["\n"],
)

API Reference

Exports

The membrain_client package exports the following names:

from membrain_client import (
    MembrainClient,        # Synchronous client
    AsyncMembrainClient,   # Asynchronous client
    ChatCompletion,        # Non-streaming response dataclass
    ChatMessage,           # Message dataclass (role + content)
)

Additional dataclasses available from membrain_client.client:

from membrain_client.client import (
    Choice,                # Non-streaming choice (index, message, finish_reason)
    Usage,                 # Token usage (prompt_tokens, completion_tokens, total_tokens)
    ChatCompletionChunk,   # Streaming response chunk dataclass
    StreamChoice,          # Streaming choice (index, delta, finish_reason)
    StreamDelta,           # Streaming delta (role, content)
)

Request Flow

Your Application
    |
    v
MembrainClient.chat.completions.create()
    |
    |  POST {base_url}/v1/chat/completions
    |  Headers: x-membrain-api-key, x-membrain-user-id, x-membrain-project
    |  Body: { model, messages, stream, ...kwargs }
    |
    v
Membrain Gateway
    |  PII Detection -> Rate Limiting -> Budget Check -> Cache -> Knowledge
    |
    v
LLM Provider (OpenAI, Anthropic, Ollama, LiteLLM, etc.)
    |
    v
Membrain Gateway
    |  Cache Store -> PII Restoration -> Audit Log
    |
    v
MembrainClient -> ChatCompletion / Generator[ChatCompletionChunk]