From d3c82b96e3e17febbac7937ca5f01144728efdd3 Mon Sep 17 00:00:00 2001 From: Aydar Mukhametzyanov Date: Wed, 30 Jul 2025 13:39:41 +0200 Subject: [PATCH] IJPL-198927 Search Everywhere: add non-blocked providers to allow balancing on the frontend (cherry picked from commit 9e36a295a584096cdfd847b51dc3da1c9375e061) IJ-CR-170901 GitOrigin-RevId: 6ba1c1e3d120b871f0ba4fddca8f90c3b9af1fda --- .../backend/src/impl/SeBackendService.kt | 5 +- .../resultsProcessing/SeResultsAccumulator.kt | 9 +-- .../src/resultsProcessing/SeTabDelegate.kt | 21 ++++++- .../src/utils/SeResultsCountBalancer.kt | 59 ++++++++++++++----- 4 files changed, 67 insertions(+), 27 deletions(-) diff --git a/platform/searchEverywhere/backend/src/impl/SeBackendService.kt b/platform/searchEverywhere/backend/src/impl/SeBackendService.kt index a13be39d1ecb..20fbc6bb3176 100644 --- a/platform/searchEverywhere/backend/src/impl/SeBackendService.kt +++ b/platform/searchEverywhere/backend/src/impl/SeBackendService.kt @@ -58,8 +58,9 @@ class SeBackendService(val project: Project, private val coroutineScope: Corouti val splitProviderIds = providerHolder.splitToEssentialAndNonEssential(providerIds) val resultsBalancer = SeResultsCountBalancer("BE", - splitProviderIds[SeProviderIdUtils.ESSENTIAL_KEY]!!, - splitProviderIds[SeProviderIdUtils.NON_ESSENTIAL_KEY]!!) + nonBlockedProviderIds = emptyList(), + highPriorityProviderIds = splitProviderIds[SeProviderIdUtils.ESSENTIAL_KEY]!!, + lowPriorityProviderIds = splitProviderIds[SeProviderIdUtils.NON_ESSENTIAL_KEY]!!) SeLog.log(SeLog.ITEM_EMIT) { "Backend will request items from providers: ${providerIds.joinToString(", ")}" } diff --git a/platform/searchEverywhere/frontend/src/resultsProcessing/SeResultsAccumulator.kt b/platform/searchEverywhere/frontend/src/resultsProcessing/SeResultsAccumulator.kt index c76eda31c34f..f76bbd4f202d 100644 --- a/platform/searchEverywhere/frontend/src/resultsProcessing/SeResultsAccumulator.kt +++ b/platform/searchEverywhere/frontend/src/resultsProcessing/SeResultsAccumulator.kt @@ -9,18 +9,11 @@ import kotlinx.coroutines.sync.withLock import org.jetbrains.annotations.ApiStatus @ApiStatus.Internal -class SeResultsAccumulator(providerIds: List, nonBlockedProviderIds: List) { +class SeResultsAccumulator() { private val mutex = Mutex() private val items = mutableMapOf() - //private val balancer = SeResultsCountBalancer("FE", providerIds, nonBlockedProviderIds) - - suspend fun end(providerId: SeProviderId) { - //balancer.end(providerId) - } suspend fun add(newItem: SeItemData): SeResultEvent? { - //balancer.add(newItem) - mutex.withLock { val event = calculateEventType(newItem) diff --git a/platform/searchEverywhere/frontend/src/resultsProcessing/SeTabDelegate.kt b/platform/searchEverywhere/frontend/src/resultsProcessing/SeTabDelegate.kt index 7f68a7262835..9daa4d399e40 100644 --- a/platform/searchEverywhere/frontend/src/resultsProcessing/SeTabDelegate.kt +++ b/platform/searchEverywhere/frontend/src/resultsProcessing/SeTabDelegate.kt @@ -18,6 +18,7 @@ import com.intellij.platform.searchEverywhere.providers.SeLocalItemDataProvider import com.intellij.platform.searchEverywhere.providers.SeLog import com.intellij.platform.searchEverywhere.providers.SeLog.ITEM_EMIT import com.intellij.platform.searchEverywhere.providers.target.SeTypeVisibilityStatePresentation +import com.intellij.platform.searchEverywhere.utils.SeResultsCountBalancer import com.intellij.platform.searchEverywhere.utils.initAsync import fleet.kernel.DurableRef import kotlinx.coroutines.* @@ -46,10 +47,23 @@ class SeTabDelegate( return flow { val initializedProviders = providers.getValue() - val accumulator = SeResultsAccumulator(initializedProviders.getLocalProviderIds(), initializedProviders.getRemoteProviderIds()) + + val remoteProviderIds = initializedProviders.getRemoteProviderIds() + val allEssentialProviderIds = initializedProviders.essentialProviderIds + val localProviders = initializedProviders.getLocalProviderIds().toSet() + val localEssentialProviders = allEssentialProviderIds.intersect(localProviders) + val localNonEssentialProviders = localProviders.subtract(allEssentialProviderIds) + + // We shouldn't block remoteProviderIds because they may miss some results after equality check on the Backend + val balancer = SeResultsCountBalancer("FE", + nonBlockedProviderIds = remoteProviderIds, + highPriorityProviderIds = localEssentialProviders, + lowPriorityProviderIds = localNonEssentialProviders) + + val accumulator = SeResultsAccumulator() disabledProviders?.forEach { - accumulator.end(it) + balancer.end(it) emit(SeResultEndEvent(it)) } @@ -57,11 +71,12 @@ class SeTabDelegate( when (transferEvent) { is SeTransferEnd -> { SeLog.log(ITEM_EMIT) { "Tab delegate for ${logLabel} ends: ${transferEvent.providerId.value}" } - accumulator.end(transferEvent.providerId) + balancer.end(transferEvent.providerId) SeResultEndEvent(transferEvent.providerId) } is SeTransferItem -> { val itemData = transferEvent.itemData + balancer.add(itemData) val checkedItemData = if (equalityChecker != null) { equalityChecker.checkAndUpdateIfNeeded(itemData) diff --git a/platform/searchEverywhere/shared/src/utils/SeResultsCountBalancer.kt b/platform/searchEverywhere/shared/src/utils/SeResultsCountBalancer.kt index beb77c51bcfe..4be8fd4423e2 100644 --- a/platform/searchEverywhere/shared/src/utils/SeResultsCountBalancer.kt +++ b/platform/searchEverywhere/shared/src/utils/SeResultsCountBalancer.kt @@ -9,19 +9,42 @@ import kotlinx.coroutines.sync.Semaphore import kotlinx.coroutines.sync.withLock import org.jetbrains.annotations.ApiStatus import java.util.concurrent.atomic.AtomicBoolean +import kotlin.concurrent.atomics.AtomicInt +import kotlin.concurrent.atomics.ExperimentalAtomicApi +import kotlin.concurrent.atomics.fetchAndIncrement +/** + * Balances the number of results from different providers. + * + * nonBlockedProviderIds - providers that are not limited, they can produce any number of the results without being blocked. + * + * highPriorityProviderIds - providers that are limited by the maximum number of results produced by any of the providers from nonBlockedProviderIds, + * and by a condition that the difference between the numbers of results produced by providers from highPriorityProviderIds is less than or equal to DIFFERENCE_LIMIT. + * + * lowPriorityProviderIds - providers that are limited by the maximum number of results produced by any of the providers from nonBlockedProviderIds, + * and by the minimum number of results produced by providers from highPriorityProviderIds, but are independent from each other + */ +@OptIn(ExperimentalAtomicApi::class) @ApiStatus.Internal -class SeResultsCountBalancer(private val logLabel: String, highPriorityProviderIds: Collection, lowPriorityProviderIds: Collection) { +class SeResultsCountBalancer(private val logLabel: String, + nonBlockedProviderIds: Collection, + highPriorityProviderIds: Collection, + lowPriorityProviderIds: Collection) { private val mutex = Mutex() + private val nonBlockedRunning = nonBlockedProviderIds.toMutableSet() + private val nonBlockedCounts = HashMap().apply { + this.putAll(nonBlockedProviderIds.map { it to AtomicInt(-DIFFERENCE_LIMIT) }) + } + private val highPriorityRunning = highPriorityProviderIds.toMutableSet() private val highPriorityPermits = HashMap().apply { - this.putAll(highPriorityProviderIds.map { it to Semaphore(ELEMENTS_LIMIT) }) + this.putAll(highPriorityProviderIds.map { it to Semaphore(DIFFERENCE_LIMIT) }) } private val lowPriorityRunning = lowPriorityProviderIds.toMutableSet() private val lowPriorityPermits = HashMap().apply { - this.putAll(lowPriorityProviderIds.map { it to RelaxedSemaphore(ELEMENTS_LIMIT) }) + this.putAll(lowPriorityProviderIds.map { it to RelaxedSemaphore(DIFFERENCE_LIMIT) }) } suspend fun end(providerId: SeProviderId) { @@ -30,7 +53,9 @@ class SeResultsCountBalancer(private val logLabel: String, highPriorityProviderI } suspend fun add(newItem: SeItemData): SeItemData { - highPriorityPermits[newItem.providerId]?.acquire() ?: lowPriorityPermits[newItem.providerId]?.acquire() + highPriorityPermits[newItem.providerId]?.acquire() + ?: lowPriorityPermits[newItem.providerId]?.acquire() + ?: nonBlockedCounts[newItem.providerId]?.fetchAndIncrement() balancePermits() return newItem } @@ -38,27 +63,33 @@ class SeResultsCountBalancer(private val logLabel: String, highPriorityProviderI private suspend fun balancePermits(providerToRemove: SeProviderId? = null) { mutex.withLock { providerToRemove?.let { + nonBlockedRunning.remove(it) highPriorityRunning.remove(it) lowPriorityRunning.remove(it) } - if (highPriorityRunning.isEmpty()) { + if (nonBlockedRunning.isEmpty() && highPriorityRunning.isEmpty()) { lowPriorityRunning.forEach { lowPriorityPermits[it]?.makeItFreeToGo() } reportPermits() return } + val nonBlockedCountMaximum = nonBlockedRunning.mapNotNull { nonBlockedCounts[it]?.load() }.maxOrNull() ?: Int.MAX_VALUE val highPriorityToAvailablePermits = highPriorityRunning.associateWith { highPriorityPermits[it]!!.availablePermits } - if (highPriorityToAvailablePermits.values.all { it == 0 }) { + if (highPriorityToAvailablePermits.values.all { it == 0 } && nonBlockedCountMaximum >= 0) { + nonBlockedRunning.forEach { providerId -> + nonBlockedCounts[providerId]?.fetchAndAdd(-DIFFERENCE_LIMIT) + } + highPriorityToAvailablePermits.keys.forEach { providerId -> - repeat(ELEMENTS_LIMIT) { + repeat(DIFFERENCE_LIMIT) { highPriorityPermits[providerId]?.release() } } lowPriorityRunning.forEach { providerId -> - lowPriorityPermits[providerId]?.release(ELEMENTS_LIMIT) + lowPriorityPermits[providerId]?.release(DIFFERENCE_LIMIT) } } @@ -68,21 +99,21 @@ class SeResultsCountBalancer(private val logLabel: String, highPriorityProviderI private suspend fun reportPermits() { SeLog.logSuspendable(SeLog.BALANCING) { - if (highPriorityRunning.isEmpty()) { - return@logSuspendable "($logLabel) No running high priority providers." - } - + val nonBlocked = nonBlockedRunning.associateWith { nonBlockedCounts[it]!!.load() }.map { "${it.key.value}: ${it.value}" }.joinToString(", ") val highPriority = highPriorityRunning.associateWith { highPriorityPermits[it]!!.availablePermits }.map { "${it.key.value}: ${it.value}" }.joinToString(", ") val lowPriority = lowPriorityRunning.associateWith { lowPriorityPermits[it]!!.availablePermits() }.map { "${it.key.value}: ${it.value}" }.joinToString(", ") - "($logLabel) Available permits: high: $highPriority, low: $lowPriority" + "($logLabel) Available permits: nonBlocked: $nonBlocked, high: $highPriority, low: $lowPriority" } } companion object { - private const val ELEMENTS_LIMIT: Int = 15 + private const val DIFFERENCE_LIMIT: Int = 15 } } +/** + * A semaphore that allows releasing more permits, than the initial permits value. + */ @ApiStatus.Internal private class RelaxedSemaphore(permits: Int) { private val mutex = Mutex()