[Kafka] Spring-Kafka API ์ ์ฉ & ํ ์คํธ
Kafka version : 3.0.10
spring-kafka ๊ณต์๋ฌธ์๋ฅผ ๋ง์ด ์ฝ์ด๋ณด์.
https://docs.spring.io/spring-kafka/reference/index.html
Overview :: Spring Kafka
Copies of this document may be made for your own use and for distribution to others, provided that you do not charge any fee for such copies and further provided that each copy contains this Copyright Notice, whether distributed in print or electronically.
docs.spring.io
- ์ ์ก ( Producer )
๋ฉ์ธ์ง๋ฅผ ์ ์กํ๋๊ฑด KafkaTemplate ํด๋์ค๋ฅผ ์ด์ฉํ ์ ์๋ค.
0. ์ค์
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// See https://kafka.apache.org/documentation/#producerconfigs for more properties
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}
KafkaTemplate์ ์ฌ์ฉํ๊ธฐ ์ํด์ , producerFactory๋ฅผ ๋น์ผ๋ก ๋ฑ๋กํ๊ณ ์์ฑ์๋ก ์ ๊ณตํ๋ค.
1. ์ ์ก
kafkaTemplate์ ์ ์ธํด์ฃผ์.
private final KafkaTemplate<String, String> kafkaTemplate;
Kafka์ send๋ฉ์๋๋ฅผ ์ดํด๋ณด๊ฒ ๋ค.
- KafkaTemplate.java
CompletableFuture<SendResult<K, V>> sendDefault(V data);
CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, V data);
CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
CompletableFuture<SendResult<K, V>> send(Message<?> message);
topic๊ณผ ํจ๊ป ๋ณด๋ด๋ ๋ฉ์๋๋ฅผ ๋ณด๋ฉด, ์ฒซ ๋ฒ์งธ ํ๋ผ๋ฏธํฐ๋ก๋ topic์ Stringํ์์ผ๋ก ์ ๋ฌํ๋๊ฑด ๊ณ ์ ์ด๋ค.
๋ ๋ฒ์งธ ํ๋ผ๋ฏธํฐ๋ถํฐ partition, timestamp, key, data๋ฅผ ์ ๋ฌํ ์ ์๋ค.
timestamp๋ฅผ ๊ฐ์ด ์ ๋ฌํ๋ฉด, ๋ ์ฝ๋ ๋ด์ ์๊ฐ์ ๊ฐ์ด ๋ด๊ฒ ๋๋ค.
ํด๋น topic์ ์ค์ ์ ๋ฐ๋ผ ๋ค๋ฅด๋ค.
- CREATE_TIME ์ผ๋ก ์ค์ ๋ ๊ฒฝ์ฐ : ์ฌ์ฉ์๊ฐ ์ ๋ฌํ timestamp๋ก ๊ธฐ๋ก๋๊ฑฐ๋, ์ ๋ฌ๋์ง ์์ผ๋ฉด ์๋์ผ๋ก ์์ฑ๋๋ค.
- LOG_APPEND_TIME ์ผ๋ก ์ค์ ๋ ๊ฒฝ์ฐ : ์ฌ์ฉ์๊ฐ ์ ๋ฌํ timestamp๋ ๋ฌด์๋๋ค. ๋ธ๋ก์ปค๊ฐ ๋ก์ปฌ ์๊ฐ์ ๋ฃ๊ฒ๋๋ค.
์ด์ ํฌ์คํ ์์ ๋ค์๊ณผ ๊ฐ์ ๋ด์ฉ์ด ์์๋ค.
ํ๋ก๋์๊ฐ ๋ง์ฝ Topic 0์ ๋ฉ์ธ์ง๋ฅผ ๋ฃ์ผ๋ ค๊ณ ์๋ํ ๋, ๋ผ์ด๋ ๋ก๋นRound Robin ์๊ณ ๋ฆฌ์ฆ์ผ๋ก ํํฐ์ ์ ์ ํํ๊ฑฐ๋, ํํฐ์ ์ ํคKey(์ด๋ ํ๋ก๋์๊ฐ ๋ฉ์ธ์ง๋ฅผ ์ ์ฅํ ๋ ํ ํฝ๊ณผ ๊ฐ์ด ์ ๋ฌํด์ผ ํ๋ค.)๋ฅผ ํตํด ์ ์ฅํ Partition์ ์ ํํ๊ฒ ๋๋ค.
์ฌ๊ธฐ์ ํ๋ก๋์๊ฐ ์ ๋ฌํ๋ ํค๋ฅผ ์ ๋ฌํ๋ฉฐ ์ ์ฅํ Partition์ ์ง์ ํ๊ฒ ๋๋ค.
ํค๋ฅผ ์ ๋ฌํ๋ฉด ํค๋ฅผ ํด์ฑํ์ฌ ๊ฒฐ๊ณผ์ ๋ฐ๋ผ ํํฐ์ ์ ์ ์ฅํ๊ฒ ๋๋ค.
ํค๋ฅผ ์ฌ์ฉํ๋ ค๋ฉด KafkaConfig์ ๋ค์ ์ค์ ์ ์ถ๊ฐํด์ผํ๋ค.
...
@Value("${kafka.topic-key}")
public String TOPIC_WITH_KEY;
@Autowired
private KafkaAdmin kafkaAdmin;
private NewTopic defaultTopic() {
return TopicBuilder.name(DEFAULT_TOPIC)
.partitions(2)
.replicas(2)
.build();
}
private NewTopic topicWithKey() {
return TopicBuilder.name(TOPIC_WITH_KEY)
.partitions(2)
.replicas(2)
.build();
}
@PostConstruct
public void init() {
kafkaAdmin.createOrModifyTopics(defaultTopic());
kafkaAdmin.createOrModifyTopics(topicWithKey());
}
...
Key๋ ๋ค์์ ์ ์ฉํด ๋ณด๋๋ก ํ๊ณ , ์ ์กํ๋ ์์ ์ฝ๋๋ฅผ ๋ณด์.
1-1. ๋น๋๊ธฐ ์ ์ก (Non-Blocking)
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
CompletableFuture<SendResult<Integer, String>> future = template.send(record);
future.whenComplete((result, ex) -> {
if (ex == null) {
handleSuccess(data);
}
else {
handleFailure(data, record, ex);
}
});
}
Java์ CompletableFuture์ ํตํด ๋น๋๊ธฐ ์ฒ๋ฆฌ์ ๋ํ ๊ฒฐ๊ณผ๊ฐ์ ๋ฐ์ ์ ์๋ค.
(ํ์ฌ ๋ฒ์ ์์๋ CompletableFuture๋ฅผ ๋ฐํํ๊ณ ์์ง๋ง, ๋ค๋ฅธ ๋ฒ์ ์์๋ ListenableFuture๋ก ๋ฐํํ๋ ๊ฒ ๊ฐ๋ค.)
์ดํ์ Suceess, Failure๋ฅผ handleํ๋ ๋ฉ์๋๋ ์๋ตํ๊ฒ ๋ค.
1-2. ๋๊ธฐ ์ ์ก(Blocking)
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
try {
template.send(record).get(10, TimeUnit.SECONDS);
handleSuccess(data);
}
catch (ExecutionException e) {
handleFailure(data, record, e.getCause());
}
catch (TimeoutException | InterruptedException e) {
handleFailure(data, record, e);
}
}
๋๊ธฐ ์ ์ก๋ ๋ง์ฐฌ๊ฐ์ง๋ก CompletableFuture๋ฅผ ๋ฐํํ๊ธฐ ๋๋ฌธ์ get๋ฉ์๋๋ฅผ ํตํด handleํ ์ ์๋ค.
1-3. ProducerRecordํด๋์ค
ProducerRecordํด๋์ค๋ Kafka์์ ์ ๊ณตํ๋ ํด๋์ค์ด๋ค. ์ด๋ฅผ ์ด์ฉํ์ฌ send๋ฉ์๋์ ํ๋ผ๋ฏธํฐ๋ค์ ๋ด์ ์ ์๋ค.
์๋๋ ProducerRecordํด๋์ค์ ํ๋๊ฐ๋ค์ด๋ค.
public class ProducerRecord<K, V> {
private final String topic;
private final Integer partition;
private final Headers headers;
private final K key;
private final V value;
private final Long timestamp;
...
}
send์ ํ์ํ ๊ฐ๋ค์ ํ๋ ๊ฐ์ผ๋ก ๊ฐ๊ณ ์๋ค.
์ฌ๋ฌ ๋ธ๋ก๊ธ๋ฅผ ๋ณด์์ ๋ ๊ฐ๋จํ๊ฒ topic์ ์ ํด๋๊ณ message๋ง ์ ๋ฌํ๊ณ ์์๋ค.
์์ ๊ฐ์ด ProducerRecord๋ฅผ ์ด์ฉํด์ ๋งค๊ฐ๋ณ์๋ค์ ์ ํํํ๊ณ controller๋ ๋ค๋ฅธ ์๋น์ค์์ ์ฒ๋ฆฌํ๋๊ฒ ์ข์๋ณด์ธ๋ค.
- ์์ ( Consumer )
0. ์ค์
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
...
return props;
}
}
@KafkaListner๋ฅผ ์ฌ์ฉํ๊ธฐ ์ํด, @EnableKafka๋ฅผ ํ์ฑํ์์ผ์ฃผ์ด์ผ ํ๋ค.
1. ์์
๋ฉ์ธ์ง๋ฅผ ์์ ํ๊ธฐ ์ํด์ @KafkaListner ์ด๋ ธํ ์ด์ ์ ์ฌ์ฉํ๋ค.
1-1. ๊ธฐ๋ณธ ์์
public class Listener {
@KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
public void listen(String data) {
...
}
}
ํ์ํ ๋งค๊ฐ๋ณ์๋ค์ ์ค์ ํ์ฌ data๋ฅผ listenํ ์ ์๋ค.
์ฌ๊ธฐ์ Kafka์ ํน์ง์ธ Event Driven์ ์ฑ๊ฒฉ์ ์ ๋ณผ ์ ์๋ค.
Consumer๋ ๋ฉ์ธ์ง๊ฐ ์๋์ง DB๋ฅผ ์ ๊ทผํ๋ ๋ฑ ์์๋ก ํ์ธํ๋๊ฒ ์๋๋ผ, ๋ฉ์ธ์ง๋ฅผ ๋ฐ์ Event๊ฐ ๋ฐ์ํจ์ ๋ฐ๋ผ ์คํ๋๋ค.
1-2. ํ ํฝ, ํํฐ์ ์ง์ ์์
@KafkaListener(id = "thing2", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
์์ ๊ฐ์ด ํน์ ํ ํฝ์ด๋ ํํฐ์ ์ ์ง์ ํ์ฌ ์ฌ์ฉํ ์ ์๋ค.
Producer๊ฐ ์ด๋ ํ ํฝ๊ณผ ํํฐ์ ์ ๋ฃ๊ฒ ๋ค๊ณ ์ฝ์ํด ๋๊ณ ์ฌ์ฉํ๋ฉด ๋๊ฒ ๋ค.
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" },
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
์์ ๊ฐ์ด parition = "*" ์ฒ๋ผ wildcard๋ก ์ฌ์ฉํ ์ ์๋ค.
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1",
partitionOffsets = @PartitionOffset(partition = "0-5", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
์์ ๊ฐ์ด ํํฐ์ ์ ๋ฒ์๋ฅผ ์ง์ ํ์ฌ ์ฌ์ฉํ ์๋ ์๋ค.
1-3. ๋ ์ฝ๋์ metadata์์ง
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
@Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
...
}
๋ฉ์ธ์ง๋ก๋ถํฐ ๋ฐ์ ํค, ํํฐ์ , ํ ํฝ ๋ฑ๋ฑ์ ๋ฐ์ ์ ์์๋ค.
ํ์ง๋ง 2.5 ๋ฒ์ ๋ถํฐ๋,
@KafkaListener(...)
public void listen(String str, ConsumerRecordMetadata meta) {
...
}
ConsumerRecordMetadata ํด๋์ค๋ฅผ ๋งค๊ฐ๋ณ์๋ก ํ๊บผ๋ฒ์ ๋ฐ์๋ณผ ์ ์๋ค.
- ConsumerRecordMetadata.java
public class ConsumerRecordMetadata {
private final RecordMetadata delegate;
private final TimestampType timestampType;
...
public boolean hasOffset() {...}
public long offset() {...}
public boolean hasTimestamp() {...}
public long timestamp() {...}
public int serializedKeySize() {...}
public int serializedValueSize() {...}
public String topic() {...}
public int partition() {...}
public TimestampType timestampType() {...}
@Override
public int hashCode() {...}
@Override
public boolean equals(Object obj) {...}
@Override
public String toString() {...}
}
ํ์ํ ๋ฐ์ดํฐ๋ฅผ ํด๋์ค์์ ๊ฐ์ ธ์ฌ ์ ์๊ฒ ๋ค.
- ์ ์ฉ
์ด์ Spring ํ๋ก์ ํธ ์์ ์ Kafka๋ฅผ ์ ์ฉ์์ผ ํ ์คํธํด ๋ณด๊ฒ ๋ค.
0. ์ค์
dependency์ ๋ค์์ ์ถ๊ฐํด์ค๋ค.
implementation 'org.springframework.kafka:spring-kafka'
application.yml์ ๋ค์์ ์ถ๊ฐํด์ค๋ค.
spring:
kafka:
bootstrap-servers: "localhost:9092"
consumer:
group-id: "myGroup"
DB์๋ ํ ์คํธ ์ ํ๋ฆฌ์ผ์ด์ ์ด๋ค.
@SpringBootApplication(exclude={DataSourceAutoConfiguration.class})
public class KafkaApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaApplication.class, args);
}
}
DataSrouce๋ฅผ ์ ์ธํ๊ณ ์คํํ์.
1. KafkaConfig
- KafkaConfig.java
@EnableKafka
@Configuration
public class KafkaConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Value(value = "${spring.kafka.consumer.group-id}")
private String groupId;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
// -------------------------------------- Producer --------------------------------------
// New Topic
@Bean
public NewTopic myTopic1() {
return new NewTopic("topic_1", 1, (short) 1);
}
// New Topic 2
@Bean
public NewTopic myTopic2() {
return new NewTopic("topic_2", 1, (short) 1);
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
// -------------------------------------- Consumer --------------------------------------
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
}
ํ ์คํธ์ฉ ํ ํฝ topic_1, topic_2๋ฅผ ๋ง๋ค์๋ค. yml์ ์ค์ ํด๋ kafka address์ ๋งคํ์์ผ์ฃผ๊ณ , Producer์ Consumer ์ค์ ์ ํด๋์๋ค.
2. KafkaController
- KafkaController.java
@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {
@Autowired
private KafkaService kafkaService;
@PostMapping("")
public String sendMessage(@RequestParam("message") String message) {
kafkaService.sendMessage(message);
return "Send Success!";
}
}
url์ ๋จ์ํ๊ฒ /kafka๋ก ์ค์ ํ๊ณ , ์ฌ์ฉ์๊ฐ ๋ฉ์ธ์ง๋ฅผ ์ ์กํ๋ ค๊ณ ์๋ํ๋ API๋ฅผ ์์ฑํ๋ค.
๋งค๊ฐ๋ณ์ message๋ก ์ ๋ฌํ๋ ค๋ ๋ฉ์ธ์ง๋ฅผ ์ ๋ฌํ๋ฉด ๋๋ค.
3. KafkaService
- KafkaService.java
@Slf4j
@Service
public class KafkaService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
//----------- Producer -----------
public void sendMessage(String message) {
log.info("[Producer] Message Send : " + message);
this.kafkaTemplate.send("topic_1", message);
}
//----------- Consumer -----------
@KafkaListener(topics = "topic_1", groupId = "myGroup")
public void consumeMyopic1(@Payload String message, ConsumerRecordMetadata metadata) {
log.info("[Consumer] Message Received : " + message);
log.info("[Consumer] Partition : " + metadata.partition());
}
}
Producer๊ฐ ๋ฉ์ธ์ง๋ฅผ sendํ๊ธฐ ์ํด KafkaTemplate์ ์ ์ธํ๋ค.
Producer์ ๊ฒฝ์ฐ message๋ฅผ ๋ฐ์์ ๋จ์ํ sendํ๊ณ , ์ค์ ํด๋์๋ ํ ํฝ(topic_1 )์ผ๋ก ๋ฉ์ธ์ง๋ฅผ ๋ณด๋ธ๋ค.
Consumer์ ๊ฒฝ์ฐ KafkaListener ์ด๋ ธํ ์ด์ ์ ์ฌ์ฉํด์ message๋ฅผ listenํ๋ค.
๋ฉ์ธ์ง๋ฅผ ๋ฐ์ผ๋ฉด log๋ฅผ ์ถ๋ ฅํ๋๋ก ํด๋์๋ค.
- ํ ์คํธ
์ด์ ํ ์คํธ๋ฅผ ์งํํด๋ณด์.
๋ฉ์ธ์ง๋ฅผ ๋ณด๋๊ณ , Reponse 200๊ณผ ํจ๊ป Send success! ๋ฌธ๊ตฌ๋ฅผ ๋ฐ์ ์ ์์๋ค.
log๋ ์ ์ถ๋ ฅ๋ ๊ฒ์ ํ์ธํ๋ค. ๋ฉ์ธ์ง ์ ์ก์ ์ฑ๊ณตํ๋ค.
์ ์ฉ ์ฝ๋๋ ๊นํ๋ธ์ ์ ๋ก๋ํด ๋์๋ค.
https://github.com/DDonghyeo/kafka-spring
Kafka๋ฅผ ๊ณต๋ถํ๊ณ Spring์ ์ง์ ์ ์ฉ์์ผ์ ์ฌ์ฉํด๋ณด์๋ค.
๋ง์ ๋ด์ฉ์ด ์์์ง๋ง ํ์ฌ๋ ๋จ์ ๋ฉ์ธ์ง ์ ์ก๋ง ํ ์คํธ ํด๋ณด์๋ค.
๋ค์ ํฌ์คํ ๋ถํด ๊ณต์ ๋ฌธ์์ ์๋ ์ฌ๋ฌ ๋ ํผ๋ฐ์ค๋ฅผ ํ์ฉํด์ ํ ํฝ&ํํฐ์ ๊ด๋ฆฌ, ํค ์ฌ์ฉ, Producer, Consumer ์ค์ ๋ฑ์ ์ง์ ์ ์ฉํด๋ณด๊ณ , ๋์ฑ ์ค์ฉ์ ์ผ๋ก ์ฝ๋๋ฅผ ์ ํํํด ๋ณด๊ฒ ๋ค.
์ฐธ๊ณ
https://docs.spring.io/spring-kafka/reference/
https://spring.io/projects/spring-kafka
https://happy-jjang-a.tistory.com/201
https://devocean.sk.com/blog/techBoardDetail.do?ID=164096