diff --git a/fleet/rpc.server/src/fleet/rpc/server/DirectRpcClient.kt b/fleet/rpc.server/src/fleet/rpc/server/DirectRpcClient.kt index d51a9ce23425..839a07965965 100644 --- a/fleet/rpc.server/src/fleet/rpc/server/DirectRpcClient.kt +++ b/fleet/rpc.server/src/fleet/rpc/server/DirectRpcClient.kt @@ -8,40 +8,45 @@ import fleet.rpc.client.rpcClient import fleet.rpc.core.Serialization import fleet.rpc.core.Transport import fleet.rpc.core.TransportMessage -import fleet.tracing.spannedScope import fleet.util.UID -import fleet.util.async.async -import fleet.util.async.resource -import fleet.util.async.use +import fleet.util.async.* import fleet.util.channels.channels import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch -suspend fun RequestDispatcher.withDirectRpcClient(serialization: () -> Serialization, - interceptor: RpcInterceptor, - body: suspend CoroutineScope.(IRpcClient) -> Unit) { - spannedScope("withDirectRpcClient") { +fun RequestDispatcher.directRpcClient( + serialization: () -> Serialization, + interceptor: RpcInterceptor, +): Resource = + resource { cc -> val (dispatcherSend, clientReceive) = channels(Channel.BUFFERED) val (clientSend, dispatcherReceive) = channels(Channel.BUFFERED) val origin = UID.random() - resource { continuation -> - launch { - handleConnection(route = origin, - endpoint = EndpointKind.Client, - send = dispatcherSend, - receive = dispatcherReceive, - presentableName = "directRpcClient") - }.use { - rpcClient(transport = Transport(outgoing = clientSend, incoming = clientReceive), - serialization = serialization, - origin = origin, - requestInterceptor = interceptor) { rpcClient -> - continuation(rpcClient) - } + launch { + handleConnection(route = origin, + endpoint = EndpointKind.Client, + send = dispatcherSend, + receive = dispatcherReceive, + presentableName = "directRpcClient") + }.use { + rpcClient(transport = Transport(outgoing = clientSend, incoming = clientReceive), + serialization = serialization, + origin = origin, + requestInterceptor = interceptor) { rpcClient -> + cc(rpcClient) } - }.async().use { rpcClientDeferred -> + } + }.span("directRpcClient") + +suspend fun RequestDispatcher.withDirectRpcClient( + serialization: () -> Serialization, + interceptor: RpcInterceptor, + body: suspend CoroutineScope.(IRpcClient) -> Unit, +) { + directRpcClient(serialization, interceptor) + .async() + .use { rpcClientDeferred -> body(promisingRpcClient(rpcClientDeferred)) } - } } diff --git a/fleet/rpc.server/src/fleet/rpc/server/FleetService.kt b/fleet/rpc.server/src/fleet/rpc/server/FleetService.kt index 1a2a2b98ca59..58c971a30b53 100644 --- a/fleet/rpc.server/src/fleet/rpc/server/FleetService.kt +++ b/fleet/rpc.server/src/fleet/rpc/server/FleetService.kt @@ -16,7 +16,6 @@ class FleetService private constructor(val serviceId: UID, job.join() } - //@fleet.kernel.plugins.InternalInPluginModules(where = ["fleet.app.fleet.tests"]) suspend fun terminate(cause: String) { terminate(CancellationException(cause)) } @@ -50,7 +49,7 @@ class FleetService private constructor(val serviceId: UID, } } }.use { serviceJob -> - body(FleetService(providerId, serviceJob)) + body(FleetService(serviceId = providerId, job = serviceJob)) } } } diff --git a/fleet/rpc.server/src/fleet/rpc/server/RpcExecutor.kt b/fleet/rpc.server/src/fleet/rpc/server/RpcExecutor.kt index e819dc5aaeae..39202afd886b 100644 --- a/fleet/rpc.server/src/fleet/rpc/server/RpcExecutor.kt +++ b/fleet/rpc.server/src/fleet/rpc/server/RpcExecutor.kt @@ -28,7 +28,6 @@ import java.util.concurrent.ConcurrentHashMap import kotlin.coroutines.EmptyCoroutineContext import kotlin.coroutines.coroutineContext -//@fleet.kernel.plugins.InternalInPluginModules(where = ["fleet.app.fleet.tests"]) class RpcExecutor private constructor(private val serialization: () -> Serialization, private val services: RpcServiceLocator, private val route: UID,