refactor: 优化 PinService 代码结构和性能

main
mojo 1 month ago
parent 25ef38db3a
commit 65a9b18e93

@ -13,6 +13,7 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive import kotlinx.coroutines.isActive
import kotlinx.coroutines.withContext import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeoutOrNull import kotlinx.coroutines.withTimeoutOrNull
import java.nio.charset.StandardCharsets
import kotlin.time.DurationUnit import kotlin.time.DurationUnit
import kotlin.time.toDuration import kotlin.time.toDuration
@ -21,143 +22,271 @@ class PinService(
override val taskConfig: TaskConfig override val taskConfig: TaskConfig
) : BaseService(taskConfig) { ) : BaseService(taskConfig) {
companion object {
private const val HTTP_STATUS_OK = 200
private const val POLLING_INTERVAL_MS = 1000L
private const val MS_TO_SECONDS = 1000f
}
// ==================== 主执行方法 ====================
override suspend fun execute(onFinish: (List<ActionExec>) -> Unit) { override suspend fun execute(onFinish: (List<ActionExec>) -> Unit) {
withContext(Dispatchers.IO) { withContext(Dispatchers.IO) {
val actionExecList: MutableList<ActionExec> = mutableListOf() val actionExecList = mutableListOf<ActionExec>()
var currentStep = taskConfig.currentStep val currentStep = taskConfig.currentStep
kotlin.runCatching {
val start = System.currentTimeMillis() runCatching {
var respCode = 200 val executionResult = executeWithTimeout(currentStep)
var cost = 0L processExecutionResult(executionResult, actionExecList, currentStep)
val messageLog: MutableList<NotificationMessage> = mutableListOf() }.onFailure { e ->
val nextList = action.next LogUtils.error(e)
LogUtils.info("next list: ${nextList.map { it.regexp + ", " + it.contain }}") handleException(e, actionExecList, currentStep)
withTimeoutOrNull(action.delay.toDuration(DurationUnit.SECONDS)) { }
while (isActive) {
val notificationCache = taskConfig.notificationCache onFinish(actionExecList)
if (nextList.isNotEmpty() && notificationCache.isNotEmpty()) { }
val notificationMessages = }
haveNextCatchTargetMessage(notificationCache, nextList)
if (notificationMessages.isNotEmpty()) { // ==================== 执行逻辑 ====================
LogUtils.info("catch target message...")
messageLog.addAll(notificationMessages) private data class ExecutionResult(
break val messageLog: MutableList<NotificationMessage>,
} val cost: Long,
} val nextList: List<Next>
cost = System.currentTimeMillis() - start )
LogUtils.info("waiting for target message... ${action.delay}, escape: ${cost / 1000f}")
delay(1000) private suspend fun executeWithTimeout(currentStep: Int): ExecutionResult {
} val start = System.currentTimeMillis()
} val messageLog = mutableListOf<NotificationMessage>()
cost = System.currentTimeMillis() - start val nextList = action.next
LogUtils.info("waiting for target message... finish ${cost/1000f}")
if (messageLog.isNotEmpty()) { LogUtils.info("PinService: next list: ${nextList.map { "${it.regexp}, ${it.contain}" }}")
val nextStep =
nextList.getNextStepIndex( withTimeoutOrNull(action.delay.toDuration(DurationUnit.SECONDS)) {
messageLog.toJsonString(), while (isActive) {
currentStep val notificationCache = taskConfig.notificationCache
)
taskConfig.currentStep = nextStep if (shouldCheckForMessages(nextList, notificationCache)) {
} else { val notificationMessages = haveNextCatchTargetMessage(notificationCache, nextList)
if (nextList.isNotEmpty()) { if (notificationMessages.isNotEmpty()) {
taskConfig.currentStep = Int.MAX_VALUE LogUtils.info("PinService: catch target message...")
respCode = ERROR_CODE_PIN_ACTION_TIMEOUT messageLog.addAll(notificationMessages)
if(taskConfig.notificationCache.isNotEmpty()) { break
messageLog += taskConfig.notificationCache.last()
}
} else {
taskConfig.currentStep = ++currentStep
messageLog += taskConfig.notificationCache
} }
} }
if (taskConfig.currentStep != Int.MAX_VALUE && messageLog.isNotEmpty()) {
val responseBody = messageLog.toJsonString()
extractBodyVariableToCache(action, responseBody, responseBody.toByteArray())
}
val actionExec = genActionExec(messageLog, currentStep, respCode, cost) val elapsed = System.currentTimeMillis() - start
actionExecList += actionExec LogUtils.info("PinService: waiting for target message... delay: ${action.delay}s, elapsed: ${elapsed / MS_TO_SECONDS}s")
}.onFailure { delay(POLLING_INTERVAL_MS)
LogUtils.error(it)
val actionExec = genExceptionActionExec(
action,
ERROR_CODE_PIN_ACTION_EXEC_FAILED,
Log.getStackTraceString(it)
)
actionExecList += actionExec
if (action.skipError) {
taskConfig.currentStep = ++currentStep
} else {
taskConfig.currentStep = Int.MAX_VALUE
}
} }
onFinish(actionExecList) }
val cost = System.currentTimeMillis() - start
LogUtils.info("PinService: waiting finished, cost: ${cost / MS_TO_SECONDS}s")
return ExecutionResult(messageLog, cost, nextList)
}
private fun shouldCheckForMessages(
nextList: List<Next>,
notificationCache: List<NotificationMessage>
): Boolean {
return nextList.isNotEmpty() && notificationCache.isNotEmpty()
}
private fun processExecutionResult(
result: ExecutionResult,
actionExecList: MutableList<ActionExec>,
currentStep: Int
) {
val respCode = updateTaskStep(result.messageLog, result.nextList, currentStep)
if (taskConfig.currentStep != Int.MAX_VALUE && result.messageLog.isNotEmpty()) {
extractResponseVariables(result.messageLog)
}
val actionExec = genActionExec(
result.messageLog,
currentStep,
respCode,
result.cost
)
actionExecList += actionExec
}
private fun extractResponseVariables(messageLog: MutableList<NotificationMessage>) {
val responseBody = messageLog.toJsonString()
extractBodyVariableToCache(
action,
responseBody,
responseBody.toByteArray(StandardCharsets.UTF_8)
)
}
// ==================== 步骤更新 ====================
private fun updateTaskStep(
messageLog: MutableList<NotificationMessage>,
nextList: List<Next>,
currentStep: Int
): Int {
return if (messageLog.isNotEmpty()) {
handleSuccessCase(messageLog, nextList, currentStep)
} else {
handleEmptyMessageLogCase(messageLog, nextList, currentStep)
}
}
private fun handleSuccessCase(
messageLog: MutableList<NotificationMessage>,
nextList: List<Next>,
currentStep: Int
): Int {
val nextStep = nextList.getNextStepIndex(
messageLog.toJsonString(),
currentStep
)
taskConfig.currentStep = nextStep
return HTTP_STATUS_OK
}
private fun handleEmptyMessageLogCase(
messageLog: MutableList<NotificationMessage>,
nextList: List<Next>,
currentStep: Int
): Int {
return if (nextList.isNotEmpty()) {
handleTimeoutCase(messageLog)
} else {
handleNoNextConditionCase(messageLog, currentStep)
}
}
private fun handleTimeoutCase(messageLog: MutableList<NotificationMessage>): Int {
taskConfig.currentStep = Int.MAX_VALUE
// 超时时,添加最后一个通知用于日志记录
if (taskConfig.notificationCache.isNotEmpty()) {
messageLog.add(taskConfig.notificationCache.last())
}
return ERROR_CODE_PIN_ACTION_TIMEOUT
}
private fun handleNoNextConditionCase(
messageLog: MutableList<NotificationMessage>,
currentStep: Int
): Int {
// 没有 next 条件时,添加所有通知并继续下一步
taskConfig.currentStep = currentStep + 1
messageLog.addAll(taskConfig.notificationCache)
return HTTP_STATUS_OK
}
// ==================== 异常处理 ====================
private fun handleException(
e: Throwable,
actionExecList: MutableList<ActionExec>,
currentStep: Int
) {
val actionExec = genExceptionActionExec(
action,
ERROR_CODE_PIN_ACTION_EXEC_FAILED,
Log.getStackTraceString(e)
)
actionExecList += actionExec
taskConfig.currentStep = if (action.skipError) {
currentStep + 1
} else {
Int.MAX_VALUE
} }
} }
// ==================== ActionExec 生成 ====================
private fun genActionExec( private fun genActionExec(
messageLog: MutableList<NotificationMessage>, messageLog: MutableList<NotificationMessage>,
currentStep: Int, currentStep: Int,
respCode: Int, respCode: Int,
cost: Long cost: Long
): ActionExec { ): ActionExec {
val actionExec = ActionExec().apply { return ActionExec().apply {
this.step = currentStep step = currentStep
this.index = 1 index = 1
this.respCode = respCode this.respCode = respCode
this.cost = cost this.cost = cost
this.time = System.currentTimeMillis() time = System.currentTimeMillis()
if (messageLog.isNotEmpty()) { if (messageLog.isNotEmpty()) {
this.url = messageLog.first().app url = messageLog.first().app
this.respData = messageLog.toJsonString() respData = messageLog.toJsonString()
} }
} }
return actionExec
} }
// ==================== 消息匹配 ====================
private fun haveNextCatchTargetMessage( private fun haveNextCatchTargetMessage(
notificationCache: List<NotificationMessage>, notificationCache: List<NotificationMessage>,
nextList: List<Next> nextList: List<Next>
): List<NotificationMessage> { ): List<NotificationMessage> {
var result: List<NotificationMessage> = mutableListOf() if (notificationCache.isEmpty() || nextList.isEmpty()) {
var targetMessage: NotificationMessage? = null return emptyList()
LogUtils.info("message size: ${notificationCache.size}") }
LogUtils.info("PinService: message size: ${notificationCache.size}")
// 预编译正则表达式以提高性能
val compiledPatterns = compileRegexPatterns(nextList)
val targetMessage = findTargetMessage(notificationCache, nextList, compiledPatterns)
// 如果找到目标消息,返回所有来自相同发送者的消息
return if (targetMessage != null) {
notificationCache.filter { it.from == targetMessage.from }
} else {
emptyList()
}
}
private fun compileRegexPatterns(nextList: List<Next>): Map<Next, Regex> {
return nextList
.filter { it.step > 0 && it.regexp.isNotBlank() }.associateWith { it.regexp.toRegex() }
}
private fun findTargetMessage(
notificationCache: List<NotificationMessage>,
nextList: List<Next>,
compiledPatterns: Map<Next, Regex>
): NotificationMessage? {
for (notify in notificationCache) { for (notify in notificationCache) {
LogUtils.info("notify: ${notify.content}") LogUtils.info("PinService: checking notify: ${notify.content}")
for (next in nextList) { for (next in nextList) {
if (next.step <= 0) continue if (next.step <= 0) continue
LogUtils.info("next:$next")
val contain = next.contain
if (contain.isNotBlank() && (notify.from.contains(contain) ||
notify.content.contains(contain))
) {
targetMessage = notify
break
}
if (next.regexp.isNotBlank()) { LogUtils.info("PinService: checking next: $next")
val pattern = next.regexp.toRegex()
var matcher = pattern.matches(notify.from)
if (matcher) {
targetMessage = notify
break
}
matcher = pattern.matches(notify.content) if (matchesContain(notify, next) || matchesRegex(notify, next, compiledPatterns)) {
if (matcher) { return notify
targetMessage = notify
break
}
} }
} }
if (targetMessage != null) {
break
}
}
if (targetMessage != null) {
result = notificationCache.filter { targetMessage.from == it.from }.toList()
} }
return null
}
private fun matchesContain(notify: NotificationMessage, next: Next): Boolean {
val contain = next.contain
return contain.isNotBlank() &&
(notify.from.contains(contain) || notify.content.contains(contain))
}
return result private fun matchesRegex(
notify: NotificationMessage,
next: Next,
compiledPatterns: Map<Next, Regex>
): Boolean {
val pattern = compiledPatterns[next] ?: return false
return pattern.matches(notify.from) || pattern.matches(notify.content)
} }
} }
Loading…
Cancel
Save