frame-based, HTTP/2-only, Netty-based client utilizing native, high-performance non-JDK transport and TLS engine for uploading and downloading compilation resources

GitOrigin-RevId: 219eca685356ccf2d8bace08411b1a8463b42f81
This commit is contained in:
Vladimir Krivosheev
2024-08-27 17:06:07 +02:00
committed by intellij-monorepo-bot
parent f8f562946a
commit 5b4d0e8b16
28 changed files with 2564 additions and 1074 deletions

126
.idea/libraries/netty.xml generated Normal file
View File

@@ -0,0 +1,126 @@
<component name="libraryTable">
<library name="netty" type="repository">
<properties maven-id="io.netty:netty-all:4.2.0.Alpha3">
<verification>
<artifact url="file://$MAVEN_REPOSITORY$/io/netty/netty-all/4.2.0.Alpha3/netty-all-4.2.0.Alpha3.jar">
<sha256sum>92836a17468c54ba07fee5f3325f0f148d175a1c6098e9c4fe5228fad2b2fae1</sha256sum>
</artifact>
<artifact url="file://$MAVEN_REPOSITORY$/io/netty/netty-buffer/4.2.0.Alpha3/netty-buffer-4.2.0.Alpha3.jar">
<sha256sum>f8a9afa625134827f93c4b86c2698b99290d5c3e8c532da9d7747cfb92e3d01d</sha256sum>
</artifact>
<artifact url="file://$MAVEN_REPOSITORY$/io/netty/netty-codec/4.2.0.Alpha3/netty-codec-4.2.0.Alpha3.jar">
<sha256sum>ede4b23ebe57b7f99f03aeb51771387b6b89b3a1cd05bd3c5f37c37fec5e77f7</sha256sum>
</artifact>
<artifact url="file://$MAVEN_REPOSITORY$/io/netty/netty-codec-http/4.2.0.Alpha3/netty-codec-http-4.2.0.Alpha3.jar">
<sha256sum>baf8b8aaa85b3b0701451ad9ef5ab4169832d04bd19f6828b2ff9354d06b293b</sha256sum>
</artifact>
<artifact url="file://$MAVEN_REPOSITORY$/io/netty/netty-codec-http2/4.2.0.Alpha3/netty-codec-http2-4.2.0.Alpha3.jar">
<sha256sum>fc82eb46c5ff23f26c81948fbdd270d79eee95386a409e07b3941dc8ac841ef0</sha256sum>
</artifact>
<artifact url="file://$MAVEN_REPOSITORY$/io/netty/netty-common/4.2.0.Alpha3/netty-common-4.2.0.Alpha3.jar">
<sha256sum>ae7b0324a91d896f035f00f3b6cb929dccdd3fbebcee5b35282ce75fd09de985</sha256sum>
</artifact>
<artifact url="file://$MAVEN_REPOSITORY$/io/netty/netty-handler/4.2.0.Alpha3/netty-handler-4.2.0.Alpha3.jar">
<sha256sum>93ddb37b386a15fce6e2fb533289709885db77818c93a3ceefd2e9ec249dae1c</sha256sum>
</artifact>
<artifact url="file://$MAVEN_REPOSITORY$/io/netty/netty-transport-native-unix-common/4.2.0.Alpha3/netty-transport-native-unix-common-4.2.0.Alpha3.jar">
<sha256sum>b8ec622fc97a50b5ff0afc46f661f0c278fdb2c9744d2ce53a5173ed5b33b204</sha256sum>
</artifact>
<artifact url="file://$MAVEN_REPOSITORY$/io/netty/netty-handler-ssl-ocsp/4.2.0.Alpha3/netty-handler-ssl-ocsp-4.2.0.Alpha3.jar">
<sha256sum>1d63bd25dd301730b23556aba2f13294e89d68ed6903702306a78d807b5f228f</sha256sum>
</artifact>
<artifact url="file://$MAVEN_REPOSITORY$/io/netty/netty-resolver/4.2.0.Alpha3/netty-resolver-4.2.0.Alpha3.jar">
<sha256sum>2649883b71d6a7372b9f421aab0b7d1933e45df38b91079e8244b76fc399949f</sha256sum>
</artifact>
<artifact url="file://$MAVEN_REPOSITORY$/io/netty/netty-transport/4.2.0.Alpha3/netty-transport-4.2.0.Alpha3.jar">
<sha256sum>6ca2870309fe9c8f762995a27618153a3163ad6282e8ace6d4c53102b530360b</sha256sum>
</artifact>
<artifact url="file://$MAVEN_REPOSITORY$/io/netty/netty-transport-classes-epoll/4.2.0.Alpha3/netty-transport-classes-epoll-4.2.0.Alpha3.jar">
<sha256sum>e540dcb34f348c4894b223f6af18a3d27f77932c85d355ab95c77159b71881af</sha256sum>
</artifact>
<artifact url="file://$MAVEN_REPOSITORY$/io/netty/netty-transport-classes-kqueue/4.2.0.Alpha3/netty-transport-classes-kqueue-4.2.0.Alpha3.jar">
<sha256sum>4185c2ec260672dbd0a60d65b808a5065d3195b1f60924ae6af920bcaa75e703</sha256sum>
</artifact>
<artifact url="file://$MAVEN_REPOSITORY$/io/netty/netty-transport-native-epoll/4.2.0.Alpha3/netty-transport-native-epoll-4.2.0.Alpha3-linux-x86_64.jar">
<sha256sum>47d20282148e7164e74d0a601b18b77ec86df4e4bf3c35245c97488b47f4f547</sha256sum>
</artifact>
<artifact url="file://$MAVEN_REPOSITORY$/io/netty/netty-transport-native-epoll/4.2.0.Alpha3/netty-transport-native-epoll-4.2.0.Alpha3-linux-aarch_64.jar">
<sha256sum>a56cfbaaa8156b8dd3c4b6efda389ee08e321732a0c2888d669375bc11c736b8</sha256sum>
</artifact>
<artifact url="file://$MAVEN_REPOSITORY$/io/netty/netty-transport-native-epoll/4.2.0.Alpha3/netty-transport-native-epoll-4.2.0.Alpha3-linux-riscv64.jar">
<sha256sum>c6d684065f8c5ad1f5eaf87a7a6f1650934f5d0fec36fb61e2726a39f0eb5c37</sha256sum>
</artifact>
<artifact url="file://$MAVEN_REPOSITORY$/io/netty/netty-transport-native-kqueue/4.2.0.Alpha3/netty-transport-native-kqueue-4.2.0.Alpha3-osx-x86_64.jar">
<sha256sum>9441736ca20c82ebff3ec24b62837e7e944d86d0de679359600f75417f6b581a</sha256sum>
</artifact>
<artifact url="file://$MAVEN_REPOSITORY$/io/netty/netty-transport-native-kqueue/4.2.0.Alpha3/netty-transport-native-kqueue-4.2.0.Alpha3-osx-aarch_64.jar">
<sha256sum>f9b9107a7d2b4e8816e5da8a6d5fbe229bfa18048572a010e1399a03c5b063f8</sha256sum>
</artifact>
</verification>
<exclude>
<dependency maven-id="io.netty:netty-codec-dns" />
<dependency maven-id="io.netty:netty-codec-haproxy" />
<dependency maven-id="io.netty:netty-codec-compression" />
<dependency maven-id="io.netty:netty-codec-memcache" />
<dependency maven-id="io.netty:netty-codec-mqtt" />
<dependency maven-id="io.netty:netty-codec-redis" />
<dependency maven-id="io.netty:netty-codec-smtp" />
<dependency maven-id="io.netty:netty-codec-socks" />
<dependency maven-id="io.netty:netty-codec-stomp" />
<dependency maven-id="io.netty:netty-codec-xml" />
<dependency maven-id="io.netty:netty-codec-protobuf" />
<dependency maven-id="com.google.protobuf:protobuf-java" />
<dependency maven-id="com.google.protobuf.nano:protobuf-javanano" />
<dependency maven-id="io.netty:netty-codec-marshalling" />
<dependency maven-id="org.jboss.marshalling:jboss-marshalling" />
<dependency maven-id="io.netty:netty-handler-proxy" />
<dependency maven-id="io.netty:netty-resolver-dns" />
<dependency maven-id="io.netty:netty-transport-rxtx" />
<dependency maven-id="io.netty:netty-transport-sctp" />
<dependency maven-id="io.netty:netty-transport-udt" />
<dependency maven-id="io.netty:netty-resolver-dns-classes-macos" />
<dependency maven-id="io.netty:netty-resolver-dns-native-macos" />
</exclude>
</properties>
<CLASSES>
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-all/4.2.0.Alpha3/netty-all-4.2.0.Alpha3.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-buffer/4.2.0.Alpha3/netty-buffer-4.2.0.Alpha3.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-codec/4.2.0.Alpha3/netty-codec-4.2.0.Alpha3.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-codec-http/4.2.0.Alpha3/netty-codec-http-4.2.0.Alpha3.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-codec-http2/4.2.0.Alpha3/netty-codec-http2-4.2.0.Alpha3.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-common/4.2.0.Alpha3/netty-common-4.2.0.Alpha3.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-handler/4.2.0.Alpha3/netty-handler-4.2.0.Alpha3.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-transport-native-unix-common/4.2.0.Alpha3/netty-transport-native-unix-common-4.2.0.Alpha3.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-handler-ssl-ocsp/4.2.0.Alpha3/netty-handler-ssl-ocsp-4.2.0.Alpha3.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-resolver/4.2.0.Alpha3/netty-resolver-4.2.0.Alpha3.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-transport/4.2.0.Alpha3/netty-transport-4.2.0.Alpha3.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-transport-classes-epoll/4.2.0.Alpha3/netty-transport-classes-epoll-4.2.0.Alpha3.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-transport-classes-kqueue/4.2.0.Alpha3/netty-transport-classes-kqueue-4.2.0.Alpha3.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-transport-native-epoll/4.2.0.Alpha3/netty-transport-native-epoll-4.2.0.Alpha3-linux-x86_64.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-transport-native-epoll/4.2.0.Alpha3/netty-transport-native-epoll-4.2.0.Alpha3-linux-aarch_64.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-transport-native-epoll/4.2.0.Alpha3/netty-transport-native-epoll-4.2.0.Alpha3-linux-riscv64.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-transport-native-kqueue/4.2.0.Alpha3/netty-transport-native-kqueue-4.2.0.Alpha3-osx-x86_64.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-transport-native-kqueue/4.2.0.Alpha3/netty-transport-native-kqueue-4.2.0.Alpha3-osx-aarch_64.jar!/" />
</CLASSES>
<JAVADOC />
<SOURCES>
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-buffer/4.2.0.Alpha3/netty-buffer-4.2.0.Alpha3-sources.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-codec/4.2.0.Alpha3/netty-codec-4.2.0.Alpha3-sources.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-codec-http/4.2.0.Alpha3/netty-codec-http-4.2.0.Alpha3-sources.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-codec-http2/4.2.0.Alpha3/netty-codec-http2-4.2.0.Alpha3-sources.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-common/4.2.0.Alpha3/netty-common-4.2.0.Alpha3-sources.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-handler/4.2.0.Alpha3/netty-handler-4.2.0.Alpha3-sources.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-transport-native-unix-common/4.2.0.Alpha3/netty-transport-native-unix-common-4.2.0.Alpha3-sources.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-handler-ssl-ocsp/4.2.0.Alpha3/netty-handler-ssl-ocsp-4.2.0.Alpha3-sources.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-resolver/4.2.0.Alpha3/netty-resolver-4.2.0.Alpha3-sources.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-transport/4.2.0.Alpha3/netty-transport-4.2.0.Alpha3-sources.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-transport-classes-epoll/4.2.0.Alpha3/netty-transport-classes-epoll-4.2.0.Alpha3-sources.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-transport-classes-kqueue/4.2.0.Alpha3/netty-transport-classes-kqueue-4.2.0.Alpha3-sources.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-transport-native-epoll/4.2.0.Alpha3/netty-transport-native-epoll-4.2.0.Alpha3-sources.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-transport-native-epoll/4.2.0.Alpha3/netty-transport-native-epoll-4.2.0.Alpha3-sources.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-transport-native-epoll/4.2.0.Alpha3/netty-transport-native-epoll-4.2.0.Alpha3-sources.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-transport-native-kqueue/4.2.0.Alpha3/netty-transport-native-kqueue-4.2.0.Alpha3-sources.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-transport-native-kqueue/4.2.0.Alpha3/netty-transport-native-kqueue-4.2.0.Alpha3-sources.jar!/" />
</SOURCES>
</library>
</component>

View File

@@ -0,0 +1,48 @@
<component name="libraryTable">
<library name="netty-tcnative-boringssl" type="repository">
<properties maven-id="io.netty:netty-tcnative-boringssl-static:2.0.66.Final">
<verification>
<artifact url="file://$MAVEN_REPOSITORY$/io/netty/netty-tcnative-boringssl-static/2.0.66.Final/netty-tcnative-boringssl-static-2.0.66.Final.jar">
<sha256sum>df215103b6082caceef6b83ed5bbf61d2072688b8b248e9d86cc0bbdb785b5e4</sha256sum>
</artifact>
<artifact url="file://$MAVEN_REPOSITORY$/io/netty/netty-tcnative-classes/2.0.66.Final/netty-tcnative-classes-2.0.66.Final.jar">
<sha256sum>669a811a193dc1e7c9ef86cb547a4ab92f0f34cce8f9b842b9029bf5cfa07cc5</sha256sum>
</artifact>
<artifact url="file://$MAVEN_REPOSITORY$/io/netty/netty-tcnative-boringssl-static/2.0.66.Final/netty-tcnative-boringssl-static-2.0.66.Final-linux-x86_64.jar">
<sha256sum>407547388ead01c371ae1de7616fa9ce8bc26aa4b180aa5f0452e23ecc02a8f1</sha256sum>
</artifact>
<artifact url="file://$MAVEN_REPOSITORY$/io/netty/netty-tcnative-boringssl-static/2.0.66.Final/netty-tcnative-boringssl-static-2.0.66.Final-linux-aarch_64.jar">
<sha256sum>0c18e0f8c70d801f1711ca9fef1ef9bdd5f9b9afb43292f439459ee780d758b6</sha256sum>
</artifact>
<artifact url="file://$MAVEN_REPOSITORY$/io/netty/netty-tcnative-boringssl-static/2.0.66.Final/netty-tcnative-boringssl-static-2.0.66.Final-osx-x86_64.jar">
<sha256sum>a34307997449310bcf327c42c46e2db0067c2adf3d66a19fe18e0fd9981fe162</sha256sum>
</artifact>
<artifact url="file://$MAVEN_REPOSITORY$/io/netty/netty-tcnative-boringssl-static/2.0.66.Final/netty-tcnative-boringssl-static-2.0.66.Final-osx-aarch_64.jar">
<sha256sum>3cf31a82c0d2c79b48050f02a60d08e9a17db00759ca1f7920cd35d842c7f95e</sha256sum>
</artifact>
<artifact url="file://$MAVEN_REPOSITORY$/io/netty/netty-tcnative-boringssl-static/2.0.66.Final/netty-tcnative-boringssl-static-2.0.66.Final-windows-x86_64.jar">
<sha256sum>4f5a5665d3d8c4b2d5ffc40a0c4b07f94399b7d0a4ee01966df0bfc6f49d4524</sha256sum>
</artifact>
</verification>
</properties>
<CLASSES>
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-tcnative-boringssl-static/2.0.66.Final/netty-tcnative-boringssl-static-2.0.66.Final.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-tcnative-classes/2.0.66.Final/netty-tcnative-classes-2.0.66.Final.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-tcnative-boringssl-static/2.0.66.Final/netty-tcnative-boringssl-static-2.0.66.Final-linux-x86_64.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-tcnative-boringssl-static/2.0.66.Final/netty-tcnative-boringssl-static-2.0.66.Final-linux-aarch_64.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-tcnative-boringssl-static/2.0.66.Final/netty-tcnative-boringssl-static-2.0.66.Final-osx-x86_64.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-tcnative-boringssl-static/2.0.66.Final/netty-tcnative-boringssl-static-2.0.66.Final-osx-aarch_64.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-tcnative-boringssl-static/2.0.66.Final/netty-tcnative-boringssl-static-2.0.66.Final-windows-x86_64.jar!/" />
</CLASSES>
<JAVADOC />
<SOURCES>
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-tcnative-boringssl-static/2.0.66.Final/netty-tcnative-boringssl-static-2.0.66.Final-sources.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-tcnative-classes/2.0.66.Final/netty-tcnative-classes-2.0.66.Final-sources.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-tcnative-boringssl-static/2.0.66.Final/netty-tcnative-boringssl-static-2.0.66.Final-sources.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-tcnative-boringssl-static/2.0.66.Final/netty-tcnative-boringssl-static-2.0.66.Final-sources.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-tcnative-boringssl-static/2.0.66.Final/netty-tcnative-boringssl-static-2.0.66.Final-sources.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-tcnative-boringssl-static/2.0.66.Final/netty-tcnative-boringssl-static-2.0.66.Final-sources.jar!/" />
<root url="jar://$MAVEN_REPOSITORY$/io/netty/netty-tcnative-boringssl-static/2.0.66.Final/netty-tcnative-boringssl-static-2.0.66.Final-sources.jar!/" />
</SOURCES>
</library>
</component>

View File

@@ -5,9 +5,10 @@ import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.common.Attributes
import kotlinx.coroutines.delay
import org.jetbrains.intellij.build.dependencies.BuildDependenciesDownloader
import java.util.*
import java.util.concurrent.TimeUnit
import kotlin.math.min
import kotlin.random.Random
import kotlin.random.asJavaRandom
suspend fun <T> retryWithExponentialBackOff(
attempts: Int = 5,
@@ -17,7 +18,6 @@ suspend fun <T> retryWithExponentialBackOff(
onException: suspend (attempt: Int, e: Exception) -> Unit = { attempt, e -> defaultExceptionConsumer(attempt, e) },
action: suspend (attempt: Int) -> T
): T {
val random = Random()
var effectiveDelay = initialDelayMs
val exceptions = mutableListOf<Exception>()
for (attempt in 1..attempts) try {
@@ -43,7 +43,7 @@ suspend fun <T> retryWithExponentialBackOff(
delay(effectiveDelay)
}
effectiveDelay = nextDelay(
random, previousDelay = effectiveDelay,
previousDelay = effectiveDelay,
backOffLimitMs = backOffLimitMs,
backOffFactor = backOffFactor,
backOffJitter = backOffJitter,
@@ -63,14 +63,13 @@ private fun defaultExceptionConsumer(attempt: Int, e: Exception) {
}
private fun nextDelay(
random: Random,
previousDelay: Long,
backOffLimitMs: Long,
backOffFactor: Int,
backOffJitter: Double,
exceptions: List<Exception>
): Long {
val nextDelay = min(previousDelay * backOffFactor, backOffLimitMs) + (random.nextGaussian() * previousDelay * backOffJitter).toLong()
val nextDelay = min(previousDelay * backOffFactor, backOffLimitMs) + (Random.asJavaRandom().nextGaussian() * previousDelay * backOffJitter).toLong()
if (nextDelay > backOffLimitMs) {
throw Exception("Back off limit ${backOffLimitMs}ms exceeded, see suppressed exceptions for details").apply {
exceptions.forEach(this::addSuppressed)

View File

@@ -238,7 +238,8 @@
<orderEntry type="module" module-name="intellij.platform.boot" />
<orderEntry type="module" module-name="intellij.platform.ijent.community.buildConstants" />
<orderEntry type="library" name="ktor-client-core" level="project" />
<orderEntry type="library" name="snakeyaml" level="project" />
<orderEntry type="module" module-name="intellij.platform.util.coroutines" />
<orderEntry type="library" scope="RUNTIME" name="snakeyaml" level="project" />
<orderEntry type="library" name="netty" level="project" />
<orderEntry type="library" name="netty-tcnative-boringssl" level="project" />
</component>
</module>

View File

@@ -1,4 +1,4 @@
// Copyright 2000-2022 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
@file:Suppress("ReplaceGetOrSet")
package org.jetbrains.intellij.build
@@ -110,8 +110,7 @@ data class LibraryLicense(
private const val APACHE_LICENSE_URL = "https://www.apache.org/licenses/LICENSE-2.0"
private val PREDEFINED_LICENSE_URLS = mapOf("Apache 2.0" to APACHE_LICENSE_URL)
@JvmStatic
val JETBRAINS_OWN = "JetBrains"
const val JETBRAINS_OWN = "JetBrains"
/**
* Denotes version of a library built from custom revision
@@ -123,7 +122,6 @@ data class LibraryLicense(
* so there is no way to give a link to their sites.
* For other libraries please fill all necessary fields of [LibraryLicense] instead of using this method.
*/
@JvmStatic
fun jetbrainsLibrary(libraryName: String): LibraryLicense {
return LibraryLicense(
libraryName = libraryName,

View File

@@ -0,0 +1,52 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package org.jetbrains.intellij.build
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import java.util.concurrent.CancellationException
internal suspend fun <T> Collection<T>.forEachConcurrent(concurrency: Int = Runtime.getRuntime().availableProcessors(), action: suspend (T) -> Unit) {
coroutineScope {
val itemChannel = produce {
for (item in this@forEachConcurrent) {
send(item)
}
}
repeat(concurrency) {
launch {
for (item in itemChannel) {
try {
action(item)
}
catch (e: CancellationException) {
if (coroutineContext.isActive) {
// well, we are not canceled, only child
throw IllegalStateException("Unexpected cancellation - action is cancelled itself", e)
}
}
}
}
}
}
val semaphore = Semaphore(concurrency)
coroutineScope {
for (item in this@forEachConcurrent) {
launch {
semaphore.withPermit {
try {
action(item)
}
catch (e: CancellationException) {
throw e
}
}
}
}
}
}

View File

@@ -0,0 +1,137 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
@file:Suppress("SSBasedInspection", "ReplacePutWithAssignment", "ReplaceGetOrSet")
package org.jetbrains.intellij.build.http2Client
import io.netty.buffer.ByteBufUtil
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpHeaderValues
import io.netty.handler.codec.http.HttpMethod
import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http2.Http2Headers
import io.netty.handler.codec.http2.Http2HeadersFrame
import io.netty.handler.codec.http2.Http2StreamChannel
import io.netty.handler.codec.http2.ReadOnlyHttp2Headers
import io.netty.util.AsciiString
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.withContext
import kotlinx.serialization.DeserializationStrategy
import kotlinx.serialization.serializer
// https://cabulous.medium.com/http-2-and-how-it-works-9f645458e4b2
internal class Http2ClientConnection internal constructor(
private val scheme: AsciiString,
private val authority: AsciiString,
private val commonHeaders: Array<AsciiString>,
@JvmField internal val connection: Http2ConnectionProvider,
) {
suspend inline fun <T> use(block: (Http2ClientConnection) -> T): T {
try {
return block(this)
}
finally {
withContext(NonCancellable) {
close()
}
}
}
suspend fun close() {
connection.close()
}
internal suspend inline fun <reified T : Any> post(path: String, data: String, contentType: AsciiString): T {
return post(path = AsciiString.of(path), data = data, contentType = contentType, deserializer = serializer<T>())
}
private suspend fun <T : Any> post(path: AsciiString, data: String, contentType: AsciiString, deserializer: DeserializationStrategy<T>): T {
return connection.stream { stream, result ->
val bufferAllocator = stream.alloc()
stream.pipeline().addLast(Http2StreamJsonInboundHandler(result = result, bufferAllocator = bufferAllocator, deserializer = deserializer))
stream.writeHeaders(
headers = ReadOnlyHttp2Headers.clientHeaders(
true,
HttpMethod.POST.asciiName(),
path,
scheme,
authority,
HttpHeaderNames.CONTENT_TYPE, contentType,
HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP,
*commonHeaders
),
endStream = false,
)
stream.writeData(ByteBufUtil.writeUtf8(bufferAllocator, data), endStream = true)
}
}
suspend fun head(path: CharSequence): HttpResponseStatus {
return connection.stream { stream, result ->
stream.pipeline().addLast(object : InboundHandlerResultTracker<Http2HeadersFrame>(result) {
override fun channelRead0(context: ChannelHandlerContext, frame: Http2HeadersFrame) {
result.complete(HttpResponseStatus.parseLine(frame.headers().status()))
}
})
stream.writeHeaders(createHeaders(HttpMethod.HEAD, AsciiString.of(path)), endStream = true)
}
}
suspend fun getRedirectLocation(path: CharSequence): CharSequence? {
return connection.stream { stream, result ->
stream.pipeline().addLast(object : InboundHandlerResultTracker<Http2HeadersFrame>(result) {
override fun channelRead0(context: ChannelHandlerContext, frame: Http2HeadersFrame) {
result.complete(
when (HttpResponseStatus.parseLine(frame.headers().status())) {
HttpResponseStatus.FOUND, HttpResponseStatus.MOVED_PERMANENTLY, HttpResponseStatus.TEMPORARY_REDIRECT, HttpResponseStatus.SEE_OTHER -> {
frame.headers().get(HttpHeaderNames.LOCATION)
}
else -> {
null
}
}
)
}
})
stream.writeHeaders(createHeaders(HttpMethod.HEAD, AsciiString.of(path)), endStream = true)
}
}
suspend fun put(path: AsciiString, writer: suspend (stream: Http2StreamChannel) -> Unit) {
connection.stream { stream, result ->
stream.pipeline().addLast(WebDavPutStatusChecker(result))
stream.writeHeaders(createHeaders(HttpMethod.PUT, path), endStream = false)
writer(stream)
// 1. writer must send the last data frame with endStream=true
// 2. stream now has the half-closed state - we listen for server header response with endStream
// 3. our ChannelInboundHandler above checks status and Netty closes the stream (as endStream was sent by both client and server)
}
}
internal fun createHeaders(method: HttpMethod, path: AsciiString): Http2Headers {
return ReadOnlyHttp2Headers.clientHeaders(true, method.asciiName(), path, scheme, authority, *commonHeaders)
}
}
private class WebDavPutStatusChecker(private val result: CompletableDeferred<Unit>) : InboundHandlerResultTracker<Http2HeadersFrame>(result) {
override fun channelRead0(context: ChannelHandlerContext, frame: Http2HeadersFrame) {
if (!frame.isEndStream) {
return
}
val status = HttpResponseStatus.parseLine(frame.headers().status())
// WebDAV server returns 204 for existing resources
if (status == HttpResponseStatus.CREATED || status == HttpResponseStatus.NO_CONTENT || status == HttpResponseStatus.OK) {
result.complete(Unit)
}
else {
result.completeExceptionally(IllegalStateException("Unexpected response status: $status"))
}
}
}

View File

@@ -0,0 +1,126 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package org.jetbrains.intellij.build.http2Client
import io.netty.bootstrap.Bootstrap
import io.netty.channel.ChannelOption
import io.netty.channel.MultiThreadIoEventLoopGroup
import io.netty.channel.epoll.EpollIoHandler
import io.netty.channel.epoll.EpollSocketChannel
import io.netty.channel.kqueue.KQueueIoHandler
import io.netty.channel.kqueue.KQueueSocketChannel
import io.netty.channel.nio.NioIoHandler
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http2.Http2SecurityUtil
import io.netty.handler.ssl.*
import io.netty.handler.ssl.util.InsecureTrustManagerFactory
import io.netty.util.AsciiString
import io.netty.util.internal.PlatformDependent
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.trace.Span
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.Runnable
import kotlinx.coroutines.withContext
import java.net.InetSocketAddress
import kotlin.coroutines.CoroutineContext
private fun nioIoHandlerAndSocketChannel() = NioIoHandler.newFactory() to NioSocketChannel::class.java
internal class Http2ClientConnectionFactory(
private val bootstrapTemplate: Bootstrap,
private val sslContext: SslContext?,
private val ioDispatcher: CoroutineDispatcher,
) {
suspend inline fun <T> use(block: (Http2ClientConnectionFactory) -> T): T {
try {
return block(this)
}
finally {
withContext(NonCancellable) {
close()
}
}
}
fun connect(host: String, port: Int = 443): Http2ClientConnection {
return connect(InetSocketAddress.createUnresolved(host, port.let { if (it == -1) 443 else it }))
}
fun connect(server: InetSocketAddress): Http2ClientConnection {
return Http2ClientConnection(
connection = Http2ConnectionProvider(server = server, bootstrapTemplate = bootstrapTemplate, sslContext = sslContext, ioDispatcher = ioDispatcher),
scheme = AsciiString.of(if (sslContext == null) "http" else "https"),
authority = AsciiString.of(server.hostString + ":" + server.port),
commonHeaders = arrayOf(HttpHeaderNames.USER_AGENT, AsciiString.of("IJ Builder")),
)
}
suspend fun close() {
bootstrapTemplate.config().group().shutdownGracefully().joinNonCancellable()
}
}
internal fun createHttp2ClientSessionFactory(useSsl: Boolean = true, trustAll: Boolean = false): Http2ClientConnectionFactory {
val useNativeTransport = true
val (ioFactory, socketChannel) = try {
when {
useNativeTransport && PlatformDependent.isOsx() -> KQueueIoHandler.newFactory() to KQueueSocketChannel::class.java
useNativeTransport && !PlatformDependent.isWindows() -> EpollIoHandler.newFactory() to EpollSocketChannel::class.java
else -> nioIoHandlerAndSocketChannel()
}
}
catch (e: Throwable) {
Span.current().recordException(e, Attributes.of(AttributeKey.stringKey("warn"), "cannot use native transport, fallback to NIO"))
nioIoHandlerAndSocketChannel()
}
val group = MultiThreadIoEventLoopGroup(ioFactory)
val sslContext: SslContext? = if (useSsl) {
val isOpenSslSupported = SslProvider.isAlpnSupported(SslProvider.OPENSSL)
SslContextBuilder.forClient()
.sslProvider(if (isOpenSslSupported) SslProvider.OPENSSL else SslProvider.JDK)
.ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE)
.applicationProtocolConfig(
ApplicationProtocolConfig(
ApplicationProtocolConfig.Protocol.ALPN,
ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
// OpenSSL provider does not support FATAL_ALERT behavior
ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
ApplicationProtocolNames.HTTP_2,
)
)
.also {
if (trustAll) {
it.trustManager(InsecureTrustManagerFactory.INSTANCE)
}
}
.build()
}
else {
null
}
// use JDK DNS resolver - custom one is not easy to configure and overkill in our case (just several host names)
val bootstrapTemplate = Bootstrap()
.group(group)
.channel(socketChannel)
.option(ChannelOption.SO_KEEPALIVE, true)
// asCoroutineDispatcher is an overkill - we don't need schedule (`Delay`) features
val dispatcher = object : CoroutineDispatcher() {
override fun dispatch(context: CoroutineContext, block: Runnable) {
group.execute(block)
}
override fun toString(): String = group.toString()
}
return Http2ClientConnectionFactory(
bootstrapTemplate = bootstrapTemplate,
sslContext = sslContext,
ioDispatcher = dispatcher,
)
}

View File

@@ -0,0 +1,216 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
@file:Suppress("SSBasedInspection")
package org.jetbrains.intellij.build.http2Client
import io.netty.bootstrap.Bootstrap
import io.netty.buffer.ByteBuf
import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInitializer
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.codec.http2.*
import io.netty.handler.ssl.SslContext
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.trace.Span
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.net.InetSocketAddress
import java.nio.channels.ClosedChannelException
import java.util.concurrent.CancellationException
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.coroutineContext
import kotlin.math.min
import kotlin.random.Random
import kotlin.random.asJavaRandom
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
private class ConnectionState(
@JvmField val bootstrap: Http2StreamChannelBootstrap,
@JvmField val channel: Channel,
@JvmField val coroutineScope: CoroutineScope,
)
private const val MAX_ATTEMPTS = 2
// https://stackoverflow.com/questions/55087292/how-to-handle-http-2-goaway-with-java-net-httpclient
// Server can send GOAWAY frame after X streams, that's why we need manager for channel - open a new one in case of such error
internal class Http2ConnectionProvider(
private val server: InetSocketAddress,
private val bootstrapTemplate: Bootstrap,
private val ioDispatcher: CoroutineDispatcher,
sslContext: SslContext?,
) {
private val mutex = Mutex()
private val channelInitializer = Http2ClientFrameInitializer(sslContext, server)
private val connectionRef = AtomicReference<ConnectionState>()
private suspend fun getConnection(): ConnectionState {
connectionRef.get()?.takeIf { it.coroutineScope.isActive }?.let {
return it
}
// not reentrant
mutex.withLock {
return connectionRef.get()?.takeIf { it.coroutineScope.isActive } ?: withContext(ioDispatcher) { openChannel() }
}
}
// must be called with mutex
private suspend fun openChannel(): ConnectionState {
Span.current().addEvent("open TCP connection", Attributes.of(AttributeKey.stringKey("host"), server.hostString, AttributeKey.longKey("port"), server.port.toLong()))
val channel = bootstrapTemplate
.clone()
.remoteAddress(server)
.handler(channelInitializer)
.connectAsync()
val connection = ConnectionState(
bootstrap = Http2StreamChannelBootstrap(channel),
channel = channel,
coroutineScope = CoroutineScope(SupervisorJob() + ioDispatcher),
)
(channel.pipeline().get("Http2FrameCodec") as Http2FrameCodec).connection().addListener(object : Http2ConnectionAdapter() {
override fun onGoAwayReceived(lastStreamId: Int, errorCode: Long, debugData: ByteBuf?) {
connection.coroutineScope.coroutineContext.cancel()
connectionRef.compareAndSet(connection, null)
}
})
val old = connectionRef.getAndSet(connection)
require(old == null || !old.coroutineScope.isActive) {
"Old connection must be inactive before opening a new one"
}
return connection
}
suspend fun close() {
val connection = connectionRef.get() ?: return
try {
connection.coroutineScope.coroutineContext.job.cancelAndJoin()
}
finally {
connection.channel.close().joinNonCancellable()
}
}
suspend fun <T> stream(block: suspend (streamChannel: Http2StreamChannel, result: CompletableDeferred<T>) -> Unit): T {
var attemptIndex = 0
var effectiveDelay = 1.seconds.inWholeMilliseconds
val backOffLimitMs = 500.milliseconds.inWholeMilliseconds
val backOffFactor = 2L
val backOffJitter = 0.1
var suppressedExceptions: MutableList<Throwable>? = null
while (true) {
var currentConnection: ConnectionState? = null
try {
currentConnection = getConnection()
return withContext(currentConnection.coroutineScope.coroutineContext) {
openStreamAndConsume(connectionState = currentConnection, block = block)
}
}
catch (e: Http2Exception) {
handleHttpError(e = e, attemptIndex = attemptIndex, currentConnection = currentConnection)
}
catch (e: CancellationException) {
if (coroutineContext.isActive) {
// task is canceled (due to GoAway or other such reasons), but not parent context - retry (without incrementing attemptIndex)
continue
}
}
catch (e: ClosedChannelException) {
if (attemptIndex >= MAX_ATTEMPTS) {
if (suppressedExceptions != null) {
for (suppressedException in suppressedExceptions) {
e.addSuppressed(suppressedException)
}
}
throw e
}
if (suppressedExceptions == null) {
suppressedExceptions = ArrayList()
}
suppressedExceptions.add(e)
if (attemptIndex != 0) {
delay(effectiveDelay)
}
effectiveDelay = min(effectiveDelay * backOffFactor, backOffLimitMs) + (Random.asJavaRandom().nextGaussian() * effectiveDelay * backOffJitter).toLong()
}
attemptIndex++
}
}
private suspend fun handleHttpError(e: Http2Exception, attemptIndex: Int, currentConnection: ConnectionState?) {
when (val error = e.error()) {
Http2Error.REFUSED_STREAM -> {
// result of goaway frame - open a new connection
if (currentConnection != null) {
currentConnection.coroutineScope.cancel()
connectionRef.compareAndSet(currentConnection, null)
Span.current().addEvent("stream refused", Attributes.of(AttributeKey.longKey("streamId"), Http2Exception.streamId(e).toLong()))
}
}
Http2Error.ENHANCE_YOUR_CALM -> {
delay(100L * (attemptIndex + 1))
}
else -> {
if (attemptIndex >= MAX_ATTEMPTS) {
throw e
}
else {
// log error and continue
Span.current().recordException(e, Attributes.of(AttributeKey.longKey("attemptIndex"), attemptIndex.toLong(), AttributeKey.stringKey("name"), error.name))
}
}
}
}
// must be called with ioDispatcher
private suspend fun <T> openStreamAndConsume(
connectionState: ConnectionState,
block: suspend (streamChannel: Http2StreamChannel, result: CompletableDeferred<T>) -> Unit,
): T {
val streamChannel = connectionState.bootstrap.open().cancellableAwait()
try {
// must be canceled when the parent context is canceled
val result = CompletableDeferred<T>(parent = coroutineContext.job)
block(streamChannel, result)
return result.await()
}
finally {
if (streamChannel.isOpen && connectionState.coroutineScope.isActive) {
withContext(NonCancellable) {
streamChannel.close().joinNonCancellable()
}
}
}
}
}
private class Http2ClientFrameInitializer(
private val sslContext: SslContext?,
private val server: InetSocketAddress,
) : ChannelInitializer<Channel>() {
override fun initChannel(channel: Channel) {
val pipeline = channel.pipeline()
sslContext?.let {
pipeline.addFirst(sslContext.newHandler(channel.alloc(), server.hostString, server.port))
}
pipeline.addLast("Http2FrameCodec", Http2FrameCodecBuilder.forClient().build())
// Http2MultiplexHandler requires not-null handlers for child streams - add noop
pipeline.addLast("Http2MultiplexHandler", Http2MultiplexHandler(object : SimpleChannelInboundHandler<Any>() {
override fun acceptInboundMessage(message: Any?): Boolean = false
override fun channelRead0(ctx: ChannelHandlerContext, message: Any?) {
// noop
}
}))
}
}

View File

@@ -0,0 +1,73 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package org.jetbrains.intellij.build.http2Client
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.ByteBufInputStream
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpHeaderValues
import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http2.Http2DataFrame
import io.netty.handler.codec.http2.Http2HeadersFrame
import io.netty.handler.codec.http2.Http2StreamFrame
import kotlinx.coroutines.CompletableDeferred
import kotlinx.serialization.DeserializationStrategy
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.decodeFromStream
import java.util.zip.GZIPInputStream
private val json = Json {
ignoreUnknownKeys = true
}
internal class Http2StreamJsonInboundHandler<T : Any>(
private val bufferAllocator: ByteBufAllocator,
private val result: CompletableDeferred<T>,
private val deserializer: DeserializationStrategy<T>,
) : InboundHandlerResultTracker<Http2StreamFrame>(result) {
private var isGzip = false
private val cumulativeContent = lazy(LazyThreadSafetyMode.NONE) {
val compositeBuffer = bufferAllocator.compositeBuffer(1024)
result.invokeOnCompletion {
compositeBuffer.release()
}
compositeBuffer
}
override fun channelRead0(context: ChannelHandlerContext, frame: Http2StreamFrame) {
if (frame is Http2DataFrame) {
val frameContent = frame.content()
if (!frame.isEndStream) {
cumulativeContent.value.addComponent(true, frameContent.retain())
return
}
val data = if (cumulativeContent.isInitialized()) {
cumulativeContent.value.addComponent(true, frameContent.retain())
cumulativeContent.value
}
else {
frameContent
}
val response = if (isGzip) {
GZIPInputStream(ByteBufInputStream(data)).use {
json.decodeFromStream(deserializer, it)
}
}
else {
json.decodeFromString(deserializer, data.toString(Charsets.UTF_8))
}
result.complete(response)
}
else if (frame is Http2HeadersFrame) {
val status = HttpResponseStatus.parseLine(frame.headers().status())
if (status != HttpResponseStatus.OK) {
result.completeExceptionally(IllegalStateException("Failed with $status"))
return
}
isGzip = frame.headers().get(HttpHeaderNames.CONTENT_ENCODING) == HttpHeaderValues.GZIP
}
}
}

View File

@@ -0,0 +1,118 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package org.jetbrains.intellij.build.http2Client
import com.github.luben.zstd.ZstdDecompressCtx
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.http.HttpMethod
import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http2.Http2DataFrame
import io.netty.handler.codec.http2.Http2HeadersFrame
import io.netty.handler.codec.http2.Http2StreamFrame
import io.netty.util.AsciiString
import kotlinx.coroutines.CompletableDeferred
import java.nio.channels.FileChannel
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardOpenOption
import java.security.MessageDigest
import java.util.*
internal const val MAX_BUFFER_SIZE = 4 * 1014 * 1024
private val OVERWRITE_OPERATION = EnumSet.of(StandardOpenOption.WRITE, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)
internal data class DownloadResult(@JvmField var size: Long, @JvmField val digest: MessageDigest)
internal suspend fun Http2ClientConnection.download(path: String, file: Path, digestFactory: () -> MessageDigest): DownloadResult {
Files.createDirectories(file.parent)
return connection.stream { stream, result ->
stream.pipeline().addLast(
DownloadHandler(
result = result,
downloadResult = DownloadResult(size = 0, digest = digestFactory()),
file = file,
),
)
stream.writeHeaders(createHeaders(HttpMethod.GET, AsciiString.of(path)), endStream = true)
result.await()
}
}
private class DownloadHandler(
private val result: CompletableDeferred<DownloadResult>,
private val downloadResult: DownloadResult,
private val file: Path,
) : InboundHandlerResultTracker<Http2StreamFrame>(result) {
private var offset = 0L
private var fileChannel: FileChannel? = null
private var zstdDecompressContext: ZstdDecompressCtx? = null
override fun acceptInboundMessage(message: Any): Boolean = message is Http2DataFrame || message is Http2HeadersFrame
override fun handlerAdded(ctx: ChannelHandlerContext?) {
fileChannel = FileChannel.open(file, OVERWRITE_OPERATION)
zstdDecompressContext = ZstdDecompressCtx()
}
override fun handlerRemoved(context: ChannelHandlerContext) {
try {
fileChannel?.close()
fileChannel = null
}
finally {
zstdDecompressContext?.close()
zstdDecompressContext = null
}
}
override fun channelRead0(context: ChannelHandlerContext, frame: Http2StreamFrame) {
if (frame is Http2HeadersFrame) {
val status = HttpResponseStatus.parseLine(frame.headers().status())
if (status != HttpResponseStatus.OK) {
result.completeExceptionally(IllegalStateException("Unexpected response status: $status"))
}
}
else if (frame is Http2DataFrame) {
val content = frame.content()
downloadResult.size += content.readableBytes()
writeChunk(content, context.alloc())
if (frame.isEndStream) {
result.complete(downloadResult)
}
}
}
private fun writeChunk(chunk: ByteBuf, allocator: ByteBufAllocator) {
val sourceBuffer = chunk.nioBuffer()
val zstdDecompressContext = zstdDecompressContext!!
val fileChannel = fileChannel!!
do {
val targetNettyBuffer = allocator.directBuffer((sourceBuffer.remaining() * 4).coerceAtMost(MAX_BUFFER_SIZE))
try {
val targetBuffer = targetNettyBuffer.nioBuffer(0, targetNettyBuffer.capacity())
zstdDecompressContext.decompressDirectByteBufferStream(targetBuffer, sourceBuffer)
targetBuffer.flip()
if (targetBuffer.hasRemaining()) {
targetBuffer.mark()
downloadResult.digest.update(targetBuffer)
targetBuffer.reset()
do {
offset += fileChannel.write(targetBuffer, offset)
}
while (targetBuffer.hasRemaining())
}
}
finally {
targetNettyBuffer.release()
}
}
while (sourceBuffer.hasRemaining())
}
}

View File

@@ -0,0 +1,147 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
@file:Suppress("SSBasedInspection", "UsePropertyAccessSyntax", "OVERRIDE_DEPRECATION")
package org.jetbrains.intellij.build.http2Client
import io.netty.bootstrap.Bootstrap
import io.netty.buffer.ByteBuf
import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.codec.http2.DefaultHttp2DataFrame
import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame
import io.netty.handler.codec.http2.Http2Headers
import io.netty.handler.codec.http2.Http2StreamChannel
import io.netty.util.concurrent.Future
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
internal suspend fun Bootstrap.connectAsync(): Channel = suspendCancellableCoroutine { continuation ->
val future = connect()
if (future.isDone) {
if (future.isSuccess) {
continuation.resume(future.channel())
}
else {
continuation.resumeWithException(future.cause())
}
return@suspendCancellableCoroutine
}
if (future.isCancellable) {
continuation.invokeOnCancellation {
future.cancel(true)
}
}
future.addListener {
if (future.isSuccess) {
continuation.resume(future.channel())
}
else {
continuation.resumeWithException(future.cause())
}
}
}
internal suspend fun <T : Any> Future<T>.cancellableAwait(): T {
if (isDone) {
if (isSuccess) {
return getNow()
}
else {
cause()?.let {
throw it
}
}
}
return suspendCancellableCoroutine { continuation ->
if (isCancellable) {
continuation.invokeOnCancellation {
cancel(true)
}
}
addListener {
if (isSuccess) {
continuation.resume(get())
}
else {
continuation.resumeWithException(cause())
}
}
}
}
internal suspend fun Future<*>.joinCancellable(cancelFutureOnCancellation: Boolean = true) {
if (isDone) {
cause()?.let {
throw it
}
}
suspendCancellableCoroutine { continuation ->
if (cancelFutureOnCancellation && isCancellable) {
continuation.invokeOnCancellation {
cancel(true)
}
}
addListener {
if (isSuccess) {
continuation.resume(Unit)
}
else {
continuation.resumeWithException(cause())
}
}
}
}
internal suspend fun Http2StreamChannel.writeHeaders(headers: Http2Headers, endStream: Boolean) {
writeAndFlush(DefaultHttp2HeadersFrame(headers, endStream)).joinCancellable()
}
internal suspend fun Http2StreamChannel.writeData(data: ByteBuf, endStream: Boolean) {
writeAndFlush(DefaultHttp2DataFrame(data, endStream)).joinCancellable()
}
internal abstract class InboundHandlerResultTracker<T : Any>(
private val result: CompletableDeferred<*>,
) : SimpleChannelInboundHandler<T>() {
override fun exceptionCaught(context: ChannelHandlerContext, cause: Throwable) {
result.completeExceptionally(cause)
}
override fun channelInactive(context: ChannelHandlerContext) {
if (!result.isCompleted) {
result.completeExceptionally(IllegalStateException("Stream closed without result ($this)"))
}
}
}
// not suspendCancellableCoroutine - we must close the channel / event loop group
internal suspend fun Future<*>.joinNonCancellable() {
if (isDone) {
cause()?.let {
throw it
}
}
// not suspendCancellableCoroutine - we must close the channel
return suspendCoroutine { continuation ->
addListener { future ->
if (future.isSuccess) {
continuation.resume(Unit)
}
else {
continuation.resumeWithException(future.cause())
}
}
}
}

View File

@@ -8,7 +8,7 @@ import org.jetbrains.intellij.build.BuildMessages
import org.jetbrains.intellij.build.BuildOptions
import org.jetbrains.intellij.build.BuildPaths
import org.jetbrains.intellij.build.CompilationContext
import org.jetbrains.intellij.build.impl.compilation.ArchivedCompilationOutputsStorage
import org.jetbrains.intellij.build.impl.compilation.ArchivedCompilationOutputStorage
import org.jetbrains.jps.model.module.JpsModule
import java.io.File
import java.nio.file.Path
@@ -17,7 +17,7 @@ import kotlin.io.path.writeLines
@ApiStatus.Internal
class ArchivedCompilationContext(
private val delegate: CompilationContext,
private val storage: ArchivedCompilationOutputsStorage = ArchivedCompilationOutputsStorage(paths = delegate.paths, classesOutputDirectory = delegate.classesOutputDirectory).apply {
private val storage: ArchivedCompilationOutputStorage = ArchivedCompilationOutputStorage(paths = delegate.paths, classesOutputDirectory = delegate.classesOutputDirectory).apply {
delegate.options.pathToCompiledClassesArchivesMetadata?.let {
this.loadMetadataFile(it)
}

View File

@@ -0,0 +1,58 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package org.jetbrains.intellij.build.impl.compilation
import kotlinx.serialization.json.Json
import org.jetbrains.annotations.ApiStatus
import org.jetbrains.intellij.build.BuildPaths
import org.jetbrains.intellij.build.io.AddDirEntriesMode
import java.io.File
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardCopyOption
import java.util.concurrent.ConcurrentHashMap
import kotlin.io.path.invariantSeparatorsPathString
@ApiStatus.Internal
class ArchivedCompilationOutputStorage(
private val paths: BuildPaths,
private val classesOutputDirectory: Path,
val archivedOutputDirectory: Path = getArchiveStorage(classesOutputDirectory.parent),
) {
private val unarchivedToArchivedMap = ConcurrentHashMap<Path, Path>()
internal fun loadMetadataFile(metadataFile: Path) {
val metadata = Json.decodeFromString<CompilationPartsMetadata>(Files.readString(metadataFile))
for (entry in metadata.files) {
unarchivedToArchivedMap.put(classesOutputDirectory.resolve(entry.key), archivedOutputDirectory.resolve(entry.key).resolve("${entry.value}.jar"))
}
}
suspend fun getArchived(path: Path): Path {
if (Files.isRegularFile(path) || !path.startsWith(classesOutputDirectory)) {
return path
}
unarchivedToArchivedMap.get(path)?.let {
return it
}
val archived = archive(path)
return unarchivedToArchivedMap.putIfAbsent(path, archived) ?: archived
}
private suspend fun archive(path: Path): Path {
val name = classesOutputDirectory.relativize(path).toString()
val archive = Files.createTempFile(paths.tempDir, name.replace(File.separator, "_"), ".jar")
Files.deleteIfExists(archive)
val hash = packAndComputeHash(addDirEntriesMode = AddDirEntriesMode.ALL, name = name, archive = archive, directory = path)
val result = archivedOutputDirectory.resolve(name).resolve("$hash.jar")
Files.createDirectories(result.parent)
Files.move(archive, result, StandardCopyOption.REPLACE_EXISTING)
return result
}
internal fun getMapping(): List<Map.Entry<Path, Path>> = unarchivedToArchivedMap.entries.sortedBy { it.key.invariantSeparatorsPathString }
}

View File

@@ -4,30 +4,29 @@
package org.jetbrains.intellij.build.impl.compilation
import com.intellij.platform.util.coroutines.forEachConcurrent
import com.intellij.platform.util.coroutines.mapConcurrent
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.trace.Span
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import org.jetbrains.annotations.ApiStatus
import org.jetbrains.annotations.VisibleForTesting
import org.jetbrains.intellij.build.BuildMessages
import org.jetbrains.intellij.build.BuildPaths
import org.jetbrains.intellij.build.CompilationContext
import org.jetbrains.intellij.build.forEachConcurrent
import org.jetbrains.intellij.build.http2Client.createHttp2ClientSessionFactory
import org.jetbrains.intellij.build.io.AddDirEntriesMode
import org.jetbrains.intellij.build.io.zip
import org.jetbrains.intellij.build.telemetry.TraceManager.spanBuilder
import org.jetbrains.intellij.build.telemetry.use
import java.io.File
import java.math.BigInteger
import java.net.InetSocketAddress
import java.net.URI
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.*
@@ -42,14 +41,13 @@ import java.util.concurrent.atomic.AtomicLong
import java.util.zip.GZIPOutputStream
import kotlin.io.path.ExperimentalPathApi
import kotlin.io.path.deleteRecursively
import kotlin.io.path.invariantSeparatorsPathString
import kotlin.io.path.listDirectoryEntries
internal val uploadParallelism = Runtime.getRuntime().availableProcessors()
internal val downloadParallelism = uploadParallelism
internal val uploadParallelism = Runtime.getRuntime().availableProcessors().coerceIn(4, 32)
internal val downloadParallelism = (Runtime.getRuntime().availableProcessors() * 2).coerceIn(8, 16)
private const val BRANCH_PROPERTY_NAME = "intellij.build.compiled.classes.branch"
private const val SERVER_URL = "intellij.build.compiled.classes.server.url"
private const val SERVER_URL_PROPERTY = "intellij.build.compiled.classes.server.url"
private const val UPLOAD_PREFIX = "intellij.build.compiled.classes.upload.prefix"
class CompilationCacheUploadConfiguration(
@@ -59,9 +57,18 @@ class CompilationCacheUploadConfiguration(
branch: String? = null,
uploadPredix: String? = null,
) {
val serverUrl: String by lazy { serverUrl ?: normalizeServerUrl() }
val serverUrl: String by lazy(LazyThreadSafetyMode.NONE) { serverUrl ?: getAndNormalizeServerUrlBySystemProperty() }
val uploadPrefix: String by lazy {
private val serverUri: URI by lazy(LazyThreadSafetyMode.NONE) { URI(serverUrl ?: getAndNormalizeServerUrlBySystemProperty()) }
// even if empty, a final path must always start with `/` (otherwise, error like "client sent invalid :path header")
val serverUrlPathPrefix: String by lazy(LazyThreadSafetyMode.NONE) { serverUri.path }
val serverAddress: InetSocketAddress by lazy {
InetSocketAddress.createUnresolved(serverUri.host, serverUri.port.let { if (it == -1) 443 else it })
}
val uploadUrlPathPrefix: String by lazy {
uploadPredix ?: System.getProperty(UPLOAD_PREFIX, "intellij-compile/v2").also {
check(!it.isNullOrBlank()) {
"$UPLOAD_PREFIX system property should not be blank."
@@ -78,11 +85,10 @@ class CompilationCacheUploadConfiguration(
}
}
private fun normalizeServerUrl(): String {
val serverUrlPropertyName = SERVER_URL
var result = System.getProperty(serverUrlPropertyName)?.trimEnd('/')
private fun getAndNormalizeServerUrlBySystemProperty(): String {
var result = System.getProperty(SERVER_URL_PROPERTY)?.trimEnd('/')
check(!result.isNullOrBlank()) {
"Compilation cache archive server url is not defined. Please set $serverUrlPropertyName system property."
"Compilation cache archive server url is not defined. Please set $SERVER_URL_PROPERTY system property."
}
if (!result.startsWith("http")) {
@Suppress("HttpUrlsUsage")
@@ -112,10 +118,6 @@ suspend fun packAndUploadToServer(context: CompilationContext, zipDir: Path, con
}
}
private fun createBufferPool(@Suppress("SameParameterValue") maxPoolSize: Int): DirectFixedSizeByteBufferPool {
return DirectFixedSizeByteBufferPool(bufferSize = MAX_BUFFER_SIZE, maxPoolSize = maxPoolSize)
}
private suspend fun packCompilationResult(zipDir: Path, context: CompilationContext, addDirEntriesMode: AddDirEntriesMode = AddDirEntriesMode.ALL): List<PackAndUploadItem> {
val incremental = context.options.incrementalCompilation
if (!incremental) {
@@ -163,15 +165,17 @@ private suspend fun packCompilationResult(zipDir: Path, context: CompilationCont
}
}
spanBuilder("build zip archives").use(Dispatchers.IO) {
items.forEachConcurrent { item ->
item.hash = packAndComputeHash(addDirEntriesMode = addDirEntriesMode, name = item.name, archive = item.archive, directory = item.output)
spanBuilder("build zip archives").use {
for (item in items) {
launch {
item.hash = packAndComputeHash(addDirEntriesMode = addDirEntriesMode, name = item.name, archive = item.archive, directory = item.output)
}
}
}
return items
}
private suspend fun packAndComputeHash(
internal suspend fun packAndComputeHash(
addDirEntriesMode: AddDirEntriesMode,
name: String,
archive: Path,
@@ -214,7 +218,7 @@ private suspend fun upload(
CompilationPartsMetadata(
serverUrl = config.serverUrl,
branch = config.branch,
prefix = config.uploadPrefix,
prefix = config.uploadUrlPathPrefix,
files = items.associateTo(TreeMap()) { item ->
item.name to item.hash!!
},
@@ -236,68 +240,24 @@ private suspend fun upload(
messages.artifactBuilt(gzippedMetadataFile.toString())
}
spanBuilder("upload archives").setAttribute(AttributeKey.stringArrayKey("items"), items.map(PackAndUploadItem::name)).use {
createBufferPool(maxPoolSize = uploadParallelism * 2).use { bufferPool ->
uploadArchives(
reportStatisticValue = messages::reportStatisticValue,
config = config,
metadataJson = metadataJson,
httpClient = httpClient,
items = items,
bufferPool = bufferPool,
)
val serverAddress = config.serverAddress
createHttp2ClientSessionFactory(trustAll = serverAddress.hostString == "127.0.0.1").use { client ->
client.connect(serverAddress).use { connection ->
spanBuilder("upload archives").setAttribute(AttributeKey.stringArrayKey("items"), items.map(PackAndUploadItem::name)).use {
uploadArchives(
reportStatisticValue = messages::reportStatisticValue,
config = config,
metadataJson = metadataJson,
httpConnection = connection,
items = items,
)
}
}
}
}
private fun getArchivesStorage(fallbackPersistentCacheRoot: Path): Path {
return (System.getProperty("agent.persistent.cache")?.let { Path.of(it) } ?: fallbackPersistentCacheRoot)
.resolve("idea-compile-parts-v2")
}
@ApiStatus.Internal
class ArchivedCompilationOutputsStorage(
private val paths: BuildPaths,
private val classesOutputDirectory: Path,
val archivedOutputDirectory: Path = getArchivesStorage(classesOutputDirectory.parent),
) {
private val unarchivedToArchivedMap = ConcurrentHashMap<Path, Path>()
internal fun loadMetadataFile(metadataFile: Path) {
val metadata = Json.decodeFromString<CompilationPartsMetadata>(Files.readString(metadataFile))
for (entry in metadata.files) {
unarchivedToArchivedMap.put(classesOutputDirectory.resolve(entry.key), archivedOutputDirectory.resolve(entry.key).resolve("${entry.value}.jar"))
}
}
suspend fun getArchived(path: Path): Path {
if (Files.isRegularFile(path) || !path.startsWith(classesOutputDirectory)) {
return path
}
unarchivedToArchivedMap.get(path)?.let {
return it
}
val archived = archive(path)
return unarchivedToArchivedMap.putIfAbsent(path, archived) ?: archived
}
private suspend fun archive(path: Path): Path {
val name = classesOutputDirectory.relativize(path).toString()
val archive = Files.createTempFile(paths.tempDir, name.replace(File.separator, "_"), ".jar")
Files.deleteIfExists(archive)
val hash = packAndComputeHash(addDirEntriesMode = AddDirEntriesMode.ALL, name = name, archive = archive, directory = path)
val result = archivedOutputDirectory.resolve(name).resolve("$hash.jar")
Files.createDirectories(result.parent)
Files.move(archive, result, StandardCopyOption.REPLACE_EXISTING)
return result
}
internal fun getMapping(): List<Map.Entry<Path, Path>> = unarchivedToArchivedMap.entries.sortedBy { it.key.invariantSeparatorsPathString }
internal fun getArchiveStorage(fallbackPersistentCacheRoot: Path): Path {
return (System.getProperty("agent.persistent.cache")?.let { Path.of(it) } ?: fallbackPersistentCacheRoot).resolve("idea-compile-parts-v2")
}
@VisibleForTesting
@@ -309,7 +269,7 @@ suspend fun fetchAndUnpackCompiledClasses(
saveHash: Boolean,
) {
val metadata = Json.decodeFromString<CompilationPartsMetadata>(Files.readString(metadataFile))
val tempDownloadStorage = getArchivesStorage(classOutput.parent)
val tempDownloadStorage = getArchiveStorage(classOutput.parent)
val items = metadata.files.mapTo(ArrayList(metadata.files.size)) { entry ->
FetchAndUnpackItem(
@@ -321,10 +281,9 @@ suspend fun fetchAndUnpackCompiledClasses(
}
items.sortBy { it.name }
var verifyTime = 0L
val upToDate = ConcurrentHashMap.newKeySet<String>()
spanBuilder("check previously unpacked directories").use { span ->
verifyTime += checkPreviouslyUnpackedDirectories(
var verifyTime = spanBuilder("check previously unpacked directories").use { span ->
checkPreviouslyUnpackedDirectories(
items = items,
span = span,
upToDate = upToDate,
@@ -336,32 +295,32 @@ suspend fun fetchAndUnpackCompiledClasses(
val toUnpack = LinkedHashSet<FetchAndUnpackItem>(items.size)
val verifyStart = System.nanoTime()
val toDownload = spanBuilder("check previously downloaded archives").use(Dispatchers.IO) { span ->
items
.filter { item ->
if (upToDate.contains(item.name)) {
return@filter false
}
if (!skipUnpack) {
toUnpack.add(item)
}
true
val toDownload = ConcurrentHashMap.newKeySet<FetchAndUnpackItem>()
spanBuilder("check previously downloaded archives").use { span ->
items.filter { item ->
if (upToDate.contains(item.name)) {
return@filter false
}
.mapConcurrent { item ->
if (!skipUnpack) {
toUnpack.add(item)
}
true
}
.forEachConcurrent(Runtime.getRuntime().availableProcessors().coerceAtMost(4)) { item ->
val file = item.file
when {
Files.notExists(file) -> item
item.hash == computeHash(file) -> null
Files.notExists(file) -> toDownload.add(item)
item.hash == computeHash(file) -> return@forEachConcurrent
else -> {
span.addEvent("file has unexpected hash, will refetch", Attributes.of(AttributeKey.stringKey("file"), "${item.name}/${item.hash}.jar"))
Files.deleteIfExists(file)
item
toDownload.add(item)
}
}
}
}.filterNotNull()
verifyTime += TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - verifyStart)
}
verifyTime += System.nanoTime() - verifyStart
// toUnpack is performed as part of download
for (item in toDownload) {
@@ -398,6 +357,7 @@ suspend fun fetchAndUnpackCompiledClasses(
Span.current().addEvent("failed to cleanup outdated archives", Attributes.of(AttributeKey.stringKey("error"), e.message ?: ""))
}
reportStatisticValue("compile-parts:verify:time", TimeUnit.NANOSECONDS.toMillis(verifyTime).toString())
reportStatisticValue("compile-parts:cleanup:time", TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - start)).toString())
reportStatisticValue("compile-parts:removed:bytes", bytes.toString())
reportStatisticValue("compile-parts:removed:count", count.toString())
@@ -406,23 +366,17 @@ suspend fun fetchAndUnpackCompiledClasses(
spanBuilder("fetch compiled classes archives").use {
val start = System.nanoTime()
val prefix = metadata.prefix
val serverUrl = metadata.serverUrl
val downloadedBytes = AtomicLong()
val failed: List<Throwable> = if (toDownload.isEmpty()) {
emptyList()
}
else {
val httpClientWithoutFollowingRedirects = httpClient.newBuilder().followRedirects(false).build()
// 4MB block, x2 of thread count - one buffer to source, another one for target
createBufferPool(downloadParallelism * 2).use { bufferPool ->
createHttp2ClientSessionFactory(trustAll = metadata.serverUrl.contains("127.0.0.1")).use { client ->
downloadCompilationCache(
serverUrl = serverUrl,
prefix = prefix,
client = client,
serverUrl = metadata.serverUrl,
prefix = metadata.prefix,
toDownload = toDownload,
client = httpClientWithoutFollowingRedirects,
bufferPool = bufferPool,
downloadedBytes = downloadedBytes,
skipUnpack = skipUnpack,
saveHash = saveHash,
@@ -442,10 +396,12 @@ suspend fun fetchAndUnpackCompiledClasses(
}
val start = System.nanoTime()
spanBuilder("unpack compiled classes archives").use(Dispatchers.IO) {
toUnpack.forEachConcurrent { item ->
spanBuilder("unpack").setAttribute("name", item.name).use {
unpackArchive(item, saveHash)
spanBuilder("unpack compiled classes archives").use {
for (item in toUnpack) {
launch {
spanBuilder("unpack").setAttribute("name", item.name).use {
unpackArchive(item, saveHash)
}
}
}
}
@@ -466,55 +422,57 @@ private suspend fun checkPreviouslyUnpackedDirectories(
}
val start = System.nanoTime()
withContext(Dispatchers.IO) {
launch {
coroutineScope {
launch(Dispatchers.IO) {
spanBuilder("remove stalled directories not present in metadata").setAttribute(AttributeKey.stringArrayKey("keys"), java.util.List.copyOf(metadata.files.keys)).use {
removeStalledDirs(metadata, classOutput)
}
}
items.forEachConcurrent { item ->
val out = item.output
if (Files.notExists(out)) {
span.addEvent("output directory doesn't exist", Attributes.of(AttributeKey.stringKey("name"), item.name, AttributeKey.stringKey("outDir"), out.toString()))
return@forEachConcurrent
}
val hashFile = out.resolve(".hash")
if (!Files.isRegularFile(hashFile)) {
span.addEvent("no .hash file in output directory", Attributes.of(AttributeKey.stringKey("name"), item.name))
out.deleteRecursively()
return@forEachConcurrent
}
try {
val actual = Files.readString(hashFile)
if (actual == item.hash) {
upToDate.add(item.name)
for (item in items) {
launch {
val out = item.output
if (Files.notExists(out)) {
span.addEvent("output directory doesn't exist", Attributes.of(AttributeKey.stringKey("name"), item.name, AttributeKey.stringKey("outDir"), out.toString()))
return@launch
}
else {
span.addEvent(
"output directory hash mismatch",
Attributes.of(
AttributeKey.stringKey("name"), item.name,
AttributeKey.stringKey("expected"), item.hash,
AttributeKey.stringKey("actual"), actual,
val hashFile = out.resolve(".hash")
if (!Files.isRegularFile(hashFile)) {
span.addEvent("no .hash file in output directory", Attributes.of(AttributeKey.stringKey("name"), item.name))
out.deleteRecursively()
return@launch
}
try {
val actual = Files.readString(hashFile)
if (actual == item.hash) {
upToDate.add(item.name)
}
else {
span.addEvent(
"output directory hash mismatch",
Attributes.of(
AttributeKey.stringKey("name"), item.name,
AttributeKey.stringKey("expected"), item.hash,
AttributeKey.stringKey("actual"), actual,
)
)
)
out.deleteRecursively()
}
}
catch (e: CancellationException) {
throw e
}
catch (e: Throwable) {
span.addEvent("output directory hash calculation failed", Attributes.of(AttributeKey.stringKey("name"), item.name))
span.recordException(e, Attributes.of(AttributeKey.stringKey("name"), item.name))
out.deleteRecursively()
}
}
catch (e: CancellationException) {
throw e
}
catch (e: Throwable) {
span.addEvent("output directory hash calculation failed", Attributes.of(AttributeKey.stringKey("name"), item.name))
span.recordException(e, Attributes.of(AttributeKey.stringKey("name"), item.name))
out.deleteRecursively()
}
}
}
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
return System.nanoTime() - start
}
private fun CoroutineScope.removeStalledDirs(
@@ -557,7 +515,7 @@ private fun CoroutineScope.removeStalledDirs(
private val sharedDigest = MessageDigest.getInstance("SHA-256", java.security.Security.getProvider("SUN"))
internal fun sha256() = sharedDigest.clone() as MessageDigest
private fun computeHash(file: Path): String {
internal fun computeHash(file: Path): String {
val messageDigest = sha256()
FileChannel.open(file, READ_OPERATION).use { channel ->
val fileSize = channel.size()
@@ -583,7 +541,7 @@ private fun computeHash(file: Path): String {
// we cannot change file extension or prefix, so, add suffix
internal fun digestToString(digest: MessageDigest): String = BigInteger(1, digest.digest()).toString(36) + "-z"
data class PackAndUploadItem(
internal data class PackAndUploadItem(
@JvmField val output: Path,
@JvmField val name: String,
@JvmField val archive: Path,
@@ -604,7 +562,7 @@ internal data class FetchAndUnpackItem(
* URL for each part should be constructed like: <pre>${serverUrl}/${prefix}/${files.key}/${files.value}.jar</pre>
*/
@Serializable
private data class CompilationPartsMetadata(
internal data class CompilationPartsMetadata(
@JvmField @SerialName("server-url") val serverUrl: String,
@JvmField val branch: String,
@JvmField val prefix: String,

View File

@@ -1,38 +1,53 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package org.jetbrains.intellij.build.impl.compilation
import com.intellij.util.lang.ByteBufferCleaner
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.getOrElse
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import org.jetbrains.intellij.build.io.unmapBuffer
import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
internal class DirectFixedSizeByteBufferPool(private val bufferSize: Int, maxPoolSize: Int) : AutoCloseable {
private val pool = Channel<ByteBuffer>(capacity = maxPoolSize)
internal class DirectFixedSizeByteBufferPool(private val bufferSize: Int, private val maxPoolSize: Int) : AutoCloseable {
private val pool = ConcurrentLinkedQueue<ByteBuffer>()
private val count = AtomicInteger()
@JvmField
val semaphore: Semaphore = Semaphore(maxPoolSize)
fun allocate(): ByteBuffer {
val result = pool.tryReceive()
return when {
result.isSuccess -> result.getOrThrow()
result.isClosed -> throw IllegalStateException("Pool is closed")
else -> ByteBuffer.allocateDirect(bufferSize)
private fun allocate(): ByteBuffer {
val result = pool.poll() ?: return ByteBuffer.allocateDirect(bufferSize)
count.decrementAndGet()
return result
}
suspend inline fun <T> withBuffer(task: (buffer: ByteBuffer) -> T): T {
return semaphore.withPermit {
val buffer = allocate()
try {
task(buffer)
}
finally {
release(buffer)
}
}
}
fun release(buffer: ByteBuffer) {
private fun release(buffer: ByteBuffer) {
buffer.clear()
buffer.order(ByteOrder.BIG_ENDIAN)
pool.trySend(buffer).getOrElse {
// if the pool is full, we simply discard the buffer
ByteBufferCleaner.unmapBuffer(buffer)
if (count.incrementAndGet() < maxPoolSize) {
pool.offer(buffer)
}
else {
count.decrementAndGet()
unmapBuffer(buffer)
}
}
// pool is not expected to be used during releaseAll call
override fun close() {
while (true) {
ByteBufferCleaner.unmapBuffer(pool.tryReceive().getOrNull() ?: break)
unmapBuffer(pool.poll() ?: return)
}
pool.close()
}
}

View File

@@ -3,7 +3,6 @@
package org.jetbrains.intellij.build.impl.compilation
import com.intellij.platform.util.coroutines.forEachConcurrent
import com.intellij.util.io.Decompressor
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.common.Attributes
@@ -12,6 +11,7 @@ import kotlinx.coroutines.*
import okhttp3.Request
import okio.sink
import org.jetbrains.intellij.build.CompilationContext
import org.jetbrains.intellij.build.forEachConcurrent
import org.jetbrains.intellij.build.impl.compilation.cache.CommitsHistory
import org.jetbrains.intellij.build.impl.compilation.cache.getAllCompilationOutputs
import org.jetbrains.intellij.build.impl.compilation.cache.parseSourcesStateFile

View File

@@ -3,7 +3,6 @@
package org.jetbrains.intellij.build.impl.compilation
import com.google.gson.stream.JsonReader
import com.intellij.platform.util.coroutines.forEachConcurrent
import com.intellij.util.io.Compressor
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.common.Attributes
@@ -11,12 +10,14 @@ import io.opentelemetry.api.trace.Span
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import okhttp3.MediaType.Companion.toMediaType
import okhttp3.Request
import okhttp3.RequestBody
import okio.BufferedSink
import okio.source
import org.jetbrains.intellij.build.BuildMessages
import org.jetbrains.intellij.build.CompilationContext
import org.jetbrains.intellij.build.forEachConcurrent
import org.jetbrains.intellij.build.impl.compilation.cache.CommitsHistory
import org.jetbrains.intellij.build.impl.compilation.cache.getAllCompilationOutputs
import org.jetbrains.intellij.build.io.copyFile
@@ -37,6 +38,7 @@ import kotlin.io.path.ExperimentalPathApi
import kotlin.io.path.deleteRecursively
private const val SOURCES_STATE_FILE_NAME = "target_sources_state.json"
private val MEDIA_TYPE_BINARY = "application/octet-stream".toMediaType()
internal class PortableCompilationCacheUploader(
private val context: CompilationContext,

View File

@@ -0,0 +1,43 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package org.jetbrains.intellij.build.impl.compilation
import com.github.luben.zstd.ZstdCompressCtx
import java.util.concurrent.ConcurrentLinkedQueue
// we cannot use Netty Recycler as we must close ZstdCompressCtx after use of pool
internal class ZstdCompressContextPool(private val level: Int = 3) : AutoCloseable {
private val pool = ConcurrentLinkedQueue<ZstdCompressCtx>()
inline fun <T> withZstd(task: (zstd: ZstdCompressCtx) -> T): T {
val zstd = allocate()
try {
return task(zstd)
}
finally {
zstd.reset()
pool.offer(zstd)
}
}
private fun allocate(): ZstdCompressCtx {
pool.poll()?.let {
configure(it)
return it
}
val zstd = ZstdCompressCtx()
configure(zstd)
return zstd
}
private fun configure(zstd: ZstdCompressCtx) {
zstd.setLevel(level)
//zstd.setLong(64)
}
override fun close() {
while (true) {
(pool.poll() ?: return).close()
}
}
}

View File

@@ -1,31 +1,26 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package org.jetbrains.intellij.build.impl.compilation
import com.github.luben.zstd.ZstdDirectBufferDecompressingStreamNoFinalizer
import com.intellij.platform.util.coroutines.mapConcurrent
import com.intellij.util.lang.HashMapZipFile
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.trace.Span
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.Response
import okio.IOException
import org.jetbrains.intellij.build.forEachConcurrent
import org.jetbrains.intellij.build.http2Client.Http2ClientConnection
import org.jetbrains.intellij.build.http2Client.Http2ClientConnectionFactory
import org.jetbrains.intellij.build.http2Client.download
import org.jetbrains.intellij.build.io.INDEX_FILENAME
import org.jetbrains.intellij.build.retryWithExponentialBackOff
import org.jetbrains.intellij.build.telemetry.TraceManager.spanBuilder
import org.jetbrains.intellij.build.telemetry.use
import java.net.HttpURLConnection
import java.nio.ByteBuffer
import java.math.BigInteger
import java.net.URI
import java.nio.channels.FileChannel
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardOpenOption
import java.security.MessageDigest
import java.util.*
import java.util.concurrent.CancellationException
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.atomic.AtomicLong
import kotlin.io.path.name
@@ -33,46 +28,54 @@ private val OVERWRITE_OPERATION = EnumSet.of(StandardOpenOption.WRITE, StandardO
internal suspend fun downloadCompilationCache(
serverUrl: String,
client: Http2ClientConnectionFactory,
prefix: String,
toDownload: List<FetchAndUnpackItem>,
client: OkHttpClient,
bufferPool: DirectFixedSizeByteBufferPool,
toDownload: Collection<FetchAndUnpackItem>,
downloadedBytes: AtomicLong,
skipUnpack: Boolean,
saveHash: Boolean,
): List<Throwable> {
var urlWithPrefix = "$serverUrl/$prefix/"
var urlPathWithPrefix = "/$prefix/"
// first let's check for initial redirect (mirror selection)
spanBuilder("mirror selection").use { span ->
client.newCall(Request.Builder().url(urlWithPrefix).head().build()).executeAsync().use { response ->
val statusCode = response.code
val locationHeader = response.header("location")
if (locationHeader != null && (statusCode == HttpURLConnection.HTTP_MOVED_TEMP ||
statusCode == HttpURLConnection.HTTP_MOVED_PERM ||
statusCode == 307 ||
statusCode == HttpURLConnection.HTTP_SEE_OTHER)) {
urlWithPrefix = locationHeader
span.addEvent("redirected to mirror", Attributes.of(AttributeKey.stringKey("url"), urlWithPrefix))
val initialServerUri = URI(serverUrl)
var effectiveServerUri = initialServerUri
var connection: Http2ClientConnection? = client.connect(effectiveServerUri.host, effectiveServerUri.port)
try {
spanBuilder("mirror selection").use { span ->
val newLocation = connection!!.getRedirectLocation(urlPathWithPrefix)
if (newLocation == null) {
span.addEvent("origin server will be used", Attributes.of(AttributeKey.stringKey("url"), urlPathWithPrefix))
}
else {
span.addEvent("origin server will be used", Attributes.of(AttributeKey.stringKey("url"), urlWithPrefix))
effectiveServerUri = URI(newLocation.toString())
urlPathWithPrefix = effectiveServerUri.path
span.addEvent("redirected to mirror", Attributes.of(AttributeKey.stringKey("url"), urlPathWithPrefix))
}
}
}
finally {
if (initialServerUri != effectiveServerUri) {
connection?.close()
connection = null
}
}
return withContext(Dispatchers.IO) {
toDownload.mapConcurrent(downloadParallelism) { item ->
val url = "$urlWithPrefix${item.name}/${item.file.fileName}"
spanBuilder("download").setAttribute("name", item.name).setAttribute("url", url).use {
if (connection == null) {
connection = client.connect(effectiveServerUri.host, effectiveServerUri.port)
}
try {
val errors = CopyOnWriteArrayList<Throwable>()
toDownload.forEachConcurrent(downloadParallelism) { item ->
val urlPath = "$urlPathWithPrefix${item.name}/${item.file.fileName}"
spanBuilder("download").setAttribute("name", item.name).setAttribute("urlPath", urlPath).use { span ->
try {
downloadedBytes.getAndAdd(
download(
item = item,
url = url,
bufferPool = bufferPool,
urlPath = urlPath,
skipUnpack = skipUnpack,
saveHash = saveHash,
client = client,
connection = connection,
)
)
}
@@ -80,43 +83,41 @@ internal suspend fun downloadCompilationCache(
throw e
}
catch (e: Throwable) {
return@use CompilePartDownloadFailedError(item, e)
span.recordException(e)
errors.add(CompilePartDownloadFailedError(item, e))
}
null
}
}
}.filterNotNull()
return errors
}
finally {
connection.close()
}
}
private suspend fun download(
item: FetchAndUnpackItem,
url: String,
bufferPool: DirectFixedSizeByteBufferPool,
urlPath: String,
skipUnpack: Boolean,
saveHash: Boolean,
client: OkHttpClient,
connection: Http2ClientConnection,
): Long {
val downloaded = retryWithExponentialBackOff(onException = ::onDownloadException) {
client.newCall(Request.Builder().url(url).build()).executeAsync().useSuccessful { response ->
val digest = sha256()
writeFile(file = item.file, response = response, bufferPool = bufferPool, url = url, digest = digest)
val computedHash = digestToString(digest)
if (computedHash != item.hash) {
throw HashMismatchException("hash mismatch") { span, attempt ->
span.addEvent(
"hash mismatch",
Attributes.of(
AttributeKey.longKey("attemptNumber"), attempt.toLong(),
AttributeKey.stringKey("name"), item.file.name,
AttributeKey.stringKey("expected"), item.hash,
AttributeKey.stringKey("computed"), computedHash,
)
)
}
}
response.body.contentLength()
}
val (downloaded, digest) = connection.download(path = urlPath, file = item.file, digestFactory = { sha256() })
val digestBytes = digest.digest()
val computedHash = BigInteger(1, digestBytes).toString(36) + "-z"
if (computedHash != item.hash) {
println("actualHash : ${computeHash(item.file)}")
println("expectedHash: ${item.hash}")
println("computedHash: $computedHash")
val spanAttributes = Attributes.of(
AttributeKey.stringKey("name"), item.file.name,
AttributeKey.stringKey("expected"), item.hash,
AttributeKey.stringKey("computed"), computedHash,
)
throw HashMismatchException("hash mismatch ($spanAttributes)")
}
if (!skipUnpack) {
spanBuilder("unpack").setAttribute("name", item.name).use {
unpackArchive(item, saveHash)
@@ -125,23 +126,9 @@ private suspend fun download(
return downloaded
}
private suspend fun onDownloadException(attempt: Int, e: Exception) {
spanBuilder("Retrying download with exponential back off").use { span ->
if (e is HashMismatchException) {
e.eventLogger.invoke(span, attempt)
}
else {
span.addEvent("Attempt failed", Attributes.of(
AttributeKey.longKey("attemptNumber"), attempt.toLong(),
AttributeKey.stringKey("error"), e.toString()
))
}
}
}
internal class CompilePartDownloadFailedError(@JvmField val item: FetchAndUnpackItem, cause: Throwable) : RuntimeException(cause)
internal class HashMismatchException(message: String, @JvmField val eventLogger: (Span, Int) -> Unit) : IOException(message)
internal class HashMismatchException(message: String) : IOException(message)
internal fun unpackArchive(item: FetchAndUnpackItem, saveHash: Boolean) {
HashMapZipFile.load(item.file).use { zipFile ->
@@ -170,66 +157,4 @@ internal fun unpackArchive(item: FetchAndUnpackItem, saveHash: Boolean) {
// save actual hash
Files.writeString(item.output.resolve(".hash"), item.hash)
}
}
private fun writeFile(file: Path, response: Response, bufferPool: DirectFixedSizeByteBufferPool, url: String, digest: MessageDigest) {
Files.createDirectories(file.parent)
FileChannel.open(file, OVERWRITE_OPERATION).use { channel ->
val source = response.body.source()
val sourceBuffer = bufferPool.allocate()
object : ZstdDirectBufferDecompressingStreamNoFinalizer(sourceBuffer) {
public override fun refill(toRefill: ByteBuffer): ByteBuffer {
toRefill.clear()
do {
if (source.read(toRefill) == -1) {
break
}
}
while (!source.exhausted() && toRefill.hasRemaining())
toRefill.flip()
return toRefill
}
override fun close() {
try {
super.close()
}
finally {
bufferPool.release(sourceBuffer)
}
}
}.use { decompressor ->
var offset = 0L
val targetBuffer = bufferPool.allocate()
try {
// refill is not called on start
decompressor.refill(sourceBuffer)
do {
do {
// decompressor can consume not the whole source buffer if target buffer size is not enough
decompressor.read(targetBuffer)
targetBuffer.flip()
targetBuffer.mark()
digest.update(targetBuffer)
targetBuffer.reset()
do {
offset += channel.write(targetBuffer, offset)
}
while (targetBuffer.hasRemaining())
targetBuffer.clear()
}
while (sourceBuffer.hasRemaining())
}
while (!source.exhausted())
}
catch (e: IOException) {
throw IOException("Cannot unpack $url", e)
}
finally {
bufferPool.release(targetBuffer)
}
}
}
}

View File

@@ -4,15 +4,12 @@ package org.jetbrains.intellij.build.impl.compilation
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.suspendCancellableCoroutine
import okhttp3.*
import okhttp3.MediaType.Companion.toMediaType
import okhttp3.internal.closeQuietly
import org.jetbrains.intellij.build.NoMoreRetriesException
import java.io.IOException
import java.util.concurrent.TimeUnit
import kotlin.coroutines.resumeWithException
internal val MEDIA_TYPE_BINARY = "application/octet-stream".toMediaType()
internal suspend fun OkHttpClient.head(url: String, authHeader: String): Int {
return newCall(Request.Builder().url(url).head().header("Authorization", authHeader).build()).executeAsync().use { response ->
if (response.code != 200 && response.code != 404) {
@@ -43,6 +40,11 @@ internal val httpClient: OkHttpClient by lazy {
.connectTimeout(timeout, unit)
.writeTimeout(timeout, unit)
.readTimeout(timeout, unit)
.dispatcher(Dispatcher().apply {
// we upload/download to the same host - increase `maxRequestsPerHost`
//maxRequestsPerHost = Runtime.getRuntime().availableProcessors().coerceIn(5, 16)
//... but in the same time it can increase the bill for ALB, so, leave it as is
})
.addInterceptor { chain ->
var request = chain.request()
if (request.header("User-Agent").isNullOrBlank()) {
@@ -83,7 +85,7 @@ internal suspend fun Call.executeAsync(): Response {
continuation.invokeOnCancellation {
this.cancel()
}
this.enqueue(object : Callback {
enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
continuation.resumeWithException(e)
}

View File

@@ -4,47 +4,42 @@
package org.jetbrains.intellij.build.impl.compilation
import com.github.luben.zstd.Zstd
import com.github.luben.zstd.ZstdDirectBufferCompressingStreamNoFinalizer
import com.intellij.platform.util.coroutines.forEachConcurrent
import com.github.luben.zstd.ZstdCompressCtx
import io.netty.handler.codec.http.HttpHeaderValues
import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http2.Http2StreamChannel
import io.netty.util.AsciiString
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.trace.Span
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.decodeFromStream
import okhttp3.MediaType.Companion.toMediaType
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.RequestBody
import okhttp3.RequestBody.Companion.toRequestBody
import okio.BufferedSink
import okio.use
import org.jetbrains.intellij.build.forEachConcurrent
import org.jetbrains.intellij.build.http2Client.Http2ClientConnection
import org.jetbrains.intellij.build.http2Client.MAX_BUFFER_SIZE
import org.jetbrains.intellij.build.http2Client.writeData
import org.jetbrains.intellij.build.io.unmapBuffer
import org.jetbrains.intellij.build.telemetry.TraceManager.spanBuilder
import org.jetbrains.intellij.build.telemetry.use
import java.nio.ByteBuffer
import java.nio.MappedByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardOpenOption
import java.util.*
import java.util.concurrent.CancellationException
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong
import kotlin.math.min
private val MEDIA_TYPE_JSON = "application/json".toMediaType()
internal val READ_OPERATION = EnumSet.of(StandardOpenOption.READ)
internal const val MAX_BUFFER_SIZE = 4 * 1014 * 1024
internal const val ZSTD_LEVEL = 3
internal suspend fun uploadArchives(
reportStatisticValue: (key: String, value: String) -> Unit,
config: CompilationCacheUploadConfiguration,
metadataJson: String,
httpClient: OkHttpClient,
httpConnection: Http2ClientConnection,
items: List<PackAndUploadItem>,
bufferPool: DirectFixedSizeByteBufferPool,
) {
val uploadedCount = AtomicInteger()
val uploadedBytes = AtomicLong()
@@ -56,20 +51,25 @@ internal suspend fun uploadArchives(
val alreadyUploaded: Set<String> = try {
if (config.checkFiles) {
spanBuilder("fetch info about already uploaded files").use {
HashSet(getFoundAndMissingFiles(metadataJson, config.serverUrl, httpClient).found)
getFoundAndMissingFiles(metadataJson = metadataJson, urlPathPrefix = config.serverUrlPathPrefix, connection = httpConnection).found
}
}
else {
emptySet()
}
}
catch (e: CancellationException) {
throw e
}
catch (e: Throwable) {
Span.current().recordException(e, Attributes.of(AttributeKey.stringKey("message"), "failed to fetch info about already uploaded files, will fallback to HEAD requests"))
fallbackToHeads = true
emptySet()
}
withContext(Dispatchers.IO) {
val sourceBlockSize = MAX_BUFFER_SIZE
val urlPathPrefix = "${config.serverUrlPathPrefix}/${config.uploadUrlPathPrefix}"
ZstdCompressContextPool().use { zstdCompressContextPool ->
items.forEachConcurrent(uploadParallelism) { item ->
if (alreadyUploaded.contains(item.name)) {
reusedCount.getAndIncrement()
@@ -77,16 +77,18 @@ internal suspend fun uploadArchives(
return@forEachConcurrent
}
val urlPath = "$urlPathPrefix/${item.name}/${item.hash!!}.jar"
spanBuilder("upload archive").setAttribute("name", item.name).setAttribute("hash", item.hash!!).use {
val size = Files.size(item.archive)
val isUploaded = uploadFile(
url = "${config.serverUrl}/${config.uploadPrefix}/${item.name}/${item.hash!!}.jar",
urlPath = urlPath,
file = item.archive,
useHead = fallbackToHeads,
span = Span.current(),
httpClient = httpClient,
bufferPool = bufferPool,
httpSession = httpConnection,
fileSize = size,
sourceBlockSize = sourceBlockSize,
zstdCompressContextPool = zstdCompressContextPool,
)
if (isUploaded) {
uploadedCount.getAndIncrement()
@@ -125,172 +127,100 @@ internal suspend fun uploadArchives(
reportStatisticValue("compile-parts:total:count", (reusedCount.get() + uploadedCount.get()).toString())
}
private suspend fun getFoundAndMissingFiles(metadataJson: String, serverUrl: String, httpClient: OkHttpClient): CheckFilesResponse {
httpClient.newCall(Request.Builder()
.url("$serverUrl/check-files")
.post(metadataJson.toRequestBody(MEDIA_TYPE_JSON))
.build()).executeAsync().useSuccessful {
return Json.decodeFromStream(it.body.byteStream())
}
private suspend fun getFoundAndMissingFiles(metadataJson: String, urlPathPrefix: String, connection: Http2ClientConnection): CheckFilesResponse {
return connection.post(path = "$urlPathPrefix/check-files", data = metadataJson, contentType = HttpHeaderValues.APPLICATION_JSON)
}
// Using ZSTD dictionary doesn't make the difference, even slightly worse (default compression level 3).
// That's because in our case, we compress a relatively large archive of class files.
private suspend fun uploadFile(
url: String,
urlPath: String,
file: Path,
useHead: Boolean,
span: Span,
httpClient: OkHttpClient,
bufferPool: DirectFixedSizeByteBufferPool,
httpSession: Http2ClientConnection,
fileSize: Long,
sourceBlockSize: Int,
zstdCompressContextPool: ZstdCompressContextPool,
): Boolean {
if (useHead) {
val request = Request.Builder().url(url).head().build()
val code = httpClient.newCall(request).executeAsync().use {
it.code
val status = httpSession.head(urlPath)
if (status == HttpResponseStatus.OK) {
span.addEvent("already exist on server, nothing to upload", Attributes.of(AttributeKey.stringKey("urlPath"), urlPath))
return false
}
when {
code == 200 -> {
span.addEvent("already exist on server, nothing to upload", Attributes.of(AttributeKey.stringKey("url"), url))
return false
}
code != 404 -> {
span.addEvent("responded with unexpected", Attributes.of(
AttributeKey.longKey("code"), code.toLong(),
AttributeKey.stringKey("url"), url,
))
}
else if (status != HttpResponseStatus.NOT_FOUND) {
span.addEvent(
"responded with unexpected",
Attributes.of(
AttributeKey.stringKey("status"), status.toString(),
AttributeKey.stringKey("urlPath"), urlPath
),
)
}
}
if (Zstd.compressBound(fileSize) <= MAX_BUFFER_SIZE) {
compressSmallFile(file = file, fileSize = fileSize, bufferPool = bufferPool, url = url)
}
else {
val request = Request.Builder()
.url(url)
.put(object : RequestBody() {
override fun contentType() = MEDIA_TYPE_BINARY
require(fileSize > 0)
override fun writeTo(sink: BufferedSink) {
compressFile(file = file, output = sink, bufferPool = bufferPool)
}
})
.build()
httpClient.newCall(request).executeAsync().useSuccessful { }
val fileBuffer = FileChannel.open(file, READ_OPERATION).use { channel ->
channel.map(FileChannel.MapMode.READ_ONLY, 0, fileSize)
}
try {
zstdCompressContextPool.withZstd { zstd ->
httpSession.put(AsciiString.of(urlPath)) { stream ->
compressAndUpload(
fileSize = fileSize,
fileBuffer = fileBuffer,
sourceBlockSize = sourceBlockSize,
zstd = zstd,
stream = stream,
)
}
}
}
finally {
unmapBuffer(fileBuffer)
}
return true
}
private suspend fun compressSmallFile(file: Path, fileSize: Long, bufferPool: DirectFixedSizeByteBufferPool, url: String) {
val targetBuffer = bufferPool.allocate()
try {
var readOffset = 0L
val sourceBuffer = bufferPool.allocate()
try {
FileChannel.open(file, READ_OPERATION).use { input ->
do {
readOffset += input.read(sourceBuffer, readOffset)
}
while (readOffset < fileSize)
}
sourceBuffer.flip()
private suspend fun compressAndUpload(
fileSize: Long,
fileBuffer: MappedByteBuffer,
sourceBlockSize: Int,
zstd: ZstdCompressCtx,
stream: Http2StreamChannel,
) {
var position = 0
while (true) {
val chunkSize = min(fileSize - position, sourceBlockSize.toLong()).toInt()
val targetSize = Zstd.compressBound(chunkSize.toLong()).toInt()
val targetNettyBuffer = stream.alloc().directBuffer(targetSize)
val targetBuffer = targetNettyBuffer.nioBuffer(0, targetSize)
val compressedSize = zstd.compressDirectByteBuffer(
targetBuffer, // compress into targetBuffer
targetBuffer.position(), // write compressed data starting at offset position()
targetSize, // write no more than target block size bytes
fileBuffer, // read data to compress from fileBuffer
position, // start reading at position()
chunkSize, // read chunk size bytes
)
assert(compressedSize > 0)
targetNettyBuffer.writerIndex(targetNettyBuffer.writerIndex() + compressedSize)
assert(targetNettyBuffer.readableBytes() == compressedSize)
Zstd.compress(targetBuffer, sourceBuffer, ZSTD_LEVEL, false)
targetBuffer.flip()
}
finally {
bufferPool.release(sourceBuffer)
}
position += chunkSize
val compressedSize = targetBuffer.remaining()
val request = Request.Builder()
.url(url)
.put(object : RequestBody() {
override fun contentLength() = compressedSize.toLong()
override fun contentType() = MEDIA_TYPE_BINARY
override fun writeTo(sink: BufferedSink) {
targetBuffer.mark()
sink.write(targetBuffer)
targetBuffer.reset()
}
})
.build()
httpClient.newCall(request).executeAsync().useSuccessful { }
}
finally {
bufferPool.release(targetBuffer)
}
}
private fun compressFile(file: Path, output: BufferedSink, bufferPool: DirectFixedSizeByteBufferPool) {
val targetBuffer = bufferPool.allocate()
CompilationCacheZstdCompressingStream(targetBuffer = targetBuffer, output = output, bufferPool = bufferPool).use { compressor ->
val sourceBuffer = bufferPool.allocate()
try {
var offset = 0L
FileChannel.open(file, READ_OPERATION).use { input ->
val fileSize = input.size()
while (offset < fileSize) {
val actualBlockSize = (fileSize - offset).toInt()
if (sourceBuffer.remaining() > actualBlockSize) {
sourceBuffer.limit(sourceBuffer.position() + actualBlockSize)
}
var readOffset = offset
do {
readOffset += input.read(sourceBuffer, readOffset)
}
while (sourceBuffer.hasRemaining())
sourceBuffer.flip()
compressor.compress(sourceBuffer)
sourceBuffer.clear()
offset = readOffset
}
}
}
finally {
bufferPool.release(sourceBuffer)
}
}
}
private class CompilationCacheZstdCompressingStream(
private val targetBuffer: ByteBuffer,
private val output: BufferedSink,
private val bufferPool: DirectFixedSizeByteBufferPool,
) : ZstdDirectBufferCompressingStreamNoFinalizer(targetBuffer, ZSTD_LEVEL) {
override fun flushBuffer(toFlush: ByteBuffer): ByteBuffer {
toFlush.flip()
while (toFlush.hasRemaining()) {
output.write(toFlush)
}
toFlush.clear()
return toFlush
}
override fun close() {
try {
super.close()
}
finally {
bufferPool.release(targetBuffer)
val endStream = position >= fileSize
stream.writeData(targetNettyBuffer, endStream)
if (endStream) {
break
}
}
}
@Serializable
private data class CheckFilesResponse(
@JvmField val found: List<String> = emptyList(),
@JvmField val missing: List<String> = emptyList(),
)
@JvmField val found: HashSet<String> = HashSet(),
)

View File

@@ -4,10 +4,8 @@
package org.jetbrains.intellij.build.impl.sbom
import com.intellij.openapi.util.SystemInfoRt
import com.intellij.platform.util.coroutines.forEachConcurrent
import com.intellij.util.io.DigestUtil
import com.intellij.util.io.DigestUtil.sha1Hex
import com.intellij.util.io.DigestUtil.updateContentHash
import com.intellij.util.io.bytesToHex
import com.intellij.util.io.sha256Hex
import com.jetbrains.plugin.structure.base.utils.exists

View File

@@ -1,19 +1,35 @@
daemon off;
worker_processes auto;
events {
worker_connections 1024;
}
http {
log_format main '[$time_local] "$request" $status $body_bytes_sent ';
access_log /dev/stdout main;
error_log /dev/stderr;
gzip on;
gzip_types text/plain text/css application/x-javascript text/xml application/xml application/xml+rss text/javascript application/json application/javascript text/x-js;
server {
listen 1900;
listen 127.0.0.1:1900 ssl;
http2 on;
server_name 127.0.0.1;
ssl_certificate ./server.crt;
ssl_certificate_key ./server.key;
location / {
root /tmp/webdav;
client_max_body_size 0;
create_full_put_path on;
dav_methods PUT DELETE MKCOL COPY MOVE;
dav_methods PUT;
autoindex on;
}
location /check-files {
proxy_http_version 1.1;
proxy_set_header Connection "";
proxy_pass http://127.0.0.1:8082/;
}
}
}

View File

@@ -0,0 +1,21 @@
-----BEGIN CERTIFICATE-----
MIIDhjCCAm6gAwIBAgIJAJN8H+8liHX0MA0GCSqGSIb3DQEBCwUAMHgxCzAJBgNV
BAYTAlhYMQwwCgYDVQQIDANOL0ExDDAKBgNVBAcMA04vQTEgMB4GA1UECgwXU2Vs
Zi1zaWduZWQgY2VydGlmaWNhdGUxKzApBgNVBAMMIjEyNy4wLjAuMTogU2VsZi1z
aWduZWQgY2VydGlmaWNhdGUwHhcNMjQwODI2MTEyMzEzWhcNMjYwODI2MTEyMzEz
WjB4MQswCQYDVQQGEwJYWDEMMAoGA1UECAwDTi9BMQwwCgYDVQQHDANOL0ExIDAe
BgNVBAoMF1NlbGYtc2lnbmVkIGNlcnRpZmljYXRlMSswKQYDVQQDDCIxMjcuMC4w
LjE6IFNlbGYtc2lnbmVkIGNlcnRpZmljYXRlMIIBIjANBgkqhkiG9w0BAQEFAAOC
AQ8AMIIBCgKCAQEAvRyBeGto8nsEMkr14NK+g9hIyFdiA+j+m1fQ4D/y+zI+N+7Z
2NgVTK5TJlxoZkmjbfqZ4Dmv9nqL/OQ8LF3Jw+gkInEhX6lb/NX+IbywOSzMpkuu
pIzGN0UWhhfb3oF3YfD1HTSrpEcpOP8VeUGRt6XNNK2XgcrFAuQVoSNQktdCOJuF
VC20dYBT2ngk02uW+6Vs0/q4rMpMGalRTOel1aOLJrlCocITe/iUUOwoTugcCaro
T+7hKofSrNLI+vwN9s8H/YGj2QyssfhBqHOsah080Be5jkRP7TolrIlIY72XDe9b
VA/vfHoJ1ry6n4aHNouHU1BC8Kyy416/wPCZJwIDAQABoxMwETAPBgNVHREECDAG
hwR/AAABMA0GCSqGSIb3DQEBCwUAA4IBAQBZ+EbXZyIvUGhSdxqbRpUSUX+gVu4h
QlgVNlk0w7s+SnSzru3MLXYV0KW/nbLmW1EUe2zaAbWHPS85VvsZszYlBtOzqUh3
6uyEXmGr9bq4laMo7mK6pDGxsI72Xl4X3Vm/gUw547z5+gh4/ggzer+lQrOPgLRJ
vhXwCSD26wB2qx4pgQAV0oGtC8vcm7hSkP2FJH1HveOvivdwDQfeo5+pc8YJvsUG
pZK/2QqZY2QkQ4iv1rreaxWv3p7W2EVkioyjxbgCZBglxTC8RwPwH4Kxhs+8tXzi
E2AUE5ryQCwGbQw6dElJvIFY9TpgPPEeCu8l/JzJxxkkR1TNK58rH8rN
-----END CERTIFICATE-----

View File

@@ -0,0 +1,29 @@
-----BEGIN PRIVATE KEY-----
MIIEvAIBADANBgkqhkiG9w0BAQEFAASCBKYwggSiAgEAAoIBAQC9HIF4a2jyewQy
SvXg0r6D2EjIV2ID6P6bV9DgP/L7Mj437tnY2BVMrlMmXGhmSaNt+pngOa/2eov8
5DwsXcnD6CQicSFfqVv81f4hvLA5LMymS66kjMY3RRaGF9vegXdh8PUdNKukRyk4
/xV5QZG3pc00rZeBysUC5BWhI1CS10I4m4VULbR1gFPaeCTTa5b7pWzT+risykwZ
qVFM56XVo4smuUKhwhN7+JRQ7ChO6BwJquhP7uEqh9Ks0sj6/A32zwf9gaPZDKyx
+EGoc6xqHTzQF7mORE/tOiWsiUhjvZcN71tUD+98egnWvLqfhoc2i4dTUELwrLLj
Xr/A8JknAgMBAAECggEALSaEJtsGKHaEbvmEsNPAFrxpzCNIzIQxXadewFukSKMb
RcFqE6Krmy43vf3sExfbxCND38wGHgPuLkfTsggGZxaiofJ+tFc8FiaFUUq6jDwM
9Fs3bCQMIyAEm6lQnlQsy5569ykfHc67odcNKnEkOEOGteAIPz3JQcJxA5Lp5tTC
HmyZOcrbFaDSpn1h/HXsq3OIP98TCT2KYOhFuWy0JQTr9uvZDuq8lqoZ53fMNjj/
0RwesJdHpeRb7GnWW66GQrOmMCFZkKqWOLUK8tcwCYfo9w5GV9BC8sSUx3RF4f1H
6oO8v3caScOChrznnx7b5KnRkS7O0UUxuTt1+h1HOQKBgQD5fBDSfm+YvMJyAP0Z
bmIdqqchMyW6xlF1tfhpfLkgFj1ufpIE/G8q+ZUVfah+TMhPUAyAc3AznYHCiHVe
suu1QJ7tIuBkRfWZgdLo8XdREhpKnm8xAUXOs43t6/CxmPTao+ylN0WGmUHVgQae
OHcpftNN3zLio0rNm1xOlmdDSwKBgQDCDNA0pxzOvELHHFCMG+N7Sjdy/HK2h1LW
UQ/FvP/qx/iPJ0fyW0LZkLF2Y9nc7FxVqiA/5W2sy6MQ3ZuOzpobVobeb8SC3fBm
Q6TSCIKd69E7vLCb9BMfnYPYqW2WP3fEYp8A3MXMyjDJxZSharNeP7/YSSk85VfE
ruq/ege8FQKBgBnslGrrDHmYk7P4+lPcLoHaq8c9Y1xHI0vR/uAnP61f4j5LFK1D
9eFHUgCLsCh/ngjvznzCghQ697LZLykJ+og5EMqfZyXER0MORHZEMRvRf73lPLSg
5zoVWlgwvjAWLstRYVPBrI3R+w9OevuR7n/3V8mtucHnKey3ih34bv6FAoGABtxH
HCVwWkrDnaB9pIZz4277SOBt+dAM+LDC+v20motZWU5NN99MHL8F1yaulCXzGcA7
BadJ2lsUt8rt7f2V6zOC7yhKbUoFbsgjcp2EaKrmqdMA93KInoyGFcnfqvkxdcr6
ziAACj53vRp0J8TK9KESWkYz5AhDsxtwBzb8QQUCgYAawMvTQRDPGwMLtoSBEyDt
j7ytvxzWPdMhf9/d+xT6wtyfVHnc1CSWermfmN3BXTHTBskTeAXVA8l2sjM5lMOG
obekKJy31dNz+4ysu1ZiwspxS22iNLgii6L2Qy7u6I16Kb2ZHhmVMwSVcTABzNxS
//ymT1boXSvNKgTWwe30qg==
-----END PRIVATE KEY-----

View File

@@ -1,10 +1,11 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package org.jetbrains.intellij.build
import com.intellij.testFramework.utils.io.deleteRecursively
import com.intellij.util.SystemProperties
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import org.jetbrains.intellij.build.impl.compilation.fetchAndUnpackCompiledClasses
import org.jetbrains.intellij.build.io.deleteDir
import org.jetbrains.intellij.build.telemetry.TraceManager
import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.Assumptions.assumeTrue
@@ -25,7 +26,7 @@ class CompilationCacheTest {
@Test
fun testUnpack() = runBlocking(Dispatchers.Default) {
val metadataFile = Path.of("/Volumes/data/Documents/idea/out/compilation-archive/metadata.json")
val metadataFile = Path.of(SystemProperties.getUserHome(), "projects/idea/out/compilation-archive/metadata.json")
assumeTrue(Files.exists(metadataFile))
// do not use Junit TempDir - it is very slow
@@ -41,10 +42,7 @@ class CompilationCacheTest {
)
}
finally {
Files.list(outDir).parallel().use { stream ->
stream.forEach(::deleteDir)
}
Files.delete(outDir)
outDir.deleteRecursively()
}
}
}