asyncFillInvertedNameIndex - use Dispatchers.IO (already pre-allocated and ready to use, don't overuse CPU/IO)

GitOrigin-RevId: 0d9099203f7a4400fe2621ee35dc50dfefcb909e
This commit is contained in:
Vladimir Krivosheev
2024-02-13 08:39:30 +01:00
committed by intellij-monorepo-bot
parent 0bdd610d3e
commit 04c7d725ec
5 changed files with 38 additions and 50 deletions

View File

@@ -1,4 +1,6 @@
// Copyright 2000-2023 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
@file:Suppress("ReplacePutWithAssignment")
package com.intellij.util.indexing.diagnostic
import com.fasterxml.jackson.annotation.JsonIgnore
@@ -168,7 +170,7 @@ object StorageDiagnosticData {
private fun indexStorageStatistics(mapStats: MutableMap<Path, PersistentHashMapStatistics>,
enumeratorStats: MutableMap<Path, PersistentEnumeratorStatistics>): IndexStorageStats {
val perIndexStats = sortedMapOf<String, StatsPerStorage>()
val perIndexStats = TreeMap<String, StatsPerStorage>()
for (id in ID.getRegisteredIds()) {
val indexStats = listOf(IndexInfrastructure.getIndexRootDir(id), IndexInfrastructure.getPersistentIndexRootDir(id))
.map {

View File

@@ -5,7 +5,6 @@ import com.intellij.ide.actions.cache.RecoverVfsFromLogService;
import com.intellij.openapi.application.ApplicationManager;
import com.intellij.openapi.diagnostic.Logger;
import com.intellij.openapi.progress.ProcessCanceledException;
import com.intellij.openapi.util.NotNullLazyValue;
import com.intellij.openapi.util.io.ByteArraySequence;
import com.intellij.openapi.util.io.FileAttributes;
import com.intellij.openapi.util.io.FileSystemUtil;
@@ -41,18 +40,13 @@ import com.intellij.util.io.blobstorage.ByteBufferReader;
import com.intellij.util.io.blobstorage.ByteBufferWriter;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
import org.jetbrains.annotations.*;
import java.io.*;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
@@ -172,7 +166,6 @@ public final class FSRecordsImpl implements Closeable {
ExceptionUtil.rethrow(error);
};
public static ErrorHandler getDefaultErrorHandler() {
if (VfsLog.isVfsTrackingEnabled()) {
return ON_ERROR_MARK_CORRUPTED_AND_SCHEDULE_REBUILD_AND_SUGGEST_CACHE_RECOVERY_IF_ALLOWED;
@@ -182,7 +175,6 @@ public final class FSRecordsImpl implements Closeable {
}
}
private final @NotNull PersistentFSConnection connection;
private final @NotNull PersistentFSContentAccessor contentAccessor;
private final @NotNull PersistentFSAttributeAccessor attributeAccessor;
@@ -228,7 +220,7 @@ public final class FSRecordsImpl implements Closeable {
private volatile Exception closedStackTrace = null;
//@GuardedBy("this")
private final ObjectOpenHashSet<AutoCloseable> closeables = new ObjectOpenHashSet<>();
private final Set<AutoCloseable> closeables = new HashSet<>();
private final CopyOnWriteArraySet<FileIdIndexedStorage> fileIdIndexedStorages = new CopyOnWriteArraySet<>();
private static int nextMask(int value,
@@ -265,7 +257,6 @@ public final class FSRecordsImpl implements Closeable {
//@formatter:on
}
/**
* Factory
*
@@ -298,9 +289,7 @@ public final class FSRecordsImpl implements Closeable {
PersistentFSConnection connection = initializationResult.connection;
Supplier<InvertedNameIndex> invertedNameIndexLazy = asyncFillInvertedNameIndex(
AppExecutorUtil.getAppExecutorService(), connection.getRecords()
);
Supplier<InvertedNameIndex> invertedNameIndexLazy = asyncFillInvertedNameIndex(connection.getRecords());
LOG.info("VFS initialized: " + NANOSECONDS.toMillis(initializationResult.totalInitializationDurationNs) + " ms, " +
initializationResult.attemptsFailures.size() + " failed attempts, " +
@@ -313,7 +302,6 @@ public final class FSRecordsImpl implements Closeable {
new PersistentFSTreeRawAccessor(attributeAccessor, recordAccessor, connection) :
new PersistentFSTreeAccessor(attributeAccessor, recordAccessor, connection);
try {
treeAccessor.ensureLoaded();
@@ -360,7 +348,6 @@ public final class FSRecordsImpl implements Closeable {
}
}
private FSRecordsImpl(@NotNull PersistentFSConnection connection,
@NotNull PersistentFSContentAccessor contentAccessor,
@NotNull PersistentFSAttributeAccessor attributeAccessor,
@@ -1478,33 +1465,32 @@ public final class FSRecordsImpl implements Closeable {
}
@VisibleForTesting
public static @NotNull Supplier<@NotNull InvertedNameIndex> asyncFillInvertedNameIndex(@NotNull ExecutorService pool,
@NotNull PersistentFSRecordsStorage recordsStorage) {
Future<InvertedNameIndex> fillUpInvertedNameIndexTask = pool.submit(() -> {
public static @NotNull Supplier<@NotNull InvertedNameIndex> asyncFillInvertedNameIndex(@NotNull PersistentFSRecordsStorage recordsStorage) {
CompletableFuture<InvertedNameIndex> fillUpInvertedNameIndexTask = PersistentFsConnectorHelper.INSTANCE.executor().async(() -> {
InvertedNameIndex invertedNameIndex = new InvertedNameIndex();
// fill up nameId->fileId index:
int maxAllocatedID = recordsStorage.maxAllocatedID();
for (int fileId = FSRecords.ROOT_FILE_ID; fileId <= maxAllocatedID; fileId++) {
int flags = recordsStorage.getFlags(fileId);
int nameId = recordsStorage.getNameId(fileId);
if (!hasDeletedFlag(flags) && nameId != NULL_NAME_ID) {
invertedNameIndex.updateDataInner(fileId, nameId);
for (int fileId = FSRecords.ROOT_FILE_ID; fileId <= maxAllocatedID; fileId++) {
int flags = recordsStorage.getFlags(fileId);
int nameId = recordsStorage.getNameId(fileId);
if (!hasDeletedFlag(flags) && nameId != NULL_NAME_ID) {
invertedNameIndex.updateDataInner(fileId, nameId);
}
}
}
LOG.info("VFS scanned: file-by-name index was populated");
return invertedNameIndex;
});
// We don't need volatile/atomicLazy, since computation is idempotent: same instance returned always.
// So _there could be_ a data race, but it is a benign race.
return NotNullLazyValue.lazy(() -> {
return () -> {
try {
return fillUpInvertedNameIndexTask.get();
return fillUpInvertedNameIndexTask.join();
}
catch (Throwable e) {
throw new IllegalStateException("Lazy invertedNameIndex computation is failed", e);
}
});
};
}

View File

@@ -1,4 +1,4 @@
// Copyright 2000-2023 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.openapi.vfs.newvfs.persistent
import com.intellij.openapi.application.ApplicationManager
@@ -7,31 +7,29 @@ import com.intellij.openapi.components.service
import com.intellij.openapi.progress.blockingContext
import com.intellij.platform.util.coroutines.childScope
import com.intellij.util.concurrency.AppExecutorUtil
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.*
import kotlinx.coroutines.future.asCompletableFuture
import org.jetbrains.annotations.ApiStatus.Internal
import java.util.concurrent.Callable
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutorService
/** Abstracts out specific way to run async tasks during VFS initialization */
interface VFSAsyncTaskExecutor {
// Abstracts out specific way to run async tasks during VFS initialization
internal sealed interface VFSAsyncTaskExecutor {
fun <T> async(task: Callable<T>): CompletableFuture<T>
}
@Service
internal class ExecuteOnCoroutine(coroutineScope: CoroutineScope) : VFSAsyncTaskExecutor {
private class ExecuteOnCoroutine(coroutineScope: CoroutineScope) : VFSAsyncTaskExecutor {
/**
* We need tasks to be independent -- i.e. cancellation/fail of one task must not affect others.
* We need tasks to be independent -- i.e., cancellation/fail of one task must not affect others.
* This is important because:
* a) other implementations behave that way
* b) resource management become quite complex with implicit task cancellation
*/
private val supervisorScope = coroutineScope.childScope(
context = CoroutineName("PersistentFsLoader") + Dispatchers.IO,
supervisor = true //default, but to be explicit -- this is the main reason to create .childScope
supervisor = true // default, but to be explicit -- this is the main reason to create .childScope
)
override fun <T> async(task: Callable<T>): CompletableFuture<T> {
@@ -43,6 +41,7 @@ internal class ExecuteOnCoroutine(coroutineScope: CoroutineScope) : VFSAsyncTask
}
}
@Internal
class ExecuteOnThreadPool(private val pool: ExecutorService) : VFSAsyncTaskExecutor {
override fun <T> async(task: Callable<T>): CompletableFuture<T> {
//MAYBE RC: use Dispatchers.IO-kind pool, with many threads (appExecutor has 1 core thread, so needs time to inflate)
@@ -53,7 +52,7 @@ class ExecuteOnThreadPool(private val pool: ExecutorService) : VFSAsyncTaskExecu
}
}
internal object ExecuteOnCallingThread : VFSAsyncTaskExecutor {
private data object ExecuteOnCallingThread : VFSAsyncTaskExecutor {
override fun <T> async(task: Callable<T>): CompletableFuture<T> {
try {
return CompletableFuture.completedFuture(task.call())
@@ -68,22 +67,22 @@ internal object PersistentFsConnectorHelper {
private val PARALLELIZE_VFS_INITIALIZATION = System.getProperty("vfs.parallelize-initialization", "true").toBoolean()
private val USE_COROUTINES_DISPATCHER = System.getProperty("vfs.use-coroutines-dispatcher", "true").toBoolean()
@OptIn(DelicateCoroutinesApi::class)
fun executor(): VFSAsyncTaskExecutor {
if (!PARALLELIZE_VFS_INITIALIZATION) {
return ExecuteOnCallingThread
}
else if (USE_COROUTINES_DISPATCHER) {
val app = ApplicationManager.getApplication()
if (app != null) {
return app.service<ExecuteOnCoroutine>()
if (app == null) {
return ExecuteOnCoroutine(coroutineScope = GlobalScope)
}
else {
return ExecuteOnCoroutine(coroutineScope = CoroutineScope(Dispatchers.IO))
return app.service<ExecuteOnCoroutine>()
}
}
else {
val pool = AppExecutorUtil.getAppExecutorService()
return ExecuteOnThreadPool(pool)
return ExecuteOnThreadPool(AppExecutorUtil.getAppExecutorService())
}
}
}

View File

@@ -105,7 +105,7 @@ public class VFSInitializationBenchmark {
context.connectionToClose = connection;
Supplier<InvertedNameIndex> invertedNameIndexLazy = FSRecordsImpl.asyncFillInvertedNameIndex(
AppExecutorUtil.getAppExecutorService(), connection.getRecords()
connection.getRecords()
);
int maxAllocatedID = connection.getRecords().maxAllocatedID();

View File

@@ -1,6 +1,7 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
@file:JvmName("AppJavaExecutorUtil")
@file:OptIn(ExperimentalCoroutinesApi::class)
package com.intellij.util.concurrency
import com.intellij.openapi.application.ApplicationManager
@@ -32,7 +33,6 @@ fun createSingleTaskApplicationPoolExecutor(name: String, coroutineScope: Corout
@Internal
@OptIn(ExperimentalCoroutinesApi::class)
class CoroutineDispatcherBackedExecutor(coroutineScope: CoroutineScope, name: String) {
private val childScope = coroutineScope.namedChildScope(name, Dispatchers.IO.limitedParallelism(parallelism = 1))
fun isEmpty(): Boolean = childScope.coroutineContext.job.children.none()
@@ -43,7 +43,8 @@ class CoroutineDispatcherBackedExecutor(coroutineScope: CoroutineScope, name: St
try {
it.run()
}
catch (_: ProcessCanceledException) { }
catch (_: ProcessCanceledException) {
}
}
}