在跨平台开发中,实现实时消息通信的 Socket 连接有多种方式。对于 Web 端,通常使用 SockJS 结合 Spring 框架的 WebSocket 支持;而在 Kotlin Multiplatform(KMP)项目中,可以通过 Krossbow 库简化 SockJS 客户端的实现。本文将详细介绍在 Kotlin Compose Multiplatform 应用中集成 SockJS 进行稳定连接的方法。
实现方案选择
如果服务端已采用 Spring 的 SockJS 标准,使用 Krossbow 库是较为便捷的跨平台方案。不过,该库仍处于发展阶段,可能会遇到一些小问题。若服务端未固定使用 SockJS,或需要兼容多种客户端,也可考虑 WebSocket 配合消息订阅、Socket.IO、MQTT 等其他协议。
客户端实现
本文聚焦客户端实现,使用 Koin 进行依赖注入。首先,在项目的 commonMain 源集中添加必要的依赖包。
添加依赖
在构建脚本中配置以下依赖版本,以确保兼容性:
kotlin {
sourceSets {
commonMain.dependencies {
implementation("org.hildan.krossbow:krossbow-stomp-core:4.5.0")
implementation("org.hildan.krossbow:krossbow-websocket-sockjs:4.5.0")
implementation("org.glassfish.tyrus.bundles:tyrus-standalone-client:1.19")
implementation("com.fasterxml.jackson.core:jackson-databind:2.13.5")
}
}
}
推荐使用上述已验证的版本组合,避免出现类或方法找不到的兼容性问题。
连接场景
Socket 连接通常涉及三种场景:
- 主动登录连接:用户通过账号密码等凭证主动登录时建立连接。
- 被动登录连接:应用重启后,使用本地存储的 Token 自动重连。
- 断网重连:网络恢复后尝试重新连接。
被动登录连接实现
创建主屏幕模型 MainScreenModel,并通过 Koin 注入。在其中定义相关状态流:
class MainScreenModel : ScreenModel {
private val _firstTryLinkSocket = MutableStateFlow(true)
val firstTryLinkSocket = _firstTryLinkSocket.asStateFlow()
fun triedLinkSocket() {
_firstTryLinkSocket.value = false
}
private val _collectorJob = MutableStateFlow<Job?>(null)
private val _socketClient = MutableStateFlow(StompClient(SockJSClient()) {
instrumentation = object : KrossbowInstrumentation {
override suspend fun onWebSocketClosed(cause: Throwable?) {
globalDataModel.resetSocketConnected(false)
}
}
})
private val _socketSession = MutableStateFlow<StompSession?>(null)
val socketSession = _socketSession.asStateFlow()
}
在应用启动时,从本地存储读取用户信息。若存在有效 Token,则尝试连接:
val firstTryLinkSocket = mainModel.firstTryLinkSocket.value
val userDataStringDb = dataStorageManager.getNonFlowString(DataStorageManager.USER_DATA)
if (userDataStringDb.isNotBlank() && firstTryLinkSocket) {
mainModel.triedLinkSocket()
val userDataDb: UserDataModel = baseJsonConf.decodeFromString(userDataStringDb)
if (!userDataDb.token.isNullOrBlank()) {
commonApiCoroutine.launch {
mainModel.login(dbData = userDataDb, forceLogin = true)
}
}
}
登录连接逻辑首先检查 Token 有效性,若过期则清除本地状态;否则建立 Socket 连接并订阅消息:
suspend fun login(
account: String = "", passwd: String = "",
dbData: UserDataModel? = null,
forceLogin: Boolean = false,
) {
val isLogin = BaseApi().isLogin(_userState.value.token)
if (!isLogin) {
if (globalDataModel.netStatus.value) {
globalDataModel.clearLocalUserState()
} else {
globalDataModel.resetSocketConnected(false)
}
if (!forceLogin) return
} else {
chatScreenModel.updateChatData(_userState.value.token)
}
CoroutineScope(Dispatchers.IO).launch(socketExceptionHandlerWithReconnect) {
try {
_collectorJob.value?.cancel()
_socketSession.value?.disconnect()
_collectorJob.value = null
_socketSession.value = null
} catch (ignore: Exception) {}
_socketSession.value = _socketClient.value.connect("your_url")
val subscription: Flow<String> = _socketSession.value!!.subscribeText("your_subscribe")
globalDataModel.resetSocketConnected(true)
_collectorJob.value = _commonCoroutine.launch(socketExceptionHandlerWithReconnect) {
subscription.collect { msg ->
val chatRow: ChatRowModel = baseJsonConf.decodeFromString(msg)
chatScreenModel.pushChatMessage(_userState.value.token, chatRow)
}
}
}
}
连接成功后,通过 subscribeText 订阅消息流,并使用 JSON 反序列化处理收到的数据。注意,Socket 连接需在 IO 调度器中执行,避免主线程阻塞。
主动登录连接
主动登录时,通过账号密码获取新 Token,其余连接逻辑与被动登录相同:
suspend fun login(
account: String = "", passwd: String = "",
dbData: UserDataModel? = null,
forceLogin: Boolean = false,
) {
_userState.value.userData = BaseApi().login(account, passwd)
val thisToken = _userState.value.userData.token
if (!thisToken.isNullOrBlank()) {
_syncUserData.value = true
chatScreenModel.updateChatData(thisToken)
}
_userState.value.token = _userState.value.userData.token ?: ""
if (_userState.value.token.isBlank()) return
// 后续连接步骤与被动登录一致
// ...
}
断网重连机制
为实现稳定连接,需处理网络异常。定义两个协程异常处理器:
- 对于发送失败,仅提示网络问题:
val socketExceptionHandler = CoroutineExceptionHandler { _, exception ->
println("CoroutineException Caught $exception")
globalDataModel.resetSocketConnected(false)
globalDataModel.checkNetwork()
}
- 对于连接或订阅失败,进行定时重连:
private val socketExceptionHandlerWithReconnect = CoroutineExceptionHandler { _, exception ->
println("Reconnect CoroutineException Caught $exception")
globalDataModel.resetSocketConnected(false)
globalDataModel.checkNetwork()
CoroutineScope(Dispatchers.IO).launch {
delay(5000)
if (globalDataModel.userState.value.token.isNotBlank()) {
login(dbData = globalDataModel.userState.value.userData, forceLogin = true)
}
}
}
消息发送时也使用异常处理:
CoroutineScope(Dispatchers.IO).launch(mainModel.socketExceptionHandler) {
socketSession?.sendText("your_url", "your_message")
chatScreenModel.updateInputContent(chatId, "")
}
总结
通过上述步骤,可在 Kotlin Multiplatform 应用中建立稳定的 SockJS 连接,处理登录、重连及消息收发。关键点包括使用 Krossbow 库、协程进行异步处理、以及完善的错误恢复机制。实际开发中,需根据服务端接口调整连接 URL 和订阅路径。