Kafka version : 3.0.10
spring-kafka ๊ณต์๋ฌธ์๋ฅผ ๋ง์ด ์ฝ์ด๋ณด์.
https://docs.spring.io/spring-kafka/reference/index.html
- ์ ์ก ( Producer )
๋ฉ์ธ์ง๋ฅผ ์ ์กํ๋๊ฑด KafkaTemplate ํด๋์ค๋ฅผ ์ด์ฉํ ์ ์๋ค.
0. ์ค์
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// See https://kafka.apache.org/documentation/#producerconfigs for more properties
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}
KafkaTemplate์ ์ฌ์ฉํ๊ธฐ ์ํด์ , producerFactory๋ฅผ ๋น์ผ๋ก ๋ฑ๋กํ๊ณ ์์ฑ์๋ก ์ ๊ณตํ๋ค.
1. ์ ์ก
kafkaTemplate์ ์ ์ธํด์ฃผ์.
private final KafkaTemplate<String, String> kafkaTemplate;
Kafka์ send๋ฉ์๋๋ฅผ ์ดํด๋ณด๊ฒ ๋ค.
- KafkaTemplate.java
CompletableFuture<SendResult<K, V>> sendDefault(V data);
CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, V data);
CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
CompletableFuture<SendResult<K, V>> send(Message<?> message);
topic๊ณผ ํจ๊ป ๋ณด๋ด๋ ๋ฉ์๋๋ฅผ ๋ณด๋ฉด, ์ฒซ ๋ฒ์งธ ํ๋ผ๋ฏธํฐ๋ก๋ topic์ Stringํ์์ผ๋ก ์ ๋ฌํ๋๊ฑด ๊ณ ์ ์ด๋ค.
๋ ๋ฒ์งธ ํ๋ผ๋ฏธํฐ๋ถํฐ partition, timestamp, key, data๋ฅผ ์ ๋ฌํ ์ ์๋ค.
timestamp๋ฅผ ๊ฐ์ด ์ ๋ฌํ๋ฉด, ๋ ์ฝ๋ ๋ด์ ์๊ฐ์ ๊ฐ์ด ๋ด๊ฒ ๋๋ค.
ํด๋น topic์ ์ค์ ์ ๋ฐ๋ผ ๋ค๋ฅด๋ค.
- CREATE_TIME ์ผ๋ก ์ค์ ๋ ๊ฒฝ์ฐ : ์ฌ์ฉ์๊ฐ ์ ๋ฌํ timestamp๋ก ๊ธฐ๋ก๋๊ฑฐ๋, ์ ๋ฌ๋์ง ์์ผ๋ฉด ์๋์ผ๋ก ์์ฑ๋๋ค.
- LOG_APPEND_TIME ์ผ๋ก ์ค์ ๋ ๊ฒฝ์ฐ : ์ฌ์ฉ์๊ฐ ์ ๋ฌํ timestamp๋ ๋ฌด์๋๋ค. ๋ธ๋ก์ปค๊ฐ ๋ก์ปฌ ์๊ฐ์ ๋ฃ๊ฒ๋๋ค.
์ด์ ํฌ์คํ ์์ ๋ค์๊ณผ ๊ฐ์ ๋ด์ฉ์ด ์์๋ค.
ํ๋ก๋์๊ฐ ๋ง์ฝ Topic 0์ ๋ฉ์ธ์ง๋ฅผ ๋ฃ์ผ๋ ค๊ณ ์๋ํ ๋, ๋ผ์ด๋ ๋ก๋นRound Robin ์๊ณ ๋ฆฌ์ฆ์ผ๋ก ํํฐ์ ์ ์ ํํ๊ฑฐ๋, ํํฐ์ ์ ํคKey(์ด๋ ํ๋ก๋์๊ฐ ๋ฉ์ธ์ง๋ฅผ ์ ์ฅํ ๋ ํ ํฝ๊ณผ ๊ฐ์ด ์ ๋ฌํด์ผ ํ๋ค.)๋ฅผ ํตํด ์ ์ฅํ Partition์ ์ ํํ๊ฒ ๋๋ค.
์ฌ๊ธฐ์ ํ๋ก๋์๊ฐ ์ ๋ฌํ๋ ํค๋ฅผ ์ ๋ฌํ๋ฉฐ ์ ์ฅํ Partition์ ์ง์ ํ๊ฒ ๋๋ค.
ํค๋ฅผ ์ ๋ฌํ๋ฉด ํค๋ฅผ ํด์ฑํ์ฌ ๊ฒฐ๊ณผ์ ๋ฐ๋ผ ํํฐ์ ์ ์ ์ฅํ๊ฒ ๋๋ค.
ํค๋ฅผ ์ฌ์ฉํ๋ ค๋ฉด KafkaConfig์ ๋ค์ ์ค์ ์ ์ถ๊ฐํด์ผํ๋ค.
...
@Value("${kafka.topic-key}")
public String TOPIC_WITH_KEY;
@Autowired
private KafkaAdmin kafkaAdmin;
private NewTopic defaultTopic() {
return TopicBuilder.name(DEFAULT_TOPIC)
.partitions(2)
.replicas(2)
.build();
}
private NewTopic topicWithKey() {
return TopicBuilder.name(TOPIC_WITH_KEY)
.partitions(2)
.replicas(2)
.build();
}
@PostConstruct
public void init() {
kafkaAdmin.createOrModifyTopics(defaultTopic());
kafkaAdmin.createOrModifyTopics(topicWithKey());
}
...
Key๋ ๋ค์์ ์ ์ฉํด ๋ณด๋๋ก ํ๊ณ , ์ ์กํ๋ ์์ ์ฝ๋๋ฅผ ๋ณด์.
1-1. ๋น๋๊ธฐ ์ ์ก (Non-Blocking)
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
CompletableFuture<SendResult<Integer, String>> future = template.send(record);
future.whenComplete((result, ex) -> {
if (ex == null) {
handleSuccess(data);
}
else {
handleFailure(data, record, ex);
}
});
}
Java์ CompletableFuture์ ํตํด ๋น๋๊ธฐ ์ฒ๋ฆฌ์ ๋ํ ๊ฒฐ๊ณผ๊ฐ์ ๋ฐ์ ์ ์๋ค.
(ํ์ฌ ๋ฒ์ ์์๋ CompletableFuture๋ฅผ ๋ฐํํ๊ณ ์์ง๋ง, ๋ค๋ฅธ ๋ฒ์ ์์๋ ListenableFuture๋ก ๋ฐํํ๋ ๊ฒ ๊ฐ๋ค.)
์ดํ์ Suceess, Failure๋ฅผ handleํ๋ ๋ฉ์๋๋ ์๋ตํ๊ฒ ๋ค.
1-2. ๋๊ธฐ ์ ์ก(Blocking)
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
try {
template.send(record).get(10, TimeUnit.SECONDS);
handleSuccess(data);
}
catch (ExecutionException e) {
handleFailure(data, record, e.getCause());
}
catch (TimeoutException | InterruptedException e) {
handleFailure(data, record, e);
}
}
๋๊ธฐ ์ ์ก๋ ๋ง์ฐฌ๊ฐ์ง๋ก CompletableFuture๋ฅผ ๋ฐํํ๊ธฐ ๋๋ฌธ์ get๋ฉ์๋๋ฅผ ํตํด handleํ ์ ์๋ค.
1-3. ProducerRecordํด๋์ค
ProducerRecordํด๋์ค๋ Kafka์์ ์ ๊ณตํ๋ ํด๋์ค์ด๋ค. ์ด๋ฅผ ์ด์ฉํ์ฌ send๋ฉ์๋์ ํ๋ผ๋ฏธํฐ๋ค์ ๋ด์ ์ ์๋ค.
์๋๋ ProducerRecordํด๋์ค์ ํ๋๊ฐ๋ค์ด๋ค.
public class ProducerRecord<K, V> {
private final String topic;
private final Integer partition;
private final Headers headers;
private final K key;
private final V value;
private final Long timestamp;
...
}
send์ ํ์ํ ๊ฐ๋ค์ ํ๋ ๊ฐ์ผ๋ก ๊ฐ๊ณ ์๋ค.
์ฌ๋ฌ ๋ธ๋ก๊ธ๋ฅผ ๋ณด์์ ๋ ๊ฐ๋จํ๊ฒ topic์ ์ ํด๋๊ณ message๋ง ์ ๋ฌํ๊ณ ์์๋ค.
์์ ๊ฐ์ด ProducerRecord๋ฅผ ์ด์ฉํด์ ๋งค๊ฐ๋ณ์๋ค์ ์ ํํํ๊ณ controller๋ ๋ค๋ฅธ ์๋น์ค์์ ์ฒ๋ฆฌํ๋๊ฒ ์ข์๋ณด์ธ๋ค.
- ์์ ( Consumer )
0. ์ค์
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
...
return props;
}
}
@KafkaListner๋ฅผ ์ฌ์ฉํ๊ธฐ ์ํด, @EnableKafka๋ฅผ ํ์ฑํ์์ผ์ฃผ์ด์ผ ํ๋ค.
1. ์์
๋ฉ์ธ์ง๋ฅผ ์์ ํ๊ธฐ ์ํด์ @KafkaListner ์ด๋ ธํ ์ด์ ์ ์ฌ์ฉํ๋ค.
1-1. ๊ธฐ๋ณธ ์์
public class Listener {
@KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
public void listen(String data) {
...
}
}
ํ์ํ ๋งค๊ฐ๋ณ์๋ค์ ์ค์ ํ์ฌ data๋ฅผ listenํ ์ ์๋ค.
์ฌ๊ธฐ์ Kafka์ ํน์ง์ธ Event Driven์ ์ฑ๊ฒฉ์ ์ ๋ณผ ์ ์๋ค.
Consumer๋ ๋ฉ์ธ์ง๊ฐ ์๋์ง DB๋ฅผ ์ ๊ทผํ๋ ๋ฑ ์์๋ก ํ์ธํ๋๊ฒ ์๋๋ผ, ๋ฉ์ธ์ง๋ฅผ ๋ฐ์ Event๊ฐ ๋ฐ์ํจ์ ๋ฐ๋ผ ์คํ๋๋ค.
1-2. ํ ํฝ, ํํฐ์ ์ง์ ์์
@KafkaListener(id = "thing2", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
์์ ๊ฐ์ด ํน์ ํ ํฝ์ด๋ ํํฐ์ ์ ์ง์ ํ์ฌ ์ฌ์ฉํ ์ ์๋ค.
Producer๊ฐ ์ด๋ ํ ํฝ๊ณผ ํํฐ์ ์ ๋ฃ๊ฒ ๋ค๊ณ ์ฝ์ํด ๋๊ณ ์ฌ์ฉํ๋ฉด ๋๊ฒ ๋ค.
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" },
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
์์ ๊ฐ์ด parition = "*" ์ฒ๋ผ wildcard๋ก ์ฌ์ฉํ ์ ์๋ค.
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1",
partitionOffsets = @PartitionOffset(partition = "0-5", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
์์ ๊ฐ์ด ํํฐ์ ์ ๋ฒ์๋ฅผ ์ง์ ํ์ฌ ์ฌ์ฉํ ์๋ ์๋ค.
1-3. ๋ ์ฝ๋์ metadata์์ง
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
@Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
...
}
๋ฉ์ธ์ง๋ก๋ถํฐ ๋ฐ์ ํค, ํํฐ์ , ํ ํฝ ๋ฑ๋ฑ์ ๋ฐ์ ์ ์์๋ค.
ํ์ง๋ง 2.5 ๋ฒ์ ๋ถํฐ๋,
@KafkaListener(...)
public void listen(String str, ConsumerRecordMetadata meta) {
...
}
ConsumerRecordMetadata ํด๋์ค๋ฅผ ๋งค๊ฐ๋ณ์๋ก ํ๊บผ๋ฒ์ ๋ฐ์๋ณผ ์ ์๋ค.
- ConsumerRecordMetadata.java
public class ConsumerRecordMetadata {
private final RecordMetadata delegate;
private final TimestampType timestampType;
...
public boolean hasOffset() {...}
public long offset() {...}
public boolean hasTimestamp() {...}
public long timestamp() {...}
public int serializedKeySize() {...}
public int serializedValueSize() {...}
public String topic() {...}
public int partition() {...}
public TimestampType timestampType() {...}
@Override
public int hashCode() {...}
@Override
public boolean equals(Object obj) {...}
@Override
public String toString() {...}
}
ํ์ํ ๋ฐ์ดํฐ๋ฅผ ํด๋์ค์์ ๊ฐ์ ธ์ฌ ์ ์๊ฒ ๋ค.
- ์ ์ฉ
์ด์ Spring ํ๋ก์ ํธ ์์ ์ Kafka๋ฅผ ์ ์ฉ์์ผ ํ ์คํธํด ๋ณด๊ฒ ๋ค.
0. ์ค์
dependency์ ๋ค์์ ์ถ๊ฐํด์ค๋ค.
implementation 'org.springframework.kafka:spring-kafka'
application.yml์ ๋ค์์ ์ถ๊ฐํด์ค๋ค.
spring:
kafka:
bootstrap-servers: "localhost:9092"
consumer:
group-id: "myGroup"
DB์๋ ํ ์คํธ ์ ํ๋ฆฌ์ผ์ด์ ์ด๋ค.
@SpringBootApplication(exclude={DataSourceAutoConfiguration.class})
public class KafkaApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaApplication.class, args);
}
}
DataSrouce๋ฅผ ์ ์ธํ๊ณ ์คํํ์.
1. KafkaConfig
- KafkaConfig.java
@EnableKafka
@Configuration
public class KafkaConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Value(value = "${spring.kafka.consumer.group-id}")
private String groupId;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
// -------------------------------------- Producer --------------------------------------
// New Topic
@Bean
public NewTopic myTopic1() {
return new NewTopic("topic_1", 1, (short) 1);
}
// New Topic 2
@Bean
public NewTopic myTopic2() {
return new NewTopic("topic_2", 1, (short) 1);
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
// -------------------------------------- Consumer --------------------------------------
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
}
ํ ์คํธ์ฉ ํ ํฝ topic_1, topic_2๋ฅผ ๋ง๋ค์๋ค. yml์ ์ค์ ํด๋ kafka address์ ๋งคํ์์ผ์ฃผ๊ณ , Producer์ Consumer ์ค์ ์ ํด๋์๋ค.
2. KafkaController
- KafkaController.java
@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {
@Autowired
private KafkaService kafkaService;
@PostMapping("")
public String sendMessage(@RequestParam("message") String message) {
kafkaService.sendMessage(message);
return "Send Success!";
}
}
url์ ๋จ์ํ๊ฒ /kafka๋ก ์ค์ ํ๊ณ , ์ฌ์ฉ์๊ฐ ๋ฉ์ธ์ง๋ฅผ ์ ์กํ๋ ค๊ณ ์๋ํ๋ API๋ฅผ ์์ฑํ๋ค.
๋งค๊ฐ๋ณ์ message๋ก ์ ๋ฌํ๋ ค๋ ๋ฉ์ธ์ง๋ฅผ ์ ๋ฌํ๋ฉด ๋๋ค.
3. KafkaService
- KafkaService.java
@Slf4j
@Service
public class KafkaService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
//----------- Producer -----------
public void sendMessage(String message) {
log.info("[Producer] Message Send : " + message);
this.kafkaTemplate.send("topic_1", message);
}
//----------- Consumer -----------
@KafkaListener(topics = "topic_1", groupId = "myGroup")
public void consumeMyopic1(@Payload String message, ConsumerRecordMetadata metadata) {
log.info("[Consumer] Message Received : " + message);
log.info("[Consumer] Partition : " + metadata.partition());
}
}
Producer๊ฐ ๋ฉ์ธ์ง๋ฅผ sendํ๊ธฐ ์ํด KafkaTemplate์ ์ ์ธํ๋ค.
Producer์ ๊ฒฝ์ฐ message๋ฅผ ๋ฐ์์ ๋จ์ํ sendํ๊ณ , ์ค์ ํด๋์๋ ํ ํฝ(topic_1 )์ผ๋ก ๋ฉ์ธ์ง๋ฅผ ๋ณด๋ธ๋ค.
Consumer์ ๊ฒฝ์ฐ KafkaListener ์ด๋ ธํ ์ด์ ์ ์ฌ์ฉํด์ message๋ฅผ listenํ๋ค.
๋ฉ์ธ์ง๋ฅผ ๋ฐ์ผ๋ฉด log๋ฅผ ์ถ๋ ฅํ๋๋ก ํด๋์๋ค.
- ํ ์คํธ
์ด์ ํ ์คํธ๋ฅผ ์งํํด๋ณด์.
๋ฉ์ธ์ง๋ฅผ ๋ณด๋๊ณ , Reponse 200๊ณผ ํจ๊ป Send success! ๋ฌธ๊ตฌ๋ฅผ ๋ฐ์ ์ ์์๋ค.
log๋ ์ ์ถ๋ ฅ๋ ๊ฒ์ ํ์ธํ๋ค. ๋ฉ์ธ์ง ์ ์ก์ ์ฑ๊ณตํ๋ค.
์ ์ฉ ์ฝ๋๋ ๊นํ๋ธ์ ์ ๋ก๋ํด ๋์๋ค.
https://github.com/DDonghyeo/kafka-spring
Kafka๋ฅผ ๊ณต๋ถํ๊ณ Spring์ ์ง์ ์ ์ฉ์์ผ์ ์ฌ์ฉํด๋ณด์๋ค.
๋ง์ ๋ด์ฉ์ด ์์์ง๋ง ํ์ฌ๋ ๋จ์ ๋ฉ์ธ์ง ์ ์ก๋ง ํ ์คํธ ํด๋ณด์๋ค.
๋ค์ ํฌ์คํ ๋ถํด ๊ณต์ ๋ฌธ์์ ์๋ ์ฌ๋ฌ ๋ ํผ๋ฐ์ค๋ฅผ ํ์ฉํด์ ํ ํฝ&ํํฐ์ ๊ด๋ฆฌ, ํค ์ฌ์ฉ, Producer, Consumer ์ค์ ๋ฑ์ ์ง์ ์ ์ฉํด๋ณด๊ณ , ๋์ฑ ์ค์ฉ์ ์ผ๋ก ์ฝ๋๋ฅผ ์ ํํํด ๋ณด๊ฒ ๋ค.
์ฐธ๊ณ
https://docs.spring.io/spring-kafka/reference/
https://spring.io/projects/spring-kafka
https://happy-jjang-a.tistory.com/201
https://devocean.sk.com/blog/techBoardDetail.do?ID=164096
'DevOps > Kafka' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[Kafka] ์นดํ์นด ๊ฐ๋ ์ ๋ฆฌ (0) | 2023.09.04 |
---|