[fleet] major (This is a combination of 5 commits)

[fleet] throw new exception on attempt to access poisoned DbContext to see the access stacktrace

[fleet] FL-29872 Failed to navigate to NavigationParameters: 'Resource didn't emit'

safer version of resource constructor

[fleet] preserve exception kind

[fleet] rebase

[fleet] FL-29573 Argument covariance in EntityAttribute allows to write incorrect code

GitOrigin-RevId: 2f1937bc18efb4031f8420d7290550291a3db30c
This commit is contained in:
Andrey Zaytsev
2024-09-18 19:03:55 +02:00
committed by intellij-monorepo-bot
parent 08ba66f735
commit 897f79bfc4
15 changed files with 114 additions and 292 deletions

View File

@@ -25,7 +25,7 @@ data class SagaScopeEntity(override val eid: EID) : Entity {
val transactor by KernelAttr
}
suspend fun sagaScope(body: suspend CoroutineScope.(CoroutineScope) -> Unit) {
suspend fun<T> sagaScope(body: suspend CoroutineScope.(CoroutineScope) -> T): T =
coroutineScope {
resource { cc ->
spannedScope("sagaScope") {
@@ -61,4 +61,3 @@ suspend fun sagaScope(body: suspend CoroutineScope.(CoroutineScope) -> Unit) {
}
}
}
}

View File

@@ -27,13 +27,15 @@ object Storage {
const val DbSnapshotVersion: String = "11"
@OptIn(FlowPreview::class)
suspend fun withStorage(storageKey: StorageKey,
autoSaveDebounceMs: Long,
loadSnapshot: suspend CoroutineScope.() -> DurableSnapshotWithPartitions, // reads snapshot from file
saveSnapshot: suspend CoroutineScope.(DurableSnapshotWithPartitions) -> Unit, // writes snapshot to file
serializationRestrictions: Set<KClass<*>> = emptySet(),
serialization: ISerialization,
body: suspend CoroutineScope.() -> Unit) {
suspend fun <T> withStorage(
storageKey: StorageKey,
autoSaveDebounceMs: Long,
loadSnapshot: suspend CoroutineScope.() -> DurableSnapshotWithPartitions, // reads snapshot from file
saveSnapshot: suspend CoroutineScope.(DurableSnapshotWithPartitions) -> Unit, // writes snapshot to file
serializationRestrictions: Set<KClass<*>> = emptySet(),
serialization: ISerialization,
body: suspend CoroutineScope.() -> T,
): T =
coroutineScope {
catching {
Storage.logger.info { "loading snapshot $storageKey" }
@@ -108,13 +110,13 @@ suspend fun withStorage(storageKey: StorageKey,
}
}.use {
body()
}.also {
Storage.logger.info { "last save for $storageKey " }
saveSnapshot(asOf(transactor().lastKnownDb) {
durableSnapshotWithPartitions(serialization, storageKey, serializationRestrictions)
})
}
Storage.logger.info { "last save for $storageKey " }
saveSnapshot(asOf(transactor().lastKnownDb) {
durableSnapshotWithPartitions(serialization, storageKey, serializationRestrictions)
})
}
}
@Suppress("UNCHECKED_CAST")
private fun storageKeyAttr(): Attribute<StorageKey> {
@@ -122,8 +124,10 @@ private fun storageKeyAttr(): Attribute<StorageKey> {
}
@Serializable
data class DurableSnapshotWithPartitions(val snapshot: DurableSnapshot,
val partitions: Map<UID, Int>) {
data class DurableSnapshotWithPartitions(
val snapshot: DurableSnapshot,
val partitions: Map<UID, Int>,
) {
companion object {
val Empty = DurableSnapshotWithPartitions(DurableSnapshot.Empty, emptyMap())
}
@@ -161,9 +165,11 @@ private fun DbContext<Mut>.applyDurableSnapshotWithPartitions(serialization: ISe
}
}
private fun durableSnapshotWithPartitions(serialization: ISerialization,
storageKey: StorageKey,
serializationRestrictions: Set<KClass<*>>): DurableSnapshotWithPartitions {
private fun durableSnapshotWithPartitions(
serialization: ISerialization,
storageKey: StorageKey,
serializationRestrictions: Set<KClass<*>>,
): DurableSnapshotWithPartitions {
return with(DbContext.threadBound) {
val storageKeyAttr = storageKeyAttr()
val uidAttribute = uidAttribute()

View File

@@ -118,7 +118,7 @@ suspend fun transactor(): Transactor {
*
*/
@Deprecated("use Kernel.log")
suspend fun Transactor.subscribe(capacity: Int = Channel.RENDEZVOUS, body: Subscriber) {
suspend fun <T> Transactor.subscribe(capacity: Int = Channel.RENDEZVOUS, body: Subscriber<T>): T =
coroutineScope {
val (send, receive) = channels<Change>(capacity)
// trick: use channel in place of deferred, cause the latter one would hold the firstDB for the lifetime of the entire subscription
@@ -154,13 +154,12 @@ suspend fun Transactor.subscribe(capacity: Int = Channel.RENDEZVOUS, body: Subsc
send.close(e)
}
}
}
fun interface Subscriber {
fun interface Subscriber<T> {
/**
* Changes in the [changes] channel are guaranteed to be sequential, starting from the change applied to [initial].
* */
suspend fun CoroutineScope.subscribed(initial: DB, changes: ReceiveChannel<Change>)
suspend fun CoroutineScope.subscribed(initial: DB, changes: ReceiveChannel<Change>): T
}
val Transactor.lastKnownDb: DB get() = dbState.value
@@ -258,12 +257,12 @@ sealed interface SubscriptionEvent {
* [middleware] is applied to every change fn synchronously, being able to supply meta to the change, or alter the behavior of fn in other ways
* Consider adding KernelMiddleware if additional routine has to be performed on every [Transactor.changeAsync]
*/
suspend fun withTransactor(
suspend fun<T> withTransactor(
entityClasses: List<EntityTypeDefinition>,
middleware: TransactorMiddleware = TransactorMiddleware.Identity,
defaultPart: Int = CommonPart,
body: suspend CoroutineScope.(Transactor) -> Unit,
): Unit =
body: suspend CoroutineScope.(Transactor) -> T,
): T =
spannedScope("withKernel") {
val kernelId: UID = UID.random()
val initialDb = span("emptyDB") { DB.empty() }

View File

@@ -56,9 +56,9 @@ class ReteEntity(override val eid: EID): Entity {
* Sets up [Rete] for use inside [body]
* Runs a coroutine which consumes changes made in db and commands to add or remove [QueryObserver]s
* */
suspend fun withRete(failWhenPropagationFailed: Boolean = false, body: suspend CoroutineScope.() -> Unit) {
suspend fun<T> withRete(failWhenPropagationFailed: Boolean = false, body: suspend CoroutineScope.() -> T): T {
val (commandsSender, commandsReceiver) = channels<Rete.Command>(Channel.UNLIMITED)
spannedScope("withRete") {
return spannedScope("withRete") {
val kernel = transactor()
kernel.subscribe { db, changes ->
val lastKnownDb = MutableStateFlow(db)
@@ -107,9 +107,10 @@ suspend fun withRete(failWhenPropagationFailed: Boolean = false, body: suspend C
}
withContext(rete) {
body()
}
change {
reteEntity.delete()
}.also {
change {
reteEntity.delete()
}
}
}
}

View File

@@ -19,18 +19,18 @@ internal fun attr(ident: String, schema: Schema): Attribute<*> =
* - [Optional]
* - [Many]
*/
sealed class Attributes<in E : Entity>(
sealed class Attributes<E : Entity>(
val namespace: String,
val module: String,
initial: Map<String, EntityAttribute<E, *>>
initial: Map<String, EntityAttribute<in E, *>>
) {
private val mutableAttrInfos: MutableMap<String, EntityAttribute<E, *>> = HashMap(initial)
private val mutableAttrInfos: MutableMap<String, EntityAttribute<in E, *>> = HashMap(initial)
/**
* [EntityAttribute]s defined by this [Attributes] by their ident: namespace/name
* */
internal val entityAttributes: Map<String, EntityAttribute<E, *>> get() = mutableAttrInfos
internal val entityAttributes: Map<String, EntityAttribute<in E, *>> get() = mutableAttrInfos
val attrs: List<Attribute<*>>
get() = entityAttributes.values.map { it.attr }
@@ -365,7 +365,7 @@ enum class RefFlags {
CASCADE_DELETE_BY
}
internal fun<E: Entity> merge(attrs: List<Attributes<E>>): Map<String, EntityAttribute<E, *>> =
internal fun<E: Entity> merge(attrs: List<Attributes<in E>>): Map<String, EntityAttribute<in E, *>> =
buildMap {
attrs.forEach { m ->
m.entityAttributes.forEach { (k, v) ->

View File

@@ -184,9 +184,9 @@ interface ChangeScope {
let { entity ->
context.run {
builder.build(object : EntityBuilder.Target<E> {
override fun <V : Any> set(attribute: Attributes<E>.Required<V>, value: V) = entity.set(attribute, value)
override fun <V : Any> set(attribute: Attributes<E>.Optional<V>, value: V?) = entity.set(attribute, value)
override fun <V : Any> set(attribute: Attributes<E>.Many<V>, values: Set<V>) = entity.set(attribute, values)
override fun <V : Any> set(attribute: Attributes<in E>.Required<V>, value: V) = entity.set(attribute, value)
override fun <V : Any> set(attribute: Attributes<in E>.Optional<V>, value: V?) = entity.set(attribute, value)
override fun <V : Any> set(attribute: Attributes<in E>.Many<V>, values: Set<V>) = entity.set(attribute, values)
})
}
}

View File

@@ -1,6 +1,8 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.jetbrains.rhizomedb
import kotlin.coroutines.cancellation.CancellationException
//fun getStack(): Throwable = Throwable("dbcontext creation stack")
/**
@@ -17,7 +19,8 @@ class DbContext<out QQ : Q>(
get() {
val q = _private_value
return when {
q is Throwable -> throw q
q is CancellationException -> throw CancellationException("DBContext is poisoned", q)
q is Throwable -> throw RuntimeException("DBContext is poisoned", q)
else -> q as QQ
}
}

View File

@@ -12,7 +12,7 @@ import kotlinx.serialization.builtins.serializer
* It's [EntityType] is [EntityAttribute.Companion]
* [EntityAttribute] is an [Entity.EntityObject] of [EntityAttribute] entity.
* */
sealed class EntityAttribute<in E : Entity, T : Any>(
sealed class EntityAttribute<E : Entity, T : Any>(
val ident: String,
val attr: Attribute<*>,
internal val serializerLazy: Lazy<KSerializer<T>>?,

View File

@@ -139,9 +139,9 @@ val <E : Entity> E.entityType: EntityType<E>
fun interface EntityBuilder<E : Entity> {
interface Target<E : Entity> {
operator fun <V : Any> set(attribute: Attributes<E>.Required<V>, value: V)
operator fun <V : Any> set(attribute: Attributes<E>.Optional<V>, value: V?)
operator fun <V : Any> set(attribute: Attributes<E>.Many<V>, values: Set<V>)
operator fun <V : Any> set(attribute: Attributes<in E>.Required<V>, value: V)
operator fun <V : Any> set(attribute: Attributes<in E>.Optional<V>, value: V?)
operator fun <V : Any> set(attribute: Attributes<in E>.Many<V>, values: Set<V>)
}
fun build(target: Target<E>)
@@ -155,7 +155,7 @@ internal fun <E : Entity> EntityType<E>.buildAttributes(builder: EntityBuilder<E
buildList {
val initializedAttrs = IntOpenHashSet()
builder.build(object : EntityBuilder.Target<E> {
private fun <V : Any> add(attribute: EntityAttribute<E, V>, value: V) {
private fun <V : Any> add(attribute: EntityAttribute<in E, V>, value: V) {
initializedAttrs.add(attribute.attr.eid)
@Suppress("UNCHECKED_CAST")
val attr = attribute.attr as Attribute<Any>
@@ -163,15 +163,15 @@ internal fun <E : Entity> EntityType<E>.buildAttributes(builder: EntityBuilder<E
add(attr to indexValue)
}
override fun <V : Any> set(attribute: Attributes<E>.Required<V>, value: V) {
override fun <V : Any> set(attribute: Attributes<in E>.Required<V>, value: V) {
add(attribute, value)
}
override fun <V : Any> set(attribute: Attributes<E>.Optional<V>, value: V?) {
override fun <V : Any> set(attribute: Attributes<in E>.Optional<V>, value: V?) {
value?.let { add(attribute, it) }
}
override fun <V : Any> set(attribute: Attributes<E>.Many<V>, values: Set<V>) {
override fun <V : Any> set(attribute: Attributes<in E>.Many<V>, values: Set<V>) {
for (v in values) {
add(attribute, v)
}
@@ -200,36 +200,4 @@ internal fun <E : Entity> EntityType<E>.buildAttributes(builder: EntityBuilder<E
}
}
/**
* [Mixin] allows to attach same attributes to multiple [EntityType]s
* Unlike [EntityType], [Mixin] has no identity and is not represeted in the database as [Entity].
* It's only job is to carry a set of attributes, which can be attached to [EntityType]s.
* One example of it is [Entity.Companion], which defines attributes universal to all [EntityType]s.
* */
abstract class Mixin<E : Entity>(
ident: String,
module: String,
vararg mixins: Mixin<in E>
) : Attributes<E>(ident, module, merge(mixins.toList())) {
/**
* Tell [Mixin] to use qualified name of the given [KClass] as [namespace]
* It may be useful to guarantee the uniqueness of the [namespace].
* But it could backfire for durable entities,
* if one renames the class, or moves it to other package.
* */
constructor(
ident: KClass<out Entity>,
vararg mixins: Mixin<in E>
) : this(requireNotNull(ident.qualifiedName), entityModule(ident), *mixins)
constructor(ident: KClass<*>, version: Int?, vararg mixins: Mixin<in E>) : this(
ident = ident.qualifiedName!! + if (version != null) ":$version" else "",
module = ident.java.module.name,
mixins = mixins,
)
override fun toString(): String = "Mixin($namespace)"
}
fun entityModule(entityClass: KClass<out Entity>): String = entityClass.java.module.name ?: "<unknown>"

View File

@@ -0,0 +1,35 @@
package com.jetbrains.rhizomedb
import kotlin.reflect.KClass
/**
* [Mixin] allows to attach same attributes to multiple [EntityType]s
* Unlike [EntityType], [Mixin] has no identity and is not represeted in the database as [Entity].
* It's only job is to carry a set of attributes, which can be attached to [EntityType]s.
* One example of it is [Entity.Companion], which defines attributes universal to all [EntityType]s.
* */
abstract class Mixin<E : Entity>(
ident: String,
module: String,
vararg mixins: Mixin<in E>
) : Attributes<E>(ident, module, merge(mixins.toList())) {
/**
* Tell [Mixin] to use qualified name of the given [KClass] as [namespace]
* It may be useful to guarantee the uniqueness of the [namespace].
* But it could backfire for durable entities,
* if one renames the class, or moves it to other package.
* */
constructor(
ident: KClass<out Entity>,
vararg mixins: Mixin<in E>
) : this(requireNotNull(ident.qualifiedName), entityModule(ident), *mixins)
constructor(ident: KClass<*>, version: Int?, vararg mixins: Mixin<in E>) : this(
ident = ident.qualifiedName!! + if (version != null) ":$version" else "",
module = ident.java.module.name,
mixins = mixins,
)
override fun toString(): String = "Mixin($namespace)"
}

View File

@@ -43,13 +43,13 @@ private data class OutgoingRequest(
private data class OngoingRequest(val request: OutgoingRequest, val span: Span)
@OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class)
suspend fun rpcClient(
suspend fun<T> rpcClient(
transport: Transport<TransportMessage>,
serialization: () -> Serialization,
origin: UID,
requestInterceptor: RpcInterceptor = RpcInterceptor,
body: suspend CoroutineScope.(RpcClient) -> Unit,
) {
body: suspend CoroutineScope.(RpcClient) -> T,
): T =
newSingleThreadContext("rpc-client-$origin").use { executor ->
withSupervisor { supervisor ->
val client = RpcClient(coroutineScope = supervisor + supervisor.coroutineNameAppended("RpcClient"),
@@ -63,7 +63,6 @@ suspend fun rpcClient(
}
}
}
}
class RpcClient internal constructor(
private val coroutineScope: CoroutineScope,

View File

@@ -1,198 +0,0 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package fleet.util.async
import fleet.tracing.spannedScope
import kotlinx.coroutines.*
import java.lang.RuntimeException
import java.util.concurrent.ConcurrentHashMap
@DslMarker
annotation class ResourceDsl
@ResourceDsl
sealed interface ExperimentalResourceScope : CoroutineScope {
suspend fun <T> Shared<T>.await(): T
/**
* Launces a coroutine that will collect resource returned by [body].
* Coroutine will be launched on [this], it will inherit coroutine context and become a child of this scope (which might prevent it from shutting down).
* [ShareScope] allows one to establish dependencies on other shared resources so that we know in which order to shut them down.
* */
fun <T> CoroutineScope.share(debugName: String? = null, body: ShareScope.() -> ExperimentalResource<T>): Shared<T>
/**
* Simplistic shortcut to register a job for cancellation on shutdown of this [ExperimentalResourceScope].
* Cancellation will be triggered **after** all coroutines launched with [share].
* */
fun <T : Job> T.cancelOnExit(): T
fun shutdown()
}
// at the moment of shutdown we will complete this deferred with a set of jobs to wait for
private typealias ShutdownSignal = CompletableDeferred<Set<Job>>
private class ResourceScopeImpl(scope: CoroutineScope) : ExperimentalResourceScope, CoroutineScope by scope {
private val shareds = ConcurrentHashMap.newKeySet<Shared<*>>()
private val cancelOnExit = ConcurrentHashMap.newKeySet<Job>()
override suspend fun <T> Shared<T>.await(): T =
let { shared ->
spannedScope("awaiting ${shared.debugName}") {
shared.deferred.await()
}
}
override fun <T> CoroutineScope.share(debugName: String?, body: ShareScope.() -> ExperimentalResource<T>): Shared<T> =
let { scope ->
val dependencies = hashSetOf<CompletableDeferred<Set<Job>>>()
val resource = object : ShareScope {
override fun <T> Shared<T>.require(): Deferred<T> =
let { dependency ->
dependency.deferred.also {
// if dependency is launced outside of current resource scope, we should not bother with dependency tracking
if (dependency.resourceScope == this@ResourceScopeImpl) {
dependencies.add(dependency.shutdownSignal)
}
}
}
}.run(body)
val deferred = CompletableDeferred<T>()
val shutdownSignal = CompletableDeferred<Set<Job>>()
val job = scope.launch {
resource.collect { t ->
deferred.complete(t)
for (dep in shutdownSignal.await()) {
dep.join()
}
}
}.apply {
invokeOnCompletion { cause ->
if (!deferred.isCompleted) {
deferred.completeExceptionally(cause ?: RuntimeException("unreachable"))
}
}
}
val shared = Shared(
resourceScope = this@ResourceScopeImpl,
deferred = deferred,
shutdownSignal = shutdownSignal,
job = job,
dependencies = dependencies,
debugName = debugName,
)
shareds.add(shared)
return shared
}
override fun <T : Job> T.cancelOnExit(): T =
also { job ->
cancelOnExit.add(job)
job.invokeOnCompletion { cancelOnExit.remove(job) }
}
override fun shutdown() {
val shutdownOrder = buildMap<CompletableDeferred<Set<Job>>, MutableSet<Job>> {
shareds.forEach { shared ->
// schedule shutdown even if shared has no dependencies
getOrPut(shared.shutdownSignal) { mutableSetOf() }
// each dependency must wait for completion of our job
shared.dependencies.forEach { dep ->
getOrPut(dep) { mutableSetOf() }.add(shared.job)
}
}
}
shutdownOrder.forEach { (shutdownSignal, jobsToWaitFor) ->
shutdownSignal.complete(jobsToWaitFor)
}
for (job in cancelOnExit) {
job.cancel()
}
}
}
/**
* EXPERIMENTAL NOT SAFE TO USE
* */
suspend fun <T> resourceScope(body: suspend ExperimentalResourceScope.() -> T): T =
coroutineScope {
ResourceScopeImpl(this).run { body() }
}
class Shared<T> internal constructor(
internal val resourceScope: ExperimentalResourceScope,
internal val deferred: Deferred<T>,
// will be completed with dependency set at the moment of shutdown
internal val shutdownSignal: ShutdownSignal,
internal val job: Job,
// shareds that have to wait for us
internal val dependencies: HashSet<ShutdownSignal>,
internal val debugName: String?,
)
@ResourceDsl
interface ShareScope {
fun <T> Shared<T>.require(): Deferred<T>
}
fun <T> ShareScope.require(dependency: Shared<T>): Deferred<T> = dependency.require()
class ExperimentalResource<T> {
/**
* Marker interface to prevent resource-related receivers from leaking into resource body.
* */
@ResourceDsl
interface Scope : CoroutineScope
private val producer: suspend (Consumer<T>) -> Unit
constructor(value: T) {
producer = { consumer -> consumer(value) }
}
/**
* [body] **must** call given [Consumer] exactly once, failure to do so will result in exception.
* [Consumer] will suspend for the time the resource is in use. Once it returns it is time to perform shutdown.
* [body] is responsible for completion of all spawned jobs, they will not be cancelled automatically.
* */
constructor(body: suspend Scope.(Consumer<T>) -> Unit) {
producer = { consumer: Consumer<T> ->
var emitted = false
coroutineScope {
object : Scope, CoroutineScope by this {}
.run {
body { t ->
require(!emitted) { "double emission" }
emitted = true
consumer(t)
}
}
}
}
}
companion object {
private val NONE = Any()
}
suspend fun <U> use(body: suspend CoroutineScope.(Deferred<T>) -> U): U =
coroutineScope {
val deferredResource = CompletableDeferred<T>()
val shutdown = Job()
// TODO should it be cancellable? optionally cancellable?
launch(start = CoroutineStart.UNDISPATCHED) {
producer { t ->
deferredResource.complete(t)
shutdown.join()
}
}.invokeOnCompletion {
deferredResource.completeExceptionally(RuntimeException("Resource didn't emit"))
}
coroutineScope { body(deferredResource) }.also { shutdown.complete() }
}
fun <T> CoroutineScope.launch(): Deferred<T> = TODO("cancellable? Handle with stop?")
}
suspend fun <T, U> ExperimentalResource<T>.collect(body: suspend CoroutineScope.(T) -> U): U = use { d -> body(d.await()) }

View File

@@ -64,11 +64,10 @@ interface HandleScope : CoroutineScope {
fun <T> handle(launcher: Launcher<T>): Handle<T>
}
suspend fun handleScope(body: suspend HandleScope.() -> Unit) {
suspend fun<T> handleScope(body: suspend HandleScope.() -> T): T =
supervisorScope {
handleScopeImpl(this, body)
}
}
//@fleet.kernel.plugins.InternalInPluginModules(where = ["fleet.testlib"])
suspend fun handleScopeNonSupervising(body: suspend HandleScope.() -> Unit) {
@@ -77,9 +76,9 @@ suspend fun handleScopeNonSupervising(body: suspend HandleScope.() -> Unit) {
}
}
private suspend fun handleScopeImpl(outerScope: CoroutineScope, body: suspend HandleScope.() -> Unit) {
private suspend fun<T> handleScopeImpl(outerScope: CoroutineScope, body: suspend HandleScope.() -> T): T {
val handles = AtomicRef(BifurcanSet<Handle<*>>())
try {
return try {
coroutineScope {
val context = coroutineContext
object : HandleScope {

View File

@@ -8,8 +8,6 @@ import kotlinx.coroutines.*
import java.lang.RuntimeException
import kotlin.coroutines.CoroutineContext
typealias Consumer<T> = suspend (T) -> Unit
/**
* Something backed by a coroutine tree.
*
@@ -49,9 +47,12 @@ interface Resource<out T> {
}
/**
* @see [Resource]
* Main [Resource] constructor.
* [producer] must invoke consumer with the resource instance.
* return type [Consumed], can only be constructed from consumer invocation.
* [Consumed] is used as a proof of invocation to prevent accidental null-punning and consequent leaked coroutines
* */
fun <T> resource(producer: suspend CoroutineScope.(Consumer<T>) -> Unit): Resource<T> =
fun <T> resource(producer: suspend CoroutineScope.(consumer: Consumer<T>) -> Consumed): Resource<T> =
object : Resource<T> {
override suspend fun <U> use(body: suspend CoroutineScope.(T) -> U): U =
coroutineScope {
@@ -62,6 +63,7 @@ fun <T> resource(producer: suspend CoroutineScope.(Consumer<T>) -> Unit): Resour
producer { t ->
check(deferred.complete(t)) { "Double emission" }
shutdown.join()
Proof
}
}.invokeOnCompletion { cause ->
deferred.completeExceptionally(cause ?: RuntimeException("Resource didn't emit"))
@@ -70,6 +72,15 @@ fun <T> resource(producer: suspend CoroutineScope.(Consumer<T>) -> Unit): Resour
}
}
/**
* A proof that a consumer is invoked
* */
sealed interface Consumed
internal data object Proof: Consumed
typealias Consumer<T> = suspend (T) -> Consumed
fun <T> resourceOf(value: T): Resource<T> =
object : Resource<T> {
override suspend fun <U> use(body: suspend CoroutineScope.(T) -> U): U =

View File

@@ -35,10 +35,10 @@ suspend fun <T> withSupervisor(body: suspend CoroutineScope.(scope: CoroutineSco
}
}
suspend fun withCoroutineScope(body: suspend CoroutineScope.(scope: CoroutineScope) -> Unit) {
suspend fun<T> withCoroutineScope(body: suspend CoroutineScope.(scope: CoroutineScope) -> T): T {
val context = currentCoroutineContext()
val job = Job(context.job)
try {
return try {
coroutineScope { body(CoroutineScope(context + job)) }
}
finally {