読者です 読者をやめる 読者になる 読者になる

seratch's weblog in Japanese

About Scala, Java and Ruby programming in Japaense. If you need English information, go to http://blog.seratch.net/

Scala の Actor 入門

Scala

Scalaのバージョン

この記事が対象とする Scala のバージョンは「2.9.1.final」です。

Actor(Actor)とは?

概念としての Actor は「メッセージを受信し、そのメッセージに基づいて行動するオブジェクト」のことで、メッセージの到着順序は重要ではないですが Scala では到着順にメールボックスと呼ばれるメッセージキューに入れて処理します。

メールボックス(mailbox)

各 Actor はそれぞれにメールボックスと呼ばれるメッセージを格納するためのキューを持っています*1。Actor はここからメッセージを順番にデキューします。

はじめての Actor

Actor は scala.actors.Actor というトレイトを継承したオブジェクトです。メッセージを受信するために act メソッドを実装します。

呼び出し側は「!」メソッドを呼び出してその Actor に対してメッセージを送信します。

送信されたメッセージはメールボックスに入り、順に react/receive メソッドに渡されます。

import actors.Actor
class MyActor extends Actor {
  def act {
    loop {
      react {
        case msg => { 
          Thread.sleep(500L) // Actor 全体をブロックしています(後述)
          println(msg)
        }
      }
    }
  }
}
val myActor = new MyActor
myActor.start()
myActor ! "1"
myActor ! 2
myActor ! 3L
println("no waiting!") // "no waiting!" "1" "2" "3"

Thread.sleep(3000L)

Actor を extends するのではなく、以下のようにして Actor.actor で Actor をつくることもできます。

import actors.Actor._
// val myActor = scala.actors.Actor.actor {
val myActor = actor {
  loop {
    react {
      case msg => { 
        Thread.sleep(500L) // Actor全体をブロックしています(後述)
        println(msg)
      }
    }
  }
}
myActor.start()

Actor は class ではなく Singleton オブジェクトでも OK です。

import actors.Actor
object SingletonActor extends Actor {
  start
  def act = {
    loop {
      react {
        case msg => println(msg)
      }
    }
  }
}
SingletonActor ! "test" // "test"
Thread.sleep(2000L)

同期で結果を受け取る

メッセージ送信時に結果を戻り値として受け取るには「!?」か「!!」を使います。

「!?」はメッセージ送信時、「!!」は結果取得時に同期処理となります。

呼び出される側は「sender ! message」か「reply(message)」で結果を返す実装である必要があります。

そうでない場合、「!?」「!!」の呼び出し側は無言で応答を待ち続けます。

import actors.Actor
class MyActor extends Actor {
  def act {
    loop {
      react {
        case msg => sender ! msg.toString + msg.toString
        // case msg => reply(msg.toString + msg.toString)
      }
    }
  }
}
val myActor = new MyActor
myActor.start()

// 送信元がActorでないので「sender ! message」によるメッセージ送信は破棄されます
myActor ! "hoge" // 非同期

// 同期で結果を受け取ります
val s = myActor !? "hoge"  // 同期
println(s)

// Futureで結果を受け取ります、f()呼び出し時は同期処理になります
val f = myActor !! "hoge" // 非同期
println(f()) // 同期

メッセージの処理中にブロックしてはいけない

Actor では、メッセージの処理中にスレッドをブロックしないよう実装に注意する必要があります。

単に react の中で Thread.sleep(1000L) とやると、その Actor のスレッドを1000ミリ秒ブロックしてしまいます。

例えば、以下のようにスリープ処理を別の Actor にする事でブロックを避ける事ができます。

import actors.Actor
import actors.Actor._
case class Input(val msg: String, val millis: Long)
class NonBlockingSleepActor extends Actor {
  case class Later(val msg: String)
  /**
   * ブロックを避けるスリープ処理(スリープ部分を別のActorにする)
   */
  def later(millis: Long, msg: Any) = {
    val main = self
    actor {
      // 別のActorでスリープした後、呼び出し元のActorにメッセージ送信
      Thread.sleep(millis)
      main ! Later(msg.toString)
    }
  }
  def act() = {
    loop {
      react {
        case Input(msg, millis) => 
          later(millis, msg) // 外部からのメッセージ
        case Later(msg) => 
          println(msg) // laterからのメッセージ(スリープ完了後の処理)
        case _ => throw new RuntimeException
      }
    }
  }
}
val nba = new NonBlockingSleepActor
nba.start
nba ! Input("test", 2000L) 
nba ! Input("hoge", 0L) 
Thread.sleep(3000L)
// すぐに"hoge"
// 2秒後に"test"

次節の例ではタイムアウトを使ったブロックしないスリープ処理用の Actor をつくっています*2

Actor 同士の非同期通信

Actor 同士のやり取りであれば、互いに「!」でメッセージを送り合う事で非同期に結果を受け取る事ができます。

import actors.{Actor, TIMEOUT}
case class Request(val msg: String, val interval: Long)
class UnknownMessageException extends RuntimeException
/**
 * ブロックしないスリープ処理(スリープ部分を別のActorとして定義)
 */
class Sleep(val millis: Long, code: => Unit) extends Actor {
  def act() = {
    reactWithin(millis) {
      case TIMEOUT => 
        code // reactWithinのタイムアウト後に名前渡しで渡された処理が実行されます
    }
  }
}
/**
 * サーバ
 */
class Server extends Actor {
  // 別Actorに名前渡しでスリープ後に行う処理を渡す
  def sleep(millis: Long)(code: => Unit): Unit = new Sleep(millis, code).start
  def act() = {
    loop {
      react {
        case req: Request => {
          // ここをThread.sleep(req.interval)にするとスレッドをブロックしてしまい
          // "hoge"のメッセージ送信後まで"test"の結果送信が待たされます
          sleep(req.interval) {
            sender ! req.msg.toString + req.msg.toString
          }
        }
        case _ => throw new UnknownMessageException
      }
    }
  }
}
/**
 * クライアント
 */
class Client(val server: Server) extends Actor {
  def act() = {
    loop {
      react {
        case req: Request => server ! req
        case response: String => println("returned : " + response)
        case _ => throw new UnknownMessageException
      }
    }
  }
}

val server = new Server
server.start()

val client = new Client(server)
client.start() 
client ! Request("hoge", 1000L)
client ! Request("test", 0L)
// 割とすぐに"returned : testtest"が出力されます
// 1秒以上後に"returned : hogehoge"が出力されます

Thread.sleep(3000L)

「react」と「receive」の違い

メールボックスにメッセージがある場合の挙動は特に変わりませんが、メールボックスが空のとき receive はスレッドをブロックします。

これは receive で処理する Actor を100個立ち上げると、ブロックされた専用スレッドが100個立ち上がる事を意味します*3

react は戻り値が Nothing 型で receive は任意の値を返すことができますが、メッセージ送信元への応答はどちらの場合も「sender ! message」です。

以下は scala 2.8.1.final の src/scala/actors/Actor.scala からの抜粋です。

  /**
   * Lightweight variant of <code>receive</code>.
   *
   * Actions in <code>f</code> have to contain the rest of the
   * computation of <code>self</code>, as this method will never
   * return.
   *
   * @param  f a partial function specifying patterns and actions
   * @return   this function never returns
   */
  def react(f: PartialFunction[Any, Unit]): Nothing =
    rawSelf.react(f)

 /**
   * Receives a message from the mailbox of
   * <code>self</code>. Blocks if no message matching any of the
   * cases of <code>f</code> can be received.
   *
   * @param  f a partial function specifying patterns and actions
   * @return   the result of processing the received message
   */
  def receive[A](f: PartialFunction[Any, A]): A =
    self.receive(f)

やってみよう

以下の Player、Game をそれぞれ Actor として定義して山手線ゲームをコメント例のように実行できるようにしてください。

import actors.Actor

case object Get
case class IsUsed(station: Station)
case class MarkAsUsed(station: Station)
case class MarkAsUsedResponse(isMarkedAsUsed: Boolean)
case class Station(name: String)

class Yamanotesen extends Actor {
  start()
  def act() = loop {
    react {
      case Get => sender ! randomStation
      case IsUsed(station) => sender ! used.contains(station)
      case MarkAsUsed(station) => {
        used += station
        sender ! MarkAsUsedResponse(used.contains(station))
      }
      case _ => throw new IllegalArgumentException
    }
  }
  def randomStation: Station = { 
    stations(new java.util.Random().nextInt(stations.length))
  }
  private val stations: List[Station] = List(
    Station("東京")
   ,Station("有楽町")
   ,Station("新橋")
   ,Station("浜松町")
   ,Station("田町")
   ,Station("品川")
   ,Station("大崎")
   ,Station("五反田")
   ,Station("目黒")
   ,Station("恵比寿")
   ,Station("渋谷")
   ,Station("原宿")
   ,Station("代々木")
   ,Station("新宿")
   ,Station("新大久保")
   ,Station("高田馬場")
   ,Station("目白")
   ,Station("池袋")
   ,Station("大塚")
   ,Station("巣鴨")
   ,Station("駒込")
   ,Station("田端")
   ,Station("西日暮里")
   ,Station("日暮里")
   ,Station("御徒町")
   ,Station("秋葉原")
   ,Station("神田")
  )
  import collection.mutable.{Set,HashSet}
  private var used: Set[Station] = new HashSet[Station]
}

// TODO Player、Gameを実装する

case object Start
val Andy = new Player("Andy")
val Brian = new Player("Brian")
new Game(Andy,Brian) ! Start

Thread.sleep(5000L)
System.exit(0)

/*
パンパン!
Andy「浜松町」
パンパン!
Brian「日暮里」
パンパン!
Andy「大塚」
パンパン!
Brian「目白」
パンパン!
Andy「品川」
パンパン!
Brian「恵比寿」
パンパン!
Andy「日暮里・・あっ!」
*/

回答例はこちらです。

https://gist.github.com/1108560

*1:メールボックスScala 独自の用語ではなく、一般にサイズ上限がないメッセージキューの事をメールボックスと呼びます

*2:元ネタはこちらです:http://wota.jp/ac/?date=20101014

*3:recieveWithin(Long)を使うとミリ秒のタイムアウト値でブロックし続ける時間を調整する事ができます