[perf_tests]Fixed discussions in EventsFlowService

GitOrigin-RevId: 83d16a1079511ecc65ed7dd2051203cfc9b9acfc
This commit is contained in:
Nikita Barkov
2024-08-26 21:08:51 +02:00
committed by intellij-monorepo-bot
parent ea9d95de6d
commit 4e03e40d27

View File

@@ -11,7 +11,8 @@ import kotlin.concurrent.withLock
private val LOG = EventBusLoggerFactory.getLogger(EventsFlowService::class.java)
class EventsFlowService {
// Key is processId. Necessary to avoid sending old events for processes that signed up later
// Key is processId. Necessary to avoid sending old events for processes that signed up later.
// The second key is the event name
private val subscribersPerProcess = HashMap<String, HashMap<String, SubscribersWithEvents>>()
private val subscribersPerProcessLock = ReentrantReadWriteLock()
@@ -33,17 +34,13 @@ class EventsFlowService {
var timeout: Long
val latch = CountDownLatch(subscribersPerProcessLock.readLock().withLock {
// One process with many subscribers == one subscriber
val subscribers = subscribersPerProcess.values.filter { it[sharedEventDto.eventName] != null }
val matchingSubscribers = subscribersPerProcess.values.filter { it[sharedEventDto.eventName] != null }
// Sum of all timeouts of all subscribers for all processes
timeout = subscribers
.flatMap { subscribersPerProcess ->
subscribersPerProcess.values
.flatMap { subscribers ->
subscribers.subscribers.map { subscriber -> subscriber.timeoutMs }
}
}
.sum()
subscribers.size
timeout = matchingSubscribers
.flatMap { it.values }
.flatMap { it.subscribers }
.sumOf { it.timeoutMs }
matchingSubscribers.size
})
eventsLatchLock.writeLock().withLock { eventsLatch[sharedEventDto.eventId] = latch }
subscribersPerProcessLock.writeLock().withLock {