IJPL-160418 Fix flaky test RestartOfBackendRobotUiTest.gw deploy and restart of backend

If the frontend receives an event, "stream_closed" with an exception (except for `CancellationException`), it shuts down kernel.
This leads to inability to recover after such disconnect.

In Fleet thrown `RuntimeException` indicates that something is wrong with a setup because the cancellation order is determined.
In the case of Intellij, this exception happens because we cancel only top-level coroutine scope and order of children cancellation is unknown.

Instead of closing the channel in `invokeOnCompletion`, it can be closed in finally after subscribed has been canceled.
This way we do both: support cancellation from above (intellij case) and ensure there is no out-of-scope subscription (fleet case)

GitOrigin-RevId: 3fa7c670bad7a81e404997a1a2d9fb3572c94125
This commit is contained in:
Kate Botsman
2024-09-11 15:03:18 +02:00
committed by intellij-monorepo-bot
parent 71e2d81521
commit a4effe8aa5

View File

@@ -123,8 +123,8 @@ suspend fun Transactor.subscribe(capacity: Int = Channel.RENDEZVOUS, body: Subsc
val (send, receive) = channels<Change>(capacity)
// trick: use channel in place of deferred, cause the latter one would hold the firstDB for the lifetime of the entire subscription
val firstDB = Channel<DB>(1)
launch(start = CoroutineStart.UNDISPATCHED,
context = Dispatchers.Unconfined) {
val job = launch(start = CoroutineStart.UNDISPATCHED,
context = Dispatchers.Unconfined) {
log.collect { e ->
when (e) {
is SubscriptionEvent.First -> {
@@ -138,14 +138,20 @@ suspend fun Transactor.subscribe(capacity: Int = Channel.RENDEZVOUS, body: Subsc
}
}
}
}.apply {
invokeOnCompletion { x ->
val e = RuntimeException("subscription terminated, is subscription being consumed out of scope?", x)
firstDB.close(e)
send.close(e)
}
try {
coroutineScope {
body.run { subscribed(firstDB.receive(), receive) }
}
}.use {
body.run { subscribed(firstDB.receive(), receive) }
}
finally {
withContext(NonCancellable) {
job.cancelAndJoin()
}
val e = RuntimeException("subscription terminated, is subscription being consumed out of scope?")
firstDB.close(e)
send.close(e)
}
}
}