[perf_tests] Event bus. use coroutines instead of CompletableFeatures. Implemented UnsubscribeAllTest

GitOrigin-RevId: 8e08620e907368501e16c60ff7bb5ed9738b1545
This commit is contained in:
Nikita Barkov
2024-08-13 11:17:59 +02:00
committed by intellij-monorepo-bot
parent 37562bfadb
commit 97034d5d2e
2 changed files with 93 additions and 19 deletions

View File

@@ -3,15 +3,9 @@ package com.intellij.tools.ide.starter.bus.local
import com.intellij.tools.ide.starter.bus.EventsFlow
import com.intellij.tools.ide.starter.bus.Subscriber
import com.intellij.tools.ide.starter.bus.events.Event
import com.intellij.tools.ide.starter.bus.exceptions.EventBusException
import com.intellij.tools.ide.starter.bus.logger.EventBusLoggerFactory
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.asExecutor
import kotlinx.coroutines.runBlocking
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionException
import kotlinx.coroutines.*
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.concurrent.withLock
import kotlin.jvm.internal.CallableReference
@@ -26,6 +20,8 @@ class LocalEventsFlow : EventsFlow {
// Therefore, CopyOnWriteArrayList is using
private val subscribers = HashMap<String, CopyOnWriteArrayList<Subscriber<out Event>>>()
private val subscribersLock = ReentrantReadWriteLock()
private var parentJob = Job()
private var scope = CoroutineScope(Dispatchers.IO) + parentJob
// In case using class as subscriber. eg MyClass::class
override fun getSubscriberObject(subscriber: Any) = if (subscriber is CallableReference) subscriber::class.toString() else subscriber
@@ -67,23 +63,32 @@ class LocalEventsFlow : EventsFlow {
(subscribersForEvent as? CopyOnWriteArrayList<Subscriber<T>>)
?.map { subscriber ->
LOG.debug("Post event $eventClassName for $subscriber.")
CompletableFuture.runAsync({
LOG.debug("Start execution $eventClassName for $subscriber")
runBlocking { subscriber.callback(event) }
LOG.debug("Finished execution $eventClassName for $subscriber")
}, Dispatchers.IO.asExecutor())
.orTimeout(subscriber.timeout.inWholeMilliseconds, TimeUnit.MILLISECONDS)
.exceptionally { throwable ->
throw EventBusException(eventClassName, subscriber.subscriberName.toString(),
subscribersForEvent.map { it.subscriberName },
throwable)
scope.launch {
LOG.debug("Start execution $eventClassName for $subscriber")
runBlocking {
withTimeout(subscriber.timeout) {
runInterruptible {
runBlocking {
withContext(Dispatchers.IO) {
try {
subscriber.callback(event)
}
catch (e: Throwable) {
exceptions.add(e)
}
}
}
}
}
}
LOG.debug("Finished execution $eventClassName for $subscriber")
}
}
?.forEach {
try {
it.join()
runBlocking { it.join() }
}
catch (e: CompletionException) {
catch (e: Throwable) {
exceptions.add(e)
}
}
@@ -98,5 +103,15 @@ class LocalEventsFlow : EventsFlow {
subscribersLock.writeLock().withLock {
subscribers.clear()
}
try {
runBlocking { parentJob.cancelAndJoin() }
}
catch (t: Throwable) {
LOG.info("Scope was not canceled, $t")
}
finally {
parentJob = Job()
scope = CoroutineScope(Dispatchers.IO) + parentJob
}
}
}

View File

@@ -0,0 +1,59 @@
package com.intellij.tools.ide.starter.bus.local
import com.intellij.tools.ide.starter.bus.EventsBus
import com.intellij.tools.ide.starter.bus.events.Event
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.time.Duration.Companion.seconds
class UnsubscribeAllTest {
companion object {
private val subscriberProcessedEvent = AtomicBoolean(false)
}
@BeforeEach
fun enableLogs() {
System.setProperty("eventbus.debug", "true")
}
@AfterEach
fun afterEach() {
EventsBus.unsubscribeAll()
assertTrue(subscriberProcessedEvent.get())
System.setProperty("eventbus.debug", "false")
}
class CustomEvent : Event()
@Test
fun `wait for postAndWait even in unsubscribeAll if running async`() {
val subscriberDelay = 4.seconds
val latch = CountDownLatch(1)
EventsBus
.subscribe("First") { _: CustomEvent ->
val start = System.currentTimeMillis()
latch.countDown()
println("Count down")
while (System.currentTimeMillis() - start < subscriberDelay.inWholeMilliseconds) {
}
println("Finished task")
subscriberProcessedEvent.set(true)
}
CoroutineScope(Dispatchers.Default).launch {
EventsBus.postAndWaitProcessing(CustomEvent())
}
latch.await()
}
}