Scenario API
The full SDK surface available inside a scenario. Everything starts with:
```python
from datamaker import DataMaker

dm = DataMaker()
```

The dm instance is auto-configured with the current workspace, project, and API credentials.
Generation
dm.generate(template_id, count, overrides=None) -> list[dict]
Generate count rows from a template. Returns a list of dicts.
```python
customers = dm.generate(template_id="tmpl_customer", count=100)
```

overrides lets you pin a value or substitute a generator for one or more fields:
```python
customers = dm.generate(
    template_id="tmpl_customer",
    count=100,
    overrides={
        "country": "DE",                            # constant
        "customer_id": dm.pick_from(existing_ids),  # pick from a list
        "balance": dm.callable(my_fn),              # arbitrary callable per row
    },
)
```

dm.template(name_or_id) -> Template
Resolve a template object you can call multiple times:
```python
tmpl = dm.template("Customer")
batch_a = tmpl.generate(count=50)
batch_b = tmpl.generate(count=50, overrides={"country": "AT"})
```

dm.pick_from(iterable, weights=None)
A lazy callable that, when used as an override, picks one element per row.
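The per-row behavior can be pictured with a small pure-Python sketch. This is illustrative only, not the SDK's implementation: the pick_from defined here is a hypothetical stand-in that returns a zero-argument callable, invoked once per generated row.

```python
import random

def pick_from(iterable, weights=None):
    # Hypothetical stand-in: return a zero-argument callable;
    # each call picks one element, i.e. one pick per generated row.
    pool = list(iterable)

    def pick():
        return random.choices(pool, weights=weights, k=1)[0]

    return pick

tier = pick_from(["free", "pro", "enterprise"], weights=[6, 3, 1])
samples = [tier() for _ in range(1000)]
# Roughly 60% "free", 30% "pro", 10% "enterprise".
```
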
```python
overrides = {"tier": dm.pick_from(["free", "pro", "enterprise"], weights=[6, 3, 1])}
```

Connections
dm.connection(name_or_id) -> Connection
Resolve a connection by name or ID:
```python
pg = dm.connection("Postgres dev")
sap = dm.connection("conn_s4_sandbox")
```

Connection.insert(table, rows, on_conflict=None, key=None) (databases)
```python
pg.insert(table="customers", rows=customers)
pg.insert(table="customers", rows=customers, on_conflict="update", key="id")
```

Connection.post(path_or_entity, rows, mode="single"|"batch") (REST / SAP)
```python
sap.post(entity="A_BusinessPartner", rows=records, mode="batch")
rest.post(path="/customers", rows=customers)
```

Connection.fetch(...) (SAP / REST)
```python
existing = sap.fetch(
    entity="A_BusinessPartner",
    filter="Country eq 'DE'",
    select=["BusinessPartner", "BusinessPartnerName"],
    top=200,
)
```

Connection.execute(sql, params=None) (databases)
For raw SQL. Returns a list of dicts.
```python
top_customers = pg.execute(
    "SELECT id, name FROM customers WHERE balance > %s LIMIT 10",
    params=[1000],
)
```

Working with rows
DataMaker scenarios mostly pass plain list[dict] around. Helpers:
dm.transform(rows, fn) -> list[dict]
Map a function over each row. Same as [fn(r) for r in rows], but parallelisable.
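Since rows are plain dicts, the semantics can be sketched in a few lines of pure Python. This is an illustrative equivalent, not the SDK internals; it uses a thread pool as one plausible way to parallelise an order-preserving map.

```python
from concurrent.futures import ThreadPoolExecutor

def transform(rows, fn, max_workers=8):
    # Order-preserving parallel map over rows (sketch, not the SDK source).
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(fn, rows))

rows = [{"id": 1, "balance": 50}, {"id": 2, "balance": 120}]
flagged = transform(rows, lambda r: {**r, "vip": r["balance"] > 100})
# flagged keeps input order; each row gains a "vip" flag.
```
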
dm.mask(rows, fields, strategy="replace"|"format-preserve"|"redact")
Apply a masking strategy to a fetched set:
```python
real = sap.fetch(entity="A_BusinessPartner", top=200)
masked = dm.mask(real, fields=["TaxNumber1", "EmailAddress"], strategy="format-preserve")
sap.post(entity="A_BusinessPartner", rows=masked)  # safe to re-import
```

Saved sets
Save a list of rows as a named regression set you can reload later:
```python
dm.save_set(name="reg_bp_de_2026q2", rows=masked)

# In a different scenario:
prior = dm.load_set("reg_bp_de_2026q2")
```

Sets are project-scoped and count toward storage limits (see plans).
Run parameters
```python
env = dm.params.get("environment", "dev")
size = int(dm.params.get("size", "100"))
```

Pass them in when triggering:
```shell
curl -X POST .../scenarios/$ID/run \
  -d '{"params": {"environment": "staging", "size": "500"}}'
```

Logging
print() works — output is captured live. For structured logs use dm.log:
```python
dm.log.info("seeded %d customers", len(customers))
dm.log.warn("skipping country=%s", c)
dm.log.error("failed: %s", err)
```

See Logs & retries for streaming, retention, and retry semantics.
Errors & retries
Scenarios fail loudly: any uncaught exception marks the run as failed. To make a step retryable, wrap it:
```python
@dm.retry(max_attempts=3, backoff="exponential")
def push_batch(rows):
    sap.post(entity="A_BusinessPartner", rows=rows, mode="batch")
```

The decorator retries on transient errors (connection reset, 5xx, OData 503) and gives up on the rest.
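The retry-with-backoff pattern can be sketched in pure Python. This is an illustrative sketch, not dm.retry's source: TransientError is a hypothetical stand-in for the transient failure classes the real decorator detects, and base_delay is an assumed knob.

```python
import functools
import time

class TransientError(Exception):
    """Hypothetical stand-in for transient failures (connection reset, 5xx)."""

def retry(max_attempts=3, backoff="exponential", base_delay=0.01):
    # Sketch of a retry decorator with exponential backoff.
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except TransientError:
                    if attempt == max_attempts:
                        raise  # out of attempts: let the run fail
                    if backoff == "exponential":
                        time.sleep(base_delay * 2 ** (attempt - 1))
                    else:
                        time.sleep(base_delay)
        return wrapper
    return decorator

attempts = []

@retry(max_attempts=3)
def flaky_push():
    attempts.append(1)
    if len(attempts) < 3:
        raise TransientError("connection reset")
    return "ok"
```
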
Sub-scenarios
```python
result = dm.run_scenario(name="seed_customers", params={"size": 500})
```

Useful for chaining shared building blocks. The sub-run shows up as a child in the run log.