문제
현재 스프링부트에서 STOMP를 활용한 채팅방을 구현하는 도중에
아래 오류를 해결한 후에 또 다른 오류를 직면하게 되었다.
https://kjungw1025.tistory.com/24
SpringBoot와 Kafka 연동 시 발생한 ErrorHandlingDeserializer 관련 오류
문제 스프링부트에서 STOMP를 활용한 채팅방을 구현하기 위해 기존 In memory broker 방식에서 External broker 중 하나인 Kafka를 통해 아래 형태의 메시지를 전달하려는데, @Getter @NoArgsConstructor @ToString publi
kjungw1025.tistory.com
KafkaTemplate 부분을 보다시피 채팅방 메시지를 전달하기 위해서
value 값을 Message 객체로 넘겨주기 때문에 Jsonserializer로 직렬화하는 것을 볼 수 있다.
// KafkaProducerConfig.java
@Bean
public KafkaTemplate<String, Message> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
....
@Bean
public Map<String, Object> producerConfig() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
이 상태에서 Kafka에 데이터를 넣어 처리하는 도중, 에러가 발생했다.
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:151) ~[spring-kafka-2.8.11.jar:2.8.11]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1815) ~[spring-kafka-2.8.11.jar:2.8.11]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1303) ~[spring-kafka-2.8.11.jar:2.8.11]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition chatting-0 at offset 4. If needed, please seek past the record to continue consumption.
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1448) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:135) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1671) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1900(Fetcher.java:1507) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:733) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:684) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1277) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-3.1.2.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1531) ~[spring-kafka-2.8.11.jar:2.8.11]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1521) ~[spring-kafka-2.8.11.jar:2.8.11]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1345) ~[spring-kafka-2.8.11.jar:2.8.11]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1257) ~[spring-kafka-2.8.11.jar:2.8.11]
... 3 common frames omitted
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[123, 34, 116, 121, 112, 101, 34, 58, 34, 69, 78, 84, 69, 82, 34, 44, 34, 114, 111, 111, 109, 73, 100, 34, 58, 34, 54, 57, 53, 49, 97, 56, 101, 53, 45, 102, 53, 57, 98, 45, 52, 48, 97, 52, 45, 98, 48, 48, 48, 45, 57, 49, 55, 57, 100, 55, 52, 50, 54, 101, 49, 97, 34, 44, 34, 115, 101, 110, 100, 101, 114, 34, 58, 34, 106, 117, 110, 103, 119, 111, 111, 34, 44, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 106, 117, 110, 103, 119, 111, 111, 32, -21, -117, -104, 32, -20, -98, -123, -20, -98, -91, 33, 33, 34, 125]] from topic [chatting]
이유
에러 로그를 살펴보면 직렬화/역직렬화 오류가 발생했다. 카프카에 Message 객체 클래스를 전송했고
org.springframework.kafka.support.serializer.JsonDeserializer를 통해
역직렬화 하는 과정에서 발생하는 오류였다.
근데 한 가지 의문이 들었던 점은
에러 로그의 맨 마지막줄을 보면 아스키 코드 변환임에도 음수 값들이 존재했었다. (ex. -21, -117, -104 ....)
Caused by: org.apache.kafka.common.errors.SerializationException:
Can't deserialize data [[123, 34, 116, 121, 112, 101, 34, 58, 34, 69, 78, 84, 69, 82, 34, 44, 34, 114, 111, 111, 109, 73, 100, 34, 58, 34, 54, 57, 53, 49, 97, 56, 101, 53, 45, 102, 53, 57, 98, 45, 52, 48, 97, 52, 45, 98, 48, 48, 48, 45, 57, 49, 55, 57, 100, 55, 52, 50, 54, 101, 49, 97, 34, 44, 34, 115, 101, 110, 100, 101, 114, 34, 58, 34, 106, 117, 110, 103, 119, 111, 111, 34, 44, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 106, 117, 110, 103, 119, 111, 111, 32, -21, -117, -104, 32, -20, -98, -123, -20, -98, -91, 33, 33, 34, 125]] from topic [chatting]
엥?
내가 아는 아스키 코드는 0부터 127까지의 값을 다루고 있는걸로 알고 있는데
왜 음수 값들이 나오는거지...?
그렇다...
채팅 메시지를 입력할 때 한글 문자가 들어가기 때문에,
아스키 코드는 한글 문자를 지원하지 않으므로 발생한 에러였다.
이 부분을 해결하기 위해서 많은 고민을 했던 것 같다.
채팅 메시지 데이터를 전달해야되는데 한글이 지원 안된다니!!
(아직 부족한 부분이 많아서 못 찾은걸 수도 있습니다 ㅠㅠ)
해결
결론적으로는 한글 문자를 사용하지 않을 수 없기에 변환 방식을 바꿨다.
String(De)Serializer는 UTF-8로 변환되는 것을 확인했기에
기존 org.springframework.kafka.support.serializer.Json(De)serializer에서
org.apache.kafka.common.serialization.String(De)serializer로 바꿔서 변환을 진행했다.
String(De)Serializer로 직렬화와 역직렬화를 수행하기 때문에
JsonSerializer / JsonDeserializer를 ObjectMapper와 활용하는 것을 공식 문서에서 확인하여,
직렬화 과정 전에 ObjectMapper를 사용하여 입력된 Message 객체를 JSON 문자열로 변환하는 과정을 수행해주었다.
(역직렬화 과정에서는 ObjectMapper를 사용하여 문자열 -> Message 객체로)
KafkaProducerConfig.java
@EnableKafka
@Configuration
@RequiredArgsConstructor
public class KafkaProducerConfig {
....
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
....
@Bean
public Map<String, Object> producerConfig() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
}
KafkaConsumerConfig.java
@EnableKafka
@Configuration
@RequiredArgsConstructor
@Slf4j
public class KafkaConsumerConfig {
....
// KafkaListener 컨테이너 펙토리를 생성하는 Bean 메서드
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaConsumerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setCommonErrorHandler(customErrorHandler());
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> consumerConfig = new HashMap<>();
....
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerConfig.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
consumerConfig.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(consumerConfig, new StringDeserializer(), new StringDeserializer());
}
....
}
MessageSender.java
@Service
@RequiredArgsConstructor
@Slf4j
public class MessageSender {
private final KafkaTemplate<String, String> kafkaTemplate;
// 메시지를 지정한 Kafka 토픽으로 전송
public void send(String topic, Message data) {
// 메시지를 KafkaTemplate 를 사용하여 지정된 토픽으로 전송
ObjectMapper objectMapper = new ObjectMapper();
try {
String stringChat = objectMapper.writeValueAsString(data);
log.info("MessageSender Message -> String형 : " + stringChat);
kafkaTemplate.send(topic, stringChat);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
}
MessageReceiver.java
@Service
@RequiredArgsConstructor
@Slf4j
public class MessageReceiver {
private final SimpMessageSendingOperations template;
@KafkaListener(topics = "chatting", containerFactory = "kafkaConsumerContainerFactory")
public void receiveMessage(String stringChat) throws JsonProcessingException {
ObjectMapper objectMapper = new ObjectMapper();
Message message = objectMapper.readValue(stringChat, Message.class);
log.info("Consumed Message : " + stringChat);
// 메시지 객체 내부의 채팅방 번호를 참조하여, 해당 채팅방 구독자에게 메시지를 발송
template.convertAndSend("/sub/chatRoom/enter" + message.getRoomId(), message);
}
}
위와 같이 코드를 변경하니 에러 없이 잘 해결되었다.
아래는 Kafka Consumer에서 값이 잘 들어오나 확인한 사진
글을 작성하면서 보니깐 리펙토링이 필요한 부분이 보여서
추후에 진행하도록 해야겠다!
'Apache Kafka > 오류 해결' 카테고리의 다른 글
SpringBoot와 Kafka 연동 시 발생한 ErrorHandlingDeserializer 관련 오류 (0) | 2024.01.14 |
---|---|
SpringBoot와 Docker compose로 pull 받은 Kafka 연동 시 발생한 오류 (0) | 2024.01.14 |