# Transactions & Pipelines
Two distinct tools that are often confused:
- `$transaction` — ACID atomicity across multiple statements. One `BEGIN`/`COMMIT` pair, rollback on throw.
- `pipeline` — throughput. N independent queries sent in one TCP round-trip via the Postgres extended-query protocol. Not atomic.
Use transactions when you need all-or-nothing semantics. Use pipelines when you need to cut latency.
## `$transaction` — callback form
Pass a callback that receives a transactional client. Throw to roll back, return to commit.
```ts
const result = await db.$transaction(async (tx) => {
  const user = await tx.users.create({
    data: { email: 'alice@example.com', name: 'Alice' },
  });
  await tx.posts.create({
    data: { userId: user.id, title: 'Hello' },
  });
  return user;
});
// result is the User returned by the callback
```

The `tx` client has the same surface as `db` — every table accessor, every method. Queries must use `tx`, not `db`, or they'll run on a different connection outside the transaction.
## Isolation levels
Pass `isolationLevel` to set the transaction's isolation on `BEGIN`:
```ts
await db.$transaction(
  async (tx) => {
    const row = await tx.inventory.findUnique({ where: { sku: 'ABC' } });
    await tx.inventory.update({
      where: { sku: 'ABC' },
      data: { stock: { decrement: 1 } },
    });
  },
  { isolationLevel: 'serializable' },
);
```

Supported: `'read committed'` (Postgres default), `'repeatable read'`, `'serializable'`.
When the serializable level detects a conflict, it throws `SerializationFailureError` (`TURBINE_E013`) with `isRetryable: true as const`. See the retry section below.
## Timeouts
Per-transaction statement timeout, applied as `SET LOCAL statement_timeout = '<ms>ms'` inside the transaction:

```ts
await db.$transaction(
  async (tx) => {
    await tx.reports.create({ data: { /* ... */ } });
  },
  { timeoutMs: 5_000 },
);
```

A query that exceeds the timeout throws `TimeoutError` (`TURBINE_E002`) and the transaction rolls back.
## Nested transactions — real SAVEPOINTs
Calling `$transaction` inside another `$transaction` opens a Postgres `SAVEPOINT`, not a no-op and not a new connection. The inner block can fail and recover independently:

```ts
await db.$transaction(async (tx) => {
  const user = await tx.users.create({ data: { email: 'a@b.c' } });
  try {
    await tx.$transaction(async (inner) => {
      await inner.auditLog.create({ data: { userId: user.id, action: 'signup' } });
      throw new Error('skip audit log');
    });
  } catch {
    // Inner block rolled back to SAVEPOINT; outer is still healthy
  }
  await tx.posts.create({ data: { userId: user.id, title: 'Hello' } });
  // user + post still commit
});
```

SAVEPOINT names are auto-generated with a counter so nesting is safe.
## Retryable errors
Two error classes carry `readonly isRetryable = true as const` — TypeScript narrows them, so you don't need a runtime field check.
| Code | Class | SQLSTATE |
|---|---|---|
| `TURBINE_E012` | `DeadlockError` | `40P01` |
| `TURBINE_E013` | `SerializationFailureError` | `40001` |
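The typing pattern is easy to reproduce in isolation. The stand-in class below is not the library's real `DeadlockError` — it only mirrors the documented `isRetryable` field to show how the narrowing works:

```ts
// Stand-in for the library's error classes, to show the typing pattern.
class DemoDeadlockError extends Error {
  readonly code = 'TURBINE_E012';
  readonly isRetryable = true as const; // field type is the literal `true`
}

function shouldRetry(err: unknown): boolean {
  // `instanceof` narrows `err`; inside the branch, `err.isRetryable`
  // has type `true` — no runtime field check is needed.
  return err instanceof DemoDeadlockError && err.isRetryable;
}
```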
Canonical retry loop:
```ts
import { DeadlockError, SerializationFailureError } from 'turbine-orm';

async function withRetry<T>(fn: () => Promise<T>, maxAttempts = 3): Promise<T> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      const retryable =
        err instanceof DeadlockError || err instanceof SerializationFailureError;
      if (!retryable || attempt === maxAttempts) throw err;
      // Exponential backoff: 50 ms, 100 ms, 200 ms, ...
      await new Promise((r) => setTimeout(r, 50 * 2 ** (attempt - 1)));
    }
  }
  throw new Error('unreachable');
}

await withRetry(() =>
  db.$transaction(
    async (tx) => {
      /* ... */
    },
    { isolationLevel: 'serializable' },
  ),
);
```

The `as const` on `isRetryable` means `err.isRetryable` is the literal type `true` on those classes and never narrows to `false` — your control flow type-checks without casts.
## `pipeline` — N queries, 1 round-trip
A pipeline bundles independent queries and sends them together using the Postgres extended-query pipeline protocol. Each query gets its own parsed SQL, own params, own typed result. No transactional semantics — if query 3 fails, queries 1, 2, 4, 5 still ran.
```ts
const [user, posts, commentCount, orgs, latestLogin] = await db.pipeline([
  db.users.buildFindUnique({ where: { id: 1 } }),
  db.posts.buildFindMany({ where: { userId: 1 }, limit: 10 }),
  db.comments.buildCount({ where: { userId: 1 } }),
  db.organizations.buildFindMany({ where: { ownerId: 1 } }),
  db.sessions.buildFindFirst({ where: { userId: 1 }, orderBy: { createdAt: 'desc' } }),
]);
```

Each `build*` method returns a `DeferredQuery<T>` — `{ sql, params, transform, tag }`. The pipeline driver writes them all to the wire, reads all responses, applies each transform.
Result type is a tuple that matches the input order and carries the per-query return type.
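To make that flow concrete, here is a toy model. Only the four-field `{ sql, params, transform, tag }` shape comes from the description above — the executor and the fake response data are illustrative stand-ins for the real driver:

```ts
// Toy model of DeferredQuery<T>; the real driver speaks the wire protocol,
// but the shape and the "apply each transform in order" step are the same idea.
interface DeferredQuery<T> {
  sql: string;
  params: unknown[];
  transform: (raw: unknown) => T;
  tag: string;
}

// Stand-in executor: all queries are "written" first, then each raw
// response is passed through its query's transform, preserving input order.
function runPipeline(queries: DeferredQuery<unknown>[], rawResponses: unknown[]): unknown[] {
  return queries.map((q, i) => q.transform(rawResponses[i]));
}

const countQuery: DeferredQuery<number> = {
  sql: 'SELECT count(*) FROM comments WHERE user_id = $1',
  params: [1],
  transform: (raw) => Number((raw as { count: string }[])[0].count),
  tag: 'comments.count',
};

const [commentTotal] = runPipeline([countQuery], [[{ count: '42' }]]);
// commentTotal === 42 — the raw string row became a typed number
```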
## When pipeline wins
- Edge runtimes. One round-trip to Neon (~35 ms) instead of five (~175 ms).
- Dashboard loads. Five independent widget queries → one round-trip.
- Fan-out reads. Load the user + their N related collections for a profile page.
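The edge-runtime case is plain arithmetic — five awaited queries pay five round-trips, a pipeline pays one. The ~35 ms figure is the example's assumption, not a guarantee:

```ts
// Assumed round-trip time from the edge-runtime example above.
const rttMs = 35;
const queries = 5;

const sequentialMs = queries * rttMs; // five awaited queries: 175 ms of network
const pipelinedMs = 1 * rttMs;        // one batched round-trip: 35 ms
```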
## When pipeline loses
- You need atomicity. Use `$transaction`.
- One query depends on another's result. Pipeline queries must be independent — they're all submitted before any response is read.
- A query might fail. `PipelineError` (`TURBINE_E014`) wraps partial-success cases; decide how you want to handle them.
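Partial-failure handling is deliberately left to you, and `PipelineError`'s fields beyond its code aren't specified here, so any concrete handler involves assumptions. One defensive pattern is to fall back to sequential queries when a batch fails — the error class below is a local stand-in, not the real `PipelineError`:

```ts
// Stand-in for PipelineError (TURBINE_E014) — the real class's fields
// beyond its code aren't documented here, so this shape is hypothetical.
class DemoPipelineError extends Error {
  readonly code = 'TURBINE_E014';
}

// Defensive pattern: try the batch; on a pipeline failure, re-issue the
// queries one at a time so the survivors still produce usable results.
async function pipelineWithFallback<T>(
  runBatch: () => Promise<T[]>,
  runOneByOne: () => Promise<T[]>,
): Promise<T[]> {
  try {
    return await runBatch();
  } catch (err) {
    if (err instanceof DemoPipelineError) return runOneByOne();
    throw err; // anything else is not a pipeline partial-failure
  }
}
```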
## Combining — pipeline inside a transaction
Rarely useful but possible: run a pipeline through `tx` for batched reads inside a transactional block. The pipeline still uses one round-trip; the transaction still wraps them in `BEGIN` / `COMMIT`.
```ts
await db.$transaction(async (tx) => {
  const [user, posts] = await tx.pipeline([
    tx.users.buildFindUnique({ where: { id: 1 } }),
    tx.posts.buildFindMany({ where: { userId: 1 } }),
  ]);
  // ...
});
```

## See also
- Relations — reads that fan out via `with` vs `pipeline`.
- Typed Errors — `DeadlockError`, `SerializationFailureError`, `TimeoutError`, `PipelineError`.
- Serverless — why pipelines matter more on the edge.