Transactions & Pipelines

Two distinct tools that are often confused:

  • $transaction — ACID atomicity across multiple statements. One BEGIN / COMMIT pair, rollback on throw.
  • pipeline — throughput. N independent queries sent in one TCP round-trip via the Postgres extended-query protocol. Not atomic.

Use transactions when you need all-or-nothing semantics. Use pipelines when you need latency.

$transaction — callback form

Pass a callback that receives a transactional client. Throw to roll back, return to commit.

const result = await db.$transaction(async (tx) => {
  const user = await tx.users.create({
    data: { email: 'alice@example.com', name: 'Alice' },
  });
 
  await tx.posts.create({
    data: { userId: user.id, title: 'Hello' },
  });
 
  return user;
});
// result is the User returned by the callback

The tx client has the same surface as db — every table accessor, every method. Queries must use tx, not db, or they'll run on a different connection outside the transaction.

Isolation levels

Pass isolationLevel to set the transaction's isolation on BEGIN:

await db.$transaction(
  async (tx) => {
    const row = await tx.inventory.findUnique({ where: { sku: 'ABC' } });
    await tx.inventory.update({
      where: { sku: 'ABC' },
      data: { stock: { decrement: 1 } },
    });
  },
  { isolationLevel: 'Serializable' },
);

Supported: 'ReadUncommitted', 'ReadCommitted' (Postgres default), 'RepeatableRead', 'Serializable'.

When Serializable detects a conflict, it throws SerializationFailureError (TURBINE_E013) with isRetryable: true as const. See the retry section below.

Timeouts

Per-transaction timeout in milliseconds, enforced client-side: if the callback exceeds timeout, Turbine destroys the connection (aborting any in-flight statement server-side) and rolls back:

await db.$transaction(
  async (tx) => {
    await tx.reports.create({ data: { /* ... */ } });
  },
  { timeout: 5_000 },
);

A query that exceeds the timeout throws TimeoutError (TURBINE_E002) and the transaction rolls back.

Nested transactions — real SAVEPOINTs

Calling $transaction inside another $transaction opens a Postgres SAVEPOINT, not a no-op and not a new connection. The inner block can fail and recover independently:

await db.$transaction(async (tx) => {
  const user = await tx.users.create({ data: { email: 'a@b.c' } });
 
  try {
    await tx.$transaction(async (inner) => {
      await inner.auditLog.create({ data: { userId: user.id, action: 'signup' } });
      throw new Error('skip audit log');
    });
  } catch {
    // Inner block rolled back to SAVEPOINT; outer is still healthy
  }
 
  await tx.posts.create({ data: { userId: user.id, title: 'Hello' } });
  // user + post still commit
});

SAVEPOINT names are auto-generated with a counter so nesting is safe.

Multi-tenant queries with RLS session context

Set transaction-local Postgres settings (GUCs) so PostgreSQL Row-Level Security policies that call current_setting() filter rows for you. This is the clean way to do multi-tenant isolation: the database enforces the tenant boundary, not your application code.

Pass sessionContext to $transaction:

// Postgres policy: USING (tenant_id = current_setting('app.current_tenant')::int)
const rows = await db.$transaction(
  async (tx) => tx.documents.findMany(),
  { sessionContext: { 'app.current_tenant': tenantId } },
);
// Only this tenant's documents come back — the policy did the filtering

Each entry is applied as SELECT set_config(name, value, true) right after BEGIN, so the setting is scoped to the transaction and resets automatically on commit or rollback. Values may be strings, numbers, or booleans (numbers and booleans are coerced to strings, since GUCs are text). An invalid setting name throws ValidationError and rolls the transaction back before any query runs.

$withSession shorthand

For a single-purpose session, $withSession skips the options object — pass the context first, then the callback:

const rows = await db.$withSession(
  { 'app.current_tenant': tenantId },
  async (tx) => tx.documents.findMany(),
);

$withSession(ctx, fn) is exactly $transaction(fn, { sessionContext: ctx }). Use it when the only reason you're opening a transaction is to scope the RLS context.

Retryable errors

Two error classes carry readonly isRetryable = true as const — so TypeScript narrows them and you don't need a runtime field check.

CodeClassSQLSTATE
E012DeadlockError40P01
E013SerializationFailureError40001

Canonical retry loop:

import { DeadlockError, SerializationFailureError } from 'turbine-orm';
 
async function withRetry<T>(fn: () => Promise<T>, maxAttempts = 3): Promise<T> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      const retryable = err instanceof DeadlockError || err instanceof SerializationFailureError;
      if (!retryable || attempt === maxAttempts) throw err;
      await new Promise((r) => setTimeout(r, 50 * 2 ** (attempt - 1)));
    }
  }
  throw new Error('unreachable');
}
 
await withRetry(() =>
  db.$transaction(
    async (tx) => {
      /* ... */
    },
    { isolationLevel: 'Serializable' },
  ),
);

The as const on isRetryable means err.isRetryable is the literal type true on those classes and never narrows to false — your control flow type-checks without casts.

pipeline — N queries, 1 round-trip

A pipeline bundles independent queries and sends them together using the Postgres extended-query pipeline protocol. Each query gets its own parsed SQL, own params, own typed result. No transactional semantics — if query 3 fails, queries 1, 2, 4, 5 still ran.

const [user, posts, commentCount, orgs, latestLogin] = await db.pipeline([
  db.users.buildFindUnique({ where: { id: 1 } }),
  db.posts.buildFindMany({ where: { userId: 1 }, limit: 10 }),
  db.comments.buildCount({ where: { userId: 1 } }),
  db.organizations.buildFindMany({ where: { ownerId: 1 } }),
  db.sessions.buildFindFirst({ where: { userId: 1 }, orderBy: { createdAt: 'desc' } }),
]);

Each build* method returns a DeferredQuery<T>{ sql, params, transform, tag }. The pipeline driver writes them all to the wire, reads all responses, applies each transform.

Result type is a tuple that matches the input order and carries the per-query return type.

When pipeline wins

  • Edge runtimes. One round-trip to Neon (~35 ms) instead of five (~175 ms).
  • Dashboard loads. Five independent widget queries → one round-trip.
  • Fan-out reads. Load the user + their N related collections for a profile page.

When pipeline loses

  • You need atomicity. Use $transaction.
  • One query depends on another's result. Pipeline queries must be independent — they're all submitted before any response is read.
  • A query might fail. PipelineError (TURBINE_E014) wraps partial-success cases; decide how you want to handle them.

Combining — pipeline inside a transaction

Rarely useful but possible: run a pipeline through tx for batched reads inside a transactional block. The pipeline still uses one round-trip; the transaction still wraps them in BEGIN / COMMIT.

await db.$transaction(async (tx) => {
  const [user, posts] = await tx.pipeline([
    tx.users.buildFindUnique({ where: { id: 1 } }),
    tx.posts.buildFindMany({ where: { userId: 1 } }),
  ]);
  // ...
});

See also

  • Relations — reads that fan out via with vs pipeline.
  • Typed ErrorsDeadlockError, SerializationFailureError, TimeoutError, PipelineError.
  • Serverless — why pipelines matter more on the edge.