IJPL-163538 replace flows in ActionAsyncProvider with concurrent producers/consumers

(cherry picked from commit bea2cb8325e7f02649c84c5cf583beaa6cc66404)

IJ-MR-146410

(cherry picked from commit d336bf297fddb087a2dadaec74c70aa2f699d8df)


(cherry picked from commit c9d83f78d187cd231cc4103f723de4c0ff9be4b6)

IJ-CR-153899

GitOrigin-RevId: ddfd94e0dbad5920d9f29758161403667267a3b2
This commit is contained in:
Mikhail Sokolov
2024-10-09 13:39:42 +02:00
committed by intellij-monorepo-bot
parent ddcbb4c342
commit a20761acd8
3 changed files with 232 additions and 259 deletions

View File

@@ -11,6 +11,7 @@ import com.intellij.ide.util.gotoByName.GotoActionModel.ActionWrapper;
import com.intellij.ide.util.gotoByName.GotoActionModel.MatchedValue;
import com.intellij.ide.util.gotoByName.GotoActionModel.MatchedValueType;
import com.intellij.java.navigation.ChooseByNameTest;
import com.intellij.mock.MockProgressIndicator;
import com.intellij.openapi.Disposable;
import com.intellij.openapi.actionSystem.*;
import com.intellij.openapi.application.ApplicationManager;
@@ -334,6 +335,12 @@ public class GotoActionTest extends LightJavaCodeInsightFixtureTestCase {
});
}
public void testSearchWorks() {
SearchEverywhereContributor<?> contributor = createActionContributor(getProject(), getTestRootDisposable());
List<?> list = contributor.search("sea", new MockProgressIndicator(), 10).getItems();
assertEquals(10, list.size());
}
private static boolean isNavigableOption(Object o) {
return o instanceof OptionDescription && !(o instanceof BooleanOptionDescription);
}

View File

@@ -18,16 +18,21 @@ import com.intellij.openapi.diagnostic.debug
import com.intellij.openapi.diagnostic.getOrLogException
import com.intellij.openapi.diagnostic.logger
import com.intellij.openapi.util.registry.Registry
import com.intellij.platform.util.coroutines.forEachConcurrent
import com.intellij.platform.util.coroutines.mapConcurrent
import com.intellij.platform.util.coroutines.mapNotNullConcurrent
import com.intellij.platform.util.coroutines.transformConcurrent
import com.intellij.psi.codeStyle.MinusculeMatcher
import com.intellij.psi.codeStyle.NameUtil
import com.intellij.ui.switcher.QuickActionProvider
import com.intellij.util.CollectConsumer
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.toList
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.function.Consumer
import kotlin.coroutines.coroutineContext
private val LOG = logger<ActionAsyncProvider>()
@@ -36,6 +41,7 @@ private val LOG = logger<ActionAsyncProvider>()
internal class ActionAsyncProvider(private val model: GotoActionModel) {
private val actionManager: ActionManager = ActionManager.getInstance()
private val intentions = ConcurrentHashMap<String, ApplyIntentionAction>()
private val MATCHED_VALUE_COMPARATOR = Comparator<MatchedValue> { o1, o2 -> o1.compareWeights(o2) }
fun processActions(
scope: CoroutineScope,
@@ -44,23 +50,10 @@ internal class ActionAsyncProvider(private val model: GotoActionModel) {
ids: Set<String>,
consumer: suspend (MatchedValue) -> Boolean,
) {
scope.launch {
val channel = produce {
flattenConcat(listOf(
matchedActionsAndStubsFlow(pattern = pattern, allIds = ids, presentationProvider = presentationProvider),
unmatchedStubsFlow(pattern = pattern, allIds = ids, presentationProvider = presentationProvider),
)).collect { send(it) }
}
try {
for (i in channel) {
if (!consumer(i)) return@launch
}
}
finally {
channel.cancel()
}
val nonMatchedIdsChannel = Channel<String>(capacity = Channel.UNLIMITED)
val matchedStubsJob = processMatchedActionsAndStubs(pattern, ids, presentationProvider, consumer, nonMatchedIdsChannel, null)
processUnmatchedStubs(nonMatchedIdsChannel, pattern, presentationProvider, consumer, matchedStubsJob)
}
}
@@ -76,214 +69,195 @@ internal class ActionAsyncProvider(private val model: GotoActionModel) {
val actionIds = (actionManager as ActionManagerImpl).actionIds
val comparator: Comparator<MatchedValue> = Comparator { o1, o2 -> o1.compareWeights(o2) }
scope.launch {
val channel = produce {
flattenConcat(listOf(
abbreviationsFlow(pattern, presentationProvider).sorted(comparator),
matchedActionsAndStubsFlow(pattern, actionIds, presentationProvider),
unmatchedStubsFlow(pattern, actionIds, presentationProvider),
topHitsFlow(pattern, presentationProvider).sorted(comparator),
intentionsFlow(pattern, presentationProvider).sorted(comparator),
optionsFlow(pattern, presentationProvider).sorted(comparator)
)).collect { send(it) }
}
val abbreviationsJob = processAbbreviations(pattern, presentationProvider, consumer)
try {
for (i in channel) {
if (!consumer(i)) return@launch
}
}
finally {
channel.cancel()
}
val nonMatchedIdsChannel = Channel<String>(capacity = Channel.UNLIMITED)
val matchedStubsJob = processMatchedActionsAndStubs(pattern, actionIds, presentationProvider, consumer, nonMatchedIdsChannel, abbreviationsJob)
val unmatchedStubsJob = processUnmatchedStubs(nonMatchedIdsChannel, pattern, presentationProvider, consumer, matchedStubsJob)
val topHitsJob = processTopHits(pattern, presentationProvider, consumer, unmatchedStubsJob)
val intentionsJob = processIntentions(pattern, presentationProvider, consumer, topHitsJob)
processOptions(pattern, presentationProvider, consumer, intentionsJob)
}
}
private fun <T> Flow<T>.sorted(comparator: Comparator<in T>): Flow<T> {
val sourceFlow = this
return flow {
val list = sourceFlow.catch { e -> LOG.error("Error while collecting actions.", e) }
.toSet()
.toMutableList()
list.sortWith(comparator)
list.forEach { emit(it) }
}
}
private suspend fun abbreviationsFlow(pattern: String, presentationProvider: suspend (AnAction) -> Presentation): Flow<MatchedValue> {
LOG.debug { "Create abbreviations flow ($pattern)" }
private fun CoroutineScope.processAbbreviations(pattern: String,
presentationProvider: suspend (AnAction) -> Presentation,
consumer: suspend (MatchedValue) -> Boolean
): Job = launch {
LOG.debug { "Process abbreviations for \"$pattern\"" }
val matcher = buildWeightMatcher(pattern)
val actionIds = serviceAsync<AbbreviationManager>().findActions(pattern)
return actionIds.asFlow()
.mapNotNull { loadAction(it) }
.buffer(RENDEZVOUS)
.map {
val presentation = presentationProvider(it)
val wrapper = wrapAnAction(it, presentation)
val degree = matcher.matchingDegree(pattern)
abbreviationMatchedValue(wrapper, pattern, degree)
}
actionIds.forEachConcurrent { id ->
val action = loadAction(id) ?: return@forEachConcurrent
val presentation = presentationProvider(action)
val wrapper = wrapAnAction(action, presentation)
val degree = matcher.matchingDegree(pattern)
val matchedValue = abbreviationMatchedValue(wrapper, pattern, degree)
if (!consumer(matchedValue)) cancel()
}
}
private fun matchedActionsAndStubsFlow(
pattern: String,
allIds: Collection<String>,
presentationProvider: suspend (AnAction) -> Presentation,
): Flow<MatchedValue> {
private fun CoroutineScope.processMatchedActionsAndStubs(pattern: String,
allIds: Collection<String>,
presentationProvider: suspend (AnAction) -> Presentation,
consumer: suspend (MatchedValue) -> Boolean,
unmatchedIdsChannel: SendChannel<String>,
awaitJob: Job?
): Job = launch {
val weightMatcher = buildWeightMatcher(pattern)
return flow {
val list = collectMatchedActions(pattern, allIds, weightMatcher)
LOG.debug { "List is collected" }
val list = collectMatchedActions(pattern, allIds, weightMatcher, unmatchedIdsChannel)
LOG.debug { "Matched actions list is collected" }
for (matchedAction in list) {
val action = matchedAction.action
if (action is ActionStubBase) {
loadAction(action.id)?.let { loaded -> emit(MatchedAction(loaded, matchedAction.mode, matchedAction.weight)) }
}
else {
emit(matchedAction)
}
}
awaitJob?.join() //wait until all items from previous step are processed
LOG.debug { "Process matched actions for \"$pattern\"" }
list.forEachConcurrent { matchedActionOrStub -> //todo maintain order
val action = matchedActionOrStub.action
val matchedAction = if (action is ActionStubBase) loadAction(action.id)?.let { MatchedAction(it, matchedActionOrStub.mode, matchedActionOrStub.weight) } else matchedActionOrStub
if (matchedAction == null) return@forEachConcurrent
val presentation = presentationProvider(matchedAction.action)
val matchedValue = matchItem(
item = wrapAnAction(action = matchedAction.action, presentation = presentation, matchMode = matchedAction.mode),
matcher = weightMatcher,
pattern = pattern,
matchType = MatchedValueType.ACTION,
)
if (!consumer(matchedValue)) cancel()
}
.buffer(RENDEZVOUS)
.map { matchedAction ->
val presentation = presentationProvider(matchedAction.action)
matchItem(
item = wrapAnAction(action = matchedAction.action, presentation = presentation, matchMode = matchedAction.mode),
matcher = weightMatcher,
pattern = pattern,
matchType = MatchedValueType.ACTION,
)
}
}
private suspend fun collectMatchedActions(pattern: String, allIds: Collection<String>, weightMatcher: MinusculeMatcher): List<MatchedAction> = coroutineScope {
private suspend fun collectMatchedActions(pattern: String, allIds: Collection<String>, weightMatcher: MinusculeMatcher, unmatchedIdsChannel: SendChannel<String>): List<MatchedAction> = coroutineScope {
val matcher = buildMatcher(pattern)
val mainActions: List<AnAction> = allIds.mapNotNull {
data class ActionWithID(val action: AnAction, val id: String?)
fun List<AnAction>.withIDs() = map { ActionWithID(it, actionManager.getId(it)) }
val mainActions: List<ActionWithID> = allIds.mapNotNull {
val action = actionManager.getActionOrStub(it) ?: return@mapNotNull null
if (action is ActionGroup && !action.isSearchable) return@mapNotNull null
return@mapNotNull action
return@mapNotNull ActionWithID(action, it)
}
val extendedActions = model.dataContext.getData(QuickActionProvider.KEY)?.getActions(true) ?: emptyList()
val allActions = mainActions + extendedActions + extendedActions.flatMap {
(it as? ActionGroup)?.let { model.updateSession.children(it) } ?: emptyList()
}
val actions = allActions.mapNotNull {
val extendedActions: List<ActionWithID> = model.dataContext.getData(QuickActionProvider.KEY)?.getActions(true)?.withIDs() ?: emptyList()
val allActions: List<ActionWithID> = mainActions +
extendedActions +
extendedActions.flatMap { (it.action as? ActionGroup)?.let { model.updateSession.children(it).withIDs() } ?: emptyList() }
val actions = allActions.mapNotNullConcurrent { actionWithID ->
runCatching {
val mode = model.actionMatches(pattern, matcher, it)
val mode = model.actionMatches(pattern, matcher, actionWithID.action)
if (mode != MatchMode.NONE) {
val weight = calcElementWeight(it, pattern, weightMatcher)
return@runCatching(MatchedAction(it, mode, weight))
val weight = calcElementWeight(actionWithID, pattern, weightMatcher)
return@runCatching(MatchedAction(actionWithID.action, mode, weight))
}
else {
if (actionWithID.action is ActionStubBase) actionWithID.id?.let { unmatchedIdsChannel.send(it) }
return@runCatching null
}
return@runCatching null
}.getOrLogException(LOG)
}
unmatchedIdsChannel.close()
val comparator = Comparator.comparing<MatchedAction, Int> { it.weight ?: 0 }.reversed()
return@coroutineScope actions.sortedWith(comparator)
}
private fun unmatchedStubsFlow(pattern: String, allIds: Collection<String>,
presentationProvider: suspend (AnAction) -> Presentation): Flow<MatchedValue> {
private fun CoroutineScope.processUnmatchedStubs(nonMatchedIds: ReceiveChannel<String>,
pattern: String,
presentationProvider: suspend (AnAction) -> Presentation,
consumer: suspend (MatchedValue) -> Boolean,
awaitJob: Job
): Job = launch {
val matcher = buildMatcher(pattern)
val weightMatcher = buildWeightMatcher(pattern)
return allIds.asFlow()
.mapNotNull {
val action = actionManager.getActionOrStub(it) ?: return@mapNotNull null
if (action is ActionGroup && !action.isSearchable) return@mapNotNull null
action
}
.filter {
runCatching { (it is ActionStubBase) && model.actionMatches(pattern, matcher, it) == MatchMode.NONE }
.getOrLogException(LOG) == true
}
.transform {
runCatching {
val action = loadAction((it as ActionStubBase).id) ?: return@runCatching
val mode = model.actionMatches(pattern, matcher, action)
if (mode != MatchMode.NONE) {
val weight = calcElementWeight(element = action, pattern = pattern, matcher = weightMatcher)
emit(MatchedAction(action = action, mode = mode, weight = weight))
}
}.getOrLogException(LOG)
}
.buffer(RENDEZVOUS)
.map { matchedAction ->
val matchedActions = nonMatchedIds.toList().mapNotNullConcurrent { id ->
runCatching {
val action = loadAction(id) ?: return@runCatching null
if (action is ActionGroup && !action.isSearchable) return@runCatching null
val mode = model.actionMatches(pattern, matcher, action)
if (mode == MatchMode.NONE) return@runCatching null
val weight = calcElementWeight(element = action, pattern = pattern, matcher = weightMatcher)
val matchedAction = MatchedAction(action = action, mode = mode, weight = weight)
val presentation = presentationProvider(matchedAction.action)
val item = wrapAnAction(matchedAction.action, presentation, matchedAction.mode)
matchItem(item = item, matcher = weightMatcher, pattern = pattern, matchType = MatchedValueType.ACTION)
}
val matchedValue = matchItem(item = item, matcher = weightMatcher, pattern = pattern, matchType = MatchedValueType.ACTION)
return@runCatching matchedValue
}.getOrLogException(LOG)
}.sortedWith(MATCHED_VALUE_COMPARATOR)
awaitJob.join() //wait until all items from previous step are processed
LOG.debug { "Process unmatched stubs for \"$pattern\"" }
matchedActions.forEach {
if (!consumer(it)) cancel()
}
}
private fun topHitsFlow(
pattern: String,
presentationProvider: suspend (AnAction) -> Presentation,
): Flow<MatchedValue> {
LOG.debug { "Create TopHits flow ($pattern)" }
private fun CoroutineScope.processTopHits(pattern: String,
presentationProvider: suspend (AnAction) -> Presentation,
consumer: suspend (MatchedValue) -> Boolean,
awaitJob: Job
): Job = launch {
val project = model.project
val commandAccelerator = SearchTopHitProvider.getTopHitAccelerator()
val matcher = buildWeightMatcher(pattern)
val collector = CollectConsumer<Any>()
return channelFlow {
val collector = Consumer<Any> { item ->
launch {
val obj = (item as? AnAction)?.let { wrapAnAction(action = it, presentation = presentationProvider(it)) } ?: item
val matchedValue = matchItem(item = obj, matcher = matcher, pattern = pattern, matchType = MatchedValueType.TOP_HIT)
send(matchedValue)
}
for (provider in SearchTopHitProvider.EP_NAME.extensionList) {
@Suppress("DEPRECATION")
if (provider is com.intellij.ide.ui.OptionsTopHitProvider.CoveredByToggleActions) {
continue
}
for (provider in SearchTopHitProvider.EP_NAME.extensionList) {
@Suppress("DEPRECATION")
if (provider is com.intellij.ide.ui.OptionsTopHitProvider.CoveredByToggleActions) {
continue
}
if (provider is OptionsSearchTopHitProvider && !pattern.startsWith(commandAccelerator)) {
val prefix = commandAccelerator + provider.getId() + " "
provider.consumeTopHits(pattern = prefix + pattern, collector = collector, project = project)
}
else if (project != null && provider is ProjectLevelProvidersAdapter) {
provider.consumeAllTopHits(
pattern = pattern,
collector = {
send(matchItem(item = it, matcher = matcher, pattern = pattern, matchType = MatchedValueType.TOP_HIT))
},
project = project,
)
}
provider.consumeTopHits(pattern, collector, project)
if (provider is OptionsSearchTopHitProvider && !pattern.startsWith(commandAccelerator)) {
val prefix = commandAccelerator + provider.getId() + " "
provider.consumeTopHits(pattern = prefix + pattern, collector = collector, project = project)
}
else if (project != null && provider is ProjectLevelProvidersAdapter) {
provider.consumeAllTopHits(
pattern = pattern,
collector = { collector.accept(it) },
project = project,
)
}
provider.consumeTopHits(pattern, collector, project)
}
val matchedValues = collector.result.mapConcurrent { item ->
val obj = (item as? AnAction)?.let { wrapAnAction(action = it, presentation = presentationProvider(it)) } ?: item
matchItem(item = obj, matcher = matcher, pattern = pattern, matchType = MatchedValueType.TOP_HIT)
}.sortedWith(MATCHED_VALUE_COMPARATOR)
awaitJob.join() //wait until all items from previous step are processed
LOG.debug { "Process top hits for \"$pattern\"" }
matchedValues.forEach { if (!consumer(it)) cancel() }
}
private fun intentionsFlow(
pattern: String,
presentationProvider: suspend (AnAction) -> Presentation,
): Flow<MatchedValue> {
LOG.debug { "Create intentions flow ($pattern)" }
private fun CoroutineScope.processIntentions(pattern: String,
presentationProvider: suspend (AnAction) -> Presentation,
consumer: suspend (MatchedValue) -> Boolean,
awaitJob: Job
): Job = launch {
val matcher = buildMatcher(pattern)
val weightMatcher = buildWeightMatcher(pattern)
return channelFlow {
launch {
for ((text, action) in getIntentionsMap()) {
if (model.actionMatches(pattern, matcher, action) != MatchMode.NONE) {
val groupMapping = GroupMapping.createFromText(text, false)
send(ActionWrapper(action, groupMapping, MatchMode.INTENTION, presentationProvider(action)))
}
}
val matchedValues = getIntentionsMap().entries
.mapNotNullConcurrent { (text, action) ->
if (model.actionMatches(pattern, matcher, action) == MatchMode.NONE) return@mapNotNullConcurrent null
val groupMapping = GroupMapping.createFromText(text, false)
val wrapper = ActionWrapper(action, groupMapping, MatchMode.INTENTION, presentationProvider(action))
matchItem(wrapper, weightMatcher, pattern, MatchedValueType.INTENTION)
}
}
.map { matchItem(it, weightMatcher, pattern, MatchedValueType.INTENTION) }
.sortedWith(MATCHED_VALUE_COMPARATOR)
awaitJob.join()
LOG.debug { "Process intentions for \"$pattern\""}
matchedValues.forEach { if (!consumer(it)) cancel() }
}
private suspend fun getIntentionsMap(): Map<String, ApplyIntentionAction> {
@@ -297,12 +271,10 @@ internal class ActionAsyncProvider(private val model: GotoActionModel) {
return intentions
}
private suspend fun optionsFlow(
pattern: String,
presentationProvider: suspend (AnAction) -> Presentation,
): Flow<MatchedValue> {
LOG.debug { "Create options flow ($pattern)" }
private fun CoroutineScope.processOptions(pattern: String,
presentationProvider: suspend (AnAction) -> Presentation,
consumer: suspend (MatchedValue) -> Boolean,
awaitJob: Job): Job = launch {
val weightMatcher = buildWeightMatcher(pattern)
val map = model.configurablesNames
@@ -311,66 +283,70 @@ internal class ActionAsyncProvider(private val model: GotoActionModel) {
val words = registrar.getProcessedWords(pattern)
val filterOutInspections = Registry.`is`("go.to.action.filter.out.inspections", true)
@Suppress("RemoveExplicitTypeArguments")
return channelFlow<Any> {
// Use LinkedHashSet to preserve the order of the elements to iterate through them later
val optionDescriptions = LinkedHashSet<OptionDescription>()
if (pattern.isNotBlank()) {
val matcher = buildMatcher(pattern)
for ((key, value) in map) {
if (matcher.matches(value)) {
optionDescriptions.add(OptionDescription(_option = null, configurableId = key, hit = value, path = null, groupName = value))
}
}
}
// Use LinkedHashSet to preserve the order of the elements to iterate through them later
val optionDescriptions = LinkedHashSet<OptionDescription>()
var registrarDescriptions: MutableSet<OptionDescription>? = null
registrar.initialize()
for (word in words) {
val descriptions = registrar.findAcceptableDescriptions(word)
?.filter {
@Suppress("HardCodedStringLiteral")
!(it.path == "ActionManager" || filterOutInspections && it.groupName == "Inspections")
}
?.toHashSet()
if (descriptions.isNullOrEmpty()) {
registrarDescriptions = null
break
}
if (registrarDescriptions == null) {
registrarDescriptions = descriptions
}
else {
registrarDescriptions.retainAll(descriptions)
}
}
// Add registrar's options to the end of the `LinkedHashSet`
// to guarantee that options from the `map` are going to be processed first
if (registrarDescriptions != null) {
optionDescriptions.addAll(registrarDescriptions)
}
if (optionDescriptions.isNotEmpty()) {
val currentHits = HashSet<String>()
val iterator = optionDescriptions.iterator()
for (description in iterator) {
val hit = description.hit
if (hit == null || !currentHits.add(hit.trim())) {
iterator.remove()
}
}
for (description in optionDescriptions) {
for (converter in ActionFromOptionDescriptorProvider.EP.extensionList) {
val action = converter.provide(description) ?: continue
send(ActionWrapper(action, null, MatchMode.NAME, presentationProvider(action)))
}
send(description)
if (pattern.isNotBlank()) {
val matcher = buildMatcher(pattern)
for ((key, value) in map) {
if (matcher.matches(value)) {
optionDescriptions.add(OptionDescription(_option = null, configurableId = key, hit = value, path = null, groupName = value))
}
}
}
.map { matchItem(item = it, matcher = weightMatcher, pattern = pattern, matchType = MatchedValueType.TOP_HIT) }
var registrarDescriptions: MutableSet<OptionDescription>? = null
registrar.initialize()
for (word in words) {
val descriptions = registrar.findAcceptableDescriptions(word)
?.filter {
@Suppress("HardCodedStringLiteral")
!(it.path == "ActionManager" || filterOutInspections && it.groupName == "Inspections")
}
?.toHashSet()
if (descriptions.isNullOrEmpty()) {
registrarDescriptions = null
break
}
if (registrarDescriptions == null) {
registrarDescriptions = descriptions
}
else {
registrarDescriptions.retainAll(descriptions)
}
}
// Add registrar's options to the end of the `LinkedHashSet`
// to guarantee that options from the `map` are going to be processed first
if (registrarDescriptions != null) {
optionDescriptions.addAll(registrarDescriptions)
}
if (optionDescriptions.isNotEmpty()) {
val currentHits = HashSet<String>()
val iterator = optionDescriptions.iterator()
for (description in iterator) {
val hit = description.hit
if (hit == null || !currentHits.add(hit.trim())) {
iterator.remove()
}
}
val matchedValues = optionDescriptions.transformConcurrent { description ->
for (converter in ActionFromOptionDescriptorProvider.EP.extensionList) {
val action = converter.provide(description) ?: continue
val actionWrapper = ActionWrapper(action, null, MatchMode.NAME, presentationProvider(action))
out(matchItem(item = actionWrapper, matcher = weightMatcher, pattern = pattern, matchType = MatchedValueType.TOP_HIT))
}
out(matchItem(item = description, matcher = weightMatcher, pattern = pattern, matchType = MatchedValueType.TOP_HIT))
}.sortedWith(MATCHED_VALUE_COMPARATOR)
awaitJob.join()
LOG.debug { "Process options for \"$pattern\""}
matchedValues.forEach { if (!consumer(it)) cancel() }
}
}
private suspend fun loadAction(id: String): AnAction? {
@@ -409,25 +385,3 @@ private fun abbreviationMatchedValue(wrapper: ActionWrapper, pattern: String, de
}
private data class MatchedAction(@JvmField val action: AnAction, @JvmField val mode: MatchMode, @JvmField val weight: Int?)
/**
* Creates a unified [Flow] by efficiently combining multiple [Flow]s in a flattened, concatenated format.
* The conventional {code flattenConcat} method from the standard coroutines library isn't suitable in this context for the reasons below:
*
* - Initialization of the combined flows is resource-intensive, leading to a significant delay between the invocation of {code collect()}
* and the emission of the first item.
*
* - The standard {code flattenConcat} method generates subsequent flows only after prior data has been sequentially collected. This mechanism forces us to
* wait for completion of each flow's initialization process during the transition between flows.
*
* - Contrastingly, this function initiates a collection of all the flows concurrently using parallel coroutines, thereby accomplishing flow initialization in parallel,
* significantly enhancing the process's efficiency.
* @param flows The list of [Flow]s to flatten and concatenate.
* @return The flattened and concatenated [Flow].
*/
@OptIn(ExperimentalCoroutinesApi::class)
private fun <T> flattenConcat(flows: List<Flow<T>>): Flow<T> = channelFlow {
flows.map { flow -> produce { flow.collect { send(it) } } }
.forEach { ch -> for (i in ch) send(i) }
}

View File

@@ -132,6 +132,18 @@ suspend fun <T, R> Collection<T>.mapConcurrent(
}
}
/**
* Maps each item of [this] collection to another item with [action] concurrently using [transformConcurrent] and collects non-null results.
*/
suspend fun <T, R> Collection<T>.mapNotNullConcurrent(
concurrency: Int = DEFAULT_CONCURRENCY,
action: suspend (T) -> R?
): Collection<R> {
return transformConcurrent(concurrency) { v ->
action(v)?.let { mv -> out(mv) }
}
}
/**
* Filters items of [this] collection according to [action] concurrently using [transformConcurrent].
*/