diff --git a/fleet/kernel/srcCommonMain/fleet/kernel/ReteExt.kt b/fleet/kernel/srcCommonMain/fleet/kernel/ReteExt.kt index 0dfd510c7804..30b6e3870b19 100644 --- a/fleet/kernel/srcCommonMain/fleet/kernel/ReteExt.kt +++ b/fleet/kernel/srcCommonMain/fleet/kernel/ReteExt.kt @@ -41,16 +41,18 @@ private sealed class State11 { fun Entity.onDispose(rete: Rete, action: () -> Unit = {}): DisposableHandle = let { entity -> when { - !exists() -> { + !entity.exists() -> { action() DisposableHandle { } } else -> { val state = AtomicReference(State11.Initial) - val handle = existence().observe(rete = rete, - contextMatches = null, - queryTracingKey = null, - dbTimestamp = DbContext.threadBound.impl.timestamp + 1) { matches -> + val handle = entity.existence().observe( + rete = rete, + contextMatches = null, + queryTracingKey = null, + dbTimestamp = DbContext.threadBound.impl.timestamp + 1, + ) { matches -> fun actionPrime() { when (val witness = state.compareAndExchange(State11.Initial, State11.ActionInvoked)) { is State11.Initial -> { @@ -94,7 +96,7 @@ suspend fun withCondition(condition: () -> Boolean, body: suspend CoroutineS suspend fun waitFor(p: () -> Boolean) { if (p()) return - val result = queryAsFlow { p() }.firstOrNull { it } + val result = query { p() }.asValuesFlow().firstOrNull { it } // query could be terminated before our coroutine, null means we are in shutdown if (result == null) { throw CancellationException("Query was terminated") @@ -115,7 +117,7 @@ suspend fun waitForNotNullWithTimeout(timeMillis: Long = 30000L, p: () -> T? suspend fun waitForNotNullWithTimeoutOrNull(timeMillis: Long = 30000L, p: () -> T?): T? { val value = p() if (value != null) return value - return withTimeoutOrNull(timeMillis) { queryAsFlow { p() }.firstOrNull { it != null } } + return withTimeoutOrNull(timeMillis) { query { p() }.asValuesFlow().firstOrNull { it != null } } } suspend fun waitForNotNull(p: () -> T?): T { diff --git a/fleet/kernel/srcCommonMain/fleet/kernel/rete/Queries.kt b/fleet/kernel/srcCommonMain/fleet/kernel/rete/Queries.kt index 5f9c64de946a..250cb2a9f8d4 100644 --- a/fleet/kernel/srcCommonMain/fleet/kernel/rete/Queries.kt +++ b/fleet/kernel/srcCommonMain/fleet/kernel/rete/Queries.kt @@ -5,6 +5,7 @@ import com.jetbrains.rhizomedb.* import fleet.kernel.rete.impl.* import fleet.kernel.rete.impl.DummyQueryScope import fleet.kernel.rete.impl.distinct +import kotlinx.coroutines.flow.first /** * Emits a single unconditional match with the given value [t] @@ -154,6 +155,17 @@ fun Query.filter(p: (T) -> Boolean): Query = inline fun Query<*>.filterIsInstance(): Query = flatMap { t -> if (t is R) setOf(t) else emptySet() } +/** + * returns first value of [Query] [Match]es + * */ +suspend fun Query.first(): T = asValuesFlow().first() + +/** + * returns first value of [Query] [Match]es that satisfies [p] to true, + * [p] is being read-tracked + * */ +suspend fun Query.first(p: (T) -> Boolean): T = filter(p).first() + /** * version of [filter] working with [Match] * [p] has to be a *pure* function of a database @@ -454,5 +466,5 @@ fun Query.intern(firstKey: Any, vararg keys: Any): Query = internImpl(listOf(firstKey, *keys)) @PublishedApi -internal fun Query.internImpl(key: Any): Query = +internal fun Query.internImpl(key: Any): Query = InternedQuery(key, this) diff --git a/fleet/kernel/srcCommonMain/fleet/kernel/rete/Rete.kt b/fleet/kernel/srcCommonMain/fleet/kernel/rete/Rete.kt index 4af29dd29811..df4c1dfc0526 100644 --- a/fleet/kernel/srcCommonMain/fleet/kernel/rete/Rete.kt +++ b/fleet/kernel/srcCommonMain/fleet/kernel/rete/Rete.kt @@ -495,17 +495,6 @@ suspend fun Query.collectLatest(f: suspend CoroutineScope.(T) -> Unit) { matchesFlow().collectLatestMatch(f) } -/** - * returns first value of [Query] [Match]es - * */ -suspend fun Query.first(): T = matchesFlow().first().value - -/** - * returns first value of [Query] [Match]es that satisfies [p] to true, - * [p] is being read-tracked - * */ -suspend fun Query.first(p: (T) -> Boolean): T = filter(p).first() - private val CoroutineContext.rete: Rete get() = requireNotNull(this[Rete]) { "no Rete on context $this" }