ScalaとSwiftでスマホからしゃべるぜWebSocket Android編(2)

前回ご紹介した、Promise/Future版のWebSocketクラスですが、その最後でも触れたとおり、Android開発においては、各部品のライフサイクルと循環参照に留意しなければなりません。
Androidには、プロセス、タスク、アクティビティー、サービス、フラグメント、ビューといった各概念に、それぞれ異なるライフサイクルが割り当てられています(とりわけ、「プロセスがなくなっても存在するタスク」という概念には度肝を抜かれたものでした)。そしてWebSocket通信に限らず、バックグラウンドで継続的に動作を行うためには、どの概念に自らのライフサイクルを紐付けるのかが重要です。WebSocket通信をプロセスに紐付けることはあまりないと思いますが、アクティビティーやフラグメントが消滅しても通信を維持したいのであれば、サービスを作ってそこで通信するのがベストでしょう。
一方、画面表示やユーザー操作と密結合する通信なのであれば、フラグメントに紐付けるのが簡便です。とりわけ、setRetainInstance(true)としたフラグメントであれば、メモリー不足にならない限りはプロセスレベルに近いライフサイクルになりますので、コードがとても書きやすくなります。
またもう1点、Promise/Futureは非同期遅延実行のモナディックな超便利道具ですが、逆に言えば、スレッドの多用を引き起こします。ExecutionContextでこのあたりのパフォーマンス低下にはぬかりなく対処しているScalaですが、WebSocketライブラリーの外部でスレッド操作を極力避けられれば、それだけ構造もシンプルになるというものです。

軽量、かつAndroidの部品のライフサイクルに密結合したバージョンを後ろ向きに書いてみる

というわけで、スレッドを極力使わずに、また親のライフサイクルに密着したWebSocket通信も書いてみました。
言うまでもなく、

  • Promise/Futureではなく、コールバックによる管理
  • リクエストに対してはコールバックでレスポンスを返す
  • WebSocketじたいの状態変化を返すコールバック登録ができるのは1インスタンスのみ
  • WeakReferenceによる消極的自爆

という後ろ向きな話です。
これ、ちょっと工夫すれば後ろ2つは解決できます。が、今回の目標は「画面表示やユーザー操作と密結合」。これらを扱うのは主に「現在表示中のアクティビティーかフラグメント」、つまり1インスタンスなので、これらの表示/非表示とライフサイクルが一致していればそれで足りるはずです。

だがしかし…ノースレッドは無理筋だったorz

前回のPromise/Future利用ですと、WebSocketの接続確立という非同期、リクエスト送信→レスポンス受信という非同期、この2つの非同期をmapでうまく処理できました。
が、nv-websocket-clientを使う限り、ライブラリーの呼び出し元が「いま、接続確立しているかどうか」という「状態」を保持することなしには、その後のリクエスト送信を開始できません。
というのも、最終的なリクエスト送信のエンドポイントとなるWebSocket#sendFrameは、内部変数による同期をとりながら、接続がない場合のリクエストを黙って捨てるだけ、という実装なのです。つまり、「接続されているかどうか」と「送信をキューイングするかそのまま送るか」という判断を、シンプルかつアトミックには行えません(もしやるなら、WebSocketインスタンス自体でロックするという非効率な方法しかない)。接続開始のコールバックまでリクエスト送信を待ち、それからリクエストを開始、となるわけです。
これを、通常のAndroid開発のまま、あるいはScaloidのような薄いラッパーを通して扱う場合、必然的に「接続確立かどうか」を呼び出し元がフラグとして持たざるを得なくなります。

とりあえずワーカースレッドを1つ立ててなんとかします(T_T)

というわけで、より一段と原始的になりまして(涙)、ワーカースレッドを立ててその中で同期接続→確立後はキューイングされたリクエストを送信する→終了リクエストが来たら自爆する、という実装をとることにしました。これにより、接続確立かどうかはワーカースレッドの内部状態に封じ込めることができます。また、常時キューイングすることでリクエストの送信がいつでもできることになりました。

まずシャットダウンできるブロッキングデックをつくってみる

別スレッドでリクエストを受け付ける、という今回の目的には、JavaのBlockingDequeを使うのが最も優れています。しかしそれだけでは決定的によろしくありません。そのままですと、接続がなくなった≒スレッドが終了した以後、キューにリクエストを追加することができてしまいます。そしてそのキューから誰も何も取り出さないため、リクエストは永遠に闇に葬られることになります。どころかメモリリークします。
そこでまずこんなクラスを書いてみました。

class ShutdownableBlockingDeque[T]
extends java.util.concurrent.LinkedBlockingDeque[T] {
private var enabled: Boolean = true
def shutdown(): Option[List[T]] = synchronized {
if (enabled) {
enabled = false
val list = iterator().asScala.toList
clear()
Some(list)
} else {
None
}
}
override def offer(d: T): Boolean = synchronized {
if (enabled) super.offer(d)
else false
}
override def offerFirst(d: T): Boolean = synchronized {
if (enabled) super.offerFirst(d)
else false
}
}

内部利用なんでoffer/offerFirstの2つしか対策していませんが、きちんとやるなら全メソッドでチェックになります。
これを書くときに苦労したのは、shutdownが返す、未処理のリクエストの集合です(これがないと、処理できなかったリクエストにエラーを返せない)。なにせJava Collectionなので、直接コピーを取る方法がtoArrayしかない。そしてtoArrayにはなぜか引数でJava配列を渡さなければならないというJavaの理不尽、さらにScala配列が闇処理をしまくってJava配列に寄せられているというScalaのダークサイド。これを直接乗り越えるためにはClassTagを暗黙引数にし、かつTがAnyRefであると保証した上で、Array[T]()を渡す、という話になってしまいます。なのでここは発想を変えて、まずJavaのイテレーターを取り、それをasScalaでScalaイテレーターに変換、さらにtoListでコピーを取る、という作戦としました。

さらにFutureっぽいことをスレッドなしで実装する

リクエストに対するレスポンスまたはエラーは、Futureならそこで素直に返せます。が、今回はそれを使わない、しかもスレッドも使わない、ということで、これまた制限ができるわけです。
Futureの何がすごいって、Promise側で成功をマークした後で発行されたFutureインスタンスに、後付けでonSuccessを設定すると、その後でちゃんとonSuccessが呼ばれてくるというミラクルです。こんな魔法はスレッドが裏方で動いてるおかげですが、今回はそういうことをせずになんとかするっていう話なので、ここでまた安易な方向に逃げました。
以下、リクエストを表すWebSocketRequestクラスです。

class WebSocketRequest(val request: String) {
def onSuccess(response: String): Unit = {}
def onFailure(error: Throwable): Unit = {}
}
case object WebSocketFinalizeRequest extends WebSocketRequest("")

ご覧のとおり、onSuccess/onFailureコールバックつきです。これとリクエストを一体化することで、WebSocketにリクエストを送る時点で必ずコールバックを呼び出せるようになりました。
あとついでに、クローズ命令もcase objectで作っておきました。

というわけでコールバック版WebSocket通信クライアント

trait WebSocketStateListener {
def onSocketResult(result: Throwable): Unit
}
case object AlreadyClosedException extends RuntimeException
case object ClosingException extends RuntimeException
case class SendException(frame: String) extends RuntimeException
class WebSocketCB(parameters: WebSocketParameters, listener: scala.ref.WeakReference[WebSocketStateListener])
extends WebSocketAdapter
with Runnable
with TagUtil {
private val queue = new ShutdownableBlockingDeque[WebSocketRequest]
private val waitings = new scala.collection.mutable.HashMap[Int, WebSocketRequest]
new Thread(this).start()
def request(data: WebSocketRequest): Boolean = {
if (queue.offer(data)) {
true
} else {
data.onFailure(AlreadyClosedException)
false
}
}
def close(): Boolean = queue.offerFirst(WebSocketFinalizeRequest)
def run(): Unit = {
val factory = new WebSocketFactory
try {
val socket = factory.createSocket(parameters.uri, parameters.timeout)
.addHeader("cookie", parameters.cookie)
.addListener(this)
.connect()
Iterator.continually(queue.take()).forall({
case WebSocketFinalizeRequest =>
queue.shutdown().get.foreach(_.onFailure(ClosingException))
socket.disconnect()
false
case data: WebSocketRequest =>
waitings.synchronized(waitings += ((data.hashCode, data)))
socket.sendText(s"${data.hashCode} ${data.request}")
true
})
} catch {
case e: Exception =>
queue.shutdown().get.foreach(_.onFailure(ClosingException))
listener.get.foreach(_.onSocketResult(e))
}
}
protected def parseFrame(message: String): Option[(WebSocketRequest, String)] = {
try {
val responses = message.split(" ", 2)
if (responses.length < 2) {
None
} else {
val hashCode = java.lang.Integer.parseInt(responses(0))
waitings.get(hashCode) match {
case Some(data) =>
waitings.synchronized(waitings -= hashCode)
Some(data, responses(1))
case None =>
None
}
}
} catch {
case e: Throwable => None
}
}
override def onTextMessage(s: WebSocket, message: String): Unit = {
parseFrame(message) match {
case Some((data, response)) =>
data.onSuccess(response)
case None =>
error(s"unknown message $message")
}
}
override def onDisconnected(s: WebSocket, sf: WebSocketFrame, cf: WebSocketFrame, closedByServer: Boolean): Unit = {
waitings.synchronized {
val list = waitings.toList
waitings.clear()
list
}.foreach(_._2.onFailure(ClosingException))
if (closedByServer) listener.get.foreach(_.onSocketResult(ClosingException))
}
override def onSendError(s: WebSocket, cause: WebSocketException, frame: WebSocketFrame): Unit =
listener.get.foreach(_.onSocketResult(SendException(if (frame != null) frame.getPayloadText else "")))
}

例によって初心者丸出しのコードで申し訳ありませんが、ご覧ください。
nv-websocket-clientのソケットはワーカースレッドの寿命とほぼ一致していて、ブロッキングデックから受け取った終了命令やリクエストに対応します。終了命令がない限りは無限にループするIterator.continuallyが大活躍です。
前回のPromise/Future版ではリクエストの通し番号をAtomicLongでご丁寧に発行してましたが、よく考えたら、PromiseのhashCode値を通し番号に流用できますね(^^ゞ、ということで今回はWebSocketRequest#hashCodeを通し番号にしています。
また切断通知や例外発生時は、onSocketResultコールバックを通じて行います。もちろんリスナーは弱参照にして循環参照を抑止しています。

という感じで、2回にわたって、ScalaでAndroidからWebSocketをしゃべるための無駄な努力を綴ってみました。そんなドジでのろまな亀をやっている間に、既に時代はReactiveに流れていることは明らかで、ていうかそもそもScaloidの作者がReactoidとかいうステキげなフレームワークを目論んでるっぽいじゃないですか!
そういう意味では既に何周も周回遅れしている塩梅のワタクシなのですが、Scala学び始めて4ヶ月の身としては、これも何らかの実にはなっているのかな、というところで、お目汚しすみませんでありますm(__)m

次回は、SwiftでiOSからWebSocketをしゃべってみます。

Scalaエンジニアの求人