From 82b30feb4e07d6a3820baea6d24f1d56f5282d9e Mon Sep 17 00:00:00 2001 From: Vladimir Krivosheev Date: Sun, 1 Sep 2024 19:46:10 +0200 Subject: [PATCH] extract uploadFile GitOrigin-RevId: 5daf31c9b0b60865db544a8a4bf1342e77b8ae69 --- .../jetbrains/intellij/build/concurrency.kt | 2 +- .../http2Client/Http2ClientConnection.kt | 34 ----- .../http2Client/Http2ConnectionProvider.kt | 5 + .../ZstdCompressContextPool.kt | 26 +++- .../{download.kt => downloadFile.kt} | 17 ++- .../intellij/build/http2Client/netty-util.kt | 7 +- .../intellij/build/http2Client/uploadFile.kt | 123 ++++++++++++++++++ .../impl/compilation/CompilationPartsUtil.kt | 6 +- .../build/impl/compilation/download.kt | 55 ++++---- .../intellij/build/impl/compilation/upload.kt | 80 +----------- 10 files changed, 211 insertions(+), 144 deletions(-) rename platform/build-scripts/src/org/jetbrains/intellij/build/{impl/compilation => http2Client}/ZstdCompressContextPool.kt (66%) rename platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/{download.kt => downloadFile.kt} (88%) create mode 100644 platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/uploadFile.kt diff --git a/platform/build-scripts/src/org/jetbrains/intellij/build/concurrency.kt b/platform/build-scripts/src/org/jetbrains/intellij/build/concurrency.kt index 0fc5120b6e40..89448ac32b57 100644 --- a/platform/build-scripts/src/org/jetbrains/intellij/build/concurrency.kt +++ b/platform/build-scripts/src/org/jetbrains/intellij/build/concurrency.kt @@ -34,4 +34,4 @@ internal suspend fun Collection.forEachConcurrent( } } } -} +} \ No newline at end of file diff --git a/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/Http2ClientConnection.kt b/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/Http2ClientConnection.kt index 67bd9d30d6ac..ccf976025e62 100644 --- a/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/Http2ClientConnection.kt +++ b/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/Http2ClientConnection.kt @@ -11,10 +11,8 @@ import io.netty.handler.codec.http.HttpMethod import io.netty.handler.codec.http.HttpResponseStatus import io.netty.handler.codec.http2.Http2Headers import io.netty.handler.codec.http2.Http2HeadersFrame -import io.netty.handler.codec.http2.Http2StreamChannel import io.netty.handler.codec.http2.ReadOnlyHttp2Headers import io.netty.util.AsciiString -import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.NonCancellable import kotlinx.coroutines.withContext import kotlinx.serialization.DeserializationStrategy @@ -101,40 +99,8 @@ internal class Http2ClientConnection internal constructor( } } - suspend fun put(path: AsciiString, writer: suspend (stream: Http2StreamChannel) -> Long): Long { - return connection.stream { stream, result -> - val handler = WebDavPutStatusChecker(result) - stream.pipeline().addLast(handler) - - stream.writeHeaders(createHeaders(HttpMethod.PUT, path), endStream = false) - handler.uploadedSize = writer(stream) - - // 1. writer must send the last data frame with endStream=true - // 2. stream now has the half-closed state - we listen for server header response with endStream - // 3. our ChannelInboundHandler above checks status and Netty closes the stream (as endStream was sent by both client and server) - } - } internal fun createHeaders(method: HttpMethod, path: AsciiString): Http2Headers { return ReadOnlyHttp2Headers.clientHeaders(true, method.asciiName(), path, scheme, authority, *commonHeaders) } -} - -private class WebDavPutStatusChecker(private val result: CompletableDeferred) : InboundHandlerResultTracker(result) { - @JvmField var uploadedSize: Long = 0 - - override fun channelRead0(context: ChannelHandlerContext, frame: Http2HeadersFrame) { - if (!frame.isEndStream) { - return - } - - val status = HttpResponseStatus.parseLine(frame.headers().status()) - // WebDAV server returns 204 for existing resources - if (status == HttpResponseStatus.CREATED || status == HttpResponseStatus.NO_CONTENT || status == HttpResponseStatus.OK) { - result.complete(uploadedSize) - } - else { - result.completeExceptionally(IllegalStateException("Unexpected response status: $status")) - } - } } \ No newline at end of file diff --git a/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/Http2ConnectionProvider.kt b/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/Http2ConnectionProvider.kt index 7b76accc8909..5b8161a80610 100644 --- a/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/Http2ConnectionProvider.kt +++ b/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/Http2ConnectionProvider.kt @@ -181,6 +181,11 @@ internal class Http2ConnectionProvider( // must be canceled when the parent context is canceled val result = CompletableDeferred(parent = coroutineContext.job) block(streamChannel, result) + // Ensure the stream is closed before completing the operation. + // This prevents the risk of opening more streams than intended, + // especially when there is a limit on the number of parallel executed tasks. + // Also, avoid explicitly closing the stream in case of a successful operation. + streamChannel.closeFuture().joinCancellable(cancelFutureOnCancellation = false) return result.await() } finally { diff --git a/platform/build-scripts/src/org/jetbrains/intellij/build/impl/compilation/ZstdCompressContextPool.kt b/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/ZstdCompressContextPool.kt similarity index 66% rename from platform/build-scripts/src/org/jetbrains/intellij/build/impl/compilation/ZstdCompressContextPool.kt rename to platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/ZstdCompressContextPool.kt index 829352f7b044..87c2ab9232a5 100644 --- a/platform/build-scripts/src/org/jetbrains/intellij/build/impl/compilation/ZstdCompressContextPool.kt +++ b/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/ZstdCompressContextPool.kt @@ -1,7 +1,8 @@ // Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license. -package org.jetbrains.intellij.build.impl.compilation +package org.jetbrains.intellij.build.http2Client import com.github.luben.zstd.ZstdCompressCtx +import com.github.luben.zstd.ZstdDecompressCtx import java.util.concurrent.ConcurrentLinkedQueue // we cannot use Netty Recycler as we must close ZstdCompressCtx after use of pool @@ -40,4 +41,27 @@ internal class ZstdCompressContextPool(private val level: Int = 3) : AutoCloseab (pool.poll() ?: return).close() } } +} + +internal class ZstdDecompressContextPool : AutoCloseable { + private val pool = ConcurrentLinkedQueue() + + fun allocate(): ZstdDecompressCtx { + pool.poll()?.let { + return it + } + + return ZstdDecompressCtx() + } + + override fun close() { + while (true) { + (pool.poll() ?: return).close() + } + } + + fun release(zstd: ZstdDecompressCtx) { + zstd.reset() + pool.offer(zstd) + } } \ No newline at end of file diff --git a/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/download.kt b/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/downloadFile.kt similarity index 88% rename from platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/download.kt rename to platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/downloadFile.kt index f01bd4f2b898..12a2eae65e0b 100644 --- a/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/download.kt +++ b/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/downloadFile.kt @@ -24,7 +24,12 @@ private val OVERWRITE_OPERATION = EnumSet.of(StandardOpenOption.WRITE, StandardO internal data class DownloadResult(@JvmField var size: Long, @JvmField val digest: MessageDigest) -internal suspend fun Http2ClientConnection.download(path: String, file: Path, digestFactory: () -> MessageDigest): DownloadResult { +internal suspend fun Http2ClientConnection.download( + path: String, + file: Path, + zstdDecompressContextPool: ZstdDecompressContextPool, + digestFactory: () -> MessageDigest, +): DownloadResult { Files.createDirectories(file.parent) return connection.stream { stream, result -> @@ -33,6 +38,7 @@ internal suspend fun Http2ClientConnection.download(path: String, file: Path, di result = result, downloadResult = DownloadResult(size = 0, digest = digestFactory()), file = file, + zstdDecompressContextPool = zstdDecompressContextPool, ), ) @@ -46,6 +52,7 @@ private class DownloadHandler( private val result: CompletableDeferred, private val downloadResult: DownloadResult, private val file: Path, + private val zstdDecompressContextPool: ZstdDecompressContextPool, ) : InboundHandlerResultTracker(result) { private var offset = 0L private var fileChannel: FileChannel? = null @@ -55,7 +62,7 @@ private class DownloadHandler( override fun handlerAdded(ctx: ChannelHandlerContext?) { fileChannel = FileChannel.open(file, OVERWRITE_OPERATION) - zstdDecompressContext = ZstdDecompressCtx() + zstdDecompressContext = zstdDecompressContextPool.allocate() } override fun handlerRemoved(context: ChannelHandlerContext) { @@ -64,8 +71,10 @@ private class DownloadHandler( fileChannel = null } finally { - zstdDecompressContext?.close() - zstdDecompressContext = null + zstdDecompressContext?.let { + zstdDecompressContext = null + zstdDecompressContextPool.release(it) + } } } diff --git a/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/netty-util.kt b/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/netty-util.kt index c6d96978dba2..d084f49c0b46 100644 --- a/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/netty-util.kt +++ b/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/netty-util.kt @@ -54,9 +54,7 @@ internal suspend fun Future.cancellableAwait(): T { return getNow() } else { - cause()?.let { - throw it - } + throw cause() } } @@ -83,6 +81,7 @@ internal suspend fun Future<*>.joinCancellable(cancelFutureOnCancellation: Boole cause()?.let { throw it } + return } suspendCancellableCoroutine { continuation -> @@ -116,6 +115,7 @@ internal abstract class InboundHandlerResultTracker( ) : SimpleChannelInboundHandler() { override fun exceptionCaught(context: ChannelHandlerContext, cause: Throwable) { result.completeExceptionally(cause) + context.close() } override fun channelInactive(context: ChannelHandlerContext) { @@ -131,6 +131,7 @@ internal suspend fun Future<*>.joinNonCancellable() { cause()?.let { throw it } + return } // not suspendCancellableCoroutine - we must close the channel diff --git a/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/uploadFile.kt b/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/uploadFile.kt new file mode 100644 index 000000000000..6d50a5a1ac8d --- /dev/null +++ b/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/uploadFile.kt @@ -0,0 +1,123 @@ +// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license. +package org.jetbrains.intellij.build.http2Client + +import com.github.luben.zstd.Zstd +import com.github.luben.zstd.ZstdCompressCtx +import io.netty.channel.ChannelHandlerContext +import io.netty.handler.codec.http.HttpMethod +import io.netty.handler.codec.http.HttpResponseStatus +import io.netty.handler.codec.http2.DefaultHttp2DataFrame +import io.netty.handler.codec.http2.Http2HeadersFrame +import io.netty.handler.codec.http2.Http2StreamChannel +import io.netty.util.AsciiString +import kotlinx.coroutines.CompletableDeferred +import org.jetbrains.intellij.build.io.unmapBuffer +import java.nio.MappedByteBuffer +import java.nio.channels.FileChannel +import java.nio.file.Path +import java.nio.file.StandardOpenOption +import java.util.* +import kotlin.math.min + +internal val READ_OPERATION = EnumSet.of(StandardOpenOption.READ) +internal data class UploadResult(@JvmField var uploadedSize: Long, @JvmField var fileSize: Long) + +internal suspend fun Http2ClientConnection.upload( + path: AsciiString, + file: Path, + sourceBlockSize: Int, + zstd: ZstdCompressCtx, +): UploadResult { + return connection.stream { stream, result -> + val handler = WebDavPutStatusChecker(result) + stream.pipeline().addLast(handler) + + stream.writeHeaders(createHeaders(HttpMethod.PUT, path), endStream = false) + + val fileBuffer = FileChannel.open(file, READ_OPERATION).use { channel -> + channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size()) + } + try { + val fileSize = fileBuffer.remaining() + compressAndUpload( + fileBuffer = fileBuffer, + sourceBlockSize = sourceBlockSize, + zstd = zstd, + stream = stream, + fileSize = fileSize.toLong(), + ) { + handler.uploadedResult = it + } + } + finally { + unmapBuffer(fileBuffer) + } + + // 1. writer must send the last data frame with endStream=true + // 2. stream now has the half-closed state - we listen for server header response with endStream + // 3. our ChannelInboundHandler above checks status and Netty closes the stream (as endStream was sent by both client and server) + } +} + +private class WebDavPutStatusChecker(private val result: CompletableDeferred) : InboundHandlerResultTracker(result) { + @JvmField + var uploadedResult: UploadResult? = null + + override fun channelRead0(context: ChannelHandlerContext, frame: Http2HeadersFrame) { + if (!frame.isEndStream) { + return + } + + val status = HttpResponseStatus.parseLine(frame.headers().status()) + // WebDAV server returns 204 for existing resources + if (status == HttpResponseStatus.CREATED || status == HttpResponseStatus.NO_CONTENT || status == HttpResponseStatus.OK) { + result.complete(uploadedResult!!) + } + else { + result.completeExceptionally(IllegalStateException("Unexpected response status: $status")) + } + } +} + +private suspend fun compressAndUpload( + fileBuffer: MappedByteBuffer, + sourceBlockSize: Int, + zstd: ZstdCompressCtx, + stream: Http2StreamChannel, + fileSize: Long, + uploadResultConsumer: (UploadResult) -> Unit, +) { + var position = 0 + var uploadedSize = 0L + while (true) { + val chunkSize = min(fileSize - position, sourceBlockSize.toLong()).toInt() + val targetSize = Zstd.compressBound(chunkSize.toLong()).toInt() + val targetNettyBuffer = stream.alloc().directBuffer(targetSize) + val targetBuffer = targetNettyBuffer.nioBuffer(0, targetSize) + val compressedSize = zstd.compressDirectByteBuffer( + targetBuffer, // compress into targetBuffer + targetBuffer.position(), // write compressed data starting at offset position() + targetSize, // write no more than target block size bytes + fileBuffer, // read data to compress from fileBuffer + position, // start reading at position() + chunkSize, // read chunk size bytes + ) + assert(compressedSize > 0) + targetNettyBuffer.writerIndex(targetNettyBuffer.writerIndex() + compressedSize) + assert(targetNettyBuffer.readableBytes() == compressedSize) + + position += chunkSize + uploadedSize += compressedSize + + val endStream = position >= fileSize + val writeFuture = stream.writeAndFlush(DefaultHttp2DataFrame(targetNettyBuffer, endStream)) + if (endStream) { + uploadResultConsumer(UploadResult(uploadedSize = uploadedSize, fileSize = fileSize)) + } + writeFuture.joinCancellable() + + if (endStream) { + return + } + } +} \ No newline at end of file diff --git a/platform/build-scripts/src/org/jetbrains/intellij/build/impl/compilation/CompilationPartsUtil.kt b/platform/build-scripts/src/org/jetbrains/intellij/build/impl/compilation/CompilationPartsUtil.kt index 58cd650d62b3..c3c3e3ab83d5 100644 --- a/platform/build-scripts/src/org/jetbrains/intellij/build/impl/compilation/CompilationPartsUtil.kt +++ b/platform/build-scripts/src/org/jetbrains/intellij/build/impl/compilation/CompilationPartsUtil.kt @@ -19,6 +19,7 @@ import org.jetbrains.annotations.VisibleForTesting import org.jetbrains.intellij.build.BuildMessages import org.jetbrains.intellij.build.CompilationContext import org.jetbrains.intellij.build.forEachConcurrent +import org.jetbrains.intellij.build.http2Client.READ_OPERATION import org.jetbrains.intellij.build.http2Client.createHttp2ClientSessionFactory import org.jetbrains.intellij.build.io.AddDirEntriesMode import org.jetbrains.intellij.build.io.zip @@ -43,8 +44,9 @@ import kotlin.io.path.ExperimentalPathApi import kotlin.io.path.deleteRecursively import kotlin.io.path.listDirectoryEntries -internal val uploadParallelism = Runtime.getRuntime().availableProcessors().coerceIn(4, 32) -internal val downloadParallelism = (Runtime.getRuntime().availableProcessors() * 2).coerceIn(8, 16) +private val nettyMax = Runtime.getRuntime().availableProcessors() * 2 +internal val uploadParallelism = nettyMax.coerceIn(4, 32) +internal val downloadParallelism = (nettyMax * 2).coerceIn(8, 16) private const val BRANCH_PROPERTY_NAME = "intellij.build.compiled.classes.branch" private const val SERVER_URL_PROPERTY = "intellij.build.compiled.classes.server.url" diff --git a/platform/build-scripts/src/org/jetbrains/intellij/build/impl/compilation/download.kt b/platform/build-scripts/src/org/jetbrains/intellij/build/impl/compilation/download.kt index 50edf7f73149..7b8384ae5ff6 100644 --- a/platform/build-scripts/src/org/jetbrains/intellij/build/impl/compilation/download.kt +++ b/platform/build-scripts/src/org/jetbrains/intellij/build/impl/compilation/download.kt @@ -8,11 +8,11 @@ import okio.IOException import org.jetbrains.intellij.build.forEachConcurrent import org.jetbrains.intellij.build.http2Client.Http2ClientConnection import org.jetbrains.intellij.build.http2Client.Http2ClientConnectionFactory +import org.jetbrains.intellij.build.http2Client.ZstdDecompressContextPool import org.jetbrains.intellij.build.http2Client.download import org.jetbrains.intellij.build.io.INDEX_FILENAME import org.jetbrains.intellij.build.telemetry.TraceManager.spanBuilder import org.jetbrains.intellij.build.telemetry.use -import java.math.BigInteger import java.net.URI import java.nio.channels.FileChannel import java.nio.file.Files @@ -65,26 +65,29 @@ internal suspend fun downloadCompilationCache( } try { val errors = CopyOnWriteArrayList() - toDownload.forEachConcurrent(downloadParallelism) { item -> - val urlPath = "$urlPathWithPrefix${item.name}/${item.file.fileName}" - spanBuilder("download").setAttribute("name", item.name).setAttribute("urlPath", urlPath).use { span -> - try { - downloadedBytes.getAndAdd( - download( - item = item, - urlPath = urlPath, - skipUnpack = skipUnpack, - saveHash = saveHash, - connection = connection, + ZstdDecompressContextPool().use { zstdDecompressContextPool -> + toDownload.forEachConcurrent(downloadParallelism) { item -> + val urlPath = "$urlPathWithPrefix${item.name}/${item.file.fileName}" + spanBuilder("download").setAttribute("name", item.name).setAttribute("urlPath", urlPath).use { span -> + try { + downloadedBytes.getAndAdd( + download( + item = item, + urlPath = urlPath, + skipUnpack = skipUnpack, + saveHash = saveHash, + connection = connection, + zstdDecompressContextPool = zstdDecompressContextPool, + ) ) - ) - } - catch (e: CancellationException) { - throw e - } - catch (e: Throwable) { - span.recordException(e) - errors.add(CompilePartDownloadFailedError(item, e)) + } + catch (e: CancellationException) { + throw e + } + catch (e: Throwable) { + span.recordException(e) + errors.add(CompilePartDownloadFailedError(item, e)) + } } } } @@ -101,14 +104,14 @@ private suspend fun download( skipUnpack: Boolean, saveHash: Boolean, connection: Http2ClientConnection, + zstdDecompressContextPool: ZstdDecompressContextPool, ): Long { - val (downloaded, digest) = connection.download(path = urlPath, file = item.file, digestFactory = { sha256() }) - val digestBytes = digest.digest() - val computedHash = BigInteger(1, digestBytes).toString(36) + "-z" + val (downloaded, digest) = connection.download(path = urlPath, file = item.file, zstdDecompressContextPool = zstdDecompressContextPool, digestFactory = { sha256() }) + val computedHash = digestToString(digest) if (computedHash != item.hash) { - println("actualHash : ${computeHash(item.file)}") - println("expectedHash: ${item.hash}") - println("computedHash: $computedHash") + //println("actualHash : ${computeHash(item.file)}") + //println("expectedHash: ${item.hash}") + //println("computedHash: $computedHash") val spanAttributes = Attributes.of( AttributeKey.stringKey("name"), item.file.name, diff --git a/platform/build-scripts/src/org/jetbrains/intellij/build/impl/compilation/upload.kt b/platform/build-scripts/src/org/jetbrains/intellij/build/impl/compilation/upload.kt index 503583491da3..6b1aff079a1c 100644 --- a/platform/build-scripts/src/org/jetbrains/intellij/build/impl/compilation/upload.kt +++ b/platform/build-scripts/src/org/jetbrains/intellij/build/impl/compilation/upload.kt @@ -3,11 +3,8 @@ package org.jetbrains.intellij.build.impl.compilation -import com.github.luben.zstd.Zstd -import com.github.luben.zstd.ZstdCompressCtx import io.netty.handler.codec.http.HttpHeaderValues import io.netty.handler.codec.http.HttpResponseStatus -import io.netty.handler.codec.http2.Http2StreamChannel import io.netty.util.AsciiString import io.opentelemetry.api.common.AttributeKey import io.opentelemetry.api.common.Attributes @@ -16,22 +13,15 @@ import kotlinx.serialization.Serializable import org.jetbrains.intellij.build.forEachConcurrent import org.jetbrains.intellij.build.http2Client.Http2ClientConnection import org.jetbrains.intellij.build.http2Client.MAX_BUFFER_SIZE -import org.jetbrains.intellij.build.http2Client.writeData -import org.jetbrains.intellij.build.io.unmapBuffer +import org.jetbrains.intellij.build.http2Client.ZstdCompressContextPool +import org.jetbrains.intellij.build.http2Client.upload import org.jetbrains.intellij.build.telemetry.TraceManager.spanBuilder import org.jetbrains.intellij.build.telemetry.use -import java.nio.MappedByteBuffer -import java.nio.channels.FileChannel import java.nio.file.Files import java.nio.file.Path -import java.nio.file.StandardOpenOption -import java.util.* import java.util.concurrent.CancellationException import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.LongAdder -import kotlin.math.min - -internal val READ_OPERATION = EnumSet.of(StandardOpenOption.READ) internal suspend fun uploadArchives( reportStatisticValue: (key: String, value: String) -> Unit, @@ -158,68 +148,12 @@ private suspend fun uploadFile( zstdCompressContextPool: ZstdCompressContextPool, uncompressedBytes: LongAdder, ): Long { - val fileBuffer = FileChannel.open(file, READ_OPERATION).use { channel -> - channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size()) + val result = zstdCompressContextPool.withZstd { zstd -> + httpConnection.upload(path = AsciiString.of(urlPath), file = file, sourceBlockSize = sourceBlockSize, zstd = zstd) } - try { - val fileSize = fileBuffer.remaining() - require(fileSize > 0) - uncompressedBytes.add(fileSize.toLong()) - - return zstdCompressContextPool.withZstd { zstd -> - httpConnection.put(AsciiString.of(urlPath)) { stream -> - compressAndUpload( - fileBuffer = fileBuffer, - sourceBlockSize = sourceBlockSize, - zstd = zstd, - stream = stream, - ) - } - } - } - finally { - unmapBuffer(fileBuffer) - } -} - -private suspend fun compressAndUpload( - fileBuffer: MappedByteBuffer, - sourceBlockSize: Int, - zstd: ZstdCompressCtx, - stream: Http2StreamChannel, -): Long { - var position = 0 - val fileSize = fileBuffer.remaining() - var uploadedSize = 0L - while (true) { - val chunkSize = min(fileSize - position, sourceBlockSize) - val targetSize = Zstd.compressBound(chunkSize.toLong()).toInt() - val targetNettyBuffer = stream.alloc().directBuffer(targetSize) - val targetBuffer = targetNettyBuffer.nioBuffer(0, targetSize) - val compressedSize = zstd.compressDirectByteBuffer( - targetBuffer, // compress into targetBuffer - targetBuffer.position(), // write compressed data starting at offset position() - targetSize, // write no more than target block size bytes - fileBuffer, // read data to compress from fileBuffer - position, // start reading at position() - chunkSize, // read chunk size bytes - ) - assert(compressedSize > 0) - targetNettyBuffer.writerIndex(targetNettyBuffer.writerIndex() + compressedSize) - assert(targetNettyBuffer.readableBytes() == compressedSize) - - position += chunkSize - - val endStream = position >= fileSize - stream.writeData(targetNettyBuffer, endStream) - - uploadedSize += compressedSize - if (endStream) { - break - } - } - - return uploadedSize + require(result.fileSize > 0) + uncompressedBytes.add(result.fileSize) + return result.uploadedSize } @Serializable