Richard Imaoka's Blog

2017年より職業Scalaプログラマになった、リチャード・伊真岡のブログです。

builderscon 低温調理肉会 2017秋 @ 新宿御苑に行ってきた

buildersconスタッフのリチャード伊間岡です。lestrrat氏主催の肉会に行ってきました。

peatix.com

いきなりの写真ですけどこれ。これ、前菜ですからね!

f:id:richard-imaoka:20171118171222p:plain

(赤いけどちゃんと火のとおった肉です)

上記のリンクにもあるように

本会の目的は「主宰の牧が大量に肉を調理したい欲求を叶えること」です。

主宰氏が用意した大量の肉をbuildersconに関わる人々(主にスタッフなど)といっしょに地下の食堂でワイワイしながら食べようという企画がこの肉会です。

ちなみに低温調理と言われても聞いたことない人にはなんのことだかわからないですよね。要はこういうことです。anovaという調理機器を使っていい感じに肉を仕上げようという企みです。

medium.com

このブログに並ぶ肉料理記事の数々を見て主宰氏の本気度がお分かりになるのではないかと思います。

とにかく肉を食っただけの今日の会なんですけど、とにかく旨かったです。

そして旨い肉をパンに挟んで食べるのもそれはそれは幸せな体験でした。

f:id:richard-imaoka:20171118173250p:plain

(贅沢サンド)

つまり何が言いたいかというと「buildersconに関わってるとこんなに美味しい思いをできるんですよ!!!」主宰が食の求道者だから。

f:id:richard-imaoka:20171118173514p:plain 基本肉はでかい、かなりでかい。「厚さは肉の量の問題だけじゃない。完璧な肉の外部と内部の焼き加減のバランスを得るためにはそれなりの厚さが必要だ。」 ( - iOSDC Japan 2017 にてlestrrat氏のトークより"Serious Eats"の引用部分)

f:id:richard-imaoka:20171118173637p:plain

なんかバナナを入れたホットワインとかオシャレ(?)な物が出てくるなど。

食うのに夢中になってたらこれ以外写真取ってなかった…!

とにかく大満足な会でした。これだけでもbuildersconに参加してよかったと思いますし、むしろいい肉がでな いならbuilderaconじゃないですし、私はそこに肉をもたらす会がある限りbuildersconには参加し続けようと思う次第です。私は食の奴隷。

lestrratさんありがとねー。

builderscon tokyo 2017動画は外注でなく我々スタッフが撮影しました! 振り返りと次回への野望

みなさま、builderscon tokyo 2017 - Aug 3, 4, 5 2017は楽しんでいただけましたでしょうか?

楽しんでいただけた方も、または残念ながらご参加いただけなかった方も、こちらに動画をアップロードしています。 ぜひぜひ楽しんでください。

www.youtube.com

(2017/08/12現在、まだ動画の編集及び確認作業中で、すべては公開されていません。)

ちなみに私、今回は主に動画撮影・編集担当としてはたらいておりました。 そう、↑で公開している動画は、我々スタッフで撮影して編集しているのです。外部業者に依頼はしていません。 手作り感あふれる趣があるのはそのためですね。

buildersconでは動画においてもbuilderの精神で、自分たちでやっちゃうのです。 自分たちでやると工夫のし甲斐があってノウハウがたまる面もあるのでおもしろいですね。 今回もいろいろ勉強させていただき、私の動画力が格段に上がりました。 次回以降の開催に活かしたいことがたくさんありましたし、次回以降に試したいこともたくさんできました。

ただ、とんでもない金額のお金が転がり込んできたら外注して超絶美麗な動画を作りますので、膨大な資金をbuildersconに注ぎ込んでくださる方はいつでも猛烈に大歓迎です。

f:id:richard-imaoka:20170804234807p:plain

もしそうなったらCG入れたりとか、火花飛ばしたりとかしますよ。 あと、映りこむスタッフの顔をイケメンに差し替えたり。

まあ実際のところ、今後もbuildersconでは動画を自家製でやっていく可能性が高いので、 もしビデオの撮り方を覚えたいという人は是非次回以降の動画作成を手伝ってみてはいかがですか? 一緒にユーチューバーの夢を見ましょう(違)

個人的には次回以降に映像のクオリティを上げたい野望はあります。 そのためには圧倒的な手間と、それからややお金が掛かるので実現はむずかしいんですけどね。 あと、動画の超早出し公開のワークフロー作成に燃え上がれる人とか。 これも手間がすごいので今はできる目途がありません。 でも、やりたいという情熱がある人が来れば、カンファレンスを利用して、動画作成の技を覚えるチャンス。

それからさらに個人的にカンファレンス動画以外もつくりたいです。 ウェブサイトトップページに載せるムダにカッコいいインタビューとか。 あと、ドキュメンタリー映像的なのも作りたいですね笑「builderの夜明け」みたいな(おや、そのタイトル…?)

動画で何かやらかしたい野望がある人、buildersconではそんな情熱を待っています。 誰も手を上げなければ私がやらかすだけなので、もったいないですよ笑。

とにかく、撮影担当は(失敗を考えて胃が痛かったけど笑)楽しかったです。エンジニアが手を広げる趣味としてはよい候補ではないでしょうかね?

Typesafe Akka Remote Sampleの図解 - 2/2 LookupApplication編

TypeSafeのAkka Remote Samples with Scalaに含まれる2つ目のサンプルアプリケーション

前回の記事に引き続き、何かとわかりにくいTypeSafe社の@TypeSafeのAkka Remoteのサンプルについて、図解していきたいと思います。

サンプルの中には2つのアプリケーションが含まれていて、この記事はその2つ目、LookupApplicationについてです

メッセージの型としてのcase class MathOpは前回の記事でも解説した通りです

f:id:richard-imaoka:20151105024105p:plain

// sample/remote/calculator/MathOp.scala
trait MathOp
final case class Add(nbr1: Int, nbr2: Int) extends MathOp
final case class Subtract(nbr1: Int, nbr2: Int) extends MathOp
final case class Multiply(nbr1: Int, nbr2: Int) extends MathOp
final case class Divide(nbr1: Double, nbr2: Int) extends MathOp

Akkaでよく使われるcase classをメッセージの型として使う方法です。足し、引き、掛け、割り算に相当する以上の4つが定義されています。

MathResultも同様に前回の記事の通りです

f:id:richard-imaoka:20151105024107p:plain

// sample/remote/calculator/MathOp.scala
trait MathResult
final case class AddResult(nbr: Int, nbr2: Int, result: Int) extends MathResult
final case class SubtractResult(nbr1: Int, nbr2: Int, result: Int) extends MathResult
final case class MultiplicationResult(nbr1: Int, nbr2: Int, result: Int) extends MathResult
final case class DivisionResult(nbr1: Double, nbr2: Int, result: Double) extends MathResult

それぞれに対する結果型も用意されています。以下で見るようにActorはこれらの型のメッセージをやり取りして、計算の入力と結果を受け渡します。

CalculatorActorは計算入力を受け取って結果を返す、LookupActorはRemoteで生成されたCalculatorActorを探して、監視したうえで、計算を行わせます

f:id:richard-imaoka:20151105024114p:plain

// sample/remote/calculator/calculatorActor.scala
class CalculatorActor extends Actor {
  def receive = {
    case Add(n1, n2) =>
      println("Calculating %d + %d".format(n1, n2))
      sender() ! AddResult(n1, n2, n1 + n2)
    case Subtract(n1, n2) =>
      println("Calculating %d - %d".format(n1, n2))
      sender() ! SubtractResult(n1, n2, n1 - n2)
    case Multiply(n1, n2) =>
      println("Calculating %d * %d".format(n1, n2))
      sender() ! MultiplicationResult(n1, n2, n1 * n2)
    case Divide(n1, n2) =>
      println("Calculating %.0f / %d".format(n1, n2))
      sender() ! DivisionResult(n1, n2, n1 / n2)
  }
}

こちらも前回の記事で解説した通りです。

例えばMultiply型のメッセージを受け取ったときは、その結果であるMultiplicationResult型のメッセージを送信元"sender"に投げ返します。

LookupActorは(LookupActorから見て)RemoteにあるCalculatorActorを探しに行きます

f:id:richard-imaoka:20151106020837p:plain

今まで出てきたActorに比べてLookupActorはやや複雑です

f:id:richard-imaoka:20151106020802p:plain

//sample/remote/calculator/LookupActor.scala
class LookupActor(path: String) extends Actor { ... }

まず、上記のConstructionの部分を見ましょう。path変数には

  • path = "akka.tcp://CalculatorSystem@127.0.0.1:2552/user/calculator"

が入ってきます。

LookupActorは最初に呼び出される関数sendIdentifyRequest()はActorSelectionに対して、Identifyメッセージを送っています

f:id:richard-imaoka:20151106020816p:plain

Identifyについては後述しますので、まずはActorSelectionについて。

ActorSelectionは、上記の"akka.tcp://..."のようなパス(URL)に対して!メソッドでメッセージを送ることができます。

f:id:richard-imaoka:20151106020826p:plain

つまり、Akkaでは

  • ActorRef
  • ActorSelection

の2つに対して!メソッドでメッセージが送れることになります。

Akkaに備わっているのIdentify, ActorIdentityメッセージ型は、ActorSelection宛てにメッセージを送ったときにActorRefを得ることができます

f:id:richard-imaoka:20151106020843p:plain

次に、Identify, ActorIdentityは、Akkaに備わっているメッセージ型です。

AkkaのActorはIdentifyを受け取ると、ActorIdentityをsender()に返します。その際、ActorIdentityはActorRefを第2引数にもっています。

//akka.actor.ActorIdentity
 case class ActorIdentity(correlationId: Any, ref: Option[ActorRef]) 

LookupActorではその第2引数として返ってきたActorRefを使って、context.watch()しています。

f:id:richard-imaoka:20151106023558p:plain

context.watch()すると、別のActorを監視することができる、すなわち監視対象のActorがStopすると、Terminatedメッセージを受け取ることになります。

f:id:richard-imaoka:20151106020918p:plain

    case Terminated(`actor`) =>
      println("Calculator terminated")
      sendIdentifyRequest()
      context.become(identifying)

LookupActorではこのあともう一度sendIdentifyRequest()を呼んでいるので、TerminatedになったCalculatorActorの代わりのCalculatorActor(別インスタンス)が同じパス

  • "akka.tcp://CalculatorSystem@127.0.0.1:2552/user/calculator"

上にあれば、再びCalculatorActorを監視に入れることになります。

context.become(active(actor))によって、receiveメソッドの実装はactiveメソッドに切り替わります

f:id:richard-imaoka:20151106020857p:plain

無事CalculatorActorの監視に成功したら、次は

      context.become(active(actor))

によって、receiveメソッドの動作をactive()メソッドに入れ替えます。

  def active(actor: ActorRef): Actor.Receive = {
    case op: MathOp => actor ! op
    case result: MathResult => result match {
      case AddResult(n1, n2, r) =>
        printf("Add result: %d + %d = %d\n", n1, n2, r)
      case SubtractResult(n1, n2, r) =>
        printf("Sub result: %d - %d = %d\n", n1, n2, r)
    }
    case Terminated(`actor`) =>
      println("Calculator terminated")
      sendIdentifyRequest()
      context.become(identifying)
    case ReceiveTimeout =>
    // ignore

これは、

  • MathOpを受け取ればCalculatorActor (actor) に転送
  • MatuResultを受け取ればprintf表示
  • Terminatedを(優位つの監視対象である)CalculatorActorから受け取れば、もう一度でsendIdentifyRequest()監視

f:id:richard-imaoka:20151106023558p:plain

という動作をします。

LokupApplication

最後にアプリケーションの説明です。これも前回の記事同様、main関数はややこしいのですが…、とにかくstartRemoteCalculatorSystem()とstartRemoteLookupSystem()という二つの関数を走らせるだけです。

コマンドライン引数」のCalculatorとLookupを渡すと、2つの関数をの別のプロセスで走らせることができます。

sbt "runMain sample.remote.calculator.CreationApplication Calculator"
sbt "runMain sample.remote.calculator.CreationApplication Lookup"

args.isEmpty、すなわちコマンドライン引数を渡さないと、一つのプロセスの中で2つの関数を走らせます。s

object LookupApplication {
  def main(args: Array[String]): Unit = {
    if (args.isEmpty || args.head == "Calculator")
      startRemoteCalculatorSystem()
    if (args.isEmpty || args.head == "Lookup")
      startRemoteLookupSystem()
  }

  def startRemoteCalculatorSystem(): Unit = {
    ...
  }

  def startRemoteLookupSystem(): Unit = {
    ...
  }
}

startRemoteCalculatorSystem()はActorSystemを初期化したうえで、そこからCalculatorActorも生成してしまいます。

これにより、CalculatorActorは後述のLookupActorから見てRemoteになります。

startRemoteLookupSystem()は,CalculatorActorのパス

  • "akka.tcp://CalculatorSystem@127.0.0.1:2552/user/calculator"

をLookupActorに渡して、後はLookupActor経由でどんどんAddとSubtractメッセージを投げ続けるだけです。

Typesafe Akka Remote Sampleの図解 - 1/2 CreationAppliation編

TypeSafeのAkka Remoteのサンプルがなんともわかりにくい気がするので、ここで図解してみることにしてみました。

TypeSafeのAkka Remote Samples with Scalaには2つのサンプルアプリケーションが含まれている

一つはCreationApplication, もう一つはLookupApplicationです。これらはコードに共通部分もありますが、別々のアプリケーションです

この記事ではCreationAppliationのみを解説します。LookupApplicationについては別記事にしようと思います。

メッセージの型としてのcase class

Akkaでよく使われるcase classをメッセージの型として使う方法です。足し、引き、掛け、割り算に相当する以下の4つが定義されています。

// sample/remote/calculator/MathOp.scala
trait MathOp
final case class Add(nbr1: Int, nbr2: Int) extends MathOp
final case class Subtract(nbr1: Int, nbr2: Int) extends MathOp
final case class Multiply(nbr1: Int, nbr2: Int) extends MathOp
final case class Divide(nbr1: Double, nbr2: Int) extends MathOp

f:id:richard-imaoka:20151105024105p:plain

それぞれに対する結果型も用意されています。以下で見るようにActorはこれらの型のメッセージをやり取りして、計算の入力と結果を受け渡します。

// sample/remote/calculator/MathOp.scala
trait MathResult
final case class AddResult(nbr: Int, nbr2: Int, result: Int) extends MathResult
final case class SubtractResult(nbr1: Int, nbr2: Int, result: Int) extends MathResult
final case class MultiplicationResult(nbr1: Int, nbr2: Int, result: Int) extends MathResult
final case class DivisionResult(nbr1: Double, nbr2: Int, result: Double) extends MathResult

f:id:richard-imaoka:20151105024107p:plain

CalculatorActorは計算入力を受け取って結果を返す、CreationActorはCalculatorActorを生成して、計算を行わせる

CalculatorActor

CalculatorActorの実装は以下の通りです。

// sample/remote/calculator/calculatorActor.scala
class CalculatorActor extends Actor {
  def receive = {
    case Add(n1, n2) =>
      println("Calculating %d + %d".format(n1, n2))
      sender() ! AddResult(n1, n2, n1 + n2)
    case Subtract(n1, n2) =>
      println("Calculating %d - %d".format(n1, n2))
      sender() ! SubtractResult(n1, n2, n1 - n2)
    case Multiply(n1, n2) =>
      println("Calculating %d * %d".format(n1, n2))
      sender() ! MultiplicationResult(n1, n2, n1 * n2)
    case Divide(n1, n2) =>
      println("Calculating %.0f / %d".format(n1, n2))
      sender() ! DivisionResult(n1, n2, n1 / n2)
  }
}

例えばMultiply型のメッセージを受け取ったときは、その結果であるMultiplicationResult型のメッセージを送信元"sender"に投げ返します。

f:id:richard-imaoka:20151105024114p:plain

CreationActor

CreationActorの方は、

// sample/remote/calculator/CreationActor .scala
class CreationActor extends Actor {
  def receive = {
    case op: MathOp =>
      val calculator = context.actorOf(Props[CalculatorActor])
      calculator ! op
    case result: MathResult => result match {
      case MultiplicationResult(n1, n2, r) =>
        printf("Mul result: %d * %d = %d\n", n1, n2, r)
        context.stop(sender())
      case DivisionResult(n1, n2, r) =>
        printf("Div result: %.0f / %d = %.2f\n", n1, n2, r)
        context.stop(sender())
    }
  }
}

計算の入力(MathOp型のメッセージ)を受け取ると、CalculatorActorを生成します。

     val calculator = context.actorOf(Props[CalculatorActor])

f:id:richard-imaoka:20151105024118p:plain

そしてその生成したCalculatorActorに計算入力(MathOp型のメッセージ)を投げて

      calculator ! op

MathResult型のメッセージを受け取ります。

  def receive = {
  ...
    case result: MathResult => result match {
      case MultiplicationResult(n1, n2, r) =>
        ...
      case DivisionResult(n1, n2, r) =>
        ...
    }

f:id:richard-imaoka:20151105024121p:plain

以上の手順が終わったら、生成したCalculatorActorを以下のコードによって停止します。sender()となっていますが、これはMathResultのsenderなので、すなわちCalculatorActorです。

        context.stop(sender())

f:id:richard-imaoka:20151105024124p:plain

CreateApplication

最後にアプリケーションの説明です。なんだかこのmain関数はややこしいのですが…、とにかくstartRemoteWorkerSystem()とstartRemoteCreationSystem()という二つの関数を走らせるだけです。

コマンドライン引数」のCalculatorWorkerとCreationを渡すと、2つの関数をの別のプロセスで走らせることができます。

sbt "runMain sample.remote.calculator.CreationApplication CalculatorWorker"
sbt "runMain sample.remote.calculator.CreationApplication Creation"

args.isEmpty、すなわちコマンドライン引数を渡さないと、一つのプロセスの中で2つの関数を走らせます。s

object CreationApplication {
  def main(args: Array[String]): Unit = {
    if (args.isEmpty || args.head == "CalculatorWorker")
      startRemoteWorkerSystem()
    if (args.isEmpty || args.head == "Creation")
      startRemoteCreationSystem()
  }

  def startRemoteWorkerSystem(): Unit = {
    ...
  }

  def startRemoteCreationSystem(): Unit = {
    ...
  }
}

startRemoteWorkerSystem()はActorSystemを初期化するだけで、それ自体は何もしません。

startRemoteCreationSystem()はもう一つの関数startRemoteWorkerSystem()ないで作られたActorSystem("CalculatorSystem")のしたにCreationActorを生成します。

f:id:richard-imaoka:20151105024126p:plain

これはAkka Remotingで説明されているRemote Creationになり、remotecreation.confに以下を指定することによって実現しています。

//remotecreation.conf
akka {
  actor {
    deployment {
      "/creationActor/*" {
        remote = "akka.tcp://CalculatorWorkerSystem@127.0.0.1:2552"
      }
    }
  }
  ...
}

そしてあとはAkkaのスケジューラを使って、MultiplyとDivideメッセージを送り続け、CreationActorはCalculationActorを逐一生成、停止して計算を行っていきます。 f:id:richard-imaoka:20151105024129p:plain

Akka Internals (Akkaの内部動作を知る) Remoteでメッセージを送る場合の ! メソッドの動作

今回の記事のポイント

Remote Actorを使うときにAkkaの ! メソッドの内部動作は変わり、

Remote Actorにメッセージを送るときは、数段階の「内部」Actorを経由して送られる

ということを確認していきたいと思います。

前回の記事

richard-imaoka.hatenablog.com

概要の説明

まず、送信元のActor Aからあて先のActor Bには、Local Actorの時と同じように、 ! メソッドを使って送ります。

//Code within actorA
    actorB ! "Some Message"

f:id:richard-imaoka:20151104010843p:plain

ただし、実際には以下の3つの「内部」Actorを経由します。

f:id:richard-imaoka:20151104010848p:plain

RemoteActorRefの実装によって、Remoteのメッセージ送信の動作がLocalの時とは変わっている

この違いは、Remote Actorに送るときは、メッセージの宛先のActorRefが、RemoteActorRefになっているためで、その! メソッドは以下のようになっています。

//class RemoteActorRef in akka/remote/RemoteActorRefProvider.scala
  override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = {
    ...
    try remote.send(message, Option(sender), this) catch handleException
  }

このメソッドは以下のsendメソッドを呼び出しますが、sendメソッドメッセージの元々のあて先(actorB)をEndpointManager型のActorに差し替えます

//class Remoting (extends RemoteTranspor) in akka/remote/Remoting.scala
  override def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = endpointManager match {
    case Some(manager) ⇒ manager.tell(Send(message, senderOption, recipient), sender = senderOption getOrElse Actor.noSender)
    ...
  }

これは、Sendというcase classに元々の宛先を保存し、manager変数(EndpointManager型のActor)のtellメソッドを呼ぶことで実現しています。

f:id:richard-imaoka:20151104010916p:plain

Image courtesy of digitalart at FreeDigitalPhotos.net

Sendというcase classに内包されたメッセージは、さらに(前回の記事で見たように)Envelopeというcase classに内包されて、EndPointManager actorのMailboxに届けられます。

f:id:richard-imaoka:20151104010935p:plain

Image courtesy of digitalart at FreeDigitalPhotos.net

そこから、ReliableDeliverySupervisorというActorを経由して、EndPointWriterというActorに届けられます。

f:id:richard-imaoka:20151104010938p:plain

Image courtesy of digitalart at FreeDigitalPhotos.net

このEndPointWriterが実際に元々の宛先であったRemote ActorのActorBにメッセージを送ります。

Sendというcase classに元々の宛先を保存しているので、複数のActorを経由しても最終的にActorBにメッセージを送ることができます。

EndPointWriterの動作

このEndPointWriterの実装はやや複雑なのですが、簡単に言うと

  • メッセージを逐一送っていくのではなく、一旦バッファにためて後で一気に送る

このEndPointWriteの実装や「どれくらいの頻度でバッファを全部クリアに一気にメッセージを送るか」のチューニング、というのはメッセージングのパフォーマンスを左右する重要なところなので、いつか記事を書ければと思います。

さて、このEndPointWriterというActorの実装を見ていくと、メッセージをためるバッファはJavaの標準クラスであるLinkedListを使っています。

//class EndpointReader in akka.remote.Endpoint.scala
  val buffer = new java.util.LinkedList[AnyRef]

このバッファの実装はAkkaのパフォーマンスを大きく左右すると考えられるので、おそらくLinkedListは十分なパフォーマンスを持っているということなのでしょう。

次に以下のメソッドですが

//class EndpointReader in akka.remote.Endpoint.scala
  def sendBufferedMessages(): Unit = {
    ...
    val ok = writePrioLoop() && writeLoop(SendBufferBatchSize)
    ...
  }

このsendBufferedMessages()メソッドが呼ばれると、メッセージをためたバッファがクリアされ、メッセージがRemote Actor宛てに一気に送信されることになります。

EndPointWriterのなかではsendBufferedMessages()が繰り返し呼ばれて(EndPointWriter自身が呼び出しを決められた秒数毎にスケジュールしている)いることがわかります。

sendBufferedMessages()内で呼び出されるwritePrioLoop() も writeLoop(SendBufferBatchSize) も、バッファ内部の個別のメッセージを送る際には以下のwriteSend()メソッドを呼び出していて、

//class EndpointReader in akka.remote.Endpoint.scala
  def writeSend(s: Send): Boolean = try {
    handle match {
      case Some(h) ⇒
        ...
        val pdu = codec.constructMessage( ... )
        ...
          val ok = h.write(pdu)
        ...
  }

このhandleというのは

handle = Option[AkkaProtocolHandle]

となっていて、

          val ok = h.write(pdu)

をよびだすと、AkkaProtocolHandle traitのインスタンス(デフォルトではtcp向けの実装)を使って、実装されたプロトコルでメッセージを送ります。

AkkaProtocolHandle についても、いつか別の記事を書こうと思います。

Akka Internal - (Akkaの内部動作を知る) Actorのreceiveメソッドはどのように呼ばれるか?

Akkaの!メソッドでメッセージを「受け取る」側の動作はどうなっているのか?

前回の記事ではAkkaの!メソッドでメッセージを「送る側」の動作が完全なNon-Blockingであることを確認しました。

richard-imaoka.hatenablog.com

今回はメッセージを受け取る側、すなわちActorのreceiveメソッドがどのように呼ばれるのかを見ていきます。

  def receive = {
    ...
  }

Actorのreceiveメソッド内にブレークポイントを置いてデバッグすると、一番最初にJavaのForkJoinWorkerThreadが動いていることがわかる

Actorのreceiveメソッドはメッセージを送る側(この記事の場合mainスレッド)とは別のスレッドで動きます。その際にForkJoinWorkerThreadを使っているのですね。

f:id:richard-imaoka:20151027054815p:plain

Image courtesy of digitalart at FreeDigitalPhotos.net

//scala.concurrent.forkjoin.ForkJoinWorkerThread
    public void run() {
        ...
            this.pool.runWorker(this.workQueue);
   }

さらにここから、MailboxクラスのprocessMailbox()というメソッドが呼ばれているのがわかります。前回の記事で見たように、MailboxはMessaging Queueを持っていて、これがNon-Blocking (lock-free) queueになっています。

//akka.dispatch.Mailbox
  override final def run(): Unit = {
    try {
      ...
        processMailbox() //Then deal with messages
      ...
    }
  }

というわけで、このMessaging QueueはmainスレッドからもActorが動いている別スレッドからもアクセスされる、スレッド間共有オブジェクトです。

processMailbox()メソッドの中身は以下のようになっています。"next"というのはここではMessaging Queueから取り出したメッセージになっていて、actor (ActorCell)のinvokeメソッドはActorのreceiveメソッドを呼び出すようになっています。

//akka.dispatch.Maibox

  @tailrec private final def processMailbox(
    ...
    if (shouldProcessMessage) {
      val next = dequeue()
      ...
        actor invoke next
      ...
      }
    }

f:id:richard-imaoka:20151027054834p:plain

Image courtesy of digitalart at FreeDigitalPhotos.net

Akka Internals (Akkaの内部動作を知る) メッセージを送る ! メソッド

今回の記事のポイント

Akkaで ! メソッドの呼び出しを行うとき、

    actor ! "Message from main()"

実際のAkka内部のコードから

ことを確認したいと思います。これは

  • メッセージのやり取りにNon-Blocking(lock-free) Queueを使う

ことで実現されています。

概要の説明

前回の記事のコードの一部を抜き出して…

richard-imaoka.hatenablog.com

この2行は、

    val system = ActorSystem("ActorDebuggerSystem")
    val actor  = system.actorOf(Props[LoggingActor])

イメージで表すとこのようになります。

f:id:richard-imaoka:20151011184430p:plain

Actorの生成は実際には別スレッドでおこなわれ、メインスレッド側では別スレッドでのActor生成が完了したかどうか気にする必要なくメッセージを送れます

Actor宛て(正確にはActorRef)には!メソッドを使ってメッセージを送ることができます。

    actor ! "Message from main()"

f:id:richard-imaoka:20151011214317p:plain

詳細

詳細 メッセージを送るまで

Actor(Ref)の!メソッドはCell traitのメソッドを呼び出します。

//trait Cell in akka/actor/dungeon/ActorCell.scala
  final def sendMessage(message: Any, sender: ActorRef): Unit
    = sendMessage(Envelope(message, sender, system))

CellはActorの内部で使われているtraitで、普段は気にする必要はありません。

messageは"Message from main()"というString型の文字列でしたが、それをEnvelopeというクラスにラップしています。

Envelopeはmessage, sender, systemという3つのメンバを持っているだけの単純なクラスです。

f:id:richard-imaoka:20151011213938p:plain

そしてCellのsendMessage()メソッドは…

//trait Dispatch in akka/actor/Dispatch.scala
  def sendMessage(msg: Envelope): Unit =
    try {
      ...
      dispatcher.dispatch(this, msg)
    } 

このDispatcherというのが何かを説明するのは難しいのですが…

ここではAkkaにおいて、「メッセージを送り出すもの」くらいの意味で捉えておきましょう。

Space Alcでdispatchを検索すると"〔書類・荷物などを〕送る、送り出す、発送する、急送する"という意味があることがわかります。

f:id:richard-imaoka:20151011214842p:plain

dispatcher.dispatch()メソッド

上の図の水色で囲った部分、dispatcher.dispatch()メソッドの中身を見ていきましょう。

//class Dispatcher in akka/dispatch/Dispatcher.scala
  protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = {
    val mbox = receiver.mailbox
    mbox.enqueue(receiver.self, invocation)
    registerForExecution(mbox, true, false)
  }

まずreceiver: ActorCellのMailboxを取得して

    val mbox = receiver.mailbox

f:id:richard-imaoka:20151011214916p:plain

Image courtesy of digitalart at FreeDigitalPhotos.net

そのMailboxにenqueue()メソッドを使ってメッセージ(ここではinvocationという名前に変わっています)を送っています

    mbox.enqueue(receiver.self, invocation)

f:id:richard-imaoka:20151011215122p:plain

Image courtesy of digitalart at FreeDigitalPhotos.net

enqueue()メソッドの中身はこうです。

//class Mailbox in akka/dispatch/Mailbox.scala
  def enqueue(receiver: ActorRef, msg: Envelope): Unit = messageQueue.enqueue(receiver, msg)

実はこのMailboxにはMessageQueueというものが付随していて、そのMessageQueueは以下のように宣言されています。

//object UnboundedMailbox in in akka/dispatch/Mailbox.scala
  class MessageQueue extends ConcurrentLinkedQueue[Envelope] with UnboundedQueueBasedMessageQueue { ... }

ConcurrentLinkedQueueというのはJavaの標準APIから来ていて、Oracleドキュメンテーションを見ると…

ConcurrentLinkedQueue

この実装では、効率のよい非ブロックのアルゴリズムが使用されます。このアルゴリズムは次の資料で記述されているものに基づきます「Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms」Maged M. Michael、Michael L. Scott

すなわち、Non-Blockingなキューであることがわかります。ここまでの時点まではAkkaの!メソッドはNon-Blockingであることが確認できました!

残りの部分もNon-Blockingであることを確認していきましょう。

registerForExecution() - メッセージを送った後、Mailboxを別スレッドの実行用にマークしておく

さきほどのDispatcherのdispatch()メソッドに戻るとmbox.enqueue()の呼び出しが終了すると次に、

    registerForExecution(mbox, true, false)

が呼び出されています。この中身は…

//class Dispatcher in akka/dispatch/Dispatcher.scala
 protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = {
    ...
    if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { 
      if (mbox.setAsScheduled()) {
        ...
          executorService execute mbox
        ...
      } ...
    } ...
  }

と、Mailboxがmbox.setAsScheduled()によって(別スレッドでの)実行用にマークされていることがわかります。

executorService execute mboxは実際にexecuteをその場で行うのでなく…

//java/util/concurrent/Executor.java
public interface Executor {
    /**
     * Executes the given command at some time in the future.  ...
     */
    void execute(Runnable command);
}

とあるように、将来のどこかの時点で別スレッドで実行を行うメソッドです。

f:id:richard-imaoka:20151011184600p:plain

Image courtesy of digitalart at FreeDigitalPhotos.net

まとめ

以上で見てきたように

    actor ! "Message from main()"

は以下のようにmainスレッド内でメッセージを

  • MailBoxのMessagingQueue(Non-Blocking Queue)に入れ

  • ExecutorServiceを使って(Mailboxごと)将来のexcecuteをスケジュール(すぐには実行しない)

することにより、完全にNon-Blockingになっていることがわかります。