[Workspace Model] [IDEA-336944] Implement reactive read for workspace model

GitOrigin-RevId: 11646f30d0e44751c68f8f4dd6d32d4b399d6dfe
This commit is contained in:
Alex Plate
2023-12-13 11:57:43 +02:00
committed by intellij-monorepo-bot
parent ff2d80b4da
commit 209f9cd83f
18 changed files with 526 additions and 111 deletions

View File

@@ -1,4 +1,4 @@
// Copyright 2000-2023 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.packaging.impl.run
import com.intellij.execution.BeforeRunTaskProvider

View File

@@ -7,6 +7,10 @@ import com.intellij.platform.backend.workspace.WorkspaceModel
import com.intellij.platform.workspace.storage.ImmutableEntityStorage
import com.intellij.platform.workspace.storage.MutableEntityStorage
import com.intellij.platform.workspace.storage.impl.VersionedEntityStorageImpl
import com.intellij.platform.workspace.storage.impl.query.Diff
import com.intellij.platform.workspace.storage.query.CollectionQuery
import com.intellij.platform.workspace.storage.query.StorageQuery
import kotlinx.coroutines.flow.Flow
import org.jetbrains.annotations.ApiStatus
import org.jetbrains.annotations.NonNls
@@ -69,6 +73,15 @@ public interface WorkspaceModelInternal: WorkspaceModel {
* @see [WorkspaceModel.getBuilderSnapshot]
*/
public fun replaceProjectModel(replacement: StorageReplacement): Boolean
@ApiStatus.Experimental
public suspend fun <T> flowOfQuery(query: StorageQuery<T>): Flow<T>
@ApiStatus.Experimental
public suspend fun <T> flowOfNewElements(query: CollectionQuery<T>): Flow<T>
@ApiStatus.Experimental
public suspend fun <T> flowOfDiff(query: CollectionQuery<T>): Flow<Diff<T>>
}
@get:ApiStatus.Internal

View File

@@ -1,4 +1,4 @@
// Copyright 2000-2023 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.platform.backend.workspace
import com.intellij.openapi.util.registry.Registry

View File

@@ -1,4 +1,4 @@
// Copyright 2000-2022 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.openapi.module
import com.intellij.openapi.components.service

View File

@@ -20,11 +20,15 @@ import com.intellij.platform.workspace.storage.impl.VersionedEntityStorageImpl
import com.intellij.platform.workspace.storage.impl.assertConsistency
import com.intellij.platform.workspace.storage.instrumentation.EntityStorageInstrumentationApi
import com.intellij.platform.workspace.storage.instrumentation.MutableEntityStorageInstrumentation
import com.intellij.platform.workspace.storage.impl.query.Diff
import com.intellij.platform.workspace.storage.query.CollectionQuery
import com.intellij.platform.workspace.storage.query.StorageQuery
import com.intellij.platform.workspace.storage.url.VirtualFileUrlManager
import com.intellij.serviceContainer.AlreadyDisposedException
import com.intellij.workspaceModel.core.fileIndex.EntityStorageKind
import com.intellij.workspaceModel.core.fileIndex.WorkspaceFileIndex
import com.intellij.workspaceModel.core.fileIndex.impl.WorkspaceFileIndexImpl
import com.intellij.workspaceModel.ide.impl.reactive.WmReactive
import io.opentelemetry.api.metrics.Meter
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
@@ -42,6 +46,8 @@ open class WorkspaceModelImpl(private val project: Project, private val cs: Coro
var loadedFromCache = false
protected set
private val reactive = WmReactive(this)
final override val entityStorage: VersionedEntityStorageImpl
private val unloadedEntitiesStorage: VersionedEntityStorageImpl
@@ -381,6 +387,10 @@ open class WorkspaceModelImpl(private val project: Project, private val cs: Coro
return true
}
override suspend fun <T> flowOfQuery(query: StorageQuery<T>): Flow<T> = reactive.flowOfQuery(query)
override suspend fun <T> flowOfNewElements(query: CollectionQuery<T>): Flow<T> = reactive.flowOfNewElements(query)
override suspend fun <T> flowOfDiff(query: CollectionQuery<T>): Flow<Diff<T>> = reactive.flowOfDiff(query)
final override fun dispose() = Unit
private fun initializeBridges(change: Map<Class<*>, List<EntityChange<*>>>, builder: MutableEntityStorage) {

View File

@@ -0,0 +1,89 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.workspaceModel.ide.impl.reactive
import com.intellij.platform.workspace.storage.impl.cache.CacheProcessingStatus
import com.intellij.platform.workspace.storage.impl.cache.ChangeOnVersionedChange
import com.intellij.platform.workspace.storage.impl.cache.cache
import com.intellij.platform.workspace.storage.impl.query.Diff
import com.intellij.platform.workspace.storage.instrumentation.EntityStorageInstrumentationApi
import com.intellij.platform.workspace.storage.instrumentation.ImmutableEntityStorageInstrumentation
import com.intellij.platform.workspace.storage.query.CollectionQuery
import com.intellij.platform.workspace.storage.query.StorageQuery
import com.intellij.workspaceModel.ide.impl.WorkspaceModelImpl
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
@OptIn(EntityStorageInstrumentationApi::class)
class WmReactive(private val workspaceModel: WorkspaceModelImpl) {
suspend fun <T> flowOfQuery(query: StorageQuery<T>): Flow<T> {
return flow {
workspaceModel.subscribe { firstSnapshot, changeChannel ->
var cache = cache()
val res = cache.cached(query, firstSnapshot as ImmutableEntityStorageInstrumentation, null)
emit(res.value)
changeChannel
.collect {
val newCache = cache()
val changes = ChangeOnVersionedChange(it.getAllChanges())
newCache.pullCache(it.storageAfter, cache, changes)
val cachedValue = newCache.cached(query,
it.storageAfter as ImmutableEntityStorageInstrumentation,
it.storageBefore as ImmutableEntityStorageInstrumentation)
if (cachedValue.cacheProcessStatus is CacheProcessingStatus.ValueChanged) {
emit(cachedValue.value)
}
cache = newCache
}
}
}
}
suspend fun <T> flowOfNewElements(query: CollectionQuery<T>): Flow<T> {
return flow {
workspaceModel.subscribe { firstSnapshot, changeChannel ->
var cache = cache()
val res = cache.diff(query, firstSnapshot as ImmutableEntityStorageInstrumentation, null)
res.value.added.forEach {
emit(it)
}
changeChannel
.collect { change ->
val newCache = cache()
val changes = ChangeOnVersionedChange(change.getAllChanges())
newCache.pullCache(change.storageAfter, cache, changes)
val cachedValue = newCache.diff(query,
change.storageAfter as ImmutableEntityStorageInstrumentation,
change.storageBefore as ImmutableEntityStorageInstrumentation)
if (cachedValue.cacheProcessStatus is CacheProcessingStatus.ValueChanged) {
val newState = cachedValue.value
newState.added.forEach { emit(it) }
}
cache = newCache
}
}
}
}
suspend fun <T> flowOfDiff(query: CollectionQuery<T>): Flow<Diff<T>> {
return flow {
workspaceModel.subscribe { firstSnapshot, changeChannel ->
var cache = cache()
val res = cache.diff(query, firstSnapshot as ImmutableEntityStorageInstrumentation, null)
emit(res.value)
changeChannel
.collect { change ->
val newCache = cache()
val changes = ChangeOnVersionedChange(change.getAllChanges())
newCache.pullCache(change.storageAfter, cache, changes)
val cachedValue = newCache.diff(query,
change.storageAfter as ImmutableEntityStorageInstrumentation,
change.storageBefore as ImmutableEntityStorageInstrumentation)
if (cachedValue.cacheProcessStatus is CacheProcessingStatus.ValueChanged) {
emit(cachedValue.value)
}
cache = newCache
}
}
}
}
}

View File

@@ -1,4 +1,4 @@
// Copyright 2000-2023 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.workspaceModel.ide
import com.intellij.openapi.application.EDT

View File

@@ -80,7 +80,7 @@ internal open class ImmutableEntityStorageImpl(
private val entityCache: Long2ObjectMap<WorkspaceEntity> = Long2ObjectOpenHashMap() // guarded by entityCache
override fun <T> cached(query: StorageQuery<T>): T {
return snapshotCache.cached(query, this)
return snapshotCache.cached(query, this, null).value
}
@Suppress("UNCHECKED_CAST")

View File

@@ -1,16 +1,20 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.platform.workspace.storage.impl.cache
import com.intellij.platform.workspace.storage.EntityChange
import com.intellij.platform.workspace.storage.ExternalMappingKey
import com.intellij.platform.workspace.storage.ImmutableEntityStorage
import com.intellij.platform.workspace.storage.impl.ChangeEntry
import com.intellij.platform.workspace.storage.impl.EntityId
import com.intellij.platform.workspace.storage.impl.WorkspaceBuilderChangeLog
import com.intellij.platform.workspace.storage.impl.asBase
import com.intellij.platform.workspace.storage.impl.cache.CacheResetTracker.cacheReset
import com.intellij.platform.workspace.storage.impl.query.MatchSet
import com.intellij.platform.workspace.storage.impl.query.Diff
import com.intellij.platform.workspace.storage.impl.query.MatchList
import com.intellij.platform.workspace.storage.impl.query.MatchWithEntityId
import com.intellij.platform.workspace.storage.instrumentation.EntityStorageInstrumentationApi
import com.intellij.platform.workspace.storage.instrumentation.ImmutableEntityStorageInstrumentation
import com.intellij.platform.workspace.storage.query.CollectionQuery
import com.intellij.platform.workspace.storage.query.StorageQuery
import com.intellij.platform.workspace.storage.trace.ReadTrace
import com.intellij.platform.workspace.storage.trace.ReadTraceHashSet
@@ -19,6 +23,22 @@ import it.unimi.dsi.fastutil.longs.LongOpenHashSet
import org.jetbrains.annotations.ApiStatus
import org.jetbrains.annotations.TestOnly
public data class CachedValue<T>(
public val cacheProcessStatus: CacheProcessingStatus,
public val value: T,
)
public sealed interface CacheProcessingStatus {
public sealed interface Hit: CacheProcessingStatus
public sealed interface ValueChanged: CacheProcessingStatus
}
internal data object CacheHit: CacheProcessingStatus.Hit
internal data object CacheHitInSynchronized: CacheProcessingStatus.Hit
internal data object CacheHitNotAffectedByChanges: CacheProcessingStatus.Hit
internal data object IncrementalUpdate: CacheProcessingStatus.ValueChanged
internal data object Initialization: CacheProcessingStatus.ValueChanged
@OptIn(EntityStorageInstrumentationApi::class)
@ApiStatus.Experimental
@ApiStatus.Internal
@@ -26,8 +46,21 @@ public interface TracedSnapshotCache {
/**
* Thread-safe
*
* [prevStorage] should always be null for calculation of the cache. It can be not null if we perform reactive update
*/
public fun <T> cached(query: StorageQuery<T>, snapshot: ImmutableEntityStorageInstrumentation): T
public fun <T> cached(query: StorageQuery<T>,
snapshot: ImmutableEntityStorageInstrumentation,
prevStorage: ImmutableEntityStorageInstrumentation?): CachedValue<T>
/**
* Thread-safe
*
* [prevStorage] should always be null for calculation of the cache. It can be not null if we perform reactive update
*/
public fun <T> diff(query: CollectionQuery<T>,
snapshot: ImmutableEntityStorageInstrumentation,
prevStorage: ImmutableEntityStorageInstrumentation?): CachedValue<Diff<T>>
/**
* Not thread-safe
@@ -45,6 +78,10 @@ public interface TracedSnapshotCache {
}
}
public fun cache(): TracedSnapshotCache {
return TracedSnapshotCacheImpl()
}
@ApiStatus.Experimental
@ApiStatus.Internal
public sealed interface EntityStorageChange {
@@ -55,12 +92,14 @@ public sealed interface EntityStorageChange {
internal fun EntityStorageChange.createTraces(snapshot: ImmutableEntityStorageInstrumentation): ReadTraceHashSet {
return when (this) {
is ChangeOnWorkspaceBuilderChangeLog -> this.createTraces(snapshot)
is ChangeOnVersionedChange -> this.createTraces(snapshot)
}
}
internal fun EntityStorageChange.makeTokensForDiff(): MatchSet {
internal fun EntityStorageChange.makeTokensForDiff(): MatchList {
return when (this) {
is ChangeOnWorkspaceBuilderChangeLog -> this.makeTokensForDiff()
is ChangeOnVersionedChange -> this.makeTokensForDiff()
}
}
@@ -78,10 +117,48 @@ internal fun List<EntityStorageChange>.collapse(): EntityStorageChange {
}
ChangeOnWorkspaceBuilderChangeLog(targetChangelog, targetMap)
}
is ChangeOnVersionedChange -> {
if (this.size > 1) error("We should not collect more than one changelog")
firstChange
}
}
return target
}
public class ChangeOnVersionedChange(
private val changes: Sequence<EntityChange<*>>,
) : EntityStorageChange {
override val size: Int
get() = 0 // We should not collect more than one changelog, so there is no need to analyze the size
@OptIn(EntityStorageInstrumentationApi::class)
internal fun createTraces(snapshot: ImmutableEntityStorageInstrumentation): ReadTraceHashSet = changes.toTraces(snapshot)
internal fun makeTokensForDiff(): MatchList {
val matchList = MatchList()
val createdAddTokens = LongOpenHashSet()
val createdRemovedTokens = LongOpenHashSet()
changes.forEach { change ->
val entityId = change.newEntity?.asBase()?.id ?:change.oldEntity?.asBase()?.id!!
when (change) {
is EntityChange.Added<*> -> {
if (createdAddTokens.add(entityId)) matchList.addedMatch(MatchWithEntityId(entityId, null))
}
is EntityChange.Removed<*> -> {
if (createdRemovedTokens.add(entityId)) matchList.removedMatch(MatchWithEntityId(entityId, null))
}
is EntityChange.Replaced<*> -> {
if (createdRemovedTokens.add(entityId)) matchList.removedMatch(MatchWithEntityId(entityId, null))
if (createdAddTokens.add(entityId)) matchList.addedMatch(MatchWithEntityId(entityId, null))
}
}
}
return matchList
}
}
internal class ChangeOnWorkspaceBuilderChangeLog(
private val changes: WorkspaceBuilderChangeLog,
private val externalMappingChanges: Map<ExternalMappingKey<*>, MutableSet<EntityId>>,
@@ -111,34 +188,34 @@ internal class ChangeOnWorkspaceBuilderChangeLog(
return newTraces
}
internal fun makeTokensForDiff(): MatchSet {
val matchSet = MatchSet()
internal fun makeTokensForDiff(): MatchList {
val matchList = MatchList()
val createdAddTokens = LongOpenHashSet()
val createdRemovedTokens = LongOpenHashSet()
changes.changeLog.forEach { (entityId, change) ->
when (change) {
is ChangeEntry.AddEntity -> {
if (createdAddTokens.add(entityId)) matchSet.addedMatch(MatchWithEntityId(entityId, null))
if (createdAddTokens.add(entityId)) matchList.addedMatch(MatchWithEntityId(entityId, null))
}
is ChangeEntry.RemoveEntity -> {
if (createdRemovedTokens.add(entityId)) matchSet.removedMatch(MatchWithEntityId(entityId, null))
if (createdRemovedTokens.add(entityId)) matchList.removedMatch(MatchWithEntityId(entityId, null))
}
is ChangeEntry.ReplaceEntity -> {
if (createdRemovedTokens.add(entityId)) matchSet.removedMatch(MatchWithEntityId(entityId, null))
if (createdAddTokens.add(entityId)) matchSet.addedMatch(MatchWithEntityId(entityId, null))
if (createdRemovedTokens.add(entityId)) matchList.removedMatch(MatchWithEntityId(entityId, null))
if (createdAddTokens.add(entityId)) matchList.addedMatch(MatchWithEntityId(entityId, null))
}
}
}
externalMappingChanges.values.forEach { affectedIds ->
affectedIds.forEach { entityId ->
if (createdRemovedTokens.add(entityId)) matchSet.removedMatch(MatchWithEntityId(entityId, null))
if (createdAddTokens.add(entityId)) matchSet.addedMatch(MatchWithEntityId(entityId, null))
if (createdRemovedTokens.add(entityId)) matchList.removedMatch(MatchWithEntityId(entityId, null))
if (createdAddTokens.add(entityId)) matchList.addedMatch(MatchWithEntityId(entityId, null))
}
}
return matchSet
return matchList
}
}

View File

@@ -7,10 +7,13 @@ import com.intellij.platform.workspace.storage.impl.query.*
import com.intellij.platform.workspace.storage.impl.trace.ReadTraceIndex
import com.intellij.platform.workspace.storage.instrumentation.EntityStorageInstrumentationApi
import com.intellij.platform.workspace.storage.instrumentation.ImmutableEntityStorageInstrumentation
import com.intellij.platform.workspace.storage.query.CollectionQuery
import com.intellij.platform.workspace.storage.query.StorageQuery
import com.intellij.platform.workspace.storage.query.compile
import com.intellij.platform.workspace.storage.query.trackDiff
import com.intellij.platform.workspace.storage.trace.ReadTraceHashSet
import org.jetbrains.annotations.TestOnly
import java.util.*
internal data class CellUpdateInfo(
val chainId: QueryId,
@@ -58,10 +61,11 @@ internal sealed interface UpdateType {
internal class PropagationResult<T>(
val newCell: Cell<T>,
val matchSet: MatchSet,
val matchList: MatchList,
val subscriptions: List<Pair<ReadTraceHashSet, UpdateType>>,
)
@OptIn(EntityStorageInstrumentationApi::class)
internal class TracedSnapshotCacheImpl : TracedSnapshotCache {
private val lock = Any()
@@ -77,6 +81,7 @@ internal class TracedSnapshotCacheImpl : TracedSnapshotCache {
* still this flag exists to catch bugs in implementation or after refactorings.
*/
private var pullingCache = false
internal var shuffleEntities: Long = -1L
override fun pullCache(
newSnapshot: ImmutableEntityStorage,
@@ -96,6 +101,7 @@ internal class TracedSnapshotCacheImpl : TracedSnapshotCache {
}
this.queryIdToChain.putAll(from.queryIdToChain)
this.changeQueue.putAll(from.changeQueue.mapValues { ArrayList(it.value) })
this.shuffleEntities = from.shuffleEntities
val cachesToRemove = ArrayList<QueryId>()
this.queryIdToChain.keys.forEach { chainId ->
@@ -126,49 +132,85 @@ internal class TracedSnapshotCacheImpl : TracedSnapshotCache {
changeQueue.remove(queryId)
}
@OptIn(EntityStorageInstrumentationApi::class)
private fun updateCellIndex(chainId: QueryId,
changes: EntityStorageChange,
newSnapshot: ImmutableEntityStorageInstrumentation) {
newSnapshot: ImmutableEntityStorageInstrumentation,
prevStorage: ImmutableEntityStorageInstrumentation?): Boolean {
val cellIndex = queryIdToTraceIndex.getValue(chainId)
val newTraces = changes.createTraces(newSnapshot)
cellIndex.get(newTraces).forEach { updateRequest ->
val updatedCells = HashMap<CellId, MatchSet>()
var cellsUpdated = false
cellIndex.get(newTraces).maybeShuffled().firstDiffThenRecalculate().forEach { updateRequest ->
cellsUpdated = true
val cells = queryIdToChain[updateRequest.chainId] ?: error("Unindexed cell")
val (newChain, tracesAndModifiedCells) = cells.changeInput(newSnapshot, updateRequest, changes, updateRequest.cellId)
val (newChain, tracesAndModifiedCells) = cells.changeInput(newSnapshot, prevStorage, updateRequest, changes, updateRequest.cellId,
updatedCells)
tracesAndModifiedCells.forEach { (traces, updateRequest) ->
cellIndex.set(traces, updateRequest)
}
this.queryIdToChain[newChain.id] = newChain
}
return cellsUpdated
}
@OptIn(EntityStorageInstrumentationApi::class)
override fun <T> cached(query: StorageQuery<T>, snapshot: ImmutableEntityStorageInstrumentation): T {
private fun Collection<CellUpdateInfo>.firstDiffThenRecalculate(): List<CellUpdateInfo> {
val (diff, recalculate) = this.partition { it.updateType == UpdateType.DIFF }
return diff + recalculate
}
@Suppress("UNCHECKED_CAST")
override fun <T> cached(query: StorageQuery<T>,
snapshot: ImmutableEntityStorageInstrumentation,
prevStorage: ImmutableEntityStorageInstrumentation?): CachedValue<T> {
check(!pullingCache) {
"It's not allowed to request query when the cache is pulled from other snapshot"
}
val lastCell = getUpdatedLastCell(query, snapshot, prevStorage)
return CachedValue(lastCell.cacheProcessStatus, lastCell.value.data() as T)
}
@OptIn(EntityStorageInstrumentationApi::class)
override fun <T> diff(query: CollectionQuery<T>,
snapshot: ImmutableEntityStorageInstrumentation,
prevStorage: ImmutableEntityStorageInstrumentation?): CachedValue<Diff<T>> {
require(query !is CollectionQuery.TrackDiff<*>)
val queryWithDiffTracker = query.trackDiff()
val lastCell = getUpdatedLastCell(queryWithDiffTracker, snapshot, prevStorage)
check(lastCell.value is DiffCollectorCell<*>)
val diff = DiffImpl(lastCell.value.addedData as List<T>, lastCell.value.removedData as List<T>)
return CachedValue(lastCell.cacheProcessStatus, diff)
}
private fun <T> getUpdatedLastCell(query: StorageQuery<T>,
snapshot: ImmutableEntityStorageInstrumentation,
prevStorage: ImmutableEntityStorageInstrumentation?): CachedValue<Cell<*>> {
val queryId = query.queryId
val changes = changeQueue[queryId]
val cellChain = queryIdToChain[queryId]
if (cellChain != null && (changes == null || changes.size == 0)) {
return cellChain.data()
return CachedValue(CacheHit, cellChain.last())
}
synchronized(lock) {
val doubleCheckChanges = changeQueue[queryId]
val doubleCheckChain = queryIdToChain[queryId]
if (doubleCheckChain != null && (doubleCheckChanges == null || doubleCheckChanges.size == 0)) {
return doubleCheckChain.data()
return CachedValue(CacheHitInSynchronized, doubleCheckChain.last())
}
if (doubleCheckChanges != null && doubleCheckChanges.size > 0) {
val collapsedChangelog = doubleCheckChanges.collapse()
updateCellIndex(queryId, collapsedChangelog, snapshot)
val recalculated = updateCellIndex(queryId, collapsedChangelog, snapshot, prevStorage)
changeQueue.remove(queryId)
return queryIdToChain[queryId]!!.data()
val status = if (recalculated) IncrementalUpdate else CacheHitNotAffectedByChanges
return CachedValue(status, queryIdToChain[queryId]!!.last())
}
val emptyCellChain = query.compile()
@@ -180,7 +222,7 @@ internal class TracedSnapshotCacheImpl : TracedSnapshotCache {
}
}
queryIdToChain[newChain.id] = newChain
return newChain.data()
return CachedValue(Initialization, newChain.last())
}
}
@@ -190,4 +232,14 @@ internal class TracedSnapshotCacheImpl : TracedSnapshotCache {
internal fun getQueryIdToChain() = queryIdToChain
@TestOnly
internal fun getQueryIdToTraceIndex() = queryIdToTraceIndex
/**
* Shuffle collection if the field [shuffleEntities] is not -1 (set in tests)
*/
private fun <E> Collection<E>.maybeShuffled(): Collection<E> {
if (shuffleEntities != -1L && this.size > 1) {
return this.shuffled(Random(shuffleEntities))
}
return this
}
}

View File

@@ -26,10 +26,51 @@ internal sealed class Cell<T>(val id: CellId) {
throw NotImplementedError()
}
abstract fun input(prevData: MatchSet, newSnapshot: ImmutableEntityStorage): PropagationResult<T>
abstract fun input(prevData: MatchList, newSnapshot: ImmutableEntityStorage): PropagationResult<T>
abstract fun data(): T
}
public interface Diff<T> {
public val added: List<T>
public val removed: List<T>
}
public class DiffImpl<T>(override val added: List<T>, override val removed: List<T>) : Diff<T>
internal class DiffCollectorCell<T>(
id: CellId,
val addedData: List<T>,
val removedData: List<T>,
) : Cell<T>(id) {
override fun input(prevData: MatchList, newSnapshot: ImmutableEntityStorage): PropagationResult<T> {
error("Another input should be called")
}
/**
* [DiffCollectorCell] has a special processing. This cell can be used only in reactive read and
* we calculcate the removed data by previous snapshot.
* It's not possible to use the [prevSnapshot] in cache because there is no such snapshot at all. Previous calculatio of the snapshot can
* be done at any previous snapshot.
*/
fun input(prevData: MatchList, newSnapshot: ImmutableEntityStorage, prevSnapshot: ImmutableEntityStorage?): PropagationResult<T> {
val newAddedData = ArrayList<T>()
val newRemovedData = ArrayList<T>()
prevData.removedMatches().forEach {
if (prevSnapshot == null) error("Prev snapshot cannot be null for diff")
newRemovedData.add(it.getData(prevSnapshot) as T)
}
prevData.addedMatches().forEach {
newAddedData.add(it.getData(newSnapshot) as T)
}
val newCell = DiffCollectorCell(id, newAddedData, newRemovedData)
return PropagationResult(newCell, MatchList(), emptyList())
}
override fun data(): T {
error("Should not be accessed")
}
}
/**
* Cell related to [entities] query. It doesn't store any intermediate calculations.
*/
@@ -50,25 +91,25 @@ internal class EntityCell<T : WorkspaceEntity>(
.toList()
val traces = ReadTraceHashSet()
traces.add(ReadTrace.EntitiesOfType(type.java).hash)
val matchSet = MatchSet().also { set -> matches.forEach { set.addedMatch(it) } }
return PropagationResult(newCell, matchSet, listOf(traces to UpdateType.DIFF))
val matchList = MatchList().also { set -> matches.forEach { set.addedMatch(it) } }
return PropagationResult(newCell, matchList, listOf(traces to UpdateType.DIFF))
}
override fun input(prevData: MatchSet,
override fun input(prevData: MatchList,
newSnapshot: ImmutableEntityStorage): PropagationResult<List<T>> {
val matchSet = MatchSet()
val matchList = MatchList()
prevData.addedMatches()
.asSequence()
.filter { (it as MatchWithEntityId).entityId.clazz.findWorkspaceEntity().kotlin == type }
.forEach { matchSet.addedMatch(it) }
.forEach { matchList.addedMatch(it) }
prevData.removedMatches()
.asSequence()
.filter { (it as MatchWithEntityId).entityId.clazz.findWorkspaceEntity().kotlin == type }
.forEach { matchSet.removedMatch(it) }
.forEach { matchList.removedMatch(it) }
val traces = ReadTraceHashSet()
traces.add(ReadTrace.EntitiesOfType(type.java).hash)
return PropagationResult(EntityCell(this.id, this.type), matchSet,
return PropagationResult(EntityCell(this.id, this.type), matchList,
listOf(traces to UpdateType.DIFF))
}
@@ -81,35 +122,35 @@ internal class EntityCell<T : WorkspaceEntity>(
internal class FlatMapCell<T, K>(
id: CellId,
val mapping: (T, ImmutableEntityStorage) -> Iterable<K>,
private val memory: PersistentMap<Match, Iterable<K>>,
private val memory: PersistentMap<Match, Iterable<Match>>,
) : Cell<List<K>>(id) {
private var dataCache: List<K>? = null
override fun input(prevData: MatchSet,
override fun input(prevData: MatchList,
newSnapshot: ImmutableEntityStorage): PropagationResult<List<K>> {
val generatedMatches = MatchSet()
val generatedMatches = MatchList()
val traces = ArrayList<Pair<ReadTraceHashSet, UpdateType>>()
val newMemory = memory.mutate { mutableMemory ->
prevData.removedMatches().forEach { match ->
val removedValue = mutableMemory.remove(match)
removedValue?.forEach {
generatedMatches.removedMatch(it.toMatch(match))
val removedValue = mutableMemory.remove(match) ?: error("Nothing to remove")
removedValue.forEach {
generatedMatches.removedMatch(it)
}
}
val target = LongArrayList()
val tracker = ReadTracker.tracedSnapshot(newSnapshot, target)
val res = HashMap<Match, Iterable<K>>()
val res = HashMap<Match, Iterable<Match>>()
prevData.addedMatches().forEach { match ->
target.clear()
val mappingTarget = match.getData(tracker)
val mappedValues = mapping(mappingTarget as T, tracker)
val mappedValues = mapping(mappingTarget as T, tracker).map { it.toMatch(match) }
val newTraces = ReadTraceHashSet(target)
res[match] = mappedValues
mappedValues.forEach {
generatedMatches.addedMatch(it.toMatch(match))
generatedMatches.addedMatch(it)
}
val recalculate = UpdateType.RECALCULATE(match)
traces += newTraces to recalculate
@@ -126,7 +167,7 @@ internal class FlatMapCell<T, K>(
return existingData
}
val res = memory.values.flatten()
val res = memory.values.flatten().map { it.value() as K }
this.dataCache = res
return res
}
@@ -136,35 +177,34 @@ internal class FlatMapCell<T, K>(
internal class MapCell<T, K>(
id: CellId,
val mapping: (T, ImmutableEntityStorage) -> K,
private val memory: PersistentMap<Match, K>,
private val memory: PersistentMap<Match, Match>,
) : Cell<List<K>>(id) {
private var dataCache: List<K>? = null
override fun input(prevData: MatchSet,
override fun input(prevData: MatchList,
newSnapshot: ImmutableEntityStorage): PropagationResult<List<K>> {
val generatedMatches = MatchSet()
val generatedMatches = MatchList()
val traces = ArrayList<Pair<ReadTraceHashSet, UpdateType>>()
val newMemory = memory.mutate { mutableMemory ->
prevData.removedMatches().forEach { match ->
val removedValue = mutableMemory.remove(match)
removedValue?.let {
generatedMatches.removedMatch(it.toMatch(match))
}
val removedValue = mutableMemory.remove(match) ?: error("Nothing to remove")
removedValue.let { generatedMatches.removedMatch(it) }
}
val target = LongArrayList()
val tracker = ReadTracker.tracedSnapshot(newSnapshot, target)
val res = HashMap<Match, K>()
val res = HashMap<Match, Match>()
prevData.addedMatches().forEach { match ->
target.clear()
val mappingTarget = match.getData(tracker)
val mappedValues = mapping(mappingTarget as T, tracker)
val mappedValue = mapping(mappingTarget as T, tracker)
val mappedMatch = mappedValue.toMatch(match)
val newTraces = ReadTraceHashSet(target)
res[match] = mappedValues
res[match] = mappedMatch
mappedValues.let {
generatedMatches.addedMatch(it.toMatch(match))
if (mappedValue != null) {
generatedMatches.addedMatch(mappedMatch)
}
val recalculate = UpdateType.RECALCULATE(match)
traces += newTraces to recalculate
@@ -181,7 +221,7 @@ internal class MapCell<T, K>(
return existingData
}
val res = memory.values.toList()
val res = memory.values.map { it.value() as K }
this.dataCache = res
return res
}
@@ -192,33 +232,31 @@ internal class GroupByCell<T, K, V>(
id: CellId,
val keySelector: (T) -> K,
val valueTransform: (T) -> V,
private val myMemory: PersistentMap<Match, Pair<K, V>>,
private val myMemory: PersistentMap<Match, Match>,
) : Cell<Map<K, List<V>>>(id) {
private var mapCache: Map<K, List<V>>? = null
override fun input(prevData: MatchSet,
override fun input(prevData: MatchList,
newSnapshot: ImmutableEntityStorage): PropagationResult<Map<K, List<V>>> {
val generatedMatches = MatchSet()
val generatedMatches = MatchList()
val traces = ArrayList<Pair<ReadTraceHashSet, UpdateType>>()
val newMemory = myMemory.mutate { mutableMemory ->
prevData.removedMatches().forEach { match ->
val removedValue = mutableMemory.remove(match)
if (removedValue != null) {
generatedMatches.removedMatch(removedValue.toMatch(match))
}
val removedValue = mutableMemory.remove(match) ?: error("Nothing to remove")
generatedMatches.removedMatch(removedValue)
}
val target = LongArrayList()
val tracker = ReadTracker.tracedSnapshot(newSnapshot, target)
prevData.addedMatches().forEach { match ->
target.clear()
val origData = match.getData(tracker)
val keyToValue = keySelector(origData as T) to valueTransform(origData as T)
val keyToValue = (keySelector(origData as T) to valueTransform(origData as T)).toMatch(match)
val newTraces = ReadTraceHashSet(target)
mutableMemory[match] = keyToValue
generatedMatches.addedMatch(keyToValue.toMatch(match))
generatedMatches.addedMatch(keyToValue)
val recalculate = UpdateType.RECALCULATE(match)
traces += newTraces to recalculate
@@ -233,7 +271,8 @@ internal class GroupByCell<T, K, V>(
val myMapCache = mapCache
if (myMapCache != null) return myMapCache
val res = mutableMapOf<K, MutableList<V>>()
myMemory.values.forEach { (k, v) ->
myMemory.values.forEach { match ->
val (k, v) = match.value() as Pair<K, V>
res.getOrPut(k) { ArrayList() }.add(v)
}
mapCache = res

View File

@@ -20,12 +20,12 @@ internal class CellChain(
fun snapshotInput(snapshot: ImmutableEntityStorage): Pair<CellChain, List<Pair<ReadTraceHashSet, CellUpdateInfo>>> {
val traces = ArrayList<Pair<ReadTraceHashSet, CellUpdateInfo>>()
val newChain = cells.mutate {
var tokens = MatchSet()
var tokens = MatchList()
it.indices.forEach { index ->
val cell = it[index]
if (index == 0) {
val cellAndTokens = cell.snapshotInput(snapshot)
tokens = cellAndTokens.matchSet
tokens = cellAndTokens.matchList
it[index] = cellAndTokens.newCell
cellAndTokens.subscriptions.forEach { update ->
val trace = update.second
@@ -33,8 +33,12 @@ internal class CellChain(
}
}
else {
val cellAndTokens = cell.input(tokens, snapshot)
tokens = cellAndTokens.matchSet
val cellAndTokens = if (cell is DiffCollectorCell<*>) {
cell.input(tokens, snapshot, null)
}
else cell.input(tokens, snapshot)
tokens = cellAndTokens.matchList
it[index] = cellAndTokens.newCell
cellAndTokens.subscriptions.forEach { update ->
val trace = update.second
@@ -47,20 +51,27 @@ internal class CellChain(
}
fun changeInput(newSnapshot: ImmutableEntityStorage,
prevStorage: ImmutableEntityStorage?,
changeRequest: CellUpdateInfo,
changes: EntityStorageChange,
cellToActivate: CellId): Pair<CellChain, List<Pair<ReadTraceHashSet, CellUpdateInfo>>> {
cellToActivate: CellId,
updatedCells: HashMap<CellId, MatchSet>): Pair<CellChain, List<Pair<ReadTraceHashSet, CellUpdateInfo>>> {
val traces = ArrayList<Pair<ReadTraceHashSet, CellUpdateInfo>>()
var myTokens = when (changeRequest.updateType) {
is UpdateType.DIFF -> changes.makeTokensForDiff()
is UpdateType.RECALCULATE -> {
val tokens = MatchSet()
val tokens = MatchList()
val match = changeRequest.updateType.match
tokens.removedMatch(match)
if (match.isValid(newSnapshot)) {
tokens.addedMatch(match)
if (updatedCells[cellToActivate]?.contains(match) != true) {
if (prevStorage == null || match.isValid(prevStorage)) {
tokens.removedMatch(match)
}
if (match.isValid(newSnapshot)) {
tokens.addedMatch(match)
}
}
tokens
}
}
@@ -68,8 +79,20 @@ internal class CellChain(
val startingIndex = cellList.withIndex().first { it.value.id == cellToActivate }.index
(startingIndex..cellList.lastIndex).forEach { index ->
val cell = cellList[index]
val cellAndTokens = cell.input(myTokens, newSnapshot)
myTokens = cellAndTokens.matchSet
val updatedMatches = updatedCells[cell.id]
if (updatedMatches != null) {
myTokens.removeMatches(updatedMatches)
}
if (myTokens.isEmpty()) return@mutate
updatedCells.getOrPut(cell.id) { MatchSet() }.also { it.addFromList(myTokens) }
val cellAndTokens = if (cell is DiffCollectorCell<*>) {
cell.input(myTokens, newSnapshot, prevStorage)
}
else {
cell.input(myTokens, newSnapshot)
}
myTokens = cellAndTokens.matchList
cellList[index] = cellAndTokens.newCell
cellAndTokens.subscriptions.forEach { update ->
val trace = update.second
@@ -80,9 +103,8 @@ internal class CellChain(
return newChain to traces
}
@Suppress("UNCHECKED_CAST")
fun <T> data(): T {
return cells.last().data() as T
fun last(): Cell<*> {
return cells.last()
}
private fun PersistentList<Cell<*>>.toChain(id: QueryId) = CellChain(this, id)

View File

@@ -0,0 +1,61 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.platform.workspace.storage.impl.query
/**
* Collection of matches grouped by operation type
*/
internal class MatchList {
private val addedMatches: MutableList<Match> = mutableListOf()
private val removedMatches: MutableList<Match> = mutableListOf()
fun addedMatch(match: Match) {
addedMatches += match
}
fun removedMatch(match: Match) {
removedMatches += match
}
fun addedMatches(): List<Match> {
return addedMatches
}
fun removedMatches(): List<Match> {
return removedMatches
}
fun isEmpty(): Boolean {
return addedMatches.isEmpty() && removedMatches.isEmpty()
}
fun removeMatches(matchSet: MatchSet) {
val addedIterator = addedMatches.iterator()
while (addedIterator.hasNext()) {
val next = addedIterator.next()
if (matchSet.containsAdded(next)) addedIterator.remove()
}
val removedIterator = removedMatches.iterator()
while (removedIterator.hasNext()) {
val next = removedIterator.next()
if (matchSet.containsRemoved(next)) removedIterator.remove()
}
}
}
internal class MatchSet {
private val addedMatches: MutableSet<Match> = mutableSetOf()
private val removedMatches: MutableSet<Match> = mutableSetOf()
fun contains(match: Match): Boolean = addedMatches.contains(match) || removedMatches.contains(match)
fun addFromList(matchList: MatchList) {
matchList.addedMatches().forEach { this.addedMatches.add(it) }
matchList.removedMatches().forEach { this.removedMatches.add(it) }
}
fun containsAdded(match: Match): Boolean = addedMatches.contains(match)
fun containsRemoved(match: Match): Boolean = removedMatches.contains(match)
}

View File

@@ -1,27 +0,0 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.platform.workspace.storage.impl.query
/**
* Collection of matches grouped by operation type
*/
internal class MatchSet() {
private val addedMatches: MutableList<Match> = mutableListOf()
private val removedMatches: MutableList<Match> = mutableListOf()
fun addedMatch(match: Match) {
addedMatches += match
}
fun removedMatch(match: Match) {
removedMatches += match
}
fun addedMatches(): List<Match> {
return addedMatches
}
fun removedMatches(): List<Match> {
return removedMatches
}
}

View File

@@ -86,6 +86,13 @@ internal fun Match.getData(snapshot: ImmutableEntityStorage): Any? {
}
}
internal fun Match.value(): Any? {
return when (this) {
is MatchWithEntityId -> error("Cannot get a value from $this")
is MatchWithData -> this.data
}
}
internal fun Any?.toMatch(basedOn: Match?): Match {
return if (this is WorkspaceEntityBase) {
MatchWithEntityId(this.id, basedOn)

View File

@@ -59,6 +59,10 @@ public fun <T, K> CollectionQuery<T>.flatMap(mapping: (T, ImmutableEntityStorage
return CollectionQuery.FlatMapTo(this.queryId, this, mapping)
}
internal fun <T> CollectionQuery<T>.trackDiff(): CollectionQuery<T>{
return CollectionQuery.TrackDiff(this.queryId, this)
}
public fun <T, K, V> CollectionQuery<T>.groupBy(
keySelector: (T) -> K,
valueTransformer: (T) -> V,

View File

@@ -31,6 +31,7 @@ public sealed interface CollectionQuery<T> : StorageQuery<Collection<T>> {
public val from: CollectionQuery<T>,
public val map: (T, ImmutableEntityStorage) -> Iterable<K>,
) : CollectionQuery<K>
public class TrackDiff<T>(override val queryId: QueryId, public val from: CollectionQuery<T>) : CollectionQuery<T>
public class MapTo<T, K> internal constructor(
override val queryId: QueryId,
@@ -71,6 +72,10 @@ internal fun <T> StorageQuery<T>.compile(cellCollector: MutableList<Cell<*>> = m
cellCollector.prepend(MapCell(CellId(), map, persistentHashMapOf()))
this.from.compile(cellCollector)
}
is CollectionQuery.TrackDiff<*> -> {
cellCollector.prepend(DiffCollectorCell<T>(CellId(), emptyList(), emptyList()))
this.from.compile(cellCollector)
}
}
}
is AssociationQuery<*, *> -> {

View File

@@ -262,3 +262,66 @@ internal fun ChangeLog.toTraces(newSnapshot: ImmutableEntityStorageInstrumentati
}
return patternSet
}
@OptIn(EntityStorageInstrumentationApi::class)
internal fun Sequence<EntityChange<*>>.toTraces(newSnapshot: ImmutableEntityStorageInstrumentation): ReadTraceHashSet {
val patternSet = ReadTraceHashSet()
this.forEach { change ->
when (change) {
is EntityChange.Added<*> -> {
val ofClass = change.newEntity.getEntityInterface()
val entityData = change.newEntity.asBase().getData()
patternSet.add(ReadTrace.EntitiesOfType(ofClass).hash)
if (entityData is SoftLinkable) {
entityData.getLinks().forEach { link ->
patternSet.add(ReadTrace.HasSymbolicLinkTo(link, ofClass).hash)
}
}
val entity = entityData.createEntity(newSnapshot)
if (entity is WorkspaceEntityWithSymbolicId) {
patternSet.add(ReadTrace.Resolve(entity.symbolicId).hash)
}
}
is EntityChange.Removed<*> -> {
val ofClass = change.entity.getEntityInterface()
val entityData = change.entity.asBase().getData()
patternSet.add(ReadTrace.EntitiesOfType(ofClass).hash)
if (entityData is SoftLinkable) {
entityData.getLinks().forEach { link ->
patternSet.add(ReadTrace.HasSymbolicLinkTo(link, ofClass).hash)
}
}
val entity = entityData.createEntity(newSnapshot)
if (entity is WorkspaceEntityWithSymbolicId) {
patternSet.add(ReadTrace.Resolve(entity.symbolicId).hash)
}
}
is EntityChange.Replaced<*> -> {
val ofClass = change.newEntity.getEntityInterface()
val entityData = change.newEntity.asBase().getData()
patternSet.add(ReadTrace.SomeFieldAccess(change.newEntity.asBase().id).hash)
// Becase maybe we update the field with links
if (entityData is SoftLinkable) {
entityData.getLinks().forEach { link ->
patternSet.add(ReadTrace.HasSymbolicLinkTo(link, ofClass).hash)
}
}
// Because maybe we update the field that calculates symbolic id
val entity = entityData.createEntity(newSnapshot)
if (entity is WorkspaceEntityWithSymbolicId) {
patternSet.add(ReadTrace.Resolve(entity.symbolicId).hash)
}
}
}
}
return patternSet
}