refactor: 重构 WebSocketService,提升代码可读性和可维护性

main
mojo 1 month ago
parent 65a9b18e93
commit dba98fcc4c

@ -0,0 +1,287 @@
package com.example.service
import android.util.Log
import com.example.action.BaseAction
import com.example.action.NameValue
import com.example.action.WebSocketActionRequest
import com.example.action.WsRequestParam
import com.example.logger.LogUtils
import com.example.report.ActionExec
import com.example.task.TaskConfig
import com.example.utils.WebSocketUtil
import com.example.utils.WebSocketUtil.callRequest
import com.example.utils.toJsonString1
import com.example.web_socket.WsRequest
import com.example.web_socket.WsResponse
import kotlinx.coroutines.launch
import java.nio.charset.StandardCharsets
class WebSocketService(
private val action: BaseAction.WebSocketAction,
override val taskConfig: TaskConfig
) : BaseService(taskConfig) {
companion object {
private const val WS_PROTOCOL = "ws://"
private const val WSS_PROTOCOL = "wss://"
private const val HTTP_PROTOCOL = "http://"
private const val HTTPS_PROTOCOL = "https://"
}
// ==================== 主执行方法 ====================
override suspend fun execute(onFinish: (List<ActionExec>) -> Unit) {
val actionExecList = mutableListOf<ActionExec>()
val currentStep = taskConfig.currentStep
runCatching {
val actionRequest = action.request
?: throw NullPointerException("request is null")
amendActionRequest(actionRequest)
val wsRequest = buildWsRequest(actionRequest)
val result = if (action.async) {
handleAsyncRequest(wsRequest, actionExecList)
} else {
handleSyncRequest(wsRequest, actionExecList, currentStep)
}
updateTaskStep(result, currentStep)
}.onFailure { e ->
LogUtils.error(e)
handleException(e, actionExecList, currentStep)
}
onFinish(actionExecList)
}
// ==================== 请求构建 ====================
private fun buildWsRequest(actionRequest: WebSocketActionRequest): WsRequest {
return WsRequest(
url = actionRequest.url,
headers = actionRequest.headers.nameValueToMap().toMutableMap(),
method = WebSocketUtil.WEB_SOCKET_REQUEST_METHOD,
delay = action.delay,
messages = actionRequest.params
)
}
// ==================== 请求处理 ====================
private data class RequestResult(
val wsResponse: WsResponse?,
val wsReceiveMsg: String
)
private suspend fun handleAsyncRequest(
wsRequest: WsRequest,
actionExecList: MutableList<ActionExec>
): RequestResult {
scope.launch {
callRequest(wsRequest)
}
val actionExec = wsRequest.genActionExec(null)
actionExec.respCode = ASYNC_EXEC_CODE
actionExecList += actionExec
return RequestResult(null, "")
}
private suspend fun handleSyncRequest(
wsRequest: WsRequest,
actionExecList: MutableList<ActionExec>,
currentStep: Int
): RequestResult {
val wsResponse = callRequest(wsRequest)
val actionExec = wsRequest.genActionExec(wsResponse)
actionExecList += actionExec
val wsReceiveMsg = wsResponse.data?.toString() ?: ""
if (wsReceiveMsg.isNotEmpty()) {
extractBodyVariableToCache(
action,
wsReceiveMsg,
wsReceiveMsg.toByteArray(StandardCharsets.UTF_8)
)
}
return RequestResult(wsResponse, wsReceiveMsg)
}
// ==================== 步骤更新 ====================
private fun updateTaskStep(result: RequestResult, currentStep: Int) {
val nextStep = result.wsResponse?.let { response ->
calculateNextStep(response, result.wsReceiveMsg, currentStep)
} ?: run {
// 异步请求或没有响应时,继续下一步
currentStep + 1
}
taskConfig.currentStep = nextStep
}
private fun calculateNextStep(
wsResponse: WsResponse,
wsReceiveMsg: String,
currentStep: Int
): Int {
return when (wsResponse.code) {
WebSocketUtil.WEB_SOCKET_CODE -> {
action.next.getNextStepIndex(wsReceiveMsg, currentStep)
}
else -> {
if (action.skipError) {
currentStep + 1
} else {
Int.MAX_VALUE
}
}
}
}
// ==================== 异常处理 ====================
private fun handleException(
e: Throwable,
actionExecList: MutableList<ActionExec>,
currentStep: Int
) {
val actionExec = genExceptionActionExec(
action,
ERROR_CODE_WS_ACTION_EXEC_FAILED,
Log.getStackTraceString(e)
)
actionExecList += actionExec
taskConfig.currentStep = if (action.skipError) {
currentStep + 1
} else {
Int.MAX_VALUE
}
}
// ==================== 请求修改 ====================
private fun amendActionRequest(actionRequest: WebSocketActionRequest) {
// 替换变量
actionRequest.headers.replaceVariableData()
actionRequest.cookies.replaceVariableData()
actionRequest.params.replaceWsParamVariableData()
// 处理 data 字段
handleRequestData(actionRequest)
// 添加基础 Header
actionRequest.headers = actionRequest.headers.amendBaseHeader(false)
// 处理 URL 和 Cookie
processUrlAndCookie(actionRequest)
}
private fun handleRequestData(actionRequest: WebSocketActionRequest) {
if (actionRequest.data.isNotBlank()) {
actionRequest.data = actionRequest.data.toVariableData()
// 如果 data 存在且 params 不为空,将 data 作为新的 param
if (actionRequest.params.isNotEmpty()) {
actionRequest.params = listOf(
WsRequestParam(value = actionRequest.data)
)
}
}
}
private fun processUrlAndCookie(actionRequest: WebSocketActionRequest) {
val normalizedUrl = actionRequest.url.toVariableData()
runCatching {
val cookieUrl = normalizeUrlProtocol(normalizedUrl)
actionRequest.url = cookieUrl
amendCookie(actionRequest, cookieUrl)
}.onFailure {
LogUtils.error(it, "Failed to process URL and cookie")
actionRequest.url = normalizedUrl
}
}
private fun normalizeUrlProtocol(url: String): String {
return when {
url.startsWith(WSS_PROTOCOL, ignoreCase = true) -> {
url.replace(WSS_PROTOCOL, HTTPS_PROTOCOL, ignoreCase = true)
}
url.startsWith(WS_PROTOCOL, ignoreCase = true) -> {
url.replace(WS_PROTOCOL, HTTP_PROTOCOL, ignoreCase = true)
}
else -> url
}
}
private fun amendCookie(
actionRequest: WebSocketActionRequest,
cookieUrl: String
) {
val cookies = mutableSetOf<NameValue>()
// 从 CookieManager 获取 Cookie
if (actionRequest.autoCookie) {
cookies += cookieFromCookieManager(cookieUrl)
}
// 添加手动指定的 Cookie覆盖同名 Cookie
actionRequest.cookies.forEach { cookie ->
cookies.remove(cookie)
cookies += cookie
}
// 构建 Cookie Header
if (cookies.isNotEmpty()) {
actionRequest.headers += cookies.toList().buildCookie()
}
}
// ==================== 变量替换 ====================
private fun List<WsRequestParam>.replaceWsParamVariableData() {
forEach { param ->
param.value = param.value.toVariableData()
param.interrupt = param.interrupt.toVariableData()
}
}
// ==================== ActionExec 生成 ====================
private fun WsRequest.genActionExec(wsResponse: WsResponse?): ActionExec {
val actionExec = ActionExec(
step = taskConfig.currentStep,
index = 1,
time = System.currentTimeMillis(),
url = url,
method = method,
reqHeader = headers.toJsonString1()
)
// 构建请求数据
if (messages.isNotEmpty()) {
actionExec.reqData = messages.joinToString(",") { it.value }
}
// 填充响应数据
wsResponse?.let { response ->
fillResponseData(actionExec, response)
}
return actionExec
}
private fun fillResponseData(actionExec: ActionExec, response: WsResponse) {
actionExec.respCode = response.code
actionExec.respData = response.data?.toString() ?: ""
actionExec.respHeader = response.headers.toJsonString1()
actionExec.cost = response.endTime - response.startTime
}
}
Loading…
Cancel
Save