Infrastructure, Spring

Apache Kafka를 활용한 이벤트 기반 Spring Boot 마이크로서비스 구축 — Part1

Written by 개발자서동우 · 2 min read >
카프카로 스프링부트 마이크로서비스 구축하기

안녕하세요 Devloo 입니다 🙂 . 오늘날 이벤트 중심 마이크로서비스는 혁신적인 변화를 일으키고 있습니다. 이러한 민첩하고 독립적인 모듈들은 이벤트를 통해 원활하게 통신하여 확장성, 적응성, 그리고 유연성을 촉진합니다. 이 혁신의 중심에는 메시지 큐, 특히 분산 이벤트 스트리밍의 강자인 Apache Kafka가 자리 잡고 있으며, 이는 현대 아키텍처의 핵심 역할을 하고 있습니다.

이벤트 주도 아키텍처 이해하기

이벤트 주도 아키텍처(EDA)는 소프트웨어 시스템 내의 구성 요소들이 이벤트를 생성하고 소비하며 서로 통신하는 설계 패턴입니다. 이벤트는 시스템 내에서 중요한 사건을 나타내며 다른 구성 요소의 동작을 유발할 수 있습니다. 이 접근 방식은 느슨하게 결합된, 매우 확장 가능한 시스템을 가능하게 하여 실시간 변화에 대응할 수 있습니다.

Apache Kafka 소개

Apache Kafka는 대용량 데이터와 실시간 스트림을 처리할 수 있는 분산형 내결함성 메시징 시스템입니다. Kafka는 게시-구독 모델을 사용하여, 생산자가 토픽에 메시지를 게시하고 소비자가 이를 구독하여 메시지를 받습니다. Kafka의 내구성 있는 저장소와 복제 메커니즘은 데이터 신뢰성과 내결함성을 보장합니다.

Kubernetes나 로컬 환경에서 Apache Kafka를 쉽게 배포하는 간단한 가이드를 확인해보세요.

환경 설정

마이크로서비스를 구축하기 전에 작업 환경을 설정해야 합니다. 다음 도구들이 필요합니다:

  • Java Development Kit (JDK)
  • Gradle (gradle-wrapper를 사용하는 경우 시스템 전체에 Gradle을 설치할 필요가 없습니다)
  • Docker
  • 실행 중인 Apache Kafka 인스턴스
  • 코드 작성 툴 (예: IntelliJ IDEA, Eclipse, VSCode)

1단계: 스프링 부트 프로젝트 설정

먼저, Spring Initializr를 사용하여 새로운 스프링 부트 프로젝트를 생성하고 필요한 의존성을 추가하세요. 웹 관련 기능을 관리하기 위해 “Spring Web”을, 개발 효율성을 높이기 위해 “Spring Boot DevTools”를 선택합니다. 새로운 문자열 템플릿과 자바 가상 스레드의 원활한 호환성을 위해 Java 21을 사용합니다.

프로젝트 파일을 생성하고 저장한 후, 사용하시는 개발 툴(IDE)에서 파일을 엽니다.

프로젝트 구현을 시작하기 전에, application.properties 파일에서 스프링 부트 가상 스레드를 활성화하여 최적의 성능을 보장합니다. 이 설정은 default thread executor(기본 스레드 실행기)에서 virtual thread(가상 스레드)를 사용하여 HTTP 연결을 처리할 수 있게 합니다.

spring.threads.virtual.enabled=true

2단계: TextProducer 마이크로서비스 구현

이 마이크로서비스에서는 텍스트 데이터를 업로드하고 이를 TEXT_DATA라는 이름의 Apache Kafka 토픽으로 게시합니다.

Apache Kafka 설정을 application.properties에 추가하세요.

# src/main/resources/application.properties
spring.kafka.bootstrap-servers=localhost:9092

필요한 구성 요소를 만듭니다:

TextDataProducer 클래스
package com.example.kafkaapp;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import java.util.stream.Stream;

import static java.lang.StringTemplate.STR;

@Component
public class TextDataProducer {
    private static final Logger logger = Logger.getLogger(TextDataProducer.class.getName());

    // 토픽 설정을 위한 상수
    private static final int PARTITION_COUNT = 8;
    private static final String TOPIC = "TEXT-DATA";
    private static final short REPLICATION_FACTOR = 1;

    private final KafkaTemplate<String, String> kafkaTemplate;

    public TextDataProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Autowired
    public void configureTopic(KafkaAdmin kafkaAdmin) {
        kafkaAdmin.createOrModifyTopics(new NewTopic(TOPIC, PARTITION_COUNT, REPLICATION_FACTOR));
    }

    private void sendTextMessage(String text, int lineIndex) {
        if (text == null || text.isEmpty()) {
            return;
        }
        // 텍스트 메시지를 라인 인덱스에 따라 파티션에 분배하여 토픽으로 보냅니다.
        kafkaTemplate.send(TOPIC, "KEY-" + (lineIndex % PARTITION_COUNT), text);
    }

    public void sendContentOf(File file) {
        Instant before = Instant.now();
        try (Stream<String> lines = Files.lines(file.toPath())) {
            AtomicInteger counter = new AtomicInteger();
            lines.forEach(line -> sendTextMessage(line, counter.getAndIncrement()));
            Instant after = Instant.now();
            Duration duration = Duration.between(before, after);
            logger.info(STR."스트리밍된 라인 수: \{counter.get()}개, 소요 시간: \{duration.toMillis()} 밀리초");
        } catch (IOException e) {
            throw new RuntimeException("파일 읽기 오류", e);
        }
    }
}
  • 이 클래스는 텍스트 데이터를 생성하고 이를 TEXT-DATA라는 이름의 Kafka 토픽으로 전송합니다.
  • @Component로 주석을 달아 스프링에서 관리되는 컴포넌트로 만듭니다.
  • PARTITION_COUNT, TOPIC, REPLICATION_FACTOR와 같은 토픽 구성을 위한 상수를 가지고 있습니다.
  • configureTopic 메서드는 KafkaAdmin을 사용하여 Kafka 토픽을 설정합니다. 지정된 설정으로 토픽을 생성하거나 수정합니다.
  • sendTextMessage 메서드는 라인 인덱스에 따라 메시지를 파티션에 분배하여 Kafka 토픽으로 텍스트 메시지를 전송합니다.
  • sendContentOf(File file) 메서드는 주어진 파일의 내용을 한 줄씩 읽어 TEXT-DATA라는 Kafka 토픽으로 각 라인을 메시지로 전송합니다. 이 메서드는 TextDataProducer 클래스에서 가장 중요한 부분입니다.
TextDataController 클래스
package com.example.kafkaapp;

import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

@RestController
public class TextDataController {

    private final TextDataProducer producer;

    public TextDataController(TextDataProducer producer) {
        this.producer = producer;
    }

    @PostMapping("/upload")
    public ResponseEntity<String> uploadTextFile(@RequestParam("file") MultipartFile file) {
        try {
            Path tempFile = Files.createTempFile(file.getOriginalFilename(), null);
            file.transferTo(tempFile);
            Thread.ofVirtual().start(() -> producer.sendContentOf(tempFile.toFile()));
            return ResponseEntity.ok(tempFile.toString());
        } catch (IOException e) {
            return ResponseEntity
            .status(HttpStatus.INTERNAL_SERVER_ERROR)
            .body("파일 업로드 중 오류가 발생했습니다.");
        }
    }
}
  • 이 클래스는 HTTP 요청을 처리하는 스프링 REST 컨트롤러입니다.
  • 생성자를 통해 TextDataProducer 빈을 주입받습니다.
  • uploadTextFile 메서드는 /upload 경로로 오는 POST 요청을 처리합니다. 이 메서드는 multipart 파일을 받아 업로드합니다.
  • 임시 파일을 생성하고, 업로드된 파일의 내용을 임시 파일에 저장한 후, TextDataProducer를 사용하여 임시 파일의 내용을 Kafka로 전송합니다. 마지막으로 임시 파일의 경로를 반환합니다.
TextProducerApplication 클래스
package com.example.kafkaapp;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class TextProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(TextProducerApplication.class, args);
    }
}

이 클래스는 스프링 부트 애플리케이션의 진입점입니다.

  • @SpringBootApplication 어노테이션은 @Configuration, @EnableAutoConfiguration, @ComponentScan을 결합한 것입니다.
  • main 메서드는 스프링 부트 애플리케이션을 시작합니다.

텍스트 프로듀서를 구현한 후에는 다음 명령어로 프로젝트를 실행할 수 있습니다:

./gradlew bootRun

선호하는 REST 클라이언트(예: Postman 또는 SwaggerUI)를 사용하여 http://localhost:8080/upload 엔드포인트로 예제 텍스트 파일을 업로드하세요. 또는 IntelliJ IDEA에서 다음과 같은 원시 HTTP 명령을 사용하여 파일을 업로드할 수 있습니다:

###텍스트와 파일 필드가 있는 폼 전송
POST http://localhost:8080/upload HTTP/1.1
Content-Type: multipart/form-data; boundary=boundary

--boundary
Content-Disposition: form-data; name="file"; filename="shakespeares.txt"

// 'shakespeares.txt' 파일이 업로드됩니다
< /Users/mustafaguc/Desktop/kafka-demo-content/shakespeares.txt

--boundary

지금까지 우리는 텍스트 파일을 한 줄씩 읽어 TEXT-DATA Kafka 토픽으로 전송할 수 있는 TextDataProducer를 만들었습니다. 또한 텍스트 데이터 스트리밍을 시작하기 위해 파일을 업로드하는 TextDataController REST 컨트롤러도 구현했습니다.

3단계: TextData Producer 도커라이징

마이크로서비스를 Docker 이미지로 패키징하기 위한 Dockerfile을 작성합니다.

FROM openjdk:21-slim
WORKDIR /app
COPY build/libs/text-producer-1.0.jar app.jar
EXPOSE 8080

ENTRYPOINT ["java", "-jar", "app.jar"]

도커 이미지를 빌드하기 전에 Gradle을 통해 마이크로서비스를 먼저 빌드해야 합니다:

./gradlew assemble

JAR 파일을 성공적으로 생성한 후, 프로젝트 루트 폴더에서 Docker 이미지를 빌드할 수 있습니다:

docker build . -t text-producer

이미지가 성공적으로 빌드되면 다음 명령어를 실행하여 도커 이미지를 실행합니다:

docker run -it -p 8080:8080 text-producer

위 명령어를 실행하면 다음과 유사한 로그가 나타납니다:

c.e.kafkaapp.TextProducerApplication     : Starting TextProducerApplication using Java 21.0.1 with PID 4871 (/Users/mustafaguc/projects/java/text-producer/build/classes/java/main started by mustafaguc in /Users/mustafaguc/projects/java/text-producer)
c.e.kafkaapp.TextProducerApplication     : No active profile set, falling back to 1 default profile: "default"
o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port 8080 (http)
o.apache.catalina.core.StandardService   : Starting service [Tomcat]
o.apache.catalina.core.StandardEngine    : Starting Servlet engine: [Apache Tomcat/10.1.17]
o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 307 ms
o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values:
........
o.a.k.clients.admin.AdminClientConfig    : These configurations '[sasl.jaas.config, idompotence.enabled]' were supplied but are not used yet.
o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.6.1
o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 5e3c2b738d253ff5
o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1708262321726
o.a.kafka.common.utils.AppInfoParser     : App info kafka.admin.client for adminclient-1 unregistered
o.apache.kafka.common.metrics.Metrics    : Metrics scheduler closed
o.apache.kafka.common.metrics.Metrics    : Closing reporter org.apache.kafka.common.metrics.JmxReporter
o.apache.kafka.common.metrics.Metrics    : Metrics reporters closed
o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port 8080 (http) with context path ''
c.e.kafkaapp.TextProducerApplication     : Started TextProducerApplication in 0.764 seconds (process running for 0.976)

현재까지 프로듀서 부분을 구현했습니다. 컨슈머와 애그리게이터 마이크로서비스는 다음 글에서 다뤄볼 예정입니다.

끝까지 읽어주셔서 정말 감사합니다. (_ _) 궁금하신 사항은 편하게 댓글로 남겨주세요 🙂

Written by 개발자서동우
안녕하세요! 저는 기술 분야에서 활동 중인 개발자 서동우입니다. 명품 플랫폼 (주)트렌비의 창업 멤버이자 CTO로 활동했으며, AI 기술회사 (주)헤드리스의 공동 창업자이자 CTO로서 역할을 수행했습니다. 다양한 스타트업에서 일하며 회사의 성장과 더불어 비즈니스 상황에 맞는 기술 선택, 개발팀 구성 및 문화 정착에 깊은 경험을 쌓았습니다. 개발 관련 고민은 언제든지 편하게 연락주세요 :) https://linktr.ee/dannyseo Profile

Leave a Reply

Your email address will not be published. Required fields are marked *