IJPL-158075 Alarm - reuse coroutine dispatcher thread pool, do not use application executor pool

GitOrigin-RevId: 874d6a5786a817c36af12c97d4bbf90d243bf896
This commit is contained in:
Vladimir Krivosheev
2024-07-08 18:23:45 +02:00
committed by intellij-monorepo-bot
parent 2f4622ceea
commit ee7c3ccd9d
14 changed files with 299 additions and 306 deletions

View File

@@ -2940,7 +2940,6 @@ c:com.intellij.util.Alarm
- <init>(com.intellij.openapi.Disposable):V
- <init>(com.intellij.util.Alarm$ThreadToUse):V
- <init>(com.intellij.util.Alarm$ThreadToUse,com.intellij.openapi.Disposable):V
- b:<init>(com.intellij.util.Alarm$ThreadToUse,com.intellij.openapi.Disposable,I,kotlin.jvm.internal.DefaultConstructorMarker):V
- <init>(javax.swing.JComponent,com.intellij.openapi.Disposable):V
- f:addComponentRequest(java.lang.Runnable,I):V
- f:addComponentRequest(java.lang.Runnable,J):V
@@ -2956,7 +2955,6 @@ c:com.intellij.util.Alarm
- f:getActiveRequestCount():I
- f:isDisposed():Z
- f:isEmpty():Z
- f:setActivationComponent(javax.swing.JComponent):com.intellij.util.Alarm
- f:waitForAllExecuted(J,java.util.concurrent.TimeUnit):V
e:com.intellij.util.Alarm$ThreadToUse
- java.lang.Enum
@@ -2966,7 +2964,7 @@ e:com.intellij.util.Alarm$ThreadToUse
- s:getEntries():kotlin.enums.EnumEntries
- s:valueOf(java.lang.String):com.intellij.util.Alarm$ThreadToUse
- s:values():com.intellij.util.Alarm$ThreadToUse[]
c:com.intellij.util.AlarmFactory
f:com.intellij.util.AlarmFactory
- <init>():V
- create():com.intellij.util.Alarm
- create(com.intellij.util.Alarm$ThreadToUse):com.intellij.util.Alarm

View File

@@ -46,5 +46,6 @@
<orderEntry type="library" name="kotlinx-coroutines-core" level="project" />
<orderEntry type="module" module-name="intellij.platform.diagnostic" />
<orderEntry type="module" module-name="intellij.platform.settings" />
<orderEntry type="module" module-name="intellij.platform.util.coroutines" />
</component>
</module>

View File

@@ -1,5 +1,6 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
@file:Suppress("ReplaceGetOrSet")
@file:OptIn(ExperimentalCoroutinesApi::class)
package com.intellij.util
@@ -9,68 +10,69 @@ import com.intellij.concurrency.ContextAwareRunnable
import com.intellij.concurrency.installThreadContext
import com.intellij.diagnostic.PluginException
import com.intellij.openapi.Disposable
import com.intellij.openapi.application.ApplicationActivationListener
import com.intellij.openapi.application.ApplicationManager
import com.intellij.openapi.application.ModalityState
import com.intellij.openapi.application.*
import com.intellij.openapi.components.ComponentManagerEx
import com.intellij.openapi.components.serviceOrNull
import com.intellij.openapi.diagnostic.Logger
import com.intellij.openapi.diagnostic.logger
import com.intellij.openapi.progress.blockingContext
import com.intellij.openapi.util.Disposer
import com.intellij.openapi.wm.IdeFrame
import com.intellij.util.concurrency.*
import com.intellij.platform.util.coroutines.childScope
import com.intellij.util.concurrency.AppExecutorUtil
import com.intellij.util.concurrency.ChildContext
import com.intellij.util.concurrency.ThreadingAssertions
import com.intellij.util.concurrency.annotations.RequiresEdt
import com.intellij.util.concurrency.createChildContext
import com.intellij.util.ui.update.Activatable
import com.intellij.util.ui.update.UiNotifyConnector.Companion.installOn
import kotlinx.coroutines.*
import org.jetbrains.annotations.ApiStatus.Internal
import org.jetbrains.annotations.Async
import org.jetbrains.annotations.TestOnly
import java.util.concurrent.*
import java.awt.EventQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import javax.swing.JComponent
import kotlin.concurrent.Volatile
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
private val LOG: Logger = logger<Alarm>()
/**
* Use a [kotlinx.coroutines.flow.Flow] with [kotlinx.coroutines.flow.debounce] and [kotlinx.coroutines.flow.sample] instead.
* Alarm is deprecated.
*
* Allows scheduling `Runnable` instances (requests) to be executed after a specific time interval on a specific thread.
* Use [.addRequest] methods to schedule the requests.
* Two requests scheduled with the same delay are executed sequentially, one after the other.
* [.cancelAllRequests] and [.cancelRequest] allow canceling already scheduled requests.
*/
open class Alarm @JvmOverloads constructor(
private val threadToUse: ThreadToUse = ThreadToUse.SWING_THREAD,
parentDisposable: Disposable? = null,
open class Alarm @Internal constructor(
private val threadToUse: ThreadToUse,
parentDisposable: Disposable?,
// accessed in EDT only
private val activationComponent: JComponent?,
) : Disposable {
@Volatile
var isDisposed: Boolean = false
private set
// it is a supervisor coroutine scope
private val taskCoroutineScope: CoroutineScope
// requests scheduled to myExecutorService
// scheduled requests scheduled
private val requests = SmartList<Request>() // guarded by LOCK
// requests not yet scheduled to myExecutorService (because, e.g., the corresponding component isn't active yet)
// not yet scheduled requests (because, e.g., the corresponding component isn't active yet)
// guarded by LOCK
private val pendingRequests = SmartList<Request>()
private val executorService: ScheduledExecutorService
private val LOCK = Any()
// accessed in EDT only
private var activationComponent: JComponent? = null
override fun dispose() {
if (!isDisposed) {
isDisposed = true
if (taskCoroutineScope.isActive) {
taskCoroutineScope.cancel()
cancelAllRequests()
if (executorService !== EdtExecutorService.getScheduledExecutorInstance()) {
executorService.shutdownNow()
}
}
}
private fun checkDisposed() {
LOG.assertTrue(!isDisposed, "Already disposed")
}
enum class ThreadToUse {
/**
* Run request in Swing event dispatch thread; this is the default.
@@ -92,11 +94,18 @@ open class Alarm @JvmOverloads constructor(
*/
constructor(parentDisposable: Disposable) : this(threadToUse = ThreadToUse.SWING_THREAD, parentDisposable = parentDisposable)
constructor(threadToUse: ThreadToUse) : this(threadToUse = threadToUse, parentDisposable = null, activationComponent = null)
constructor() : this(threadToUse = ThreadToUse.SWING_THREAD, parentDisposable = null, activationComponent = null)
constructor(threadToUse: ThreadToUse, parentDisposable: Disposable?)
: this(threadToUse = threadToUse, parentDisposable = parentDisposable, activationComponent = null)
/**
* Creates alarm for EDT which executes its requests only when the {@param activationComponent} is shown on screen
*/
constructor(activationComponent: JComponent, parent: Disposable) : this(ThreadToUse.SWING_THREAD, parent) {
this.activationComponent = activationComponent
constructor(activationComponent: JComponent, parent: Disposable)
: this(threadToUse = ThreadToUse.SWING_THREAD, parentDisposable = parent, activationComponent = activationComponent) {
installOn(activationComponent, object : Activatable {
override fun showNotify() {
flushPending()
@@ -113,16 +122,7 @@ open class Alarm @JvmOverloads constructor(
PluginException.reportDeprecatedUsage("Alarm.ThreadToUse#SHARED_THREAD", "Please use `POOLED_THREAD` instead")
}
executorService = if (threadToUse == ThreadToUse.SWING_THREAD) {
// pass straight to EDT
EdtExecutorService.getScheduledExecutorInstance()
}
else {
// or pass to app pooled thread.
// have to restrict the number of running tasks because otherwise the (implicit) contract
// "addRequests with the same delay are executed in order" will be broken
AppExecutorUtil.createBoundedScheduledExecutorService("Alarm Pool", 1)
}
taskCoroutineScope = createCoroutineScope(inEdt = threadToUse == ThreadToUse.SWING_THREAD)
if (parentDisposable == null) {
if (threadToUse != ThreadToUse.SWING_THREAD) {
@@ -150,15 +150,16 @@ open class Alarm @JvmOverloads constructor(
}
}
private val modalityState: ModalityState?
get() = if (threadToUse == ThreadToUse.SWING_THREAD) ApplicationManager.getApplication()?.defaultModalityState else null
private fun getModalityState(): ModalityState? {
return if (threadToUse == ThreadToUse.SWING_THREAD) ApplicationManager.getApplication()?.defaultModalityState else null
}
fun addRequest(request: Runnable, delayMillis: Long) {
doAddRequest(request = request, delayMillis = delayMillis, modalityState = modalityState)
doAddRequest(request = request, delayMillis = delayMillis, modalityState = getModalityState())
}
open fun addRequest(request: Runnable, delayMillis: Int) {
doAddRequest(request = request, delayMillis = delayMillis.toLong(), modalityState = modalityState)
doAddRequest(request = request, delayMillis = delayMillis.toLong(), modalityState = getModalityState())
}
fun addComponentRequest(request: Runnable, delayMillis: Int) {
@@ -176,7 +177,7 @@ open class Alarm @JvmOverloads constructor(
doAddRequest(
request = request,
delayMillis = delayMillis,
modalityState = ModalityState.stateForComponent(activationComponent!!),
modalityState = ModalityState.stateForComponent(activationComponent),
)
}
@@ -192,7 +193,8 @@ open class Alarm @JvmOverloads constructor(
internal fun cancelAllAndAddRequest(request: Runnable, delayMillis: Int, modalityState: ModalityState?) {
synchronized(LOCK) {
cancelAllRequests()
cancelAllRequests(requests)
cancelAllRequests(pendingRequests)
doAddRequest(request = request, delayMillis = delayMillis.toLong(), modalityState = modalityState)
}
}
@@ -200,14 +202,14 @@ open class Alarm @JvmOverloads constructor(
internal fun doAddRequest(request: Runnable, delayMillis: Long, modalityState: ModalityState?) {
val childContext = if (request !is ContextAwareRunnable && AppExecutorUtil.propagateContext()) createChildContext() else null
val requestToSchedule = Request(
owner = this,
task = request,
modalityState = modalityState,
delayMillis = delayMillis,
childContext = childContext,
)
synchronized(LOCK) {
checkDisposed()
LOG.assertTrue(!isDisposed, "Already disposed")
if (activationComponent == null || isActivationComponentShowing) {
add(requestToSchedule)
}
@@ -222,15 +224,15 @@ open class Alarm @JvmOverloads constructor(
get() = activationComponent!!.isShowing
// must be called under LOCK
private fun add(requestToSchedule: Request) {
requestToSchedule.schedule()
requests.add(requestToSchedule)
private fun add(request: Request) {
requests.add(request)
request.schedule(owner = this)
}
private fun flushPending() {
synchronized(LOCK) {
for (each in pendingRequests) {
add(each)
for (request in pendingRequests) {
add(request)
}
pendingRequests.clear()
}
@@ -261,7 +263,7 @@ open class Alarm @JvmOverloads constructor(
}
}
private fun cancelAllRequests(list: MutableList<out Request>): Int {
private fun cancelAllRequests(list: MutableList<Request>): Int {
val count = list.size
for (request in list) {
request.cancel()
@@ -292,130 +294,83 @@ open class Alarm @JvmOverloads constructor(
* and then wait for the execution to finish.
*/
@TestOnly
@Throws(InterruptedException::class, ExecutionException::class, TimeoutException::class)
fun waitForAllExecuted(timeout: Long, unit: TimeUnit) {
@Throws(TimeoutException::class)
fun waitForAllExecuted(timeout: Long, timeUnit: TimeUnit) {
assert(ApplicationManager.getApplication().isUnitTestMode)
var futures: List<Future<*>>
synchronized(LOCK) {
futures = requests.mapNotNull { it.future }
val jobs = taskCoroutineScope.coroutineContext.job.children.toList()
if (jobs.isEmpty()) {
return
}
val deadline = System.nanoTime() + unit.toNanos(timeout)
for (future in futures) {
val toWait = deadline - System.nanoTime()
if (toWait < 0) {
throw TimeoutException()
}
@Suppress("RAW_RUN_BLOCKING")
runBlocking {
try {
future.get(toWait, TimeUnit.NANOSECONDS)
withTimeout(timeUnit.toMillis(timeout)) {
jobs.joinAll()
}
}
catch (ignored: CancellationException) {
catch (e: TimeoutCancellationException) {
// compatibility - throw TimeoutException as before
throw TimeoutException(e.message)
}
}
}
val activeRequestCount: Int
get() {
return synchronized(LOCK) {
requests.size
}
}
get() = synchronized(LOCK) { requests.size }
val isEmpty: Boolean
get() {
return synchronized(LOCK) {
requests.isEmpty()
}
}
get() = synchronized(LOCK) { requests.isEmpty() }
val isDisposed: Boolean
get() = !taskCoroutineScope.isActive
private class Request @Async.Schedule constructor(
private val owner: Alarm,
task: Runnable,
modalityState: ModalityState?,
delayMillis: Long,
childContext: ChildContext?,
) : Runnable {
@JvmField var task: Runnable?,
private val modalityState: ModalityState?,
private val delayMillis: Long,
private val childContext: ChildContext?,
) {
@JvmField
var task: Runnable? = null // guarded by LOCK
private var modalityState: ModalityState? = null
@JvmField
var future: Future<*>? = null // guarded by LOCK
private var delayMillis: Long = 0
private var clientId: String? = null
private var childContext: ChildContext? = null
init {
synchronized(owner.LOCK) {
this.task = task
this.childContext = childContext
this.modalityState = modalityState
this.delayMillis = delayMillis
clientId = getCurrentValue()
}
}
override fun run() {
try {
if (owner.isDisposed) {
return
}
val task = synchronized(owner.LOCK) {
task?.also { task = null }
}
if (task != null) {
runSafely(task)
}
}
catch (ignored: CancellationException) {
}
}
var job: Job? = null // guarded by LOCK
private val clientId = getCurrentValue()
@Async.Execute
fun runSafely(task: Runnable?) {
try {
if (task == null || owner.isDisposed) {
return
private fun runSafely(task: Runnable) {
withClientId(clientId).use { _ ->
if (childContext == null) {
doRunSafely(task)
}
val childContext = childContext
withClientId(clientId!!).use { _ ->
if (childContext == null) {
QueueProcessor.runSafely(task)
}
else {
QueueProcessor.runSafely {
childContext.runAsCoroutine(
Runnable {
installThreadContext(childContext.context, true).use { _ ->
task.run()
}
})
else {
childContext.runAsCoroutine(completeOnFinish = true) {
installThreadContext(coroutineContext = childContext.context, replace = true).use { _ ->
doRunSafely(task)
}
}
}
}
finally {
// remove from the list after execution to be able for {@link #waitForAllExecuted(long, TimeUnit)} to wait for completion
synchronized(owner.LOCK) {
owner.requests.remove(this)
future = null
}
}
}
// must be called under LOCK
fun schedule() {
val modalityState = modalityState
future = if (modalityState == null) {
owner.executorService.schedule(contextAwareCallable(this), delayMillis, TimeUnit.MILLISECONDS)
}
else {
EdtScheduledExecutorService.getInstance().schedule(
ContextAwareRunnable { this.run() }, modalityState, delayMillis, TimeUnit.MILLISECONDS)
fun schedule(owner: Alarm) {
assert(job == null)
job = owner.taskCoroutineScope.launch(modalityState?.asContextElement() ?: EmptyCoroutineContext) {
delay(delayMillis)
val task = synchronized(owner.LOCK) {
task?.also { task = null }
} ?: return@launch
blockingContext {
runSafely(task)
}
}.also {
it.invokeOnCompletion {
synchronized(owner.LOCK) {
owner.requests.remove(this@Request)
task = null
job = null
}
}
}
}
@@ -424,36 +379,66 @@ open class Alarm @JvmOverloads constructor(
* Returns a task, if not yet executed.
*/
fun cancel(): Runnable? {
val future = future
job?.let {
it.cancel(null)
job = null
}
childContext?.job?.cancel(null)
if (future != null) {
future.cancel(false)
this.future = null
return task?.let {
task = null
it
}
val task = task
this.task = null
return task
}
override fun toString(): String {
var task: Runnable?
synchronized(owner.LOCK) {
task = this.task
}
return super.toString() + (if (task == null) "" else ": $task") + "; delay=" + delayMillis + "ms"
}
}
@Deprecated("use {@link #Alarm(JComponent, Disposable)} instead ")
@RequiresEdt
fun setActivationComponent(component: JComponent): Alarm {
PluginException.reportDeprecatedUsage("Alarm#setActivationComponent", "Please use `#Alarm(JComponent, Disposable)` instead")
activationComponent = component
installOn(component, object : Activatable {
override fun showNotify() {
flushPending()
}
})
return this
override fun toString(): String = "${super.toString()} $task; delay=${delayMillis}ms"
}
}
// todo next step - support passing coroutine scope
private fun createCoroutineScope(inEdt: Boolean): CoroutineScope {
val app = ApplicationManager.getApplication()
@Suppress("SSBasedInspection")
if (inEdt) {
// maybe not defined in tests
val edtDispatcher = app?.serviceOrNull<CoroutineSupport>()?.edtDispatcher()
if (edtDispatcher == null) {
// cannot be as error - not clear what to do in case of `RangeTimeScrollBarTest`
logger<Alarm>().warn("Do not use an alarm in an early executing code")
return CoroutineScope(object : CoroutineDispatcher() {
override fun dispatch(context: CoroutineContext, block: Runnable) {
EventQueue.invokeLater(block)
}
override fun toString() = "Swing"
} + SupervisorJob())
}
else {
@Suppress("UsagesOfObsoleteApi")
return (app as ComponentManagerEx).getCoroutineScope().childScope("Alarm", edtDispatcher)
}
}
else {
val dispatcher = Dispatchers.Default.limitedParallelism(1)
if (app == null) {
logger<Alarm>().error("Do not use an alarm in an early executing code")
return CoroutineScope(SupervisorJob() + dispatcher)
}
else {
@Suppress("UsagesOfObsoleteApi")
return (app as ComponentManagerEx).getCoroutineScope().childScope("Alarm", dispatcher)
}
}
}
private fun doRunSafely(run: Runnable) {
try {
run.run()
}
catch (e: CancellationException) {
throw e
}
catch (e: Throwable) {
LOG.error(e)
}
}

View File

@@ -1,4 +1,4 @@
// Copyright 2000-2022 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.util;
import com.intellij.openapi.Disposable;
@@ -7,7 +7,8 @@ import org.jetbrains.annotations.NotNull;
/**
* @deprecated Use {@link Alarm} directly.
*/
public class AlarmFactory {
@Deprecated(forRemoval = true)
public final class AlarmFactory {
private static final @NotNull AlarmFactory ourInstance = new AlarmFactory();
public static @NotNull AlarmFactory getInstance() {

View File

@@ -6,18 +6,16 @@ import com.intellij.openapi.application.ModalityState
import com.intellij.openapi.util.Disposer
import com.intellij.util.Alarm.ThreadToUse
import org.jetbrains.annotations.TestOnly
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
/**
* Use a [kotlinx.coroutines.flow.Flow] with [kotlinx.coroutines.flow.debounce] and [kotlinx.coroutines.flow.sample] instead.
* Alarm is deprecated.
* Allows scheduling a single `Runnable` instance ([task]) to be executed after a specific time interval on a specific thread.
* [request] adds a request if it's not scheduled yet, i.e., it does not delay execution of the request
* [cancelAndRequest] cancels the current request and schedules a new one instead, i.e., it delays execution of the request
*
* Consider using a [kotlinx.coroutines.flow.Flow] with [kotlinx.coroutines.flow.debounce] and [kotlinx.coroutines.flow.sample] instead
*/
class SingleAlarm @JvmOverloads constructor(
private val task: Runnable,
private val delay: Int,
@@ -25,7 +23,7 @@ class SingleAlarm @JvmOverloads constructor(
threadToUse: ThreadToUse = ThreadToUse.SWING_THREAD,
private val modalityState: ModalityState? = if (threadToUse == ThreadToUse.SWING_THREAD) ModalityState.nonModal() else null
) : Disposable {
private val impl = Alarm(threadToUse, parentDisposable)
private val impl = Alarm(threadToUse = threadToUse, parentDisposable = parentDisposable)
constructor(task: Runnable, delay: Int, threadToUse: ThreadToUse, parentDisposable: Disposable)
: this(
@@ -65,14 +63,15 @@ class SingleAlarm @JvmOverloads constructor(
Disposer.dispose(impl)
}
val isDisposed: Boolean get() = impl.isDisposed
val isDisposed: Boolean
get() = impl.isDisposed
val isEmpty: Boolean get() = impl.isEmpty
val isEmpty: Boolean
get() = impl.isEmpty
@TestOnly
@Throws(InterruptedException::class, ExecutionException::class, TimeoutException::class)
fun waitForAllExecuted(timeout: Long, unit: TimeUnit) {
return impl.waitForAllExecuted(timeout, unit)
impl.waitForAllExecuted(timeout, unit)
}
@JvmOverloads
@@ -101,11 +100,9 @@ class SingleAlarm @JvmOverloads constructor(
@JvmOverloads
fun cancelAndRequest(forceRun: Boolean = false) {
if (!impl.isDisposed) {
impl.cancelAllAndAddRequest(task, if (forceRun) 0 else delay, modalityState)
impl.cancelAllAndAddRequest(request = task, delayMillis = if (forceRun) 0 else delay, modalityState = modalityState)
}
}
fun cancelAllRequests(): Int {
return impl.cancelAllRequests()
}
fun cancelAllRequests(): Int = impl.cancelAllRequests()
}

View File

@@ -23,7 +23,6 @@ import org.jetbrains.annotations.*;
import javax.swing.*;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -529,7 +528,7 @@ public class MergingUpdateQueue implements Runnable, Disposable, Activatable {
}
@TestOnly
public void waitForAllExecuted(long timeout, @NotNull TimeUnit unit) throws ExecutionException, InterruptedException, TimeoutException {
public void waitForAllExecuted(long timeout, @NotNull TimeUnit unit) throws TimeoutException {
long deadline = System.nanoTime() + unit.toNanos(timeout);
if (!myWaiterForMerge.isEmpty()) {
restart(0); // to not wait for myMergingTimeSpan ms in tests

View File

@@ -1,4 +1,4 @@
// Copyright 2000-2023 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.codeInsight.daemon.impl;
import com.intellij.ide.plugins.DynamicPluginListener;
@@ -24,7 +24,6 @@ import org.jetbrains.annotations.TestOnly;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -137,7 +136,7 @@ final class WolfListeners {
try {
invalidateFileQueue.waitForAllExecuted(1, TimeUnit.MINUTES);
}
catch (ExecutionException | InterruptedException | TimeoutException e) {
catch (TimeoutException e) {
throw new RuntimeException(e);
}
}

View File

@@ -567,7 +567,7 @@ public class ConsoleViewImpl extends JPanel implements ConsoleView, ObservableCo
catch (CancellationException e) {
//try again
}
catch (InterruptedException | ExecutionException | TimeoutException e) {
catch (TimeoutException e) {
throw new RuntimeException(e);
}
}

View File

@@ -1,4 +1,4 @@
// Copyright 2000-2023 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.openapi.util;
import com.intellij.openapi.Disposable;
@@ -9,7 +9,6 @@ import com.intellij.util.Alarm;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.TestOnly;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -89,7 +88,7 @@ public final class ZipperUpdater {
try {
myAlarm.waitForAllExecuted(timeout, unit);
}
catch (InterruptedException | ExecutionException | TimeoutException e) {
catch (TimeoutException e) {
throw new RuntimeException(e);
}
}

View File

@@ -11,7 +11,6 @@ import com.intellij.testFramework.LightPlatformTestCase;
import com.intellij.testFramework.LoggedErrorProcessor;
import com.intellij.testFramework.PlatformTestUtil;
import com.intellij.tools.ide.metrics.benchmark.PerformanceTestUtil;
import com.intellij.util.Alarm;
import com.intellij.util.ExceptionUtil;
import com.intellij.util.ReflectionUtil;
import com.intellij.util.TestTimeOut;
@@ -149,12 +148,6 @@ public class IdeEventQueueTest extends LightPlatformTestCase {
}
}
public void testExceptionInAlarmMustThrowImmediatelyInTests() {
Alarm alarm = new Alarm();
alarm.addRequest(()-> throwMyException(), 1);
checkMyExceptionThrownImmediately();
}
public void testExceptionInInvokeLateredRunnableMustThrowImmediatelyInTests() {
SwingUtilities.invokeLater(() -> throwMyException());
checkMyExceptionThrownImmediately();

View File

@@ -2,20 +2,28 @@
package com.intellij.util
import com.intellij.concurrency.ConcurrentCollectionFactory
import com.intellij.concurrency.ContextAwareRunnable
import com.intellij.diagnostic.PerformanceWatcher.Companion.printStacktrace
import com.intellij.openapi.Disposable
import com.intellij.openapi.application.ApplicationManager
import com.intellij.openapi.application.EDT
import com.intellij.openapi.application.ModalityState
import com.intellij.openapi.application.impl.LaterInvocator
import com.intellij.testFramework.LoggedErrorProcessor
import com.intellij.testFramework.UsefulTestCase
import com.intellij.testFramework.assertions.Assertions.assertThat
import com.intellij.testFramework.junit5.TestApplication
import com.intellij.testFramework.junit5.TestDisposable
import com.intellij.util.ui.UIUtil
import org.assertj.core.api.Assertions
import org.junit.jupiter.api.Assertions.assertTrue
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.future.asCompletableFuture
import kotlinx.coroutines.runBlocking
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.jupiter.api.Assertions.fail
import org.junit.jupiter.api.Test
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger
@@ -23,60 +31,73 @@ import java.util.concurrent.atomic.AtomicInteger
@TestApplication
class AlarmTest {
@Test
fun twoAddsWithZeroDelayMustExecuteSequentially(@TestDisposable disposable: Disposable) {
val alarm = Alarm(disposable)
assertRequestsExecuteSequentially(alarm)
fun alarmRequestsShouldExecuteSequentiallyInEdt(@TestDisposable disposable: Disposable) {
assertRequestsExecuteSequentially(Alarm(Alarm.ThreadToUse.SWING_THREAD, disposable))
}
@Test
fun alarmRequestsShouldExecuteSequentiallyEvenInPooledThread(@TestDisposable disposable: Disposable) {
fun alarmRequestsShouldExecuteSequentiallyInPooledThread(@TestDisposable disposable: Disposable) {
assertRequestsExecuteSequentially(Alarm(Alarm.ThreadToUse.POOLED_THREAD, disposable))
}
@Test
fun `alarm with short delay executed first`(@TestDisposable disposable: Disposable) {
val alarm = Alarm(Alarm.ThreadToUse.POOLED_THREAD, disposable)
assertRequestsExecuteSequentially(alarm)
val list = ConcurrentLinkedQueue<String>()
alarm.addRequest(ContextAwareRunnable { list.add("B") }, 10)
alarm.addRequest(ContextAwareRunnable { list.add("A") }, 1)
alarm.waitForAllExecuted(20, TimeUnit.MILLISECONDS)
assertThat(list).containsExactly("A", "B")
}
@Test
fun oneAlarmDoesNotStartTooManyThreads(@TestDisposable disposable: Disposable) {
val alarm = Alarm(disposable)
val alarm = Alarm(threadToUse = Alarm.ThreadToUse.POOLED_THREAD, parentDisposable = disposable)
val executed = AtomicInteger()
val count = 100000
val used = ConcurrentCollectionFactory.createConcurrentSet<Thread>()
val count = 100_000
val used = ConcurrentHashMap.newKeySet<Thread>()
val delay = 10
for (i in 0 until count) {
alarm.addRequest({
executed.incrementAndGet()
used.add(Thread.currentThread())
}, 10)
}
while (executed.get() != count) {
UIUtil.dispatchAllInvocationEvents()
}
if (used.size > 10) {
throw AssertionError(used.size.toString() + " threads created: " + used.joinToString { printStacktrace("", it, it.stackTrace) })
alarm.addRequest(ContextAwareRunnable {
executed.incrementAndGet()
used.add(Thread.currentThread())
}, delay)
}
alarm.waitForAllExecuted(30, TimeUnit.SECONDS)
assertThat(used.size)
.describedAs {
"${used.size} threads created: ${used.joinToString { printStacktrace("", it, it.stackTrace) }}"
}
.isLessThanOrEqualTo(Runtime.getRuntime().availableProcessors() + 64 /* IO dispatcher is also reused */)
}
@Test
fun manyAlarmsDoNotStartTooManyThreads(@TestDisposable disposable: Disposable) {
val used = ConcurrentCollectionFactory.createConcurrentSet<Thread>()
val executed = AtomicInteger()
val count = 100000
val alarms = Array(count) { Alarm(disposable) }.toList()
val count = 100_000
val alarms = Array(count) { Alarm(threadToUse = Alarm.ThreadToUse.POOLED_THREAD, disposable) }
for (alarm in alarms) {
alarm.addRequest({
executed.incrementAndGet()
used.add(Thread.currentThread())
}, 10)
alarm.addRequest(ContextAwareRunnable {
executed.incrementAndGet()
used.add(Thread.currentThread())
}, 10)
}
while (executed.get() != count) {
UIUtil.dispatchAllInvocationEvents()
}
if (used.size > 10) {
throw AssertionError(used.size.toString() + " threads created: " + used.joinToString { printStacktrace("", it, it.stackTrace) })
for (alarm in alarms) {
alarm.waitForAllExecuted(1, TimeUnit.SECONDS)
}
assertThat(used.size)
.describedAs {
"${used.size} threads created: ${used.joinToString { printStacktrace("", it, it.stackTrace) }}"
}
.isLessThanOrEqualTo(Runtime.getRuntime().availableProcessors() + 1 + 64 /* IO-pool thread is reused */)
}
@Test
fun testOrderIsPreservedAfterModalitySwitching() {
fun orderIsPreservedAfterModalitySwitching(): Unit = runBlocking(Dispatchers.EDT) {
val alarm = Alarm()
val sb = StringBuilder()
val modal = Any()
@@ -87,7 +108,7 @@ class AlarmTest {
alarm.addRequest({ sb.append("1") }, 0, ModalityState.nonModal())
alarm.addRequest({ sb.append("2") }, 5, ModalityState.nonModal())
UIUtil.dispatchAllInvocationEvents()
org.junit.jupiter.api.Assertions.assertEquals("", sb.toString())
assertThat(sb).isEmpty()
}
finally {
LaterInvocator.leaveModal(modal)
@@ -97,19 +118,19 @@ class AlarmTest {
UIUtil.dispatchAllInvocationEvents()
}
org.junit.jupiter.api.Assertions.assertEquals("12", sb.toString())
assertThat(sb.toString()).isEqualTo("12")
}
@Test
fun flushImmediately() {
val alarm = Alarm()
val sb = StringBuilder()
fun flushImmediately(@TestDisposable disposable: Disposable): Unit = runBlocking(Dispatchers.EDT) {
val alarm = Alarm(threadToUse = Alarm.ThreadToUse.SWING_THREAD, disposable)
val list = ConcurrentLinkedQueue<String>()
alarm.addRequest({ sb.append("1") }, 0, ModalityState.nonModal())
alarm.addRequest({ sb.append("2") }, 5, ModalityState.nonModal())
org.junit.jupiter.api.Assertions.assertEquals("", sb.toString())
alarm.addRequest(ContextAwareRunnable { list.add("1") }, 0, ModalityState.nonModal())
alarm.addRequest(ContextAwareRunnable { list.add("2") }, 5, ModalityState.nonModal())
assertThat(list).isEmpty()
alarm.drainRequestsInTest()
org.junit.jupiter.api.Assertions.assertEquals("12", sb.toString())
assertThat(list).containsExactly("1", "2")
}
@Test
@@ -118,14 +139,14 @@ class AlarmTest {
val sb = StringBuffer()
val start = System.currentTimeMillis()
val delay = 100
alarm.addRequest({
TimeoutUtil.sleep(1000)
sb.append("1")
}, delay)
alarm.addRequest({
TimeoutUtil.sleep(1000)
sb.append("2")
}, delay * 2)
alarm.addRequest(ContextAwareRunnable {
TimeoutUtil.sleep(1000)
sb.append('1')
}, delay)
alarm.addRequest(ContextAwareRunnable {
TimeoutUtil.sleep(1000)
sb.append('2')
}, delay * 2)
val s = sb.toString()
val elapsed = System.currentTimeMillis() - start
@@ -133,8 +154,9 @@ class AlarmTest {
System.err.println("No no no no this agent is so overloaded I quit")
return
}
org.junit.jupiter.api.Assertions.assertEquals(2, alarm.activeRequestCount)
org.junit.jupiter.api.Assertions.assertEquals("", s)
assertThat(alarm.activeRequestCount).isEqualTo(2)
assertThat(s).isEmpty()
try {
// started to execute but not finished yet
alarm.waitForAllExecuted(1000, TimeUnit.MILLISECONDS)
@@ -145,61 +167,51 @@ class AlarmTest {
alarm.waitForAllExecuted(3000, TimeUnit.MILLISECONDS)
org.junit.jupiter.api.Assertions.assertEquals(2, sb.length)
assertThat(sb).hasSize(2)
}
@Test
fun exceptionDuringAlarmExecutionMustManifestItselfInTests(@TestDisposable disposable: Disposable) {
val alarm = Alarm(disposable)
val alarm = Alarm(threadToUse = Alarm.ThreadToUse.POOLED_THREAD, disposable)
val errorMessage = "_catch_me_"
val error = LoggedErrorProcessor.executeAndReturnLoggedError {
alarm.addRequest({
throw RuntimeException("wtf")
}, 1)
var caught = false
while (!alarm.isEmpty) {
try {
UIUtil.dispatchAllInvocationEvents()
}
catch (e: RuntimeException) {
caught = caught or ("wtf" == e.message)
}
}
assertTrue(caught)
alarm.addRequest(ContextAwareRunnable { throw RuntimeException(errorMessage) }, 1)
alarm.waitForAllExecuted(1000, TimeUnit.MILLISECONDS)
}
org.junit.jupiter.api.Assertions.assertEquals("wtf", error.message)
assertThat(error).hasMessage(errorMessage)
}
@Test
fun singleAlarmMustRefuseToInstantiateWithWrongModality() {
UsefulTestCase.assertThrows(IllegalArgumentException::class.java) {
assertThatExceptionOfType(IllegalArgumentException::class.java).isThrownBy {
SingleAlarm(task = {}, delay = 1, parentDisposable = null, threadToUse = Alarm.ThreadToUse.SWING_THREAD, modalityState = null)
}
}
}
private fun assertRequestsExecuteSequentially(alarm: Alarm) {
val count = 10000
val count = 10_000
val log = StringBuffer(count * 4)
val expected = StringBuilder(count * 4)
for (i in 0 until count) {
alarm.addRequest({ log.append(i).append(" ") }, 0)
alarm.addRequest(ContextAwareRunnable { log.append(i).append(' ') }, 0)
}
for (i in 0 until count) {
expected.append(i).append(" ")
}
val future = ApplicationManager.getApplication().executeOnPooledThread {
try {
alarm.waitForAllExecuted(100, TimeUnit.SECONDS)
}
catch (e: Exception) {
throw RuntimeException(e)
@Suppress("OPT_IN_USAGE")
val future = GlobalScope.async {
alarm.waitForAllExecuted(100, TimeUnit.SECONDS)
}
runBlocking(Dispatchers.EDT) {
while (!future.isCompleted) {
UIUtil.dispatchAllInvocationEvents()
}
}
while (!future.isDone) {
UIUtil.dispatchAllInvocationEvents()
}
future.get()
Assertions.assertThat(alarm.isEmpty).isTrue()
org.junit.jupiter.api.Assertions.assertEquals(expected.toString(), log.toString())
future.asCompletableFuture().join()
assertThat(alarm.isEmpty).isTrue()
assertThat(log.toString()).isEqualTo(expected.toString())
}

View File

@@ -1,8 +1,9 @@
// Copyright 2000-2023 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.testFramework
import com.intellij.openapi.progress.blockingContext
import com.intellij.openapi.project.Project
import com.intellij.platform.ide.progress.ModalTaskOwner
import com.intellij.platform.ide.progress.runWithModalProgressBlocking
import com.intellij.testFramework.common.DEFAULT_TEST_TIMEOUT
import com.intellij.testFramework.common.timeoutRunBlocking
@@ -25,6 +26,17 @@ fun executeSomeCoroutineTasksAndDispatchAllInvocationEvents(project: Project) {
}
}
@RequiresEdt
fun executeSomeCoroutineTasksAndDispatchAllInvocationEvents() {
repeat(3) {
PlatformTestUtil.dispatchAllInvocationEventsInIdeEventQueue()
runWithModalProgressBlocking(ModalTaskOwner.guess(), "") {
yield()
}
PlatformTestUtil.dispatchAllInvocationEventsInIdeEventQueue()
}
}
@Deprecated("use com.intellij.testFramework.common.waitUnti")
@TestOnly
suspend fun waitUntil(message: String? = null, timeout: Duration = DEFAULT_TEST_TIMEOUT, condition: suspend CoroutineScope.() -> Boolean) {

View File

@@ -1,4 +1,4 @@
// Copyright 2000-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license that can be found in the LICENSE file.
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.openapi.vcs.changes;
import com.intellij.ide.dnd.DnDEvent;
@@ -18,9 +18,7 @@ public abstract class DnDActivateOnHoldTarget implements DnDTarget {
boolean isDropPossible = isDropPossible(event);
event.setDropPossible(isDropPossible);
if (isDropPossible) {
if (myAlarm.isEmpty()) {
myAlarm.request();
}
myAlarm.request();
}
else {
myAlarm.cancelAllRequests();

View File

@@ -1,4 +1,4 @@
// Copyright 2000-2023 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package git4idea.repo;
import com.intellij.dvcs.ignore.IgnoredToExcludedSynchronizer;
@@ -36,7 +36,6 @@ import org.jetbrains.annotations.TestOnly;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -397,7 +396,7 @@ public class GitUntrackedFilesHolder implements Disposable {
try {
myQueue.waitForAllExecuted(10, TimeUnit.SECONDS);
}
catch (ExecutionException | InterruptedException | TimeoutException e) {
catch (TimeoutException e) {
throw new RuntimeException(e);
}
}