Golang 的 Actor 模型

2024/07/01 19:35 pm posted in  Golang

说来也巧,作为多年的 Golang Guy,最近写了一点 Swift,为了研究 Swift 如何可以像 Golang 一样写出简单优雅的还不影响主进程的并发逻辑,突然被 Swift 的 Actor 吸引住了。

虽然很早就知道 Actor 模型,但才疏学浅,很多细节分不清,深入了解收获颇深。

什么是 Actor 模型

先从 Swift 说起,Actor 是 5.5 引入一种安全的并发模型,actor 是一个可以用来定义结构的关键字,而使用该关键字定义出来的结构自动符合 Actor 协议。

实现了 Actor 协议的结构是一种特殊的类,有诸多限制也有其特殊能力,从使用的视角看来,就是外部的访问必须使用 await 异步完成。如下面的代码:

actor User {
    var score = 10

    func printScore() {
        print("My score is (score)")
    }

    func copyScore(from other: User) async {
        score = await other.score
    }
}

let actor1 = User()
let actor2 = User()

await print(actor1.score)
await actor1.copyScore(from: actor2)

但其实看起来只是异步执行的函数,Swift 帮我们做了很多事情,而这些事情就可以一窥 Actor 模型的本质:

  1. 每个 Actor 对象都有一个收件箱,异步的访问请求会进入收件箱,然后串行执行
  2. 不允许从外部写入内部属性,无论有没有 await
  3. 每次只有一段代码可以访问 Actor 的内部属性,也就是说内部属性没有竞态条件

是不是很熟悉,Actor 模型是一个天然的「用通讯共享内存」。

Actor 模型的定义

『不要通过共享内存来通信,应该使用通信来共享内存』,这是在 Golang 社区很有名的并发设计哲学,加上 Golang 的影响力,以至于让大家误认为 Golang 才是该哲学的首创。

实则不然,更为古老的 Erlang 语言也遵循了同样的设计,然而这两者在具体实现上其实有一些不同,其中前者使用通信顺序进程(Communication Sequential Process,CSP),而后者使用 Actor 模型进行设计。一定程度上讲,两家也算师出同门。

事实上,Actor 是个出自学术派的比较早古的模型,据说灵感来自物理学。和同样出自学术派的 Lisp 一样,充满魅力的同时也有一堆初看苦涩难懂的概念。这些定义可以精简为:

  1. 一切皆是 Actor(同理于面向对象编程中的一切皆是对象)
  2. Actor 可以拥有一个用于接收消息的邮箱(邮箱本身也是一个 Actor)
  3. 接收到消息的 Actor 实体可以:
    1. 向其他 Actor 发数量有限的消息
    2. 创建数量有限的新 Actor
    3. 指定要用于接收的下一条消息的行为
  4. 第3点定义的操作均可以安全的并行执行

Actor 模型的启示

作为事件驱动爱好者,很久之前我在思考最小可执行模块的问题,受 FaaS 的影响,我一度觉得事件 + 函数就是一个完美的模型,并在很多地方使用 Eventbus + hook 函数进行组织逻辑。并做了一个简单的基于 Topic 的 Golang Eventbus:https://github.com/hyponet/eventbus

package main

import (
	"fmt"
	"github.com/hyponet/eventbus/bus"
)

func bobDoSomething() {
	fmt.Println("Bob do something")
}

func aliceDoSomething() {
	fmt.Println("Alice do something")
}

func main() {
	_, _ = bus.Subscribe("partner.bob.do", bobDoSomething)
	_, _ = bus.Subscribe("partner.alice.do", aliceDoSomething)
	bus.Publish("partner.*.do")
}

但后期我发现内部状态同步和并发控制是个问题,如果函数存在一个需要内部维护的状态,纯函数表现力就会变弱。通过引入结构体,并将结构体的函数注册到 Eventbus 上,则存在同结构体的多个函数并行执行的竞态条件问题。

并且无论是基于 Topic 的消息,或者直接使用 channel,消息都是单向流动的,执行结果返回的实现相对复杂,尤其是多生产者多消费者模型时。

而 Actor 模型能解决这些问题,Actor 实例封装自己的状态,并与其他 Actor 隔离。Actor 模型内部状态由自己维护,通过消息传递修改状态,避免锁和内存原子性问题。其设计天然支持负载均衡和水平扩容,但同时又引入了 Actor 生命周期管理、集合操作等挑战。

当然 Golang 已经具备了非常优秀的并发设计(CSP),但是 Actor 仍然是一个理论完整的补充,CSP 和 Actor 的讨论可见:https://www.quora.com/How-are-Akka-actors-different-from-Go-channels-How-are-two-related-to-each-other,不做引战。

protoactor-go

理论是枯燥的,在好奇之下,了解到 Golang 也有自己的 Actor 框架:protoactor-go,正是因为早期实现 eventbus 的经历,在阅读了 protoactor-go 代码之后,深深的陷入了城北徐公之美。

核心概念

虽然 protoactor-go 支持的特性挺多,但其核心的概念并不复杂,紧紧围绕这 Actor 模型,整理如下:

Name Desc Notes
actor actor 实体,具体的业务功能
props 自定义 actor 的模版
context 执行上下文,并承担 actor 操作 更像是 contenxt + actor wrapper
mailbox 收件箱,将消息串行丢给 actor actor 操作需要通过 context
process 运行时结构,用来描述一个运行中的 actor 不一定是真的进程
dispatcher 执行抽象,用来管理 mailbox 的消息处理 默认提供同步、异步两种

Hello World

官方仓库的 Hello World 示例基本涵盖了核心概念,使用的基本流程就是:

  1. 定义 Actor
  2. 基于 Props 创建 Actor
  3. 通过 Context 发送消息
type Hello struct{ Who string }
type SetBehaviorActor struct{}

func (state *SetBehaviorActor) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case Hello:
        fmt.Printf("Hello %v\n", msg.Who)
        context.SetBehavior(state.Other)
    }
}

func (state *SetBehaviorActor) Other(context actor.Context) {
    switch msg := context.Message().(type) {
    case Hello:
        fmt.Printf("%v, ey we are now handling messages in another behavior", msg.Who)
    }
}

func NewSetBehaviorActor() actor.Actor {
    return &SetBehaviorActor{}
}

func main() {
    context := actor.EmptyRootContext
    props := actor.PropsFromProducer(NewSetBehaviorActor)
    pid, err := context.Spawn(props)
    if err != nil {
        panic(err)
    }
    context.Send(pid, Hello{Who: "Roger"})
    context.Send(pid, Hello{Who: "Roger"})
    console.ReadLine()
}

消息处理的工作流程

protoactor-go 整体采用消息驱动设计,并且区分了系统消息和用户消息,系统消息用来控制流程,比如暂停/恢复/重启一个 Actor,用户消息则传递给用户编写的业务 Actor 处理。

为了将消息飞轮转起来,protoactor-go 整体本质上还是一个 sub/pub 设计模型,pub 方就是调用 context 的 send 方法的部分(参考上文的 Hello Roger),而 sub 就是用户自定义的 Actor(参考上文的 SetBehaviorActor)。

Mailbox

为了打通 pub 和 sub,核心角色就是 Mailbox。Mailbox 起到两个作用:

  1. 缓存消息,承当 Message Queue
  2. 调度消息,通过 dispatcher,调用 context 提供的 Invoke 方法。

MailBox 处理未读消息,processMessages 是注册给 dispatcher 的 handler,当收到消息后执行 run 方法,直到队列为空。

func (m *defaultMailbox) processMessages() {
process:
	m.run()

	// set mailbox to idle
	atomic.StoreInt32(&m.schedulerStatus, idle)
	sys := atomic.LoadInt32(&m.sysMessages)
	user := atomic.LoadInt32(&m.userMessages)
	// check if there are still messages to process (sent after the message loop ended)
	if sys > 0 || (atomic.LoadInt32(&m.suspended) == 0 && user > 0) {
		// try setting the mailbox back to running
		if atomic.CompareAndSwapInt32(&m.schedulerStatus, idle, running) {
			//	fmt.Printf("looping %v %v %v\n", sys, user, m.suspended)
			goto process
		}
	}

	if user == 0 && (atomic.LoadInt32(&m.suspended) == 0) {
		for _, ms := range m.middlewares {
			ms.MailboxEmpty()
		}
	}
}

MailBox 的执行逻辑,有一个简单的吞吐限制,实现非常有趣。先处理系统消息,再处理用户消息。其中 invoker 是 context 实现的某个 interface,这样的 interface 很多,这也是我觉得 context 更像 actor wrapper 的原因。

func (m *defaultMailbox) run() {
	var msg interface{}

	defer func() {
		if r := recover(); r != nil {
			m.invoker.EscalateFailure(r, msg)
		}
	}()

	i, t := 0, m.dispatcher.Throughput()
	for {
		if i > t {
			i = 0
			runtime.Gosched()
		}

		i++

		// keep processing system messages until queue is empty
		if msg = m.systemMailbox.Pop(); msg != nil {
			atomic.AddInt32(&m.sysMessages, -1)
			switch msg.(type) {
			case *SuspendMailbox:
				atomic.StoreInt32(&m.suspended, 1)
			case *ResumeMailbox:
				atomic.StoreInt32(&m.suspended, 0)
			default:
				m.invoker.InvokeSystemMessage(msg)
			}
			for _, ms := range m.middlewares {
				ms.MessageReceived(msg)
			}
			continue
		}

		// didn't process a system message, so break until we are resumed
		if atomic.LoadInt32(&m.suspended) == 1 {
			return
		}

		if msg = m.userMailbox.Pop(); msg != nil {
			atomic.AddInt32(&m.userMessages, -1)
			m.invoker.InvokeUserMessage(msg)
			for _, ms := range m.middlewares {
				ms.MessageReceived(msg)
			}
		} else {
			return
		}
	}
}

dispatcher

dispatcher 内置了两种实现,分别来控制处理消息时是同步还是异步,这个在并发同步的时候很有用,消息的发送者可以确保当消息发送完成后,消息的接受者也已经处理完了。巧合的是我的 EventBus 也是有相同的设计,但是确实 protoactor-go 的抽象更好。

type goroutineDispatcher int

var _ Dispatcher = goroutineDispatcher(0)

func (goroutineDispatcher) Schedule(fn func()) {
	go fn()
}

func (d goroutineDispatcher) Throughput() int {
	return int(d)
}

func NewDefaultDispatcher(throughput int) Dispatcher {
	return goroutineDispatcher(throughput)
}

type synchronizedDispatcher int

var _ Dispatcher = synchronizedDispatcher(0)

func (synchronizedDispatcher) Schedule(fn func()) {
	fn()
}

func (d synchronizedDispatcher) Throughput() int {
	return int(d)
}

func NewSynchronizedDispatcher(throughput int) Dispatcher {
	return synchronizedDispatcher(throughput)
}

Context

context 虽然看起来是执行上下文,但却是执行大管家。这里只讨论处理消息的逻辑 processMessage,可以看到,真正的入口是 ctx.defaultReceive

func (ctx *actorContext) processMessage(m interface{}) {
	if ctx.props.receiverMiddlewareChain != nil {
		ctx.props.receiverMiddlewareChain(ctx.ensureExtras().context, WrapEnvelope(m))

		return
	}

	if ctx.props.contextDecoratorChain != nil {
		ctx.ensureExtras().context.Receive(WrapEnvelope(m))

		return
	}

	ctx.messageOrEnvelope = m
	ctx.defaultReceive()
	ctx.messageOrEnvelope = nil // release message
}

defaultReceive 还可以看到各种 Hook 和基于消息的流程控制,这些控制流程在不同阶段均有渗透,作者考虑的非常全面。

func (ctx *actorContext) defaultReceive() {
	switch msg := ctx.Message().(type) {
	case *PoisonPill:
		ctx.Stop(ctx.self)

	case AutoRespond:
		if ctx.props.contextDecoratorChain != nil {
			ctx.actor.Receive(ctx.ensureExtras().context)
		} else {
			ctx.actor.Receive(ctx)
		}

		res := msg.GetAutoResponse(ctx)
		ctx.Respond(res)

	default:
		// are we using decorators, if so, ensure it has been created
		if ctx.props.contextDecoratorChain != nil {
			ctx.actor.Receive(ctx.ensureExtras().context)

			return
		}

		ctx.actor.Receive(ctx)
	}
}

总结来说 context 的处理消息的逻辑非常精简:

  1. 各种 Hook
  2. 流程控制
  3. 调用用户自行实现的 Receive 方法

以上就是从发送消息到处理消息的流程,protoactor-go 的代码质量很高,而且充满小巧思和并发技巧,推荐大家有时间阅读一下。

参考: