Skip to content

vextjs/vextjs-rpc

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

4 Commits
 
 
 
 
 
 
 
 
 
 

Repository files navigation

vextjs-rpc

面向 Node.js 服务端的轻量 RPC 模块,支持 unary 与 streaming 调用模型,优先支持 VextJS,同时保留 Express、Koa、Fastify 等框架的接入能力。

本文只说明模块的使用方式、默认行为和适用边界,不展开需求与设计过程。

目录

安装

npm install vextjs-rpc schema-dsl

子路径适配器随主包一起发布,无需额外安装。

适用场景

  • Node.js 服务之间需要统一 unary 与 streaming 调用方式,无论使用哪种框架。
  • 不想引入 gRPC/IDL 体系,但需要类型安全的跨服务调用。
  • 需要在应用层统一 timeout、retry 与错误码映射,并透传平台注入的 trace 上下文。
  • 需要在 VextJS、Express、Koa、Fastify 等框架之间共享同一套调用模型。

模块定位

vextjs-rpc 是应用层 RPC 框架,关注点是服务间的契约定义与类型安全调用

本模块负责:

  • 契约定义(router / procedure / schema-dsl 校验)
  • unary 与 streaming 调用模型(双向流、服务端流、客户端流)
  • HTTP + JSON 序列化与反序列化
  • 输入/输出 schema 校验与统一错误模型
  • context 构建与 procedure 调用链路
  • 框架适配(VextJS / Express / Koa / Fastify handler 与 client)
  • 客户端侧 timeout 与重试
  • 透传平台注入的 trace 上下文(转发 requestId / traceparent,不负责生成)
  • 服务端鉴权钩子(auth hook,应用层逻辑)

以下由网关、服务网格或平台层负责,不在本模块范围内:

能力 典型实现
服务注册与发现 Nacos / Consul / Kubernetes Service
实例级负载均衡 服务网格(Istio / Linkerd)/ L7 网关
熔断与限流 Sentinel / 网格流量策略
mTLS 加密传输 服务网格 sidecar
trace context 生成(requestId / traceparent) API 网关 / 入口中间件
健康检查与流量摘除 Kubernetes 探针 / 注册中心

内置的 staticResolver + roundRobin 仅供开发环境无服务网格的简单部署使用;生产环境若已有服务网格,只需将地址配为 mesh 提供的稳定 ClusterIP / Service DNS,无需依赖内置负载均衡策略。

核心概念

概念 作用
router 把多个 procedure 组织成一个模块
procedure 描述一个远端方法:输入 schema、输出 schema、处理函数
stream 描述一个流式方法:持续输入、持续输出,或双向持续收发
handler 把 router 挂到具体宿主框架
client 从调用方访问远端 procedure
resolver 决定 client 如何找到下游服务地址
loadBalancer 在多个地址之间选择下游实例

定义共享契约

schema 通过内置的 schema-dsl 定义,这是本模块唯一的校验库。同一份 router 可以同时被服务端和客户端引用,不需要手写两份类型。

下面先用 query / mutation 展示基础契约定义;streaming 与它共享同一套 router、context、metadata 和错误模型。

query — 读操作

// contracts/user.ts
import { createRouter, query } from 'vextjs-rpc'
import { dsl } from 'schema-dsl'

export const userRouter = createRouter({
  getById: query({
    input: dsl({ id: 'string!' }),
    output: dsl({ id: 'string!', name: 'string!', email: 'email!' }),
    resolve: async ({ input, ctx }) => {
      return ctx.services.user.findById(input.id)
    },
  }),

  list: query({
    input: dsl({ page: 'integer:1-!', pageSize: 'integer:1-100' }),
    output: dsl({ total: 'integer!', items: 'array!' }),
    resolve: async ({ input, ctx }) => {
      return ctx.services.user.list(input)
    },
  }),
})

mutation — 写操作

// contracts/user.ts(续)
import { createRouter, query, mutation } from 'vextjs-rpc'
import { dsl } from 'schema-dsl'

export const userRouter = createRouter({
  // ...query 略

  create: mutation({
    input: dsl({
      name: 'string:1-64!',
      email: 'email!',
      role: 'admin|user|guest',
    }),
    output: dsl({ id: 'string!' }),
    resolve: async ({ input, ctx }) => {
      return ctx.services.user.create(input)
    },
  }),

  update: mutation({
    input: dsl({
      id: 'string!',
      name: 'string:1-64',
      email: 'email',
    }),
    output: dsl({ ok: 'boolean!' }),
    resolve: async ({ input, ctx }) => {
      await ctx.services.user.update(input.id, input)
      return { ok: true }
    },
  }),
})

namespace — 多 router 组合

当一个服务对外暴露多个业务模块时,可以把多个 router 合并:

// contracts/index.ts
import { createRouter } from 'vextjs-rpc'
import { userRouter } from './user'
import { orderRouter } from './order'

export const appRouter = createRouter({
  user: userRouter,
  order: orderRouter,
})

export type AppRouter = typeof appRouter

VextJS — 暴露 RPC 服务

在 VextJS 中,推荐把 vextjs-rpc 直接当成一个插件使用:同一个插件同时完成 app.rpc 挂载与 /rpc 服务端入口注册。

在 VextJS 中,推荐默认按以下方式接入:

  • 默认 /rpc 作为服务端入口路径
  • 默认复用 app.servicesrequest.userx-request-id / traceparent
  • 默认把调用入口挂到 app.rpc
  • 默认通过 app.fetch.create() 发起出站 RPC 请求
  • createContextauthpoliciesobservability 作为覆盖点,而不是每个项目都必须手写的大段样板

插件最简接入

// src/plugins/rpc.ts
import { rpcPlugin } from 'vextjs-rpc/vextjs'
import { staticResolver } from 'vextjs-rpc'
import { appRouter } from '../contracts'

export default rpcPlugin({
  router: appRouter,
  resolver: staticResolver({
    user:  [process.env.USER_RPC_URL  ?? 'http://user-service:3000/rpc'],
    order: [process.env.ORDER_RPC_URL ?? 'http://order-service:3000/rpc'],
  }),
})

最简插件默认完成:

  • 将 client 挂到 app.rpc
  • 在当前服务内注册 /rpc 入口
  • context 默认复用 apprequestapp.servicesrequest.userx-request-id / traceparent
  • 出站调用默认复用 app.fetch.create()
  • 错误日志默认走 app.logger

按需收敛插件行为

// src/plugins/rpc.ts
import { rpcPlugin } from 'vextjs-rpc/vextjs'
import { staticResolver } from 'vextjs-rpc'
import { appRouter } from '../contracts'

export default rpcPlugin({
  router: appRouter,
  resolver: staticResolver({
    user: [process.env.USER_RPC_URL ?? 'http://user-service:3000/rpc'],
  }),
  path: '/internal/rpc',
  createContext: async ({ app, request }) => ({
    app,
    request,
    services: app.services,
    auth: request.user,
    requestId: request.headers['x-request-id'],
    tenantId: request.headers['x-tenant-id'],
  }),
  auth: (ctx) => {
    if (!ctx.auth) throw new Error('UNAUTHORIZED')
  },
  policies: [
    (ctx) => {
      if (!ctx.tenantId) throw new Error('TENANT_REQUIRED')
    },
  ],
  onError: ({ error, ctx }) => {
    ctx?.app.logger.error('rpc error', { error })
  },
})

如果你只想保留单侧能力:

// 只作为调用方
export default rpcPlugin({
  resolver: staticResolver({ user: ['http://user-service:3000/rpc'] }),
  server: false,
})

// 只作为服务提供方
export default rpcPlugin({
  router: appRouter,
  client: false,
})

只有在你需要显式接管服务端入口注册时,才建议退回 mountVextRpc()createVextRpcClient() 这两个低阶 API。


VextJS — 调用远端服务

插件挂载后直接调用

// src/services/order.ts
export class OrderService {
  constructor(private app: VextApp) {}

  async createOrder(userId: string, items: OrderItem[]) {
    const user = await this.app.rpc.user.getById({ id: userId })
    return this.app.rpc.order.create({ userId: user.id, items })
  }
}

插件模式下默认完成:

  • client 已挂到 app.rpc
  • 默认负载策略使用 roundRobin
  • 默认透传当前请求上下文里的 requestId / traceparent
  • timeout、retry 使用模块默认值

低阶 API 仍可按需使用

// src/plugins/rpc.ts
import { definePlugin } from 'vextjs'
import { createVextRpcClient } from 'vextjs-rpc/vextjs'
import { staticResolver, roundRobin } from 'vextjs-rpc'
import type { AppRouter } from '../contracts'

export default definePlugin({
  name: 'rpc',

  setup(app) {
    app.extend(
      'rpc',
      createVextRpcClient<AppRouter>({
        app,
        resolver: staticResolver({
          user:  [process.env.USER_RPC_URL  ?? 'http://user-service:3000/rpc'],
          order: [process.env.ORDER_RPC_URL ?? 'http://order-service:3000/rpc'],
        }),
        loadBalancer: roundRobin(),
        timeout: 3000,
        retry: 1,
        metadata: ({ requestId, traceparent }) => ({
          requestId,
          traceparent,
          source: 'gateway',
        }),
      }),
    )
  },
})

这两个 API 主要用于:

  • 你想显式拆开 server / client 生命周期
  • 你想自己接管 /rpc 路径注册
  • 你不希望采用插件默认组合层

Express — 挂载 RPC 服务

示例中的 buildServices() 代表应用层提供的 service 注入函数(如 DI 容器的入口),Koa、Fastify 示例同理;VextJS 直接使用 app.services

import express from 'express'
import { createExpressRpcHandler } from 'vextjs-rpc/express'
import { appRouter } from './contracts'

const app = express()

app.use(express.json())

app.use(
  '/rpc',
  createExpressRpcHandler({
    router: appRouter,
    createContext: async ({ req, res }) => ({
      req,
      res,
      services: buildServices(),
      requestId: req.header('x-request-id'),
    }),
    onError: ({ error }) => {
      console.error('rpc error', error)
    },
  }),
)

app.listen(3000)

Koa — 挂载 RPC 服务

import Koa from 'koa'
import Router from '@koa/router'
import bodyParser from 'koa-bodyparser'
import { createKoaRpcHandler } from 'vextjs-rpc/koa'
import { appRouter } from './contracts'

const app = new Koa()
const router = new Router()

app.use(bodyParser())

router.use(
  '/rpc',
  createKoaRpcHandler({
    router: appRouter,
    createContext: async ({ ctx }) => ({
      ctx,
      services: buildServices(),
      requestId: ctx.get('x-request-id'),
    }),
    onError: ({ error }) => {
      console.error('rpc error', error)
    },
  }),
)

app.use(router.routes())
app.listen(3000)

Fastify — 挂载 RPC 服务

import Fastify from 'fastify'
import { createFastifyRpcPlugin } from 'vextjs-rpc/fastify'
import { appRouter } from './contracts'

const app = Fastify()

await app.register(createFastifyRpcPlugin, {
  prefix: '/rpc',
  router: appRouter,
  createContext: async ({ request }) => ({
    request,
    services: buildServices(),
    requestId: request.headers['x-request-id'],
  }),
  onError: ({ error }) => {
    app.log.error('rpc error', error)
  },
})

app.listen({ port: 3000 })

通用客户端

在非 VextJS 框架中,使用 createRpcClient 创建独立客户端实例:

// rpc.ts
import { createRpcClient, staticResolver, roundRobin } from 'vextjs-rpc'
import type { AppRouter } from './contracts'

export const rpc = createRpcClient<AppRouter>({
  resolver: staticResolver({
    user:  [process.env.USER_RPC_URL  ?? 'http://user-service:3000/rpc'],
    order: [process.env.ORDER_RPC_URL ?? 'http://order-service:3000/rpc'],
  }),
  loadBalancer: roundRobin(),
  timeout: 3000,
  retry: 1,
  metadata: ({ requestId, traceparent }) => ({
    requestId,
    traceparent,
  }),
})
// 业务代码中直接调用
import { rpc } from './rpc'

const user = await rpc.user.getById({ id: userId })
const order = await rpc.order.create({ userId: user.id, items })

配置项

client 配置

字段 类型 说明
resolver Resolver 解析目标服务地址
loadBalancer LoadBalancer 在多地址间选择下游
timeout number 单次调用超时,毫秒,默认 5000
retry number 内置重试简写,默认 0;按默认分类器执行保守重试
retryPolicy { maxAttempts?, shouldRetry?, backoff? } 高级重试配置,用于显式覆写默认分类器与退避策略
metadata object | (ctx) => object 每次调用注入的 metadata
headers Record<string, string> 每次请求附加的 HTTP 请求头

默认 retry 边界:

  • 默认只自动重试 query
  • serverStream 仅在尚未收到首个 chunk 的建链阶段错误时允许自动重试
  • mutationclientStreambidiStream 默认不自动重试
  • 一旦已经收到首字节或首个 chunk,不再自动重放

handler(服务端)配置

字段 类型 说明
path string RPC 挂载路径
router Router 当前暴露的 router
createContext (hostCtx) => Context 构建 procedure 上下文
onError (info) => void 统一错误处理钩子
auth (ctx) => void | never 鉴权钩子,抛错即拒绝请求
policies PolicyHook[] 策略链,可拒绝、短路或补充上下文字段
observability ObservabilityHook tracing / metrics / logging 生命周期钩子

错误模型

错误分三类:

类型 来源 示例
业务错误 procedure 主动抛出,带明确错误码 用户不存在、权限不足
transport 错误 网络失败、超时、连接不可达 503、ECONNREFUSED
框架错误 schema 校验失败、上下文构建失败、未注册 procedure 400 输入校验失败

统一错误结构:

{
  code: string        // 如 'NOT_FOUND' / 'UNAUTHORIZED' / 'TIMEOUT'
  message: string
  details?: unknown   // 业务自定义扩展字段
  retryable: boolean  // client 是否可以自动重试
  status: number      // 对应 HTTP status,如 404 / 403 / 504
}

在 procedure 中抛出业务错误:

import { RpcError } from 'vextjs-rpc'

resolve: async ({ input, ctx }) => {
  const user = await ctx.services.user.findById(input.id)
  if (!user) {
    throw new RpcError('NOT_FOUND', `用户 ${input.id} 不存在`, { retryable: false })
  }
  return user
}

metadata 与透传

metadata 函数在每次 client 调用时执行,将当前请求中的 trace 上下文转发给下游服务,形成完整的调用链路。

requestId 和 traceparent 由网关或入口中间件在请求入口处生成,以 HTTP 请求头形式传入当前服务(见各框架 createContext 中的 req.header('x-request-id'))。vextjs-rpc 负责转发这些值到下游调用,不负责生成。

// client 端:转发上游注入的 trace 上下文至下游
metadata: ({ requestId, traceparent }) => ({
  requestId,
  traceparent,
  tenantId: process.env.TENANT_ID,
})
// server 端:从 meta 读取
resolve: async ({ input, ctx, meta }) => {
  ctx.app.logger.info('incoming rpc', {
    requestId: meta.requestId,
    traceparent: meta.traceparent,
  })
  // ...
}

服务发现与负载均衡

生产环境:若已有 Kubernetes / 服务网格(Istio、Linkerd 等),服务发现和实例级负载均衡由平台层负责,将地址配为 ClusterIP / Service DNS 即可,本模块内置策略不参与。

开发环境 / 无服务网格:可使用内置的最小化策略:

import { staticResolver, roundRobin } from 'vextjs-rpc'

// staticResolver:从静态配置表读取地址,支持多实例
resolver: staticResolver({
  user: ['http://user-1:3000/rpc', 'http://user-2:3000/rpc'],
})

// roundRobin:轮询选一个
loadBalancer: roundRobin()

如果后续需要接入 Nacos、Consul 或其他注册中心,可以实现 Resolver 接口接入,不需要修改 client 的主结构。


可替换组件

以下组件都提供默认实现。直接使用默认值即可;有特殊场景时,再按需替换对应组件。

组件 默认实现 用途
Transport transport/http 统一 unary 与 streaming 的传输抽象
Codec codec/json 统一请求/响应与流式消息编解码
Resolver staticResolver 自定义服务地址解析
LoadBalancer roundRobin 自定义负载策略
AuthHook auth/noop 服务端统一鉴权
PolicyHook policy/passThrough 拒绝、短路或补充上下文字段
ObservabilityHook observability/noop 接入 tracing / metrics / logging

不提供的能力

以下能力不由 vextjs-rpc 提供:

  • gRPC / Protobuf 主通道
  • 浏览器端运行时
  • 完整注册中心治理(Nacos / Consul 接入)
  • 完整 IDL 编译链
  • 分布式事务、熔断、限流

默认口径

项目 取值
传输 HTTP + JSON
schema 库 schema-dsl(内置,唯一支持)
调用模型 unary + streaming
宿主 VextJS / Express / Koa / Fastify
发现策略 static resolver
负载策略 round-robin
Node.js 基线 >= 18

About

A lightweight Node.js inter-service communication module based on the JSON-RPC 2.0 protocol. It provides a simple API that allows services to make remote calls as if they were calling local functions, eliminating the need for complex proto files and offering out-of-the-box functionality.

Resources

License

Stars

Watchers

Forks

Packages

 
 
 

Contributors