mirror of
https://gitflic.ru/project/openide/openide.git
synced 2026-04-30 10:20:15 +07:00
IJPL-158860 IJent: introduce IjentUnavailableException
This error is thrown from every suspending method of Ijent*Api if the IJent process is dead or being terminated. GitOrigin-RevId: 5cd3664c59fa7ea94ff508165c1eb7a6ad186ac2
This commit is contained in:
committed by
intellij-monorepo-bot
parent
934bcde1bd
commit
a041c80947
@@ -36,9 +36,17 @@ sealed interface IjentApi : AutoCloseable {
|
||||
* Explicitly terminates the process on the remote machine.
|
||||
*
|
||||
* The method is not supposed to block the current thread.
|
||||
*
|
||||
* For awaiting, use [waitUntilExit].
|
||||
*/
|
||||
override fun close()
|
||||
|
||||
/**
|
||||
* Suspends until the IJent process on the remote side terminates.
|
||||
* This method doesn't throw exceptions.
|
||||
*/
|
||||
suspend fun waitUntilExit()
|
||||
|
||||
/** Docs: [IjentExecApi] */
|
||||
val exec: IjentExecApi
|
||||
|
||||
|
||||
@@ -32,7 +32,7 @@ interface IjentChildProcess {
|
||||
*
|
||||
* Notice that every data chunk is flushed into the process separately. There's no buffering.
|
||||
*/
|
||||
@Throws(SendStdinError::class)
|
||||
@Throws(SendStdinError::class, IjentUnavailableException::class)
|
||||
suspend fun sendStdinWithConfirmation(data: ByteArray)
|
||||
|
||||
sealed class SendStdinError(msg: String) : Exception(msg) {
|
||||
@@ -51,6 +51,7 @@ interface IjentChildProcess {
|
||||
*
|
||||
* Does nothing yet on Windows.
|
||||
*/
|
||||
@Throws(IjentUnavailableException::class)
|
||||
suspend fun interrupt()
|
||||
|
||||
/**
|
||||
@@ -58,6 +59,7 @@ interface IjentChildProcess {
|
||||
*
|
||||
* Calls [`ExitProcess`](https://learn.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-exitprocess) on Windows.
|
||||
*/
|
||||
@Throws(IjentUnavailableException::class)
|
||||
suspend fun terminate()
|
||||
|
||||
/**
|
||||
@@ -66,9 +68,10 @@ interface IjentChildProcess {
|
||||
* Calls [`TerminateProcess`](https://learn.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-terminateprocess)
|
||||
* on Windows.
|
||||
*/
|
||||
@Throws(IjentUnavailableException::class)
|
||||
suspend fun kill()
|
||||
|
||||
@Throws(ResizePtyError::class) // Can't use @CheckReturnValue: KTIJ-7061
|
||||
@Throws(ResizePtyError::class, IjentUnavailableException::class) // Can't use @CheckReturnValue: KTIJ-7061
|
||||
suspend fun resizePty(columns: Int, rows: Int)
|
||||
|
||||
sealed class ResizePtyError(msg: String) : Exception(msg) {
|
||||
|
||||
@@ -41,6 +41,7 @@ interface IjentExecApi {
|
||||
/**
|
||||
* Gets the same environment variables on the remote machine as the user would get if they run the shell.
|
||||
*/
|
||||
@Throws(IjentUnavailableException::class)
|
||||
suspend fun fetchLoginShellEnvVariables(): Map<String, String>
|
||||
|
||||
sealed interface ExecuteProcessResult {
|
||||
@@ -53,6 +54,7 @@ interface IjentExecApi {
|
||||
}
|
||||
|
||||
/** Docs: [IjentExecApi.executeProcessBuilder] */
|
||||
@Throws(IjentUnavailableException::class)
|
||||
suspend fun IjentExecApi.executeProcess(exe: String, vararg args: String): IjentExecApi.ExecuteProcessResult =
|
||||
executeProcessBuilder(exe).args(listOf(*args)).execute()
|
||||
|
||||
|
||||
@@ -40,6 +40,7 @@ sealed interface IjentTunnelsApi {
|
||||
*
|
||||
* One should not forget to invoke [Connection.close] when the connection is not needed.
|
||||
*/
|
||||
@Throws(IjentUnavailableException::class)
|
||||
suspend fun getConnectionToRemotePort(address: HostAddress): IjentNetworkResult<Connection, IjentConnectionError>
|
||||
|
||||
/**
|
||||
@@ -120,41 +121,48 @@ sealed interface IjentTunnelsApi {
|
||||
* Sets the size of send buffer of the socket
|
||||
* @see java.net.SocketOptions.SO_SNDBUF
|
||||
*/
|
||||
@Throws(IjentUnavailableException::class)
|
||||
suspend fun setSendBufferSize(size: UInt)
|
||||
|
||||
/**
|
||||
* Sets the receive buffer size of the socket
|
||||
* @see java.net.SocketOptions.SO_RCVBUF
|
||||
*/
|
||||
@Throws(IjentUnavailableException::class)
|
||||
suspend fun setReceiveBufferSize(size: UInt)
|
||||
|
||||
/**
|
||||
* Sets the keep alive option for the socket
|
||||
* @see java.net.SocketOptions.SO_KEEPALIVE
|
||||
*/
|
||||
@Throws(IjentUnavailableException::class)
|
||||
suspend fun setKeepAlive(keepAlive: Boolean)
|
||||
|
||||
/**
|
||||
* Sets the possibility to reuse address of the socket
|
||||
* @see java.net.SocketOptions.SO_REUSEADDR
|
||||
*/
|
||||
@Throws(IjentUnavailableException::class)
|
||||
suspend fun setReuseAddr(reuseAddr: Boolean)
|
||||
|
||||
/**
|
||||
* Sets linger timeout for the socket
|
||||
* @see java.net.SocketOptions.SO_LINGER
|
||||
*/
|
||||
@Throws(IjentUnavailableException::class)
|
||||
suspend fun setLinger(lingerInterval: Duration)
|
||||
|
||||
/**
|
||||
* Disables pending data until acknowledgement
|
||||
* @see java.net.SocketOptions.TCP_NODELAY
|
||||
*/
|
||||
@Throws(IjentUnavailableException::class)
|
||||
suspend fun setNoDelay(noDelay: Boolean)
|
||||
|
||||
/**
|
||||
* Closes the connection to the socket.
|
||||
*/
|
||||
@Throws(IjentUnavailableException::class)
|
||||
suspend fun close()
|
||||
}
|
||||
|
||||
@@ -204,6 +212,7 @@ interface IjentTunnelsPosixApi : IjentTunnelsApi {
|
||||
* }
|
||||
* ```
|
||||
*/
|
||||
@Throws(IjentUnavailableException::class)
|
||||
suspend fun listenOnUnixSocket(path: CreateFilePath = CreateFilePath.MkTemp()): ListenOnUnixSocketResult
|
||||
|
||||
data class ListenOnUnixSocketResult(
|
||||
@@ -243,6 +252,7 @@ interface IjentTunnelsWindowsApi : IjentTunnelsApi
|
||||
*
|
||||
* @see com.intellij.platform.ijent.IjentTunnelsApi.getConnectionToRemotePort for more details on the behavior of [Connection]
|
||||
*/
|
||||
@Throws(IjentUnavailableException::class)
|
||||
suspend fun <T> IjentTunnelsApi.withConnectionToRemotePort(
|
||||
hostAddress: IjentTunnelsApi.HostAddress,
|
||||
errorHandler: suspend (IjentConnectionError) -> T,
|
||||
@@ -258,12 +268,14 @@ suspend fun <T> IjentTunnelsApi.withConnectionToRemotePort(
|
||||
}
|
||||
}
|
||||
|
||||
@Throws(IjentUnavailableException::class)
|
||||
suspend fun <T> IjentTunnelsApi.withConnectionToRemotePort(
|
||||
host: String, port: UShort,
|
||||
errorHandler: suspend (IjentConnectionError) -> T,
|
||||
action: suspend CoroutineScope.(Connection) -> T,
|
||||
): T = withConnectionToRemotePort(hostAddressBuilder(port).hostname(host).build(), errorHandler, action)
|
||||
|
||||
@Throws(IjentUnavailableException::class)
|
||||
suspend fun <T> IjentTunnelsApi.withConnectionToRemotePort(
|
||||
remotePort: UShort,
|
||||
errorHandler: suspend (IjentConnectionError) -> T,
|
||||
|
||||
@@ -0,0 +1,77 @@
|
||||
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
|
||||
package com.intellij.platform.ijent
|
||||
|
||||
import com.intellij.openapi.diagnostic.Attachment
|
||||
import com.intellij.openapi.diagnostic.ExceptionWithAttachments
|
||||
import org.jetbrains.annotations.ApiStatus.Internal
|
||||
import java.io.IOException
|
||||
import kotlin.coroutines.cancellation.CancellationException
|
||||
|
||||
/**
|
||||
* This error declares that communication with a specific IJent is impossible anymore.
|
||||
* To keep working with a remote machine, a new IJent should be launched.
|
||||
*/
|
||||
sealed class IjentUnavailableException : IOException, ExceptionWithAttachments {
|
||||
private val attachments: Array<out Attachment>
|
||||
|
||||
constructor(message: String) : super(message) {
|
||||
attachments = emptyArray()
|
||||
}
|
||||
|
||||
constructor(message: String, cause: Throwable) : super(message, cause) {
|
||||
attachments = emptyArray()
|
||||
}
|
||||
|
||||
constructor(message: String, vararg attachments: Attachment) : super(message) {
|
||||
this.attachments = attachments
|
||||
}
|
||||
|
||||
constructor(message: String, cause: Throwable, vararg attachments: Attachment) : super(message, cause) {
|
||||
this.attachments = attachments
|
||||
}
|
||||
|
||||
class ClosedByApplication : IjentUnavailableException {
|
||||
constructor(message: String) : super(message)
|
||||
constructor(message: String, cause: Throwable) : super(message, cause)
|
||||
}
|
||||
|
||||
class CommunicationFailure : IjentUnavailableException {
|
||||
constructor(message: String) : super(message)
|
||||
constructor(message: String, cause: Throwable) : super(message, cause)
|
||||
constructor(message: String, vararg attachments: Attachment) : super(message, *attachments)
|
||||
constructor(message: String, cause: Throwable, vararg attachments: Attachment) : super(message, cause, *attachments)
|
||||
|
||||
var exitedExpectedly: Boolean = false
|
||||
internal set
|
||||
}
|
||||
|
||||
override fun getAttachments(): Array<out Attachment> = attachments
|
||||
|
||||
companion object {
|
||||
@Internal
|
||||
@JvmStatic
|
||||
inline fun <T> unwrapFromCancellationExceptions(body: () -> T): T =
|
||||
try {
|
||||
body()
|
||||
}
|
||||
catch (initialError: Throwable) {
|
||||
throw unwrapFromCancellationExceptions(initialError)
|
||||
}
|
||||
|
||||
@Internal
|
||||
@JvmStatic
|
||||
fun unwrapFromCancellationExceptions(initialError: Throwable): Throwable {
|
||||
var err: Throwable? = initialError
|
||||
while (true) {
|
||||
when (err) {
|
||||
is CancellationException -> err = err.cause
|
||||
|
||||
is IjentUnavailableException -> return err
|
||||
|
||||
else -> break
|
||||
}
|
||||
}
|
||||
return initialError
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2,10 +2,7 @@
|
||||
package com.intellij.platform.ijent.fs
|
||||
|
||||
import com.intellij.openapi.util.NlsSafe
|
||||
import com.intellij.platform.ijent.IjentId
|
||||
import com.intellij.platform.ijent.IjentInfo
|
||||
import com.intellij.platform.ijent.IjentPosixInfo
|
||||
import com.intellij.platform.ijent.IjentWindowsInfo
|
||||
import com.intellij.platform.ijent.*
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import java.nio.ByteBuffer
|
||||
|
||||
@@ -36,11 +33,13 @@ sealed interface IjentFileSystemApi {
|
||||
/**
|
||||
* A user may have no home directory on Unix-like systems, for example, the user `nobody`.
|
||||
*/
|
||||
@Throws(IjentUnavailableException::class)
|
||||
suspend fun userHome(): IjentPath.Absolute?
|
||||
|
||||
/**
|
||||
* Returns names of files in a directory. If [path] is a symlink, it will be resolved, but no symlinks are resolved among children.
|
||||
*/
|
||||
@Throws(IjentUnavailableException::class)
|
||||
suspend fun listDirectory(path: IjentPath.Absolute): IjentFsResult<
|
||||
Collection<String>,
|
||||
ListDirectoryError>
|
||||
@@ -53,6 +52,7 @@ sealed interface IjentFileSystemApi {
|
||||
* [resolveSymlinks] controls resolution of symlinks among children.
|
||||
* TODO The behaviour is different from resolveSymlinks in [stat]. To be fixed.
|
||||
*/
|
||||
@Throws(IjentUnavailableException::class)
|
||||
suspend fun listDirectoryWithAttrs(
|
||||
path: IjentPath.Absolute,
|
||||
resolveSymlinks: Boolean = true,
|
||||
@@ -71,6 +71,7 @@ sealed interface IjentFileSystemApi {
|
||||
/**
|
||||
* Resolves all symlinks in the path. Corresponds to realpath(3) on Unix and GetFinalPathNameByHandle on Windows.
|
||||
*/
|
||||
@Throws(IjentUnavailableException::class)
|
||||
suspend fun canonicalize(path: IjentPath.Absolute): IjentFsResult<
|
||||
IjentPath.Absolute,
|
||||
CanonicalizeError>
|
||||
@@ -86,6 +87,7 @@ sealed interface IjentFileSystemApi {
|
||||
/**
|
||||
* Similar to stat(2) and lstat(2). [resolveSymlinks] has an impact only on [IjentFileInfo.fileType] if [path] points on a symlink.
|
||||
*/
|
||||
@Throws(IjentUnavailableException::class)
|
||||
suspend fun stat(path: IjentPath.Absolute, resolveSymlinks: Boolean): IjentFsResult<out IjentFileInfo, StatError>
|
||||
|
||||
sealed interface StatError : IjentFsError {
|
||||
@@ -100,6 +102,7 @@ sealed interface IjentFileSystemApi {
|
||||
* on Unix return true if both paths have the same inode.
|
||||
* On Windows some heuristics are used, for more details see https://docs.rs/same-file/1.0.6/same_file/
|
||||
*/
|
||||
@Throws(IjentUnavailableException::class)
|
||||
suspend fun sameFile(source: IjentPath.Absolute, target: IjentPath.Absolute): IjentFsResult<
|
||||
Boolean,
|
||||
SameFileError>
|
||||
@@ -115,6 +118,7 @@ sealed interface IjentFileSystemApi {
|
||||
/**
|
||||
* Opens file only for reading
|
||||
*/
|
||||
@Throws(IjentUnavailableException::class)
|
||||
suspend fun openForReading(path: IjentPath.Absolute): IjentFsResult<
|
||||
IjentOpenedFile.Reader,
|
||||
FileReaderError>
|
||||
@@ -131,6 +135,7 @@ sealed interface IjentFileSystemApi {
|
||||
/**
|
||||
* Opens file only for writing
|
||||
*/
|
||||
@Throws(IjentUnavailableException::class)
|
||||
suspend fun openForWriting(
|
||||
options: WriteOptions,
|
||||
): IjentFsResult<
|
||||
@@ -176,10 +181,11 @@ sealed interface IjentFileSystemApi {
|
||||
interface Other : FileWriterError, IjentFsError.Other
|
||||
}
|
||||
|
||||
@Throws(IjentUnavailableException::class)
|
||||
suspend fun openForReadingAndWriting(options: WriteOptions): IjentFsResult<IjentOpenedFile.ReaderWriter, FileWriterError>
|
||||
|
||||
|
||||
@Throws(DeleteException::class)
|
||||
@Throws(DeleteException::class, IjentUnavailableException::class)
|
||||
suspend fun deleteDirectory(path: IjentPath.Absolute, removeContent: Boolean)
|
||||
|
||||
sealed class DeleteException(
|
||||
@@ -194,7 +200,7 @@ sealed interface IjentFileSystemApi {
|
||||
}
|
||||
|
||||
|
||||
@Throws(CopyException::class)
|
||||
@Throws(CopyException::class, IjentUnavailableException::class)
|
||||
suspend fun copy(options: CopyOptions)
|
||||
|
||||
interface CopyOptions
|
||||
@@ -224,7 +230,7 @@ sealed interface IjentFileSystemApi {
|
||||
sealed interface IjentOpenedFile {
|
||||
val path: IjentPath.Absolute
|
||||
|
||||
@Throws(CloseException::class)
|
||||
@Throws(CloseException::class, IjentUnavailableException::class)
|
||||
suspend fun close()
|
||||
|
||||
sealed class CloseException(
|
||||
@@ -235,6 +241,7 @@ sealed interface IjentOpenedFile {
|
||||
: CloseException(where, additionalMessage), IjentFsError.Other
|
||||
}
|
||||
|
||||
@Throws(IjentUnavailableException::class)
|
||||
suspend fun tell(): IjentFsResult<
|
||||
Long,
|
||||
TellError>
|
||||
@@ -243,6 +250,7 @@ sealed interface IjentOpenedFile {
|
||||
interface Other : TellError, IjentFsError.Other
|
||||
}
|
||||
|
||||
@Throws(IjentUnavailableException::class)
|
||||
suspend fun seek(offset: Long, whence: SeekWhence): IjentFsResult<
|
||||
Long,
|
||||
SeekError>
|
||||
@@ -271,6 +279,7 @@ sealed interface IjentOpenedFile {
|
||||
*
|
||||
* It reads not more than [com.intellij.platform.ijent.spi.RECOMMENDED_MAX_PACKET_SIZE].
|
||||
*/
|
||||
@Throws(IjentUnavailableException::class)
|
||||
suspend fun read(buf: ByteBuffer): IjentFsResult<ReadResult, ReadError>
|
||||
|
||||
/**
|
||||
@@ -280,6 +289,7 @@ sealed interface IjentOpenedFile {
|
||||
*
|
||||
* It reads not more than [com.intellij.platform.ijent.spi.RECOMMENDED_MAX_PACKET_SIZE].
|
||||
*/
|
||||
@Throws(IjentUnavailableException::class)
|
||||
suspend fun read(buf: ByteBuffer, offset: Long): IjentFsResult<ReadResult, ReadError>
|
||||
|
||||
sealed interface ReadResult {
|
||||
@@ -302,6 +312,7 @@ sealed interface IjentOpenedFile {
|
||||
*
|
||||
* It writes not more than [com.intellij.platform.ijent.spi.RECOMMENDED_MAX_PACKET_SIZE].
|
||||
*/
|
||||
@Throws(IjentUnavailableException::class)
|
||||
suspend fun write(buf: ByteBuffer): IjentFsResult<
|
||||
Int,
|
||||
WriteError>
|
||||
@@ -311,6 +322,7 @@ sealed interface IjentOpenedFile {
|
||||
*
|
||||
* It writes not more than [com.intellij.platform.ijent.spi.RECOMMENDED_MAX_PACKET_SIZE].
|
||||
*/
|
||||
@Throws(IjentUnavailableException::class)
|
||||
suspend fun write(buf: ByteBuffer, pos: Long): IjentFsResult<
|
||||
Int,
|
||||
WriteError>
|
||||
@@ -327,7 +339,7 @@ sealed interface IjentOpenedFile {
|
||||
interface Other : WriteError, IjentFsError.Other
|
||||
}
|
||||
|
||||
@Throws(FlushException::class)
|
||||
@Throws(FlushException::class, IjentUnavailableException::class)
|
||||
suspend fun flush()
|
||||
|
||||
sealed class FlushException(
|
||||
@@ -338,7 +350,7 @@ sealed interface IjentOpenedFile {
|
||||
: FlushException(where, additionalMessage), IjentFsError.Other
|
||||
}
|
||||
|
||||
@Throws(TruncateException::class)
|
||||
@Throws(TruncateException::class, IjentUnavailableException::class)
|
||||
suspend fun truncate(size: Long)
|
||||
|
||||
sealed class TruncateException(
|
||||
@@ -364,7 +376,7 @@ interface IjentFileSystemPosixApi : IjentFileSystemApi {
|
||||
// todo
|
||||
}
|
||||
|
||||
@kotlin.jvm.Throws(CreateDirectoryException::class)
|
||||
@Throws(CreateDirectoryException::class, IjentUnavailableException::class)
|
||||
suspend fun createDirectory(path: IjentPath.Absolute, attributes: List<CreateDirAttributePosix>)
|
||||
|
||||
sealed class CreateDirectoryException(
|
||||
@@ -378,6 +390,7 @@ interface IjentFileSystemPosixApi : IjentFileSystemApi {
|
||||
class Other(where: IjentPath.Absolute, additionalMessage: @NlsSafe String) : CreateDirectoryException(where, additionalMessage), IjentFsError.Other
|
||||
}
|
||||
|
||||
@Throws(IjentUnavailableException::class)
|
||||
override suspend fun listDirectoryWithAttrs(
|
||||
path: IjentPath.Absolute,
|
||||
resolveSymlinks: Boolean,
|
||||
@@ -385,6 +398,7 @@ interface IjentFileSystemPosixApi : IjentFileSystemApi {
|
||||
Collection<Pair<String, IjentPosixFileInfo>>,
|
||||
IjentFileSystemApi.ListDirectoryError>
|
||||
|
||||
@Throws(IjentUnavailableException::class)
|
||||
override suspend fun stat(path: IjentPath.Absolute, resolveSymlinks: Boolean): IjentFsResult<
|
||||
IjentPosixFileInfo,
|
||||
IjentFileSystemApi.StatError>
|
||||
@@ -393,8 +407,10 @@ interface IjentFileSystemPosixApi : IjentFileSystemApi {
|
||||
interface IjentFileSystemWindowsApi : IjentFileSystemApi {
|
||||
override val user: IjentWindowsInfo.User
|
||||
|
||||
@Throws(IjentUnavailableException::class)
|
||||
suspend fun getRootDirectories(): Collection<IjentPath.Absolute>
|
||||
|
||||
@Throws(IjentUnavailableException::class)
|
||||
override suspend fun listDirectoryWithAttrs(
|
||||
path: IjentPath.Absolute,
|
||||
resolveSymlinks: Boolean,
|
||||
@@ -402,6 +418,7 @@ interface IjentFileSystemWindowsApi : IjentFileSystemApi {
|
||||
Collection<Pair<String, IjentWindowsFileInfo>>,
|
||||
IjentFileSystemApi.ListDirectoryError>
|
||||
|
||||
@Throws(IjentUnavailableException::class)
|
||||
override suspend fun stat(path: IjentPath.Absolute, resolveSymlinks: Boolean): IjentFsResult<
|
||||
IjentWindowsFileInfo,
|
||||
IjentFileSystemApi.StatError>
|
||||
|
||||
@@ -1,10 +1,14 @@
|
||||
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
|
||||
package com.intellij.platform.ijent.spi
|
||||
|
||||
import com.intellij.openapi.diagnostic.*
|
||||
import com.intellij.openapi.diagnostic.Attachment
|
||||
import com.intellij.openapi.diagnostic.debug
|
||||
import com.intellij.openapi.diagnostic.logger
|
||||
import com.intellij.platform.ijent.IjentApplicationScope
|
||||
import com.intellij.platform.ijent.IjentId
|
||||
import com.intellij.platform.ijent.IjentUnavailableException
|
||||
import com.intellij.platform.util.coroutines.childScope
|
||||
import com.intellij.util.containers.ContainerUtil
|
||||
import com.intellij.util.io.awaitExit
|
||||
import com.intellij.util.io.blockingDispatcher
|
||||
import kotlinx.coroutines.*
|
||||
@@ -13,11 +17,13 @@ import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.coroutines.flow.SharedFlow
|
||||
import kotlinx.coroutines.flow.filterNotNull
|
||||
import kotlinx.coroutines.flow.takeWhile
|
||||
import org.jetbrains.annotations.VisibleForTesting
|
||||
import java.io.IOException
|
||||
import java.time.ZonedDateTime
|
||||
import java.time.format.DateTimeParseException
|
||||
import java.util.*
|
||||
import java.util.concurrent.TimeUnit
|
||||
import kotlin.coroutines.AbstractCoroutineContextElement
|
||||
import kotlin.coroutines.CoroutineContext
|
||||
import kotlin.time.Duration.Companion.milliseconds
|
||||
import kotlin.time.Duration.Companion.seconds
|
||||
import kotlin.time.toKotlinDuration
|
||||
@@ -25,8 +31,11 @@ import kotlin.time.toKotlinDuration
|
||||
/**
|
||||
* A wrapper for a [Process] that runs IJent. The wrapper logs stderr lines, waits for the exit code, terminates the process in case
|
||||
* of problems in the IDE.
|
||||
*
|
||||
* [processExit] never throws. When it completes, it either means that the process has finished, or that the whole scope of IJent processes
|
||||
* is canceled.
|
||||
*/
|
||||
class IjentSessionMediator private constructor(val scope: CoroutineScope, val process: Process) {
|
||||
class IjentSessionMediator private constructor(val scope: CoroutineScope, val process: Process, val processExit: Deferred<Unit>) {
|
||||
enum class ExpectedErrorCode {
|
||||
/** During initialization, even a sudden successful exit is an error. */
|
||||
NO,
|
||||
@@ -46,12 +55,17 @@ class IjentSessionMediator private constructor(val scope: CoroutineScope, val pr
|
||||
@OptIn(DelicateCoroutinesApi::class)
|
||||
fun create(process: Process, ijentId: IjentId): IjentSessionMediator {
|
||||
val lastStderrMessages = MutableSharedFlow<String?>(
|
||||
replay = 0,
|
||||
extraBufferCapacity = 30,
|
||||
replay = 30,
|
||||
extraBufferCapacity = 0,
|
||||
onBufferOverflow = BufferOverflow.DROP_OLDEST,
|
||||
)
|
||||
|
||||
val connectionScope = IjentApplicationScope.instance().childScope("ijent $ijentId > connection scope", supervisor = false)
|
||||
val exceptionHandler = IjentSessionCoroutineExceptionHandler(ijentId)
|
||||
val connectionScope = IjentApplicationScope.instance().childScope(
|
||||
"ijent $ijentId > connection scope",
|
||||
supervisor = false,
|
||||
context = exceptionHandler,
|
||||
)
|
||||
|
||||
// stderr logger should outlive the current scope. In case if an error appears, the scope is cancelled immediately, but the whole
|
||||
// intention of the stderr logger is to write logs of the remote process, which come from the remote machine to the local one with
|
||||
@@ -60,9 +74,11 @@ class IjentSessionMediator private constructor(val scope: CoroutineScope, val pr
|
||||
ijentProcessStderrLogger(process, ijentId, lastStderrMessages)
|
||||
}
|
||||
|
||||
val mediator = IjentSessionMediator(connectionScope, process)
|
||||
val processExit = CompletableDeferred<Unit>()
|
||||
|
||||
val awaiterScope = IjentApplicationScope.instance().launch(CoroutineName("ijent $ijentId > exit awaiter scope")) {
|
||||
val mediator = IjentSessionMediator(connectionScope, process, processExit)
|
||||
|
||||
val awaiterScope = IjentApplicationScope.instance().launch(CoroutineName("ijent $ijentId > exit awaiter scope") + exceptionHandler) {
|
||||
ijentProcessExitAwaiter(ijentId, mediator, lastStderrMessages)
|
||||
}
|
||||
|
||||
@@ -70,19 +86,43 @@ class IjentSessionMediator private constructor(val scope: CoroutineScope, val pr
|
||||
ijentProcessFinalizer(ijentId, mediator)
|
||||
}
|
||||
|
||||
awaiterScope.invokeOnCompletion {
|
||||
finalizerScope.cancel()
|
||||
}
|
||||
|
||||
finalizerScope.invokeOnCompletion {
|
||||
connectionScope.cancel()
|
||||
awaiterScope.invokeOnCompletion { err ->
|
||||
processExit.complete(Unit)
|
||||
finalizerScope.cancel(if (err != null) CancellationException(err.message, err) else null)
|
||||
}
|
||||
|
||||
return mediator
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
val lastStderrMessagesTimeout = 5.seconds // A random timeout.
|
||||
private class IjentSessionCoroutineExceptionHandler(
|
||||
private val ijentId: IjentId,
|
||||
) : AbstractCoroutineContextElement(CoroutineExceptionHandler), CoroutineExceptionHandler {
|
||||
private val loggedErrors = Collections.newSetFromMap(ContainerUtil.createConcurrentWeakMap<Throwable, Boolean>())
|
||||
|
||||
override fun toString(): String = javaClass.simpleName
|
||||
|
||||
override fun handleException(context: CoroutineContext, exception: Throwable) {
|
||||
when (exception) {
|
||||
is IjentUnavailableException -> when (exception) {
|
||||
is IjentUnavailableException.ClosedByApplication -> Unit
|
||||
|
||||
is IjentUnavailableException.CommunicationFailure -> {
|
||||
if (!exception.exitedExpectedly && loggedErrors.add(exception)) {
|
||||
LOG.error("Exception in connection with IJent $ijentId: ${exception.message}", exception)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
is CancellationException -> Unit
|
||||
|
||||
else -> {
|
||||
if (loggedErrors.add(exception)) {
|
||||
LOG.error("Unexpected error during communnication with IJent $ijentId", exception)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -156,12 +196,11 @@ private fun logIjentStderr(ijentId: IjentId, line: String) {
|
||||
})
|
||||
}
|
||||
|
||||
@OptIn(DelicateCoroutinesApi::class)
|
||||
private suspend fun ijentProcessExitAwaiter(
|
||||
ijentId: IjentId,
|
||||
mediator: IjentSessionMediator,
|
||||
lastStderrMessages: MutableSharedFlow<String?>,
|
||||
) {
|
||||
): Nothing {
|
||||
val exitCode = mediator.process.awaitExit()
|
||||
LOG.debug { "IJent process $ijentId exited with code $exitCode" }
|
||||
|
||||
@@ -171,24 +210,24 @@ private suspend fun ijentProcessExitAwaiter(
|
||||
IjentSessionMediator.ExpectedErrorCode.ANY -> true
|
||||
}
|
||||
|
||||
if (!isExitExpected) {
|
||||
// This coroutine must be bound to something that outlives `coroutineScope`, in order to not block its cancellation and
|
||||
// to not truncate the last lines of the logs, which are usually the most important.
|
||||
GlobalScope.launch {
|
||||
val stderr = StringBuilder()
|
||||
try {
|
||||
withTimeout(IjentSessionMediator.lastStderrMessagesTimeout) {
|
||||
collectLines(lastStderrMessages, stderr)
|
||||
}
|
||||
throw if (isExitExpected) {
|
||||
IjentUnavailableException.CommunicationFailure("IJent process exited successfully").apply { exitedExpectedly = true }
|
||||
}
|
||||
else {
|
||||
val stderr = StringBuilder()
|
||||
// This code blocks the whole coroutine scope, so it should
|
||||
withContext(NonCancellable) {
|
||||
val timeoutResult: Unit? = withTimeoutOrNull(1.seconds) {
|
||||
collectLines(lastStderrMessages, stderr)
|
||||
}
|
||||
finally {
|
||||
// There's `LOG.error(message, Attachment)`, but it doesn't work well with `LoggedErrorProcessor.executeAndReturnLoggedError`.
|
||||
LOG.error(RuntimeExceptionWithAttachments(
|
||||
"The process $ijentId suddenly exited with the code $exitCode",
|
||||
Attachment("stderr", stderr.toString()),
|
||||
))
|
||||
if (timeoutResult == null) {
|
||||
stderr.append("\n<didn't collect the whole stderr>")
|
||||
}
|
||||
}
|
||||
IjentUnavailableException.CommunicationFailure(
|
||||
"The process $ijentId suddenly exited with the code $exitCode",
|
||||
Attachment("stderr", stderr.toString()),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -203,13 +242,19 @@ private suspend fun collectLines(lastStderrMessages: SharedFlow<String?>, stderr
|
||||
}
|
||||
|
||||
@OptIn(DelicateCoroutinesApi::class)
|
||||
private suspend fun ijentProcessFinalizer(ijentId: IjentId, mediator: IjentSessionMediator) {
|
||||
private suspend fun ijentProcessFinalizer(ijentId: IjentId, mediator: IjentSessionMediator): Nothing {
|
||||
try {
|
||||
awaitCancellation()
|
||||
}
|
||||
catch (err: Exception) {
|
||||
LOG.debug(err) { "$ijentId is going to be terminated due to receiving an error" }
|
||||
throw err
|
||||
throw when (val cause = generateSequence(err, Throwable::cause).firstOrNull { it !is CancellationException }) {
|
||||
null -> err
|
||||
is IjentUnavailableException -> cause
|
||||
else -> {
|
||||
LOG.debug(err) { "$ijentId is going to be terminated due to receiving an error" }
|
||||
IjentUnavailableException.CommunicationFailure("IJent communication terminated due to an error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
finally {
|
||||
mediator.expectedErrorCode = IjentSessionMediator.ExpectedErrorCode.ANY
|
||||
|
||||
@@ -514,6 +514,8 @@ private class MockIjentApi(private val adapter: GeneralCommandLine, val rootUser
|
||||
|
||||
override fun close(): Unit = Unit
|
||||
|
||||
override suspend fun waitUntilExit(): Unit = Unit
|
||||
|
||||
override val exec: IjentExecApi get() = MockIjentExecApi(adapter, rootUser)
|
||||
|
||||
override val fs: IjentFileSystemPosixApi get() = throw UnsupportedOperationException()
|
||||
|
||||
Reference in New Issue
Block a user