mirror of
https://gitflic.ru/project/openide/openide.git
synced 2025-12-16 22:51:17 +07:00
extract uploadFile
GitOrigin-RevId: 5daf31c9b0b60865db544a8a4bf1342e77b8ae69
This commit is contained in:
committed by
intellij-monorepo-bot
parent
bfe75a78d3
commit
82b30feb4e
@@ -34,4 +34,4 @@ internal suspend fun <T> Collection<T>.forEachConcurrent(
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -11,10 +11,8 @@ import io.netty.handler.codec.http.HttpMethod
|
||||
import io.netty.handler.codec.http.HttpResponseStatus
|
||||
import io.netty.handler.codec.http2.Http2Headers
|
||||
import io.netty.handler.codec.http2.Http2HeadersFrame
|
||||
import io.netty.handler.codec.http2.Http2StreamChannel
|
||||
import io.netty.handler.codec.http2.ReadOnlyHttp2Headers
|
||||
import io.netty.util.AsciiString
|
||||
import kotlinx.coroutines.CompletableDeferred
|
||||
import kotlinx.coroutines.NonCancellable
|
||||
import kotlinx.coroutines.withContext
|
||||
import kotlinx.serialization.DeserializationStrategy
|
||||
@@ -101,40 +99,8 @@ internal class Http2ClientConnection internal constructor(
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun put(path: AsciiString, writer: suspend (stream: Http2StreamChannel) -> Long): Long {
|
||||
return connection.stream { stream, result ->
|
||||
val handler = WebDavPutStatusChecker(result)
|
||||
stream.pipeline().addLast(handler)
|
||||
|
||||
stream.writeHeaders(createHeaders(HttpMethod.PUT, path), endStream = false)
|
||||
handler.uploadedSize = writer(stream)
|
||||
|
||||
// 1. writer must send the last data frame with endStream=true
|
||||
// 2. stream now has the half-closed state - we listen for server header response with endStream
|
||||
// 3. our ChannelInboundHandler above checks status and Netty closes the stream (as endStream was sent by both client and server)
|
||||
}
|
||||
}
|
||||
|
||||
internal fun createHeaders(method: HttpMethod, path: AsciiString): Http2Headers {
|
||||
return ReadOnlyHttp2Headers.clientHeaders(true, method.asciiName(), path, scheme, authority, *commonHeaders)
|
||||
}
|
||||
}
|
||||
|
||||
private class WebDavPutStatusChecker(private val result: CompletableDeferred<Long>) : InboundHandlerResultTracker<Http2HeadersFrame>(result) {
|
||||
@JvmField var uploadedSize: Long = 0
|
||||
|
||||
override fun channelRead0(context: ChannelHandlerContext, frame: Http2HeadersFrame) {
|
||||
if (!frame.isEndStream) {
|
||||
return
|
||||
}
|
||||
|
||||
val status = HttpResponseStatus.parseLine(frame.headers().status())
|
||||
// WebDAV server returns 204 for existing resources
|
||||
if (status == HttpResponseStatus.CREATED || status == HttpResponseStatus.NO_CONTENT || status == HttpResponseStatus.OK) {
|
||||
result.complete(uploadedSize)
|
||||
}
|
||||
else {
|
||||
result.completeExceptionally(IllegalStateException("Unexpected response status: $status"))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -181,6 +181,11 @@ internal class Http2ConnectionProvider(
|
||||
// must be canceled when the parent context is canceled
|
||||
val result = CompletableDeferred<T>(parent = coroutineContext.job)
|
||||
block(streamChannel, result)
|
||||
// Ensure the stream is closed before completing the operation.
|
||||
// This prevents the risk of opening more streams than intended,
|
||||
// especially when there is a limit on the number of parallel executed tasks.
|
||||
// Also, avoid explicitly closing the stream in case of a successful operation.
|
||||
streamChannel.closeFuture().joinCancellable(cancelFutureOnCancellation = false)
|
||||
return result.await()
|
||||
}
|
||||
finally {
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
|
||||
package org.jetbrains.intellij.build.impl.compilation
|
||||
package org.jetbrains.intellij.build.http2Client
|
||||
|
||||
import com.github.luben.zstd.ZstdCompressCtx
|
||||
import com.github.luben.zstd.ZstdDecompressCtx
|
||||
import java.util.concurrent.ConcurrentLinkedQueue
|
||||
|
||||
// we cannot use Netty Recycler as we must close ZstdCompressCtx after use of pool
|
||||
@@ -40,4 +41,27 @@ internal class ZstdCompressContextPool(private val level: Int = 3) : AutoCloseab
|
||||
(pool.poll() ?: return).close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
internal class ZstdDecompressContextPool : AutoCloseable {
|
||||
private val pool = ConcurrentLinkedQueue<ZstdDecompressCtx>()
|
||||
|
||||
fun allocate(): ZstdDecompressCtx {
|
||||
pool.poll()?.let {
|
||||
return it
|
||||
}
|
||||
|
||||
return ZstdDecompressCtx()
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
while (true) {
|
||||
(pool.poll() ?: return).close()
|
||||
}
|
||||
}
|
||||
|
||||
fun release(zstd: ZstdDecompressCtx) {
|
||||
zstd.reset()
|
||||
pool.offer(zstd)
|
||||
}
|
||||
}
|
||||
@@ -24,7 +24,12 @@ private val OVERWRITE_OPERATION = EnumSet.of(StandardOpenOption.WRITE, StandardO
|
||||
|
||||
internal data class DownloadResult(@JvmField var size: Long, @JvmField val digest: MessageDigest)
|
||||
|
||||
internal suspend fun Http2ClientConnection.download(path: String, file: Path, digestFactory: () -> MessageDigest): DownloadResult {
|
||||
internal suspend fun Http2ClientConnection.download(
|
||||
path: String,
|
||||
file: Path,
|
||||
zstdDecompressContextPool: ZstdDecompressContextPool,
|
||||
digestFactory: () -> MessageDigest,
|
||||
): DownloadResult {
|
||||
Files.createDirectories(file.parent)
|
||||
|
||||
return connection.stream { stream, result ->
|
||||
@@ -33,6 +38,7 @@ internal suspend fun Http2ClientConnection.download(path: String, file: Path, di
|
||||
result = result,
|
||||
downloadResult = DownloadResult(size = 0, digest = digestFactory()),
|
||||
file = file,
|
||||
zstdDecompressContextPool = zstdDecompressContextPool,
|
||||
),
|
||||
)
|
||||
|
||||
@@ -46,6 +52,7 @@ private class DownloadHandler(
|
||||
private val result: CompletableDeferred<DownloadResult>,
|
||||
private val downloadResult: DownloadResult,
|
||||
private val file: Path,
|
||||
private val zstdDecompressContextPool: ZstdDecompressContextPool,
|
||||
) : InboundHandlerResultTracker<Http2StreamFrame>(result) {
|
||||
private var offset = 0L
|
||||
private var fileChannel: FileChannel? = null
|
||||
@@ -55,7 +62,7 @@ private class DownloadHandler(
|
||||
|
||||
override fun handlerAdded(ctx: ChannelHandlerContext?) {
|
||||
fileChannel = FileChannel.open(file, OVERWRITE_OPERATION)
|
||||
zstdDecompressContext = ZstdDecompressCtx()
|
||||
zstdDecompressContext = zstdDecompressContextPool.allocate()
|
||||
}
|
||||
|
||||
override fun handlerRemoved(context: ChannelHandlerContext) {
|
||||
@@ -64,8 +71,10 @@ private class DownloadHandler(
|
||||
fileChannel = null
|
||||
}
|
||||
finally {
|
||||
zstdDecompressContext?.close()
|
||||
zstdDecompressContext = null
|
||||
zstdDecompressContext?.let {
|
||||
zstdDecompressContext = null
|
||||
zstdDecompressContextPool.release(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -54,9 +54,7 @@ internal suspend fun <T : Any> Future<T>.cancellableAwait(): T {
|
||||
return getNow()
|
||||
}
|
||||
else {
|
||||
cause()?.let {
|
||||
throw it
|
||||
}
|
||||
throw cause()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -83,6 +81,7 @@ internal suspend fun Future<*>.joinCancellable(cancelFutureOnCancellation: Boole
|
||||
cause()?.let {
|
||||
throw it
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
suspendCancellableCoroutine { continuation ->
|
||||
@@ -116,6 +115,7 @@ internal abstract class InboundHandlerResultTracker<T : Any>(
|
||||
) : SimpleChannelInboundHandler<T>() {
|
||||
override fun exceptionCaught(context: ChannelHandlerContext, cause: Throwable) {
|
||||
result.completeExceptionally(cause)
|
||||
context.close()
|
||||
}
|
||||
|
||||
override fun channelInactive(context: ChannelHandlerContext) {
|
||||
@@ -131,6 +131,7 @@ internal suspend fun Future<*>.joinNonCancellable() {
|
||||
cause()?.let {
|
||||
throw it
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// not suspendCancellableCoroutine - we must close the channel
|
||||
|
||||
@@ -0,0 +1,123 @@
|
||||
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
|
||||
package org.jetbrains.intellij.build.http2Client
|
||||
|
||||
import com.github.luben.zstd.Zstd
|
||||
import com.github.luben.zstd.ZstdCompressCtx
|
||||
import io.netty.channel.ChannelHandlerContext
|
||||
import io.netty.handler.codec.http.HttpMethod
|
||||
import io.netty.handler.codec.http.HttpResponseStatus
|
||||
import io.netty.handler.codec.http2.DefaultHttp2DataFrame
|
||||
import io.netty.handler.codec.http2.Http2HeadersFrame
|
||||
import io.netty.handler.codec.http2.Http2StreamChannel
|
||||
import io.netty.util.AsciiString
|
||||
import kotlinx.coroutines.CompletableDeferred
|
||||
import org.jetbrains.intellij.build.io.unmapBuffer
|
||||
import java.nio.MappedByteBuffer
|
||||
import java.nio.channels.FileChannel
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.StandardOpenOption
|
||||
import java.util.*
|
||||
import kotlin.math.min
|
||||
|
||||
internal val READ_OPERATION = EnumSet.of(StandardOpenOption.READ)
|
||||
internal data class UploadResult(@JvmField var uploadedSize: Long, @JvmField var fileSize: Long)
|
||||
|
||||
internal suspend fun Http2ClientConnection.upload(
|
||||
path: AsciiString,
|
||||
file: Path,
|
||||
sourceBlockSize: Int,
|
||||
zstd: ZstdCompressCtx,
|
||||
): UploadResult {
|
||||
return connection.stream { stream, result ->
|
||||
val handler = WebDavPutStatusChecker(result)
|
||||
stream.pipeline().addLast(handler)
|
||||
|
||||
stream.writeHeaders(createHeaders(HttpMethod.PUT, path), endStream = false)
|
||||
|
||||
val fileBuffer = FileChannel.open(file, READ_OPERATION).use { channel ->
|
||||
channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size())
|
||||
}
|
||||
try {
|
||||
val fileSize = fileBuffer.remaining()
|
||||
compressAndUpload(
|
||||
fileBuffer = fileBuffer,
|
||||
sourceBlockSize = sourceBlockSize,
|
||||
zstd = zstd,
|
||||
stream = stream,
|
||||
fileSize = fileSize.toLong(),
|
||||
) {
|
||||
handler.uploadedResult = it
|
||||
}
|
||||
}
|
||||
finally {
|
||||
unmapBuffer(fileBuffer)
|
||||
}
|
||||
|
||||
// 1. writer must send the last data frame with endStream=true
|
||||
// 2. stream now has the half-closed state - we listen for server header response with endStream
|
||||
// 3. our ChannelInboundHandler above checks status and Netty closes the stream (as endStream was sent by both client and server)
|
||||
}
|
||||
}
|
||||
|
||||
private class WebDavPutStatusChecker(private val result: CompletableDeferred<UploadResult>) : InboundHandlerResultTracker<Http2HeadersFrame>(result) {
|
||||
@JvmField
|
||||
var uploadedResult: UploadResult? = null
|
||||
|
||||
override fun channelRead0(context: ChannelHandlerContext, frame: Http2HeadersFrame) {
|
||||
if (!frame.isEndStream) {
|
||||
return
|
||||
}
|
||||
|
||||
val status = HttpResponseStatus.parseLine(frame.headers().status())
|
||||
// WebDAV server returns 204 for existing resources
|
||||
if (status == HttpResponseStatus.CREATED || status == HttpResponseStatus.NO_CONTENT || status == HttpResponseStatus.OK) {
|
||||
result.complete(uploadedResult!!)
|
||||
}
|
||||
else {
|
||||
result.completeExceptionally(IllegalStateException("Unexpected response status: $status"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun compressAndUpload(
|
||||
fileBuffer: MappedByteBuffer,
|
||||
sourceBlockSize: Int,
|
||||
zstd: ZstdCompressCtx,
|
||||
stream: Http2StreamChannel,
|
||||
fileSize: Long,
|
||||
uploadResultConsumer: (UploadResult) -> Unit,
|
||||
) {
|
||||
var position = 0
|
||||
var uploadedSize = 0L
|
||||
while (true) {
|
||||
val chunkSize = min(fileSize - position, sourceBlockSize.toLong()).toInt()
|
||||
val targetSize = Zstd.compressBound(chunkSize.toLong()).toInt()
|
||||
val targetNettyBuffer = stream.alloc().directBuffer(targetSize)
|
||||
val targetBuffer = targetNettyBuffer.nioBuffer(0, targetSize)
|
||||
val compressedSize = zstd.compressDirectByteBuffer(
|
||||
targetBuffer, // compress into targetBuffer
|
||||
targetBuffer.position(), // write compressed data starting at offset position()
|
||||
targetSize, // write no more than target block size bytes
|
||||
fileBuffer, // read data to compress from fileBuffer
|
||||
position, // start reading at position()
|
||||
chunkSize, // read chunk size bytes
|
||||
)
|
||||
assert(compressedSize > 0)
|
||||
targetNettyBuffer.writerIndex(targetNettyBuffer.writerIndex() + compressedSize)
|
||||
assert(targetNettyBuffer.readableBytes() == compressedSize)
|
||||
|
||||
position += chunkSize
|
||||
uploadedSize += compressedSize
|
||||
|
||||
val endStream = position >= fileSize
|
||||
val writeFuture = stream.writeAndFlush(DefaultHttp2DataFrame(targetNettyBuffer, endStream))
|
||||
if (endStream) {
|
||||
uploadResultConsumer(UploadResult(uploadedSize = uploadedSize, fileSize = fileSize))
|
||||
}
|
||||
writeFuture.joinCancellable()
|
||||
|
||||
if (endStream) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -19,6 +19,7 @@ import org.jetbrains.annotations.VisibleForTesting
|
||||
import org.jetbrains.intellij.build.BuildMessages
|
||||
import org.jetbrains.intellij.build.CompilationContext
|
||||
import org.jetbrains.intellij.build.forEachConcurrent
|
||||
import org.jetbrains.intellij.build.http2Client.READ_OPERATION
|
||||
import org.jetbrains.intellij.build.http2Client.createHttp2ClientSessionFactory
|
||||
import org.jetbrains.intellij.build.io.AddDirEntriesMode
|
||||
import org.jetbrains.intellij.build.io.zip
|
||||
@@ -43,8 +44,9 @@ import kotlin.io.path.ExperimentalPathApi
|
||||
import kotlin.io.path.deleteRecursively
|
||||
import kotlin.io.path.listDirectoryEntries
|
||||
|
||||
internal val uploadParallelism = Runtime.getRuntime().availableProcessors().coerceIn(4, 32)
|
||||
internal val downloadParallelism = (Runtime.getRuntime().availableProcessors() * 2).coerceIn(8, 16)
|
||||
private val nettyMax = Runtime.getRuntime().availableProcessors() * 2
|
||||
internal val uploadParallelism = nettyMax.coerceIn(4, 32)
|
||||
internal val downloadParallelism = (nettyMax * 2).coerceIn(8, 16)
|
||||
|
||||
private const val BRANCH_PROPERTY_NAME = "intellij.build.compiled.classes.branch"
|
||||
private const val SERVER_URL_PROPERTY = "intellij.build.compiled.classes.server.url"
|
||||
|
||||
@@ -8,11 +8,11 @@ import okio.IOException
|
||||
import org.jetbrains.intellij.build.forEachConcurrent
|
||||
import org.jetbrains.intellij.build.http2Client.Http2ClientConnection
|
||||
import org.jetbrains.intellij.build.http2Client.Http2ClientConnectionFactory
|
||||
import org.jetbrains.intellij.build.http2Client.ZstdDecompressContextPool
|
||||
import org.jetbrains.intellij.build.http2Client.download
|
||||
import org.jetbrains.intellij.build.io.INDEX_FILENAME
|
||||
import org.jetbrains.intellij.build.telemetry.TraceManager.spanBuilder
|
||||
import org.jetbrains.intellij.build.telemetry.use
|
||||
import java.math.BigInteger
|
||||
import java.net.URI
|
||||
import java.nio.channels.FileChannel
|
||||
import java.nio.file.Files
|
||||
@@ -65,26 +65,29 @@ internal suspend fun downloadCompilationCache(
|
||||
}
|
||||
try {
|
||||
val errors = CopyOnWriteArrayList<Throwable>()
|
||||
toDownload.forEachConcurrent(downloadParallelism) { item ->
|
||||
val urlPath = "$urlPathWithPrefix${item.name}/${item.file.fileName}"
|
||||
spanBuilder("download").setAttribute("name", item.name).setAttribute("urlPath", urlPath).use { span ->
|
||||
try {
|
||||
downloadedBytes.getAndAdd(
|
||||
download(
|
||||
item = item,
|
||||
urlPath = urlPath,
|
||||
skipUnpack = skipUnpack,
|
||||
saveHash = saveHash,
|
||||
connection = connection,
|
||||
ZstdDecompressContextPool().use { zstdDecompressContextPool ->
|
||||
toDownload.forEachConcurrent(downloadParallelism) { item ->
|
||||
val urlPath = "$urlPathWithPrefix${item.name}/${item.file.fileName}"
|
||||
spanBuilder("download").setAttribute("name", item.name).setAttribute("urlPath", urlPath).use { span ->
|
||||
try {
|
||||
downloadedBytes.getAndAdd(
|
||||
download(
|
||||
item = item,
|
||||
urlPath = urlPath,
|
||||
skipUnpack = skipUnpack,
|
||||
saveHash = saveHash,
|
||||
connection = connection,
|
||||
zstdDecompressContextPool = zstdDecompressContextPool,
|
||||
)
|
||||
)
|
||||
)
|
||||
}
|
||||
catch (e: CancellationException) {
|
||||
throw e
|
||||
}
|
||||
catch (e: Throwable) {
|
||||
span.recordException(e)
|
||||
errors.add(CompilePartDownloadFailedError(item, e))
|
||||
}
|
||||
catch (e: CancellationException) {
|
||||
throw e
|
||||
}
|
||||
catch (e: Throwable) {
|
||||
span.recordException(e)
|
||||
errors.add(CompilePartDownloadFailedError(item, e))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -101,14 +104,14 @@ private suspend fun download(
|
||||
skipUnpack: Boolean,
|
||||
saveHash: Boolean,
|
||||
connection: Http2ClientConnection,
|
||||
zstdDecompressContextPool: ZstdDecompressContextPool,
|
||||
): Long {
|
||||
val (downloaded, digest) = connection.download(path = urlPath, file = item.file, digestFactory = { sha256() })
|
||||
val digestBytes = digest.digest()
|
||||
val computedHash = BigInteger(1, digestBytes).toString(36) + "-z"
|
||||
val (downloaded, digest) = connection.download(path = urlPath, file = item.file, zstdDecompressContextPool = zstdDecompressContextPool, digestFactory = { sha256() })
|
||||
val computedHash = digestToString(digest)
|
||||
if (computedHash != item.hash) {
|
||||
println("actualHash : ${computeHash(item.file)}")
|
||||
println("expectedHash: ${item.hash}")
|
||||
println("computedHash: $computedHash")
|
||||
//println("actualHash : ${computeHash(item.file)}")
|
||||
//println("expectedHash: ${item.hash}")
|
||||
//println("computedHash: $computedHash")
|
||||
|
||||
val spanAttributes = Attributes.of(
|
||||
AttributeKey.stringKey("name"), item.file.name,
|
||||
|
||||
@@ -3,11 +3,8 @@
|
||||
|
||||
package org.jetbrains.intellij.build.impl.compilation
|
||||
|
||||
import com.github.luben.zstd.Zstd
|
||||
import com.github.luben.zstd.ZstdCompressCtx
|
||||
import io.netty.handler.codec.http.HttpHeaderValues
|
||||
import io.netty.handler.codec.http.HttpResponseStatus
|
||||
import io.netty.handler.codec.http2.Http2StreamChannel
|
||||
import io.netty.util.AsciiString
|
||||
import io.opentelemetry.api.common.AttributeKey
|
||||
import io.opentelemetry.api.common.Attributes
|
||||
@@ -16,22 +13,15 @@ import kotlinx.serialization.Serializable
|
||||
import org.jetbrains.intellij.build.forEachConcurrent
|
||||
import org.jetbrains.intellij.build.http2Client.Http2ClientConnection
|
||||
import org.jetbrains.intellij.build.http2Client.MAX_BUFFER_SIZE
|
||||
import org.jetbrains.intellij.build.http2Client.writeData
|
||||
import org.jetbrains.intellij.build.io.unmapBuffer
|
||||
import org.jetbrains.intellij.build.http2Client.ZstdCompressContextPool
|
||||
import org.jetbrains.intellij.build.http2Client.upload
|
||||
import org.jetbrains.intellij.build.telemetry.TraceManager.spanBuilder
|
||||
import org.jetbrains.intellij.build.telemetry.use
|
||||
import java.nio.MappedByteBuffer
|
||||
import java.nio.channels.FileChannel
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.StandardOpenOption
|
||||
import java.util.*
|
||||
import java.util.concurrent.CancellationException
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.atomic.LongAdder
|
||||
import kotlin.math.min
|
||||
|
||||
internal val READ_OPERATION = EnumSet.of(StandardOpenOption.READ)
|
||||
|
||||
internal suspend fun uploadArchives(
|
||||
reportStatisticValue: (key: String, value: String) -> Unit,
|
||||
@@ -158,68 +148,12 @@ private suspend fun uploadFile(
|
||||
zstdCompressContextPool: ZstdCompressContextPool,
|
||||
uncompressedBytes: LongAdder,
|
||||
): Long {
|
||||
val fileBuffer = FileChannel.open(file, READ_OPERATION).use { channel ->
|
||||
channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size())
|
||||
val result = zstdCompressContextPool.withZstd { zstd ->
|
||||
httpConnection.upload(path = AsciiString.of(urlPath), file = file, sourceBlockSize = sourceBlockSize, zstd = zstd)
|
||||
}
|
||||
try {
|
||||
val fileSize = fileBuffer.remaining()
|
||||
require(fileSize > 0)
|
||||
uncompressedBytes.add(fileSize.toLong())
|
||||
|
||||
return zstdCompressContextPool.withZstd { zstd ->
|
||||
httpConnection.put(AsciiString.of(urlPath)) { stream ->
|
||||
compressAndUpload(
|
||||
fileBuffer = fileBuffer,
|
||||
sourceBlockSize = sourceBlockSize,
|
||||
zstd = zstd,
|
||||
stream = stream,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
finally {
|
||||
unmapBuffer(fileBuffer)
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun compressAndUpload(
|
||||
fileBuffer: MappedByteBuffer,
|
||||
sourceBlockSize: Int,
|
||||
zstd: ZstdCompressCtx,
|
||||
stream: Http2StreamChannel,
|
||||
): Long {
|
||||
var position = 0
|
||||
val fileSize = fileBuffer.remaining()
|
||||
var uploadedSize = 0L
|
||||
while (true) {
|
||||
val chunkSize = min(fileSize - position, sourceBlockSize)
|
||||
val targetSize = Zstd.compressBound(chunkSize.toLong()).toInt()
|
||||
val targetNettyBuffer = stream.alloc().directBuffer(targetSize)
|
||||
val targetBuffer = targetNettyBuffer.nioBuffer(0, targetSize)
|
||||
val compressedSize = zstd.compressDirectByteBuffer(
|
||||
targetBuffer, // compress into targetBuffer
|
||||
targetBuffer.position(), // write compressed data starting at offset position()
|
||||
targetSize, // write no more than target block size bytes
|
||||
fileBuffer, // read data to compress from fileBuffer
|
||||
position, // start reading at position()
|
||||
chunkSize, // read chunk size bytes
|
||||
)
|
||||
assert(compressedSize > 0)
|
||||
targetNettyBuffer.writerIndex(targetNettyBuffer.writerIndex() + compressedSize)
|
||||
assert(targetNettyBuffer.readableBytes() == compressedSize)
|
||||
|
||||
position += chunkSize
|
||||
|
||||
val endStream = position >= fileSize
|
||||
stream.writeData(targetNettyBuffer, endStream)
|
||||
|
||||
uploadedSize += compressedSize
|
||||
if (endStream) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return uploadedSize
|
||||
require(result.fileSize > 0)
|
||||
uncompressedBytes.add(result.fileSize)
|
||||
return result.uploadedSize
|
||||
}
|
||||
|
||||
@Serializable
|
||||
|
||||
Reference in New Issue
Block a user