
[Kafka] Applying & Testing the Spring-Kafka API

ddonghyeo 2023. 9. 19. 20:49

Spring Kafka version: 3.0.10

 

spring-kafka ๊ณต์‹๋ฌธ์„œ๋ฅผ ๋งŽ์ด ์ฝ์–ด๋ณด์ž.

https://docs.spring.io/spring-kafka/reference/index.html

 


 

- Sending ( Producer )

 

Messages can be sent using the KafkaTemplate class.

0. Configuration

@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // See https://kafka.apache.org/documentation/#producerconfigs for more properties
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    return new KafkaTemplate<Integer, String>(producerFactory());
}

KafkaTemplate์„ ์‚ฌ์šฉํ•˜๊ธฐ ์œ„ํ•ด์„ , producerFactory๋ฅผ ๋นˆ์œผ๋กœ ๋“ฑ๋กํ•˜๊ณ  ์ƒ์„ฑ์ž๋กœ ์ œ๊ณตํ•œ๋‹ค.

 

 

 

1. Sending

 

kafkaTemplate์„ ์„ ์–ธํ•ด์ฃผ์ž.

private final KafkaTemplate<String, String> kafkaTemplate;

 

Kafka์˜ send๋ฉ”์„œ๋“œ๋ฅผ ์‚ดํŽด๋ณด๊ฒ ๋‹ค.

 

- KafkaTemplate.java

CompletableFuture<SendResult<K, V>> sendDefault(V data);

CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, V data);

CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

CompletableFuture<SendResult<K, V>> send(Message<?> message);

 

Looking at the methods that take a topic, the first parameter is always the topic, passed as a String.

 

๋‘ ๋ฒˆ์งธ ํŒŒ๋ผ๋ฏธํ„ฐ๋ถ€ํ„ฐ partition, timestamp, key, data๋ฅผ ์ „๋‹ฌํ•  ์ˆ˜ ์žˆ๋‹ค.

 

timestamp๋ฅผ ๊ฐ™์ด ์ „๋‹ฌํ•˜๋ฉด, ๋ ˆ์ฝ”๋“œ ๋‚ด์— ์‹œ๊ฐ„์„ ๊ฐ™์ด ๋‹ด๊ฒŒ ๋œ๋‹ค.

 

ํ•ด๋‹น topic์˜ ์„ค์ •์— ๋”ฐ๋ผ ๋‹ค๋ฅด๋‹ค.

  • CREATE_TIME ์œผ๋กœ ์„ค์ •๋œ ๊ฒฝ์šฐ : ์‚ฌ์šฉ์ž๊ฐ€ ์ „๋‹ฌํ•œ timestamp๋กœ ๊ธฐ๋ก๋˜๊ฑฐ๋‚˜, ์ „๋‹ฌ๋˜์ง€ ์•Š์œผ๋ฉด ์ž๋™์œผ๋กœ ์ƒ์„ฑ๋œ๋‹ค.
  • LOG_APPEND_TIME ์œผ๋กœ ์„ค์ •๋œ ๊ฒฝ์šฐ : ์‚ฌ์šฉ์ž๊ฐ€ ์ „๋‹ฌํ•œ timestamp๋Š” ๋ฌด์‹œ๋œ๋‹ค. ๋ธŒ๋กœ์ปค๊ฐ€ ๋กœ์ปฌ ์‹œ๊ฐ„์„ ๋„ฃ๊ฒŒ๋œ๋‹ค.
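
A minimal sketch of the send overloads above, assuming the KafkaTemplate<Integer, String> bean from the configuration section and a hypothetical topic named "myTopic":

import org.springframework.kafka.core.KafkaTemplate;

public class SendExamples {

    private final KafkaTemplate<Integer, String> template;

    public SendExamples(KafkaTemplate<Integer, String> template) {
        this.template = template;
    }

    public void sendExamples() {
        template.send("myTopic", "value only");                                        // topic + value; the partitioner picks the partition
        template.send("myTopic", 42, "keyed value");                                   // topic + key + value; key 42 is hashed to pick the partition
        template.send("myTopic", 0, System.currentTimeMillis(), 42, "explicit send");  // topic + partition + timestamp + key + value
    }
}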

 

 

 

 

์ด์ „ ํฌ์ŠคํŒ…์—์„œ ๋‹ค์Œ๊ณผ ๊ฐ™์€ ๋‚ด์šฉ์ด ์žˆ์—ˆ๋‹ค.

 

ํ”„๋กœ๋“€์„œ๊ฐ€ ๋งŒ์•ฝ Topic 0์— ๋ฉ”์„ธ์ง€๋ฅผ ๋„ฃ์œผ๋ ค๊ณ  ์‹œ๋„ํ•  ๋•Œ, ๋ผ์šด๋“œ ๋กœ๋นˆRound Robin ์•Œ๊ณ ๋ฆฌ์ฆ˜์œผ๋กœ ํŒŒํ‹ฐ์…˜์„ ์„ ํƒํ•˜๊ฑฐ๋‚˜, ํŒŒํ‹ฐ์…˜์˜ ํ‚คKey(์ด๋Š” ํ”„๋กœ๋“€์„œ๊ฐ€ ๋ฉ”์„ธ์ง€๋ฅผ ์ €์žฅํ•  ๋•Œ ํ† ํ”ฝ๊ณผ ๊ฐ™์ด ์ „๋‹ฌํ•ด์•ผ ํ•œ๋‹ค.)๋ฅผ ํ†ตํ•ด ์ €์žฅํ•  Partition์„ ์„ ํƒํ•˜๊ฒŒ ๋œ๋‹ค.

 

์—ฌ๊ธฐ์„œ ํ”„๋กœ๋“€์„œ๊ฐ€ ์ „๋‹ฌํ•˜๋Š” ํ‚ค๋ฅผ ์ „๋‹ฌํ•˜๋ฉฐ ์ €์žฅํ•  Partition์„ ์ง€์ •ํ•˜๊ฒŒ ๋œ๋‹ค.

 

ํ‚ค๋ฅผ ์ „๋‹ฌํ•˜๋ฉด ํ‚ค๋ฅผ ํ•ด์‹ฑํ•˜์—ฌ ๊ฒฐ๊ณผ์— ๋”ฐ๋ผ ํŒŒํ‹ฐ์…˜์— ์ €์žฅํ•˜๊ฒŒ ๋œ๋‹ค.
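
As a rough illustration of that hashing, the snippet below mirrors what Kafka's default partitioning does for records that carry a key (murmur2 hash of the serialized key, modulo the partition count); the class and key names are only for the example, and the sticky-partitioning behavior for key-less records is ignored.

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.utils.Utils;

public class KeyPartitionSketch {

    // murmur2-hash the serialized key and take it modulo the partition count
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        // the same key always maps to the same partition, so per-key ordering is preserved
        System.out.println(partitionFor("user-42", 2));
        System.out.println(partitionFor("user-42", 2)); // identical result
    }
}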

 

To use a key, add the following configuration to KafkaConfig.

...

    @Value("${kafka.topic-key}")
    public String TOPIC_WITH_KEY;

    @Autowired
    private KafkaAdmin kafkaAdmin;

    private NewTopic defaultTopic() {
        return TopicBuilder.name(DEFAULT_TOPIC)
                .partitions(2)
                .replicas(2)
                .build();
    }

    private NewTopic topicWithKey() {
        return TopicBuilder.name(TOPIC_WITH_KEY)
                .partitions(2)
                .replicas(2)
                .build();
    }

    @PostConstruct
    public void init() {
        kafkaAdmin.createOrModifyTopics(defaultTopic());
        kafkaAdmin.createOrModifyTopics(topicWithKey());
    }

...

Key๋Š” ๋‹ค์Œ์— ์ ์šฉํ•ด ๋ณด๋„๋ก ํ•˜๊ณ , ์ „์†กํ•˜๋Š” ์˜ˆ์ œ ์ฝ”๋“œ๋ฅผ ๋ณด์ž.

 

1-1. ๋น„๋™๊ธฐ ์ „์†ก (Non-Blocking)

public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    CompletableFuture<SendResult<String, String>> future = template.send(record);
    future.whenComplete((result, ex) -> {
        if (ex == null) {
            handleSuccess(data);
        }
        else {
            handleFailure(data, record, ex);
        }
    });
}

Java์˜ CompletableFuture์„ ํ†ตํ•ด ๋น„๋™๊ธฐ ์ฒ˜๋ฆฌ์— ๋Œ€ํ•œ ๊ฒฐ๊ณผ๊ฐ’์„ ๋ฐ›์„ ์ˆ˜ ์žˆ๋‹ค.

(ํ˜„์žฌ ๋ฒ„์ „์—์„œ๋Š” CompletableFuture๋ฅผ ๋ฐ˜ํ™˜ํ•˜๊ณ  ์žˆ์ง€๋งŒ, ๋‹ค๋ฅธ ๋ฒ„์ „์—์„œ๋Š” ListenableFuture๋กœ ๋ฐ˜ํ™˜ํ•˜๋Š” ๊ฒƒ ๊ฐ™๋‹ค.)

์ดํ›„์— Suceess, Failure๋ฅผ handleํ•˜๋Š” ๋ฉ”์„œ๋“œ๋Š” ์ƒ๋žตํ•˜๊ฒ ๋‹ค.

 

1-2. Synchronous Send (Blocking)

public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    try {
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        handleFailure(data, record, e);
    }
}

๋™๊ธฐ ์ „์†ก๋„ ๋งˆ์ฐฌ๊ฐ€์ง€๋กœ CompletableFuture๋ฅผ ๋ฐ˜ํ™˜ํ•˜๊ธฐ ๋•Œ๋ฌธ์— get๋ฉ”์„œ๋“œ๋ฅผ ํ†ตํ•ด handleํ•  ์ˆ˜ ์žˆ๋‹ค.

 

 

 

1-3. The ProducerRecord Class

 

ProducerRecordํด๋ž˜์Šค๋Š” Kafka์—์„œ ์ œ๊ณตํ•˜๋Š” ํด๋ž˜์Šค์ด๋‹ค. ์ด๋ฅผ ์ด์šฉํ•˜์—ฌ send๋ฉ”์„œ๋“œ์˜ ํŒŒ๋ผ๋ฏธํ„ฐ๋“ค์„ ๋‹ด์„ ์ˆ˜ ์žˆ๋‹ค.

์•„๋ž˜๋Š” ProducerRecordํด๋ž˜์Šค์˜ ํ•„๋“œ๊ฐ’๋“ค์ด๋‹ค.

public class ProducerRecord<K, V> {

    private final String topic;
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;
    
    ...
    
}

send์— ํ•„์š”ํ•œ ๊ฐ’๋“ค์„ ํ•„๋“œ ๊ฐ’์œผ๋กœ ๊ฐ–๊ณ ์žˆ๋‹ค.

 

์—ฌ๋Ÿฌ ๋ธ”๋กœ๊ธ€๋ฅผ ๋ณด์•˜์„ ๋•Œ ๊ฐ„๋‹จํ•˜๊ฒŒ topic์„ ์ •ํ•ด๋†“๊ณ  message๋งŒ ์ „๋‹ฌํ•˜๊ณ  ์žˆ์—ˆ๋‹ค.

์œ„์™€ ๊ฐ™์ด ProducerRecord๋ฅผ ์ด์šฉํ•ด์„œ ๋งค๊ฐœ๋ณ€์ˆ˜๋“ค์„ ์ •ํ˜•ํ™”ํ•˜๊ณ  controller๋‚˜ ๋‹ค๋ฅธ ์„œ๋น„์Šค์—์„œ ์ฒ˜๋ฆฌํ•˜๋Š”๊ฒŒ ์ข‹์•„๋ณด์ธ๋‹ค.

 

 

 

- Receiving ( Consumer )

 

0. Configuration

 

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // the docs example uses embeddedKafka.getBrokersAsString() in tests
        ...
        return props;
    }
}

To use @KafkaListener, the configuration must be annotated with @EnableKafka.

 

1. Receiving

To receive messages, use the @KafkaListener annotation.

 

1-1. Basic Receiving

public class Listener {

    @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
    public void listen(String data) {
        ...
    }

}

ํ•„์š”ํ•œ ๋งค๊ฐœ๋ณ€์ˆ˜๋“ค์„ ์„ค์ •ํ•˜์—ฌ data๋ฅผ listenํ•  ์ˆ˜ ์žˆ๋‹ค.

 

์—ฌ๊ธฐ์„œ Kafka์˜ ํŠน์ง•์ธ Event Driven์˜ ์„ฑ๊ฒฉ์„ ์ž˜ ๋ณผ ์ˆ˜ ์žˆ๋‹ค.

Consumer๋Š” ๋ฉ”์„ธ์ง€๊ฐ€ ์™”๋Š”์ง€ DB๋ฅผ ์ ‘๊ทผํ•˜๋Š” ๋“ฑ ์ˆ˜์‹œ๋กœ ํ™•์ธํ•˜๋Š”๊ฒŒ ์•„๋‹ˆ๋ผ, ๋ฉ”์„ธ์ง€๋ฅผ ๋ฐ›์€ Event๊ฐ€ ๋ฐœ์ƒํ•จ์— ๋”ฐ๋ผ ์‹คํ–‰๋œ๋‹ค.

 

1-2. Receiving from Specific Topics and Partitions

@KafkaListener(id = "thing2", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = "0",
             partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

์œ„์™€ ๊ฐ™์ด ํŠน์ • ํ† ํ”ฝ์ด๋‚˜ ํŒŒํ‹ฐ์…˜์„ ์ง€์ •ํ•˜์—ฌ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค.

 

The producer and consumer can simply agree in advance on which topic and partition to use; the sketch below shows the producer side.
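
A minimal sketch of such an agreement, assuming the KafkaTemplate<String, String> bean above and that topic1 has at least partitions 0 and 1:

import org.springframework.kafka.core.KafkaTemplate;

public class AgreedPartitionSender {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public AgreedPartitionSender(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send() {
        // partition 0 is one of the partitions the "thing2" listener above is assigned to
        kafkaTemplate.send("topic1", 0, "agreed-key", "payload routed to partition 0");
    }
}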

 

@KafkaListener(id = "thing3", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" },
             partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

์œ„์™€ ๊ฐ™์ด parition = "*" ์ฒ˜๋Ÿผ wildcard๋กœ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค.

 

@KafkaListener(id = "thing3", topicPartitions =
        { @TopicPartition(topic = "topic1",
             partitionOffsets = @PartitionOffset(partition = "0-5", initialOffset = "0"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

์œ„์™€ ๊ฐ™์ด ํŒŒํ‹ฐ์…˜์˜ ๋ฒ”์œ„๋ฅผ ์ง€์ •ํ•˜์—ฌ ์‚ฌ์šฉํ•  ์ˆ˜๋„ ์žˆ๋‹ค.

 

1-3. ๋ ˆ์ฝ”๋“œ์˜ metadata์ˆ˜์ง‘

@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
        @Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Integer key,
        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
        ) {
    ...
}

The key, partition, topic, and other metadata of the received message can be obtained this way.

 

 

From version 2.5 onward, however, there is a simpler option:

@KafkaListener(...)
public void listen(String str, ConsumerRecordMetadata meta) {
    ...
}

 

ConsumerRecordMetadata ํด๋ž˜์Šค๋ฅผ ๋งค๊ฐœ๋ณ€์ˆ˜๋กœ ํ•œ๊บผ๋ฒˆ์— ๋ฐ›์•„๋ณผ ์ˆ˜ ์žˆ๋‹ค.

 

- ConsumerRecordMetadata.java

public class ConsumerRecordMetadata {

	private final RecordMetadata delegate;

	private final TimestampType timestampType;

	...

	public boolean hasOffset() {...}

	public long offset() {...}

	public boolean hasTimestamp() {...}

	public long timestamp() {...}

	public int serializedKeySize() {...}

	public int serializedValueSize() {...}

	public String topic() {...}

	public int partition() {...}

	public TimestampType timestampType() {...}

	@Override
	public int hashCode() {...}

	@Override
	public boolean equals(Object obj) {...}

	@Override
	public String toString() {...}

}

ํ•„์š”ํ•œ ๋ฐ์ดํ„ฐ๋ฅผ ํด๋ž˜์Šค์—์„œ ๊ฐ€์ ธ์˜ฌ ์ˆ˜ ์žˆ๊ฒ ๋‹ค.

 

 

 

- Applying It

 

Now let's apply Kafka to an example Spring project and test it.

 

0. Setup

Add the following dependency.

implementation 'org.springframework.kafka:spring-kafka'

 

Add the following to application.yml.

spring:
  kafka:
    bootstrap-servers: "localhost:9092"
    consumer:
      group-id: "myGroup"

 

The test application runs without a database.

@SpringBootApplication(exclude={DataSourceAutoConfiguration.class})
public class KafkaApplication {

	public static void main(String[] args) {
		SpringApplication.run(KafkaApplication.class, args);
	}

}

Run it with the DataSource auto-configuration excluded.

 

1. KafkaConfig

 

- KafkaConfig.java

@EnableKafka
@Configuration
public class KafkaConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Value(value = "${spring.kafka.consumer.group-id}")
    private String groupId;



    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }


    // -------------------------------------- Producer --------------------------------------

    // New Topic
    @Bean
    public NewTopic myTopic1() {
        return new NewTopic("topic_1", 1, (short) 1);
    }

    // New Topic 2
    @Bean
    public NewTopic myTopic2() {
        return new NewTopic("topic_2", 1, (short) 1);
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // String keys, matching KafkaTemplate<String, String>
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    // -------------------------------------- Consumer --------------------------------------

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

}

The test topics topic_1 and topic_2 are created here, the Kafka address configured in the yml is wired in, and the Producer and Consumer are configured.

 

 

2. KafkaController

 

- KafkaController.java

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {

    @Autowired
    private KafkaService kafkaService;

    @PostMapping("")
    public String sendMessage(@RequestParam("message") String message) {
        kafkaService.sendMessage(message);
        return "Send Success!";
    }
}

url์€ ๋‹จ์ˆœํ•˜๊ฒŒ /kafka๋กœ ์„ค์ •ํ–ˆ๊ณ , ์‚ฌ์šฉ์ž๊ฐ€ ๋ฉ”์„ธ์ง€๋ฅผ ์ „์†กํ•˜๋ ค๊ณ  ์‹œ๋„ํ•˜๋Š” API๋ฅผ ์ž‘์„ฑํ–ˆ๋‹ค.

 

๋งค๊ฐœ๋ณ€์ˆ˜ message๋กœ ์ „๋‹ฌํ•˜๋ ค๋Š” ๋ฉ”์„ธ์ง€๋ฅผ ์ „๋‹ฌํ•˜๋ฉด ๋œ๋‹ค.

 

3. KafkaService

 

- KafkaService.java

@Slf4j
@Service
public class KafkaService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;


    //----------- Producer -----------
    public void sendMessage(String message) {
        log.info("[Producer] Message Send : " + message);
        this.kafkaTemplate.send("topic_1", message);
    }


    //----------- Consumer -----------
    @KafkaListener(topics = "topic_1", groupId = "myGroup")
    public void consumeMyTopic1(@Payload String message, ConsumerRecordMetadata metadata) {
        log.info("[Consumer] Message Received : " + message);
        log.info("[Consumer] Partition : " + metadata.partition());
    }
}

A KafkaTemplate is declared so the producer can send messages.

On the producer side, the service takes the message and simply sends it to the configured topic (topic_1).

On the consumer side, the @KafkaListener annotation listens for messages, and a log line is printed whenever a message arrives.

 

 

 

 

- Testing

 

Now let's run a test.

A message was sent, and the response came back with status 200 and the "Send Success!" text.
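
For reference, a minimal sketch of the request used for this test, assuming the application runs on localhost:8080 (the port is an assumption):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SendMessageTest {

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        // POST /kafka?message=hello hits KafkaController.sendMessage
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8080/kafka?message=hello"))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body()); // expected: 200 Send Success!
    }
}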

 

log๋„ ์ž˜ ์ถœ๋ ฅ๋œ ๊ฒƒ์„ ํ™•์ธํ–ˆ๋‹ค. ๋ฉ”์„ธ์ง€ ์ „์†ก์— ์„ฑ๊ณตํ–ˆ๋‹ค. 

 

 

 

 

The code used here has been uploaded to GitHub.

https://github.com/DDonghyeo/kafka-spring

 

 

I studied Kafka and applied it directly in a Spring project.

There is a lot more to cover, but for now only simple message sending has been tested.

From the next post on, I'll use the various references in the official documentation to apply topic & partition management, key usage, and more detailed Producer and Consumer configuration, and shape the code into something more practical.

 

References
https://docs.spring.io/spring-kafka/reference/
https://spring.io/projects/spring-kafka
https://happy-jjang-a.tistory.com/201
https://devocean.sk.com/blog/techBoardDetail.do?ID=164096