diff --git a/fleet/kernel/src/fleet/kernel/Transactor.kt b/fleet/kernel/src/fleet/kernel/Transactor.kt index 1b3e628e3604..de5e9526221a 100644 --- a/fleet/kernel/src/fleet/kernel/Transactor.kt +++ b/fleet/kernel/src/fleet/kernel/Transactor.kt @@ -123,8 +123,8 @@ suspend fun Transactor.subscribe(capacity: Int = Channel.RENDEZVOUS, body: Subsc val (send, receive) = channels(capacity) // trick: use channel in place of deferred, cause the latter one would hold the firstDB for the lifetime of the entire subscription val firstDB = Channel(1) - launch(start = CoroutineStart.UNDISPATCHED, - context = Dispatchers.Unconfined) { + val job = launch(start = CoroutineStart.UNDISPATCHED, + context = Dispatchers.Unconfined) { log.collect { e -> when (e) { is SubscriptionEvent.First -> { @@ -138,14 +138,20 @@ suspend fun Transactor.subscribe(capacity: Int = Channel.RENDEZVOUS, body: Subsc } } } - }.apply { - invokeOnCompletion { x -> - val e = RuntimeException("subscription terminated, is subscription being consumed out of scope?", x) - firstDB.close(e) - send.close(e) + } + + try { + coroutineScope { + body.run { subscribed(firstDB.receive(), receive) } } - }.use { - body.run { subscribed(firstDB.receive(), receive) } + } + finally { + withContext(NonCancellable) { + job.cancelAndJoin() + } + val e = RuntimeException("subscription terminated, is subscription being consumed out of scope?") + firstDB.close(e) + send.close(e) } } }