diff --git a/.idea/modules.xml b/.idea/modules.xml
index 6e92151c0e53..865397a0b8c3 100644
--- a/.idea/modules.xml
+++ b/.idea/modules.xml
@@ -685,6 +685,7 @@
+
diff --git a/platform/execution-impl/src/com/intellij/execution/wsl/ijent/AbstractIjentVerificationAction.kt b/platform/execution-impl/src/com/intellij/execution/wsl/ijent/AbstractIjentVerificationAction.kt
index 87fcf7b13af8..10422144c56a 100644
--- a/platform/execution-impl/src/com/intellij/execution/wsl/ijent/AbstractIjentVerificationAction.kt
+++ b/platform/execution-impl/src/com/intellij/execution/wsl/ijent/AbstractIjentVerificationAction.kt
@@ -58,8 +58,8 @@ abstract class AbstractIjentVerificationAction : DumbAwareAction() {
try {
withModalProgress(modalTaskOwner, e.presentation.text, TaskCancellation.cancellable()) {
coroutineScope {
- val (title, deployingStrategy) = deployingStrategy()
- deployingStrategy.deploy("IjentVerificationAction").ijentApi.use { ijent ->
+ val (title, deployingStrategy) = deployingStrategy(this)
+ deployingStrategy.deploy().ijentApi.use { ijent ->
coroutineScope {
launch {
val info = ijent.ijentProcessInfo
@@ -115,7 +115,7 @@ abstract class AbstractIjentVerificationAction : DumbAwareAction() {
}
}
- protected abstract suspend fun deployingStrategy(): Pair
+ protected abstract suspend fun deployingStrategy(ijentProcessScope: CoroutineScope): Pair
companion object {
protected val LOG = logger()
diff --git a/platform/ijent/src/com/intellij/platform/ijent/IjentDeployer.kt b/platform/ijent/src/com/intellij/platform/ijent/IjentDeployer.kt
index c2709af5251f..30504358d578 100644
--- a/platform/ijent/src/com/intellij/platform/ijent/IjentDeployer.kt
+++ b/platform/ijent/src/com/intellij/platform/ijent/IjentDeployer.kt
@@ -5,21 +5,18 @@ package com.intellij.platform.ijent
import com.intellij.platform.ijent.spi.DeployedIjent
import com.intellij.platform.ijent.spi.IjentDeployingStrategy
import com.intellij.platform.ijent.spi.connectToRunningIjent
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.job
/**
* Starts IJent on some machine, defined by [com.intellij.platform.ijent.spi.IjentDeployingStrategy].
*
- * [ijentName] is a label used for logging, debugging, thread names, etc.
- *
- * By default, the IJent executable exits when the IDE exits.
- * [com.intellij.platform.eel.IjentApi.close] and [com.intellij.platform.ijent.bindToScope] may be used to terminate IJent earlier.
+ * By default, the IJent executable exits when
+ * the coroutine scope of [IjentProcessMediator] from [IjentDeployingStrategy.createProcess] exits.
+ * [com.intellij.platform.ijent.IjentApi.close] may be used to terminate IJent earlier.
*
* TODO Either define thrown exceptions or return something like Result.
*/
-suspend fun IjentDeployingStrategy.deploy(ijentName: String): DeployedIjent {
- val (remotePathToBinary, ijentApi) = doDeploy(ijentName)
+suspend fun IjentDeployingStrategy.deploy(): DeployedIjent {
+ val (remotePathToBinary, ijentApi) = doDeploy()
return object : DeployedIjent {
override val ijentApi: IjentApi = ijentApi
override val remotePathToBinary: String = remotePathToBinary
@@ -27,8 +24,8 @@ suspend fun IjentDeployingStrategy.deploy(ijentName: String): DeployedIjent {
}
/** A specialized version of [com.intellij.platform.ijent.deploy] */
-suspend fun IjentDeployingStrategy.Posix.deploy(ijentName: String): DeployedIjent.Posix {
- val (remotePathToBinary, ijentApi) = doDeploy(ijentName)
+suspend fun IjentDeployingStrategy.Posix.deploy(): DeployedIjent.Posix {
+ val (remotePathToBinary, ijentApi) = doDeploy()
ijentApi as IjentPosixApi
return object : DeployedIjent.Posix {
override val ijentApi: IjentPosixApi = ijentApi
@@ -37,8 +34,8 @@ suspend fun IjentDeployingStrategy.Posix.deploy(ijentName: String): DeployedIjen
}
/** A specialized version of [com.intellij.platform.ijent.deploy] */
-suspend fun IjentDeployingStrategy.Windows.deploy(ijentName: String): DeployedIjent.Windows {
- val (remotePathToBinary, ijentApi) = doDeploy(ijentName)
+suspend fun IjentDeployingStrategy.Windows.deploy(): DeployedIjent.Windows {
+ val (remotePathToBinary, ijentApi) = doDeploy()
ijentApi as IjentWindowsApi
return object : DeployedIjent.Windows {
override val ijentApi: IjentWindowsApi = ijentApi
@@ -46,20 +43,13 @@ suspend fun IjentDeployingStrategy.Windows.deploy(ijentName: String): DeployedIj
}
}
-/** A shortcut for terminating an [IjentApi] when the [coroutineScope] completes. */
-fun IjentApi.bindToScope(coroutineScope: CoroutineScope) {
- coroutineScope.coroutineContext.job.invokeOnCompletion {
- this@bindToScope.close()
- }
-}
-
-private suspend fun IjentDeployingStrategy.doDeploy(ijentName: String): Pair =
+private suspend fun IjentDeployingStrategy.doDeploy(): Pair =
try {
val targetPlatform = getTargetPlatform()
val remotePathToBinary = copyFile(IjentExecFileProvider.getInstance().getIjentBinary(targetPlatform))
- val process = createProcess(remotePathToBinary)
+ val mediator = createProcess(remotePathToBinary)
- val ijentApi = connectToRunningIjent(ijentName, getConnectionStrategy(), targetPlatform, process)
+ val ijentApi = connectToRunningIjent(getConnectionStrategy(), targetPlatform, mediator)
remotePathToBinary to ijentApi
}
finally {
diff --git a/platform/ijent/src/com/intellij/platform/ijent/IjentGlobalScopeHolder.kt b/platform/ijent/src/com/intellij/platform/ijent/IjentGlobalScopeHolder.kt
deleted file mode 100644
index 5f3efe3e8119..000000000000
--- a/platform/ijent/src/com/intellij/platform/ijent/IjentGlobalScopeHolder.kt
+++ /dev/null
@@ -1,16 +0,0 @@
-// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
-package com.intellij.platform.ijent
-
-import com.intellij.openapi.application.ApplicationManager
-import com.intellij.openapi.components.Service
-import com.intellij.openapi.components.service
-import com.intellij.openapi.components.serviceAsync
-import kotlinx.coroutines.CoroutineScope
-
-@Service
-internal class IjentApplicationScope private constructor(scope: CoroutineScope) : CoroutineScope by scope {
- companion object {
- fun instance(): IjentApplicationScope = ApplicationManager.getApplication().service()
- suspend fun instanceAsync(): IjentApplicationScope = serviceAsync()
- }
-}
\ No newline at end of file
diff --git a/platform/ijent/src/com/intellij/platform/ijent/IjentSessionRegistry.kt b/platform/ijent/src/com/intellij/platform/ijent/IjentSessionRegistry.kt
index af0d335d3fcd..3927d059dc6f 100644
--- a/platform/ijent/src/com/intellij/platform/ijent/IjentSessionRegistry.kt
+++ b/platform/ijent/src/com/intellij/platform/ijent/IjentSessionRegistry.kt
@@ -10,7 +10,7 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
/**
- * This service MUST know about all running IJents.
+ * This service MUST know about a running IJent if it's going to be used through [java.nio.file.spi.FileSystemProvider].
* It also MAY know about delegates and wrappers over interfaces, if they are registered with [register].
*/
@Service(Service.Level.APP)
@@ -28,21 +28,15 @@ class IjentSessionRegistry(private val coroutineScope: CoroutineScope) {
/**
* [ijentName] is used for debugging utilities like logs and thread names.
*
- * When [oneOff] is true, [launcher] may be called at most once, and if something wrong happens with the returned IJent,
- * [get] still keeps returning that broken instance of [IjentApi].
- *
- * When [oneOff] is false, [get] checks if an already created [IjentApi] is functional, and if there's a problem, [get] calls
- * [launcher] again and remembers the new instance.
- *
- * [launcher] should use the provided coroutine scope for launching various jobs, passing to the implementation of [IjentApi], etc.
+ * If something happens with IJent, and it starts throwing [IjentUnavailableException],
+ * [launch] is called again to make a new instance of [IjentApi].
*/
fun register(
ijentName: String,
- oneOff: Boolean,
launcher: suspend (ijentId: IjentId) -> IjentApi,
): IjentId {
val ijentId = IjentId("ijent-${counter.getAndIncrement()}-${ijentName.replace(Regex("[^A-Za-z0-9-]"), "-")}")
- ijents[ijentId] = IjentBundle(launcher, null, oneOff)
+ ijents[ijentId] = IjentBundle(launcher, null, oneOff = false)
return ijentId
}
@@ -100,7 +94,12 @@ class IjentSessionRegistry(private val coroutineScope: CoroutineScope) {
oneOff = oldBundle.oneOff,
)
})!!
- return bundle.deferred!!.await()
+ try {
+ return bundle.deferred!!.await()
+ }
+ catch (err: Throwable) {
+ throw IjentUnavailableException.unwrapFromCancellationExceptions(err)
+ }
}
companion object {
diff --git a/platform/ijent/src/com/intellij/platform/ijent/spi/IjentDeployingOverShellProcessStrategy.kt b/platform/ijent/src/com/intellij/platform/ijent/spi/IjentDeployingOverShellProcessStrategy.kt
index 363fdcf49a7b..b5f74a7c70fe 100644
--- a/platform/ijent/src/com/intellij/platform/ijent/spi/IjentDeployingOverShellProcessStrategy.kt
+++ b/platform/ijent/src/com/intellij/platform/ijent/spi/IjentDeployingOverShellProcessStrategy.kt
@@ -2,15 +2,15 @@
package com.intellij.platform.ijent.spi
import com.intellij.execution.CommandLineUtil.posixQuote
-import com.intellij.openapi.application.ApplicationManager
-import com.intellij.openapi.diagnostic.*
+import com.intellij.openapi.diagnostic.debug
+import com.intellij.openapi.diagnostic.logger
+import com.intellij.openapi.diagnostic.trace
import com.intellij.platform.eel.EelPlatform
+import com.intellij.platform.ijent.IjentUnavailableException
import com.intellij.platform.ijent.getIjentGrpcArgv
-import com.intellij.util.io.computeDetached
import com.intellij.util.io.copyToAsync
import kotlinx.coroutines.*
import org.jetbrains.annotations.VisibleForTesting
-import java.io.IOException
import java.io.InputStream
import java.nio.file.Path
import kotlin.io.path.fileSize
@@ -18,18 +18,20 @@ import kotlin.io.path.inputStream
import kotlin.time.Duration.Companion.seconds
abstract class IjentDeployingOverShellProcessStrategy(scope: CoroutineScope) : IjentDeployingStrategy.Posix {
+ protected abstract val ijentLabel: String
+
/**
* If there's some bind mount, returns the path for the remote machine/container that corresponds to [path].
* Otherwise, returns null.
*/
protected abstract suspend fun mapPath(path: Path): String?
- protected abstract suspend fun createShellProcess(): ShellProcessWrapper
+ protected abstract suspend fun createShellProcess(): Process
private val myContext: Deferred = run {
var createdShellProcess: ShellProcessWrapper? = null
val context = scope.async(start = CoroutineStart.LAZY) {
- val shellProcess = createShellProcess()
+ val shellProcess = ShellProcessWrapper(IjentSessionMediator.create(scope, createShellProcess(), ijentLabel))
createdShellProcess = shellProcess
createDeployingContext(shellProcess.apply {
// The timeout is taken at random.
@@ -40,11 +42,6 @@ abstract class IjentDeployingOverShellProcessStrategy(scope: CoroutineScope) : I
}
})
}
- context.invokeOnCompletion { error ->
- if (error != null && error !is CancellationException) {
- createdShellProcess?.destroyForcibly()
- }
- }
context
}
@@ -54,7 +51,7 @@ abstract class IjentDeployingOverShellProcessStrategy(scope: CoroutineScope) : I
}
}
- final override suspend fun createProcess(binaryPath: String): Process {
+ final override suspend fun createProcess(binaryPath: String): IjentSessionMediator {
return myContext.await().execCommand {
execIjent(binaryPath)
}
@@ -74,9 +71,10 @@ abstract class IjentDeployingOverShellProcessStrategy(scope: CoroutineScope) : I
override suspend fun getConnectionStrategy(): IjentConnectionStrategy = IjentConnectionStrategy.Default
- class ShellProcessWrapper(private var wrapped: Process?) {
+ internal class ShellProcessWrapper(private var mediator: IjentSessionMediator?) {
+ @OptIn(DelicateCoroutinesApi::class)
suspend fun write(data: String) {
- val process = wrapped!!
+ val process = mediator!!.process
@Suppress("NAME_SHADOWING")
val data = if (data.endsWith("\n")) data else "$data\n"
@@ -96,7 +94,7 @@ abstract class IjentDeployingOverShellProcessStrategy(scope: CoroutineScope) : I
suspend fun readLineWithoutBuffering(): String =
withContext(Dispatchers.IO) {
val buffer = StringBuilder()
- val stream = wrapped!!.inputStream
+ val stream = mediator!!.process.inputStream
while (true) {
ensureActive()
val c = stream.read()
@@ -112,7 +110,7 @@ abstract class IjentDeployingOverShellProcessStrategy(scope: CoroutineScope) : I
}
suspend fun copyDataFrom(stream: InputStream) {
- val process = wrapped!!
+ val process = mediator!!.process
withContext(Dispatchers.IO) {
stream.copyToAsync(process.outputStream)
ensureActive()
@@ -120,17 +118,22 @@ abstract class IjentDeployingOverShellProcessStrategy(scope: CoroutineScope) : I
}
}
- fun destroyForcibly() {
- wrapped!!.destroyForcibly()
+ @OptIn(InternalCoroutinesApi::class)
+ suspend fun destroyForciblyAndGetError(): Throwable {
+ mediator!!.process.destroyForcibly()
+ try {
+ val job = mediator!!.ijentProcessScope.coroutineContext.job
+ job.join()
+ throw job.getCancellationException()
+ }
+ catch (err: Throwable) {
+ return IjentUnavailableException.unwrapFromCancellationExceptions(err)
+ }
}
- @OptIn(DelicateCoroutinesApi::class)
- suspend fun readWholeErrorStream(): ByteArray =
- computeDetached { wrapped!!.errorStream.readAllBytes() }
-
- fun extractProcess(): Process {
- val result = wrapped!!
- wrapped = null
+ fun extractProcess(): IjentSessionMediator {
+ val result = mediator!!
+ mediator = null
return result
}
}
@@ -140,24 +143,26 @@ private suspend fun DeployingContextAndShell.execCommand(block: suspen
return try {
block()
}
- catch (err: Throwable) {
- runCatching { process.destroyForcibly() }.exceptionOrNull()?.let(err::addSuppressed)
+ catch (initialErrorFromStack: Throwable) {
+ val errorFromScope = process.destroyForciblyAndGetError()
+ val errorFromStack = IjentUnavailableException.unwrapFromCancellationExceptions(initialErrorFromStack)
- val attachment = Attachment("stderr", String(process.readWholeErrorStream()))
- attachment.isIncluded = attachment.isIncluded or ApplicationManager.getApplication().isInternal
+ // It happens sometimes that some valuable error is wrapped into CancellationException.
+ // However, if there's no valuable error,
+ // then it's a fair CancellationException that should be thrown futher, to cancel the context.
+ val (mainError, suppressedError) =
+ when (errorFromStack) {
+ is IjentUnavailableException, is CancellationException -> errorFromStack to errorFromScope
+ else -> errorFromScope to errorFromStack
+ }
- val errorWithAttachments = RuntimeExceptionWithAttachments(err.message ?: "", err, attachment)
+ if (mainError != suppressedError) {
+ mainError.addSuppressed(suppressedError)
+ }
- // TODO Suppress RuntimeExceptionWithAttachments instead of wrapping when KT-66006 is resolved.
- //err.addSuppressed(RuntimeExceptionWithAttachments(
- // "The error happened during handling $process",
- // Attachment("stderr", stderr.toString()).apply { isIncluded = isIncluded or ApplicationManager.getApplication().isInternal },
- //))
- //throw err
-
- throw when (err) {
- is TimeoutCancellationException, is IOException -> IjentStartupError.CommunicationError(errorWithAttachments)
- else -> errorWithAttachments
+ throw when (mainError) {
+ is IjentUnavailableException, is CancellationException -> mainError
+ else -> IjentStartupError.CommunicationError(mainError)
}
}
}
@@ -348,7 +353,7 @@ private suspend fun DeployingContextAndShell.uploadIjentBinary(
return process.readLineWithoutBuffering()
}
-private suspend fun DeployingContextAndShell.execIjent(remotePathToBinary: String): Process {
+private suspend fun DeployingContextAndShell.execIjent(remotePathToBinary: String): IjentSessionMediator {
val joinedCmd = getIjentGrpcArgv(remotePathToBinary, selfDeleteOnExit = true, usrBinEnv = context.env).joinToString(" ")
val commandLineArgs = context.run {
"""
diff --git a/platform/ijent/src/com/intellij/platform/ijent/spi/IjentDeployingStrategy.kt b/platform/ijent/src/com/intellij/platform/ijent/spi/IjentDeployingStrategy.kt
index 9ef3e0d3ccef..f60c6a6b514a 100644
--- a/platform/ijent/src/com/intellij/platform/ijent/spi/IjentDeployingStrategy.kt
+++ b/platform/ijent/src/com/intellij/platform/ijent/spi/IjentDeployingStrategy.kt
@@ -48,7 +48,7 @@ interface IjentDeployingStrategy {
* @param binaryPath path to ijent binary on target environment
* @return process that will be used for communication
*/
- suspend fun createProcess(binaryPath: String): Process
+ suspend fun createProcess(binaryPath: String): IjentSessionMediator
/**
* Copy files to the target environment. Typically used to transfer the ijent binary to the target machine.
diff --git a/platform/ijent/src/com/intellij/platform/ijent/spi/IjentSessionMediator.kt b/platform/ijent/src/com/intellij/platform/ijent/spi/IjentSessionMediator.kt
index 06459a62e9aa..ab0a99151e26 100644
--- a/platform/ijent/src/com/intellij/platform/ijent/spi/IjentSessionMediator.kt
+++ b/platform/ijent/src/com/intellij/platform/ijent/spi/IjentSessionMediator.kt
@@ -4,9 +4,9 @@ package com.intellij.platform.ijent.spi
import com.intellij.openapi.diagnostic.Attachment
import com.intellij.openapi.diagnostic.debug
import com.intellij.openapi.diagnostic.logger
-import com.intellij.platform.ijent.IjentApplicationScope
-import com.intellij.platform.ijent.IjentId
+import com.intellij.openapi.progress.Cancellation
import com.intellij.platform.ijent.IjentUnavailableException
+import com.intellij.platform.ijent.coroutineNameAppended
import com.intellij.platform.util.coroutines.childScope
import com.intellij.util.containers.ContainerUtil
import com.intellij.util.io.awaitExit
@@ -22,8 +22,6 @@ import java.time.ZonedDateTime
import java.time.format.DateTimeParseException
import java.util.*
import java.util.concurrent.TimeUnit
-import kotlin.coroutines.AbstractCoroutineContextElement
-import kotlin.coroutines.CoroutineContext
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toKotlinDuration
@@ -34,6 +32,10 @@ import kotlin.time.toKotlinDuration
*
* [processExit] never throws. When it completes, it either means that the process has finished, or that the whole scope of IJent processes
* is canceled.
+ *
+ * [ijentProcessScope] should be used by the [com.intellij.platform.ijent.IjentApi] implementation for launching internal coroutines.
+ * No matter if IJent exits expectedly or not, an attempt to do anything with [ijentProcessScope] after the IJent has exited
+ * throws [IjentUnavailableException].
*/
class IjentSessionMediator private constructor(
val ijentProcessScope: CoroutineScope,
@@ -58,40 +60,85 @@ class IjentSessionMediator private constructor(
/**
* See the docs of [IjentSessionMediator].
*
- * [ijentId] is used only for logging.
+ * [ijentLabel] is used only for logging.
+ *
+ * Beware that [parentScope] receives [IjentUnavailableException.CommunicationFailure] if IJent _suddenly_ exits, f.i., after SIGKILL.
+ * Nothing happens with [parentScope] if IJent exits expectedly, f.i., after [com.intellij.platform.ijent.IjentApi.close].
*/
@OptIn(DelicateCoroutinesApi::class)
- fun create(process: Process, ijentId: IjentId): IjentSessionMediator {
+ fun create(parentScope: CoroutineScope, process: Process, ijentLabel: String): IjentSessionMediator {
+ // TODO What about https://youtrack.jetbrains.com/issue/IJPL-156891 ?
+
+ require(parentScope.coroutineContext[Job] != null) {
+ "Scope $parentScope has no Job"
+ }
+ val context = IjentThreadPool.asCoroutineDispatcher()
+ val ijentProcessScope = run {
+ // Prevents from logging the error by the default exception handler.
+ // Errors are logged explicitly in this function.
+ val dummyExceptionHandler = CoroutineExceptionHandler { _, err -> /* nothing */ }
+
+ // This supervisor scope exists only to prevent automatic propagation of IjentUnavailableException to the parent scope.
+ // Instead, there's a logic below that decides if a specific IjentUnavailableException should be propagated to the parent scope.
+ val trickySupervisorScope = parentScope.childScope(ijentLabel, context + dummyExceptionHandler, supervisor = true)
+
+ val ijentProcessScope = trickySupervisorScope.childScope(ijentLabel, supervisor = false)
+
+ ijentProcessScope.coroutineContext.job.invokeOnCompletion { err ->
+ trickySupervisorScope.cancel()
+
+ if (err != null) {
+ val propagateToParentScope = when (err) {
+ is IjentUnavailableException -> when (err) {
+ is IjentUnavailableException.ClosedByApplication -> false
+ is IjentUnavailableException.CommunicationFailure -> !err.exitedExpectedly
+ }
+ else -> true
+ }
+
+ if (propagateToParentScope) {
+ try {
+ err.addSuppressed(Throwable("Rethrown from here"))
+ parentScope.launch(start = CoroutineStart.UNDISPATCHED) {
+ throw err
+ }
+ }
+ catch (_: Throwable) {
+ // It seems that the scope has already been canceled with something else.
+ }
+ }
+
+ // TODO Callers should be able to define their own exception handlers.
+ logIjentError(ijentLabel, err)
+ }
+ }
+
+ ijentProcessScope
+ }
+
val lastStderrMessages = MutableSharedFlow(
replay = 30,
extraBufferCapacity = 0,
onBufferOverflow = BufferOverflow.DROP_OLDEST,
)
- val exceptionHandler = IjentSessionCoroutineExceptionHandler(ijentId)
- val connectionScope = IjentApplicationScope.instance().childScope(
- "ijent $ijentId > connection scope",
- supervisor = false,
- context = exceptionHandler + IjentThreadPool.asCoroutineDispatcher(),
- )
-
// stderr logger should outlive the current scope. In case if an error appears, the scope is cancelled immediately, but the whole
// intention of the stderr logger is to write logs of the remote process, which come from the remote machine to the local one with
// a delay.
- GlobalScope.launch(blockingDispatcher + CoroutineName("ijent $ijentId > stderr logger")) {
- ijentProcessStderrLogger(process, ijentId, lastStderrMessages)
+ GlobalScope.launch(blockingDispatcher + ijentProcessScope.coroutineNameAppended("stderr logger")) {
+ ijentProcessStderrLogger(process, ijentLabel, lastStderrMessages)
}
val processExit = CompletableDeferred()
- val mediator = IjentSessionMediator(connectionScope, process, processExit)
+ val mediator = IjentSessionMediator(ijentProcessScope, process, processExit)
- val awaiterScope = IjentApplicationScope.instance().launch(CoroutineName("ijent $ijentId > exit awaiter scope") + exceptionHandler) {
- ijentProcessExitAwaiter(ijentId, mediator, lastStderrMessages)
+ val awaiterScope = ijentProcessScope.launch(context = context + ijentProcessScope.coroutineNameAppended("exit awaiter scope")) {
+ ijentProcessExitAwaiter(ijentLabel, mediator, lastStderrMessages)
}
- val finalizerScope = connectionScope.launch(CoroutineName("ijent $ijentId > finalizer scope")) {
- ijentProcessFinalizer(ijentId, mediator)
+ val finalizerScope = ijentProcessScope.launch(context = context + ijentProcessScope.coroutineNameAppended("finalizer scope")) {
+ ijentProcessFinalizer(ijentLabel, mediator)
}
awaiterScope.invokeOnCompletion { err ->
@@ -104,21 +151,19 @@ class IjentSessionMediator private constructor(
}
}
-private class IjentSessionCoroutineExceptionHandler(
- private val ijentId: IjentId,
-) : AbstractCoroutineContextElement(CoroutineExceptionHandler), CoroutineExceptionHandler {
- private val loggedErrors = Collections.newSetFromMap(ContainerUtil.createConcurrentWeakMap())
+private val loggedErrors = Collections.newSetFromMap(ContainerUtil.createConcurrentWeakMap())
- override fun toString(): String = javaClass.simpleName
-
- override fun handleException(context: CoroutineContext, exception: Throwable) {
+private fun logIjentError(ijentLabel: String, exception: Throwable) {
+ // The logger can create new services, and since this function is called inside an already failed coroutine context,
+ // service creation would be impossible without `executeInNonCancelableSection`.
+ Cancellation.executeInNonCancelableSection {
when (exception) {
is IjentUnavailableException -> when (exception) {
is IjentUnavailableException.ClosedByApplication -> Unit
is IjentUnavailableException.CommunicationFailure -> {
if (!exception.exitedExpectedly && loggedErrors.add(exception)) {
- LOG.error("Exception in connection with IJent $ijentId: ${exception.message}", exception)
+ LOG.error("Exception in connection with IJent $ijentLabel: ${exception.message}", exception)
}
}
}
@@ -127,27 +172,27 @@ private class IjentSessionCoroutineExceptionHandler(
else -> {
if (loggedErrors.add(exception)) {
- LOG.error("Unexpected error during communnication with IJent $ijentId", exception)
+ LOG.error("Unexpected error during communnication with IJent $ijentLabel", exception)
}
}
}
}
}
-private suspend fun ijentProcessStderrLogger(process: Process, ijentId: IjentId, lastStderrMessages: MutableSharedFlow) {
+private suspend fun ijentProcessStderrLogger(process: Process, ijentLabel: String, lastStderrMessages: MutableSharedFlow) {
try {
process.errorStream.reader().useLines { lines ->
for (line in lines) {
yield()
if (line.isNotEmpty()) {
- logIjentStderr(ijentId, line)
+ logIjentStderr(ijentLabel, line)
lastStderrMessages.emit(line)
}
}
}
}
catch (err: IOException) {
- LOG.debug { "$ijentId bootstrap got an error: $err" }
+ LOG.debug { "$ijentLabel bootstrap got an error: $err" }
}
finally {
lastStderrMessages.emit(null)
@@ -165,13 +210,13 @@ private val ijentLogMessageRegex = Regex(
RegexOption.COMMENTS,
)
-private fun logIjentStderr(ijentId: IjentId, line: String) {
+private fun logIjentStderr(ijentLabel: String, line: String) {
val hostDateTime = ZonedDateTime.now()
val (rawRemoteDateTime, level, message) =
ijentLogMessageRegex.matchEntire(line)?.destructured
?: run {
- LOG.debug { "$ijentId log: $line" }
+ LOG.debug { "$ijentLabel log: $line" }
return
}
@@ -179,7 +224,7 @@ private fun logIjentStderr(ijentId: IjentId, line: String) {
java.time.Duration.between(ZonedDateTime.parse(rawRemoteDateTime), hostDateTime).toKotlinDuration()
}
catch (_: DateTimeParseException) {
- LOG.debug { "$ijentId log: $line" }
+ LOG.debug { "$ijentLabel log: $line" }
return
}
@@ -192,7 +237,7 @@ private fun logIjentStderr(ijentId: IjentId, line: String) {
}
logger(buildString {
- append(ijentId)
+ append(ijentLabel)
append(" log: ")
if (dateTimeDiff.absoluteValue > 50.milliseconds) { // The timeout is taken at random.
append(rawRemoteDateTime)
@@ -205,12 +250,12 @@ private fun logIjentStderr(ijentId: IjentId, line: String) {
}
private suspend fun ijentProcessExitAwaiter(
- ijentId: IjentId,
+ ijentLabel: String,
mediator: IjentSessionMediator,
lastStderrMessages: MutableSharedFlow,
): Nothing {
val exitCode = mediator.process.awaitExit()
- LOG.debug { "IJent process $ijentId exited with code $exitCode" }
+ LOG.debug { "IJent process $ijentLabel exited with code $exitCode" }
val isExitExpected = when (mediator.expectedErrorCode) {
IjentSessionMediator.ExpectedErrorCode.NO -> false
@@ -233,7 +278,7 @@ private suspend fun ijentProcessExitAwaiter(
}
}
IjentUnavailableException.CommunicationFailure(
- "The process $ijentId suddenly exited with the code $exitCode",
+ "The process $ijentLabel suddenly exited with the code $exitCode",
Attachment("stderr", stderr.toString()),
)
}
@@ -250,37 +295,41 @@ private suspend fun collectLines(lastStderrMessages: SharedFlow, stderr
}
@OptIn(DelicateCoroutinesApi::class)
-private suspend fun ijentProcessFinalizer(ijentId: IjentId, mediator: IjentSessionMediator): Nothing {
+private suspend fun ijentProcessFinalizer(ijentLabel: String, mediator: IjentSessionMediator): Nothing {
try {
awaitCancellation()
}
catch (err: Exception) {
- throw when (val cause = generateSequence(err, Throwable::cause).firstOrNull { it !is CancellationException }) {
- null -> err
- is IjentUnavailableException -> cause
- else -> {
- LOG.debug(err) { "$ijentId is going to be terminated due to receiving an error" }
- IjentUnavailableException.CommunicationFailure("IJent communication terminated due to an error", err)
- }
+ val actualErrors = generateSequence(err, Throwable::cause).filterTo(mutableListOf()) { it !is CancellationException }
+
+ val existingIjentUnavailableException = actualErrors.filterIsInstance().firstOrNull()
+ if (existingIjentUnavailableException != null) {
+ throw existingIjentUnavailableException
}
+
+ val cause = actualErrors.firstOrNull() ?: err
+ val message =
+ if (cause is CancellationException) "The coroutine scope of $ijentLabel was cancelled"
+ else "IJent communication terminated due to an error"
+ throw IjentUnavailableException.ClosedByApplication(message, cause)
}
finally {
mediator.expectedErrorCode = IjentSessionMediator.ExpectedErrorCode.ANY
val process = mediator.process
if (process.isAlive) {
- GlobalScope.launch(Dispatchers.IO + CoroutineName("$ijentId destruction")) {
+ GlobalScope.launch(Dispatchers.IO + CoroutineName("$ijentLabel destruction")) {
try {
process.waitFor(5, TimeUnit.SECONDS) // A random timeout.
}
finally {
if (process.isAlive) {
- LOG.warn("The process $ijentId is still alive, it will be killed")
+ LOG.warn("The process $ijentLabel is still alive, it will be killed")
process.destroy()
}
}
}
GlobalScope.launch(Dispatchers.IO) {
- LOG.debug { "Closing stdin of $ijentId" }
+ LOG.debug { "Closing stdin of $ijentLabel" }
process.outputStream.close()
}
}
diff --git a/platform/ijent/src/com/intellij/platform/ijent/spi/IjentSessionProvider.kt b/platform/ijent/src/com/intellij/platform/ijent/spi/IjentSessionProvider.kt
index 349f846d9b63..cfb9620fe5b0 100644
--- a/platform/ijent/src/com/intellij/platform/ijent/spi/IjentSessionProvider.kt
+++ b/platform/ijent/src/com/intellij/platform/ijent/spi/IjentSessionProvider.kt
@@ -50,22 +50,24 @@ internal class DefaultIjentSessionProvider : IjentSessionProvider {
* [ijentName] is used for debugging utilities like logs and thread names.
*
* The process terminates automatically only when the IDE exits, or if [IjentApi.close] is called explicitly.
- * [com.intellij.platform.ijent.bindToScope] may be useful for terminating the IJent process earlier.
*/
-suspend fun connectToRunningIjent(ijentName: String, strategy: IjentConnectionStrategy, platform: EelPlatform, process: Process): IjentApi {
- val ijentSessionRegistry = IjentSessionRegistry.instanceAsync()
- val ijentId = ijentSessionRegistry.register(ijentName, oneOff = true) { ijentId ->
- val mediator = IjentSessionMediator.create(process, ijentId)
- mediator.expectedErrorCode = IjentSessionMediator.ExpectedErrorCode.ZERO
- IjentSessionProvider.instanceAsync().connect(strategy, platform, mediator)
- }
- return ijentSessionRegistry.get(ijentId)
+suspend fun connectToRunningIjent(strategy: IjentConnectionStrategy, platform: EelPlatform, mediator: IjentSessionMediator): IjentApi {
+ mediator.expectedErrorCode = IjentSessionMediator.ExpectedErrorCode.ZERO
+ return IjentSessionProvider.instanceAsync().connect(strategy, platform, mediator)
}
/** A specialized overload of [connectToRunningIjent] */
-suspend fun connectToRunningIjent(ijentName: String, strategy: IjentConnectionStrategy, platform: EelPlatform.Posix, process: Process): IjentPosixApi =
- connectToRunningIjent(ijentName, strategy, platform as EelPlatform, process) as IjentPosixApi
+suspend fun connectToRunningIjent(
+ strategy: IjentConnectionStrategy,
+ platform: EelPlatform.Posix,
+ mediator: IjentSessionMediator,
+): IjentPosixApi =
+ connectToRunningIjent(strategy, platform as EelPlatform, mediator) as IjentPosixApi
/** A specialized overload of [connectToRunningIjent] */
-suspend fun connectToRunningIjent(ijentName: String, strategy: IjentConnectionStrategy, platform: EelPlatform.Windows, process: Process): IjentWindowsApi =
- connectToRunningIjent(ijentName, strategy, platform as EelPlatform, process) as IjentWindowsApi
+suspend fun connectToRunningIjent(
+ strategy: IjentConnectionStrategy,
+ platform: EelPlatform.Windows,
+ mediator: IjentSessionMediator,
+): IjentWindowsApi =
+ connectToRunningIjent(strategy, platform as EelPlatform, mediator) as IjentWindowsApi
diff --git a/platform/ijent/testFramework/intellij.platform.ijent.community.testFramework.iml b/platform/ijent/testFramework/intellij.platform.ijent.community.testFramework.iml
new file mode 100644
index 000000000000..7573f0de9487
--- /dev/null
+++ b/platform/ijent/testFramework/intellij.platform.ijent.community.testFramework.iml
@@ -0,0 +1,17 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/platform/ijent/testFramework/src/com/intellij/ijent/testFramework/wsl/WslIjentTestUtil.kt b/platform/ijent/testFramework/src/com/intellij/ijent/testFramework/wsl/WslIjentTestUtil.kt
new file mode 100644
index 000000000000..4062a396fef4
--- /dev/null
+++ b/platform/ijent/testFramework/src/com/intellij/ijent/testFramework/wsl/WslIjentTestUtil.kt
@@ -0,0 +1,51 @@
+// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
+@file:JvmName("WslIjentTestUtil")
+package com.intellij.ijent.testFramework.wsl
+
+import com.intellij.execution.wsl.ProductionWslIjentManager
+import com.intellij.execution.wsl.WslIjentManager
+import com.intellij.execution.wsl.ijent.nio.toggle.IjentWslNioFsToggler
+import com.intellij.openapi.Disposable
+import com.intellij.openapi.application.ApplicationManager
+import com.intellij.openapi.util.Disposer
+import com.intellij.platform.util.coroutines.childScope
+import com.intellij.testFramework.replaceService
+import kotlinx.coroutines.*
+import java.util.concurrent.CancellationException
+
+fun replaceProductionWslIjentManager(disposable: Disposable) {
+ replaceService(WslIjentManager::class.java, ::ProductionWslIjentManager, disposable)
+}
+
+fun replaceProductionWslIjentManager(newServiceScope: CoroutineScope) {
+ replaceService(WslIjentManager::class.java, ::ProductionWslIjentManager, newServiceScope)
+}
+
+fun replaceIjentWslNioFsToggler(newServiceScope: CoroutineScope) {
+ replaceService(IjentWslNioFsToggler::class.java, ::IjentWslNioFsToggler, newServiceScope)
+}
+
+private fun replaceService(iface: Class, constructor: (CoroutineScope) -> T, newServiceScope: CoroutineScope) {
+ val disposable = Disposer.newDisposable(newServiceScope.toString())
+ newServiceScope.coroutineContext.job.invokeOnCompletion {
+ Disposer.dispose(disposable)
+ }
+ ApplicationManager.getApplication().replaceService(
+ iface,
+ constructor(newServiceScope),
+ disposable,
+ )
+}
+
+@OptIn(DelicateCoroutinesApi::class)
+private fun replaceService(iface: Class, constructor: (CoroutineScope) -> T, disposable: Disposable) {
+ val newServiceScope = GlobalScope.childScope("Disposable $disposable", supervisor = true)
+ Disposer.register(disposable) {
+ newServiceScope.cancel(CancellationException("Disposed $disposable"))
+ }
+ ApplicationManager.getApplication().replaceService(
+ iface,
+ constructor(newServiceScope),
+ disposable,
+ )
+}
\ No newline at end of file
diff --git a/platform/platform-impl/src/com/intellij/execution/wsl/ProductionWslIjentManager.kt b/platform/platform-impl/src/com/intellij/execution/wsl/ProductionWslIjentManager.kt
index a505fdeb766f..561bd62cc54d 100644
--- a/platform/platform-impl/src/com/intellij/execution/wsl/ProductionWslIjentManager.kt
+++ b/platform/platform-impl/src/com/intellij/execution/wsl/ProductionWslIjentManager.kt
@@ -6,7 +6,6 @@ import com.intellij.openapi.project.Project
import com.intellij.platform.ijent.IjentId
import com.intellij.platform.ijent.IjentPosixApi
import com.intellij.platform.ijent.IjentSessionRegistry
-import com.intellij.platform.ijent.bindToScope
import com.intellij.platform.ijent.spi.IjentThreadPool
import com.intellij.platform.util.coroutines.childScope
import kotlinx.coroutines.CoroutineScope
@@ -37,9 +36,17 @@ class ProductionWslIjentManager(private val scope: CoroutineScope) : WslIjentMan
override suspend fun getIjentApi(wslDistribution: WSLDistribution, project: Project?, rootUser: Boolean): IjentPosixApi {
val ijentSessionRegistry = IjentSessionRegistry.instanceAsync()
val ijentId = myCache.computeIfAbsent("""wsl:${wslDistribution.id}${if (rootUser) ":root" else ""}""") { ijentName ->
- val ijentId = ijentSessionRegistry.register(ijentName, oneOff = false) {
- val ijent = deployAndLaunchIjent(project, wslDistribution, wslCommandLineOptionsModifier = { it.setSudo(rootUser) })
- ijent.bindToScope(scope)
+ val ijentId = ijentSessionRegistry.register(ijentName) { ijentId ->
+ val ijent = deployAndLaunchIjent(
+ scope,
+ project,
+ ijentId.toString(),
+ wslDistribution,
+ wslCommandLineOptionsModifier = { it.setSudo(rootUser) },
+ )
+ scope.coroutineContext.job.invokeOnCompletion {
+ ijent.close()
+ }
ijent
}
scope.coroutineContext.job.invokeOnCompletion {
@@ -51,6 +58,12 @@ class ProductionWslIjentManager(private val scope: CoroutineScope) : WslIjentMan
return ijentSessionRegistry.get(ijentId) as IjentPosixApi
}
+ init {
+ scope.coroutineContext.job.invokeOnCompletion {
+ dropCache()
+ }
+ }
+
@VisibleForTesting
fun dropCache() {
val ijentSessionRegistry = serviceIfCreated()
diff --git a/platform/platform-impl/src/com/intellij/execution/wsl/WslIjentDeployingStrategy.kt b/platform/platform-impl/src/com/intellij/execution/wsl/WslIjentDeployingStrategy.kt
index 5d8cc6b7f083..a2f95b134c02 100644
--- a/platform/platform-impl/src/com/intellij/execution/wsl/WslIjentDeployingStrategy.kt
+++ b/platform/platform-impl/src/com/intellij/execution/wsl/WslIjentDeployingStrategy.kt
@@ -16,6 +16,7 @@ import java.nio.file.Path
@ApiStatus.Internal
class WslIjentDeployingStrategy(
scope: CoroutineScope,
+ override val ijentLabel: String,
private val distribution: WSLDistribution,
private val project: Project?,
private val wslCommandLineOptionsModifier: (WSLCommandLineOptions) -> Unit = {}
@@ -24,7 +25,7 @@ class WslIjentDeployingStrategy(
distribution.getWslPath(path)
@OptIn(IntellijInternalApi::class, DelicateCoroutinesApi::class)
- override suspend fun createShellProcess(): ShellProcessWrapper {
+ override suspend fun createShellProcess(): Process {
// IJent can start an interactive shell by itself whenever it needs.
// Enabling an interactive shell for IJent by default can bring problems, because stdio of IJent must not be populated
// with possible user extensions in ~/.profile
@@ -38,7 +39,7 @@ class WslIjentDeployingStrategy(
val commandLine = WSLDistribution.neverRunTTYFix(GeneralCommandLine("/bin/sh"))
distribution.doPatchCommandLine(commandLine, project, wslCommandLineOptions)
- return ShellProcessWrapper(computeDetached { commandLine.createProcess() })
+ return computeDetached { commandLine.createProcess() }
}
override suspend fun getConnectionStrategy(): IjentConnectionStrategy {
diff --git a/platform/platform-impl/src/com/intellij/execution/wsl/WslIjentManager.kt b/platform/platform-impl/src/com/intellij/execution/wsl/WslIjentManager.kt
index 4af00b82b85a..e9b679c84b11 100644
--- a/platform/platform-impl/src/com/intellij/execution/wsl/WslIjentManager.kt
+++ b/platform/platform-impl/src/com/intellij/execution/wsl/WslIjentManager.kt
@@ -12,7 +12,6 @@ import com.intellij.platform.ijent.deploy
import com.intellij.platform.ijent.spi.DeployedIjent
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
-import kotlinx.coroutines.coroutineScope
import org.jetbrains.annotations.ApiStatus
import org.jetbrains.annotations.VisibleForTesting
@@ -51,24 +50,28 @@ interface WslIjentManager {
}
}
-suspend fun deployAndLaunchIjent(
+internal suspend fun deployAndLaunchIjent(
+ parentScope: CoroutineScope,
project: Project?,
+ ijentLabel: String,
wslDistribution: WSLDistribution,
wslCommandLineOptionsModifier: (WSLCommandLineOptions) -> Unit = {},
-): IjentPosixApi = deployAndLaunchIjentGettingPath(project, wslDistribution, wslCommandLineOptionsModifier).ijentApi
+): IjentPosixApi =
+ deployAndLaunchIjentGettingPath(parentScope, project, ijentLabel, wslDistribution, wslCommandLineOptionsModifier).ijentApi
@VisibleForTesting
suspend fun deployAndLaunchIjentGettingPath(
+ parentScope: CoroutineScope,
project: Project?,
+ ijentLabel: String,
wslDistribution: WSLDistribution,
wslCommandLineOptionsModifier: (WSLCommandLineOptions) -> Unit = {},
): DeployedIjent.Posix {
- return coroutineScope {
- WslIjentDeployingStrategy(
- scope = this,
- distribution = wslDistribution,
- project = project,
- wslCommandLineOptionsModifier = wslCommandLineOptionsModifier
- ).deploy("WSL-${wslDistribution.id}")
- }
+ return WslIjentDeployingStrategy(
+ scope = parentScope,
+ ijentLabel = ijentLabel,
+ distribution = wslDistribution,
+ project = project,
+ wslCommandLineOptionsModifier = wslCommandLineOptionsModifier
+ ).deploy()
}
\ No newline at end of file