Transactions & Pipelines

Two distinct tools that are often confused:

  • $transaction — ACID atomicity across multiple statements. One BEGIN / COMMIT pair, rollback on throw.
  • pipeline — throughput. N independent queries sent in one TCP round-trip via the Postgres extended-query protocol. Not atomic.

Use transactions when you need all-or-nothing semantics. Use pipelines when you need to cut round-trip latency across independent queries.

$transaction — callback form

Pass a callback that receives a transactional client. Throw to roll back, return to commit.

const result = await db.$transaction(async (tx) => {
  const user = await tx.users.create({
    data: { email: 'alice@example.com', name: 'Alice' },
  });
 
  await tx.posts.create({
    data: { userId: user.id, title: 'Hello' },
  });
 
  return user;
});
// result is the User returned by the callback

The tx client has the same surface as db — every table accessor, every method. Queries must use tx, not db, or they'll run on a different connection outside the transaction.

Isolation levels

Pass isolationLevel to set the transaction's isolation on BEGIN:

await db.$transaction(
  async (tx) => {
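    const row = await tx.inventory.findUnique({ where: { sku: 'ABC' } });
    // Use the read: throwing here rolls the transaction back before the write.
    if (!row || row.stock < 1) throw new Error('out of stock');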
    await tx.inventory.update({
      where: { sku: 'ABC' },
      data: { stock: { decrement: 1 } },
    });
  },
  { isolationLevel: 'serializable' },
);

Supported: 'read committed' (Postgres default), 'repeatable read', 'serializable'.

When serializable detects a conflict, it throws SerializationFailureError (TURBINE_E013) with isRetryable: true as const. See the retry section below.
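As a rough illustration of what "isolation on BEGIN" means at the SQL level, the sketch below maps each supported level to a Postgres BEGIN statement. The helper name beginStatement and the IsolationLevel type are hypothetical; the exact statement Turbine emits is not shown on this page.

```typescript
// Assumption: the three levels listed above; names here are illustrative only.
type IsolationLevel = 'read committed' | 'repeatable read' | 'serializable';

// Builds the opening statement for a transaction. With no level given,
// Postgres falls back to its default (read committed).
function beginStatement(level?: IsolationLevel): string {
  return level === undefined
    ? 'BEGIN'
    : `BEGIN ISOLATION LEVEL ${level.toUpperCase()}`;
}

console.log(beginStatement('serializable')); // BEGIN ISOLATION LEVEL SERIALIZABLE
```

Because the level is a closed union of string literals, a typo such as 'serialisable' fails at compile time rather than at BEGIN.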

Timeouts

Per-transaction statement timeout, applied as SET LOCAL statement_timeout = '<ms>ms' inside the transaction:

await db.$transaction(
  async (tx) => {
    await tx.reports.create({ data: { /* ... */ } });
  },
  { timeoutMs: 5_000 },
);

A query that exceeds the timeout throws TimeoutError (TURBINE_E002) and the transaction rolls back.
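A minimal sketch of how a timeoutMs value could be turned into the SET LOCAL statement described above. The function name statementTimeoutSql is hypothetical, and the validation is an assumption about what a careful implementation would do, not a description of Turbine's internals.

```typescript
// SET does not accept bind parameters, so the value is interpolated into the
// SQL text. Validating it as a non-negative integer keeps that safe.
function statementTimeoutSql(timeoutMs: number): string {
  if (!Number.isInteger(timeoutMs) || timeoutMs < 0) {
    throw new RangeError(`timeoutMs must be a non-negative integer, got ${timeoutMs}`);
  }
  // SET LOCAL lasts until the end of the current transaction only.
  return `SET LOCAL statement_timeout = '${timeoutMs}ms'`;
}

console.log(statementTimeoutSql(5_000)); // SET LOCAL statement_timeout = '5000ms'
```

Note that in Postgres a statement_timeout of 0 disables the timeout rather than failing immediately.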

Nested transactions — real SAVEPOINTs

Calling $transaction inside another $transaction opens a Postgres SAVEPOINT, not a no-op and not a new connection. The inner block can fail and recover independently:

await db.$transaction(async (tx) => {
  const user = await tx.users.create({ data: { email: 'a@b.c' } });
 
  try {
    await tx.$transaction(async (inner) => {
      await inner.auditLog.create({ data: { userId: user.id, action: 'signup' } });
      throw new Error('skip audit log');
    });
  } catch {
    // Inner block rolled back to SAVEPOINT; outer is still healthy
  }
 
  await tx.posts.create({ data: { userId: user.id, title: 'Hello' } });
  // user + post still commit
});

SAVEPOINT names are auto-generated with a counter so nesting is safe.
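The sketch below shows one way a counter-based scheme could map nested $transaction calls onto SQL statements. The class TransactionFrames and the sp_N naming are hypothetical; the page only states that names are auto-generated with a counter.

```typescript
type Frame = { kind: 'transaction' } | { kind: 'savepoint'; name: string };

class TransactionFrames {
  private counter = 0; // never reset, so sibling savepoints get distinct names
  private readonly frames: Frame[] = [];

  // SQL to send when a $transaction call starts.
  begin(): string {
    if (this.frames.length === 0) {
      this.frames.push({ kind: 'transaction' });
      return 'BEGIN';
    }
    const name = `sp_${++this.counter}`;
    this.frames.push({ kind: 'savepoint', name });
    return `SAVEPOINT ${name}`;
  }

  // SQL to send when the innermost callback returns (ok) or throws (!ok).
  end(ok: boolean): string {
    const frame = this.frames.pop();
    if (!frame) throw new Error('no open transaction');
    if (frame.kind === 'transaction') return ok ? 'COMMIT' : 'ROLLBACK';
    return ok
      ? `RELEASE SAVEPOINT ${frame.name}`
      : `ROLLBACK TO SAVEPOINT ${frame.name}`;
  }
}

// Mirrors the example above: the inner block throws, the outer one commits.
const frames = new TransactionFrames();
const statements = [frames.begin(), frames.begin(), frames.end(false), frames.end(true)];
console.log(statements.join('; '));
// BEGIN; SAVEPOINT sp_1; ROLLBACK TO SAVEPOINT sp_1; COMMIT
```

Rolling back to a savepoint undoes only the work done after it, which is why the outer transaction stays usable after the inner block throws.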

Retryable errors

Two error classes carry readonly isRetryable = true as const — so TypeScript narrows them and you don't need a runtime field check.

Code   Class                       SQLSTATE
E012   DeadlockError               40P01
E013   SerializationFailureError   40001

Canonical retry loop:

import { DeadlockError, SerializationFailureError } from 'turbine-orm';
 
async function withRetry<T>(fn: () => Promise<T>, maxAttempts = 3): Promise<T> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      const retryable = err instanceof DeadlockError || err instanceof SerializationFailureError;
      if (!retryable || attempt === maxAttempts) throw err;
      await new Promise((r) => setTimeout(r, 50 * 2 ** (attempt - 1)));
    }
  }
  throw new Error('unreachable');
}
 
await withRetry(() =>
  db.$transaction(
    async (tx) => {
      /* ... */
    },
    { isolationLevel: 'serializable' },
  ),
);

The as const on isRetryable gives err.isRetryable the literal type true on those classes rather than the wider boolean, so your control flow type-checks without casts.
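To make the literal-type point concrete, here is a self-contained sketch using stand-in classes. The real classes come from 'turbine-orm'; the stand-ins model only the isRetryable field, and the assumption that TimeoutError carries no such flag is ours.

```typescript
// Stand-ins for illustration only; not the library's actual definitions.
class DeadlockError extends Error {
  readonly isRetryable = true as const;
}
class SerializationFailureError extends Error {
  readonly isRetryable = true as const;
}
class TimeoutError extends Error {} // assumed to have no isRetryable flag

function shouldRetry(err: unknown): boolean {
  if (err instanceof DeadlockError || err instanceof SerializationFailureError) {
    // This assignment compiles only because the property type is the
    // literal `true`; with a plain `boolean` field it would be a type error.
    const flag: true = err.isRetryable;
    return flag;
  }
  return false;
}

console.log(shouldRetry(new DeadlockError('deadlock detected'))); // true
console.log(shouldRetry(new TimeoutError('statement timeout'))); // false
```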

pipeline — N queries, 1 round-trip

A pipeline bundles independent queries and sends them together using the Postgres extended-query pipeline protocol. Each query gets its own parsed SQL, own params, own typed result. No transactional semantics — if query 3 fails, queries 1, 2, 4, 5 still ran.

const [user, posts, commentCount, orgs, latestLogin] = await db.pipeline([
  db.users.buildFindUnique({ where: { id: 1 } }),
  db.posts.buildFindMany({ where: { userId: 1 }, limit: 10 }),
  db.comments.buildCount({ where: { userId: 1 } }),
  db.organizations.buildFindMany({ where: { ownerId: 1 } }),
  db.sessions.buildFindFirst({ where: { userId: 1 }, orderBy: { createdAt: 'desc' } }),
]);

Each build* method returns a DeferredQuery<T>: { sql, params, transform, tag }. The pipeline driver writes them all to the wire, reads all responses, applies each transform.

The result type is a tuple that matches the input order and carries each query's own return type.
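The tuple typing can be sketched with a mapped type over the input tuple. Everything below is a stand-in written for illustration: the DeferredQuery interface follows the four fields named above, but the field types, PipelineResults, and applyTransforms are assumptions, and the "driver" here just applies transforms to row sets supplied by hand.

```typescript
// Stand-in for the documented shape; field types are assumed.
interface DeferredQuery<T> {
  sql: string;
  params: unknown[];
  transform: (rows: unknown[]) => T;
  tag: string;
}

// Maps a tuple of DeferredQuery<T> to a tuple of T, preserving order.
type PipelineResults<Q extends readonly DeferredQuery<unknown>[]> = {
  [K in keyof Q]: Q[K] extends DeferredQuery<infer R> ? R : never;
};

// Hypothetical last step of a pipeline: one row set per query, in input order.
function applyTransforms<Q extends readonly DeferredQuery<unknown>[]>(
  queries: readonly [...Q],
  rowSets: readonly unknown[][],
): PipelineResults<Q> {
  if (queries.length !== rowSets.length) {
    throw new Error(`expected ${queries.length} row sets, got ${rowSets.length}`);
  }
  const results = queries.map((q, i) => q.transform(rowSets[i] ?? []));
  return results as unknown as PipelineResults<Q>;
}

const countQuery: DeferredQuery<number> = {
  sql: 'SELECT count(*) AS n FROM comments WHERE user_id = $1',
  params: [1],
  tag: 'comments.count',
  transform: (rows) => Number((rows[0] as { n: string }).n),
};
const titlesQuery: DeferredQuery<string[]> = {
  sql: 'SELECT title FROM posts WHERE user_id = $1',
  params: [1],
  tag: 'posts.findMany',
  transform: (rows) => rows.map((r) => (r as { title: string }).title),
};

// count is typed number, titles is typed string[]
const [count, titles] = applyTransforms(
  [countQuery, titlesQuery],
  [[{ n: '3' }], [{ title: 'Hello' }]],
);
console.log(count, titles);
```

The readonly [...Q] parameter is what makes TypeScript infer a tuple instead of an array, so each destructured slot keeps its own type.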

When pipeline wins

  • Edge runtimes. One round-trip to Neon (~35 ms) instead of five (~175 ms).
  • Dashboard loads. Five independent widget queries → one round-trip.
  • Fan-out reads. Load the user + their N related collections for a profile page.
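The round-trip arithmetic in the first bullet can be written out as a rough model. The function estimateWaitMs is hypothetical and deliberately simple: it counts network round-trips only and ignores server execution time and connection setup.

```typescript
// Total network wait = number of round-trips x round-trip time.
// A pipeline collapses all queries into a single round-trip.
function estimateWaitMs(queryCount: number, roundTripMs: number, pipelined: boolean): number {
  const roundTrips = pipelined ? Math.min(queryCount, 1) : queryCount;
  return roundTrips * roundTripMs;
}

console.log(estimateWaitMs(5, 35, false)); // 175
console.log(estimateWaitMs(5, 35, true)); // 35
```

The saving grows linearly with the number of independent queries, which is why the effect is most visible when each round-trip is expensive.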

When pipeline loses

  • You need atomicity. Use $transaction.
  • One query depends on another's result. Pipeline queries must be independent — they're all submitted before any response is read.
  • A query might fail. PipelineError (TURBINE_E014) wraps partial-success cases; decide how you want to handle them.

Combining — pipeline inside a transaction

Rarely useful but possible: run a pipeline through tx for batched reads inside a transactional block. The pipeline still uses one round-trip; the transaction still wraps them in BEGIN / COMMIT.

await db.$transaction(async (tx) => {
  const [user, posts] = await tx.pipeline([
    tx.users.buildFindUnique({ where: { id: 1 } }),
    tx.posts.buildFindMany({ where: { userId: 1 } }),
  ]);
  // ...
});

See also

  • Relations — reads that fan out via with vs pipeline.
  • Typed Errors: DeadlockError, SerializationFailureError, TimeoutError, PipelineError.
  • Serverless — why pipelines matter more on the edge.