Transactions & Pipeline
Turbine provides two mechanisms for executing multiple queries efficiently: transactions for atomicity and pipelines for batching independent queries into a single round-trip.
Transactions
$transaction (Recommended)
The $transaction() method provides a Prisma-style typed transaction API. The callback receives a TransactionClient with the same table accessors as the main client:
await db.$transaction(async (tx) => {
const user = await tx.users.create({
data: { email: 'alice@example.com', name: 'Alice', orgId: 1 },
});
await tx.posts.create({
data: {
userId: user.id,
orgId: 1,
title: 'First Post',
content: 'Hello world!',
},
});
});
// Both inserts succeed or both are rolled back
All queries within the callback run on a dedicated connection inside a BEGIN/COMMIT block. If the callback throws, the transaction is automatically rolled back.
Isolation Levels
await db.$transaction(async (tx) => {
// Reads are repeatable within this transaction
const user = await tx.users.findUnique({ where: { id: 1 } });
// ... more operations
}, {
isolationLevel: 'RepeatableRead',
});
Available isolation levels:
| Level | Description |
|---|---|
| 'ReadUncommitted' | Allows dirty reads (rarely used) |
| 'ReadCommitted' | Default Postgres behavior |
| 'RepeatableRead' | Snapshot isolation -- reads are consistent within the transaction |
| 'Serializable' | Full serializability -- transactions appear to execute sequentially |
Timeouts
await db.$transaction(async (tx) => {
// If this takes longer than 5 seconds, the transaction is rolled back
await tx.users.updateMany({
where: { lastLoginAt: { lt: cutoffDate } },
data: { status: 'inactive' },
});
}, {
timeout: 5000, // milliseconds
});
Nested Transactions (SAVEPOINTs)
The TransactionClient supports nested transactions via Postgres SAVEPOINTs:
await db.$transaction(async (tx) => {
await tx.users.create({
data: { email: 'alice@example.com', name: 'Alice', orgId: 1 },
});
try {
// Nested transaction -- uses SAVEPOINT
await tx.$transaction(async (innerTx) => {
await innerTx.posts.create({
data: { userId: 1, title: 'Post', orgId: 1 },
});
throw new Error('Something went wrong');
});
} catch {
// Inner transaction rolled back to savepoint
// Outer transaction continues -- Alice was still created
}
await tx.posts.create({
data: { userId: 1, title: 'Safe Post', orgId: 1 },
});
});
Raw SQL in Transactions
You can execute raw SQL within a transaction:
await db.$transaction(async (tx) => {
await tx.raw`UPDATE users SET login_count = login_count + 1 WHERE id = ${userId}`;
await tx.raw`INSERT INTO audit_log (user_id, action) VALUES (${userId}, 'login')`;
});
Raw Transaction (Legacy)
For direct access to the pg.PoolClient:
await db.transaction(async (client) => {
await client.query('INSERT INTO users (name) VALUES ($1)', ['Alice']);
await client.query('INSERT INTO posts (user_id, title) VALUES ($1, $2)', [1, 'Hello']);
});
This gives you full control but no typed table accessors. Use $transaction() instead for most use cases.
Pipeline
The pipeline API batches multiple independent queries into a single database round-trip.
Basic Usage
const [user, postCount, recentPosts] = await db.pipeline(
db.users.buildFindUnique({ where: { id: 1 } }),
db.posts.buildCount({ where: { orgId: 1 } }),
db.posts.buildFindMany({
where: { userId: 1 },
orderBy: { createdAt: 'desc' },
limit: 5,
}),
);
// Fully typed results:
// user: User | null
// postCount: number
// recentPosts: Post[]
Pipeline acquires a single connection from the pool, wraps all queries in a transaction for consistency, and returns typed results. This saves N-1 connection checkouts and N-1 network round-trips.
How It Works
Without pipeline:
Query 1: acquire connection -> send -> wait -> receive -> release
Query 2: acquire connection -> send -> wait -> receive -> release
Query 3: acquire connection -> send -> wait -> receive -> release
Total: 3 connection checkouts + 3 round-trips
With pipeline:
acquire connection -> send Q1,Q2,Q3 -> wait -> receive R1,R2,R3 -> release
Total: 1 connection checkout + 1 round-trip
Pipeline with Nested Queries
Pipelines compose with nested queries. Each pipelined query can include with clauses:
const [myProfile, teamActivity, orgOverview] = await db.pipeline(
db.users.buildFindUnique({
where: { id: myId },
with: { posts: { with: { comments: true }, limit: 10 } },
}),
db.users.buildFindMany({
where: { orgId, role: 'member' },
limit: 20,
with: { posts: { limit: 3 } },
}),
db.organizations.buildFindUnique({
where: { id: orgId },
with: { users: true },
}),
);
Three pipelined queries, each with nested relations. Total round-trips: 1. Total SQL queries: 3 (each is a single json_agg query).
Build Methods
Every query method has a corresponding build* method that returns a DeferredQuery descriptor:
const deferred = db.users.buildFindMany({
where: { orgId: 1 },
limit: 10,
});
// deferred.sql => 'SELECT users.* FROM users WHERE org_id = $1 LIMIT 10'
// deferred.params => [1]
// deferred.tag => 'users.findMany'
// deferred.transform => (result) => T[]
Build methods do not execute the query. They return the SQL, parameters, and a transform function. Pass them to db.pipeline() to execute.
Available build methods:
buildFindUnique(args)buildFindMany(args)buildCreate(args)buildCreateMany(args)buildUpdate(args)buildUpdateMany(args)buildDelete(args)buildDeleteMany(args)buildUpsert(args)buildCount(args)buildGroupBy(args)buildAggregate(args)
Real-World Example: Dashboard Page
// app/dashboard/page.tsx (Next.js Server Component)
export default async function Dashboard() {
const session = await auth();
const [user, postCount, recentPosts, teamMembers, orgStats] = await db.pipeline(
db.users.buildFindUnique({ where: { id: session.userId } }),
db.posts.buildCount({ where: { userId: session.userId } }),
db.posts.buildFindMany({
where: { userId: session.userId },
orderBy: { createdAt: 'desc' },
limit: 5,
with: { comments: { limit: 3 } },
}),
db.users.buildFindMany({
where: { orgId: session.orgId },
limit: 10,
}),
db.users.buildCount({ where: { orgId: session.orgId } }),
);
// 5 queries, 1 round-trip, ~7ms total
return <DashboardUI user={user} postCount={postCount} ... />;
}
Pipeline vs Transaction
| Feature | Pipeline | Transaction |
|---|---|---|
| Purpose | Batch independent queries | Atomic operations |
| Consistency | Wrapped in a transaction | Full ACID |
| Queries | Independent (no data dependencies) | Can depend on previous results |
| API | db.pipeline(q1, q2, q3) | db.$transaction(async (tx) => ...) |
| Round-trips | 1 | 1 (for the transaction) |
Use pipeline when queries are independent (dashboard data loading). Use transaction when operations must be atomic (creating related records).