IJPL-198927 Search Everywhere: add non-blocked providers to allow balancing on the frontend

(cherry picked from commit 9e36a295a584096cdfd847b51dc3da1c9375e061)

IJ-CR-170901

GitOrigin-RevId: 6ba1c1e3d120b871f0ba4fddca8f90c3b9af1fda
This commit is contained in:
Aydar Mukhametzyanov
2025-07-30 13:39:41 +02:00
committed by intellij-monorepo-bot
parent acb531a019
commit d3c82b96e3
4 changed files with 67 additions and 27 deletions

View File

@@ -58,8 +58,9 @@ class SeBackendService(val project: Project, private val coroutineScope: Corouti
val splitProviderIds = providerHolder.splitToEssentialAndNonEssential(providerIds)
val resultsBalancer = SeResultsCountBalancer("BE",
splitProviderIds[SeProviderIdUtils.ESSENTIAL_KEY]!!,
splitProviderIds[SeProviderIdUtils.NON_ESSENTIAL_KEY]!!)
nonBlockedProviderIds = emptyList(),
highPriorityProviderIds = splitProviderIds[SeProviderIdUtils.ESSENTIAL_KEY]!!,
lowPriorityProviderIds = splitProviderIds[SeProviderIdUtils.NON_ESSENTIAL_KEY]!!)
SeLog.log(SeLog.ITEM_EMIT) { "Backend will request items from providers: ${providerIds.joinToString(", ")}" }

View File

@@ -9,18 +9,11 @@ import kotlinx.coroutines.sync.withLock
import org.jetbrains.annotations.ApiStatus
@ApiStatus.Internal
class SeResultsAccumulator(providerIds: List<SeProviderId>, nonBlockedProviderIds: List<SeProviderId>) {
class SeResultsAccumulator() {
private val mutex = Mutex()
private val items = mutableMapOf<String, SeItemData>()
//private val balancer = SeResultsCountBalancer("FE", providerIds, nonBlockedProviderIds)
suspend fun end(providerId: SeProviderId) {
//balancer.end(providerId)
}
suspend fun add(newItem: SeItemData): SeResultEvent? {
//balancer.add(newItem)
mutex.withLock {
val event = calculateEventType(newItem)

View File

@@ -18,6 +18,7 @@ import com.intellij.platform.searchEverywhere.providers.SeLocalItemDataProvider
import com.intellij.platform.searchEverywhere.providers.SeLog
import com.intellij.platform.searchEverywhere.providers.SeLog.ITEM_EMIT
import com.intellij.platform.searchEverywhere.providers.target.SeTypeVisibilityStatePresentation
import com.intellij.platform.searchEverywhere.utils.SeResultsCountBalancer
import com.intellij.platform.searchEverywhere.utils.initAsync
import fleet.kernel.DurableRef
import kotlinx.coroutines.*
@@ -46,10 +47,23 @@ class SeTabDelegate(
return flow {
val initializedProviders = providers.getValue()
val accumulator = SeResultsAccumulator(initializedProviders.getLocalProviderIds(), initializedProviders.getRemoteProviderIds())
val remoteProviderIds = initializedProviders.getRemoteProviderIds()
val allEssentialProviderIds = initializedProviders.essentialProviderIds
val localProviders = initializedProviders.getLocalProviderIds().toSet()
val localEssentialProviders = allEssentialProviderIds.intersect(localProviders)
val localNonEssentialProviders = localProviders.subtract(allEssentialProviderIds)
// We shouldn't block remoteProviderIds because they may miss some results after equality check on the Backend
val balancer = SeResultsCountBalancer("FE",
nonBlockedProviderIds = remoteProviderIds,
highPriorityProviderIds = localEssentialProviders,
lowPriorityProviderIds = localNonEssentialProviders)
val accumulator = SeResultsAccumulator()
disabledProviders?.forEach {
accumulator.end(it)
balancer.end(it)
emit(SeResultEndEvent(it))
}
@@ -57,11 +71,12 @@ class SeTabDelegate(
when (transferEvent) {
is SeTransferEnd -> {
SeLog.log(ITEM_EMIT) { "Tab delegate for ${logLabel} ends: ${transferEvent.providerId.value}" }
accumulator.end(transferEvent.providerId)
balancer.end(transferEvent.providerId)
SeResultEndEvent(transferEvent.providerId)
}
is SeTransferItem -> {
val itemData = transferEvent.itemData
balancer.add(itemData)
val checkedItemData = if (equalityChecker != null) {
equalityChecker.checkAndUpdateIfNeeded(itemData)

View File

@@ -9,19 +9,42 @@ import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withLock
import org.jetbrains.annotations.ApiStatus
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.concurrent.atomics.AtomicInt
import kotlin.concurrent.atomics.ExperimentalAtomicApi
import kotlin.concurrent.atomics.fetchAndIncrement
/**
* Balances the number of results from different providers.
*
* nonBlockedProviderIds - providers that are not limited, they can produce any number of the results without being blocked.
*
* highPriorityProviderIds - providers that are limited by the maximum number of results produced by any of the providers from nonBlockedProviderIds,
* and by a condition that the difference between the numbers of results produced by providers from highPriorityProviderIds is less than or equal to DIFFERENCE_LIMIT.
*
* lowPriorityProviderIds - providers that are limited by the maximum number of results produced by any of the providers from nonBlockedProviderIds,
* and by the minimum number of results produced by providers from highPriorityProviderIds, but are independent from each other
*/
@OptIn(ExperimentalAtomicApi::class)
@ApiStatus.Internal
class SeResultsCountBalancer(private val logLabel: String, highPriorityProviderIds: Collection<SeProviderId>, lowPriorityProviderIds: Collection<SeProviderId>) {
class SeResultsCountBalancer(private val logLabel: String,
nonBlockedProviderIds: Collection<SeProviderId>,
highPriorityProviderIds: Collection<SeProviderId>,
lowPriorityProviderIds: Collection<SeProviderId>) {
private val mutex = Mutex()
private val nonBlockedRunning = nonBlockedProviderIds.toMutableSet()
private val nonBlockedCounts = HashMap<SeProviderId, AtomicInt>().apply {
this.putAll(nonBlockedProviderIds.map { it to AtomicInt(-DIFFERENCE_LIMIT) })
}
private val highPriorityRunning = highPriorityProviderIds.toMutableSet()
private val highPriorityPermits = HashMap<SeProviderId, Semaphore>().apply {
this.putAll(highPriorityProviderIds.map { it to Semaphore(ELEMENTS_LIMIT) })
this.putAll(highPriorityProviderIds.map { it to Semaphore(DIFFERENCE_LIMIT) })
}
private val lowPriorityRunning = lowPriorityProviderIds.toMutableSet()
private val lowPriorityPermits = HashMap<SeProviderId, RelaxedSemaphore>().apply {
this.putAll(lowPriorityProviderIds.map { it to RelaxedSemaphore(ELEMENTS_LIMIT) })
this.putAll(lowPriorityProviderIds.map { it to RelaxedSemaphore(DIFFERENCE_LIMIT) })
}
suspend fun end(providerId: SeProviderId) {
@@ -30,7 +53,9 @@ class SeResultsCountBalancer(private val logLabel: String, highPriorityProviderI
}
suspend fun add(newItem: SeItemData): SeItemData {
highPriorityPermits[newItem.providerId]?.acquire() ?: lowPriorityPermits[newItem.providerId]?.acquire()
highPriorityPermits[newItem.providerId]?.acquire()
?: lowPriorityPermits[newItem.providerId]?.acquire()
?: nonBlockedCounts[newItem.providerId]?.fetchAndIncrement()
balancePermits()
return newItem
}
@@ -38,27 +63,33 @@ class SeResultsCountBalancer(private val logLabel: String, highPriorityProviderI
private suspend fun balancePermits(providerToRemove: SeProviderId? = null) {
mutex.withLock {
providerToRemove?.let {
nonBlockedRunning.remove(it)
highPriorityRunning.remove(it)
lowPriorityRunning.remove(it)
}
if (highPriorityRunning.isEmpty()) {
if (nonBlockedRunning.isEmpty() && highPriorityRunning.isEmpty()) {
lowPriorityRunning.forEach { lowPriorityPermits[it]?.makeItFreeToGo() }
reportPermits()
return
}
val nonBlockedCountMaximum = nonBlockedRunning.mapNotNull { nonBlockedCounts[it]?.load() }.maxOrNull() ?: Int.MAX_VALUE
val highPriorityToAvailablePermits = highPriorityRunning.associateWith { highPriorityPermits[it]!!.availablePermits }
if (highPriorityToAvailablePermits.values.all { it == 0 }) {
if (highPriorityToAvailablePermits.values.all { it == 0 } && nonBlockedCountMaximum >= 0) {
nonBlockedRunning.forEach { providerId ->
nonBlockedCounts[providerId]?.fetchAndAdd(-DIFFERENCE_LIMIT)
}
highPriorityToAvailablePermits.keys.forEach { providerId ->
repeat(ELEMENTS_LIMIT) {
repeat(DIFFERENCE_LIMIT) {
highPriorityPermits[providerId]?.release()
}
}
lowPriorityRunning.forEach { providerId ->
lowPriorityPermits[providerId]?.release(ELEMENTS_LIMIT)
lowPriorityPermits[providerId]?.release(DIFFERENCE_LIMIT)
}
}
@@ -68,21 +99,21 @@ class SeResultsCountBalancer(private val logLabel: String, highPriorityProviderI
private suspend fun reportPermits() {
SeLog.logSuspendable(SeLog.BALANCING) {
if (highPriorityRunning.isEmpty()) {
return@logSuspendable "($logLabel) No running high priority providers."
}
val nonBlocked = nonBlockedRunning.associateWith { nonBlockedCounts[it]!!.load() }.map { "${it.key.value}: ${it.value}" }.joinToString(", ")
val highPriority = highPriorityRunning.associateWith { highPriorityPermits[it]!!.availablePermits }.map { "${it.key.value}: ${it.value}" }.joinToString(", ")
val lowPriority = lowPriorityRunning.associateWith { lowPriorityPermits[it]!!.availablePermits() }.map { "${it.key.value}: ${it.value}" }.joinToString(", ")
"($logLabel) Available permits: high: $highPriority, low: $lowPriority"
"($logLabel) Available permits: nonBlocked: $nonBlocked, high: $highPriority, low: $lowPriority"
}
}
companion object {
private const val ELEMENTS_LIMIT: Int = 15
private const val DIFFERENCE_LIMIT: Int = 15
}
}
/**
* A semaphore that allows releasing more permits, than the initial permits value.
*/
@ApiStatus.Internal
private class RelaxedSemaphore(permits: Int) {
private val mutex = Mutex()