prefer stateless pure functions, otherwise not easy to understand the state flow and what is used in what scenario

GitOrigin-RevId: 64d899b65c09c0cde3a55d1ef8ce3a26e5e032d9
This commit is contained in:
Vladimir Krivosheev
2024-09-02 21:39:10 +02:00
committed by intellij-monorepo-bot
parent 8cb1f0414b
commit 18a927dcbb
10 changed files with 240 additions and 179 deletions

View File

@@ -87,7 +87,7 @@ class TeamCityBuildMessageLogger : BuildMessageLogger() {
}
finally {
print(ServiceMessageTypes.BLOCK_CLOSED)
TraceManager.exportPendingSpans()
TraceManager.scheduleExportPendingSpans()
}
}
}

View File

@@ -17,16 +17,22 @@ import io.opentelemetry.sdk.resources.Resource
import io.opentelemetry.sdk.trace.SdkTracerProvider
import io.opentelemetry.sdk.trace.data.SpanData
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.runBlocking
import org.jetbrains.intellij.build.dependencies.BuildDependenciesDownloader
import java.nio.file.Path
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
import kotlin.time.Duration.Companion.seconds
@Suppress("SSBasedInspection")
var traceManagerInitializer: () -> Pair<Tracer, BatchSpanProcessor> = {
val batchSpanProcessor = BatchSpanProcessor(coroutineScope = CoroutineScope(Job()), spanExporters = JaegerJsonSpanExporterManager.spanExporterProvider)
val batchSpanProcessor = BatchSpanProcessor(
scheduleDelay = 10.seconds,
coroutineScope = CoroutineScope(Job()),
spanExporters = JaegerJsonSpanExporterManager.spanExporterProvider,
)
val tracerProvider = SdkTracerProvider.builder()
.addSpanProcessor(batchSpanProcessor)
.setResource(Resource.create(Attributes.of(AttributeKey.stringKey("service.name"), "builder")))
@@ -64,9 +70,9 @@ object TraceManager {
batchSpanProcessor.forceShutdown()
}
suspend fun exportPendingSpans() {
suspend fun scheduleExportPendingSpans() {
if (isEnabled) {
batchSpanProcessor.doFlush(exportOnly = true)
batchSpanProcessor.scheduleFlush()
}
}
}
@@ -103,7 +109,7 @@ object JaegerJsonSpanExporterManager {
if (addShutDownHook && shutdownHookAdded.compareAndSet(false, true)) {
Runtime.getRuntime().addShutdownHook(
Thread({
runBlocking {
runBlocking(Dispatchers.IO) {
TraceManager.shutdown()
}
}, "close tracer"))

View File

@@ -25,6 +25,7 @@ suspend fun <T> SpanBuilder.block(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
operation: suspend CoroutineScope.(Span) -> T,
): T {
TraceManager.scheduleExportPendingSpans()
return startSpan().useWithoutActiveScope { span ->
// see `use` below why `withContext` must be inner
TeamCityBuildMessageLogger.withBlock(span) {

View File

@@ -6,7 +6,6 @@ import org.jetbrains.intellij.build.dependencies.DependenciesProperties
import org.jetbrains.intellij.build.impl.BundledRuntime
import org.jetbrains.intellij.build.impl.CompilationTasksImpl
import org.jetbrains.intellij.build.impl.JpsCompilationData
import org.jetbrains.intellij.build.impl.compilation.PortableCompilationCache
import org.jetbrains.intellij.build.moduleBased.OriginalModuleRepository
import org.jetbrains.jps.model.JpsModel
import org.jetbrains.jps.model.JpsProject
@@ -22,7 +21,6 @@ interface CompilationContext {
val dependenciesProperties: DependenciesProperties
val bundledRuntime: BundledRuntime
val compilationData: JpsCompilationData
val portableCompilationCache: PortableCompilationCache
fun isStepSkipped(step: String): Boolean = options.buildStepsToSkip.contains(step)

View File

@@ -28,7 +28,10 @@ import org.jetbrains.intellij.build.dependencies.DependenciesProperties
import org.jetbrains.intellij.build.dependencies.JdkDownloader
import org.jetbrains.intellij.build.impl.JdkUtils.defineJdk
import org.jetbrains.intellij.build.impl.JdkUtils.readModulesFromReleaseFile
import org.jetbrains.intellij.build.impl.compilation.*
import org.jetbrains.intellij.build.impl.compilation.checkCompilationOptions
import org.jetbrains.intellij.build.impl.compilation.isCompilationRequired
import org.jetbrains.intellij.build.impl.compilation.keepCompilationState
import org.jetbrains.intellij.build.impl.compilation.reuseOrCompile
import org.jetbrains.intellij.build.impl.logging.BuildMessagesHandler
import org.jetbrains.intellij.build.impl.logging.BuildMessagesImpl
import org.jetbrains.intellij.build.impl.moduleBased.OriginalModuleRepositoryImpl
@@ -129,10 +132,6 @@ class CompilationContextImpl private constructor(
override lateinit var compilationData: JpsCompilationData
override val portableCompilationCache: PortableCompilationCache by lazy {
PortableCompilationCache(this)
}
@Volatile
private var cachedJdkHome: Path? = null
@@ -220,6 +219,7 @@ class CompilationContextImpl private constructor(
context.enableCoroutinesDump(it)
}
}
spanBuilder("prepare for build").use {
context.prepareForBuild()
}
@@ -480,9 +480,9 @@ internal suspend fun cleanOutput(context: CompilationContext, keepCompilationSta
context.paths.jpsArtifacts,
)
val outputDirectoriesToKeep = buildSet {
this.add(context.paths.logDir)
add(context.paths.logDir)
if (keepCompilationState) {
this.addAll(compilationState)
addAll(compilationState)
}
}
spanBuilder("clean output").use { span ->

View File

@@ -94,7 +94,7 @@ internal fun checkCompilationOptions(context: CompilationContext) {
if (options.forceRebuild && pathToCompiledClassArchiveMetadata != null) {
messages.error("Both '${BuildOptions.FORCE_REBUILD_PROPERTY}' and '${BuildOptions.INTELLIJ_BUILD_COMPILER_CLASSES_ARCHIVES_METADATA}' options are specified")
}
if (options.isInDevelopmentMode && ProjectStamps.PORTABLE_CACHES) {
if (options.isInDevelopmentMode && ProjectStamps.PORTABLE_CACHES && !System.getProperty("jps.cache.test").toBoolean()) {
messages.error("${ProjectStamps.PORTABLE_CACHES_PROPERTY} is not expected to be enabled in development mode due to performance penalty")
}
if (!options.useCompiledClassesFromProjectOutput) {
@@ -146,11 +146,18 @@ internal suspend fun reuseOrCompile(context: CompilationContext, moduleNames: Co
}
IS_PORTABLE_COMPILATION_CACHE_ENABLED -> {
span.addEvent("JPS remote cache will be used for compilation")
context.portableCompilationCache.downloadCacheAndCompileProject()
downloadJpsCacheAndCompileProject(context)
}
else -> {
block("compile modules") {
doCompile(moduleNames = moduleNames, includingTestsInModules = includingTestsInModules, availableCommitDepth = -1, context = context)
doCompile(
moduleNames = moduleNames,
includingTestsInModules = includingTestsInModules,
availableCommitDepth = -1,
context = context,
// null - we checked IS_PORTABLE_COMPILATION_CACHE_ENABLED above
handleCompilationFailureBeforeRetry = null,
)
}
return
}
@@ -174,6 +181,7 @@ internal suspend fun doCompile(
includingTestsInModules: List<String>? = null,
availableCommitDepth: Int,
context: CompilationContext,
handleCompilationFailureBeforeRetry: (suspend (successMessage: String) -> String)?,
) {
check(JavaVersion.current().isAtLeast(17)) {
"Build script must be executed under Java 17 to compile intellij project but it's executed under Java ${JavaVersion.current()}"
@@ -187,7 +195,7 @@ internal suspend fun doCompile(
try {
val (status, isIncrementalCompilation) = when {
context.options.forceRebuild -> "forced rebuild" to false
availableCommitDepth >= 0 -> context.portableCompilationCache.usageStatus(availableCommitDepth) to true
availableCommitDepth >= 0 -> portableJpsCacheUsageStatus(availableCommitDepth) to true
isIncrementalCompilationDataAvailable(context) -> "compiled using local cache" to true
else -> "clean build" to false
}
@@ -219,7 +227,14 @@ internal suspend fun doCompile(
context.messages.buildStatus(status)
}
catch (e: Exception) {
retryCompilation(context = context, runner = runner, moduleNames = moduleNames, includingTestsInModules = includingTestsInModules, e = e)
retryCompilation(
context = context,
runner = runner,
moduleNames = moduleNames,
includingTestsInModules = includingTestsInModules,
e = e,
handleCompilationFailureBeforeRetry = handleCompilationFailureBeforeRetry,
)
}
}
@@ -256,7 +271,8 @@ private suspend fun retryCompilation(
runner: JpsCompilationRunner,
moduleNames: Collection<String>?,
includingTestsInModules: List<String>?,
e: Exception
e: Exception,
handleCompilationFailureBeforeRetry: (suspend (successMessage: String) -> String)?,
) {
if (!context.options.incrementalCompilation) {
throw e
@@ -266,6 +282,7 @@ private suspend fun retryCompilation(
"'${BuildOptions.INCREMENTAL_COMPILATION_FALLBACK_REBUILD_PROPERTY}' is false.")
throw e
}
var successMessage = "Clean build retry"
when {
e is TimeoutCancellationException -> {
@@ -274,8 +291,8 @@ private suspend fun retryCompilation(
cleanOutput(context = context, keepCompilationState = false)
context.options.incrementalCompilation = false
}
IS_PORTABLE_COMPILATION_CACHE_ENABLED -> {
successMessage = context.portableCompilationCache.handleCompilationFailureBeforeRetry(successMessage)
handleCompilationFailureBeforeRetry != null -> {
successMessage = handleCompilationFailureBeforeRetry(successMessage)
}
else -> {
Span.current().addEvent("Incremental compilation failed. Re-trying with clean build.")

View File

@@ -2,9 +2,14 @@
package org.jetbrains.intellij.build.impl.compilation
import io.opentelemetry.api.trace.Span
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import org.jetbrains.intellij.build.BuildOptions
import org.jetbrains.intellij.build.BuildPaths.Companion.ULTIMATE_HOME
import org.jetbrains.intellij.build.CompilationContext
import org.jetbrains.intellij.build.impl.cleanOutput
import org.jetbrains.intellij.build.impl.compilation.cache.CommitsHistory
import org.jetbrains.intellij.build.impl.createCompilationContext
import org.jetbrains.intellij.build.telemetry.TraceManager.spanBuilder
import org.jetbrains.intellij.build.telemetry.block
import org.jetbrains.intellij.build.telemetry.use
@@ -18,108 +23,134 @@ internal val IS_PORTABLE_COMPILATION_CACHE_ENABLED: Boolean
private var isAlreadyUpdated = false
class PortableCompilationCache(private val context: CompilationContext) {
private val git = Git(context.paths.projectHome)
/**
* Server which stores [PortableCompilationCache]
*/
internal inner class RemoteCache {
val url by lazy { require(URL_PROPERTY, "Remote Cache url") }
val uploadUrl by lazy { require(UPLOAD_URL_PROPERTY, "Remote Cache upload url") }
val authHeader by lazy {
val username = System.getProperty("jps.auth.spaceUsername")
val password = System.getProperty("jps.auth.spacePassword")
when {
password == null -> ""
username == null -> "Bearer $password"
else -> "Basic " + Base64.getEncoder().encodeToString("$username:$password".toByteArray())
}
internal object TestJpsCompilationCacheDownload {
@JvmStatic
fun main(args: Array<String>) = runBlocking(Dispatchers.Default) {
System.setProperty("jps.cache.test", "true")
System.setProperty("org.jetbrains.jps.portable.caches", "true")
if (System.getProperty(URL_PROPERTY) == null) {
System.setProperty(URL_PROPERTY, "https://127.0.0.1:1900")
}
val shouldBeDownloaded: Boolean
get() = !forceRebuild && !isLocalCacheUsed()
}
private var forceDownload = System.getProperty(FORCE_DOWNLOAD_PROPERTY).toBoolean()
private val forceRebuild = context.options.forceRebuild
private val remoteCache = RemoteCache()
private val remoteGitUrl by lazy {
val result = require(GIT_REPOSITORY_URL_PROPERTY, "Repository url")
context.messages.info("Git remote url $result")
result
}
private val downloader by lazy {
PortableCompilationCacheDownloader(context = context, git = git, remoteCache = remoteCache, gitUrl = remoteGitUrl)
}
private val uploader by lazy {
val s3Folder = Path.of(require(AWS_SYNC_FOLDER_PROPERTY, "AWS S3 sync folder"))
val commitHash = require(COMMIT_HASH_PROPERTY, "Repository commit")
PortableCompilationCacheUploader(
context = context,
remoteCache = remoteCache,
remoteGitUrl = remoteGitUrl,
commitHash = commitHash,
s3Folder = s3Folder,
forcedUpload = forceRebuild,
val projectHome = ULTIMATE_HOME
val outputDir = projectHome.resolve("out/compilation")
val context = createCompilationContext(
projectHome = projectHome,
defaultOutputRoot = outputDir,
options = BuildOptions(
incrementalCompilation = true,
useCompiledClassesFromProjectOutput = true,
),
)
downloadCacheAndCompileProject(forceDownload = false, context = context)
}
}
/**
* Download the latest available [PortableCompilationCache],
* [resolveProjectDependencies]
* and perform incremental compilation if necessary.
*
* If rebuild is forced, an incremental compilation flag has to be set to false; otherwise backward-refs won't be created.
* During rebuild, JPS checks condition [org.jetbrains.jps.backwardRefs.index.CompilerReferenceIndex.exists] || [org.jetbrains.jps.backwardRefs.JavaBackwardReferenceIndexWriter.isRebuildInAllJavaModules]
* and if incremental compilation is enabled, JPS won't create [org.jetbrains.jps.backwardRefs.JavaBackwardReferenceIndexWriter].
* For more details see [org.jetbrains.jps.backwardRefs.JavaBackwardReferenceIndexWriter.initialize]
*/
internal suspend fun downloadCacheAndCompileProject() {
spanBuilder("download JPS cache and compile")
.setAttribute("forceRebuild", forceRebuild)
.setAttribute("forceDownload", forceDownload)
.block { span ->
if (isAlreadyUpdated) {
span.addEvent("PortableCompilationCache is already updated")
return@block
}
internal suspend fun downloadJpsCacheAndCompileProject(context: CompilationContext) {
downloadCacheAndCompileProject(forceDownload = System.getProperty(FORCE_DOWNLOAD_PROPERTY).toBoolean(), context = context)
}
check(IS_PORTABLE_COMPILATION_CACHE_ENABLED) {
"JPS Caches are expected to be enabled"
}
/**
* Upload local [PortableCompilationCache] to [PortableJpsCacheRemoteCacheConfig]
*/
suspend fun uploadPortableCompilationCache(context: CompilationContext) {
createPortableCompilationCacheUploader(context).upload(context.messages)
}
if (forceRebuild || forceDownload) {
cleanOutput(context = context, keepCompilationState = false)
}
/**
* Publish already uploaded [PortableCompilationCache] to [PortableJpsCacheRemoteCacheConfig]
*/
suspend fun publishPortableCompilationCache(context: CompilationContext) {
createPortableCompilationCacheUploader(context).updateCommitHistory()
}
val availableCommitDepth = if (remoteCache.shouldBeDownloaded) {
downloadCache(downloader)
}
else {
-1
}
/**
* Publish already uploaded [PortableCompilationCache] to [PortableJpsCacheRemoteCacheConfig] overriding existing [CommitsHistory].
* Used in force rebuild and cleanup.
*/
suspend fun publishUploadedJpsCacheWithCommitHistoryOverride(forceRebuiltCommits: Set<String>, context: CompilationContext) {
createPortableCompilationCacheUploader(context).updateCommitHistory(overrideCommits = forceRebuiltCommits)
}
context.options.incrementalCompilation = !forceRebuild
// compilation is executed unconditionally here even if the exact commit cache is downloaded
// to have an additional validation step and not to ignore a local changes, for example, in TeamCity Remote Run
doCompile(availableCommitDepth = availableCommitDepth, context = context)
isAlreadyUpdated = true
context.options.incrementalCompilation = true
/**
* Download the latest available [PortableCompilationCache],
* [resolveProjectDependencies]
* and perform incremental compilation if necessary.
*
* If rebuild is forced, an incremental compilation flag has to be set to false; otherwise backward-refs won't be created.
* During rebuild,
* JPS checks condition [org.jetbrains.jps.backwardRefs.index.CompilerReferenceIndex.exists] || [org.jetbrains.jps.backwardRefs.JavaBackwardReferenceIndexWriter.isRebuildInAllJavaModules]
* and if incremental compilation is enabled, JPS won't create [org.jetbrains.jps.backwardRefs.JavaBackwardReferenceIndexWriter].
* For more details see [org.jetbrains.jps.backwardRefs.JavaBackwardReferenceIndexWriter.initialize]
*/
@Suppress("KDocUnresolvedReference")
private suspend fun downloadCacheAndCompileProject(forceDownload: Boolean, context: CompilationContext) {
val forceRebuild = context.options.forceRebuild
spanBuilder("download JPS cache and compile")
.setAttribute("forceRebuild", forceRebuild)
.setAttribute("forceDownload", forceDownload)
.block { span ->
if (isAlreadyUpdated) {
span.addEvent("PortableCompilationCache is already updated")
return@block
}
}
private fun isLocalCacheUsed() = !forceRebuild && !forceDownload && isIncrementalCompilationDataAvailable(context)
check(IS_PORTABLE_COMPILATION_CACHE_ENABLED) {
"JPS Caches are expected to be enabled"
}
if (forceRebuild || forceDownload) {
cleanOutput(context = context, keepCompilationState = false)
}
val downloader = PortableCompilationCacheDownloader(
context = context,
git = Git(context.paths.projectHome),
remoteCache = PortableJpsCacheRemoteCacheConfig(),
gitUrl = computeRemoteGitUrl(),
)
val portableCompilationCache = PortableCompilationCache(forceDownload = forceDownload)
val availableCommitDepth = if (!forceRebuild && (forceDownload || !isIncrementalCompilationDataAvailable(context))) {
portableCompilationCache.downloadCache(downloader, context)
}
else {
-1
}
context.options.incrementalCompilation = !forceRebuild
// compilation is executed unconditionally here even if the exact commit cache is downloaded
// to have an additional validation step and not to ignore a local changes, for example, in TeamCity Remote Run
doCompile(
availableCommitDepth = availableCommitDepth,
context = context,
handleCompilationFailureBeforeRetry = { successMessage ->
portableCompilationCache.handleCompilationFailureBeforeRetry(
successMessage = successMessage,
portableCompilationCacheDownloader = downloader,
forceDownload = portableCompilationCache.forceDownload,
context = context,
)
},
)
isAlreadyUpdated = true
context.options.incrementalCompilation = true
}
}
private class PortableCompilationCache(forceDownload: Boolean) {
var forceDownload: Boolean = forceDownload
private set
/**
* @return updated [successMessage]
*/
internal suspend fun handleCompilationFailureBeforeRetry(successMessage: String): String {
check(IS_PORTABLE_COMPILATION_CACHE_ENABLED) {
"JPS Caches are expected to be enabled"
}
suspend fun handleCompilationFailureBeforeRetry(
successMessage: String,
portableCompilationCacheDownloader: PortableCompilationCacheDownloader,
context: CompilationContext,
forceDownload: Boolean,
): String {
when {
forceDownload -> {
Span.current().addEvent("Incremental compilation using Remote Cache failed. Re-trying with clean build.")
@@ -131,39 +162,17 @@ class PortableCompilationCache(private val context: CompilationContext) {
// If download isn't forced, then locally available cache will be used which may suffer from those issues.
// Hence, compilation failure. Replacing local cache with remote one may help.
Span.current().addEvent("Incremental compilation using locally available caches failed. Re-trying using Remote Cache.")
val availableCommitDepth = downloadCache(downloader)
val availableCommitDepth = downloadCache(portableCompilationCacheDownloader, context)
if (availableCommitDepth >= 0) {
return usageStatus(availableCommitDepth)
return portableJpsCacheUsageStatus(availableCommitDepth)
}
}
}
return successMessage
}
/**
* Upload local [PortableCompilationCache] to [PortableCompilationCache.RemoteCache]
*/
suspend fun upload() {
uploader.upload(context.messages)
}
/**
* Publish already uploaded [PortableCompilationCache] to [PortableCompilationCache.RemoteCache]
*/
suspend fun publish() {
uploader.updateCommitHistory()
}
/**
* Publish already uploaded [PortableCompilationCache] to [PortableCompilationCache.RemoteCache] overriding existing [CommitsHistory].
* Used in force rebuild and cleanup.
*/
suspend fun overrideCommitHistory(forceRebuiltCommits: Set<String>) {
uploader.updateCommitHistory(commitHistory = CommitsHistory(mapOf(remoteGitUrl to forceRebuiltCommits)), overrideRemoteHistory = true)
}
private suspend fun downloadCache(downloader: PortableCompilationCacheDownloader): Int {
spanBuilder("downloading Portable Compilation Cache").use { span ->
suspend fun downloadCache(downloader: PortableCompilationCacheDownloader, context: CompilationContext): Int {
return spanBuilder("downloading Portable Compilation Cache").use { span ->
try {
downloader.download()
}
@@ -178,17 +187,17 @@ class PortableCompilationCache(private val context: CompilationContext) {
forceDownload = false
context.options.incrementalCompilation = false
cleanOutput(context = context, keepCompilationState = false)
-1
}
}
return downloader.getAvailableCommitDepth()
}
}
internal fun usageStatus(availableCommitDepth: Int): String {
return when (availableCommitDepth) {
0 -> "all classes reused from JPS remote cache"
1 -> "1 commit compiled using JPS remote cache"
else -> "$availableCommitDepth commits compiled using JPS remote cache"
}
internal fun portableJpsCacheUsageStatus(availableCommitDepth: Int): String {
return when (availableCommitDepth) {
0 -> "all classes reused from JPS remote cache"
1 -> "1 commit compiled using JPS remote cache"
else -> "$availableCommitDepth commits compiled using JPS remote cache"
}
}
@@ -203,7 +212,7 @@ private const val UPLOAD_URL_PROPERTY = "intellij.jps.remote.cache.upload.url"
private const val URL_PROPERTY = "intellij.jps.remote.cache.url"
/**
* If true then [PortableCompilationCache.RemoteCache] is configured to be used
* If true then [PortableJpsCacheRemoteCacheConfig] is configured to be used
*/
private val IS_CONFIGURED = !System.getProperty(URL_PROPERTY).isNullOrBlank()
@@ -231,7 +240,7 @@ private const val COMMIT_HASH_PROPERTY = "build.vcs.number"
private fun require(systemProperty: String, description: String): String {
val value = System.getProperty(systemProperty)
require(!value.isNullOrBlank()) {
"$description is not defined. Please set '$systemProperty' system property."
"$description is not defined. Please set '$systemProperty' system property."
}
return value
}
@@ -246,3 +255,37 @@ internal class CompilationOutput(
// local path to compilation output
@JvmField val path: Path,
)
/**
* Server which stores [PortableCompilationCache]
*/
internal class PortableJpsCacheRemoteCacheConfig {
val url by lazy { require(URL_PROPERTY, "Remote Cache url") }
val uploadUrl by lazy { require(UPLOAD_URL_PROPERTY, "Remote Cache upload url") }
val authHeader by lazy {
val username = System.getProperty("jps.auth.spaceUsername")
val password = System.getProperty("jps.auth.spacePassword")
when {
password == null -> ""
username == null -> "Bearer $password"
else -> "Basic " + Base64.getEncoder().encodeToString("$username:$password".toByteArray())
}
}
}
internal fun createPortableCompilationCacheUploader(context: CompilationContext): PortableCompilationCacheUploader {
return PortableCompilationCacheUploader(
context = context,
remoteCache = PortableJpsCacheRemoteCacheConfig(),
remoteGitUrl = computeRemoteGitUrl(),
commitHash = require(COMMIT_HASH_PROPERTY, "Repository commit"),
s3Folder = Path.of(require(AWS_SYNC_FOLDER_PROPERTY, "AWS S3 sync folder")),
forcedUpload = context.options.forceRebuild,
)
}
private fun computeRemoteGitUrl(): String {
val remoteGitUrl = require(GIT_REPOSITORY_URL_PROPERTY, "Repository url")
Span.current().addEvent("Git remote url $remoteGitUrl")
return remoteGitUrl
}

View File

@@ -31,7 +31,7 @@ private const val COMMITS_COUNT = 1_000
internal class PortableCompilationCacheDownloader(
private val context: CompilationContext,
private val git: Git,
private val remoteCache: PortableCompilationCache.RemoteCache,
private val remoteCache: PortableJpsCacheRemoteCacheConfig,
private val gitUrl: String,
) {
private val remoteCacheUrl = remoteCache.url.trimEnd('/')
@@ -88,16 +88,6 @@ internal class PortableCompilationCacheDownloader(
}
}
@OptIn(DelicateCoroutinesApi::class)
private val availableCommitDepthLazyTask = GlobalScope.async(Dispatchers.Unconfined, start = CoroutineStart.LAZY) {
val availableCachesKeys = availableCachesKeysLazyTask.await()
lastCommits.indexOfFirst {
availableCachesKeys.contains(it)
}
}
suspend fun getAvailableCommitDepth(): Int = availableCommitDepthLazyTask.await()
private suspend fun isExist(path: String): Boolean {
return spanBuilder("head").setAttribute("url", remoteCacheUrl).use {
retryWithExponentialBackOff {
@@ -106,11 +96,15 @@ internal class PortableCompilationCacheDownloader(
}
}
suspend fun download() {
val availableCommitDepth = getAvailableCommitDepth()
suspend fun download(): Int {
val availableCachesKeys = availableCachesKeysLazyTask.await()
val availableCommitDepth = lastCommits.indexOfFirst {
availableCachesKeys.contains(it)
}
if (availableCommitDepth !in 0 until lastCommits.count()) {
Span.current().addEvent("Unable to find cache for any of last ${lastCommits.count()} commits.")
return
return -1
}
val lastCachedCommit = lastCommits.get(availableCommitDepth)
@@ -148,6 +142,8 @@ internal class PortableCompilationCacheDownloader(
reportStatisticValue("jps-cache:downloaded:bytes", totalDownloadedBytes.sum().toString())
reportStatisticValue("jps-cache:downloaded:count", total.toString())
reportStatisticValue("jps-cache:notFound:count", notFound.sum().toString())
return availableCommitDepth
}
private suspend fun downloadAndUnpackJpsCache(commitHash: String, notFound: LongAdder, totalBytes: LongAdder) {

View File

@@ -42,16 +42,12 @@ private val MEDIA_TYPE_BINARY = "application/octet-stream".toMediaType()
internal class PortableCompilationCacheUploader(
private val context: CompilationContext,
private val remoteCache: PortableCompilationCache.RemoteCache,
private val remoteGitUrl: String,
internal val remoteCache: PortableJpsCacheRemoteCacheConfig,
@JvmField val remoteGitUrl: String,
private val commitHash: String,
private val s3Folder: Path,
private val forcedUpload: Boolean,
) {
private val uploader = Uploader(remoteCache.uploadUrl, remoteCache.authHeader)
private val commitHistory = CommitsHistory(mapOf(remoteGitUrl to setOf(commitHash)))
init {
s3Folder.deleteRecursively()
Files.createDirectories(s3Folder)
@@ -75,11 +71,12 @@ internal class PortableCompilationCacheUploader(
val start = System.nanoTime()
val totalUploadedBytes = AtomicLong()
val uploadedOutputCount = AtomicInteger()
val uploader = Uploader(remoteCache.uploadUrl, remoteCache.authHeader)
withContext(Dispatchers.IO) {
// Jps Caches upload is started first because of significant size
launch {
spanBuilder("upload jps cache").use {
uploadJpsCaches()
uploadJpsCaches(uploader)
}
}
@@ -100,7 +97,7 @@ internal class PortableCompilationCacheUploader(
messages.reportStatisticValue("Uploaded outputs", uploadedOutputCount.get().toString())
uploadMetadata(sourceStateFile)
uploadMetadata(sourceStateFile = sourceStateFile, uploader = uploader)
uploadToS3()
}
@@ -110,7 +107,7 @@ internal class PortableCompilationCacheUploader(
}
}
private suspend fun uploadJpsCaches() {
private suspend fun uploadJpsCaches(uploader: Uploader) {
val dataStorageRoot = context.compilationData.dataStorageRoot
val zipFile = dataStorageRoot.parent.resolve(commitHash)
Compressor.Zip(zipFile).use { zip ->
@@ -123,7 +120,7 @@ internal class PortableCompilationCacheUploader(
moveFile(zipFile, s3Folder.resolve(cachePath))
}
private suspend fun uploadMetadata(sourceStateFile: Path) {
private suspend fun uploadMetadata(sourceStateFile: Path, uploader: Uploader) {
val metadataPath = "metadata/$commitHash"
spanBuilder("upload metadata").setAttribute("path", metadataPath).use {
uploader.upload(metadataPath, sourceStateFile)
@@ -162,7 +159,10 @@ internal class PortableCompilationCacheUploader(
/**
* Upload and publish a file with commits history
*/
suspend fun updateCommitHistory(commitHistory: CommitsHistory = this.commitHistory, overrideRemoteHistory: Boolean = false) {
internal suspend fun updateCommitHistory(overrideCommits: Set<String>? = null) {
val overrideRemoteHistory = overrideCommits != null
val commitHistory = CommitsHistory(mapOf(remoteGitUrl to (overrideCommits ?: setOf(commitHash))))
val uploader = Uploader(serverUrl = remoteCache.uploadUrl, authHeader = remoteCache.authHeader)
for (commitHash in commitHistory.commitsForRemote(remoteGitUrl)) {
val cacheUploaded = uploader.isExist("caches/$commitHash")
val metadataUploaded = uploader.isExist("metadata/$commitHash")
@@ -176,10 +176,10 @@ internal class PortableCompilationCacheUploader(
"JPS Caches are uploaded: $cacheUploaded, metadata is uploaded: $metadataUploaded"
}
}
val newHistory = if (overrideRemoteHistory) commitHistory else commitHistory + remoteCommitHistory()
val newHistory = if (overrideRemoteHistory) commitHistory else commitHistory + remoteCommitHistory(uploader)
uploader.upload(path = CommitsHistory.JSON_FILE, file = writeCommitHistory(newHistory))
val expected = newHistory.commitsForRemote(remoteGitUrl).toSet()
val actual = remoteCommitHistory().commitsForRemote(remoteGitUrl).toSet()
val actual = remoteCommitHistory(uploader).commitsForRemote(remoteGitUrl).toSet()
val missing = expected - actual
val unexpected = actual - expected
check(missing.none() && unexpected.none()) {
@@ -190,7 +190,7 @@ internal class PortableCompilationCacheUploader(
}
}
private suspend fun remoteCommitHistory(): CommitsHistory {
private suspend fun remoteCommitHistory(uploader: Uploader): CommitsHistory {
return if (uploader.isExist(CommitsHistory.JSON_FILE)) {
CommitsHistory(uploader.getAsString(CommitsHistory.JSON_FILE, remoteCache.authHeader))
}
@@ -209,7 +209,7 @@ internal class PortableCompilationCacheUploader(
}
}
private class Uploader(serverUrl: String, val authHeader: String) {
private class Uploader(serverUrl: String, @JvmField val authHeader: String) {
private val serverUrl = serverUrl.trimEnd('/')
suspend fun upload(path: String, file: Path) {

View File

@@ -1,4 +1,4 @@
// Copyright 2000-2023 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
@file:Suppress("RAW_RUN_BLOCKING")
package com.intellij.platform.diagnostic.telemetry.exporters
@@ -132,16 +132,16 @@ class BatchSpanProcessor(
}
suspend fun flush() {
doFlush(exportOnly = false)
}
suspend fun doFlush(exportOnly: Boolean) {
val flushRequest = FlushRequest(exportOnly = exportOnly)
val flushRequest = FlushRequest(exportOnly = false)
if (!flushRequested.trySend(flushRequest).isClosed) {
flushRequest.job.join()
}
}
suspend fun scheduleFlush() {
flushRequested.send(FlushRequest(exportOnly = true))
}
override fun forceFlush(): CompletableResultCode {
throw UnsupportedOperationException()
}