[์ด์ ํฌ์คํ ] RDB๋ฅผ ์ฌ์ฉํ๋ ์ ํ๋ฆฌ์ผ์ด์ ์์ ํจ๊ณผ์ ์ธ ๋ฐ์ดํฐ ์ ๋ฌ ๋ฐฉ๋ฒ
2. RabbitMQ๋ฅผ ์ฌ์ฉํ ์ ๋ฌ ๋ฐฉ๋ฒ
RabbitMQ๋ ๊ธฐ๋ณธ์ ์ผ๋ก AMQP(Advanced Mssage Queueing Protocol)๋ฅผ ๊ตฌํํ ๋ธ๋ก์ปค๋ค.
์์ ์ ๊ณต๋ถํ๋ Publish/Subscribe ๋ฐฉ์์ ์ง์ํ๋ค.
๋ํ, RabbitMQ๋ ACK(Acknowledgement) ๋ฉ์ปค๋์ฆ์ ๊ฐ๊ณ ์๋ค.
Publisher๊ฐ ๋ฉ์ธ์ง๋ฅผ Queue์ ์ ์์ ์ผ๋ก ๋ฐํํ๋ค๋ฉด Producer Confirm์,
Consumer๊ฐ ๋ฉ์ธ์ง๋ฅผ ์ ๋ฐ์๋ค๋ฉด Consumer Ack์ ๋ณด๋ด๊ฒ ๋๋ค.
1. Producer Confirm
Publisher๊ฐ ๋ฉ์ธ์ง๋ฅผ ๋ฐํํ๋ฉด, Exchange๊ฐ ๋ฉ์ธ์ง๋ฅผ ๋ฐ์์ Queue์ ์ ์ฅํ๋ค.
์ด ๊ณผ์ ์ด ์๋ฃ๋์๋ค๋ฉด Exchange๋ Publisher์๊ฒ ACK๋ฅผ ๋ณด๋ด๊ฒ ๋๋ค.
๊ทธ๋ ๋ค๋ฉด ์คํ๋ง์์ Producer Confirm์ ํ์ธํ ์ ์๋ ์ฝ๋ ์์ ๋ฅผ ๋ณด์.
@Service
public class MessagePublisher {
public void sendMessage(CreateTaskEvent createTaskEvent) throws JsonProcessingException{
String json = objectMapper.writeValueAsString(createTaskEvent);
rabbitTemplate.send(EXCHANGE_NAME,
ROUTING_KEY,
new Message(json.getBytes(StandardCharsets.UTF_8)),
new CorrelationData(UUID.randomUUID().toString()));
}
}
์คํ๋ง์์ ์ ๊ณตํ๋ CorrelationData ํด๋์ค๋ Producer Confirm์ ํ์ธํ ์ ์๋ ๊ธฐ๋ณธ ํด๋์ค์ด๋ค.
์์ ๊ฐ์ด RabbitTemplate์ ํตํด ๋ฉ์ธ์ง๋ฅผ ๋ณด๋ผ ๋, CorrelationData๊ฐ์ฒด๋ฅผ UUID์๊ฐ์ IDํํ๋ฅผ ๊ฐ์ง๋ ๊ฐ์ ์์ฑํ์ฌ ์ ๋ฌํ๋ฉด,
๋์ค์ callback์ ๋ฐ์ ์ ์๋ค.
@Configuration
@Slf4j
public class RabbitConfiguration {
//...
@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
MessageConverter messageConverter) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//...
//Producer Confirm
rabbitTemplate.setConfirmCallback(((correlationData, ack, cause) -> {
if (!ack) {
Message message = correlationData.getReturned().getMessage();
byte[] body = message.getBody();
log.error("Fail to produce. ID: {}", correlationData.getId());
}
}));
return rabbitTemplate;
}
}
์์ ๊ฐ์ด Configuration์์, rabbitTemplate์ setConfirmCallback๋ฉ์๋์ Producer Confirm ๊ตฌํ์ฒด๋ฅผ ๋ฃ์ด์ฃผ๋ฉด ๋๋ค.
์ฌ๊ธฐ์ ์ ๋ฌ๋๋ ์ธ ์ธ์๋ฅผ ์ดํด๋ณด๊ฒ ๋ค.
1. correlationData
- ๋ฉ์ธ์ง๋ฅผ ๋ฐํํ ๋ ์์ฑํ ๊ฐ์ฒด
2. ack
- boolean์ด๋ค. ์ฑ๊ณต/์คํจ๋ฅผ ๋ํ๋ธ๋ค.
3. cause
- ์คํจ ์์ธ์ ๋ด๊ณ ์๋ค.
์์ ๊ฐ์ด ์ฝ๋ฐฑ์ ๊ตฌํํด๋์๋ค๋ฉด ๋ก๊ทธ์์ correlationData ๊ฐ์ฒด์ ๋ฃ์ด๋์๋ ID๋ฅผ ํ์ธํ ์ ์๋ค.
์ Publisher Confirm์ ์ฌ์ฉํ๋ ค๋ฉด ์ค์ ์ ํด์ผํ๋๋ฐ,
Spring Boot๋ฅผ ์ฌ์ฉ ์ค์ด๋ผ๋ฉด,
โ applicaiton.yml
spring:
rabbitmq:
publisher-confirm-type: correlated
Spring์ ์ฌ์ฉ ์ค์ด๋ผ๋ฉด, ์๋์ ๊ฐ์ด ์คํ๋ง ๋น์ ์ง์ ์ ์ํด๋ ๋๋ค.
โ RabbitConfiguration
@Bean
public ConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory cf = new CachingConnectionFactory();
//...
cf.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
return cf;
}
2. Consumer Acknowledge
Consumer๊ฐ Queue์ ์๋ Message๋ฅผ ์ ์์ ์ผ๋ก ๊ฐ์ ธ์๋ค๋ฉด ACK๋ฅผ ๋ณด๋ด๋๋ฐ, ์ด๋ฅผ Consumer Acknowledge๋ผ๊ณ ํ๋ค.
Consumer Acknowledge๋ฅผ ๊ตฌํํ๋ ค๋ฉด Channel๊ฐ์ฒด๋ฅผ ์ฌ์ฉํด์ผํ๋ค.
@Component
public class MessageListner {
@RabbitListener(queues = "test.task")
public void receiveMessage(Message message, Channel channel) throws IOException {
//์๋ ACK ์ ์ก
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//์๋ NACK ์ ์ก
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
์ด Channel๊ฐ์ฒด๋ ๋ฉ์ธ์ง ํ์ ์ ํ๋ฆฌ์ผ์ด์ ์ ์ฐ๊ฒฐ์ ์ถ์ํํ ํด๋์ค์ด๋ค.
์ด ํด๋์ค๋ฅผ ์ด์ฉํ๋ฉด Publish, Listen์ ๋ชจ๋ ์ฌ์ฉํ ์ ์๊ณ , ์ง์ ACK์ NACK์ ์ ์กํ ์ ์๋ ๋ฉ์๋๋ฅผ ์ฌ์ฉํ ์ ์๋ค.
์ ์ฝ๋์์๋ RabbitListner๋ฅผ ๊ตฌํํ ๋, message์ธ์ Channel๊ฐ์ฒด๋ฅผ ๊ฐ์ด ๋ฐ์ผ๋ฉด ์คํ๋ง ํ๋ ์์ํฌ์์ ์๋ง์ ๊ฐ์ฒด๋ฅผ ์ ๋ฌํด์ค๋ค.
์ฌ์ค์ Channel ๊ฐ์ฒด๋ฅผ ์ง์ ๋ฐ์ง ์๋๋ผ๋, ํ๋ ์์ํฌ๊ฐ Exception์ด ๋ฐ์ํ๋ฉด ์๋์ผ๋ก NACK๋ฅผ ๋ณด๋ด์ฃผ๊ธฐ ๋๋ฌธ์ ์ง์ ๊ตฌํํ ํ์๋ ์๋ค.
3. Dead Letter Queue
๊ทธ๋ฐ๋ฐ ๋ง์ฝ, Consumer์์ ๊ณ์ NACK์ ๋ณด๋ด๋ ์ํฉ์ด๊ณ , Queue์๋ ๋ฉ์ธ์ง๊ฐ ๊ณ์ ์์ด๋ ์ค์ด๋ผ๋ฉด ์ด๋ป๊ฒ ํด์ผ๋ ๊น?
์ด ๊ฒฝ์ฐ์, Dead Letterํ๊ฒฝ์ ๊ตฌ์ถํ๋ค.
Queue์์ ์ ์์ ์ผ๋ก ์ฒ๋ฆฌ๋์ง ๋ชปํ ๋ฉ์ธ์ง๋ค์ Dead Letter Queue์ ๋๊ธด๋ค.
์กฐ๊ฑด์ ํฌ๊ฒ ์๋ ์ธ ๊ฐ์ง๊ฐ ์๋ค.
1. requeue = false์ด๊ณ , basic reject, basic NACK๋ก ์ฒ๋ฆฌํ๋ ๊ฒฝ์ฐ
requeue = true๋ผ๋ฉด ๋ฉ์ธ์ง๊ฐ dead letter๋ก ๊ฐ์ง ์๊ณ ๋ฐ๋ก ์๋ ์๋ queue๋ก ๋๋์๊ฐ๋ค.
2. queue์ TTL(Time To Live)์ด ๋์ด๊ฐ ๊ฒฝ์ฐ (์ค๋๋ ๊ฒฝ์ฐ)
3. queue๊ฐ ๊ฐ๋ ์ฐจ์ ์ฒ๋ฆฌํ ์ ์๋ ์ํ๊ฐ ๋๋ ๊ฒฝ์ฐ
์๋๋ Dead Letter๋ฅผ ์ฌ์ฉํ๊ธฐ ์ํ ์ค์ ์ด๋ค.
@Bean
public SimpleRabbitListenerContainerFactory retryContainerFactory(ConnectionFactory connectionFactory) {
//...
var containerFactory = new SimpleRabbitListenerContainerFactory();
containerFactory.setConnectionFactory(connectionFactory);
containerFactory.setAdviceChain(
RetryInterceptorBuilder.stateless()
.maxAttempts(3)
.backOffOptions(1000, 2, 2000)
.recoverer(new RejectAndDontRequeueRecoverer())
.build()
);
return containerFactory;
}
์์ ๊ฐ์ด ์ค์ ํด๋๋ฉด, ์ต๋ 3๋ฒ ์คํํ๊ณ , retry ๊ฐ๊ฒฉ์ ๊ธฐ๋ณธ์ผ๋ก 1000(1์ด0), ๊ฐ๊ฒฉ์ ๋ฐฐ์๋ 2, ์ต๋ ๊ฐ๊ฒฉ์ 2000(2์ด)๊ฐ ๋๋ค.
์๋ recoeverer์๋ RejectAndDontRequeueRecoverer ํด๋์ค๋ฅผ ์ค์ ํด๋์ด์ผ Requeue๋ฅผ ํ์ง ์๋๋ค.
์๋๋ Dead Letter Queue๋ก ์จ ๋ฉ์ธ์ง๋ค์ ์ฒ๋ฆฌํ๋ ์์ ์ด๋ค.
@RabbitListener(
queues = "test.deadletter",
containerFactory = "deadLetterContainerFactory"
)
public void onDeadLetterMessage(Map<String, Object> rawEvent) {
log.info("Dead Letter Message");
// Alert failure or fallback
}
RabbitListener๋ก ๋ฉ์ธ์ง๋ฅผ ๋ฐ์ผ๋ฉด ๋๊ณ , ์ถ๊ฐ๋ก ๋ก๊ทธ๋ฅผ ์ ์ฅํ๊ฑฐ๋ fallback์ ์๋ํ๋ฉด ๋๋ค.
์ด๋ก์จ RabbitMQ๋ฅผ ์ฌ์ฉํ๋ ํ๊ฒฝ์์ Producer Confim์ ํ์ธํ๋ ๋ฐฉ๋ฒ, Consumer ACK์ ๋ณด๋ด๋ ๊ณผ์ , Dead Letter Queue ํ๊ฒฝ ๊ตฌ์ถ์ ์์๋ณด์๋ค.
๋ค์ ํฌ์คํ ์์๋ Kafka๋ฅผ ์ฌ์ฉํ๋ ํ๊ฒฝ์์ ๋ฐ์ดํฐ ์ ๋ฌ ํ๊ฒฝ์ ๊ตฌ์ถํด ๋ณด๊ฒ ๋ค.
์ฐธ๊ณ
https://blog.rabbitmq.com/posts/2011/02/introducing-publisher-confirms/
https://spring.io/guides/gs/messaging-rabbitmq/
https://www.youtube.com/watch?v=uk5fRLUsBfk&t=1797s&ab_channel=NHNCloud
'BackEnd > Spring Boot' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[Kafka] JUnit5 Kafka ๋จ์ ๋ฉ์ธ์ง ๋ฐํ, ์ฑ๋ฅ ํ ์คํธ (0) | 2024.04.03 |
---|---|
[Spring] ๋ถ์ฐ ์์คํ ์์ ํจ๊ณผ์ ์ธ ๋ฐ์ดํฐ ์ ๋ฌ ๋ฐฉ๋ฒ (3) - Kafka (0) | 2023.11.11 |
[Spring] ๋ถ์ฐ ์์คํ ์์ ํจ๊ณผ์ ์ธ ๋ฐ์ดํฐ ์ ๋ฌ ๋ฐฉ๋ฒ (1) - RDB (1) | 2023.11.04 |
[Spring] Docker์์ Static File ์ฒ๋ฆฌํ๊ธฐ (0) | 2023.09.02 |
[Spring] MongoDB ์ ์ฉ, ํ ์คํธ (0) | 2023.09.02 |