diff --git a/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/Http2ClientConnection.kt b/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/Http2ClientConnection.kt index 9b469ed1c31c..450228dc44fa 100644 --- a/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/Http2ClientConnection.kt +++ b/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/Http2ClientConnection.kt @@ -38,7 +38,6 @@ internal class Http2ClientConnection internal constructor( }) stream.writeHeaders(createHeaders(HttpMethod.HEAD, AsciiString.of(path)), endStream = true) - result.await() } } @@ -60,7 +59,6 @@ internal class Http2ClientConnection internal constructor( }) stream.writeHeaders(createHeaders(HttpMethod.HEAD, AsciiString.of(path)), endStream = true) - result.await() } } diff --git a/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/Http2ConnectionProvider.kt b/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/Http2ConnectionProvider.kt index 0aec0d7f96e3..f5b48c3793ba 100644 --- a/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/Http2ConnectionProvider.kt +++ b/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/Http2ConnectionProvider.kt @@ -123,7 +123,7 @@ internal class Http2ConnectionProvider( } } - suspend fun stream(block: suspend (streamChannel: Http2StreamChannel, result: CompletableDeferred) -> T): T { + suspend fun stream(block: suspend (streamChannel: Http2StreamChannel, result: CompletableDeferred) -> Unit): T { var attempt = 1 var currentDelay = 1_000L var suppressedExceptions: MutableList? = null @@ -208,22 +208,24 @@ internal class Http2ConnectionProvider( } // must be called with ioDispatcher - private suspend fun openStreamAndConsume( + private suspend fun openStreamAndConsume( connectionState: ConnectionState, - block: suspend (streamChannel: Http2StreamChannel, result: CompletableDeferred) -> T, + block: suspend (streamChannel: Http2StreamChannel, result: CompletableDeferred) -> Unit, ): T { val streamChannel = connectionState.bootstrap.open().cancellableAwait() try { // must be canceled when the parent context is canceled - val deferred = CompletableDeferred(parent = coroutineContext.job) - val result = block(streamChannel, deferred) + val deferred = CompletableDeferred(parent = coroutineContext.job) + block(streamChannel, deferred) // Ensure the stream is closed before completing the operation. // This prevents the risk of opening more streams than intended, // especially when there is a limit on the number of parallel executed tasks. // Also, avoid explicitly closing the stream in case of a successful operation. streamChannel.closeFuture().joinCancellable(cancelFutureOnCancellation = false) - deferred.await() - return result + // 1. writer must send the last data frame with endStream=true + // 2. stream now has the half-closed state - we listen for server header response with endStream + // 3. our ChannelInboundHandler above checks status and Netty closes the stream (as endStream was sent by both client and server) + return deferred.await() } finally { if (streamChannel.isOpen && connectionState.coroutineScope.isActive) { diff --git a/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/Http2StreamJsonInboundHandler.kt b/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/Http2StreamJsonInboundHandler.kt index b0f6aebb2976..d2fe0c75962b 100644 --- a/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/Http2StreamJsonInboundHandler.kt +++ b/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/Http2StreamJsonInboundHandler.kt @@ -37,7 +37,6 @@ internal suspend fun Http2ClientConnection.getString(path: CharSequence): String ), endStream = true, ) - result.await() } } @@ -74,7 +73,6 @@ private suspend fun Http2ClientConnection.doGetJsonOrDefaultIfNotFound ), endStream = true, ) - result.await() } } @@ -93,7 +91,6 @@ private suspend fun Http2ClientConnection.post(path: AsciiString, data endStream = false, ) stream.writeData(ByteBufUtil.writeUtf8(bufferAllocator, data), endStream = true) - result.await() } } diff --git a/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/downloadFile.kt b/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/downloadFile.kt index 5e28b8364444..56cbf3d69d24 100644 --- a/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/downloadFile.kt +++ b/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/downloadFile.kt @@ -32,7 +32,6 @@ internal suspend fun Http2ClientConnection.download(path: String, file: Path): L stream.pipeline().addLast(FileDownloadHandler(result = result, file = file)) stream.writeHeaders(createHeaders(HttpMethod.GET, AsciiString.of(path)), endStream = true) - result.await() } } @@ -64,7 +63,6 @@ internal suspend fun Http2ClientConnection.download( ) stream.writeHeaders(createHeaders(HttpMethod.GET, AsciiString.of(path)), endStream = true) - result.await() } } @@ -86,12 +84,6 @@ private class ZstdDecompressingFileDownloadHandler( result.completeExceptionally(cause) } - override fun channelInactive(context: ChannelHandlerContext) { - if (!result.isCompleted) { - result.completeExceptionally(IllegalStateException("Stream closed without result (download=$file)")) - } - } - override fun handlerAdded(ctx: ChannelHandlerContext?) { dataConsumer = if (unzip) ZipDecoder(file) else DataToFileConsumer(file) zstdDecompressContext = zstdDecompressContextPool.allocate() diff --git a/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/netty-util.kt b/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/netty-util.kt index 5a5314e0aa2b..b33137f5167c 100644 --- a/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/netty-util.kt +++ b/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/netty-util.kt @@ -116,12 +116,6 @@ internal abstract class InboundHandlerResultTracker( override fun exceptionCaught(context: ChannelHandlerContext, cause: Throwable) { result.completeExceptionally(cause) } - - override fun channelInactive(context: ChannelHandlerContext) { - if (!result.isCompleted) { - result.completeExceptionally(IllegalStateException("Stream closed without result ($this)")) - } - } } // not suspendCancellableCoroutine - we must close the channel / event loop group diff --git a/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/uploadFile.kt b/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/uploadFile.kt index de1e09faa5b2..6882d443190e 100644 --- a/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/uploadFile.kt +++ b/platform/build-scripts/src/org/jetbrains/intellij/build/http2Client/uploadFile.kt @@ -53,11 +53,12 @@ internal suspend fun Http2ClientConnection.upload( isDir: Boolean = false, ): UploadResult { return connection.stream { stream, result -> - stream.pipeline().addLast(WebDavPutStatusChecker(result)) + val status = CompletableDeferred(parent = result) + stream.pipeline().addLast(WebDavPutStatusChecker(status)) stream.writeHeaders(createHeaders(HttpMethod.PUT, AsciiString.of(path), HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_OCTET_STREAM), endStream = false) - if (zstdCompressContextPool == null) { + val uploadResult = if (zstdCompressContextPool == null) { FileChannel.open(file, READ_OPERATION).use { channel -> uploadUncompressed(fileChannel = channel, sourceBlockSize = sourceBlockSize, stream = stream, fileSize = channel.size()) } @@ -78,9 +79,8 @@ internal suspend fun Http2ClientConnection.upload( compressFile(file = file, zstdCompressContextPool = zstdCompressContextPool, sourceBlockSize = sourceBlockSize, stream = stream) } - // 1. writer must send the last data frame with endStream=true - // 2. stream now has the half-closed state - we listen for server header response with endStream - // 3. our ChannelInboundHandler above checks status and Netty closes the stream (as endStream was sent by both client and server) + status.await() + result.complete(uploadResult) } }