-
SpringBoot와 Kafka 연동 시 발생한 ErrorHandlingDeserializer 관련 오류Apache Kafka/오류 해결 2024. 1. 14. 23:27
문제
스프링부트에서 STOMP를 활용한 채팅방을 구현하기 위해
기존 In memory broker 방식에서 External broker 중 하나인 Kafka를 통해 아래 형태의 메시지를 전달하려는데,
@Getter @NoArgsConstructor @ToString public class Message { @NotNull private MessageType type; private String roomId; @NotNull private String sender; @NotNull private String message; @Builder private Message(MessageType type, String roomId, String sender, String message) { this.type = type; this.roomId = roomId; this.sender = sender; this.message = message; } }
아래와 같은 오류가 무한 반복하며 에러 메시지가 올라왔다.
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:151) ~[spring-kafka-2.8.11.jar:2.8.11] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1815) ~[spring-kafka-2.8.11.jar:2.8.11] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1303) ~[spring-kafka-2.8.11.jar:2.8.11] at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na] at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na] Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition chatting-0 at offset 4. If needed, please seek past the record to continue consumption. at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1448) ~[kafka-clients-3.1.2.jar:na] at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:135) ~[kafka-clients-3.1.2.jar:na] at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1671) ~[kafka-clients-3.1.2.jar:na] at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1900(Fetcher.java:1507) ~[kafka-clients-3.1.2.jar:na] at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:733) ~[kafka-clients-3.1.2.jar:na] at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:684) ~[kafka-clients-3.1.2.jar:na] at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1277) ~[kafka-clients-3.1.2.jar:na] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238) ~[kafka-clients-3.1.2.jar:na] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-3.1.2.jar:na] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1531) ~[spring-kafka-2.8.11.jar:2.8.11] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1521) ~[spring-kafka-2.8.11.jar:2.8.11] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1345) ~[spring-kafka-2.8.11.jar:2.8.11] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1257) ~[spring-kafka-2.8.11.jar:2.8.11] ... 3 common frames omitted
이유
역직렬화(나의 경우, 위 Message 객체)를 수행하는 과정에서
카프카 컨슈머는 역직렬화 안되면 될 때까지, 서버가 망가질 때까지 계속 동작한다고 한다.
무한 루프로 에러 메시지가 발생했던 이유...
(안되면 그냥 멈추지)👇 무한 루프가 발생하는 해당 코드 확인👇
https://serverwizard.tistory.com/285
StringDeserializer를 사용하면 역직렬화 실패 가능성은 매우 낮아지지만,
실제로 사용하려면 역직렬화한 문자열을 다시 업무에 사용하는 데이터 타입으로 변환해줘야 한다.
이 과정을 JSON으로 한 번에 해주는 게 JsonDeserializer이다.
그러나 JsonDeserializer를 사용했을 때, 역직렬화 오류가 발생할 수 있는 상황에서는반드시 ErrorHandlingDeserializer를 사용해야 Poison Pill로 인한 장애를 막을 수 있다고 한다.
해결
ErrorHandlingDeserializer 설정하기
1. bean을 통해 설정하는 법
@EnableKafka @Configuration @RequiredArgsConstructor @Slf4j public class KafkaConsumerConfig { ..... @Bean public ConsumerFactory<String, Message> consumerFactory() { JsonDeserializer<Message> deserializer = new JsonDeserializer<>(); // 패키지 신뢰 오류 때문에 모든 패키지를 신뢰하도록 설정했음 deserializer.addTrustedPackages("*"); Map<String, Object> consumerConfig = new HashMap<>(); .... consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class); consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class); consumerConfig.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class); consumerConfig.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, deserializer); return new DefaultKafkaConsumerFactory<>(consumerConfig, new StringDeserializer(), deserializer); } private DefaultErrorHandler customErrorHandler() { return new DefaultErrorHandler((consumerRecord, e) -> { log.error("[Error] topic = {}, key = {}, value = {}, error message = {}", consumerRecord.topic(), consumerRecord.key(), consumerRecord.value(), e.getMessage()); }, new FixedBackOff(1000L, 10)); } }
2. application.yml에서 설정하는 법
spring: kafka: consumer: key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer ....
참고
https://yeongchan1228.tistory.com/104
'Apache Kafka > 오류 해결' 카테고리의 다른 글
Kafka Jsonserializer/JsonDeserializer 한글 처리 오류 (2) 2024.01.15 SpringBoot와 Docker compose로 pull 받은 Kafka 연동 시 발생한 오류 (0) 2024.01.14