Infrastructure, Spring

Apache Kafka를 활용한 이벤트 기반 Spring Boot 마이크로서비스 구축 — Part2

Written by 개발자서동우 · 2 min read >
카프카로 스프링부트 마이크로서비스 구축하기

안녕하세요 Devloo 입니다. 🙂 이 글의 첫 번째 부분에서는 텍스트 파일의 각 줄을 TEXT-DATA Kafka 토픽에 게시하는 TextProducer 스프링 부트 마이크로서비스를 구현했습니다. 이번 글에서는 TEXT-DATA Kafka 토픽에서 텍스트 한 줄을 가져와 공백으로 나눈 후, 각 단어의 빈도를 계산하고 이를 그룹화하는 TextConsumer 스프링 부트 마이크로서비스에 대해 설명하겠습니다.

환경 설정

마이크로서비스를 구축하기 전에 작업 환경을 먼저 설정해야 합니다. 다음이 필요합니다:

  • Java Development Kit (JDK)
  • Gradle (gradle-wrapper를 사용하는 경우 시스템 전체에 gradle을 설치할 필요는 없습니다)
  • Docker
  • Apache Kafka 실행 인스턴스
  • 코드 작성 툴 (예: IntelliJ IDEA, Eclipse, VSCode)

1단계: Spring Boot 프로젝트 초기화

Spring Initializer나 선호하는 IDE를 사용하여 Spring Web (Spring Boot Web 아님), Spring Kafka, Jackson Databind 종속성을 포함한 새 Spring Boot 프로젝트를 생성합니다:

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework:spring-web:6.1.2'
    implementation 'org.springframework.kafka:spring-kafka'
    implementation 'com.fasterxml.jackson.core:jackson-databind:2.15.1'
}

2단계: TextConsumer 구현

다음과 같이 application.properties 파일을 생성합니다:

pring.kafka.bootstrap-servers=localhost:9092
spring.kafka.properties.security.protocol=PLAINTEXT
spring.kafka.properties.sasl.mechanism=GSSAPI
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="" password="";
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.properties.spring.json.type.mapping=wordFrequency:com.example.textconsumer.WordFrequency,wordFrequencyList:com.example.textconsumer.WordFrequencyList

Kafka 메시지를 구체적인 타입으로 직렬화하기 위해 JsonSerializerWordFrequencyWordFrequencyList 클래스의 타입 매핑을 properties 파일에 지정합니다.

추가적으로, 불용어 목록을 생성합니다:

stop-words.txt

가, 가까스로, 가령, 각, 각각, 각자, 각종, 갖고말하자면, 같다, 같이, 개의치않고, 거니와, 거바, 거의, 것, 것과 같이, 것들, 게다가, 게우다, 겨우, 견지에서, 결과에 이르다, 결국, 결론을 낼 수 있다, 겸사겸사, 고려하면, 고로, 곧, 공동으로, 과, 과연, 관계가 있다, 관계없이, 관련이 있다, 관하여, 관한, 관해서는, 구, 구체적으로, 구토하다, 그, 그들, 그때, 그래, 그래도, 그래서, 그러나, 그러니, 그러니까, 그러면, 그러므로, 그러한즉, 그런 까닭에, 그런데, 그런즉, 그럼, 그럼에도 불구하고, 그렇게 함으로써, 그렇지, 그렇지 않다면, 그렇지 않으면, 그렇지만, 그렇지않으면, 그리고, 그리하여, 그만이다, 그에 따르는, 그위에, 그저, 그중에서, 그치지 않다, 근거로, 근거하여, 기대여, 기점으로, 기준으로, 기타, 까닭으로, 까악, 까지, 까지 미치다, 까지도, 꽈당, 끙끙, 끼익, 나, 나머지는, 남들, 남짓, 너, 너희, 너희들, 네, 넷, 년, 논하지 않다, 놀라다, 누가 알겠는가, 누구, 다른, 다른 방면으로, 다만, 다섯, 다소, 다수, 다시 말하자면, 다시말하면, 다음, 다음에, 다음으로, 단지, 답다, 당신, 당장, 대로 하다, 대하면, 대하여, 대해 말하자면, 대해서, 댕그, 더구나, 더군다나, 더라도, 더불어, 더욱더, 더욱이는, 도달하다, 도착하다, 동시에, 동안, 된바에야, 된이상, 두번째로, 둘, 둥둥, 뒤따라, 뒤이어, 든간에, 들, 등, 등등, 딩동, 따라, 따라서, 따위, 따지지 않다, 딱, 때, 때가 되어, 때문에, 또, 또한, 뚝뚝, 라 해도, 령, 로, 로 인하여, 로부터, 로써, 륙, 를, 마음대로, 마저, 마저도, 마치, 막론하고, 만 못하다, 만약, 만약에, 만은 아니다, 만이 아니다, 만일, 만큼, 말하자면, 말할것도 없고, 매, 매번, 메쓰겁다, 몇, 모, 모두, 무렵, 무릎쓰고, 무슨, 무엇, 무엇때문에, 물론, 및, 바꾸어말하면, 바꾸어말하자면, 바꾸어서 말하면, 바꾸어서 한다면, 바꿔 말하면, 바로, 바와같이, 밖에 안된다, 반대로, 반대로 말하자면, 반드시, 버금, 보는데서, 보다더, 보드득, 본대로, 봐, 봐라, 부류의 사람들, 부터, 불구하고, 불문하고, 붕붕, 비걱거리다, 비교적, 비길수 없다, 비로소, 비록, 비슷하다, 비추어 보아, 비하면, 뿐만 아니라, 뿐만아니라, 뿐이다, 삐걱, 삐걱거리다, 사, 삼, 상대적으로 말하자면, 생각한대로, 설령, 설마, 설사, 셋, 소생, 소인, 솨, 쉿, 습니까, 습니다, 시각, 시간, 시작하여, 시초에, 시키다, 실로, 심지어, 아, 아니, 아니나다를가, 아니라면, 아니면, 아니었다면, 아래윗, 아무거나, 아무도, 아야, 아울러, 아이, 아이고, 아이구, 아이야, 아이쿠, 아하, 아홉, 안 그러면, 않기 위하여, 않기 위해서, 알 수 있다, 알았어, 앗, 앞에서, 앞의것, 야, 약간, 양자, 어, 어기여차, 어느, 어느 년도, 어느것, 어느곳, 어느때, 어느쪽, 어느해, 어디, 어때, 어떠한, 어떤, 어떤것, 어떤것들, 어떻게, 어떻해, 어이, 어째서, 어쨋든, 어쩔수 없다, 어찌, 어찌됏든, 어찌됏어, 어찌하든지, 어찌하여, 언제, 언젠가, 얼마, 얼마 안 되는 것, 얼마간, 얼마나, 얼마든지, 얼마만큼, 얼마큼, 엉엉, 에, 에 가서, 에 달려 있다, 에 대해, 에 있다, 에 한하다, 에게, 에서, 여, 여기, 여덟, 여러분, 여보시오, 여부, 여섯, 여전히, 여차, 연관되다, 연이서, 영, 영차, 옆사람, 예, 예를 들면, 예를 들자면, 예컨대, 예하면, 오, 오로지, 오르다, 오자마자, 오직, 오호, 오히려, 와, 와 같은 사람들, 와르르, 와아, 왜, 왜냐하면, 외에도, 요만큼, 요만한 것, 요만한걸, 요컨대, 우르르, 우리, 우리들, 우선, 우에 종합한것과같이, 운운, 월, 위에서 서술한바와같이, 위하여, 위해서, 윙윙, 육, 으로, 으로 인하여, 으로서, 으로써, 을, 응, 응당, 의, 의거하여, 의지하여, 의해, 의해되다, 의해서, 이, 이 되다, 이 때문에, 이 밖에, 이 외에, 이 정도의, 이것, 이곳, 이때, 이라면, 이래, 이러이러하다, 이러한, 이런, 이럴정도로, 이렇게 많은 것, 이렇게되면, 이렇게말하자면, 이렇구나, 이로 인하여, 이르기까지, 이리하여, 이만큼, 이번, 이봐, 이상, 이어서,이었다, 이와 같다, 이와 같은, 이와 반대로, 이와같다면, 이외에도, 이용하여, 이유만으로, 이젠, 이지만, 이쪽, 이천구, 이천육, 이천칠, 이천팔, 인 듯하다, 인젠, 일, 일것이다, 일곱, 일단, 일때, 일반적으로, 일지라도, 임에 틀림없다, 입각하여, 입장에서, 잇따라, 있다, 자, 자기, 자기집, 자마자, 자신, 잠깐, 잠시, 저, 저것, 저것만큼, 저기, 저쪽, 저희, 전부, 전자, 전후, 점에서 보아, 정도에 이르다, 제, 제각기, 제외하고, 조금, 조차, 조차도, 졸졸, 좀, 좋아, 좍좍, 주룩주룩, 주저하지 않고, 줄은 몰랏다, 줄은모른다, 중에서, 중의하나, 즈음하여, 즉, 즉시, 지든지, 지만, 지말고, 진짜로, 쪽으로, 차라리, 참, 참나, 첫번째로, 쳇, 총적으로, 총적으로 말하면, 총적으로 보면, 칠, 콸콸, 쾅쾅, 쿵, 타다, 타인, 탕탕, 토하다, 통하여, 툭, 퉤, 틈타, 팍, 팔, 퍽, 펄렁, 하, 하게될것이다, 하게하다, 하겠는가, 하고 있다, 하고있었다, 하곤하였다, 하구나, 하기 때문에, 하기 위하여, 하기는한데, 하기만 하면, 하기보다는, 하기에, 하나, 하느니, 하는 김에, 하는 편이 낫다, 하는것도, 하는것만 못하다, 하는것이 낫다, 하는바, 하더라도, 하도다, 하도록시키다, 하도록하다, 하든지, 하려고하다, 하마터면, 하면 할수록, 하면된다, 하면서, 하물며, 하여금, 하여야, 하자마자, 하지 않는다면, 하지 않도록, 하지마, 하지마라, 하지만, 하하, 한 까닭에, 한 이유는, 한 후, 한다면, 한다면 몰라도, 한데, 한마디, 한적이있다, 한켠으로는, 한항목, 할 따름이다, 할 생각이다, 할 줄 안다, 할 지경이다, 할 힘이 있다, 할때, 할만하다, 할망정, 할뿐, 할수있다, 할수있어, 할줄알다, 할지라도, 할지언정, 함께, 해도된다, 해도좋다, 해봐요, 해서는 안된다, 해야한다, 해요, 했어요, 향하다, 향하여, 향해서, 허, 허걱, 허허, 헉, 헉헉, 헐떡헐떡, 형식으로 쓰여, 혹시, 혹은, 혼자, 훨씬, 휘익, 휴, 흐흐, 흥, 힘입어

다음 클래스를 생성하세요:

WordFrequency 클래스
package com.example.textconsumer;

public record WordFrequency(String word, Long count) {
}
WordFrequencyList 클래스
package com.example.textconsumer;

import java.util.List;

public record WordFrequencyList(List<WordFrequency> frequencies) {
}

WordFrequencyWordFrequencyList 클래스는 타입 안전성을 유지하면서 필요한 Kafka 메시지를 전달하기 위해 생성되었습니다. 이 클래스들은 프로듀서와 컨슈머 마이크로서비스 모두에서 사용됩니다.

WordCounter 클래스
package com.example.textconsumer;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Component
public class WordCounter {

    private Set<String> stopWords;

    @Value("${stopwords.file:stop-words.txt}")
    private String stopWordsFile;

    @PostConstruct
    public void init() {
        stopWords = loadStopWords(stopWordsFile);
    }

    private Set<String> loadStopWords(String fileName) {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                Objects.requireNonNull(getClass().getClassLoader().getResourceAsStream(fileName)), StandardCharsets.UTF_8))) {
            return Arrays.stream(reader.readLine().split(",\\s*"))
                         .collect(Collectors.toSet());
        } catch (IOException | NullPointerException e) {
            throw new RuntimeException("Failed to load stop words from file: " + fileName, e);
        }
    }

    public Stream<String> splitText(String text) {
        return Arrays.stream(text.replaceAll("\\p{P}", "").toLowerCase().split("\\s+"));
    }

    public Stream<WordFrequency> groupByWords(Stream<String> words) {
        return words.filter(this::isNotStopWord)
                    .collect(Collectors.groupingBy(word -> word, Collectors.counting()))
                    .entrySet()
                    .stream()
                    .map(e -> new WordFrequency(e.getKey(), e.getValue()))
                    .sorted(Comparator.comparing(WordFrequency::count).reversed());
    }

    private boolean isNotStopWord(String word) {
        return !stopWords.contains(word);
    }
}

WordCounter 클래스의 핵심 기능은 주어진 텍스트에서 일반적인 불용어를 제외하고 단어 빈도를 효율적으로 계산하는 것입니다. groupByWords 메소드는 불용어를 필터링하고, 나머지 단어들을 빈도별로 그룹화하여 내림차순으로 정렬합니다. 이는 자연어 처리 작업에서 의미 있는 단어들이 최종 빈도 계산에 기여할 수 있도록 보장합니다.

TextConsumer 클래스
package com.example.textconsumer;

import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.stream.Stream;

@Component
public class TextConsumer {

    private final Logger logger = LoggerFactory.getLogger(getClass().getName());

    private static final String INPUT_TOPIC = "TEXT-DATA";
    private static final String OUTPUT_TOPIC = "AGGREGATE-DATA";
    private final WordCounter wordCounter;
    private final KafkaTemplate<String, WordFrequencyList> kafkaTemplate;

    public TextConsumer(WordCounter wordCounter, KafkaTemplate<String, WordFrequencyList> kafkaTemplate) {
        this.wordCounter = wordCounter;
        this.kafkaTemplate = kafkaTemplate;
    }

    @Autowired
    public void configureTopic(KafkaAdmin kafkaAdmin) {
        kafkaAdmin.createOrModifyTopics(new NewTopic(OUTPUT_TOPIC, 1, (short) 1));
    }

    @KafkaListener(topics = INPUT_TOPIC, groupId = "TEXT_CONSUMERS")
    public void consumeMessage(String message) {
        Stream<WordFrequency> group = wordCounter.groupByWords(wordCounter.splitText(message));
        kafkaTemplate.send(OUTPUT_TOPIC, "KEY-1", new WordFrequencyList(group.toList()));
    }
}

TextConsumer 클래스의 주요 목적은 Kafka 토픽에서 텍스트 메시지를 소비하고, 이를 WordCounter 인스턴스를 사용해 처리한 후, 결과 단어 빈도를 다른 Kafka 토픽에 게시하는 것입니다. 이를 위해 @KafkaListener를 사용하여 consumeMessage 메서드를 어노테이션하고, 수신된 메시지를 처리한 후, WordCounter로 처리한 결과를 KafkaTemplate을 통해 출력 Kafka 토픽으로 보냅니다. 또한, 초기화 시 출력 토픽이 생성되거나 수정되도록 보장합니다.

새로운 Kafka 메시지를 TEXT-DATA 토픽에서 가져오자마자, WordFrequencyList 데이터를 AGGREGATE-DATA라는 또 다른 Kafka 토픽으로 보냅니다. 이는 TextConsumer 앱이 Producer 이자 Consumer 마이크로서비스라는 것을 의미합니다. AGGREGATE-DATA 토픽에 메시지를 게시함으로써 또 다른 흐름을 시작하게 됩니다. TextAggregate 마이크로서비스에 대해서는 세 번째 단계에서 다루겠습니다.

TextConsumerApplication 클래스
package com.example.textconsumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class TextConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(TextConsumerApplication.class, args);
    }
}

TextConsumer 마이크로서비스는 소비자와 생산자 역할을 모두 수행합니다.

클래스와 리소스를 생성한 후, Kafka 프로듀서 마이크로서비스가 정상적으로 작동하는지 확인하려면 다음 명령을 실행하세요:

./gradlew bootRun

텍스트 프로듀서 마이크로서비스로 이동하여 텍스트 파일을 업로드합니다. 업로드가 완료되면 텍스트 내용이 한 줄씩 읽혀서 TEXT-DATA로 전송됩니다. 이 텍스트 메시지는 TextConsumerconsumeMessage 메서드를 통해 소비할 수 있습니다. 소비 서비스가 정상적으로 작동하면 패키징하여 도커화할 수 있습니다.

단일 JAR 파일 빌드:

./gradlew assemble

Dockerfile 생성:

FROM openjdk:21-slim
WORKDIR /app
COPY build/libs/text-consumer-1.0.jar app.jar
ENTRYPOINT ["java", "-jar", "app.jar"]

이미지 빌드:

docker build . -t text-consumer

도커 컨테이너 실행:

docker run -it text-consumer

3단계: TextAggregator Consumer 마이크로서비스 구현

단어 빈도를 집계하기 위해 새로운 Spring Boot 마이크로서비스를 구축해야 합니다.

집계 마이크로서비스의 구현을 준비하려면 위에서 학습한 환경 설정, 1단계, application.properties 파일 생성 단계를 반복하세요.

WordFrequencyConsumer 클래스
package com.example.textaggregator;

import com.example.textconsumer.WordFrequency;
import com.example.textconsumer.WordFrequencyList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Component
public class WordFrequencyConsumer {

    private static final String TOPIC = "AGGREGATE-DATA";
    private final Logger logger = LoggerFactory.getLogger(getClass().getName());

    private List<WordFrequency> inMemoryFrequencyList = List.of();
    private final AtomicInteger counter = new AtomicInteger(0);

    @KafkaListener(topics = TOPIC, groupId = "AGGREGATE_CONSUMERS")
    public void consumeMessage(WordFrequencyList frequencyList) {
        inMemoryFrequencyList = Stream
                .concat(inMemoryFrequencyList.stream(), frequencyList.frequencies().stream())
                .collect(Collectors.groupingBy(WordFrequency::word, Collectors.summingLong(WordFrequency::count)))
                .entrySet()
                .stream()
                .map(e -> new WordFrequency(e.getKey(), e.getValue()))
                .sorted(Comparator.comparing(WordFrequency::count).reversed())
                .collect(Collectors.toList());

        if (counter.incrementAndGet() % 5 == 0) {
            inMemoryFrequencyList.forEach(wordFrequency -> logger.info("In memory word frequency: {}", wordFrequency));
        }
    }
}

WordFrequencyConsumer 클래스는 @Component로 어노테이션된 스프링 관리 컴포넌트입니다. 이 클래스에는 @KafkaListener로 어노테이션된 consumeMessage 메서드가 포함되어 있으며, 이는 TOPIC 상수에 지정된 Kafka 토픽을 수신합니다.

consumeMessage 메서드는 수신된 WordFrequencyList 객체를 처리하고, 단어 빈도를 집계하여 메모리에 저장된 빈도 리스트를 업데이트합니다. 또한, 메시지를 5개 받을 때마다 메모리 내 단어 빈도를 로그로 출력합니다.

앞서 설명한 명령을 반복하여 집계 마이크로서비스를 빌드하고 패키징할 수 있습니다.

세 가지 마이크로서비스(TextConsumer, TextProducer, TextAggregator)를 모두 구현하고 실행한 후, 첫 번째 글에서 생성한 TextProducer 마이크로서비스의 http://localhost:8080/upload 엔드포인트에 대용량 텍스트 파일을 업로드하여 큰 텍스트 콘텐츠를 집계할 수 있습니다.

이번 시간에는 Apache Kafka를 활용한 이벤트 기반 Spring Boot 마이크로서비스 구축하는 방법에 대해 알아보았습니다. 파트1, 2로 구성되어 글이 꽤 길었는데 끝까지 읽어주셔서 정말 감사합니다. (_ _) 궁금하신 점은 편하게 댓글 남겨주세요 !! ㅎㅎ 🙂

Written by 개발자서동우
안녕하세요! 저는 기술 분야에서 활동 중인 개발자 서동우입니다. 명품 플랫폼 (주)트렌비의 창업 멤버이자 CTO로 활동했으며, AI 기술회사 (주)헤드리스의 공동 창업자이자 CTO로서 역할을 수행했습니다. 다양한 스타트업에서 일하며 회사의 성장과 더불어 비즈니스 상황에 맞는 기술 선택, 개발팀 구성 및 문화 정착에 깊은 경험을 쌓았습니다. 개발 관련 고민은 언제든지 편하게 연락주세요 :) https://linktr.ee/dannyseo Profile

Leave a Reply

Your email address will not be published. Required fields are marked *