Class DefaultKafkaListenerExceptionHandler
java.lang.Object
io.micronaut.configuration.kafka.exceptions.DefaultKafkaListenerExceptionHandler
- All Implemented Interfaces:
KafkaListenerExceptionHandler
,io.micronaut.core.exceptions.ExceptionHandler<KafkaListenerException>
@Singleton
@Primary
public class DefaultKafkaListenerExceptionHandler
extends Object
implements KafkaListenerExceptionHandler
The default ExceptionHandler used when a
KafkaConsumer
fails to process a ConsumerRecord
. By default just logs the error.- Since:
- 1.0
- Author:
- graemerocher
-
Constructor Summary
ConstructorDescriptionDeprecated, for removal: This API element is subject to removal in a future version.Creates a new instance. -
Method Summary
Modifier and TypeMethodDescriptionvoid
handle
(KafkaListenerException exception) protected void
seekPastDeserializationError
(@NonNull SerializationException cause, @NonNull Object consumerBean, @NonNull Consumer<?, ?> kafkaConsumer) Seeks past a serialization exception if an error occurs.void
setCommitRecordOnDeserializationFailure
(boolean commitRecordOnDeserializationFailure) Sets whether to commit the offset of past records that are not deserializable and are skipped.void
setSkipRecordOnDeserializationFailure
(boolean skipRecordOnDeserializationFailure) Sets whether the seek past records that are not deserializable.
-
Constructor Details
-
DefaultKafkaListenerExceptionHandler
@Inject public DefaultKafkaListenerExceptionHandler(DefaultKafkaListenerExceptionHandlerConfiguration config) Creates a new instance.- Parameters:
config
- The default Kafka listener exception handler configuration
-
DefaultKafkaListenerExceptionHandler
Deprecated, for removal: This API element is subject to removal in a future version.
-
-
Method Details
-
handle
- Specified by:
handle
in interfaceio.micronaut.core.exceptions.ExceptionHandler<KafkaListenerException>
-
setSkipRecordOnDeserializationFailure
public void setSkipRecordOnDeserializationFailure(boolean skipRecordOnDeserializationFailure) Sets whether the seek past records that are not deserializable.- Parameters:
skipRecordOnDeserializationFailure
- True if records that are not deserializable should be skipped.
-
setCommitRecordOnDeserializationFailure
public void setCommitRecordOnDeserializationFailure(boolean commitRecordOnDeserializationFailure) Sets whether to commit the offset of past records that are not deserializable and are skipped.- Parameters:
commitRecordOnDeserializationFailure
- True if the offset for records that are not deserializable should be committed after being skipped.
-
seekPastDeserializationError
protected void seekPastDeserializationError(@NonNull @NonNull SerializationException cause, @NonNull @NonNull Object consumerBean, @NonNull @NonNull Consumer<?, ?> kafkaConsumer) Seeks past a serialization exception if an error occurs. Additionally commits the offset if commitRecordOnDeserializationFailure is set- Parameters:
cause
- The causeconsumerBean
- The consumer beankafkaConsumer
- The kafka consumer
-
DefaultKafkaListenerExceptionHandler(DefaultKafkaListenerExceptionHandlerConfiguration)