From 65a9b18e9395afd373696900c7766534a07f1dba Mon Sep 17 00:00:00 2001 From: mojo Date: Wed, 5 Nov 2025 16:11:53 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E4=BC=98=E5=8C=96=20PinService=20?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=E7=BB=93=E6=9E=84=E5=92=8C=E6=80=A7=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/example/service/PinServer.kt | 341 ++++++++++++------ 1 file changed, 235 insertions(+), 106 deletions(-) diff --git a/lib/src/main/java/com/example/service/PinServer.kt b/lib/src/main/java/com/example/service/PinServer.kt index 4b39c45..5faef78 100644 --- a/lib/src/main/java/com/example/service/PinServer.kt +++ b/lib/src/main/java/com/example/service/PinServer.kt @@ -13,6 +13,7 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.isActive import kotlinx.coroutines.withContext import kotlinx.coroutines.withTimeoutOrNull +import java.nio.charset.StandardCharsets import kotlin.time.DurationUnit import kotlin.time.toDuration @@ -21,143 +22,271 @@ class PinService( override val taskConfig: TaskConfig ) : BaseService(taskConfig) { + companion object { + private const val HTTP_STATUS_OK = 200 + private const val POLLING_INTERVAL_MS = 1000L + private const val MS_TO_SECONDS = 1000f + } + + // ==================== 主执行方法 ==================== + override suspend fun execute(onFinish: (List) -> Unit) { withContext(Dispatchers.IO) { - val actionExecList: MutableList = mutableListOf() - var currentStep = taskConfig.currentStep - kotlin.runCatching { - val start = System.currentTimeMillis() - var respCode = 200 - var cost = 0L - val messageLog: MutableList = mutableListOf() - val nextList = action.next - LogUtils.info("next list: ${nextList.map { it.regexp + ", " + it.contain }}") - withTimeoutOrNull(action.delay.toDuration(DurationUnit.SECONDS)) { - while (isActive) { - val notificationCache = taskConfig.notificationCache - if (nextList.isNotEmpty() && notificationCache.isNotEmpty()) { - val notificationMessages = - haveNextCatchTargetMessage(notificationCache, nextList) - if (notificationMessages.isNotEmpty()) { - LogUtils.info("catch target message...") - messageLog.addAll(notificationMessages) - break - } - } - cost = System.currentTimeMillis() - start - LogUtils.info("waiting for target message... ${action.delay}, escape: ${cost / 1000f}") - delay(1000) - } - } - cost = System.currentTimeMillis() - start - LogUtils.info("waiting for target message... finish ${cost/1000f}") - if (messageLog.isNotEmpty()) { - val nextStep = - nextList.getNextStepIndex( - messageLog.toJsonString(), - currentStep - ) - taskConfig.currentStep = nextStep - } else { - if (nextList.isNotEmpty()) { - taskConfig.currentStep = Int.MAX_VALUE - respCode = ERROR_CODE_PIN_ACTION_TIMEOUT - if(taskConfig.notificationCache.isNotEmpty()) { - messageLog += taskConfig.notificationCache.last() - } - } else { - taskConfig.currentStep = ++currentStep - messageLog += taskConfig.notificationCache - } - } - if (taskConfig.currentStep != Int.MAX_VALUE && messageLog.isNotEmpty()) { - val responseBody = messageLog.toJsonString() - extractBodyVariableToCache(action, responseBody, responseBody.toByteArray()) - } + val actionExecList = mutableListOf() + val currentStep = taskConfig.currentStep + + runCatching { + val executionResult = executeWithTimeout(currentStep) + processExecutionResult(executionResult, actionExecList, currentStep) + }.onFailure { e -> + LogUtils.error(e) + handleException(e, actionExecList, currentStep) + } + + onFinish(actionExecList) + } + } + + // ==================== 执行逻辑 ==================== - val actionExec = genActionExec(messageLog, currentStep, respCode, cost) - actionExecList += actionExec - }.onFailure { - LogUtils.error(it) - val actionExec = genExceptionActionExec( - action, - ERROR_CODE_PIN_ACTION_EXEC_FAILED, - Log.getStackTraceString(it) - ) - actionExecList += actionExec - if (action.skipError) { - taskConfig.currentStep = ++currentStep - } else { - taskConfig.currentStep = Int.MAX_VALUE + private data class ExecutionResult( + val messageLog: MutableList, + val cost: Long, + val nextList: List + ) + + private suspend fun executeWithTimeout(currentStep: Int): ExecutionResult { + val start = System.currentTimeMillis() + val messageLog = mutableListOf() + val nextList = action.next + + LogUtils.info("PinService: next list: ${nextList.map { "${it.regexp}, ${it.contain}" }}") + + withTimeoutOrNull(action.delay.toDuration(DurationUnit.SECONDS)) { + while (isActive) { + val notificationCache = taskConfig.notificationCache + + if (shouldCheckForMessages(nextList, notificationCache)) { + val notificationMessages = haveNextCatchTargetMessage(notificationCache, nextList) + if (notificationMessages.isNotEmpty()) { + LogUtils.info("PinService: catch target message...") + messageLog.addAll(notificationMessages) + break + } } + + val elapsed = System.currentTimeMillis() - start + LogUtils.info("PinService: waiting for target message... delay: ${action.delay}s, elapsed: ${elapsed / MS_TO_SECONDS}s") + delay(POLLING_INTERVAL_MS) } - onFinish(actionExecList) } + + val cost = System.currentTimeMillis() - start + LogUtils.info("PinService: waiting finished, cost: ${cost / MS_TO_SECONDS}s") + + return ExecutionResult(messageLog, cost, nextList) + } + + private fun shouldCheckForMessages( + nextList: List, + notificationCache: List + ): Boolean { + return nextList.isNotEmpty() && notificationCache.isNotEmpty() + } + + private fun processExecutionResult( + result: ExecutionResult, + actionExecList: MutableList, + currentStep: Int + ) { + val respCode = updateTaskStep(result.messageLog, result.nextList, currentStep) + + if (taskConfig.currentStep != Int.MAX_VALUE && result.messageLog.isNotEmpty()) { + extractResponseVariables(result.messageLog) + } + + val actionExec = genActionExec( + result.messageLog, + currentStep, + respCode, + result.cost + ) + actionExecList += actionExec + } + + private fun extractResponseVariables(messageLog: MutableList) { + val responseBody = messageLog.toJsonString() + extractBodyVariableToCache( + action, + responseBody, + responseBody.toByteArray(StandardCharsets.UTF_8) + ) + } + + // ==================== 步骤更新 ==================== + + private fun updateTaskStep( + messageLog: MutableList, + nextList: List, + currentStep: Int + ): Int { + return if (messageLog.isNotEmpty()) { + handleSuccessCase(messageLog, nextList, currentStep) + } else { + handleEmptyMessageLogCase(messageLog, nextList, currentStep) + } + } + + private fun handleSuccessCase( + messageLog: MutableList, + nextList: List, + currentStep: Int + ): Int { + val nextStep = nextList.getNextStepIndex( + messageLog.toJsonString(), + currentStep + ) + taskConfig.currentStep = nextStep + return HTTP_STATUS_OK + } + + private fun handleEmptyMessageLogCase( + messageLog: MutableList, + nextList: List, + currentStep: Int + ): Int { + return if (nextList.isNotEmpty()) { + handleTimeoutCase(messageLog) + } else { + handleNoNextConditionCase(messageLog, currentStep) + } + } + + private fun handleTimeoutCase(messageLog: MutableList): Int { + taskConfig.currentStep = Int.MAX_VALUE + // 超时时,添加最后一个通知用于日志记录 + if (taskConfig.notificationCache.isNotEmpty()) { + messageLog.add(taskConfig.notificationCache.last()) + } + return ERROR_CODE_PIN_ACTION_TIMEOUT + } + + private fun handleNoNextConditionCase( + messageLog: MutableList, + currentStep: Int + ): Int { + // 没有 next 条件时,添加所有通知并继续下一步 + taskConfig.currentStep = currentStep + 1 + messageLog.addAll(taskConfig.notificationCache) + return HTTP_STATUS_OK } + // ==================== 异常处理 ==================== + + private fun handleException( + e: Throwable, + actionExecList: MutableList, + currentStep: Int + ) { + val actionExec = genExceptionActionExec( + action, + ERROR_CODE_PIN_ACTION_EXEC_FAILED, + Log.getStackTraceString(e) + ) + actionExecList += actionExec + + taskConfig.currentStep = if (action.skipError) { + currentStep + 1 + } else { + Int.MAX_VALUE + } + } + + // ==================== ActionExec 生成 ==================== + private fun genActionExec( messageLog: MutableList, currentStep: Int, respCode: Int, cost: Long ): ActionExec { - val actionExec = ActionExec().apply { - this.step = currentStep - this.index = 1 + return ActionExec().apply { + step = currentStep + index = 1 this.respCode = respCode this.cost = cost - this.time = System.currentTimeMillis() + time = System.currentTimeMillis() + if (messageLog.isNotEmpty()) { - this.url = messageLog.first().app - this.respData = messageLog.toJsonString() + url = messageLog.first().app + respData = messageLog.toJsonString() } } - return actionExec } + // ==================== 消息匹配 ==================== + private fun haveNextCatchTargetMessage( notificationCache: List, nextList: List ): List { - var result: List = mutableListOf() - var targetMessage: NotificationMessage? = null - LogUtils.info("message size: ${notificationCache.size}") + if (notificationCache.isEmpty() || nextList.isEmpty()) { + return emptyList() + } + + LogUtils.info("PinService: message size: ${notificationCache.size}") + + // 预编译正则表达式以提高性能 + val compiledPatterns = compileRegexPatterns(nextList) + + val targetMessage = findTargetMessage(notificationCache, nextList, compiledPatterns) + + // 如果找到目标消息,返回所有来自相同发送者的消息 + return if (targetMessage != null) { + notificationCache.filter { it.from == targetMessage.from } + } else { + emptyList() + } + } + + private fun compileRegexPatterns(nextList: List): Map { + return nextList + .filter { it.step > 0 && it.regexp.isNotBlank() }.associateWith { it.regexp.toRegex() } + } + + private fun findTargetMessage( + notificationCache: List, + nextList: List, + compiledPatterns: Map + ): NotificationMessage? { for (notify in notificationCache) { - LogUtils.info("notify: ${notify.content}") + LogUtils.info("PinService: checking notify: ${notify.content}") + for (next in nextList) { if (next.step <= 0) continue - LogUtils.info("next:$next") - val contain = next.contain - if (contain.isNotBlank() && (notify.from.contains(contain) || - notify.content.contains(contain)) - ) { - targetMessage = notify - break + + LogUtils.info("PinService: checking next: $next") + + if (matchesContain(notify, next) || matchesRegex(notify, next, compiledPatterns)) { + return notify } - - if (next.regexp.isNotBlank()) { - val pattern = next.regexp.toRegex() - var matcher = pattern.matches(notify.from) - if (matcher) { - targetMessage = notify - break - } - - matcher = pattern.matches(notify.content) - if (matcher) { - targetMessage = notify - break - } - } - } - if (targetMessage != null) { - break } } - if (targetMessage != null) { - result = notificationCache.filter { targetMessage.from == it.from }.toList() - } + return null + } + + private fun matchesContain(notify: NotificationMessage, next: Next): Boolean { + val contain = next.contain + return contain.isNotBlank() && + (notify.from.contains(contain) || notify.content.contains(contain)) + } - return result + private fun matchesRegex( + notify: NotificationMessage, + next: Next, + compiledPatterns: Map + ): Boolean { + val pattern = compiledPatterns[next] ?: return false + return pattern.matches(notify.from) || pattern.matches(notify.content) } } \ No newline at end of file