IJPL-89250 Migrate IndexDiagnosticRunner to coroutines to fix flaky tests.

GitOrigin-RevId: 4e2198eabf3919ed83fd8f7225d064e149b625de
This commit is contained in:
Denis Zaichenko
2024-07-26 16:18:09 +02:00
committed by intellij-monorepo-bot
parent 04e9be386c
commit 3571f1c2b6
2 changed files with 69 additions and 48 deletions

View File

@@ -263,7 +263,6 @@ public final class VcsLogData implements Disposable, VcsLogDataProvider {
listener.onDataPackChange(dataPack); listener.onDataPackChange(dataPack);
} }
}, o -> myDisposableFlag.isDisposed()); }, o -> myDisposableFlag.isDisposed());
myIndexDiagnosticRunner.onDataPackChange();
} }
public void addDataPackChangeListener(final @NotNull DataPackChangeListener listener) { public void addDataPackChangeListener(final @NotNull DataPackChangeListener listener) {

View File

@@ -8,52 +8,82 @@ import com.intellij.openapi.Disposable
import com.intellij.openapi.diagnostic.Attachment import com.intellij.openapi.diagnostic.Attachment
import com.intellij.openapi.diagnostic.debug import com.intellij.openapi.diagnostic.debug
import com.intellij.openapi.diagnostic.thisLogger import com.intellij.openapi.diagnostic.thisLogger
import com.intellij.openapi.progress.util.BackgroundTaskUtil.BackgroundTask import com.intellij.openapi.progress.runBlockingCancellable
import com.intellij.openapi.progress.util.BackgroundTaskUtil.submitTask
import com.intellij.openapi.util.Disposer import com.intellij.openapi.util.Disposer
import com.intellij.openapi.vcs.VcsException import com.intellij.openapi.vcs.VcsException
import com.intellij.openapi.vfs.VirtualFile import com.intellij.openapi.vfs.VirtualFile
import com.intellij.util.concurrency.AppExecutorUtil
import com.intellij.util.concurrency.annotations.RequiresBackgroundThread import com.intellij.util.concurrency.annotations.RequiresBackgroundThread
import com.intellij.vcs.log.data.*
import com.intellij.vcs.log.data.AbstractDataGetter.Companion.getCommitDetails import com.intellij.vcs.log.data.AbstractDataGetter.Companion.getCommitDetails
import com.intellij.vcs.log.data.CommitDetailsGetter
import com.intellij.vcs.log.data.DataPack
import com.intellij.vcs.log.data.VcsLogStorage
import com.intellij.vcs.log.data.index.IndexDiagnostic.getDiffFor import com.intellij.vcs.log.data.index.IndexDiagnostic.getDiffFor
import com.intellij.vcs.log.data.index.IndexDiagnostic.pickCommits import com.intellij.vcs.log.data.index.IndexDiagnostic.pickCommits
import com.intellij.vcs.log.data.index.IndexDiagnostic.pickIndexedCommits import com.intellij.vcs.log.data.index.IndexDiagnostic.pickIndexedCommits
import com.intellij.vcs.log.impl.VcsLogErrorHandler import com.intellij.vcs.log.impl.VcsLogErrorHandler
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.callbackFlow
import org.jetbrains.annotations.ApiStatus.Internal import org.jetbrains.annotations.ApiStatus.Internal
import java.util.concurrent.TimeUnit import kotlin.time.Duration.Companion.milliseconds
internal class IndexDiagnosticRunner(
private val index: VcsLogModifiableIndex,
private val storage: VcsLogStorage,
private val roots: Collection<VirtualFile>,
private val dataPackGetter: () -> DataPack,
private val commitDetailsGetter: CommitDetailsGetter,
private val errorHandler: VcsLogErrorHandler,
vcsLogData: VcsLogData
) : Disposable {
@Suppress("SSBasedInspection")
private val coroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.IO).also {
Disposer.register(this) {
it.cancel()
}
}
internal class IndexDiagnosticRunner(private val index: VcsLogModifiableIndex,
private val storage: VcsLogStorage,
private val roots: Collection<VirtualFile>,
private val dataPackGetter: () -> DataPack,
private val commitDetailsGetter: CommitDetailsGetter,
private val errorHandler: VcsLogErrorHandler,
parent: Disposable) : Disposable {
private val executor = AppExecutorUtil.createBoundedApplicationPoolExecutor("Index Diagnostic Runner", 1)
private val bigRepositoriesList = VcsLogBigRepositoriesList.getInstance() private val bigRepositoriesList = VcsLogBigRepositoriesList.getInstance()
private val indexingListener = VcsLogIndex.IndexingFinishedListener { root -> runDiagnostic(listOf(root)) } private val rootsFlow = callbackFlow {
private val disposedFlag = Disposer.newCheckedDisposable() val indexListener = VcsLogIndex.IndexingFinishedListener { root ->
trySend(listOf(root))
}
val bigRepoListener = object : VcsLogBigRepositoriesList.Listener {
override fun onRepositoryAdded(root: VirtualFile) {
trySend(listOf(root))
}
}
val dataPackListener = DataPackChangeListener {
trySend(roots.filter { root -> index.isIndexed(root) || bigRepositoriesList.isBig(root) })
}
bigRepositoriesList.addListener(bigRepoListener, this@IndexDiagnosticRunner)
index.addListener(indexListener)
awaitClose {
index.removeListener(indexListener)
vcsLogData.removeDataPackChangeListener(dataPackListener)
}
}
private val checkedRoots = ConcurrentCollectionFactory.createConcurrentSet<VirtualFile>() private val checkedRoots = ConcurrentCollectionFactory.createConcurrentSet<VirtualFile>()
init { init {
index.addListener(indexingListener) Disposer.register(vcsLogData, this)
bigRepositoriesList.addListener(MyBigRepositoriesListListener(), this) coroutineScope.launch {
Disposer.register(parent, this) rootsFlow.collect(::runDiagnostic)
Disposer.register(this, disposedFlag) }
} }
private fun runDiagnostic(rootsToCheck: Collection<VirtualFile>) { private suspend fun runDiagnostic(rootsToCheck: Collection<VirtualFile>) {
if (disposedFlag.isDisposed) return try {
withTimeout(DIAGNOSTIC_TIMEOUT) {
val backgroundTask = submitTask(executor, this) { doRunDiagnostic(rootsToCheck)
doRunDiagnostic(rootsToCheck) }
} }
backgroundTask.cancelAfter(3 * 60) { catch (e: TimeoutCancellationException) {
thisLogger<IndexDiagnosticRunner>().warn("Index diagnostic for $rootsToCheck is cancelled by timeout") thisLogger<IndexDiagnosticRunner>().warn("Index diagnostic for $rootsToCheck is cancelled by timeout")
throw e
} }
} }
@@ -101,30 +131,22 @@ internal class IndexDiagnosticRunner(private val index: VcsLogModifiableIndex,
} }
} }
fun onDataPackChange() {
runDiagnostic(roots.filter { root -> index.isIndexed(root) || bigRepositoriesList.isBig(root) })
}
@Suppress("SSBasedInspection")
override fun dispose() { override fun dispose() {
index.removeListener(indexingListener) runBlocking {
executor.shutdown() try {
try { withTimeout(10.milliseconds) {
executor.awaitTermination(10, TimeUnit.MILLISECONDS) coroutineScope.coroutineContext.job.join()
} }
catch (_: InterruptedException) { }
catch (e: TimeoutCancellationException) {
thisLogger().warn("Index diagnostic shutdown for $roots is cancelled by timeout")
}
} }
} }
private inner class MyBigRepositoriesListListener : VcsLogBigRepositoriesList.Listener { companion object {
override fun onRepositoryAdded(root: VirtualFile) = runDiagnostic(listOf(root)) private const val DIAGNOSTIC_TIMEOUT: Long = 3 * 60 * 1000
} }
} }
private fun BackgroundTask<*>.cancelAfter(delaySeconds: Long, onCancel: () -> Unit) {
val cancelTask = {
this.cancel()
onCancel()
}
val cancelFuture = AppExecutorUtil.getAppScheduledExecutorService().schedule(cancelTask, delaySeconds, TimeUnit.SECONDS)
this.future.whenComplete { _, _ -> cancelFuture.cancel(false) }
}