说来也巧,作为多年的 Golang Guy,最近写了一点 Swift,为了研究 Swift 如何可以像 Golang 一样写出简单优雅的还不影响主进程的并发逻辑,突然被 Swift 的 Actor 吸引住了。
虽然很早就知道 Actor 模型,但才疏学浅,很多细节分不清,深入了解收获颇深。
什么是 Actor 模型
先从 Swift 说起,Actor 是 5.5 引入一种安全的并发模型,actor
是一个可以用来定义结构的关键字,而使用该关键字定义出来的结构自动符合 Actor 协议。
实现了 Actor 协议的结构是一种特殊的类,有诸多限制也有其特殊能力,从使用的视角看来,就是外部的访问必须使用 await
异步完成。如下面的代码:
actor User {
var score = 10
func printScore() {
print("My score is (score)")
}
func copyScore(from other: User) async {
score = await other.score
}
}
let actor1 = User()
let actor2 = User()
await print(actor1.score)
await actor1.copyScore(from: actor2)
但其实看起来只是异步执行的函数,Swift 帮我们做了很多事情,而这些事情就可以一窥 Actor 模型的本质:
- 每个 Actor 对象都有一个收件箱,异步的访问请求会进入收件箱,然后串行执行
- 不允许从外部写入内部属性,无论有没有
await
- 每次只有一段代码可以访问 Actor 的内部属性,也就是说内部属性没有竞态条件
是不是很熟悉,Actor 模型是一个天然的「用通讯共享内存」。
Actor 模型的定义
『不要通过共享内存来通信,应该使用通信来共享内存』,这是在 Golang 社区很有名的并发设计哲学,加上 Golang 的影响力,以至于让大家误认为 Golang 才是该哲学的首创。
实则不然,更为古老的 Erlang 语言也遵循了同样的设计,然而这两者在具体实现上其实有一些不同,其中前者使用通信顺序进程(Communication Sequential Process,CSP),而后者使用 Actor 模型进行设计。一定程度上讲,两家也算师出同门。
事实上,Actor 是个出自学术派的比较早古的模型,据说灵感来自物理学。和同样出自学术派的 Lisp 一样,充满魅力的同时也有一堆初看苦涩难懂的概念。这些定义可以精简为:
- 一切皆是 Actor(同理于面向对象编程中的一切皆是对象)
- Actor 可以拥有一个用于接收消息的邮箱(邮箱本身也是一个 Actor)
- 接收到消息的 Actor 实体可以:
- 向其他 Actor 发数量有限的消息
- 创建数量有限的新 Actor
- 指定要用于接收的下一条消息的行为
- 第3点定义的操作均可以安全的并行执行
Actor 模型的启示
作为事件驱动爱好者,很久之前我在思考最小可执行模块的问题,受 FaaS 的影响,我一度觉得事件 + 函数就是一个完美的模型,并在很多地方使用 Eventbus + hook 函数进行组织逻辑。并做了一个简单的基于 Topic 的 Golang Eventbus:https://github.com/hyponet/eventbus。
package main
import (
"fmt"
"github.com/hyponet/eventbus/bus"
)
func bobDoSomething() {
fmt.Println("Bob do something")
}
func aliceDoSomething() {
fmt.Println("Alice do something")
}
func main() {
_, _ = bus.Subscribe("partner.bob.do", bobDoSomething)
_, _ = bus.Subscribe("partner.alice.do", aliceDoSomething)
bus.Publish("partner.*.do")
}
但后期我发现内部状态同步和并发控制是个问题,如果函数存在一个需要内部维护的状态,纯函数表现力就会变弱。通过引入结构体,并将结构体的函数注册到 Eventbus 上,则存在同结构体的多个函数并行执行的竞态条件问题。
并且无论是基于 Topic 的消息,或者直接使用 channel,消息都是单向流动的,执行结果返回的实现相对复杂,尤其是多生产者多消费者模型时。
而 Actor 模型能解决这些问题,Actor 实例封装自己的状态,并与其他 Actor 隔离。Actor 模型内部状态由自己维护,通过消息传递修改状态,避免锁和内存原子性问题。其设计天然支持负载均衡和水平扩容,但同时又引入了 Actor 生命周期管理、集合操作等挑战。
当然 Golang 已经具备了非常优秀的并发设计(CSP),但是 Actor 仍然是一个理论完整的补充,CSP 和 Actor 的讨论可见:https://www.quora.com/How-are-Akka-actors-different-from-Go-channels-How-are-two-related-to-each-other,不做引战。
protoactor-go
理论是枯燥的,在好奇之下,了解到 Golang 也有自己的 Actor 框架:protoactor-go,正是因为早期实现 eventbus 的经历,在阅读了 protoactor-go 代码之后,深深的陷入了城北徐公之美。
核心概念
虽然 protoactor-go 支持的特性挺多,但其核心的概念并不复杂,紧紧围绕这 Actor 模型,整理如下:
Name | Desc | Notes |
---|---|---|
actor | actor 实体,具体的业务功能 | |
props | 自定义 actor 的模版 | |
context | 执行上下文,并承担 actor 操作 | 更像是 contenxt + actor wrapper |
mailbox | 收件箱,将消息串行丢给 actor | actor 操作需要通过 context |
process | 运行时结构,用来描述一个运行中的 actor | 不一定是真的进程 |
dispatcher | 执行抽象,用来管理 mailbox 的消息处理 | 默认提供同步、异步两种 |
Hello World
官方仓库的 Hello World 示例基本涵盖了核心概念,使用的基本流程就是:
- 定义 Actor
- 基于 Props 创建 Actor
- 通过 Context 发送消息
type Hello struct{ Who string }
type SetBehaviorActor struct{}
func (state *SetBehaviorActor) Receive(context actor.Context) {
switch msg := context.Message().(type) {
case Hello:
fmt.Printf("Hello %v\n", msg.Who)
context.SetBehavior(state.Other)
}
}
func (state *SetBehaviorActor) Other(context actor.Context) {
switch msg := context.Message().(type) {
case Hello:
fmt.Printf("%v, ey we are now handling messages in another behavior", msg.Who)
}
}
func NewSetBehaviorActor() actor.Actor {
return &SetBehaviorActor{}
}
func main() {
context := actor.EmptyRootContext
props := actor.PropsFromProducer(NewSetBehaviorActor)
pid, err := context.Spawn(props)
if err != nil {
panic(err)
}
context.Send(pid, Hello{Who: "Roger"})
context.Send(pid, Hello{Who: "Roger"})
console.ReadLine()
}
消息处理的工作流程
protoactor-go 整体采用消息驱动设计,并且区分了系统消息和用户消息,系统消息用来控制流程,比如暂停/恢复/重启一个 Actor,用户消息则传递给用户编写的业务 Actor 处理。
为了将消息飞轮转起来,protoactor-go 整体本质上还是一个 sub/pub 设计模型,pub 方就是调用 context 的 send 方法的部分(参考上文的 Hello Roger),而 sub 就是用户自定义的 Actor(参考上文的 SetBehaviorActor)。
Mailbox
为了打通 pub 和 sub,核心角色就是 Mailbox。Mailbox 起到两个作用:
- 缓存消息,承当 Message Queue
- 调度消息,通过 dispatcher,调用 context 提供的 Invoke 方法。
MailBox 处理未读消息,processMessages
是注册给 dispatcher
的 handler,当收到消息后执行 run
方法,直到队列为空。
func (m *defaultMailbox) processMessages() {
process:
m.run()
// set mailbox to idle
atomic.StoreInt32(&m.schedulerStatus, idle)
sys := atomic.LoadInt32(&m.sysMessages)
user := atomic.LoadInt32(&m.userMessages)
// check if there are still messages to process (sent after the message loop ended)
if sys > 0 || (atomic.LoadInt32(&m.suspended) == 0 && user > 0) {
// try setting the mailbox back to running
if atomic.CompareAndSwapInt32(&m.schedulerStatus, idle, running) {
// fmt.Printf("looping %v %v %v\n", sys, user, m.suspended)
goto process
}
}
if user == 0 && (atomic.LoadInt32(&m.suspended) == 0) {
for _, ms := range m.middlewares {
ms.MailboxEmpty()
}
}
}
MailBox 的执行逻辑,有一个简单的吞吐限制,实现非常有趣。先处理系统消息,再处理用户消息。其中 invoker
是 context 实现的某个 interface,这样的 interface 很多,这也是我觉得 context 更像 actor wrapper 的原因。
func (m *defaultMailbox) run() {
var msg interface{}
defer func() {
if r := recover(); r != nil {
m.invoker.EscalateFailure(r, msg)
}
}()
i, t := 0, m.dispatcher.Throughput()
for {
if i > t {
i = 0
runtime.Gosched()
}
i++
// keep processing system messages until queue is empty
if msg = m.systemMailbox.Pop(); msg != nil {
atomic.AddInt32(&m.sysMessages, -1)
switch msg.(type) {
case *SuspendMailbox:
atomic.StoreInt32(&m.suspended, 1)
case *ResumeMailbox:
atomic.StoreInt32(&m.suspended, 0)
default:
m.invoker.InvokeSystemMessage(msg)
}
for _, ms := range m.middlewares {
ms.MessageReceived(msg)
}
continue
}
// didn't process a system message, so break until we are resumed
if atomic.LoadInt32(&m.suspended) == 1 {
return
}
if msg = m.userMailbox.Pop(); msg != nil {
atomic.AddInt32(&m.userMessages, -1)
m.invoker.InvokeUserMessage(msg)
for _, ms := range m.middlewares {
ms.MessageReceived(msg)
}
} else {
return
}
}
}
dispatcher
dispatcher 内置了两种实现,分别来控制处理消息时是同步还是异步,这个在并发同步的时候很有用,消息的发送者可以确保当消息发送完成后,消息的接受者也已经处理完了。巧合的是我的 EventBus 也是有相同的设计,但是确实 protoactor-go 的抽象更好。
type goroutineDispatcher int
var _ Dispatcher = goroutineDispatcher(0)
func (goroutineDispatcher) Schedule(fn func()) {
go fn()
}
func (d goroutineDispatcher) Throughput() int {
return int(d)
}
func NewDefaultDispatcher(throughput int) Dispatcher {
return goroutineDispatcher(throughput)
}
type synchronizedDispatcher int
var _ Dispatcher = synchronizedDispatcher(0)
func (synchronizedDispatcher) Schedule(fn func()) {
fn()
}
func (d synchronizedDispatcher) Throughput() int {
return int(d)
}
func NewSynchronizedDispatcher(throughput int) Dispatcher {
return synchronizedDispatcher(throughput)
}
Context
context 虽然看起来是执行上下文,但却是执行大管家。这里只讨论处理消息的逻辑 processMessage
,可以看到,真正的入口是 ctx.defaultReceive
:
func (ctx *actorContext) processMessage(m interface{}) {
if ctx.props.receiverMiddlewareChain != nil {
ctx.props.receiverMiddlewareChain(ctx.ensureExtras().context, WrapEnvelope(m))
return
}
if ctx.props.contextDecoratorChain != nil {
ctx.ensureExtras().context.Receive(WrapEnvelope(m))
return
}
ctx.messageOrEnvelope = m
ctx.defaultReceive()
ctx.messageOrEnvelope = nil // release message
}
defaultReceive
还可以看到各种 Hook 和基于消息的流程控制,这些控制流程在不同阶段均有渗透,作者考虑的非常全面。
func (ctx *actorContext) defaultReceive() {
switch msg := ctx.Message().(type) {
case *PoisonPill:
ctx.Stop(ctx.self)
case AutoRespond:
if ctx.props.contextDecoratorChain != nil {
ctx.actor.Receive(ctx.ensureExtras().context)
} else {
ctx.actor.Receive(ctx)
}
res := msg.GetAutoResponse(ctx)
ctx.Respond(res)
default:
// are we using decorators, if so, ensure it has been created
if ctx.props.contextDecoratorChain != nil {
ctx.actor.Receive(ctx.ensureExtras().context)
return
}
ctx.actor.Receive(ctx)
}
}
总结来说 context 的处理消息的逻辑非常精简:
- 各种 Hook
- 流程控制
- 调用用户自行实现的
Receive
方法
以上就是从发送消息到处理消息的流程,protoactor-go 的代码质量很高,而且充满小巧思和并发技巧,推荐大家有时间阅读一下。
参考: