From 4e03e40d27ecb5b930f00208590e561a3a6600eb Mon Sep 17 00:00:00 2001 From: Nikita Barkov Date: Mon, 26 Aug 2024 21:08:51 +0200 Subject: [PATCH] [perf_tests]Fixed discussions in EventsFlowService GitOrigin-RevId: 83d16a1079511ecc65ed7dd2051203cfc9b9acfc --- .../server/services/EventsFlowService.kt | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/plugins/performanceTesting/event-bus/src/com/intellij/tools/ide/starter/bus/shared/server/services/EventsFlowService.kt b/plugins/performanceTesting/event-bus/src/com/intellij/tools/ide/starter/bus/shared/server/services/EventsFlowService.kt index e6cc9134ae3c..2f6c03fe4d20 100644 --- a/plugins/performanceTesting/event-bus/src/com/intellij/tools/ide/starter/bus/shared/server/services/EventsFlowService.kt +++ b/plugins/performanceTesting/event-bus/src/com/intellij/tools/ide/starter/bus/shared/server/services/EventsFlowService.kt @@ -11,7 +11,8 @@ import kotlin.concurrent.withLock private val LOG = EventBusLoggerFactory.getLogger(EventsFlowService::class.java) class EventsFlowService { - // Key is processId. Necessary to avoid sending old events for processes that signed up later + // Key is processId. Necessary to avoid sending old events for processes that signed up later. + // The second key is the event name private val subscribersPerProcess = HashMap>() private val subscribersPerProcessLock = ReentrantReadWriteLock() @@ -33,17 +34,13 @@ class EventsFlowService { var timeout: Long val latch = CountDownLatch(subscribersPerProcessLock.readLock().withLock { // One process with many subscribers == one subscriber - val subscribers = subscribersPerProcess.values.filter { it[sharedEventDto.eventName] != null } + val matchingSubscribers = subscribersPerProcess.values.filter { it[sharedEventDto.eventName] != null } // Sum of all timeouts of all subscribers for all processes - timeout = subscribers - .flatMap { subscribersPerProcess -> - subscribersPerProcess.values - .flatMap { subscribers -> - subscribers.subscribers.map { subscriber -> subscriber.timeoutMs } - } - } - .sum() - subscribers.size + timeout = matchingSubscribers + .flatMap { it.values } + .flatMap { it.subscribers } + .sumOf { it.timeoutMs } + matchingSubscribers.size }) eventsLatchLock.writeLock().withLock { eventsLatch[sharedEventDto.eventId] = latch } subscribersPerProcessLock.writeLock().withLock {