diff --git a/fleet/kernel/src/fleet/kernel/SagaScope.kt b/fleet/kernel/src/fleet/kernel/SagaScope.kt index d4c89f3aa235..c0805a507921 100644 --- a/fleet/kernel/src/fleet/kernel/SagaScope.kt +++ b/fleet/kernel/src/fleet/kernel/SagaScope.kt @@ -25,7 +25,7 @@ data class SagaScopeEntity(override val eid: EID) : Entity { val transactor by KernelAttr } -suspend fun sagaScope(body: suspend CoroutineScope.(CoroutineScope) -> Unit) { +suspend fun sagaScope(body: suspend CoroutineScope.(CoroutineScope) -> T): T = coroutineScope { resource { cc -> spannedScope("sagaScope") { @@ -61,4 +61,3 @@ suspend fun sagaScope(body: suspend CoroutineScope.(CoroutineScope) -> Unit) { } } } -} diff --git a/fleet/kernel/src/fleet/kernel/Storage.kt b/fleet/kernel/src/fleet/kernel/Storage.kt index 1332a42bbe01..019d100c172f 100644 --- a/fleet/kernel/src/fleet/kernel/Storage.kt +++ b/fleet/kernel/src/fleet/kernel/Storage.kt @@ -27,13 +27,15 @@ object Storage { const val DbSnapshotVersion: String = "11" @OptIn(FlowPreview::class) -suspend fun withStorage(storageKey: StorageKey, - autoSaveDebounceMs: Long, - loadSnapshot: suspend CoroutineScope.() -> DurableSnapshotWithPartitions, // reads snapshot from file - saveSnapshot: suspend CoroutineScope.(DurableSnapshotWithPartitions) -> Unit, // writes snapshot to file - serializationRestrictions: Set> = emptySet(), - serialization: ISerialization, - body: suspend CoroutineScope.() -> Unit) { +suspend fun withStorage( + storageKey: StorageKey, + autoSaveDebounceMs: Long, + loadSnapshot: suspend CoroutineScope.() -> DurableSnapshotWithPartitions, // reads snapshot from file + saveSnapshot: suspend CoroutineScope.(DurableSnapshotWithPartitions) -> Unit, // writes snapshot to file + serializationRestrictions: Set> = emptySet(), + serialization: ISerialization, + body: suspend CoroutineScope.() -> T, +): T = coroutineScope { catching { Storage.logger.info { "loading snapshot $storageKey" } @@ -108,13 +110,13 @@ suspend fun withStorage(storageKey: StorageKey, } }.use { body() + }.also { + Storage.logger.info { "last save for $storageKey " } + saveSnapshot(asOf(transactor().lastKnownDb) { + durableSnapshotWithPartitions(serialization, storageKey, serializationRestrictions) + }) } - Storage.logger.info { "last save for $storageKey " } - saveSnapshot(asOf(transactor().lastKnownDb) { - durableSnapshotWithPartitions(serialization, storageKey, serializationRestrictions) - }) } -} @Suppress("UNCHECKED_CAST") private fun storageKeyAttr(): Attribute { @@ -122,8 +124,10 @@ private fun storageKeyAttr(): Attribute { } @Serializable -data class DurableSnapshotWithPartitions(val snapshot: DurableSnapshot, - val partitions: Map) { +data class DurableSnapshotWithPartitions( + val snapshot: DurableSnapshot, + val partitions: Map, +) { companion object { val Empty = DurableSnapshotWithPartitions(DurableSnapshot.Empty, emptyMap()) } @@ -161,9 +165,11 @@ private fun DbContext.applyDurableSnapshotWithPartitions(serialization: ISe } } -private fun durableSnapshotWithPartitions(serialization: ISerialization, - storageKey: StorageKey, - serializationRestrictions: Set>): DurableSnapshotWithPartitions { +private fun durableSnapshotWithPartitions( + serialization: ISerialization, + storageKey: StorageKey, + serializationRestrictions: Set>, +): DurableSnapshotWithPartitions { return with(DbContext.threadBound) { val storageKeyAttr = storageKeyAttr() val uidAttribute = uidAttribute() diff --git a/fleet/kernel/src/fleet/kernel/Transactor.kt b/fleet/kernel/src/fleet/kernel/Transactor.kt index de5e9526221a..dafb62bad1a2 100644 --- a/fleet/kernel/src/fleet/kernel/Transactor.kt +++ b/fleet/kernel/src/fleet/kernel/Transactor.kt @@ -118,7 +118,7 @@ suspend fun transactor(): Transactor { * */ @Deprecated("use Kernel.log") -suspend fun Transactor.subscribe(capacity: Int = Channel.RENDEZVOUS, body: Subscriber) { +suspend fun Transactor.subscribe(capacity: Int = Channel.RENDEZVOUS, body: Subscriber): T = coroutineScope { val (send, receive) = channels(capacity) // trick: use channel in place of deferred, cause the latter one would hold the firstDB for the lifetime of the entire subscription @@ -154,13 +154,12 @@ suspend fun Transactor.subscribe(capacity: Int = Channel.RENDEZVOUS, body: Subsc send.close(e) } } -} -fun interface Subscriber { +fun interface Subscriber { /** * Changes in the [changes] channel are guaranteed to be sequential, starting from the change applied to [initial]. * */ - suspend fun CoroutineScope.subscribed(initial: DB, changes: ReceiveChannel) + suspend fun CoroutineScope.subscribed(initial: DB, changes: ReceiveChannel): T } val Transactor.lastKnownDb: DB get() = dbState.value @@ -258,12 +257,12 @@ sealed interface SubscriptionEvent { * [middleware] is applied to every change fn synchronously, being able to supply meta to the change, or alter the behavior of fn in other ways * Consider adding KernelMiddleware if additional routine has to be performed on every [Transactor.changeAsync] */ -suspend fun withTransactor( +suspend fun withTransactor( entityClasses: List, middleware: TransactorMiddleware = TransactorMiddleware.Identity, defaultPart: Int = CommonPart, - body: suspend CoroutineScope.(Transactor) -> Unit, -): Unit = + body: suspend CoroutineScope.(Transactor) -> T, +): T = spannedScope("withKernel") { val kernelId: UID = UID.random() val initialDb = span("emptyDB") { DB.empty() } diff --git a/fleet/kernel/src/fleet/kernel/rete/Rete.kt b/fleet/kernel/src/fleet/kernel/rete/Rete.kt index 49727caafe12..e5b957411941 100644 --- a/fleet/kernel/src/fleet/kernel/rete/Rete.kt +++ b/fleet/kernel/src/fleet/kernel/rete/Rete.kt @@ -56,9 +56,9 @@ class ReteEntity(override val eid: EID): Entity { * Sets up [Rete] for use inside [body] * Runs a coroutine which consumes changes made in db and commands to add or remove [QueryObserver]s * */ -suspend fun withRete(failWhenPropagationFailed: Boolean = false, body: suspend CoroutineScope.() -> Unit) { +suspend fun withRete(failWhenPropagationFailed: Boolean = false, body: suspend CoroutineScope.() -> T): T { val (commandsSender, commandsReceiver) = channels(Channel.UNLIMITED) - spannedScope("withRete") { + return spannedScope("withRete") { val kernel = transactor() kernel.subscribe { db, changes -> val lastKnownDb = MutableStateFlow(db) @@ -107,9 +107,10 @@ suspend fun withRete(failWhenPropagationFailed: Boolean = false, body: suspend C } withContext(rete) { body() - } - change { - reteEntity.delete() + }.also { + change { + reteEntity.delete() + } } } } diff --git a/fleet/rhizomedb/src/main/kotlin/com/jetbrains/rhizomedb/Attributes.kt b/fleet/rhizomedb/src/main/kotlin/com/jetbrains/rhizomedb/Attributes.kt index 5ad2df8ec9ec..8ac70f4cc798 100644 --- a/fleet/rhizomedb/src/main/kotlin/com/jetbrains/rhizomedb/Attributes.kt +++ b/fleet/rhizomedb/src/main/kotlin/com/jetbrains/rhizomedb/Attributes.kt @@ -19,18 +19,18 @@ internal fun attr(ident: String, schema: Schema): Attribute<*> = * - [Optional] * - [Many] */ -sealed class Attributes( +sealed class Attributes( val namespace: String, val module: String, - initial: Map> + initial: Map> ) { - private val mutableAttrInfos: MutableMap> = HashMap(initial) + private val mutableAttrInfos: MutableMap> = HashMap(initial) /** * [EntityAttribute]s defined by this [Attributes] by their ident: namespace/name * */ - internal val entityAttributes: Map> get() = mutableAttrInfos + internal val entityAttributes: Map> get() = mutableAttrInfos val attrs: List> get() = entityAttributes.values.map { it.attr } @@ -365,7 +365,7 @@ enum class RefFlags { CASCADE_DELETE_BY } -internal fun merge(attrs: List>): Map> = +internal fun merge(attrs: List>): Map> = buildMap { attrs.forEach { m -> m.entityAttributes.forEach { (k, v) -> diff --git a/fleet/rhizomedb/src/main/kotlin/com/jetbrains/rhizomedb/ChangeScope.kt b/fleet/rhizomedb/src/main/kotlin/com/jetbrains/rhizomedb/ChangeScope.kt index 1e9ad3a7b427..a717635e8a4a 100644 --- a/fleet/rhizomedb/src/main/kotlin/com/jetbrains/rhizomedb/ChangeScope.kt +++ b/fleet/rhizomedb/src/main/kotlin/com/jetbrains/rhizomedb/ChangeScope.kt @@ -184,9 +184,9 @@ interface ChangeScope { let { entity -> context.run { builder.build(object : EntityBuilder.Target { - override fun set(attribute: Attributes.Required, value: V) = entity.set(attribute, value) - override fun set(attribute: Attributes.Optional, value: V?) = entity.set(attribute, value) - override fun set(attribute: Attributes.Many, values: Set) = entity.set(attribute, values) + override fun set(attribute: Attributes.Required, value: V) = entity.set(attribute, value) + override fun set(attribute: Attributes.Optional, value: V?) = entity.set(attribute, value) + override fun set(attribute: Attributes.Many, values: Set) = entity.set(attribute, values) }) } } diff --git a/fleet/rhizomedb/src/main/kotlin/com/jetbrains/rhizomedb/DbContext.kt b/fleet/rhizomedb/src/main/kotlin/com/jetbrains/rhizomedb/DbContext.kt index 6c736a8cf88d..06d1c27048af 100644 --- a/fleet/rhizomedb/src/main/kotlin/com/jetbrains/rhizomedb/DbContext.kt +++ b/fleet/rhizomedb/src/main/kotlin/com/jetbrains/rhizomedb/DbContext.kt @@ -1,6 +1,8 @@ // Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license. package com.jetbrains.rhizomedb +import kotlin.coroutines.cancellation.CancellationException + //fun getStack(): Throwable = Throwable("dbcontext creation stack") /** @@ -17,7 +19,8 @@ class DbContext( get() { val q = _private_value return when { - q is Throwable -> throw q + q is CancellationException -> throw CancellationException("DBContext is poisoned", q) + q is Throwable -> throw RuntimeException("DBContext is poisoned", q) else -> q as QQ } } diff --git a/fleet/rhizomedb/src/main/kotlin/com/jetbrains/rhizomedb/EntityAttribute.kt b/fleet/rhizomedb/src/main/kotlin/com/jetbrains/rhizomedb/EntityAttribute.kt index 63c18143861f..8aca089dc919 100644 --- a/fleet/rhizomedb/src/main/kotlin/com/jetbrains/rhizomedb/EntityAttribute.kt +++ b/fleet/rhizomedb/src/main/kotlin/com/jetbrains/rhizomedb/EntityAttribute.kt @@ -12,7 +12,7 @@ import kotlinx.serialization.builtins.serializer * It's [EntityType] is [EntityAttribute.Companion] * [EntityAttribute] is an [Entity.EntityObject] of [EntityAttribute] entity. * */ -sealed class EntityAttribute( +sealed class EntityAttribute( val ident: String, val attr: Attribute<*>, internal val serializerLazy: Lazy>?, diff --git a/fleet/rhizomedb/src/main/kotlin/com/jetbrains/rhizomedb/EntityType.kt b/fleet/rhizomedb/src/main/kotlin/com/jetbrains/rhizomedb/EntityType.kt index c6aed0c65fd1..3c6d7fe7dc98 100644 --- a/fleet/rhizomedb/src/main/kotlin/com/jetbrains/rhizomedb/EntityType.kt +++ b/fleet/rhizomedb/src/main/kotlin/com/jetbrains/rhizomedb/EntityType.kt @@ -139,9 +139,9 @@ val E.entityType: EntityType fun interface EntityBuilder { interface Target { - operator fun set(attribute: Attributes.Required, value: V) - operator fun set(attribute: Attributes.Optional, value: V?) - operator fun set(attribute: Attributes.Many, values: Set) + operator fun set(attribute: Attributes.Required, value: V) + operator fun set(attribute: Attributes.Optional, value: V?) + operator fun set(attribute: Attributes.Many, values: Set) } fun build(target: Target) @@ -155,7 +155,7 @@ internal fun EntityType.buildAttributes(builder: EntityBuilder { - private fun add(attribute: EntityAttribute, value: V) { + private fun add(attribute: EntityAttribute, value: V) { initializedAttrs.add(attribute.attr.eid) @Suppress("UNCHECKED_CAST") val attr = attribute.attr as Attribute @@ -163,15 +163,15 @@ internal fun EntityType.buildAttributes(builder: EntityBuilder set(attribute: Attributes.Required, value: V) { + override fun set(attribute: Attributes.Required, value: V) { add(attribute, value) } - override fun set(attribute: Attributes.Optional, value: V?) { + override fun set(attribute: Attributes.Optional, value: V?) { value?.let { add(attribute, it) } } - override fun set(attribute: Attributes.Many, values: Set) { + override fun set(attribute: Attributes.Many, values: Set) { for (v in values) { add(attribute, v) } @@ -200,36 +200,4 @@ internal fun EntityType.buildAttributes(builder: EntityBuilder( - ident: String, - module: String, - vararg mixins: Mixin -) : Attributes(ident, module, merge(mixins.toList())) { - - /** - * Tell [Mixin] to use qualified name of the given [KClass] as [namespace] - * It may be useful to guarantee the uniqueness of the [namespace]. - * But it could backfire for durable entities, - * if one renames the class, or moves it to other package. - * */ - constructor( - ident: KClass, - vararg mixins: Mixin - ) : this(requireNotNull(ident.qualifiedName), entityModule(ident), *mixins) - - constructor(ident: KClass<*>, version: Int?, vararg mixins: Mixin) : this( - ident = ident.qualifiedName!! + if (version != null) ":$version" else "", - module = ident.java.module.name, - mixins = mixins, - ) - - override fun toString(): String = "Mixin($namespace)" -} - fun entityModule(entityClass: KClass): String = entityClass.java.module.name ?: "" diff --git a/fleet/rhizomedb/src/main/kotlin/com/jetbrains/rhizomedb/Mixin.kt b/fleet/rhizomedb/src/main/kotlin/com/jetbrains/rhizomedb/Mixin.kt new file mode 100644 index 000000000000..389ab1beaa21 --- /dev/null +++ b/fleet/rhizomedb/src/main/kotlin/com/jetbrains/rhizomedb/Mixin.kt @@ -0,0 +1,35 @@ +package com.jetbrains.rhizomedb + +import kotlin.reflect.KClass + +/** + * [Mixin] allows to attach same attributes to multiple [EntityType]s + * Unlike [EntityType], [Mixin] has no identity and is not represeted in the database as [Entity]. + * It's only job is to carry a set of attributes, which can be attached to [EntityType]s. + * One example of it is [Entity.Companion], which defines attributes universal to all [EntityType]s. + * */ +abstract class Mixin( + ident: String, + module: String, + vararg mixins: Mixin +) : Attributes(ident, module, merge(mixins.toList())) { + + /** + * Tell [Mixin] to use qualified name of the given [KClass] as [namespace] + * It may be useful to guarantee the uniqueness of the [namespace]. + * But it could backfire for durable entities, + * if one renames the class, or moves it to other package. + * */ + constructor( + ident: KClass, + vararg mixins: Mixin + ) : this(requireNotNull(ident.qualifiedName), entityModule(ident), *mixins) + + constructor(ident: KClass<*>, version: Int?, vararg mixins: Mixin) : this( + ident = ident.qualifiedName!! + if (version != null) ":$version" else "", + module = ident.java.module.name, + mixins = mixins, + ) + + override fun toString(): String = "Mixin($namespace)" +} diff --git a/fleet/rpc/src/fleet/rpc/client/RpcClient.kt b/fleet/rpc/src/fleet/rpc/client/RpcClient.kt index 3b0fc46f1b54..43862dd29897 100644 --- a/fleet/rpc/src/fleet/rpc/client/RpcClient.kt +++ b/fleet/rpc/src/fleet/rpc/client/RpcClient.kt @@ -43,13 +43,13 @@ private data class OutgoingRequest( private data class OngoingRequest(val request: OutgoingRequest, val span: Span) @OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class) -suspend fun rpcClient( +suspend fun rpcClient( transport: Transport, serialization: () -> Serialization, origin: UID, requestInterceptor: RpcInterceptor = RpcInterceptor, - body: suspend CoroutineScope.(RpcClient) -> Unit, -) { + body: suspend CoroutineScope.(RpcClient) -> T, +): T = newSingleThreadContext("rpc-client-$origin").use { executor -> withSupervisor { supervisor -> val client = RpcClient(coroutineScope = supervisor + supervisor.coroutineNameAppended("RpcClient"), @@ -63,7 +63,6 @@ suspend fun rpcClient( } } } -} class RpcClient internal constructor( private val coroutineScope: CoroutineScope, diff --git a/fleet/util/core/src/fleet/util/async/ExperimentalResourceScope.kt b/fleet/util/core/src/fleet/util/async/ExperimentalResourceScope.kt deleted file mode 100644 index da77b15ab781..000000000000 --- a/fleet/util/core/src/fleet/util/async/ExperimentalResourceScope.kt +++ /dev/null @@ -1,198 +0,0 @@ -// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license. -package fleet.util.async - -import fleet.tracing.spannedScope -import kotlinx.coroutines.* -import java.lang.RuntimeException -import java.util.concurrent.ConcurrentHashMap - -@DslMarker -annotation class ResourceDsl - -@ResourceDsl -sealed interface ExperimentalResourceScope : CoroutineScope { - suspend fun Shared.await(): T - - /** - * Launces a coroutine that will collect resource returned by [body]. - * Coroutine will be launched on [this], it will inherit coroutine context and become a child of this scope (which might prevent it from shutting down). - * [ShareScope] allows one to establish dependencies on other shared resources so that we know in which order to shut them down. - * */ - fun CoroutineScope.share(debugName: String? = null, body: ShareScope.() -> ExperimentalResource): Shared - - /** - * Simplistic shortcut to register a job for cancellation on shutdown of this [ExperimentalResourceScope]. - * Cancellation will be triggered **after** all coroutines launched with [share]. - * */ - fun T.cancelOnExit(): T - fun shutdown() -} - -// at the moment of shutdown we will complete this deferred with a set of jobs to wait for -private typealias ShutdownSignal = CompletableDeferred> - -private class ResourceScopeImpl(scope: CoroutineScope) : ExperimentalResourceScope, CoroutineScope by scope { - private val shareds = ConcurrentHashMap.newKeySet>() - private val cancelOnExit = ConcurrentHashMap.newKeySet() - - override suspend fun Shared.await(): T = - let { shared -> - spannedScope("awaiting ${shared.debugName}") { - shared.deferred.await() - } - } - - override fun CoroutineScope.share(debugName: String?, body: ShareScope.() -> ExperimentalResource): Shared = - let { scope -> - val dependencies = hashSetOf>>() - val resource = object : ShareScope { - override fun Shared.require(): Deferred = - let { dependency -> - dependency.deferred.also { - // if dependency is launced outside of current resource scope, we should not bother with dependency tracking - if (dependency.resourceScope == this@ResourceScopeImpl) { - dependencies.add(dependency.shutdownSignal) - } - } - } - }.run(body) - val deferred = CompletableDeferred() - val shutdownSignal = CompletableDeferred>() - val job = scope.launch { - resource.collect { t -> - deferred.complete(t) - for (dep in shutdownSignal.await()) { - dep.join() - } - } - }.apply { - invokeOnCompletion { cause -> - if (!deferred.isCompleted) { - deferred.completeExceptionally(cause ?: RuntimeException("unreachable")) - } - } - } - - val shared = Shared( - resourceScope = this@ResourceScopeImpl, - deferred = deferred, - shutdownSignal = shutdownSignal, - job = job, - dependencies = dependencies, - debugName = debugName, - ) - shareds.add(shared) - return shared - } - - override fun T.cancelOnExit(): T = - also { job -> - cancelOnExit.add(job) - job.invokeOnCompletion { cancelOnExit.remove(job) } - } - - override fun shutdown() { - val shutdownOrder = buildMap>, MutableSet> { - shareds.forEach { shared -> - // schedule shutdown even if shared has no dependencies - getOrPut(shared.shutdownSignal) { mutableSetOf() } - // each dependency must wait for completion of our job - shared.dependencies.forEach { dep -> - getOrPut(dep) { mutableSetOf() }.add(shared.job) - } - } - } - shutdownOrder.forEach { (shutdownSignal, jobsToWaitFor) -> - shutdownSignal.complete(jobsToWaitFor) - } - for (job in cancelOnExit) { - job.cancel() - } - } -} - -/** - * EXPERIMENTAL NOT SAFE TO USE - * */ -suspend fun resourceScope(body: suspend ExperimentalResourceScope.() -> T): T = - coroutineScope { - ResourceScopeImpl(this).run { body() } - } - -class Shared internal constructor( - internal val resourceScope: ExperimentalResourceScope, - internal val deferred: Deferred, - // will be completed with dependency set at the moment of shutdown - internal val shutdownSignal: ShutdownSignal, - internal val job: Job, - // shareds that have to wait for us - internal val dependencies: HashSet, - internal val debugName: String?, -) - -@ResourceDsl -interface ShareScope { - fun Shared.require(): Deferred -} - -fun ShareScope.require(dependency: Shared): Deferred = dependency.require() - - -class ExperimentalResource { - /** - * Marker interface to prevent resource-related receivers from leaking into resource body. - * */ - @ResourceDsl - interface Scope : CoroutineScope - - private val producer: suspend (Consumer) -> Unit - - constructor(value: T) { - producer = { consumer -> consumer(value) } - } - - /** - * [body] **must** call given [Consumer] exactly once, failure to do so will result in exception. - * [Consumer] will suspend for the time the resource is in use. Once it returns it is time to perform shutdown. - * [body] is responsible for completion of all spawned jobs, they will not be cancelled automatically. - * */ - constructor(body: suspend Scope.(Consumer) -> Unit) { - producer = { consumer: Consumer -> - var emitted = false - coroutineScope { - object : Scope, CoroutineScope by this {} - .run { - body { t -> - require(!emitted) { "double emission" } - emitted = true - consumer(t) - } - } - } - } - } - - companion object { - private val NONE = Any() - } - - suspend fun use(body: suspend CoroutineScope.(Deferred) -> U): U = - coroutineScope { - val deferredResource = CompletableDeferred() - val shutdown = Job() - // TODO should it be cancellable? optionally cancellable? - launch(start = CoroutineStart.UNDISPATCHED) { - producer { t -> - deferredResource.complete(t) - shutdown.join() - } - }.invokeOnCompletion { - deferredResource.completeExceptionally(RuntimeException("Resource didn't emit")) - } - coroutineScope { body(deferredResource) }.also { shutdown.complete() } - } - - fun CoroutineScope.launch(): Deferred = TODO("cancellable? Handle with stop?") -} - -suspend fun ExperimentalResource.collect(body: suspend CoroutineScope.(T) -> U): U = use { d -> body(d.await()) } diff --git a/fleet/util/core/src/fleet/util/async/Handle.kt b/fleet/util/core/src/fleet/util/async/Handle.kt index acd437a53501..793c1926ce03 100644 --- a/fleet/util/core/src/fleet/util/async/Handle.kt +++ b/fleet/util/core/src/fleet/util/async/Handle.kt @@ -64,11 +64,10 @@ interface HandleScope : CoroutineScope { fun handle(launcher: Launcher): Handle } -suspend fun handleScope(body: suspend HandleScope.() -> Unit) { +suspend fun handleScope(body: suspend HandleScope.() -> T): T = supervisorScope { handleScopeImpl(this, body) } -} //@fleet.kernel.plugins.InternalInPluginModules(where = ["fleet.testlib"]) suspend fun handleScopeNonSupervising(body: suspend HandleScope.() -> Unit) { @@ -77,9 +76,9 @@ suspend fun handleScopeNonSupervising(body: suspend HandleScope.() -> Unit) { } } -private suspend fun handleScopeImpl(outerScope: CoroutineScope, body: suspend HandleScope.() -> Unit) { +private suspend fun handleScopeImpl(outerScope: CoroutineScope, body: suspend HandleScope.() -> T): T { val handles = AtomicRef(BifurcanSet>()) - try { + return try { coroutineScope { val context = coroutineContext object : HandleScope { diff --git a/fleet/util/core/src/fleet/util/async/Resource.kt b/fleet/util/core/src/fleet/util/async/Resource.kt index 9f642264cecb..1e995b7172c5 100644 --- a/fleet/util/core/src/fleet/util/async/Resource.kt +++ b/fleet/util/core/src/fleet/util/async/Resource.kt @@ -8,8 +8,6 @@ import kotlinx.coroutines.* import java.lang.RuntimeException import kotlin.coroutines.CoroutineContext -typealias Consumer = suspend (T) -> Unit - /** * Something backed by a coroutine tree. * @@ -49,9 +47,12 @@ interface Resource { } /** - * @see [Resource] + * Main [Resource] constructor. + * [producer] must invoke consumer with the resource instance. + * return type [Consumed], can only be constructed from consumer invocation. + * [Consumed] is used as a proof of invocation to prevent accidental null-punning and consequent leaked coroutines * */ -fun resource(producer: suspend CoroutineScope.(Consumer) -> Unit): Resource = +fun resource(producer: suspend CoroutineScope.(consumer: Consumer) -> Consumed): Resource = object : Resource { override suspend fun use(body: suspend CoroutineScope.(T) -> U): U = coroutineScope { @@ -62,6 +63,7 @@ fun resource(producer: suspend CoroutineScope.(Consumer) -> Unit): Resour producer { t -> check(deferred.complete(t)) { "Double emission" } shutdown.join() + Proof } }.invokeOnCompletion { cause -> deferred.completeExceptionally(cause ?: RuntimeException("Resource didn't emit")) @@ -70,6 +72,15 @@ fun resource(producer: suspend CoroutineScope.(Consumer) -> Unit): Resour } } +/** + * A proof that a consumer is invoked + * */ +sealed interface Consumed + +internal data object Proof: Consumed + +typealias Consumer = suspend (T) -> Consumed + fun resourceOf(value: T): Resource = object : Resource { override suspend fun use(body: suspend CoroutineScope.(T) -> U): U = diff --git a/fleet/util/core/src/fleet/util/async/WithLaunched.kt b/fleet/util/core/src/fleet/util/async/WithLaunched.kt index 62715d18361a..06ac4ce7cba0 100644 --- a/fleet/util/core/src/fleet/util/async/WithLaunched.kt +++ b/fleet/util/core/src/fleet/util/async/WithLaunched.kt @@ -35,10 +35,10 @@ suspend fun withSupervisor(body: suspend CoroutineScope.(scope: CoroutineSco } } -suspend fun withCoroutineScope(body: suspend CoroutineScope.(scope: CoroutineScope) -> Unit) { +suspend fun withCoroutineScope(body: suspend CoroutineScope.(scope: CoroutineScope) -> T): T { val context = currentCoroutineContext() val job = Job(context.job) - try { + return try { coroutineScope { body(CoroutineScope(context + job)) } } finally {