0. ์์
์ด๋ฒคํธ๋ ๊ฐ๊ฒฐํฉ์ ์ ๊ฑฐํ๊ฑฐ๋ ๋ฐ์ดํฐ ๋๊ธฐํ ๋ฑ ์ฌ๋ฌ ์ ์ฉํ ์ํฉ์์ ์ฌ์ฉํ ์ ์๋ค.
ํ์ง๋ง, ์ธ์ ๋ ์์ธ๊ฐ ๋ฐ์ํ ์ ์๊ธฐ ๋๋ฌธ์ ๋จ์ํ๊ฒ ์ด๋ฒคํธ๋ฅผ ๋ฐํ/๊ตฌ๋ ์ ๊ตฌํํ๋ ๊ฒ๋ง์ผ๋ก ์์ ํ์ง ์๋ค.
์ฌ๊ธฐ์ "์์ ํ๋ค"๋ ์ฌ๋ฌ ์กฐ๊ฑด์ด ์์ ์ ์๊ฒ ๋ค.
- ๋ฉ์ธ์ง ๋ฐํ์ด ๋ณด์ฅ๋๋์ง
- ๋ฐํ๋ ๋ฉ์ธ์ง๊ฐ ์ฌ๋ฐ๋ฅด๊ฒ ์ ๋ฌ๋์๋์ง
- ์ ๋ฌ๋ ๋ฉ์ธ์ง๊ฐ ์ฌ๋ฐ๋ฅด๊ฒ ์ฒ๋ฆฌ๋์๋์ง
- ์ค๋ณต ๋ฉ์ธ์ง๊ฐ ์ค๋ณต ์ฒ๋ฆฌ๋์ง ์์๋์ง
- (ํ์ํ ๊ฒฝ์ฐ) ๋ฉ์ธ์ง์ ์์๊ฐ ๋ณด์ฅ๋์๋์ง
์ด๋ฒคํธ ๋ฐํ/๊ตฌ๋ ์์ ์ฌ์ํ ์ค๋ฅ ํ๋๊ฐ ์ฌ๊ฐํ ์ํฅ์ ๋ถ๋ฌ์ฌ ์ ์๊ธฐ ๋๋ฌธ์ ์์ ํจ์ ๋งค์ฐ ์ค์ํ๋ค.
์ค๋์ ์ด๋ฒคํธ ๋ฐํ ๊ณผ์ ์์ ๊ณ ๋ คํด์ผํ ์ฌ๋ฌ ์์ธ ์ํฉ์ ์์๋ณด๊ณ , ๊ทธ์ ๋ํ ๋์ฒ๋ฅผ ์์๋ณด๋๋ก ํ๊ฒ ๋ค.
1. ์์ธ
๋จผ์ , ๋ฐ์ํ ์ ์๋ ์์ธ๋ค์ ์ดํด๋ณด๊ฒ ๋ค.
๋คํธ์ํฌ ๊ด๋ จ ์์ธ - ๋ธ๋ก์ปค์์ ํต์ ์์ ์ค๋ฅ๊ฐ ๋ฐ์ํ๋ ๊ฒฝ์ฐ
์ด๋ฒคํธ๋ฅผ ๊ด๋ฆฌํ๋ ๋ธ๋ก์ปค๋ก์์ ํต์ ์์ ์์ธ๊ฐ ๋ฐ์ํ ์ ์๋ค.
Kafka์์๋ ์๋์ ๊ฐ์ ์์ธ๋ค์ด ์๋ค.
- TimeoutException: ๋ธ๋ก์ปค ์๋ต ๋๊ธฐ ์๊ฐ ์ด๊ณผ
- SocketTimeoutException: ์์ผ ์ฐ๊ฒฐ ์๊ฐ ์ด๊ณผ
- ConnectionException: ๋ธ๋ก์ปค ์ฐ๊ฒฐ ์คํจ
- NetworkException: ์ผ๋ฐ์ ์ธ ๋คํธ์ํฌ ์ค๋ฅ
ํ ๋ฒ ๋ธ๋ก์ปค์์ ํต์ ์ด ์๋ฃ๋์๋ค๊ณ ์์ฌํ๋ฉด ์ ๋๋ค. ์ธ์ ๋ ์ง ๋ธ๋ก์ปค๋ ๋ค์ด๋ ์ ์๋ค๋ ์๊ฐ์ ๊ฐ์ง๊ณ ๊ตฌํํด์ผ ํ๋ค.
๋ฉ์ธ์ง ์ง๋ ฌํ/์ญ์ง๋ ฌํ ์์ธ
Kafka์์๋ ๋ฏธ๋ฆฌ ์ง๋ ฌํ/์ญ์ง๋ ฌํ๋ฅผ ์ํ ํด๋์ค๋ฅผ ์ค์ ํด๋๋ค.
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// ...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
๋ง์ฝ ์ค์ ๊ณผ ๋ง์ง ์๋ ์ด๋ฒคํธ๊ฐ ๋ฐํ๋๋ฉด ์์ธ๋ฅผ ์ผ์ผํฌ ๊ฒ์ด๋ค.
์ด๋ Kafka๊ฐ ์๋ ๋ค๋ฅธ ํ๋ ์์ํฌ์์๋ ์ผ์ด๋ ์ ์๋ค.
๋ธ๋ก์ปค ๊ด๋ จ ์์ธ
๋ธ๋ก์ปค๊ฐ ํญ์ ๊ธฐ๋ํ๋ ๋ฉ์ธ์ง๋ฅผ ๋ฐ๋๋ค๊ณ ์๊ฐํ๋ฉด ์ ๋๋ค๊ณ ์๊ฐํ๋ค.
์๋๋ ๋ธ๋ก์ปค์์ ์ผ์ด๋ ์ ์๋ ์ฌ๋ฌ ์์ธ๋ค์ด๋ค.
- ํ ํฝ ์ด๋ฆ์ด ์๋ชป๋จ
- ํ ํฝ ์ ๊ทผ ๊ถํ์ด ์์
- ๋ณต์ ๋ณธ(Replica) ๋ถ์กฑ
- ๋ฆฌ๋ ๋ ธ๋(Leader Node) ์ฌ์ฉ ๋ถ๊ฐ
์์ ๋ํ ์ค๋ฅ๋ค์ ์ ํ๋ฆฌ์ผ์ด์ ๋จ์์ ๊ฐ์งํ์ฌ ์ฒ๋ฆฌํด์ผ ํ ๊ฒ์ด๋ค.
๋น์ฆ๋์ค ๋ก์ง ์์ธ
๋จ์ํ ์ด๋ฒคํธ ํ๋ ์์ํฌ์ ์๊ด์์ด, ๋ฉ์ธ์ง๋ฅผ ๋ฐ๊ณ ์ฒ๋ฆฌํ๋ ๊ณผ์ ์์ ์์ธ๊ฐ ๋ฐ์ํ ์ ์๋ค.
์ด๋ฒคํธ ์ข ๋ฅ, ํ๋ ์์ํฌ ๋ฑ์ ๋ฐ๋ผ์ ์ฌ๋ฌ ์์ธ๊ฐ ๋ฐ์ํ ์ ์๊ณ , ๊ทธ์ ๋ํ ๋์ฒ๋ ๋ค๋ฅด๋ค.
- ์ ํจ์ฑ ๊ฒ์ฆ ์คํจ
- ์ด๋ฒคํธ ์ค๋ณต
- ๋ฆฌ์์ค ์์
์ ํ๋ฆฌ์ผ์ด์ ์ข ๋ฃ
์ฌ์ฉ์์ ์์ฒญ์ ์ฒ๋ฆฌํ๋ ๊ณผ์ ์์ ๊ฐ์์ค๋ฌ์ด ์ ํ๋ฆฌ์ผ์ด์ ์ข ๋ฃ ๋ํ ๊ณ ๋ คํด์ผํ ์ฌ์์ด๋ค.
์ด๋ฒคํธ ๋ฐํ ์ / ํ ๋ชจ๋ ๊ณ ๋ คํด์ผ ํ ๊ฒ์ด๋ค.
์ด๋ฒคํธ ์์ ๋ณด์ฅ
์๋น์ค์ ๋ฐ๋ผ์ ๋ฐํ๋๋ ์ด๋ฒคํธ์ ์์์ ๋ฐ๋ผ ์ฒ๋ฆฌํด์ผํ ์๋ ์๋ค.
์ํฉ์ ๋ฐ๋ผ์ ์ด์ ์ด๋ฒคํธ๊ฐ ์๋ฃ๋ ํ ์คํํด์ผ ํ ์๋ ์๊ณ ,
ํ๋ ์์ํฌ์ ๋ฐ๋ผ์ ์ด๋ฒคํธ ์์๊ฐ ๋ณด์ฅ๋์ง ์๋ ๊ฒฝ์ฐ๋ ์๊ธฐ ๋๋ฌธ์ ์ด๋ฅผ ๊ณ ๋ คํด์ผ ํ ์๋ ์๋ค.
์ด๋ฒคํธ ๋ฒ์ ํ์ธ
์ํฉ์ ๋ฐ๋ผ ์ด๋ฒคํธ์ ๋ฒ์ ์ด ๋ฐ๋๋ ์์ ๋ ๊ณ ๋ คํด์ผ ํ๋ค.
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS)
public interface Event {
String getVersion();
String getType();
}
์์ ๊ฐ์ ์ธํฐํ์ด์ค๋ฅผ ํตํด ์ด๋ฒคํธ ๋ฒ์ ๋์ ํ๊ณ , ๊ฐ ๋ฒ์ ์ ๋ฐ๋ผ ์ด๋ฒคํธ ํ๋ก์ธ์๋ฅผ ๋ฐ๊ฟ์ผ ํ ์๋ ์๋ค.
2. ํด๊ฒฐ
๊ทธ๋ ๋ค๋ฉด, ์์ ์์ธ๋ค์ ํด๊ฒฐํ ์ ์๋ ๋ฐฉ๋ฒ๋ค์ด ๋ฌด์์ด ์์์ง ์์๋ณด๊ฒ ๋ค.
2-1. ํธ๋์ญ์ ๋๊ธฐํ
์ด๋ฒคํธ ๋ฐํ์ ์ค์ํ ์ ์ ์ด๋ฒคํธ ๋ฐํ ํจ์์ ์ด๋ฒคํธ ์คํ ํจ์์ ํธ๋์ญ์ ์ด ๋ถ๋ฆฌ๋์ด์ผ ํ๋ค๋ ๊ฒ์ด๋ค.
๋ง์ฝ ๋ค์๊ณผ ๊ฐ์ด ์ฃผ๋ฌธ์ ํ๋ฉด ์ฃผ๋ฌธ์ด ์๋ฃ๋์๋ค๋ ๋ฉ์ธ์ง๋ฅผ ๋ณด๋ด๋ ๊ณผ์ ์ด ์๋ค๊ณ ํด๋ณด์.
๋ง์ฝ ์ด ๊ณผ์ ์ด ํ ํธ๋์ญ์ ์ ๋ฌถ์ฌ์๋ค๋ฉด, ์ฌ๊ธฐ์ ์ฃผ๋ฌธ ์๋ฃ ๋ฉ์ธ์ง๋ฅผ ๋ณด๋ด๋ ๊ณผ์ ์ด ์คํจํ๋ฉด ์ด๋ป๊ฒ ๋ ๊น?
์ฃผ๋ฌธ ์๋ฃ ๋ฉ์ธ์ง์์ ๋ฐ์ํ ์์ธ๊ฐ ํธ์ถ ์คํ์ ๋ฐ๋ผ๊ฐ์ ์ฃผ๋ฌธ์๋ ์ํฅ์ ๋ผ์ณ์, ์ฃผ๋ฌธ๋ ์คํจํ๋ ์ํฉ์ด ๋ฒ์ด์ง ๊ฒ์ด๋ค.
๋ฐ๋ผ์, ํธ๋์ญ์ ์ด ๋ชจ๋ ์๋ฃ๋ ํ์ ์ด๋ฒคํธ ๋ฐํ์ ๋๊ธฐํ ํด์ผํ๋ค.
์ฃผ๋ฌธ ์๋ฃ ๋ฉ์ธ์ง ์ ์ก์ด ์คํจํ๋๋ผ๋ ์ฃผ๋ฌธ์ ์๋ฃ๋์ด์ผ ํ๋ค.
๋ชจ๋๋ฆฌ์์์ ์ฌ์ฉํ๋ Spring Application Event์์๋ @TranscationalEventListener๋ฅผ ์ฌ์ฉํ ์ ์๋ค.
public class OrderEventHandler {
private final EmailService emailService;
// ํธ๋์ญ์
์ปค๋ฐ ํ ์คํ
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleOrderCreated(OrderCreatedEvent event) {
log.info("Order created: {}", event.getOrderId());
emailService.sendOrderConfirmation(event.getOrderId());
}
// ํธ๋์ญ์
๋กค๋ฐฑ ์ ์คํ
@TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
public void handleOrderFailed(OrderCreatedEvent event) {
log.error("Order creation failed: {}", event.getOrderId());
// ์คํจ ์ฒ๋ฆฌ ๋ก์ง
}
}
๊ฐ phase์ ๋ฐ๋ผ ํธ๋์ญ์ ์ฑ๊ณต/ ์คํจ ํ ๋์์ ๋๋ ์ ์๋ค.
2-2. TransactionalOutbox Pattern
TransactionalOutbox ํจํด์ ๋ฐํํ ๋ฉ์ธ์ง๋ฅผ ๋ธ๋ก์ปค์ ์ ์กํ๋ฉด์ DB์ ์ ์ฅํด๋๋ ๋ฐฉ๋ฒ์ด๋ค.
Outbox๋ "๋ณด๋ธ ํธ์งํจ"์ ์๋ฏธํ๋ค.
Transactional Outbox Pattern์ ๋ณดํต ํฌ๊ฒ Polling & Publisher ๋ฐฉ๋ฒ๊ณผ Transaction Log Tailing ๋ฐฉ๋ฒ์ผ๋ก ๋๋๋ค.
2-2-1. Poliing & Publisher
Polling์ ์ผ์ ํ ์ฃผ๊ธฐ๋ฅผ ๊ฐ์ง๊ณ ์๋ฒ์ ์๋ต์ ์ฃผ๊ณ ๋ฐ๋ ๋ฐฉ๋ฒ์ ๋งํ๋ค.
๋ฐํํ ๋ฉ์ธ์ง๋ฅผ Outbox DB์ ์ ์ฅํด๋๊ณ , ์ฃผ๊ธฐ์ ์ผ๋ก pollingํ์ฌ ์ด๋ฒคํธ๋ฅผ ๋ฐํํ๋ ๋ฐฉ๋ฒ์ด๋ค.
ํด๋น ๋ฐฉ๋ฒ์ผ๋ก ์ด๋ฒคํธ ๋ฐํ์ ๋ณด์ฅํ ์ ์๋ค.
์๋ ์์ ๊ตฌํ์ผ๋ก ์ดํด๋ณด๊ฒ ๋ค.
@Entity
@Table(name = "outbox_events")
public class OutboxEvent {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String aggregateType; // "ORDER", "PAYMENT" ๋ฑ
private String aggregateId; // ํด๋น ๋๋ฉ์ธ ๊ฐ์ฒด์ ID
private String eventType; // "ORDER_CREATED", "PAYMENT_COMPLETED" ๋ฑ
private String payload; // ์ด๋ฒคํธ ๋ฐ์ดํฐ (JSON)
@Enumerated(EnumType.STRING)
private EventStatus status = EventStatus.PENDING;
private int retryCount = 0;
private LocalDateTime createdAt;
private LocalDateTime processedAt;
@Version
private Long version; // ๋๊ด์ ๋ฝ์ ์ํ ๋ฒ์
}
์์ ๊ฐ์ด Outbox DB์ ์ ์ฅํ ์ด๋ฒคํธ ๊ฐ์ฒด๋ฅผ ์ ์ํ๋ค.
public void createOrder(OrderRequest request) {
//์ฃผ๋ฌธ ์์ฑ
Order order = orderRepository.save(Order.from(request));
//Outbox ์ด๋ฒคํธ ์ ์ฅ
OutboxEvent event = OutboxEvent.builder()
//...
.build();
outboxRepository.save(event);
}
์ฃผ๋ฌธ์ ์์ฑํ๋ฉด Outbox DB์ ์ด๋ฒคํธ๋ฅผ ์ ์ฅํ๋ค.
@Component
@EnableScheduling
@Slf4j
public class OutboxEventPublisher {
private final OutboxEventRepository outboxRepository;
private final KafkaTemplate<String, String> kafkaTemplate;
private final ObjectMapper objectMapper;
@Scheduled(fixedRate = 1000) // 1์ด๋ง๋ค ์คํ
@Transactional
public void publishEvents() {
List<OutboxEvent> events = outboxRepository
.findByStatusAndRetryCountLessThan(
EventStatus.PENDING,
3,
PageRequest.of(0, 100)
);
for (OutboxEvent event : events) {
try {
// Kafka๋ก ์ด๋ฒคํธ ๋ฐํ
kafkaTemplate.send(
event.getAggregateType().toLowerCase(),
event.getAggregateId(),
event.getPayload()
).get(10, TimeUnit.SECONDS); // ํ์์์ ์ค์
// ๋ฐํ ์ฑ๊ณต ์ ์ํ ์
๋ฐ์ดํธ
event.setStatus(EventStatus.PUBLISHED);
event.setProcessedAt(LocalDateTime.now());
outboxRepository.save(event);
} catch (Exception e) {
log.error("Failed to publish event: {}", event.getId(), e);
// ์ฌ์๋ ํ์ ์ฆ๊ฐ
event.incrementRetryCount();
// ์ต๋ ์ฌ์๋ ํ์ ์ด๊ณผ ์
if (event.getRetryCount() >= 3) {
event.setStatus(EventStatus.FAILED);
}
outboxRepository.save(event);
}
}
}
}
์ด์ ์ผ์ ์ฃผ๊ธฐ๋ง๋ค OutboxDB๋ก๋ถํฐ ๋ฉ์ธ์ง๋ฅผ Pollingํด์ ์ด๋ฒคํธ๋ฅผ ๋ฐํํ๋ค.
์ด๋ฒคํธ์ ๋ฐํ์ด ์๋ฃ๋๋ฉด status๋ฅผ ๋ฐ๊พธ๋ ๋ฐฉ์์ผ๋ก ๊ตฌํํ๊ฑฐ๋, ๋๊ธฐ ๋ฉ์ธ์ง/์๋ฃ ๋ฉ์ธ์ง ํ ์ด๋ธ๋ก ๋๋์ด ์ฟผ๋ฆฌ ๊ณผ์ ์์ Join์ผ๋ก ์ฌ์ฉํด๋ ๊ด์ฐฎ์ ๊ฒ ๊ฐ๋ค.
@Component
@EnableScheduling
public class FailedEventProcessor {
@Scheduled(cron = "0 0 * * * *") // ๋งค์๊ฐ ์คํ
public void processFailedEvents() {
List<OutboxEvent> failedEvents = outboxRepository
.findByStatus(EventStatus.FAILED);
// ์คํจํ ์ด๋ฒคํธ ์ฒ๋ฆฌ (์๋ฆผ, ๋ก๊น
๋ฑ)
}
}
์ํ์ ๋ฐ๋ผ ์คํจํ ์ด๋ฒคํธ๋ฅผ ๋ฐ๋ก ์ฒ๋ฆฌํ ์๋ ์๋ค.
์ฌ๊ธฐ์ ํด๊ฒฐํด์ผ ํ ๋ฌธ์ ๋ ์ฌ๋ฌ๊ฐ ๋ ์๋ค.
- ์ธ์คํด์ค๋น ์ฅ์ ๋ฐ์ ์ ์คํํ ์ฌ๋ฌ ๋
ธ๋๋ฅผ ๋ฐฐ์น
- ๋ณต์ ๋ ธ๋๋ก ์ฌ์ฉํ ์ ๋์์ฑ ์ด์ -> ๋ฝํน์ ํตํด ํด๊ฒฐ
- ๋ฒ์ ๋ -> ๋ฝํน ์ฌ์ฉ
์ ๋ฐฉ๋ฒ๋ค์ ๋ค์์ ๋ ์์ธํ๊ฒ ๋ค๋ฃจ๋ ๊ธฐํ์์ ์์๋ณด๋๋ก ํ๊ฒ ๋ค.
์ด๋ก์จ ์ฃผ๋ฌธ ํธ๋์ญ์ ํ์ ๋ฉ์ธ์ง publish๋ฅผ ๋ณด์ฅํ ์ ์๋ค.
ํด๋น ๋ฐฉ๋ฒ์ ๋น๊ต์ ๊ตฌํ์ด ๊ฐ๋จํ๊ณ ํธ๋ฆฌํ์ง๋ง, DB๋ก๋ถํฐ ์ฃผ๊ธฐ์ ์ผ๋ก Pollingํ๊ธฐ ๋๋ฌธ์ DB์ ๋ถํ๊ฐ ๊ฐ ์ ์๋ค๋ ์ ์ด ๋จ์ ์ด๋ค.
2-2-2. Transaction Log Tailing ๋ฐฉ๋ฒ
Transaction Log Trailing์ DB ํธ๋์ญ์ ์ ๋ก๊ทธ๋ฅผ ์ฌ์ฉํ๋ ๋ฐฉ๋ฒ์ด๋ค.
DB์ ํธ๋์ญ์ ์ด ์ฒ๋ฆฌ๋๋ฉด ์์ฑ๋๋ ๋ก๊ทธ์ ๋ํ CDC๋ฅผ ๊ตฌํํ๋ ๋ฐฉ๋ฒ์ด๋ค.
CDC(Change Data Capture)
๋ฐ์ดํฐ ๋ณ๊ฒฝ์ ์ถ์ ํ์ฌ ๋ณ๊ฒฝ ์ฌํญ์ ์ค์๊ฐ์ผ๋ก ์ ๋ฌํ๋ ๋ฐ์ดํฐ ํตํฉ ๊ธฐ์
๋ฐ์ดํฐ๋ฒ ์ด์ค์ ๋ฐ๋ผ ์ฌ๋ฌ ๋ก๊ทธ๋ฅผ ํ์ฉํ ์ ์๋ค.
- MySQL binlog
- Postgres WAL
- AWS DynamoDB table streams
์ ๋ฐ์ดํฐ๋ฒ ์ด์ค ๋ก๊ทธ๋ฅผ ํ์ฉํ๋ ๋ฐฉ๋ฒ์ ๊ตฌํ ๋์ด๋๊ฐ ๋์ ๋ณดํต ํด์ ์ฌ์ฉํ๋๋ฐ, ๋ํ์ ์ธ ํด์ ์๋์ ๊ฐ๋ค.
- ๋๋น์ง์(Debezium)
- ๋งํฌ๋์ธ ๋ฐ์ดํฐ ๋ฒ์ค(LinkdIn Databus)
- DynamoDB ์คํธ๋ฆผ์ฆ
- ์ด๋ฒค์ถ์์ดํธ ํธ๋จ
@Component
public class MySQLBinLogTailer {
private final BinaryLogClient client;
public MySQLBinLogTailer() {
client = new BinaryLogClient(
"localhost",
3306,
"username",
"password"
);
client.registerEventListener(event -> {
if (event.getData() instanceof WriteRowsEventData) {
WriteRowsEventData writeData = (WriteRowsEventData) event.getData();
// ์๋ก์ด Row ๋ฐ์ดํฐ ์ฒ๋ฆฌ
processNewRows(writeData);
}
});
}
@PostConstruct
public void start() {
new Thread(() -> {
try {
client.connect();
} catch (IOException e) {
log.error("Failed to connect to MySQL binlog", e);
}
}).start();
}
}
์์ ๊ฐ์ด Mysql binlog๋ฅผ ํ์ฉํ๊ฑฐ๋,
{
"name": "mysql-outbox-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.name": "mysql",
"database.server.id": "1",
"database.include.list": "mydb",
"table.include.list": "mydb.outbox_events",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.outbox",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.field.event.id": "id",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.table.field.event.type": "event_type",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.table.fields.additional.placement": "event_type:header:eventType,aggregate_type:header:aggregateType",
"transforms.outbox.route.by.field": "aggregate_type",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false
}
}
์์ ๊ฐ์ด Debezium์ ์ค์ ํด๋๊ณ ์ฌ์ฉํ๋ ๋ฐฉ๋ฒ์ด ์๊ฒ ๋ค.
Transaction Log Tailing ๋ฐฉ๋ฒ์ Polling & Publisher ๋ฐฉ๋ฒ์ ๋นํด Polling ์์ ์ด ์๋ค๋ ์ ์์ ์ฅ์ ์ด ์์ง๋ง,
Kafka ๋ฉ์ธ์ง ํ์์ ๋ง์ถ์ด ๋ค์ ๋ฐ์ดํฐ dto๋ฅผ ๋น๋ํด์ผ ํ๋ค.
2-3. Saga ํจํด : ํธ๋์ญ์ ๊ด๋ฆฌ
Saga ํจํด์ ์ ํ๋ฆฌ์ผ์ด์ ๋ด ํธ๋์ญ์ ์ ์์ฐจ์ ์ผ๋ก ์ฒ๋ฆฌํ๋ ํจํด์ด๋ค.
์ฌ๊ธฐ์ ํธ๋์ญ์ ์ DB๊ฐ ์๋ ์ ํ๋ฆฌ์ผ์ด์ ๋จ์ ํธ๋์ญ์ ๋ฒ์๋ฅผ ๋งํ๋ค.
ํต์ฌ์ ๋ณด์ ํธ๋์ญ์ ์ด ์๋ค๋ ๊ฒ์ด๋ค.
๋ณด์ ํธ๋์ญ์
๋ฐ๋ ํจ๊ณผ๋ฅผ ๊ฐ์ง ํธ๋์ญ์ ์ ์ฒ๋ฆฌํ์ฌ ์ ์ฌ์ ์ผ๋ก ๋๋๋ฆด ์ ์๋ ํธ๋์ญ์
์ฆ, ์ด์ ์ ์๋ฃ๋ ํธ๋์ญ์ ์ ๋กค๋ฐฑํ๊ธฐ ์ํด ์ญ์์ผ๋ก ์คํ๋๋ ํธ๋์ญ์
Sagaํจํด์ ๋ณดํต MSA ํ๊ฒฝ์์ ๋ง์ด ๋ฑ์ฅํ๋๋ฐ,
๋ชจ๋๋ฆฌ์๊ณผ ๋ฌ๋ฆฌ DB๊ฐ ๋ถ์ฐ๋์ด ์์ด ACID๊ฐ ์ง์ผ์ง๋ ํธ๋์ญ์ ์ ์ฌ์ฉํ ์ ์๊ธฐ ๋๋ฌธ์ด๋ค.
์ด๋ฅผ ์์ ์๋ 2PC(2-Phase-Commit)์ผ๋ก ํด๊ฒฐํ์ง๋ง, ์ด๋ ์๋นํ ๋นํจ์จ์ ์ด๊ธฐ ๋๋ฌธ์ Saga ํจํด์ด ํด๊ฒฐ์ ์ผ๋ก ๋ฑ์ฅํ๋ค.
Saga ํจํด์ ํฌ๊ฒ ์ฝ๋ ์ค๊ทธ๋ํผ(Choreography)์ ์ค์ผ์คํธ๋ ์ด์ (Orchestration)์ด ์๋ค.
Choreography
์ฝ๋ ์ค๊ทธ๋ํผ๋ ๋ฉ์ธ์ง ๋ธ๋ก์ปค๋ฅผ ์ด์ฉํด์ ์ค์ ์ง์ค์ ์ ์ด ์ง์ ์์ด ์ด๋ฒคํธ๋ฅผ ์ ์กํ๋ ๋ฐฉ๋ฒ์ด๋ค.
๊ฐ ์๋น์ค์ ๋ก์ปฌ ํธ๋์ญ์ ์ด ์๋ฃ๋๋ฉด ์ด๋ฒคํธ๋ฅผ ๋ฐํํ๊ณ , ๋ค์ ์ํํด์ผํ ์ฑ์ด ๋ฐ์์ ๋ค์ ๋ก์ปฌ ํธ๋์ญ์ ์ ์คํํ๋ ๋ฐฉ์์ผ๋ก ์ฌ์ฉ๋๋ค.
๋ง์ฝ ์ค๊ฐ์ ํธ๋์ญ์ ์ด ์คํจํ๋ฉด ์คํจํ ์ฑ์์ ๋ณด์ ์ด๋ฒคํธ๋ฅผ ๋ฐํํ์ฌ ๋กค๋ฐฑ์ ์ํ ๋ณด์ ํธ๋์ญ์ ์ ๊ฐ ์ฑ์ ์ค์ํ๋๋ก ๊ตฌ์ฑํ๋ค.
ํด๋น ๋ฐฉ๋ฒ์ ์ ์ง/๊ด๋ฆฌ๊ฐ ํ์ํ์ง ์๊ณ ๊ตฌํ์ด ๊ฐ๋จํ์ง๋ง Saga ์ฐธ๊ฐ์๊ฐ ์ด๋ค ๋ช ๋ น์ ์์ ๋๊ธฐํ๋์ง ์ถ์ ํ๊ธฐ ์ด๋ ต๊ณ , ํธ๋์ญ์ ์ ์ํ๋ฅผ ํ์ธํ๊ธฐ ์ด๋ ต๋ค๋ ๋จ์ ์ด ์๋ค.
Orchestration
์ค์ผ์คํธ๋ ์ด์ ์ ์ค์ ์ง์ค์ ์ปจํธ๋กค๋ฌ(Orchestrator)๊ฐ ์คํํ ๋ก์ปฌ ํธ๋์ญ์ ์ ์๋ ค์ฃผ๋ ๋ฐฉ๋ฒ์ด๋ค.
์ค์ผ์คํธ๋ ์ดํฐ๋ ๋ชจ๋ ํธ๋์ญ์ ์ ์ฒ๋ฆฌํ๊ณ , ์ด๋ฒคํธ์ ๋ฐ๋ผ ์ํํ ์์ ์ ๊ฐ ์ฐธ๊ฐ์ํํ ์๋ ค์ค๋ค.
๊ฐ ์ฐธ๊ฐ์๋ ์์ฒญ์ ์คํํ๊ณ , ์คํจ ์ ๋ณด์ ํธ๋์ญ์ ์ ์ฌ์ฉํ์ฌ ์ค๋ฅ ๋ณต๊ตฌ๋ฅผ ์๋ํ๋ค.
2-4. ์ฌ์ ์ก : ์ผ์์ ๋ฌธ์ ํด๊ฒฐ
์ด๋ฒคํธ๋ ์ด๋ค ์ด์ ์์๋ผ๋ ์ ์ก์ ์คํจํ ์ ์๋ค.
์ ์ก์ ์คํจํ ์ด๋ฒคํธ๋ฅผ ์ฌ์๋ ํ๋๊ฒ์ ์์ฐ์ค๋ฌ์ด ์์์ธ๋ฐ,
์ด๋ ์ด๋์ ๋ ์ผ์์ ์ธ ๋ฌธ์ ๋ฅผ ํด๊ฒฐํ ์ ์๊ฒ ์ง๋ง, ์๊ตฌ์ ์ธ ๋ฌธ์ ๋ ํด๊ฒฐํ ์ ์๋ค.
์ฌ์๋ ์ ์ฑ ์ ๊ธฐ๋ณธ์ ์ผ๋ก Spring Retry๋ฅผ ์ด์ฉํด์ ๊ตฌํํ ์ ์๋ค.
Spring Retry๋ฅผ ์ฌ์ฉํ๋ ค๋ฉด @EnableRetry๋ฅผ ์ค์ ํด์ผ ํ๋ค.
@Retryable(
value = {KafkaException.class, TimeoutException.class},
maxAttempts = 3,
backoff = @Backoff(delay = 1000, multiplier = 2)
)
public void publishEvent(String topic, String key, Object event) {
// ๋ฉ์ธ์ง ์ ์ก ๋ก์ง
}
@Recover
public void handlePublishFailure(Exception e, String topic, String key, Object event) {
// ์คํจ ์ฒ๋ฆฌ ๋ก์ง
}
์ต๋ ์ ์ก ํ์๋ maxAttempts, ์ฌ์๋ ๋ฉ์ธ์ง๊ฐ ๊ฐ๊ฒฉ์ backoff๋ก ์ค์ ํ ์ ์๋ค.
์คํจํ ๋ฉ์ธ์ง์ ๋ํ ์ฒ๋ฆฌ๋ @Recover๋ฅผ ํตํด์ ์ฒ๋ฆฌํ ์ ์๋ค.
2-5. DLT (Dead Letter Topic) : ์คํจํ ๋ฉ์ธ์ง ๊ด๋ฆฌํ๊ธฐ
DLQ๋ AWS SQS์์ ์ฌ์ฉํ๋ ์๋น๋์ง ๋ชปํ ๋ฉ์ธ์ง(Dead Letter)๋ฅผ ๋ด์๋๋ ๋๊ธฐ์ด(ํ)์ด๋ค.
์คํจ๋ ๋ฉ์ธ์ง๋ฅผ Lambda๋ฅผ ํตํด ์ด๋ค ์ฒ๋ฆฌ๋ฅผ ํ ์ง ์ ํด๋ ์ ์๋ค.
Kafka์์๋ DLQ๊ฐ ์๋ DLT(Dead Letter Topic)์ ์ฌ์ฉํ๋ค.
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
Kafka ๋ฉ์ธ์ง ์ ์ก์ด ์คํจํ๋ ๊ฒฝ์ฐ๋ฅผ ํ์ธํ๋ ค๋ฉด ์๋ ACK๋ชจ๋๋ฅผ ์ฌ์ฉํ๋ ์ค์ ์ด ๊ถ์ฅ๋๋ค.
@RetryableTopic
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@DltHandler
public void processMessage(MyPojo message) {
// ... message processing, persistence, etc
}
๋ฉ์๋๊ฐ ๊ฐ์ ํํ์ @RetryableTopic๊ณผ @DltHandler๋ฅผ ๋ถ์ธ ๋ฉ์๋๋ก DLT๋ฅผ ํ์ฉํ ์ ์๋ค.
๋๋, RetryTopicConfiguration์ ํตํด ์ค์ ํด๋ ์ ์๋ค.
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
.create(template);
}
@Component
public class MyCustomDltProcessor {
private final MyDependency myDependency;
public MyCustomDltProcessor(MyDependency myDependency) {
this.myDependency = myDependency;
}
public void processDltMessage(MyPojo message) {
// ... message processing, persistence, etc
}
}
@RetryableTopic์ ์ฌ๋ฌ ์ ํธ๋ฆฌ๋ทฐํธ๋ฅผ ๋ฌ์๋์ผ๋ฉด ์ฝ๋๊ฐ ์ค๋ณต๋๊ณ ๋ณต์กํด์ง ์ ์๋ค.
RetryTopicConfiguration์ ์ฌ์ฉํ๋ฉด ์ ์ญ์ ์ผ๋ก ์ฌ์๋ ํ ํฝ๊ณผ DLT ํ ํฝ์ด ์ ์ฉ๋๋ฏ๋ก ๋ ๊น๋ํ ์ ์ฉ์ธ ๊ฒ ๊ฐ๋ค.
์ฌ๊ธฐ์ ์ค์ํ ์ ์, @KafkaListener์ Factory ์ค์ ๊ณผ ๋์ผํด์ผ MessageConversionException์ ํผํ ์ ์๋ค.
์ฐธ๊ณ
https://ridicorp.com/story/transactional-outbox-pattern-ridi/
https://giron.tistory.com/156
https://docs.spring.io/spring-kafka/reference/index.html
https://docs.spring.io/spring-kafka/reference/kafka/micrometer.html
https://docs.spring.io/spring-kafka/reference/kafka/annotation-error-handling.html
https://hvho.github.io/2021-12-05/spring-retry
https://junuuu.tistory.com/795
https://www.kai-waehner.de/blog/2022/05/30/error-handling-via-dead-letter-queue-in-apache-kafka/
https://docs.spring.io/spring-kafka/reference/retrytopic/dlt-strategies.html
https://velog.io/@wwlee94/Kafka-%EC%9E%AC%EC%8B%9C%EB%8F%84-DLT-%EB%B9%8C%EB%8D%94-%EC%A0%91%EA%B7%BC-%EB%B0%A9%EC%8B%9D%EC%9C%BC%EB%A1%9C-%EB%A6%AC%ED%8C%A9%ED%86%A0%EB%A7%81
'BackEnd' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[MSA] ์ด๋ฒคํธ ๊ธฐ๋ฐ ์ํคํ ์ฒ(EDA)์ ์ด์ ์ ๋ฐ๋์งํ ์ค๊ณ (0) | 2024.12.28 |
---|