In a Spring Boot Kafka stream-processing app using Spring Cloud Function, how can I shut down the application (to stop it receiving any further messages) but only after responding to the current message?
The scenario is this: we have an app that processes a message which contains a reference to some file. We handle this by interrogating the given file's content using some third party libraries and responding with some data we extract from the file. In some rare cases certain files can cause the external libraries to hang. So we call these libraries on a background thread and time out if it takes too long. We need to respond to Kafka for this message (with some JSON detailing the error) so that Kafka doesn't send it to any other instance of our app (as it'll probably cause that instance to hang as well). But we then want this instance of our Spring Boot app to shut down, as we can't cleanly recover from the hang in the 3rd party library (we could get resource or memory leaks otherwise). Another instance will then be automatically brought up by Kubernetes or Docker Swarm or whatever.
I'm not sure if it's relevant, but we're using Spring Cloud Function, and we're joining two streams: the "file" stream, where each message contains a reference to the file to process, and a GlobalKTable with some config info. The code looks like this (in Kotlin):
// The service we use to run the process on a background thread
private val executorService = Executors.newFixedThreadPool(1)
@Bean
fun process() = BiFunction<KStream<String, FileInfo>,
GlobalKTable<String, Config>,
KStream<String, FileInterrogationResult>> { fileStream, configTable ->
requestStream.join(
configTable,
{ _, file -> file.configId },
{ file, config ->
try {
// Process the file using the 3rd party libraries.
val result = executorService.submit(theThirdPartyLibExtractionFunction)
.get(someTimeout, TimeUnit.MILLISECONDS)
// Success: return some FileInterrogationResult object wrapping the result from above.
} catch (e: TimeoutException) {
// Return some FileInterrogationResult object with error details.
// TODO: Here we know that after this function completes the app should shut down. How do we do this?
} catch (e: Throwable) {
// Return some FileInterrogationResult object with error details.
}
}
)
question from:
https://stackoverflow.com/questions/65897973/how-to-shut-down-spring-boot-kafka-stream-processing-app-after-current-message 与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…