ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • SpringBoot와 Kafka 연동 시 발생한 ErrorHandlingDeserializer 관련 오류
    Apache Kafka/오류 해결 2024. 1. 14. 23:27

    문제

    스프링부트에서 STOMP를 활용한 채팅방을 구현하기 위해

    기존 In memory broker 방식에서 External broker 중 하나인 Kafka를 통해 아래 형태의 메시지를 전달하려는데,

     

    @Getter
    @NoArgsConstructor
    @ToString
    public class Message {
    
        @NotNull
        private MessageType type;
    
        private String roomId;
    
        @NotNull
        private String sender;
    
        @NotNull
        private String message;
    
        @Builder
        private Message(MessageType type,
                        String roomId,
                        String sender,
                        String message) {
            this.type = type;
            this.roomId = roomId;
            this.sender = sender;
            this.message = message;
        }
    }

     

    아래와 같은 오류가 무한 반복하며 에러 메시지가 올라왔다.

     

    java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
       at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:151) ~[spring-kafka-2.8.11.jar:2.8.11]
       at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1815) ~[spring-kafka-2.8.11.jar:2.8.11]
       at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1303) ~[spring-kafka-2.8.11.jar:2.8.11]
       at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
       at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
       at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
    Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition chatting-0 at offset 4. If needed, please seek past the record to continue consumption.
       at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1448) ~[kafka-clients-3.1.2.jar:na]
       at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:135) ~[kafka-clients-3.1.2.jar:na]
       at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1671) ~[kafka-clients-3.1.2.jar:na]
       at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1900(Fetcher.java:1507) ~[kafka-clients-3.1.2.jar:na]
       at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:733) ~[kafka-clients-3.1.2.jar:na]
       at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:684) ~[kafka-clients-3.1.2.jar:na]
       at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1277) ~[kafka-clients-3.1.2.jar:na]
       at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238) ~[kafka-clients-3.1.2.jar:na]
       at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-3.1.2.jar:na]
       at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1531) ~[spring-kafka-2.8.11.jar:2.8.11]
       at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1521) ~[spring-kafka-2.8.11.jar:2.8.11]
       at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1345) ~[spring-kafka-2.8.11.jar:2.8.11]
       at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1257) ~[spring-kafka-2.8.11.jar:2.8.11]
       ... 3 common frames omitted

     

    이유

    역직렬화(나의 경우, 위 Message 객체)를 수행하는 과정에서 

    카프카 컨슈머는 역직렬화 안되면 될 때까지, 서버가 망가질 때까지 계속 동작한다고 한다.

    무한 루프로 에러 메시지가 발생했던 이유... 

    (안되면 그냥 멈추지)

    👇 무한 루프가 발생하는 해당 코드 확인👇

    https://serverwizard.tistory.com/285

     

    Spring Kafka 적용하며 발생했었던 이슈 정리

    1. Consumer exception 문제 상황 : Consumer 동작을 테스트하기 위해 Kafka Cluster로 메시지를 수동으로 발행하고 있었다. 이때, 오타가 발생했고 이로 인해 사전에 약속된 형태(포맷)의 메시지가 아닌 다른

    serverwizard.tistory.com

     

    StringDeserializer를 사용하면 역직렬화 실패 가능성은 매우 낮아지지만,

    실제로 사용하려면 역직렬화한 문자열을 다시 업무에 사용하는 데이터 타입으로 변환해줘야 한다.

     

    이 과정을 JSON으로 한 번에 해주는 게 JsonDeserializer이다.
    그러나 JsonDeserializer를 사용했을 때, 역직렬화 오류가 발생할 수 있는 상황에서는

    반드시 ErrorHandlingDeserializer를 사용해야 Poison Pill로 인한 장애를 막을 수 있다고 한다.

     

    해결

    ErrorHandlingDeserializer 설정하기

    1. bean을 통해 설정하는 법

    @EnableKafka
    @Configuration
    @RequiredArgsConstructor
    @Slf4j
    public class KafkaConsumerConfig {
    
    	.....
    
        @Bean
        public ConsumerFactory<String, Message> consumerFactory() {
            JsonDeserializer<Message> deserializer = new JsonDeserializer<>();
            // 패키지 신뢰 오류 때문에 모든 패키지를 신뢰하도록 설정했음
            deserializer.addTrustedPackages("*");
        
            Map<String, Object> consumerConfig = new HashMap<>();
            ....
            consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
            consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
            consumerConfig.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
            consumerConfig.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, deserializer);
    
            return new DefaultKafkaConsumerFactory<>(consumerConfig, new StringDeserializer(), deserializer);
        }
    
        private DefaultErrorHandler customErrorHandler() {
    
            return new DefaultErrorHandler((consumerRecord, e) -> {
                log.error("[Error] topic = {}, key = {}, value = {}, error message = {}",
                        consumerRecord.topic(),
                        consumerRecord.key(),
                        consumerRecord.value(),
                        e.getMessage());
            }, new FixedBackOff(1000L, 10));
        }
    }

     

     

    2. application.yml에서 설정하는 법

    spring:  
      kafka:
        consumer:
          key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
          value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
        
        ....

     

     

    참고

    https://github.com/HomoEfficio/dev-tips/blob/master/Kafka%20Poison%20Pill%20Spring%20ErrorHandlingDeserializer.md

     

    https://yeongchan1228.tistory.com/104

     

    [Spring + Kafka] 에러 핸들링

    기본 정책 Spring-Kafka를 통해서 Consumer를 구현하고, Consumer에서 처리하는 과정에서 오류가 발생하면 기본 설정으로 최초 요청을 포함해 최대 10회까지 재시도를 한다. 모든 재시도 실패하게 되면

    yeongchan1228.tistory.com

     

Designed by Tistory.