[perf_tests]Added timeout for subscribers

GitOrigin-RevId: e3ebdff8edd1d8556aea61b3e8c48b56744acac0
This commit is contained in:
Nikita Barkov
2024-07-11 20:42:17 +02:00
committed by intellij-monorepo-bot
parent 7ec4a45292
commit 3bd262a237
5 changed files with 23 additions and 12 deletions

View File

@@ -35,7 +35,7 @@ class SharedEventsFlow(private val client: EventBusServerClient,
timeout: Duration,
callback: suspend (event: EventType) -> Unit): Boolean {
return localEventsFlow.subscribe(eventClass, subscriber, timeout, callback).also {
if (it) client.newSubscriber(eventClass)
if (it) client.newSubscriber(eventClass, timeout)
}
}

View File

@@ -2,10 +2,11 @@ package com.intellij.tools.ide.starter.bus.shared.client
import com.intellij.tools.ide.starter.bus.events.Event
import com.intellij.tools.ide.starter.bus.shared.dto.SharedEventDto
import kotlin.time.Duration
interface EventBusServerClient {
fun postAndWaitProcessing(sharedEventDto: SharedEventDto): Boolean
fun newSubscriber(eventClass: Class<out Event>)
fun newSubscriber(eventClass: Class<out Event>, timeout: Duration)
fun getEvents(): Map<String, List<Pair<String, Event>>?>
fun processedEvent(eventName: String)
fun endServerProcess()

View File

@@ -14,6 +14,7 @@ import java.rmi.ServerException
import java.util.*
import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.concurrent.withLock
import kotlin.time.Duration
private val LOG = EventBusLoggerFactory.getLogger(LocalEventBusServerClient::class.java)
@@ -70,14 +71,14 @@ class LocalEventBusServerClient(val server: LocalEventBusServer) : EventBusServe
return post("postAndWaitProcessing", objectMapper.writeValueAsString(sharedEventDto)).toBoolean()
}
override fun newSubscriber(eventClass: Class<out Event>) {
override fun newSubscriber(eventClass: Class<out Event>, timeout: Duration) {
val simpleName = eventClass.simpleName
eventClassesLock.writeLock().withLock {
eventClasses[simpleName] = eventClass.name
}
LOG.info("New subscriber $simpleName")
post("newSubscriber", objectMapper.writeValueAsString(SubscriberDto(simpleName, PROCESS_ID))).toBoolean()
post("newSubscriber", objectMapper.writeValueAsString(SubscriberDto(simpleName, PROCESS_ID, timeout.inWholeMilliseconds))).toBoolean()
}
override fun getEvents(): Map<String, List<Pair<String, Event>>?> {

View File

@@ -1,3 +1,3 @@
package com.intellij.tools.ide.starter.bus.shared.dto
data class SubscriberDto(val eventName: String, val processId: String)
data class SubscriberDto(val eventName: String, val processId: String, val timeoutMs: Long)

View File

@@ -4,6 +4,7 @@ import com.intellij.tools.ide.starter.bus.logger.EventBusLoggerFactory
import com.intellij.tools.ide.starter.bus.shared.dto.SharedEventDto
import com.intellij.tools.ide.starter.bus.shared.dto.SubscriberDto
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.concurrent.withLock
@@ -11,7 +12,7 @@ private val LOG = EventBusLoggerFactory.getLogger(EventsFlowService::class.java)
class EventsFlowService {
// Key is processId. Necessary to avoid sending old events for processes that signed up later
private val eventsPerProcess = HashMap<String, HashMap<String, MutableList<SharedEventDto>>>()
private val eventsPerProcess = HashMap<String, HashMap<String, EventsWithTimeout>>()
private val eventsPerProcessLock = ReentrantReadWriteLock()
private val eventsLatch = HashMap<String, CountDownLatch>()
@@ -29,15 +30,19 @@ class EventsFlowService {
LOG.debug("Before synchronized")
synchronized(getLock(sharedEventDto.eventId)) {
LOG.debug("Start synchronized")
var timeout: Long
val latch = CountDownLatch(eventsPerProcessLock.readLock().withLock {
eventsPerProcess.values.filter { it[sharedEventDto.eventName] != null }.size
val subscribers = eventsPerProcess.values.filter { it[sharedEventDto.eventName] != null }
// Sum of all subscriber timeouts
timeout = subscribers.flatMap { subscribersPerProcess -> subscribersPerProcess.values.map { it.timeoutMs } }.sum()
subscribers.size
})
eventsLatchLock.writeLock().withLock { eventsLatch[sharedEventDto.eventId] = latch }
eventsPerProcessLock.writeLock().withLock {
eventsPerProcess.values.forEach { it[sharedEventDto.eventName]?.add(sharedEventDto) }
eventsPerProcess.values.forEach { it[sharedEventDto.eventName]?.events?.add(sharedEventDto) }
}
LOG.debug("Before latch awaiting. Count ${latch.count}")
latch.await()
latch.await(timeout, TimeUnit.MILLISECONDS)
LOG.debug("After latch awaiting")
}
}
@@ -48,13 +53,15 @@ class EventsFlowService {
eventsPerProcessLock.writeLock().withLock {
eventsPerProcess
.computeIfAbsent(subscriber.processId) { HashMap() }
.computeIfAbsent(subscriber.eventName) { mutableListOf() }
.computeIfAbsent(subscriber.eventName) { EventsWithTimeout(subscriber.timeoutMs, mutableListOf()) }
}
}
}
fun getEvents(processId: String): HashMap<String, MutableList<SharedEventDto>> {
return eventsPerProcessLock.readLock().withLock { eventsPerProcess.getOrDefault(processId, HashMap()) }
fun getEvents(processId: String): Map<String, MutableList<SharedEventDto>> {
return eventsPerProcessLock.readLock().withLock {
eventsPerProcess.getOrDefault(processId, HashMap()).map { it.key to it.value.events }.toMap()
}
}
fun processedEvent(eventId: String) {
@@ -72,4 +79,6 @@ class EventsFlowService {
}
lockByEvent.clear()
}
private data class EventsWithTimeout(val timeoutMs: Long, val events: MutableList<SharedEventDto>)
}