use http2 client for JPS cache (part 3)

GitOrigin-RevId: b17ae4795ec9bdd088331d737fee90c06566e63f
This commit is contained in:
Vladimir Krivosheev
2024-09-06 08:41:47 +02:00
committed by intellij-monorepo-bot
parent 29ef8fa3e3
commit f220d6d1b5
22 changed files with 418 additions and 366 deletions

View File

@@ -24,18 +24,23 @@ import java.util.concurrent.atomic.AtomicReference
import kotlin.time.Duration.Companion.seconds
// don't use JaegerJsonSpanExporter - not needed for clients, should be enabled only if needed to avoid writing a ~500KB JSON file
fun withTracer(block: suspend () -> Unit): Unit = runBlocking(Dispatchers.Default) {
fun withTracer(serviceName: String, traceFile: Path? = null, block: suspend () -> Unit): Unit = runBlocking(Dispatchers.Default) {
val batchSpanProcessorScope = CoroutineScope(SupervisorJob(parent = coroutineContext.job)) + CoroutineName("BatchSpanProcessor")
@Suppress("ReplaceJavaStaticMethodWithKotlinAnalog")
val spanProcessor = BatchSpanProcessor(
coroutineScope = batchSpanProcessorScope,
spanExporters = java.util.List.of(ConsoleSpanExporter()),
spanExporters = if (traceFile == null) {
java.util.List.of(ConsoleSpanExporter())
}
else {
java.util.List.of(ConsoleSpanExporter(), JaegerJsonSpanExporter(file = traceFile, serviceName = serviceName))
},
scheduleDelay = 10.seconds,
)
try {
val tracerProvider = SdkTracerProvider.builder()
.addSpanProcessor(spanProcessor)
.setResource(Resource.create(Attributes.of(AttributeKey.stringKey("service.name"), "builder")))
.setResource(Resource.create(Attributes.of(AttributeKey.stringKey("service.name"), serviceName)))
.build()
traceManagerInitializer = {

View File

@@ -21,7 +21,7 @@ fun buildDevMain(): Collection<Path> {
var homePath: String? = null
var newClassPath: Collection<Path>? = null
withTracer {
withTracer(serviceName = "builder") {
buildProductInProcess(
BuildRequest(
platformPrefix = System.getProperty("idea.platform.prefix", "idea"),

View File

@@ -24,10 +24,16 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Runnable
import kotlinx.coroutines.supervisorScope
import java.net.InetSocketAddress
import java.net.URI
import kotlin.coroutines.CoroutineContext
private fun nioIoHandlerAndSocketChannel() = NioIoHandler.newFactory() to NioSocketChannel::class.java
private val commonHeaders = arrayOf(
HttpHeaderNames.USER_AGENT, AsciiString.of("IJ Builder"),
AsciiString.of("x-tc-build-id"), AsciiString.of(System.getProperty("teamcity.buildType.id", ""))
)
internal class Http2ClientConnectionFactory(
private val bootstrapTemplate: Bootstrap,
private val sslContext: SslContext?,
@@ -38,8 +44,12 @@ internal class Http2ClientConnectionFactory(
return connect(InetSocketAddress.createUnresolved(host, port.let { if (it == -1) 443 else it }), authHeader)
}
fun connect(address: URI, authHeader: CharSequence? = null): Http2ClientConnection {
return connect(host = address.host, port = address.port, authHeader = authHeader)
}
fun connect(server: InetSocketAddress, auth: CharSequence? = null): Http2ClientConnection {
var commonHeaders = arrayOf(HttpHeaderNames.USER_AGENT, AsciiString.of("IJ Builder"))
var commonHeaders = commonHeaders
if (auth != null) {
commonHeaders += arrayOf(HttpHeaderNames.AUTHORIZATION, AsciiString.of(auth))
}

View File

@@ -6,10 +6,7 @@ package org.jetbrains.intellij.build.http2Client
import com.intellij.platform.util.coroutines.childScope
import io.netty.bootstrap.Bootstrap
import io.netty.buffer.ByteBuf
import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInitializer
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.channel.*
import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpStatusClass
import io.netty.handler.codec.http2.*
@@ -31,7 +28,8 @@ import kotlin.random.asJavaRandom
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
internal class UnexpectedHttpStatus(@JvmField val status: HttpResponseStatus) : RuntimeException("Unexpected HTTP response status: $status")
internal class UnexpectedHttpStatus(urlPath: CharSequence?, @JvmField val status: HttpResponseStatus)
: RuntimeException("Unexpected HTTP response status: $status" + (if (urlPath == null) "" else " (urlPath=$urlPath)"))
private class ConnectionState(
@JvmField val bootstrap: Http2StreamChannelBootstrap,
@@ -41,9 +39,13 @@ private class ConnectionState(
private const val MAX_ATTEMPTS = 2
private val backOffLimitMs = 500.milliseconds.inWholeMilliseconds
private const val backOffFactor = 2L
private const val backOffJitter = 0.1
// https://cabulous.medium.com/http-2-and-how-it-works-9f645458e4b2
// https://stackoverflow.com/questions/55087292/how-to-handle-http-2-goaway-with-java-net-httpclient
// Server can send GOAWAY frame after X streams, that's why we need manager for channel - open a new one in case of such error
// Server can send GOAWAY frame after X streams, that's why we need manager for channel - open a new one in case of such an error
internal class Http2ConnectionProvider(
private val server: InetSocketAddress,
private val bootstrapTemplate: Bootstrap,
@@ -127,10 +129,7 @@ internal class Http2ConnectionProvider(
suspend fun <T> stream(block: suspend (streamChannel: Http2StreamChannel, result: CompletableDeferred<T>) -> Unit): T {
var attemptIndex = 0
var effectiveDelay = 1.seconds.inWholeMilliseconds
val backOffLimitMs = 500.milliseconds.inWholeMilliseconds
val backOffFactor = 2L
val backOffJitter = 0.1
var effectiveDelay = 5.seconds.inWholeMilliseconds
var suppressedExceptions: MutableList<Throwable>? = null
while (true) {
var currentConnection: ConnectionState? = null
@@ -138,13 +137,7 @@ internal class Http2ConnectionProvider(
currentConnection = getConnection()
// use a single-thread executor as Netty does for handlers for thread safety and fewer context switches
val eventLoop = bootstrapTemplate.config().group().next()
return withContext(currentConnection.coroutineScope.coroutineContext + object : CoroutineDispatcher() {
override fun dispatch(context: CoroutineContext, block: Runnable) {
eventLoop.execute(block)
}
override fun isDispatchNeeded(context: CoroutineContext) = !eventLoop.inEventLoop()
}) {
return withContext(currentConnection.coroutineScope.coroutineContext + EventLoopCoroutineDispatcher(eventLoop)) {
openStreamAndConsume(connectionState = currentConnection, block = block)
}
}
@@ -156,9 +149,12 @@ internal class Http2ConnectionProvider(
// task is canceled (due to GoAway or other such reasons), but not parent context - retry (without incrementing attemptIndex)
continue
}
else {
throw e
}
}
catch (e: UnexpectedHttpStatus) {
// retry only for sever errors
// retry only for server errors
if (e.status.codeClass() != HttpStatusClass.SERVER_ERROR) {
throw e
}
@@ -170,17 +166,15 @@ internal class Http2ConnectionProvider(
e.addSuppressed(suppressedException)
}
}
throw e
throw RuntimeException("${attemptIndex + 1} attempts failed", e)
}
if (suppressedExceptions == null) {
suppressedExceptions = ArrayList()
}
suppressedExceptions.add(e)
if (attemptIndex != 0) {
delay(effectiveDelay)
}
Span.current().recordException(e, Attributes.of(AttributeKey.longKey("attemptIndex"), attemptIndex.toLong(), AttributeKey.longKey("delay"), effectiveDelay))
delay(effectiveDelay)
effectiveDelay = min(effectiveDelay * backOffFactor, backOffLimitMs) + (Random.asJavaRandom().nextGaussian() * effectiveDelay * backOffJitter).toLong()
}
@@ -260,4 +254,12 @@ private class Http2ClientFrameInitializer(
}
}))
}
}
}
private class EventLoopCoroutineDispatcher(private val eventLoop: EventLoop) : CoroutineDispatcher() {
override fun dispatch(context: CoroutineContext, block: Runnable) {
eventLoop.execute(block)
}
override fun isDispatchNeeded(context: CoroutineContext) = !eventLoop.inEventLoop()
}

View File

@@ -27,7 +27,7 @@ private val json = Json {
internal suspend fun Http2ClientConnection.getString(path: CharSequence): String {
return connection.stream { stream, result ->
stream.pipeline().addLast(Http2StreamJsonInboundHandler(result = result, bufferAllocator = stream.alloc(), deserializer = null))
stream.pipeline().addLast(Http2StreamJsonInboundHandler(urlPath = path, result = result, bufferAllocator = stream.alloc(), deserializer = null))
stream.writeHeaders(
headers = createHeaders(
@@ -58,6 +58,7 @@ private suspend fun <T : Any> Http2ClientConnection.doGetJsonOrDefaultIfNotFound
): T {
return connection.stream { stream, result ->
stream.pipeline().addLast(Http2StreamJsonInboundHandler(
urlPath = path,
result = result,
bufferAllocator = stream.alloc(),
deserializer = deserializer,
@@ -78,7 +79,7 @@ private suspend fun <T : Any> Http2ClientConnection.doGetJsonOrDefaultIfNotFound
private suspend fun <T : Any> Http2ClientConnection.post(path: AsciiString, data: CharSequence, contentType: AsciiString, deserializer: DeserializationStrategy<T>): T {
return connection.stream { stream, result ->
val bufferAllocator = stream.alloc()
stream.pipeline().addLast(Http2StreamJsonInboundHandler(result = result, bufferAllocator = bufferAllocator, deserializer = deserializer))
stream.pipeline().addLast(Http2StreamJsonInboundHandler(urlPath = path, result = result, bufferAllocator = bufferAllocator, deserializer = deserializer))
stream.writeHeaders(
headers = createHeaders(
@@ -94,6 +95,7 @@ private suspend fun <T : Any> Http2ClientConnection.post(path: AsciiString, data
}
private class Http2StreamJsonInboundHandler<T : Any>(
private val urlPath: CharSequence,
private val bufferAllocator: ByteBufAllocator,
private val result: CompletableDeferred<T>,
private val deserializer: DeserializationStrategy<T>?,
@@ -109,6 +111,8 @@ private class Http2StreamJsonInboundHandler<T : Any>(
compositeBuffer
}
override fun acceptInboundMessage(message: Any) = message is Http2DataFrame || message is Http2HeadersFrame
override fun channelRead0(context: ChannelHandlerContext, frame: Http2StreamFrame) {
if (frame is Http2DataFrame) {
val frameContent = frame.content()
@@ -154,7 +158,7 @@ private class Http2StreamJsonInboundHandler<T : Any>(
result.complete(defaultIfNotFound)
}
else {
result.completeExceptionally(UnexpectedHttpStatus(status))
result.completeExceptionally(UnexpectedHttpStatus(urlPath = urlPath, status))
}
return
}

View File

@@ -90,7 +90,7 @@ private class ZstdDecompressingFileDownloadHandler(
if (frame is Http2HeadersFrame) {
val status = HttpResponseStatus.parseLine(frame.headers().status())
if (status != HttpResponseStatus.OK) {
result.completeExceptionally(UnexpectedHttpStatus(status))
result.completeExceptionally(UnexpectedHttpStatus(null, status))
}
}
else if (frame is Http2DataFrame) {
@@ -160,7 +160,7 @@ private class FileDownloadHandler(
result.complete(-1)
}
else {
result.completeExceptionally(UnexpectedHttpStatus(status))
result.completeExceptionally(UnexpectedHttpStatus(null, status))
}
}
}

View File

@@ -115,7 +115,6 @@ internal abstract class InboundHandlerResultTracker<T : Any>(
) : SimpleChannelInboundHandler<T>() {
override fun exceptionCaught(context: ChannelHandlerContext, cause: Throwable) {
result.completeExceptionally(cause)
context.close()
}
override fun channelInactive(context: ChannelHandlerContext) {

View File

@@ -1,10 +1,8 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package org.jetbrains.intellij.build.impl.compilation
package org.jetbrains.intellij.build.http2Client
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.common.Attributes
import org.jetbrains.intellij.build.http2Client.Http2ClientConnection
import org.jetbrains.intellij.build.http2Client.Http2ClientConnectionFactory
import org.jetbrains.intellij.build.telemetry.TraceManager.spanBuilder
import org.jetbrains.intellij.build.telemetry.use
import java.net.URI
@@ -15,11 +13,11 @@ internal suspend fun <T> checkMirrorAndConnect(
authHeader: CharSequence? = null,
block: suspend (connection: Http2ClientConnection, urlPathPrefix: String) -> T,
): T {
var urlPath = initialServerUri.path
// first let's check for initial redirect (mirror selection)
var connection = client.connect(host = initialServerUri.host, port = initialServerUri.port, authHeader = authHeader)
try {
spanBuilder("mirror selection").use { span ->
return spanBuilder("mirror selection").setAttribute("initialServerUri", initialServerUri.toString()).use { span ->
// first let's check for initial redirect (mirror selection)
var connection = client.connect(address = initialServerUri, authHeader = authHeader)
try {
var urlPath = initialServerUri.path
val newLocation = connection.getRedirectLocation("$urlPath/")?.toString()
if (newLocation == null) {
span.addEvent("origin server will be used", Attributes.of(AttributeKey.stringKey("url"), initialServerUri.toString()))
@@ -30,12 +28,12 @@ internal suspend fun <T> checkMirrorAndConnect(
val newServerUri = URI(newLocation)
urlPath = newServerUri.path.trimEnd('/')
span.addEvent("redirected to mirror", Attributes.of(AttributeKey.stringKey("url"), newLocation))
connection = client.connect(host = newServerUri.host, port = newServerUri.port, authHeader = authHeader)
connection = client.connect(address = newServerUri, authHeader = authHeader)
}
block(connection, urlPath)
}
finally {
connection.close()
}
return block(connection, urlPath)
}
finally {
connection.close()
}
}

View File

@@ -5,6 +5,7 @@ package org.jetbrains.intellij.build.http2Client
import com.github.luben.zstd.Zstd
import com.github.luben.zstd.ZstdCompressCtx
import io.netty.buffer.ByteBufUtil
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpHeaderValues
@@ -31,6 +32,17 @@ internal suspend fun Http2ClientConnection.upload(path: CharSequence, file: Path
return upload(path = path, file = file, sourceBlockSize = 1024 * 1024, zstdCompressContextPool = null)
}
internal suspend fun Http2ClientConnection.upload(path: CharSequence, data: CharSequence) {
return connection.stream { stream, result ->
val handler = WebDavPutStatusChecker(result)
handler.uploadedResult = Unit
stream.pipeline().addLast(handler)
stream.writeHeaders(createHeaders(HttpMethod.PUT, AsciiString.of(path), HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_OCTET_STREAM), endStream = false)
stream.writeData(ByteBufUtil.writeUtf8(stream.alloc(), data), endStream = true)
}
}
internal suspend fun Http2ClientConnection.upload(
path: CharSequence,
file: Path,
@@ -80,9 +92,9 @@ internal suspend fun Http2ClientConnection.upload(
}
}
private class WebDavPutStatusChecker(private val result: CompletableDeferred<UploadResult>) : InboundHandlerResultTracker<Http2HeadersFrame>(result) {
private class WebDavPutStatusChecker<T>(private val result: CompletableDeferred<T>) : InboundHandlerResultTracker<Http2HeadersFrame>(result) {
@JvmField
var uploadedResult: UploadResult? = null
var uploadedResult: T? = null
override fun channelRead0(context: ChannelHandlerContext, frame: Http2HeadersFrame) {
if (!frame.isEndStream) {
@@ -95,7 +107,7 @@ private class WebDavPutStatusChecker(private val result: CompletableDeferred<Upl
result.complete(uploadedResult!!)
}
else {
result.completeExceptionally(UnexpectedHttpStatus(status))
result.completeExceptionally(UnexpectedHttpStatus(null, status))
}
}
}

View File

@@ -41,7 +41,6 @@ import org.jetbrains.intellij.build.moduleBased.OriginalModuleRepository
import org.jetbrains.intellij.build.telemetry.ConsoleSpanExporter
import org.jetbrains.intellij.build.telemetry.JaegerJsonSpanExporterManager
import org.jetbrains.intellij.build.telemetry.TraceManager.spanBuilder
import org.jetbrains.intellij.build.telemetry.block
import org.jetbrains.intellij.build.telemetry.use
import org.jetbrains.jps.model.*
import org.jetbrains.jps.model.artifact.JpsArtifactService
@@ -296,7 +295,7 @@ class CompilationContextImpl private constructor(
}
override suspend fun compileModules(moduleNames: Collection<String>?, includingTestsInModules: List<String>?) {
spanBuilder("resolve dependencies and compile modules").block { span ->
spanBuilder("resolve dependencies and compile modules").use { span ->
compileMutex.withReentrantLock {
resolveProjectDependencies(this@CompilationContextImpl)
reuseOrCompile(context = this@CompilationContextImpl, moduleNames = moduleNames, includingTestsInModules = includingTestsInModules, span = span)

View File

@@ -1,34 +0,0 @@
// Copyright 2000-2023 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
@file:Suppress("ReplaceGetOrSet")
package org.jetbrains.intellij.build.impl
// https://rosettacode.org/wiki/Find_common_directory_path#Java
internal fun getCommonPath(paths: List<String>): String {
var commonPath = ""
val folders = Array(paths.size) { paths[it].split('/') }
for (j in folders[0].indices) {
// grab the next folder name in the first path
val thisFolder = folders[0][j]
// assume all have matched in case there are no more paths
var allMatched = true
var i = 1
while (i < folders.size && allMatched) {
// look at the other paths
if (folders[i].size < j) { // if there is no folder here
allMatched = false // no match
break // stop looking because we've gone as far as we can
}
//otherwise
allMatched = folders[i][j] == thisFolder //check if it matched
i++
}
if (!allMatched) {
break
}
commonPath += "$thisFolder/"
}
return commonPath
}

View File

@@ -15,6 +15,9 @@ import org.jetbrains.intellij.build.CompilationContext
import org.jetbrains.intellij.build.impl.JpsCompilationRunner
import org.jetbrains.intellij.build.impl.cleanOutput
import org.jetbrains.intellij.build.impl.generateRuntimeModuleRepository
import org.jetbrains.intellij.build.jpsCache.isForceDownloadJpsCache
import org.jetbrains.intellij.build.jpsCache.isPortableCompilationCacheEnabled
import org.jetbrains.intellij.build.jpsCache.jpsCacheRemoteGitUrl
import org.jetbrains.intellij.build.telemetry.TraceManager.spanBuilder
import org.jetbrains.intellij.build.telemetry.block
import org.jetbrains.intellij.build.telemetry.use
@@ -35,11 +38,11 @@ internal fun checkCompilationOptions(context: CompilationContext) {
messages.error(message)
}
}
if (options.pathToCompiledClassesArchive != null && IS_PORTABLE_COMPILATION_CACHE_ENABLED) {
if (options.pathToCompiledClassesArchive != null && isPortableCompilationCacheEnabled) {
messages.error("JPS Cache is enabled so '${BuildOptions.INTELLIJ_BUILD_COMPILER_CLASSES_ARCHIVE}' cannot be used")
}
val pathToCompiledClassArchiveMetadata = options.pathToCompiledClassesArchivesMetadata
if (pathToCompiledClassArchiveMetadata != null && IS_PORTABLE_COMPILATION_CACHE_ENABLED) {
if (pathToCompiledClassArchiveMetadata != null && isPortableCompilationCacheEnabled) {
messages.error("JPS Cache is enabled " +
"so '${BuildOptions.INTELLIJ_BUILD_COMPILER_CLASSES_ARCHIVES_METADATA}' cannot be used to fetch compile output")
}
@@ -108,7 +111,7 @@ internal fun isCompilationRequired(options: BuildOptions): Boolean {
internal fun keepCompilationState(options: BuildOptions): Boolean {
return !options.forceRebuild &&
(IS_PORTABLE_COMPILATION_CACHE_ENABLED ||
(isPortableCompilationCacheEnabled ||
options.useCompiledClassesFromProjectOutput ||
options.pathToCompiledClassesArchive == null ||
options.pathToCompiledClassesArchivesMetadata != null ||
@@ -141,9 +144,13 @@ internal suspend fun reuseOrCompile(context: CompilationContext, moduleNames: Co
)
}
}
IS_PORTABLE_COMPILATION_CACHE_ENABLED -> {
isPortableCompilationCacheEnabled -> {
span.addEvent("JPS remote cache will be used for compilation")
downloadJpsCacheAndCompileProject(context)
downloadCacheAndCompileProject(
forceDownload = isForceDownloadJpsCache,
gitUrl = jpsCacheRemoteGitUrl,
context = context,
)
}
else -> {
block("compile modules") {

View File

@@ -1,103 +1,23 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package org.jetbrains.intellij.build.impl.compilation
import io.netty.util.AsciiString
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.trace.Span
import org.jetbrains.intellij.build.BuildPaths.Companion.ULTIMATE_HOME
import org.jetbrains.intellij.build.CompilationContext
import org.jetbrains.intellij.build.impl.cleanOutput
import org.jetbrains.intellij.build.impl.compilation.cache.CommitsHistory
import org.jetbrains.intellij.build.jpsCache.getJpsCacheUrl
import org.jetbrains.intellij.build.jpsCache.isPortableCompilationCacheEnabled
import org.jetbrains.intellij.build.jpsCache.jpsCacheAuthHeader
import org.jetbrains.intellij.build.telemetry.TraceManager.spanBuilder
import org.jetbrains.intellij.build.telemetry.use
import org.jetbrains.intellij.build.telemetry.withTracer
import org.jetbrains.jps.incremental.storage.ProjectStamps
import java.net.URI
import java.nio.file.Path
import java.util.*
import java.util.concurrent.CancellationException
import kotlin.io.path.ExperimentalPathApi
import kotlin.io.path.deleteRecursively
internal val IS_PORTABLE_COMPILATION_CACHE_ENABLED: Boolean
get() = ProjectStamps.PORTABLE_CACHES && IS_JPS_CACHE_URL_CONFIGURED
private var isAlreadyUpdated = false
internal object TestJpsCompilationCacheDownload {
@ExperimentalPathApi
@JvmStatic
fun main(args: Array<String>) = withTracer {
System.setProperty("jps.cache.test", "true")
System.setProperty("org.jetbrains.jps.portable.caches", "true")
val projectHome = ULTIMATE_HOME
val outputDir = projectHome.resolve("out/test-jps-cache-downloaded")
outputDir.deleteRecursively()
downloadJpsCache(
cacheUrl = URI(System.getProperty(URL_PROPERTY, "https://127.0.0.1:1900/cache/jps")),
gitUrl = computeRemoteGitUrl(),
authHeader = getAuthHeader(),
projectHome = projectHome,
classOutDir = outputDir.resolve("classes"),
cacheDestination = outputDir.resolve("jps-build-data"),
reportStatisticValue = { k, v ->
println("$k: $v")
}
)
}
}
internal suspend fun downloadJpsCacheAndCompileProject(context: CompilationContext) {
downloadCacheAndCompileProject(
forceDownload = System.getProperty(FORCE_DOWNLOAD_PROPERTY).toBoolean(),
gitUrl = computeRemoteGitUrl(),
context = context,
)
}
/**
* Upload local [PortableCompilationCache].
*/
suspend fun uploadPortableCompilationCache(context: CompilationContext) {
uploadJpsCache(
forcedUpload = context.options.forceRebuild,
commitHash = getCommitCache(),
s3Dir = getS3Dir(),
authHeader = getAuthHeader(),
uploadUrl = getJpsCacheUploadUrl(),
context = context,
)
}
/**
* Publish already uploaded [PortableCompilationCache].
*/
suspend fun publishPortableCompilationCache(context: CompilationContext, overrideCommits: Set<String>? = null) {
updateJpsCacheCommitHistory(
overrideCommits = overrideCommits,
remoteGitUrl = computeRemoteGitUrl(),
commitHash = getCommitCache(),
uploadUrl = getJpsCacheUploadUrl(),
authHeader = getAuthHeader(),
context = context,
s3Dir = getS3Dir(),
)
}
private fun getCommitCache() = require(COMMIT_HASH_PROPERTY, "Repository commit")
private fun getS3Dir(): Path? {
return System.getProperty(AWS_SYNC_FOLDER_PROPERTY)?.let<String, Path> { Path.of(it) }
}
/**
* Publish already uploaded [PortableCompilationCache] overriding existing [CommitsHistory].
* Used in force rebuild and cleanup.
*/
suspend fun publishUploadedJpsCacheWithCommitHistoryOverride(forceRebuiltCommits: Set<String>, context: CompilationContext) {
publishPortableCompilationCache(context = context, overrideCommits = forceRebuiltCommits)
}
/**
* Download the latest available [PortableCompilationCache],
* [resolveProjectDependencies]
@@ -110,19 +30,19 @@ suspend fun publishUploadedJpsCacheWithCommitHistoryOverride(forceRebuiltCommits
* For more details see [org.jetbrains.jps.backwardRefs.JavaBackwardReferenceIndexWriter.initialize]
*/
@Suppress("KDocUnresolvedReference")
private suspend fun downloadCacheAndCompileProject(forceDownload: Boolean, gitUrl: String, context: CompilationContext) {
internal suspend fun downloadCacheAndCompileProject(forceDownload: Boolean, gitUrl: String, context: CompilationContext) {
val forceRebuild = context.options.forceRebuild
spanBuilder("download JPS cache and compile")
.setAttribute("forceRebuild", forceRebuild)
.setAttribute("forceDownload", forceDownload)
.use { span ->
val cacheUrl = URI(require(URL_PROPERTY, "Remote Cache url"))
val cacheUrl = getJpsCacheUrl()
if (isAlreadyUpdated) {
span.addEvent("PortableCompilationCache is already updated")
span.addEvent("JPS Cache is already updated")
return@use
}
check(IS_PORTABLE_COMPILATION_CACHE_ENABLED) {
check(isPortableCompilationCacheEnabled) {
"JPS Caches are expected to be enabled"
}
@@ -220,12 +140,12 @@ private class PortableCompilationCache(forceDownload: Boolean) {
classOutDir: Path,
context: CompilationContext,
): Int {
return spanBuilder("download Portable Compilation Cache").use { span ->
return spanBuilder("download JPS Cache").use { span ->
try {
downloadJpsCache(
cacheUrl = cacheUrl,
gitUrl = gitUrl,
authHeader = getAuthHeader(),
authHeader = jpsCacheAuthHeader,
projectHome = projectHome,
classOutDir = classOutDir,
cacheDestination = context.compilationData.dataStorageRoot,
@@ -236,9 +156,7 @@ private class PortableCompilationCache(forceDownload: Boolean) {
throw e
}
catch (e: Exception) {
e.printStackTrace()
span.addEvent("Failed to download Compilation Cache. Re-trying without any caches.")
span.recordException(e)
span.recordException(e, Attributes.of(AttributeKey.stringKey("message"), "Failed to download JPS Cache. Re-trying without any caches."))
context.options.forceRebuild = true
forceDownload = false
context.options.incrementalCompilation = false
@@ -257,47 +175,6 @@ internal fun portableJpsCacheUsageStatus(availableCommitDepth: Int): String {
}
}
/**
* URL for read/write operations
*/
private const val UPLOAD_URL_PROPERTY = "intellij.jps.remote.cache.upload.url"
/**
* URL for read-only operations
*/
private const val URL_PROPERTY = "intellij.jps.remote.cache.url"
private val IS_JPS_CACHE_URL_CONFIGURED = !System.getProperty(URL_PROPERTY).isNullOrBlank()
/**
* IntelliJ repository git remote url
*/
private const val GIT_REPOSITORY_URL_PROPERTY = "intellij.remote.url"
/**
* Download [PortableCompilationCache] even if there are caches available locally
*/
private const val FORCE_DOWNLOAD_PROPERTY = "intellij.jps.cache.download.force"
/**
* Folder to store [PortableCompilationCache] for later upload to AWS S3 bucket.
* Upload performed in a separate process on CI.
*/
private const val AWS_SYNC_FOLDER_PROPERTY = "jps.caches.aws.sync.folder"
/**
* Commit hash for which [PortableCompilationCache] is to be built/downloaded
*/
private const val COMMIT_HASH_PROPERTY = "build.vcs.number"
private fun require(systemProperty: String, description: String): String {
val value = System.getProperty(systemProperty)
require(!value.isNullOrBlank()) {
"$description is not defined. Please set '$systemProperty' system property."
}
return value
}
/**
* Compiled bytecode of project module
*
@@ -307,22 +184,4 @@ internal class CompilationOutput(
@JvmField val remotePath: String,
// local path to compilation output
@JvmField val path: Path,
)
private fun getJpsCacheUploadUrl(): URI = URI(require(UPLOAD_URL_PROPERTY, "Remote Cache upload url"))
private fun getAuthHeader(): CharSequence? {
val username = System.getProperty("jps.auth.spaceUsername")
val password = System.getProperty("jps.auth.spacePassword")
return when {
password == null -> null
username == null -> AsciiString.of("Bearer $password")
else -> AsciiString.of("Basic " + Base64.getEncoder().encodeToString("$username:$password".toByteArray()))
}
}
private fun computeRemoteGitUrl(): String {
val remoteGitUrl = require(GIT_REPOSITORY_URL_PROPERTY, "Repository url")
Span.current().addEvent("Git remote url $remoteGitUrl")
return remoteGitUrl
}
)

View File

@@ -10,12 +10,13 @@ import io.opentelemetry.api.trace.Span
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import org.jetbrains.intellij.build.BuildPaths.Companion.ULTIMATE_HOME
import org.jetbrains.intellij.build.forEachConcurrent
import org.jetbrains.intellij.build.http2Client.*
import org.jetbrains.intellij.build.impl.compilation.cache.CommitsHistory
import org.jetbrains.intellij.build.impl.compilation.cache.getAllCompilationOutputs
import org.jetbrains.intellij.build.jpsCache.*
import org.jetbrains.intellij.build.telemetry.TraceManager.spanBuilder
import org.jetbrains.intellij.build.telemetry.use
import org.jetbrains.intellij.build.telemetry.withTracer
import org.jetbrains.jps.incremental.storage.BuildTargetSourcesState
import java.net.URI
import java.nio.file.Files
@@ -23,9 +24,35 @@ import java.nio.file.Path
import java.util.concurrent.CancellationException
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.LongAdder
import kotlin.io.path.ExperimentalPathApi
import kotlin.io.path.deleteRecursively
private const val COMMITS_COUNT = 1_000
internal object TestJpsCacheDownload {
@ExperimentalPathApi
@JvmStatic
fun main(args: Array<String>) = withTracer(serviceName = "test-jps-cache-downloader") {
System.setProperty("jps.cache.test", "true")
System.setProperty("org.jetbrains.jps.portable.caches", "true")
val projectHome = ULTIMATE_HOME
val outputDir = projectHome.resolve("out/test-jps-cache-downloaded")
outputDir.deleteRecursively()
downloadJpsCache(
cacheUrl = getJpsCacheUrl("https://127.0.0.1:1900/cache/jps"),
gitUrl = jpsCacheRemoteGitUrl,
authHeader = jpsCacheAuthHeader,
projectHome = projectHome,
classOutDir = outputDir.resolve("classes"),
cacheDestination = outputDir.resolve("jps-build-data"),
reportStatisticValue = { k, v ->
println("$k: $v")
}
)
}
}
internal suspend fun downloadJpsCache(
cacheUrl: URI,
authHeader: CharSequence?,
@@ -45,7 +72,7 @@ internal suspend fun downloadJpsCache(
prepareDownload(urlPathPrefix = urlPathPrefix, gitUrl = gitUrl, connection = connection, lastCommits = Git(projectHome).log(COMMITS_COUNT))
} ?: return@checkMirrorAndConnect -1
availableCommitDepth = info.second
spanBuilder("download jps cache").setAttribute("commit", info.first).use {
spanBuilder("download JPS Cache").setAttribute("commit", info.first).use {
doDownload(
urlPathPrefix = urlPathPrefix,
lastCachedCommit = info.first,
@@ -82,7 +109,7 @@ private suspend fun downloadToFile(urlPath: String, file: Path, spanName: String
}
}
private suspend fun prepareDownload(urlPathPrefix: String, gitUrl: String, connection: Http2ClientConnection?, lastCommits: List<String>): Pair<String, Int>? {
private suspend fun prepareDownload(urlPathPrefix: String, gitUrl: String, connection: Http2ClientConnection, lastCommits: List<String>): Pair<String, Int>? {
val availableCachesKeys = getAvailableCachesKeys(urlPathPrefix = urlPathPrefix, gitUrl = gitUrl, connection = connection)
val availableCommitDepth = lastCommits.indexOfFirst {
availableCachesKeys.contains(it)
@@ -109,14 +136,10 @@ private suspend fun prepareDownload(urlPathPrefix: String, gitUrl: String, conne
return lastCachedCommit to availableCommitDepth
}
private suspend fun getAvailableCachesKeys(urlPathPrefix: String, gitUrl: String, connection: Http2ClientConnection?): Collection<String> {
val commitHistoryUrl = "$urlPathPrefix/${CommitsHistory.JSON_FILE}"
require(!commitHistoryUrl.isS3() && connection != null)
private suspend fun getAvailableCachesKeys(urlPathPrefix: String, gitUrl: String, connection: Http2ClientConnection): Collection<String> {
val commitHistoryUrl = "$urlPathPrefix/$COMMIT_HISTORY_JSON_FILE"
val json: Map<String, Set<String>> = connection.getJsonOrDefaultIfNotFound(path = commitHistoryUrl, defaultIfNotFound = emptyMap())
if (json.isEmpty()) {
return emptyList()
}
return CommitsHistory(json).commitsForRemote(gitUrl)
return if (json.isEmpty()) emptyList() else CommitHistory(json).commitsForRemote(gitUrl)
}
private suspend fun doDownload(

View File

@@ -10,17 +10,16 @@ import io.opentelemetry.api.trace.Span
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import org.jetbrains.intellij.build.BuildMessages
import org.jetbrains.intellij.build.CompilationContext
import org.jetbrains.intellij.build.forEachConcurrent
import org.jetbrains.intellij.build.http2Client.Http2ClientConnection
import org.jetbrains.intellij.build.http2Client.getJsonOrDefaultIfNotFound
import org.jetbrains.intellij.build.http2Client.upload
import org.jetbrains.intellij.build.http2Client.withHttp2ClientConnectionFactory
import org.jetbrains.intellij.build.impl.compilation.cache.CommitsHistory
import org.jetbrains.intellij.build.impl.compilation.cache.getAllCompilationOutputs
import org.jetbrains.intellij.build.io.copyFile
import org.jetbrains.intellij.build.io.moveFile
import org.jetbrains.intellij.build.io.zipWithCompression
import org.jetbrains.intellij.build.jpsCache.*
import org.jetbrains.intellij.build.telemetry.TraceManager.spanBuilder
import org.jetbrains.intellij.build.telemetry.use
import org.jetbrains.jps.incremental.storage.BuildTargetSourcesState
@@ -35,22 +34,42 @@ import kotlin.io.path.ExperimentalPathApi
import kotlin.io.path.deleteRecursively
import kotlin.random.Random
private const val SOURCES_STATE_FILE_NAME = "target_sources_state.json"
private const val SOURCE_STATE_FILE_NAME = "target_sources_state.json"
/**
* Upload local JPS Cache
*/
suspend fun uploadJpsCache(forceUpload: Boolean, context: CompilationContext) {
uploadJpsCache(
forceUpload = forceUpload,
commitHash = jpsCacheCommit,
s3Dir = jpsCacheS3Dir,
authHeader = jpsCacheAuthHeader,
uploadUrl = jpsCacheUploadUrl,
dataStorageRoot = context.compilationData.dataStorageRoot,
classOutDir = context.classesOutputDirectory,
tempDir = context.paths.tempDir,
messages = context.messages,
)
}
internal suspend fun uploadJpsCache(
forcedUpload: Boolean,
forceUpload: Boolean,
commitHash: String,
authHeader: CharSequence?,
s3Dir: Path?,
uploadUrl: URI,
context: CompilationContext,
dataStorageRoot: Path,
classOutDir: Path,
tempDir: Path,
messages: BuildMessages,
) {
if (s3Dir != null) {
s3Dir.deleteRecursively()
Files.createDirectories(s3Dir)
}
val sourceStateFile = context.compilationData.dataStorageRoot.resolve(SOURCES_STATE_FILE_NAME)
val sourceStateFile = dataStorageRoot.resolve(SOURCE_STATE_FILE_NAME)
val sourceState = try {
Files.newBufferedReader(sourceStateFile).use {
BuildTargetSourcesState.readJson(JsonReader(it))
@@ -63,20 +82,27 @@ internal suspend fun uploadJpsCache(
val start = System.nanoTime()
val totalUploadedBytes = LongAdder()
val uploadedOutputCount = LongAdder()
val messages = context.messages
withHttp2ClientConnectionFactory(trustAll = uploadUrl.host == "127.0.0.1") { client ->
client.connect(host = uploadUrl.host, port = uploadUrl.port, authHeader = authHeader).use { connection ->
client.connect(address = uploadUrl, authHeader = authHeader).use { connection ->
val urlPathPrefix = uploadUrl.path
withContext(Dispatchers.IO) {
launch {
spanBuilder("upload jps cache").use {
uploadJpsCaches(urlPathPrefix = urlPathPrefix, connection = connection, s3Dir = s3Dir, forcedUpload = forcedUpload, commitHash = commitHash, context = context)
uploadJpsCaches(
urlPathPrefix = urlPathPrefix,
connection = connection,
s3Dir = s3Dir,
forceUpload = forceUpload,
commitHash = commitHash,
dataStorageRoot = dataStorageRoot,
tempDir = tempDir,
)
}
}
spanBuilder("upload compilation outputs").use {
val allCompilationOutputs = getAllCompilationOutputs(sourceState = sourceState, classOutDir = context.classesOutputDirectory)
val tempDir = Files.createTempDirectory(context.paths.tempDir, "jps-cache-bytecode-")
val allCompilationOutputs = getAllCompilationOutputs(sourceState = sourceState, classOutDir = classOutDir)
val zipTempDir = Files.createTempDirectory(tempDir, "jps-cache-bytecode-")
try {
uploadCompilationOutputs(
uploadedOutputCount = uploadedOutputCount,
@@ -85,12 +111,12 @@ internal suspend fun uploadJpsCache(
urlPathPrefix = urlPathPrefix,
connection = connection,
s3Dir = s3Dir,
forcedUpload = forcedUpload,
tempDir = tempDir,
forcedUpload = forceUpload,
tempDir = zipTempDir,
)
}
finally {
tempDir.deleteRecursively()
zipTempDir.deleteRecursively()
}
messages.reportStatisticValue("Total outputs", allCompilationOutputs.size.toString())
}
@@ -128,12 +154,12 @@ private suspend fun uploadJpsCaches(
urlPathPrefix: String,
connection: Http2ClientConnection,
commitHash: String,
forcedUpload: Boolean,
forceUpload: Boolean,
s3Dir: Path?,
context: CompilationContext,
dataStorageRoot: Path,
tempDir: Path,
) {
val dataStorageRoot = context.compilationData.dataStorageRoot
val zipFile = context.paths.tempDir.resolve("$commitHash-${java.lang.Long.toUnsignedString(Random.nextLong(), Character.MAX_RADIX)}.zip")
val zipFile = tempDir.resolve("$commitHash-${java.lang.Long.toUnsignedString(Random.nextLong(), Character.MAX_RADIX)}.zip")
try {
val compressed by lazy {
zipWithCompression(zipFile, mapOf(dataStorageRoot to ""))
@@ -142,7 +168,7 @@ private suspend fun uploadJpsCaches(
val cachePath = "caches/$commitHash"
val urlPath = "$urlPathPrefix/$cachePath"
if (forcedUpload || !checkExists(connection, urlPath, logIfExists = true)) {
if (forceUpload || !checkExists(connection, urlPath, logIfExists = true)) {
connection.upload(path = urlPath, file = compressed)
}
@@ -196,72 +222,7 @@ private suspend fun uploadCompilationOutputs(
}
}
/**
* Upload and publish a file with commits history
*/
internal suspend fun updateJpsCacheCommitHistory(
overrideCommits: Set<String>?,
remoteGitUrl: String,
commitHash: String,
uploadUrl: URI,
authHeader: CharSequence?,
s3Dir: Path?,
context: CompilationContext,
) {
val overrideRemoteHistory = overrideCommits != null
val commitHistory = CommitsHistory(mapOf(remoteGitUrl to (overrideCommits ?: setOf(commitHash))))
withHttp2ClientConnectionFactory(trustAll = uploadUrl.host == "127.0.0.1") { client ->
val urlPathPrefix = uploadUrl.path
client.connect(uploadUrl.host, uploadUrl.port, authHeader = authHeader).use { connection ->
for (commitHashForRemote in commitHistory.commitsForRemote(remoteGitUrl)) {
val cacheUploaded = checkExists(connection, "$urlPathPrefix/caches/$commitHashForRemote")
val metadataUploaded = checkExists(connection, "$urlPathPrefix/metadata/$commitHashForRemote")
if (!cacheUploaded && !metadataUploaded) {
val msg = "Unable to publish $commitHashForRemote due to missing caches/$commitHashForRemote and metadata/$commitHashForRemote." +
" Probably caused by previous cleanup build."
if (overrideRemoteHistory) {
context.messages.error(msg)
}
else {
context.messages.warning(msg)
}
return@use false
}
check(cacheUploaded == metadataUploaded) {
"JPS Caches are uploaded: $cacheUploaded, metadata is uploaded: $metadataUploaded"
}
}
val newHistory = if (overrideRemoteHistory) commitHistory else commitHistory + remoteCommitHistory(connection, urlPathPrefix)
connection.upload(path = "$urlPathPrefix/${CommitsHistory.JSON_FILE}", file = writeCommitHistory(commitHistory = newHistory, context = context, s3Dir = s3Dir))
val expected = newHistory.commitsForRemote(remoteGitUrl).toSet()
val actual = remoteCommitHistory(connection, urlPathPrefix).commitsForRemote(remoteGitUrl).toSet()
val missing = expected - actual
val unexpected = actual - expected
check(missing.none() && unexpected.none()) {
"""
Missing: $missing
Unexpected: $unexpected
""".trimIndent()
}
}
}
}
private suspend fun remoteCommitHistory(connection: Http2ClientConnection, urlPathPrefix: String): CommitsHistory {
return CommitsHistory(connection.getJsonOrDefaultIfNotFound(path = "$urlPathPrefix/${CommitsHistory.JSON_FILE}", defaultIfNotFound = emptyMap()))
}
private fun writeCommitHistory(commitHistory: CommitsHistory, context: CompilationContext, s3Dir: Path?): Path {
val commitHistoryFile = (s3Dir ?: context.paths.tempDir).resolve(CommitsHistory.JSON_FILE)
Files.createDirectories(commitHistoryFile.parent)
val json = commitHistory.toJson()
Files.writeString(commitHistoryFile, json)
Span.current().addEvent("write commit history", Attributes.of(AttributeKey.stringKey("data"), json))
return commitHistoryFile
}
private suspend fun checkExists(
internal suspend fun checkExists(
connection: Http2ClientConnection,
urlPath: String,
logIfExists: Boolean = false,

View File

@@ -6,10 +6,7 @@ import io.opentelemetry.api.common.Attributes
import kotlinx.coroutines.isActive
import okio.IOException
import org.jetbrains.intellij.build.forEachConcurrent
import org.jetbrains.intellij.build.http2Client.Http2ClientConnection
import org.jetbrains.intellij.build.http2Client.Http2ClientConnectionFactory
import org.jetbrains.intellij.build.http2Client.ZstdDecompressContextPool
import org.jetbrains.intellij.build.http2Client.download
import org.jetbrains.intellij.build.http2Client.*
import org.jetbrains.intellij.build.telemetry.TraceManager.spanBuilder
import org.jetbrains.intellij.build.telemetry.use
import java.net.URI

View File

@@ -1,14 +1,12 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package org.jetbrains.intellij.build.impl.compilation.cache
package org.jetbrains.intellij.build.jpsCache
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
class CommitsHistory(private val commitsPerRemote: Map<String, Set<String>>) {
companion object {
internal const val JSON_FILE = "commit_history.json"
}
internal const val COMMIT_HISTORY_JSON_FILE = "commit_history.json"
class CommitHistory(private val commitsPerRemote: Map<String, Set<String>>) {
constructor(json: String) : this(Json.decodeFromString<Map<String, Set<String>>>(json))
fun toJson(): String {
@@ -19,12 +17,12 @@ class CommitsHistory(private val commitsPerRemote: Map<String, Set<String>>) {
return commitsPerRemote[remote] ?: emptyList()
}
operator fun plus(other: CommitsHistory): CommitsHistory {
return CommitsHistory(union(other.commitsPerRemote))
operator fun plus(other: CommitHistory): CommitHistory {
return CommitHistory(union(other.commitsPerRemote))
}
operator fun minus(other: CommitsHistory): CommitsHistory {
return CommitsHistory(subtract(other.commitsPerRemote))
operator fun minus(other: CommitHistory): CommitHistory {
return CommitHistory(subtract(other.commitsPerRemote))
}
private fun union(map: Map<String, Set<String>>): Map<String, Set<String>> {

View File

@@ -0,0 +1,78 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package org.jetbrains.intellij.build.jpsCache
import io.netty.util.AsciiString
import io.opentelemetry.api.trace.Span
import org.jetbrains.jps.incremental.storage.ProjectStamps
import java.net.URI
import java.nio.file.Path
import java.util.*
/**
* URL for read-only operations
*/
private const val URL_PROPERTY = "intellij.jps.remote.cache.url"
/**
* IntelliJ repository git remote url
*/
private const val GIT_REPOSITORY_URL_PROPERTY = "intellij.remote.url"
/**
* Commit hash for which JPS Cache is to be built/downloaded
*/
private const val COMMIT_HASH_PROPERTY = "build.vcs.number"
private val IS_JPS_CACHE_URL_CONFIGURED = !System.getProperty(URL_PROPERTY).isNullOrBlank()
internal val isPortableCompilationCacheEnabled: Boolean
get() = ProjectStamps.PORTABLE_CACHES && IS_JPS_CACHE_URL_CONFIGURED
/**
* Folder to store JPS Cache for later upload to AWS S3 bucket.
* Upload performed in a separate process on CI.
*/
internal val jpsCacheS3Dir: Path?
get() = System.getProperty("jps.caches.aws.sync.folder")?.let<String, Path> { Path.of(it) }
/**
* Whether to download JPS Cache even if there are caches available locally.
*/
internal val isForceDownloadJpsCache: Boolean
get() = System.getProperty("intellij.jps.cache.download.force").toBoolean()
internal fun getJpsCacheUrl(default: String? = null): URI {
return URI(getRequiredSystemProperty(systemProperty = URL_PROPERTY, description = "Remote Cache url", default = default).trimEnd('/'))
}
internal val jpsCacheUploadUrl: URI
get() = URI(getRequiredSystemProperty("intellij.jps.remote.cache.upload.url", "Remote Cache upload url").trimEnd('/'))
internal val jpsCacheCommit: String
get() = getRequiredSystemProperty(COMMIT_HASH_PROPERTY, "Repository commit")
internal val jpsCacheRemoteGitUrl: String
get() {
val remoteGitUrl = getRequiredSystemProperty(GIT_REPOSITORY_URL_PROPERTY, "Repository url")
Span.current().addEvent("Git remote url $remoteGitUrl")
return remoteGitUrl
}
internal val jpsCacheAuthHeader: CharSequence?
get() {
val username = System.getProperty("jps.auth.spaceUsername")
val password = System.getProperty("jps.auth.spacePassword")
return when {
password == null -> null
username == null -> AsciiString.of("Bearer $password")
else -> AsciiString.of("Basic " + Base64.getEncoder().encodeToString("$username:$password".toByteArray()))
}
}
private fun getRequiredSystemProperty(systemProperty: String, description: String, default: String? = null): String {
val value = System.getProperty(systemProperty) ?: default
require(!value.isNullOrBlank()) {
"$description is not defined. Please set '$systemProperty' system property."
}
return value
}

View File

@@ -1,5 +1,5 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package org.jetbrains.intellij.build.impl.compilation.cache
package org.jetbrains.intellij.build.jpsCache
import org.jetbrains.intellij.build.impl.compilation.CompilationOutput
import org.jetbrains.jps.builders.java.JavaModuleBuildTargetType

View File

@@ -0,0 +1,135 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package org.jetbrains.intellij.build.jpsCache
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.trace.Span
import org.jetbrains.intellij.build.http2Client.*
import org.jetbrains.intellij.build.impl.compilation.checkExists
import org.jetbrains.intellij.build.telemetry.TraceManager.spanBuilder
import org.jetbrains.intellij.build.telemetry.use
import java.net.URI
import java.nio.file.Files
import java.nio.file.Path
/**
* Publish already uploaded JPS Cache.
*/
suspend fun publishUploadedJpsCacheCache(overrideCommits: Set<String>? = null) {
updateJpsCacheCommitHistory(
overrideCommits = overrideCommits,
remoteGitUrl = jpsCacheRemoteGitUrl,
commitHash = jpsCacheCommit,
uploadUrl = jpsCacheUploadUrl,
authHeader = jpsCacheAuthHeader,
s3Dir = jpsCacheS3Dir,
)
}
/**
* Upload and publish a file with commits history
*/
internal suspend fun updateJpsCacheCommitHistory(
overrideCommits: Set<String>?,
remoteGitUrl: String,
commitHash: String,
uploadUrl: URI,
authHeader: CharSequence?,
s3Dir: Path?,
) {
val overrideRemoteHistory = overrideCommits != null
val commits = overrideCommits ?: setOf(commitHash)
spanBuilder("update JPS Cache commit history")
.setAttribute("url", uploadUrl.toString())
.setAttribute("overrideRemoteHistory", overrideRemoteHistory)
.setAttribute("s3Dir", s3Dir?.toString() ?: "")
.setAttribute(AttributeKey.stringArrayKey("commits"), java.util.List.copyOf(commits))
.use {
val commitHistory = CommitHistory(java.util.Map.of(remoteGitUrl, commits))
withHttp2ClientConnectionFactory(trustAll = uploadUrl.host == "127.0.0.1") { client ->
checkMirrorAndConnect(initialServerUri = uploadUrl, authHeader = authHeader, client = client) { connection, urlPathPrefix ->
val uploaded = checkThatJpsCacheWasUploaded(
commitHistory = commitHistory,
remoteGitUrl = remoteGitUrl,
urlPathPrefix = urlPathPrefix,
connection = connection,
overrideRemoteHistory = overrideRemoteHistory,
)
if (!uploaded) {
return@checkMirrorAndConnect
}
val newHistory = if (overrideRemoteHistory) commitHistory else commitHistory + getRemoteCommitHistory(connection, urlPathPrefix)
val serializedNewHistory = newHistory.toJson()
client.connect(address = uploadUrl, authHeader = authHeader).use { connectionForPut ->
connectionForPut.upload(path = "${uploadUrl.path}/$COMMIT_HISTORY_JSON_FILE", data = serializedNewHistory)
}
if (s3Dir != null) {
val commitHistoryFile = s3Dir.resolve(COMMIT_HISTORY_JSON_FILE)
Files.createDirectories(commitHistoryFile.parent)
Files.writeString(commitHistoryFile, serializedNewHistory)
Span.current().addEvent(
"write commit history",
Attributes.of(
AttributeKey.stringKey("data"), serializedNewHistory,
AttributeKey.stringKey("file"), commitHistoryFile.toString()
),
)
}
verify(newHistory = newHistory, remoteGitUrl = remoteGitUrl, connectionForGet = connection, urlPathPrefixForGet = urlPathPrefix)
}
}
}
}
private suspend fun checkThatJpsCacheWasUploaded(
commitHistory: CommitHistory,
remoteGitUrl: String,
urlPathPrefix: String,
connection: Http2ClientConnection,
overrideRemoteHistory: Boolean,
): Boolean {
for (commitHashForRemote in commitHistory.commitsForRemote(remoteGitUrl)) {
val cacheUrl = "$urlPathPrefix/caches/$commitHashForRemote"
val cacheUploaded = checkExists(connection, cacheUrl)
val metadataUrlPath = "$urlPathPrefix/metadata/$commitHashForRemote"
val metadataUploaded = checkExists(connection, metadataUrlPath)
if (!cacheUploaded && !metadataUploaded) {
val message = "Unable to publish $commitHashForRemote due to missing $cacheUrl and $metadataUrlPath. Probably caused by previous cleanup build."
if (overrideRemoteHistory) {
throw RuntimeException(message)
}
else {
Span.current().addEvent(message)
}
return false
}
check(cacheUploaded == metadataUploaded) {
"JPS Caches are uploaded: $cacheUploaded, metadata is uploaded: $metadataUploaded"
}
}
return true
}
private suspend fun verify(
newHistory: CommitHistory,
remoteGitUrl: String,
connectionForGet: Http2ClientConnection,
urlPathPrefixForGet: String,
) {
val expected = newHistory.commitsForRemote(remoteGitUrl).toSet()
val actual = getRemoteCommitHistory(connectionForGet, urlPathPrefixForGet).commitsForRemote(remoteGitUrl).toSet()
val missing = expected - actual
val unexpected = actual - expected
check(missing.none() && unexpected.none()) {
"""
Missing: $missing
Unexpected: $unexpected
""".trimIndent()
}
}
private suspend fun getRemoteCommitHistory(connection: Http2ClientConnection, urlPathPrefix: String): CommitHistory {
return CommitHistory(connection.getJsonOrDefaultIfNotFound(path = "$urlPathPrefix/$COMMIT_HISTORY_JSON_FILE", defaultIfNotFound = emptyMap()))
}

View File

@@ -1,5 +1,5 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package org.jetbrains.intellij.build.impl.compilation
package org.jetbrains.intellij.build.jpsCache
import java.util.concurrent.TimeUnit
@@ -16,8 +16,6 @@ private fun Process.waitWithTimeout(): Int {
return exitValue()
}
internal fun String.isS3() = startsWith("s3://")
/**
* Executes 'aws s3 [args]' process
*/

View File

@@ -1,15 +1,16 @@
// Copyright 2000-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license that can be found in the LICENSE file.
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package org.jetbrains.intellij.build.impl.compilation.cache
import org.jetbrains.intellij.build.jpsCache.CommitHistory
import org.junit.Test
class CommitsHistoryTest {
class CommitHistoryTest {
@Test
fun `commit-history json union test`() {
val union = CommitsHistory("""{
val union = CommitHistory("""{
"remote1" : [ "commit1.1", "commit1.2" ],
"remote2" : [ "commit2.1", "commit2.2" ]
}""") + CommitsHistory("""{
}""") + CommitHistory("""{
"remote1" : [ "commit1.3", "commit1.3" ],
"remote3" : [ "commit3.1" ]
}""")
@@ -20,10 +21,10 @@ class CommitsHistoryTest {
@Test
fun `commit-history json subtraction test`() {
val subtraction = CommitsHistory("""{
val subtraction = CommitHistory("""{
"remote1" : [ "commit1.1", "commit1.2" ],
"remote2" : [ "commit2.1", "commit2.2" ]
}""") - CommitsHistory("""{
}""") - CommitHistory("""{
"remote1" : [ "commit1.2", "commit1.3" ],
"remote2" : [ "commit2.1", "commit2.3" ],
"remote3" : [ "commit3.1" ]