GoでActor Model入門(Proto.Actor)
はじめに
皆さんにゃっはろー。お元気ですか?
最近ホロライブにハマっているyanomです。最近のイチオシはぺこーらです!
可愛いので皆さんもぜひ見てみてください!
っと、全然脈絡はないですが、一応この記事はGo 3 Advent Calendar 2020 13日目の記事です。
Akka
最近仕事では、JavaでAkkaを使って非同期メッセージパッシングを使ったシステムを開発していたりします。Akkaは本当によくできており手軽に使う事ができるのでおすすめです。
ただ、今回はGoのアドベントカレンダーということで、GoでAkkaっぽいことができないか探していたところ、Proto.Actorというライブラリを見つけたので、簡単に入門をやってみました。
Proto.Actor
Akka.Netの開発メンバだった方が、開発しているということで使い方はかなり似ています。
また、言語はGoの他にC#版もあるみたいです。
(GoのチャネルとActorについての違いについてはここに詳しく書いてあります。)
Hallo Proto.Actor
まずはチュートリアルから。
環境構築などは、gitに書いてあるのでそちらを参照してください。
type Hello struct{ Who string } type HelloActor struct{} func (state *HelloActor) Receive(context actor.Context) { switch msg := context.Message().(type) { case Hello: fmt.Printf("Hello %v\n", msg.Who) } } func main() { context := actor.EmptyRootContext props := actor.PropsFromProducer(func() actor.Actor { return &HelloActor{} }) pid, err := context.Spawn(props) if err != nil { panic(err) } context.Send(pid, Hello{Who: "Roger"}) console.ReadLine() }
実行すると、こんな感じでHelloActorに対して、Rogerというメッセージが送信されていることがわかります。
yanom@DESKTOP-0VPGPUH:~/go/src/proto-actor$ ./proto-actor Hello Roger
backpressure
Actorモデルでは各Actorがmailboxを持っており、Actorはそこからひとつずつメッセージを取り出して処理を実行しています。
Proto.Actorではそのmailboxに振る舞いを簡単に定義できるようになっています。
これを使ってバックプレッシャーを実現することが可能となっています。
package main import ( "log" "sync/atomic" "time" console "github.com/AsynkronIT/goconsole" "github.com/AsynkronIT/protoactor-go/actor" "github.com/AsynkronIT/protoactor-go/mailbox" ) // sent to producer to request more work type requestMoreWork struct { items int } type requestWorkBehavior struct { tokens int64 producer *actor.PID } func (m *requestWorkBehavior) MailboxStarted() { log.Println("mailboxstarted...") m.requestMore() } func (m *requestWorkBehavior) MessagePosted(msg interface{}) { log.Println("posted...") } func (m *requestWorkBehavior) MessageReceived(msg interface{}) { log.Println("messagereceived...") atomic.AddInt64(&m.tokens, -1) if m.tokens == 0 { m.requestMore() } } func (m *requestWorkBehavior) MailboxEmpty() { log.Println("empty...") } func (m *requestWorkBehavior) requestMore() { log.Println("Requesting more tokens") m.tokens = 5 system.Root.Send(m.producer, &requestMoreWork{items: 5}) } type producer struct { requestedWork int producedWork int worker *actor.PID } func (p *producer) Receive(ctx actor.Context) { switch msg := ctx.Message().(type) { case *actor.Started: log.Println("started...") // spawn our worker workerProps := actor.PropsFromProducer(func() actor.Actor { return &worker{} }) mb := mailbox.Unbounded(&requestWorkBehavior{ producer: ctx.Self(), }) p.worker = ctx.Spawn(workerProps.WithMailbox(mb)) case *requestMoreWork: p.requestedWork += msg.items log.Println("Producer got a new work request") ctx.Send(ctx.Self(), &produce{}) case *produce: // produce more work log.Println("Producer is producing work") p.producedWork++ ctx.Send(p.worker, &work{p.producedWork}) // decrease our workload and tell ourselves to produce more work if p.requestedWork > 0 { log.Println("send produce") p.requestedWork-- ctx.Send(ctx.Self(), &produce{}) } } } type produce struct{} type worker struct{} func (w *worker) Receive(ctx actor.Context) { switch msg := ctx.Message().(type) { case *work: log.Printf("Worker is working %+v", msg) time.Sleep(100 * time.Millisecond) } } type work struct { id int } var system = actor.NewActorSystem() func main() { producerProps := actor.PropsFromProducer(func() actor.Actor { return &producer{} }) system.Root.Spawn(producerProps) _, _ = console.ReadLine() }
実行してみるとわかりますが、workerが一定数処理を終わるまで、producerは次の処理を投げないように制御できていることが分かります。
他にも色々サンプルが揃っているので興味がある方はぜひ覗いてみてください。
まとめ
非同期メッセージパッシングが簡単にできるようになるProto.Actorを紹介しました。
Go版はまだベータ版ということで変わる可能性がありますが、その手軽さはAkka以上だと感じました。
また、実際の開発では考慮が必要になるであろう、バックプレッシャーについても手軽に実現できるようになっているので、そこも素晴らしいと思います。今後も色々遊んでみたいと思います。
あと、久しぶりにGoを書きましたが、やっぱり最高です!