Kafka Jsonserializer/JsonDeserializer 한글 처리 오류

2024. 1. 15. 10:59·Apache Kafka/오류 해결

 

문제

 

현재 스프링부트에서 STOMP를 활용한 채팅방을 구현하는 도중에

아래 오류를 해결한 후에 또 다른 오류를 직면하게 되었다.

 

https://kjungw1025.tistory.com/24

 

SpringBoot와 Kafka 연동 시 발생한 ErrorHandlingDeserializer 관련 오류

문제 스프링부트에서 STOMP를 활용한 채팅방을 구현하기 위해 기존 In memory broker 방식에서 External broker 중 하나인 Kafka를 통해 아래 형태의 메시지를 전달하려는데, @Getter @NoArgsConstructor @ToString publi

kjungw1025.tistory.com

 

 

KafkaTemplate 부분을 보다시피 채팅방 메시지를 전달하기 위해서

value 값을 Message 객체로 넘겨주기 때문에 Jsonserializer로 직렬화하는 것을 볼 수 있다.

    // KafkaProducerConfig.java
    
    @Bean
    public KafkaTemplate<String, Message> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
    
    ....
    
    @Bean
    public Map<String, Object> producerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

 

이 상태에서 Kafka에 데이터를 넣어 처리하는 도중, 에러가 발생했다.

java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
   at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:151) ~[spring-kafka-2.8.11.jar:2.8.11]
   at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1815) ~[spring-kafka-2.8.11.jar:2.8.11]
   at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1303) ~[spring-kafka-2.8.11.jar:2.8.11]
   at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
   at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition chatting-0 at offset 4. If needed, please seek past the record to continue consumption.
   at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1448) ~[kafka-clients-3.1.2.jar:na]
   at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:135) ~[kafka-clients-3.1.2.jar:na]
   at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1671) ~[kafka-clients-3.1.2.jar:na]
   at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1900(Fetcher.java:1507) ~[kafka-clients-3.1.2.jar:na]
   at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:733) ~[kafka-clients-3.1.2.jar:na]
   at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:684) ~[kafka-clients-3.1.2.jar:na]
   at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1277) ~[kafka-clients-3.1.2.jar:na]
   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238) ~[kafka-clients-3.1.2.jar:na]
   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-3.1.2.jar:na]
   at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1531) ~[spring-kafka-2.8.11.jar:2.8.11]
   at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1521) ~[spring-kafka-2.8.11.jar:2.8.11]
   at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1345) ~[spring-kafka-2.8.11.jar:2.8.11]
   at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1257) ~[spring-kafka-2.8.11.jar:2.8.11]
   ... 3 common frames omitted
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[123, 34, 116, 121, 112, 101, 34, 58, 34, 69, 78, 84, 69, 82, 34, 44, 34, 114, 111, 111, 109, 73, 100, 34, 58, 34, 54, 57, 53, 49, 97, 56, 101, 53, 45, 102, 53, 57, 98, 45, 52, 48, 97, 52, 45, 98, 48, 48, 48, 45, 57, 49, 55, 57, 100, 55, 52, 50, 54, 101, 49, 97, 34, 44, 34, 115, 101, 110, 100, 101, 114, 34, 58, 34, 106, 117, 110, 103, 119, 111, 111, 34, 44, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 106, 117, 110, 103, 119, 111, 111, 32, -21, -117, -104, 32, -20, -98, -123, -20, -98, -91, 33, 33, 34, 125]] from topic [chatting]

 

 

이유

 

에러 로그를 살펴보면 직렬화/역직렬화 오류가 발생했다. 카프카에 Message 객체 클래스를 전송했고

org.springframework.kafka.support.serializer.JsonDeserializer를 통해

역직렬화 하는 과정에서 발생하는 오류였다.

 

근데 한 가지 의문이 들었던 점은

에러 로그의 맨 마지막줄을 보면 아스키 코드 변환임에도 음수 값들이 존재했었다. (ex. -21, -117,  -104 ....)

Caused by: org.apache.kafka.common.errors.SerializationException: 
Can't deserialize data [[123, 34, 116, 121, 112, 101, 34, 58, 34, 69, 78, 84, 69, 82, 34, 44, 34, 114, 111, 111, 109, 73, 100, 34, 58, 34, 54, 57, 53, 49, 97, 56, 101, 53, 45, 102, 53, 57, 98, 45, 52, 48, 97, 52, 45, 98, 48, 48, 48, 45, 57, 49, 55, 57, 100, 55, 52, 50, 54, 101, 49, 97, 34, 44, 34, 115, 101, 110, 100, 101, 114, 34, 58, 34, 106, 117, 110, 103, 119, 111, 111, 34, 44, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 106, 117, 110, 103, 119, 111, 111, 32, -21, -117, -104, 32, -20, -98, -123, -20, -98, -91, 33, 33, 34, 125]] from topic [chatting]

 

 

엥?

 

내가 아는 아스키 코드는 0부터 127까지의 값을 다루고 있는걸로 알고 있는데

왜 음수 값들이 나오는거지...?

 

그렇다...

채팅 메시지를 입력할 때 한글 문자가 들어가기 때문에,

아스키 코드는 한글 문자를 지원하지 않으므로 발생한 에러였다.

 

이 부분을 해결하기 위해서 많은 고민을 했던 것 같다.

채팅 메시지 데이터를 전달해야되는데 한글이 지원 안된다니!!

(아직 부족한 부분이 많아서 못 찾은걸 수도 있습니다 ㅠㅠ)

 

 

 

해결

결론적으로는 한글 문자를 사용하지 않을 수 없기에 변환 방식을 바꿨다.

String(De)Serializer는 UTF-8로 변환되는 것을 확인했기에

기존 org.springframework.kafka.support.serializer.Json(De)serializer에서

org.apache.kafka.common.serialization.String(De)serializer로 바꿔서 변환을 진행했다.

 

 

String(De)Serializer로 직렬화와 역직렬화를 수행하기 때문에

JsonSerializer / JsonDeserializer를 ObjectMapper와 활용하는 것을 공식 문서에서 확인하여,

직렬화 과정 전에 ObjectMapper를 사용하여 입력된 Message 객체를 JSON 문자열로 변환하는 과정을 수행해주었다.

(역직렬화 과정에서는 ObjectMapper를 사용하여 문자열 -> Message 객체로)

https://docs.spring.io/spring-kafka/reference/kafka/serdes.html

 

 

KafkaProducerConfig.java

@EnableKafka
@Configuration
@RequiredArgsConstructor
public class KafkaProducerConfig {

	....

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
    
    ....
    
    @Bean
    public Map<String, Object> producerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
}

 

 

KafkaConsumerConfig.java

@EnableKafka
@Configuration
@RequiredArgsConstructor
@Slf4j
public class KafkaConsumerConfig {

	....

    // KafkaListener 컨테이너 펙토리를 생성하는 Bean 메서드
    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaConsumerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setCommonErrorHandler(customErrorHandler());
        return factory;
    }

	@Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> consumerConfig = new HashMap<>();
		....
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        consumerConfig.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
        consumerConfig.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(consumerConfig, new StringDeserializer(), new StringDeserializer());
    }

	....
}

 

 

MessageSender.java

@Service
@RequiredArgsConstructor
@Slf4j
public class MessageSender {
    private final KafkaTemplate<String, String> kafkaTemplate;

    // 메시지를 지정한 Kafka 토픽으로 전송
    public void send(String topic, Message data) {

        // 메시지를 KafkaTemplate 를 사용하여 지정된 토픽으로 전송
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            String stringChat = objectMapper.writeValueAsString(data);
            log.info("MessageSender Message -> String형 : " + stringChat);
            
            kafkaTemplate.send(topic, stringChat);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }
}

 

 

MessageReceiver.java

@Service
@RequiredArgsConstructor
@Slf4j
public class MessageReceiver {
    private final SimpMessageSendingOperations template;

    @KafkaListener(topics = "chatting", containerFactory = "kafkaConsumerContainerFactory")
    public void receiveMessage(String stringChat) throws JsonProcessingException {

        ObjectMapper objectMapper = new ObjectMapper();
        Message message = objectMapper.readValue(stringChat, Message.class);
        log.info("Consumed Message : " + stringChat);

        // 메시지 객체 내부의 채팅방 번호를 참조하여, 해당 채팅방 구독자에게 메시지를 발송
        template.convertAndSend("/sub/chatRoom/enter" + message.getRoomId(), message);
    }
}

 

 

위와 같이 코드를 변경하니 에러 없이 잘 해결되었다.

아래는 Kafka Consumer에서 값이 잘 들어오나 확인한 사진

 

 

 

글을 작성하면서 보니깐 리펙토링이 필요한 부분이 보여서

추후에 진행하도록 해야겠다!

 

 

'Apache Kafka > 오류 해결' 카테고리의 다른 글

SpringBoot와 Kafka 연동 시 발생한 ErrorHandlingDeserializer 관련 오류  (0) 2024.01.14
SpringBoot와 Docker compose로 pull 받은 Kafka 연동 시 발생한 오류  (0) 2024.01.14
'Apache Kafka/오류 해결' 카테고리의 다른 글
  • SpringBoot와 Kafka 연동 시 발생한 ErrorHandlingDeserializer 관련 오류
  • SpringBoot와 Docker compose로 pull 받은 Kafka 연동 시 발생한 오류
개발이조아용
개발이조아용
IT 개발에서 배운 성장의 기록을 작성합니다.
  • 개발이조아용
    계속 하다 보면?!
    개발이조아용
  • 전체
    오늘
    어제
    • 분류 전체보기 (67)
      • Tibero DB (Tmax AI Bigdata .. (7)
      • Git (2)
      • CI CD (2)
      • Redis (3)
      • SpringBoot (16)
      • SQL 문제 풀이 (8)
      • Apache Kafka (8)
        • 오류 해결 (3)
        • 개념 정리 (4)
        • 보안 (1)
      • Nginx (3)
      • SW마에스트로 (3)
      • Kubernetes (4)
      • AWS (5)
      • gRPC (3)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

  • 공지사항

  • 인기 글

  • 태그

    SQL
    Git
    redis
    SASL 인증
    KAFKA
    Redis 개념
    grpc
    nginx
    Kafka 오류
    Kafka SASL
    sql 문제
    Tibero
    소프트웨어 마에스트로
    K8S
    MSA
    leetcode
    Kafka 개념
    SpringBoot
    DynamoDB 연동
    redis script
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.3
개발이조아용
Kafka Jsonserializer/JsonDeserializer 한글 처리 오류
상단으로

티스토리툴바