IJ-MR-176106-to-253

[pycharm] PY-81494 Fix deadlocking code


Merge-request: IJ-MR-179432
Merged-by: David Lysenko <david.lysenko@jetbrains.com>
[pycharm] PY-81494 Config fixes

PY-81494

[pycharm] PY-81494 Fix further flakiness

[pycharm] PY-81494 Address feedback

[pycharm] PY-81494 Fix flakiness

[pycharm] PY-81494 Separate modules

[pycharm] PY-81494 Fix tests

[pycharm] PY-81494 Change waitFor to awaitExit for coroutines

[pycharm] PY-81494 Configuration fixes

[pycharm] PY-81494 Add usage statistics

[pycharm] PY-81494 Address feedback

[pycharm] PY-81494 Add more limit tests

[pycharm] PY-81494 Post-rebase fixes

[pycharm] PY-81494 Implement logging tests

[pycharm] PY-81494 Final design adjustments

[pycharm] PY-81494 Refactor flows

[pycharm] PY-81494 Add more OutputSection tests

[pycharm] PY-81494 Add Toolbar tests

[pycharm] PY-81494 Add InterText tests

[pycharm] PY-81494 Add FilterActionGroup tests

[pycharm] PY-81494 Add EmptyContainerNotice tests

[pycharm] PY-81494 Add CollapsibleListSection tests

[pycharm] PY-81494 Add ActionIconButton tests

[pycharm] PY-81494 Address feedback

[pycharm] PY-81494 Address feedback

[pycharm] PY-81494 Post-rebase fixes

[pycharm] PY-81494 Address initial feedback

[pycharm] PY-81494 Fix existing tests & add new to tree

[pycharm] PY-81494 Implement copy to clipboard button

[pycharm] PY-81494 Amend design

[pycharm] PY-81494 Begin implementing output tests

[pycharm] PY-81494 Refactor file structure

[pycharm] PY-81494 Implement tests for process list

[pycharm] PY-81494 wip tests for process list

[pycharm] PY-81494 Finishing touches

[pycharm] PY-81494 Consolidate list logic in the model

[pycharm] PY-81494 Add logging limits

[pycharm] PY-81494 Implement open tool window on exec service error

[pycharm] PY-81494 Implement open command in terminal

[pycharm] PY-81494 Add expansion actions

[pycharm] PY-81494 Implement categorization by coroutine names

[pycharm] PY-81494 Memorize expansion states between tool window openings

[pycharm] PY-81494 Memorize scroll state between tool window openings

[pycharm] PY-81494 Introduce collapsible section for process info

[pycharm] PY-81494 Implement view setting filtering

[pycharm] PY-81494 Refactor process logging to use shared flows

[pycharm] PY-81494 Implement tests for ProcessList composable

[pycharm] PY-81494 Implement process toolwindow prototype

Merge-request: IJ-MR-176106
Merged-by: David Lysenko <david.lysenko@jetbrains.com>


Merge-request: IJ-MR-179303
Merged-by: David Lysenko <david.lysenko@jetbrains.com>

GitOrigin-RevId: 44552a582dd628d206b207e02e6f24c7749b4d9f
This commit is contained in:
David Lysenko
2025-10-22 09:04:47 +00:00
committed by intellij-monorepo-bot
parent bf299d23c2
commit fc7d863a50
93 changed files with 4349 additions and 64 deletions

View File

@@ -41,6 +41,7 @@ jvm_library(
"//libraries/kotlinx/serialization/json",
"//libraries/kotlinx/serialization/core",
"//platform/remote-servers/impl",
"@lib//:guava",
]
)
@@ -76,6 +77,7 @@ jvm_library(
"//libraries/kotlinx/serialization/json",
"//libraries/kotlinx/serialization/core",
"//platform/remote-servers/impl",
"@lib//:guava",
]
)
### auto-generated section `build intellij.python.community.execService` end

View File

@@ -52,5 +52,6 @@
<orderEntry type="module" module-name="intellij.libraries.kotlinx.serialization.json" />
<orderEntry type="module" module-name="intellij.libraries.kotlinx.serialization.core" />
<orderEntry type="module" module-name="intellij.platform.remoteServers.impl" />
<orderEntry type="library" name="Guava" level="project" />
</component>
</module>

View File

@@ -73,7 +73,8 @@ internal object ExecServiceImpl : ExecService {
}
return@withTimeout processLauncher.createExecError(
messageToUser = additionalMessage,
errorReason = ExecErrorReason.UnexpectedProcessTermination(output)
errorReason = ExecErrorReason.UnexpectedProcessTermination(output),
loggedProcessId = process.loggedProcess.id,
)
}
Result.success(successResult)
@@ -83,7 +84,8 @@ internal object ExecServiceImpl : ExecService {
processLauncher.killAndJoin()
processLauncher.createExecError(
messageToUser = PyExecBundle.message("py.exec.timeout.error", description, options.timeout),
errorReason = ExecErrorReason.Timeout
errorReason = ExecErrorReason.Timeout,
loggedProcessId = process.loggedProcess.id,
)
}
return@coroutineScope result
@@ -91,12 +93,17 @@ internal object ExecServiceImpl : ExecService {
}
}
private fun <T : ExecErrorReason> ProcessLauncher.createExecError(messageToUser: @Nls String, errorReason: T): Result.Failure<ExecErrorImpl<T>> =
private fun <T : ExecErrorReason> ProcessLauncher.createExecError(
messageToUser: @Nls String,
errorReason: T,
loggedProcessId: Int? = null,
): Result.Failure<ExecErrorImpl<T>> =
ExecErrorImpl(
exe = exeForError,
args = args.toTypedArray(),
additionalMessageToUser = messageToUser,
errorReason = errorReason
errorReason = errorReason,
loggedProcessId = loggedProcessId,
).logAndFail()

View File

@@ -0,0 +1,240 @@
// Copyright 2000-2025 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.python.community.execService.impl
import com.google.common.io.ByteStreams
import com.intellij.openapi.application.ApplicationManager
import com.intellij.openapi.components.Service
import com.intellij.openapi.components.service
import com.intellij.util.io.awaitExit
import com.intellij.util.io.readLineAsync
import com.jetbrains.python.TraceContext
import com.jetbrains.python.errorProcessing.Exe
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.launch
import org.jetbrains.annotations.ApiStatus
import org.jetbrains.annotations.Nls
import java.io.BufferedReader
import java.io.ByteArrayInputStream
import java.io.IOException
import java.io.InputStream
import java.io.InputStreamReader
import java.io.OutputStream
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import kotlin.time.Clock
import kotlin.time.Instant
internal object LoggingLimits {
/**
* The maximum buffer size of a LoggingProcess
*/
const val MAX_OUTPUT_SIZE = 10_000_000
const val MAX_LINES = 1024
}
@ApiStatus.Internal
data class LoggedProcess(
val traceContext: TraceContext?,
val pid: Long?,
val startedAt: Instant,
val cwd: String?,
val exe: LoggedProcessExe,
val args: List<String>,
val env: Map<String, String>,
val lines: SharedFlow<LoggedProcessLine>,
val exitInfo: MutableStateFlow<LoggedProcessExitInfo?>,
) {
val id: Int = nextId.getAndAdd(1)
val commandString: String
get() = commandFromSegments(listOf(exe.path) + args)
/**
* Command string with the full path of the exe trimmed only to the latest segments. E.g., `/usr/bin/uv` -> `uv`.
*/
val shortenedCommandString: String
get() = commandFromSegments(listOf(exe.parts.last()) + args)
companion object {
private val nextId: AtomicInteger = AtomicInteger(0)
private fun commandFromSegments(segments: List<String>) =
segments.joinToString(" ")
}
}
@ApiStatus.Internal
data class LoggedProcessExe(
val path: String,
val parts: List<String>,
)
@ApiStatus.Internal
data class LoggedProcessExitInfo(
val exitedAt: Instant,
val exitValue: Int,
val additionalMessageToUser: @Nls String? = null,
)
@ApiStatus.Internal
data class LoggedProcessLine(
val text: String,
val kind: Kind,
) {
enum class Kind {
OUT,
ERR
}
}
@ApiStatus.Internal
@Service
class ExecLoggerService(val scope: CoroutineScope) {
internal val processesInternal = MutableSharedFlow<LoggedProcess>()
val processes: Flow<LoggedProcess> = processesInternal.asSharedFlow()
}
@ApiStatus.Internal
class LoggingProcess(
private val backingProcess: Process,
traceContext: TraceContext?,
startedAt: Instant,
cwd: String?,
exe: Exe,
args: List<String>,
env: Map<String, String>,
) : Process() {
val loggedProcess: LoggedProcess
private val stdoutStream = LoggingInputStream(backingProcess.inputStream)
private val stderrStream = LoggingInputStream(backingProcess.errorStream)
init {
val service = ApplicationManager.getApplication().service<ExecLoggerService>()
val linesFlow = MutableSharedFlow<LoggedProcessLine>(replay = LoggingLimits.MAX_LINES)
val exitInfoFlow = MutableStateFlow<LoggedProcessExitInfo?>(null)
loggedProcess =
LoggedProcess(
traceContext,
try {
backingProcess.pid()
}
catch (_: UnsupportedOperationException) {
null
},
startedAt,
cwd,
LoggedProcessExe(
path = exe.toString(),
parts = exe.pathParts(),
),
args,
env,
linesFlow,
exitInfoFlow,
)
service.scope.launch {
service.processesInternal.emit(loggedProcess)
awaitExit()
val stdoutReader = BufferedReader(InputStreamReader(ByteArrayInputStream(stdoutStream.byteArray)))
val stderrReader = BufferedReader(InputStreamReader(ByteArrayInputStream(stderrStream.byteArray)))
collectOutputLines(stdoutReader, linesFlow, LoggedProcessLine.Kind.OUT)
collectOutputLines(stderrReader, linesFlow, LoggedProcessLine.Kind.ERR)
exitInfoFlow.value = LoggedProcessExitInfo(
exitedAt = Clock.System.now(),
exitValue = exitValue(),
)
}
}
override fun getOutputStream(): OutputStream =
backingProcess.outputStream
override fun getInputStream(): InputStream =
stdoutStream
override fun getErrorStream(): InputStream =
stderrStream
override fun waitFor(): Int {
return backingProcess.waitFor()
}
override fun waitFor(timeout: Long, unit: TimeUnit): Boolean {
return backingProcess.waitFor(timeout, unit)
}
override fun exitValue(): Int =
backingProcess.exitValue()
override fun destroy(): Unit =
backingProcess.destroy()
override fun destroyForcibly(): Process? =
backingProcess.destroyForcibly()
override fun toHandle(): ProcessHandle? =
backingProcess.toHandle()
override fun supportsNormalTermination(): Boolean =
backingProcess.supportsNormalTermination()
}
private class LoggingInputStream(
private val backingInputStream: InputStream,
) : InputStream() {
private val bytes = ByteStreams.newDataOutput()
private var tail = 0
val byteArray
get() = bytes.toByteArray()
override fun read(): Int {
val byte = try {
backingInputStream.read()
}
catch (e: IOException) {
// ugly hack; but the Process' `.destroy` methods abruptly close
// the stream, making all pending readers throw an exception.
// we can handle this case as legal here
if (e.message == "Stream closed") {
return -1
}
throw e
}
if (tail < LoggingLimits.MAX_OUTPUT_SIZE && byte != -1) {
bytes.write(byte)
tail += 1
}
return byte
}
}
private suspend fun collectOutputLines(
reader: BufferedReader,
linesFlow: MutableSharedFlow<LoggedProcessLine>,
kind: LoggedProcessLine.Kind,
) {
var line: String? = null
while (reader.readLineAsync()?.also { line = it } != null) {
linesFlow.emit(LoggedProcessLine(
text = line!!,
kind = kind,
))
}
}

View File

@@ -5,10 +5,13 @@ import com.intellij.openapi.diagnostic.fileLogger
import com.intellij.platform.eel.provider.utils.ProcessFunctions
import com.intellij.python.community.execService.Args
import com.intellij.python.community.execService.TtySize
import com.intellij.python.community.execService.impl.LoggingProcess
import com.jetbrains.python.Result
import com.jetbrains.python.TraceContext
import com.jetbrains.python.errorProcessing.Exe
import com.jetbrains.python.errorProcessing.ExecErrorReason
import kotlinx.coroutines.CoroutineScope
import kotlin.time.Clock
private val logger = fileLogger()
@@ -17,7 +20,20 @@ internal class ProcessLauncher(
val args: List<String>,
private val processCommands: ProcessCommands,
) {
suspend fun start(): Result<Process, ExecErrorReason.CantStart> = processCommands.start()
suspend fun start(): Result<LoggingProcess, ExecErrorReason.CantStart> =
processCommands.start()
.mapSuccess {
LoggingProcess(
it,
processCommands.scopeToBind.coroutineContext[TraceContext.Key],
Clock.System.now(),
processCommands.cwd,
exeForError,
args,
processCommands.env,
)
}
suspend fun killAndJoin() {
processCommands.processFunctions.killAndJoin(logger, exeForError.toString())
}
@@ -26,11 +42,14 @@ internal class ProcessLauncher(
internal interface ProcessCommands {
suspend fun start(): Result<Process, ExecErrorReason.CantStart>
val processFunctions: ProcessFunctions
val scopeToBind: CoroutineScope
val env: Map<String, String>
val cwd: String?
}
internal data class LaunchRequest(
val scopeToBind: CoroutineScope,
val args: Args,
val env: Map<String, String>,
val usePty: TtySize?
)
val usePty: TtySize?,
)

View File

@@ -15,6 +15,8 @@ import com.jetbrains.python.Result
import com.jetbrains.python.errorProcessing.Exe
import com.jetbrains.python.errorProcessing.ExecErrorReason
import kotlinx.coroutines.CoroutineScope
import java.nio.file.Path
import kotlin.io.path.pathString
internal suspend fun createProcessLauncherOnEel(binOnEel: BinOnEel, launchRequest: LaunchRequest): ProcessLauncher {
val exePath: EelPath = with(binOnEel) {
@@ -35,15 +37,18 @@ internal suspend fun createProcessLauncherOnEel(binOnEel: BinOnEel, launchReques
}
private class EelProcessCommands(
private val scopeToBind: CoroutineScope,
override val scopeToBind: CoroutineScope,
private val binOnEel: BinOnEel,
private val path: EelPath,
private val args: List<String>,
private val env: Map<String, String>,
override val env: Map<String, String>,
private val tty: TtySize?,
) : ProcessCommands {
private var eelProcess: EelProcess? = null
override val cwd: String?
get() = binOnEel.workDir?.toRealPath()?.pathString
override val processFunctions: ProcessFunctions = ProcessFunctions(
waitForExit = { eelProcess?.exitCode?.await() },
killProcess = { eelProcess?.kill() }

View File

@@ -10,6 +10,7 @@ import com.intellij.execution.target.local.LocalTargetEnvironmentRequest
import com.intellij.execution.target.local.LocalTargetPtyOptions
import com.intellij.openapi.diagnostic.fileLogger
import com.intellij.openapi.project.ProjectManager
import com.intellij.openapi.util.io.toNioPathOrNull
import com.intellij.platform.eel.provider.utils.ProcessFunctions
import com.intellij.platform.eel.provider.utils.bindProcessToScopeImpl
import com.intellij.python.community.execService.BinOnTarget
@@ -24,6 +25,7 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.withContext
import com.intellij.remoteServer.util.ServerRuntimeException
import java.nio.file.Path
import kotlin.io.path.pathString
import kotlin.time.Duration.Companion.milliseconds
@@ -83,11 +85,16 @@ internal suspend fun createProcessLauncherOnTarget(binOnTarget: BinOnTarget, lau
}
private class TargetProcessCommands(
private val scopeToBind: CoroutineScope,
override val scopeToBind: CoroutineScope,
private val exePath: FullPathOnTarget,
private val targetEnv: TargetEnvironment,
private val cmdLine: TargetedCommandLine,
) : ProcessCommands {
override val env: Map<String, String>
get() = cmdLine.environmentVariables
override val cwd: String?
get() = cmdLine.workingDirectory
private var process: Process? = null
@@ -115,4 +122,4 @@ private class TargetProcessCommands(
}
}
private fun ExecutionException.asCantStart(): Result.Failure<ExecErrorReason.CantStart> = Result.failure(ExecErrorReason.CantStart(null, localizedMessage))
private fun ExecutionException.asCantStart(): Result.Failure<ExecErrorReason.CantStart> = Result.failure(ExecErrorReason.CantStart(null, localizedMessage))

View File

@@ -0,0 +1,240 @@
// Copyright 2000-2025 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.python.junit5Tests.unit
import com.intellij.python.community.execService.impl.LoggedProcess
import com.intellij.python.community.execService.impl.LoggedProcessExe
import com.intellij.python.community.execService.impl.LoggedProcessLine
import com.intellij.python.community.execService.impl.LoggingLimits
import com.intellij.python.community.execService.impl.LoggingProcess
import com.intellij.testFramework.common.timeoutRunBlocking
import com.intellij.testFramework.common.waitUntil
import com.intellij.testFramework.junit5.TestApplication
import com.jetbrains.python.TraceContext
import com.jetbrains.python.errorProcessing.Exe
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Nested
import org.junit.jupiter.api.Test
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.InputStream
import java.io.OutputStream
import java.util.concurrent.CompletableFuture
import kotlin.time.Clock
import kotlin.time.Instant
private class LoggingTest {
@Nested
inner class LoggedProcessTest {
@Test
fun `loggedProcess id should increment with each instantiation`() {
val process1 = process("process1")
val process2 = process("process2")
val process3 = process("process2")
assert(process1.id + 1 == process2.id)
assert(process2.id + 1 == process3.id)
}
@Test
fun `commandString is constructed as expected`() {
val process1 = process("/usr/bin/uv", "install", "requests")
assertEquals("/usr/bin/uv install requests", process1.commandString)
}
@Test
fun `shortenedCommandString is constructed as expected from multiple segments`() {
val process1 = process("/usr/bin/uv", "install", "requests")
assertEquals("uv install requests", process1.shortenedCommandString)
}
@Test
fun `shortenedCommandString is constructed as expected from single segment`() {
val process1 = process("uv", "install", "requests")
assertEquals("uv install requests", process1.shortenedCommandString)
}
}
@TestApplication
@Nested
inner class LoggingProcessTest {
@Test
fun `logged process gets created correctly`() = timeoutRunBlocking {
val traceContext = TraceContext("some trace")
val loggingProcess = fakeLoggingProcess(
stdout = "stdout text",
stderr = "stderr text",
exitValue = 10,
pid = 100,
traceContext = traceContext,
startedAt = Instant.fromEpochSeconds(100),
cwd = "/some/cwd",
pathToExe = "/usr/bin/exe",
args = listOf("foo", "bar"),
env = mapOf("foo" to "bar")
)
val loggedProcess = loggingProcess.loggedProcess
val stdout = loggingProcess.inputStream.readAllBytes().toString(charset = Charsets.UTF_8)
val stderr = loggingProcess.errorStream.readAllBytes().toString(charset = Charsets.UTF_8)
assert(traceContext == loggedProcess.traceContext)
assert(100L == loggedProcess.pid)
assert(Instant.fromEpochSeconds(100) == loggedProcess.startedAt)
assert("/some/cwd" == loggedProcess.cwd)
assert(LoggedProcessExe(path = "/usr/bin/exe", listOf("usr", "bin", "exe")) == loggedProcess.exe)
assert(listOf("foo", "bar") == loggedProcess.args)
assert(mapOf("foo" to "bar") == loggedProcess.env)
assert(stdout == "stdout text")
assert(stderr == "stderr text")
loggingProcess.destroy()
}
@Test
fun `lines get properly collected from out and err`() = timeoutRunBlocking {
val loggingProcess = fakeLoggingProcess(
"outline1\noutline2\noutline3",
"errline1\nerrline2\nerrline3"
)
val loggedProcess = loggingProcess.loggedProcess
assert(loggedProcess.lines.replayCache.isEmpty())
loggingProcess.inputStream.readAllBytes()
loggingProcess.errorStream.readAllBytes()
waitUntil { loggedProcess.lines.replayCache.size == 6 }
(1..3).forEach {
assert(loggedProcess.lines.replayCache[it - 1].text == "outline$it")
assert(loggedProcess.lines.replayCache[it - 1].kind == LoggedProcessLine.Kind.OUT)
}
(4..6).forEach {
assert(loggedProcess.lines.replayCache[it - 1].text == "errline${it - 3}")
assert(loggedProcess.lines.replayCache[it - 1].kind == LoggedProcessLine.Kind.ERR)
}
loggingProcess.destroy()
}
@Test
fun `exit info gets properly populated`() = timeoutRunBlocking {
val now = Clock.System.now()
val loggingProcess = fakeLoggingProcess(
exitValue = 30
)
val loggedProcess = loggingProcess.loggedProcess
loggingProcess.destroy()
waitUntil { loggedProcess.exitInfo.value != null }
assert(loggedProcess.exitInfo.value!!.exitValue == 30)
assert(loggedProcess.exitInfo.value!!.exitedAt >= now)
}
@Disabled
@Test
fun `old lines are evicted when the line limit is reached`() = timeoutRunBlocking {
val loggingProcess = fakeLoggingProcess(
stdout = buildString {
repeat(LoggingLimits.MAX_LINES + 2) {
appendLine("line$it")
}
},
stderr = ""
)
val loggedProcess = loggingProcess.loggedProcess
loggingProcess.inputStream.readAllBytes()
loggingProcess.errorStream.readAllBytes()
loggingProcess.destroy()
waitUntil { loggedProcess.lines.replayCache.last().text == "line${LoggingLimits.MAX_LINES + 1}" }
assert(loggedProcess.lines.replayCache.size == LoggingLimits.MAX_LINES)
assert(loggedProcess.lines.replayCache[0].text == "line2")
}
// todo: add limits test
}
companion object {
fun process(vararg command: String) =
LoggedProcess(
traceContext = null,
pid = 123,
startedAt = Clock.System.now(),
cwd = null,
exe = LoggedProcessExe(
path = command.first(),
parts = command.first().split(Regex("[/\\\\]+"))
),
args = command.drop(1),
env = mapOf(),
lines = MutableSharedFlow(),
exitInfo = MutableStateFlow(null),
)
fun fakeLoggingProcess(
stdout: String = "stdout",
stderr: String = "stderr",
exitValue: Int = 0,
pid: Long = 0,
traceContext: TraceContext? = null,
startedAt: Instant = Instant.fromEpochSeconds(0),
cwd: String? = "/some/cwd",
pathToExe: String = "/usr/bin/exe",
args: List<String> = listOf("foo", "bar"),
env: Map<String, String> = mapOf("foo" to "bar"),
) =
LoggingProcess(
object : Process() {
val stdoutStream = ByteArrayInputStream(stdout.toByteArray())
val stderrStream = ByteArrayInputStream(stderr.toByteArray())
val stdinStream = ByteArrayOutputStream()
val destroyFuture = CompletableFuture<Int>()
override fun getOutputStream(): OutputStream =
stdinStream
override fun getInputStream(): InputStream =
stdoutStream
override fun getErrorStream(): InputStream =
stderrStream
override fun waitFor(): Int {
destroyFuture.get()
return exitValue
}
override fun exitValue(): Int {
return exitValue
}
override fun destroy() {
destroyFuture.complete(10)
}
override fun pid(): Long =
pid
},
traceContext,
startedAt,
cwd,
Exe.fromString(pathToExe),
args,
env
)
}
}