[jps cache] IJI-2336 Upload commit_history.json with remote modification check using ETag and If-Match

IJ-CR-162857

(cherry picked from commit c495876127ce514010a81dcbacfc1c9e612ca719)

GitOrigin-RevId: 0fce0bfdfaf895fcb55a744bdb75ef5e8471f8bf
This commit is contained in:
Vladislav Rassokhin
2025-05-09 14:59:49 +02:00
committed by intellij-monorepo-bot
parent f3755ce471
commit 84b2a35a5d
4 changed files with 112 additions and 24 deletions

View File

@@ -1,6 +1,7 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
// Copyright 2000-2025 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package org.jetbrains.intellij.build.http2Client
import com.intellij.openapi.util.Ref
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.ByteBufInputStream
import io.netty.buffer.ByteBufUtil
@@ -10,6 +11,7 @@ import io.netty.handler.codec.http.HttpHeaderValues
import io.netty.handler.codec.http.HttpMethod
import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http2.Http2DataFrame
import io.netty.handler.codec.http2.Http2Headers
import io.netty.handler.codec.http2.Http2HeadersFrame
import io.netty.handler.codec.http2.Http2StreamFrame
import io.netty.util.AsciiString
@@ -25,9 +27,9 @@ private val json = Json {
ignoreUnknownKeys = true
}
internal suspend fun Http2ClientConnection.getString(path: CharSequence): String {
internal suspend fun Http2ClientConnection.getString(path: CharSequence, headers: Ref<Http2Headers>? = null): String {
return connection.stream { stream, result ->
stream.pipeline().addLast(Http2StreamJsonInboundHandler(urlPath = path, result = result, bufferAllocator = stream.alloc(), deserializer = null))
stream.pipeline().addLast(Http2StreamJsonInboundHandler(urlPath = path, result = result, bufferAllocator = stream.alloc(), deserializer = null, headers = headers))
stream.writeHeaders(
headers = createHeaders(
@@ -100,6 +102,7 @@ private class Http2StreamJsonInboundHandler<T : Any>(
private val result: CompletableDeferred<T>,
private val deserializer: DeserializationStrategy<T>?,
private val defaultIfNotFound: T? = null,
private val headers: Ref<Http2Headers>? = null,
) : InboundHandlerResultTracker<Http2StreamFrame>(result) {
private var isGzip = false
@@ -152,7 +155,8 @@ private class Http2StreamJsonInboundHandler<T : Any>(
result.complete(response)
}
else if (frame is Http2HeadersFrame) {
val status = HttpResponseStatus.parseLine(frame.headers().status())
val headers = frame.headers()
val status = HttpResponseStatus.parseLine(headers.status())
if (status != HttpResponseStatus.OK) {
if (status == HttpResponseStatus.NOT_FOUND && defaultIfNotFound != null) {
result.complete(defaultIfNotFound)
@@ -163,7 +167,8 @@ private class Http2StreamJsonInboundHandler<T : Any>(
return
}
isGzip = frame.headers().get(HttpHeaderNames.CONTENT_ENCODING) == HttpHeaderValues.GZIP
isGzip = headers.get(HttpHeaderNames.CONTENT_ENCODING) == HttpHeaderValues.GZIP
this.headers?.set(headers)
}
}
}

View File

@@ -1,4 +1,4 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
// Copyright 2000-2025 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
@file:Suppress("DuplicatedCode", "SSBasedInspection")
package org.jetbrains.intellij.build.http2Client
@@ -32,15 +32,16 @@ internal val READ_OPERATION = EnumSet.of(StandardOpenOption.READ)
internal data class UploadResult(@JvmField var uploadedSize: Long, @JvmField var fileSize: Long)
internal suspend fun Http2ClientConnection.upload(path: CharSequence, file: Path): UploadResult {
return upload(path = path, file = file, sourceBlockSize = 1024 * 1024, zstdCompressContextPool = null)
internal suspend fun Http2ClientConnection.upload(path: CharSequence, file: Path, extraHeaders: List<AsciiString> = emptyList()): UploadResult {
return upload(path = path, file = file, sourceBlockSize = 1024 * 1024, zstdCompressContextPool = null, extraHeaders = extraHeaders)
}
internal suspend fun Http2ClientConnection.upload(path: CharSequence, data: CharSequence) {
internal suspend fun Http2ClientConnection.upload(path: CharSequence, data: CharSequence, extraHeaders: List<AsciiString> = emptyList()) {
val headers = withDefaultContentType(extraHeaders).toTypedArray()
return connection.stream { stream, result ->
stream.pipeline().addLast(WebDavPutStatusChecker(result))
stream.writeHeaders(createHeaders(HttpMethod.PUT, AsciiString.of(path), HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_OCTET_STREAM), endStream = false)
stream.writeHeaders(createHeaders(HttpMethod.PUT, AsciiString.of(path), *headers), endStream = false)
stream.writeData(ByteBufUtil.writeUtf8(stream.alloc(), data), endStream = true)
}
}
@@ -51,12 +52,14 @@ internal suspend fun Http2ClientConnection.upload(
sourceBlockSize: Int = 4 * 1024 * 1024,
zstdCompressContextPool: ZstdCompressContextPool?,
isDir: Boolean = false,
extraHeaders: List<AsciiString> = emptyList(),
): UploadResult {
val headers = withDefaultContentType(extraHeaders).toTypedArray()
return connection.stream { stream, result ->
val status = CompletableDeferred<Unit>(parent = result)
stream.pipeline().addLast(WebDavPutStatusChecker(status))
stream.writeHeaders(createHeaders(HttpMethod.PUT, AsciiString.of(path), HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_OCTET_STREAM), endStream = false)
stream.writeHeaders(createHeaders(HttpMethod.PUT, AsciiString.of(path), *headers), endStream = false)
val uploadResult = if (zstdCompressContextPool == null) {
FileChannel.open(file, READ_OPERATION).use { channel ->
@@ -84,6 +87,21 @@ internal suspend fun Http2ClientConnection.upload(
}
}
private fun withDefaultContentType(extraHeaders: List<AsciiString>): ArrayList<AsciiString> {
require(extraHeaders.size % 2 == 0) {
"extraHeaders must be a list of key-value pairs, got $extraHeaders"
}
val headers: ArrayList<AsciiString>
if (extraHeaders.contains(HttpHeaderNames.CONTENT_TYPE)) {
headers = arrayListOf()
}
else {
headers = arrayListOf(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_OCTET_STREAM)
}
headers.addAll(extraHeaders)
return headers
}
private suspend fun compressFile(
file: Path,
zstdCompressContextPool: ZstdCompressContextPool,

View File

@@ -1,10 +1,21 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
// Copyright 2000-2025 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package org.jetbrains.intellij.build.jpsCache
import com.intellij.openapi.util.Ref
import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpHeaderValues
import io.netty.handler.codec.http2.Http2Headers
import io.netty.util.AsciiString
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.trace.Span
import org.jetbrains.intellij.build.http2Client.*
import org.jetbrains.intellij.build.http2Client.Http2ClientConnection
import org.jetbrains.intellij.build.http2Client.UnexpectedHttpStatus
import org.jetbrains.intellij.build.http2Client.checkMirrorAndConnect
import org.jetbrains.intellij.build.http2Client.getJsonOrDefaultIfNotFound
import org.jetbrains.intellij.build.http2Client.getString
import org.jetbrains.intellij.build.http2Client.upload
import org.jetbrains.intellij.build.http2Client.withHttp2ClientConnectionFactory
import org.jetbrains.intellij.build.impl.compilation.checkExists
import org.jetbrains.intellij.build.telemetry.TraceManager.spanBuilder
import org.jetbrains.intellij.build.telemetry.use
@@ -59,10 +70,14 @@ internal suspend fun updateJpsCacheCommitHistory(
return@checkMirrorAndConnect
}
val newHistory = if (overrideRemoteHistory) commitHistory else commitHistory + getRemoteCommitHistory(connection, urlPathPrefix)
val serializedNewHistory = newHistory.toJson()
client.connect(address = uploadUrl, authHeader = authHeader).use { connectionForPut ->
connectionForPut.upload(path = "${uploadUrl.path}/$COMMIT_HISTORY_JSON_FILE", data = serializedNewHistory)
val (newHistory: CommitHistory, serializedNewHistory) = withCheckingRemoteModifications(
overrideRemoteHistory, commitHistory, connection,
urlPathPrefix
) { content, headers ->
client.connect(address = uploadUrl, authHeader = authHeader).use { connectionForPut ->
val path = "${uploadUrl.path}/$COMMIT_HISTORY_JSON_FILE"
connectionForPut.upload(path, content, headers)
}
}
if (s3Dir != null) {
val commitHistoryFile = s3Dir.resolve(COMMIT_HISTORY_JSON_FILE)
@@ -83,6 +98,55 @@ internal suspend fun updateJpsCacheCommitHistory(
}
}
private suspend fun withCheckingRemoteModifications(
overrideRemoteHistory: Boolean,
commitHistory: CommitHistory,
connection: Http2ClientConnection,
urlPathPrefix: String,
doUpload: suspend (String, List<AsciiString>) -> Unit,
): Pair<CommitHistory, String> {
if (overrideRemoteHistory) {
val serializedNewHistory = commitHistory.toJson()
doUpload(serializedNewHistory, emptyList())
return Pair(commitHistory, serializedNewHistory)
}
var attempts = 0
while (true) {
val responseHeaders = Ref<Http2Headers>()
val content = connection.getString("$urlPathPrefix/$COMMIT_HISTORY_JSON_FILE", responseHeaders)
val remoteCommitHistory = CommitHistory(content)
val newHistory = commitHistory + remoteCommitHistory
val serializedNewHistory = newHistory.toJson()
val uploadHeaders = arrayListOf(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
responseHeaders.get().get(HttpHeaderNames.ETAG)?.let { etag ->
uploadHeaders.add(HttpHeaderNames.IF_MATCH)
uploadHeaders.add(AsciiString.of(etag.removePrefix("W/"))) // when compressing response (gzip), nginx marks etag as weak
}
responseHeaders.get().get(HttpHeaderNames.LAST_MODIFIED)?.let { date ->
uploadHeaders.add(HttpHeaderNames.IF_UNMODIFIED_SINCE)
uploadHeaders.add(AsciiString.of(date))
}
try {
doUpload(serializedNewHistory, uploadHeaders)
return Pair(newHistory, serializedNewHistory)
}
catch (e: Throwable) {
if (attempts > 10) throw e
if (e is UnexpectedHttpStatus) {
val code = e.status.code()
if (code == 409 || code == 412) {
Span.current().addEvent("got $code response, will try to upload commit history again")
attempts++
continue
}
}
throw e
}
}
}
private suspend fun checkThatJpsCacheWasUploaded(
commitHistory: CommitHistory,
remoteGitUrl: String,

View File

@@ -1,6 +1,7 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
// Copyright 2000-2025 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package org.jetbrains.intellij.build.impl.compilation.cache
import org.assertj.core.api.BDDAssertions.then
import org.jetbrains.intellij.build.jpsCache.CommitHistory
import org.junit.Test
@@ -14,9 +15,9 @@ class CommitHistoryTest {
"remote1" : [ "commit1.3", "commit1.3" ],
"remote3" : [ "commit3.1" ]
}""")
assert((union.commitsForRemote("remote1") - listOf("commit1.1", "commit1.2", "commit1.3")).isEmpty())
assert((union.commitsForRemote("remote2") - listOf("commit2.1", "commit2.2")).isEmpty())
assert((union.commitsForRemote("remote3") - listOf("commit3.1")).isEmpty())
then(union.commitsForRemote("remote1")).containsExactlyInAnyOrder("commit1.1", "commit1.2", "commit1.3")
then(union.commitsForRemote("remote2")).containsExactlyInAnyOrder("commit2.1", "commit2.2")
then(union.commitsForRemote("remote3")).containsExactlyInAnyOrder("commit3.1")
}
@Test
@@ -29,8 +30,8 @@ class CommitHistoryTest {
"remote2" : [ "commit2.1", "commit2.3" ],
"remote3" : [ "commit3.1" ]
}""")
assert((subtraction.commitsForRemote("remote1") - "commit1.1").isEmpty())
assert((subtraction.commitsForRemote("remote2") - "commit2.2").isEmpty())
assert(subtraction.commitsForRemote("remote3").isEmpty())
then(subtraction.commitsForRemote("remote1")).containsExactlyInAnyOrder("commit1.1")
then(subtraction.commitsForRemote("remote2")).containsExactlyInAnyOrder("commit2.2")
then(subtraction.commitsForRemote("remote3")).isEmpty()
}
}