Kotlin Multiplatform 实现 SockJS 跨平台 Socket 连接

Viewed 0

在跨平台开发中,实现实时消息通信的 Socket 连接有多种方式。对于 Web 端,通常使用 SockJS 结合 Spring 框架的 WebSocket 支持;而在 Kotlin Multiplatform(KMP)项目中,可以通过 Krossbow 库简化 SockJS 客户端的实现。本文将详细介绍在 Kotlin Compose Multiplatform 应用中集成 SockJS 进行稳定连接的方法。

实现方案选择

如果服务端已采用 Spring 的 SockJS 标准,使用 Krossbow 库是较为便捷的跨平台方案。不过,该库仍处于发展阶段,可能会遇到一些小问题。若服务端未固定使用 SockJS,或需要兼容多种客户端,也可考虑 WebSocket 配合消息订阅、Socket.IO、MQTT 等其他协议。

客户端实现

本文聚焦客户端实现,使用 Koin 进行依赖注入。首先,在项目的 commonMain 源集中添加必要的依赖包。

添加依赖

在构建脚本中配置以下依赖版本,以确保兼容性:

kotlin {
    sourceSets {
        commonMain.dependencies {
            implementation("org.hildan.krossbow:krossbow-stomp-core:4.5.0")
            implementation("org.hildan.krossbow:krossbow-websocket-sockjs:4.5.0")
            implementation("org.glassfish.tyrus.bundles:tyrus-standalone-client:1.19")
            implementation("com.fasterxml.jackson.core:jackson-databind:2.13.5")
        }
    }
}

推荐使用上述已验证的版本组合,避免出现类或方法找不到的兼容性问题。

连接场景

Socket 连接通常涉及三种场景:

  • 主动登录连接:用户通过账号密码等凭证主动登录时建立连接。
  • 被动登录连接:应用重启后,使用本地存储的 Token 自动重连。
  • 断网重连:网络恢复后尝试重新连接。

被动登录连接实现

创建主屏幕模型 MainScreenModel,并通过 Koin 注入。在其中定义相关状态流:

class MainScreenModel : ScreenModel {
    private val _firstTryLinkSocket = MutableStateFlow(true)
    val firstTryLinkSocket = _firstTryLinkSocket.asStateFlow()
    fun triedLinkSocket() {
        _firstTryLinkSocket.value = false
    }

    private val _collectorJob = MutableStateFlow<Job?>(null)
    private val _socketClient = MutableStateFlow(StompClient(SockJSClient()) {
        instrumentation = object : KrossbowInstrumentation {
            override suspend fun onWebSocketClosed(cause: Throwable?) {
                globalDataModel.resetSocketConnected(false)
            }
        }
    })
    private val _socketSession = MutableStateFlow<StompSession?>(null)
    val socketSession = _socketSession.asStateFlow()
}

在应用启动时,从本地存储读取用户信息。若存在有效 Token,则尝试连接:

val firstTryLinkSocket = mainModel.firstTryLinkSocket.value
val userDataStringDb = dataStorageManager.getNonFlowString(DataStorageManager.USER_DATA)

if (userDataStringDb.isNotBlank() && firstTryLinkSocket) {
    mainModel.triedLinkSocket()
    val userDataDb: UserDataModel = baseJsonConf.decodeFromString(userDataStringDb)
    if (!userDataDb.token.isNullOrBlank()) {
        commonApiCoroutine.launch {
            mainModel.login(dbData = userDataDb, forceLogin = true)
        }
    }
}

登录连接逻辑首先检查 Token 有效性,若过期则清除本地状态;否则建立 Socket 连接并订阅消息:

suspend fun login(
    account: String = "", passwd: String = "",
    dbData: UserDataModel? = null,
    forceLogin: Boolean = false,
) {
    val isLogin = BaseApi().isLogin(_userState.value.token)
    if (!isLogin) {
        if (globalDataModel.netStatus.value) {
            globalDataModel.clearLocalUserState()
        } else {
            globalDataModel.resetSocketConnected(false)
        }
        if (!forceLogin) return
    } else {
        chatScreenModel.updateChatData(_userState.value.token)
    }

    CoroutineScope(Dispatchers.IO).launch(socketExceptionHandlerWithReconnect) {
        try {
            _collectorJob.value?.cancel()
            _socketSession.value?.disconnect()
            _collectorJob.value = null
            _socketSession.value = null
        } catch (ignore: Exception) {}

        _socketSession.value = _socketClient.value.connect("your_url")
        val subscription: Flow<String> = _socketSession.value!!.subscribeText("your_subscribe")
        globalDataModel.resetSocketConnected(true)
        _collectorJob.value = _commonCoroutine.launch(socketExceptionHandlerWithReconnect) {
            subscription.collect { msg ->
                val chatRow: ChatRowModel = baseJsonConf.decodeFromString(msg)
                chatScreenModel.pushChatMessage(_userState.value.token, chatRow)
            }
        }
    }
}

连接成功后,通过 subscribeText 订阅消息流,并使用 JSON 反序列化处理收到的数据。注意,Socket 连接需在 IO 调度器中执行,避免主线程阻塞。

主动登录连接

主动登录时,通过账号密码获取新 Token,其余连接逻辑与被动登录相同:

suspend fun login(
    account: String = "", passwd: String = "",
    dbData: UserDataModel? = null,
    forceLogin: Boolean = false,
) {
    _userState.value.userData = BaseApi().login(account, passwd)
    val thisToken = _userState.value.userData.token
    if (!thisToken.isNullOrBlank()) {
        _syncUserData.value = true
        chatScreenModel.updateChatData(thisToken)
    }
    _userState.value.token = _userState.value.userData.token ?: ""
    if (_userState.value.token.isBlank()) return

    // 后续连接步骤与被动登录一致
    // ...
}

断网重连机制

为实现稳定连接,需处理网络异常。定义两个协程异常处理器:

  • 对于发送失败,仅提示网络问题:
val socketExceptionHandler = CoroutineExceptionHandler { _, exception ->
    println("CoroutineException Caught $exception")
    globalDataModel.resetSocketConnected(false)
    globalDataModel.checkNetwork()
}
  • 对于连接或订阅失败,进行定时重连:
private val socketExceptionHandlerWithReconnect = CoroutineExceptionHandler { _, exception ->
    println("Reconnect CoroutineException Caught $exception")
    globalDataModel.resetSocketConnected(false)
    globalDataModel.checkNetwork()
    CoroutineScope(Dispatchers.IO).launch {
        delay(5000)
        if (globalDataModel.userState.value.token.isNotBlank()) {
            login(dbData = globalDataModel.userState.value.userData, forceLogin = true)
        }
    }
}

消息发送时也使用异常处理:

CoroutineScope(Dispatchers.IO).launch(mainModel.socketExceptionHandler) {
    socketSession?.sendText("your_url", "your_message")
    chatScreenModel.updateInputContent(chatId, "")
}

总结

通过上述步骤,可在 Kotlin Multiplatform 应用中建立稳定的 SockJS 连接,处理登录、重连及消息收发。关键点包括使用 Krossbow 库、协程进行异步处理、以及完善的错误恢复机制。实际开发中,需根据服务端接口调整连接 URL 和订阅路径。

0 Answers