面向 Node.js 服务端的轻量 RPC 模块,支持 unary 与 streaming 调用模型,优先支持 VextJS,同时保留 Express、Koa、Fastify 等框架的接入能力。
本文只说明模块的使用方式、默认行为和适用边界,不展开需求与设计过程。
- 安装
- 适用场景
- 模块定位
- 核心概念
- 定义共享契约
- VextJS — 暴露 RPC 服务
- VextJS — 调用远端服务
- Express — 挂载 RPC 服务
- Koa — 挂载 RPC 服务
- Fastify — 挂载 RPC 服务
- 通用客户端
- 配置项
- 错误模型
- metadata 与透传
- 服务发现与负载均衡
- 可替换组件
- 不提供的能力
- 默认口径
npm install vextjs-rpc schema-dsl子路径适配器随主包一起发布,无需额外安装。
- Node.js 服务之间需要统一 unary 与 streaming 调用方式,无论使用哪种框架。
- 不想引入 gRPC/IDL 体系,但需要类型安全的跨服务调用。
- 需要在应用层统一 timeout、retry 与错误码映射,并透传平台注入的 trace 上下文。
- 需要在 VextJS、Express、Koa、Fastify 等框架之间共享同一套调用模型。
vextjs-rpc 是应用层 RPC 框架,关注点是服务间的契约定义与类型安全调用。
本模块负责:
- 契约定义(router / procedure / schema-dsl 校验)
- unary 与 streaming 调用模型(双向流、服务端流、客户端流)
- HTTP + JSON 序列化与反序列化
- 输入/输出 schema 校验与统一错误模型
- context 构建与 procedure 调用链路
- 框架适配(VextJS / Express / Koa / Fastify handler 与 client)
- 客户端侧 timeout 与重试
- 透传平台注入的 trace 上下文(转发 requestId / traceparent,不负责生成)
- 服务端鉴权钩子(
authhook,应用层逻辑)
以下由网关、服务网格或平台层负责,不在本模块范围内:
| 能力 | 典型实现 |
|---|---|
| 服务注册与发现 | Nacos / Consul / Kubernetes Service |
| 实例级负载均衡 | 服务网格(Istio / Linkerd)/ L7 网关 |
| 熔断与限流 | Sentinel / 网格流量策略 |
| mTLS 加密传输 | 服务网格 sidecar |
| trace context 生成(requestId / traceparent) | API 网关 / 入口中间件 |
| 健康检查与流量摘除 | Kubernetes 探针 / 注册中心 |
内置的
staticResolver+roundRobin仅供开发环境和无服务网格的简单部署使用;生产环境若已有服务网格,只需将地址配为 mesh 提供的稳定 ClusterIP / Service DNS,无需依赖内置负载均衡策略。
| 概念 | 作用 |
|---|---|
router |
把多个 procedure 组织成一个模块 |
procedure |
描述一个远端方法:输入 schema、输出 schema、处理函数 |
stream |
描述一个流式方法:持续输入、持续输出,或双向持续收发 |
handler |
把 router 挂到具体宿主框架 |
client |
从调用方访问远端 procedure |
resolver |
决定 client 如何找到下游服务地址 |
loadBalancer |
在多个地址之间选择下游实例 |
schema 通过内置的 schema-dsl 定义,这是本模块唯一的校验库。同一份 router 可以同时被服务端和客户端引用,不需要手写两份类型。
下面先用 query / mutation 展示基础契约定义;streaming 与它共享同一套 router、context、metadata 和错误模型。
// contracts/user.ts
import { createRouter, query } from 'vextjs-rpc'
import { dsl } from 'schema-dsl'
export const userRouter = createRouter({
getById: query({
input: dsl({ id: 'string!' }),
output: dsl({ id: 'string!', name: 'string!', email: 'email!' }),
resolve: async ({ input, ctx }) => {
return ctx.services.user.findById(input.id)
},
}),
list: query({
input: dsl({ page: 'integer:1-!', pageSize: 'integer:1-100' }),
output: dsl({ total: 'integer!', items: 'array!' }),
resolve: async ({ input, ctx }) => {
return ctx.services.user.list(input)
},
}),
})// contracts/user.ts(续)
import { createRouter, query, mutation } from 'vextjs-rpc'
import { dsl } from 'schema-dsl'
export const userRouter = createRouter({
// ...query 略
create: mutation({
input: dsl({
name: 'string:1-64!',
email: 'email!',
role: 'admin|user|guest',
}),
output: dsl({ id: 'string!' }),
resolve: async ({ input, ctx }) => {
return ctx.services.user.create(input)
},
}),
update: mutation({
input: dsl({
id: 'string!',
name: 'string:1-64',
email: 'email',
}),
output: dsl({ ok: 'boolean!' }),
resolve: async ({ input, ctx }) => {
await ctx.services.user.update(input.id, input)
return { ok: true }
},
}),
})当一个服务对外暴露多个业务模块时,可以把多个 router 合并:
// contracts/index.ts
import { createRouter } from 'vextjs-rpc'
import { userRouter } from './user'
import { orderRouter } from './order'
export const appRouter = createRouter({
user: userRouter,
order: orderRouter,
})
export type AppRouter = typeof appRouter在 VextJS 中,推荐把 vextjs-rpc 直接当成一个插件使用:同一个插件同时完成 app.rpc 挂载与 /rpc 服务端入口注册。
在 VextJS 中,推荐默认按以下方式接入:
- 默认
/rpc作为服务端入口路径 - 默认复用
app.services、request.user、x-request-id/traceparent - 默认把调用入口挂到
app.rpc - 默认通过
app.fetch.create()发起出站 RPC 请求 createContext、auth、policies、observability作为覆盖点,而不是每个项目都必须手写的大段样板
// src/plugins/rpc.ts
import { rpcPlugin } from 'vextjs-rpc/vextjs'
import { staticResolver } from 'vextjs-rpc'
import { appRouter } from '../contracts'
export default rpcPlugin({
router: appRouter,
resolver: staticResolver({
user: [process.env.USER_RPC_URL ?? 'http://user-service:3000/rpc'],
order: [process.env.ORDER_RPC_URL ?? 'http://order-service:3000/rpc'],
}),
})最简插件默认完成:
- 将 client 挂到
app.rpc - 在当前服务内注册
/rpc入口 - context 默认复用
app、request、app.services、request.user、x-request-id/traceparent - 出站调用默认复用
app.fetch.create() - 错误日志默认走
app.logger
// src/plugins/rpc.ts
import { rpcPlugin } from 'vextjs-rpc/vextjs'
import { staticResolver } from 'vextjs-rpc'
import { appRouter } from '../contracts'
export default rpcPlugin({
router: appRouter,
resolver: staticResolver({
user: [process.env.USER_RPC_URL ?? 'http://user-service:3000/rpc'],
}),
path: '/internal/rpc',
createContext: async ({ app, request }) => ({
app,
request,
services: app.services,
auth: request.user,
requestId: request.headers['x-request-id'],
tenantId: request.headers['x-tenant-id'],
}),
auth: (ctx) => {
if (!ctx.auth) throw new Error('UNAUTHORIZED')
},
policies: [
(ctx) => {
if (!ctx.tenantId) throw new Error('TENANT_REQUIRED')
},
],
onError: ({ error, ctx }) => {
ctx?.app.logger.error('rpc error', { error })
},
})如果你只想保留单侧能力:
// 只作为调用方
export default rpcPlugin({
resolver: staticResolver({ user: ['http://user-service:3000/rpc'] }),
server: false,
})
// 只作为服务提供方
export default rpcPlugin({
router: appRouter,
client: false,
})只有在你需要显式接管服务端入口注册时,才建议退回 mountVextRpc() 和 createVextRpcClient() 这两个低阶 API。
// src/services/order.ts
export class OrderService {
constructor(private app: VextApp) {}
async createOrder(userId: string, items: OrderItem[]) {
const user = await this.app.rpc.user.getById({ id: userId })
return this.app.rpc.order.create({ userId: user.id, items })
}
}插件模式下默认完成:
- client 已挂到
app.rpc - 默认负载策略使用
roundRobin - 默认透传当前请求上下文里的
requestId/traceparent - timeout、retry 使用模块默认值
// src/plugins/rpc.ts
import { definePlugin } from 'vextjs'
import { createVextRpcClient } from 'vextjs-rpc/vextjs'
import { staticResolver, roundRobin } from 'vextjs-rpc'
import type { AppRouter } from '../contracts'
export default definePlugin({
name: 'rpc',
setup(app) {
app.extend(
'rpc',
createVextRpcClient<AppRouter>({
app,
resolver: staticResolver({
user: [process.env.USER_RPC_URL ?? 'http://user-service:3000/rpc'],
order: [process.env.ORDER_RPC_URL ?? 'http://order-service:3000/rpc'],
}),
loadBalancer: roundRobin(),
timeout: 3000,
retry: 1,
metadata: ({ requestId, traceparent }) => ({
requestId,
traceparent,
source: 'gateway',
}),
}),
)
},
})这两个 API 主要用于:
- 你想显式拆开 server / client 生命周期
- 你想自己接管
/rpc路径注册 - 你不希望采用插件默认组合层
示例中的
buildServices()代表应用层提供的 service 注入函数(如 DI 容器的入口),Koa、Fastify 示例同理;VextJS 直接使用app.services。
import express from 'express'
import { createExpressRpcHandler } from 'vextjs-rpc/express'
import { appRouter } from './contracts'
const app = express()
app.use(express.json())
app.use(
'/rpc',
createExpressRpcHandler({
router: appRouter,
createContext: async ({ req, res }) => ({
req,
res,
services: buildServices(),
requestId: req.header('x-request-id'),
}),
onError: ({ error }) => {
console.error('rpc error', error)
},
}),
)
app.listen(3000)import Koa from 'koa'
import Router from '@koa/router'
import bodyParser from 'koa-bodyparser'
import { createKoaRpcHandler } from 'vextjs-rpc/koa'
import { appRouter } from './contracts'
const app = new Koa()
const router = new Router()
app.use(bodyParser())
router.use(
'/rpc',
createKoaRpcHandler({
router: appRouter,
createContext: async ({ ctx }) => ({
ctx,
services: buildServices(),
requestId: ctx.get('x-request-id'),
}),
onError: ({ error }) => {
console.error('rpc error', error)
},
}),
)
app.use(router.routes())
app.listen(3000)import Fastify from 'fastify'
import { createFastifyRpcPlugin } from 'vextjs-rpc/fastify'
import { appRouter } from './contracts'
const app = Fastify()
await app.register(createFastifyRpcPlugin, {
prefix: '/rpc',
router: appRouter,
createContext: async ({ request }) => ({
request,
services: buildServices(),
requestId: request.headers['x-request-id'],
}),
onError: ({ error }) => {
app.log.error('rpc error', error)
},
})
app.listen({ port: 3000 })在非 VextJS 框架中,使用 createRpcClient 创建独立客户端实例:
// rpc.ts
import { createRpcClient, staticResolver, roundRobin } from 'vextjs-rpc'
import type { AppRouter } from './contracts'
export const rpc = createRpcClient<AppRouter>({
resolver: staticResolver({
user: [process.env.USER_RPC_URL ?? 'http://user-service:3000/rpc'],
order: [process.env.ORDER_RPC_URL ?? 'http://order-service:3000/rpc'],
}),
loadBalancer: roundRobin(),
timeout: 3000,
retry: 1,
metadata: ({ requestId, traceparent }) => ({
requestId,
traceparent,
}),
})// 业务代码中直接调用
import { rpc } from './rpc'
const user = await rpc.user.getById({ id: userId })
const order = await rpc.order.create({ userId: user.id, items })| 字段 | 类型 | 说明 |
|---|---|---|
resolver |
Resolver |
解析目标服务地址 |
loadBalancer |
LoadBalancer |
在多地址间选择下游 |
timeout |
number |
单次调用超时,毫秒,默认 5000 |
retry |
number |
内置重试简写,默认 0;按默认分类器执行保守重试 |
retryPolicy |
{ maxAttempts?, shouldRetry?, backoff? } |
高级重试配置,用于显式覆写默认分类器与退避策略 |
metadata |
object | (ctx) => object |
每次调用注入的 metadata |
headers |
Record<string, string> |
每次请求附加的 HTTP 请求头 |
默认 retry 边界:
- 默认只自动重试
query serverStream仅在尚未收到首个 chunk 的建链阶段错误时允许自动重试mutation、clientStream、bidiStream默认不自动重试- 一旦已经收到首字节或首个 chunk,不再自动重放
| 字段 | 类型 | 说明 |
|---|---|---|
path |
string |
RPC 挂载路径 |
router |
Router |
当前暴露的 router |
createContext |
(hostCtx) => Context |
构建 procedure 上下文 |
onError |
(info) => void |
统一错误处理钩子 |
auth |
(ctx) => void | never |
鉴权钩子,抛错即拒绝请求 |
policies |
PolicyHook[] |
策略链,可拒绝、短路或补充上下文字段 |
observability |
ObservabilityHook |
tracing / metrics / logging 生命周期钩子 |
错误分三类:
| 类型 | 来源 | 示例 |
|---|---|---|
| 业务错误 | procedure 主动抛出,带明确错误码 | 用户不存在、权限不足 |
| transport 错误 | 网络失败、超时、连接不可达 | 503、ECONNREFUSED |
| 框架错误 | schema 校验失败、上下文构建失败、未注册 procedure | 400 输入校验失败 |
统一错误结构:
{
code: string // 如 'NOT_FOUND' / 'UNAUTHORIZED' / 'TIMEOUT'
message: string
details?: unknown // 业务自定义扩展字段
retryable: boolean // client 是否可以自动重试
status: number // 对应 HTTP status,如 404 / 403 / 504
}在 procedure 中抛出业务错误:
import { RpcError } from 'vextjs-rpc'
resolve: async ({ input, ctx }) => {
const user = await ctx.services.user.findById(input.id)
if (!user) {
throw new RpcError('NOT_FOUND', `用户 ${input.id} 不存在`, { retryable: false })
}
return user
}metadata 函数在每次 client 调用时执行,将当前请求中的 trace 上下文转发给下游服务,形成完整的调用链路。
requestId 和 traceparent 由网关或入口中间件在请求入口处生成,以 HTTP 请求头形式传入当前服务(见各框架
createContext中的req.header('x-request-id'))。vextjs-rpc 负责转发这些值到下游调用,不负责生成。
// client 端:转发上游注入的 trace 上下文至下游
metadata: ({ requestId, traceparent }) => ({
requestId,
traceparent,
tenantId: process.env.TENANT_ID,
})// server 端:从 meta 读取
resolve: async ({ input, ctx, meta }) => {
ctx.app.logger.info('incoming rpc', {
requestId: meta.requestId,
traceparent: meta.traceparent,
})
// ...
}生产环境:若已有 Kubernetes / 服务网格(Istio、Linkerd 等),服务发现和实例级负载均衡由平台层负责,将地址配为 ClusterIP / Service DNS 即可,本模块内置策略不参与。
开发环境 / 无服务网格:可使用内置的最小化策略:
import { staticResolver, roundRobin } from 'vextjs-rpc'
// staticResolver:从静态配置表读取地址,支持多实例
resolver: staticResolver({
user: ['http://user-1:3000/rpc', 'http://user-2:3000/rpc'],
})
// roundRobin:轮询选一个
loadBalancer: roundRobin()如果后续需要接入 Nacos、Consul 或其他注册中心,可以实现 Resolver 接口接入,不需要修改 client 的主结构。
以下组件都提供默认实现。直接使用默认值即可;有特殊场景时,再按需替换对应组件。
| 组件 | 默认实现 | 用途 |
|---|---|---|
Transport |
transport/http |
统一 unary 与 streaming 的传输抽象 |
Codec |
codec/json |
统一请求/响应与流式消息编解码 |
Resolver |
staticResolver |
自定义服务地址解析 |
LoadBalancer |
roundRobin |
自定义负载策略 |
AuthHook |
auth/noop |
服务端统一鉴权 |
PolicyHook |
policy/passThrough |
拒绝、短路或补充上下文字段 |
ObservabilityHook |
observability/noop |
接入 tracing / metrics / logging |
以下能力不由 vextjs-rpc 提供:
- gRPC / Protobuf 主通道
- 浏览器端运行时
- 完整注册中心治理(Nacos / Consul 接入)
- 完整 IDL 编译链
- 分布式事务、熔断、限流
| 项目 | 取值 |
|---|---|
| 传输 | HTTP + JSON |
| schema 库 | schema-dsl(内置,唯一支持) |
| 调用模型 | unary + streaming |
| 宿主 | VextJS / Express / Koa / Fastify |
| 发现策略 | static resolver |
| 负载策略 | round-robin |
| Node.js 基线 | >= 18 |