Spring Boot Kafka 연동 실습 – 설정부터 메시지 송수신까지 완전 가이드


Spring Boot Kafka 연동은 백엔드 개발자라면 한 번은 반드시 거쳐야 할 실습 과제입니다. 마이크로서비스 아키텍처가 보편화되면서 서비스 간 비동기 통신 수단으로 Kafka를 도입하는 사례가 폭발적으로 늘고 있기 때문입니다. 처음 연동을 시도할 때 설정 파일 한 줄 차이로 연결이 안 되거나, Producer는 보내는데 Consumer가 받지 못하는 상황에 막히기 쉽습니다. 이 글에서는 로컬 환경 구성부터 실제 메시지 송수신 코드, 에러 핸들링까지 단계별로 정리합니다.


목차

  1. 실습 환경 준비 – Docker로 Kafka 로컬 구성
  2. Spring Boot 프로젝트 설정 – 의존성과 application.yml
  3. Kafka Producer 구현 – KafkaTemplate으로 메시지 전송
  4. Kafka Consumer 구현 – @KafkaListener로 메시지 수신
  5. 실전 예제 – 주문 이벤트 파이프라인 구축
  6. 에러 핸들링과 운영 팁 – 안정적인 Kafka 연동 전략

1. 실습 환경 준비 – Docker로 Kafka 로컬 구성

왜 Docker로 시작하는가

Kafka를 로컬에 직접 설치하려면 Java 런타임, ZooKeeper(또는 KRaft), Kafka 바이너리를 각각 설정해야 합니다. 이 과정에서 환경 변수 충돌이나 포트 충돌로 많은 시간을 낭비하기 쉽습니다. Docker Compose를 사용하면 docker compose up 한 줄로 Kafka 브로커와 ZooKeeper를 동시에 실행할 수 있어 실습 환경을 가장 빠르게 구성할 수 있습니다.

아래는 실습에 사용할 docker-compose.yml 파일입니다. Kafka 3.x 기준으로 작성했으며, Kafka UI도 함께 포함해 메시지 흐름을 시각적으로 확인할 수 있습니다.

yaml

# docker-compose.yml
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # 로컬호스트에서 접근할 수 있도록 리스너 설정
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    depends_on:
      - kafka
    ports:
      - "8989:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092

실행 및 확인

bash

# Kafka 클러스터 실행
docker compose up -d

# 컨테이너 상태 확인
docker compose ps

# Kafka 브로커 로그 확인 (연결 이슈 디버깅 시)
docker compose logs kafka -f

실행 후 브라우저에서 http://localhost:8989에 접속하면 Kafka UI 대시보드를 볼 수 있습니다. 토픽 생성, 메시지 조회, 컨슈머 그룹 상태를 시각적으로 모니터링할 수 있어 개발 중 매우 유용합니다.

💡 포트 충돌 시: 9092 포트가 이미 사용 중이라면 docker-compose.yml의 왼쪽 포트 번호를 19092로 변경하고, 이후 application.yml의 bootstrap-servers도 동일하게 수정하세요.


2. Spring Boot 프로젝트 설정 – 의존성과 application.yml

build.gradle 의존성 추가

Spring Initializr(start.spring.io)에서 프로젝트를 생성할 때 Spring for Apache Kafka 의존성을 선택하거나, 기존 프로젝트에 아래 의존성을 추가합니다.

groovy

// build.gradle
dependencies {
    // Spring Boot 기본
    implementation 'org.springframework.boot:spring-boot-starter-web'

    // Spring Kafka – Producer/Consumer 모두 포함
    implementation 'org.springframework.kafka:spring-kafka'

    // JSON 직렬화를 위한 Jackson (spring-boot-starter-web에 포함되나 명시 권장)
    implementation 'com.fasterxml.jackson.core:jackson-databind'

    // 테스트
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
}

Maven 사용자라면 pom.xml에 아래와 같이 추가합니다.

xml

<!-- pom.xml -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

application.yml 핵심 설정

yaml

# src/main/resources/application.yml
spring:
  kafka:
    # Kafka 브로커 주소 (Docker 기준)
    bootstrap-servers: localhost:9092

    producer:
      # 메시지 키 직렬화: String 타입 사용
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 메시지 값 직렬화: JSON 형식 (Object → JSON 문자열)
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      # 전송 실패 시 재시도 횟수
      retries: 3
      # acks=all: 모든 복제본이 저장 확인 후 응답 (데이터 유실 방지)
      acks: all

    consumer:
      # 컨슈머 그룹 ID (같은 그룹 내 컨슈머들이 파티션을 나눠서 처리)
      group-id: my-consumer-group
      # 컨슈머 그룹이 처음 시작될 때 어디서부터 읽을지 설정
      # earliest: 토픽의 맨 처음부터 / latest: 새 메시지부터
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        # JsonDeserializer가 역직렬화할 때 패키지 신뢰 설정 (* = 전체 허용)
        spring.json.trusted.packages: "*"

토픽 자동 생성 설정 (KafkaAdmin Bean)

운영 환경에서는 토픽을 미리 생성해두는 것이 원칙이지만, 개발 단계에서는 애플리케이션 시작 시 자동으로 토픽을 생성하도록 설정하는 것이 편리합니다.

java

// KafkaConfig.java
@Configuration
public class KafkaConfig {

    public static final String TOPIC_NAME = "order-events";

    /**
     * 토픽 자동 생성 Bean
     * - 파티션 수: 3 (처리량 확장을 위해 복수 설정)
     * - 복제 계수: 1 (로컬 단일 브로커 환경)
     */
    @Bean
    public NewTopic orderEventsTopic() {
        return TopicBuilder.name(TOPIC_NAME)
                .partitions(3)
                .replicas(1)
                .build();
    }
}

3. Kafka Producer 구현 – KafkaTemplate으로 메시지 전송

KafkaTemplate이란

KafkaTemplate은 Spring Kafka가 제공하는 핵심 클래스로, Kafka Producer의 복잡한 설정을 추상화하여 간단한 메서드 호출만으로 메시지를 전송할 수 있게 해줍니다. RestTemplate이 HTTP 요청을 추상화하듯, KafkaTemplate은 Kafka 메시지 전송을 추상화합니다.

메시지 DTO 정의

java

// OrderEvent.java
public class OrderEvent {

    private String orderId;
    private String productName;
    private int quantity;
    private String status;  // CREATED, PAID, SHIPPED, DELIVERED

    // 기본 생성자 필수 (Jackson 역직렬화에 필요)
    public OrderEvent() {}

    public OrderEvent(String orderId, String productName, int quantity, String status) {
        this.orderId = orderId;
        this.productName = productName;
        this.quantity = quantity;
        this.status = status;
    }

    // Getter / Setter
    public String getOrderId() { return orderId; }
    public void setOrderId(String orderId) { this.orderId = orderId; }
    public String getProductName() { return productName; }
    public void setProductName(String productName) { this.productName = productName; }
    public int getQuantity() { return quantity; }
    public void setQuantity(int quantity) { this.quantity = quantity; }
    public String getStatus() { return status; }
    public void setStatus(String status) { this.status = status; }

    @Override
    public String toString() {
        return "OrderEvent{orderId='" + orderId + "', product='" + productName
                + "', qty=" + quantity + ", status='" + status + "'}";
    }
}

Producer 서비스 구현

java

// OrderProducerService.java
@Service
@Slf4j  // Lombok 로그 어노테이션 (없으면 Logger 직접 선언)
public class OrderProducerService {

    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;
    private static final String TOPIC = KafkaConfig.TOPIC_NAME;

    // 생성자 주입 (Spring 권장 방식)
    public OrderProducerService(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * 주문 이벤트를 Kafka 토픽으로 전송
     * @param event 전송할 주문 이벤트 객체
     */
    public void sendOrderEvent(OrderEvent event) {
        // orderId를 메시지 키로 사용 → 같은 주문은 항상 같은 파티션으로 전달
        kafkaTemplate.send(TOPIC, event.getOrderId(), event)
                .whenComplete((result, ex) -> {
                    if (ex == null) {
                        // 전송 성공 시 파티션·오프셋 로깅
                        log.info("메시지 전송 성공 | topic={}, partition={}, offset={}",
                                result.getRecordMetadata().topic(),
                                result.getRecordMetadata().partition(),
                                result.getRecordMetadata().offset());
                    } else {
                        // 전송 실패 시 에러 로깅
                        log.error("메시지 전송 실패 | orderId={}, error={}",
                                event.getOrderId(), ex.getMessage());
                    }
                });
    }
}

REST API로 메시지 전송 트리거

java

// OrderController.java
@RestController
@RequestMapping("/api/orders")
public class OrderController {

    private final OrderProducerService producerService;

    public OrderController(OrderProducerService producerService) {
        this.producerService = producerService;
    }

    /**
     * POST /api/orders/publish
     * Body: { "orderId": "ORD-001", "productName": "노트북", "quantity": 1, "status": "CREATED" }
     */
    @PostMapping("/publish")
    public ResponseEntity<String> publishOrder(@RequestBody OrderEvent event) {
        producerService.sendOrderEvent(event);
        return ResponseEntity.ok("이벤트 전송 완료: " + event.getOrderId());
    }
}

4. Kafka Consumer 구현 – @KafkaListener로 메시지 수신

@KafkaListener 기본 구조

@KafkaListener는 Spring Kafka에서 Consumer를 가장 간단하게 구현하는 방법입니다. 해당 어노테이션이 붙은 메서드는 지정한 토픽에 메시지가 도착하면 자동으로 호출됩니다. 내부적으로는 별도 스레드가 지속적으로 Kafka 브로커를 폴링(polling)하다가 메시지가 오면 메서드를 실행합니다.

java

// OrderConsumerService.java
@Service
@Slf4j
public class OrderConsumerService {

    /**
     * 기본 컨슈머: order-events 토픽 구독
     * - groupId: application.yml의 consumer.group-id 사용
     * - 메시지 수신 시 자동 호출
     */
    @KafkaListener(topics = KafkaConfig.TOPIC_NAME, groupId = "my-consumer-group")
    public void consumeOrderEvent(OrderEvent event) {
        log.info("메시지 수신 | orderId={}, product={}, status={}",
                event.getOrderId(), event.getProductName(), event.getStatus());

        // 실제 비즈니스 로직 처리
        processOrder(event);
    }

    private void processOrder(OrderEvent event) {
        // 주문 상태에 따른 처리 분기
        switch (event.getStatus()) {
            case "CREATED"  -> log.info("신규 주문 처리: {}", event.getOrderId());
            case "PAID"     -> log.info("결제 완료 처리: {}", event.getOrderId());
            case "SHIPPED"  -> log.info("배송 시작 처리: {}", event.getOrderId());
            default         -> log.warn("알 수 없는 상태: {}", event.getStatus());
        }
    }
}

ConsumerRecord로 메타데이터 함께 수신

메시지 내용뿐 아니라 파티션 번호, 오프셋, 수신 시각 등의 메타데이터가 필요하다면 ConsumerRecord<K, V> 타입으로 받을 수 있습니다.

java

/**
 * 메타데이터 포함 수신 – ConsumerRecord 활용
 */
@KafkaListener(topics = KafkaConfig.TOPIC_NAME, groupId = "audit-group")
public void consumeWithMetadata(ConsumerRecord&lt;String, OrderEvent> record) {
    log.info("=== 메시지 상세 ===");
    log.info("토픽     : {}", record.topic());
    log.info("파티션   : {}", record.partition());
    log.info("오프셋   : {}", record.offset());
    log.info("키       : {}", record.key());        // orderId
    log.info("수신 시각: {}", record.timestamp());
    log.info("값       : {}", record.value());      // OrderEvent 객체
}

복수 토픽 및 배치 수신

java

/**
 * 배치 수신 – 여러 메시지를 한 번에 처리 (처리량 최적화)
 * application.yml에 spring.kafka.listener.type: batch 추가 필요
 */
@KafkaListener(topics = KafkaConfig.TOPIC_NAME, groupId = "batch-group",
               containerFactory = "batchKafkaListenerContainerFactory")
public void consumeBatch(List&lt;OrderEvent> events) {
    log.info("배치 수신 건수: {}", events.size());
    events.forEach(event ->
            log.info("처리 중: {}", event.getOrderId())
    );
}

5. 실전 예제 – 주문 이벤트 파이프라인 구축

전체 아키텍처 흐름

지금까지 구현한 코드를 합치면 아래와 같은 이벤트 파이프라인이 완성됩니다.

[클라이언트]
    │
    │ POST /api/orders/publish
    ▼
[OrderController]
    │
    │ sendOrderEvent(event)
    ▼
[OrderProducerService]
    │
    │ kafkaTemplate.send("order-events", orderId, event)
    ▼
[Kafka Broker : order-events 토픽]
    │  Partition 0  │  Partition 1  │  Partition 2  │
    │  (orderId 해시 기반 파티션 배정)               │
    ▼
[OrderConsumerService]   [AuditConsumerService]
(my-consumer-group)      (audit-group)
    │                        │
비즈니스 로직 처리        감사 로그 기록

전체 실행 및 테스트

① 애플리케이션 실행

bash

./gradlew bootRun

② 메시지 전송 테스트 (curl)

bash

# 주문 생성 이벤트 전송
curl -X POST http://localhost:8080/api/orders/publish \
  -H "Content-Type: application/json" \
  -d '{
    "orderId": "ORD-2024-001",
    "productName": "MacBook Pro",
    "quantity": 1,
    "status": "CREATED"
  }'

# 결제 완료 이벤트 전송
curl -X POST http://localhost:8080/api/orders/publish \
  -H "Content-Type: application/json" \
  -d '{
    "orderId": "ORD-2024-001",
    "productName": "MacBook Pro",
    "quantity": 1,
    "status": "PAID"
  }'

③ 예상 콘솔 출력

[Producer] 메시지 전송 성공 | topic=order-events, partition=1, offset=0
[Consumer] 메시지 수신 | orderId=ORD-2024-001, product=MacBook Pro, status=CREATED
[Consumer] 신규 주문 처리: ORD-2024-001

[Producer] 메시지 전송 성공 | topic=order-events, partition=1, offset=1
[Consumer] 메시지 수신 | orderId=ORD-2024-001, product=MacBook Pro, status=PAID
[Consumer] 결제 완료 처리: ORD-2024-001

💡 같은 orderId는 항상 같은 파티션으로: 메시지 키(orderId)를 해시하여 파티션을 결정하므로, 동일 주문의 이벤트는 항상 같은 파티션에 저장됩니다. 이로써 파티션 내 순서 보장이 달성됩니다.

간단한 통합 테스트 작성

java

// OrderKafkaIntegrationTest.java
@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {KafkaConfig.TOPIC_NAME})
class OrderKafkaIntegrationTest {

    @Autowired
    private OrderProducerService producerService;

    @Test
    void 주문_이벤트_전송_수신_테스트() throws InterruptedException {
        // Given
        OrderEvent event = new OrderEvent("TEST-001", "테스트 상품", 1, "CREATED");

        // When: 메시지 전송
        producerService.sendOrderEvent(event);

        // Then: Consumer가 수신할 시간 대기 (실제 환경에서는 CountDownLatch 사용 권장)
        Thread.sleep(2000);

        // 로그 또는 저장소에서 처리 완료 여부 검증
        // (실제 테스트에서는 CountDownLatch로 Consumer 처리 완료를 기다린 뒤 assert)
    }
}

6. 에러 핸들링과 운영 팁 – 안정적인 Kafka 연동 전략

Consumer 에러 핸들링 – DefaultErrorHandler

메시지 처리 중 예외가 발생했을 때의 동작을 명확하게 정의해야 합니다. Spring Kafka는 DefaultErrorHandler를 통해 재시도 횟수, 재시도 간격, Dead Letter Topic(DLT) 전송을 설정할 수 있습니다.

java

// KafkaErrorConfig.java
@Configuration
public class KafkaErrorConfig {

    /**
     * 에러 핸들러 설정
     * - 최대 3회 재시도 (1초 간격)
     * - 3회 실패 시 Dead Letter Topic으로 이동
     */
    @Bean
    public DefaultErrorHandler errorHandler(KafkaTemplate&lt;String, Object> kafkaTemplate) {

        // Dead Letter Topic 발행자 설정
        DeadLetterPublishingRecoverer recoverer =
                new DeadLetterPublishingRecoverer(kafkaTemplate,
                        (record, ex) -> new TopicPartition(
                                record.topic() + ".DLT",  // 실패 메시지 → order-events.DLT 토픽
                                record.partition()
                        ));

        // 재시도 정책: 1초 간격으로 최대 3회
        FixedBackOff backOff = new FixedBackOff(1000L, 3L);

        DefaultErrorHandler handler = new DefaultErrorHandler(recoverer, backOff);

        // 재시도하지 않을 예외 등록 (데이터 파싱 오류는 재시도해도 무의미)
        handler.addNotRetryableExceptions(JsonProcessingException.class);

        return handler;
    }
}

운영 환경에서 반드시 확인할 설정 항목

항목개발 환경운영 환경 권장값
acks1all (데이터 유실 방지)
replication.factor13 (브로커 3대 이상)
auto.offset.resetearliestlatest (신규 메시지만)
enable.auto.committruefalse (수동 커밋 권장)
max.poll.records500처리량에 맞게 조정
session.timeout.ms1000030000 (긴 처리 작업 시)

자주 만나는 에러와 해결법

org.apache.kafka.common.errors.TimeoutException

원인: Kafka 브로커에 연결할 수 없음
해결: bootstrap-servers 주소 확인, Docker 컨테이너 실행 여부 확인
     → docker compose ps 로 kafka 컨테이너 상태 확인

ClassCastException 또는 역직렬화 오류

원인: JsonDeserializer가 신뢰하지 않는 패키지의 클래스 역직렬화 시도
해결: application.yml에 추가
     spring.kafka.consumer.properties.spring.json.trusted.packages: "*"

③ Consumer가 메시지를 수신하지 못함

원인 1: group-id가 이미 오프셋을 커밋한 상태 (auto-offset-reset: latest)
해결 1: 새로운 group-id 사용 또는 auto-offset-reset: earliest 설정

원인 2: 토픽이 존재하지 않음
해결 2: Kafka UI에서 토픽 존재 여부 확인 또는 NewTopic Bean 등록

결론

Spring Boot Kafka 연동의 핵심은 세 가지입니다. 첫째, application.yml에서 직렬화·역직렬화·컨슈머 그룹 설정을 정확히 맞춰야 하고, 둘째, KafkaTemplate으로 메시지를 전송하고 @KafkaListener로 수신하는 기본 패턴을 확실히 이해해야 합니다. 셋째, 운영 환경을 위한 에러 핸들링과 Dead Letter Topic 전략을 처음부터 함께 설계해야 합니다. 오늘 실습한 코드를 바탕으로 직접 토픽을 만들고 메시지를 주고받으며 파이프라인을 확장해 보세요.


⚠️ 면책 고지: 본 글은 기술 학습 목적으로 작성된 실습 가이드입니다. 예제 코드는 학습·개발 환경 기준으로 작성되었으며, 실제 운영 서비스에 적용 시 보안 설정(SSL/SASL), 고가용성 구성, 성능 튜닝 등 추가적인 검토가 반드시 필요합니다. 코드 적용으로 인한 장애·데이터 손실에 대해 본 블로그는 책임을 지지 않습니다.

답글 남기기

이메일 주소는 공개되지 않습니다. 필수 필드는 *로 표시됩니다