Skip to the content.

Order 事件溯源:从 Controller 到核心层的完整示例

本文把仓库中的 PostgreSQL + Event Sourcing 示例串成一条完整调用链,重点说明:

1. 涉及的关键文件

Web / API

Domain / Core

Infrastructure / PostgreSQL


2. Controller 层:只处理 HTTP,不写业务规则

OrdersController 提供了 6 个接口:

Controller 的职责只有三件事:

  1. 接收 HTTP 参数
  2. 调用应用服务
  3. Either<Error, TResult> 转成 HTTP 响应

不直接访问数据库,也不直接写 Decide/Evolve 规则

以“创建订单”为例

控制器入口:

[HttpPost("{orderId:guid}/create")]
public async Task<IActionResult> Create(Guid orderId, [FromBody] CreateOrderRequest request)
    => Ok(await ExecuteAsync(() => _workflowAppService.CreateAsync(orderId, request, HttpContext.RequestAborted)));

可以看到它只是把:

传给 OrderWorkflowAppService


3. Application 层:把 HTTP 请求编排成领域命令

3.1 命令映射

OrderWorkflowAppService.CreateAsync 会把 Web 请求映射成领域命令:

new CreateOrder(orderId, request.CustomerId, request.Currency)

这一层负责“编排”,不负责业务真相。它做的事包括:

3.2 为什么要在这里触发 projector

在这个示例里,我把投影器放在应用层同步触发,是为了让 Controller 示例更完整:

这是一种演示型实现。未来如果你要改成标准 CQRS,可以把 projector 放到后台订阅器里异步执行。


4. Domain/Core 层:业务规则的真正落点

关键文件是:

4.1 Command

例如:

4.2 Event

例如:

4.3 State

OrderState 不是 ORM 实体,而是一个纯状态对象,包含:

4.4 Decide:决定产生什么事件

比如确认订单时:

也就是说:

决策结果不是“直接改数据库”,而是“决定接下来要追加哪些事件”。

4.5 Evolve:根据事件推进状态

例如:

这就是函数式事件溯源的核心:

state = history.Aggregate(initialState, Evolve)
newEvents = Decide(state, command)

5. Infrastructure 层:把事件写入 PostgreSQL

5.1 EventStoreDbContext

EventStoreDbContext 定义了 4 张核心表:

对应 SQL 在:

/home/runner/work/MS.Microservice/MS.Microservice/docs/postgresql/event-sourcing-order.sql

5.2 PostgresEventStore

PostgresEventStore.AppendToStreamAsync 的流程是:

  1. 查当前 stream 的最大版本号
  2. 检查是否等于 expectedVersion
  3. 把新事件序列化后追加到 event_store
  4. 如果 (stream_id, version) 唯一约束冲突,则抛并发异常

这里体现的就是 optimistic concurrency。

5.3 SnapshotStore

PostgresSnapshotStore 负责:

示例中 OrderCommandService 默认每 100 个事件打一次快照。

5.4 Projection

OrderReadModelProjectorevent_store 的全局位置往后拉取事件,并更新:

所以读侧查询不需要每次都扫全事件流。


6. 一次完整请求到底经历了什么

POST /api/v1/orders/{orderId}/items/add 为例:

Step 1:Controller

OrdersController.AddItem 收到请求。

Step 2:Application Service

OrderWorkflowAppService.AddItemAsync

Step 3:Command Service

OrderCommandService.HandleAsync

Step 4:Projector

应用层调用 OrderReadModelProjector.ProjectAsync

Step 5:返回给客户端

应用层再读取一次当前状态,返回:


7. 如何调用这些接口

假设订单号是:

3fa85f64-5717-4562-b3fc-2c963f66afa6

7.1 创建订单

POST /api/v1/orders/3fa85f64-5717-4562-b3fc-2c963f66afa6/create
Content-Type: application/json

{
  "customerId": "cust-001",
  "currency": "CNY"
}

7.2 添加商品

POST /api/v1/orders/3fa85f64-5717-4562-b3fc-2c963f66afa6/items/add
Content-Type: application/json

{
  "productId": "sku-apple",
  "unitPrice": 12.5,
  "quantity": 2
}

7.3 再添加一个商品

POST /api/v1/orders/3fa85f64-5717-4562-b3fc-2c963f66afa6/items/add
Content-Type: application/json

{
  "productId": "sku-book",
  "unitPrice": 30,
  "quantity": 1
}

7.4 确认订单

POST /api/v1/orders/3fa85f64-5717-4562-b3fc-2c963f66afa6/confirm

7.5 查询订单

GET /api/v1/orders/3fa85f64-5717-4562-b3fc-2c963f66afa6

返回结果里会包含:


8. 这个例子最重要的设计点

8.1 Controller 不写业务规则

规则都在 OrderAggregate.DecideOrderAggregate.Evolve

8.2 Application 只做编排

它负责把 HTTP 世界翻译成领域世界,而不是替代领域决策。

8.3 PostgreSQL 是 append-only event store

真正的事实保存在 event_storeorder_read_model 只是投影。

8.4 查询既可以走投影,也可以走重放

这个示例两者都展示了:


9. 后续怎么继续扩展

如果你要把这个示例正式产品化,下一步建议是:

  1. OrdersController 增加请求验证器
  2. OrderReadModelProjector 改成后台异步订阅器
  3. 增加订单列表查询接口(直接查 order_read_model
  4. 增加快照策略配置化
  5. 增加 event upcaster / schema version 兼容
  6. 增加集成测试,覆盖真实 PostgreSQL append + projection 流程

10. 一句话总结

这个例子里,控制器只接 HTTP,应用层只做编排,核心层只做 Decide/Evolve,基础设施层只负责把事件和投影可靠地落到 PostgreSQL。这正是事件溯源在这个仓库里的落地方式。