驱动适配器实现指南(Prisma 7 Driver Adapter Implementation Guide)

Verified 高级 Advanced 参考型 Reference ⚡ Claude Code 专属 ⚡ Claude Code Optimized
12 min read · 620 lines

Prisma 官方驱动适配器实现指南:架构 + 接口 + 4 步实现 + 类型转换

驱动适配器实现指南(Prisma 7 Driver Adapter Implementation Guide)

概述

本指南提供为 Prisma ORM v7 实现自定义驱动适配器(Driver Adapter)所需的全部信息,包括架构概览、接口定义、实现步骤、类型转换、错误处理和测试策略。适用于为任何数据库构建 Prisma 适配器的场景。

适用场景

  • 实现或修改 Prisma 驱动适配器
  • 添加新的数据库驱动支持
  • 涉及 SqlDriverAdapter / Transaction 接口的工作
  • 理解事务生命周期协议
  • 错误映射和验证

架构概览

┌─────────────────────────────────────────────────────────────────┐
│                         PrismaClient                            │
│                    (需要适配器工厂)                               │
└─────────────────────────────────────────────────────────────────┘
                                │
                                ▼
┌─────────────────────────────────────────────────────────────────┐
│            SqlMigrationAwareDriverAdapterFactory                │
│   ┌─────────────────────┐    ┌─────────────────────────────┐    │
│   │ connect()           │    │ connectToShadowDb()         │    │
│   │ → SqlDriverAdapter  │    │ → SqlDriverAdapter          │    │
│   └─────────────────────┘    └─────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────┘
                                │
                                ▼
┌─────────────────────────────────────────────────────────────────┐
│                      SqlDriverAdapter                           │
│  ┌──────────────┐ ┌──────────────┐ ┌──────────────────────────┐ │
│  │ queryRaw()   │ │ executeRaw() │ │ startTransaction()       │ │
│  │ → ResultSet  │ │ → number     │ │ → Transaction            │ │
│  └──────────────┘ └──────────────┘ └──────────────────────────┘ │
│  ┌──────────────┐ ┌──────────────┐ ┌──────────────────────────┐ │
│  │executeScript │ │ dispose()    │ │ getConnectionInfo()      │ │
│  └──────────────┘ └──────────────┘ └──────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
                                │
                                ▼
┌─────────────────────────────────────────────────────────────────┐
│                        Transaction                              │
│  继承 SqlQueryable + commit() + rollback() + options            │
│  (仅生命周期钩子 — Prisma 通过 executeRaw 发送 SQL)              │
└─────────────────────────────────────────────────────────────────┘

必需接口

@prisma/driver-adapter-utils 导入:

import type {
  ColumnType,
  IsolationLevel,
  SqlDriverAdapter,
  SqlMigrationAwareDriverAdapterFactory,
  SqlQuery,
  SqlQueryable,
  SqlResultSet,
  Transaction,
  TransactionOptions,
  ArgType,
  ConnectionInfo,
  MappedError,
} from "@prisma/driver-adapter-utils";
import {
  ColumnTypeEnum,
  DriverAdapterError,
} from "@prisma/driver-adapter-utils";

接口定义

SqlQuery(queryRaw/executeRaw 的输入)

type SqlQuery = {
  sql: string;           // 带占位符的参数化 SQL
  args: Array<unknown>;  // 绑定的参数值
  argTypes: Array<ArgType>; // 每个参数的类型提示
};

type ArgType = {
  scalarType: ArgScalarType; // 'string' | 'int' | 'bigint' | 'float' | 'decimal' | 'boolean' | 'enum' | 'uuid' | 'json' | 'datetime' | 'bytes' | 'unknown'
  dbType?: string;
  arity: "scalar" | "list";
};

SqlResultSet(queryRaw 的输出)

interface SqlResultSet {
  columnNames: Array<string>;   // 按顺序排列的列名
  columnTypes: Array<ColumnType>; // 与 columnNames 对应的列类型
  rows: Array<Array<unknown>>;  // 行数据(数组的数组)
  lastInsertId?: string;        // 用于不带 RETURNING 的 INSERT
}

SqlDriverAdapter

interface SqlDriverAdapter extends SqlQueryable {
  executeScript(script: string): Promise<void>;
  startTransaction(isolationLevel?: IsolationLevel): Promise<Transaction>;
  getConnectionInfo?(): ConnectionInfo;
  dispose(): Promise<void>;
}

Transaction

interface Transaction extends SqlQueryable {
  readonly options: TransactionOptions;
  commit(): Promise<void>;
  rollback(): Promise<void>;
}

type TransactionOptions = { usePhantomQuery: boolean };

SqlMigrationAwareDriverAdapterFactory

interface SqlMigrationAwareDriverAdapterFactory {
  readonly provider: "mysql" | "postgres" | "sqlite" | "sqlserver";
  readonly adapterName: string;
  connect(): Promise<SqlDriverAdapter>;
  connectToShadowDb(): Promise<SqlDriverAdapter>;
}

实现步骤

步骤 1:创建 Queryable 基类

class MyQueryable<TClient> implements SqlQueryable {
  readonly provider = "postgres" as const; // 或 'sqlite' | 'mysql' | 'sqlserver'
  readonly adapterName = "@my-org/adapter-mydb" as const;

  constructor(protected readonly client: TClient) {}

  async queryRaw(query: SqlQuery): Promise<SqlResultSet> {
    try {
      const args = query.args.map((arg, i) =>
        mapArg(arg, query.argTypes[i] ?? { scalarType: "unknown", arity: "scalar" })
      );

      // 使用驱动执行查询
      const result = await this.client.query(query.sql, args);

      // 提取列元数据
      const columnNames = /* 从结果中获取 */;
      const columnTypes = /* 映射到 ColumnTypeEnum */;

      // 将行映射为 ResultValue 数组
      const rows = result.map(row => mapRow(row, columnTypes));

      return { columnNames, columnTypes, rows };
    } catch (e) {
      this.onError(e);
    }
  }

  async executeRaw(query: SqlQuery): Promise<number> {
    try {
      const args = query.args.map((arg, i) =>
        mapArg(arg, query.argTypes[i] ?? { scalarType: "unknown", arity: "scalar" })
      );
      const result = await this.client.query(query.sql, args);
      return result.affectedRows ?? 0;
    } catch (e) {
      this.onError(e);
    }
  }

  protected onError(error: unknown): never {
    throw new DriverAdapterError(convertDriverError(error));
  }
}

步骤 2:创建 Transaction 类

关键commit()rollback() 仅是生命周期钩子。它们不得发出 SQL。Prisma 通过事务对象的 executeRaw 发送 COMMIT/ROLLBACK

class MyTransaction extends MyQueryable<TClient> implements Transaction {
  readonly options: TransactionOptions;
  readonly #release: () => void;

  constructor(
    client: TClient,
    options: TransactionOptions,
    release: () => void,
  ) {
    super(client);
    this.options = options;
    this.#release = release;
  }

  commit(): Promise<void> {
    // 不要在此处发出 COMMIT SQL — Prisma 通过 executeRaw 执行
    this.#release(); // 释放连接/资源
    return Promise.resolve();
  }

  rollback(): Promise<void> {
    // 不要在此处发出 ROLLBACK SQL — Prisma 通过 executeRaw 执行
    this.#release();
    return Promise.resolve();
  }
}

步骤 3:创建 Adapter 类

class MyAdapter extends MyQueryable<TClient> implements SqlDriverAdapter {
  #transactionDepth = 0;

  constructor(client: TClient) {
    super(client);
  }

  async executeScript(script: string): Promise<void> {
    // SQLite:按 ';' 分割并逐条执行
    // Postgres:使用多语句执行
    try {
      // 实现取决于驱动能力
    } catch (e) {
      this.onError(e);
    }
  }

  async startTransaction(
    isolationLevel?: IsolationLevel,
  ): Promise<Transaction> {
    // 验证数据库的隔离级别
    const validLevels = new Set<IsolationLevel>([
      "READ UNCOMMITTED",
      "READ COMMITTED",
      "REPEATABLE READ",
      "SERIALIZABLE",
    ]);

    if (isolationLevel !== undefined && !validLevels.has(isolationLevel)) {
      throw new DriverAdapterError({
        kind: "InvalidIsolationLevel",
        level: isolationLevel,
      });
    }

    const options: TransactionOptions = { usePhantomQuery: false };

    this.#transactionDepth += 1;
    const depth = this.#transactionDepth;

    try {
      if (depth === 1) {
        // 发出 BEGIN(带隔离级别时包含隔离级别)
        const beginSql = isolationLevel
          ? `BEGIN ISOLATION LEVEL ${isolationLevel}`
          : "BEGIN";
        await this.client.query(beginSql);
      } else {
        // 嵌套事务:使用保存点(Savepoint)
        await this.client.query(`SAVEPOINT sp_${depth}`);
      }
    } catch (e) {
      this.#transactionDepth -= 1;
      this.onError(e);
    }

    const release = () => {
      this.#transactionDepth -= 1;
    };
    return new MyTransaction(this.client, options, release);
  }

  getConnectionInfo(): ConnectionInfo {
    return { supportsRelationJoins: true };
  }

  async dispose(): Promise<void> {
    await this.client.close();
  }
}

步骤 4:创建 Factory 类

export type MyAdapterConfig = {
  url: string;
};

export type MyAdapterOptions = {
  shadowDatabaseUrl?: string;
};

export class MyAdapterFactory implements SqlMigrationAwareDriverAdapterFactory {
  readonly provider = "postgres" as const;
  readonly adapterName = "@my-org/adapter-mydb" as const;

  constructor(
    private readonly config: MyAdapterConfig,
    private readonly options?: MyAdapterOptions,
  ) {}

  connect(): Promise<SqlDriverAdapter> {
    return Promise.resolve(new MyAdapter(openConnection(this.config.url)));
  }

  connectToShadowDb(): Promise<SqlDriverAdapter> {
    const url = this.options?.shadowDatabaseUrl ?? this.config.url;
    return Promise.resolve(new MyAdapter(openConnection(url)));
  }
}

转换辅助函数

参数映射(输入)

将 Prisma 参数值转换为驱动原生类型:

function mapArg(arg: unknown, argType: ArgType): unknown {
  if (arg === null || arg === undefined) return null;

  // 字符串 → 数字(整型列)
  if (typeof arg === "string" && argType.scalarType === "int")
    return Number.parseInt(arg, 10);

  // 字符串 → 数字(浮点列)
  if (typeof arg === "string" && argType.scalarType === "float")
    return Number.parseFloat(arg);

  // 字符串 → BigInt(大整数列)
  if (typeof arg === "string" && argType.scalarType === "bigint")
    return BigInt(arg);

  // Base64 字符串 → Buffer(字节列)
  if (typeof arg === "string" && argType.scalarType === "bytes")
    return Buffer.from(arg, "base64");

  return arg;
}

行映射(输出)

将驱动结果值转换为 Prisma 期望的类型:

function mapRow(row: unknown[], columnTypes: ColumnType[]): ResultValue[] {
  const result: ResultValue[] = [];

  for (let i = 0; i < row.length; i++) {
    const value = row[i] ?? null;
    const colType = columnTypes[i];

    if (value === null) {
      result.push(null);
      continue;
    }

    // bigint → 字符串(JSON 安全)
    if (typeof value === "bigint") {
      result.push(value.toString());
      continue;
    }

    // Date → ISO 8601 字符串
    if (value instanceof Date) {
      result.push(value.toISOString());
      continue;
    }

    // JSON 对象 → 字符串化
    if (colType === ColumnTypeEnum.Json && typeof value === "object") {
      result.push(JSON.stringify(value));
      continue;
    }

    result.push(value as ResultValue);
  }

  return result;
}

列类型推断

当驱动不提供类型元数据时,从 JS 值推断:

function inferColumnType(value: NonNullable<unknown>): ColumnType {
  if (typeof value === "boolean") return ColumnTypeEnum.Boolean;
  if (typeof value === "bigint") return ColumnTypeEnum.Int64;
  if (value instanceof Uint8Array) return ColumnTypeEnum.Bytes;
  if (value instanceof Date) return ColumnTypeEnum.DateTime;
  if (Array.isArray(value)) return ColumnTypeEnum.Text;   // 回退
  if (typeof value === "object") return ColumnTypeEnum.Json;
  if (typeof value === "number") return ColumnTypeEnum.UnknownNumber;
  return ColumnTypeEnum.Text;
}

错误处理

将驱动错误映射为 MappedError,以便 Prisma 正确处理:

function convertDriverError(error: unknown): MappedError {
  if (error instanceof Error) {
    const dbError = error as Error & { code?: string; errno?: number };

    // PostgreSQL 错误码示例
    if (dbError.code === "23505") {
      return { kind: "UniqueConstraintViolation" };    // 唯一约束冲突
    }
    if (dbError.code === "23502") {
      return { kind: "NullConstraintViolation" };      // 非空约束冲突
    }
    if (dbError.code === "23503") {
      return { kind: "ForeignKeyConstraintViolation" }; // 外键约束冲突
    }
    if (dbError.code === "42P01") {
      return { kind: "TableDoesNotExist" };            // 表不存在
    }

    // SQLite 错误示例
    if (error.name === "SQLiteError") {
      return {
        kind: "sqlite",
        extendedCode: dbError.errno ?? 1,
        message: error.message,
      };
    }

    // PostgreSQL 原始错误
    if (dbError.code) {
      return {
        kind: "postgres",
        code: dbError.code,
        severity: "ERROR",
        message: error.message,
        detail: undefined,
        column: undefined,
        hint: undefined,
      };
    }
  }

  return { kind: "GenericJs", id: 0 };
}

数据库特定注意事项

SQLite

  • 打开数据库时设置 safeIntegers: true 以获取大整数的 bigint 类型
  • 仅支持 SERIALIZABLE 隔离级别
  • executeScript:按 ; 分割并逐条执行
  • 布尔值:存储为 0/1,返回为 boolean

PostgreSQL

  • 支持所有标准隔离级别
  • 使用连接池(PgBouncer)时,使用 prepare: false
  • 事务需要专用连接(reserve() 模式)
  • executeScript:使用多语句执行
  • int8 列可能返回字符串(驱动已字符串化)
  • numeric 列返回字符串以保持精度

MySQL/MariaDB

  • 支持 READ UNCOMMITTEDREAD COMMITTEDREPEATABLE READSERIALIZABLE
  • 使用 ? 占位符作为参数
  • 大值时将 BIGINT 作为字符串处理

测试策略

单元测试(不使用 PrismaClient)

直接使用原始数据库驱动测试适配器:

describe("queryRaw", () => {
  test("返回列名和类型", async () => {
    const adapter = new MyAdapter(createTestConnection());
    const result = await adapter.queryRaw({
      sql: "SELECT id, name FROM users",
      args: [],
      argTypes: [],
    });
    expect(result.columnNames).toEqual(["id", "name"]);
    expect(result.columnTypes[0]).toBe(ColumnTypeEnum.Int32);
  });
});

describe("startTransaction", () => {
  test("提交后数据持久化", async () => {
    const adapter = new MyAdapter(createTestConnection());
    const tx = await adapter.startTransaction();
    await tx.executeRaw({
      sql: "INSERT INTO users (name) VALUES (?)",
      args: ["Alice"],
      argTypes: [],
    });
    // Prisma 通过 executeRaw 发送 COMMIT
    await tx.executeRaw({ sql: "COMMIT", args: [], argTypes: [] });
    await tx.commit(); // 仅生命周期钩子
    // 验证数据已持久化
  });
});

端到端测试(使用 PrismaClient)

describe("E2E", () => {
  let prisma: PrismaClient;

  beforeEach(async () => {
    const factory = new MyAdapterFactory({ url: TEST_DB_URL });
    prisma = new PrismaClient({ adapter: factory });
  });

  test("CRUD 操作", async () => {
    const user = await prisma.user.create({ data: { name: "Alice" } });
    expect(user.id).toBeGreaterThan(0);

    const found = await prisma.user.findUnique({ where: { id: user.id } });
    expect(found?.name).toBe("Alice");
  });

  test("事务错误时回滚", async () => {
    await expect(
      prisma.$transaction(async (tx) => {
        await tx.user.create({ data: { name: "Bob" } });
        throw new Error("Rollback!");
      }),
    ).rejects.toThrow();

    expect(await prisma.user.count()).toBe(0);
  });
});

使用示例

import { PrismaClient } from "./generated/prisma/client";
import { MyAdapterFactory } from "@my-org/adapter-mydb";

const factory = new MyAdapterFactory({
  url: process.env.DATABASE_URL!,
});

const prisma = new PrismaClient({ adapter: factory });

// 正常使用 Prisma
const users = await prisma.user.findMany();

完成清单

在认为适配器完成之前,请确认:

  • SqlMigrationAwareDriverAdapterFactory 已实现 connect()connectToShadowDb()
  • SqlDriverAdapter 实现了 queryRawexecuteRawexecuteScriptstartTransactiondispose
  • Transaction 实现了 queryRawexecuteRawcommitrollback,且 options: { usePhantomQuery: false }
  • commit()rollback() 仅为生命周期钩子(不发出 SQL)
  • startTransaction 发出 BEGIN(深度 1)或 SAVEPOINT sp_N(嵌套)
  • 参数映射处理:string->int、string->bigint、string->float、base64->bytes
  • 行映射处理:bigint->string、Date->ISO 字符串、JSON->字符串
  • 列类型正确映射到 ColumnTypeEnum
  • 错误已用 DriverAdapterError 包装,且包含正确的 MappedError 类型
  • 目标数据库的隔离级别验证
  • queryRaw、executeRaw、executeScript、事务的单元测试通过
  • 使用真实 PrismaClient 的端到端测试通过

附录:ColumnTypeEnum 值

const ColumnTypeEnum = {
  Int32: 0,     Int64: 1,      Float: 2,       Double: 3,
  Numeric: 4,   Boolean: 5,    Character: 6,   Text: 7,
  Date: 8,      Time: 9,       DateTime: 10,   Json: 11,
  Enum: 12,     Bytes: 13,     Set: 14,        Uuid: 15,
  // 数组变体
  Int32Array: 64,    Int64Array: 65,    FloatArray: 66,
  DoubleArray: 67,   NumericArray: 68,  BooleanArray: 69,
  CharacterArray: 70, TextArray: 71,    DateArray: 72,
  TimeArray: 73,     DateTimeArray: 74, JsonArray: 75,
  EnumArray: 76,     BytesArray: 77,    UuidArray: 78,
  // 特殊
  UnknownNumber: 128,
} as const;

相关技能 Related Skills