[benchmarks] Implementing reset for telemetry exporters

GitOrigin-RevId: 654ed95a45602685d61fc614ca496e13b59682fd
This commit is contained in:
Nikita Kudrin
2024-07-23 11:31:32 +03:00
committed by intellij-monorepo-bot
parent 26e37ff340
commit e7429a5265
7 changed files with 86 additions and 18 deletions

View File

@@ -169,6 +169,11 @@ class TelemetryManagerImpl(coroutineScope: CoroutineScope, isUnitTestMode: Boole
log.info("OpenTelemetry metrics were flushed") log.info("OpenTelemetry metrics were flushed")
} }
@TestOnly
override suspend fun resetExporters() {
batchSpanProcessor?.reset()
}
} }
private class IntelliJTracerImpl(private val scope: Scope, private val otlpService: OtlpService) : IntelliJTracer { private class IntelliJTracerImpl(private val scope: Scope, private val otlpService: OtlpService) : IntelliJTracer {

View File

@@ -16,6 +16,7 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.selects.onTimeout import kotlinx.coroutines.selects.onTimeout
import kotlinx.coroutines.selects.select import kotlinx.coroutines.selects.select
import org.jetbrains.annotations.ApiStatus.Internal import org.jetbrains.annotations.ApiStatus.Internal
import org.jetbrains.annotations.TestOnly
import kotlin.time.Duration import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds import kotlin.time.Duration.Companion.seconds
@@ -26,13 +27,14 @@ class BatchSpanProcessor(
private val coroutineScope: CoroutineScope, private val coroutineScope: CoroutineScope,
private val spanExporters: List<AsyncSpanExporter>, private val spanExporters: List<AsyncSpanExporter>,
private val scheduleDelay: Duration = 1.minutes, private val scheduleDelay: Duration = 1.minutes,
private val maxExportBatchSize: Int = 512 private val maxExportBatchSize: Int = 512,
) : SpanProcessor { ) : SpanProcessor {
private val queue = Channel<ReadableSpan>(capacity = Channel.UNLIMITED) private val queue = Channel<ReadableSpan>(capacity = Channel.UNLIMITED)
private val flushRequested = Channel<FlushRequest>(capacity = Channel.UNLIMITED) private val flushRequested = Channel<FlushRequest>(capacity = Channel.UNLIMITED)
private data class FlushRequest(@JvmField val exportOnly: Boolean) { private data class FlushRequest(@JvmField val exportOnly: Boolean) {
@JvmField val job: CompletableDeferred<Unit> = CompletableDeferred() @JvmField
val job: CompletableDeferred<Unit> = CompletableDeferred()
} }
init { init {
@@ -177,4 +179,21 @@ class BatchSpanProcessor(
} }
} }
} }
@TestOnly
suspend fun reset() {
for (spanExporter in spanExporters) {
try {
withTimeout(30.seconds) {
spanExporter.reset()
}
}
catch (e: CancellationException) {
throw e
}
catch (e: Exception) {
logger<BatchSpanProcessor>().error("Failed to reset", e)
}
}
}
} }

View File

@@ -27,17 +27,22 @@ import java.util.concurrent.TimeUnit
@ApiStatus.Internal @ApiStatus.Internal
class JaegerJsonSpanExporter( class JaegerJsonSpanExporter(
file: Path, file: Path,
serviceName: String, val serviceName: String,
serviceVersion: String? = null, val serviceVersion: String? = null,
serviceNamespace: String? = null, val serviceNamespace: String? = null,
) : AsyncSpanExporter { ) : AsyncSpanExporter {
override val exporterVersion: Int = 2 override val exporterVersion: Int = 2
private val fileChannel: FileChannel private val fileChannel: FileChannel
private val writer: JsonGenerator private var writer: JsonGenerator
private val lock = Mutex() private val lock = Mutex()
private fun initWriter() = JsonFactory().createGenerator(Channels.newOutputStream(fileChannel))
.configure(com.fasterxml.jackson.core.JsonGenerator.Feature.AUTO_CLOSE_TARGET, true)
// Channels.newOutputStream doesn't implement flush, but just to be sure
.configure(com.fasterxml.jackson.core.JsonGenerator.Feature.FLUSH_PASSED_TO_STREAM, false)
init { init {
val parent = file.parent val parent = file.parent
Files.createDirectories(parent) Files.createDirectories(parent)
@@ -46,10 +51,7 @@ class JaegerJsonSpanExporter(
StandardOpenOption.WRITE, StandardOpenOption.WRITE,
StandardOpenOption.TRUNCATE_EXISTING)) StandardOpenOption.TRUNCATE_EXISTING))
writer = JsonFactory().createGenerator(Channels.newOutputStream(fileChannel)) writer = initWriter()
.configure(com.fasterxml.jackson.core.JsonGenerator.Feature.AUTO_CLOSE_TARGET, true)
// Channels.newOutputStream doesn't implement flush, but just to be sure
.configure(com.fasterxml.jackson.core.JsonGenerator.Feature.FLUSH_PASSED_TO_STREAM, false)
beginWriter(w = writer, beginWriter(w = writer,
serviceName = serviceName, serviceName = serviceName,
@@ -190,7 +192,7 @@ class JaegerJsonSpanExporter(
override suspend fun flush() { override suspend fun flush() {
lock.withReentrantLock { lock.withReentrantLock {
// if shutdown was already invoked OR nothing has been written to the temp file // if shutdown was already invoked OR nothing has been written to the output file
if (writer.isClosed) { if (writer.isClosed) {
return@withReentrantLock return@withReentrantLock
} }
@@ -201,6 +203,24 @@ class JaegerJsonSpanExporter(
fileChannel.position(fileChannel.position() - jsonEnd.size) fileChannel.position(fileChannel.position() - jsonEnd.size)
} }
} }
override suspend fun reset() {
lock.withReentrantLock {
// if shutdown was already invoked OR nothing has been written to the output file
if (writer.isClosed) {
return@withReentrantLock
}
writer = initWriter()
fileChannel.truncate(0)
beginWriter(w = writer,
serviceName = serviceName,
serviceVersion = serviceVersion,
serviceNamespace = serviceNamespace,
exporterVersion = exporterVersion)
}
}
} }
private fun beginWriter( private fun beginWriter(

View File

@@ -12,5 +12,8 @@ interface AsyncSpanExporter {
suspend fun flush() {} suspend fun flush() {}
/** Should clean any previously exported metrics */
suspend fun reset() {}
suspend fun shutdown() {} suspend fun shutdown() {}
} }

View File

@@ -100,7 +100,26 @@ interface TelemetryManager {
@Suppress("unused") @Suppress("unused")
@TestOnly @TestOnly
fun forceFlushMetricsBlocking() { fun forceFlushMetricsBlocking() {
runBlocking { forceFlushMetrics() }
}
/**
* Should reset all registered exporters [com.intellij.platform.diagnostic.telemetry.AsyncSpanExporter.reset].
* Meaning that spans and meters should be exported and discarded, so that resulting artifacts will not contain previously collected data.
*/
@TestOnly
suspend fun resetExporters()
/**
* Discard previously collected metrics and invoke flush.
* @see resetExporters
* @see forceFlushMetrics
*/
@Suppress("unused")
@TestOnly
fun reset() {
runBlocking { runBlocking {
resetExporters()
forceFlushMetrics() forceFlushMetrics()
} }
} }
@@ -152,6 +171,10 @@ class NoopTelemetryManager : TelemetryManager {
override suspend fun forceFlushMetrics() { override suspend fun forceFlushMetrics() {
logger<NoopTelemetryManager>().info("Cannot force flushing metrics for Noop telemetry manager") logger<NoopTelemetryManager>().info("Cannot force flushing metrics for Noop telemetry manager")
} }
override suspend fun resetExporters() {
logger<NoopTelemetryManager>().info("Cannot clean exported metrics for Noop telemetry manager")
}
} }
// suspend here is required to get parent span from coroutine context; that's why a version without `suspend` is called `rootSpan`. // suspend here is required to get parent span from coroutine context; that's why a version without `suspend` is called `rootSpan`.

View File

@@ -21,7 +21,6 @@ import com.intellij.util.ReflectionUtil;
import com.intellij.util.io.PersistentEnumeratorCache; import com.intellij.util.io.PersistentEnumeratorCache;
import com.intellij.util.ref.DebugReflectionUtil; import com.intellij.util.ref.DebugReflectionUtil;
import com.intellij.util.ui.UIUtil; import com.intellij.util.ui.UIUtil;
import kotlin.Suppress;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly; import org.jetbrains.annotations.TestOnly;

View File

@@ -56,7 +56,7 @@ public class PerformanceTestInfoImpl implements PerformanceTestInfo {
private String uniqueTestName; // at least full qualified test name (plus other identifiers, optionally) private String uniqueTestName; // at least full qualified test name (plus other identifiers, optionally)
@NotNull @NotNull
private final IJTracer tracer; private final IJTracer tracer;
private ArrayList<TelemetryMetricsCollector> metricsCollectors = new ArrayList<>(); private final ArrayList<TelemetryMetricsCollector> metricsCollectors = new ArrayList<>();
private boolean useDefaultSpanMetricExporter = true; private boolean useDefaultSpanMetricExporter = true;
@@ -109,11 +109,10 @@ public class PerformanceTestInfoImpl implements PerformanceTestInfo {
} }
} }
private static void cleanupOutdatedMeters() { private static void cleanupOutdatedMetrics() {
try { try {
// force spans and meters to be written to disk before any test starts // force spans and meters to be exported and discarded to minimize interference of the same metric on different tests
// it's at least what we can do to minimize interference of the same meter on different tests TelemetryManager.getInstance().reset();
TelemetryManager.getInstance().forceFlushMetricsBlocking();
// remove content of the previous tests from the idea.log // remove content of the previous tests from the idea.log
IJPerfMetricsPublisher.Companion.truncateTestLog(); IJPerfMetricsPublisher.Companion.truncateTestLog();
@@ -139,7 +138,7 @@ public class PerformanceTestInfoImpl implements PerformanceTestInfo {
public PerformanceTestInfoImpl() { public PerformanceTestInfoImpl() {
initOpenTelemetry(); initOpenTelemetry();
cleanupOutdatedMeters(); cleanupOutdatedMetrics();
this.tracer = TelemetryManager.getInstance().getTracer(new Scope("performanceUnitTests", null)); this.tracer = TelemetryManager.getInstance().getTracer(new Scope("performanceUnitTests", null));
} }