์ด์ ํฌ์คํ ์์ Webclient๋ฅผ ์ด์ฉํ์ฌ ์๋น์ค๋ค๊ฐ์ ํต์ ์ ๊ตฌ์ถํ๋ค.
์ด๋ฒ์๋ Kafka๋ฅผ ํตํด์ ๋ฉ์ธ์ง ํ๋ฅผ ๊ธฐ์ฉํ์ฌ ์๋น์ค๋ค ๊ฐ์ ํต์ ํ๊ฒฝ์ ๊ตฌ์ถํด๋ณด๋ ค๊ณ ํ๋ค.
kafka์ ๋ํ ์์ ํฌ์คํ ์ ์ฐธ๊ณ ํด์ ๊ตฌ์ถํ๋ค.
ํ์ฌ ๋ด ์๋น์ค๋ app-service ์ user-service๊ฐ ์๋๋ฐ,
app-service๋ฅผ ํตํด ์ฌ์ฉ์๊ฐ ์์ ์ ์ฑ์ ์๋ฒ์ ๋ฑ๋กํ ์ ์๋ ๊ธฐ๋ฅ์ ๊ตฌํํด ๋์๋ค.
ํ์๋ ์ง๊ธ ์ฌ์ฉ์๊ฐ ์ฑ์ ๋ฑ๋กํ ๋, User ์ํฐํฐ์ ์๋ appCount๋ฅผ +1ํด์ฃผ๋ ์๋น์ค๋ฅผ ๊ตฌํํ๋ ค๊ณ ํ๋ค.
0. Depdency
๋ค์๊ณผ ๊ฐ์ด ์์กด์ฑ์ ์ถ๊ฐํ๋ฉด kafka๋ฅผ ์ฌ์ฉํ ์ ์๋ค.
dependencies {
implementation 'org.springframework.kafka:spring-kafka'
}
1. Producer
๋จผ์ ๋ฉ์ธ์ง๋ฅผ ๋ณด๋ด๋ Producer์ชฝ์ ์์๋ณด๊ฒ ๋ค.
1-1. Config
@EnableKafka
@Configuration
public class KafkaProducerConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
// New Topic
@Bean
public NewTopic userTopic() {
return new NewTopic("user_topic", 1, (short) 1);
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
์๋ก์ด ํ ํฝ์ ์์ฑํ๋ ๋ฐฉ๋ฒ์ ์ฌ๋ฌ๊ฐ์ง๊ฐ ์๋ค.
public NewTopic(String name, int numPartitions, short replicationFactor) {
...
}
public NewTopic(String name, Optional<Integer> numPartitions, Optional<Short> replicationFactor) {
...
}
public NewTopic(String name, Map<Integer, List<Integer>> replicasAssignments) {
...
}
์ฌ๋ฌ ๋ฐฉ๋ฒ์ผ๋ก ํ ํฝ์ ์ด๋ฆ, ํํฐ์ , ๋ณต์ ์ธ์๋ฅผ ์ ๋ฌํ๋ฉด ๋๊ฒ ๋ค.
user-service๊ฐ subscribe ํ ํ ํฝ์ ์ด๋ฆ์ user_topic์ผ๋ก ์ ํ๋ค.
(ํ ํฝ์ subscribeํ๋ consumer์ชฝ์์ ์์ฑํ๋๊ฒ ๋ง๋๊ฑฐ ๊ฐ๋ค.)
ํ์ฌ ์ง๊ธ app-service ์ธ์๋ user-service์ ๋ฉ์ธ์ง๋ฅผ ๋ณด๋ผ ์ผ์ด ์๋ค๋ฉด, ํด๋น ํ ํฝ์ผ๋ก ๋ฉ์ธ์ง๋ฅผ publishํ๋ฉด ๋๊ฒ ๋ค.
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
์ค๊ฐ์ Serializer๋ฅผ ์ค์ ํ๋ ๋ถ๋ถ์ด ์๋๋ฐ, ์ด๋ Producer๊ฐ ๋ฉ์ธ์ง๋ฅผ ๋ฐํํ ๋ ๋ฉ์ธ์ง๋ฅผ ์ง๋ ฌํํ์ฌ ๋ณด๋ด๋๋ฐ, ์ด ๋ฉ์ธ์ง์ ๋ํ ํํ๋ฅผ ์ค์ ํ๋ ๊ฒ์ผ๋ก ๋ณด๋ฉด ๋๊ฒ ๋ค.
๋ฐ๋๋ก Consumer๋ Deserializer๋ฅผ ์ฌ์ฉํ๋๋ฐ, Producer์ ํด๋์ค๋ฅผ ๋ง์ถ๋ฉด ๋๊ฒ ๋ค.
Key์ Value ๋ชจ๋ String์ผ๋ก ์ ๋ฌํ ๊ฒ์ด๊ธฐ ๋๋ฌธ์, Serializer๋ฅผ String์ผ๋ก ์ ํ๋ค.
- application.yml
spring:
kafka:
bootstrap-servers: "localhost:9092"
consumer:
group-id: "producerGroupId"
1-2. Service
@Slf4j
@RequiredArgsConstructor
@Service
public class KafkaService {
private final KafkaTemplate<String, String> kafkaTemplate;
public void sendMessageUserTopic(String message) {
log.info("[ KafkaProducer ] Send Message : " + message);
kafkaTemplate.send("user_topic", message);
}
}
Producer๊ฐ ๋ app-service์์๋ user_topic์ผ๋ก ๋ฉ์ธ์ง๋ฅผ ๋ฐํํ๋ ์๋น์ค๋ฅผ ๋ง๋ค์๋ค.
kafkaService.sendMessageUserTopic("userAppCountPlus - " + appRequestDto.getUserId());
์๋น์ค ๋จ์์ kafkaService๋ฅผ ํธ์ถํ์ฌ ๋ฉ์ธ์ง๋ฅผ ๋ฐํํ๋ ๋ก์ง์ผ๋ก ๊ตฌํํ๋ค.
KafkaTemplate์ ์ ๋ค๋ฆญ์คํ์ K, V๋ ๊ฐ๊ฐ ํ ํฝ์ ํ์ ๊ณผ ๋ฉ์ธ์ง์ ํ์ ์ ๋ช ์ํ๋ฉด ๋๋ค.
ํ์ฌ๋ ํ ์คํธ์ฉ ํ๋ก์ ํธ๋ฅผ ๊ตฌ์ถํ๋ ์ค์ด๋ฏ๋ก, ๊ฐ๋จํ String์ ์ ์กํ์ง๋ง ์ค์ ํ๋ก์ ํธ์์๋ ๊ฐ์ฒด๋ฅผ ๋ณด๋ด๊ฒ๋ ์๋ ์๊ฒ ๋ค.
2. Consumer
์ด์ consumer์ชฝ์ ์ดํด๋ณด๊ฒ ๋ค. ๋ด ํ๋ก์ ํธ์์๋ user-service๊ฐ ๋๊ฒ ๋ค.
2-1. Config
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Value(value = "${spring.kafka.consumer.group-id}")
private String groupId;
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
}
ConcurrentKafkaListenerContainerFactory์์ concurrency๋ ์์ฑํ consumer ์๋ฅผ ๋ปํ๋ค.
ํด๋น Factory๋ AbstractKafkaListenerContainerFactory๋ฅผ ์์ํ๋๋ฐ,
ํด๋น ํฉํ ๋ฆฌ์๋ ์ค์ ํ ์ ์๋ ๊ฐ๋ค์ด ๋๋ฌด ๋ง๋ค. ๐ต๐ซ
์ฌ๋งํ๋ฉด ์ธ์ธํ๊ฒ ๋ฏ์ด๋ณด๋ฉด์ ์ด๋ค ์ค์ ๋ค์ธ์ง ํ๋์ฉ ์ดํด๋ณด๋๋ฐ, ์ด๊ฑด ์ดํด๋ณด๊ธฐ์ ๋๋ฌด ๋ง๋ค. .
๋์ค์ ๊ธฐํ๊ฐ ๋๋ฉด ๊น๊ฒ ํ๊ณ ๋ค์ด ๋ณด์ ๐ซ
์ด AbstractKafkaListenerContainerFactory์๋ ContainerProperties๊ฐ ์๋๋ฐ, (.getContainerProperties()๋ก ํ๋ฒ ๊ฐ์ ธ์์ผ ํ๋ค.) ์ด ํด๋์ค๋ Listner์ Container์ ๋ํ ์ค์ ์ ํ ์ ์๋ค.
์ฌ๊ธฐ์ ๋๋ consumer๊ฐ record๋ฅผ ๊ธฐ๋ค๋ฆด ๋ timeout ์ 3000์ผ๋ก ์ค์ ํด์ฃผ์๋ค.
๊ทธ ์ธ IP ์ฃผ์์ Deserializer๋ Producer์ ๋ง์ถฐ์ฃผ์๋ค.
2-2. Listner
@Slf4j
@Service
public class KafkaConsumer {
@KafkaListener(topics = "user_topic", groupId = "MSAGroupId")
public void consumeUserTopic(@Payload String message, ConsumerRecordMetadata metadata) {
log.info("[ KafkaConsumer] Message Received : " + message);
log.info("[ KafkaConsumer] Partition : " + metadata.partition());
}
}
Consumer๋ @KafkaListner ์ด๋ ธํ ์ด์ ์ผ๋ก ํ ํฝ์ ๋ฉ์ธ์ง๋ฅผ ๋ฐ์ ์ ์๋ค.
ํด๋นํ๋ ํ ํฝ๊ณผ ๊ทธ๋ฃนid๋ฅผ ์ง์ ํด๋๋ฉด ๋ฉ์ธ์ง๋ฅผ ๋ฐ์ ์ ์๋ค.
ํ์ฌ ๋๋ ๋ฉ์ธ์ง๋ฅผ ํ์ธํ๋ ๋ก๊ทธ๋ง ๋์๋ค.
ํ ์คํธ
์ด์ ์์ฒญ์ ๋ณด๋ด ๋ฉ์ธ์ง๋ฅผ ์ ์์ ์ผ๋ก ์ฃผ๊ณ ๋ฐ๋์ง ํ์ธํด๋ณด์.
app-service ์์ Producer๊ฐ ์ ์์ ์ผ๋ก ๋ฉ์ธ์ง๋ฅผ Publishํ๋ค.
user-service๋ํ ์ ์์ ์ผ๋ก ๋ฉ์ธ์ง๋ฅผ ๊ฐ์ ธ์๋ค.
๊ฐ๋ฐ ๊ณ ์(?)๋ค๋ Kafka๋ ํผํ๋ค๋ ๋ง์ ๋ง์ด ๋ค์๋๋ฐ, ์ ๊ทธ๋ฐ์ง ์กฐ๊ธ์ ์ ๊ฒ ๊ฐ๋ค.
์ง๊ธ์ ์์ฃผ์์ฃผ ๊ธฐ๋ณธ์ ์ธ ์ธํ ๋ค๋ก๋ง ๊ตฌ๋ํ์ง๋ง, ๊ท๋ชจ๊ฐ ํฐ ์ฑ์์๋ ์ธ์ธํ ์ค์ ์ด ๋ง์ด ํ์ํ๊ฒ ๋ค.
๊ทธ๋๋ ๋ฉ์ธ์งํ๋ฅผ ์ฌ์ฉํ๊ณ , Pub/Sub๊ตฌ์กฐ๋ก ๋ฉ์ธ์ง๋ฅผ ์ฃผ๊ณ ๋ฐ๋๊ฑด ๋งค๋ ฅ์ ์ธ ๊ฒ ๊ฐ๋ค.
์ฌ๋ฌ ์๋น์ค์์ ํ์ฉ ๋ฐฉ์์ด ์ ๋ง ๋ง์ ๊ฒ ๊ฐ๋ค.
'BackEnd > Spring Cloud' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[ Spring Cloud ] Microservice๋ค ๊ฐ์ ํต์ ๊ตฌ์ถํ๊ธฐ (1) - WebClient (0) | 2024.02.14 |
---|---|
[Spring Cloud] Spring Cloud Config Server ์ค์ ๊ฐ์ Private Repository ์์ ๊ฐ์ ธ์ค๊ธฐ (0) | 2024.02.12 |
[Spring Cloud] Spring Cloud Bus (0) | 2024.02.05 |
[Spring Cloud] Actuator๋ฅผ ์ด์ฉํ Config Server ๊ธฐ๋ (0) | 2024.01.29 |
[Spring Cloud] API Gateway, Filter (1) | 2024.01.08 |