IJPL-162040: as simple as possible variant

- Put lock state into coroutine context in `runBlockingCancellable`.
 - Use lock state from context if thread local is empty.
 - Fix tests which become not valid in new mode.
 - Add tests for new functionality.
 - Add feature flag to enable this new mode by default.

Feature flag: `-Dide.store.lock.in.context={true|false}`. `true` (new mode) is a default.

Merge-request: IJ-MR-146415
Merged-by: Lev Serebryakov <Lev.Serebryakov@jetbrains.com>

(cherry picked from commit 49b4f448f652a2b074501f0cc6fca047cd4d5a8b)

IJ-CR-147289

GitOrigin-RevId: a13959a241fd939dbe71ba52135362c30ff81b59
This commit is contained in:
Lev Serebryakov
2024-10-15 17:46:25 +00:00
committed by intellij-monorepo-bot
parent fb0757f5d4
commit ecb3e54b63
11 changed files with 304 additions and 22 deletions

View File

@@ -1100,8 +1100,10 @@ com.intellij.openapi.application.Application
- exit(Z,Z,Z,I):V
- a:getDefaultModalityState():com.intellij.openapi.application.ModalityState
- a:getIdleTime():J
- getLockStateAsCoroutineContext():kotlin.coroutines.CoroutineContext
- a:getModalityStateForComponent(java.awt.Component):com.intellij.openapi.application.ModalityState
- a:getStartTime():J
- hasLockStateInContext(kotlin.coroutines.CoroutineContext):Z
- a:hasWriteAction(java.lang.Class):Z
- a:invokeAndWait(java.lang.Runnable):V
- a:invokeAndWait(java.lang.Runnable,com.intellij.openapi.application.ModalityState):V

View File

@@ -10,6 +10,8 @@ import com.intellij.openapi.util.ThrowableComputable;
import com.intellij.util.ThrowableRunnable;
import com.intellij.util.concurrency.ThreadingAssertions;
import com.intellij.util.concurrency.annotations.*;
import kotlin.coroutines.CoroutineContext;
import kotlin.coroutines.EmptyCoroutineContext;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.Contract;
import org.jetbrains.annotations.NotNull;
@@ -672,4 +674,12 @@ public interface Application extends ComponentManager {
assertWriteIntentLockAcquired();
}
//</editor-fold>
default CoroutineContext getLockStateAsCoroutineContext() {
return EmptyCoroutineContext.INSTANCE;
}
default boolean hasLockStateInContext(CoroutineContext context) {
return false;
}
}

View File

@@ -26,3 +26,11 @@ val isCoroutineWILEnabled: Boolean =
@get:ApiStatus.Internal
val isMessageBusErrorPropagationEnabled: Boolean =
System.getProperty("ijpl.message.bus.rethrows.errors.from.subscribers", "false").toBoolean()
/**
* - `false` means lock permits are bound only to threads
* - `true` means lock permits also stored in coroutine contexts.
*/
@get:ApiStatus.Internal
val isLockStoredInContext: Boolean =
System.getProperty("ide.store.lock.in.context", "true").toBoolean()

View File

@@ -15,6 +15,7 @@ import java.util.concurrent.Future
import java.util.function.BooleanSupplier
import java.util.function.Consumer
import javax.swing.JComponent
import kotlin.coroutines.CoroutineContext
interface ThreadingSupport {
@ApiStatus.Internal
@@ -348,4 +349,10 @@ interface ThreadingSupport {
*/
@ApiStatus.Internal
fun isInsideUnlockedWriteIntentLock(): Boolean
@ApiStatus.Internal
fun getPermitAsContextElement(): CoroutineContext
@ApiStatus.Internal
fun hasPermitAsContextElement(context: CoroutineContext): Boolean
}

View File

@@ -8,6 +8,7 @@ import com.intellij.concurrency.installThreadContext
import com.intellij.openapi.application.ApplicationManager
import com.intellij.openapi.application.ModalityState
import com.intellij.openapi.application.contextModality
import com.intellij.openapi.application.isLockStoredInContext
import com.intellij.openapi.diagnostic.Logger
import com.intellij.openapi.util.Computable
import com.intellij.openapi.util.IntellijInternalApi
@@ -479,8 +480,16 @@ private fun assertBackgroundThreadOrWriteAction() {
@Internal
fun readActionContext(): CoroutineContext {
val application = ApplicationManager.getApplication()
return if (application != null && application.isReadAccessAllowed) {
RunBlockingUnderReadActionMarker
return if (application != null) {
if (isLockStoredInContext) {
application.lockStateAsCoroutineContext
}
else if (application.isReadAccessAllowed) {
RunBlockingUnderReadActionMarker
}
else {
EmptyCoroutineContext
}
}
else {
EmptyCoroutineContext
@@ -490,7 +499,13 @@ fun readActionContext(): CoroutineContext {
@IntellijInternalApi
@Internal
fun CoroutineContext.isRunBlockingUnderReadAction(): Boolean {
return this[RunBlockingUnderReadActionMarker] != null
return if (isLockStoredInContext) {
val application = ApplicationManager.getApplication()
application != null && application.hasLockStateInContext(this)
}
else {
this[RunBlockingUnderReadActionMarker] != null
}
}
private object RunBlockingUnderReadActionMarker

View File

@@ -3,6 +3,7 @@ package com.intellij.openapi.application.impl
import com.intellij.codeWithMe.ClientId.Companion.decorateCallable
import com.intellij.codeWithMe.ClientId.Companion.decorateRunnable
import com.intellij.concurrency.currentThreadContext
import com.intellij.core.rwmutex.*
import com.intellij.diagnostic.PerformanceWatcher
import com.intellij.diagnostic.PluginException
@@ -34,6 +35,8 @@ import java.util.concurrent.atomic.AtomicInteger
import java.util.function.BooleanSupplier
import java.util.function.Consumer
import javax.swing.JComponent
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
private class ThreadState(var permit: Permit? = null, var prevPermit: WriteIntentPermit? = null, var inListener: Boolean = false, var impatientReader: Boolean = false) {
fun release() {
@@ -48,6 +51,14 @@ private class ThreadState(var permit: Permit? = null, var prevPermit: WriteInten
val hasWrite get() = permit is WritePermit
}
private class LockStateContextElement(val threadState: ThreadState): CoroutineContext.Element {
override val key: CoroutineContext.Key<*>
get() = LockStateContextElement
companion object : CoroutineContext.Key<LockStateContextElement>
}
@Suppress("SSBasedInspection")
@ApiStatus.Internal
internal object AnyThreadWriteThreadingSupport: ThreadingSupport {
@@ -66,14 +77,44 @@ internal object AnyThreadWriteThreadingSupport: ThreadingSupport {
private val myWriteActionPending = AtomicInteger(0)
private var myNoWriteActionCounter = AtomicInteger()
private val myState = ThreadLocal.withInitial { ThreadState(null, null,false) }
private val myState = ThreadLocal.withInitial { ThreadState() }
@Volatile
private var myWriteAcquired: Thread? = null
override fun getPermitAsContextElement(): CoroutineContext {
if (!isLockStoredInContext) {
return EmptyCoroutineContext
}
val element = currentThreadContext()[LockStateContextElement]
if (element != null && element.threadState.permit != null) {
return element
}
val ts = myState.get()
if (ts.permit != null) {
return LockStateContextElement(ts)
}
return EmptyCoroutineContext
}
override fun hasPermitAsContextElement(context: CoroutineContext): Boolean = isLockStoredInContext && context[LockStateContextElement] != null
private fun getThreadState(): ThreadState {
val ctxState = if (isLockStoredInContext) currentThreadContext()[LockStateContextElement]?.threadState else null
val thrState = myState.get()
if (ctxState != null) {
check(thrState?.permit == null || ctxState == thrState) { "Lock inconsistency: thread has ${thrState.permit} and context has ${ctxState.permit}" }
return ctxState
}
return thrState
}
// @Throws(E::class)
override fun <T, E : Throwable?> runWriteIntentReadAction(computation: ThrowableComputable<T, E>): T {
val ts = myState.get()
val ts = getThreadState()
var release = true
when(ts.permit) {
null -> ts.permit = getWriteIntentPermit()
@@ -108,11 +149,11 @@ internal object AnyThreadWriteThreadingSupport: ThreadingSupport {
}
override fun isWriteIntentLocked(): Boolean {
val ts = myState.get()
val ts = getThreadState()
return ts.hasWrite || ts.hasWriteIntent
}
override fun isReadAccessAllowed(): Boolean = myState.get().hasPermit
override fun isReadAccessAllowed(): Boolean = getThreadState().hasPermit
override fun executeOnPooledThread(action: Runnable, expired: BooleanSupplier): Future<*> {
val actionDecorated = decorateRunnable(action)
@@ -179,7 +220,7 @@ internal object AnyThreadWriteThreadingSupport: ThreadingSupport {
// @Throws(E::class)
override fun <T, E : Throwable?> runUnlockingIntendedWrite(action: ThrowableComputable<T, E>): T {
val ts = myState.get()
val ts = getThreadState()
if (!ts.hasWriteIntent) {
try {
ts.writeIntentReleased = true
@@ -222,7 +263,7 @@ internal object AnyThreadWriteThreadingSupport: ThreadingSupport {
private fun <T, E : Throwable?> runReadAction(clazz: Class<*>, block: ThrowableComputable<T, E>): T {
fireBeforeReadActionStart(clazz)
val ts = myState.get()
val ts = getThreadState()
if (ts.hasPermit) {
val prevImplicitLock = ThreadingAssertions.isImplicitLockOnEDT()
ThreadingAssertions.setImplicitLockOnEDT(false)
@@ -283,7 +324,7 @@ internal object AnyThreadWriteThreadingSupport: ThreadingSupport {
}
override fun tryRunReadAction(action: Runnable): Boolean {
val ts = myState.get()
val ts = getThreadState()
if (ts.hasPermit) {
val prevImplicitLock = ThreadingAssertions.isImplicitLockOnEDT()
ThreadingAssertions.setImplicitLockOnEDT(false)
@@ -319,7 +360,7 @@ internal object AnyThreadWriteThreadingSupport: ThreadingSupport {
}
}
override fun isReadLockedByThisThread() = myState.get().hasRead
override fun isReadLockedByThisThread() = getThreadState().hasRead
@ApiStatus.Internal
override fun setWriteActionListener(listener: WriteActionListener) {
@@ -342,7 +383,7 @@ internal object AnyThreadWriteThreadingSupport: ThreadingSupport {
override fun <T, E : Throwable?> runWriteAction(computation: ThrowableComputable<T, E>): T = runWriteAction(computation.javaClass, computation)
private fun <T, E : Throwable?> runWriteAction(clazz: Class<*>, block: ThrowableComputable<T, E>): T {
val ts = myState.get()
val ts = getThreadState()
val state = startWrite(ts, clazz)
return try {
block.compute()
@@ -397,7 +438,7 @@ internal object AnyThreadWriteThreadingSupport: ThreadingSupport {
title: @NlsContexts.DialogTitle String,
runnable: Runnable) {
ThreadingAssertions.assertWriteIntentReadAccess()
val ts = myState.get()
val ts = getThreadState()
if (ts.hasWriteIntent) {
runModalProgress(project, title, runnable)
return
@@ -465,7 +506,7 @@ internal object AnyThreadWriteThreadingSupport: ThreadingSupport {
@Deprecated
override fun acquireReadActionLock(): AccessToken {
PluginException.reportDeprecatedUsage("ThreadingSupport.acquireReadActionLock", "Use `runReadAction()` instead")
val ts = myState.get()
val ts = getThreadState()
if (ts.hasWrite) {
throw IllegalStateException("Write Action can not request Read Access Token")
}
@@ -497,7 +538,7 @@ internal object AnyThreadWriteThreadingSupport: ThreadingSupport {
return
}
val ts = myState.get()
val ts = getThreadState()
ts.impatientReader = true
try {
runnable.run()
@@ -507,9 +548,9 @@ internal object AnyThreadWriteThreadingSupport: ThreadingSupport {
}
}
override fun isInImpatientReader(): Boolean = myState.get().impatientReader
override fun isInImpatientReader(): Boolean = getThreadState().impatientReader
override fun isInsideUnlockedWriteIntentLock(): Boolean = myState.get().writeIntentReleased
override fun isInsideUnlockedWriteIntentLock(): Boolean = getThreadState().writeIntentReleased
private fun measureWriteLock(acquisitor: () -> WritePermit) : WritePermit {
val delay = ApplicationImpl.Holder.ourDumpThreadsOnLongWriteActionWaiting
@@ -678,7 +719,7 @@ internal object AnyThreadWriteThreadingSupport: ThreadingSupport {
@Deprecated
private class WriteAccessToken(private val clazz: Class<*>) : AccessToken() {
val ts = myState.get()
val ts = getThreadState()
val release = startWrite(ts, clazz)
init {

View File

@@ -48,6 +48,7 @@ import com.intellij.util.messages.Topic;
import com.intellij.util.ui.EDT;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import kotlin.coroutines.CoroutineContext;
import kotlin.jvm.functions.Function0;
import kotlinx.coroutines.CoroutineScope;
import kotlinx.coroutines.GlobalScope;
@@ -1207,6 +1208,16 @@ public final class ApplicationImpl extends ClientAwareComponentManager implement
otelMonitor.get().writeActionExecuted();
}
@Override
public CoroutineContext getLockStateAsCoroutineContext() {
return getThreadingSupport().getPermitAsContextElement();
}
@Override
public boolean hasLockStateInContext(CoroutineContext context) {
return getThreadingSupport().hasPermitAsContextElement(context);
}
@NotNull
private static ThreadingSupport getThreadingSupport() {
return AnyThreadWriteThreadingSupport.INSTANCE;

View File

@@ -7,6 +7,7 @@ import com.intellij.openapi.application.ModalityState
import com.intellij.openapi.application.ReadAction.CannotReadException
import com.intellij.openapi.application.ReadConstraint
import com.intellij.openapi.application.ex.ApplicationEx
import com.intellij.openapi.application.isLockStoredInContext
import com.intellij.openapi.progress.blockingContext
import kotlinx.coroutines.*
import kotlin.coroutines.coroutineContext
@@ -36,8 +37,17 @@ internal class InternalReadAction<T>(
}
}
else {
if (isLockStoredInContext && application.isReadAccessAllowed && !application.isWriteIntentLockAcquired) {
val unsatisfiedConstraint = findUnsatisfiedConstraint()
check(unsatisfiedConstraint == null) {
"Cannot suspend until constraints are satisfied while holding the read lock: $unsatisfiedConstraint"
}
return withContext(Dispatchers.Default) {
action()
}
}
withContext(Dispatchers.Default) {
check(!application.isReadAccessAllowed) {
check(isLockStoredInContext || !application.isReadAccessAllowed) {
"This thread unexpectedly holds the read lock"
}
readLoop()

View File

@@ -0,0 +1,159 @@
// Copyright 2000-2024 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.openapi.application.impl
import com.intellij.openapi.application.*
import com.intellij.openapi.progress.runBlockingCancellable
import com.intellij.openapi.util.Disposer
import com.intellij.testFramework.common.timeoutRunBlocking
import com.intellij.testFramework.junit5.TestApplication
import com.intellij.util.concurrency.ImplicitBlockingContextTest
import com.intellij.util.concurrency.Semaphore
import com.intellij.util.io.await
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.RepeatedTest
import org.junit.jupiter.api.extension.ExtendWith
import kotlin.test.assertFalse
private const val REPETITIONS: Int = 100
@TestApplication
@ExtendWith(ImplicitBlockingContextTest.Enabler::class)
class ReadWritePropagationTest {
private fun checkInheritanceViaStructureConcurrency(wrapper: suspend (() -> Unit) -> Unit, checker: () -> Boolean): Unit = timeoutRunBlocking {
wrapper {
assertTrue(checker())
runBlockingCancellable {
assertTrue(checker())
launch {
assertTrue(checker())
}
launch {
assertTrue(checker())
}
assertTrue(checker())
}
assertTrue(checker())
}
}
@RepeatedTest(REPETITIONS)
fun `read action is inherited by structured concurrency`() {
checkInheritanceViaStructureConcurrency(::readAction, { ApplicationManager.getApplication().isReadAccessAllowed })
}
@RepeatedTest(REPETITIONS)
fun `write intent read action is inherited by structured concurrency`() {
checkInheritanceViaStructureConcurrency(::writeIntentReadAction, { ApplicationManager.getApplication().isWriteIntentLockAcquired })
}
@RepeatedTest(REPETITIONS)
fun `write action is inherited by structured concurrency`() {
checkInheritanceViaStructureConcurrency(::writeAction, { ApplicationManager.getApplication().isWriteAccessAllowed })
}
private fun checkInheritanceViaNewContext(wrapper: suspend (() -> Unit) -> Unit, checker: () -> Boolean): Unit = timeoutRunBlocking {
wrapper {
assertTrue(checker())
runBlockingCancellable {
assertTrue(checker())
launch(Dispatchers.Default) {
assertTrue(checker())
}
launch(Dispatchers.IO) {
assertTrue(checker())
}
assertTrue(checker())
}
assertTrue(checker())
}
}
@RepeatedTest(REPETITIONS)
fun `read action is inherited by new context`() {
checkInheritanceViaNewContext(::readAction, { ApplicationManager.getApplication().isReadAccessAllowed })
}
@RepeatedTest(REPETITIONS)
fun `write intent read action is inherited by new context`() {
checkInheritanceViaNewContext(::writeIntentReadAction, { ApplicationManager.getApplication().isWriteIntentLockAcquired })
}
private fun checkNoInheritanceViaNonStructuredConcurrency(wrapper: suspend (() -> Unit) -> Unit, checker: () -> Boolean): Unit = timeoutRunBlocking {
wrapper {
assertTrue(checker())
runBlockingCancellable {
assertTrue(checker())
// No scope at all, so no lock
ApplicationManager.getApplication().executeOnPooledThread {
assertFalse(checker())
}.await()
assertTrue(checker())
}
// Scope from timeoutRunBlocking (outside RA), so no RA even if syntactically in RA
launch {
assertFalse(checker())
}
assertTrue(checker())
}
}
@RepeatedTest(REPETITIONS)
fun `read action is not inherited by non-structured concurrency`() {
checkNoInheritanceViaNonStructuredConcurrency(::readAction, { ApplicationManager.getApplication().isReadAccessAllowed })
}
@RepeatedTest(REPETITIONS)
fun `write intent read action is not inherited by non-structured concurrency`() {
checkNoInheritanceViaNonStructuredConcurrency(::writeIntentReadAction, { ApplicationManager.getApplication().isWriteIntentLockAcquired })
}
@RepeatedTest(REPETITIONS)
fun `write action is not inherited by non-structured concurrency`() {
checkNoInheritanceViaNonStructuredConcurrency(::writeAction, { ApplicationManager.getApplication().isWriteAccessAllowed })
}
@RepeatedTest(REPETITIONS)
fun `nested read action can be run even if write is waiting`(): Unit = timeoutRunBlocking {
val sema = beforeWrite()
val ra = launch(Dispatchers.Default) {
ApplicationManager.getApplication().runReadAction {
runBlockingCancellable {
withContext(Dispatchers.Default) {
sema.waitFor()
ApplicationManager.getApplication().runReadAction {
assertTrue(ApplicationManager.getApplication().isReadAccessAllowed)
}
}
}
}
}
val wa = launch(Dispatchers.Default) {
assertFalse(ApplicationManager.getApplication().isReadAccessAllowed)
ApplicationManager.getApplication().invokeAndWait {
ApplicationManager.getApplication().runWriteAction {
assertTrue(ApplicationManager.getApplication().isWriteIntentLockAcquired)
}
}
}
joinAll(ra, wa)
}
}
private fun beforeWrite(): Semaphore {
val beforeWrite = Semaphore(1)
val listenerDisposable = Disposer.newDisposable()
ApplicationManager.getApplication().addApplicationListener(object : ApplicationListener {
override fun beforeWriteActionStart(action: Any) {
beforeWrite.up()
Disposer.dispose(listenerDisposable)
}
}, listenerDisposable)
return beforeWrite
}

View File

@@ -36,6 +36,12 @@ abstract class SuspendingReadActionTest : CancellableReadActionTests() {
application.assertReadAccessNotAllowed()
}
fun assertNestedContext(job: Job) {
assertEquals(job, Cancellation.currentJob())
assertNull(ProgressManager.getGlobalProgressIndicator())
application.assertReadAccessAllowed()
}
fun assertReadActionWithCurrentJob() {
assertNotNull(Cancellation.currentJob())
assertNull(ProgressManager.getGlobalProgressIndicator())
@@ -56,7 +62,12 @@ abstract class SuspendingReadActionTest : CancellableReadActionTests() {
val suspendingJob = Cancellation.currentJob()!!
assertReadActionWithoutCurrentJob(suspendingJob) // TODO consider explicitly turning off RA inside runBlockingCancellable
withContext(Dispatchers.Default) {
assertEmptyContext(coroutineContext.job)
if (isLockStoredInContext) {
assertNestedContext(coroutineContext.job)
}
else {
assertEmptyContext(coroutineContext.job)
}
}
assertReadActionWithoutCurrentJob(suspendingJob)
}

View File

@@ -69,6 +69,14 @@ class SuspendingReadAndWriteActionTest {
Assertions.assertFalse(application.isReadAccessAllowed)
}
fun assertNestedContext() {
Assertions.assertFalse(EDT.isCurrentThreadEdt())
Assertions.assertNotNull(Cancellation.currentJob())
Assertions.assertNull(ProgressManager.getGlobalProgressIndicator())
Assertions.assertFalse(application.isWriteAccessAllowed)
Assertions.assertTrue(application.isReadAccessAllowed)
}
fun assertReadButNoWriteActionWithCurrentJob() {
Assertions.assertFalse(EDT.isCurrentThreadEdt())
Assertions.assertNotNull(Cancellation.currentJob())
@@ -106,7 +114,7 @@ class SuspendingReadAndWriteActionTest {
runBlockingCancellable {
assertReadButNoWriteActionWithoutCurrentJob() // TODO consider explicitly turning off RA inside runBlockingCancellable
withContext(Dispatchers.Default) {
assertEmptyContext()
assertNestedContext()
}
assertReadButNoWriteActionWithoutCurrentJob()
}
@@ -116,7 +124,7 @@ class SuspendingReadAndWriteActionTest {
runBlockingCancellable {
assertWriteActionWithoutCurrentJob() // TODO consider explicitly turning off RA inside runBlockingCancellable
withContext(Dispatchers.Default) {
assertEmptyContext()
assertNestedContext()
}
assertWriteActionWithoutCurrentJob()
}