WebSocket on Android with OkHttp and RxJava

08/12/2019

Have you ever heard of WebSocket? I hadn’t until a couple of years ago, and when I first started working with it I immediately saw the benefits of using it. If we look up what WebSockets are made for, this is what we find on Wikipedia:

WebSocket is a computer communications protocol, providing full-duplex communication channels over a single TCP connection.
The WebSocket protocol enables interaction between a web browser (or other client application) and a web server with lower overhead than half-duplex alternatives such as HTTP polling, facilitating real-time data transfer from and to the server. This is made possible by providing a standardized way for the server to send content to the client without being first requested by the client, and allowing messages to be passed back and forth while keeping the connection open. In this way, a two-way ongoing conversation can take place between the client and the server.

To explain it with an example, imagine you want to write a chat client. In the simplest possible implementation, every time a user sends a message, the message is stored in a server-side database and your app needs to read it back from the server.

Imagine that the first user sends a message and the message is stored. Your client app now needs to ask the server for the message in order to show it in the chat interface, but you don’t know when this information will be available on the server (that is, when the other user will answer). The only way your client app can find out whether there are new messages is to interrogate the server periodically: “do you have messages I need to show?”, “do you have messages I need to show?”. This is called polling, and it is expensive because many of those requests return nothing new. With WebSockets the server can send messages to the client without the client requesting them first, and messages can be passed back and forth while the connection stays open.

Now that we know how a WebSocket works, in this article I will explain how to write a WebSocket client for an Android app using OkHttp and RxJava in Kotlin.

 

On Android we can use a library called OkHttp which, among other things, can create a WebSocket connection. The first step is to add the library to the Gradle build file and the internet permission to the Android Manifest. (When I first started the implementation I used version 3, but as far as I know there is also a version 4 of OkHttp.)

implementation 'com.squareup.okhttp3:okhttp:3.14.0'

When creating the WebSocket connection, the library requires us to write a listener that handles the opening and closing of the connection as well as the messages received. The main part of the implementation looks like this:

import okhttp3.Response
import okhttp3.WebSocket
import okhttp3.WebSocketListener
import okio.ByteString
import timber.log.Timber

val webSocketListener = object : WebSocketListener() {
    // The handshake succeeded and the socket is ready to use.
    override fun onOpen(webSocket: WebSocket, response: Response) {
        Timber.i("Socket connection: onOpen: $response")
    }

    // A text frame arrived.
    override fun onMessage(webSocket: WebSocket, text: String) {
        Timber.i("Socket connection: onMessage (Text): $text")
    }

    // A binary frame arrived.
    override fun onMessage(webSocket: WebSocket, bytes: ByteString) {
        Timber.i("Socket connection: onMessage (ByteString): $bytes")
    }

    // The remote peer has asked to close the connection.
    override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
        Timber.i("Socket connection: onClosing: Code: $code Reason: $reason")
    }

    // Both peers have closed the connection.
    override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
        Timber.i("Socket connection: onClosed: Code: $code Reason: $reason")
    }

    // The connection was lost or could not be established.
    override fun onFailure(webSocket: WebSocket, throwable: Throwable, response: Response?) {
        Timber.e("Socket connection: onFailure: Throwable: ${throwable.message} Response: $response")
    }
}

When we open the connection we are then required to pass an OkHttp Request and the listener created.

val request = Request.Builder().url("yoururltothewebsocket").build()
okHttpClient.newWebSocket(request, webSocketListener)

 

And that’s all we need to do to manage the WebSocket connection as the library will take care of most of the communication.

The Android app I wrote follows an MVP architecture pattern and makes heavy use of RxJava. This means that the whole logic lives in the Presenter layer and, ideally, every time I open or close a WebSocket connection or receive a message from the server on the WebSocket, my Presenter receives an event which it then handles. The app is super simple. I want to track three kinds of messages: a Welcome Message, which is sent after the handshake with the server; a Ping Message, which I use just to check that I still have an active WebSocket connection; and an Update Message, which is the sign for my Presenter to go and fetch new data (ideally my server sends an Update Message every time the client is required to update itself).

 

In order to do that I created two sealed classes. The first one maps the kind of message that I receive from the WebSocket into something that my app can understand, and the second one lists all the possible events that my WebSocket handler class will eventually send to the Presenter.

sealed class Message {
    object Welcome : Message()

    @JsonSerializable
    data class Ping(@Json(name = "message") val timestamp: Instant) : Message()

    object Update : Message()
    object RestaurantUpdate : Message()
}

sealed class RxWebSocketEvent {
    data class RxWebSocketOpen(val socket: WebSocket) : RxWebSocketEvent()

    data class RxWebSocketClosed(
        val socket: WebSocket,
        val exception: RxWebSocketException? = null
    ) : RxWebSocketEvent()

    object RxWebSocketPingMessage : RxWebSocketEvent()
    object RxWebSocketWelcomeMessage : RxWebSocketEvent()
    object RxWebSocketUpdate : RxWebSocketEvent()
}
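One practical benefit of modelling the events as a sealed class is that a `when` used as an expression has to cover every subclass, so the compiler flags any event the Presenter forgets to handle. Here is a minimal sketch of that idea. `SocketEvent` and `describe` are hypothetical names, and the socket is replaced by a plain String id so that the snippet runs without OkHttp:

```kotlin
// Hypothetical, simplified stand-in for RxWebSocketEvent: the socket is
// replaced by a plain String id so the example has no OkHttp dependency.
sealed class SocketEvent {
    data class Open(val socketId: String) : SocketEvent()
    data class Closed(val socketId: String, val error: Throwable? = null) : SocketEvent()
    object PingMessage : SocketEvent()
    object WelcomeMessage : SocketEvent()
    object Update : SocketEvent()
}

// `when` is used as an expression here, so the compiler rejects this function
// if a subclass of SocketEvent is left unhandled (no `else` branch is needed).
fun describe(event: SocketEvent): String = when (event) {
    is SocketEvent.Open -> "open:${event.socketId}"
    is SocketEvent.Closed -> if (event.error == null) "closed" else "closed with error"
    SocketEvent.PingMessage -> "ping"
    SocketEvent.WelcomeMessage -> "welcome"
    SocketEvent.Update -> "update"
}
```

If a new event type is added to the hierarchy later, `describe` stops compiling until a branch for it is written, which is harder to guarantee with string or integer message codes.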

 

 

The WebSocket handler class will look like the following:

// RxJava 2 imports (Disposables.fromRunnable lives in io.reactivex.disposables)
import io.reactivex.Observable
import io.reactivex.ObservableEmitter
import io.reactivex.disposables.Disposables
import javax.inject.Singleton
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.Response
import okhttp3.WebSocket
import okhttp3.WebSocketListener
import okio.ByteString
import timber.log.Timber

interface RxWebSocket {
    fun onOpen(): Observable<RxWebSocketEvent>
    fun onClose()
}

@Singleton
class RxWebSocketImpl(
    private val okHttpClient: OkHttpClient,
    private val request: Request
) : RxWebSocket {

    private var thisWebSocket: WebSocket? = null
    val subscribe = mutableListOf<ObservableEmitter<RxWebSocketEvent>>()

    override fun onOpen(): Observable<RxWebSocketEvent> {
        // Open a connection only if there isn't one already.
        if (thisWebSocket == null) {
            thisWebSocket = okHttpClient.newWebSocket(request, object : WebSocketListener() {
                override fun onOpen(webSocket: WebSocket, response: Response) {
                    Timber.i("Socket connection: onOpen: $response")
                    emitEvent(RxWebSocketEvent.RxWebSocketOpen(webSocket))
                }

                override fun onMessage(webSocket: WebSocket, text: String) {
                    when (getMessage(text)) {
                        Message.Welcome -> {
                            // sendSubscription is a helper that is not shown in this article.
                            sendSubscription(webSocket)
                            emitEvent(RxWebSocketEvent.RxWebSocketWelcomeMessage)
                        }
                        is Message.Ping -> emitEvent(RxWebSocketEvent.RxWebSocketPingMessage)
                        Message.Update -> emitEvent(RxWebSocketEvent.RxWebSocketUpdate)
                        else -> Unit // anything else is ignored
                    }
                }

                override fun onMessage(webSocket: WebSocket, bytes: ByteString) {
                    // Binary messages are not used.
                }

                override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
                    emitEvent(RxWebSocketEvent.RxWebSocketClosed(webSocket))
                }

                override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
                    thisWebSocket = null
                    emitEvent(RxWebSocketEvent.RxWebSocketClosed(webSocket))
                }

                override fun onFailure(webSocket: WebSocket, throwable: Throwable, response: Response?) {
                    ..... // define exception
                    thisWebSocket = null
                    emitError(exception)
                }
            })
        }
        return Observable.create<RxWebSocketEvent> { emitter ->
            subscribe.add(emitter)

            // Runs when this emitter is disposed: drop disposed emitters and
            // close the socket once nobody is listening any more.
            emitter.setDisposable(Disposables.fromRunnable {
                subscribe.removeAll { it.isDisposed }

                if (subscribe.isEmpty()) {
                    thisWebSocket?.close(CLOSE_CODE_NORMAL, CLOSE_REASON_DISCONNECTED)
                    thisWebSocket = null
                }
            })
        }
    }

    override fun onClose() {
        thisWebSocket?.cancel()
    }

    private fun emitEvent(rxWebSocketEvent: RxWebSocketEvent) {
        // Iterate over a copy: an emitter disposed during delivery removes
        // itself from `subscribe`, which would otherwise break the iteration.
        subscribe.toList().forEach {
            it.onNext(rxWebSocketEvent)
        }
    }

    private fun emitError(exception: RxWebSocketException) {
        // onError disposes the emitter, so a copy of the list is required here too.
        subscribe.toList().forEach {
            it.onError(exception)
        }
    }

    private fun getMessage(json: String): Message? {
        return when {
            json.contains(MESSAGE_WELCOME, true) -> Message.Welcome
            // Message.Ping is a data class and needs a timestamp. The payload is not
            // parsed in this simplified version, so as an assumption the receive
            // time is used in its place.
            json.contains(MESSAGE_PING, true) -> Message.Ping(Instant.now())
            json.contains(MESSAGE_UPDATE, true) -> Message.Update
            else -> null
        }
    }

    companion object {
        const val CLOSE_CODE_NORMAL = 1000
        const val CLOSE_REASON_DISCONNECTED = "Disconnected"
        private const val MESSAGE_WELCOME = "welcome"
        private const val MESSAGE_PING = "ping"
        private const val MESSAGE_UPDATE = "update"
    }
}

The WebSocket handler class is a Singleton. As you can see, every time the onOpen method is called I check whether there is already a WebSocket connection and open one only if there isn’t. I then create a new ObservableEmitter through which the subscribing Presenter receives every event (WebSocket open/close, ping or update). This emitter is added to a list of ObservableEmitters called subscribe. Every time an emitter is disposed, a Runnable removes all the disposed emitters from the list and, if subscribe is then empty, closes the WebSocket connection.
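The bookkeeping described here (share one connection, forget subscribers that leave, close when nobody is left) does not depend on RxJava. The following is a minimal sketch of the same idea in plain Kotlin so that it can be run in isolation. `SharedConnection`, `Subscription` and the `open`/`close` callbacks are hypothetical names standing in for the handler, the emitter's Disposable and the real WebSocket calls, and the sketch makes no attempt at thread safety:

```kotlin
// Hypothetical stand-in for the handler: `open` and `close` represent
// creating and closing the real WebSocket connection.
class SharedConnection<T>(
    private val open: () -> Unit,
    private val close: () -> Unit
) {
    private val listeners = mutableListOf<(T) -> Unit>()
    private var connected = false

    // Handle returned to each subscriber; calling dispose() unsubscribes it.
    inner class Subscription(private val listener: (T) -> Unit) {
        fun dispose() {
            listeners.remove(listener)
            // The last subscriber to leave closes the connection.
            if (listeners.isEmpty() && connected) {
                connected = false
                close()
            }
        }
    }

    fun subscribe(listener: (T) -> Unit): Subscription {
        // Only the first subscriber opens the connection.
        if (!connected) {
            connected = true
            open()
        }
        listeners.add(listener)
        return Subscription(listener)
    }

    // Iterate over a snapshot so a listener may dispose itself while handling an event.
    fun emit(event: T) = listeners.toList().forEach { it(event) }
}
```

With two subscribers, `open` runs once on the first `subscribe` call and `close` runs once, after the second subscriber has called `dispose()`.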

Once the WebSocket handler class has been defined, the only thing remaining is to describe how the Presenter reacts to WebSocket messages. The main part of the Presenter that handles the WebSocket looks something like this:

networkConnectionManager.onNetworkConnectionChange()
    .flatMap { isConnected ->
        if (isConnected) {
            rxWebSocket.onOpen()
        } else {
            rxWebSocket.onClose()
            Observable.empty<RxWebSocketEvent>()
        }
    }
    .doOnSubscribe { Timber.d("Trying to connect to the socket") }
    .doOnError {
        Timber.e("Socket connection: error: Connection Idle.")
    }
    .subscribeOn(Schedulers.io())
    .subscribe({ event ->
        when (event) {
            is RxWebSocketEvent.RxWebSocketUpdate -> { /* do something */ }
            else -> { /* do something else */ }
        }
    }, { throwable ->
        Timber.e(throwable, "Error retrying socket connection")
    })

 

In the example above, onNetworkConnectionChange returns an Observable<Boolean> that indicates whether or not there is a valid Internet connection; when there is one, a WebSocket connection is created. All the messages arriving on this connection are handled in the subscribe section.

 

About the Author

 

Anna Del Prete, Software Engineer at Deliveroo. Passionate about IoT, Voice technologies, and Machine Learning, happy mother of one beautiful boy.