들어가며
레디스에 캐싱되어 있는 각각의 상품에 대해서 사용자가 좋아요를 누른 정보들을
상품 서비스에서 주기적으로 DB에 dump하는 과정에서 추가적으로 유저 서비스의 DB에 저장할 수 있도록 하기 위해 중간에 카프카를 두고 비동기적으로 통신했는데,
이번에는 상품 서비스쪽에 작성한 카프카 프로듀서 관련 코드에 대해서
테스트 코드를 작성해본 내용을 기록하고자 글을 작성해보았습니다.
어떻게 카프카 테스트 코드를 작성해야 할까?
처음으로 카프카 관련 테스트 코드를 작성하다보니
아래 기존의 레디스 관련 테스트 코드를 작성할 때와 같이,
레디스 컨테이너를 생성하고, 매 테스트 전에 레디스 데이터를 초기화하여
테스트가 레디스에 의존하는 기능을 검증할 수 있도록하는 Testcontainer 방식으로 수행해야 하는지에 대한 고민이 있었습니다.
public abstract class AbstractContainerRedisTest {
static final String REDIS_IMAGE = "redis:6-alpine";
static final GenericContainer REDIS_CONTAINER;
@Autowired
private StringRedisTemplate redisTemplate;
static {
REDIS_CONTAINER = new GenericContainer<>(REDIS_IMAGE)
.withExposedPorts(6379)
.withReuse(true);
REDIS_CONTAINER.start();
}
@BeforeEach
void clearAll() {
Set<String> keys = redisTemplate.keys("*");
if (keys != null) {
for (String key : keys) {
redisTemplate.delete(key);
}
}
}
@DynamicPropertySource
public static void overrideProps(DynamicPropertyRegistry registry) {
registry.add("spring.redis.host", REDIS_CONTAINER::getHost);
registry.add("spring.redis.port", () -> String.valueOf(REDIS_CONTAINER.getMappedPort(6379)));
}
}
그러나, 스프링에서 spring-kafka-test를 통해 embedded 방식으로
kafka를 테스트 할 수 있는 방법을 제공하는 것을 확인할 수 있었습니다.
https://docs.spring.io/spring-kafka/reference/testing.html
Testing Applications :: Spring Kafka
When using @EmbeddedKafka with @SpringJUnitConfig, it is recommended to use @DirtiesContext on the test class. This is to prevent potential race conditions occurring during the JVM shutdown after running multiple tests in a test suite. For example, without
docs.spring.io
아래는 테스트 코드를 작성할 대상의 코드입니다.
@Service
@RequiredArgsConstructor
@Slf4j
public class ProductLikeMessageSender {
private final KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic, ProductLikeMessage productLikeMessage) {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.registerModule(new JavaTimeModule());
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
try {
String stringMessage = objectMapper.writeValueAsString(productLikeMessage);
log.info("product-like Message Created : " + stringMessage);
kafkaTemplate.send(topic, stringMessage);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}
spring-kafka-test 설정
아래 공식 문서의 내용과 같이 JUnit5에서 @EmbeddedKafka를 사용하는 방법으로 테스트 코드를 작성하고자 합니다.
의존성 추가
기본적으로 Junit5가 현 스프링부트에 spring-boot-starter-test를 통해 내장되어 있으므로
아래와 같은 spirng-kafka-test 의존성만 별도로 추가해주었습니다.
testImplementation 'org.springframework.kafka:spring-kafka-test'
@SpringBootTest
@EmbeddedKafka(topics = {"product-like"})
class ProductLikeMessageSenderTest {
@Autowired
private ProductLikeMessageSender productLikeMessageSender;
@Autowired
private ObjectMapper objectMapper;
@Test
public void testSendProductLikeMessage(@Autowired EmbeddedKafkaBroker embeddedKafkaBroker) throws Exception {
...
}
}
consumer 설정
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", embeddedKafkaBroker);
ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<String, String> consumer = consumerFactory.createConsumer();
위와 같이 KafkaTestUtils를 통해서 consumer를 설정할 수 있습니다.
consumerProps를 통해 consumer group, offset 자동 커밋 여부와
테스트 중에 사용되는 임베디드 카프카 브로커를 주입시킵니다.
이후에 consumerProps로부터 Kafka 소비자를 생성할 수 있는 consumerFactory를 추가하였습니다.
embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, "product-like");
ConsumerRecord<String, String> singleRecord = KafkaTestUtils.getSingleRecord(consumer, "product-like");
ProductLikeMessage consumedMessage = objectMapper.readValue(singleRecord.value(), ProductLikeMessage.class);
consumer.close();
테스트 코드 검증 부분에서는 현재 사용하고 있는 "product-like"라는 토픽에 대해서
embeddedKafkaBroker가 메시지를 소비하도록 하고,
KafkaTestUtils.getSingleRecord를 사용하여 단일 메시지를 가져오도록 합니다.
최종 테스트 코드
package com.wl2c.elswhereproductservice.domain.product.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.wl2c.elswhereproductservice.domain.like.model.LikeState;
import com.wl2c.elswhereproductservice.domain.product.model.dto.ProductLikeMessage;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import java.util.Map;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
@SpringBootTest
@EmbeddedKafka(topics = {"product-like"})
class ProductLikeMessageSenderTest {
@Autowired
private ProductLikeMessageSender productLikeMessageSender;
@Autowired
private ObjectMapper objectMapper;
@Test
@DisplayName("Kafka에 상품 좋아요 메시지를 전송하고, 메시지가 정상적으로 소비되는지 검증")
public void testSendProductLikeMessage(@Autowired EmbeddedKafkaBroker embeddedKafkaBroker) throws Exception {
// given
ProductLikeMessage message = ProductLikeMessage.builder()
.userId(1L)
.productId(10L)
.likeState(LikeState.LIKED)
.build();
// Kafka Consumer 설정
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", embeddedKafkaBroker);
ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<String, String> consumer = consumerFactory.createConsumer();
// when
productLikeMessageSender.send("product-like", message);
// then
embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, "product-like");
ConsumerRecord<String, String> singleRecord = KafkaTestUtils.getSingleRecord(consumer, "product-like");
ProductLikeMessage consumedMessage = objectMapper.readValue(singleRecord.value(), ProductLikeMessage.class);
consumer.close();
assertThat(consumedMessage.getUserId()).isEqualTo(1L);
assertThat(consumedMessage.getProductId()).isEqualTo(10L);
}
}
테스트가 정상적으로 동작하는 것을 확인할 수 있었습니다.
'SpringBoot' 카테고리의 다른 글
Redis 캐싱을 사용하여, 일일 조회수와 좋아요 증감 수를 반영한 일일 HOT 상품 목록 조회 기능 개발 (0) | 2024.12.19 |
---|---|
비효율적인 상품 상세 검색 코드 수정과 nGrinder를 활용한 성능 테스트 (0) | 2024.11.12 |
MSA) 서비스 별 각 인스턴스에서 애플리케이션을 Docker 컨테이너화 후, 발생한 Eureka Client 간의 통신 문제 (0) | 2024.07.09 |
MSA) Spring Cloud 기반의 MSA 구조에서 Swagger 통합하기 + FastAPI의 Swagger까지 (0) | 2024.06.17 |
MSA) Spring Cloud Eureka에 FastAPI 서버를 Client로 등록하기 (0) | 2024.06.01 |