From a20761acd82c87531e130e0d99544b76e5f14e4e Mon Sep 17 00:00:00 2001 From: Mikhail Sokolov Date: Wed, 9 Oct 2024 13:39:42 +0200 Subject: [PATCH] IJPL-163538 replace flows in ActionAsyncProvider with concurrent producers/consumers (cherry picked from commit bea2cb8325e7f02649c84c5cf583beaa6cc66404) IJ-MR-146410 (cherry picked from commit d336bf297fddb087a2dadaec74c70aa2f699d8df) (cherry picked from commit c9d83f78d187cd231cc4103f723de4c0ff9be4b6) IJ-CR-153899 GitOrigin-RevId: ddfd94e0dbad5920d9f29758161403667267a3b2 --- .../ide/util/gotoByName/GotoActionTest.java | 7 + .../util/gotoByName/ActionAsyncProvider.kt | 472 ++++++++---------- platform/util/coroutines/src/collections.kt | 12 + 3 files changed, 232 insertions(+), 259 deletions(-) diff --git a/java/java-tests/testSrc/com/intellij/ide/util/gotoByName/GotoActionTest.java b/java/java-tests/testSrc/com/intellij/ide/util/gotoByName/GotoActionTest.java index d223d006ea60..059dfa6d3e39 100644 --- a/java/java-tests/testSrc/com/intellij/ide/util/gotoByName/GotoActionTest.java +++ b/java/java-tests/testSrc/com/intellij/ide/util/gotoByName/GotoActionTest.java @@ -11,6 +11,7 @@ import com.intellij.ide.util.gotoByName.GotoActionModel.ActionWrapper; import com.intellij.ide.util.gotoByName.GotoActionModel.MatchedValue; import com.intellij.ide.util.gotoByName.GotoActionModel.MatchedValueType; import com.intellij.java.navigation.ChooseByNameTest; +import com.intellij.mock.MockProgressIndicator; import com.intellij.openapi.Disposable; import com.intellij.openapi.actionSystem.*; import com.intellij.openapi.application.ApplicationManager; @@ -334,6 +335,12 @@ public class GotoActionTest extends LightJavaCodeInsightFixtureTestCase { }); } + public void testSearchWorks() { + SearchEverywhereContributor contributor = createActionContributor(getProject(), getTestRootDisposable()); + List list = contributor.search("sea", new MockProgressIndicator(), 10).getItems(); + assertEquals(10, list.size()); + } + private static boolean isNavigableOption(Object o) { return o instanceof OptionDescription && !(o instanceof BooleanOptionDescription); } diff --git a/platform/lang-impl/src/com/intellij/ide/util/gotoByName/ActionAsyncProvider.kt b/platform/lang-impl/src/com/intellij/ide/util/gotoByName/ActionAsyncProvider.kt index 706b6e493a20..748b1454fd33 100644 --- a/platform/lang-impl/src/com/intellij/ide/util/gotoByName/ActionAsyncProvider.kt +++ b/platform/lang-impl/src/com/intellij/ide/util/gotoByName/ActionAsyncProvider.kt @@ -18,16 +18,21 @@ import com.intellij.openapi.diagnostic.debug import com.intellij.openapi.diagnostic.getOrLogException import com.intellij.openapi.diagnostic.logger import com.intellij.openapi.util.registry.Registry +import com.intellij.platform.util.coroutines.forEachConcurrent +import com.intellij.platform.util.coroutines.mapConcurrent +import com.intellij.platform.util.coroutines.mapNotNullConcurrent +import com.intellij.platform.util.coroutines.transformConcurrent import com.intellij.psi.codeStyle.MinusculeMatcher import com.intellij.psi.codeStyle.NameUtil import com.intellij.ui.switcher.QuickActionProvider +import com.intellij.util.CollectConsumer import kotlinx.coroutines.* -import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS -import kotlinx.coroutines.channels.produce -import kotlinx.coroutines.flow.* +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.SendChannel +import kotlinx.coroutines.channels.toList import java.util.* import java.util.concurrent.ConcurrentHashMap -import java.util.function.Consumer import kotlin.coroutines.coroutineContext private val LOG = logger() @@ -36,6 +41,7 @@ private val LOG = logger() internal class ActionAsyncProvider(private val model: GotoActionModel) { private val actionManager: ActionManager = ActionManager.getInstance() private val intentions = ConcurrentHashMap() + private val MATCHED_VALUE_COMPARATOR = Comparator { o1, o2 -> o1.compareWeights(o2) } fun processActions( scope: CoroutineScope, @@ -44,23 +50,10 @@ internal class ActionAsyncProvider(private val model: GotoActionModel) { ids: Set, consumer: suspend (MatchedValue) -> Boolean, ) { - scope.launch { - val channel = produce { - flattenConcat(listOf( - matchedActionsAndStubsFlow(pattern = pattern, allIds = ids, presentationProvider = presentationProvider), - unmatchedStubsFlow(pattern = pattern, allIds = ids, presentationProvider = presentationProvider), - )).collect { send(it) } - } - - try { - for (i in channel) { - if (!consumer(i)) return@launch - } - } - finally { - channel.cancel() - } + val nonMatchedIdsChannel = Channel(capacity = Channel.UNLIMITED) + val matchedStubsJob = processMatchedActionsAndStubs(pattern, ids, presentationProvider, consumer, nonMatchedIdsChannel, null) + processUnmatchedStubs(nonMatchedIdsChannel, pattern, presentationProvider, consumer, matchedStubsJob) } } @@ -76,214 +69,195 @@ internal class ActionAsyncProvider(private val model: GotoActionModel) { val actionIds = (actionManager as ActionManagerImpl).actionIds - val comparator: Comparator = Comparator { o1, o2 -> o1.compareWeights(o2) } - scope.launch { - val channel = produce { - flattenConcat(listOf( - abbreviationsFlow(pattern, presentationProvider).sorted(comparator), - matchedActionsAndStubsFlow(pattern, actionIds, presentationProvider), - unmatchedStubsFlow(pattern, actionIds, presentationProvider), - topHitsFlow(pattern, presentationProvider).sorted(comparator), - intentionsFlow(pattern, presentationProvider).sorted(comparator), - optionsFlow(pattern, presentationProvider).sorted(comparator) - )).collect { send(it) } - } + val abbreviationsJob = processAbbreviations(pattern, presentationProvider, consumer) - try { - for (i in channel) { - if (!consumer(i)) return@launch - } - } - finally { - channel.cancel() - } + val nonMatchedIdsChannel = Channel(capacity = Channel.UNLIMITED) + val matchedStubsJob = processMatchedActionsAndStubs(pattern, actionIds, presentationProvider, consumer, nonMatchedIdsChannel, abbreviationsJob) + val unmatchedStubsJob = processUnmatchedStubs(nonMatchedIdsChannel, pattern, presentationProvider, consumer, matchedStubsJob) + + val topHitsJob = processTopHits(pattern, presentationProvider, consumer, unmatchedStubsJob) + val intentionsJob = processIntentions(pattern, presentationProvider, consumer, topHitsJob) + processOptions(pattern, presentationProvider, consumer, intentionsJob) } } - private fun Flow.sorted(comparator: Comparator): Flow { - val sourceFlow = this - return flow { - val list = sourceFlow.catch { e -> LOG.error("Error while collecting actions.", e) } - .toSet() - .toMutableList() - list.sortWith(comparator) - list.forEach { emit(it) } - } - } - - private suspend fun abbreviationsFlow(pattern: String, presentationProvider: suspend (AnAction) -> Presentation): Flow { - LOG.debug { "Create abbreviations flow ($pattern)" } + private fun CoroutineScope.processAbbreviations(pattern: String, + presentationProvider: suspend (AnAction) -> Presentation, + consumer: suspend (MatchedValue) -> Boolean + ): Job = launch { + LOG.debug { "Process abbreviations for \"$pattern\"" } val matcher = buildWeightMatcher(pattern) val actionIds = serviceAsync().findActions(pattern) - return actionIds.asFlow() - .mapNotNull { loadAction(it) } - .buffer(RENDEZVOUS) - .map { - val presentation = presentationProvider(it) - val wrapper = wrapAnAction(it, presentation) - val degree = matcher.matchingDegree(pattern) - abbreviationMatchedValue(wrapper, pattern, degree) - } + actionIds.forEachConcurrent { id -> + val action = loadAction(id) ?: return@forEachConcurrent + val presentation = presentationProvider(action) + val wrapper = wrapAnAction(action, presentation) + val degree = matcher.matchingDegree(pattern) + val matchedValue = abbreviationMatchedValue(wrapper, pattern, degree) + if (!consumer(matchedValue)) cancel() + } } - private fun matchedActionsAndStubsFlow( - pattern: String, - allIds: Collection, - presentationProvider: suspend (AnAction) -> Presentation, - ): Flow { + private fun CoroutineScope.processMatchedActionsAndStubs(pattern: String, + allIds: Collection, + presentationProvider: suspend (AnAction) -> Presentation, + consumer: suspend (MatchedValue) -> Boolean, + unmatchedIdsChannel: SendChannel, + awaitJob: Job? + ): Job = launch { val weightMatcher = buildWeightMatcher(pattern) - return flow { - val list = collectMatchedActions(pattern, allIds, weightMatcher) - LOG.debug { "List is collected" } + val list = collectMatchedActions(pattern, allIds, weightMatcher, unmatchedIdsChannel) + LOG.debug { "Matched actions list is collected" } - for (matchedAction in list) { - val action = matchedAction.action - if (action is ActionStubBase) { - loadAction(action.id)?.let { loaded -> emit(MatchedAction(loaded, matchedAction.mode, matchedAction.weight)) } - } - else { - emit(matchedAction) - } - } + awaitJob?.join() //wait until all items from previous step are processed + LOG.debug { "Process matched actions for \"$pattern\"" } + list.forEachConcurrent { matchedActionOrStub -> //todo maintain order + val action = matchedActionOrStub.action + val matchedAction = if (action is ActionStubBase) loadAction(action.id)?.let { MatchedAction(it, matchedActionOrStub.mode, matchedActionOrStub.weight) } else matchedActionOrStub + if (matchedAction == null) return@forEachConcurrent + val presentation = presentationProvider(matchedAction.action) + val matchedValue = matchItem( + item = wrapAnAction(action = matchedAction.action, presentation = presentation, matchMode = matchedAction.mode), + matcher = weightMatcher, + pattern = pattern, + matchType = MatchedValueType.ACTION, + ) + if (!consumer(matchedValue)) cancel() } - .buffer(RENDEZVOUS) - .map { matchedAction -> - val presentation = presentationProvider(matchedAction.action) - matchItem( - item = wrapAnAction(action = matchedAction.action, presentation = presentation, matchMode = matchedAction.mode), - matcher = weightMatcher, - pattern = pattern, - matchType = MatchedValueType.ACTION, - ) - } } - private suspend fun collectMatchedActions(pattern: String, allIds: Collection, weightMatcher: MinusculeMatcher): List = coroutineScope { + private suspend fun collectMatchedActions(pattern: String, allIds: Collection, weightMatcher: MinusculeMatcher, unmatchedIdsChannel: SendChannel): List = coroutineScope { val matcher = buildMatcher(pattern) - val mainActions: List = allIds.mapNotNull { + data class ActionWithID(val action: AnAction, val id: String?) + fun List.withIDs() = map { ActionWithID(it, actionManager.getId(it)) } + + val mainActions: List = allIds.mapNotNull { val action = actionManager.getActionOrStub(it) ?: return@mapNotNull null if (action is ActionGroup && !action.isSearchable) return@mapNotNull null - return@mapNotNull action + return@mapNotNull ActionWithID(action, it) } - val extendedActions = model.dataContext.getData(QuickActionProvider.KEY)?.getActions(true) ?: emptyList() - val allActions = mainActions + extendedActions + extendedActions.flatMap { - (it as? ActionGroup)?.let { model.updateSession.children(it) } ?: emptyList() - } - val actions = allActions.mapNotNull { + val extendedActions: List = model.dataContext.getData(QuickActionProvider.KEY)?.getActions(true)?.withIDs() ?: emptyList() + val allActions: List = mainActions + + extendedActions + + extendedActions.flatMap { (it.action as? ActionGroup)?.let { model.updateSession.children(it).withIDs() } ?: emptyList() } + val actions = allActions.mapNotNullConcurrent { actionWithID -> runCatching { - val mode = model.actionMatches(pattern, matcher, it) + val mode = model.actionMatches(pattern, matcher, actionWithID.action) if (mode != MatchMode.NONE) { - val weight = calcElementWeight(it, pattern, weightMatcher) - return@runCatching(MatchedAction(it, mode, weight)) + val weight = calcElementWeight(actionWithID, pattern, weightMatcher) + return@runCatching(MatchedAction(actionWithID.action, mode, weight)) + } + else { + if (actionWithID.action is ActionStubBase) actionWithID.id?.let { unmatchedIdsChannel.send(it) } + return@runCatching null } - return@runCatching null }.getOrLogException(LOG) } + unmatchedIdsChannel.close() val comparator = Comparator.comparing { it.weight ?: 0 }.reversed() return@coroutineScope actions.sortedWith(comparator) } - private fun unmatchedStubsFlow(pattern: String, allIds: Collection, - presentationProvider: suspend (AnAction) -> Presentation): Flow { + private fun CoroutineScope.processUnmatchedStubs(nonMatchedIds: ReceiveChannel, + pattern: String, + presentationProvider: suspend (AnAction) -> Presentation, + consumer: suspend (MatchedValue) -> Boolean, + awaitJob: Job + ): Job = launch { val matcher = buildMatcher(pattern) val weightMatcher = buildWeightMatcher(pattern) - return allIds.asFlow() - .mapNotNull { - val action = actionManager.getActionOrStub(it) ?: return@mapNotNull null - if (action is ActionGroup && !action.isSearchable) return@mapNotNull null - action - } - .filter { - runCatching { (it is ActionStubBase) && model.actionMatches(pattern, matcher, it) == MatchMode.NONE } - .getOrLogException(LOG) == true - } - .transform { - runCatching { - val action = loadAction((it as ActionStubBase).id) ?: return@runCatching - val mode = model.actionMatches(pattern, matcher, action) - if (mode != MatchMode.NONE) { - val weight = calcElementWeight(element = action, pattern = pattern, matcher = weightMatcher) - emit(MatchedAction(action = action, mode = mode, weight = weight)) - } - }.getOrLogException(LOG) - } - .buffer(RENDEZVOUS) - .map { matchedAction -> + val matchedActions = nonMatchedIds.toList().mapNotNullConcurrent { id -> + runCatching { + val action = loadAction(id) ?: return@runCatching null + if (action is ActionGroup && !action.isSearchable) return@runCatching null + + val mode = model.actionMatches(pattern, matcher, action) + if (mode == MatchMode.NONE) return@runCatching null + + val weight = calcElementWeight(element = action, pattern = pattern, matcher = weightMatcher) + val matchedAction = MatchedAction(action = action, mode = mode, weight = weight) val presentation = presentationProvider(matchedAction.action) val item = wrapAnAction(matchedAction.action, presentation, matchedAction.mode) - matchItem(item = item, matcher = weightMatcher, pattern = pattern, matchType = MatchedValueType.ACTION) - } + val matchedValue = matchItem(item = item, matcher = weightMatcher, pattern = pattern, matchType = MatchedValueType.ACTION) + return@runCatching matchedValue + }.getOrLogException(LOG) + }.sortedWith(MATCHED_VALUE_COMPARATOR) + + awaitJob.join() //wait until all items from previous step are processed + LOG.debug { "Process unmatched stubs for \"$pattern\"" } + matchedActions.forEach { + if (!consumer(it)) cancel() + } } - private fun topHitsFlow( - pattern: String, - presentationProvider: suspend (AnAction) -> Presentation, - ): Flow { - LOG.debug { "Create TopHits flow ($pattern)" } - + private fun CoroutineScope.processTopHits(pattern: String, + presentationProvider: suspend (AnAction) -> Presentation, + consumer: suspend (MatchedValue) -> Boolean, + awaitJob: Job + ): Job = launch { val project = model.project val commandAccelerator = SearchTopHitProvider.getTopHitAccelerator() val matcher = buildWeightMatcher(pattern) + val collector = CollectConsumer() - return channelFlow { - val collector = Consumer { item -> - launch { - val obj = (item as? AnAction)?.let { wrapAnAction(action = it, presentation = presentationProvider(it)) } ?: item - val matchedValue = matchItem(item = obj, matcher = matcher, pattern = pattern, matchType = MatchedValueType.TOP_HIT) - send(matchedValue) - } + for (provider in SearchTopHitProvider.EP_NAME.extensionList) { + @Suppress("DEPRECATION") + if (provider is com.intellij.ide.ui.OptionsTopHitProvider.CoveredByToggleActions) { + continue } - for (provider in SearchTopHitProvider.EP_NAME.extensionList) { - @Suppress("DEPRECATION") - if (provider is com.intellij.ide.ui.OptionsTopHitProvider.CoveredByToggleActions) { - continue - } - - if (provider is OptionsSearchTopHitProvider && !pattern.startsWith(commandAccelerator)) { - val prefix = commandAccelerator + provider.getId() + " " - provider.consumeTopHits(pattern = prefix + pattern, collector = collector, project = project) - } - else if (project != null && provider is ProjectLevelProvidersAdapter) { - provider.consumeAllTopHits( - pattern = pattern, - collector = { - send(matchItem(item = it, matcher = matcher, pattern = pattern, matchType = MatchedValueType.TOP_HIT)) - }, - project = project, - ) - } - provider.consumeTopHits(pattern, collector, project) + if (provider is OptionsSearchTopHitProvider && !pattern.startsWith(commandAccelerator)) { + val prefix = commandAccelerator + provider.getId() + " " + provider.consumeTopHits(pattern = prefix + pattern, collector = collector, project = project) } + else if (project != null && provider is ProjectLevelProvidersAdapter) { + provider.consumeAllTopHits( + pattern = pattern, + collector = { collector.accept(it) }, + project = project, + ) + } + provider.consumeTopHits(pattern, collector, project) } + + val matchedValues = collector.result.mapConcurrent { item -> + val obj = (item as? AnAction)?.let { wrapAnAction(action = it, presentation = presentationProvider(it)) } ?: item + matchItem(item = obj, matcher = matcher, pattern = pattern, matchType = MatchedValueType.TOP_HIT) + }.sortedWith(MATCHED_VALUE_COMPARATOR) + + awaitJob.join() //wait until all items from previous step are processed + LOG.debug { "Process top hits for \"$pattern\"" } + matchedValues.forEach { if (!consumer(it)) cancel() } } - private fun intentionsFlow( - pattern: String, - presentationProvider: suspend (AnAction) -> Presentation, - ): Flow { - LOG.debug { "Create intentions flow ($pattern)" } + private fun CoroutineScope.processIntentions(pattern: String, + presentationProvider: suspend (AnAction) -> Presentation, + consumer: suspend (MatchedValue) -> Boolean, + awaitJob: Job + ): Job = launch { val matcher = buildMatcher(pattern) val weightMatcher = buildWeightMatcher(pattern) - return channelFlow { - launch { - for ((text, action) in getIntentionsMap()) { - if (model.actionMatches(pattern, matcher, action) != MatchMode.NONE) { - val groupMapping = GroupMapping.createFromText(text, false) - send(ActionWrapper(action, groupMapping, MatchMode.INTENTION, presentationProvider(action))) - } - } + val matchedValues = getIntentionsMap().entries + .mapNotNullConcurrent { (text, action) -> + if (model.actionMatches(pattern, matcher, action) == MatchMode.NONE) return@mapNotNullConcurrent null + + val groupMapping = GroupMapping.createFromText(text, false) + val wrapper = ActionWrapper(action, groupMapping, MatchMode.INTENTION, presentationProvider(action)) + matchItem(wrapper, weightMatcher, pattern, MatchedValueType.INTENTION) } - } - .map { matchItem(it, weightMatcher, pattern, MatchedValueType.INTENTION) } + .sortedWith(MATCHED_VALUE_COMPARATOR) + + awaitJob.join() + LOG.debug { "Process intentions for \"$pattern\""} + matchedValues.forEach { if (!consumer(it)) cancel() } } private suspend fun getIntentionsMap(): Map { @@ -297,12 +271,10 @@ internal class ActionAsyncProvider(private val model: GotoActionModel) { return intentions } - private suspend fun optionsFlow( - pattern: String, - presentationProvider: suspend (AnAction) -> Presentation, - ): Flow { - LOG.debug { "Create options flow ($pattern)" } - + private fun CoroutineScope.processOptions(pattern: String, + presentationProvider: suspend (AnAction) -> Presentation, + consumer: suspend (MatchedValue) -> Boolean, + awaitJob: Job): Job = launch { val weightMatcher = buildWeightMatcher(pattern) val map = model.configurablesNames @@ -311,66 +283,70 @@ internal class ActionAsyncProvider(private val model: GotoActionModel) { val words = registrar.getProcessedWords(pattern) val filterOutInspections = Registry.`is`("go.to.action.filter.out.inspections", true) - @Suppress("RemoveExplicitTypeArguments") - return channelFlow { - // Use LinkedHashSet to preserve the order of the elements to iterate through them later - val optionDescriptions = LinkedHashSet() - if (pattern.isNotBlank()) { - val matcher = buildMatcher(pattern) - for ((key, value) in map) { - if (matcher.matches(value)) { - optionDescriptions.add(OptionDescription(_option = null, configurableId = key, hit = value, path = null, groupName = value)) - } - } - } + // Use LinkedHashSet to preserve the order of the elements to iterate through them later + val optionDescriptions = LinkedHashSet() - var registrarDescriptions: MutableSet? = null - registrar.initialize() - for (word in words) { - val descriptions = registrar.findAcceptableDescriptions(word) - ?.filter { - @Suppress("HardCodedStringLiteral") - !(it.path == "ActionManager" || filterOutInspections && it.groupName == "Inspections") - } - ?.toHashSet() - if (descriptions.isNullOrEmpty()) { - registrarDescriptions = null - break - } - - if (registrarDescriptions == null) { - registrarDescriptions = descriptions - } - else { - registrarDescriptions.retainAll(descriptions) - } - } - - // Add registrar's options to the end of the `LinkedHashSet` - // to guarantee that options from the `map` are going to be processed first - if (registrarDescriptions != null) { - optionDescriptions.addAll(registrarDescriptions) - } - - if (optionDescriptions.isNotEmpty()) { - val currentHits = HashSet() - val iterator = optionDescriptions.iterator() - for (description in iterator) { - val hit = description.hit - if (hit == null || !currentHits.add(hit.trim())) { - iterator.remove() - } - } - for (description in optionDescriptions) { - for (converter in ActionFromOptionDescriptorProvider.EP.extensionList) { - val action = converter.provide(description) ?: continue - send(ActionWrapper(action, null, MatchMode.NAME, presentationProvider(action))) - } - send(description) + if (pattern.isNotBlank()) { + val matcher = buildMatcher(pattern) + for ((key, value) in map) { + if (matcher.matches(value)) { + optionDescriptions.add(OptionDescription(_option = null, configurableId = key, hit = value, path = null, groupName = value)) } } } - .map { matchItem(item = it, matcher = weightMatcher, pattern = pattern, matchType = MatchedValueType.TOP_HIT) } + + var registrarDescriptions: MutableSet? = null + registrar.initialize() + for (word in words) { + val descriptions = registrar.findAcceptableDescriptions(word) + ?.filter { + @Suppress("HardCodedStringLiteral") + !(it.path == "ActionManager" || filterOutInspections && it.groupName == "Inspections") + } + ?.toHashSet() + if (descriptions.isNullOrEmpty()) { + registrarDescriptions = null + break + } + + if (registrarDescriptions == null) { + registrarDescriptions = descriptions + } + else { + registrarDescriptions.retainAll(descriptions) + } + } + + // Add registrar's options to the end of the `LinkedHashSet` + // to guarantee that options from the `map` are going to be processed first + if (registrarDescriptions != null) { + optionDescriptions.addAll(registrarDescriptions) + } + + if (optionDescriptions.isNotEmpty()) { + val currentHits = HashSet() + val iterator = optionDescriptions.iterator() + for (description in iterator) { + val hit = description.hit + if (hit == null || !currentHits.add(hit.trim())) { + iterator.remove() + } + } + + + val matchedValues = optionDescriptions.transformConcurrent { description -> + for (converter in ActionFromOptionDescriptorProvider.EP.extensionList) { + val action = converter.provide(description) ?: continue + val actionWrapper = ActionWrapper(action, null, MatchMode.NAME, presentationProvider(action)) + out(matchItem(item = actionWrapper, matcher = weightMatcher, pattern = pattern, matchType = MatchedValueType.TOP_HIT)) + } + out(matchItem(item = description, matcher = weightMatcher, pattern = pattern, matchType = MatchedValueType.TOP_HIT)) + }.sortedWith(MATCHED_VALUE_COMPARATOR) + + awaitJob.join() + LOG.debug { "Process options for \"$pattern\""} + matchedValues.forEach { if (!consumer(it)) cancel() } + } } private suspend fun loadAction(id: String): AnAction? { @@ -409,25 +385,3 @@ private fun abbreviationMatchedValue(wrapper: ActionWrapper, pattern: String, de } private data class MatchedAction(@JvmField val action: AnAction, @JvmField val mode: MatchMode, @JvmField val weight: Int?) - -/** - * Creates a unified [Flow] by efficiently combining multiple [Flow]s in a flattened, concatenated format. - * The conventional {code flattenConcat} method from the standard coroutines library isn't suitable in this context for the reasons below: - * - * - Initialization of the combined flows is resource-intensive, leading to a significant delay between the invocation of {code collect()} - * and the emission of the first item. - * - * - The standard {code flattenConcat} method generates subsequent flows only after prior data has been sequentially collected. This mechanism forces us to - * wait for completion of each flow's initialization process during the transition between flows. - * - * - Contrastingly, this function initiates a collection of all the flows concurrently using parallel coroutines, thereby accomplishing flow initialization in parallel, - * significantly enhancing the process's efficiency. - * @param flows The list of [Flow]s to flatten and concatenate. - * @return The flattened and concatenated [Flow]. - */ -@OptIn(ExperimentalCoroutinesApi::class) -private fun flattenConcat(flows: List>): Flow = channelFlow { - flows.map { flow -> produce { flow.collect { send(it) } } } - .forEach { ch -> for (i in ch) send(i) } -} - diff --git a/platform/util/coroutines/src/collections.kt b/platform/util/coroutines/src/collections.kt index 7a8a63a014d8..a348070bab94 100644 --- a/platform/util/coroutines/src/collections.kt +++ b/platform/util/coroutines/src/collections.kt @@ -132,6 +132,18 @@ suspend fun Collection.mapConcurrent( } } +/** + * Maps each item of [this] collection to another item with [action] concurrently using [transformConcurrent] and collects non-null results. + */ +suspend fun Collection.mapNotNullConcurrent( + concurrency: Int = DEFAULT_CONCURRENCY, + action: suspend (T) -> R? +): Collection { + return transformConcurrent(concurrency) { v -> + action(v)?.let { mv -> out(mv) } + } +} + /** * Filters items of [this] collection according to [action] concurrently using [transformConcurrent]. */