• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

marcoferrer/kroto-plus: gRPC Kotlin Coroutines, Protobuf DSL, Scripting for Prot ...

原作者: [db:作者] 来自: 网络 收藏 邀请

开源软件名称(OpenSource Name):

marcoferrer/kroto-plus

开源软件地址(OpenSource Url):

https://github.com/marcoferrer/kroto-plus

开源编程语言(OpenSource Language):

Kotlin 100.0%

开源软件介绍(OpenSource Introduction):

Kroto+

gRPC Kotlin Coroutines, Protobuf DSL, Scripting for Protoc

Build Status GitHub license JCenter Maven Central Awesome Kotlin Badge Awesome gRPC Slack

Community Contributions are Welcomed

ℹ️ | Docs are being expanded and moved to Readme.io

Quick Start: gRPC Coroutines

Run the following command to get started with a preconfigured template project. (kotlin-coroutines-gRPC-template)

git clone https://github.com/marcoferrer/kotlin-coroutines-gRPC-template && \
cd kotlin-coroutines-gRPC-template && \
./gradlew run 

Getting Started

Code Generators


Proto Builder Generator (Message DSL)

Setup & Documentation

This generator creates lambda based builders for message types

     val starPlatinum = Stand {
        name = "Star Platinum"
        powerLevel = 500
        speed = 550
        attack {
            name = "ORA ORA ORA"
            damage = 100
            range = Attack.Range.CLOSE
        }
    }
    
    val attack = Attack {
        name = "ORA ORA ORA"
        damage = 100
        range = Attack.Range.CLOSE
    }
    
    // Copy extensions
    val newAttack = attack.copy { damage = 200 }            
    
    // orDefault() will return the messages default instance when null
    val nullableAttack: Attack? = null
    nullableAttack.orDefault()
    
    // Plus operator extensions 
    val mergedAttack = attack + Attack { name = "Sunlight Yellow Overdrive" }            

gRPC Coroutines Client & Server    codecov

This option requires the artifact kroto-plus-coroutines as a dependency.

Configuration Options

Client / Server Examples
Method Signature Option Support

  • Design
  • Client Stubs
    • Designed to work well with Structured Concurrency
    • Cancellation of the client CoroutineScope will propagate to the server.
    • Cancellations can now be propagated across usages of a specific stub instance.
    • Rpc methods are overloaded with inline builders for request types
    • The request parameter for rpc methods defaults to RequestType.defaultsInstance
// Creates new stub with a default coroutine context of `EmptyCoroutineContext`
val stub = GreeterCoroutineGrpc.newStub(channel)

// Suspends and creates new stub using the current coroutine context as the default. 
val stub = GreeterCoroutineGrpc.newStubWithContext(channel)

// An existing stub can replace its current coroutine context using either 
stub.withCoroutineContext()
stub.withCoroutineContext(coroutineContext)

// Stubs can accept message builder lambdas as an argument  
stub.sayHello { name = "John" }

// For more idiomatic usage in coroutines, stubs can be created
// with an explicit coroutine scope using the `newGrpcStub` scope extension function.
launch {
    // Using `newGrpcStub` makes it clear that the resulting stub will use the receiving 
    // coroutine scope to launch any concurrent work. (usually for manual flow control in streaming apis) 
    val stub = newGrpcStub(GreeterCoroutineGrpc.GreeterCoroutineStub, channel)
    
    val (requestChannel, responseChannel) = stub.sayHelloStreaming()
    ...
}

  • Service Base Impl
    • Rpc calls are wrapped within a scope initialized with the following context elements.
      • CoroutineName set to MethodDescriptor.fullMethodName
      • GrpcContextElement set to io.grpc.Context.current()
    • Base services implement ServiceScope and allow overriding the initial coroutineContext used for each rpc method invocation.
    • Each services initialContext defaults to EmptyCoroutineContext
    • A common case for overriding the initialContext is for setting up application specific ThreadContextElement or CoroutineDispatcher, such as MDCContext() or newFixedThreadPoolContext(...)

Cancellation Propagation

  • Client
    • Both normal and exceptional coroutine scope cancellation will cancel the underlying call stream. See ClientCall.cancel() in io.grpc.ClientCall.java for more details.
    • In the case of service implementations using coroutines, this client call stream cancellation will cancel the coroutine scope of the rpc method being invoked on the server.
  • Server
    • Exceptional cancellation of the coroutine scope for the rpc method will be mapped to an instance of StatusRuntimeException and returned to the client.
    • Normal cancellation of the coroutine scope for the rpc method will be mapped to an instance of StatusRuntimeException with a status of Status.CANCELLED, and returned to the client.
    • Cancellation signals from the corresponding client will cancel the coroutine scope of the rpc method being invoked.

Examples

Unary

Client: Unary calls will suspend until a response is received from the corresponding server. In the event of a cancellation or the server responds with an error the call will throw the appropriate StatusRuntimeException

val response = stub.sayHello { name = "John" }

Server: Unary rpc methods can respond to client requests by either returning the expected response type, or throwing an exception.

override suspend fun sayHello(request: HelloRequest): HelloReply {

    if (isValid(request.name))
        return HelloReply { message = "Hello there, ${request.name}!" } else
        throw Status.INVALID_ARGUMENT.asRuntimeException()
}

Client Streaming

Client: requestChannel.send() will suspend until the corresponding server signals it is ready by requesting a message. In the event of a cancellation or the server responds with an error, both requestChannel.send() and response.await(), will throw the appropriate StatusRuntimeException.

val (requestChannel, response) = stub.sayHelloClientStreaming()

launchProducerJob(requestChannel){
    repeat(5){
        send { name = "name #$it" }
    }
}

println("Client Streaming Response: ${response.await()}")

Server: Client streaming rpc methods can respond to client requests by either returning the expected response type, or throwing an exception. Calls to requestChannel.receive() will suspend and notify the corresponding client that the server is ready to accept a message.

override suspend fun sayHelloClientStreaming(
    requestChannel: ReceiveChannel<HelloRequest>
): HelloReply =  HelloReply {
    message = requestChannel.toList().joinToString()
}

Server Streaming

Client: responseChannel.receive() will suspend and notify the corresponding server that the client is ready to accept a message.

val responseChannel = stub.sayHelloServerStreaming { name = "John" }

responseChannel.consumeEach {
    println("Server Streaming Response: $it")
}

Server: Server streaming rpc methods can respond to client requests by submitting messages of the expected response type to the response channel. Completion of service method implementations will automatically close response channels in order to prevent abandoned rpcs.

Calls to responseChannel.send() will suspend until the corresponding client signals it is ready by requesting a message. Error responses can be returned to clients by either throwing an exception or invoking close on responseChannel with the desired exception.

For an example of how to implement long lived response streams please reference MultipleClientSubscriptionsExample.kt.

override suspend fun sayHelloServerStreaming(
    request: HelloRequest,
    responseChannel: SendChannel<HelloReply>
) {        
    for(char in request.name) {
        responseChannel.send {
            message = "Hello $char!"
        }
    }
}

Bi-Directional Streaming

Client: requestChannel.send() will suspend until the corresponding server signals it is ready by requesting a message. In the event of a cancellation or the server responds with an error, both requestChannel.send() and response.await(), will throw the appropriate StatusRuntimeException.

val (requestChannel, responseChannel) = stub.sayHelloStreaming()

launchProducerJob(requestChannel){
    repeat(5){
        send { name = "person #$it" }
    }
}

responseChannel.consumeEach {
    println("Bidi Response: $it")
}

Server: Bidi streaming rpc methods can respond to client requests by submitting messages of the expected response type to the response channel. Completion of service method implementations will automatically close response channels in order to prevent abandoned rpcs.

Calls to responseChannel.send() will suspend until the corresponding client signals it is ready by requesting a message. Error responses can be returned to clients by either throwing an exception or invoking close on responseChannel with the desired exception.

For an example of how to implement long lived response streams please reference MultipleClientSubscriptionsExample.kt.

override suspend fun sayHelloStreaming(
    requestChannel: ReceiveChannel<HelloRequest>,
    responseChannel: SendChannel<HelloReply>
) {
    requestChannel.mapTo(responseChannel){
    
        HelloReply {
            message = "Hello there, ${it.name}!"
        }
    }
}

gRPC Stub Extensions

Configuration Options

This modules generates convenience extensions that overload the request message argument for rpc methods with a builder lambda block and a default value. It also supports generating overloads based off (google.api.method_signature) method options. More info available here

               
    //Kroto+ Generated Extension
    val response = serviceStub.myRpcMethod {
         id = 100
         name = "some name"
    }

    //Original Java Fluent builders
    val response = serviceStub.myRpcMethod(ExampleServiceGrpc.MyRpcMethodRequest
        .newBuilder()
        .setId(100)
        .setName("some name")
        .build())                  

For unary rpc methods, the generator will create the following extensions

    //Future Stub with default argument
    fun ServiceBlockingStub.myRpcMethod(request: Request = Request.defaultInstance): ListenableFuture<Response>
    
    //Future Stub with builder lambda 
    inline fun ServiceFutureStub.myRpcMethod(block: Request.Builder.() -> Unit): ListenableFuture<Response>
        
    //Blocking Stub with default argument
    fun ServiceBlockingStub.myRpcMethod(request: Request = Request.defaultInstance): Response
    
    //Blocking Stub with builder lambda
    inline fun ServiceBlockingStub.myRpcMethod(block: Request.Builder.() -> Unit): Response 

Coroutine Support

In addition to request message arguments as builder lambda rpc overloads, coroutine overloads for rpc calls can also be generated. This provides the same functionality as the generated coroutine stubs. Usage is identical to the client examples outlined in Coroutine Client Examples.

  • This is accomplished by defining extension functions for async service stubs.
  • This option requires the artifact kroto-plus-coroutines as a dependency.
  • If using rpc interceptors or other code that relies on io.grpc.Context then you need to be sure to add a GrpcContextElement to your CoroutineContext when launching a coroutine. Child coroutines will inherit this ThreadContextElement and the dispatcher will ensure that your grpc context is present on the executing thread.
    Context.current().withValue(MY_KEY, myValue).attach()
    
    val myGrpcContext = Context.current()
    
    val job = launch( GrpcContextElement() ) { //Alternate usage:  myGrpcContext.asContextElement() 
       
        launch {
            assertEquals(myGrpcContext, Context.current())
        }
       
        GlobalScope.launch{
            assertNotEquals(myGrpcContext, Context.current())
        } 
    }

Mock Service Generator

Configuration Options

This generator creates mock implementations of proto service definitions. This is useful for orchestrating a set of expected responses, aiding in unit testing methods that rely on rpc calls. Full example for mocking services in unit tests. The code generated relies on the kroto-plus-test artifact as a dependency. It is a small library that provides utility methods used by the mock services.

  • If no responses are added to the response queue then the mock service will return the default instance of the response type.
  • Currently only unary methods are being mocked, with support for other method types on the way
@Test fun `Test Unary Response Queue`(){
    
    MockStandService.getStandByNameResponseQueue.apply {
       //Queue up a valid response message
       addMessage {
           name = "Star Platinum"
           powerLevel = 500
           speed = 550
           addAttacks {
               name = "ORA ORA ORA"
               damage = 100
               range = StandProto.Attack.Range.CLOSE
           }
       }   
          
       //Queue up an error
       addError(Status.INVALID_ARGUMENT)
   }
   
   val standStub = StandServiceGrpc.newBlockingStub(grpcServerRule.channel)
   
   standStub.getStandByName { name = "Star Platinum" }.let{ response ->
       assertEquals("Star Platinum",response.name)
       assertEquals(500,response.powerLevel)
       assertEquals(550,response.speed)
       response.attacksList.first().let{ attack ->
           assertEquals("ORA ORA ORA",attack.name)
           assertEquals(100,attack.damage)
           assertEquals(StandProto.Attack.Range.CLOSE,attack.range)
       }
   }
   
   try{
       standStub.getStandByName { name = "The World" }
       fail("Exception was expected with status code: ${Status.INVALID_ARGUMENT.code}")
   }catch (e: StatusRuntimeException){
       assertEquals(Status.INVALID_ARGUMENT.code, e.status.code)
   }
}

Extendable Messages Generator (Experimental)

Configuration Options

Generated code relies on the kroto-plus-message artifact. This generator adds tagging interfaces to the java classes produce by protoc. It also adds pseudo companion objects to provide a way to access proto message APIs in a non static manner. The following is a small example of how to write generic methods and extensions that resolve both message and builders type.

inline fun <reified M, B> M.copy( block: B.() -> Unit ): M
        where M : KpMessage<M, B>, B : KpBuilder<M> {
    return this.toBuilder.apply(block).build()
}

// Usage
myMessage.copy { ... }

inline fun <reified M, B> build( block: B.() -> Unit ): M
        where M : KpMessage<M, B>, B : KpBuilder<M> {

    return KpCompanion.Registry[M::class.java].build(block)
}

// Usage
build<MyMessage> { ... }

inline fun <M, B> KpCompanion<M, B>.build( block: B.() -> Unit ): M
        where B : KpBuilder<M>,M : KpMessage< 

鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap