https://docs.spring.io/spring-kafka/reference/testing.html
공식 문서를 참고해서 작성했다.
최근 프로젝트에서 Kafka를 적용하면서 Kafka와 관련된 테스트 케이스를 작성했다.
0. Dependency
testImplementation 'org.springframework.kafka:spring-kafka-test'
//JUnit + AssertJ
testImplementation 'org.junit.jupiter:junit-jupiter:5.8.2'
testImplementation 'org.assertj:assertj-core:3.23.1'
1. Annotations
@SpringBootTest //Spring Boot 애플리케이션 테스트. Application Context를 로드해주고, 테스트 환경에서 Bean을 주입받을 수 있다.
@ExtendWith(SpringExtension.class)
@SpringJUnitConfig // Embedded Broker가 Test Application Context에 추가됨. -> Broker를 Autowire 해줄 수 있음.
@EmbeddedKafka
public class KafkaServiceTest {
Kafka 테스트를 위해선 EmbeddedKafkaBroker를 이용하는데, 이를 bean에 등록하기 위해선 @EmbeddedKafka 를 사용한다.
JUnit5에서 @EmbeddedKafka를 사용하는 방법 중,
@SpringJUnitConfig 를 사용하게 되면 Embedded Broker가 테스트 Application Context에 추가된다.
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
이렇게 주입받을 수 있다!!
2. Consumer Settings
2-1. Consumer 설정
먼저 Consumer를 설정해보자.
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("publish-group", "false",
embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps);
KafkaTestUtils 클래스는 Kafka 테스트를 위해 여러 기능들을 제공하는 클래스이다. consumer설정, producer설정, record 가져오기, offset가져오기 등 여러 테스트를 진행해볼 수 있다.
/**
* Set up test properties for an {@code <Integer, String>} consumer.
* @param group the group id.
* @param autoCommit the auto commit.
* @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
* @return the properties.
*/
public static Map<String, Object> consumerProps(String group, String autoCommit,
EmbeddedKafkaBroker embeddedKafka) { ... }
/**
* Set up test properties for an {@code <Integer, String>} producer.
* @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
* @return the properties.
*/
public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka) { ... }
KafkaUtils를 사용하여 group 이름과 auto-commit을 설정해주고, 주입받았던 EmbeddedKafka를 넘겨준다.
그렇다면 간단하게 ConsumerFactory를 만들 수 있다.
2-2. Container 설정
ContainerProperties containerProperties = new ContainerProperties(TEMPLATE_TOPIC);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProperties);
final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>();
container.setupMessageListener(new MessageListener<Integer, String>() {
@Override //Listener 설정
public void onMessage(ConsumerRecord<Integer, String> record) {
System.out.println("[ Kafka Publish Test ] Message received : " + record.value());
records.add(record);
}
});
container.setBeanName("templateTests");
container.start();
ContainerTestUtils.waitForAssignment(container,
embeddedKafka.getPartitionsPerTopic());
먼저 Container를 설정해둔다. topic 이름을 전달하여 생성하고, KafkaMessageListenerContainer를 만들어주면 된다.
메세지를 받고 저장해둘 ConsumerRecord가 들어갈 BlockingQueue를 미리 만들어두고, container 내에서 Message Listener를 메서드를 Override해서 설정해둔다.
container가 할당된 파티션 수를 가질때까지 기다리기 위해 ContainerTestUtils의 waitForAssignment 메서드를 이용한다.
3. Producer Settings
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(producerProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(TEMPLATE_TOPIC);
마찬가지로 KafkaTestUtils 클래스를 이용해 Producer를 설정해준다.
4. Assertions
나는 메세지 발행 테스트, 메세지 성능 테스트를 진행했다.
Assertion은 assertJ를 이용했다.
4-1. 메세지 발행 테스트
//when 1 - test message 발행
System.out.println("[ Kafka Publish Test ] Message Publishing (1)");
template.sendDefault("test message");
//then 1 - test message가 있는지
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS); //메세지 꺼내오기
assertThat(received).has(value("test message")); //발행한 메세지가 있는지?
KafkaTemplate을 통해 메세지를 발행하고, 미리 설정해둔 BlockingQueue에서 메세지를 꺼내온다.
발행한 메세지인 "test message"가 있는지 검사한다.
//when 2 - test message 2 발행
System.out.println("[ Kafka Publish Test ] Message Publishing (2)");
template.sendDefault(0, 2, "test message 2");
//then 2 - test message 2 를 보유중인지
ConsumerRecord<Integer, String> secondReceived = records.poll(10, TimeUnit.SECONDS);
assertThat(secondReceived).has(key(2)); //key가 있는지?
assertThat(secondReceived).has(value("test message 2")); //발행한 메세지가 있는지?
assertThat(secondReceived).has(partition(0)); //partition이 있는지?
두 번째로는 파티션 0, key 2 와 함께 메세지를 발행한다.
각 key, message, partition이 있는지 검사한다.
4-2. 메세지 성능 테스트
System.out.println("[ Kafka Performance Test ] Message Publishing start...");
startInstant = Instant.now(); //현재 기록
for (int i = 0; i < 100; i++) {
sendMessage(template, "Performance Test Message "+i);
}
메세지 성능 테스트는 총 100개의 메세지를 발행했다.
private static Instant startInstant;
미리 설정해둔 전역변수에 시작 Instant를 넣어두었다.
@Override
public void onMessage(ConsumerRecord<Integer, String> record) {
Instant now = Instant.now();
System.out.println("[ Kafka Performance Test ] Message received : " + record.value());
System.out.println("[ Kafka Performance Test ] Time Passed : " + Duration.between(startInstant, now).toMillis() + "ms");
}
Listener는 위와 같이 설정했다.
메세지를 받을 때 Instant를 설정해두고, 미리 설정해둔 startInstant와 비교해서 ms 단위로 차이를 출력했다.
- 최종 코드
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.scheduling.annotation.Async;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.*;
import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.kafka.test.assertj.KafkaConditions.*;
@SpringBootTest //Spring Boot 애플리케이션 테스트. Application Context를 로드해주고, 테스트 환경에서 Bean을 주입받을 수 있다.
@ExtendWith(SpringExtension.class)
@SpringJUnitConfig // Embedded Broker가 Test Application Context에 추가됨. -> Broker를 Autowire 해줄 수 있음.
@EmbeddedKafka
public class KafkaServiceTest {
@BeforeEach //테스트 전 처리
void setUp() {}
@AfterAll //테스트 후 처리
static void afterAll() {}
private static final String PUBLISH_TOPIC = "publish-topic"; //토픽 이름 설정
private static final String PERFORMANCE_TOPIC = "performance-topic"; //토픽 이름 설정
private static Instant startInstant;
@Test
@DisplayName("Kafka 메세지 발행 테스트")
public void messagePublishingTest( @Autowired EmbeddedKafkaBroker embeddedKafka) throws Exception {
//Given
/**
* Consumer Settings
*/
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("publish-group", "false",
embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps);
/**
* Container & Listener Settings
*/
ContainerProperties containerProperties = new ContainerProperties(PUBLISH_TOPIC);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProperties);
final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>();
container.setupMessageListener(new MessageListener<Integer, String>() {
@Override //Listener 설정
public void onMessage(ConsumerRecord<Integer, String> record) {
System.out.println("[ Kafka Publish Test ] Message received : " + record.value());
records.add(record);
}
});
container.setBeanName("templateTests");
container.start();
ContainerTestUtils.waitForAssignment(container,
embeddedKafka.getPartitionsPerTopic());
/**
* Producer Settings
*/
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(producerProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(PUBLISH_TOPIC);
//when 1 - test message 발행
System.out.println("[ Kafka Publish Test ] Message Publishing (1)");
template.sendDefault("test message");
//then 1 - test message가 있는지
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS); //메세지 꺼내오기
assertThat(received).has(value("test message")); //발행한 메세지가 있는지?
//when 2 - test message 2 발행
System.out.println("[ Kafka Publish Test ] Message Publishing (2)");
template.sendDefault(0, 2, "test message 2");
//then 2 - test message 2 를 보유중인지
ConsumerRecord<Integer, String> secondReceived = records.poll(10, TimeUnit.SECONDS);
assertThat(secondReceived).has(key(2)); //key가 있는지?
assertThat(secondReceived).has(value("test message 2")); //발행한 메세지가 있는지?
assertThat(secondReceived).has(partition(0)); //partition이 있는지?
}
@Test
@DisplayName("Kafka 성능 테스트")
public void messagePerformanceTest(@Autowired EmbeddedKafkaBroker embeddedKafka) throws Exception {
//Given
/**
* Consumer Settings
*/
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("performance-group", "false",
embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps);
ContainerProperties containerProperties = new ContainerProperties(PERFORMANCE_TOPIC);
/**
* Listener Settings
*/
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProperties);
container.setupMessageListener(new MessageListener<Integer, String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> record) {
Instant now = Instant.now();
System.out.println("[ Kafka Performance Test ] " + record.value() +" Time Passed : " + Duration.between(startInstant, now).toMillis() + "ms");
}
});
container.setBeanName("templateTests");
container.start();
ContainerTestUtils.waitForAssignment(container,
embeddedKafka.getPartitionsPerTopic());
/**
* Producer Settings
*/
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(producerProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(PERFORMANCE_TOPIC);
//when test message 발행
System.out.println("[ Kafka Performance Test ] Message Publishing start...");
startInstant = Instant.now(); //현재 기록
for (int i = 0; i < 100; i++) {
sendMessage(template, "Performance Test Message "+i);
}
}
@Async
void sendMessage(KafkaTemplate<Integer, String> template, String message) {
template.sendDefault(0, 2, message);
}
}
5. 결과
5-1. 메세지 발행 테스트
메세지를 발행하고 Assertion을 모두 통과했다.
5-2. 메세지 성능 테스트
비동기 추적 메세지가 계속 메세지 로그를 끊어서 설정을 꺼주었다.
[ Kafka Performance Test ] Performance Test Message 0 Time Passed : 311ms
[ Kafka Performance Test ] Performance Test Message 1 Time Passed : 312ms
[ Kafka Performance Test ] Performance Test Message 2 Time Passed : 312ms
[ Kafka Performance Test ] Performance Test Message 3 Time Passed : 313ms
[ Kafka Performance Test ] Performance Test Message 4 Time Passed : 313ms
[ Kafka Performance Test ] Performance Test Message 5 Time Passed : 313ms
[ Kafka Performance Test ] Performance Test Message 6 Time Passed : 313ms
[ Kafka Performance Test ] Performance Test Message 7 Time Passed : 313ms
[ Kafka Performance Test ] Performance Test Message 8 Time Passed : 313ms
[ Kafka Performance Test ] Performance Test Message 9 Time Passed : 313ms
[ Kafka Performance Test ] Performance Test Message 10 Time Passed : 313ms
[ Kafka Performance Test ] Performance Test Message 11 Time Passed : 313ms
[ Kafka Performance Test ] Performance Test Message 12 Time Passed : 313ms
[ Kafka Performance Test ] Performance Test Message 13 Time Passed : 313ms
[ Kafka Performance Test ] Performance Test Message 14 Time Passed : 313ms
[ Kafka Performance Test ] Performance Test Message 15 Time Passed : 313ms
[ Kafka Performance Test ] Performance Test Message 16 Time Passed : 313ms
[ Kafka Performance Test ] Performance Test Message 17 Time Passed : 313ms
[ Kafka Performance Test ] Performance Test Message 18 Time Passed : 313ms
[ Kafka Performance Test ] Performance Test Message 19 Time Passed : 313ms
[ Kafka Performance Test ] Performance Test Message 20 Time Passed : 313ms
[ Kafka Performance Test ] Performance Test Message 21 Time Passed : 313ms
[ Kafka Performance Test ] Performance Test Message 22 Time Passed : 313ms
[ Kafka Performance Test ] Performance Test Message 23 Time Passed : 314ms
[ Kafka Performance Test ] Performance Test Message 24 Time Passed : 314ms
[ Kafka Performance Test ] Performance Test Message 25 Time Passed : 314ms
[ Kafka Performance Test ] Performance Test Message 26 Time Passed : 314ms
[ Kafka Performance Test ] Performance Test Message 27 Time Passed : 314ms
[ Kafka Performance Test ] Performance Test Message 28 Time Passed : 314ms
[ Kafka Performance Test ] Performance Test Message 29 Time Passed : 314ms
[ Kafka Performance Test ] Performance Test Message 30 Time Passed : 314ms
[ Kafka Performance Test ] Performance Test Message 31 Time Passed : 314ms
[ Kafka Performance Test ] Performance Test Message 32 Time Passed : 314ms
[ Kafka Performance Test ] Performance Test Message 33 Time Passed : 314ms
[ Kafka Performance Test ] Performance Test Message 34 Time Passed : 314ms
[ Kafka Performance Test ] Performance Test Message 35 Time Passed : 314ms
[ Kafka Performance Test ] Performance Test Message 36 Time Passed : 315ms
[ Kafka Performance Test ] Performance Test Message 37 Time Passed : 315ms
[ Kafka Performance Test ] Performance Test Message 38 Time Passed : 315ms
[ Kafka Performance Test ] Performance Test Message 39 Time Passed : 315ms
[ Kafka Performance Test ] Performance Test Message 40 Time Passed : 315ms
[ Kafka Performance Test ] Performance Test Message 41 Time Passed : 315ms
[ Kafka Performance Test ] Performance Test Message 42 Time Passed : 315ms
[ Kafka Performance Test ] Performance Test Message 43 Time Passed : 315ms
[ Kafka Performance Test ] Performance Test Message 44 Time Passed : 315ms
[ Kafka Performance Test ] Performance Test Message 45 Time Passed : 315ms
[ Kafka Performance Test ] Performance Test Message 46 Time Passed : 315ms
[ Kafka Performance Test ] Performance Test Message 47 Time Passed : 315ms
[ Kafka Performance Test ] Performance Test Message 48 Time Passed : 315ms
[ Kafka Performance Test ] Performance Test Message 49 Time Passed : 315ms
[ Kafka Performance Test ] Performance Test Message 50 Time Passed : 315ms
[ Kafka Performance Test ] Performance Test Message 51 Time Passed : 315ms
[ Kafka Performance Test ] Performance Test Message 52 Time Passed : 315ms
[ Kafka Performance Test ] Performance Test Message 53 Time Passed : 315ms
[ Kafka Performance Test ] Performance Test Message 54 Time Passed : 315ms
[ Kafka Performance Test ] Performance Test Message 55 Time Passed : 315ms
[ Kafka Performance Test ] Performance Test Message 56 Time Passed : 315ms
[ Kafka Performance Test ] Performance Test Message 57 Time Passed : 315ms
[ Kafka Performance Test ] Performance Test Message 58 Time Passed : 315ms
[ Kafka Performance Test ] Performance Test Message 59 Time Passed : 315ms
[ Kafka Performance Test ] Performance Test Message 60 Time Passed : 315ms
[ Kafka Performance Test ] Performance Test Message 61 Time Passed : 315ms
[ Kafka Performance Test ] Performance Test Message 62 Time Passed : 315ms
[ Kafka Performance Test ] Performance Test Message 63 Time Passed : 315ms
[ Kafka Performance Test ] Performance Test Message 64 Time Passed : 315ms
[ Kafka Performance Test ] Performance Test Message 65 Time Passed : 315ms
[ Kafka Performance Test ] Performance Test Message 66 Time Passed : 315ms
[ Kafka Performance Test ] Performance Test Message 67 Time Passed : 315ms
[ Kafka Performance Test ] Performance Test Message 68 Time Passed : 315ms
[ Kafka Performance Test ] Performance Test Message 69 Time Passed : 315ms
[ Kafka Performance Test ] Performance Test Message 70 Time Passed : 315ms
[ Kafka Performance Test ] Performance Test Message 71 Time Passed : 316ms
[ Kafka Performance Test ] Performance Test Message 72 Time Passed : 316ms
[ Kafka Performance Test ] Performance Test Message 73 Time Passed : 316ms
[ Kafka Performance Test ] Performance Test Message 74 Time Passed : 316ms
[ Kafka Performance Test ] Performance Test Message 75 Time Passed : 316ms
[ Kafka Performance Test ] Performance Test Message 76 Time Passed : 316ms
[ Kafka Performance Test ] Performance Test Message 77 Time Passed : 316ms
[ Kafka Performance Test ] Performance Test Message 78 Time Passed : 316ms
[ Kafka Performance Test ] Performance Test Message 79 Time Passed : 316ms
[ Kafka Performance Test ] Performance Test Message 80 Time Passed : 316ms
[ Kafka Performance Test ] Performance Test Message 81 Time Passed : 316ms
[ Kafka Performance Test ] Performance Test Message 82 Time Passed : 316ms
[ Kafka Performance Test ] Performance Test Message 83 Time Passed : 316ms
[ Kafka Performance Test ] Performance Test Message 84 Time Passed : 316ms
[ Kafka Performance Test ] Performance Test Message 85 Time Passed : 316ms
[ Kafka Performance Test ] Performance Test Message 86 Time Passed : 316ms
[ Kafka Performance Test ] Performance Test Message 87 Time Passed : 316ms
[ Kafka Performance Test ] Performance Test Message 88 Time Passed : 316ms
[ Kafka Performance Test ] Performance Test Message 89 Time Passed : 316ms
[ Kafka Performance Test ] Performance Test Message 90 Time Passed : 316ms
[ Kafka Performance Test ] Performance Test Message 91 Time Passed : 316ms
[ Kafka Performance Test ] Performance Test Message 92 Time Passed : 316ms
[ Kafka Performance Test ] Performance Test Message 93 Time Passed : 316ms
[ Kafka Performance Test ] Performance Test Message 94 Time Passed : 316ms
[ Kafka Performance Test ] Performance Test Message 95 Time Passed : 317ms
[ Kafka Performance Test ] Performance Test Message 96 Time Passed : 317ms
[ Kafka Performance Test ] Performance Test Message 97 Time Passed : 317ms
[ Kafka Performance Test ] Performance Test Message 98 Time Passed : 317ms
[ Kafka Performance Test ] Performance Test Message 99 Time Passed : 317ms
메세지를 끊기지 않고 모두 받았다.
흥미로운 결과였다. 처음 메세지 발행이 311ms, 마지막 메세지 발행은 317ms이 지난 후에 받았다. 6ms 차이였다.
약 100개의 메세지를 6ms 내에 모두 처리한 것이다.
3개의 테스트 결과를 정리해보면 이렇다.
보통 100개의 메세지를 발행하는데에 2ms~4ms가 걸렸다.
여러 기업에서 Kafka 를 사용하는 이유를 알 수 있었다.
이렇게 동시성이 좋은 메세지 큐를 잘 이용한다면 좋은 서비스를 개발할 수 있을 것 같다.
'BackEnd > Spring Boot' 카테고리의 다른 글
[Spring] CORS 개념과 Spring Security 6 설정 톺아보기 (0) | 2024.05.19 |
---|---|
[ Spring ] Spring Data Redis (0) | 2024.04.14 |
[Spring] 분산 시스템에서 효과적인 데이터 전달 방법 (3) - Kafka (0) | 2023.11.11 |
[Spring] 분산 시스템에서 효과적인 데이터 전달 방법 (2) - RabbitMQ (0) | 2023.11.10 |
[Spring] 분산 시스템에서 효과적인 데이터 전달 방법 (1) - RDB (1) | 2023.11.04 |