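Prisma 7 Driver Adapter Implementation Guide
Overview
This guide covers everything needed to implement a custom driver adapter for Prisma ORM v7: an architecture overview, the interface definitions, implementation steps, type conversion, error handling, and a testing strategy. It applies when building a Prisma adapter for any database.
When to use
- Implementing or modifying a Prisma driver adapter
- Adding support for a new database driver
- Working with the SqlDriverAdapter/Transaction interfaces
- Understanding the transaction lifecycle protocol
- Error mapping and validation
Architecture overview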
┌─────────────────────────────────────────────────────────────────┐
│ PrismaClient │
│ (requires an adapter factory) │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ SqlMigrationAwareDriverAdapterFactory │
│ ┌─────────────────────┐ ┌─────────────────────────────┐ │
│ │ connect() │ │ connectToShadowDb() │ │
│ │ → SqlDriverAdapter │ │ → SqlDriverAdapter │ │
│ └─────────────────────┘ └─────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ SqlDriverAdapter │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────────────┐ │
│ │ queryRaw() │ │ executeRaw() │ │ startTransaction() │ │
│ │ → ResultSet │ │ → number │ │ → Transaction │ │
│ └──────────────┘ └──────────────┘ └──────────────────────────┘ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────────────┐ │
│ │executeScript │ │ dispose() │ │ getConnectionInfo() │ │
│ └──────────────┘ └──────────────┘ └──────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Transaction │
│ extends SqlQueryable + commit() + rollback() + options │
│ (lifecycle hooks only; Prisma sends SQL via executeRaw) │
└─────────────────────────────────────────────────────────────────┘
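The layering in the diagram can be traced with a small recording fake. The sketch below is illustrative only: the interfaces are trimmed local stand-ins rather than imports from @prisma/driver-adapter-utils, and createRecordingFactory and runDemo are hypothetical names. It logs the order in which the factory, adapter, and transaction are touched, assuming COMMIT arrives as ordinary SQL through executeRaw as this guide describes.

```typescript
// Trimmed local stand-ins for the interfaces in this guide (not Prisma imports).
type Query = { sql: string; args: unknown[] };

interface Queryable {
  executeRaw(query: Query): Promise<number>;
}
interface Tx extends Queryable {
  commit(): Promise<void>;
  rollback(): Promise<void>;
}
interface Adapter extends Queryable {
  startTransaction(): Promise<Tx>;
  dispose(): Promise<void>;
}
interface Factory {
  connect(): Promise<Adapter>;
}

// Hypothetical in-memory factory that records every call in order.
function createRecordingFactory(log: string[]): Factory {
  const executeRaw = async (query: Query): Promise<number> => {
    log.push(`sql:${query.sql}`);
    return 0;
  };
  return {
    async connect() {
      log.push("connect");
      return {
        executeRaw,
        async startTransaction() {
          log.push("sql:BEGIN"); // the adapter opens the transaction itself
          return {
            executeRaw,
            async commit() {
              log.push("hook:commit");
            },
            async rollback() {
              log.push("hook:rollback");
            },
          };
        },
        async dispose() {
          log.push("dispose");
        },
      };
    },
  };
}

// Walk the layers top to bottom, the way the diagram reads.
async function runDemo(): Promise<string[]> {
  const log: string[] = [];
  const adapter = await createRecordingFactory(log).connect();
  const tx = await adapter.startTransaction();
  await tx.executeRaw({ sql: "INSERT INTO t VALUES (1)", args: [] });
  await tx.executeRaw({ sql: "COMMIT", args: [] }); // sent as ordinary SQL
  await tx.commit(); // lifecycle hook only
  await adapter.dispose();
  return log;
}
```

The recorded order is connect, BEGIN, the INSERT, COMMIT as SQL, the commit hook, then dispose.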
Required interfaces
Import from @prisma/driver-adapter-utils:
import type {
ColumnType,
IsolationLevel,
SqlDriverAdapter,
SqlMigrationAwareDriverAdapterFactory,
SqlQuery,
SqlQueryable,
SqlResultSet,
Transaction,
TransactionOptions,
ArgType,
ConnectionInfo,
MappedError,
} from "@prisma/driver-adapter-utils";
import {
ColumnTypeEnum,
DriverAdapterError,
} from "@prisma/driver-adapter-utils";
Interface definitions
SqlQuery (input to queryRaw/executeRaw)
type SqlQuery = {
sql: string; // parameterized SQL with placeholders
args: Array<unknown>; // bound parameter values
argTypes: Array<ArgType>; // type hint for each parameter
};
type ArgType = {
scalarType: ArgScalarType; // 'string' | 'int' | 'bigint' | 'float' | 'decimal' | 'boolean' | 'enum' | 'uuid' | 'json' | 'datetime' | 'bytes' | 'unknown'
dbType?: string;
arity: "scalar" | "list";
};
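To make the positional pairing of args and argTypes concrete, here is a minimal sketch. The types are local copies of the definitions above, the query text is made up for illustration, and describeArgs is a hypothetical debugging helper, not part of any Prisma package.

```typescript
type ArgScalarType =
  | "string" | "int" | "bigint" | "float" | "decimal" | "boolean"
  | "enum" | "uuid" | "json" | "datetime" | "bytes" | "unknown";
type ArgType = { scalarType: ArgScalarType; dbType?: string; arity: "scalar" | "list" };
type SqlQuery = { sql: string; args: unknown[]; argTypes: ArgType[] };

// Illustrative query: args[i] is described by argTypes[i].
const query: SqlQuery = {
  sql: "SELECT id, name FROM users WHERE id = $1 AND active = $2",
  args: ["42", true],
  argTypes: [
    { scalarType: "int", arity: "scalar" },
    { scalarType: "boolean", arity: "scalar" },
  ],
};

// Hypothetical helper: one line per argument, falling back to "unknown"
// when a type hint is missing (the same fallback Step 1 uses).
function describeArgs(q: SqlQuery): string[] {
  return q.args.map((arg, i) => {
    const type: ArgType = q.argTypes[i] ?? { scalarType: "unknown", arity: "scalar" };
    return `$${i + 1}: ${type.scalarType} (${type.arity}) = ${JSON.stringify(arg)}`;
  });
}
```

Note that the integer argument arrives as the string "42" in this example, which is why the adapter maps arguments by scalarType before handing them to the driver.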
SqlResultSet (output of queryRaw)
interface SqlResultSet {
columnNames: Array<string>; // column names, in order
columnTypes: Array<ColumnType>; // column types, aligned with columnNames
rows: Array<Array<unknown>>; // row data (an array of arrays)
lastInsertId?: string; // for INSERT without RETURNING
}
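Because the result set is positional, columnNames[i] and row[i] always describe the same column. A small sketch of reading it back into keyed records, useful mainly in tests; ResultSetLike is a trimmed local stand-in and rowsToObjects is a hypothetical helper.

```typescript
// Trimmed stand-in: only the fields this helper needs.
type ResultSetLike = { columnNames: string[]; rows: unknown[][] };

// Zip each positional row with the column names.
function rowsToObjects(resultSet: ResultSetLike): Array<Record<string, unknown>> {
  return resultSet.rows.map((row) => {
    const record: Record<string, unknown> = {};
    resultSet.columnNames.forEach((name, i) => {
      record[name] = row[i] ?? null; // missing cells become null
    });
    return record;
  });
}

const sample: ResultSetLike = {
  columnNames: ["id", "name"],
  rows: [
    [1, "Alice"],
    [2, null],
  ],
};
const records = rowsToObjects(sample);
```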
SqlDriverAdapter
interface SqlDriverAdapter extends SqlQueryable {
executeScript(script: string): Promise<void>;
startTransaction(isolationLevel?: IsolationLevel): Promise<Transaction>;
getConnectionInfo?(): ConnectionInfo;
dispose(): Promise<void>;
}
Transaction
interface Transaction extends SqlQueryable {
readonly options: TransactionOptions;
commit(): Promise<void>;
rollback(): Promise<void>;
}
type TransactionOptions = { usePhantomQuery: boolean };
SqlMigrationAwareDriverAdapterFactory
interface SqlMigrationAwareDriverAdapterFactory {
readonly provider: "mysql" | "postgres" | "sqlite" | "sqlserver";
readonly adapterName: string;
connect(): Promise<SqlDriverAdapter>;
connectToShadowDb(): Promise<SqlDriverAdapter>;
}
Implementation steps
Step 1: Create the Queryable base class
class MyQueryable<TClient> implements SqlQueryable {
readonly provider = "postgres" as const; // or 'sqlite' | 'mysql' | 'sqlserver'
readonly adapterName = "@my-org/adapter-mydb" as const;
constructor(protected readonly client: TClient) {}
async queryRaw(query: SqlQuery): Promise<SqlResultSet> {
try {
const args = query.args.map((arg, i) =>
mapArg(arg, query.argTypes[i] ?? { scalarType: "unknown", arity: "scalar" })
);
// Execute the query with the driver
const result = await this.client.query(query.sql, args);
// Extract column metadata
const columnNames = /* obtain from the result */;
const columnTypes = /* map to ColumnTypeEnum */;
// Map each row to an array of ResultValue
const rows = result.map(row => mapRow(row, columnTypes));
return { columnNames, columnTypes, rows };
} catch (e) {
this.onError(e);
}
}
async executeRaw(query: SqlQuery): Promise<number> {
try {
const args = query.args.map((arg, i) =>
mapArg(arg, query.argTypes[i] ?? { scalarType: "unknown", arity: "scalar" })
);
const result = await this.client.query(query.sql, args);
return result.affectedRows ?? 0;
} catch (e) {
this.onError(e);
}
}
protected onError(error: unknown): never {
throw new DriverAdapterError(convertDriverError(error));
}
}
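The column metadata in queryRaw is left as placeholders because it depends on the driver. As one possible sketch, assume the driver returns each row as a plain object keyed by column name (an assumption; some drivers return arrays plus separate field metadata). The helper name toPositional is hypothetical.

```typescript
type PositionalResult = { columnNames: string[]; rows: unknown[][] };

// Derive column names from the first row, then flatten every row into
// positional form in that same column order.
function toPositional(objectRows: Array<Record<string, unknown>>): PositionalResult {
  const first = objectRows[0];
  const columnNames = first ? Object.keys(first) : [];
  const rows = objectRows.map((row) => columnNames.map((name) => row[name] ?? null));
  return { columnNames, rows };
}

const positional = toPositional([
  { id: 1, name: "Alice" },
  { id: 2, name: undefined },
]);
```

One limitation of this approach: an empty result yields no column names, so a driver that exposes field metadata separately is a better source when available.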
Step 2: Create the Transaction class
Important: commit() and rollback() are lifecycle hooks only. They must not issue SQL. Prisma sends COMMIT/ROLLBACK through the transaction object's executeRaw.
class MyTransaction extends MyQueryable<TClient> implements Transaction {
readonly options: TransactionOptions;
readonly #release: () => void;
constructor(
client: TClient,
options: TransactionOptions,
release: () => void,
) {
super(client);
this.options = options;
this.#release = release;
}
commit(): Promise<void> {
// Do not issue COMMIT SQL here; Prisma does that via executeRaw
this.#release(); // release the connection/resources
return Promise.resolve();
}
rollback(): Promise<void> {
// Do not issue ROLLBACK SQL here; Prisma does that via executeRaw
this.#release();
return Promise.resolve();
}
}
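Since both hooks call the same release callback, a second call (for example rollback() after commit()) would release twice. One way to guard against that, offered as a suggestion rather than something the interfaces require, is to wrap the callback so it runs at most once. The helper name once is hypothetical.

```typescript
// Wrap a callback so that only the first invocation has any effect.
function once(fn: () => void): () => void {
  let called = false;
  return () => {
    if (called) return;
    called = true;
    fn();
  };
}

// Example: a counter standing in for whatever state release() updates.
let openTransactions = 1;
const release = once(() => {
  openTransactions -= 1;
});

release(); // first call releases
release(); // second call is ignored
```

Passing once(release) into the transaction constructor would keep the adapter's bookkeeping consistent even if the hooks are called more than once.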
Step 3: Create the Adapter class
class MyAdapter extends MyQueryable<TClient> implements SqlDriverAdapter {
#transactionDepth = 0;
constructor(client: TClient) {
super(client);
}
async executeScript(script: string): Promise<void> {
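// SQLite: split on ';' and execute statement by statement
// Postgres: use multi-statement execution
try {
// The implementation depends on what the driver supports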
} catch (e) {
this.onError(e);
}
}
async startTransaction(
isolationLevel?: IsolationLevel,
): Promise<Transaction> {
// Validate the isolation level for this database
const validLevels = new Set<IsolationLevel>([
"READ UNCOMMITTED",
"READ COMMITTED",
"REPEATABLE READ",
"SERIALIZABLE",
]);
if (isolationLevel !== undefined && !validLevels.has(isolationLevel)) {
throw new DriverAdapterError({
kind: "InvalidIsolationLevel",
level: isolationLevel,
});
}
const options: TransactionOptions = { usePhantomQuery: false };
this.#transactionDepth += 1;
const depth = this.#transactionDepth;
try {
if (depth === 1) {
// Issue BEGIN (including the isolation level when one is given)
const beginSql = isolationLevel
? `BEGIN ISOLATION LEVEL ${isolationLevel}`
: "BEGIN";
await this.client.query(beginSql);
} else {
// Nested transaction: use a savepoint
await this.client.query(`SAVEPOINT sp_${depth}`);
}
} catch (e) {
this.#transactionDepth -= 1;
this.onError(e);
}
const release = () => {
this.#transactionDepth -= 1;
};
return new MyTransaction(this.client, options, release);
}
getConnectionInfo(): ConnectionInfo {
return { supportsRelationJoins: true };
}
async dispose(): Promise<void> {
await this.client.close();
}
}
Step 4: Create the Factory class
export type MyAdapterConfig = {
url: string;
};
export type MyAdapterOptions = {
shadowDatabaseUrl?: string;
};
export class MyAdapterFactory implements SqlMigrationAwareDriverAdapterFactory {
readonly provider = "postgres" as const;
readonly adapterName = "@my-org/adapter-mydb" as const;
constructor(
private readonly config: MyAdapterConfig,
private readonly options?: MyAdapterOptions,
) {}
connect(): Promise<SqlDriverAdapter> {
return Promise.resolve(new MyAdapter(openConnection(this.config.url)));
}
connectToShadowDb(): Promise<SqlDriverAdapter> {
const url = this.options?.shadowDatabaseUrl ?? this.config.url;
return Promise.resolve(new MyAdapter(openConnection(url)));
}
}
Conversion helpers
Argument mapping (input)
Convert Prisma argument values to driver-native types:
function mapArg(arg: unknown, argType: ArgType): unknown {
if (arg === null || arg === undefined) return null;
// string → number (integer columns)
if (typeof arg === "string" && argType.scalarType === "int")
return Number.parseInt(arg, 10);
// string → number (float columns)
if (typeof arg === "string" && argType.scalarType === "float")
return Number.parseFloat(arg);
// string → BigInt (big integer columns)
if (typeof arg === "string" && argType.scalarType === "bigint")
return BigInt(arg);
// Base64 string → Buffer (bytes columns)
if (typeof arg === "string" && argType.scalarType === "bytes")
return Buffer.from(arg, "base64");
return arg;
}
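The mapArg above looks only at scalarType and does not consult arity. If list arguments reach the adapter as JavaScript arrays (an assumption worth verifying against your driver and Prisma version), the same scalar conversion can be applied element by element. In this sketch mapScalar is a reduced stand-in covering only the int and float branches, and mapArgWithArity is a hypothetical name.

```typescript
type ArgTypeLike = { scalarType: string; arity: "scalar" | "list" };

// Reduced stand-in for the scalar conversion: int and float branches only.
function mapScalar(arg: unknown, scalarType: string): unknown {
  if (arg === null || arg === undefined) return null;
  if (typeof arg === "string" && scalarType === "int") return Number.parseInt(arg, 10);
  if (typeof arg === "string" && scalarType === "float") return Number.parseFloat(arg);
  return arg;
}

// Apply the scalar conversion to each element when the argument is a list.
function mapArgWithArity(arg: unknown, argType: ArgTypeLike): unknown {
  if (argType.arity === "list" && Array.isArray(arg)) {
    return arg.map((item) => mapScalar(item, argType.scalarType));
  }
  return mapScalar(arg, argType.scalarType);
}

const mappedList = mapArgWithArity(["1", "2", null], { scalarType: "int", arity: "list" });
const mappedScalar = mapArgWithArity("1.5", { scalarType: "float", arity: "scalar" });
```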
Row mapping (output)
Convert driver result values to the types Prisma expects:
function mapRow(row: unknown[], columnTypes: ColumnType[]): ResultValue[] {
const result: ResultValue[] = [];
for (let i = 0; i < row.length; i++) {
const value = row[i] ?? null;
const colType = columnTypes[i];
if (value === null) {
result.push(null);
continue;
}
// bigint → string (JSON-safe)
if (typeof value === "bigint") {
result.push(value.toString());
continue;
}
// Date → ISO 8601 string
if (value instanceof Date) {
result.push(value.toISOString());
continue;
}
// JSON object → stringified
if (colType === ColumnTypeEnum.Json && typeof value === "object") {
result.push(JSON.stringify(value));
continue;
}
result.push(value as ResultValue);
}
return result;
}
Column type inference
When the driver provides no type metadata, infer the type from the JS value:
function inferColumnType(value: NonNullable<unknown>): ColumnType {
if (typeof value === "boolean") return ColumnTypeEnum.Boolean;
if (typeof value === "bigint") return ColumnTypeEnum.Int64;
if (value instanceof Uint8Array) return ColumnTypeEnum.Bytes;
if (value instanceof Date) return ColumnTypeEnum.DateTime;
if (Array.isArray(value)) return ColumnTypeEnum.Text; // fallback
if (typeof value === "object") return ColumnTypeEnum.Json;
if (typeof value === "number") return ColumnTypeEnum.UnknownNumber;
return ColumnTypeEnum.Text;
}
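inferColumnType takes a non-null value, so a column whose first row is NULL needs a little care. A minimal sketch of one approach: scan down each column for the first non-null value and fall back to a default when the whole column is null. inferColumnTypes is a hypothetical helper; the inference function is passed in, and the numeric codes in the example (128 for UnknownNumber, 7 for Text) are taken from the appendix of this guide.

```typescript
// For each column, infer the type from the first non-null value found.
function inferColumnTypes<T>(
  rows: unknown[][],
  columnCount: number,
  infer: (value: unknown) => T,
  fallback: T,
): T[] {
  const types: T[] = [];
  for (let col = 0; col < columnCount; col++) {
    let found = fallback; // used when every value in the column is null
    for (const row of rows) {
      const value = row[col];
      if (value !== null && value !== undefined) {
        found = infer(value);
        break;
      }
    }
    types.push(found);
  }
  return types;
}

// Tiny stand-in inference: numbers → UnknownNumber (128), anything else → Text (7).
const simpleInfer = (value: unknown): number => (typeof value === "number" ? 128 : 7);

const inferred = inferColumnTypes(
  [
    [null, "a", null],
    [1, null, null],
  ],
  3,
  simpleInfer,
  7,
);
```

Here the first column resolves from the second row, and the all-null third column takes the fallback.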
Error handling
Map driver errors to MappedError so that Prisma can handle them correctly:
function convertDriverError(error: unknown): MappedError {
if (error instanceof Error) {
const dbError = error as Error & { code?: string; errno?: number };
// Example PostgreSQL error codes
if (dbError.code === "23505") {
return { kind: "UniqueConstraintViolation" }; // unique constraint violation
}
if (dbError.code === "23502") {
return { kind: "NullConstraintViolation" }; // not-null constraint violation
}
if (dbError.code === "23503") {
return { kind: "ForeignKeyConstraintViolation" }; // foreign key constraint violation
}
if (dbError.code === "42P01") {
return { kind: "TableDoesNotExist" }; // table does not exist
}
// Example SQLite error
if (error.name === "SQLiteError") {
return {
kind: "sqlite",
extendedCode: dbError.errno ?? 1,
message: error.message,
};
}
// Raw PostgreSQL error
if (dbError.code) {
return {
kind: "postgres",
code: dbError.code,
severity: "ERROR",
message: error.message,
detail: undefined,
column: undefined,
hint: undefined,
};
}
}
return { kind: "GenericJs", id: 0 };
}
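As the list of recognized codes grows, the chain of if statements can be replaced by a lookup table. The sketch below is a stylistic alternative, not a required structure; it covers only the four SQLSTATE codes shown above, and MappedKind and kindForSqlState are local, hypothetical names.

```typescript
type MappedKind =
  | "UniqueConstraintViolation"
  | "NullConstraintViolation"
  | "ForeignKeyConstraintViolation"
  | "TableDoesNotExist";

// PostgreSQL SQLSTATE code → MappedError kind (the same four codes as above).
const SQLSTATE_KINDS: Record<string, MappedKind> = {
  "23505": "UniqueConstraintViolation",
  "23502": "NullConstraintViolation",
  "23503": "ForeignKeyConstraintViolation",
  "42P01": "TableDoesNotExist",
};

// Returns undefined for unknown codes so the caller can fall through to the
// raw "postgres" or "GenericJs" mapping.
function kindForSqlState(code: string | undefined): MappedKind | undefined {
  if (code === undefined) return undefined;
  return Object.prototype.hasOwnProperty.call(SQLSTATE_KINDS, code)
    ? SQLSTATE_KINDS[code]
    : undefined;
}
```

The own-property check keeps inherited keys such as "constructor" from being mistaken for error codes.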
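Database-specific notes
SQLite
- Set safeIntegers: true when opening the database to get bigint for large integers
- Only the SERIALIZABLE isolation level is supported
- executeScript: split on ; and execute statement by statement
- Booleans: stored as 0/1, returned as boolean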
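Splitting a script on ; is simple until a string literal contains a semicolon. A minimal sketch of a splitter that skips semicolons inside single-quoted strings follows; splitStatements is a hypothetical helper, and it deliberately does not handle SQL comments, double-quoted identifiers, or trigger bodies (BEGIN ... END), which a production implementation would need to consider.

```typescript
// Split a SQL script into statements on ';', ignoring semicolons that
// appear inside single-quoted string literals.
function splitStatements(script: string): string[] {
  const statements: string[] = [];
  let current = "";
  let inString = false;
  for (let i = 0; i < script.length; i++) {
    const ch = script.charAt(i);
    // A doubled quote ('') toggles twice, so the state ends up unchanged.
    if (ch === "'") inString = !inString;
    if (ch === ";" && !inString) {
      if (current.trim() !== "") statements.push(current.trim());
      current = "";
    } else {
      current += ch;
    }
  }
  if (current.trim() !== "") statements.push(current.trim());
  return statements;
}

const statements = splitStatements(
  "CREATE TABLE t (a TEXT); INSERT INTO t VALUES ('x;y'); ",
);
```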
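PostgreSQL
- Supports all standard isolation levels
- When using a connection pooler (PgBouncer), use prepare: false
- Transactions need a dedicated connection (reserve() pattern)
- executeScript: use multi-statement execution
- int8 columns may come back as strings (already stringified by the driver)
- numeric columns come back as strings to preserve precision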
MySQL/MariaDB
- Supports READ UNCOMMITTED, READ COMMITTED, REPEATABLE READ, SERIALIZABLE
- Uses ? placeholders for parameters
- Handle BIGINT as a string when values are large
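The per-database isolation rules above can be expressed as data, so that startTransaction validates against the target database instead of a fixed set. This is a sketch under the assumptions stated in these notes (SQLite: SERIALIZABLE only; PostgreSQL and MySQL/MariaDB: the four standard levels). The names SUPPORTED_LEVELS and isLevelSupported are hypothetical, and the level type is a local subset rather than the library's IsolationLevel.

```typescript
type ProviderName = "sqlite" | "postgres" | "mysql";
type StandardLevel =
  | "READ UNCOMMITTED"
  | "READ COMMITTED"
  | "REPEATABLE READ"
  | "SERIALIZABLE";

const STANDARD_LEVELS: readonly StandardLevel[] = [
  "READ UNCOMMITTED",
  "READ COMMITTED",
  "REPEATABLE READ",
  "SERIALIZABLE",
];

// Levels each database accepts, per the notes in this section.
const SUPPORTED_LEVELS: Record<ProviderName, readonly StandardLevel[]> = {
  sqlite: ["SERIALIZABLE"],
  postgres: STANDARD_LEVELS,
  mysql: STANDARD_LEVELS,
};

// An undefined level means "use the database default" and is always accepted.
function isLevelSupported(provider: ProviderName, level: StandardLevel | undefined): boolean {
  if (level === undefined) return true;
  return SUPPORTED_LEVELS[provider].indexOf(level) !== -1;
}
```

When isLevelSupported returns false, the adapter would throw the InvalidIsolationLevel error shown in Step 3.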
Testing strategy
Unit tests (without PrismaClient)
Test the adapter directly against the raw database driver:
describe("queryRaw", () => {
test("returns column names and types", async () => {
const adapter = new MyAdapter(createTestConnection());
const result = await adapter.queryRaw({
sql: "SELECT id, name FROM users",
args: [],
argTypes: [],
});
expect(result.columnNames).toEqual(["id", "name"]);
expect(result.columnTypes[0]).toBe(ColumnTypeEnum.Int32);
});
});
describe("startTransaction", () => {
test("persists data after commit", async () => {
const adapter = new MyAdapter(createTestConnection());
const tx = await adapter.startTransaction();
await tx.executeRaw({
sql: "INSERT INTO users (name) VALUES (?)",
args: ["Alice"],
argTypes: [],
});
// Prisma sends COMMIT via executeRaw
await tx.executeRaw({ sql: "COMMIT", args: [], argTypes: [] });
await tx.commit(); // lifecycle hook only
// Verify that the data was persisted
});
});
End-to-end tests (with PrismaClient)
describe("E2E", () => {
let prisma: PrismaClient;
beforeEach(async () => {
const factory = new MyAdapterFactory({ url: TEST_DB_URL });
prisma = new PrismaClient({ adapter: factory });
});
test("CRUD operations", async () => {
const user = await prisma.user.create({ data: { name: "Alice" } });
expect(user.id).toBeGreaterThan(0);
const found = await prisma.user.findUnique({ where: { id: user.id } });
expect(found?.name).toBe("Alice");
});
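test("rolls back when the transaction throws", async () => {
// Compare against the starting count so earlier tests' rows do not matter
const before = await prisma.user.count();
await expect(
prisma.$transaction(async (tx) => {
await tx.user.create({ data: { name: "Bob" } });
throw new Error("Rollback!");
}),
).rejects.toThrow();
expect(await prisma.user.count()).toBe(before);
});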
});
Usage example
import { PrismaClient } from "./generated/prisma/client";
import { MyAdapterFactory } from "@my-org/adapter-mydb";
const factory = new MyAdapterFactory({
url: process.env.DATABASE_URL!,
});
const prisma = new PrismaClient({ adapter: factory });
// Use Prisma as usual
const users = await prisma.user.findMany();
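Completion checklist
Before considering the adapter complete, confirm:
- SqlMigrationAwareDriverAdapterFactory implements connect() and connectToShadowDb()
- SqlDriverAdapter implements queryRaw, executeRaw, executeScript, startTransaction, dispose
- Transaction implements queryRaw, executeRaw, commit, rollback, with options: { usePhantomQuery: false }
- commit() and rollback() are lifecycle hooks only (they issue no SQL)
- startTransaction issues BEGIN (depth 1) or SAVEPOINT sp_N (nested)
- Argument mapping handles: string->int, string->bigint, string->float, base64->bytes
- Row mapping handles: bigint->string, Date->ISO string, JSON->string
- Column types map correctly to ColumnTypeEnum
- Errors are wrapped in DriverAdapterError with the correct MappedError kind
- Isolation levels are validated for the target database
- Unit tests pass for queryRaw, executeRaw, executeScript, and transactions
- End-to-end tests pass against a real PrismaClient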
Appendix: ColumnTypeEnum values
const ColumnTypeEnum = {
Int32: 0, Int64: 1, Float: 2, Double: 3,
Numeric: 4, Boolean: 5, Character: 6, Text: 7,
Date: 8, Time: 9, DateTime: 10, Json: 11,
Enum: 12, Bytes: 13, Set: 14, Uuid: 15,
// Array variants
Int32Array: 64, Int64Array: 65, FloatArray: 66,
DoubleArray: 67, NumericArray: 68, BooleanArray: 69,
CharacterArray: 70, TextArray: 71, DateArray: 72,
TimeArray: 73, DateTimeArray: 74, JsonArray: 75,
EnumArray: 76, BytesArray: 77, UuidArray: 78,
// Special
UnknownNumber: 128,
} as const;
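Most array variants in this table sit 64 above their scalar counterpart, but not all: Set (14) has no array variant and UuidArray is 78 while Uuid is 15. For that reason a lookup table is safer than arithmetic when recovering the element type. The sketch below copies the values from the table above; ARRAY_ELEMENT_TYPE and elementTypeOf are hypothetical names.

```typescript
// Array column type code → element column type code, copied from the table above.
const ARRAY_ELEMENT_TYPE: Record<number, number> = {
  64: 0, // Int32Array → Int32
  65: 1, // Int64Array → Int64
  66: 2, // FloatArray → Float
  67: 3, // DoubleArray → Double
  68: 4, // NumericArray → Numeric
  69: 5, // BooleanArray → Boolean
  70: 6, // CharacterArray → Character
  71: 7, // TextArray → Text
  72: 8, // DateArray → Date
  73: 9, // TimeArray → Time
  74: 10, // DateTimeArray → DateTime
  75: 11, // JsonArray → Json
  76: 12, // EnumArray → Enum
  77: 13, // BytesArray → Bytes
  78: 15, // UuidArray → Uuid (not 78 - 64 = 14, which is Set)
};

// Returns the element type for an array column type, or undefined for
// scalar types and unknown codes.
function elementTypeOf(columnType: number): number | undefined {
  return Object.prototype.hasOwnProperty.call(ARRAY_ELEMENT_TYPE, columnType)
    ? ARRAY_ELEMENT_TYPE[columnType]
    : undefined;
}
```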