IJent: Refactor IjentDeployingStrategy to have stderr reported during shell bootstrap failure

There was a problem that `com.intellij.platform.ijent.spi.IjentDeployingOverShellProcessStrategy.ShellProcessWrapper#readWholeErrorStream` used to start reading stderr when the shell process is already dead, so in case of problems there was no stderr.

The new refactoring brings `IjentSessionMediator` into `IjentDeployingStrategy`, and `IjentSessionMediator` knows how to collect stderr correctly.

GitOrigin-RevId: 26563aa82b8842ae945ca37ce19c2fc23c7492f8
This commit is contained in:
Vladimir Lagunov
2024-09-05 13:53:53 +02:00
committed by intellij-monorepo-bot
parent 9ed3f287d0
commit 04c4bee35a
14 changed files with 289 additions and 174 deletions

1
.idea/modules.xml generated
View File

@@ -685,6 +685,7 @@
<module fileurl="file://$PROJECT_DIR$/platform/ijent/intellij.platform.ijent.iml" filepath="$PROJECT_DIR$/platform/ijent/intellij.platform.ijent.iml" />
<module fileurl="file://$PROJECT_DIR$/platform/ijent/buildConstants/intellij.platform.ijent.community.buildConstants.iml" filepath="$PROJECT_DIR$/platform/ijent/buildConstants/intellij.platform.ijent.community.buildConstants.iml" />
<module fileurl="file://$PROJECT_DIR$/platform/ijent/impl/intellij.platform.ijent.community.impl.iml" filepath="$PROJECT_DIR$/platform/ijent/impl/intellij.platform.ijent.community.impl.iml" />
<module fileurl="file://$PROJECT_DIR$/platform/ijent/testFramework/intellij.platform.ijent.community.testFramework.iml" filepath="$PROJECT_DIR$/platform/ijent/testFramework/intellij.platform.ijent.community.testFramework.iml" />
<module fileurl="file://$PROJECT_DIR$/images/intellij.platform.images.iml" filepath="$PROJECT_DIR$/images/intellij.platform.images.iml" />
<module fileurl="file://$PROJECT_DIR$/images/backend.svg/intellij.platform.images.backend.svg.iml" filepath="$PROJECT_DIR$/images/backend.svg/intellij.platform.images.backend.svg.iml" />
<module fileurl="file://$PROJECT_DIR$/platform/build-scripts/icons/intellij.platform.images.build.iml" filepath="$PROJECT_DIR$/platform/build-scripts/icons/intellij.platform.images.build.iml" />

View File

@@ -58,8 +58,8 @@ abstract class AbstractIjentVerificationAction : DumbAwareAction() {
try {
withModalProgress(modalTaskOwner, e.presentation.text, TaskCancellation.cancellable()) {
coroutineScope {
val (title, deployingStrategy) = deployingStrategy()
deployingStrategy.deploy("IjentVerificationAction").ijentApi.use { ijent ->
val (title, deployingStrategy) = deployingStrategy(this)
deployingStrategy.deploy().ijentApi.use { ijent ->
coroutineScope {
launch {
val info = ijent.ijentProcessInfo
@@ -115,7 +115,7 @@ abstract class AbstractIjentVerificationAction : DumbAwareAction() {
}
}
protected abstract suspend fun deployingStrategy(): Pair<String, IjentDeployingStrategy>
protected abstract suspend fun deployingStrategy(ijentProcessScope: CoroutineScope): Pair<String, IjentDeployingStrategy>
companion object {
protected val LOG = logger<AbstractIjentVerificationAction>()

View File

@@ -5,21 +5,18 @@ package com.intellij.platform.ijent
import com.intellij.platform.ijent.spi.DeployedIjent
import com.intellij.platform.ijent.spi.IjentDeployingStrategy
import com.intellij.platform.ijent.spi.connectToRunningIjent
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.job
/**
* Starts IJent on some machine, defined by [com.intellij.platform.ijent.spi.IjentDeployingStrategy].
*
* [ijentName] is a label used for logging, debugging, thread names, etc.
*
* By default, the IJent executable exits when the IDE exits.
* [com.intellij.platform.eel.IjentApi.close] and [com.intellij.platform.ijent.bindToScope] may be used to terminate IJent earlier.
* By default, the IJent executable exits when
* the coroutine scope of [IjentProcessMediator] from [IjentDeployingStrategy.createProcess] exits.
* [com.intellij.platform.ijent.IjentApi.close] may be used to terminate IJent earlier.
*
* TODO Either define thrown exceptions or return something like Result.
*/
suspend fun IjentDeployingStrategy.deploy(ijentName: String): DeployedIjent {
val (remotePathToBinary, ijentApi) = doDeploy(ijentName)
suspend fun IjentDeployingStrategy.deploy(): DeployedIjent {
val (remotePathToBinary, ijentApi) = doDeploy()
return object : DeployedIjent {
override val ijentApi: IjentApi = ijentApi
override val remotePathToBinary: String = remotePathToBinary
@@ -27,8 +24,8 @@ suspend fun IjentDeployingStrategy.deploy(ijentName: String): DeployedIjent {
}
/** A specialized version of [com.intellij.platform.ijent.deploy] */
suspend fun IjentDeployingStrategy.Posix.deploy(ijentName: String): DeployedIjent.Posix {
val (remotePathToBinary, ijentApi) = doDeploy(ijentName)
suspend fun IjentDeployingStrategy.Posix.deploy(): DeployedIjent.Posix {
val (remotePathToBinary, ijentApi) = doDeploy()
ijentApi as IjentPosixApi
return object : DeployedIjent.Posix {
override val ijentApi: IjentPosixApi = ijentApi
@@ -37,8 +34,8 @@ suspend fun IjentDeployingStrategy.Posix.deploy(ijentName: String): DeployedIjen
}
/** A specialized version of [com.intellij.platform.ijent.deploy] */
suspend fun IjentDeployingStrategy.Windows.deploy(ijentName: String): DeployedIjent.Windows {
val (remotePathToBinary, ijentApi) = doDeploy(ijentName)
suspend fun IjentDeployingStrategy.Windows.deploy(): DeployedIjent.Windows {
val (remotePathToBinary, ijentApi) = doDeploy()
ijentApi as IjentWindowsApi
return object : DeployedIjent.Windows {
override val ijentApi: IjentWindowsApi = ijentApi
@@ -46,20 +43,13 @@ suspend fun IjentDeployingStrategy.Windows.deploy(ijentName: String): DeployedIj
}
}
/** A shortcut for terminating an [IjentApi] when the [coroutineScope] completes. */
fun IjentApi.bindToScope(coroutineScope: CoroutineScope) {
coroutineScope.coroutineContext.job.invokeOnCompletion {
this@bindToScope.close()
}
}
private suspend fun IjentDeployingStrategy.doDeploy(ijentName: String): Pair<String, IjentApi> =
private suspend fun IjentDeployingStrategy.doDeploy(): Pair<String, IjentApi> =
try {
val targetPlatform = getTargetPlatform()
val remotePathToBinary = copyFile(IjentExecFileProvider.getInstance().getIjentBinary(targetPlatform))
val process = createProcess(remotePathToBinary)
val mediator = createProcess(remotePathToBinary)
val ijentApi = connectToRunningIjent(ijentName, getConnectionStrategy(), targetPlatform, process)
val ijentApi = connectToRunningIjent(getConnectionStrategy(), targetPlatform, mediator)
remotePathToBinary to ijentApi
}
finally {

View File

@@ -1,16 +0,0 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.platform.ijent
import com.intellij.openapi.application.ApplicationManager
import com.intellij.openapi.components.Service
import com.intellij.openapi.components.service
import com.intellij.openapi.components.serviceAsync
import kotlinx.coroutines.CoroutineScope
@Service
internal class IjentApplicationScope private constructor(scope: CoroutineScope) : CoroutineScope by scope {
companion object {
fun instance(): IjentApplicationScope = ApplicationManager.getApplication().service()
suspend fun instanceAsync(): IjentApplicationScope = serviceAsync()
}
}

View File

@@ -10,7 +10,7 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
/**
* This service MUST know about all running IJents.
* This service MUST know about a running IJent if it's going to be used through [java.nio.file.spi.FileSystemProvider].
* It also MAY know about delegates and wrappers over interfaces, if they are registered with [register].
*/
@Service(Service.Level.APP)
@@ -28,21 +28,15 @@ class IjentSessionRegistry(private val coroutineScope: CoroutineScope) {
/**
* [ijentName] is used for debugging utilities like logs and thread names.
*
* When [oneOff] is true, [launcher] may be called at most once, and if something wrong happens with the returned IJent,
* [get] still keeps returning that broken instance of [IjentApi].
*
* When [oneOff] is false, [get] checks if an already created [IjentApi] is functional, and if there's a problem, [get] calls
* [launcher] again and remembers the new instance.
*
* [launcher] should use the provided coroutine scope for launching various jobs, passing to the implementation of [IjentApi], etc.
* If something happens with IJent, and it starts throwing [IjentUnavailableException],
* [launch] is called again to make a new instance of [IjentApi].
*/
fun register(
ijentName: String,
oneOff: Boolean,
launcher: suspend (ijentId: IjentId) -> IjentApi,
): IjentId {
val ijentId = IjentId("ijent-${counter.getAndIncrement()}-${ijentName.replace(Regex("[^A-Za-z0-9-]"), "-")}")
ijents[ijentId] = IjentBundle(launcher, null, oneOff)
ijents[ijentId] = IjentBundle(launcher, null, oneOff = false)
return ijentId
}
@@ -100,7 +94,12 @@ class IjentSessionRegistry(private val coroutineScope: CoroutineScope) {
oneOff = oldBundle.oneOff,
)
})!!
return bundle.deferred!!.await()
try {
return bundle.deferred!!.await()
}
catch (err: Throwable) {
throw IjentUnavailableException.unwrapFromCancellationExceptions(err)
}
}
companion object {

View File

@@ -2,15 +2,15 @@
package com.intellij.platform.ijent.spi
import com.intellij.execution.CommandLineUtil.posixQuote
import com.intellij.openapi.application.ApplicationManager
import com.intellij.openapi.diagnostic.*
import com.intellij.openapi.diagnostic.debug
import com.intellij.openapi.diagnostic.logger
import com.intellij.openapi.diagnostic.trace
import com.intellij.platform.eel.EelPlatform
import com.intellij.platform.ijent.IjentUnavailableException
import com.intellij.platform.ijent.getIjentGrpcArgv
import com.intellij.util.io.computeDetached
import com.intellij.util.io.copyToAsync
import kotlinx.coroutines.*
import org.jetbrains.annotations.VisibleForTesting
import java.io.IOException
import java.io.InputStream
import java.nio.file.Path
import kotlin.io.path.fileSize
@@ -18,18 +18,20 @@ import kotlin.io.path.inputStream
import kotlin.time.Duration.Companion.seconds
abstract class IjentDeployingOverShellProcessStrategy(scope: CoroutineScope) : IjentDeployingStrategy.Posix {
protected abstract val ijentLabel: String
/**
* If there's some bind mount, returns the path for the remote machine/container that corresponds to [path].
* Otherwise, returns null.
*/
protected abstract suspend fun mapPath(path: Path): String?
protected abstract suspend fun createShellProcess(): ShellProcessWrapper
protected abstract suspend fun createShellProcess(): Process
private val myContext: Deferred<DeployingContextAndShell> = run {
var createdShellProcess: ShellProcessWrapper? = null
val context = scope.async(start = CoroutineStart.LAZY) {
val shellProcess = createShellProcess()
val shellProcess = ShellProcessWrapper(IjentSessionMediator.create(scope, createShellProcess(), ijentLabel))
createdShellProcess = shellProcess
createDeployingContext(shellProcess.apply {
// The timeout is taken at random.
@@ -40,11 +42,6 @@ abstract class IjentDeployingOverShellProcessStrategy(scope: CoroutineScope) : I
}
})
}
context.invokeOnCompletion { error ->
if (error != null && error !is CancellationException) {
createdShellProcess?.destroyForcibly()
}
}
context
}
@@ -54,7 +51,7 @@ abstract class IjentDeployingOverShellProcessStrategy(scope: CoroutineScope) : I
}
}
final override suspend fun createProcess(binaryPath: String): Process {
final override suspend fun createProcess(binaryPath: String): IjentSessionMediator {
return myContext.await().execCommand {
execIjent(binaryPath)
}
@@ -74,9 +71,10 @@ abstract class IjentDeployingOverShellProcessStrategy(scope: CoroutineScope) : I
override suspend fun getConnectionStrategy(): IjentConnectionStrategy = IjentConnectionStrategy.Default
class ShellProcessWrapper(private var wrapped: Process?) {
internal class ShellProcessWrapper(private var mediator: IjentSessionMediator?) {
@OptIn(DelicateCoroutinesApi::class)
suspend fun write(data: String) {
val process = wrapped!!
val process = mediator!!.process
@Suppress("NAME_SHADOWING")
val data = if (data.endsWith("\n")) data else "$data\n"
@@ -96,7 +94,7 @@ abstract class IjentDeployingOverShellProcessStrategy(scope: CoroutineScope) : I
suspend fun readLineWithoutBuffering(): String =
withContext(Dispatchers.IO) {
val buffer = StringBuilder()
val stream = wrapped!!.inputStream
val stream = mediator!!.process.inputStream
while (true) {
ensureActive()
val c = stream.read()
@@ -112,7 +110,7 @@ abstract class IjentDeployingOverShellProcessStrategy(scope: CoroutineScope) : I
}
suspend fun copyDataFrom(stream: InputStream) {
val process = wrapped!!
val process = mediator!!.process
withContext(Dispatchers.IO) {
stream.copyToAsync(process.outputStream)
ensureActive()
@@ -120,17 +118,22 @@ abstract class IjentDeployingOverShellProcessStrategy(scope: CoroutineScope) : I
}
}
fun destroyForcibly() {
wrapped!!.destroyForcibly()
@OptIn(InternalCoroutinesApi::class)
suspend fun destroyForciblyAndGetError(): Throwable {
mediator!!.process.destroyForcibly()
try {
val job = mediator!!.ijentProcessScope.coroutineContext.job
job.join()
throw job.getCancellationException()
}
catch (err: Throwable) {
return IjentUnavailableException.unwrapFromCancellationExceptions(err)
}
}
@OptIn(DelicateCoroutinesApi::class)
suspend fun readWholeErrorStream(): ByteArray =
computeDetached { wrapped!!.errorStream.readAllBytes() }
fun extractProcess(): Process {
val result = wrapped!!
wrapped = null
fun extractProcess(): IjentSessionMediator {
val result = mediator!!
mediator = null
return result
}
}
@@ -140,24 +143,26 @@ private suspend fun <T : Any> DeployingContextAndShell.execCommand(block: suspen
return try {
block()
}
catch (err: Throwable) {
runCatching { process.destroyForcibly() }.exceptionOrNull()?.let(err::addSuppressed)
catch (initialErrorFromStack: Throwable) {
val errorFromScope = process.destroyForciblyAndGetError()
val errorFromStack = IjentUnavailableException.unwrapFromCancellationExceptions(initialErrorFromStack)
val attachment = Attachment("stderr", String(process.readWholeErrorStream()))
attachment.isIncluded = attachment.isIncluded or ApplicationManager.getApplication().isInternal
// It happens sometimes that some valuable error is wrapped into CancellationException.
// However, if there's no valuable error,
// then it's a fair CancellationException that should be thrown futher, to cancel the context.
val (mainError, suppressedError) =
when (errorFromStack) {
is IjentUnavailableException, is CancellationException -> errorFromStack to errorFromScope
else -> errorFromScope to errorFromStack
}
val errorWithAttachments = RuntimeExceptionWithAttachments(err.message ?: "", err, attachment)
if (mainError != suppressedError) {
mainError.addSuppressed(suppressedError)
}
// TODO Suppress RuntimeExceptionWithAttachments instead of wrapping when KT-66006 is resolved.
//err.addSuppressed(RuntimeExceptionWithAttachments(
// "The error happened during handling $process",
// Attachment("stderr", stderr.toString()).apply { isIncluded = isIncluded or ApplicationManager.getApplication().isInternal },
//))
//throw err
throw when (err) {
is TimeoutCancellationException, is IOException -> IjentStartupError.CommunicationError(errorWithAttachments)
else -> errorWithAttachments
throw when (mainError) {
is IjentUnavailableException, is CancellationException -> mainError
else -> IjentStartupError.CommunicationError(mainError)
}
}
}
@@ -348,7 +353,7 @@ private suspend fun DeployingContextAndShell.uploadIjentBinary(
return process.readLineWithoutBuffering()
}
private suspend fun DeployingContextAndShell.execIjent(remotePathToBinary: String): Process {
private suspend fun DeployingContextAndShell.execIjent(remotePathToBinary: String): IjentSessionMediator {
val joinedCmd = getIjentGrpcArgv(remotePathToBinary, selfDeleteOnExit = true, usrBinEnv = context.env).joinToString(" ")
val commandLineArgs = context.run {
"""

View File

@@ -48,7 +48,7 @@ interface IjentDeployingStrategy {
* @param binaryPath path to ijent binary on target environment
* @return process that will be used for communication
*/
suspend fun createProcess(binaryPath: String): Process
suspend fun createProcess(binaryPath: String): IjentSessionMediator
/**
* Copy files to the target environment. Typically used to transfer the ijent binary to the target machine.

View File

@@ -4,9 +4,9 @@ package com.intellij.platform.ijent.spi
import com.intellij.openapi.diagnostic.Attachment
import com.intellij.openapi.diagnostic.debug
import com.intellij.openapi.diagnostic.logger
import com.intellij.platform.ijent.IjentApplicationScope
import com.intellij.platform.ijent.IjentId
import com.intellij.openapi.progress.Cancellation
import com.intellij.platform.ijent.IjentUnavailableException
import com.intellij.platform.ijent.coroutineNameAppended
import com.intellij.platform.util.coroutines.childScope
import com.intellij.util.containers.ContainerUtil
import com.intellij.util.io.awaitExit
@@ -22,8 +22,6 @@ import java.time.ZonedDateTime
import java.time.format.DateTimeParseException
import java.util.*
import java.util.concurrent.TimeUnit
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toKotlinDuration
@@ -34,6 +32,10 @@ import kotlin.time.toKotlinDuration
*
* [processExit] never throws. When it completes, it either means that the process has finished, or that the whole scope of IJent processes
* is canceled.
*
* [ijentProcessScope] should be used by the [com.intellij.platform.ijent.IjentApi] implementation for launching internal coroutines.
* No matter if IJent exits expectedly or not, an attempt to do anything with [ijentProcessScope] after the IJent has exited
* throws [IjentUnavailableException].
*/
class IjentSessionMediator private constructor(
val ijentProcessScope: CoroutineScope,
@@ -58,40 +60,85 @@ class IjentSessionMediator private constructor(
/**
* See the docs of [IjentSessionMediator].
*
* [ijentId] is used only for logging.
* [ijentLabel] is used only for logging.
*
* Beware that [parentScope] receives [IjentUnavailableException.CommunicationFailure] if IJent _suddenly_ exits, f.i., after SIGKILL.
* Nothing happens with [parentScope] if IJent exits expectedly, f.i., after [com.intellij.platform.ijent.IjentApi.close].
*/
@OptIn(DelicateCoroutinesApi::class)
fun create(process: Process, ijentId: IjentId): IjentSessionMediator {
fun create(parentScope: CoroutineScope, process: Process, ijentLabel: String): IjentSessionMediator {
// TODO What about https://youtrack.jetbrains.com/issue/IJPL-156891 ?
require(parentScope.coroutineContext[Job] != null) {
"Scope $parentScope has no Job"
}
val context = IjentThreadPool.asCoroutineDispatcher()
val ijentProcessScope = run {
// Prevents from logging the error by the default exception handler.
// Errors are logged explicitly in this function.
val dummyExceptionHandler = CoroutineExceptionHandler { _, err -> /* nothing */ }
// This supervisor scope exists only to prevent automatic propagation of IjentUnavailableException to the parent scope.
// Instead, there's a logic below that decides if a specific IjentUnavailableException should be propagated to the parent scope.
val trickySupervisorScope = parentScope.childScope(ijentLabel, context + dummyExceptionHandler, supervisor = true)
val ijentProcessScope = trickySupervisorScope.childScope(ijentLabel, supervisor = false)
ijentProcessScope.coroutineContext.job.invokeOnCompletion { err ->
trickySupervisorScope.cancel()
if (err != null) {
val propagateToParentScope = when (err) {
is IjentUnavailableException -> when (err) {
is IjentUnavailableException.ClosedByApplication -> false
is IjentUnavailableException.CommunicationFailure -> !err.exitedExpectedly
}
else -> true
}
if (propagateToParentScope) {
try {
err.addSuppressed(Throwable("Rethrown from here"))
parentScope.launch(start = CoroutineStart.UNDISPATCHED) {
throw err
}
}
catch (_: Throwable) {
// It seems that the scope has already been canceled with something else.
}
}
// TODO Callers should be able to define their own exception handlers.
logIjentError(ijentLabel, err)
}
}
ijentProcessScope
}
val lastStderrMessages = MutableSharedFlow<String?>(
replay = 30,
extraBufferCapacity = 0,
onBufferOverflow = BufferOverflow.DROP_OLDEST,
)
val exceptionHandler = IjentSessionCoroutineExceptionHandler(ijentId)
val connectionScope = IjentApplicationScope.instance().childScope(
"ijent $ijentId > connection scope",
supervisor = false,
context = exceptionHandler + IjentThreadPool.asCoroutineDispatcher(),
)
// stderr logger should outlive the current scope. In case if an error appears, the scope is cancelled immediately, but the whole
// intention of the stderr logger is to write logs of the remote process, which come from the remote machine to the local one with
// a delay.
GlobalScope.launch(blockingDispatcher + CoroutineName("ijent $ijentId > stderr logger")) {
ijentProcessStderrLogger(process, ijentId, lastStderrMessages)
GlobalScope.launch(blockingDispatcher + ijentProcessScope.coroutineNameAppended("stderr logger")) {
ijentProcessStderrLogger(process, ijentLabel, lastStderrMessages)
}
val processExit = CompletableDeferred<Unit>()
val mediator = IjentSessionMediator(connectionScope, process, processExit)
val mediator = IjentSessionMediator(ijentProcessScope, process, processExit)
val awaiterScope = IjentApplicationScope.instance().launch(CoroutineName("ijent $ijentId > exit awaiter scope") + exceptionHandler) {
ijentProcessExitAwaiter(ijentId, mediator, lastStderrMessages)
val awaiterScope = ijentProcessScope.launch(context = context + ijentProcessScope.coroutineNameAppended("exit awaiter scope")) {
ijentProcessExitAwaiter(ijentLabel, mediator, lastStderrMessages)
}
val finalizerScope = connectionScope.launch(CoroutineName("ijent $ijentId > finalizer scope")) {
ijentProcessFinalizer(ijentId, mediator)
val finalizerScope = ijentProcessScope.launch(context = context + ijentProcessScope.coroutineNameAppended("finalizer scope")) {
ijentProcessFinalizer(ijentLabel, mediator)
}
awaiterScope.invokeOnCompletion { err ->
@@ -104,21 +151,19 @@ class IjentSessionMediator private constructor(
}
}
private class IjentSessionCoroutineExceptionHandler(
private val ijentId: IjentId,
) : AbstractCoroutineContextElement(CoroutineExceptionHandler), CoroutineExceptionHandler {
private val loggedErrors = Collections.newSetFromMap(ContainerUtil.createConcurrentWeakMap<Throwable, Boolean>())
private val loggedErrors = Collections.newSetFromMap(ContainerUtil.createConcurrentWeakMap<Throwable, Boolean>())
override fun toString(): String = javaClass.simpleName
override fun handleException(context: CoroutineContext, exception: Throwable) {
private fun logIjentError(ijentLabel: String, exception: Throwable) {
// The logger can create new services, and since this function is called inside an already failed coroutine context,
// service creation would be impossible without `executeInNonCancelableSection`.
Cancellation.executeInNonCancelableSection {
when (exception) {
is IjentUnavailableException -> when (exception) {
is IjentUnavailableException.ClosedByApplication -> Unit
is IjentUnavailableException.CommunicationFailure -> {
if (!exception.exitedExpectedly && loggedErrors.add(exception)) {
LOG.error("Exception in connection with IJent $ijentId: ${exception.message}", exception)
LOG.error("Exception in connection with IJent $ijentLabel: ${exception.message}", exception)
}
}
}
@@ -127,27 +172,27 @@ private class IjentSessionCoroutineExceptionHandler(
else -> {
if (loggedErrors.add(exception)) {
LOG.error("Unexpected error during communnication with IJent $ijentId", exception)
LOG.error("Unexpected error during communnication with IJent $ijentLabel", exception)
}
}
}
}
}
private suspend fun ijentProcessStderrLogger(process: Process, ijentId: IjentId, lastStderrMessages: MutableSharedFlow<String?>) {
private suspend fun ijentProcessStderrLogger(process: Process, ijentLabel: String, lastStderrMessages: MutableSharedFlow<String?>) {
try {
process.errorStream.reader().useLines { lines ->
for (line in lines) {
yield()
if (line.isNotEmpty()) {
logIjentStderr(ijentId, line)
logIjentStderr(ijentLabel, line)
lastStderrMessages.emit(line)
}
}
}
}
catch (err: IOException) {
LOG.debug { "$ijentId bootstrap got an error: $err" }
LOG.debug { "$ijentLabel bootstrap got an error: $err" }
}
finally {
lastStderrMessages.emit(null)
@@ -165,13 +210,13 @@ private val ijentLogMessageRegex = Regex(
RegexOption.COMMENTS,
)
private fun logIjentStderr(ijentId: IjentId, line: String) {
private fun logIjentStderr(ijentLabel: String, line: String) {
val hostDateTime = ZonedDateTime.now()
val (rawRemoteDateTime, level, message) =
ijentLogMessageRegex.matchEntire(line)?.destructured
?: run {
LOG.debug { "$ijentId log: $line" }
LOG.debug { "$ijentLabel log: $line" }
return
}
@@ -179,7 +224,7 @@ private fun logIjentStderr(ijentId: IjentId, line: String) {
java.time.Duration.between(ZonedDateTime.parse(rawRemoteDateTime), hostDateTime).toKotlinDuration()
}
catch (_: DateTimeParseException) {
LOG.debug { "$ijentId log: $line" }
LOG.debug { "$ijentLabel log: $line" }
return
}
@@ -192,7 +237,7 @@ private fun logIjentStderr(ijentId: IjentId, line: String) {
}
logger(buildString {
append(ijentId)
append(ijentLabel)
append(" log: ")
if (dateTimeDiff.absoluteValue > 50.milliseconds) { // The timeout is taken at random.
append(rawRemoteDateTime)
@@ -205,12 +250,12 @@ private fun logIjentStderr(ijentId: IjentId, line: String) {
}
private suspend fun ijentProcessExitAwaiter(
ijentId: IjentId,
ijentLabel: String,
mediator: IjentSessionMediator,
lastStderrMessages: MutableSharedFlow<String?>,
): Nothing {
val exitCode = mediator.process.awaitExit()
LOG.debug { "IJent process $ijentId exited with code $exitCode" }
LOG.debug { "IJent process $ijentLabel exited with code $exitCode" }
val isExitExpected = when (mediator.expectedErrorCode) {
IjentSessionMediator.ExpectedErrorCode.NO -> false
@@ -233,7 +278,7 @@ private suspend fun ijentProcessExitAwaiter(
}
}
IjentUnavailableException.CommunicationFailure(
"The process $ijentId suddenly exited with the code $exitCode",
"The process $ijentLabel suddenly exited with the code $exitCode",
Attachment("stderr", stderr.toString()),
)
}
@@ -250,37 +295,41 @@ private suspend fun collectLines(lastStderrMessages: SharedFlow<String?>, stderr
}
@OptIn(DelicateCoroutinesApi::class)
private suspend fun ijentProcessFinalizer(ijentId: IjentId, mediator: IjentSessionMediator): Nothing {
private suspend fun ijentProcessFinalizer(ijentLabel: String, mediator: IjentSessionMediator): Nothing {
try {
awaitCancellation()
}
catch (err: Exception) {
throw when (val cause = generateSequence(err, Throwable::cause).firstOrNull { it !is CancellationException }) {
null -> err
is IjentUnavailableException -> cause
else -> {
LOG.debug(err) { "$ijentId is going to be terminated due to receiving an error" }
IjentUnavailableException.CommunicationFailure("IJent communication terminated due to an error", err)
}
val actualErrors = generateSequence(err, Throwable::cause).filterTo(mutableListOf()) { it !is CancellationException }
val existingIjentUnavailableException = actualErrors.filterIsInstance<IjentUnavailableException>().firstOrNull()
if (existingIjentUnavailableException != null) {
throw existingIjentUnavailableException
}
val cause = actualErrors.firstOrNull() ?: err
val message =
if (cause is CancellationException) "The coroutine scope of $ijentLabel was cancelled"
else "IJent communication terminated due to an error"
throw IjentUnavailableException.ClosedByApplication(message, cause)
}
finally {
mediator.expectedErrorCode = IjentSessionMediator.ExpectedErrorCode.ANY
val process = mediator.process
if (process.isAlive) {
GlobalScope.launch(Dispatchers.IO + CoroutineName("$ijentId destruction")) {
GlobalScope.launch(Dispatchers.IO + CoroutineName("$ijentLabel destruction")) {
try {
process.waitFor(5, TimeUnit.SECONDS) // A random timeout.
}
finally {
if (process.isAlive) {
LOG.warn("The process $ijentId is still alive, it will be killed")
LOG.warn("The process $ijentLabel is still alive, it will be killed")
process.destroy()
}
}
}
GlobalScope.launch(Dispatchers.IO) {
LOG.debug { "Closing stdin of $ijentId" }
LOG.debug { "Closing stdin of $ijentLabel" }
process.outputStream.close()
}
}

View File

@@ -50,22 +50,24 @@ internal class DefaultIjentSessionProvider : IjentSessionProvider {
* [ijentName] is used for debugging utilities like logs and thread names.
*
* The process terminates automatically only when the IDE exits, or if [IjentApi.close] is called explicitly.
* [com.intellij.platform.ijent.bindToScope] may be useful for terminating the IJent process earlier.
*/
suspend fun connectToRunningIjent(ijentName: String, strategy: IjentConnectionStrategy, platform: EelPlatform, process: Process): IjentApi {
val ijentSessionRegistry = IjentSessionRegistry.instanceAsync()
val ijentId = ijentSessionRegistry.register(ijentName, oneOff = true) { ijentId ->
val mediator = IjentSessionMediator.create(process, ijentId)
mediator.expectedErrorCode = IjentSessionMediator.ExpectedErrorCode.ZERO
IjentSessionProvider.instanceAsync().connect(strategy, platform, mediator)
}
return ijentSessionRegistry.get(ijentId)
suspend fun connectToRunningIjent(strategy: IjentConnectionStrategy, platform: EelPlatform, mediator: IjentSessionMediator): IjentApi {
mediator.expectedErrorCode = IjentSessionMediator.ExpectedErrorCode.ZERO
return IjentSessionProvider.instanceAsync().connect(strategy, platform, mediator)
}
/** A specialized overload of [connectToRunningIjent] */
suspend fun connectToRunningIjent(ijentName: String, strategy: IjentConnectionStrategy, platform: EelPlatform.Posix, process: Process): IjentPosixApi =
connectToRunningIjent(ijentName, strategy, platform as EelPlatform, process) as IjentPosixApi
suspend fun connectToRunningIjent(
strategy: IjentConnectionStrategy,
platform: EelPlatform.Posix,
mediator: IjentSessionMediator,
): IjentPosixApi =
connectToRunningIjent(strategy, platform as EelPlatform, mediator) as IjentPosixApi
/** A specialized overload of [connectToRunningIjent] */
suspend fun connectToRunningIjent(ijentName: String, strategy: IjentConnectionStrategy, platform: EelPlatform.Windows, process: Process): IjentWindowsApi =
connectToRunningIjent(ijentName, strategy, platform as EelPlatform, process) as IjentWindowsApi
suspend fun connectToRunningIjent(
strategy: IjentConnectionStrategy,
platform: EelPlatform.Windows,
mediator: IjentSessionMediator,
): IjentWindowsApi =
connectToRunningIjent(strategy, platform as EelPlatform, mediator) as IjentWindowsApi

View File

@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="true">
<exclude-output />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="library" name="kotlin-stdlib" level="project" />
<orderEntry type="library" name="kotlinx-coroutines-core" level="project" />
<orderEntry type="module" module-name="intellij.platform.core" />
<orderEntry type="module" module-name="intellij.platform.ide.impl" />
<orderEntry type="module" module-name="intellij.platform.testFramework" />
<orderEntry type="module" module-name="intellij.platform.util.coroutines" />
</component>
</module>

View File

@@ -0,0 +1,51 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
@file:JvmName("WslIjentTestUtil")
package com.intellij.ijent.testFramework.wsl
import com.intellij.execution.wsl.ProductionWslIjentManager
import com.intellij.execution.wsl.WslIjentManager
import com.intellij.execution.wsl.ijent.nio.toggle.IjentWslNioFsToggler
import com.intellij.openapi.Disposable
import com.intellij.openapi.application.ApplicationManager
import com.intellij.openapi.util.Disposer
import com.intellij.platform.util.coroutines.childScope
import com.intellij.testFramework.replaceService
import kotlinx.coroutines.*
import java.util.concurrent.CancellationException
fun replaceProductionWslIjentManager(disposable: Disposable) {
replaceService(WslIjentManager::class.java, ::ProductionWslIjentManager, disposable)
}
fun replaceProductionWslIjentManager(newServiceScope: CoroutineScope) {
replaceService(WslIjentManager::class.java, ::ProductionWslIjentManager, newServiceScope)
}
fun replaceIjentWslNioFsToggler(newServiceScope: CoroutineScope) {
replaceService(IjentWslNioFsToggler::class.java, ::IjentWslNioFsToggler, newServiceScope)
}
private fun <T : Any> replaceService(iface: Class<T>, constructor: (CoroutineScope) -> T, newServiceScope: CoroutineScope) {
val disposable = Disposer.newDisposable(newServiceScope.toString())
newServiceScope.coroutineContext.job.invokeOnCompletion {
Disposer.dispose(disposable)
}
ApplicationManager.getApplication().replaceService(
iface,
constructor(newServiceScope),
disposable,
)
}
@OptIn(DelicateCoroutinesApi::class)
private fun <T : Any> replaceService(iface: Class<T>, constructor: (CoroutineScope) -> T, disposable: Disposable) {
val newServiceScope = GlobalScope.childScope("Disposable $disposable", supervisor = true)
Disposer.register(disposable) {
newServiceScope.cancel(CancellationException("Disposed $disposable"))
}
ApplicationManager.getApplication().replaceService(
iface,
constructor(newServiceScope),
disposable,
)
}

View File

@@ -6,7 +6,6 @@ import com.intellij.openapi.project.Project
import com.intellij.platform.ijent.IjentId
import com.intellij.platform.ijent.IjentPosixApi
import com.intellij.platform.ijent.IjentSessionRegistry
import com.intellij.platform.ijent.bindToScope
import com.intellij.platform.ijent.spi.IjentThreadPool
import com.intellij.platform.util.coroutines.childScope
import kotlinx.coroutines.CoroutineScope
@@ -37,9 +36,17 @@ class ProductionWslIjentManager(private val scope: CoroutineScope) : WslIjentMan
override suspend fun getIjentApi(wslDistribution: WSLDistribution, project: Project?, rootUser: Boolean): IjentPosixApi {
val ijentSessionRegistry = IjentSessionRegistry.instanceAsync()
val ijentId = myCache.computeIfAbsent("""wsl:${wslDistribution.id}${if (rootUser) ":root" else ""}""") { ijentName ->
val ijentId = ijentSessionRegistry.register(ijentName, oneOff = false) {
val ijent = deployAndLaunchIjent(project, wslDistribution, wslCommandLineOptionsModifier = { it.setSudo(rootUser) })
ijent.bindToScope(scope)
val ijentId = ijentSessionRegistry.register(ijentName) { ijentId ->
val ijent = deployAndLaunchIjent(
scope,
project,
ijentId.toString(),
wslDistribution,
wslCommandLineOptionsModifier = { it.setSudo(rootUser) },
)
scope.coroutineContext.job.invokeOnCompletion {
ijent.close()
}
ijent
}
scope.coroutineContext.job.invokeOnCompletion {
@@ -51,6 +58,12 @@ class ProductionWslIjentManager(private val scope: CoroutineScope) : WslIjentMan
return ijentSessionRegistry.get(ijentId) as IjentPosixApi
}
init {
scope.coroutineContext.job.invokeOnCompletion {
dropCache()
}
}
@VisibleForTesting
fun dropCache() {
val ijentSessionRegistry = serviceIfCreated<IjentSessionRegistry>()

View File

@@ -16,6 +16,7 @@ import java.nio.file.Path
@ApiStatus.Internal
class WslIjentDeployingStrategy(
scope: CoroutineScope,
override val ijentLabel: String,
private val distribution: WSLDistribution,
private val project: Project?,
private val wslCommandLineOptionsModifier: (WSLCommandLineOptions) -> Unit = {}
@@ -24,7 +25,7 @@ class WslIjentDeployingStrategy(
distribution.getWslPath(path)
@OptIn(IntellijInternalApi::class, DelicateCoroutinesApi::class)
override suspend fun createShellProcess(): ShellProcessWrapper {
override suspend fun createShellProcess(): Process {
// IJent can start an interactive shell by itself whenever it needs.
// Enabling an interactive shell for IJent by default can bring problems, because stdio of IJent must not be populated
// with possible user extensions in ~/.profile
@@ -38,7 +39,7 @@ class WslIjentDeployingStrategy(
val commandLine = WSLDistribution.neverRunTTYFix(GeneralCommandLine("/bin/sh"))
distribution.doPatchCommandLine(commandLine, project, wslCommandLineOptions)
return ShellProcessWrapper(computeDetached { commandLine.createProcess() })
return computeDetached { commandLine.createProcess() }
}
override suspend fun getConnectionStrategy(): IjentConnectionStrategy {

View File

@@ -12,7 +12,6 @@ import com.intellij.platform.ijent.deploy
import com.intellij.platform.ijent.spi.DeployedIjent
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.coroutineScope
import org.jetbrains.annotations.ApiStatus
import org.jetbrains.annotations.VisibleForTesting
@@ -51,24 +50,28 @@ interface WslIjentManager {
}
}
suspend fun deployAndLaunchIjent(
internal suspend fun deployAndLaunchIjent(
parentScope: CoroutineScope,
project: Project?,
ijentLabel: String,
wslDistribution: WSLDistribution,
wslCommandLineOptionsModifier: (WSLCommandLineOptions) -> Unit = {},
): IjentPosixApi = deployAndLaunchIjentGettingPath(project, wslDistribution, wslCommandLineOptionsModifier).ijentApi
): IjentPosixApi =
deployAndLaunchIjentGettingPath(parentScope, project, ijentLabel, wslDistribution, wslCommandLineOptionsModifier).ijentApi
@VisibleForTesting
suspend fun deployAndLaunchIjentGettingPath(
parentScope: CoroutineScope,
project: Project?,
ijentLabel: String,
wslDistribution: WSLDistribution,
wslCommandLineOptionsModifier: (WSLCommandLineOptions) -> Unit = {},
): DeployedIjent.Posix {
return coroutineScope {
WslIjentDeployingStrategy(
scope = this,
distribution = wslDistribution,
project = project,
wslCommandLineOptionsModifier = wslCommandLineOptionsModifier
).deploy("WSL-${wslDistribution.id}")
}
return WslIjentDeployingStrategy(
scope = parentScope,
ijentLabel = ijentLabel,
distribution = wslDistribution,
project = project,
wslCommandLineOptionsModifier = wslCommandLineOptionsModifier
).deploy()
}