For the latest stable version, please use Spring for Apache Kafka 3.2.4!
When using a concurrent message listener container, a single listener instance is invoked on all consumer threads. Listeners, therefore, need to be thread-safe, and it is preferable to use stateless listeners. If it is not possible to make your listener thread-safe or adding synchronization would significantly reduce the benefit of adding concurrency, you can use one of a few techniques:
- Use n containers with concurrency=1 and a prototype-scoped MessageListener bean so that each container gets its own instance (this is not possible when using @KafkaListener).
- Keep the state in ThreadLocal<?> instances.
- Have the singleton listener delegate to a bean that is declared in SimpleThreadScope (or a similar scope).
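The second technique can be illustrated with a minimal plain-Java sketch. It deliberately avoids the Spring types so that it runs on its own: ThreadLocalListenerSketch, onMessage, clearThreadState, and deliverOnNewThread are hypothetical names, and in a real application the callback would be a MessageListener implementation or a @KafkaListener method.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a singleton listener shared by all consumer threads.
class ThreadLocalListenerSketch {

    // Each thread that calls onMessage() gets its own counter, starting at 0.
    private final ThreadLocal<Integer> processed = ThreadLocal.withInitial(() -> 0);

    // Stand-in for the listener callback; returns the calling thread's running count.
    int onMessage(String value) {
        int count = processed.get() + 1;
        processed.set(count);
        return count;
    }

    // Drops the calling thread's state; meant to run on the thread that owns it.
    void clearThreadState() {
        processed.remove();
    }

    // Simulates one consumer thread delivering `messages` records
    // and returns that thread's final count.
    static int deliverOnNewThread(ThreadLocalListenerSketch listener, int messages) {
        ExecutorService consumerThread = Executors.newSingleThreadExecutor();
        try {
            return consumerThread.submit(() -> {
                int last = 0;
                for (int i = 0; i < messages; i++) {
                    last = listener.onMessage("record-" + i);
                }
                listener.clearThreadState();
                return last;
            }).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            consumerThread.shutdown();
        }
    }
}
```

Because the counter lives in a ThreadLocal, the shared listener instance needs no synchronization: two simulated consumer threads that deliver three and two records see counts of 3 and 2, not a combined 5.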
To facilitate cleaning up thread state (for the second and third items in the preceding list), starting with version 2.2, the listener container publishes a ConsumerStoppedEvent when each thread exits.
You can consume these events with an ApplicationListener or @EventListener method to remove ThreadLocal<?> instances or remove() thread-scoped beans from the scope.
Note that SimpleThreadScope does not destroy beans that have a destruction interface (such as DisposableBean), so you should destroy() the instance yourself.
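The cleanup described above can be modeled in plain Java as follows. This is only a sketch of the pattern, not Spring code: the per-thread map stands in for a thread scope, Delegate stands in for a thread-scoped bean with a destruction callback, and every name here is hypothetical. In a Spring application, cleanUpCurrentThread() would correspond to the body of an event listener for ConsumerStoppedEvent that removes the bean from the scope and then destroys it.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of thread-scoped state and its manual cleanup.
class ThreadScopeCleanupSketch {

    // Hypothetical thread-scoped delegate that holds something needing release.
    static class Delegate implements AutoCloseable {
        boolean closed;

        @Override
        public void close() {
            closed = true;
        }
    }

    // One map of scoped objects per thread, standing in for a thread scope.
    private static final ThreadLocal<Map<String, Object>> SCOPE =
            ThreadLocal.withInitial(HashMap::new);

    // Returns the calling thread's delegate, creating it on first use.
    static Delegate delegate() {
        return (Delegate) SCOPE.get().computeIfAbsent("delegate", key -> new Delegate());
    }

    // Must run on the thread that owns the state, for example from the stop-event handler.
    // Returns the delegate that was removed, or null if the thread had none.
    static Delegate cleanUpCurrentThread() {
        Object removed = SCOPE.get().remove("delegate");
        SCOPE.remove();
        if (removed instanceof Delegate) {
            Delegate delegate = (Delegate) removed;
            // Removing the object from the scope does not destroy it; do that explicitly.
            delegate.close();
            return delegate;
        }
        return null;
    }
}
```

Removal and destruction are kept as two separate steps here to mirror the point made above: taking the object out of the scope does not by itself run its destruction callback.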
By default, the application context’s event multicaster invokes event listeners on the calling thread. If you change the multicaster to use an async executor, thread cleanup is not effective.
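The reason an async executor defeats the cleanup is that ThreadLocal.remove() only affects the thread that calls it. The following plain-Java sketch shows this under simplified assumptions: AsyncCleanupSketch and its methods are hypothetical, and CompletableFuture.runAsync merely stands in for an event listener that has been dispatched to another thread.

```java
import java.util.concurrent.CompletableFuture;

// Shows that clearing a ThreadLocal from another thread leaves the owner's value in place.
class AsyncCleanupSketch {

    static final ThreadLocal<String> STATE = new ThreadLocal<>();

    // Runs the cleanup on a different thread and returns what the
    // owning thread still sees afterwards.
    static String cleanUpOnOtherThread() {
        STATE.set("consumer-state");
        // remove() executes on an executor thread, so it clears that thread's (empty) slot.
        CompletableFuture.runAsync(STATE::remove).join();
        String remaining = STATE.get();
        STATE.remove(); // tidy up the calling thread for real
        return remaining;
    }

    // Runs the cleanup on the owning thread and returns what it sees afterwards.
    static String cleanUpOnSameThread() {
        STATE.set("consumer-state");
        STATE.remove();
        return STATE.get();
    }
}
```

Here cleanUpOnOtherThread() still returns "consumer-state", while cleanUpOnSameThread() returns null, which is why the cleanup has to run on the thread that owns the state.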