From 3571f1c2b660d9c6eb86637bf06f36ca9d044e8f Mon Sep 17 00:00:00 2001 From: Denis Zaichenko Date: Fri, 26 Jul 2024 16:18:09 +0200 Subject: [PATCH] IJPL-89250 Migrate IndexDiagnosticRunner to coroutines to fix flaky tests. GitOrigin-RevId: 4e2198eabf3919ed83fd8f7225d064e149b625de --- .../com/intellij/vcs/log/data/VcsLogData.java | 1 - .../log/data/index/IndexDiagnosticRunner.kt | 116 +++++++++++------- 2 files changed, 69 insertions(+), 48 deletions(-) diff --git a/platform/vcs-log/impl/src/com/intellij/vcs/log/data/VcsLogData.java b/platform/vcs-log/impl/src/com/intellij/vcs/log/data/VcsLogData.java index 5f07ad8c4c42..8e1fe6c6fe43 100644 --- a/platform/vcs-log/impl/src/com/intellij/vcs/log/data/VcsLogData.java +++ b/platform/vcs-log/impl/src/com/intellij/vcs/log/data/VcsLogData.java @@ -263,7 +263,6 @@ public final class VcsLogData implements Disposable, VcsLogDataProvider { listener.onDataPackChange(dataPack); } }, o -> myDisposableFlag.isDisposed()); - myIndexDiagnosticRunner.onDataPackChange(); } public void addDataPackChangeListener(final @NotNull DataPackChangeListener listener) { diff --git a/platform/vcs-log/impl/src/com/intellij/vcs/log/data/index/IndexDiagnosticRunner.kt b/platform/vcs-log/impl/src/com/intellij/vcs/log/data/index/IndexDiagnosticRunner.kt index 199b77ca946e..91beeaace5a9 100644 --- a/platform/vcs-log/impl/src/com/intellij/vcs/log/data/index/IndexDiagnosticRunner.kt +++ b/platform/vcs-log/impl/src/com/intellij/vcs/log/data/index/IndexDiagnosticRunner.kt @@ -8,52 +8,82 @@ import com.intellij.openapi.Disposable import com.intellij.openapi.diagnostic.Attachment import com.intellij.openapi.diagnostic.debug import com.intellij.openapi.diagnostic.thisLogger -import com.intellij.openapi.progress.util.BackgroundTaskUtil.BackgroundTask -import com.intellij.openapi.progress.util.BackgroundTaskUtil.submitTask +import com.intellij.openapi.progress.runBlockingCancellable import com.intellij.openapi.util.Disposer import com.intellij.openapi.vcs.VcsException import com.intellij.openapi.vfs.VirtualFile -import com.intellij.util.concurrency.AppExecutorUtil import com.intellij.util.concurrency.annotations.RequiresBackgroundThread +import com.intellij.vcs.log.data.* import com.intellij.vcs.log.data.AbstractDataGetter.Companion.getCommitDetails -import com.intellij.vcs.log.data.CommitDetailsGetter -import com.intellij.vcs.log.data.DataPack -import com.intellij.vcs.log.data.VcsLogStorage import com.intellij.vcs.log.data.index.IndexDiagnostic.getDiffFor import com.intellij.vcs.log.data.index.IndexDiagnostic.pickCommits import com.intellij.vcs.log.data.index.IndexDiagnostic.pickIndexedCommits import com.intellij.vcs.log.impl.VcsLogErrorHandler +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.awaitClose +import kotlinx.coroutines.flow.callbackFlow import org.jetbrains.annotations.ApiStatus.Internal -import java.util.concurrent.TimeUnit +import kotlin.time.Duration.Companion.milliseconds + + +internal class IndexDiagnosticRunner( + private val index: VcsLogModifiableIndex, + private val storage: VcsLogStorage, + private val roots: Collection, + private val dataPackGetter: () -> DataPack, + private val commitDetailsGetter: CommitDetailsGetter, + private val errorHandler: VcsLogErrorHandler, + vcsLogData: VcsLogData +) : Disposable { + + @Suppress("SSBasedInspection") + private val coroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.IO).also { + Disposer.register(this) { + it.cancel() + } + } -internal class IndexDiagnosticRunner(private val index: VcsLogModifiableIndex, - private val storage: VcsLogStorage, - private val roots: Collection, - private val dataPackGetter: () -> DataPack, - private val commitDetailsGetter: CommitDetailsGetter, - private val errorHandler: VcsLogErrorHandler, - parent: Disposable) : Disposable { - private val executor = AppExecutorUtil.createBoundedApplicationPoolExecutor("Index Diagnostic Runner", 1) private val bigRepositoriesList = VcsLogBigRepositoriesList.getInstance() - private val indexingListener = VcsLogIndex.IndexingFinishedListener { root -> runDiagnostic(listOf(root)) } - private val disposedFlag = Disposer.newCheckedDisposable() + private val rootsFlow = callbackFlow { + val indexListener = VcsLogIndex.IndexingFinishedListener { root -> + trySend(listOf(root)) + } + val bigRepoListener = object : VcsLogBigRepositoriesList.Listener { + override fun onRepositoryAdded(root: VirtualFile) { + trySend(listOf(root)) + } + } + val dataPackListener = DataPackChangeListener { + trySend(roots.filter { root -> index.isIndexed(root) || bigRepositoriesList.isBig(root) }) + } + + bigRepositoriesList.addListener(bigRepoListener, this@IndexDiagnosticRunner) + index.addListener(indexListener) + + awaitClose { + index.removeListener(indexListener) + vcsLogData.removeDataPackChangeListener(dataPackListener) + } + } + private val checkedRoots = ConcurrentCollectionFactory.createConcurrentSet() init { - index.addListener(indexingListener) - bigRepositoriesList.addListener(MyBigRepositoriesListListener(), this) - Disposer.register(parent, this) - Disposer.register(this, disposedFlag) + Disposer.register(vcsLogData, this) + coroutineScope.launch { + rootsFlow.collect(::runDiagnostic) + } } - private fun runDiagnostic(rootsToCheck: Collection) { - if (disposedFlag.isDisposed) return - - val backgroundTask = submitTask(executor, this) { - doRunDiagnostic(rootsToCheck) + private suspend fun runDiagnostic(rootsToCheck: Collection) { + try { + withTimeout(DIAGNOSTIC_TIMEOUT) { + doRunDiagnostic(rootsToCheck) + } } - backgroundTask.cancelAfter(3 * 60) { + catch (e: TimeoutCancellationException) { thisLogger().warn("Index diagnostic for $rootsToCheck is cancelled by timeout") + throw e } } @@ -101,30 +131,22 @@ internal class IndexDiagnosticRunner(private val index: VcsLogModifiableIndex, } } - fun onDataPackChange() { - runDiagnostic(roots.filter { root -> index.isIndexed(root) || bigRepositoriesList.isBig(root) }) - } + @Suppress("SSBasedInspection") override fun dispose() { - index.removeListener(indexingListener) - executor.shutdown() - try { - executor.awaitTermination(10, TimeUnit.MILLISECONDS) - } - catch (_: InterruptedException) { + runBlocking { + try { + withTimeout(10.milliseconds) { + coroutineScope.coroutineContext.job.join() + } + } + catch (e: TimeoutCancellationException) { + thisLogger().warn("Index diagnostic shutdown for $roots is cancelled by timeout") + } } } - private inner class MyBigRepositoriesListListener : VcsLogBigRepositoriesList.Listener { - override fun onRepositoryAdded(root: VirtualFile) = runDiagnostic(listOf(root)) + companion object { + private const val DIAGNOSTIC_TIMEOUT: Long = 3 * 60 * 1000 } } - -private fun BackgroundTask<*>.cancelAfter(delaySeconds: Long, onCancel: () -> Unit) { - val cancelTask = { - this.cancel() - onCancel() - } - val cancelFuture = AppExecutorUtil.getAppScheduledExecutorService().schedule(cancelTask, delaySeconds, TimeUnit.SECONDS) - this.future.whenComplete { _, _ -> cancelFuture.cancel(false) } -} \ No newline at end of file