yanom blog

様々な技術について書きます

GoでActor Model入門(Proto.Actor)

はじめに

皆さんにゃっはろー。お元気ですか?
最近ホロライブにハマっているyanomです。最近のイチオシはぺこーらです!
可愛いので皆さんもぜひ見てみてください! っと、全然脈絡はないですが、一応この記事はGo 3 Advent Calendar 2020 13日目の記事です。

Akka

最近仕事では、JavaAkkaを使って非同期メッセージパッシングを使ったシステムを開発していたりします。Akkaは本当によくできており手軽に使う事ができるのでおすすめです。
ただ、今回はGoのアドベントカレンダーということで、GoでAkkaっぽいことができないか探していたところ、Proto.Actorというライブラリを見つけたので、簡単に入門をやってみました。

Proto.Actor

github.com

Akka.Netの開発メンバだった方が、開発しているということで使い方はかなり似ています。 また、言語はGoの他にC#版もあるみたいです。
(GoのチャネルとActorについての違いについてはここに詳しく書いてあります。)

Hallo Proto.Actor

まずはチュートリアルから。
環境構築などは、gitに書いてあるのでそちらを参照してください。

type Hello struct{ Who string }
type HelloActor struct{}

func (state *HelloActor) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case Hello:
        fmt.Printf("Hello %v\n", msg.Who)
    }
}

func main() {
    context := actor.EmptyRootContext
    props := actor.PropsFromProducer(func() actor.Actor { return &HelloActor{} })
    pid, err := context.Spawn(props)
    if err != nil {
        panic(err)
    }
    context.Send(pid, Hello{Who: "Roger"})
    console.ReadLine()
}

実行すると、こんな感じでHelloActorに対して、Rogerというメッセージが送信されていることがわかります。

yanom@DESKTOP-0VPGPUH:~/go/src/proto-actor$ ./proto-actor 
Hello Roger

backpressure

Actorモデルでは各Actorがmailboxを持っており、Actorはそこからひとつずつメッセージを取り出して処理を実行しています。
Proto.Actorではそのmailboxに振る舞いを簡単に定義できるようになっています。
これを使ってバックプレッシャーを実現することが可能となっています。

package main

import (
    "log"
    "sync/atomic"
    "time"

    console "github.com/AsynkronIT/goconsole"
    "github.com/AsynkronIT/protoactor-go/actor"
    "github.com/AsynkronIT/protoactor-go/mailbox"
)

// sent to producer to request more work
type requestMoreWork struct {
    items int
}
type requestWorkBehavior struct {
    tokens   int64
    producer *actor.PID
}

func (m *requestWorkBehavior) MailboxStarted() {
    log.Println("mailboxstarted...")
    m.requestMore()
}
func (m *requestWorkBehavior) MessagePosted(msg interface{}) {
    log.Println("posted...")
}
func (m *requestWorkBehavior) MessageReceived(msg interface{}) {
    log.Println("messagereceived...")
    atomic.AddInt64(&m.tokens, -1)
    if m.tokens == 0 {
        m.requestMore()
    }
}
func (m *requestWorkBehavior) MailboxEmpty() {
    log.Println("empty...")
}

func (m *requestWorkBehavior) requestMore() {
    log.Println("Requesting more tokens")
    m.tokens = 5
    system.Root.Send(m.producer, &requestMoreWork{items: 5})
}

type producer struct {
    requestedWork int
    producedWork  int
    worker        *actor.PID
}

func (p *producer) Receive(ctx actor.Context) {
    switch msg := ctx.Message().(type) {
    case *actor.Started:
        log.Println("started...")
        // spawn our worker
        workerProps := actor.PropsFromProducer(func() actor.Actor {
            return &worker{}
        })
        mb := mailbox.Unbounded(&requestWorkBehavior{
            producer: ctx.Self(),
        })
        p.worker = ctx.Spawn(workerProps.WithMailbox(mb))
    case *requestMoreWork:
        p.requestedWork += msg.items
        log.Println("Producer got a new work request")
        ctx.Send(ctx.Self(), &produce{})
    case *produce:
        // produce more work
        log.Println("Producer is producing work")
        p.producedWork++
        ctx.Send(p.worker, &work{p.producedWork})

        // decrease our workload and tell ourselves to produce more work
        if p.requestedWork > 0 {
            log.Println("send produce")
         
            p.requestedWork--
            ctx.Send(ctx.Self(), &produce{})
        }
    }
}

type produce struct{}
type worker struct{}

func (w *worker) Receive(ctx actor.Context) {
    switch msg := ctx.Message().(type) {
    case *work:
        log.Printf("Worker is working %+v", msg)
        time.Sleep(100 * time.Millisecond)
    }
}

type work struct {
    id int
}

var system = actor.NewActorSystem()

func main() {
    producerProps := actor.PropsFromProducer(func() actor.Actor { return &producer{} })
    system.Root.Spawn(producerProps)
    _, _ = console.ReadLine()

}

実行してみるとわかりますが、workerが一定数処理を終わるまで、producerは次の処理を投げないように制御できていることが分かります。

他にも色々サンプルが揃っているので興味がある方はぜひ覗いてみてください。

まとめ

非同期メッセージパッシングが簡単にできるようになるProto.Actorを紹介しました。 Go版はまだベータ版ということで変わる可能性がありますが、その手軽さはAkka以上だと感じました。
また、実際の開発では考慮が必要になるであろう、バックプレッシャーについても手軽に実現できるようになっているので、そこも素晴らしいと思います。今後も色々遊んでみたいと思います。
あと、久しぶりにGoを書きましたが、やっぱり最高です!