Spring Boot와 Kafka 테스트: Embedded Kafka로 상품 좋아요 메시지 검증하기

2024. 10. 13. 18:56·SpringBoot

 

 

들어가며

레디스에 캐싱되어 있는 각각의 상품에 대해서 사용자가 좋아요를 누른 정보들을

상품 서비스에서 주기적으로 DB에 dump하는 과정에서 추가적으로 유저 서비스의 DB에 저장할 수 있도록 하기 위해 중간에 카프카를 두고 비동기적으로 통신했는데,

 

이번에는 상품 서비스쪽에 작성한 카프카 프로듀서 관련 코드에 대해서

테스트 코드를 작성해본 내용을 기록하고자 글을 작성해보았습니다.

 

 

 

 

어떻게 카프카 테스트 코드를 작성해야 할까?

처음으로 카프카 관련 테스트 코드를 작성하다보니

 

아래 기존의 레디스 관련 테스트 코드를 작성할 때와 같이,

레디스 컨테이너를 생성하고, 매 테스트 전에 레디스 데이터를 초기화하여

테스트가 레디스에 의존하는 기능을 검증할 수 있도록하는 Testcontainer 방식으로 수행해야 하는지에 대한 고민이 있었습니다.

public abstract class AbstractContainerRedisTest {
    static final String REDIS_IMAGE = "redis:6-alpine";
    static final GenericContainer REDIS_CONTAINER;

    @Autowired
    private StringRedisTemplate redisTemplate;

    static {
        REDIS_CONTAINER = new GenericContainer<>(REDIS_IMAGE)
                .withExposedPorts(6379)
                .withReuse(true);
        REDIS_CONTAINER.start();
    }

    @BeforeEach
    void clearAll() {
        Set<String> keys = redisTemplate.keys("*");
        if (keys != null) {
            for (String key : keys) {
                redisTemplate.delete(key);
            }
        }
    }

    @DynamicPropertySource
    public static void overrideProps(DynamicPropertyRegistry registry) {
        registry.add("spring.redis.host", REDIS_CONTAINER::getHost);
        registry.add("spring.redis.port", () -> String.valueOf(REDIS_CONTAINER.getMappedPort(6379)));
    }
}

 

 

그러나, 스프링에서 spring-kafka-test를 통해 embedded 방식으로

kafka를 테스트 할 수 있는 방법을 제공하는 것을 확인할 수 있었습니다.

https://docs.spring.io/spring-kafka/reference/testing.html

 

Testing Applications :: Spring Kafka

When using @EmbeddedKafka with @SpringJUnitConfig, it is recommended to use @DirtiesContext on the test class. This is to prevent potential race conditions occurring during the JVM shutdown after running multiple tests in a test suite. For example, without

docs.spring.io

 

 

아래는 테스트 코드를 작성할 대상의 코드입니다.

@Service
@RequiredArgsConstructor
@Slf4j
public class ProductLikeMessageSender {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, ProductLikeMessage productLikeMessage) {
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.registerModule(new JavaTimeModule());
        objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        try {
            String stringMessage = objectMapper.writeValueAsString(productLikeMessage);
            log.info("product-like Message Created : " + stringMessage);

            kafkaTemplate.send(topic, stringMessage);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

}

 

 

 

spring-kafka-test 설정

아래 공식 문서의 내용과 같이 JUnit5에서 @EmbeddedKafka를 사용하는 방법으로 테스트 코드를 작성하고자 합니다.

공식문서

 

 

의존성 추가

기본적으로 Junit5가 현 스프링부트에 spring-boot-starter-test를 통해 내장되어 있으므로

아래와 같은 spirng-kafka-test 의존성만 별도로 추가해주었습니다.

testImplementation 'org.springframework.kafka:spring-kafka-test'
@SpringBootTest
@EmbeddedKafka(topics = {"product-like"})
class ProductLikeMessageSenderTest {

    @Autowired
    private ProductLikeMessageSender productLikeMessageSender;

    @Autowired
    private ObjectMapper objectMapper;


    @Test
    public void testSendProductLikeMessage(@Autowired EmbeddedKafkaBroker embeddedKafkaBroker) throws Exception {
    			...
    }
}

 

 

consumer 설정

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", embeddedKafkaBroker);
ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<String, String> consumer = consumerFactory.createConsumer();

 

위와 같이 KafkaTestUtils를 통해서 consumer를 설정할 수 있습니다.

consumerProps를 통해 consumer group, offset 자동 커밋 여부와

테스트 중에 사용되는 임베디드 카프카 브로커를 주입시킵니다.

이후에 consumerProps로부터 Kafka 소비자를 생성할 수 있는 consumerFactory를 추가하였습니다.

 

 

embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, "product-like");
ConsumerRecord<String, String> singleRecord = KafkaTestUtils.getSingleRecord(consumer, "product-like");
ProductLikeMessage consumedMessage = objectMapper.readValue(singleRecord.value(), ProductLikeMessage.class);
consumer.close();

 

테스트 코드 검증 부분에서는 현재 사용하고 있는 "product-like"라는 토픽에 대해서

embeddedKafkaBroker가 메시지를 소비하도록 하고,

KafkaTestUtils.getSingleRecord를 사용하여 단일 메시지를 가져오도록 합니다.

 

 

 

 

최종 테스트 코드

package com.wl2c.elswhereproductservice.domain.product.service;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.wl2c.elswhereproductservice.domain.like.model.LikeState;
import com.wl2c.elswhereproductservice.domain.product.model.dto.ProductLikeMessage;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;

import java.util.Map;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

@SpringBootTest
@EmbeddedKafka(topics = {"product-like"})
class ProductLikeMessageSenderTest {

    @Autowired
    private ProductLikeMessageSender productLikeMessageSender;

    @Autowired
    private ObjectMapper objectMapper;


    @Test
    @DisplayName("Kafka에 상품 좋아요 메시지를 전송하고, 메시지가 정상적으로 소비되는지 검증")
    public void testSendProductLikeMessage(@Autowired EmbeddedKafkaBroker embeddedKafkaBroker) throws Exception {
        // given
        ProductLikeMessage message = ProductLikeMessage.builder()
                .userId(1L)
                .productId(10L)
                .likeState(LikeState.LIKED)
                .build();

        // Kafka Consumer 설정
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", embeddedKafkaBroker);
        ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps);
        Consumer<String, String> consumer = consumerFactory.createConsumer();

        // when
        productLikeMessageSender.send("product-like", message);

        // then
        embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, "product-like");
        ConsumerRecord<String, String> singleRecord = KafkaTestUtils.getSingleRecord(consumer, "product-like");
        ProductLikeMessage consumedMessage = objectMapper.readValue(singleRecord.value(), ProductLikeMessage.class);
        consumer.close();

        assertThat(consumedMessage.getUserId()).isEqualTo(1L);
        assertThat(consumedMessage.getProductId()).isEqualTo(10L);

    }
}

 

테스트가 정상적으로 동작하는 것을 확인할 수 있었습니다.

 

 

'SpringBoot' 카테고리의 다른 글

Redis 캐싱을 사용하여, 일일 조회수와 좋아요 증감 수를 반영한 일일 HOT 상품 목록 조회 기능 개발  (0) 2024.12.19
비효율적인 상품 상세 검색 코드 수정과 nGrinder를 활용한 성능 테스트  (0) 2024.11.12
MSA) 서비스 별 각 인스턴스에서 애플리케이션을 Docker 컨테이너화 후, 발생한 Eureka Client 간의 통신 문제  (0) 2024.07.09
MSA) Spring Cloud 기반의 MSA 구조에서 Swagger 통합하기 + FastAPI의 Swagger까지  (0) 2024.06.17
MSA) Spring Cloud Eureka에 FastAPI 서버를 Client로 등록하기  (0) 2024.06.01
'SpringBoot' 카테고리의 다른 글
  • Redis 캐싱을 사용하여, 일일 조회수와 좋아요 증감 수를 반영한 일일 HOT 상품 목록 조회 기능 개발
  • 비효율적인 상품 상세 검색 코드 수정과 nGrinder를 활용한 성능 테스트
  • MSA) 서비스 별 각 인스턴스에서 애플리케이션을 Docker 컨테이너화 후, 발생한 Eureka Client 간의 통신 문제
  • MSA) Spring Cloud 기반의 MSA 구조에서 Swagger 통합하기 + FastAPI의 Swagger까지
개발이조아용
개발이조아용
IT 개발에서 배운 성장의 기록을 작성합니다.
  • 개발이조아용
    계속 하다 보면?!
    개발이조아용
  • 전체
    오늘
    어제
    • 분류 전체보기 (67)
      • Tibero DB (Tmax AI Bigdata .. (7)
      • Git (2)
      • CI CD (2)
      • Redis (3)
      • SpringBoot (16)
      • SQL 문제 풀이 (8)
      • Apache Kafka (8)
        • 오류 해결 (3)
        • 개념 정리 (4)
        • 보안 (1)
      • Nginx (3)
      • SW마에스트로 (3)
      • Kubernetes (4)
      • AWS (5)
      • gRPC (3)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

  • 공지사항

  • 인기 글

  • 태그

    Git
    SASL 인증
    Tibero
    SpringBoot
    grpc
    소프트웨어 마에스트로
    Kafka SASL
    Kafka 개념
    Kafka 오류
    K8S
    MSA
    DynamoDB 연동
    redis script
    leetcode
    nginx
    Redis 개념
    SQL
    redis
    KAFKA
    sql 문제
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.3
개발이조아용
Spring Boot와 Kafka 테스트: Embedded Kafka로 상품 좋아요 메시지 검증하기
상단으로

티스토리툴바