Class DefaultKafkaListenerExceptionHandler
java.lang.Object
io.micronaut.configuration.kafka.exceptions.DefaultKafkaListenerExceptionHandler
- All Implemented Interfaces:
KafkaListenerExceptionHandler,io.micronaut.core.exceptions.ExceptionHandler<KafkaListenerException>
@Singleton
@Primary
public class DefaultKafkaListenerExceptionHandler
extends Object
implements KafkaListenerExceptionHandler
The default ExceptionHandler used when a
KafkaConsumer
fails to process a ConsumerRecord. By default just logs the error.- Since:
- 1.0
- Author:
- graemerocher
-
Constructor Summary
ConstructorsConstructorDescriptionDeprecated, for removal: This API element is subject to removal in a future version.Creates a new instance. -
Method Summary
Modifier and TypeMethodDescriptionvoidhandle(KafkaListenerException exception) protected voidseekPastDeserializationError(@NonNull SerializationException cause, @NonNull Object consumerBean, @NonNull Consumer<?, ?> kafkaConsumer) Seeks past a serialization exception if an error occurs.voidsetCommitRecordOnDeserializationFailure(boolean commitRecordOnDeserializationFailure) Sets whether to commit the offset of past records that are not deserializable and are skipped.voidsetSkipRecordOnDeserializationFailure(boolean skipRecordOnDeserializationFailure) Sets whether the seek past records that are not deserializable.
-
Constructor Details
-
DefaultKafkaListenerExceptionHandler
@Inject public DefaultKafkaListenerExceptionHandler(DefaultKafkaListenerExceptionHandlerConfiguration config) Creates a new instance.- Parameters:
config- The default Kafka listener exception handler configuration
-
DefaultKafkaListenerExceptionHandler
Deprecated, for removal: This API element is subject to removal in a future version.
-
-
Method Details
-
handle
- Specified by:
handlein interfaceio.micronaut.core.exceptions.ExceptionHandler<KafkaListenerException>
-
setSkipRecordOnDeserializationFailure
public void setSkipRecordOnDeserializationFailure(boolean skipRecordOnDeserializationFailure) Sets whether the seek past records that are not deserializable.- Parameters:
skipRecordOnDeserializationFailure- True if records that are not deserializable should be skipped.
-
setCommitRecordOnDeserializationFailure
public void setCommitRecordOnDeserializationFailure(boolean commitRecordOnDeserializationFailure) Sets whether to commit the offset of past records that are not deserializable and are skipped.- Parameters:
commitRecordOnDeserializationFailure- True if the offset for records that are not deserializable should be committed after being skipped.
-
seekPastDeserializationError
protected void seekPastDeserializationError(@NonNull @NonNull SerializationException cause, @NonNull @NonNull Object consumerBean, @NonNull @NonNull Consumer<?, ?> kafkaConsumer) Seeks past a serialization exception if an error occurs. Additionally commits the offset if commitRecordOnDeserializationFailure is set- Parameters:
cause- The causeconsumerBean- The consumer beankafkaConsumer- The kafka consumer
-
DefaultKafkaListenerExceptionHandler(DefaultKafkaListenerExceptionHandlerConfiguration)