Spring WebFlux 리액티브 API 구축 커버 이미지

Spring WebFlux는 Spring 5에서 도입된 리액티브 웹 프레임워크로, 논블로킹(Non-blocking) I/O 기반의 고성능 API를 구축할 수 있게 해줍니다. 기존 Spring MVC의 동기 방식과 달리, WebFlux는 적은 스레드로 대량의 동시 요청을 처리할 수 있어 마이크로서비스 환경에서 특히 강력한 성능을 발휘합니다. 이 글에서는 Reactive Streams의 핵심 개념부터 실전 코드까지, WebFlux의 모든 것을 다뤄보겠습니다.

Reactive Streams란 무엇인가?

Reactive Streams는 비동기 스트림 처리를 위한 표준 명세로, 논블로킹 배압(Backpressure)을 갖춘 비동기 데이터 처리 파이프라인을 정의합니다. Java 9의 java.util.concurrent.Flow API로 표준화되었으며, Spring WebFlux는 이 명세의 구현체인 Project Reactor를 기반으로 동작합니다.

Reactive Streams 데이터 흐름 다이어그램

Reactive Streams의 4가지 핵심 인터페이스

public interface Publisher<T> {
    void subscribe(Subscriber<? super T> subscriber);
}

public interface Subscriber<T> {
    void onSubscribe(Subscription subscription);
    void onNext(T item);
    void onError(Throwable throwable);
    void onComplete();
}

public interface Subscription {
    void request(long n);
    void cancel();
}

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
인터페이스 역할 설명
Publisher 데이터 생산자 구독자에게 데이터 스트림을 제공
Subscriber 데이터 소비자 데이터를 수신하고 처리
Subscription 구독 관계 요청량 제어 및 취소 관리
Processor 중간 처리자 Publisher + Subscriber 역할 동시 수행

Mono와 Flux: WebFlux의 핵심 타입

Project Reactor는 두 가지 핵심 Publisher 타입을 제공합니다.

Mono: 0~1개의 요소

Mono<T>는 최대 하나의 요소를 비동기적으로 반환합니다. 단일 결과를 반환하는 API에 적합합니다.

@RestController
@RequestMapping("/api/users")
public class UserController {

    private final UserService userService;

    public UserController(UserService userService) {
        this.userService = userService;
    }

    @GetMapping("/{id}")
    public Mono<ResponseEntity<User>> getUser(@PathVariable Long id) {
        return userService.findById(id)
                .map(ResponseEntity::ok)
                .defaultIfEmpty(ResponseEntity.notFound().build());
    }

    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public Mono<User> createUser(@RequestBody Mono<User> userMono) {
        return userMono.flatMap(userService::save);
    }
}

Flux: 0~N개의 요소

Flux<T>는 여러 요소로 구성된 스트림을 비동기적으로 반환합니다. 컬렉션 데이터나 실시간 스트리밍에 적합합니다.

@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> streamUsers() {
    return userService.findAll()
            .delayElements(Duration.ofMillis(100));
}

@GetMapping("/search")
public Flux<User> searchUsers(@RequestParam String keyword) {
    return userService.findByNameContaining(keyword)
            .filter(user -> user.isActive())
            .take(20);
}

Mono/Flux 주요 연산자 비교

연산자 Mono Flux 설명
map 동기 변환
flatMap 비동기 변환
filter 조건 필터링
zip 여러 소스 결합
switchIfEmpty 빈 경우 대체
collectList Flux → Mono 변환
reduce 누적 연산
window 청크 분할

WebClient: 논블로킹 HTTP 클라이언트

WebFlux 환경에서 외부 API를 호출할 때는 기존의 RestTemplate 대신 논블로킹 WebClient를 사용해야 합니다. RestTemplate은 내부적으로 블로킹 I/O를 사용하므로 리액티브 파이프라인의 이점을 상쇄시킵니다.

@Service
public class ExternalApiService {

    private final WebClient webClient;

    public ExternalApiService(WebClient.Builder webClientBuilder) {
        this.webClient = webClientBuilder
                .baseUrl("https://api.example.com")
                .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .filter(ExchangeFilterFunction.ofRequestProcessor(request -> {
                    log.debug("Request: {} {}", request.method(), request.url());
                    return Mono.just(request);
                }))
                .build();
    }

    public Mono<ProductResponse> getProduct(String productId) {
        return webClient.get()
                .uri("/products/{id}", productId)
                .retrieve()
                .onStatus(HttpStatusCode::is4xxClientError,
                    response -> Mono.error(new ProductNotFoundException(productId)))
                .onStatus(HttpStatusCode::is5xxServerError,
                    response -> Mono.error(new ExternalServiceException("외부 API 서버 오류")))
                .bodyToMono(ProductResponse.class)
                .timeout(Duration.ofSeconds(5))
                .retryWhen(Retry.backoff(3, Duration.ofMillis(500)));
    }

    public Flux<OrderResponse> getOrders(String userId) {
        return webClient.get()
                .uri(uriBuilder -> uriBuilder
                        .path("/orders")
                        .queryParam("userId", userId)
                        .queryParam("status", "ACTIVE")
                        .build())
                .retrieve()
                .bodyToFlux(OrderResponse.class);
    }
}

여러 API를 병렬로 호출하기

public Mono<UserDashboard> getUserDashboard(String userId) {
    Mono<UserProfile> profileMono = getProfile(userId);
    Mono<List<Order>> ordersMono = getOrders(userId).collectList();
    Mono<List<Notification>> notificationsMono = getNotifications(userId).collectList();

    return Mono.zip(profileMono, ordersMono, notificationsMono)
            .map(tuple -> UserDashboard.builder()
                    .profile(tuple.getT1())
                    .orders(tuple.getT2())
                    .notifications(tuple.getT3())
                    .build());
}

R2DBC: 리액티브 데이터베이스 접근

R2DBC(Reactive Relational Database Connectivity)는 관계형 데이터베이스에 논블로킹으로 접근하기 위한 명세입니다. JDBC가 블로킹 방식인 반면, R2DBC는 리액티브 스트림 기반으로 동작합니다.

의존성 설정

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-data-r2dbc'
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    runtimeOnly 'io.r2dbc:r2dbc-postgresql'
    // 또는 MySQL: runtimeOnly 'io.asyncer:r2dbc-mysql'
}

R2DBC Repository 구현

@Table("users")
public class User {
    @Id
    private Long id;
    private String name;
    private String email;
    private boolean active;
    private LocalDateTime createdAt;
    // getter, setter 생략
}

public interface UserRepository extends ReactiveCrudRepository<User, Long> {

    Flux<User> findByActiveTrue();

    @Query("SELECT * FROM users WHERE name LIKE :keyword ORDER BY created_at DESC")
    Flux<User> findByNameContaining(String keyword);

    @Query("SELECT COUNT(*) FROM users WHERE active = true")
    Mono<Long> countActiveUsers();
}

트랜잭션 처리

@Service
public class UserService {

    private final UserRepository userRepository;
    private final AuditLogRepository auditLogRepository;
    private final TransactionalOperator transactionalOperator;

    public Mono<User> createUserWithAudit(User user) {
        return userRepository.save(user)
                .flatMap(savedUser -> {
                    AuditLog log = AuditLog.of("USER_CREATED", savedUser.getId());
                    return auditLogRepository.save(log)
                            .thenReturn(savedUser);
                })
                .as(transactionalOperator::transactional);
    }
}

배압(Backpressure)이란 무엇인가?

배압(Backpressure)은 리액티브 프로그래밍에서 가장 중요한 개념 중 하나입니다. 데이터 생산자가 소비자보다 빠르게 데이터를 생성할 때, 소비자가 처리 가능한 만큼만 데이터를 요청하는 메커니즘입니다.

배압이 없을 때의 문제

Producer: 1000 items/sec → Consumer: 100 items/sec
→ 메모리 초과 (OutOfMemoryError)
→ 시스템 장애 발생

배압 전략

// 1. buffer: 초과 데이터를 버퍼에 저장
Flux.range(1, 1000)
    .onBackpressureBuffer(256)
    .subscribe(item -> processSlowly(item));

// 2. drop: 초과 데이터를 버림
Flux.range(1, 1000)
    .onBackpressureDrop(dropped -> log.warn("Dropped: {}", dropped))
    .subscribe(item -> processSlowly(item));

// 3. latest: 최신 데이터만 유지
Flux.range(1, 1000)
    .onBackpressureLatest()
    .subscribe(item -> processSlowly(item));

// 4. error: 초과 시 에러 발생
Flux.range(1, 1000)
    .onBackpressureError()
    .subscribe(
        item -> processSlowly(item),
        error -> log.error("Backpressure overflow", error)
    );

실전 배압 적용: 대용량 파일 스트리밍

@GetMapping(value = "/export/users", produces = MediaType.APPLICATION_NDJSON_VALUE)
public Flux<User> exportUsers() {
    return userRepository.findAll()
            .onBackpressureBuffer(512,
                dropped -> log.warn("Export buffer overflow"),
                BufferOverflowStrategy.DROP_OLDEST)
            .limitRate(100);  // 한 번에 100개씩만 요청
}

성능 비교: Spring MVC vs Spring WebFlux

성능 비교 벤치마크 결과

벤치마크 환경 구성

동일한 API를 Spring MVC와 WebFlux로 각각 구현하여 성능을 비교합니다. 외부 API 호출(500ms 지연)을 시뮬레이션하는 시나리오입니다.

// Spring MVC 방식
@RestController
public class MvcController {
    @GetMapping("/api/data")
    public ResponseEntity<Data> getData() {
        // 블로킹: 스레드가 500ms 동안 대기
        Data result = restTemplate.getForObject("https://api.example.com/data", Data.class);
        return ResponseEntity.ok(result);
    }
}

// Spring WebFlux 방식
@RestController
public class WebFluxController {
    @GetMapping("/api/data")
    public Mono<ResponseEntity<Data>> getData() {
        // 논블로킹: 스레드 반환 후 콜백으로 처리
        return webClient.get()
                .uri("https://api.example.com/data")
                .retrieve()
                .bodyToMono(Data.class)
                .map(ResponseEntity::ok);
    }
}

벤치마크 결과 (1,000 동시 요청 기준)

항목 Spring MVC Spring WebFlux
평균 응답 시간 2,340ms 520ms
최대 처리량 (RPS) ~420 ~1,900
메모리 사용량 ~512MB ~256MB
스레드 수 200 (Tomcat 기본) 4 (CPU 코어 수)
에러율 (5,000 동시) 12.3% 0.1%

어떤 경우에 WebFlux를 선택해야 하는가?

WebFlux가 적합한 경우:

  • 외부 API 호출이 많은 마이크로서비스
  • 실시간 스트리밍 (SSE, WebSocket)
  • 높은 동시 접속을 처리해야 하는 API 게이트웨이
  • I/O 바운드 작업이 대부분인 서비스

Spring MVC가 여전히 적합한 경우:

  • CPU 바운드 작업이 많은 서비스
  • 블로킹 라이브러리 의존성이 많은 경우 (JPA, JDBC)
  • 팀의 리액티브 경험이 부족한 경우
  • 단순 CRUD 위주의 서비스

실전 프로젝트: 리액티브 REST API 전체 구성

@Configuration
public class WebFluxConfig {

    @Bean
    public RouterFunction<ServerResponse> routes(UserHandler handler) {
        return RouterFunctions.route()
                .path("/api/v2/users", builder -> builder
                        .GET("", handler::findAll)
                        .GET("/{id}", handler::findById)
                        .POST("", handler::create)
                        .PUT("/{id}", handler::update)
                        .DELETE("/{id}", handler::delete))
                .build();
    }
}

@Component
public class UserHandler {

    private final UserService userService;
    private final Validator validator;

    public Mono<ServerResponse> findAll(ServerRequest request) {
        int page = request.queryParam("page").map(Integer::parseInt).orElse(0);
        int size = request.queryParam("size").map(Integer::parseInt).orElse(20);

        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(userService.findAllPaged(page, size), User.class);
    }

    public Mono<ServerResponse> create(ServerRequest request) {
        return request.bodyToMono(UserCreateRequest.class)
                .doOnNext(this::validate)
                .flatMap(userService::create)
                .flatMap(user -> ServerResponse
                        .created(URI.create("/api/v2/users/" + user.getId()))
                        .bodyValue(user))
                .onErrorResume(ValidationException.class,
                    e -> ServerResponse.badRequest().bodyValue(e.getMessage()));
    }

    private void validate(UserCreateRequest request) {
        Errors errors = new BeanPropertyBindingResult(request, "request");
        validator.validate(request, errors);
        if (errors.hasErrors()) {
            throw new ValidationException(errors.getAllErrors().toString());
        }
    }
}

전역 에러 핸들링

@Component
@Order(-2)
public class GlobalErrorWebExceptionHandler extends AbstractErrorWebExceptionHandler {

    public GlobalErrorWebExceptionHandler(
            ErrorAttributes errorAttributes,
            WebProperties webProperties,
            ApplicationContext applicationContext) {
        super(errorAttributes, webProperties.getResources(), applicationContext);
    }

    @Override
    protected RouterFunction<ServerResponse> getRoutingFunction(ErrorAttributes errorAttributes) {
        return RouterFunctions.route(RequestPredicates.all(), this::renderError);
    }

    private Mono<ServerResponse> renderError(ServerRequest request) {
        Throwable error = getError(request);
        HttpStatus status = determineStatus(error);

        Map<String, Object> body = Map.of(
            "status", status.value(),
            "error", status.getReasonPhrase(),
            "message", error.getMessage(),
            "timestamp", LocalDateTime.now()
        );

        return ServerResponse.status(status)
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(body);
    }
}

마무리

Spring WebFlux는 높은 동시성과 효율적인 리소스 사용이 필요한 현대 마이크로서비스에서 강력한 선택지입니다. 특히 Reactive Streams 기반의 배압 메커니즘은 시스템 안정성을 크게 향상시켜 줍니다. 다만 리액티브 프로그래밍의 학습 곡선이 가파르고, 디버깅이 복잡해질 수 있으므로 팀의 역량과 프로젝트 요구사항을 신중하게 고려한 뒤 도입을 결정하시기 바랍니다.

핵심 정리:

  • Reactive Streams 표준으로 논블로킹 비동기 처리를 구현
  • Mono/Flux로 단일/다중 데이터 스트림을 선언적으로 처리
  • WebClient로 외부 API를 논블로킹으로 호출
  • R2DBC로 데이터베이스까지 완전한 논블로킹 파이프라인 구축
  • 배압(Backpressure)으로 시스템 과부하 방지
  • I/O 바운드 서비스에서 MVC 대비 3~5배 이상의 처리량 달성 가능

참고 자료