ScalaとSwiftでスマホからしゃべるぜWebSocket Android編(1)

スマホアプリの開発にあたり、これまで個人的に触ったことがなかった技術をいくつか使うことになりましたが、その1つがWebSocket
名前のとおり、主にWeb技術で使うソケット通信です。クライアントとサーバーのハンドシェークが終われば、後はだらだらと双方向にデータを垂れ流せる。はるか昔のまだ若いころ、Unixマシンにはじめて触って、TCPソケットに普通のread/writeシステムコールで読み書きできると知ってかなり驚いたあのソケット。その後技術の進歩とともに生のまま触ることはほとんどなくなっていたソケット。それが現代に蘇ったというのは懐かしくもあり、逆にWebページ内部からの通信がこれまで本当に原始的だったんだという気付きでもあります。

マイトでは、サーバー側は言うまでもなく賢人大宮がScala+Playで構築しており、Actorモデルバリバリで処理を行っています。当然に、Webチャットサービスを作ろうとしたら、クライアント側はJavaScriptでWebSocketしゃべっておしまい、となるに決まってます。
そしてその中に放り込まれた老人米澤。あぁなんて整然とした世界なんだここは。
そして、ということは、やっぱりスマホアプリもWebSocketしゃべらないとだめだよねー。
そんなわけで、ScalaとSwiftで、AndroidとiOSからWebSocketをしゃべってみましたのでご報告です。なんと3回に分けての連載となりまして、今回はAndroid編(1)となります。

要求仕様

以下の通信をサーバーと行います。

  • クライアントのUIスレッドからプレーンテキストのリクエストデータを投げる
  • サーバーから、リクエストに基づいたレスポンスデータがプレーンテキストで返される
  • リクエストデータの先頭には”ユニークな整数値+空白1文字”が含まれ、サーバーはリクエストに対するレスポンスデータの先頭に同じものを付加して返す
  • リクエストデータの大きさは考慮しなくてよい

なんだかゆるゆるというか実用性に乏しく感じられますが、まぁJSONでやりとりする場合でもこれで行けることは行けます(^^ゞ。そしてサーバー側での処理が前後しても大丈夫。プレーンテキスト偉大。

Javaライブラリー選び

突然ですが、Androidには「メソッド64k制限」というモノが存在します。これは、Dalvik VM専用の実行バイナリーである.dexファイル(.apkの中にリソースなどとともにアーカイヴされています)の中で、invokeできるメソッドに16bit整数で通し番号がつけられている、というとんでもなく低い限界です。まるで8086のアドレッシングと似た壁で、さらにAndroid 5からはマルチdexという機能がつきまして、これまた8086のラージモデルじゃん! …とまた年寄り話になってしまいすみませんが、4.x系をサポートする場合はこの64k制限は回避できません。
そして、この制限などから、Androidで使えるWebSocketライブラリーは限られているようです。
さらに、ScalaでAndroid開発をする場合、この制限がキツく効いてきます。標準ライブラリーだけでもこの上限に近い数のメソッドがあるからです。また、しばしば、JSONライブラリーとしてAndroidビルトインのorg.jsonを使うのも同じ理由です。
最終的には、Proguardという難読化(これがヘタレなんですが…)ツールで参照されていないメソッドを削除しますが、逆にここで削除しすぎ現象、リフレクションを踏まえない問題が発生しやすいのもまた罠となります。
そういうトラブルに対処するのも時間がもったいない、ということで、今回は、Androidでの実績がある、nv-webbsocket-clientを使うことにしました。
さぁ、あとはこれを、Scaloidと似た感じでうすーくラップして使えればよいところ。
しかしここでまた悩まされるのがAndroidなのでした。

最強のAkka Actorを使う…orz

まさにこれがベストプラクティスに違いありません。組み込めれば、ですが。
しかし実際には、まさに前述した64k制限の対処がなかなか大変なようで…ぐぐるといくつか有用な情報は出てきましたが、そもそも私は個人的に「新しい技術にチャレンジして必ず地雷を踏む」という事例が明らかに他の技術者より多いんです!(涙) なので、こういうのは趣味の範疇で転がしてから実戦投入したいと思い、泣く泣く回避しました。

じゃ、Promise/Future!

次の策としてPromise/Futureで自力でなんとかするプレイでしょう。WebSocketクライアント側が、他の部品からの要求に答えてPromiseを発行、そこから取り出したFutureを部品に返します。そしてWebSocketに書き込み、レスポンスをPromise経由で返せばよいわけです。
これが優れているのは、呼び出し元と呼び出し側が疎結合になる、という一点に尽きます。一方で、一般的なメリットである「スレッドを意識しないでよい」というのは、Androidはじめとするウィンドウシステムにありがちな「UI操作は常にメインスレッドからでなければならない」制約がある場合は、あまりメリットにならなかったりもしますが。

というわけでPromise/Future版WebSocket通信クライアント

case class WebSocketParameters(uri: String, timeout: Int, cookie: String)

class WebSocketPF(parameters: WebSocketParameters)
  extends WebSocketAdapter with TagUtil {

  implicit val exec = ExecutionContext.exec

  private val sequencer = new java.util.concurrent.atomic.AtomicLong
  private val promises = new scala.collection.mutable.LongMap[Promise[String]]

  private val socket: Future[WebSocket] = Future {
    val factory = new WebSocketFactory
    factory.createSocket(parameters.uri, parameters.timeout)
      .addHeader("cookie", parameters.cookie)
      .addListener(this)
      .connect()
  }

  def request(requestString: String): Future[String] = {
    val promise = Promise[String]()
    val sequence = sequencer.incrementAndGet()
    val request = s"${sequence.toString} $requestString"
    promises.synchronized(promises += (sequence, promise))
    socket.map { socket =>
      if (socket.isOpen) Future { socket.sendText(request) }
      else throw new IllegalStateException
    }.onFailure {
      case t =>
        info(s"socket failed $t")
        promises.synchronized(promises -= sequence)
        promise.failure(t)
    }
    promise.future
  }

  private def isOpen(t: Try[WebSocket]): Boolean = t.map(s => s.isOpen).getOrElse(false)

  def isReady: Boolean = socket.value.fold(true)(isOpen)

  def isOpen: Boolean = socket.value.fold(false)(isOpen)

  def close(): Unit = socket.foreach(_.disconnect)

  protected def parseFrame(message: String): Option[(Promise[String], String)] = {
    try {
      val responses = message.split(" ", 2)
      if (responses.length < 2) {
        None
      } else {
        val sequence = java.lang.Long.parseLong(responses(0))
        val body = responses(1)
        val promise = promises.get(sequence)
        promise match {
          case Some(promise_) =>
            promises.synchronized(promises -= sequence)
            Some(promise_, body)
          case None =>
            None
        }
      }
    } catch {
      case e: Throwable => None
    }
  }

  override def onTextMessage(s: WebSocket, message: String): Unit = {
    parseFrame(message) match {
      case Some((promise, body)) =>
        promise.success(body)
      case None =>
        error(s"unknown message $message")
    }
  }

  override def onDisconnected(s: WebSocket, sf: WebSocketFrame, cf: WebSocketFrame, closedByServer: Boolean): Unit = {
    promises.synchronized {
      val promises_ = promises.toList
      promises.clear()
      promises_
    }.foreach(_._2.failure(new IllegalStateException))
  }

  override def onSendError(s: WebSocket, cause: WebSocketException, frame: WebSocketFrame): Unit =
    parseFrame(frame.getPayloadText).foreach(_._1.failure(cause))

  override def onError(s: WebSocket, cause: WebSocketException): Unit = {
    error(cause.getMessage)
  }
}

…Scala初心者の、なんだかどうにもなってないコードですみませんm(__)m
とにかく、Promise/Futureの仕組みに極力乗っかるように努力はしてみたつもりです。

コネクション

Androidでは(というかウィンドウシステムならたいていは)、UIスレッドでブロックする行為はご法度。とりわけAndroidの場合はウォッチドッグがきっちり発動するようになっています。
一方、nv-websocket-clientは、読み書きは別スレッドで行いますが、最初の接続は同期/非同期と選べます。そして非同期を選ぶと、コネクション完了の通知手段はリスナーコールバック。せっかくPromise/Futureで行くというのに、これはいけません!(^^ゞ
というわけで、WebSocketじたいの接続は同期モードを使い、それをFutureで実行させるようにしてみました。
こうすると、やはり非同期になるリクエスト/レスポンスの扱いを、mapで逐次処理できるようになります(requestメソッドはあまりよいmapの使い方になってないんですが…)。

リクエスト/レスポンス

データだだ流しのWebSocket、当然その上でパケット的なものを実現するには、通し番号が必要です。あぁホントに原始的な話。まぁ状態を抱え込むことにはなりますが、それはAtomicLongとmutableなLongMapの2つに封じ込めています。AtomicLong#incrementAndGetで通し番号を発行し、それをキーとして個々のリクエストに対するPromiseをLongMapに保持し、レスポンスに応じてsuccessさせるという。
なお、LongMapを完全自力でsynchronizedさせていますが、本来ならconcurrent Mapでスマートにできそうなもの。しかし、onDisconnectedで、待ち行列に溜まっているリクエストをすべて失敗させないといけなくて、この処理をアトミックにするにはこうするしかないかなと。

…あれ? 切断の通知は?

このクラスには、切断を伝える口がありません。あくまでも、リクエストの戻り値となるFutureが失敗していることで切断を検知することになります。isOpen/isReadyとか作りましたが、あまり使いみちはなさそうです。
ただ、自力で書く限り、これ以上の仕組みは必要ない、ともいえます。
というのは、もし切断通知の仕組みをWebSocket側に実装したとして、それをAndroidのアクティビティーやフラグメントが自処理のためにリッスンした場合、典型的な循環参照が発生してしまうからです。各部品がWebSocketインスタンスを抱え、WebSocketインスタンスに紐付いた切断通知はその部品への通知を抱えることになります。これはプロセスと部品の寿命が異なり、また古いバージョンでのGC性能に問題があるAndroidでは見過ごせない問題です。
ただ、UI上、切断後ただちにアラートをユーザーに上げる、というのも、アプリによっては必要です。
次回は、これらの問題について、極めて後ろ向きに対処することにします(^^ゞ

おまけ ExecutionContext問題

WebSocketPFクラスの冒頭に、謎の

implicit val exec = ExecutionContext.exec

という行があります。これは、ScalaでAndroid開発をする場合の鬼門です。
http://blog.scaloid.org/2013/11/using-scalaconcurrentfuture-in-android.html
私はとりあえず専用のExecutionContextオブジェクトを機械的に置いて、機械的に参照するようにしています

おまけ2 runOnUIThread

Futureに登録するコールバックはメインスレッド以外から呼び出される可能性が強くあります。そこでScaloidの拡張されたrunOnUiThread! これで気軽にUIスレッドで処理を行えます。

Scalaエンジニアの求人