
Python SDK

The Python SDK wraps the REST API with typing, retries, and the ergonomics you’d expect. It’s the same SDK that runs inside DataMaker scenarios, so anything you write locally moves into a scenario unchanged.

Install

pip install datamaker

Requires Python 3.10+. The only runtime dependencies are httpx and pydantic.

Auth

The SDK reads DM_API_KEY from the environment by default:

export DM_API_KEY=sk-...

Override per-instance if you need to:

from datamaker import DataMaker
dm = DataMaker(api_key="sk-...", project_id="proj_xxx")

If you’re inside a scenario, the env vars are pre-populated; you just write DataMaker().

Templates

# List
for t in dm.templates.list():
    print(t.id, t.name)

# Create
t = dm.templates.create(
    name="Customer",
    fields=[
        {"name": "first_name", "type": "first_name", "options": {"locale": "de_DE"}},
        {"name": "email", "type": "email"},
        {"name": "iban", "type": "iban", "options": {"country": "DE"}},
    ],
)

# Generate
rows = dm.generate(template_id=t.id, count=100)
# or by name:
rows = dm.template("Customer").generate(count=100)

rows is a list[dict]. For very large generations, prefer streaming:

for row in dm.template("Customer").stream(count=100_000):
    insert_into_my_db(row)

The stream uses HTTP chunked transfer; memory stays flat.
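
If the rows are headed for a database, you can pair stream() with the insert() method described under Connections below. A minimal sketch, assuming batches of 1,000 are a reasonable round-trip size for your target:

# Buffer streamed rows into fixed-size batches so each insert carries
# 1,000 rows instead of one. Connection name and batch size are illustrative.
pg = dm.connection("Postgres dev")

batch = []
for row in dm.template("Customer").stream(count=100_000):
    batch.append(row)
    if len(batch) >= 1_000:
        pg.insert(table="customers", rows=batch, on_conflict="update", key="id")
        batch = []
if batch:  # flush the remainder
    pg.insert(table="customers", rows=batch, on_conflict="update", key="id")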

Connections

pg = dm.connection("Postgres dev")
sap = dm.connection("conn_s4_sandbox")

# Insert
pg.insert(table="customers", rows=rows, on_conflict="update", key="id")

# Raw SQL
pg.execute("DELETE FROM customers WHERE created_at < NOW() - INTERVAL '90 days'")

# REST POST
sap.post(entity="A_BusinessPartner", rows=rows, mode="batch")

# SAP fetch
existing = sap.fetch(
    entity="A_BusinessPartner",
    filter="Country eq 'DE'",
    select=["BusinessPartner", "BusinessPartnerName"],
    top=100,
)

See Connections for what each method does.
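
A common pattern is to combine fetch and post to avoid duplicates. A sketch, assuming fetch() returns rows as dicts (mirroring generate()) and that your rows carry a BusinessPartnerName field — adjust the key to your data:

# Build a lookup of names already in the sandbox, then post only new rows.
existing_names = {p["BusinessPartnerName"] for p in existing}
fresh = [r for r in rows if r.get("BusinessPartnerName") not in existing_names]
sap.post(entity="A_BusinessPartner", rows=fresh, mode="batch")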

Scenarios

# List
for s in dm.scenarios.list():
    print(s.id, s.name)

# Trigger
run = dm.scenarios.run(name="seed_orders", params={"size": "500"}, wait=True)
print(run.status, run.duration_ms)

# Stream a long-running run
for line in dm.scenarios.stream_logs(scenario_id=s.id, run_id=run.id):
    print(line)
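
If you'd rather not block on wait=True, you can trigger and then follow the logs. A sketch, assuming run() accepts wait=False and the returned run object exposes a scenario_id — verify both against your SDK version:

# Fire, then follow: return immediately and tail the run's log stream.
run = dm.scenarios.run(name="seed_orders", params={"size": "500"}, wait=False)
for line in dm.scenarios.stream_logs(scenario_id=run.scenario_id, run_id=run.id):
    print(line)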

Saved sets

# "masked" is any list of row dicts produced earlier in your pipeline
dm.save_set(name="reg_bp_de_2026q2", rows=masked)
prior = dm.load_set("reg_bp_de_2026q2")
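
Saved sets make simple regression checks cheap. A sketch, assuming each row is a dict with an "id" key — swap in whatever identifies a row in your set:

# Compare a fresh masked run against the saved baseline.
prior_by_id = {r["id"]: r for r in prior}
changed = [r for r in masked if prior_by_id.get(r["id"]) != r]
print(f"{len(changed)} rows differ from the saved baseline")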

Workspace files

# Upload from local
dm.workspace_files.upload("rates.csv", path="lookups/rates.csv")

# Read inside a scenario
import csv

with dm.workspace_file("lookups/rates.csv").open("r") as f:
    rates = list(csv.DictReader(f))  # materialise before the file closes
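
The reverse direction works too: write generated rows to a local CSV and push it up with upload(). File names here are illustrative:

import csv

# Dump generated rows to a local CSV, then upload it to the workspace.
with open("customers.csv", "w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=list(rows[0]))
    writer.writeheader()
    writer.writerows(rows)

dm.workspace_files.upload("customers.csv", path="exports/customers.csv")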

Errors

The SDK raises typed exceptions:

from datamaker import DataMaker
from datamaker.errors import (
    DataMakerError, AuthError, RateLimitError, NotFoundError, ValidationError
)

dm = DataMaker()
try:
    rows = dm.generate(template_id="tmpl_missing", count=100)
except NotFoundError as e:
    print("template gone:", e.code, e.message)
except RateLimitError as e:
    print("retry in", e.retry_after, "seconds")
except DataMakerError:
    raise

Retries

The SDK retries transient errors (5xx, 429, network) by default with exponential backoff. To customise:

dm = DataMaker(retry={"max_attempts": 5, "backoff": "linear", "factor": 2})

Disable for a single call:

rows = dm.generate(template_id=..., count=100, retry=None)
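
With SDK retries off, throttling is yours to handle. A sketch that honours the retry_after hint from the Errors section above:

import time
from datamaker.errors import RateLimitError

# Retry manually on rate limits, sleeping for the server's suggested delay.
for attempt in range(3):
    try:
        rows = dm.generate(template_id=t.id, count=100, retry=None)
        break
    except RateLimitError as e:
        if attempt == 2:
            raise
        time.sleep(e.retry_after)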

Async

For async code, use AsyncDataMaker:

from datamaker import AsyncDataMaker

async def main():
    async with AsyncDataMaker() as dm:
        rows = await dm.generate(template_id=..., count=100)
        await dm.connection("pg").insert(table="customers", rows=rows)

Same surface; every method returns an awaitable.
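
Because every method is awaitable, independent generations can be fanned out with asyncio.gather. A sketch; the "Order" template name is illustrative:

import asyncio
from datamaker import AsyncDataMaker

# Run two independent generations concurrently instead of back to back.
async def main():
    async with AsyncDataMaker() as dm:
        customers, orders = await asyncio.gather(
            dm.template("Customer").generate(count=1_000),
            dm.template("Order").generate(count=5_000),
        )

asyncio.run(main())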

Logging

The SDK uses Python’s standard logging. Turn on debug to see the requests it makes:

import logging
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("datamaker").setLevel(logging.DEBUG)
