[llm] LME-482 Export eval data to langfuse

[llm] [LME-482] Export eval data to langfuse

(cherry picked from commit 8a104fd4273a649af928255ec11fc8f03c9cbc39)

[ai assistant] LLM-17486 Add basic classes for OT export

(cherry picked from commit fa8cea73f8bb6f258ea5139da3593368efeec107)


Co-authored-by: Nikolai Bogdanov <nikolai.bogdanov@jetbrains.com>

Merge-request: IJ-MR-168589
Merged-by: Nikolai Bogdanov <nikolai.bogdanov@jetbrains.com>

GitOrigin-RevId: 2caae57547fd51330a29cdd85f0f17264fea3ec9
This commit is contained in:
Mikhail Mazurkevich
2025-07-21 11:34:36 +00:00
committed by intellij-monorepo-bot
parent d40044bdd4
commit c9a77c5ca0
4 changed files with 52 additions and 7 deletions

View File

@@ -7534,6 +7534,13 @@ jvm_import(
source_jar = "@opentelemetry-exporter-common-1_48_0-sources_http//file"
)
java_library(
name = "opentelemetry-exporter-otlp-common-provided",
exports = [":opentelemetry-exporter-otlp-common"],
neverlink = True,
visibility = ["//visibility:public"]
)
jvm_import(
name = "opentelemetry-extension-kotlin",
jar = "@opentelemetry-extension-kotlin-1_48_0_http//file",

View File

@@ -0,0 +1,28 @@
// Copyright 2000-2025 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.platform.diagnostic.telemetry.exporters
import com.intellij.platform.diagnostic.telemetry.AsyncSpanExporter
import io.opentelemetry.sdk.trace.data.SpanData
import org.jetbrains.annotations.ApiStatus
/**
* A general-purpose wrapper for any [com.intellij.platform.diagnostic.telemetry.AsyncSpanExporter] that adds filtering capability.
* This allows filtering spans before they are passed to the wrapped exporter.
*/
@ApiStatus.Internal
class FilterableAsyncSpanExporter(
private val delegate: AsyncSpanExporter,
private val filter: (SpanData) -> Boolean
) : AsyncSpanExporter {
override suspend fun export(spans: Collection<SpanData>) {
val filteredSpans = spans.filter(filter)
if (filteredSpans.isNotEmpty()) delegate.export(filteredSpans)
}
override suspend fun flush(): Unit = delegate.flush()
override suspend fun reset(): Unit = delegate.reset()
override suspend fun shutdown(): Unit = delegate.shutdown()
}

View File

@@ -15,13 +15,16 @@ import org.jetbrains.annotations.ApiStatus.Internal
import java.net.ConnectException
@Internal
class OtlpSpanExporter(private val traceUrl: String) : AsyncSpanExporter {
class OtlpSpanExporter(
private val traceUrl: String,
private val authorizationHeader: String? = null
) : AsyncSpanExporter {
override suspend fun export(spans: Collection<SpanData>) {
try {
val item = TraceRequestMarshaler.create(spans)
httpPost(traceUrl, contentLength = item.binarySerializedSize.toLong(), contentType = ContentType.XProtobuf) {
httpPost(traceUrl, contentLength = item.binarySerializedSize.toLong(), contentType = ContentType.XProtobuf, body = {
item.writeBinaryTo(this)
}
}, authorizationHeader)
}
catch (e: CancellationException) {
throw e
@@ -35,9 +38,9 @@ class OtlpSpanExporter(private val traceUrl: String) : AsyncSpanExporter {
}
companion object {
suspend fun exportBackendData(traceUrl: String, receivedBytes: ByteArray) {
suspend fun exportBackendData(traceUrl: String, receivedBytes: ByteArray, authorizationHeader: String? = null) {
runCatching {
httpPost(url = traceUrl, contentType = ContentType.XProtobuf, body = receivedBytes)
httpPost(url = traceUrl, contentType = ContentType.XProtobuf, body = receivedBytes, authorizationHeader)
}.getOrLogException(thisLogger())
}
}

View File

@@ -5,6 +5,7 @@ import io.ktor.client.HttpClient
import io.ktor.client.engine.java.Java
import io.ktor.client.plugins.HttpRequestRetry
import io.ktor.client.plugins.HttpTimeout
import io.ktor.client.request.headers
import io.ktor.client.request.post
import io.ktor.client.request.setBody
import io.ktor.http.content.ByteArrayContent
@@ -35,15 +36,21 @@ sealed interface ContentType {
private class ContentTypeImpl(@JvmField val contentType: io.ktor.http.ContentType) : ContentType
@ApiStatus.Internal
suspend fun httpPost(url: String, contentLength: Long, contentType: ContentType, body: suspend OutputStream.() -> Unit) {
suspend fun httpPost(url: String, contentLength: Long, contentType: ContentType, body: suspend OutputStream.() -> Unit, authorizationHeader: String? = null) {
httpClient.post(url) {
headers {
authorizationHeader?.let { append("Authorization", it) }
}
setBody(OutputStreamContent(body, (contentType as ContentTypeImpl).contentType, status = null, contentLength))
}
}
@ApiStatus.Internal
suspend fun httpPost(url: String, contentType: ContentType, body: ByteArray) {
suspend fun httpPost(url: String, contentType: ContentType, body: ByteArray, authorizationHeader: String? = null) {
httpClient.post(url) {
headers {
authorizationHeader?.let { append("Authorization", it) }
}
setBody(ByteArrayContent(body, (contentType as ContentTypeImpl).contentType))
}
}