[platform] Do not leak context into MergingUpdateQueue

`Update`s should be decoupled from the calling code, otherwise the context of the caller would prevent the updates from merging.

GitOrigin-RevId: dae65fc3f0604d08f0dcb8427788e1886fe6605d
This commit is contained in:
Konstantin Nisht
2024-06-19 12:02:16 +02:00
committed by intellij-monorepo-bot
parent 85f82e59d8
commit bcf7e2e283
9 changed files with 176 additions and 189 deletions

View File

@@ -16,6 +16,7 @@ import com.intellij.openapi.externalSystem.autoimport.ExternalSystemProjectTrack
import com.intellij.openapi.externalSystem.autoimport.ExternalSystemRefreshStatus.SUCCESS
import com.intellij.openapi.externalSystem.autoimport.update.PriorityEatUpdate
import com.intellij.openapi.externalSystem.model.ProjectSystemId
import com.intellij.openapi.externalSystem.util.ExternalSystemActivityKey
import com.intellij.openapi.observable.operation.core.AtomicOperationTrace
import com.intellij.openapi.observable.operation.core.isOperationInProgress
import com.intellij.openapi.observable.operation.core.whenOperationFinished
@@ -27,10 +28,12 @@ import com.intellij.openapi.progress.impl.CoreProgressManager
import com.intellij.openapi.project.Project
import com.intellij.openapi.util.Disposer
import com.intellij.openapi.util.registry.Registry
import com.intellij.platform.backend.observation.trackActivityBlocking
import com.intellij.util.LocalTimeCounter.currentTime
import com.intellij.util.application
import com.intellij.util.concurrency.AppExecutorUtil
import com.intellij.util.ui.update.MergingUpdateQueue
import com.intellij.util.ui.update.queueTracked
import kotlinx.serialization.Serializable
import org.jetbrains.annotations.ApiStatus
import org.jetbrains.annotations.TestOnly
@@ -119,14 +122,16 @@ class AutoImportProjectTracker(
currentActivity.updateAndGet {
it ?: ProjectInitializationDiagnosticService.registerTracker(project, "AutoImportProjectTracker.schedule")
}
dispatcher.queue(PriorityEatUpdate(priority) {
if (dispatchIterations - 1 > 0) {
schedule(priority, dispatchIterations - 1, action)
}
else {
action()
if (dispatcher.isEmpty) {
currentActivity.getAndSet(null)?.activityFinished()
dispatcher.queueTracked(PriorityEatUpdate(priority) {
project.trackActivityBlocking(ExternalSystemActivityKey) {
if (dispatchIterations - 1 > 0) {
schedule(priority, dispatchIterations - 1, action)
}
else {
action()
if (dispatcher.isEmpty) {
currentActivity.getAndSet(null)?.activityFinished()
}
}
}
})

View File

@@ -5,6 +5,9 @@
<extensionPoint name="notificationGroup" beanClass="com.intellij.notification.impl.NotificationGroupEP" dynamic="true"/>
</extensionPoints>
<extensions defaultExtensionNs="com.intellij">
<activityTracker implementation="com.intellij.util.MergingUpdateQueueActivityTracker"/>
<applicationService serviceInterface="com.intellij.util.ui.update.MergingUpdateQueueTracker"
serviceImplementation="com.intellij.util.MergingUpdateQueueTrackerImpl"/>
<applicationService serviceInterface="com.intellij.util.download.DownloadableFileService"
serviceImplementation="com.intellij.util.download.impl.DownloadableFileServiceImpl"/>
<applicationService serviceInterface="com.intellij.notification.NotificationGroupManager"

View File

@@ -0,0 +1,44 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.util
import com.intellij.openapi.application.ApplicationManager
import com.intellij.openapi.components.serviceAsync
import com.intellij.openapi.project.Project
import com.intellij.platform.backend.observation.ActivityTracker
import com.intellij.util.ui.update.MergingUpdateQueueTracker
import kotlinx.coroutines.delay
import org.jetbrains.annotations.ApiStatus
import java.util.concurrent.atomic.AtomicInteger
import kotlin.time.Duration.Companion.milliseconds
@ApiStatus.Internal
class MergingUpdateQueueActivityTracker : ActivityTracker {
override val presentableName: String
get() = "Queries for MergingUpdateQueue"
override suspend fun isInProgress(project: Project): Boolean {
return (ApplicationManager.getApplication().serviceAsync<MergingUpdateQueueTracker>() as MergingUpdateQueueTrackerImpl).counter.get() > 0 // if true, someone queued a task
}
override suspend fun awaitConfiguration(project: Project) {
while (isInProgress(project)) {
delay(100.milliseconds)
}
}
}
@ApiStatus.Internal
class MergingUpdateQueueTrackerImpl : MergingUpdateQueueTracker {
internal val counter = AtomicInteger(0)
override fun registerEnter() {
counter.incrementAndGet()
}
override fun registerExit() {
counter.decrementAndGet()
}
}

View File

@@ -3195,6 +3195,8 @@ com.intellij.util.ui.update.Activatable
c:com.intellij.util.ui.update.Activatable$Adapter
- com.intellij.util.ui.update.Activatable
- <init>():V
f:com.intellij.util.ui.update.MergingQueueUtil
- sf:queueTracked(com.intellij.util.ui.update.MergingUpdateQueue,com.intellij.util.ui.update.Update):V
c:com.intellij.util.ui.update.MergingUpdateQueue
- com.intellij.openapi.Disposable
- com.intellij.util.ui.update.Activatable

View File

@@ -1,53 +0,0 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.util.ui.update
import com.intellij.concurrency.getContextSkeleton
import com.intellij.concurrency.installThreadContext
import com.intellij.util.concurrency.ChildContext
import kotlin.coroutines.CoroutineContext
/**
* [Some clients][com.intellij.database.autoconfig.DatabaseConfigFileWatcher.queueUpdate]
* [queue][com.intellij.util.ui.update.MergingUpdateQueue.queue] the same instance multiple times
* => we have to re-capture context each time.
*/
internal class ContextAwareUpdate(
private val original: Update,
private val childContext: ChildContext,
) : Update(original) {
// we have to delegate ALL overrideable methods because we don't know which ones are overridden in the original Update
override fun isDisposed(): Boolean = original.isDisposed
override fun isExpired(): Boolean = original.isExpired
override fun wasProcessed(): Boolean = original.wasProcessed()
override fun setProcessed() = original.setProcessed()
override fun executeInWriteAction(): Boolean = original.executeInWriteAction()
override fun isRejected(): Boolean = original.isRejected
private val contextSkeleton: Set<CoroutineContext.Element> = getContextSkeleton(childContext.context)
private val equalityObjects = arrayOf(*original.equalityObjects, contextSkeleton)
override fun getEqualityObjects(): Array<Any> = equalityObjects
override fun canEat(update: Update): Boolean {
if (update !is ContextAwareUpdate) {
return false
}
return contextSkeleton == update.contextSkeleton &&
original.canEat(update.original)
}
override fun setRejected() {
original.setRejected()
childContext.job?.cancel(null)
}
override fun run() {
childContext.runAsCoroutine {
installThreadContext(childContext.context, true).use { _ ->
original.run()
}
}
}
}

View File

@@ -0,0 +1,61 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
@file:JvmName("MergingQueueUtil")
package com.intellij.util.ui.update
import com.intellij.openapi.application.ApplicationManager
import com.intellij.openapi.components.service
import org.jetbrains.annotations.ApiStatus.Internal
/**
* Queues an [Update] and additionally notifies the platform about its completion
*
* This behavior is necessary when [MergingUpdateQueue] is used for the control-flow reason,
* and the platform may be interested in undelivered updates.
* An example is project configuration process, where configuration starts via a [MergingUpdateQueue],
* hence the platform needs to know about all undelivered updates in order to proper track the configuration of a project.
*/
fun MergingUpdateQueue.queueTracked(update: Update) {
ApplicationManager.getApplication().service<MergingUpdateQueueTracker>().registerEnter()
queue(TrackedUpdate(update))
}
@Internal
interface MergingUpdateQueueTracker {
fun registerEnter()
fun registerExit()
}
private class TrackedUpdate(
private val original: Update,
) : Update(original) {
// we have to delegate ALL overrideable methods because we don't know which ones are overridden in the original Update
// also Update is an abstract class, so we cannot use Kotlin Delegation
override fun isDisposed(): Boolean = original.isDisposed
override fun isExpired(): Boolean = original.isExpired
override fun wasProcessed(): Boolean = original.wasProcessed()
override fun setProcessed() = original.setProcessed()
override fun executeInWriteAction(): Boolean = original.executeInWriteAction()
override fun isRejected(): Boolean = original.isRejected
override fun getEqualityObjects(): Array<Any> = original.equalityObjects
override fun canEat(update: Update): Boolean {
val unwrappedUpdate = (update as? TrackedUpdate)?.original ?: update
return original.canEat(unwrappedUpdate)
}
override fun setRejected() {
ApplicationManager.getApplication().service<MergingUpdateQueueTracker>().registerExit()
original.setRejected()
}
override fun run() {
try {
original.run()
} finally {
ApplicationManager.getApplication().service<MergingUpdateQueueTracker>().registerExit()
}
}
}

View File

@@ -2,9 +2,11 @@
package com.intellij.util.ui.update;
import com.intellij.concurrency.ConcurrentCollectionFactory;
import com.intellij.concurrency.ThreadContext;
import com.intellij.ide.UiActivity;
import com.intellij.ide.UiActivityMonitor;
import com.intellij.openapi.Disposable;
import com.intellij.openapi.application.AccessToken;
import com.intellij.openapi.application.Application;
import com.intellij.openapi.application.ApplicationManager;
import com.intellij.openapi.application.ModalityState;
@@ -12,11 +14,11 @@ import com.intellij.openapi.progress.ProcessCanceledException;
import com.intellij.openapi.util.Disposer;
import com.intellij.util.Alarm;
import com.intellij.util.SystemProperties;
import com.intellij.util.concurrency.ChildContext;
import com.intellij.util.concurrency.Propagation;
import com.intellij.util.containers.ConcurrentIntObjectMap;
import com.intellij.util.containers.ContainerUtil;
import com.intellij.util.ui.EdtInvocationManager;
import kotlin.jvm.functions.Function1;
import kotlinx.coroutines.flow.Flow;
import org.jetbrains.annotations.*;
import javax.swing.*;
@@ -25,7 +27,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static com.intellij.util.concurrency.AppExecutorUtil.propagateContext;
/**
* Use this class to postpone task execution and optionally merge identical tasks. This is needed, e.g., to reflect in UI status of some
@@ -33,6 +34,11 @@ import static com.intellij.util.concurrency.AppExecutorUtil.propagateContext;
* task execution for e.g., 500ms and if new updates are added during this period, they can be simply ignored.
* <p>
* Create instance of this class and use {@link #queue(Update)} method to add new tasks.
* <p>
* Sometimes {@link MergingUpdateQueue} can be used for control flow operations. <b>This kind of usage is discouraged</b>, in favor of
* {@link kotlinx.coroutines.flow.Flow} and {@link kotlinx.coroutines.flow.FlowKt#debounce(Flow, Function1)}.
* If you are still using {@link MergingUpdateQueue}, you can consider queuing via {@link MergingQueueUtil#queueTracked(MergingUpdateQueue, Update)}
* in order to notify the platform about scheduled updates.
*
* @see com.intellij.util.concurrency.QueueProcessor
*/
@@ -238,11 +244,15 @@ public class MergingUpdateQueue implements Runnable, Disposable, Activatable {
clearWaiter();
if (myExecuteInDispatchThread) {
myWaiterForMerge.addRequest(this, mergingTimeSpanMillis, getMergerModalityState());
}
else {
myWaiterForMerge.addRequest(this, mergingTimeSpanMillis);
try (AccessToken ignored = ThreadContext.resetThreadContext()) {
// MergingUpdateQueue is considered to be a Flow + debounce
// The updates must be executed independently of the caller; so here we forcefully release them from the context
if (myExecuteInDispatchThread) {
myWaiterForMerge.addRequest(this, mergingTimeSpanMillis, getMergerModalityState());
}
else {
myWaiterForMerge.addRequest(this, mergingTimeSpanMillis);
}
}
}
@@ -373,16 +383,6 @@ public class MergingUpdateQueue implements Runnable, Disposable, Activatable {
return;
}
ChildContext context = propagateContext() ? Propagation.createChildContext() : null;
if (context == null) {
queue2(update);
}
else {
queue2(new ContextAwareUpdate(update, context));
}
}
private void queue2(@NotNull Update update) {
boolean active = myActive;
synchronized (myScheduledUpdates) {
try {

View File

@@ -2,106 +2,58 @@
package com.intellij.util.concurrency
import com.intellij.concurrency.currentThreadContext
import com.intellij.openapi.progress.assertCurrentJobIsChildOf
import com.intellij.openapi.progress.blockingContextScope
import com.intellij.openapi.progress.withRootJob
import com.intellij.mock.MockProject
import com.intellij.testFramework.common.timeoutRunBlocking
import com.intellij.testFramework.junit5.TestApplication
import com.intellij.util.getValue
import com.intellij.util.setValue
import com.intellij.util.MergingUpdateQueueActivityTracker
import com.intellij.util.application
import com.intellij.util.ui.update.MergingUpdateQueue
import com.intellij.util.ui.update.Update
import com.intellij.util.ui.update.queueTracked
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.job
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.EmptyCoroutineContext
@TestApplication
class MergingUpdateQueuePropagationTest {
@Test
fun `waits its children`(): Unit = timeoutRunBlocking {
fun testNoContext(schedule: MergingUpdateQueue.(Update) -> Unit) = timeoutRunBlocking {
val queue = MergingUpdateQueue("test queue", 200, true, null)
val allowCompleteUpdate = arrayOf(AtomicBoolean(false), AtomicBoolean(false))
val updateCompleted = arrayOf(AtomicBoolean(false), AtomicBoolean(false))
val completionJob = Job()
queue.schedule(Update.create(null) {
assertEquals(currentThreadContext(), EmptyCoroutineContext)
completionJob.complete()
})
completionJob.join()
}
val queueingDone = Job()
val blockingScopeJob = withRootJob {
val currentJob = currentThreadContext().job
for (i in 0..1) {
queue.queue(Update.create(i) {
while (!allowCompleteUpdate[i].get()) {
// wait for permission
}
assertTrue(currentJob.isActive) // parent is not finished
assertCurrentJobIsChildOf(currentJob)
updateCompleted[i].set(true)
})
fun testWaitCompletion(schedule: MergingUpdateQueue.(Update) -> Unit, shouldBeTracked: Boolean) = timeoutRunBlocking {
val queue = MergingUpdateQueue("test queue", 200, true, null)
val proceedJob = Job()
val completionJob = Job()
queue.schedule(Update.create(null) {
while (!proceedJob.isCompleted) {
// spin lock
}
queueingDone.complete()
}
queueingDone.join()
assertTrue(blockingScopeJob.isActive)
repeat(2) { assertFalse(updateCompleted[it].get()) } // updates are not finished
delay(400)
assertTrue(blockingScopeJob.isActive)
repeat(2) { assertFalse(updateCompleted[it].get()) } // updates are not finished even after the queue starts processing
allowCompleteUpdate[0].set(true)
delay(100)
assertTrue(updateCompleted[0].get()) // the first activity should be finished
assertFalse(updateCompleted[1].get()) // the second activity is running
assertTrue(blockingScopeJob.isActive)
allowCompleteUpdate[1].set(true)
blockingScopeJob.join()
assertTrue(updateCompleted[1].get())
assertTrue(queue.isEmpty)
completionJob.complete()
})
val tracker = MergingUpdateQueueActivityTracker()
assertEquals(tracker.isInProgress(MockProject(null, application)), shouldBeTracked)
proceedJob.complete()
completionJob.join()
}
@Test
fun `eating cancels tasks`(): Unit = timeoutRunBlocking {
val queue = MergingUpdateQueue("test queue", 100, true, null)
var firstExecuted by AtomicReference(false)
var secondExecuted by AtomicReference(false)
blockingContextScope {
queue.queue(Update.create("id") {
firstExecuted = true
})
queue.queue(Update.create("id") {
secondExecuted = true
})
}
// so `blockingContextScope` exists after all its spawned tasks exit
assertTrue(queue.isEmpty)
assertFalse(firstExecuted) // eaten by the second
assertTrue(secondExecuted) // not eaten and executed
}
fun `no context in queue`(): Unit = testNoContext(MergingUpdateQueue::queue)
@Test
fun `re-queueing same update instance`(): Unit = timeoutRunBlocking {
val queue = MergingUpdateQueue("test queue", 100, true, null)
val executed = AtomicInteger()
blockingContextScope {
queue.queue(object : Update("id") {
override fun run() {
if (executed.incrementAndGet() < 10) {
queue.queue(this)
}
}
})
}
assertTrue(queue.isEmpty)
assertEquals(10, executed.get())
}
fun `no context in tracking queue`(): Unit = testNoContext(MergingUpdateQueue::queueTracked)
@Test
fun `normal queuing is not tracked`() : Unit = testWaitCompletion(MergingUpdateQueue::queue, false)
@Test
fun `tracked queuing is tracked`() : Unit = testWaitCompletion(MergingUpdateQueue::queueTracked, true)
}

View File

@@ -13,8 +13,6 @@ import com.intellij.testFramework.junit5.SystemProperty
import com.intellij.testFramework.junit5.TestApplication
import com.intellij.util.getValue
import com.intellij.util.setValue
import com.intellij.util.ui.update.MergingUpdateQueue
import com.intellij.util.ui.update.Update
import kotlinx.coroutines.*
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Assertions.assertFalse
@@ -319,29 +317,4 @@ class ThreadContextPropagationTest {
finished.await()
}
@Test
fun `merging update queue`() = timeoutRunBlocking {
val queue = MergingUpdateQueue("test", 100, true, null)
val semaphore = Semaphore(2)
val element = TestElement("e1")
val element2 = TestElement2("e2")
withContext(element) {
blockingContext {
queue.queue(Update.create("id") {
assertEquals(element, currentThreadContext()[TestElementKey])
semaphore.up()
})
}
withContext(element2) {
blockingContext {
queue.queue(Update.create("id") {
// no eating occurs since the contexts are different
assertEquals(element2, currentThreadContext()[TestElement2Key])
semaphore.up()
})
}
}
}
semaphore.timeoutWaitUp()
}
}