BackEnd/Spring Boot

[Spring] ๋ถ„์‚ฐ ์‹œ์Šคํ…œ์—์„œ ํšจ๊ณผ์ ์ธ ๋ฐ์ดํ„ฐ ์ „๋‹ฌ ๋ฐฉ๋ฒ• (3) - Kafka

ddonghyeo 2023. 11. 11. 00:44

[์ด์ „ ํฌ์ŠคํŒ…] RDB๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์—์„œ ํšจ๊ณผ์ ์ธ ๋ฐ์ดํ„ฐ ์ „๋‹ฌ ๋ฐฉ๋ฒ•

2023.11.04 - [BackEnd/Spring] - [Spring] ๋ถ„์‚ฐ ์‹œ์Šคํ…œ์—์„œ ํšจ๊ณผ์ ์ธ ๋ฐ์ดํ„ฐ ์ „๋‹ฌ ๋ฐฉ๋ฒ• (1) - Transaction Outbox Pattern & Polling Publisher Pattern

 

[์ด์ „ ํฌ์ŠคํŒ…] RabbitMQ๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์—์„œ ํšจ๊ณผ์ ์ธ ๋ฐ์ดํ„ฐ ์ „๋‹ฌ ๋ฐฉ๋ฒ•

2023.11.10 - [BackEnd/Spring] - [Spring] ๋ถ„์‚ฐ ์‹œ์Šคํ…œ์—์„œ ํšจ๊ณผ์ ์ธ ๋ฐ์ดํ„ฐ ์ „๋‹ฌ ๋ฐฉ๋ฒ• (2) - RabbitMQ

 

2. Kafka๋ฅผ ์‚ฌ์šฉํ•œ ์ „๋‹ฌ ๋ฐฉ๋ฒ•

 

2-1. Producer Confirm

 

์•„๋ž˜๋Š” Producer Confirm์„ ๊ตฌํ˜„ํ•œ ํด๋ž˜์Šค์ด๋‹ค.

 

- Spring Boot 2 ๋ฒ„์ „

@Slf4j
@Component
@RequiredArgsConstructor
public class Producer {

    public void sendEvent(CreateTaskEvent event) {
    
        ListenableFuture<sendResult<String, CreateTaskEvent> future = 
        kafkaTemplate.send(TOPIC_TASK, event);
        
        future.addCallback(
                result -> log.info("offset : {}" , result.getRecordMetadata().offset),
                throwable -> log.error("fail to publish", throwable)
                
        );
    }
}

 

์œ„๋Š” ์Šคํ”„๋ง๋ถ€ํŠธ 2๋ฒ„์ „์—์„œ ๊ตฌํ˜„ํ•œ Producer Confirm์ด๋‹ค.

 

์Šคํ”„๋ง๋ถ€ํŠธ 2๋ฒ„์ „์—์„œ๋Š” KafkaTemplate์˜ send()๋ฉ”์„œ๋“œ๊ฐ€ ListenableFuture๋ฅผ ๋ฆฌํ„ดํ•ด์„œ ํ•ด๋‹น ์ฝ”๋“œ๊ฐ€ ๋˜์ง€๋งŒ,

 

์Šคํ”„๋ง๋ถ€ํŠธ 3์—์„  ListenableFuture๊ฐ€ deprecate ๋˜์—ˆ๋‹ค.

 

CompletableFuture์—๋Š” callback๋ฉ”์„œ๋“œ๊ฐ€ ์—†๊ธฐ ๋•Œ๋ฌธ์—, whenComplete()๋ฉ”์„œ๋“œ๋ฅผ ์‚ฌ์šฉํ•ด์•ผ ํ•œ๋‹ค.

 

- Spring boot 3 ๋ฒ„์ „

@Slf4j
@Component
@RequiredArgsConstructor
public class Producer {

    public void sendEvent(CreateTaskEvent event) {
    
        CompletableFuture<SendResult<String, CreateTaskEvent>> future 
        = kafkaTemplate.send(TOPIC_TASK, event);
        
        future.whenComplete(((result, throwable) -> {
            if (throwable == null) {
                log.info("offset : {}", result.getRecordMetadata().offset());
            } else {
                log.error("fail to publish", throwable);
            }
        }
        ));
    }
}

 

์ด๋ ‡๊ฒŒ kafka ํ™˜๊ฒฝ์—์„œ ์„ฑ๊ณต, ์‹คํŒจ์— ๋Œ€ํ•œ callback์„ ๊ตฌํ˜„ํ•  ์ˆ˜ ์žˆ๋‹ค.

 

2-2. Consumer ACK

Consumer ACK์€ AcknowledgingMessageListner ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ์ด์šฉํ•œ๋‹ค.

@FunctionalInterface
public interface AcknowledgingMessageListener<K, V> extends MessageListener<K, V> {

	/**
	 * Invoked with data from kafka. Containers should never call this since it they
	 * will detect that we are an acknowledging listener.
	 * @param data the data to be processed.
	 */
	@Override
	default void onMessage(ConsumerRecord<K, V> data) {
		throw new UnsupportedOperationException("Container should never call this");
	}

	/**
	 * Invoked with data from kafka.
	 * @param data the data to be processed.
	 * @param acknowledgment the acknowledgment.
	 */
	@Override
	void onMessage(ConsumerRecord<K, V> data, @Nullable Acknowledgment acknowledgment);

}

 

ํ•ด๋‹น ์ธํ„ฐํŽ˜์ด์Šค์— ์žˆ๋Š” onMessage ๋ฉ”์„œ๋“œ์—์„œ ๋‘ ๋ฒˆ์งธ ์ธ์ž๋กœ Acknowledgement๋ฅผ ์ „๋‹ฌํ•  ์ˆ˜ ์žˆ๋Š”๋ฐ, ์ด๋ฅผ ์ด์šฉํ•ด์„œ Consumer ACK๋ฅผ ๊ตฌํ˜„ํ•œ๋‹ค.

 

 

์•„๋ž˜๋Š” Consumer ACK์„ ๊ตฌํ˜„ํ•œ ์ฝ”๋“œ์ด๋‹ค.

 

@Component
@Slf4j
public class MyAcknowledgementMessageListner implements AcknowledgingMessageListener<String, String> {

    @Override
    @KafkaListener(
            //...
            containerFactory = "KafkaListenerContainerFactory"
    )
    public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
        try {
            acknowledgment.acknowledge();
        } catch (Exception e) {
            log.error("Error to receive messages. ", e);
        }
    }
}

 

AcknowledgingMessageListener๋ฅผ ์ƒ์†๋ฐ›์•„ onMessage๋ฅผ Overrideํ•œ๋‹ค.

 

์ธ์ž๋กœ ๋ฐ›์€ Acknowledgement์˜ acknowledge()๋ฉ”์„œ๋“œ๋ฅผ ์‹คํ–‰ํ•˜๋ฉด ์„ฑ๊ณตํ•œ Consumer ACK์ด ์ „์†ก๋œ๋‹ค.

 

 

์ด๋ ‡๊ฒŒ ์ˆ˜๋™์ ์œผ๋กœ ACK์„ ๋ณด๋‚ด๋ ค๋ฉด ๋ฏธ๋ฆฌ ํ•ด๋‘์–ด์•ผ ํ•  ์„ค์ •์ด ์žˆ๋‹ค.

 

@EnableKafka
@Configuration
public class KafkaConfig {

	//...

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        //Setting for Consumer ACK
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        return new DefaultKafkaConsumerFactory<>(props);
    }
    
    //...
}

 

CosnumerFactory ์„ค์ •์—์„œ, AUTO COMMIT์„ false๋กœ ์„ค์ •ํ•ด์„œ ์ง์ ‘ ACK, NACK๋ฅผ ๋‚ ๋ฆด ์ˆ˜ ์žˆ๋„๋ก ์„ค์ •ํ•ด๋‘”๋‹ค.

 

 

 

 

 

 


 

์ด๋กœ์จ RDB, RabbitMQ, Kafka ํ™˜๊ฒฝ์—์„œ ์•„๋ž˜ ์กฐ๊ฑด์„ ์ถฉ์กฑํ–ˆ๋‹ค.

  • At Least Once : ์ตœ์†Œ ํ•œ ๋ฒˆ ์ „๋‹ฌ ๊ตฌํ˜„
  • Producer Confirm, Consumer ACK

์•ž์œผ๋กœ MSA๊ตฌ์กฐ๋กœ ํ”„๋กœ์ ํŠธ๋ฅผ ๊ณ„ํšํ•˜๊ฒŒ ๋œ๋‹ค๋ฉด, ๋ฐฐ์šด ์ ์„ ์ด์šฉํ•˜์—ฌ ์•ˆ์ •์„ฑ์žˆ๋Š” ๋ฐ์ดํ„ฐ ์ „๋‹ฌ์„ ๊ตฌํ˜„ํ•  ์ˆ˜ ์žˆ๊ฒ ๋‹ค.

 

 

 

 

 

์ฐธ๊ณ 
https://github.com/spring-projects/spring-boot/wiki/Spring-Boot-3.0-Migration-Guide
https://spring.io/projects/spring-kafka
https://www.youtube.com/watch?v=uk5fRLUsBfk&t=1797s&ab_channel=NHNCloud