From 08fbbb6eabc95c3af9422cb414290dd3e92132c2 Mon Sep 17 00:00:00 2001 From: Konstantin Nisht Date: Fri, 2 Aug 2024 15:26:46 +0200 Subject: [PATCH] [platform] IJPL-158752: Migrate warmup to `IntelliJContextElement` GitOrigin-RevId: d238c33a841ee395d677131dd63c61626b02dcb3 --- .../backend/observation/Observation.kt | 11 ++++ .../PlatformActivityTrackerService.kt | 52 +++++++++++++------ .../platform/backend/observation/util.kt | 9 ++-- .../util/resources/misc/registry.properties | 3 ++ .../com/intellij/concurrency/threadContext.kt | 34 ------------ .../intellij/util/concurrency/propagation.kt | 1 - .../warmup/util/ActivityBasedWarmup.kt | 24 +++++++++ .../server/MavenIndexingConnectorImpl.kt | 18 ++++--- .../maven/server/MavenServerConnectorImpl.kt | 22 +++++--- 9 files changed, 104 insertions(+), 70 deletions(-) diff --git a/platform/backend/observation/src/com/intellij/platform/backend/observation/Observation.kt b/platform/backend/observation/src/com/intellij/platform/backend/observation/Observation.kt index 08be28cccc67..d92e5bbda94b 100644 --- a/platform/backend/observation/src/com/intellij/platform/backend/observation/Observation.kt +++ b/platform/backend/observation/src/com/intellij/platform/backend/observation/Observation.kt @@ -3,6 +3,7 @@ package com.intellij.platform.backend.observation import com.intellij.openapi.project.Project import kotlinx.coroutines.flow.StateFlow +import org.jetbrains.annotations.ApiStatus object Observation { @@ -52,6 +53,16 @@ object Observation { return isModificationOccurred } + /** + * Used for debugging purposes. + * Returns stacktraces of the computations that are currently awaited by [awaitConfiguration] + * This method affects only those computations that use [ActivityKey], whereas [ActivityTracker] is out of reach for the platform. + */ + @ApiStatus.Internal + fun getAllAwaitedActivities(): Set { + return dumpCurrentlyObservedComputations() + } + private interface GenericActivityTracker { val name: String suspend fun isInProgress(): Boolean diff --git a/platform/backend/observation/src/com/intellij/platform/backend/observation/PlatformActivityTrackerService.kt b/platform/backend/observation/src/com/intellij/platform/backend/observation/PlatformActivityTrackerService.kt index 5a0f88c6edbe..058dea3e1712 100644 --- a/platform/backend/observation/src/com/intellij/platform/backend/observation/PlatformActivityTrackerService.kt +++ b/platform/backend/observation/src/com/intellij/platform/backend/observation/PlatformActivityTrackerService.kt @@ -1,6 +1,7 @@ // Copyright 2000-2023 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license. package com.intellij.platform.backend.observation +import com.intellij.concurrency.IntelliJContextElement import com.intellij.concurrency.currentThreadContext import com.intellij.concurrency.installThreadContext import com.intellij.openapi.components.Service @@ -8,7 +9,7 @@ import com.intellij.openapi.components.service import com.intellij.openapi.components.serviceAsync import com.intellij.openapi.diagnostic.thisLogger import com.intellij.openapi.project.Project -import com.intellij.util.concurrency.BlockingJob +import com.intellij.openapi.util.registry.Registry import com.intellij.util.concurrency.annotations.RequiresBlockingContext import kotlinx.coroutines.* import kotlinx.coroutines.flow.MutableStateFlow @@ -16,7 +17,8 @@ import kotlinx.coroutines.flow.StateFlow import org.jetbrains.annotations.ApiStatus.Internal import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicInteger -import kotlin.jvm.Throws +import kotlin.coroutines.AbstractCoroutineContextElement +import kotlin.coroutines.CoroutineContext /** * Allows tracking subsystem activities and getting a "dumb mode" with respect to tracked computations. @@ -65,8 +67,8 @@ internal class PlatformActivityTrackerService(private val scope: CoroutineScope) * This method is cheap to use: it does not perform any complex computations, and it is essentially equivalent to `withContext`. */ suspend fun trackConfigurationActivity(kind: ActivityKey, action: suspend () -> T): T { - return withBlockingJob(kind) { blockingJob -> - withContext(blockingJob) { + return withObservationTracker(kind) { observationTracker -> + withContext(observationTracker) { action() } } @@ -79,24 +81,24 @@ internal class PlatformActivityTrackerService(private val scope: CoroutineScope) @RequiresBlockingContext fun trackConfigurationActivityBlocking(kind: ActivityKey, action: () -> T): T { val currentContext = currentThreadContext() - return withBlockingJob(kind) { blockingJob -> - installThreadContext(currentContext + blockingJob, true).use { + return withObservationTracker(kind) { observationTracker -> + installThreadContext(currentContext + observationTracker, true).use { action() } } } - private inline fun withBlockingJob(kind: ActivityKey, consumer: (BlockingJob) -> T): T { + private inline fun withObservationTracker(kind: ActivityKey, consumer: (ObservationTracker) -> T): T { val key = enterConfiguration(kind) // new job here to track those and only those computations which are invoked under explicit `trackConfigurationActivity` val tracker = Job() tracker.invokeOnCompletion { leaveConfiguration(kind, key) } - val blockingJob = BlockingJob(tracker) + val trackingElement = ObservationTracker(tracker, tracker) try { - return consumer(blockingJob) + return consumer(trackingElement) } finally { scope.launch { @@ -107,6 +109,25 @@ internal class PlatformActivityTrackerService(private val scope: CoroutineScope) } } + + internal class ObservationTracker(private val mainJob: Job, val currentJob: CompletableJob) : AbstractCoroutineContextElement(Key), IntelliJContextElement { + companion object Key : CoroutineContext.Key + + override fun produceChildElement(oldContext: CoroutineContext, isStructured: Boolean): IntelliJContextElement { + // we would like to know about all child computations, regardless of their relation to the current process + val newJob = Job(mainJob) + if (Registry.`is`("ide.activity.tracking.enable.debug", false)) { + computationMap[newJob] = Throwable() + } + return ObservationTracker(mainJob, newJob) + } + + override fun afterChildCompleted(context: CoroutineContext) { + computationMap.remove(currentJob) + currentJob.complete() + } + } + private fun enterConfiguration(kind: ActivityKey) : Any { val sentinel = Any() while (true) { @@ -187,14 +208,13 @@ internal class PlatformActivityTrackerService(private val scope: CoroutineScope) // currentCounter != null => isInProgress == true => either currentCounter.job corresponds to the current configuration process, // or its configuration was completed earlier, and in this case join() will immediately return // currentCounter == null => isInProgress == false => immediately return, since no configuration process is here currently - while (true) { - if (currentCounter.job.isCompleted) { - break - } - delay(1000) - } - //currentCounter.job.join() + currentCounter.job.join() } } } +private val computationMap : MutableMap = ConcurrentHashMap() + +internal fun dumpCurrentlyObservedComputations(): Set { + return computationMap.values.mapNotNullTo(HashSet()) { it } +} diff --git a/platform/backend/observation/src/com/intellij/platform/backend/observation/util.kt b/platform/backend/observation/src/com/intellij/platform/backend/observation/util.kt index 681ec48d63ce..c568ea23ae33 100644 --- a/platform/backend/observation/src/com/intellij/platform/backend/observation/util.kt +++ b/platform/backend/observation/src/com/intellij/platform/backend/observation/util.kt @@ -7,7 +7,6 @@ package com.intellij.platform.backend.observation import com.intellij.concurrency.currentThreadContext import com.intellij.openapi.extensions.ExtensionPointName import com.intellij.openapi.project.Project -import com.intellij.util.concurrency.BlockingJob import com.intellij.util.concurrency.annotations.RequiresBlockingContext import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineStart @@ -58,10 +57,10 @@ fun Project.trackActivity(key: ActivityKey, action: Runnable): Unit { */ @RequiresBlockingContext fun CoroutineScope.launchTracked(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> Unit) { - val blockingJob = currentThreadContext()[BlockingJob] ?: EmptyCoroutineContext - // since the `launch` is executed with the Job of `this`, we need to mimic the awaiting for the execution of `block` for `BlockingJob` - val childJob = Job(currentThreadContext()[BlockingJob]?.blockingJob) - launch(context + blockingJob, CoroutineStart.DEFAULT) { + val tracker = currentThreadContext()[PlatformActivityTrackerService.ObservationTracker] + // since the `launch` is executed with the Job of `this`, we need to mimic the awaiting for the execution of `block` for `ObservationTracker` + val childJob = Job(tracker?.currentJob) + launch(context + (tracker ?: EmptyCoroutineContext), CoroutineStart.DEFAULT) { try { block() } diff --git a/platform/util/resources/misc/registry.properties b/platform/util/resources/misc/registry.properties index ed6002e73d5b..b7df292ae890 100644 --- a/platform/util/resources/misc/registry.properties +++ b/platform/util/resources/misc/registry.properties @@ -2174,6 +2174,9 @@ ide.cancellation.check.threshold.description=Report error if checkCanceled invok ide.cancellation.check.trace.all=false ide.cancellation.check.trace.all.description=Enable recording of each cancellation check to show the last if assertion is triggered +ide.activity.tracking.enable.debug=false +ide.activity.tracking.enable.debug.description=Record debug information for `Observation.awaitConfiguration` + light.edit.file.open.enabled=true light.edit.file.open.enabled.description=Open files in off-project editor instead of creating a temporary project system.file.type.associations.enabled=true diff --git a/platform/util/src/com/intellij/concurrency/threadContext.kt b/platform/util/src/com/intellij/concurrency/threadContext.kt index c3725aa33b5f..7dcd318f332c 100644 --- a/platform/util/src/com/intellij/concurrency/threadContext.kt +++ b/platform/util/src/com/intellij/concurrency/threadContext.kt @@ -275,40 +275,6 @@ fun resetThreadContext(): AccessToken { } } -/** - * Runs [action] with the cancellation guarantees of [job]. - * - * Consider the following example: - * ```kotlin - * fun computeSomethingUseful() { - * preComputation() - * application.executeOnPooledThread(::executeInLoop) - * postComputation() - * } - * - * fun executeInLoop() { - * doSomething() - * ProgressManager.checkCancelled() - * Thread.sleep(1000) - * executeInLoop() - * } - * ``` - * - * If someone wants to track the execution of `computeSomethingUseful`, most likely they are not interested in `executeInLoop`, - * as it is a daemon computation that can be only canceled. - * It can be a launch of an external process, or some periodic diagnostic check. - * - * In this case, the platform offers to weaken the cancellation guarantees for the computation: - * it still would be cancellable on project closing or component unloading, but it would not be bound to the context cancellation. - */ -@Experimental -@ApiStatus.Internal -fun escapeCancellation(job: Job, action: () -> T): T { - return installThreadContext(currentThreadContext() + job + BlockingJob(job), true).use { - action() - } -} - /** * Installs [coroutineContext] as the current thread context. * If [replace] is `false` (default) and the current thread already has context, then this function logs an error. diff --git a/platform/util/src/com/intellij/util/concurrency/propagation.kt b/platform/util/src/com/intellij/util/concurrency/propagation.kt index 88ed06d0d761..d219896dea94 100644 --- a/platform/util/src/com/intellij/util/concurrency/propagation.kt +++ b/platform/util/src/com/intellij/util/concurrency/propagation.kt @@ -388,7 +388,6 @@ internal fun capturePropagationContext(function: Function): Functio return f } -@OptIn(DelicateCoroutinesApi::class) internal fun capturePropagationContext(wrapper: SchedulingWrapper, c: Callable, ns: Long): MyScheduledFutureTask { val callable = captureClientIdInCallable(c) if (isContextAwareComputation(c)) { diff --git a/platform/warmup/src/com/intellij/warmup/util/ActivityBasedWarmup.kt b/platform/warmup/src/com/intellij/warmup/util/ActivityBasedWarmup.kt index 2a7f18706f4c..8d975c008de7 100644 --- a/platform/warmup/src/com/intellij/warmup/util/ActivityBasedWarmup.kt +++ b/platform/warmup/src/com/intellij/warmup/util/ActivityBasedWarmup.kt @@ -6,7 +6,13 @@ import com.intellij.ide.impl.ProjectUtil import com.intellij.openapi.project.Project import com.intellij.openapi.project.configuration.ConfigurationResult import com.intellij.openapi.project.configuration.awaitCompleteProjectConfiguration +import com.intellij.platform.backend.observation.Observation import com.intellij.util.asSafely +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlin.time.Duration.Companion.minutes internal suspend fun configureProjectByActivities(args: OpenProjectArgs): Project { val projectFile = getProjectFile(args) @@ -18,7 +24,9 @@ internal suspend fun configureProjectByActivities(args: OpenProjectArgs): Projec } ?: throw RuntimeException("Failed to open project, null is returned") val configurationError = runTaskAndLogTime("awaiting completion predicates") { + val loggerJob = launchActivityLogger() val result = project.awaitCompleteProjectConfiguration(WarmupLogger::logInfo) + loggerJob.cancel() dumpThreadsAfterConfiguration() result.asSafely()?.message } @@ -32,3 +40,19 @@ internal suspend fun configureProjectByActivities(args: OpenProjectArgs): Projec WarmupLogger.logInfo("Project is ready for the import") return project } + +private fun CoroutineScope.launchActivityLogger(): Job { + return launch { + while (true) { + delay(10.minutes) + val currentComputations = Observation.getAllAwaitedActivities() + buildString { + appendLine("Currently awaited activities:") + for (trace in currentComputations) { + appendLine(trace.stackTraceToString()) + } + } + WarmupLogger.logInfo(currentComputations.toString()) + } + } +} \ No newline at end of file diff --git a/plugins/maven/src/main/java/org/jetbrains/idea/maven/server/MavenIndexingConnectorImpl.kt b/plugins/maven/src/main/java/org/jetbrains/idea/maven/server/MavenIndexingConnectorImpl.kt index 1b94253cbb2f..fa7098d7bc2c 100644 --- a/plugins/maven/src/main/java/org/jetbrains/idea/maven/server/MavenIndexingConnectorImpl.kt +++ b/plugins/maven/src/main/java/org/jetbrains/idea/maven/server/MavenIndexingConnectorImpl.kt @@ -1,12 +1,14 @@ // Copyright 2000-2023 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license. package org.jetbrains.idea.maven.server -import com.intellij.concurrency.escapeCancellation import com.intellij.openapi.diagnostic.Logger import com.intellij.openapi.progress.EmptyProgressIndicator import com.intellij.openapi.progress.ProgressIndicator import com.intellij.openapi.projectRoots.Sdk -import kotlinx.coroutines.job +import kotlinx.coroutines.CoroutineStart +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.async +import kotlinx.coroutines.future.asCompletableFuture import org.jetbrains.idea.maven.server.MavenServerManager.Companion.getInstance import org.jetbrains.idea.maven.utils.MavenCoroutineScopeProvider import org.jetbrains.idea.maven.utils.MavenLog @@ -57,10 +59,14 @@ class MavenIndexingConnectorImpl(jdk: Sdk, // the computation below spawns an immortal server that will not terminate // if someone is interested in the termination of the current computation, they do not need to wait for maven to terminate. // hence, we spawn the server in the context of maven plugin, so that it has cancellation of all other maven processes - escapeCancellation(MavenCoroutineScopeProvider.getCoroutineScope(project).coroutineContext.job) { - val server = mySupport!!.acquire(this, "", indicator) - myServerPromise.setResult(server) - } + MavenCoroutineScopeProvider.getCoroutineScope(project).async(context = Dispatchers.IO, start = CoroutineStart.UNDISPATCHED) { + runCatching { // we need to avoid killing the coroutine by the thrown exception + val server = mySupport!!.acquire(this, "", indicator) + myServerPromise.setResult(server) + } + }.asCompletableFuture() + .get() // there are no suspensions inside, so this code will not block + .getOrThrow() MavenLog.LOG.debug("[connector] in " + dirForLogs + " has been connected " + this@MavenIndexingConnectorImpl) } catch (e: Throwable) { diff --git a/plugins/maven/src/main/java/org/jetbrains/idea/maven/server/MavenServerConnectorImpl.kt b/plugins/maven/src/main/java/org/jetbrains/idea/maven/server/MavenServerConnectorImpl.kt index d08d9a495693..d59d0b339757 100644 --- a/plugins/maven/src/main/java/org/jetbrains/idea/maven/server/MavenServerConnectorImpl.kt +++ b/plugins/maven/src/main/java/org/jetbrains/idea/maven/server/MavenServerConnectorImpl.kt @@ -1,7 +1,6 @@ // Copyright 2000-2023 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license. package org.jetbrains.idea.maven.server -import com.intellij.concurrency.escapeCancellation import com.intellij.openapi.application.ApplicationManager import com.intellij.openapi.diagnostic.Logger import com.intellij.openapi.progress.EmptyProgressIndicator @@ -10,7 +9,10 @@ import com.intellij.openapi.project.Project import com.intellij.openapi.projectRoots.Sdk import com.intellij.openapi.util.text.StringUtil import com.intellij.util.concurrency.AppExecutorUtil -import kotlinx.coroutines.job +import kotlinx.coroutines.CoroutineStart +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.async +import kotlinx.coroutines.future.asCompletableFuture import org.jetbrains.idea.maven.utils.MavenCoroutineScopeProvider import org.jetbrains.idea.maven.utils.MavenLog import java.io.File @@ -91,12 +93,16 @@ open class MavenServerConnectorImpl(project: Project, // the computation below spawns an immortal server that will not terminate // if someone is interested in the termination of the current computation, they do not need to wait for maven to terminate. // hence, we spawn the server in the context of maven plugin, so that it has cancellation of all other maven processes - escapeCancellation(MavenCoroutineScopeProvider.getCoroutineScope(project).coroutineContext.job) { - val server = mySupport!!.acquire(this, "", indicator) - startPullingDownloadListener(server) - startPullingLogger(server) - myServerPromise.setResult(server) - } + MavenCoroutineScopeProvider.getCoroutineScope(project).async(context = Dispatchers.IO, start = CoroutineStart.UNDISPATCHED) { + runCatching { + val server = mySupport!!.acquire(this, "", indicator) + startPullingDownloadListener(server) + startPullingLogger(server) + myServerPromise.setResult(server) + } + }.asCompletableFuture() + .get() // there are no suspensions inside, so this code will not block + .getOrThrow() MavenLog.LOG.debug("[connector] in " + dirForLogs + " has been connected " + this@MavenServerConnectorImpl) } catch (e: Throwable) {