From 04c4bee35a8e22631d1fb2dfb48b14abc4bb456f Mon Sep 17 00:00:00 2001 From: Vladimir Lagunov Date: Thu, 5 Sep 2024 13:53:53 +0200 Subject: [PATCH] IJent: Refactor IjentDeployingStrategy to have stderr reported during shell bootstrap failure There was a problem that `com.intellij.platform.ijent.spi.IjentDeployingOverShellProcessStrategy.ShellProcessWrapper#readWholeErrorStream` used to start reading stderr when the shell process is already dead, so in case of problems there was no stderr. The new refactoring brings `IjentSessionMediator` into `IjentDeployingStrategy`, and `IjentSessionMediator` knows how to collect stderr correctly. GitOrigin-RevId: 26563aa82b8842ae945ca37ce19c2fc23c7492f8 --- .idea/modules.xml | 1 + .../ijent/AbstractIjentVerificationAction.kt | 6 +- .../intellij/platform/ijent/IjentDeployer.kt | 34 ++-- .../platform/ijent/IjentGlobalScopeHolder.kt | 16 -- .../platform/ijent/IjentSessionRegistry.kt | 21 ++- .../IjentDeployingOverShellProcessStrategy.kt | 87 +++++----- .../ijent/spi/IjentDeployingStrategy.kt | 2 +- .../ijent/spi/IjentSessionMediator.kt | 149 ++++++++++++------ .../ijent/spi/IjentSessionProvider.kt | 28 ++-- ...platform.ijent.community.testFramework.iml | 17 ++ .../testFramework/wsl/WslIjentTestUtil.kt | 51 ++++++ .../wsl/ProductionWslIjentManager.kt | 21 ++- .../wsl/WslIjentDeployingStrategy.kt | 5 +- .../intellij/execution/wsl/WslIjentManager.kt | 25 +-- 14 files changed, 289 insertions(+), 174 deletions(-) delete mode 100644 platform/ijent/src/com/intellij/platform/ijent/IjentGlobalScopeHolder.kt create mode 100644 platform/ijent/testFramework/intellij.platform.ijent.community.testFramework.iml create mode 100644 platform/ijent/testFramework/src/com/intellij/ijent/testFramework/wsl/WslIjentTestUtil.kt diff --git a/.idea/modules.xml b/.idea/modules.xml index 6e92151c0e53..865397a0b8c3 100644 --- a/.idea/modules.xml +++ b/.idea/modules.xml @@ -685,6 +685,7 @@ + diff --git a/platform/execution-impl/src/com/intellij/execution/wsl/ijent/AbstractIjentVerificationAction.kt b/platform/execution-impl/src/com/intellij/execution/wsl/ijent/AbstractIjentVerificationAction.kt index 87fcf7b13af8..10422144c56a 100644 --- a/platform/execution-impl/src/com/intellij/execution/wsl/ijent/AbstractIjentVerificationAction.kt +++ b/platform/execution-impl/src/com/intellij/execution/wsl/ijent/AbstractIjentVerificationAction.kt @@ -58,8 +58,8 @@ abstract class AbstractIjentVerificationAction : DumbAwareAction() { try { withModalProgress(modalTaskOwner, e.presentation.text, TaskCancellation.cancellable()) { coroutineScope { - val (title, deployingStrategy) = deployingStrategy() - deployingStrategy.deploy("IjentVerificationAction").ijentApi.use { ijent -> + val (title, deployingStrategy) = deployingStrategy(this) + deployingStrategy.deploy().ijentApi.use { ijent -> coroutineScope { launch { val info = ijent.ijentProcessInfo @@ -115,7 +115,7 @@ abstract class AbstractIjentVerificationAction : DumbAwareAction() { } } - protected abstract suspend fun deployingStrategy(): Pair + protected abstract suspend fun deployingStrategy(ijentProcessScope: CoroutineScope): Pair companion object { protected val LOG = logger() diff --git a/platform/ijent/src/com/intellij/platform/ijent/IjentDeployer.kt b/platform/ijent/src/com/intellij/platform/ijent/IjentDeployer.kt index c2709af5251f..30504358d578 100644 --- a/platform/ijent/src/com/intellij/platform/ijent/IjentDeployer.kt +++ b/platform/ijent/src/com/intellij/platform/ijent/IjentDeployer.kt @@ -5,21 +5,18 @@ package com.intellij.platform.ijent import com.intellij.platform.ijent.spi.DeployedIjent import com.intellij.platform.ijent.spi.IjentDeployingStrategy import com.intellij.platform.ijent.spi.connectToRunningIjent -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.job /** * Starts IJent on some machine, defined by [com.intellij.platform.ijent.spi.IjentDeployingStrategy]. * - * [ijentName] is a label used for logging, debugging, thread names, etc. - * - * By default, the IJent executable exits when the IDE exits. - * [com.intellij.platform.eel.IjentApi.close] and [com.intellij.platform.ijent.bindToScope] may be used to terminate IJent earlier. + * By default, the IJent executable exits when + * the coroutine scope of [IjentProcessMediator] from [IjentDeployingStrategy.createProcess] exits. + * [com.intellij.platform.ijent.IjentApi.close] may be used to terminate IJent earlier. * * TODO Either define thrown exceptions or return something like Result. */ -suspend fun IjentDeployingStrategy.deploy(ijentName: String): DeployedIjent { - val (remotePathToBinary, ijentApi) = doDeploy(ijentName) +suspend fun IjentDeployingStrategy.deploy(): DeployedIjent { + val (remotePathToBinary, ijentApi) = doDeploy() return object : DeployedIjent { override val ijentApi: IjentApi = ijentApi override val remotePathToBinary: String = remotePathToBinary @@ -27,8 +24,8 @@ suspend fun IjentDeployingStrategy.deploy(ijentName: String): DeployedIjent { } /** A specialized version of [com.intellij.platform.ijent.deploy] */ -suspend fun IjentDeployingStrategy.Posix.deploy(ijentName: String): DeployedIjent.Posix { - val (remotePathToBinary, ijentApi) = doDeploy(ijentName) +suspend fun IjentDeployingStrategy.Posix.deploy(): DeployedIjent.Posix { + val (remotePathToBinary, ijentApi) = doDeploy() ijentApi as IjentPosixApi return object : DeployedIjent.Posix { override val ijentApi: IjentPosixApi = ijentApi @@ -37,8 +34,8 @@ suspend fun IjentDeployingStrategy.Posix.deploy(ijentName: String): DeployedIjen } /** A specialized version of [com.intellij.platform.ijent.deploy] */ -suspend fun IjentDeployingStrategy.Windows.deploy(ijentName: String): DeployedIjent.Windows { - val (remotePathToBinary, ijentApi) = doDeploy(ijentName) +suspend fun IjentDeployingStrategy.Windows.deploy(): DeployedIjent.Windows { + val (remotePathToBinary, ijentApi) = doDeploy() ijentApi as IjentWindowsApi return object : DeployedIjent.Windows { override val ijentApi: IjentWindowsApi = ijentApi @@ -46,20 +43,13 @@ suspend fun IjentDeployingStrategy.Windows.deploy(ijentName: String): DeployedIj } } -/** A shortcut for terminating an [IjentApi] when the [coroutineScope] completes. */ -fun IjentApi.bindToScope(coroutineScope: CoroutineScope) { - coroutineScope.coroutineContext.job.invokeOnCompletion { - this@bindToScope.close() - } -} - -private suspend fun IjentDeployingStrategy.doDeploy(ijentName: String): Pair = +private suspend fun IjentDeployingStrategy.doDeploy(): Pair = try { val targetPlatform = getTargetPlatform() val remotePathToBinary = copyFile(IjentExecFileProvider.getInstance().getIjentBinary(targetPlatform)) - val process = createProcess(remotePathToBinary) + val mediator = createProcess(remotePathToBinary) - val ijentApi = connectToRunningIjent(ijentName, getConnectionStrategy(), targetPlatform, process) + val ijentApi = connectToRunningIjent(getConnectionStrategy(), targetPlatform, mediator) remotePathToBinary to ijentApi } finally { diff --git a/platform/ijent/src/com/intellij/platform/ijent/IjentGlobalScopeHolder.kt b/platform/ijent/src/com/intellij/platform/ijent/IjentGlobalScopeHolder.kt deleted file mode 100644 index 5f3efe3e8119..000000000000 --- a/platform/ijent/src/com/intellij/platform/ijent/IjentGlobalScopeHolder.kt +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license. -package com.intellij.platform.ijent - -import com.intellij.openapi.application.ApplicationManager -import com.intellij.openapi.components.Service -import com.intellij.openapi.components.service -import com.intellij.openapi.components.serviceAsync -import kotlinx.coroutines.CoroutineScope - -@Service -internal class IjentApplicationScope private constructor(scope: CoroutineScope) : CoroutineScope by scope { - companion object { - fun instance(): IjentApplicationScope = ApplicationManager.getApplication().service() - suspend fun instanceAsync(): IjentApplicationScope = serviceAsync() - } -} \ No newline at end of file diff --git a/platform/ijent/src/com/intellij/platform/ijent/IjentSessionRegistry.kt b/platform/ijent/src/com/intellij/platform/ijent/IjentSessionRegistry.kt index af0d335d3fcd..3927d059dc6f 100644 --- a/platform/ijent/src/com/intellij/platform/ijent/IjentSessionRegistry.kt +++ b/platform/ijent/src/com/intellij/platform/ijent/IjentSessionRegistry.kt @@ -10,7 +10,7 @@ import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicLong /** - * This service MUST know about all running IJents. + * This service MUST know about a running IJent if it's going to be used through [java.nio.file.spi.FileSystemProvider]. * It also MAY know about delegates and wrappers over interfaces, if they are registered with [register]. */ @Service(Service.Level.APP) @@ -28,21 +28,15 @@ class IjentSessionRegistry(private val coroutineScope: CoroutineScope) { /** * [ijentName] is used for debugging utilities like logs and thread names. * - * When [oneOff] is true, [launcher] may be called at most once, and if something wrong happens with the returned IJent, - * [get] still keeps returning that broken instance of [IjentApi]. - * - * When [oneOff] is false, [get] checks if an already created [IjentApi] is functional, and if there's a problem, [get] calls - * [launcher] again and remembers the new instance. - * - * [launcher] should use the provided coroutine scope for launching various jobs, passing to the implementation of [IjentApi], etc. + * If something happens with IJent, and it starts throwing [IjentUnavailableException], + * [launch] is called again to make a new instance of [IjentApi]. */ fun register( ijentName: String, - oneOff: Boolean, launcher: suspend (ijentId: IjentId) -> IjentApi, ): IjentId { val ijentId = IjentId("ijent-${counter.getAndIncrement()}-${ijentName.replace(Regex("[^A-Za-z0-9-]"), "-")}") - ijents[ijentId] = IjentBundle(launcher, null, oneOff) + ijents[ijentId] = IjentBundle(launcher, null, oneOff = false) return ijentId } @@ -100,7 +94,12 @@ class IjentSessionRegistry(private val coroutineScope: CoroutineScope) { oneOff = oldBundle.oneOff, ) })!! - return bundle.deferred!!.await() + try { + return bundle.deferred!!.await() + } + catch (err: Throwable) { + throw IjentUnavailableException.unwrapFromCancellationExceptions(err) + } } companion object { diff --git a/platform/ijent/src/com/intellij/platform/ijent/spi/IjentDeployingOverShellProcessStrategy.kt b/platform/ijent/src/com/intellij/platform/ijent/spi/IjentDeployingOverShellProcessStrategy.kt index 363fdcf49a7b..b5f74a7c70fe 100644 --- a/platform/ijent/src/com/intellij/platform/ijent/spi/IjentDeployingOverShellProcessStrategy.kt +++ b/platform/ijent/src/com/intellij/platform/ijent/spi/IjentDeployingOverShellProcessStrategy.kt @@ -2,15 +2,15 @@ package com.intellij.platform.ijent.spi import com.intellij.execution.CommandLineUtil.posixQuote -import com.intellij.openapi.application.ApplicationManager -import com.intellij.openapi.diagnostic.* +import com.intellij.openapi.diagnostic.debug +import com.intellij.openapi.diagnostic.logger +import com.intellij.openapi.diagnostic.trace import com.intellij.platform.eel.EelPlatform +import com.intellij.platform.ijent.IjentUnavailableException import com.intellij.platform.ijent.getIjentGrpcArgv -import com.intellij.util.io.computeDetached import com.intellij.util.io.copyToAsync import kotlinx.coroutines.* import org.jetbrains.annotations.VisibleForTesting -import java.io.IOException import java.io.InputStream import java.nio.file.Path import kotlin.io.path.fileSize @@ -18,18 +18,20 @@ import kotlin.io.path.inputStream import kotlin.time.Duration.Companion.seconds abstract class IjentDeployingOverShellProcessStrategy(scope: CoroutineScope) : IjentDeployingStrategy.Posix { + protected abstract val ijentLabel: String + /** * If there's some bind mount, returns the path for the remote machine/container that corresponds to [path]. * Otherwise, returns null. */ protected abstract suspend fun mapPath(path: Path): String? - protected abstract suspend fun createShellProcess(): ShellProcessWrapper + protected abstract suspend fun createShellProcess(): Process private val myContext: Deferred = run { var createdShellProcess: ShellProcessWrapper? = null val context = scope.async(start = CoroutineStart.LAZY) { - val shellProcess = createShellProcess() + val shellProcess = ShellProcessWrapper(IjentSessionMediator.create(scope, createShellProcess(), ijentLabel)) createdShellProcess = shellProcess createDeployingContext(shellProcess.apply { // The timeout is taken at random. @@ -40,11 +42,6 @@ abstract class IjentDeployingOverShellProcessStrategy(scope: CoroutineScope) : I } }) } - context.invokeOnCompletion { error -> - if (error != null && error !is CancellationException) { - createdShellProcess?.destroyForcibly() - } - } context } @@ -54,7 +51,7 @@ abstract class IjentDeployingOverShellProcessStrategy(scope: CoroutineScope) : I } } - final override suspend fun createProcess(binaryPath: String): Process { + final override suspend fun createProcess(binaryPath: String): IjentSessionMediator { return myContext.await().execCommand { execIjent(binaryPath) } @@ -74,9 +71,10 @@ abstract class IjentDeployingOverShellProcessStrategy(scope: CoroutineScope) : I override suspend fun getConnectionStrategy(): IjentConnectionStrategy = IjentConnectionStrategy.Default - class ShellProcessWrapper(private var wrapped: Process?) { + internal class ShellProcessWrapper(private var mediator: IjentSessionMediator?) { + @OptIn(DelicateCoroutinesApi::class) suspend fun write(data: String) { - val process = wrapped!! + val process = mediator!!.process @Suppress("NAME_SHADOWING") val data = if (data.endsWith("\n")) data else "$data\n" @@ -96,7 +94,7 @@ abstract class IjentDeployingOverShellProcessStrategy(scope: CoroutineScope) : I suspend fun readLineWithoutBuffering(): String = withContext(Dispatchers.IO) { val buffer = StringBuilder() - val stream = wrapped!!.inputStream + val stream = mediator!!.process.inputStream while (true) { ensureActive() val c = stream.read() @@ -112,7 +110,7 @@ abstract class IjentDeployingOverShellProcessStrategy(scope: CoroutineScope) : I } suspend fun copyDataFrom(stream: InputStream) { - val process = wrapped!! + val process = mediator!!.process withContext(Dispatchers.IO) { stream.copyToAsync(process.outputStream) ensureActive() @@ -120,17 +118,22 @@ abstract class IjentDeployingOverShellProcessStrategy(scope: CoroutineScope) : I } } - fun destroyForcibly() { - wrapped!!.destroyForcibly() + @OptIn(InternalCoroutinesApi::class) + suspend fun destroyForciblyAndGetError(): Throwable { + mediator!!.process.destroyForcibly() + try { + val job = mediator!!.ijentProcessScope.coroutineContext.job + job.join() + throw job.getCancellationException() + } + catch (err: Throwable) { + return IjentUnavailableException.unwrapFromCancellationExceptions(err) + } } - @OptIn(DelicateCoroutinesApi::class) - suspend fun readWholeErrorStream(): ByteArray = - computeDetached { wrapped!!.errorStream.readAllBytes() } - - fun extractProcess(): Process { - val result = wrapped!! - wrapped = null + fun extractProcess(): IjentSessionMediator { + val result = mediator!! + mediator = null return result } } @@ -140,24 +143,26 @@ private suspend fun DeployingContextAndShell.execCommand(block: suspen return try { block() } - catch (err: Throwable) { - runCatching { process.destroyForcibly() }.exceptionOrNull()?.let(err::addSuppressed) + catch (initialErrorFromStack: Throwable) { + val errorFromScope = process.destroyForciblyAndGetError() + val errorFromStack = IjentUnavailableException.unwrapFromCancellationExceptions(initialErrorFromStack) - val attachment = Attachment("stderr", String(process.readWholeErrorStream())) - attachment.isIncluded = attachment.isIncluded or ApplicationManager.getApplication().isInternal + // It happens sometimes that some valuable error is wrapped into CancellationException. + // However, if there's no valuable error, + // then it's a fair CancellationException that should be thrown futher, to cancel the context. + val (mainError, suppressedError) = + when (errorFromStack) { + is IjentUnavailableException, is CancellationException -> errorFromStack to errorFromScope + else -> errorFromScope to errorFromStack + } - val errorWithAttachments = RuntimeExceptionWithAttachments(err.message ?: "", err, attachment) + if (mainError != suppressedError) { + mainError.addSuppressed(suppressedError) + } - // TODO Suppress RuntimeExceptionWithAttachments instead of wrapping when KT-66006 is resolved. - //err.addSuppressed(RuntimeExceptionWithAttachments( - // "The error happened during handling $process", - // Attachment("stderr", stderr.toString()).apply { isIncluded = isIncluded or ApplicationManager.getApplication().isInternal }, - //)) - //throw err - - throw when (err) { - is TimeoutCancellationException, is IOException -> IjentStartupError.CommunicationError(errorWithAttachments) - else -> errorWithAttachments + throw when (mainError) { + is IjentUnavailableException, is CancellationException -> mainError + else -> IjentStartupError.CommunicationError(mainError) } } } @@ -348,7 +353,7 @@ private suspend fun DeployingContextAndShell.uploadIjentBinary( return process.readLineWithoutBuffering() } -private suspend fun DeployingContextAndShell.execIjent(remotePathToBinary: String): Process { +private suspend fun DeployingContextAndShell.execIjent(remotePathToBinary: String): IjentSessionMediator { val joinedCmd = getIjentGrpcArgv(remotePathToBinary, selfDeleteOnExit = true, usrBinEnv = context.env).joinToString(" ") val commandLineArgs = context.run { """ diff --git a/platform/ijent/src/com/intellij/platform/ijent/spi/IjentDeployingStrategy.kt b/platform/ijent/src/com/intellij/platform/ijent/spi/IjentDeployingStrategy.kt index 9ef3e0d3ccef..f60c6a6b514a 100644 --- a/platform/ijent/src/com/intellij/platform/ijent/spi/IjentDeployingStrategy.kt +++ b/platform/ijent/src/com/intellij/platform/ijent/spi/IjentDeployingStrategy.kt @@ -48,7 +48,7 @@ interface IjentDeployingStrategy { * @param binaryPath path to ijent binary on target environment * @return process that will be used for communication */ - suspend fun createProcess(binaryPath: String): Process + suspend fun createProcess(binaryPath: String): IjentSessionMediator /** * Copy files to the target environment. Typically used to transfer the ijent binary to the target machine. diff --git a/platform/ijent/src/com/intellij/platform/ijent/spi/IjentSessionMediator.kt b/platform/ijent/src/com/intellij/platform/ijent/spi/IjentSessionMediator.kt index 06459a62e9aa..ab0a99151e26 100644 --- a/platform/ijent/src/com/intellij/platform/ijent/spi/IjentSessionMediator.kt +++ b/platform/ijent/src/com/intellij/platform/ijent/spi/IjentSessionMediator.kt @@ -4,9 +4,9 @@ package com.intellij.platform.ijent.spi import com.intellij.openapi.diagnostic.Attachment import com.intellij.openapi.diagnostic.debug import com.intellij.openapi.diagnostic.logger -import com.intellij.platform.ijent.IjentApplicationScope -import com.intellij.platform.ijent.IjentId +import com.intellij.openapi.progress.Cancellation import com.intellij.platform.ijent.IjentUnavailableException +import com.intellij.platform.ijent.coroutineNameAppended import com.intellij.platform.util.coroutines.childScope import com.intellij.util.containers.ContainerUtil import com.intellij.util.io.awaitExit @@ -22,8 +22,6 @@ import java.time.ZonedDateTime import java.time.format.DateTimeParseException import java.util.* import java.util.concurrent.TimeUnit -import kotlin.coroutines.AbstractCoroutineContextElement -import kotlin.coroutines.CoroutineContext import kotlin.time.Duration.Companion.milliseconds import kotlin.time.Duration.Companion.seconds import kotlin.time.toKotlinDuration @@ -34,6 +32,10 @@ import kotlin.time.toKotlinDuration * * [processExit] never throws. When it completes, it either means that the process has finished, or that the whole scope of IJent processes * is canceled. + * + * [ijentProcessScope] should be used by the [com.intellij.platform.ijent.IjentApi] implementation for launching internal coroutines. + * No matter if IJent exits expectedly or not, an attempt to do anything with [ijentProcessScope] after the IJent has exited + * throws [IjentUnavailableException]. */ class IjentSessionMediator private constructor( val ijentProcessScope: CoroutineScope, @@ -58,40 +60,85 @@ class IjentSessionMediator private constructor( /** * See the docs of [IjentSessionMediator]. * - * [ijentId] is used only for logging. + * [ijentLabel] is used only for logging. + * + * Beware that [parentScope] receives [IjentUnavailableException.CommunicationFailure] if IJent _suddenly_ exits, f.i., after SIGKILL. + * Nothing happens with [parentScope] if IJent exits expectedly, f.i., after [com.intellij.platform.ijent.IjentApi.close]. */ @OptIn(DelicateCoroutinesApi::class) - fun create(process: Process, ijentId: IjentId): IjentSessionMediator { + fun create(parentScope: CoroutineScope, process: Process, ijentLabel: String): IjentSessionMediator { + // TODO What about https://youtrack.jetbrains.com/issue/IJPL-156891 ? + + require(parentScope.coroutineContext[Job] != null) { + "Scope $parentScope has no Job" + } + val context = IjentThreadPool.asCoroutineDispatcher() + val ijentProcessScope = run { + // Prevents from logging the error by the default exception handler. + // Errors are logged explicitly in this function. + val dummyExceptionHandler = CoroutineExceptionHandler { _, err -> /* nothing */ } + + // This supervisor scope exists only to prevent automatic propagation of IjentUnavailableException to the parent scope. + // Instead, there's a logic below that decides if a specific IjentUnavailableException should be propagated to the parent scope. + val trickySupervisorScope = parentScope.childScope(ijentLabel, context + dummyExceptionHandler, supervisor = true) + + val ijentProcessScope = trickySupervisorScope.childScope(ijentLabel, supervisor = false) + + ijentProcessScope.coroutineContext.job.invokeOnCompletion { err -> + trickySupervisorScope.cancel() + + if (err != null) { + val propagateToParentScope = when (err) { + is IjentUnavailableException -> when (err) { + is IjentUnavailableException.ClosedByApplication -> false + is IjentUnavailableException.CommunicationFailure -> !err.exitedExpectedly + } + else -> true + } + + if (propagateToParentScope) { + try { + err.addSuppressed(Throwable("Rethrown from here")) + parentScope.launch(start = CoroutineStart.UNDISPATCHED) { + throw err + } + } + catch (_: Throwable) { + // It seems that the scope has already been canceled with something else. + } + } + + // TODO Callers should be able to define their own exception handlers. + logIjentError(ijentLabel, err) + } + } + + ijentProcessScope + } + val lastStderrMessages = MutableSharedFlow( replay = 30, extraBufferCapacity = 0, onBufferOverflow = BufferOverflow.DROP_OLDEST, ) - val exceptionHandler = IjentSessionCoroutineExceptionHandler(ijentId) - val connectionScope = IjentApplicationScope.instance().childScope( - "ijent $ijentId > connection scope", - supervisor = false, - context = exceptionHandler + IjentThreadPool.asCoroutineDispatcher(), - ) - // stderr logger should outlive the current scope. In case if an error appears, the scope is cancelled immediately, but the whole // intention of the stderr logger is to write logs of the remote process, which come from the remote machine to the local one with // a delay. - GlobalScope.launch(blockingDispatcher + CoroutineName("ijent $ijentId > stderr logger")) { - ijentProcessStderrLogger(process, ijentId, lastStderrMessages) + GlobalScope.launch(blockingDispatcher + ijentProcessScope.coroutineNameAppended("stderr logger")) { + ijentProcessStderrLogger(process, ijentLabel, lastStderrMessages) } val processExit = CompletableDeferred() - val mediator = IjentSessionMediator(connectionScope, process, processExit) + val mediator = IjentSessionMediator(ijentProcessScope, process, processExit) - val awaiterScope = IjentApplicationScope.instance().launch(CoroutineName("ijent $ijentId > exit awaiter scope") + exceptionHandler) { - ijentProcessExitAwaiter(ijentId, mediator, lastStderrMessages) + val awaiterScope = ijentProcessScope.launch(context = context + ijentProcessScope.coroutineNameAppended("exit awaiter scope")) { + ijentProcessExitAwaiter(ijentLabel, mediator, lastStderrMessages) } - val finalizerScope = connectionScope.launch(CoroutineName("ijent $ijentId > finalizer scope")) { - ijentProcessFinalizer(ijentId, mediator) + val finalizerScope = ijentProcessScope.launch(context = context + ijentProcessScope.coroutineNameAppended("finalizer scope")) { + ijentProcessFinalizer(ijentLabel, mediator) } awaiterScope.invokeOnCompletion { err -> @@ -104,21 +151,19 @@ class IjentSessionMediator private constructor( } } -private class IjentSessionCoroutineExceptionHandler( - private val ijentId: IjentId, -) : AbstractCoroutineContextElement(CoroutineExceptionHandler), CoroutineExceptionHandler { - private val loggedErrors = Collections.newSetFromMap(ContainerUtil.createConcurrentWeakMap()) +private val loggedErrors = Collections.newSetFromMap(ContainerUtil.createConcurrentWeakMap()) - override fun toString(): String = javaClass.simpleName - - override fun handleException(context: CoroutineContext, exception: Throwable) { +private fun logIjentError(ijentLabel: String, exception: Throwable) { + // The logger can create new services, and since this function is called inside an already failed coroutine context, + // service creation would be impossible without `executeInNonCancelableSection`. + Cancellation.executeInNonCancelableSection { when (exception) { is IjentUnavailableException -> when (exception) { is IjentUnavailableException.ClosedByApplication -> Unit is IjentUnavailableException.CommunicationFailure -> { if (!exception.exitedExpectedly && loggedErrors.add(exception)) { - LOG.error("Exception in connection with IJent $ijentId: ${exception.message}", exception) + LOG.error("Exception in connection with IJent $ijentLabel: ${exception.message}", exception) } } } @@ -127,27 +172,27 @@ private class IjentSessionCoroutineExceptionHandler( else -> { if (loggedErrors.add(exception)) { - LOG.error("Unexpected error during communnication with IJent $ijentId", exception) + LOG.error("Unexpected error during communnication with IJent $ijentLabel", exception) } } } } } -private suspend fun ijentProcessStderrLogger(process: Process, ijentId: IjentId, lastStderrMessages: MutableSharedFlow) { +private suspend fun ijentProcessStderrLogger(process: Process, ijentLabel: String, lastStderrMessages: MutableSharedFlow) { try { process.errorStream.reader().useLines { lines -> for (line in lines) { yield() if (line.isNotEmpty()) { - logIjentStderr(ijentId, line) + logIjentStderr(ijentLabel, line) lastStderrMessages.emit(line) } } } } catch (err: IOException) { - LOG.debug { "$ijentId bootstrap got an error: $err" } + LOG.debug { "$ijentLabel bootstrap got an error: $err" } } finally { lastStderrMessages.emit(null) @@ -165,13 +210,13 @@ private val ijentLogMessageRegex = Regex( RegexOption.COMMENTS, ) -private fun logIjentStderr(ijentId: IjentId, line: String) { +private fun logIjentStderr(ijentLabel: String, line: String) { val hostDateTime = ZonedDateTime.now() val (rawRemoteDateTime, level, message) = ijentLogMessageRegex.matchEntire(line)?.destructured ?: run { - LOG.debug { "$ijentId log: $line" } + LOG.debug { "$ijentLabel log: $line" } return } @@ -179,7 +224,7 @@ private fun logIjentStderr(ijentId: IjentId, line: String) { java.time.Duration.between(ZonedDateTime.parse(rawRemoteDateTime), hostDateTime).toKotlinDuration() } catch (_: DateTimeParseException) { - LOG.debug { "$ijentId log: $line" } + LOG.debug { "$ijentLabel log: $line" } return } @@ -192,7 +237,7 @@ private fun logIjentStderr(ijentId: IjentId, line: String) { } logger(buildString { - append(ijentId) + append(ijentLabel) append(" log: ") if (dateTimeDiff.absoluteValue > 50.milliseconds) { // The timeout is taken at random. append(rawRemoteDateTime) @@ -205,12 +250,12 @@ private fun logIjentStderr(ijentId: IjentId, line: String) { } private suspend fun ijentProcessExitAwaiter( - ijentId: IjentId, + ijentLabel: String, mediator: IjentSessionMediator, lastStderrMessages: MutableSharedFlow, ): Nothing { val exitCode = mediator.process.awaitExit() - LOG.debug { "IJent process $ijentId exited with code $exitCode" } + LOG.debug { "IJent process $ijentLabel exited with code $exitCode" } val isExitExpected = when (mediator.expectedErrorCode) { IjentSessionMediator.ExpectedErrorCode.NO -> false @@ -233,7 +278,7 @@ private suspend fun ijentProcessExitAwaiter( } } IjentUnavailableException.CommunicationFailure( - "The process $ijentId suddenly exited with the code $exitCode", + "The process $ijentLabel suddenly exited with the code $exitCode", Attachment("stderr", stderr.toString()), ) } @@ -250,37 +295,41 @@ private suspend fun collectLines(lastStderrMessages: SharedFlow, stderr } @OptIn(DelicateCoroutinesApi::class) -private suspend fun ijentProcessFinalizer(ijentId: IjentId, mediator: IjentSessionMediator): Nothing { +private suspend fun ijentProcessFinalizer(ijentLabel: String, mediator: IjentSessionMediator): Nothing { try { awaitCancellation() } catch (err: Exception) { - throw when (val cause = generateSequence(err, Throwable::cause).firstOrNull { it !is CancellationException }) { - null -> err - is IjentUnavailableException -> cause - else -> { - LOG.debug(err) { "$ijentId is going to be terminated due to receiving an error" } - IjentUnavailableException.CommunicationFailure("IJent communication terminated due to an error", err) - } + val actualErrors = generateSequence(err, Throwable::cause).filterTo(mutableListOf()) { it !is CancellationException } + + val existingIjentUnavailableException = actualErrors.filterIsInstance().firstOrNull() + if (existingIjentUnavailableException != null) { + throw existingIjentUnavailableException } + + val cause = actualErrors.firstOrNull() ?: err + val message = + if (cause is CancellationException) "The coroutine scope of $ijentLabel was cancelled" + else "IJent communication terminated due to an error" + throw IjentUnavailableException.ClosedByApplication(message, cause) } finally { mediator.expectedErrorCode = IjentSessionMediator.ExpectedErrorCode.ANY val process = mediator.process if (process.isAlive) { - GlobalScope.launch(Dispatchers.IO + CoroutineName("$ijentId destruction")) { + GlobalScope.launch(Dispatchers.IO + CoroutineName("$ijentLabel destruction")) { try { process.waitFor(5, TimeUnit.SECONDS) // A random timeout. } finally { if (process.isAlive) { - LOG.warn("The process $ijentId is still alive, it will be killed") + LOG.warn("The process $ijentLabel is still alive, it will be killed") process.destroy() } } } GlobalScope.launch(Dispatchers.IO) { - LOG.debug { "Closing stdin of $ijentId" } + LOG.debug { "Closing stdin of $ijentLabel" } process.outputStream.close() } } diff --git a/platform/ijent/src/com/intellij/platform/ijent/spi/IjentSessionProvider.kt b/platform/ijent/src/com/intellij/platform/ijent/spi/IjentSessionProvider.kt index 349f846d9b63..cfb9620fe5b0 100644 --- a/platform/ijent/src/com/intellij/platform/ijent/spi/IjentSessionProvider.kt +++ b/platform/ijent/src/com/intellij/platform/ijent/spi/IjentSessionProvider.kt @@ -50,22 +50,24 @@ internal class DefaultIjentSessionProvider : IjentSessionProvider { * [ijentName] is used for debugging utilities like logs and thread names. * * The process terminates automatically only when the IDE exits, or if [IjentApi.close] is called explicitly. - * [com.intellij.platform.ijent.bindToScope] may be useful for terminating the IJent process earlier. */ -suspend fun connectToRunningIjent(ijentName: String, strategy: IjentConnectionStrategy, platform: EelPlatform, process: Process): IjentApi { - val ijentSessionRegistry = IjentSessionRegistry.instanceAsync() - val ijentId = ijentSessionRegistry.register(ijentName, oneOff = true) { ijentId -> - val mediator = IjentSessionMediator.create(process, ijentId) - mediator.expectedErrorCode = IjentSessionMediator.ExpectedErrorCode.ZERO - IjentSessionProvider.instanceAsync().connect(strategy, platform, mediator) - } - return ijentSessionRegistry.get(ijentId) +suspend fun connectToRunningIjent(strategy: IjentConnectionStrategy, platform: EelPlatform, mediator: IjentSessionMediator): IjentApi { + mediator.expectedErrorCode = IjentSessionMediator.ExpectedErrorCode.ZERO + return IjentSessionProvider.instanceAsync().connect(strategy, platform, mediator) } /** A specialized overload of [connectToRunningIjent] */ -suspend fun connectToRunningIjent(ijentName: String, strategy: IjentConnectionStrategy, platform: EelPlatform.Posix, process: Process): IjentPosixApi = - connectToRunningIjent(ijentName, strategy, platform as EelPlatform, process) as IjentPosixApi +suspend fun connectToRunningIjent( + strategy: IjentConnectionStrategy, + platform: EelPlatform.Posix, + mediator: IjentSessionMediator, +): IjentPosixApi = + connectToRunningIjent(strategy, platform as EelPlatform, mediator) as IjentPosixApi /** A specialized overload of [connectToRunningIjent] */ -suspend fun connectToRunningIjent(ijentName: String, strategy: IjentConnectionStrategy, platform: EelPlatform.Windows, process: Process): IjentWindowsApi = - connectToRunningIjent(ijentName, strategy, platform as EelPlatform, process) as IjentWindowsApi +suspend fun connectToRunningIjent( + strategy: IjentConnectionStrategy, + platform: EelPlatform.Windows, + mediator: IjentSessionMediator, +): IjentWindowsApi = + connectToRunningIjent(strategy, platform as EelPlatform, mediator) as IjentWindowsApi diff --git a/platform/ijent/testFramework/intellij.platform.ijent.community.testFramework.iml b/platform/ijent/testFramework/intellij.platform.ijent.community.testFramework.iml new file mode 100644 index 000000000000..7573f0de9487 --- /dev/null +++ b/platform/ijent/testFramework/intellij.platform.ijent.community.testFramework.iml @@ -0,0 +1,17 @@ + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/platform/ijent/testFramework/src/com/intellij/ijent/testFramework/wsl/WslIjentTestUtil.kt b/platform/ijent/testFramework/src/com/intellij/ijent/testFramework/wsl/WslIjentTestUtil.kt new file mode 100644 index 000000000000..4062a396fef4 --- /dev/null +++ b/platform/ijent/testFramework/src/com/intellij/ijent/testFramework/wsl/WslIjentTestUtil.kt @@ -0,0 +1,51 @@ +// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license. +@file:JvmName("WslIjentTestUtil") +package com.intellij.ijent.testFramework.wsl + +import com.intellij.execution.wsl.ProductionWslIjentManager +import com.intellij.execution.wsl.WslIjentManager +import com.intellij.execution.wsl.ijent.nio.toggle.IjentWslNioFsToggler +import com.intellij.openapi.Disposable +import com.intellij.openapi.application.ApplicationManager +import com.intellij.openapi.util.Disposer +import com.intellij.platform.util.coroutines.childScope +import com.intellij.testFramework.replaceService +import kotlinx.coroutines.* +import java.util.concurrent.CancellationException + +fun replaceProductionWslIjentManager(disposable: Disposable) { + replaceService(WslIjentManager::class.java, ::ProductionWslIjentManager, disposable) +} + +fun replaceProductionWslIjentManager(newServiceScope: CoroutineScope) { + replaceService(WslIjentManager::class.java, ::ProductionWslIjentManager, newServiceScope) +} + +fun replaceIjentWslNioFsToggler(newServiceScope: CoroutineScope) { + replaceService(IjentWslNioFsToggler::class.java, ::IjentWslNioFsToggler, newServiceScope) +} + +private fun replaceService(iface: Class, constructor: (CoroutineScope) -> T, newServiceScope: CoroutineScope) { + val disposable = Disposer.newDisposable(newServiceScope.toString()) + newServiceScope.coroutineContext.job.invokeOnCompletion { + Disposer.dispose(disposable) + } + ApplicationManager.getApplication().replaceService( + iface, + constructor(newServiceScope), + disposable, + ) +} + +@OptIn(DelicateCoroutinesApi::class) +private fun replaceService(iface: Class, constructor: (CoroutineScope) -> T, disposable: Disposable) { + val newServiceScope = GlobalScope.childScope("Disposable $disposable", supervisor = true) + Disposer.register(disposable) { + newServiceScope.cancel(CancellationException("Disposed $disposable")) + } + ApplicationManager.getApplication().replaceService( + iface, + constructor(newServiceScope), + disposable, + ) +} \ No newline at end of file diff --git a/platform/platform-impl/src/com/intellij/execution/wsl/ProductionWslIjentManager.kt b/platform/platform-impl/src/com/intellij/execution/wsl/ProductionWslIjentManager.kt index a505fdeb766f..561bd62cc54d 100644 --- a/platform/platform-impl/src/com/intellij/execution/wsl/ProductionWslIjentManager.kt +++ b/platform/platform-impl/src/com/intellij/execution/wsl/ProductionWslIjentManager.kt @@ -6,7 +6,6 @@ import com.intellij.openapi.project.Project import com.intellij.platform.ijent.IjentId import com.intellij.platform.ijent.IjentPosixApi import com.intellij.platform.ijent.IjentSessionRegistry -import com.intellij.platform.ijent.bindToScope import com.intellij.platform.ijent.spi.IjentThreadPool import com.intellij.platform.util.coroutines.childScope import kotlinx.coroutines.CoroutineScope @@ -37,9 +36,17 @@ class ProductionWslIjentManager(private val scope: CoroutineScope) : WslIjentMan override suspend fun getIjentApi(wslDistribution: WSLDistribution, project: Project?, rootUser: Boolean): IjentPosixApi { val ijentSessionRegistry = IjentSessionRegistry.instanceAsync() val ijentId = myCache.computeIfAbsent("""wsl:${wslDistribution.id}${if (rootUser) ":root" else ""}""") { ijentName -> - val ijentId = ijentSessionRegistry.register(ijentName, oneOff = false) { - val ijent = deployAndLaunchIjent(project, wslDistribution, wslCommandLineOptionsModifier = { it.setSudo(rootUser) }) - ijent.bindToScope(scope) + val ijentId = ijentSessionRegistry.register(ijentName) { ijentId -> + val ijent = deployAndLaunchIjent( + scope, + project, + ijentId.toString(), + wslDistribution, + wslCommandLineOptionsModifier = { it.setSudo(rootUser) }, + ) + scope.coroutineContext.job.invokeOnCompletion { + ijent.close() + } ijent } scope.coroutineContext.job.invokeOnCompletion { @@ -51,6 +58,12 @@ class ProductionWslIjentManager(private val scope: CoroutineScope) : WslIjentMan return ijentSessionRegistry.get(ijentId) as IjentPosixApi } + init { + scope.coroutineContext.job.invokeOnCompletion { + dropCache() + } + } + @VisibleForTesting fun dropCache() { val ijentSessionRegistry = serviceIfCreated() diff --git a/platform/platform-impl/src/com/intellij/execution/wsl/WslIjentDeployingStrategy.kt b/platform/platform-impl/src/com/intellij/execution/wsl/WslIjentDeployingStrategy.kt index 5d8cc6b7f083..a2f95b134c02 100644 --- a/platform/platform-impl/src/com/intellij/execution/wsl/WslIjentDeployingStrategy.kt +++ b/platform/platform-impl/src/com/intellij/execution/wsl/WslIjentDeployingStrategy.kt @@ -16,6 +16,7 @@ import java.nio.file.Path @ApiStatus.Internal class WslIjentDeployingStrategy( scope: CoroutineScope, + override val ijentLabel: String, private val distribution: WSLDistribution, private val project: Project?, private val wslCommandLineOptionsModifier: (WSLCommandLineOptions) -> Unit = {} @@ -24,7 +25,7 @@ class WslIjentDeployingStrategy( distribution.getWslPath(path) @OptIn(IntellijInternalApi::class, DelicateCoroutinesApi::class) - override suspend fun createShellProcess(): ShellProcessWrapper { + override suspend fun createShellProcess(): Process { // IJent can start an interactive shell by itself whenever it needs. // Enabling an interactive shell for IJent by default can bring problems, because stdio of IJent must not be populated // with possible user extensions in ~/.profile @@ -38,7 +39,7 @@ class WslIjentDeployingStrategy( val commandLine = WSLDistribution.neverRunTTYFix(GeneralCommandLine("/bin/sh")) distribution.doPatchCommandLine(commandLine, project, wslCommandLineOptions) - return ShellProcessWrapper(computeDetached { commandLine.createProcess() }) + return computeDetached { commandLine.createProcess() } } override suspend fun getConnectionStrategy(): IjentConnectionStrategy { diff --git a/platform/platform-impl/src/com/intellij/execution/wsl/WslIjentManager.kt b/platform/platform-impl/src/com/intellij/execution/wsl/WslIjentManager.kt index 4af00b82b85a..e9b679c84b11 100644 --- a/platform/platform-impl/src/com/intellij/execution/wsl/WslIjentManager.kt +++ b/platform/platform-impl/src/com/intellij/execution/wsl/WslIjentManager.kt @@ -12,7 +12,6 @@ import com.intellij.platform.ijent.deploy import com.intellij.platform.ijent.spi.DeployedIjent import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.DelicateCoroutinesApi -import kotlinx.coroutines.coroutineScope import org.jetbrains.annotations.ApiStatus import org.jetbrains.annotations.VisibleForTesting @@ -51,24 +50,28 @@ interface WslIjentManager { } } -suspend fun deployAndLaunchIjent( +internal suspend fun deployAndLaunchIjent( + parentScope: CoroutineScope, project: Project?, + ijentLabel: String, wslDistribution: WSLDistribution, wslCommandLineOptionsModifier: (WSLCommandLineOptions) -> Unit = {}, -): IjentPosixApi = deployAndLaunchIjentGettingPath(project, wslDistribution, wslCommandLineOptionsModifier).ijentApi +): IjentPosixApi = + deployAndLaunchIjentGettingPath(parentScope, project, ijentLabel, wslDistribution, wslCommandLineOptionsModifier).ijentApi @VisibleForTesting suspend fun deployAndLaunchIjentGettingPath( + parentScope: CoroutineScope, project: Project?, + ijentLabel: String, wslDistribution: WSLDistribution, wslCommandLineOptionsModifier: (WSLCommandLineOptions) -> Unit = {}, ): DeployedIjent.Posix { - return coroutineScope { - WslIjentDeployingStrategy( - scope = this, - distribution = wslDistribution, - project = project, - wslCommandLineOptionsModifier = wslCommandLineOptionsModifier - ).deploy("WSL-${wslDistribution.id}") - } + return WslIjentDeployingStrategy( + scope = parentScope, + ijentLabel = ijentLabel, + distribution = wslDistribution, + project = project, + wslCommandLineOptionsModifier = wslCommandLineOptionsModifier + ).deploy() } \ No newline at end of file