[event_bus]AT-2238.Fix exception handling

(cherry picked from commit f6967ef259c92fd24d357b9b6dbc490ec1b256ab)

IJ-CR-159788

GitOrigin-RevId: 30e792d74569d3be997f57174d0c4b6cb5317955
This commit is contained in:
Nikita Barkov
2025-04-08 16:33:24 +02:00
committed by intellij-monorepo-bot
parent b4c0cd8609
commit fa86b53066

View File

@@ -59,22 +59,34 @@ class LocalEventsFlow : EventsFlow {
val subscribersForEvent = subscribersLock.readLock().withLock {
subscribers[eventClassName]
}
val exceptions = mutableListOf<Throwable>()
val exceptions = CopyOnWriteArrayList<Throwable>()
(subscribersForEvent as? CopyOnWriteArrayList<Subscriber<T>>)
?.map { subscriber ->
// In case the job is interrupted (e.g. due to timeout), the coroutine may enter Cancelling state
// and finish before the 'catch' block is executed. Using CompletableDeferred ensures we wait
// until either successful completion or proper exception handling has occurred.
val result = CompletableDeferred<Unit>()
LOG.debug("Post event $eventClassName for $subscriber.")
// Launching a new coroutine for each subscriber
scope.launch {
LOG.debug("Start execution $eventClassName for $subscriber")
// Blocking the current coroutine to enforce timeout and interruptibility
runBlocking {
// Enforces a timeout for the entire subscriber execution
withTimeout(subscriber.timeout) {
// Ensures the operation inside is interruptible — if the thread is blocked,
// it will be interrupted when the coroutine is cancelled (e.g. by timeout)
runInterruptible {
runBlocking {
// Switch to IO dispatcher for potentially blocking I/O operations and for more workers
withContext(Dispatchers.IO) {
try {
subscriber.callback(event)
result.complete(Unit)
}
catch (e: Throwable) {
exceptions.add(e)
result.complete(Unit)
}
}
}
@@ -83,16 +95,19 @@ class LocalEventsFlow : EventsFlow {
}
LOG.debug("Finished execution $eventClassName for $subscriber")
}
return@map result
}
?.forEach {
try {
runBlocking { it.join() }
runBlocking { it.await() }
}
catch (e: Throwable) {
LOG.info("Exception occurred while processing $e")
exceptions.add(e)
}
}
LOG.debug("All exceptions: $exceptions")
if (exceptions.isNotEmpty()) {
val exceptionsString = exceptions.joinToString(separator = "\n") { e -> "${exceptions.indexOf(e) + 1}) ${e.message}" }
throw IllegalArgumentException("Exceptions occurred while processing subscribers. $exceptionsString")