From 97034d5d2e0470ae58464b994ff53691f463fc4f Mon Sep 17 00:00:00 2001 From: Nikita Barkov Date: Tue, 13 Aug 2024 11:17:59 +0200 Subject: [PATCH] [perf_tests] Event bus. use coroutines instead of CompletableFeatures. Implemented UnsubscribeAllTest GitOrigin-RevId: 8e08620e907368501e16c60ff7bb5ed9738b1545 --- .../ide/starter/bus/local/LocalEventsFlow.kt | 53 +++++++++++------ .../starter/bus/local/UnsubscribeAllTest.kt | 59 +++++++++++++++++++ 2 files changed, 93 insertions(+), 19 deletions(-) create mode 100644 plugins/performanceTesting/event-bus/testSrc/com/intellij/tools/ide/starter/bus/local/UnsubscribeAllTest.kt diff --git a/plugins/performanceTesting/event-bus/src/com/intellij/tools/ide/starter/bus/local/LocalEventsFlow.kt b/plugins/performanceTesting/event-bus/src/com/intellij/tools/ide/starter/bus/local/LocalEventsFlow.kt index 8916e4cc136b..32d8273707a5 100644 --- a/plugins/performanceTesting/event-bus/src/com/intellij/tools/ide/starter/bus/local/LocalEventsFlow.kt +++ b/plugins/performanceTesting/event-bus/src/com/intellij/tools/ide/starter/bus/local/LocalEventsFlow.kt @@ -3,15 +3,9 @@ package com.intellij.tools.ide.starter.bus.local import com.intellij.tools.ide.starter.bus.EventsFlow import com.intellij.tools.ide.starter.bus.Subscriber import com.intellij.tools.ide.starter.bus.events.Event -import com.intellij.tools.ide.starter.bus.exceptions.EventBusException import com.intellij.tools.ide.starter.bus.logger.EventBusLoggerFactory -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.asExecutor -import kotlinx.coroutines.runBlocking -import java.util.concurrent.CompletableFuture -import java.util.concurrent.CompletionException +import kotlinx.coroutines.* import java.util.concurrent.CopyOnWriteArrayList -import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantReadWriteLock import kotlin.concurrent.withLock import kotlin.jvm.internal.CallableReference @@ -26,6 +20,8 @@ class LocalEventsFlow : EventsFlow { // Therefore, CopyOnWriteArrayList is using private val subscribers = HashMap>>() private val subscribersLock = ReentrantReadWriteLock() + private var parentJob = Job() + private var scope = CoroutineScope(Dispatchers.IO) + parentJob // In case using class as subscriber. eg MyClass::class override fun getSubscriberObject(subscriber: Any) = if (subscriber is CallableReference) subscriber::class.toString() else subscriber @@ -67,23 +63,32 @@ class LocalEventsFlow : EventsFlow { (subscribersForEvent as? CopyOnWriteArrayList>) ?.map { subscriber -> LOG.debug("Post event $eventClassName for $subscriber.") - CompletableFuture.runAsync({ - LOG.debug("Start execution $eventClassName for $subscriber") - runBlocking { subscriber.callback(event) } - LOG.debug("Finished execution $eventClassName for $subscriber") - }, Dispatchers.IO.asExecutor()) - .orTimeout(subscriber.timeout.inWholeMilliseconds, TimeUnit.MILLISECONDS) - .exceptionally { throwable -> - throw EventBusException(eventClassName, subscriber.subscriberName.toString(), - subscribersForEvent.map { it.subscriberName }, - throwable) + scope.launch { + LOG.debug("Start execution $eventClassName for $subscriber") + runBlocking { + withTimeout(subscriber.timeout) { + runInterruptible { + runBlocking { + withContext(Dispatchers.IO) { + try { + subscriber.callback(event) + } + catch (e: Throwable) { + exceptions.add(e) + } + } + } + } + } } + LOG.debug("Finished execution $eventClassName for $subscriber") + } } ?.forEach { try { - it.join() + runBlocking { it.join() } } - catch (e: CompletionException) { + catch (e: Throwable) { exceptions.add(e) } } @@ -98,5 +103,15 @@ class LocalEventsFlow : EventsFlow { subscribersLock.writeLock().withLock { subscribers.clear() } + try { + runBlocking { parentJob.cancelAndJoin() } + } + catch (t: Throwable) { + LOG.info("Scope was not canceled, $t") + } + finally { + parentJob = Job() + scope = CoroutineScope(Dispatchers.IO) + parentJob + } } } \ No newline at end of file diff --git a/plugins/performanceTesting/event-bus/testSrc/com/intellij/tools/ide/starter/bus/local/UnsubscribeAllTest.kt b/plugins/performanceTesting/event-bus/testSrc/com/intellij/tools/ide/starter/bus/local/UnsubscribeAllTest.kt new file mode 100644 index 000000000000..c76048a01c36 --- /dev/null +++ b/plugins/performanceTesting/event-bus/testSrc/com/intellij/tools/ide/starter/bus/local/UnsubscribeAllTest.kt @@ -0,0 +1,59 @@ +package com.intellij.tools.ide.starter.bus.local + +import com.intellij.tools.ide.starter.bus.EventsBus +import com.intellij.tools.ide.starter.bus.events.Event +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import java.util.concurrent.CountDownLatch +import java.util.concurrent.atomic.AtomicBoolean +import kotlin.time.Duration.Companion.seconds + + +class UnsubscribeAllTest { + companion object { + private val subscriberProcessedEvent = AtomicBoolean(false) + } + + + @BeforeEach + fun enableLogs() { + System.setProperty("eventbus.debug", "true") + } + + @AfterEach + fun afterEach() { + EventsBus.unsubscribeAll() + assertTrue(subscriberProcessedEvent.get()) + System.setProperty("eventbus.debug", "false") + } + + class CustomEvent : Event() + + @Test + fun `wait for postAndWait even in unsubscribeAll if running async`() { + val subscriberDelay = 4.seconds + val latch = CountDownLatch(1) + + EventsBus + .subscribe("First") { _: CustomEvent -> + val start = System.currentTimeMillis() + latch.countDown() + println("Count down") + while (System.currentTimeMillis() - start < subscriberDelay.inWholeMilliseconds) { + } + println("Finished task") + subscriberProcessedEvent.set(true) + } + + CoroutineScope(Dispatchers.Default).launch { + EventsBus.postAndWaitProcessing(CustomEvent()) + } + latch.await() + } + +} \ No newline at end of file