読者です 読者をやめる 読者になる 読者になる

seratch's weblog in Japanese

About Scala, Java and Ruby programming in Japaense. If you need English information, go to http://blog.seratch.net/

FinagleのServiceとNettyの関係(Finagle 1.2.3)

この記事は Finagle 1.2.3 について書かれたものです

外部インタフェースの互換性は維持されていますが、4.0.2 時点での実装をみると内部的にはかなり変更されているようです。

FinagleはNettyのラッパー

FinagleはNettyの薄いラッパーになっています。サーバ側でもクライアント側でも処理の主体となっているServiceは、実際のところ、Nettyのレイヤーではどうなっているのか調べました。Finagleのソースコードは、version 1.2.3のタグを読みました。

サーバ側のService

サーバ側で使うServiceはFinagleのユーザが生成してServerBuilder#build(Service)に渡します。

それが一体どのように使われるのか、処理の流れを追ってみました。

サンプル例

イメージがわきやすいように具体的なサンプルを挙げておきます。

単純に「OK」という文字列をボディとするHttpResponseで返すサービスです。

import com.twitter.finagle.Service
import com.twitter.finagle.http.Response
import com.twitter.util.Future
import org.jboss.netty.handler.codec.http.{HttpRequest, HttpResponse}

val okService = new Service[HttpRequest, HttpResponse] { 
  def apply(req: HttpRequest): Future[HttpResponse] = { 
    val rep = Response()
    rep.setContentString("OK\n")
    Future(rep) 
  }
}

import com.twitter.finagle.builder.ServerBuilder
import com.twitter.finagle.http.Http
import java.net.InetSocketAddress

val server = ServerBuilder().codec(Http()).bindTo(new InetSocketAddress(9000)).name("okServer").build(okService)

// curl http://localhost:9000/

import com.twitter.util.Duration
import java.util.concurrent.TimeUnit

server.close(Duration(1, TimeUnit.MICROSECONDS))

余談ですが、FinalgeのexampleだとNettyのDefaultHttpResponseを使っているところ

finagle-example/src/main/scala/com/twitter/finagle/example/http/HttpServer.scala

DefaultHttpResponseをラップしたResponseがfinagle-httpにあり、これを使うと記述を少しシンプルになります。

finagle-http/src/main/scala/com/twitter/finagle/http/Response.scala

このようにNettyのDefaultHttpResponseだと、かなり冗長になってしまいますが

import org.jboss.netty.buffer.ChannelBuffers.copiedBuffer
import org.jboss.netty.handler.codec.http._
val rep = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)
rep.setContent(copiedBuffer("OK\n".getBytes))

今回のように200 OKで単に「OK」を返すだけなら、このように短く済ませることができます。

import com.twitter.finagle.http._
val rep = Response()
rep.setContentString("OK\n")

しかし、Serviceの外部インタフェースは、Nettyのorg.jboss.netty.handler.codec.http._を使わなければ動作しないようです。

HerokuのScala対応アナウンスで紹介されていたサンプルコードでも同じようになっています。

http://blog.heroku.com/archives/2011/10/3/scala/

ソースコードリーディング

ServerBuilder#build(Service)のソースをみると、中でNettyのServerBootstrapのインスタンスを生成して起動しています。

finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala#L180
finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala#L338

返されるServerはclose()でNettyのServerBootstrapを終了させる機能をもっているだけのようです。

finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala#L340-388


ServerBuilder#build(Service)に渡されたServiceはNettyのChannelHandlerに変換されて

finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala#L314-315

ChannelPipelineに"channelHandler"として追加されています。

finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala#L333


なお、このSerivceToChannelHandlerはNettyのChannelUpstreamHandlerです。

finagle-core/src/main/scala/com/twitter/finagle/channel/ServiceToChannelHandler.scala
finagle-core/src/main/scala/com/twitter/finagle/channel/ChannelClosingHandler.scala


また、ChannelPipelineはCodec#serverPipelineFactoryからgetPipelineで取得しているので

finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala#L222

例えばHttpの場合、以下で定義されているものです。

finagle-core/src/main/scala/com/twitter/finagle/builder/Http.scala#L27-49

ServiceとNettyの関係のまとめ

ServerBuilderは、NettyのServerBootstrapを生成して立ち上げます。

ServiceはNettyのChannelUpstreamHandlerとしてNettyサーバのChannelPipelineに追加される存在です。

クライアント側のSerivce

クライアント側はClientBuilder#build()を呼び出すと戻り値としてServiceが返ってきます。

これは一体どのようなものなのか調べました。

サンプル

先ほどのサーバにHTTPでリクエストするシンプルなクライアントの例です。

import com.twitter.finagle.builder.ClientBuilder
import com.twitter.finagle.http.Http
import java.net.InetSocketAddress

val clientService = ClientBuilder().codec(Http()).hosts(new InetSocketAddress(9000)).hostConnectionLimit(1).build()

import org.jboss.netty.handler.codec.http._
import org.jboss.netty.util.CharsetUtil.UTF_8

val req: HttpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/")
val future: Future[HttpResponse] = clientService(req) onSuccess {
  rep => println("response: " + rep.getContent.toString(UTF_8))
} onFailure {
  error => println("error: " + error.getClass.getName)
}
future.get()
ソースコードリーディング

ClientBuilder#build()は内部の#buildFactory()で作ったServiceFactoryをServiceに変換しています。

finagle-core/src/main/scala/com/twitter/finagle/builder/ClientBuilder.scala#L312


FactoryToServiceの中身をみると、Service#apply(Request)のタイミングでServiceFactory#make()を呼んで、その戻り値のServiceのapplyメソッドにRequestを渡しています。

finagle-core/src/main/scala/com/twitter/finagle/Service.scala#L61-72


戻って、#buildFactory()では、ChannelServiceFactoryが生成されています。

finagle-core/src/main/scala/com/twitter/finagle/builder/ClientBuilder.scala#L285-288

ChannelServiceFactory#make()は以下のような処理です。

finagle-core/src/main/scala/com/twitter/finagle/channel/ChannelService.scala#L110-114

ClientBootstrap#connect()が成功したら、その戻り値のChannelFuture#getChannel()で取得したNettyのChannelを、ChannelService生成時に渡します。


ちなみに、ChannelServiceFactoryにはcom.twitter.finagle.util.Conversions._がimportされていて

finagle-core/src/main/scala/com/twitter/finagle/util/Conversions.scala
http://docs.jboss.org/netty/3.2/api/org/jboss/netty/channel/ChannelFuture.html

ここで定義された暗黙の型変換によってNettyのChannelFutureがRichChannelFutureに変換されています。

そのRichChannelFutureは以下のようなapplyメソッドを持っているので

finagle-core/src/main/scala/com/twitter/finagle/util/ChannelFuture.scala#L26-32

このようにpartial functionでClientBootstrap#connect()から戻ってきたRichChannelFutureのStateを判定して処理しています。

finagle-core/src/main/scala/com/twitter/finagle/channel/ChannelService.scala#L110-121
finagle-core/src/main/scala/com/twitter/finagle/util/ChannelFuture.scala#L34-40

bootstrap.connect() {
  case Ok(channel) =>
  ...


ChannelServiceの生成時にChannelPipeline(クライアント側のChannelPipeline)を取得して

http://docs.jboss.org/netty/3.2/api/org/jboss/netty/channel/ChannelPipeline.html

"finagleBridge"という名前でChannelUpstreamHandlerを追加します。コードに書かれている場所から分かりづらいですが、ここはインスタンス生成時に実行されます。messageReceivedイベントなどで#reply(Try[Response])を呼び出すChannelHandlerです。

finagle-core/src/main/scala/com/twitter/finagle/channel/ChannelService.scala#L53-64

#reply(Try[Response])の中で、次にみる#apply(Request)の戻り値として返すPromise(外部インタフェースとしてはFuture[Response]型)オブジェクトの#updateIfEmpty(Try[A])を呼び出しています。

finagle-core/src/main/scala/com/twitter/finagle/channel/ChannelService.scala#L28-50

#apply(Request)では、インタンス生成時に渡したChannelオブジェクトと引数のRequestを使って、NettyのChannels.write(Channel, Object)を呼び出します。

finagle-core/src/main/scala/com/twitter/finagle/channel/ChannelService.scala#L66-79
http://docs.jboss.org/netty/3.2/api/org/jboss/netty/channel/Channels.html#write(org.jboss.netty.channel.Channel, java.lang.Object)

戻り値として、ChannelServiceの内部でも参照しているResponseのPromise(Futureのサブ型)を返します。

finagle-core/src/main/scala/com/twitter/finagle/channel/ChannelService.scala#L75

ServiceとNettyの関係のまとめ

ClientBuilder#build()の時点でNettyのClientBootstrap#connect()が呼ばれます。この時点でサーバ側が立ち上がっていないと例外が発生します。

また、このタイミングで、クライアント側のChannelPipelineにFinagleとのブリッジとなるChannelUpstreamHandlerを登録して、非同期レスポンスを受け取れるようにします。

ClientBuilder#build()で返されるServiceは、上記の流れで確立したChannelを保持したChannelServiceです。

finagle-core/src/main/scala/com/twitter/finagle/channel/ChannelService.scala

#apply(Request)を呼び出すと保持するChannelにメッセージをwriteしてその結果のFutureを戻り値として返します。