Transactions & Pipeline

Turbine provides two mechanisms for executing multiple queries efficiently: transactions for atomicity and pipelines for batching independent queries into a single round-trip.


Transactions

$transaction (Recommended)

The $transaction() method provides a Prisma-style typed transaction API. The callback receives a TransactionClient with the same table accessors as the main client:

TypeScript
await db.$transaction(async (tx) => {
  const user = await tx.users.create({
    data: { email: 'alice@example.com', name: 'Alice', orgId: 1 },
  });

  await tx.posts.create({
    data: {
      userId: user.id,
      orgId: 1,
      title: 'First Post',
      content: 'Hello world!',
    },
  });
});
// Both inserts succeed or both are rolled back

All queries within the callback run on a dedicated connection inside a BEGIN/COMMIT block. If the callback throws, the transaction is automatically rolled back.

Isolation Levels

TypeScript
await db.$transaction(async (tx) => {
  // Reads are repeatable within this transaction
  const user = await tx.users.findUnique({ where: { id: 1 } });
  // ... more operations
}, {
  isolationLevel: 'RepeatableRead',
});

Available isolation levels:

| Level | Description | |---|---| | 'ReadUncommitted' | Allows dirty reads (rarely used) | | 'ReadCommitted' | Default Postgres behavior | | 'RepeatableRead' | Snapshot isolation -- reads are consistent within the transaction | | 'Serializable' | Full serializability -- transactions appear to execute sequentially |

Timeouts

TypeScript
await db.$transaction(async (tx) => {
  // If this takes longer than 5 seconds, the transaction is rolled back
  await tx.users.updateMany({
    where: { lastLoginAt: { lt: cutoffDate } },
    data: { status: 'inactive' },
  });
}, {
  timeout: 5000, // milliseconds
});

Nested Transactions (SAVEPOINTs)

The TransactionClient supports nested transactions via Postgres SAVEPOINTs:

TypeScript
await db.$transaction(async (tx) => {
  await tx.users.create({
    data: { email: 'alice@example.com', name: 'Alice', orgId: 1 },
  });

  try {
    // Nested transaction -- uses SAVEPOINT
    await tx.$transaction(async (innerTx) => {
      await innerTx.posts.create({
        data: { userId: 1, title: 'Post', orgId: 1 },
      });
      throw new Error('Something went wrong');
    });
  } catch {
    // Inner transaction rolled back to savepoint
    // Outer transaction continues -- Alice was still created
  }

  await tx.posts.create({
    data: { userId: 1, title: 'Safe Post', orgId: 1 },
  });
});

Raw SQL in Transactions

You can execute raw SQL within a transaction:

TypeScript
await db.$transaction(async (tx) => {
  await tx.raw`UPDATE users SET login_count = login_count + 1 WHERE id = ${userId}`;
  await tx.raw`INSERT INTO audit_log (user_id, action) VALUES (${userId}, 'login')`;
});

Raw Transaction (Legacy)

For direct access to the pg.PoolClient:

TypeScript
await db.transaction(async (client) => {
  await client.query('INSERT INTO users (name) VALUES ($1)', ['Alice']);
  await client.query('INSERT INTO posts (user_id, title) VALUES ($1, $2)', [1, 'Hello']);
});

This gives you full control but no typed table accessors. Use $transaction() instead for most use cases.


Pipeline

The pipeline API batches multiple independent queries into a single database round-trip.

Basic Usage

TypeScript
const [user, postCount, recentPosts] = await db.pipeline(
  db.users.buildFindUnique({ where: { id: 1 } }),
  db.posts.buildCount({ where: { orgId: 1 } }),
  db.posts.buildFindMany({
    where: { userId: 1 },
    orderBy: { createdAt: 'desc' },
    limit: 5,
  }),
);

// Fully typed results:
// user:        User | null
// postCount:   number
// recentPosts: Post[]

Pipeline acquires a single connection from the pool, wraps all queries in a transaction for consistency, and returns typed results. This saves N-1 connection checkouts and N-1 network round-trips.

How It Works

Without pipeline:

Query 1: acquire connection -> send -> wait -> receive -> release
Query 2: acquire connection -> send -> wait -> receive -> release
Query 3: acquire connection -> send -> wait -> receive -> release
Total: 3 connection checkouts + 3 round-trips

With pipeline:

acquire connection -> send Q1,Q2,Q3 -> wait -> receive R1,R2,R3 -> release
Total: 1 connection checkout + 1 round-trip

Pipeline with Nested Queries

Pipelines compose with nested queries. Each pipelined query can include with clauses:

TypeScript
const [myProfile, teamActivity, orgOverview] = await db.pipeline(
  db.users.buildFindUnique({
    where: { id: myId },
    with: { posts: { with: { comments: true }, limit: 10 } },
  }),
  db.users.buildFindMany({
    where: { orgId, role: 'member' },
    limit: 20,
    with: { posts: { limit: 3 } },
  }),
  db.organizations.buildFindUnique({
    where: { id: orgId },
    with: { users: true },
  }),
);

Three pipelined queries, each with nested relations. Total round-trips: 1. Total SQL queries: 3 (each is a single json_agg query).

Build Methods

Every query method has a corresponding build* method that returns a DeferredQuery descriptor:

TypeScript
const deferred = db.users.buildFindMany({
  where: { orgId: 1 },
  limit: 10,
});

// deferred.sql     => 'SELECT users.* FROM users WHERE org_id = $1 LIMIT 10'
// deferred.params  => [1]
// deferred.tag     => 'users.findMany'
// deferred.transform => (result) => T[]

Build methods do not execute the query. They return the SQL, parameters, and a transform function. Pass them to db.pipeline() to execute.

Available build methods:

  • buildFindUnique(args)
  • buildFindMany(args)
  • buildCreate(args)
  • buildCreateMany(args)
  • buildUpdate(args)
  • buildUpdateMany(args)
  • buildDelete(args)
  • buildDeleteMany(args)
  • buildUpsert(args)
  • buildCount(args)
  • buildGroupBy(args)
  • buildAggregate(args)

Real-World Example: Dashboard Page

TypeScript
// app/dashboard/page.tsx (Next.js Server Component)
export default async function Dashboard() {
  const session = await auth();

  const [user, postCount, recentPosts, teamMembers, orgStats] = await db.pipeline(
    db.users.buildFindUnique({ where: { id: session.userId } }),
    db.posts.buildCount({ where: { userId: session.userId } }),
    db.posts.buildFindMany({
      where: { userId: session.userId },
      orderBy: { createdAt: 'desc' },
      limit: 5,
      with: { comments: { limit: 3 } },
    }),
    db.users.buildFindMany({
      where: { orgId: session.orgId },
      limit: 10,
    }),
    db.users.buildCount({ where: { orgId: session.orgId } }),
  );

  // 5 queries, 1 round-trip, ~7ms total
  return <DashboardUI user={user} postCount={postCount} ... />;
}

Pipeline vs Transaction

| Feature | Pipeline | Transaction | |---|---|---| | Purpose | Batch independent queries | Atomic operations | | Consistency | Wrapped in a transaction | Full ACID | | Queries | Independent (no data dependencies) | Can depend on previous results | | API | db.pipeline(q1, q2, q3) | db.$transaction(async (tx) => ...) | | Round-trips | 1 | 1 (for the transaction) |

Use pipeline when queries are independent (dashboard data loading). Use transaction when operations must be atomic (creating related records).