simplify result handling for better error reporting

GitOrigin-RevId: 87ec05a525de5f2b7dd82a330d4fa6cf2459a43a
This commit is contained in:
Vladimir Krivosheev
2024-09-13 14:52:04 +02:00
committed by intellij-monorepo-bot
parent 406d8a8a67
commit 187cb1ec16
6 changed files with 14 additions and 31 deletions

View File

@@ -38,7 +38,6 @@ internal class Http2ClientConnection internal constructor(
})
stream.writeHeaders(createHeaders(HttpMethod.HEAD, AsciiString.of(path)), endStream = true)
result.await()
}
}
@@ -60,7 +59,6 @@ internal class Http2ClientConnection internal constructor(
})
stream.writeHeaders(createHeaders(HttpMethod.HEAD, AsciiString.of(path)), endStream = true)
result.await()
}
}

View File

@@ -123,7 +123,7 @@ internal class Http2ConnectionProvider(
}
}
suspend fun <T, R> stream(block: suspend (streamChannel: Http2StreamChannel, result: CompletableDeferred<R>) -> T): T {
suspend fun <T> stream(block: suspend (streamChannel: Http2StreamChannel, result: CompletableDeferred<T>) -> Unit): T {
var attempt = 1
var currentDelay = 1_000L
var suppressedExceptions: MutableList<Throwable>? = null
@@ -208,22 +208,24 @@ internal class Http2ConnectionProvider(
}
// must be called with ioDispatcher
private suspend fun <T, R> openStreamAndConsume(
private suspend fun <T> openStreamAndConsume(
connectionState: ConnectionState,
block: suspend (streamChannel: Http2StreamChannel, result: CompletableDeferred<R>) -> T,
block: suspend (streamChannel: Http2StreamChannel, result: CompletableDeferred<T>) -> Unit,
): T {
val streamChannel = connectionState.bootstrap.open().cancellableAwait()
try {
// must be canceled when the parent context is canceled
val deferred = CompletableDeferred<R>(parent = coroutineContext.job)
val result = block(streamChannel, deferred)
val deferred = CompletableDeferred<T>(parent = coroutineContext.job)
block(streamChannel, deferred)
// Ensure the stream is closed before completing the operation.
// This prevents the risk of opening more streams than intended,
// especially when there is a limit on the number of parallel executed tasks.
// Also, avoid explicitly closing the stream in case of a successful operation.
streamChannel.closeFuture().joinCancellable(cancelFutureOnCancellation = false)
deferred.await()
return result
// 1. writer must send the last data frame with endStream=true
// 2. stream now has the half-closed state - we listen for server header response with endStream
// 3. our ChannelInboundHandler above checks status and Netty closes the stream (as endStream was sent by both client and server)
return deferred.await()
}
finally {
if (streamChannel.isOpen && connectionState.coroutineScope.isActive) {

View File

@@ -37,7 +37,6 @@ internal suspend fun Http2ClientConnection.getString(path: CharSequence): String
),
endStream = true,
)
result.await()
}
}
@@ -74,7 +73,6 @@ private suspend fun <T : Any> Http2ClientConnection.doGetJsonOrDefaultIfNotFound
),
endStream = true,
)
result.await()
}
}
@@ -93,7 +91,6 @@ private suspend fun <T : Any> Http2ClientConnection.post(path: AsciiString, data
endStream = false,
)
stream.writeData(ByteBufUtil.writeUtf8(bufferAllocator, data), endStream = true)
result.await()
}
}

View File

@@ -32,7 +32,6 @@ internal suspend fun Http2ClientConnection.download(path: String, file: Path): L
stream.pipeline().addLast(FileDownloadHandler(result = result, file = file))
stream.writeHeaders(createHeaders(HttpMethod.GET, AsciiString.of(path)), endStream = true)
result.await()
}
}
@@ -64,7 +63,6 @@ internal suspend fun Http2ClientConnection.download(
)
stream.writeHeaders(createHeaders(HttpMethod.GET, AsciiString.of(path)), endStream = true)
result.await()
}
}
@@ -86,12 +84,6 @@ private class ZstdDecompressingFileDownloadHandler(
result.completeExceptionally(cause)
}
override fun channelInactive(context: ChannelHandlerContext) {
if (!result.isCompleted) {
result.completeExceptionally(IllegalStateException("Stream closed without result (download=$file)"))
}
}
override fun handlerAdded(ctx: ChannelHandlerContext?) {
dataConsumer = if (unzip) ZipDecoder(file) else DataToFileConsumer(file)
zstdDecompressContext = zstdDecompressContextPool.allocate()

View File

@@ -116,12 +116,6 @@ internal abstract class InboundHandlerResultTracker<T : Any>(
override fun exceptionCaught(context: ChannelHandlerContext, cause: Throwable) {
result.completeExceptionally(cause)
}
override fun channelInactive(context: ChannelHandlerContext) {
if (!result.isCompleted) {
result.completeExceptionally(IllegalStateException("Stream closed without result ($this)"))
}
}
}
// not suspendCancellableCoroutine - we must close the channel / event loop group

View File

@@ -53,11 +53,12 @@ internal suspend fun Http2ClientConnection.upload(
isDir: Boolean = false,
): UploadResult {
return connection.stream { stream, result ->
stream.pipeline().addLast(WebDavPutStatusChecker(result))
val status = CompletableDeferred<Unit>(parent = result)
stream.pipeline().addLast(WebDavPutStatusChecker(status))
stream.writeHeaders(createHeaders(HttpMethod.PUT, AsciiString.of(path), HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_OCTET_STREAM), endStream = false)
if (zstdCompressContextPool == null) {
val uploadResult = if (zstdCompressContextPool == null) {
FileChannel.open(file, READ_OPERATION).use { channel ->
uploadUncompressed(fileChannel = channel, sourceBlockSize = sourceBlockSize, stream = stream, fileSize = channel.size())
}
@@ -78,9 +79,8 @@ internal suspend fun Http2ClientConnection.upload(
compressFile(file = file, zstdCompressContextPool = zstdCompressContextPool, sourceBlockSize = sourceBlockSize, stream = stream)
}
// 1. writer must send the last data frame with endStream=true
// 2. stream now has the half-closed state - we listen for server header response with endStream
// 3. our ChannelInboundHandler above checks status and Netty closes the stream (as endStream was sent by both client and server)
status.await()
result.complete(uploadResult)
}
}