[core] IJPL-51: AppendableObjectStorage refactoring & tests

+ unit-tests for AppendableObjectStorage
+ unit-tests for LimitedInputStream (+fixed small bug)

GitOrigin-RevId: 45b0b90d88a1ede5d12380dab6a5b63aa79894ea
This commit is contained in:
Ruslan Cheremin
2023-05-19 16:06:46 +02:00
committed by intellij-monorepo-bot
parent 98f4e3427c
commit 48858962f2
14 changed files with 860 additions and 103 deletions

View File

@@ -0,0 +1,54 @@
// Copyright 2000-2023 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.util.io;
import com.intellij.util.io.pagecache.impl.PageContentLockingStrategy.SharedLockLockingStrategy;
import org.jetbrains.annotations.NotNull;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class InputStreamOverPagedStorageTest extends InputStreamOverPagedStorageTestBase {

  private static final StorageLockContext CONTEXT = new StorageLockContext(true, true, false);

  private PagedFileStorageWithRWLockedPageContent storage;
  private ReentrantReadWriteLock storageLock;

  @Before
  public void setUp() throws Exception {
    storageLock = new ReentrantReadWriteLock();
    storage = new PagedFileStorageWithRWLockedPageContent(
      temporaryFolder.newFile().toPath(),
      CONTEXT,
      PAGE_SIZE,
      false,
      new SharedLockLockingStrategy(storageLock)
    );
    //page content is guarded by storageLock -> hold writeLock for the duration of the test
    storageLock.writeLock().lock();
  }

  @After
  public void tearDown() throws Exception {
    if (storage != null) {
      try {
        storage.close();
      }
      finally {
        //must unlock even if .close() throws -- otherwise the write lock leaks into following tests
        storageLock.writeLock().unlock();
      }
    }
  }

  @Override
  protected @NotNull InputStreamOverPagedStorage streamOverStorage(long position,
                                                                   long limit) {
    return new InputStreamOverPagedStorage(storage, position, limit);
  }

  @Override
  protected byte[] writeRandomBytesToStorage(int bytesCount) throws IOException {
    byte[] bytesToWrite = randomBytes(bytesCount);
    storage.put(0, bytesToWrite, 0, bytesToWrite.length);
    return bytesToWrite;
  }
}

View File

@@ -0,0 +1,95 @@
// Copyright 2000-2023 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.util.io;
import org.jetbrains.annotations.NotNull;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ThreadLocalRandom;
import static org.junit.Assert.*;
/**
 * Common test suite for {@link InputStream} implementations reading over a paged storage.
 * Subclasses supply the concrete storage/stream via {@link #streamOverStorage(long, long)}
 * and {@link #writeRandomBytesToStorage(int)}.
 */
public abstract class InputStreamOverPagedStorageTestBase {
  protected static final int PAGE_SIZE = 1024;
  protected static final int BYTES_SIZE_TO_TEST = 16 << 20;

  @Rule
  public final TemporaryFolder temporaryFolder = new TemporaryFolder();

  @Test
  public void bytesWrittenIntoStorage_ReadBackAsIs_OneByOne() throws Exception {
    byte[] bytesWritten = writeRandomBytesToStorage(BYTES_SIZE_TO_TEST);
    try (InputStream stream = streamOverStorage(0, bytesWritten.length)) {
      int i = 0;
      while (i < bytesWritten.length) {
        int readByte = stream.read();
        assertNotEquals(
          "Must not return EoF marker",
          -1,
          readByte
        );
        assertEquals(
          "written[" + i + "] != read[" + i + "]",
          bytesWritten[i],
          (byte)readByte
        );
        i++;
      }
    }
  }

  @Test
  public void bytesWrittenIntoStorage_ReadBackAsIs_AsArray() throws Exception {
    byte[] bytesWritten = writeRandomBytesToStorage(BYTES_SIZE_TO_TEST);
    try (InputStream stream = streamOverStorage(0, bytesWritten.length)) {
      byte[] bytesReadBack = new byte[bytesWritten.length];
      int bytesActuallyRead = stream.read(bytesReadBack);
      assertEquals(
        "Bytes count actually read must be same as were written",
        bytesWritten.length,
        bytesActuallyRead
      );
      assertArrayEquals(
        "Bytes actually read must be same as were written",
        bytesWritten,
        bytesReadBack
      );
    }
  }

  @Test
  public void failToCreateStreamLargerThanMaxInteger() {
    long limitAboveMaxInt = (long)Integer.MAX_VALUE + 1;
    assertThrows(
      IllegalArgumentException.class,
      () -> streamOverStorage(0, limitAboveMaxInt)
    );
  }

  @Test
  public void failToCreateStreamLargerThanStorage() throws IOException {
    byte[] bytesWritten = writeRandomBytesToStorage(BYTES_SIZE_TO_TEST);
    long limitBeyondStorage = bytesWritten.length + 1;
    assertThrows(
      IllegalArgumentException.class,
      () -> streamOverStorage(0, limitBeyondStorage)
    );
  }

  //================== infrastructure: ====================================================

  protected abstract @NotNull InputStream streamOverStorage(long position, long limit);

  protected abstract byte[] writeRandomBytesToStorage(int bytesCount) throws IOException;

  /** @return array of given size, filled with (pseudo-)random bytes */
  protected static byte[] randomBytes(int size) {
    byte[] bytes = new byte[size];
    ThreadLocalRandom.current().nextBytes(bytes);
    return bytes;
  }
}

View File

@@ -0,0 +1,53 @@
// Copyright 2000-2023 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.util.io;
import com.intellij.util.io.keyStorage.MappedFileInputStream;
import org.jetbrains.annotations.NotNull;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.io.InputStream;
public class MappedFileInputStreamTest extends InputStreamOverPagedStorageTestBase {

  private static final StorageLockContext CONTEXT = new StorageLockContext(true, true, false);

  private ResizeableMappedFile storage;

  @Before
  public void setUp() throws Exception {
    storage = new ResizeableMappedFile(
      temporaryFolder.newFile().toPath(),
      PAGE_SIZE,
      CONTEXT,
      PAGE_SIZE,
      false,
      false
    );
    //hold the write lock for the whole test, storage access requires it
    storage.lockWrite();
  }

  @After
  public void tearDown() throws Exception {
    if (storage != null) {
      try {
        storage.close();
      }
      finally {
        //must unlock even if .close() throws -- otherwise the write lock leaks into following tests
        storage.unlockWrite();
      }
    }
  }

  @Override
  protected @NotNull InputStream streamOverStorage(long position, long limit) {
    return new MappedFileInputStream(storage, position, limit, false);
  }

  @Override
  protected byte[] writeRandomBytesToStorage(int bytesCount) throws IOException {
    byte[] bytesToWrite = randomBytes(bytesCount);
    storage.put(0, bytesToWrite, 0, bytesToWrite.length);
    return bytesToWrite;
  }
}

View File

@@ -0,0 +1,38 @@
// Copyright 2000-2023 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.util.io.keyStorage;
import com.intellij.openapi.vfs.newvfs.persistent.dev.blobstorage.BlobStorageTestBase;
import com.intellij.util.io.EnumeratorStringDescriptor;
import com.intellij.util.io.StorageLockContext;
import org.jetbrains.annotations.NotNull;
import java.io.IOException;
import java.nio.file.Path;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
 * Runs the common {@link AppendableObjectStorageTestBase} suite against
 * {@link AppendableStorageBackedByPagedStorageLockFree} with String values.
 */
public class AppendableObjectStorageBackedPagedStorageTest extends AppendableObjectStorageTestBase<String> {

  public static final int PAGE_SIZE = 1024;

  private final StorageLockContext context = new StorageLockContext(true, true, false);

  @Override
  protected @NotNull AppendableObjectStorage<String> createStorage(Path path) throws IOException {
    ReentrantReadWriteLock storageLock = new ReentrantReadWriteLock();
    return new AppendableStorageBackedByPagedStorageLockFree<>(
      path,
      context,
      PAGE_SIZE,
      /* valuesArePageAligned: */ false,
      EnumeratorStringDescriptor.INSTANCE,
      storageLock
    );
  }

  @Override
  protected @NotNull String generateValue() {
    ThreadLocalRandom rnd = ThreadLocalRandom.current();
    int valueLength = rnd.nextInt(128);
    return BlobStorageTestBase.randomString(rnd, valueLength);
  }
}

View File

@@ -19,7 +19,7 @@ import static org.hamcrest.Matchers.*;
public abstract class AppendableObjectStorageTestBase<V> {
public static final int ENOUGH_VALUES = 32 << 10;
public static final int ENOUGH_VALUES = 128 << 10;
@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -35,8 +35,8 @@ public abstract class AppendableObjectStorageTestBase<V> {
appendableStorage.lockWrite();
try {
final int offset = appendableStorage.append(valueAppended);
final V valueReadBack = appendableStorage.read(offset, false);
final int valueId = appendableStorage.append(valueAppended);
final V valueReadBack = appendableStorage.read(valueId, false);
assertThat(
"Value appended must be read back as-is",
valueReadBack,
@@ -53,10 +53,10 @@ public abstract class AppendableObjectStorageTestBase<V> {
final V valueWritten = generateValue();
appendableStorage.lockWrite();
try {
final int offset = appendableStorage.append(valueWritten);
final int valueId = appendableStorage.append(valueWritten);
assertThat(
"Value just appended must have same bytes",
appendableStorage.checkBytesAreTheSame(offset, valueWritten),
appendableStorage.checkBytesAreTheSame(valueId, valueWritten),
is(true)
);
}
@@ -72,21 +72,21 @@ public abstract class AppendableObjectStorageTestBase<V> {
appendableStorage.lockWrite();
try {
int offsetAppended = appendableStorage.append(valueAppended);
int valueIdAppended = appendableStorage.append(valueAppended);
//RC: must flush data before .processAll()!
appendableStorage.force();
List<Pair<Integer, V>> valuesAndOffsets = new ArrayList<>();
appendableStorage.processAll((valueOffset, value) -> {
valuesAndOffsets.add(Pair.pair(valueOffset, value));
List<Pair<Integer, V>> valuesAndValueIds = new ArrayList<>();
appendableStorage.processAll((valueId, value) -> {
valuesAndValueIds.add(Pair.pair(valueId, value));
return true;
});
assertThat(
"Value appended must be read back as-is by .processAll()",
valuesAndOffsets,
contains(Pair.pair(offsetAppended, valueAppended))
valuesAndValueIds,
contains(Pair.pair(valueIdAppended, valueAppended))
);
}
finally {
@@ -100,17 +100,17 @@ public abstract class AppendableObjectStorageTestBase<V> {
public void manyValuesAppendedToStorage_AllReadBackAsIs() throws Exception {
List<V> valuesAppended = generateValues(ENOUGH_VALUES);
List<Pair<Integer, V>> valuesAndOffsets = new ArrayList<>();
List<Pair<Integer, V>> valuesAndvalueIds = new ArrayList<>();
appendableStorage.lockWrite();
try {
for (V valueToAppend : valuesAppended) {
int appendedAtOffset = appendableStorage.append(valueToAppend);
valuesAndOffsets.add(Pair.pair(appendedAtOffset, valueToAppend));
int valueId = appendableStorage.append(valueToAppend);
valuesAndvalueIds.add(Pair.pair(valueId, valueToAppend));
}
for (Pair<Integer, V> valuesAndOffset : valuesAndOffsets) {
final V valueAppended = valuesAndOffset.second;
final Integer appendedAtOffset = valuesAndOffset.first;
V valueReadBack = appendableStorage.read(appendedAtOffset, false);
for (Pair<Integer, V> valuesAndvalueId : valuesAndvalueIds) {
final V valueAppended = valuesAndvalueId.second;
final int valueId = valuesAndvalueId.first;
V valueReadBack = appendableStorage.read(valueId, false);
assertThat(
"Value appended must be read back as-is",
valueReadBack,
@@ -127,19 +127,19 @@ public abstract class AppendableObjectStorageTestBase<V> {
public void manyValuesAppendedToStorage_MustBeAllEqualToThemself() throws Exception {
List<V> valuesAppended = generateValues(ENOUGH_VALUES);
List<Pair<Integer, V>> valuesAndOffsets = new ArrayList<>();
List<Pair<Integer, V>> valuesAndValueIds = new ArrayList<>();
appendableStorage.lockWrite();
try {
for (V valueToAppend : valuesAppended) {
int appendedAtOffset = appendableStorage.append(valueToAppend);
valuesAndOffsets.add(Pair.pair(appendedAtOffset, valueToAppend));
int valueId = appendableStorage.append(valueToAppend);
valuesAndValueIds.add(Pair.pair(valueId, valueToAppend));
}
for (Pair<Integer, V> valuesAndOffset : valuesAndOffsets) {
final V valueAppended = valuesAndOffset.second;
final Integer appendedAtOffset = valuesAndOffset.first;
for (Pair<Integer, V> valuesAndvalueId : valuesAndValueIds) {
final V valueAppended = valuesAndvalueId.second;
final int valueId = valuesAndvalueId.first;
assertThat(
"Value appended ([" + valueAppended + "] at offset " + appendableStorage + ") must have same bytes",
appendableStorage.checkBytesAreTheSame(appendedAtOffset, valueAppended),
"Value appended ([" + valueAppended + "] at valueId " + appendableStorage + ") must have same bytes",
appendableStorage.checkBytesAreTheSame(valueId, valueAppended),
is(true)
);
}
@@ -153,27 +153,27 @@ public abstract class AppendableObjectStorageTestBase<V> {
public void manyValuesAppendedToStorage_AllReadBackAsIs_WithProcessAll() throws Exception {
List<V> valuesAppended = generateValues(ENOUGH_VALUES);
List<Pair<Integer, V>> valuesAndOffsetsAppended = new ArrayList<>();
List<Pair<Integer, V>> valuesAndValueIdsAppended = new ArrayList<>();
appendableStorage.lockWrite();
try {
for (V valueToAppend : valuesAppended) {
int appendedAtOffset = appendableStorage.append(valueToAppend);
valuesAndOffsetsAppended.add(Pair.pair(appendedAtOffset, valueToAppend));
int valueId = appendableStorage.append(valueToAppend);
valuesAndValueIdsAppended.add(Pair.pair(valueId, valueToAppend));
}
//RC: must flush data before .processAll()!
appendableStorage.force();
List<Pair<Integer, V>> valuesAndOffsetsReadBack = new ArrayList<>();
appendableStorage.processAll((valueOffset, value) -> {
valuesAndOffsetsReadBack.add(Pair.pair(valueOffset, value));
List<Pair<Integer, V>> valuesAndValueIdsReadBack = new ArrayList<>();
appendableStorage.processAll((valueId, value) -> {
valuesAndValueIdsReadBack.add(Pair.pair(valueId, value));
return true;
});
assertThat(
"Values appended must be all read as-is (with apt offsets) by .processAll()",
valuesAndOffsetsReadBack,
containsInAnyOrder(valuesAndOffsetsAppended.toArray())
"Values appended must be all read as-is (with apt valueIds) by .processAll()",
valuesAndValueIdsReadBack,
containsInAnyOrder(valuesAndValueIdsAppended.toArray())
);
}
finally {

View File

@@ -59,6 +59,10 @@ public class LimitedInputStream extends FilterInputStream {
@Override
public int available() throws IOException {
//RC: super.available() may be expensive (=syscall for FileIS), hence in many cases we
// overwrite this method to just return remainingLimit() -- if we're sure remainingLimit()
// is precise estimate of remaining file size.
// (In fact, we do that so often that maybe we should add ctor param for that purpose?)
return Math.min(super.available(), remainingLimit());
}

View File

@@ -0,0 +1,76 @@
// Copyright 2000-2023 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.util.io;
import com.intellij.util.io.pagecache.PagedStorage;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import java.io.IOException;
import java.io.InputStream;
/** {@link InputStream} over {@link PagedStorage}: reads bytes through {@link PagedStorage#get(long, byte[], int, int)} */
@ApiStatus.Internal
public final class InputStreamOverPagedStorage extends InputStream {
  private final @NotNull PagedStorage pagedStorage;
  /** Exclusive upper bound (offset in storage) of the region readable through this stream */
  private final long limit;
  /** Current read position (offset in storage), always in [0..limit] */
  private long position;

  /**
   * @param position starting offset in the storage, 0 <= position <= limit
   * @param limit    exclusive end offset: limit <= storage.length(), (limit - position) <= MAX_INT
   * @throws IllegalArgumentException if [position..limit) is not a valid region of the storage
   */
  public InputStreamOverPagedStorage(@NotNull PagedStorage pagedStorage,
                                     long position,
                                     long limit) {
    if (position < 0 || position > limit) {
      throw new IllegalArgumentException("position(=" + position + ") must be in [0, limit(=" + limit + ")]");
    }
    long totalBytes = limit - position;
    if (totalBytes > Integer.MAX_VALUE) {
      throw new IllegalArgumentException("limit(=" + limit + ")-position(=" + position + ") = " + totalBytes + " > MAX_INT");
    }
    long storageLength = pagedStorage.length();
    if (limit > storageLength) {
      throw new IllegalArgumentException("limit(=" + limit + ") > storage.length(=" + storageLength + ")");
    }
    this.pagedStorage = pagedStorage;
    this.position = position;
    this.limit = limit;
  }

  @Override
  public void close() {
    //do nothing because we want to leave the paged storage open.
  }

  @Override
  public int available() {
    //never > MAX_INT: checked in ctor; never < 0: position never advances past limit
    return (int)(limit - position);
  }

  @Override
  public int read() throws IOException {
    if (position < limit) {
      byte b = pagedStorage.get(position);
      position++;
      return b & 0xFF;
    }
    return -1;
  }

  @Override
  public int read(byte @NotNull [] buffer, int offset, int length) throws IOException {
    if (offset < 0 || length < 0 || length > buffer.length - offset) {
      throw new IndexOutOfBoundsException(
        "offset(=" + offset + "), length(=" + length + ") must be within buffer[0.." + buffer.length + ")");
    }
    if (length == 0) {
      return 0;
    }
    int remaining = available();
    if (remaining <= 0) {
      //InputStream contract: must return -1 (not 0!) at the end-of-stream if length > 0
      // (returning 0 here could drive readers like DataInputStream into an infinite loop)
      return -1;
    }
    //only allow a read of the amount available:
    int bytesToRead = Math.min(length, remaining);
    pagedStorage.get(position, buffer, offset, bytesToRead);
    position += bytesToRead;
    return bytesToRead;
  }

  @Override
  public long skip(long amountToSkip) {
    if (amountToSkip <= 0) {
      //InputStream contract: non-positive amount skips nothing
      // (without this guard a negative amount would move position backwards)
      return 0;
    }
    long amountSkipped = Math.min(amountToSkip, available());
    position += amountSkipped;
    return amountSkipped;
  }
}

View File

@@ -326,7 +326,7 @@ public class PagedFileStorage implements Forceable/*, PagedStorage*/ {
public void close() throws IOException {
ExceptionUtil.runAllAndRethrowAllExceptions(
new IOException("Failed to close appendable storage[" + getFile() + "]"),
new IOException("Failed to close PagedFileStorage[" + getFile() + "]"),
this::force,
() -> {

View File

@@ -7,9 +7,17 @@ import org.jetbrains.annotations.NotNull;
import java.io.Closeable;
import java.io.IOException;
/** Every single method call must be guarded by lockRead/lockWrite -- including .close() and .force()! */
/**
* Every single method call must be guarded by lockRead/lockWrite -- including .close() and .force()!
* <p>
* TODO RC: there is inconsistency in interpreting offset/ids by this class: from the usage, all
* int params/return values here are kind of 'id'. I.e. append(value) returns valueId, something
* that could be used to access value later on -- read(valueId) it back, checkBytesAreTheSame(valueId, value),
* enumerate all values with processAll(). But all apt parameters are named 'addr' or 'offset',
which is misleading -- the parameter names should be unified to 'valueId'.
*/
public interface AppendableObjectStorage<Data> extends Forceable, Closeable {
Data read(int addr, boolean checkAccess) throws IOException;
Data read(int valueId, boolean checkAccess) throws IOException;
/**
* Method now has quite convoluted semantics (inferred from implementation and use-cases):
@@ -27,9 +35,10 @@ public interface AppendableObjectStorage<Data> extends Forceable, Closeable {
*/
boolean processAll(@NotNull StorageObjectProcessor<? super Data> processor) throws IOException;
/** @return ID of value appended, by which value could be referred later on */
int append(Data value) throws IOException;
boolean checkBytesAreTheSame(int addr, Data value) throws IOException;
boolean checkBytesAreTheSame(int valueId, Data value) throws IOException;
void clear() throws IOException;
@@ -50,6 +59,6 @@ public interface AppendableObjectStorage<Data> extends Forceable, Closeable {
@FunctionalInterface
interface StorageObjectProcessor<Data> {
boolean process(int offset, Data data);
boolean process(int valueId, Data value);
}
}

View File

@@ -0,0 +1,389 @@
// Copyright 2000-2023 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.util.io.keyStorage;
import com.intellij.openapi.util.io.BufferExposingByteArrayOutputStream;
import com.intellij.util.ExceptionUtil;
import com.intellij.util.io.DataOutputStream;
import com.intellij.util.io.*;
import com.intellij.util.io.pagecache.Page;
import com.intellij.util.io.pagecache.PagedStorage;
import com.intellij.util.io.pagecache.PagedStorageWithPageUnalignedAccess;
import it.unimi.dsi.fastutil.bytes.ByteArrays;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.VisibleForTesting;
import java.io.*;
import java.nio.file.Path;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static com.intellij.util.io.pagecache.impl.PageContentLockingStrategy.SharedLockLockingStrategy;
/**
 * {@link AppendableObjectStorage} implementation on the top of {@link PagedStorage}
 * valueId == offset of value in a file
 * <p>
 * Values are serialized via the {@link DataExternalizer}, and appended to an in-memory
 * {@link AppendMemoryBuffer} first; the buffer is flushed down to the {@link PagedStorage}
 * on overflow, and on {@link #force()}/{@link #close()}. Record sizes/boundaries are NOT
 * stored -- the externalizer is responsible for knowing where a record ends on read.
 */
//@NotThreadSafe
public class AppendableStorageBackedByPagedStorageLockFree<Data> implements AppendableObjectStorage<Data> {
  @VisibleForTesting
  @ApiStatus.Internal
  public static final int APPEND_BUFFER_SIZE = 4096;

  /** Per-thread reusable read stream: avoids allocating stream wrappers on every {@link #read} call */
  private static final ThreadLocal<DataStreamOverPagedStorage> TLOCAL_READ_STREAMS =
    ThreadLocal.withInitial(() -> new DataStreamOverPagedStorage());

  private final PagedStorage storage;
  private final ReentrantReadWriteLock storageLock;

  /** Bytes already written down to {@link #storage}; content of {@link #appendBuffer} is NOT counted in */
  private int fileLength;

  /** In-memory tail of the file (records appended but not yet flushed); null until the first buffered append */
  private @Nullable AppendMemoryBuffer appendBuffer;

  private final @NotNull DataExternalizer<Data> dataDescriptor;

  public AppendableStorageBackedByPagedStorageLockFree(@NotNull Path file,
                                                       @Nullable StorageLockContext lockContext,
                                                       int pageSize,
                                                       boolean valuesAreBufferAligned,
                                                       @NotNull DataExternalizer<Data> dataDescriptor,
                                                       @NotNull ReentrantReadWriteLock storageLock) throws IOException {
    this.storageLock = storageLock;
    PagedFileStorageWithRWLockedPageContent storage = new PagedFileStorageWithRWLockedPageContent(
      file,
      lockContext,
      pageSize,
      /*nativeByteOrder: */false, //TODO RC: why not native order?
      new SharedLockLockingStrategy(this.storageLock)
    );
    //if values are not buffer-aligned, a record may span a page border -> wrap with the unaligned-access adapter
    this.storage = valuesAreBufferAligned ? storage : new PagedStorageWithPageUnalignedAccess(storage);
    this.dataDescriptor = dataDescriptor;
    fileLength = Math.toIntExact(this.storage.length());
  }

  @Override
  public void clear() throws IOException {
    storage.clear();
    fileLength = 0;
    //NOTE(review): appendBuffer is NOT reset here -- buffered-but-unflushed records survive .clear().
    // Verify this is intended.
  }

  @Override
  public void lockRead() {
    storageLock.readLock().lock();
  }

  @Override
  public void unlockRead() {
    storageLock.readLock().unlock();
  }

  @Override
  public void lockWrite() {
    storageLock.writeLock().lock();
  }

  @Override
  public void unlockWrite() {
    storageLock.writeLock().unlock();
  }

  @Override
  public boolean isDirty() {
    return AppendMemoryBuffer.hasChanges(appendBuffer) || storage.isDirty();
  }

  /** Writes buffered bytes down to {@link #storage}, advances {@link #fileLength}, and rewinds the buffer */
  private void flushAppendBuffer() throws IOException {
    if (AppendMemoryBuffer.hasChanges(appendBuffer)) {
      int bufferPosition = appendBuffer.getBufferPosition();
      storage.put(fileLength, appendBuffer.getAppendBuffer(), 0, bufferPosition);
      fileLength += bufferPosition;
      appendBuffer = appendBuffer.rewind(fileLength);
    }
  }

  @Override
  public void force() throws IOException {
    flushAppendBuffer();
    storage.force();
  }

  @Override
  public void close() throws IOException {
    try {
      ExceptionUtil.runAllAndRethrowAllExceptions(
        new IOException("Failed to .close() appendable storage [" + storage.getFile() + "]"),
        this::flushAppendBuffer,
        storage::close
      );
    }
    finally {
      TLOCAL_READ_STREAMS.remove();
    }
  }

  @Override
  public Data read(int valueId, boolean checkAccess) throws IOException {
    //valueId is just the offset of the record in the file
    AppendMemoryBuffer buffer = appendBuffer;
    int offset = valueId;
    if (buffer != null
        && (long)offset >= buffer.startingOffsetInFile) {
      //record is in the not-yet-flushed append buffer -> read from a snapshot of the buffer
      AppendMemoryBuffer copyForRead = buffer.copy();
      int bufferOffset = offset - copyForRead.startingOffsetInFile;
      if (bufferOffset > copyForRead.bufferPosition) {
        throw new NoDataException("Requested address(=" + offset + ") points to un-existed data: " + appendBuffer);
      }
      UnsyncByteArrayInputStream is = new UnsyncByteArrayInputStream(
        copyForRead.getAppendBuffer(),
        bufferOffset,
        copyForRead.getBufferPosition() //NOTE(review): assumes 3rd ctor arg is an END offset, not a length -- confirm
      );
      return dataDescriptor.read(new DataInputStream(is));
    }
    if (offset >= fileLength) {
      throw new NoDataException("Requested address(=" + offset + ") points to un-existed data (file length: " + fileLength + ")");
    }
    // we do not need to flushAppendBuffer() since we store complete records
    DataStreamOverPagedStorage rs = TLOCAL_READ_STREAMS.get();
    rs.setup(storage, offset, fileLength);
    return dataDescriptor.read(rs);
  }

  @Override
  public boolean processAll(@NotNull StorageObjectProcessor<? super Data> processor) throws IOException {
    if (isDirty()) {
      //RC: why not .force() right here? Probably because of the locks: processAll is a read method, so
      // it is likely readLock is acquired, but force() requires writeLock, and one can't upgrade read
      // lock to the write one. So the responsibility was put on a caller.
      //MAYBE RC: we really don't need full flush here -- flushAppendBuffer() is enough, since
      //PagedStorage.readInputStream() reads over cached pages, not over on-disk file, as
      //legacy PagedFileStorage does. But still requires writeLock, hence comment before still
      //applies
      throw new IllegalStateException("Must be .force()-ed first");
    }
    if (fileLength == 0) {
      return true;
    }
    IOCancellationCallbackHolder.checkCancelled();
    return storage.readInputStream(is -> {
      // calculation may restart few times, so it's expected that processor processes duplicates
      LimitedInputStream lis = new LimitedInputStream(new BufferedInputStream(is), fileLength) {
        @Override
        public int available() {
          //remainingLimit() is a precise estimate here (fileLength is known) and cheaper than super.available()
          return remainingLimit();
        }
      };
      DataInputStream valuesStream = new DataInputStream(lis);
      try {
        while (true) {
          int offset = lis.getBytesRead();
          Data value = dataDescriptor.read(valuesStream);
          if (!processor.process(offset, value)) return false;
        }
      }
      catch (EOFException e) {
        // Done: EOF is the expected loop terminator, since record count is not stored anywhere
      }
      return true;
    });
  }

  @Override
  public int getCurrentLength() {
    //logical length: flushed bytes + buffered-but-not-yet-flushed bytes
    return AppendMemoryBuffer.getBufferPosition(appendBuffer) + fileLength;
  }

  @Override
  public int append(Data value) throws IOException {
    BufferExposingByteArrayOutputStream bos = new BufferExposingByteArrayOutputStream();
    DataOutput out = new DataOutputStream(bos);
    dataDescriptor.save(out, value);
    int size = bos.size();
    byte[] buffer = bos.getInternalBuffer();
    int currentLength = getCurrentLength();
    if (size > APPEND_BUFFER_SIZE) {
      //record is too big for buffering -> flush pending records, write this one straight to the storage
      flushAppendBuffer();
      storage.put(currentLength, buffer, 0, size);
      fileLength += size;
      if (appendBuffer != null) {
        appendBuffer = appendBuffer.rewind(fileLength);
      }
    }
    else {
      if (size > APPEND_BUFFER_SIZE - AppendMemoryBuffer.getBufferPosition(appendBuffer)) {
        flushAppendBuffer();
      }
      // myAppendBuffer will contain complete records
      if (appendBuffer == null) {
        appendBuffer = new AppendMemoryBuffer(fileLength);
      }
      appendBuffer.append(buffer, size);
    }
    //valueId == offset the record was appended at
    return currentLength;
  }

  @Override
  public boolean checkBytesAreTheSame(int valueId,
                                      Data value) throws IOException {
    int offset = valueId;
    try (CheckerOutputStream comparer = buildOldComparerStream(offset)) {
      DataOutput out = new DataOutputStream(comparer);
      dataDescriptor.save(out, value);
      return comparer.same;
    }
  }

  /** OutputStream that compares bytes 'written' into it against already-stored bytes, instead of writing */
  private abstract static class CheckerOutputStream extends OutputStream {
    /** true while every byte written so far matched the stored byte at the same position */
    boolean same = true;
  }

  /**
   * @return fake OutputStream impl that doesn't write anything, but compare bytes to be written against
   * bytes already in a file on the same positions, and set .same to be true or false
   */
  private @NotNull CheckerOutputStream buildOldComparerStream(int startingOffsetInFile) throws IOException {
    if (fileLength <= startingOffsetInFile) {
      //record lies entirely in the append buffer -> compare against the buffer content
      return new CheckerOutputStream() {
        private int address = startingOffsetInFile - fileLength;
        @Override
        public void write(int b) {
          if (same) {
            AppendMemoryBuffer buffer = appendBuffer;
            same = address < AppendMemoryBuffer.getBufferPosition(buffer)
                   && buffer.getAppendBuffer()[address++] == (byte)b;
          }
        }
      };
    }
    else {
      //record lies in the (flushed) storage -> compare against storage pages
      return new CheckerOutputStream() {
        private int offsetInFile = startingOffsetInFile;
        private int offsetInPage = storage.toOffsetInPage(startingOffsetInFile);
        private Page currentPage = storage.pageByOffset(startingOffsetInFile, /*forWrite: */false);
        private final int pageSize = storage.getPageSize();
        @Override
        public void write(int b) throws IOException {
          if (same) {
            if (pageSize == offsetInPage && offsetInFile < fileLength) { // reached end of current byte buffer
              //NOTE(review): offsetInFile is advanced only here (by a whole page), never per byte written,
              // and the 'offsetInPage < fileLength' below compares a page-RELATIVE offset against a
              // file-ABSOLUTE length. Both look suspicious -- likely the absolute position
              // (offsetInFile + offsetInPage) was intended. Verify against the intended semantics.
              offsetInFile += offsetInPage;
              currentPage.close();
              currentPage = storage.pageByOffset(offsetInFile, /*forWrite: */ false);
              offsetInPage = 0;
            }
            same = offsetInPage < fileLength && currentPage.get(offsetInPage) == (byte)b;
            offsetInPage++;
          }
        }
        @Override
        public void close() {
          currentPage.close();
        }
      };
    }
  }

  /** Reusable DataInputStream: {@link #setup} re-points the underlying stream to a new storage region */
  private static final class DataStreamOverPagedStorage extends DataInputStream {
    private DataStreamOverPagedStorage() {
      super(new BufferedInputStreamOverPagedStorage());
    }
    void setup(PagedStorage storage, long pos, long limit) {
      ((BufferedInputStreamOverPagedStorage)in).setup(storage, pos, limit);
    }
  }

  /** BufferedInputStream with a replaceable source: created over {@link #TOMBSTONE}, re-pointed via {@link #setup} */
  private static class BufferedInputStreamOverPagedStorage extends BufferedInputStream {
    BufferedInputStreamOverPagedStorage() {
      super(TOMBSTONE, 512);
    }
    void setup(@NotNull PagedStorage storage,
               long position,
               long limit) {
      //drop buffered state of the previous source, then swap the underlying stream
      this.pos = 0;
      this.count = 0;
      this.in = new InputStreamOverPagedStorage(storage, position, limit);
    }
  }

  /** Placeholder source: must never actually be read -- .setup() must be called before any read */
  private static final InputStream TOMBSTONE = new InputStream() {
    @Override
    public int read() {
      throw new IllegalStateException("should not happen");
    }
  };

  /**
   * The buffer caches in memory a region of a file [startingOffsetInFile..startingOffsetInFile+bufferPosition],
   * with both ends inclusive.
   */
  private static class AppendMemoryBuffer {
    private final byte[] buffer;
    /**
     * Count of bytes written so far == index of the next byte to be written into {@link #buffer}.
     * (NOTE(review): {@link #append} uses it as the NEXT-write index, i.e. an exclusive bound --
     * not 'the last written byte'.)
     */
    private int bufferPosition;
    private final int startingOffsetInFile;
    private AppendMemoryBuffer(int startingOffsetInFile) {
      this(new byte[APPEND_BUFFER_SIZE], 0, startingOffsetInFile);
    }
    private AppendMemoryBuffer(byte[] buffer,
                               int bufferPosition,
                               int startingOffsetInFile) {
      this.buffer = buffer;
      this.startingOffsetInFile = startingOffsetInFile;
      this.bufferPosition = bufferPosition;
    }
    private byte[] getAppendBuffer() {
      return buffer;
    }
    private int getBufferPosition() {
      return bufferPosition;
    }
    public void append(byte[] buffer, int size) {
      System.arraycopy(buffer, 0, this.buffer, bufferPosition, size);
      bufferPosition += size;
    }
    /** @return a snapshot of this buffer, safe to read while the original keeps being appended to */
    public @NotNull AppendMemoryBuffer copy() {
      return new AppendMemoryBuffer(ByteArrays.copy(buffer), bufferPosition, startingOffsetInFile);
    }
    /** @return a buffer over the same backing array, emptied, and re-anchored at offsetInFile */
    public @NotNull AppendMemoryBuffer rewind(int offsetInFile) {
      return new AppendMemoryBuffer(buffer, 0, offsetInFile);
    }
    @Override
    public String toString() {
      return "AppendMemoryBuffer[" + startingOffsetInFile + ".." + (startingOffsetInFile + bufferPosition) + "]";
    }
    private static int getBufferPosition(@Nullable AppendMemoryBuffer buffer) {
      return buffer != null ? buffer.bufferPosition : 0;
    }
    private static boolean hasChanges(@Nullable AppendMemoryBuffer buffer) {
      return buffer != null && buffer.getBufferPosition() > 0;
    }
  }
}

View File

@@ -14,10 +14,7 @@ import org.jetbrains.annotations.VisibleForTesting;
import java.io.*;
import java.nio.file.Path;
//TODO RC: this class is suspicious because it uses some multi-threading constructs (synchronized, volatile), but
// the class overall is not thread safe in any reasonable sense. I.e. authors seem to consider the class
// for multithreaded use, but it is really not safe for it. We should either made it really thread-safe,
// or remove all synchronized/volatile, and rely on caller for synchronization.
/** valueId == offset of value in a file */
public class AppendableStorageBackedByResizableMappedFile<Data> implements AppendableObjectStorage<Data> {
@VisibleForTesting
@ApiStatus.Internal
@@ -27,6 +24,11 @@ public class AppendableStorageBackedByResizableMappedFile<Data> implements Appen
private final ResizeableMappedFile storage;
//TODO RC: the class is suspicious because it uses some multi-threading constructs (synchronized, volatile), but
// the class overall is not thread safe in any reasonable sense. I.e. authors seem to consider the class
// for multithreaded use, but it is really not safe for it. We should either made it really thread-safe,
// or remove all synchronized/volatile, and rely on caller for synchronization.
private volatile int fileLength;
private volatile @Nullable AppendMemoryBuffer appendBuffer;
@@ -60,7 +62,8 @@ public class AppendableStorageBackedByResizableMappedFile<Data> implements Appen
@Override
public Data read(int offset, boolean checkAccess) throws IOException {
public Data read(int valueId, boolean checkAccess) throws IOException {
int offset = valueId;
AppendMemoryBuffer buffer = appendBuffer;
if (buffer != null
&& (long)offset >= buffer.startingOffsetInFile) {
@@ -101,6 +104,9 @@ public class AppendableStorageBackedByResizableMappedFile<Data> implements Appen
// strict consistency -- items added after last flush but before .processAll() could be not listed
//
//if (isDirty()) {
// //RC: why not .force() right here? Probably because of the locks: processAll is a read method, so
// // it is likely readLock is acquired, but force() requires writeLock, and one can't upgrade read
// // lock to the write one. So the responsibility was put on a caller.
// throw new IllegalStateException("Must be .force()-ed first");
//}
if (fileLength == 0) {
@@ -108,19 +114,19 @@ public class AppendableStorageBackedByResizableMappedFile<Data> implements Appen
}
IOCancellationCallbackHolder.checkCancelled();
return storage.readInputStream(is -> {
// calculation may restart few times, so it's expected that processor processes duplicated
// calculation may restart few times, so it's expected that processor processes duplicates
LimitedInputStream lis = new LimitedInputStream(new BufferedInputStream(is), fileLength) {
@Override
public int available() {
return remainingLimit();
}
};
DataInputStream keyStream = new DataInputStream(lis);
DataInputStream valuesStream = new DataInputStream(lis);
try {
while (true) {
int offset = lis.getBytesRead();
Data key = dataDescriptor.read(keyStream);
if (!processor.process(offset, key)) return false;
Data value = dataDescriptor.read(valuesStream);
if (!processor.process(offset, value)) return false;
}
}
catch (EOFException e) {
@@ -168,8 +174,9 @@ public class AppendableStorageBackedByResizableMappedFile<Data> implements Appen
}
@Override
public boolean checkBytesAreTheSame(int addr, Data value) throws IOException {
try (CheckerOutputStream comparer = buildOldComparerStream(addr)) {
public boolean checkBytesAreTheSame(int valueId, Data value) throws IOException {
int offset = valueId;
try (CheckerOutputStream comparer = buildOldComparerStream(offset)) {
DataOutput out = new DataOutputStream(comparer);
dataDescriptor.save(out, value);
return comparer.same;

View File

@@ -6,6 +6,11 @@ import org.jetbrains.annotations.NotNull;
import java.io.IOException;
/**
* {@link AppendableObjectStorage} implementation for values that could be bijectively mapped
* to-from int, see {@link InlineKeyDescriptor}.
* valueId == value itself converted to int
*/
public class InlinedKeyStorage<Data> implements AppendableObjectStorage<Data> {
private final InlineKeyDescriptor<Data> myDescriptor;
@@ -15,8 +20,8 @@ public class InlinedKeyStorage<Data> implements AppendableObjectStorage<Data> {
}
@Override
public Data read(int addr, boolean checkAccess) throws IOException {
return myDescriptor.fromInt(addr);
public Data read(int valueId, boolean checkAccess) throws IOException {
return myDescriptor.fromInt(valueId);
}
@Override
@@ -30,7 +35,7 @@ public class InlinedKeyStorage<Data> implements AppendableObjectStorage<Data> {
}
@Override
public boolean checkBytesAreTheSame(int addr, Data value) {
public boolean checkBytesAreTheSame(int valueId, Data value) {
return false;
}

View File

@@ -3,70 +3,82 @@
package com.intellij.util.io.keyStorage;
import com.intellij.util.io.ResizeableMappedFile;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import java.io.IOException;
import java.io.InputStream;
final class MappedFileInputStream extends InputStream {
/** {@link InputStream} over {@link ResizeableMappedFile}: reads bytes through {@link ResizeableMappedFile#get(long, byte[], int, int, boolean)} */
@ApiStatus.Internal
public final class MappedFileInputStream extends InputStream {
private final ResizeableMappedFile raf;
private final boolean checkAccess;
private final long limit;
private final boolean myCheckAccess;
private int cur;
MappedFileInputStream(@NotNull ResizeableMappedFile raf, final long pos, final long limit, boolean checkAccess) {
this.raf = raf;
this.cur = (int)pos;
this.limit = limit;
myCheckAccess = checkAccess;
}
private long position;
@Override
public int available()
{
return (int)(limit - cur);
}
@Override
public void close()
{
//do nothing because we want to leave the random access file open.
}
@Override
public int read() throws IOException
{
int retval = -1;
if( cur < limit )
{
retval = raf.get(cur++, myCheckAccess);
public MappedFileInputStream(@NotNull ResizeableMappedFile raf,
long position,
long limit,
boolean checkAccess) {
long totalBytes = limit - position;
if (totalBytes > Integer.MAX_VALUE) {
throw new IllegalArgumentException("limit(=" + limit + ")-position(=" + position + ") = " + totalBytes + " > MAX_INT");
}
return retval;
long fileLength = raf.length();
if (limit > fileLength) {
throw new IllegalArgumentException("limit(=" + limit + ") > file.length(=" + fileLength + ")");
}
this.raf = raf;
this.position = (int)position;
this.limit = limit;
this.checkAccess = checkAccess;
}
@Override
public int read(byte @NotNull [] b, int offset, int length ) throws IOException
{
//only allow a read of the amount available.
if( length > available() )
{
length = available();
}
if( available() > 0 )
{
raf.get(cur, b, offset, length, myCheckAccess);
cur += length;
}
return length;
public int available() {
return (int)(limit - position);
}
@Override
public long skip( long amountToSkip )
{
long amountSkipped = Math.min( amountToSkip, available() );
cur+= amountSkipped;
return amountSkipped;
public void close() {
//do nothing because we want to leave the random access file open.
}
@Override
public int read() throws IOException {
if (position < limit) {
byte b = raf.get(position, checkAccess);
position++;
return b & 0xFF;
}
return -1;
}
@Override
public int read(byte @NotNull [] b, int offset, int length) throws IOException {
//only allow a read of the amount available.
if (length > available()) {
length = available();
}
if (available() > 0) {
raf.get(position, b, offset, length, checkAccess);
position += length;
}
return length;
}
@Override
public long skip(long amountToSkip) {
long amountSkipped = Math.min(amountToSkip, available());
position += amountSkipped;
return amountSkipped;
}
}

View File

@@ -2,10 +2,13 @@
package com.intellij.util.io.pagecache;
import com.intellij.openapi.Forceable;
import com.intellij.openapi.util.ThrowableNotNullFunction;
import com.intellij.util.io.InputStreamOverPagedStorage;
import com.intellij.util.io.StorageLockContext;
import org.jetbrains.annotations.NotNull;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.file.Path;
@@ -71,4 +74,16 @@ public interface PagedStorage extends Forceable, AutoCloseable {
@Override
void close() throws IOException;
default <R> @NotNull R readInputStream(
@NotNull ThrowableNotNullFunction<? super InputStream, R, ? extends IOException> consumer
) throws IOException {
//MAYBE RC: it is likely suboptimal way to read through storage -- potentially many pages are load only
// to be abandoned, aka 'cache trashing'. The better way would be to use already cached
// pages as-is, but read not-currently-cached pages just temporary, re-using single transient
// page object, without evicting currently cached pages.
try (final InputStreamOverPagedStorage stream = new InputStreamOverPagedStorage(this, 0, length())) {
return consumer.fun(stream);
}
}
}