[starter] introduce subscribeOnce for EventBus

GitOrigin-RevId: 1fe1beae604f3b3c3a8a210132245d4c8060c5b6
This commit is contained in:
Anastasia Katsman
2025-09-16 18:07:50 +02:00
committed by intellij-monorepo-bot
parent 3caa4d60c7
commit a60284d8bc
6 changed files with 197 additions and 14 deletions

View File

@@ -40,7 +40,8 @@ object EventsBus {
/**
* Can have only one subscription by pair subscriber + event
* Subscriber might be invoked multiple times on different events since unsubscription happens only after end of test.
* Subscriber might be invoked multiple times on different events since unsubscription happens only after end of test container.
* See com.intellij.ide.starter.config.StarterConfigurationStorage.afterEachMessageBusCleanup()
* If you need to unsubscribe earlier call [unsubscribe]
*/
inline fun <reified EventType : Event> subscribe(
@@ -60,6 +61,27 @@ object EventsBus {
return this
}
/**
* Can have only one subscription by pair subscriber + event
* Subscriber will be invoked once for a single event.
*/
inline fun <reified EventType : Event> subscribeOnce(
subscriber: Any,
timeout: Duration = 2.minutes,
ignoreExceptions: Boolean = true,
noinline callback: suspend (event: EventType) -> Unit
): EventsBus {
executeWithExceptionHandling(ignoreExceptions) {
if (SharedEvent::class.java.isAssignableFrom(EventType::class.java)) {
SHARED_EVENTS_FLOW.subscribeOnce(eventClass = EventType::class.java, subscriber = subscriber, timeout, callback)
SHARED_EVENTS_FLOW.startServerPolling()
}
else
EVENTS_FLOW.subscribeOnce(eventClass = EventType::class.java, subscriber = subscriber, timeout, callback)
}
return this
}
inline fun <reified EventType : Event> unsubscribe(subscriber: Any, ignoreExceptions: Boolean = true) {
executeWithExceptionHandling(ignoreExceptions) {
if (SharedEvent::class.java.isAssignableFrom(EventType::class.java)) {

View File

@@ -7,10 +7,19 @@ import kotlin.time.Duration.Companion.seconds
interface EventsFlow {
fun unsubscribeAll()
fun <EventType : Event> subscribe(eventClass: Class<EventType>,
subscriber: Any,
timeout: Duration = 30.seconds,
callback: suspend (event: EventType) -> Unit): Boolean
fun <EventType : Event> subscribe(
eventClass: Class<EventType>,
subscriber: Any,
timeout: Duration = 30.seconds,
callback: suspend (event: EventType) -> Unit,
): Boolean
fun <EventType : Event> subscribeOnce(
eventClass: Class<EventType>,
subscriber: Any,
timeout: Duration = 30.seconds,
callback: suspend (event: EventType) -> Unit,
): Boolean
fun <T : Event> postAndWaitProcessing(event: T)
fun <EventType : Event> unsubscribe(eventClass: Class<EventType>, subscriber: Any)

View File

@@ -6,4 +6,5 @@ import kotlin.time.Duration
data class Subscriber<T : Event>(val subscriberName: Any,
val timeout: Duration,
val executeOnce: Boolean = false,
val callback: suspend (event: T) -> Unit)

View File

@@ -29,16 +29,20 @@ class LocalEventsFlow : EventsFlow {
override fun <EventType : Event> unsubscribe(eventClass: Class<EventType>, subscriber: Any) {
subscribersLock.writeLock().withLock {
val eventClassName = eventClass.simpleName
val subscriberName = getSubscriberObject(subscriber)
subscribers[eventClassName]?.removeIf { it.subscriberName == subscriberName }
LOG.debug("Unsubscribing $subscriberName for $eventClassName")
unsubscribeNoLock(eventClass, subscriber)
}
}
private fun <EventType : Event> unsubscribeNoLock(eventClass: Class<EventType>, subscriber: Any) {
val eventClassName = eventClass.simpleName
val subscriberName = getSubscriberObject(subscriber)
subscribers[eventClassName]?.removeIf { it.subscriberName == subscriberName }
LOG.debug("Unsubscribing $subscriberName for $eventClassName")
}
override fun <EventType : Event> subscribe(
private fun <EventType : Event> subscribe(
eventClass: Class<EventType>,
subscriber: Any,
executeOnce: Boolean,
timeout: Duration,
callback: suspend (event: EventType) -> Unit,
): Boolean {
@@ -48,20 +52,40 @@ class LocalEventsFlow : EventsFlow {
val subscriberObject = getSubscriberObject(subscriber)
// To avoid double subscriptions
if (subscribers[eventClassName]?.any { it.subscriberName == subscriberObject } == true) return false
val newSubscriber = Subscriber(subscriberObject, timeout, callback)
val newSubscriber = Subscriber(subscriberObject, timeout, executeOnce = executeOnce, callback)
LOG.debug("New subscriber $newSubscriber for $eventClassName")
subscribers.computeIfAbsent(eventClassName) { CopyOnWriteArrayList() }.add(newSubscriber)
return true
}
}
override fun <EventType : Event> subscribeOnce(
eventClass: Class<EventType>,
subscriber: Any,
timeout: Duration,
callback: suspend (event: EventType) -> Unit,
): Boolean = subscribe(eventClass, subscriber, true, timeout, callback)
override fun <EventType : Event> subscribe(
eventClass: Class<EventType>,
subscriber: Any,
timeout: Duration,
callback: suspend (event: EventType) -> Unit,
): Boolean = subscribe(eventClass, subscriber, false, timeout, callback)
override fun <T : Event> postAndWaitProcessing(event: T) {
val eventClassName = event.javaClass.simpleName
val subscribersForEvent = subscribersLock.readLock().withLock {
subscribers[eventClassName]
val subscribersForEvent = subscribersLock.writeLock().withLock {
subscribers[eventClassName]?.toList().also { allSubscribersForEvent ->
allSubscribersForEvent?.forEach { subscriber ->
if (subscriber.executeOnce) {
unsubscribeNoLock(event.javaClass, subscriber.subscriberName)
}
}
}
}
val exceptions = CopyOnWriteArrayList<Throwable>()
(subscribersForEvent as? CopyOnWriteArrayList<Subscriber<T>>)
(subscribersForEvent as? List<Subscriber<T>>)
?.map { subscriber ->
// In case the job is interrupted (e.g. due to timeout), the coroutine may enter Cancelling state
// and finish before the 'catch' block is executed. Using CompletableDeferred ensures we wait

View File

@@ -43,6 +43,17 @@ class SharedEventsFlow(
}
}
override fun <EventType : Event> subscribeOnce(
eventClass: Class<EventType>,
subscriber: Any,
timeout: Duration,
callback: suspend (event: EventType) -> Unit,
): Boolean {
return localEventsFlow.subscribeOnce(eventClass, subscriber, timeout, callback).also {
if (it) client.newSubscriber(eventClass, timeout, getSubscriberObject(subscriber).toString())
}
}
override fun <T : Event> postAndWaitProcessing(event: T) {
LOG.debug("Post event $event")
client.postAndWaitProcessing(

View File

@@ -2,10 +2,15 @@ package com.intellij.tools.ide.starter.bus.local
import com.intellij.tools.ide.starter.bus.EventsBus
import com.intellij.tools.ide.starter.bus.events.Event
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import java.util.concurrent.atomic.AtomicInteger
import kotlin.time.Duration.Companion.seconds
class SubscribingOnlyOnceTest {
@@ -49,4 +54,115 @@ class SubscribingOnlyOnceTest {
assertEquals(eventProcessedTimes.get(), 1)
assertEquals(secondProcessedTimes.get(), 1)
}
@Test
fun `subscribe is executed many times when triggered from different threads`() {
val obj = Any()
val eventProcessedTimes = AtomicInteger()
EventsBus.subscribe(obj) { _: Event ->
eventProcessedTimes.incrementAndGet()
}
runBlocking {
val n = 10
launch {
repeat(n) {
launch(Dispatchers.IO) {
EventsBus.postAndWaitProcessing(Event())
}
}
}.join()
delay(2.seconds)
assertEquals(eventProcessedTimes.get(), n)
EventsBus.postAndWaitProcessing(Event())
assertEquals(eventProcessedTimes.get(), n + 1)
}
}
@Test
fun `subscribe once is executed once`() {
val obj = Any()
val eventProcessedTimes = AtomicInteger()
val secondProcessedTimes = AtomicInteger()
EventsBus
.subscribeOnce(this) { _: Event ->
eventProcessedTimes.incrementAndGet()
}
.subscribe(obj) { _: Event ->
secondProcessedTimes.incrementAndGet()
}
runBlocking {
val n = 10
launch {
repeat(n) {
launch(Dispatchers.IO) {
EventsBus.postAndWaitProcessing(Event())
}
}
}.join()
assertEquals(1, eventProcessedTimes.get())
assertEquals(n, secondProcessedTimes.get())
EventsBus.postAndWaitProcessing(Event())
assertEquals(1, eventProcessedTimes.get())
assertEquals(n + 1, secondProcessedTimes.get())
}
}
@Test
fun `subscribe once and subscribe once again`() {
val eventProcessedTimes = AtomicInteger()
runBlocking {
EventsBus.subscribeOnce(this) { _: Event -> eventProcessedTimes.incrementAndGet() }
val n = 10
launch {
repeat(n) {
launch(Dispatchers.IO) {
EventsBus.postAndWaitProcessing(Event())
}
}
}.join()
assertEquals(1, eventProcessedTimes.get())
eventProcessedTimes.set(0)
EventsBus.subscribeOnce(this) { _: Event -> eventProcessedTimes.incrementAndGet() }
launch {
repeat(n) {
launch(Dispatchers.IO) {
EventsBus.postAndWaitProcessing(Event())
}
}
}.join()
assertEquals(1, eventProcessedTimes.get())
}
}
@Test
fun `subscribe once while adding events`() {
val eventProcessedTimes = AtomicInteger()
runBlocking {
launch {
repeat(10) {
launch(Dispatchers.IO) {
EventsBus.postAndWaitProcessing(Event())
}
}
launch { EventsBus.subscribeOnce(this) { _: Event -> eventProcessedTimes.incrementAndGet() } }
}.join()
EventsBus.postAndWaitProcessing(Event())
assertEquals(1, eventProcessedTimes.get())
}
}
}