ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Kafka Jsonserializer/JsonDeserializer 한글 처리 오류
    Apache Kafka/오류 해결 2024. 1. 15. 10:59

     

    문제

     

    현재 스프링부트에서 STOMP를 활용한 채팅방을 구현하는 도중에

    아래 오류를 해결한 후에 또 다른 오류를 직면하게 되었다.

     

    https://kjungw1025.tistory.com/24

     

    SpringBoot와 Kafka 연동 시 발생한 ErrorHandlingDeserializer 관련 오류

    문제 스프링부트에서 STOMP를 활용한 채팅방을 구현하기 위해 기존 In memory broker 방식에서 External broker 중 하나인 Kafka를 통해 아래 형태의 메시지를 전달하려는데, @Getter @NoArgsConstructor @ToString publi

    kjungw1025.tistory.com

     

     

    KafkaTemplate 부분을 보다시피 채팅방 메시지를 전달하기 위해서

    value 값을 Message 객체로 넘겨주기 때문에 Jsonserializer로 직렬화하는 것을 볼 수 있다.

        // KafkaProducerConfig.java
        
        @Bean
        public KafkaTemplate<String, Message> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
        
        ....
        
        @Bean
        public Map<String, Object> producerConfig() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
            return props;
        }

     

    이 상태에서 Kafka에 데이터를 넣어 처리하는 도중, 에러가 발생했다.

    java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
       at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:151) ~[spring-kafka-2.8.11.jar:2.8.11]
       at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1815) ~[spring-kafka-2.8.11.jar:2.8.11]
       at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1303) ~[spring-kafka-2.8.11.jar:2.8.11]
       at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
       at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
       at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
    Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition chatting-0 at offset 4. If needed, please seek past the record to continue consumption.
       at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1448) ~[kafka-clients-3.1.2.jar:na]
       at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:135) ~[kafka-clients-3.1.2.jar:na]
       at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1671) ~[kafka-clients-3.1.2.jar:na]
       at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1900(Fetcher.java:1507) ~[kafka-clients-3.1.2.jar:na]
       at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:733) ~[kafka-clients-3.1.2.jar:na]
       at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:684) ~[kafka-clients-3.1.2.jar:na]
       at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1277) ~[kafka-clients-3.1.2.jar:na]
       at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238) ~[kafka-clients-3.1.2.jar:na]
       at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-3.1.2.jar:na]
       at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1531) ~[spring-kafka-2.8.11.jar:2.8.11]
       at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1521) ~[spring-kafka-2.8.11.jar:2.8.11]
       at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1345) ~[spring-kafka-2.8.11.jar:2.8.11]
       at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1257) ~[spring-kafka-2.8.11.jar:2.8.11]
       ... 3 common frames omitted
    Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[123, 34, 116, 121, 112, 101, 34, 58, 34, 69, 78, 84, 69, 82, 34, 44, 34, 114, 111, 111, 109, 73, 100, 34, 58, 34, 54, 57, 53, 49, 97, 56, 101, 53, 45, 102, 53, 57, 98, 45, 52, 48, 97, 52, 45, 98, 48, 48, 48, 45, 57, 49, 55, 57, 100, 55, 52, 50, 54, 101, 49, 97, 34, 44, 34, 115, 101, 110, 100, 101, 114, 34, 58, 34, 106, 117, 110, 103, 119, 111, 111, 34, 44, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 106, 117, 110, 103, 119, 111, 111, 32, -21, -117, -104, 32, -20, -98, -123, -20, -98, -91, 33, 33, 34, 125]] from topic [chatting]

     

     

    이유

     

    에러 로그를 살펴보면 직렬화/역직렬화 오류가 발생했다. 카프카에 Message 객체 클래스를 전송했고

    org.springframework.kafka.support.serializer.JsonDeserializer를 통해

    역직렬화 하는 과정에서 발생하는 오류였다.

     

    근데 한 가지 의문이 들었던 점은

    에러 로그의 맨 마지막줄을 보면 아스키 코드 변환임에도 음수 값들이 존재했었다. (ex. -21, -117,  -104 ....)

    Caused by: org.apache.kafka.common.errors.SerializationException: 
    Can't deserialize data [[123, 34, 116, 121, 112, 101, 34, 58, 34, 69, 78, 84, 69, 82, 34, 44, 34, 114, 111, 111, 109, 73, 100, 34, 58, 34, 54, 57, 53, 49, 97, 56, 101, 53, 45, 102, 53, 57, 98, 45, 52, 48, 97, 52, 45, 98, 48, 48, 48, 45, 57, 49, 55, 57, 100, 55, 52, 50, 54, 101, 49, 97, 34, 44, 34, 115, 101, 110, 100, 101, 114, 34, 58, 34, 106, 117, 110, 103, 119, 111, 111, 34, 44, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 106, 117, 110, 103, 119, 111, 111, 32, -21, -117, -104, 32, -20, -98, -123, -20, -98, -91, 33, 33, 34, 125]] from topic [chatting]

     

     

    엥?

     

    내가 아는 아스키 코드는 0부터 127까지의 값을 다루고 있는걸로 알고 있는데

    왜 음수 값들이 나오는거지...?

     

    그렇다...

    채팅 메시지를 입력할 때 한글 문자가 들어가기 때문에,

    아스키 코드는 한글 문자를 지원하지 않으므로 발생한 에러였다.

     

    이 부분을 해결하기 위해서 많은 고민을 했던 것 같다.

    채팅 메시지 데이터를 전달해야되는데 한글이 지원 안된다니!!

    (아직 부족한 부분이 많아서 못 찾은걸 수도 있습니다 ㅠㅠ)

     

     

     

    해결

    결론적으로는 한글 문자를 사용하지 않을 수 없기에 변환 방식을 바꿨다.

    String(De)Serializer는 UTF-8로 변환되는 것을 확인했기에

    기존 org.springframework.kafka.support.serializer.Json(De)serializer에서

    org.apache.kafka.common.serialization.String(De)serializer로 바꿔서 변환을 진행했다.

     

     

    String(De)Serializer로 직렬화와 역직렬화를 수행하기 때문에

    JsonSerializer / JsonDeserializer를 ObjectMapper와 활용하는 것을 공식 문서에서 확인하여,

    직렬화 과정 전에 ObjectMapper를 사용하여 입력된 Message 객체를 JSON 문자열로 변환하는 과정을 수행해주었다.

    (역직렬화 과정에서는 ObjectMapper를 사용하여 문자열 -> Message 객체로)

    https://docs.spring.io/spring-kafka/reference/kafka/serdes.html

     

     

    KafkaProducerConfig.java

    @EnableKafka
    @Configuration
    @RequiredArgsConstructor
    public class KafkaProducerConfig {
    
    	....
    
        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
        
        ....
        
        @Bean
        public Map<String, Object> producerConfig() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return props;
        }
    }

     

     

    KafkaConsumerConfig.java

    @EnableKafka
    @Configuration
    @RequiredArgsConstructor
    @Slf4j
    public class KafkaConsumerConfig {
    
    	....
    
        // KafkaListener 컨테이너 펙토리를 생성하는 Bean 메서드
        @Bean
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaConsumerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setCommonErrorHandler(customErrorHandler());
            return factory;
        }
    
    	@Bean
        public ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> consumerConfig = new HashMap<>();
    		....
            consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
            consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
            consumerConfig.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
            consumerConfig.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);
    
            return new DefaultKafkaConsumerFactory<>(consumerConfig, new StringDeserializer(), new StringDeserializer());
        }
    
    	....
    }

     

     

    MessageSender.java

    @Service
    @RequiredArgsConstructor
    @Slf4j
    public class MessageSender {
        private final KafkaTemplate<String, String> kafkaTemplate;
    
        // 메시지를 지정한 Kafka 토픽으로 전송
        public void send(String topic, Message data) {
    
            // 메시지를 KafkaTemplate 를 사용하여 지정된 토픽으로 전송
            ObjectMapper objectMapper = new ObjectMapper();
            try {
                String stringChat = objectMapper.writeValueAsString(data);
                log.info("MessageSender Message -> String형 : " + stringChat);
                
                kafkaTemplate.send(topic, stringChat);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
        }
    }

     

     

    MessageReceiver.java

    @Service
    @RequiredArgsConstructor
    @Slf4j
    public class MessageReceiver {
        private final SimpMessageSendingOperations template;
    
        @KafkaListener(topics = "chatting", containerFactory = "kafkaConsumerContainerFactory")
        public void receiveMessage(String stringChat) throws JsonProcessingException {
    
            ObjectMapper objectMapper = new ObjectMapper();
            Message message = objectMapper.readValue(stringChat, Message.class);
            log.info("Consumed Message : " + stringChat);
    
            // 메시지 객체 내부의 채팅방 번호를 참조하여, 해당 채팅방 구독자에게 메시지를 발송
            template.convertAndSend("/sub/chatRoom/enter" + message.getRoomId(), message);
        }
    }

     

     

    위와 같이 코드를 변경하니 에러 없이 잘 해결되었다.

    아래는 Kafka Consumer에서 값이 잘 들어오나 확인한 사진

     

     

     

    글을 작성하면서 보니깐 리펙토링이 필요한 부분이 보여서

    추후에 진행하도록 해야겠다!

     

     

Designed by Tistory.