코딩박스

[Spring] Embedded kafka 테스트 진행

by 코박7
  • 테스트는 JUnit 5.8.1 버전으로 진행하였습니다.

 

gradle 설정

// kafka-test 라이브러리 의존성 추가.
testImplementation 'org.springframework.kafka:spring-kafka-test'

 

 

Producer, Consumer Config 설정

package com.example.vehicle.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * Spring configuration that wires the Kafka consumer side: a
 * {@link ConsumerFactory} for String keys/values and a listener container
 * factory built on top of it.
 *
 * <p>Each setting is resolved from a property placeholder and falls back to
 * the value that used to be hard-coded, so behavior is unchanged when no
 * property is defined.
 */
@Configuration
public class KafkaConsumerConfig {

    /** Broker address list handed to the consumer; defaults to {@code localhost:9092}. */
    @Value("${spring.kafka.bootstrap-servers:localhost:9092}")
    private String bootstrapAddress;

    /** Value for {@code auto.offset.reset}; defaults to {@code earliest}. */
    @Value("${spring.kafka.consumer.auto-offset-reset:earliest}")
    private String autoOffsetReset;

    /** Value for {@code enable.auto.commit}; defaults to {@code false}. */
    @Value("${spring.kafka.consumer.enable-auto-commit:false}")
    private boolean autoCommit;

    /**
     * Creates the consumer factory from the injected settings.
     *
     * @return a factory producing consumers that deserialize both key and
     *         value with {@link StringDeserializer}
     */
    @Bean
    ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        // Deserializer instances are passed directly instead of via class-name properties.
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new StringDeserializer());
    }

    /**
     * Creates a listener container factory backed by {@link #consumerFactory()}.
     *
     * <p>NOTE(review): a {@code @KafkaListener} that does not name a
     * {@code containerFactory} looks up the bean called
     * {@code kafkaListenerContainerFactory}; confirm whether this bean
     * (named {@code containerFactory}) is the one the listeners actually use.
     *
     * @return the configured container factory
     */
    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> containerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

 

package com.example.vehicle.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * Spring configuration that wires the Kafka producer side: a
 * {@link ProducerFactory} for String keys/values and the
 * {@link KafkaTemplate} built on it.
 *
 * <p>The broker address is resolved from a property placeholder and falls
 * back to the value that used to be hard-coded, so behavior is unchanged
 * when no property is defined.
 */
@Configuration
public class KafkaProducerConfig {

    /** Broker address list handed to the producer; defaults to {@code localhost:9092}. */
    @Value("${spring.kafka.bootstrap-servers:localhost:9092}")
    private String bootstrapServer;

    /**
     * Creates the producer factory.
     *
     * @return a factory producing producers that serialize both key and
     *         value with {@link StringSerializer}
     */
    @Bean
    public ProducerFactory<String, String> factory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    /**
     * Creates the template used to publish String messages.
     *
     * @return a {@link KafkaTemplate} backed by {@link #factory()}
     */
    @Bean
    KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(factory());
    }

}

 

 

Producer 작성

package com.example.vehicle.adaptor;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * Thin component that publishes String payloads to Kafka through the
 * injected {@link KafkaTemplate}.
 */
@Slf4j
@Component
public class KafkaTestProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Creates the producer with the template it delegates to.
     *
     * @param kafkaTemplate template used for every send
     */
    public KafkaTestProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Hands the payload to the template for the given topic. The value
     * returned by the template is not inspected.
     *
     * @param topic   destination topic name
     * @param payload message value to publish
     */
    public void send(String topic, String payload) {
        kafkaTemplate.send(topic, payload);
    }

}

 

Consumer 작성

package com.example.vehicle.adaptor;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.concurrent.CountDownLatch;

/**
 * Test-oriented Kafka listener that remembers the last message received on
 * the {@code test} topic and releases a latch so that another thread can
 * wait for the delivery.
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaTestConsumer {

    /** Counted down once per received message; starts at 1. */
    private final CountDownLatch latch = new CountDownLatch(1);

    /**
     * Last message received. Volatile because it is written by the listener
     * invocation and read via {@link #getConvertMessage()}, which callers may
     * invoke without having observed the latch reach zero.
     */
    private volatile String convertMessage;

    /**
     * Handles a record from the {@code test} topic: logs it, stores it, then
     * counts the latch down.
     *
     * @param message the record value
     */
    @KafkaListener(topics = "test", groupId = "testGroup")
    public void consume(String message) {
        log.info("Received message {}", message);
        // Store before counting down so a thread released by the latch sees the value.
        convertMessage = message;
        latch.countDown();
    }

    /**
     * @return the most recently received message, or {@code null} if none
     *         has been received yet
     */
    public String getConvertMessage() {
        return convertMessage;
    }

    /**
     * @return the latch that is counted down when a message is received
     */
    public CountDownLatch getLatch() {
        return latch;
    }
}

 

 

테스트 코드 작성

package com.example.vehicle.kafka;

import com.example.vehicle.adaptor.KafkaTestConsumer;
import com.example.vehicle.adaptor.KafkaTestProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;

import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * Round-trip test: publishes one message through {@link KafkaTestProducer}
 * and verifies that {@link KafkaTestConsumer} receives it, using the broker
 * declared by {@code @EmbeddedKafka} on port 9092.
 */
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
public class KafkaUnitTest {

    @Autowired
    private KafkaTestProducer producer;

    @Autowired
    private KafkaTestConsumer consumer;

    /**
     * Sends a payload to the {@code test} topic and waits up to 10 seconds
     * for the consumer's latch.
     *
     * @throws InterruptedException if the wait on the latch is interrupted
     */
    @Test
    void kafkaUnitTest() throws InterruptedException {

        // given
        String topic = "test";
        String payload = "들어가나요?";

        // when
        producer.send(topic, payload);
        boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS);

        // then
        // Fail explicitly on a timeout instead of via a null-vs-payload mismatch below.
        assertThat(messageConsumed).as("message consumed within 10 seconds").isTrue();
        assertThat(consumer.getConvertMessage()).isEqualTo(payload);

    }
}

 

위와 같이 세팅하고 테스트한 결과, Producer에서 데이터를 정상적으로 보내고 Consumer에서 데이터를 받는 것이 확인됐습니다.

 

이상.

블로그의 정보

코딩박스

코박7

활동하기