[Spring] Embedded kafka 테스트 진행
by 코박7- 테스트는 junit 5.8.1. version 으로 진행하였습니다.
gradle 설정
// kafak-test 라이브러리 의존성 주입.
testImplementation 'org.springframework.kafka:spring-kafka-test'
Producer, Consumer Config 설정
package com.example.vehicle.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConsumerConfig {
@Value("localhost:9092")
private String BOOTSTRAP_ADDRESS;
@Value("earliest")
private String AUTO_OFFSET_RESET;
@Value("false")
private boolean AUTO_COMMIT;
@Bean
ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_ADDRESS);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, AUTO_COMMIT);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new StringDeserializer());
}
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
package com.example.vehicle.config;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
@Value("localhost:9092")
private String BOOTSTRAP_SERVER;
@Bean
public ProducerFactory<String, String> factory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(factory()); }
}
Producer 작성
package com.example.vehicle.adaptor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaTestProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic, String payload) {
kafkaTemplate.send(topic, payload);
}
}
Consumer 작성
package com.example.vehicle.adaptor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.util.concurrent.CountDownLatch;
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaTestConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTestConsumer.class);
private final CountDownLatch latch = new CountDownLatch(1);
private String convertMessage;
@KafkaListener(topics = "test", groupId = "testGroup")
public void consume(String message) {
log.info("Received message {}", message);
convertMessage = message;
latch.countDown();
}
public String getConvertMessage() {
return convertMessage;
}
public CountDownLatch getLatch() {
return latch;
}
}
테스트 코드 작성
package com.example.vehicle.kafka;
import com.example.vehicle.adaptor.KafkaTestConsumer;
import com.example.vehicle.adaptor.KafkaTestProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
public class KafkaUnitTest {
@Autowired
private KafkaTestProducer producer;
@Autowired
private KafkaTestConsumer consumer;
@Test
void kafkaUnitTest() throws InterruptedException {
// given
String topic = "test";
String payload = "들어가나요?";
// when
producer.send(topic, payload);
boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS);
// then
assertThat(consumer.getConvertMessage()).isEqualTo(payload);
}
}
위와 같이 세팅, 테스트 결과 정상적으로 Producer 에서 데이터를 보내고, Consumer 에서 데이터를 받는게 확인됐습니다.
이상.
'spring' 카테고리의 다른 글
[Spring] 간단하게 POST 요청 보내기 (2) | 2023.07.25 |
---|---|
[Spring] CORS 란? CORS 해결 (0) | 2023.07.13 |
[Spring] File 을 MultipartFile 로 변경 (1) | 2023.07.07 |
[Spring] @Vaild 어노테이션을 리스트에 적용시켜보자 (0) | 2023.06.25 |
[Spring] validation 어노테이션으로 간단한 유효성 검사 (0) | 2023.06.18 |
블로그의 정보
코딩박스
코박7