seratch's weblog in Japanese

About Scala, Java and Ruby programming in Japaense. If you need English information, go to http://blog.seratch.net/

Scala School意訳(Concurrency in Scala)

http://twitter.github.com/scala_school/concurrency.html

以下は私の方でtypoや表示崩れを直したものです。

https://github.com/seratch/scala_school/blob/master/web/_posts/2011-05-10-lesson.textile

誤訳などありましたら、お手数ですが、ご指摘いただければ幸いです。

Runnable/Callable

Runnableは値を返さないシングルスレッド処理です。

trait Runnable {
  def run(): Unit
}

CallableはRunnableに似ていますが、値を返します。

trait Callable[V] {
  def call(): V
}

Threads

Scalaでの並行処理は、Javaの並行処理モデルをベースにしています。

Sun JVMであれば、一台のマシン上でI/Oヘビーな処理を何万スレッドという単位で実行することができます。

スレッドはRunnableを受け取ります。
RunnableをrunさせるためにThreadのstartメソッドを呼び出さなければなりません。

scala> val hello = new Thread(new Runnable {
  def run() {
    println("hello world")
  }
})
hello: java.lang.Thread = Thread[Thread-3,5,main]

scala> hello.start
hello world

Runnableインタフェースを実装するクラスをみるとき、それがどこか別のスレッドで実行されることを目的としていることはご存知でしょう。

Something single-threaded

これは動作はするものの問題のあるコードスニペットです。

import java.net.{Socket, ServerSocket}
import java.util.concurrent.{Executors, ExecutorService}
import java.util.Date

class NetworkService(port: Int, poolSize: Int) extends Runnable {
  val serverSocket = new ServerSocket(port)

  def run() {
    while (true) {
      // ここはコネクションが来るまでブロックします
      val socket = serverSocket.accept()
      (new Handler(socket)).run()
    }
  }
}

class Handler(socket: Socket) extends Runnable {
  def message = (Thread.currentThread.getName() + "\n").getBytes

  def run() {
    socket.getOutputStream.write(message)
    socket.getOutputStream.close()
  }
}

(new NetworkService(2020, 2)).run

各リクエストは、カレントスレッドの名前を応答しますが、それは常にmainスレッドになります。
このコードの主な問題点は、同時に一つのリクエストしか応答できないという点です!

各リクエストをそれぞれ別々のスレッドにするには、ただこのように変更するだけです。

(new Handler(socket)).run()

を

(new Thread(new Handler(socket))).start()

に変更します。

しかし、スレッドを再利用したり、スレッドの振る舞いについて他のポリシーを持ちたい場合はどうするのでしょうか?

Executors

Java 5のリリースで、スレッドをより抽象化したインタフェースが必要であるという決定がなされました。

Executorsオブジェクトのstaticメソッドを使って、ExecutorServiceを得ることができます。
それらのメソッドは、スレッドのプーリングなどExecutorServiceの様々な設定変更を可能にします。

これは並列リクエストを処理できるように書いた昔ながらのブロッキングネットワークサーバです。

import java.net.{Socket, ServerSocket}
import java.util.concurrent.{Executors, ExecutorService}
import java.util.Date

class NetworkService(port: Int, poolSize: Int) extends Runnable {
  val serverSocket = new ServerSocket(port)
  val pool: ExecutorService = Executors.newFixedThreadPool(poolSize)

  def run() {
    try {
      while (true) {
        // ここはコネクションが来るまでブロックします
        val socket = serverSocket.accept()
        pool.execute(new Handler(socket))
      }
    } finally {
      pool.shutdown()
    }
  }
}

class Handler(socket: Socket) extends Runnable {
  def message = (Thread.currentThread.getName() + "\n").getBytes

  def run() {
    socket.getOutputStream.write(message)
    socket.getOutputStream.close()
  }
}

(new NetworkService(2020, 2)).run

これは内部のスレッドが再利用されていることを示す接続の記録です。

$ nc localhost 2020
pool-1-thread-1

$ nc localhost 2020
pool-1-thread-2

$ nc localhost 2020
pool-1-thread-1

$ nc localhost 2020
pool-1-thread-2

Futures

Futureは非同期での計算処理を意味します。

Futureに計算処理をラップすることができ、その計算結果が必要になったら、ブロックする処理であるFuture#get()を呼び出します。

ExecutorはFutureを返します。

FutureTaskはRunnableの一種で、Excecutorによって実行するよう設計されています。

val future = new FutureTask[String](new Callable[String]() {
  def call(): String = {
    searcher.search(target);
}})
executor.execute(future)

さて、結果が必要になったので、処理が終わるまでブロックしましょう。

val blockingResult = future.get()

Thread Safety

class Person(var name: String) {
  def set(changedName: String) {
    name = changedName
  }
}

// (訳者追記)例が間違っているので私の方でサンプルを載せます

class Adder {
  var i: Int = 0
  def add() = i = i + 1
}

このプログラムはマルチスレッド環境では安全ではありません。
もし2つのスレッドが同じAdderのインスタンスへの参照を持っていてaddメソッドを呼び出した場合
両方のスレッドの呼び出しが終わった後で変数iの値がどのようになっているかは予測できません。
2かもしれないし、1かもしれません!

Javaのメモリモデルでは、それぞれのCPUはL1キャッシュやL2キャッシュに値をキャッシュすることができるので、
2つのスレッドが別々のCPUで実行されている場合、それぞれのスレッドが別々のデータを見ている可能性があります。

マルチスレッドに一貫したデータアクセスを強制させるためのツールについて説明しましょう。

Three tools

synchronization
ミューテックスによって所有者の意味を持たせます。
(あるスレッドが)ミューテックスに入ると、そのスレッドがそれを所有します。
JVMでミューテックスを使う最もありふれたやり方は何かをsynchronizeすることです。
今回のケースでは、userMapをsynchronizeします。

JVMではnullでない全てのインスタンスをsynchronizeすることができます。

class Person(var name: String) {
  def set(changedName: String) {
    this.synchronized {
      name = changedName
    }
  }
}

// (訳者追記)これも例と文章がずれているので私の方で簡単なサンプルを載せます

object Service {
  val userMap = new collection.mutable.HashMap[Int, String]
  def addUser(id: Int, name: String) = {
    userMap.synchronized {
      userMap.update(id, name)
    }
  }
}
volatile
Java 5でのメモリモデルの変更により、volatileとsynchronizedはvolatileではnullが許容される点を除いて基本的に同じになりました。

synchronizedはよりきめ細かいロックをかけることができますが、volatileは毎回のアクセスを同期します。

class Person(@volatile var name: String) {
  def set(changedName: String) {
    name = changedName
  }
}
AtomicReference
またJava 5では多くのローレベルの並行基本命令が追加されました。
そのうちの一つがAtomicReferenceクラスです。

import java.util.concurrent.atomic.AtomicReference

class Person(val name: AtomicReference[String]) {
  def set(changedName: String) {
    name.set(changedName)
  }
}
Does this cost anything?
AtotmicReferenceは、値へのアクセスをメソッドによるディスパッチを経由する必要があるので、3つの選択肢の中で最もコストがかかります。

volatileとsynchromizedはJavaの組み込みのモニター機構を利用しています。
このモニターは競合がない場合にはコストは極めて小さくなります。
synchronizedはいつ同期するかについてよりきめ細かい制御が可能なので、
競合が少なければ、synchronizedが最もコストの小さい選択肢になる傾向があります、

synchronizedを使った箇所に入るとき、volatileな参照へアクセスするとき、あるいはAtomicReferenceに従うとき、
JavaはCPUに自身のキャッシュラインをフラッシュすることを強制し、データアクセスの一貫性を確保します。

もし私が間違っていたら訂正してください。
複雑な題材なので、きっと長いクラス討議になるに違いないですが。

Other neat tools from Java 5

既にAtomicReferenceで触れた通り、Java 5は多くの優れたツールをもたらしました。

CountDownLatch

CountDownLatchはマルチスレッドが互いに通信し合うためのシンプルな仕組みです。

val doneSignal = new CountDownLatch(2)
doAsyncWork(1)
doAsyncWork(2)

doneSignal.await()
println("both workers finished!")

これはとりわけユニットテストに最適です。
ある非同期処理をやっているときに、それが(非同期処理をやっているスレッド側で)完了したかどうかを確認したい場合を仮定してみましょう。
ただ、関数にラッチをカウントダウンさせるようにして、テストの中でそれを待ち受ければよいのです。

AtomicInteger/Long

IntやLongの値をインクリメントすることはありふれたタスクなので、AtomicIntegerやAtomicLongが追加されました。

AtomicBoolean

たぶんこれが何であるかを説明する必要はないと思います。

ReadWriteLocks

ReadWiteLockはreaderとしてのロックとwriterとしてのロックを提供します。
readerロックはwriterロックが取得されているときだけブロックします。

Let’s build an unsafe search engine

スレッドセーフではない単純な転置インデックス(inverted index)です。
この転置インデックスは与えられたUserに名前の一部分をマッピングします。

シングルスレッドでのアクセスだけを仮定した考えの甘いやり方で書かれています。

留意すべき点は、mutable.HashMapを使ったデフォルトコンストラクタthis()です。

import scala.collection.mutable
case class User(name: String, id: Int)
class InvertedIndex(val userMap: mutable.Map[String, User]) {
  def this() = this(new mutable.HashMap[String, User])
  def tokenizeName(name: String): Seq[String] = {
    name.split(" ").map(_.toLowerCase)
  }
  def add(term: String, user: User) {
    userMap += term -> user
  }
  def add(user: User) {
    tokenizeName(user.name).foreach { term =>
      add(term, user)
    }
  }
}

さしあたり、どうやってUserをこのインデックスから取得するかは省略します。
それはあとでとりかかるとしましょう。

Let’s make it safe

先の転置インデックスの例では、userMapがスレッドセーフであることが保証されていませんでした。
複数のクライアントが同時にアイテムを追加しようとすると、最初のPersonの例で見たのと同じ類いのアクセス権エラーが発生します。
userMapがスレッドセーフでないなら、どうやってそれを変更するときにシングルスレッドだけにすればいいでしょうか?

まずあなたはアイテムを追加する時にuserMapをロックすることを考えるかもしれません。

def add(user: User) {
  userMap.synchronized {
    tokenizeName(user.name).foreach { term =>
      add(term, user)
    }
  }
}

残念ながらこれはあまりにお粗末です。
常に、ミューテックスの外側で可能な限りとてもコストの大きい処理をしようとしています。
私がロックは競合が少なければコストが小さくなると言ったことを思い出してください。
ブロックの中でやることが少なければ、きっと競合も小さくなるでしょう。

def add(user: User) {
  // tokenizeName was measured to be the most expensive operation.
  val tokens = tokenizeName(user.name)
  tokens.foreach { term =>
    userMap.synchronized {
      add(term, user)
    }
  }
}

SynchronizedMap

SynchronizedMapトレイトを使えば、mutableなHashMapにシンクロナイゼイションをミックスインすることができます。

synchronizedなインデックスを構築するための簡単な方法を提供するように転置インデックスを拡張することができます。

import scala.collection.mutable.SynchronizedMap
class SynchronizedInvertedIndex(userMap: mutable.Map[String, User]) extends InvertedIndex(userMap) {
  def this() = this(new mutable.HashMap[String, User] with SynchronizedMap[String, User])
}

これの実装を見ると単に全てのメソッドで同期しているだけということに気がつくでしょう。
あなたが望むパフォーマンスは出ないかもしれません。

Java ConcurrentHashMap

Javaには素晴らしいスレッドセーフなConcurrentHashMapがあります。
ありがたいことに私たちはScalaのセマンティックスを与えるためにJavaConversionsを使うことができます。

実際には、私たちはスレッドセーフな転置インデックスを古いセーフでなかったものの拡張としてシームレスに重ね着させることができます。

import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConversions._

class ConcurrentInvertedIndex(userMap: collection.mutable.ConcurrentMap[String, User])
    extends InvertedIndex(userMap) {

  def this() = this(new ConcurrentHashMap[String, User])
}

Let’s load our InvertedIndex

The naive way
trait UserMaker {
  def makeUser(line: String) = line.split(",") match {
    case Array(name, userid) => User(name, userid.trim().toInt)
  }
}

class FileRecordProducer(path: String) extends UserMaker {
  def run() {
    Source.fromFile(path, "utf-8").getLines.foreach { line =>
      index.add(makeUser(line))
    }
  }
}
// (訳者追記)このコードはコンパイルできません

私たちの(読み込んだ)ファイルの各行でmakeUserを呼び出して転置インデックスに追加します。
concurrentな転置インデックスを使ったら、並行してaddを呼び出せます。
makeUserには副作用がないので既にスレッドセーフです。

ファイル読み込みは並行処理にはできませんが、userデータの組み立てやそれをインデックスに追加するのはパラレルに行うことができます。
A solution: Producer/Consumer
非同期処理のよくある実装パターンはProducerとConsumerを分離してQueueを介してコミュニケートさせるというものです。
では、それがどのように我々の検索エンジンのインデクサに役立つか見てみましょう。

import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}

// Producerの具象クラス
class Producer[T](path: String, queue: BlockingQueue[T]) implements Runnable {
  public void run() {
    Source.fromFile(path, "utf-8").getLines.foreach { line =>
      queue.put(line)
    }
  }
}

// Consumerの抽象クラス
abstract class Consumer[T](queue: BlockingQueue[T]) implements Runnable {
  public void run() {
    while (true) {
      val item = queue.take()
      consume(item)
    }
  }
  def consume(x: T)
}

val queue = new LinkedBlockingQueue[String]()

// Consumerのための一つのスレッド
val producer = new Producer[String]("users.txt", q)
new Thread(producer).start()

trait UserMaker {
  def makeUser(line: String) = line.split(",") match {
    case Array(name, userid) => User(name, userid.trim().toInt)
  }
}

class IndexerConsumer(index: InvertedIndex, queue: BlockingQueue[String]) extends Consumer[String](queue) with UserMaker {
  def consume(t: String) = index.add(makeUser(t))
}

// このマシンに8個のCPUコアがあるふりをしてみましょう 
val cores = 8
val pool = Executors.newFixedThreadPool(cores)

// Submit one consumer per core.
for (i <- i to cores) {
  pool.submit(new IndexerConsumer[String](index, q))
}