diff --git a/plugins/performanceTesting/event-bus/src/com/intellij/tools/ide/starter/bus/shared/SharedEventsFlow.kt b/plugins/performanceTesting/event-bus/src/com/intellij/tools/ide/starter/bus/shared/SharedEventsFlow.kt index dbae8e71f34d..a47521786130 100644 --- a/plugins/performanceTesting/event-bus/src/com/intellij/tools/ide/starter/bus/shared/SharedEventsFlow.kt +++ b/plugins/performanceTesting/event-bus/src/com/intellij/tools/ide/starter/bus/shared/SharedEventsFlow.kt @@ -35,7 +35,7 @@ class SharedEventsFlow(private val client: EventBusServerClient, timeout: Duration, callback: suspend (event: EventType) -> Unit): Boolean { return localEventsFlow.subscribe(eventClass, subscriber, timeout, callback).also { - if (it) client.newSubscriber(eventClass) + if (it) client.newSubscriber(eventClass, timeout) } } diff --git a/plugins/performanceTesting/event-bus/src/com/intellij/tools/ide/starter/bus/shared/client/EventBusServerClient.kt b/plugins/performanceTesting/event-bus/src/com/intellij/tools/ide/starter/bus/shared/client/EventBusServerClient.kt index 82616d9745c6..466afa8c6e7a 100644 --- a/plugins/performanceTesting/event-bus/src/com/intellij/tools/ide/starter/bus/shared/client/EventBusServerClient.kt +++ b/plugins/performanceTesting/event-bus/src/com/intellij/tools/ide/starter/bus/shared/client/EventBusServerClient.kt @@ -2,10 +2,11 @@ package com.intellij.tools.ide.starter.bus.shared.client import com.intellij.tools.ide.starter.bus.events.Event import com.intellij.tools.ide.starter.bus.shared.dto.SharedEventDto +import kotlin.time.Duration interface EventBusServerClient { fun postAndWaitProcessing(sharedEventDto: SharedEventDto): Boolean - fun newSubscriber(eventClass: Class) + fun newSubscriber(eventClass: Class, timeout: Duration) fun getEvents(): Map>?> fun processedEvent(eventName: String) fun endServerProcess() diff --git a/plugins/performanceTesting/event-bus/src/com/intellij/tools/ide/starter/bus/shared/client/LocalEventBusServerClient.kt b/plugins/performanceTesting/event-bus/src/com/intellij/tools/ide/starter/bus/shared/client/LocalEventBusServerClient.kt index f0b86c348bdd..9cd228f62e4c 100644 --- a/plugins/performanceTesting/event-bus/src/com/intellij/tools/ide/starter/bus/shared/client/LocalEventBusServerClient.kt +++ b/plugins/performanceTesting/event-bus/src/com/intellij/tools/ide/starter/bus/shared/client/LocalEventBusServerClient.kt @@ -14,6 +14,7 @@ import java.rmi.ServerException import java.util.* import java.util.concurrent.locks.ReentrantReadWriteLock import kotlin.concurrent.withLock +import kotlin.time.Duration private val LOG = EventBusLoggerFactory.getLogger(LocalEventBusServerClient::class.java) @@ -70,14 +71,14 @@ class LocalEventBusServerClient(val server: LocalEventBusServer) : EventBusServe return post("postAndWaitProcessing", objectMapper.writeValueAsString(sharedEventDto)).toBoolean() } - override fun newSubscriber(eventClass: Class) { + override fun newSubscriber(eventClass: Class, timeout: Duration) { val simpleName = eventClass.simpleName eventClassesLock.writeLock().withLock { eventClasses[simpleName] = eventClass.name } LOG.info("New subscriber $simpleName") - post("newSubscriber", objectMapper.writeValueAsString(SubscriberDto(simpleName, PROCESS_ID))).toBoolean() + post("newSubscriber", objectMapper.writeValueAsString(SubscriberDto(simpleName, PROCESS_ID, timeout.inWholeMilliseconds))).toBoolean() } override fun getEvents(): Map>?> { diff --git a/plugins/performanceTesting/event-bus/src/com/intellij/tools/ide/starter/bus/shared/dto/SubscriberDto.kt b/plugins/performanceTesting/event-bus/src/com/intellij/tools/ide/starter/bus/shared/dto/SubscriberDto.kt index b36904574d36..810e78f32f1f 100644 --- a/plugins/performanceTesting/event-bus/src/com/intellij/tools/ide/starter/bus/shared/dto/SubscriberDto.kt +++ b/plugins/performanceTesting/event-bus/src/com/intellij/tools/ide/starter/bus/shared/dto/SubscriberDto.kt @@ -1,3 +1,3 @@ package com.intellij.tools.ide.starter.bus.shared.dto -data class SubscriberDto(val eventName: String, val processId: String) \ No newline at end of file +data class SubscriberDto(val eventName: String, val processId: String, val timeoutMs: Long) \ No newline at end of file diff --git a/plugins/performanceTesting/event-bus/src/com/intellij/tools/ide/starter/bus/shared/server/services/EventsFlowService.kt b/plugins/performanceTesting/event-bus/src/com/intellij/tools/ide/starter/bus/shared/server/services/EventsFlowService.kt index 84e171140c0c..aeb6d786c2f5 100644 --- a/plugins/performanceTesting/event-bus/src/com/intellij/tools/ide/starter/bus/shared/server/services/EventsFlowService.kt +++ b/plugins/performanceTesting/event-bus/src/com/intellij/tools/ide/starter/bus/shared/server/services/EventsFlowService.kt @@ -4,6 +4,7 @@ import com.intellij.tools.ide.starter.bus.logger.EventBusLoggerFactory import com.intellij.tools.ide.starter.bus.shared.dto.SharedEventDto import com.intellij.tools.ide.starter.bus.shared.dto.SubscriberDto import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantReadWriteLock import kotlin.concurrent.withLock @@ -11,7 +12,7 @@ private val LOG = EventBusLoggerFactory.getLogger(EventsFlowService::class.java) class EventsFlowService { // Key is processId. Necessary to avoid sending old events for processes that signed up later - private val eventsPerProcess = HashMap>>() + private val eventsPerProcess = HashMap>() private val eventsPerProcessLock = ReentrantReadWriteLock() private val eventsLatch = HashMap() @@ -29,15 +30,19 @@ class EventsFlowService { LOG.debug("Before synchronized") synchronized(getLock(sharedEventDto.eventId)) { LOG.debug("Start synchronized") + var timeout: Long val latch = CountDownLatch(eventsPerProcessLock.readLock().withLock { - eventsPerProcess.values.filter { it[sharedEventDto.eventName] != null }.size + val subscribers = eventsPerProcess.values.filter { it[sharedEventDto.eventName] != null } + // Sum of all subscriber timeouts + timeout = subscribers.flatMap { subscribersPerProcess -> subscribersPerProcess.values.map { it.timeoutMs } }.sum() + subscribers.size }) eventsLatchLock.writeLock().withLock { eventsLatch[sharedEventDto.eventId] = latch } eventsPerProcessLock.writeLock().withLock { - eventsPerProcess.values.forEach { it[sharedEventDto.eventName]?.add(sharedEventDto) } + eventsPerProcess.values.forEach { it[sharedEventDto.eventName]?.events?.add(sharedEventDto) } } LOG.debug("Before latch awaiting. Count ${latch.count}") - latch.await() + latch.await(timeout, TimeUnit.MILLISECONDS) LOG.debug("After latch awaiting") } } @@ -48,13 +53,15 @@ class EventsFlowService { eventsPerProcessLock.writeLock().withLock { eventsPerProcess .computeIfAbsent(subscriber.processId) { HashMap() } - .computeIfAbsent(subscriber.eventName) { mutableListOf() } + .computeIfAbsent(subscriber.eventName) { EventsWithTimeout(subscriber.timeoutMs, mutableListOf()) } } } } - fun getEvents(processId: String): HashMap> { - return eventsPerProcessLock.readLock().withLock { eventsPerProcess.getOrDefault(processId, HashMap()) } + fun getEvents(processId: String): Map> { + return eventsPerProcessLock.readLock().withLock { + eventsPerProcess.getOrDefault(processId, HashMap()).map { it.key to it.value.events }.toMap() + } } fun processedEvent(eventId: String) { @@ -72,4 +79,6 @@ class EventsFlowService { } lockByEvent.clear() } + + private data class EventsWithTimeout(val timeoutMs: Long, val events: MutableList) } \ No newline at end of file