[fleet] usages of queryAsFlow

GitOrigin-RevId: 699d1330921200fdfe0e4291aa9dc26ce93ac5a2
This commit is contained in:
Alexander Shparun
2025-06-05 23:07:05 +02:00
committed by intellij-monorepo-bot
parent 0e751e0961
commit 02118dde3c
3 changed files with 22 additions and 19 deletions

View File

@@ -41,16 +41,18 @@ private sealed class State11 {
fun Entity.onDispose(rete: Rete, action: () -> Unit = {}): DisposableHandle =
let { entity ->
when {
!exists() -> {
!entity.exists() -> {
action()
DisposableHandle { }
}
else -> {
val state = AtomicReference<State11>(State11.Initial)
val handle = existence().observe(rete = rete,
contextMatches = null,
queryTracingKey = null,
dbTimestamp = DbContext.threadBound.impl.timestamp + 1) { matches ->
val handle = entity.existence().observe(
rete = rete,
contextMatches = null,
queryTracingKey = null,
dbTimestamp = DbContext.threadBound.impl.timestamp + 1,
) { matches ->
fun actionPrime() {
when (val witness = state.compareAndExchange(State11.Initial, State11.ActionInvoked)) {
is State11.Initial -> {
@@ -94,7 +96,7 @@ suspend fun <T> withCondition(condition: () -> Boolean, body: suspend CoroutineS
suspend fun waitFor(p: () -> Boolean) {
if (p()) return
val result = queryAsFlow { p() }.firstOrNull { it }
val result = query { p() }.asValuesFlow().firstOrNull { it }
// query could be terminated before our coroutine, null means we are in shutdown
if (result == null) {
throw CancellationException("Query was terminated")
@@ -115,7 +117,7 @@ suspend fun <T> waitForNotNullWithTimeout(timeMillis: Long = 30000L, p: () -> T?
suspend fun <T> waitForNotNullWithTimeoutOrNull(timeMillis: Long = 30000L, p: () -> T?): T? {
val value = p()
if (value != null) return value
return withTimeoutOrNull(timeMillis) { queryAsFlow { p() }.firstOrNull { it != null } }
return withTimeoutOrNull(timeMillis) { query { p() }.asValuesFlow().firstOrNull { it != null } }
}
suspend fun <T> waitForNotNull(p: () -> T?): T {

View File

@@ -5,6 +5,7 @@ import com.jetbrains.rhizomedb.*
import fleet.kernel.rete.impl.*
import fleet.kernel.rete.impl.DummyQueryScope
import fleet.kernel.rete.impl.distinct
import kotlinx.coroutines.flow.first
/**
* Emits a single unconditional match with the given value [t]
@@ -154,6 +155,17 @@ fun <T> Query<T>.filter(p: (T) -> Boolean): Query<T> =
inline fun <reified R> Query<*>.filterIsInstance(): Query<R> =
flatMap { t -> if (t is R) setOf(t) else emptySet() }
/**
* returns first value of [Query] [Match]es
* */
suspend fun <T> Query<T>.first(): T = asValuesFlow().first()
/**
* returns first value of [Query] [Match]es that satisfies [p] to true,
* [p] is being read-tracked
* */
suspend fun <T> Query<T>.first(p: (T) -> Boolean): T = filter(p).first()
/**
* version of [filter] working with [Match]
* [p] has to be a *pure* function of a database
@@ -454,5 +466,5 @@ fun <T> Query<T>.intern(firstKey: Any, vararg keys: Any): Query<T> =
internImpl(listOf(firstKey, *keys))
@PublishedApi
internal fun <T> Query<T>.internImpl(key: Any): Query<T> =
internal fun <T> Query<T>.internImpl(key: Any): Query<T> =
InternedQuery(key, this)

View File

@@ -495,17 +495,6 @@ suspend fun <T> Query<T>.collectLatest(f: suspend CoroutineScope.(T) -> Unit) {
matchesFlow().collectLatestMatch(f)
}
/**
* returns first value of [Query] [Match]es
* */
suspend fun <T> Query<T>.first(): T = matchesFlow().first().value
/**
* returns first value of [Query] [Match]es that satisfies [p] to true,
* [p] is being read-tracked
* */
suspend fun <T> Query<T>.first(p: (T) -> Boolean): T = filter(p).first()
private val CoroutineContext.rete: Rete
get() = requireNotNull(this[Rete]) { "no Rete on context $this" }