Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


How to shut down Spring Boot Kafka stream-processing app after current message

In a Spring Boot Kafka stream-processing app using Spring Cloud Function, how can I shut down the application (to stop it receiving any further messages) but only after responding to the current message?

The scenario is this: we have an app that processes a message which contains a reference to some file. We handle this by interrogating the given file's content using some third-party libraries and responding with some data we extract from the file. In some rare cases certain files can cause the external libraries to hang. So we call these libraries on a background thread and time out if they take too long. We need to respond to Kafka for this message (with some JSON detailing the error) so that Kafka doesn't send it to any other instance of our app (as it'll probably cause that instance to hang as well). But we then want this instance of our Spring Boot app to shut down, as we can't cleanly recover from the hang in the third-party library (we could get resource or memory leaks otherwise). Another instance will then be automatically brought up by Kubernetes or Docker Swarm or whatever.

I'm not sure if it's relevant, but we're using Spring Cloud Function, and we're joining two streams: the "file" stream, where each message contains a reference to the file to process, and a GlobalKTable with some config info. The code looks like this (in Kotlin):

    // The service we use to run the process on a background thread
    private val executorService = Executors.newFixedThreadPool(1)

    @Bean
    fun process() = BiFunction<KStream<String, FileInfo>,
        GlobalKTable<String, Config>,
        KStream<String, FileInterrogationResult>> { fileStream, configTable ->

        // Join each file message with its config entry from the GlobalKTable.
        fileStream.join(
            configTable,
            { _, file -> file.configId },
            { file, config ->
                try {
                    // Process the file using the 3rd party libraries.
                    val result = executorService.submit(theThirdPartyLibExtractionFunction)
                        .get(someTimeout, TimeUnit.MILLISECONDS)

                    // Success: return some FileInterrogationResult object wrapping the result from above.
                } catch (e: TimeoutException) {
                    // Return some FileInterrogationResult object with error details.
                    // TODO: Here we know that after this function completes the app should shut down. How do we do this?
                } catch (e: Throwable) {
                    // Return some FileInterrogationResult object with error details.
                }
            }
        )
    }
Question from: https://stackoverflow.com/questions/65897973/how-to-shut-down-spring-boot-kafka-stream-processing-app-after-current-message


1 Reply


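You can use the Spring Cloud Stream Actuator endpoints to stop, start, pause, and resume individual bindings, as well as to monitor them. Once the bindings endpoint is enabled (Spring Boot Actuator on the classpath and management.endpoints.web.exposure.include=bindings set), you can manage the bindings with curl. For example, to stop a binding: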

    curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName

The exact instructions on how to do this are available in the Spring Cloud Stream reference documentation.
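
For the timeout case in the question, one possible shape is sketched below in Kotlin, using only the JDK so that it runs on its own. This is a sketch under assumptions, not the answer author's code: Outcome, TimeoutGuard, and the shutdown callback are hypothetical names, and Outcome merely stands in for FileInterrogationResult. In a real application the callback might stop the binding or call SpringApplication.exit on the application context; that wiring is not shown here. The callback runs on a separate thread so the processing thread is free to return its error result first. Whether that result actually reaches Kafka before the instance stops depends on the stop being graceful.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical result type standing in for FileInterrogationResult.
sealed class Outcome {
    data class Success(val data: String) : Outcome()
    data class Failure(val reason: String) : Outcome()
}

// Runs work with a timeout and fires a one-time shutdown callback if the work hangs.
class TimeoutGuard(
    private val timeoutMillis: Long,
    // Assumption: in a Spring app this would stop the binding or exit the context.
    private val shutdown: () -> Unit
) {
    // Daemon thread, so a hung worker cannot keep the JVM alive on exit.
    private val worker = Executors.newSingleThreadExecutor { r ->
        Thread(r, "extractor").apply { isDaemon = true }
    }
    private val shutdownRequested = AtomicBoolean(false)

    fun run(task: Callable<String>): Outcome {
        val future = worker.submit(task)
        return try {
            Outcome.Success(future.get(timeoutMillis, TimeUnit.MILLISECONDS))
        } catch (e: TimeoutException) {
            future.cancel(true)   // best effort: interrupts the worker if it is interruptible
            requestShutdown()     // does not block; the error result is still returned below
            Outcome.Failure("timed out after $timeoutMillis ms")
        } catch (e: Exception) {
            Outcome.Failure(e.cause?.message ?: e.toString())
        }
    }

    // Runs the callback at most once, on its own thread, so the caller is never blocked by it.
    private fun requestShutdown() {
        if (shutdownRequested.compareAndSet(false, true)) {
            Thread(Runnable { shutdown() }, "deferred-shutdown").apply { isDaemon = true }.start()
        }
    }
}

fun main() {
    val stopped = CountDownLatch(1)
    val guard = TimeoutGuard(timeoutMillis = 200) { stopped.countDown() }

    println(guard.run(Callable { "extracted data" }))               // completes in time
    println(guard.run(Callable { Thread.sleep(60_000); "never" }))  // simulated hang
    println("shutdown requested: " + stopped.await(2, TimeUnit.SECONDS))
}
```

Note that after a timeout the single worker thread may still be stuck inside the library, so any later task would queue behind it and time out as well. That is acceptable only because the instance is expected to stop shortly afterwards.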

