[ijent] API refactoring

- introduce IjentDeployingStrategy
- simplify API

GitOrigin-RevId: af6da49e6da67bb5993d6a953d778ef4d9394c28
This commit is contained in:
Andrii Zinchenko
2024-04-05 13:28:19 +02:00
committed by intellij-monorepo-bot
parent ab78d1110d
commit d566339d5e
13 changed files with 632 additions and 494 deletions

View File

@@ -16,12 +16,12 @@ import com.intellij.openapi.ui.Messages
import com.intellij.platform.ide.progress.ModalTaskOwner
import com.intellij.platform.ide.progress.TaskCancellation
import com.intellij.platform.ide.progress.withModalProgress
import com.intellij.platform.ijent.IjentApi
import com.intellij.platform.ijent.IjentExecApi
import com.intellij.platform.ijent.IjentMissingBinary
import com.intellij.platform.ijent.community.impl.nio.asNioFileSystem
import com.intellij.platform.ijent.deploy
import com.intellij.platform.ijent.executeProcess
import com.intellij.platform.util.coroutines.childScope
import com.intellij.platform.ijent.spi.IjentDeployingStrategy
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.consumeEach
import org.jetbrains.annotations.ApiStatus.Internal
@@ -56,45 +56,45 @@ abstract class AbstractIjentVerificationAction : DumbAwareAction() {
try {
withModalProgress(modalTaskOwner, e.presentation.text, TaskCancellation.cancellable()) {
coroutineScope {
val (ijent, title) = launchIjent(childScope())
coroutineScope {
launch {
val info = ijent.info
withContext(Dispatchers.EDT + ModalityState.any().asContextElement()) {
Messages.showInfoMessage(
"""
Architecture: ${info.architecture}
Remote PID: ${info.remotePid}
Version: ${info.version}
""".trimIndent(),
title
)
val (title, deployingStrategy) = deployingStrategy()
deployingStrategy.deploy("IjentVerificationAction").ijentApi.use { ijent ->
coroutineScope {
launch {
val info = ijent.info
withContext(Dispatchers.EDT + ModalityState.any().asContextElement()) {
Messages.showInfoMessage(
"""
Architecture: ${info.architecture}
Remote PID: ${info.remotePid}
Version: ${info.version}
""".trimIndent(),
title
)
}
}
}
launch {
val process = when (val p = ijent.exec.executeProcess("uname", "-a")) {
is IjentExecApi.ExecuteProcessResult.Failure -> error(p)
is IjentExecApi.ExecuteProcessResult.Success -> p.process
launch {
val process = when (val p = ijent.exec.executeProcess("uname", "-a")) {
is IjentExecApi.ExecuteProcessResult.Failure -> error(p)
is IjentExecApi.ExecuteProcessResult.Success -> p.process
}
val stdout = ByteArrayOutputStream()
process.stdout.consumeEach(stdout::write)
withContext(Dispatchers.EDT + ModalityState.any().asContextElement()) {
Messages.showInfoMessage(stdout.toString(), title)
}
}
val stdout = ByteArrayOutputStream()
process.stdout.consumeEach(stdout::write)
withContext(Dispatchers.EDT + ModalityState.any().asContextElement()) {
Messages.showInfoMessage(stdout.toString(), title)
}
}
launch(Dispatchers.IO) {
val nioFs = ijent.fs.asNioFileSystem()
val path = "/etc"
val isDir = nioFs.getPath(path).isDirectory()
withContext(Dispatchers.EDT + ModalityState.any().asContextElement()) {
Messages.showInfoMessage("$path is directory: $isDir", title)
launch(Dispatchers.IO) {
val nioFs = ijent.fs.asNioFileSystem()
val path = "/etc"
val isDir = nioFs.getPath(path).isDirectory()
withContext(Dispatchers.EDT + ModalityState.any().asContextElement()) {
Messages.showInfoMessage("$path is directory: $isDir", title)
}
}
}
}
coroutineContext.cancelChildren()
}
}
}
@@ -107,7 +107,7 @@ abstract class AbstractIjentVerificationAction : DumbAwareAction() {
}
}
protected abstract suspend fun launchIjent(childScope: CoroutineScope): Pair<IjentApi, String>
protected abstract suspend fun deployingStrategy(): Pair<String, IjentDeployingStrategy>
companion object {
protected val LOG = logger<AbstractIjentVerificationAction>()

View File

@@ -1,8 +1,8 @@
<idea-plugin>
<extensions defaultExtensionNs="com.intellij">
<applicationService
serviceInterface="com.intellij.platform.ijent.IjentSessionProvider"
serviceImplementation="com.intellij.platform.ijent.DefaultIjentSessionProvider"/>
serviceInterface="com.intellij.platform.ijent.spi.IjentSessionProvider"
serviceImplementation="com.intellij.platform.ijent.spi.DefaultIjentSessionProvider"/>
<applicationService
serviceInterface="com.intellij.platform.ijent.IjentExecFileProvider"

View File

@@ -10,7 +10,7 @@ import com.intellij.platform.ijent.fs.IjentFileSystemWindowsApi
* on a local or a remote machine. Every instance corresponds to a single machine, i.e. unlike Run Targets, if IJent is launched
* in a Docker container, every call to execute a process (see [IjentExecApi]) runs a command in the same Docker container.
*
* Usually, [IjentSessionProvider] creates instances of [IjentApi].
* Usually, [com.intellij.platform.ijent.deploy] creates instances of [IjentApi].
*/
sealed interface IjentApi : AutoCloseable {
val id: IjentId

View File

@@ -0,0 +1,67 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
@file:JvmName("IjentDeployer")
package com.intellij.platform.ijent
import com.intellij.platform.ijent.spi.DeployedIjent
import com.intellij.platform.ijent.spi.IjentDeployingStrategy
import com.intellij.platform.ijent.spi.connectToRunningIjent
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.job
/**
* Starts IJent on some machine, defined by [com.intellij.platform.ijent.spi.IjentDeployingStrategy].
*
* [ijentName] is a label used for logging, debugging, thread names, etc.
*
* By default, the IJent executable exits when the IDE exits.
* [IjentApi.close] and [com.intellij.platform.ijent.bindToScope] may be used to terminate IJent earlier.
*
* TODO Either define thrown exceptions or return something like Result.
*/
suspend fun IjentDeployingStrategy.deploy(ijentName: String): DeployedIjent {
val (remotePathToBinary, ijentApi) = doDeploy(ijentName)
return object : DeployedIjent {
override val ijentApi: IjentApi = ijentApi
override val remotePathToBinary: String = remotePathToBinary
}
}
/** A specialized version of [com.intellij.platform.ijent.deploy] */
suspend fun IjentDeployingStrategy.Posix.deploy(ijentName: String): DeployedIjent.Posix {
val (remotePathToBinary, ijentApi) = doDeploy(ijentName)
ijentApi as IjentPosixApi
return object : DeployedIjent.Posix {
override val ijentApi: IjentPosixApi = ijentApi
override val remotePathToBinary: String = remotePathToBinary
}
}
/** A specialized version of [com.intellij.platform.ijent.deploy] */
suspend fun IjentDeployingStrategy.Windows.deploy(ijentName: String): DeployedIjent.Windows {
val (remotePathToBinary, ijentApi) = doDeploy(ijentName)
ijentApi as IjentWindowsApi
return object : DeployedIjent.Windows {
override val ijentApi: IjentWindowsApi = ijentApi
override val remotePathToBinary: String = remotePathToBinary
}
}
/** A shortcut for terminating an [IjentApi] when the [coroutineScope] completes. */
fun IjentApi.bindToScope(coroutineScope: CoroutineScope) {
coroutineScope.coroutineContext.job.invokeOnCompletion {
this@bindToScope.close()
}
}
private suspend fun IjentDeployingStrategy.doDeploy(ijentName: String): Pair<String, IjentApi> =
try {
val targetPlatform = getTargetPlatform()
val remotePathToBinary = copyFile(IjentExecFileProvider.getInstance().getIjentBinary(targetPlatform))
val process = createProcess(remotePathToBinary)
val ijentApi = connectToRunningIjent(ijentName, targetPlatform, process)
remotePathToBinary to ijentApi
}
finally {
close()
}

View File

@@ -1,389 +0,0 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.platform.ijent
import com.intellij.execution.CommandLineUtil.posixQuote
import com.intellij.openapi.components.serviceAsync
import com.intellij.openapi.diagnostic.debug
import com.intellij.openapi.diagnostic.logger
import com.intellij.openapi.diagnostic.trace
import com.intellij.util.io.computeDetached
import com.intellij.util.io.copyToAsync
import kotlinx.coroutines.*
import org.jetbrains.annotations.ApiStatus
import java.io.IOException
import java.io.InputStream
import java.nio.file.Path
import kotlin.io.path.fileSize
import kotlin.io.path.inputStream
import kotlin.time.Duration.Companion.seconds
/**
* Given that there is some IJent process launched, this extension gets handles to stdin+stdout of the process and returns
* an [IjentApi] instance for calling procedures on IJent side.
*/
@ApiStatus.Internal
interface IjentSessionProvider {
/**
* Supposed to be used inside [IjentSessionRegistry.register].
*
* [ijentCoroutineScope] must be the scope generated inside [IjentSessionRegistry.register]
*/
suspend fun connect(
ijentId: IjentId,
platform: IjentPlatform,
mediator: IjentSessionMediator
): IjentApi
companion object {
suspend fun instanceAsync(): IjentSessionProvider = serviceAsync()
}
}
sealed class IjentStartupError : RuntimeException {
constructor(message: String) : super(message)
constructor(message: String, cause: Throwable) : super(message, cause)
class MissingImplPlugin : IjentStartupError("The plugin `intellij.platform.ijent.impl` is not installed")
sealed class BootstrapOverShell : IjentStartupError {
constructor(message: String) : super(message)
constructor(message: String, cause: Throwable) : super(message, cause)
}
class IncompatibleTarget(message: String) : BootstrapOverShell(message)
class CommunicationError(cause: Throwable) : BootstrapOverShell(cause.message.orEmpty(), cause)
}
internal class DefaultIjentSessionProvider : IjentSessionProvider {
override suspend fun connect(ijentId: IjentId, platform: IjentPlatform, mediator: IjentSessionMediator): IjentApi {
throw IjentStartupError.MissingImplPlugin()
}
}
/** A shortcut for terminating an [IjentApi] when the [coroutineScope] completes. */
fun IjentApi.bindToScope(coroutineScope: CoroutineScope) {
coroutineScope.coroutineContext.job.invokeOnCompletion {
this@bindToScope.close()
}
}
/**
* Make [IjentApi] from an already running [process].
* [ijentName] is used for debugging utilities like logs and thread names.
*
* The process terminates automatically only when the IDE exits, or if [IjentApi.close] is called explicitly.
* [bindToScope] may be useful for terminating the IJent process earlier.
*/
suspend fun connectToRunningIjent(ijentName: String, platform: IjentPlatform, process: Process): IjentApi =
IjentSessionRegistry.instanceAsync().register(ijentName) { ijentId ->
val mediator = IjentSessionMediator.create(process, ijentId)
mediator.expectedErrorCode = IjentSessionMediator.ExpectedErrorCode.ZERO
IjentSessionProvider.instanceAsync().connect(ijentId, platform, mediator)
}
suspend fun connectToRunningIjent(
ijentName: String,
platform: IjentPlatform.Posix,
process: Process,
): IjentPosixApi =
connectToRunningIjent(ijentName, platform as IjentPlatform, process) as IjentPosixApi
suspend fun connectToRunningIjent(
ijentName: String,
platform: IjentPlatform.Windows,
process: Process,
): IjentWindowsApi =
connectToRunningIjent(ijentName, platform as IjentPlatform, process) as IjentWindowsApi
/**
* Interactively requests IJent through a running POSIX-compliant command interpreter: sh, bash, ash, ksh, zsh.
*
* After determination of the remote operating system and architecture, an appropriate IJent binary is uploaded and executed.
* All requests and data transfer with the remote machine is performed through stdin and stdout of [shellProcess].
*
* It is recommended to always use `/bin/sh` for [shellProcess], but any other POSIX-compliant interpreter is accepted too. The shell
* is later changed to the default user's shell before starting IJent, in order that [IjentExecApi.fetchLoginShellEnvVariables] returns
* the variables from the appropriate shell configs.
*
* [shellProcess] must have stdin, stdout and stderr piped.
*
* [shellProcess] must NOT run inside a PTY.
*
* The line delimiter must be '\n'.
*
* The function takes the ownership of [shellProcess]: it invokes `exec(1)` inside the process and terminates [shellProcess]
* in case of problems.
*
* [ijentName] is used for debugging utilities like logs and thread names.
*
* The process terminates automatically only when the IDE exits, or if [IjentApi.close] is called explicitly.
* [bindToScope] may be useful for terminating the IJent process earlier.
*
* [pathMapper] is a workaround function that allows to upload IJent to the remote target explicitly.
* The argument passed to the function is the path to the corresponding IJent binary on the local machine.
* The function must return a path on the remote machine.
* If the function returns null, the binary is transferred to the server directly via the same shell process,
* which turned out to be unreliable unfortunately.
*/
// TODO Change string paths to IjentPath.Absolute.
@Throws(IjentStartupError::class)
suspend fun bootstrapOverShellSession(
ijentName: String,
shellProcess: Process,
pathMapper: suspend (Path) -> String?,
): Pair<String, IjentPosixApi> {
val remoteIjentPath: String
val ijentApi = IjentSessionRegistry.instanceAsync().register(ijentName) { ijentId ->
val mediator = IjentSessionMediator.create(shellProcess, ijentId)
val (path, targetPlatform) =
try {
mediator.attachStderrOnError {
mediator.expectedErrorCode = IjentSessionMediator.ExpectedErrorCode.ANY
doBootstrapOverShellSession(shellProcess, pathMapper)
}
}
catch (err: Throwable) {
runCatching { shellProcess.destroyForcibly() }.exceptionOrNull()?.let(err::addSuppressed)
throw err
}
mediator.expectedErrorCode = IjentSessionMediator.ExpectedErrorCode.ZERO
remoteIjentPath = path
try {
IjentSessionProvider.instanceAsync().connect(
ijentId = ijentId,
platform = targetPlatform,
mediator = mediator
)
}
catch (err: Throwable) {
try {
shellProcess.destroy()
}
catch (err2: Throwable) {
err.addSuppressed(err)
}
throw err
}
}
return remoteIjentPath to (ijentApi as IjentPosixApi)
}
@OptIn(DelicateCoroutinesApi::class)
private suspend fun doBootstrapOverShellSession(
shellProcess: Process,
pathMapper: suspend (Path) -> String?,
): Pair<String, IjentPlatform> = computeDetached {
try {
@Suppress("NAME_SHADOWING") val shellProcess = ShellProcess(shellProcess)
// The timeout is taken at random.
withTimeout(10.seconds) {
shellProcess.write("set -ex")
ensureActive()
filterOutBanners(shellProcess)
val commands = getCommandPaths(shellProcess)
with(commands) {
val targetPlatform = getTargetPlatform(shellProcess)
val remotePathToIjent = uploadIjentBinary(shellProcess, targetPlatform, pathMapper)
execIjent(shellProcess, remotePathToIjent)
remotePathToIjent to targetPlatform
}
}
}
catch (err: Throwable) {
throw when (err) {
is TimeoutCancellationException, is IOException -> IjentStartupError.CommunicationError(err)
else -> err
}
}
}
private suspend fun filterOutBanners(shellProcess: ShellProcess) {
// The boundary is for skipping various banners, greeting messages, PS1, etc.
val boundary = (0..31).joinToString("") { "abcdefghijklmnopqrstuvwxyz0123456789".random().toString() }
shellProcess.write("echo $boundary")
do {
val line = shellProcess.readLineWithoutBuffering()
}
while (line != boundary)
}
private class Commands(
val chmod: String,
val cp: String,
val cut: String,
val env: String,
val getent: String,
val head: String,
val mktemp: String,
val uname: String,
val whoami: String,
)
/**
* There are distributions like rancher-desktop-data where /bin/busybox exists, but there are no symlinks to uname, head, etc.
*
* This tricky function checks if the necessary core utils exist and tries to substitute them with busybox otherwise.
*/
private suspend fun getCommandPaths(shellProcess: ShellProcess): Commands {
var busybox: Lazy<String>? = null
// This strange at first glance code helps reduce copy-paste errors.
val commands: Set<String> = setOf(
"busybox",
"chmod",
"cp",
"cut",
"env",
"getent",
"head",
"mktemp",
"uname",
"whoami",
)
val outputOfWhich = mutableListOf<String>()
fun getCommandPath(name: String): String {
assert(name in commands)
return outputOfWhich.firstOrNull { it.endsWith("/$name") }
?: busybox?.value?.let { "$it $name" }
?: throw IjentStartupError.IncompatibleTarget(setOf("busybox", name).joinToString(prefix = "The remote machine has none of: "))
}
val done = "done"
val whichCmd = commands.joinToString(" ").let { joined ->
"set +e; which $joined || /bin/busybox which $joined || /usr/bin/busybox which $joined; echo $done; set -e"
}
shellProcess.write(whichCmd)
while (true) {
val line = shellProcess.readLineWithoutBuffering()
if (line == done) break
outputOfWhich += line
}
busybox = lazy { getCommandPath("busybox") }
return Commands(
chmod = getCommandPath("chmod"),
cp = getCommandPath("cp"),
cut = getCommandPath("cut"),
env = getCommandPath("env"),
getent = getCommandPath("getent"),
head = getCommandPath("head"),
mktemp = getCommandPath("mktemp"),
uname = getCommandPath("uname"),
whoami = getCommandPath("whoami"),
)
}
private suspend fun Commands.getTargetPlatform(shellProcess: ShellProcess): IjentPlatform {
// There are two arguments in `uname` that can show the process architecture: `-m` and `-p`. According to `man uname`, `-p` is more
// verbose, and that information may be sufficient for choosing the right binary.
// https://man.freebsd.org/cgi/man.cgi?query=uname&sektion=1
shellProcess.write("$uname -pm")
val arch = shellProcess.readLineWithoutBuffering().split(" ").filterTo(linkedSetOf(), String::isNotEmpty)
val targetPlatform = when {
arch.isEmpty() -> throw IjentStartupError.IncompatibleTarget("Empty output of `uname`")
"x86_64" in arch -> IjentPlatform.X8664Linux
"aarch64" in arch -> IjentPlatform.Aarch64Linux
else -> throw IjentStartupError.IncompatibleTarget("No binary for architecture $arch")
}
return targetPlatform
}
private suspend fun Commands.uploadIjentBinary(
shellProcess: ShellProcess,
targetPlatform: IjentPlatform,
pathMapper: suspend (Path) -> String?,
): String {
val ijentBinaryOnLocalDisk = IjentExecFileProvider.getInstance().getIjentBinary(targetPlatform)
// TODO Don't upload a new binary every time if the binary is already on the server. However, hashes must be checked.
val ijentBinarySize = ijentBinaryOnLocalDisk.fileSize()
val ijentBinaryPreparedOnTarget = pathMapper(ijentBinaryOnLocalDisk)
val script = run {
val ijentPathUploadScript =
pathMapper(ijentBinaryOnLocalDisk)
?.let { "$cp ${posixQuote(it)} \$BINARY" }
?: run {
"LC_ALL=C $head -c $ijentBinarySize > \$BINARY"
}
"BINARY=\"$($mktemp -d)/ijent\" ; $ijentPathUploadScript ; $chmod 500 \"\$BINARY\" ; echo \"\$BINARY\" "
}
shellProcess.write(script)
if (ijentBinaryPreparedOnTarget == null) {
LOG.debug { "Writing $ijentBinarySize bytes of IJent binary into the stream" }
ijentBinaryOnLocalDisk.inputStream().use { stream ->
shellProcess.copyDataFrom(stream)
}
LOG.debug { "Sent the IJent binary for $targetPlatform" }
}
return shellProcess.readLineWithoutBuffering()
}
private suspend fun Commands.execIjent(shellProcess: ShellProcess, remotePathToBinary: String) {
val joinedCmd = getIjentGrpcArgv(remotePathToBinary, selfDeleteOnExit = true, usrBinEnv = env).joinToString(" ")
val commandLineArgs =
"""
| cd ${posixQuote(remotePathToBinary.substringBeforeLast('/'))};
| export SHELL="${'$'}($getent passwd "${'$'}($whoami)" | $cut -d: -f7)";
| if [ -z "${'$'}SHELL" ]; then export SHELL='/bin/sh' ; fi;
| exec "${'$'}SHELL" -c ${posixQuote(joinedCmd)}
""".trimMargin()
shellProcess.write(commandLineArgs)
}
@JvmInline
private value class ShellProcess(private val process: Process) {
suspend fun write(data: String) {
@Suppress("NAME_SHADOWING")
val data = if (data.endsWith("\n")) data else "$data\n"
LOG.debug { "Executing a script inside the shell: $data" }
withContext(Dispatchers.IO) {
process.outputStream.write(data.toByteArray())
ensureActive()
process.outputStream.flush()
ensureActive()
}
}
/** The same stdin and stdout will be used for transferring binary data. Some buffering wrapper may occasionally consume too much data. */
suspend fun readLineWithoutBuffering(): String =
withContext(Dispatchers.IO) {
val buffer = StringBuilder()
val stream = process.inputStream
while (true) {
ensureActive()
val c = stream.read()
if (c < 0 || c == '\n'.code) {
break
}
buffer.append(c.toChar())
}
LOG.trace { "Read line from stdout: $buffer" }
buffer.toString()
}
suspend fun copyDataFrom(stream: InputStream) {
withContext(Dispatchers.IO) {
stream.copyToAsync(process.outputStream)
ensureActive()
process.outputStream.flush()
}
}
}
private val LOG = logger<IjentSessionProvider>()

View File

@@ -0,0 +1,20 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.platform.ijent.spi
import com.intellij.platform.ijent.IjentApi
import com.intellij.platform.ijent.IjentPosixApi
import com.intellij.platform.ijent.IjentWindowsApi
interface DeployedIjent {
val ijentApi: IjentApi
val remotePathToBinary: String // TODO Use IjentPath.Absolute.
interface Posix : DeployedIjent {
override val ijentApi: IjentPosixApi
}
interface Windows : DeployedIjent {
override val ijentApi: IjentWindowsApi
}
}

View File

@@ -0,0 +1,301 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.platform.ijent.spi
import com.intellij.execution.CommandLineUtil.posixQuote
import com.intellij.openapi.application.ApplicationManager
import com.intellij.openapi.diagnostic.*
import com.intellij.platform.ijent.IjentPlatform
import com.intellij.platform.ijent.getIjentGrpcArgv
import com.intellij.util.io.computeDetached
import com.intellij.util.io.copyToAsync
import kotlinx.coroutines.*
import java.io.IOException
import java.io.InputStream
import java.nio.file.Path
import kotlin.io.path.fileSize
import kotlin.io.path.inputStream
import kotlin.time.Duration.Companion.seconds
abstract class IjentDeployingOverShellProcessStrategy(scope: CoroutineScope) : IjentDeployingStrategy.Posix {
/**
* If there's some bind mount, returns the path for the remote machine/container that corresponds to [path].
* Otherwise, returns null.
*/
protected abstract suspend fun mapPath(path: Path): String?
protected abstract suspend fun createShellProcess(): ShellProcessWrapper
private val myContext = run {
var createdShellProcess: ShellProcessWrapper? = null
val context = scope.async(start = CoroutineStart.LAZY) {
val shellProcess = createShellProcess()
createdShellProcess = shellProcess
createDeployingContext(shellProcess.apply {
// The timeout is taken at random.
withTimeout(10.seconds) {
write("set -ex")
ensureActive()
filterOutBanners()
}
})
}
context.invokeOnCompletion { error ->
if (error != null && error !is CancellationException) {
createdShellProcess?.destroyForcibly()
}
}
context
}
final override suspend fun getTargetPlatform(): IjentPlatform.Posix {
return myContext.await().execCommand {
getTargetPlatform()
}
}
final override suspend fun createProcess(binaryPath: String): Process {
return myContext.await().execCommand {
execIjent(binaryPath)
}
}
final override suspend fun copyFile(file: Path): String {
return myContext.await().execCommand {
uploadIjentBinary(file, ::mapPath)
}
}
final override fun close() {
if (myContext.isActive) {
myContext.cancel(CancellationException("Closed explicitly"))
}
}
class ShellProcessWrapper(private var wrapped: Process?) {
suspend fun write(data: String) {
val wrapped = wrapped!!
@Suppress("NAME_SHADOWING")
val data = if (data.endsWith("\n")) data else "$data\n"
LOG.debug { "Executing a script inside the shell: $data" }
withContext(Dispatchers.IO) {
wrapped.outputStream.write(data.toByteArray())
ensureActive()
wrapped.outputStream.flush()
ensureActive()
}
}
/** The same stdin and stdout will be used for transferring binary data. Some buffering wrapper may occasionally consume too much data. */
suspend fun readLineWithoutBuffering(): String =
withContext(Dispatchers.IO) {
val buffer = StringBuilder()
val stream = wrapped!!.inputStream
while (true) {
ensureActive()
val c = stream.read()
if (c < 0 || c == '\n'.code) {
break
}
buffer.append(c.toChar())
}
LOG.trace { "Read line from stdout: $buffer" }
buffer.toString()
}
suspend fun copyDataFrom(stream: InputStream) {
val wrapped = wrapped!!
withContext(Dispatchers.IO) {
stream.copyToAsync(wrapped.outputStream)
ensureActive()
wrapped.outputStream.flush()
}
}
fun destroyForcibly() {
wrapped!!.destroyForcibly()
}
@OptIn(DelicateCoroutinesApi::class)
suspend fun readWholeErrorStream(): ByteArray =
computeDetached { wrapped!!.errorStream.readAllBytes() }
fun extractProcess(): Process {
val result = wrapped!!
wrapped = null
return result
}
}
}
private suspend fun <T : Any> DeployingContext.execCommand(block: suspend DeployingContext.() -> T): T {
return try {
block()
}
catch (err: Throwable) {
runCatching { process.destroyForcibly() }.exceptionOrNull()?.let(err::addSuppressed)
val attachment = Attachment("stderr", String(process.readWholeErrorStream()))
attachment.isIncluded = attachment.isIncluded or ApplicationManager.getApplication().isInternal
val errorWithAttachments = RuntimeExceptionWithAttachments(err.message ?: "", err, attachment)
// TODO Suppress RuntimeExceptionWithAttachments instead of wrapping when KT-66006 is resolved.
//err.addSuppressed(RuntimeExceptionWithAttachments(
// "The error happened during handling $process",
// Attachment("stderr", stderr.toString()).apply { isIncluded = isIncluded or ApplicationManager.getApplication().isInternal },
//))
//throw err
throw when (err) {
is TimeoutCancellationException, is IOException -> IjentStartupError.CommunicationError(errorWithAttachments)
else -> errorWithAttachments
}
}
}
private suspend fun IjentDeployingOverShellProcessStrategy.ShellProcessWrapper.filterOutBanners() {
// The boundary is for skipping various banners, greeting messages, PS1, etc.
val boundary = (0..31).joinToString("") { "abcdefghijklmnopqrstuvwxyz0123456789".random().toString() }
write("echo $boundary")
do {
val line = readLineWithoutBuffering()
}
while (line != boundary)
}
private class DeployingContext(
val process: IjentDeployingOverShellProcessStrategy.ShellProcessWrapper,
val chmod: String,
val cp: String,
val cut: String,
val env: String,
val getent: String,
val head: String,
val mktemp: String,
val uname: String,
val whoami: String,
)
/**
* There are distributions like rancher-desktop-data where /bin/busybox exists, but there are no symlinks to uname, head, etc.
*
* This tricky function checks if the necessary core utils exist and tries to substitute them with busybox otherwise.
*/
private suspend fun createDeployingContext(shellProcess: IjentDeployingOverShellProcessStrategy.ShellProcessWrapper): DeployingContext {
var busybox: Lazy<String>? = null
// This strange at first glance code helps reduce copy-paste errors.
val commands: Set<String> = setOf(
"busybox",
"chmod",
"cp",
"cut",
"env",
"getent",
"head",
"mktemp",
"uname",
"whoami",
)
val outputOfWhich = mutableListOf<String>()
fun getCommandPath(name: String): String {
assert(name in commands)
return outputOfWhich.firstOrNull { it.endsWith("/$name") }
?: busybox?.value?.let { "$it $name" }
?: throw IjentStartupError.IncompatibleTarget(setOf("busybox", name).joinToString(prefix = "The remote machine has none of: "))
}
val done = "done"
val whichCmd = commands.joinToString(" ").let { joined ->
"set +e; which $joined || /bin/busybox which $joined || /usr/bin/busybox which $joined; echo $done; set -e"
}
shellProcess.write(whichCmd)
while (true) {
val line = shellProcess.readLineWithoutBuffering()
if (line == done) break
outputOfWhich += line
}
busybox = lazy { getCommandPath("busybox") }
return DeployingContext(
process = shellProcess,
chmod = getCommandPath("chmod"),
cp = getCommandPath("cp"),
cut = getCommandPath("cut"),
env = getCommandPath("env"),
getent = getCommandPath("getent"),
head = getCommandPath("head"),
mktemp = getCommandPath("mktemp"),
uname = getCommandPath("uname"),
whoami = getCommandPath("whoami"),
)
}
private suspend fun DeployingContext.getTargetPlatform(): IjentPlatform.Posix {
// There are two arguments in `uname` that can show the process architecture: `-m` and `-p`. According to `man uname`, `-p` is more
// verbose, and that information may be sufficient for choosing the right binary.
// https://man.freebsd.org/cgi/man.cgi?query=uname&sektion=1
process.write("$uname -pm")
val arch = process.readLineWithoutBuffering().split(" ").filterTo(linkedSetOf(), String::isNotEmpty)
val targetPlatform = when {
arch.isEmpty() -> throw IjentStartupError.IncompatibleTarget("Empty output of `uname`")
"x86_64" in arch -> IjentPlatform.X8664Linux
"aarch64" in arch -> IjentPlatform.Aarch64Linux
else -> throw IjentStartupError.IncompatibleTarget("No binary for architecture $arch")
}
return targetPlatform
}
private suspend fun DeployingContext.uploadIjentBinary(
ijentBinaryOnLocalDisk: Path,
pathMapper: suspend (Path) -> String?,
): String {
// TODO Don't upload a new binary every time if the binary is already on the server. However, hashes must be checked.
val ijentBinarySize = ijentBinaryOnLocalDisk.fileSize()
val ijentBinaryPreparedOnTarget = pathMapper(ijentBinaryOnLocalDisk)
val script = run {
val ijentPathUploadScript =
pathMapper(ijentBinaryOnLocalDisk)
?.let { "$cp ${posixQuote(it)} \$BINARY" }
?: run {
"LC_ALL=C $head -c $ijentBinarySize > \$BINARY"
}
"BINARY=\"$($mktemp -d)/ijent\" ; $ijentPathUploadScript ; $chmod 500 \"\$BINARY\" ; echo \"\$BINARY\" "
}
process.write(script)
if (ijentBinaryPreparedOnTarget == null) {
LOG.debug { "Writing $ijentBinarySize bytes of IJent binary into the stream" }
ijentBinaryOnLocalDisk.inputStream().use { stream ->
process.copyDataFrom(stream)
}
LOG.debug { "Sent the IJent binary $ijentBinaryOnLocalDisk" }
}
return process.readLineWithoutBuffering()
}
private suspend fun DeployingContext.execIjent(remotePathToBinary: String): Process {
val joinedCmd = getIjentGrpcArgv(remotePathToBinary, selfDeleteOnExit = true, usrBinEnv = env).joinToString(" ")
val commandLineArgs =
"""
| cd ${posixQuote(remotePathToBinary.substringBeforeLast('/'))};
| export SHELL="${'$'}($getent passwd "${'$'}($whoami)" | $cut -d: -f7)";
| if [ -z "${'$'}SHELL" ]; then export SHELL='/bin/sh' ; fi;
| exec "${'$'}SHELL" -c ${posixQuote(joinedCmd)}
""".trimMargin()
process.write(commandLineArgs)
return process.extractProcess()
}
private val LOG = logger<IjentDeployingOverShellProcessStrategy>()

View File

@@ -0,0 +1,67 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.platform.ijent.spi
import com.intellij.platform.ijent.IjentPlatform
import com.intellij.platform.ijent.deploy
import org.jetbrains.annotations.ApiStatus
import java.nio.file.Path
/**
* Strategy for deploying and launching ijent on target environment which is used to create an IjentApi instance.
*
* Use [deploy] for launching IJent using this interface.
*
* Every instance of [IjentDeployingStrategy] is used to start exactly one IJent.
*
* @see com.intellij.platform.ijent.IjentApi
*/
@ApiStatus.OverrideOnly
interface IjentDeployingStrategy {
/**
* Architecture of the ijent binary that will be copied to the target machine.
* Typically, the ijent architecture should match the target environment architecture.
*
* The implementation doesn't need to cache the result, because the function is called exactly once.
*
* @see com.intellij.platform.ijent.IjentExecFileProvider.getIjentBinary
*/
suspend fun getTargetPlatform(): IjentPlatform
/**
* Should start the ijent process.
*
* This function is called the last and called exactly once.
*
* After it has been called, only [close] may be called.
* Nothing else will be called for the same instance of [IjentDeployingStrategy].
*
* @see com.intellij.platform.ijent.getIjentGrpcArgv
* @param binaryPath path to ijent binary on target environment
* @return process that will be used for communication
*/
suspend fun createProcess(binaryPath: String): Process
/**
* Copy files to the target environment. Typically used to transfer the ijent binary to the target machine.
*
* @param file path to local file that should be copied to target environment.
*/
suspend fun copyFile(file: Path): String
/**
* Clears resources after the usage.
*
* The function should not block the thread.
*/
fun close()
interface Posix : IjentDeployingStrategy {
/** @see [IjentDeployingStrategy.getTargetPlatform] */
override suspend fun getTargetPlatform(): IjentPlatform.Posix
}
interface Windows : IjentDeployingStrategy {
/** @see [IjentDeployingStrategy.getTargetPlatform] */
override suspend fun getTargetPlatform(): IjentPlatform.Windows
}
}

View File

@@ -1,8 +1,9 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.platform.ijent
package com.intellij.platform.ijent.spi
import com.intellij.openapi.application.ApplicationManager
import com.intellij.openapi.diagnostic.*
import com.intellij.platform.ijent.IjentApplicationScope
import com.intellij.platform.ijent.IjentId
import com.intellij.platform.util.coroutines.childScope
import com.intellij.util.io.awaitExit
import com.intellij.util.io.blockingDispatcher
@@ -12,7 +13,6 @@ import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.takeWhile
import org.jetbrains.annotations.ApiStatus
import org.jetbrains.annotations.VisibleForTesting
import java.io.IOException
import java.util.concurrent.TimeUnit
@@ -22,8 +22,7 @@ import kotlin.time.Duration.Companion.seconds
* A wrapper for a [Process] that runs IJent. The wrapper logs stderr lines, waits for the exit code, terminates the process in case
* of problems in the IDE.
*/
@ApiStatus.Internal
class IjentSessionMediator private constructor(val scope: CoroutineScope, val process: Process, private val lastStderrMessages: SharedFlow<String?>) {
class IjentSessionMediator private constructor(val scope: CoroutineScope, val process: Process) {
enum class ExpectedErrorCode {
/** During initialization, even a sudden successful exit is an error. */
NO,
@@ -38,45 +37,6 @@ class IjentSessionMediator private constructor(val scope: CoroutineScope, val pr
@Volatile
var expectedErrorCode = ExpectedErrorCode.NO
/**
* If an error happens, it is rethrown, but also the function waits for [lastStderrMessagesTimeout] and attaches all stderr lines
* received since the start of execution of [body].
*/
suspend fun <T> attachStderrOnError(body: suspend () -> T): T =
coroutineScope {
val stderr = StringBuilder()
val collector = launch(CoroutineName("attachStderrOnError")) {
collectLines(lastStderrMessages, stderr)
}
try {
val result = body()
collector.cancel()
result
}
catch (err: Throwable) {
runCatching {
withTimeoutOrNull(lastStderrMessagesTimeout) {
collector.join()
}
}.exceptionOrNull()?.let(err::addSuppressed)
collector.cancel()
// TODO Suppress RuntimeExceptionWithAttachments instead of wrapping when KT-66006 is resolved.
//err.addSuppressed(RuntimeExceptionWithAttachments(
// "The error happened during handling $process",
// Attachment("stderr", stderr.toString()).apply { isIncluded = isIncluded or ApplicationManager.getApplication().isInternal },
//))
//throw err
throw RuntimeExceptionWithAttachments(
err.message ?: "",
err,
Attachment("stderr", stderr.toString()).apply { isIncluded = isIncluded or ApplicationManager.getApplication().isInternal },
)
}
}
companion object {
/** See the docs of [IjentSessionMediator] */
@OptIn(DelicateCoroutinesApi::class)
@@ -96,7 +56,7 @@ class IjentSessionMediator private constructor(val scope: CoroutineScope, val pr
ijentProcessStderrLogger(process, ijentId, lastStderrMessages)
}
val mediator = IjentSessionMediator(connectionScope, process, lastStderrMessages)
val mediator = IjentSessionMediator(connectionScope, process)
val awaiterScope = IjentApplicationScope.instance().launch(CoroutineName("ijent $ijentId > exit awaiter scope")) {
ijentProcessExitAwaiter(ijentId, mediator, lastStderrMessages)

View File

@@ -0,0 +1,72 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.platform.ijent.spi
import com.intellij.openapi.components.serviceAsync
import com.intellij.platform.ijent.IjentApi
import com.intellij.platform.ijent.IjentId
import com.intellij.platform.ijent.IjentPlatform
import com.intellij.platform.ijent.IjentPosixApi
import com.intellij.platform.ijent.IjentSessionRegistry
import com.intellij.platform.ijent.IjentWindowsApi
/**
* Given that there is some IJent process launched, this extension gets handles to stdin+stdout of the process and returns
* an [IjentApi] instance for calling procedures on IJent side.
*/
interface IjentSessionProvider {
/**
* Supposed to be used inside [IjentSessionRegistry.register].
*/
suspend fun connect(
ijentId: IjentId,
platform: IjentPlatform,
mediator: IjentSessionMediator
): IjentApi
companion object {
suspend fun instanceAsync(): IjentSessionProvider = serviceAsync()
}
}
sealed class IjentStartupError : RuntimeException {
constructor(message: String) : super(message)
constructor(message: String, cause: Throwable) : super(message, cause)
class MissingImplPlugin : IjentStartupError("The plugin `intellij.platform.ijent.impl` is not installed")
sealed class BootstrapOverShell : IjentStartupError {
constructor(message: String) : super(message)
constructor(message: String, cause: Throwable) : super(message, cause)
}
class IncompatibleTarget(message: String) : BootstrapOverShell(message)
class CommunicationError(cause: Throwable) : BootstrapOverShell(cause.message.orEmpty(), cause)
}
internal class DefaultIjentSessionProvider : IjentSessionProvider {
override suspend fun connect(ijentId: IjentId, platform: IjentPlatform, mediator: IjentSessionMediator): IjentApi {
throw IjentStartupError.MissingImplPlugin()
}
}
/**
* Make [IjentApi] from an already running [process].
* [ijentName] is used for debugging utilities like logs and thread names.
*
* The process terminates automatically only when the IDE exits, or if [IjentApi.close] is called explicitly.
* [com.intellij.platform.ijent.bindToScope] may be useful for terminating the IJent process earlier.
*/
suspend fun connectToRunningIjent(ijentName: String, platform: IjentPlatform, process: Process): IjentApi =
IjentSessionRegistry.instanceAsync().register(ijentName) { ijentId ->
val mediator = IjentSessionMediator.create(process, ijentId)
mediator.expectedErrorCode = IjentSessionMediator.ExpectedErrorCode.ZERO
IjentSessionProvider.instanceAsync().connect(ijentId, platform, mediator)
}
/** A specialized overload of [connectToRunningIjent] */
suspend fun connectToRunningIjent(ijentName: String, platform: IjentPlatform.Posix, process: Process): IjentPosixApi =
connectToRunningIjent(ijentName, platform as IjentPlatform, process) as IjentPosixApi
/** A specialized overload of [connectToRunningIjent] */
suspend fun connectToRunningIjent(ijentName: String, platform: IjentPlatform.Windows, process: Process): IjentWindowsApi =
connectToRunningIjent(ijentName, platform as IjentPlatform, process) as IjentWindowsApi

View File

@@ -0,0 +1,8 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
/**
* This API is going to be released later, but now it's too unstable even for being called as Experimental.
*/
@ApiStatus.Internal
package com.intellij.platform.ijent.spi;
import org.jetbrains.annotations.ApiStatus;

View File

@@ -0,0 +1,41 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.execution.wsl
import com.intellij.execution.configurations.GeneralCommandLine
import com.intellij.openapi.project.Project
import com.intellij.openapi.util.IntellijInternalApi
import com.intellij.platform.ijent.spi.IjentDeployingOverShellProcessStrategy
import com.intellij.util.io.computeDetached
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import org.jetbrains.annotations.ApiStatus
import java.nio.file.Path
@ApiStatus.Internal
class WslIjentDeployingStrategy(
scope: CoroutineScope,
private val distribution: WSLDistribution,
private val project: Project?,
private val wslCommandLineOptionsModifier: (WSLCommandLineOptions) -> Unit = {}
) : IjentDeployingOverShellProcessStrategy(scope) {
override suspend fun mapPath(path: Path): String? =
distribution.getWslPath(path)
@OptIn(IntellijInternalApi::class, DelicateCoroutinesApi::class)
override suspend fun createShellProcess(): ShellProcessWrapper {
// IJent can start an interactive shell by itself whenever it needs.
// Enabling an interactive shell for IJent by default can bring problems, because stdio of IJent must not be populated
// with possible user extensions in ~/.profile
val wslCommandLineOptions = WSLCommandLineOptions()
.setExecuteCommandInInteractiveShell(false)
.setExecuteCommandInLoginShell(false)
.setExecuteCommandInShell(false)
wslCommandLineOptionsModifier(wslCommandLineOptions)
val commandLine = WSLDistribution.neverRunTTYFix(GeneralCommandLine("/bin/sh"))
distribution.doPatchCommandLine(commandLine, project, wslCommandLineOptions)
return ShellProcessWrapper(computeDetached { commandLine.createProcess() })
}
}

View File

@@ -1,16 +1,15 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.execution.wsl
import com.intellij.execution.configurations.GeneralCommandLine
import com.intellij.openapi.components.service
import com.intellij.openapi.project.Project
import com.intellij.openapi.util.IntellijInternalApi
import com.intellij.platform.ijent.IjentApi
import com.intellij.platform.ijent.IjentPosixApi
import com.intellij.platform.ijent.bootstrapOverShellSession
import com.intellij.util.io.computeDetached
import com.intellij.platform.ijent.deploy
import com.intellij.platform.ijent.spi.DeployedIjent
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.coroutineScope
import org.jetbrains.annotations.ApiStatus
import org.jetbrains.annotations.VisibleForTesting
@@ -50,28 +49,20 @@ suspend fun deployAndLaunchIjent(
project: Project?,
wslDistribution: WSLDistribution,
wslCommandLineOptionsModifier: (WSLCommandLineOptions) -> Unit = {},
): IjentPosixApi = deployAndLaunchIjentGettingPath(project, wslDistribution, wslCommandLineOptionsModifier).second
): IjentPosixApi = deployAndLaunchIjentGettingPath(project, wslDistribution, wslCommandLineOptionsModifier).ijentApi
@OptIn(IntellijInternalApi::class, DelicateCoroutinesApi::class)
@VisibleForTesting
suspend fun deployAndLaunchIjentGettingPath(
project: Project?,
wslDistribution: WSLDistribution,
wslCommandLineOptionsModifier: (WSLCommandLineOptions) -> Unit = {},
): Pair<String, IjentPosixApi> {
// IJent can start an interactive shell by itself whenever it needs.
// Enabling an interactive shell for IJent by default can bring problems, because stdio of IJent must not be populated
// with possible user extensions in ~/.profile
val wslCommandLineOptions = WSLCommandLineOptions()
.setExecuteCommandInInteractiveShell(false)
.setExecuteCommandInLoginShell(false)
.setExecuteCommandInShell(false)
wslCommandLineOptionsModifier(wslCommandLineOptions)
val commandLine = WSLDistribution.neverRunTTYFix(GeneralCommandLine("/bin/sh"))
wslDistribution.doPatchCommandLine(commandLine, project, wslCommandLineOptions)
val process = computeDetached { commandLine.createProcess() }
return bootstrapOverShellSession("WSL-${wslDistribution.id}", process, wslDistribution::getWslPath)
): DeployedIjent.Posix {
return coroutineScope {
WslIjentDeployingStrategy(
scope = this,
distribution = wslDistribution,
project = project,
wslCommandLineOptionsModifier = wslCommandLineOptionsModifier
).deploy("WSL-${wslDistribution.id}")
}
}