[fleet] move http ktor engine related code to rpc.server.ktor

GitOrigin-RevId: 09aee497f8c5f532380918456c3a2a1a0f0e13cb
This commit is contained in:
Alexander Shparun
2024-09-23 04:00:12 +02:00
committed by intellij-monorepo-bot
parent 7968a25171
commit ca98770be4
3 changed files with 30 additions and 27 deletions

View File

@@ -8,24 +8,21 @@ import fleet.rpc.client.rpcClient
import fleet.rpc.core.Serialization import fleet.rpc.core.Serialization
import fleet.rpc.core.Transport import fleet.rpc.core.Transport
import fleet.rpc.core.TransportMessage import fleet.rpc.core.TransportMessage
import fleet.tracing.spannedScope
import fleet.util.UID import fleet.util.UID
import fleet.util.async.async import fleet.util.async.*
import fleet.util.async.resource
import fleet.util.async.use
import fleet.util.channels.channels import fleet.util.channels.channels
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
suspend fun RequestDispatcher.withDirectRpcClient(serialization: () -> Serialization, fun RequestDispatcher.directRpcClient(
serialization: () -> Serialization,
interceptor: RpcInterceptor, interceptor: RpcInterceptor,
body: suspend CoroutineScope.(IRpcClient) -> Unit) { ): Resource<IRpcClient> =
spannedScope("withDirectRpcClient") { resource { cc ->
val (dispatcherSend, clientReceive) = channels<TransportMessage>(Channel.BUFFERED) val (dispatcherSend, clientReceive) = channels<TransportMessage>(Channel.BUFFERED)
val (clientSend, dispatcherReceive) = channels<TransportMessage>(Channel.BUFFERED) val (clientSend, dispatcherReceive) = channels<TransportMessage>(Channel.BUFFERED)
val origin = UID.random() val origin = UID.random()
resource<IRpcClient> { continuation ->
launch { launch {
handleConnection(route = origin, handleConnection(route = origin,
endpoint = EndpointKind.Client, endpoint = EndpointKind.Client,
@@ -37,11 +34,19 @@ suspend fun RequestDispatcher.withDirectRpcClient(serialization: () -> Serializa
serialization = serialization, serialization = serialization,
origin = origin, origin = origin,
requestInterceptor = interceptor) { rpcClient -> requestInterceptor = interceptor) { rpcClient ->
continuation(rpcClient) cc(rpcClient)
} }
} }
}.async().use { rpcClientDeferred -> }.span("directRpcClient")
suspend fun RequestDispatcher.withDirectRpcClient(
serialization: () -> Serialization,
interceptor: RpcInterceptor,
body: suspend CoroutineScope.(IRpcClient) -> Unit,
) {
directRpcClient(serialization, interceptor)
.async()
.use { rpcClientDeferred ->
body(promisingRpcClient(rpcClientDeferred)) body(promisingRpcClient(rpcClientDeferred))
} }
} }
}

View File

@@ -16,7 +16,6 @@ class FleetService private constructor(val serviceId: UID,
job.join() job.join()
} }
//@fleet.kernel.plugins.InternalInPluginModules(where = ["fleet.app.fleet.tests"])
suspend fun terminate(cause: String) { suspend fun terminate(cause: String) {
terminate(CancellationException(cause)) terminate(CancellationException(cause))
} }
@@ -50,7 +49,7 @@ class FleetService private constructor(val serviceId: UID,
} }
} }
}.use { serviceJob -> }.use { serviceJob ->
body(FleetService(providerId, serviceJob)) body(FleetService(serviceId = providerId, job = serviceJob))
} }
} }
} }

View File

@@ -28,7 +28,6 @@ import java.util.concurrent.ConcurrentHashMap
import kotlin.coroutines.EmptyCoroutineContext import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.coroutineContext import kotlin.coroutines.coroutineContext
//@fleet.kernel.plugins.InternalInPluginModules(where = ["fleet.app.fleet.tests"])
class RpcExecutor private constructor(private val serialization: () -> Serialization, class RpcExecutor private constructor(private val serialization: () -> Serialization,
private val services: RpcServiceLocator, private val services: RpcServiceLocator,
private val route: UID, private val route: UID,