[tests] benchmark for async task overhead

+ coroutines `Deferred` vs `j.u.c` thread pool `Future`s
+ IO vs blocking dispatchers
+ `Synchronous` vs `ArrayBlockingQueue`

GitOrigin-RevId: 87b9d42ee237455ac1524969794830c8552bb166
This commit is contained in:
Ruslan Cheremin
2024-07-21 21:37:16 +02:00
committed by intellij-monorepo-bot
parent 13d31a63e3
commit 556ca8cdb7
2 changed files with 210 additions and 0 deletions

View File

@@ -0,0 +1,149 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.openapi.vfs.newvfs.persistent;
import kotlinx.coroutines.Dispatchers;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import java.util.concurrent.*;
import java.util.concurrent.locks.LockSupport;
import static com.intellij.openapi.vfs.newvfs.persistent.AsyncKt.runTaskAsyncViaCoroutines;
import static com.intellij.util.io.BlockingKt.getBlockingDispatcher;
import static java.util.concurrent.TimeUnit.*;
/**
* Benchmark async task overhead on old-school thread pools vs coroutines
*/
@BenchmarkMode(Mode.SampleTime)
@OutputTimeUnit(NANOSECONDS)
@Warmup(iterations = 3, time = 2, timeUnit = SECONDS)
@Measurement(iterations = 3, time = 5, timeUnit = SECONDS)
@Threads(1)
@Fork(1)
public class AsyncTaskBenchmark {
@State(Scope.Benchmark)
public static class BaseContext {
@Param({"5000"})
public long baselineTaskDurationNs;
public Callable<Long> taskToRunAsync;
@Setup
public void setup() {
taskToRunAsync = new SlowTask(baselineTaskDurationNs);
}
}
@State(Scope.Benchmark)
public static class AsyncContext {
@Param({
"thread-pool",
"thread-pool-queried",
"coroutine-blocking-dispatcher",
"coroutine-io-dispatcher"
})
public String METHOD;
private ExecutorService executorService;
public Callable<Long> asyncWrapper;
@Setup
public void setup(BaseContext dataContext) {
var taskToRunAsync = dataContext.taskToRunAsync;
switch (METHOD) {
case "thread-pool": {
//copied from ProcessIOExecutorService:
executorService = new ThreadPoolExecutor(
/* poolSize: */ 1, Integer.MAX_VALUE,
/* keepAlive: */ 1, MINUTES,
new SynchronousQueue<>() //rendezvous queue
);
asyncWrapper = () -> executorService.submit(taskToRunAsync).get();
break;
}
case "thread-pool-queried": {
executorService = new ThreadPoolExecutor(
/* poolSize: */ 1, Integer.MAX_VALUE,
/* keepAlive: */ 1, MINUTES,
new ArrayBlockingQueue<>(64) //use actual queue instead of rendezvous
);
asyncWrapper = () -> executorService.submit(taskToRunAsync).get();
break;
}
case "coroutine-blocking-dispatcher": {
asyncWrapper = () -> runTaskAsyncViaCoroutines(taskToRunAsync, getBlockingDispatcher() );
break;
}
case "coroutine-io-dispatcher": {
asyncWrapper = () -> runTaskAsyncViaCoroutines(taskToRunAsync, Dispatchers.getIO() );
break;
}
default:
throw new IllegalStateException("METHOD: " + METHOD + " is unrecognized");
}
}
@TearDown
public void tearDown() throws InterruptedException {
if (executorService != null) {
executorService.shutdown();
executorService.awaitTermination(1, SECONDS);
}
}
}
@Benchmark
public Long directCall(BaseContext context) throws Exception {
return context.taskToRunAsync.call();
}
@Benchmark
public Long asyncCall(BaseContext context,
AsyncContext asyncContext) throws Exception {
return asyncContext.asyncWrapper.call();
}
public static class SlowTask implements Callable<Long> {
private final long delayNs;
public SlowTask(long delayNs) { this.delayNs = delayNs; }
@Override
public Long call() throws Exception {
long startedAt = System.nanoTime();
LockSupport.parkNanos(delayNs);
return System.nanoTime() - startedAt;
}
}
public static void main(final String[] args) throws RunnerException {
final Options opt = new OptionsBuilder()
.jvmArgs()
//.mode(Mode.SingleShotTime)
//.warmupIterations(10_000)
//.warmupTime(seconds(10))
//.warmupBatchSize(1000)
//.measurementIterations(10_000)
.include(AsyncTaskBenchmark.class.getSimpleName() + ".*")
.threads(1)
.forks(1)
.build();
new Runner(opt).run();
}
}

View File

@@ -0,0 +1,61 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.openapi.vfs.newvfs.persistent
import kotlinx.coroutines.*
import java.util.concurrent.Callable
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine
/* Support for AsyncTaskBenchmark: run java [Callable] via coroutines dispatcher */
@OptIn(DelicateCoroutinesApi::class)
fun <V> runTaskAsyncViaCoroutines(task: Callable<V>, coroutineDispatcher: CoroutineDispatcher): V {
@Suppress("SSBasedInspection")
val deferred = CoroutineScope(coroutineDispatcher + CoroutineName("detachedComputation: $task")).async {
task.call()
}
return runSuspend {
deferred.await()
}
}
// Copied from kotlin.coroutines.jvm.internal.RunSuspend.kt
internal fun <T> runSuspend(block: suspend () -> T): T {
val run = RunSuspend<T>()
block.startCoroutine(run)
return run.await()
}
private class RunSuspend<T> : Continuation<T> {
override val context: CoroutineContext
get() = EmptyCoroutineContext
var result: Result<T>? = null
val lock = ReentrantLock()
val condition = lock.newCondition()
override fun resumeWith(result: Result<T>) = lock.withLock {
this.result = result
condition.signalAll()
}
fun await(): T {
lock.withLock {
while (true) {
when (val result = this.result) {
null -> condition.await()
else -> {
return result.getOrThrow() // throw up failure
}
}
}
}
}
}