Scenario API

The full SDK surface available inside a scenario. Everything starts with:

from datamaker import DataMaker
dm = DataMaker()

The dm instance is auto-configured with the current workspace, project, and API credentials.

Generation

dm.generate(template_id, count, overrides=None) -> list[dict]

Generate count rows from a template. Returns a list of dicts.

customers = dm.generate(template_id="tmpl_customer", count=100)

overrides lets you pin a value or substitute a generator for one or more fields:

customers = dm.generate(
    template_id="tmpl_customer",
    count=100,
    overrides={
        "country": "DE",                            # constant
        "customer_id": dm.pick_from(existing_ids),  # pick from a list
        "balance": dm.callable(my_fn),              # arbitrary callable per row
    },
)
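Conceptually, each override is either a constant (copied into every row) or a per-row callable. A minimal sketch of that semantics, assuming callables receive the row being built (the `apply_overrides` helper is hypothetical, not part of the SDK):

```python
def apply_overrides(rows, overrides):
    """Hypothetical sketch: copy constants into every row,
    invoke callables once per row with the row as argument."""
    out = []
    for row in rows:
        patched = dict(row)  # don't mutate the input
        for field, value in overrides.items():
            patched[field] = value(patched) if callable(value) else value
        out.append(patched)
    return out

rows = [{"country": "US", "balance": 0}, {"country": "FR", "balance": 0}]
patched = apply_overrides(rows, {"country": "DE", "balance": lambda r: 100})
# every patched row now has country "DE" and balance 100
```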

dm.template(name_or_id) -> Template

Resolve a template object you can call multiple times:

tmpl = dm.template("Customer")
batch_a = tmpl.generate(count=50)
batch_b = tmpl.generate(count=50, overrides={"country": "AT"})

dm.pick_from(iterable, weights=None)

A lazy callable that, when used as an override, picks one element per row.

overrides = { "tier": dm.pick_from(["free", "pro", "enterprise"], weights=[6, 3, 1]) }
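The weighted-pick behaviour can be sketched in plain Python with `random.choices` (a conceptual stand-in for the SDK's internals, not its actual implementation):

```python
import random

def pick_from(options, weights=None):
    """Sketch of dm.pick_from's contract: return a callable that
    draws one element per invocation, honouring optional weights."""
    def pick(*_):
        return random.choices(options, weights=weights, k=1)[0]
    return pick

tier = pick_from(["free", "pro", "enterprise"], weights=[6, 3, 1])
sample = [tier() for _ in range(1000)]
# "free" dominates, roughly in the 6:3:1 ratio
```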

Connections

dm.connection(name_or_id) -> Connection

Resolve a connection by name or ID:

pg = dm.connection("Postgres dev")
sap = dm.connection("conn_s4_sandbox")

Connection.insert(table, rows, on_conflict=None, key=None) (databases)

pg.insert(table="customers", rows=customers)
pg.insert(table="customers", rows=customers, on_conflict="update", key="id")
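For a Postgres connection, `on_conflict="update"` would typically translate to an `INSERT ... ON CONFLICT ... DO UPDATE` statement. A sketch of how that SQL could be built (an assumption about the backend; the driver's actual SQL may differ):

```python
def build_upsert(table, columns, key):
    """Sketch: build a Postgres-style upsert, updating every
    non-key column from the incoming row on conflict."""
    cols = ", ".join(columns)
    placeholders = ", ".join(["%s"] * len(columns))
    updates = ", ".join(f"{c} = EXCLUDED.{c}" for c in columns if c != key)
    return (
        f"INSERT INTO {table} ({cols}) VALUES ({placeholders}) "
        f"ON CONFLICT ({key}) DO UPDATE SET {updates}"
    )

sql = build_upsert("customers", ["id", "name", "balance"], key="id")
```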

Connection.post(path_or_entity, rows, mode="single"|"batch") (REST / SAP)

sap.post(entity="A_BusinessPartner", rows=records, mode="batch")
rest.post(path="/customers", rows=customers)

Connection.fetch(...) (SAP / REST)

existing = sap.fetch(
    entity="A_BusinessPartner",
    filter="Country eq 'DE'",
    select=["BusinessPartner", "BusinessPartnerName"],
    top=200,
)

Connection.execute(sql, params=None) (databases)

For raw SQL. Returns a list of dicts.

top_customers = pg.execute(
    "SELECT id, name FROM customers WHERE balance > %s LIMIT 10",
    params=[1000],
)

Working with rows

DataMaker scenarios mostly pass plain list[dict] around. Helpers:

dm.transform(rows, fn) -> list[dict]

Map a function over each row. Same as [fn(r) for r in rows], but parallelisable.
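The contract can be sketched with a thread pool, which is one plausible parallelisation strategy (an assumption; the SDK may parallelise differently):

```python
from concurrent.futures import ThreadPoolExecutor

def transform(rows, fn, max_workers=8):
    """Sketch of dm.transform's contract: map fn over rows in
    parallel while preserving input order (pool.map keeps order)."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(fn, rows))

rows = [{"balance": 10}, {"balance": 20}]
doubled = transform(rows, lambda r: {**r, "balance": r["balance"] * 2})
# → [{"balance": 20}, {"balance": 40}]
```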

dm.mask(rows, fields, strategy="replace"|"format-preserve"|"redact")

Apply a masking strategy to a fetched set:

real = sap.fetch(entity="A_BusinessPartner", top=200)
masked = dm.mask(real, fields=["TaxNumber1", "EmailAddress"], strategy="format-preserve")
sap.post(entity="A_BusinessPartner", rows=masked) # safe to re-import
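A format-preserving strategy typically keeps each value's length and character classes while replacing the actual characters. A sketch of that idea, assuming digits map to digits and letters to same-case letters (the real strategy may differ in detail):

```python
import random
import string

def mask_format_preserving(value, rng=random):
    """Sketch of 'format-preserve': keep length, character classes,
    case, and separators; randomise the actual characters."""
    out = []
    for ch in value:
        if ch.isdigit():
            out.append(rng.choice(string.digits))
        elif ch.isalpha():
            repl = rng.choice(string.ascii_letters)
            out.append(repl.upper() if ch.isupper() else repl.lower())
        else:
            out.append(ch)  # punctuation and separators survive unchanged
    return "".join(out)

masked = mask_format_preserving("DE-123/456")
# still two uppercase letters, a dash, three digits, a slash, three digits
```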

Saved sets

Save a list of rows as a named regression set you can reload later:

dm.save_set(name="reg_bp_de_2026q2", rows=masked)
# In a different scenario:
prior = dm.load_set("reg_bp_de_2026q2")

Sets are project-scoped and counted toward storage limits (see plans).

Run parameters

env = dm.params.get("environment", "dev")
size = int(dm.params.get("size", "100"))

Pass them in when triggering:

curl -X POST .../scenarios/$ID/run -d '{"params": {"environment": "staging", "size": "500"}}'

Logging

print() works — output is captured live. For structured logs use dm.log:

dm.log.info("seeded %d customers", len(customers))
dm.log.warn("skipping country=%s", c)
dm.log.error("failed: %s", err)

See Logs & retries for streaming, retention, and retry semantics.

Errors & retries

Scenarios fail loudly: any uncaught exception marks the run as failed. To make a step retryable, wrap it:

@dm.retry(max_attempts=3, backoff="exponential")
def push_batch(rows):
    sap.post(entity="A_BusinessPartner", rows=rows, mode="batch")

The decorator retries on transient errors (connection reset, 5xx, OData 503) and gives up on the rest.
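The retry-with-exponential-backoff pattern can be sketched in plain Python. The exception classes and `base_delay` parameter here are assumptions for illustration, not the SDK's actual internals:

```python
import functools
import time

def retry(max_attempts=3, backoff="exponential", base_delay=1.0,
          transient=(ConnectionError, TimeoutError)):
    """Sketch of @dm.retry: re-run the step on transient errors,
    doubling the sleep between attempts; re-raise on the last one."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except transient:
                    if attempt == max_attempts:
                        raise  # out of attempts: let the run fail loudly
                    delay = (base_delay * 2 ** (attempt - 1)
                             if backoff == "exponential" else base_delay)
                    time.sleep(delay)
        return wrapper
    return decorator
```

Non-transient exceptions fall straight through the `except` clause, so the run still fails immediately on real bugs.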

Sub-scenarios

result = dm.run_scenario(name="seed_customers", params={"size": 500})

Useful for chaining shared building blocks. The sub-run shows up as a child in the run log.