[์ด์ ํฌ์คํ ] RDB๋ฅผ ์ฌ์ฉํ๋ ์ ํ๋ฆฌ์ผ์ด์ ์์ ํจ๊ณผ์ ์ธ ๋ฐ์ดํฐ ์ ๋ฌ ๋ฐฉ๋ฒ
[์ด์ ํฌ์คํ ] RabbitMQ๋ฅผ ์ฌ์ฉํ๋ ์ ํ๋ฆฌ์ผ์ด์ ์์ ํจ๊ณผ์ ์ธ ๋ฐ์ดํฐ ์ ๋ฌ ๋ฐฉ๋ฒ
2. Kafka๋ฅผ ์ฌ์ฉํ ์ ๋ฌ ๋ฐฉ๋ฒ
2-1. Producer Confirm
์๋๋ Producer Confirm์ ๊ตฌํํ ํด๋์ค์ด๋ค.
- Spring Boot 2 ๋ฒ์
@Slf4j
@Component
@RequiredArgsConstructor
public class Producer {
public void sendEvent(CreateTaskEvent event) {
ListenableFuture<sendResult<String, CreateTaskEvent> future =
kafkaTemplate.send(TOPIC_TASK, event);
future.addCallback(
result -> log.info("offset : {}" , result.getRecordMetadata().offset),
throwable -> log.error("fail to publish", throwable)
);
}
}
์๋ ์คํ๋ง๋ถํธ 2๋ฒ์ ์์ ๊ตฌํํ Producer Confirm์ด๋ค.
์คํ๋ง๋ถํธ 2๋ฒ์ ์์๋ KafkaTemplate์ send()๋ฉ์๋๊ฐ ListenableFuture๋ฅผ ๋ฆฌํดํด์ ํด๋น ์ฝ๋๊ฐ ๋์ง๋ง,
์คํ๋ง๋ถํธ 3์์ ListenableFuture๊ฐ deprecate ๋์๋ค.
CompletableFuture์๋ callback๋ฉ์๋๊ฐ ์๊ธฐ ๋๋ฌธ์, whenComplete()๋ฉ์๋๋ฅผ ์ฌ์ฉํด์ผ ํ๋ค.
- Spring boot 3 ๋ฒ์
@Slf4j
@Component
@RequiredArgsConstructor
public class Producer {
public void sendEvent(CreateTaskEvent event) {
CompletableFuture<SendResult<String, CreateTaskEvent>> future
= kafkaTemplate.send(TOPIC_TASK, event);
future.whenComplete(((result, throwable) -> {
if (throwable == null) {
log.info("offset : {}", result.getRecordMetadata().offset());
} else {
log.error("fail to publish", throwable);
}
}
));
}
}
์ด๋ ๊ฒ kafka ํ๊ฒฝ์์ ์ฑ๊ณต, ์คํจ์ ๋ํ callback์ ๊ตฌํํ ์ ์๋ค.
2-2. Consumer ACK
Consumer ACK์ AcknowledgingMessageListner ์ธํฐํ์ด์ค๋ฅผ ์ด์ฉํ๋ค.
@FunctionalInterface
public interface AcknowledgingMessageListener<K, V> extends MessageListener<K, V> {
/**
* Invoked with data from kafka. Containers should never call this since it they
* will detect that we are an acknowledging listener.
* @param data the data to be processed.
*/
@Override
default void onMessage(ConsumerRecord<K, V> data) {
throw new UnsupportedOperationException("Container should never call this");
}
/**
* Invoked with data from kafka.
* @param data the data to be processed.
* @param acknowledgment the acknowledgment.
*/
@Override
void onMessage(ConsumerRecord<K, V> data, @Nullable Acknowledgment acknowledgment);
}
ํด๋น ์ธํฐํ์ด์ค์ ์๋ onMessage ๋ฉ์๋์์ ๋ ๋ฒ์งธ ์ธ์๋ก Acknowledgement๋ฅผ ์ ๋ฌํ ์ ์๋๋ฐ, ์ด๋ฅผ ์ด์ฉํด์ Consumer ACK๋ฅผ ๊ตฌํํ๋ค.
์๋๋ Consumer ACK์ ๊ตฌํํ ์ฝ๋์ด๋ค.
@Component
@Slf4j
public class MyAcknowledgementMessageListner implements AcknowledgingMessageListener<String, String> {
@Override
@KafkaListener(
//...
containerFactory = "KafkaListenerContainerFactory"
)
public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
try {
acknowledgment.acknowledge();
} catch (Exception e) {
log.error("Error to receive messages. ", e);
}
}
}
AcknowledgingMessageListener๋ฅผ ์์๋ฐ์ onMessage๋ฅผ Overrideํ๋ค.
์ธ์๋ก ๋ฐ์ Acknowledgement์ acknowledge()๋ฉ์๋๋ฅผ ์คํํ๋ฉด ์ฑ๊ณตํ Consumer ACK์ด ์ ์ก๋๋ค.
์ด๋ ๊ฒ ์๋์ ์ผ๋ก ACK์ ๋ณด๋ด๋ ค๋ฉด ๋ฏธ๋ฆฌ ํด๋์ด์ผ ํ ์ค์ ์ด ์๋ค.
@EnableKafka
@Configuration
public class KafkaConfig {
//...
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//Setting for Consumer ACK
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
return new DefaultKafkaConsumerFactory<>(props);
}
//...
}
CosnumerFactory ์ค์ ์์, AUTO COMMIT์ false๋ก ์ค์ ํด์ ์ง์ ACK, NACK๋ฅผ ๋ ๋ฆด ์ ์๋๋ก ์ค์ ํด๋๋ค.
์ด๋ก์จ RDB, RabbitMQ, Kafka ํ๊ฒฝ์์ ์๋ ์กฐ๊ฑด์ ์ถฉ์กฑํ๋ค.
- At Least Once : ์ต์ ํ ๋ฒ ์ ๋ฌ ๊ตฌํ
- Producer Confirm, Consumer ACK
์์ผ๋ก MSA๊ตฌ์กฐ๋ก ํ๋ก์ ํธ๋ฅผ ๊ณํํ๊ฒ ๋๋ค๋ฉด, ๋ฐฐ์ด ์ ์ ์ด์ฉํ์ฌ ์์ ์ฑ์๋ ๋ฐ์ดํฐ ์ ๋ฌ์ ๊ตฌํํ ์ ์๊ฒ ๋ค.
์ฐธ๊ณ
https://github.com/spring-projects/spring-boot/wiki/Spring-Boot-3.0-Migration-Guide
https://spring.io/projects/spring-kafka
https://www.youtube.com/watch?v=uk5fRLUsBfk&t=1797s&ab_channel=NHNCloud
'BackEnd > Spring Boot' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[ Spring ] Spring Data Redis (0) | 2024.04.14 |
---|---|
[Kafka] JUnit5 Kafka ๋จ์ ๋ฉ์ธ์ง ๋ฐํ, ์ฑ๋ฅ ํ ์คํธ (0) | 2024.04.03 |
[Spring] ๋ถ์ฐ ์์คํ ์์ ํจ๊ณผ์ ์ธ ๋ฐ์ดํฐ ์ ๋ฌ ๋ฐฉ๋ฒ (2) - RabbitMQ (0) | 2023.11.10 |
[Spring] ๋ถ์ฐ ์์คํ ์์ ํจ๊ณผ์ ์ธ ๋ฐ์ดํฐ ์ ๋ฌ ๋ฐฉ๋ฒ (1) - RDB (1) | 2023.11.04 |
[Spring] Docker์์ Static File ์ฒ๋ฆฌํ๊ธฐ (0) | 2023.09.02 |