From e7429a52657e69faf8b4d51e54ccb1109d5f353c Mon Sep 17 00:00:00 2001 From: Nikita Kudrin Date: Tue, 23 Jul 2024 11:31:32 +0300 Subject: [PATCH] [benchmarks] Implementing reset for telemetry exporters GitOrigin-RevId: 654ed95a45602685d61fc614ca496e13b59682fd --- .../src/TelemetryManagerImpl.kt | 5 +++ .../src/BatchSpanProcessor.kt | 23 ++++++++++- .../src/JaegerJsonSpanExporter.kt | 38 ++++++++++++++----- .../diagnostic/telemetry/AsyncSpanExporter.kt | 3 ++ .../diagnostic/telemetry/TelemetryManager.kt | 23 +++++++++++ .../testFramework/common/src/LeakHunter.java | 1 - .../benchmark/PerformanceTestInfoImpl.java | 11 +++--- 7 files changed, 86 insertions(+), 18 deletions(-) diff --git a/platform/diagnostic/telemetry-impl/src/TelemetryManagerImpl.kt b/platform/diagnostic/telemetry-impl/src/TelemetryManagerImpl.kt index e583b6e68162..8a51a6c4d2ef 100644 --- a/platform/diagnostic/telemetry-impl/src/TelemetryManagerImpl.kt +++ b/platform/diagnostic/telemetry-impl/src/TelemetryManagerImpl.kt @@ -169,6 +169,11 @@ class TelemetryManagerImpl(coroutineScope: CoroutineScope, isUnitTestMode: Boole log.info("OpenTelemetry metrics were flushed") } + + @TestOnly + override suspend fun resetExporters() { + batchSpanProcessor?.reset() + } } private class IntelliJTracerImpl(private val scope: Scope, private val otlpService: OtlpService) : IntelliJTracer { diff --git a/platform/diagnostic/telemetry.exporters/src/BatchSpanProcessor.kt b/platform/diagnostic/telemetry.exporters/src/BatchSpanProcessor.kt index b3fb8cd7656e..e257f81a4645 100644 --- a/platform/diagnostic/telemetry.exporters/src/BatchSpanProcessor.kt +++ b/platform/diagnostic/telemetry.exporters/src/BatchSpanProcessor.kt @@ -16,6 +16,7 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.selects.onTimeout import kotlinx.coroutines.selects.select import org.jetbrains.annotations.ApiStatus.Internal +import org.jetbrains.annotations.TestOnly import kotlin.time.Duration import kotlin.time.Duration.Companion.minutes import kotlin.time.Duration.Companion.seconds @@ -26,13 +27,14 @@ class BatchSpanProcessor( private val coroutineScope: CoroutineScope, private val spanExporters: List, private val scheduleDelay: Duration = 1.minutes, - private val maxExportBatchSize: Int = 512 + private val maxExportBatchSize: Int = 512, ) : SpanProcessor { private val queue = Channel(capacity = Channel.UNLIMITED) private val flushRequested = Channel(capacity = Channel.UNLIMITED) private data class FlushRequest(@JvmField val exportOnly: Boolean) { - @JvmField val job: CompletableDeferred = CompletableDeferred() + @JvmField + val job: CompletableDeferred = CompletableDeferred() } init { @@ -177,4 +179,21 @@ class BatchSpanProcessor( } } } + + @TestOnly + suspend fun reset() { + for (spanExporter in spanExporters) { + try { + withTimeout(30.seconds) { + spanExporter.reset() + } + } + catch (e: CancellationException) { + throw e + } + catch (e: Exception) { + logger().error("Failed to reset", e) + } + } + } } diff --git a/platform/diagnostic/telemetry.exporters/src/JaegerJsonSpanExporter.kt b/platform/diagnostic/telemetry.exporters/src/JaegerJsonSpanExporter.kt index d10c314b84cb..c2b66b47d208 100644 --- a/platform/diagnostic/telemetry.exporters/src/JaegerJsonSpanExporter.kt +++ b/platform/diagnostic/telemetry.exporters/src/JaegerJsonSpanExporter.kt @@ -27,17 +27,22 @@ import java.util.concurrent.TimeUnit @ApiStatus.Internal class JaegerJsonSpanExporter( file: Path, - serviceName: String, - serviceVersion: String? = null, - serviceNamespace: String? = null, + val serviceName: String, + val serviceVersion: String? = null, + val serviceNamespace: String? = null, ) : AsyncSpanExporter { override val exporterVersion: Int = 2 private val fileChannel: FileChannel - private val writer: JsonGenerator + private var writer: JsonGenerator private val lock = Mutex() + private fun initWriter() = JsonFactory().createGenerator(Channels.newOutputStream(fileChannel)) + .configure(com.fasterxml.jackson.core.JsonGenerator.Feature.AUTO_CLOSE_TARGET, true) + // Channels.newOutputStream doesn't implement flush, but just to be sure + .configure(com.fasterxml.jackson.core.JsonGenerator.Feature.FLUSH_PASSED_TO_STREAM, false) + init { val parent = file.parent Files.createDirectories(parent) @@ -46,10 +51,7 @@ class JaegerJsonSpanExporter( StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) - writer = JsonFactory().createGenerator(Channels.newOutputStream(fileChannel)) - .configure(com.fasterxml.jackson.core.JsonGenerator.Feature.AUTO_CLOSE_TARGET, true) - // Channels.newOutputStream doesn't implement flush, but just to be sure - .configure(com.fasterxml.jackson.core.JsonGenerator.Feature.FLUSH_PASSED_TO_STREAM, false) + writer = initWriter() beginWriter(w = writer, serviceName = serviceName, @@ -190,7 +192,7 @@ class JaegerJsonSpanExporter( override suspend fun flush() { lock.withReentrantLock { - // if shutdown was already invoked OR nothing has been written to the temp file + // if shutdown was already invoked OR nothing has been written to the output file if (writer.isClosed) { return@withReentrantLock } @@ -201,6 +203,24 @@ class JaegerJsonSpanExporter( fileChannel.position(fileChannel.position() - jsonEnd.size) } } + + override suspend fun reset() { + lock.withReentrantLock { + // if shutdown was already invoked OR nothing has been written to the output file + if (writer.isClosed) { + return@withReentrantLock + } + + writer = initWriter() + fileChannel.truncate(0) + + beginWriter(w = writer, + serviceName = serviceName, + serviceVersion = serviceVersion, + serviceNamespace = serviceNamespace, + exporterVersion = exporterVersion) + } + } } private fun beginWriter( diff --git a/platform/diagnostic/telemetry/src/com/intellij/platform/diagnostic/telemetry/AsyncSpanExporter.kt b/platform/diagnostic/telemetry/src/com/intellij/platform/diagnostic/telemetry/AsyncSpanExporter.kt index 15b491f45772..aafbe6d8bc69 100644 --- a/platform/diagnostic/telemetry/src/com/intellij/platform/diagnostic/telemetry/AsyncSpanExporter.kt +++ b/platform/diagnostic/telemetry/src/com/intellij/platform/diagnostic/telemetry/AsyncSpanExporter.kt @@ -12,5 +12,8 @@ interface AsyncSpanExporter { suspend fun flush() {} + /** Should clean any previously exported metrics */ + suspend fun reset() {} + suspend fun shutdown() {} } \ No newline at end of file diff --git a/platform/diagnostic/telemetry/src/com/intellij/platform/diagnostic/telemetry/TelemetryManager.kt b/platform/diagnostic/telemetry/src/com/intellij/platform/diagnostic/telemetry/TelemetryManager.kt index 7abf3fd8a3ce..f2c5f2f87de1 100644 --- a/platform/diagnostic/telemetry/src/com/intellij/platform/diagnostic/telemetry/TelemetryManager.kt +++ b/platform/diagnostic/telemetry/src/com/intellij/platform/diagnostic/telemetry/TelemetryManager.kt @@ -100,7 +100,26 @@ interface TelemetryManager { @Suppress("unused") @TestOnly fun forceFlushMetricsBlocking() { + runBlocking { forceFlushMetrics() } + } + + /** + * Should reset all registered exporters [com.intellij.platform.diagnostic.telemetry.AsyncSpanExporter.reset]. + * Meaning that spans and meters should be exported and discarded, so that resulting artifacts will not contain previously collected data. + */ + @TestOnly + suspend fun resetExporters() + + /** + * Discard previously collected metrics and invoke flush. + * @see resetExporters + * @see forceFlushMetrics + */ + @Suppress("unused") + @TestOnly + fun reset() { runBlocking { + resetExporters() forceFlushMetrics() } } @@ -152,6 +171,10 @@ class NoopTelemetryManager : TelemetryManager { override suspend fun forceFlushMetrics() { logger().info("Cannot force flushing metrics for Noop telemetry manager") } + + override suspend fun resetExporters() { + logger().info("Cannot clean exported metrics for Noop telemetry manager") + } } // suspend here is required to get parent span from coroutine context; that's why a version without `suspend` is called `rootSpan`. diff --git a/platform/testFramework/common/src/LeakHunter.java b/platform/testFramework/common/src/LeakHunter.java index 36f17d66d26b..84e56cb86e0b 100644 --- a/platform/testFramework/common/src/LeakHunter.java +++ b/platform/testFramework/common/src/LeakHunter.java @@ -21,7 +21,6 @@ import com.intellij.util.ReflectionUtil; import com.intellij.util.io.PersistentEnumeratorCache; import com.intellij.util.ref.DebugReflectionUtil; import com.intellij.util.ui.UIUtil; -import kotlin.Suppress; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.TestOnly; diff --git a/tools/intellij.tools.ide.metrics.benchmark/src/com/intellij/tools/ide/metrics/benchmark/PerformanceTestInfoImpl.java b/tools/intellij.tools.ide.metrics.benchmark/src/com/intellij/tools/ide/metrics/benchmark/PerformanceTestInfoImpl.java index 8cb456085d60..fddc50c515d4 100644 --- a/tools/intellij.tools.ide.metrics.benchmark/src/com/intellij/tools/ide/metrics/benchmark/PerformanceTestInfoImpl.java +++ b/tools/intellij.tools.ide.metrics.benchmark/src/com/intellij/tools/ide/metrics/benchmark/PerformanceTestInfoImpl.java @@ -56,7 +56,7 @@ public class PerformanceTestInfoImpl implements PerformanceTestInfo { private String uniqueTestName; // at least full qualified test name (plus other identifiers, optionally) @NotNull private final IJTracer tracer; - private ArrayList metricsCollectors = new ArrayList<>(); + private final ArrayList metricsCollectors = new ArrayList<>(); private boolean useDefaultSpanMetricExporter = true; @@ -109,11 +109,10 @@ public class PerformanceTestInfoImpl implements PerformanceTestInfo { } } - private static void cleanupOutdatedMeters() { + private static void cleanupOutdatedMetrics() { try { - // force spans and meters to be written to disk before any test starts - // it's at least what we can do to minimize interference of the same meter on different tests - TelemetryManager.getInstance().forceFlushMetricsBlocking(); + // force spans and meters to be exported and discarded to minimize interference of the same metric on different tests + TelemetryManager.getInstance().reset(); // remove content of the previous tests from the idea.log IJPerfMetricsPublisher.Companion.truncateTestLog(); @@ -139,7 +138,7 @@ public class PerformanceTestInfoImpl implements PerformanceTestInfo { public PerformanceTestInfoImpl() { initOpenTelemetry(); - cleanupOutdatedMeters(); + cleanupOutdatedMetrics(); this.tracer = TelemetryManager.getInstance().getTracer(new Scope("performanceUnitTests", null)); }