Why Reactive?
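Traditional blocking code holds a thread while each operation completes: read the database, wait; read a file, wait; call an API, wait. With reactive programming, the thread does not wait. You declare what should happen when the data arrives, and the thread is free to do other work in the meantime.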
// BLOCKING: Thread waits for each operation
User user = userRepository.findById(id); // Wait...
List<Order> orders = orderService.getOrders(user); // Wait...
sendEmail(user); // Wait...
return orders;
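// REACTIVE: Non-blocking, each step runs when the previous result arrives
return userRepository.findById(id) // Returns Mono<User>
.flatMap(user -> orderService.getOrders(user) // When user arrives (getOrders assumed to return Mono<List<Order>>)
.doOnNext(orders -> sendEmail(user))); // When orders arrive; user is still in scope here
// No subscribe() here: the caller (for example WebFlux) subscribes and starts the flow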
Better Resource Usage
Handle thousands of concurrent connections with a small number of threads. Threads are not parked while I/O is pending, so they stay free to serve other requests.
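To make the thread-count claim concrete, here is a minimal sketch that uses only the JDK rather than Reactor. It assumes a simulated I/O wait, where a timer callback stands in for a network response. `FewThreadsDemo`, `run`, and the delay values are hypothetical names and numbers chosen for illustration.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FewThreadsDemo {

    /** Runs simulated I/O waits on a single thread; returns {completed, threadsUsed}. */
    static int[] run(int requests, long ioDelayMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        Set<String> threadsUsed = ConcurrentHashMap.newKeySet();
        AtomicInteger completed = new AtomicInteger();
        try {
            CompletableFuture<?>[] pending = new CompletableFuture<?>[requests];
            for (int i = 0; i < requests; i++) {
                CompletableFuture<Void> done = new CompletableFuture<>();
                // Instead of Thread.sleep (which would park one thread per request),
                // register a callback that fires when the simulated I/O finishes.
                timer.schedule(() -> {
                    threadsUsed.add(Thread.currentThread().getName());
                    completed.incrementAndGet();
                    done.complete(null);
                }, ioDelayMillis, TimeUnit.MILLISECONDS);
                pending[i] = done;
            }
            // Block only here, so the demo can report its result.
            CompletableFuture.allOf(pending).join();
            return new int[] { completed.get(), threadsUsed.size() };
        } finally {
            timer.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] result = run(1_000, 50);
        // prints: 1000 requests completed on 1 thread(s)
        System.out.println(result[0] + " requests completed on " + result[1] + " thread(s)");
    }
}
```

A blocking version of the same workload would need one sleeping thread per in-flight request. Here all the waits overlap on a single timer thread, which is the idea behind an event-loop server.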
Backpressure
Control the rate of data flow. If a consumer is slow, it signals the producer to send only as much as it can handle.
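The demand signal can be sketched with the JDK's `java.util.concurrent.Flow` interfaces, which mirror the Reactive Streams contract that Reactor implements. This is not Reactor code. `BackpressureDemo` and `SlowSubscriber` are hypothetical names, and the buffer size and sleep duration are arbitrary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class BackpressureDemo {

    /** A deliberately slow consumer that asks for one item at a time. */
    static final class SlowSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // Demand signal: "I can handle one item"
        }

        @Override public void onNext(Integer item) {
            received.add(item);
            try {
                Thread.sleep(5); // Simulate slow processing
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            subscription.request(1); // Ask for the next item only when ready
        }

        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    /** Publishes 1..items to the slow subscriber and returns what it received. */
    static List<Integer> run(int items) {
        SlowSubscriber subscriber = new SlowSubscriber();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Small buffer (4 slots): submit() blocks the producer while the buffer is full.
            try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(executor, 4)) {
                publisher.subscribe(subscriber);
                for (int i = 1; i <= items; i++) {
                    publisher.submit(i);
                }
            } // close() signals onComplete once buffered items have been delivered
            subscriber.done.await();
            return subscriber.received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for completion", e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(5)); // prints: [1, 2, 3, 4, 5]
    }
}
```

The producer never outruns the consumer by more than the buffer size, because `submit` blocks once four items are waiting. Blocking the producer is only one possible strategy; dropping or buffering items are others.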
Resilience
Built-in error handling, retries, fallbacks, and timeouts.
Mono and Flux: Core Types
// Mono: 0 or 1 element
Mono<User> user = Mono.just(new User("John"));
Mono<User> empty = Mono.empty();
Mono<User> error = Mono.error(new RuntimeException("Failed"));
// Flux: 0 to N elements
Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);
Flux<String> names = Flux.fromIterable(List.of("John", "Jane"));
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1)); // Infinite!
// Creating from async sources
Mono<User> fromFuture = Mono.fromFuture(asyncOperation());
Mono<User> fromCallable = Mono.fromCallable(() -> fetchUser());
Common Operators
// Transform
Flux.just(1, 2, 3)
.map(n -> n * 2) // [2, 4, 6]
.filter(n -> n > 3) // [4, 6]
.subscribe(System.out::println);
// flatMap: Async transformations
userService.findById(id) // Mono<User>
.flatMapMany(user -> orderService.findByUser(user)) // Mono -> Flux needs flatMapMany; Mono.flatMap must return a Mono
.subscribe();
// Combine
Mono<User> user = userService.findById(id);
Mono<Profile> profile = profileService.findById(id);
Mono.zip(user, profile)
.map(tuple -> new UserWithProfile(tuple.getT1(), tuple.getT2()));
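// Error handling (order matters: each operator only sees errors from the steps above it)
Mono.just(value)
.timeout(Duration.ofSeconds(5)) // Fail an attempt that takes longer than 5s
.retry(3) // Resubscribe up to 3 times on error
.onErrorResume(e -> fallbackMono()) // Retries exhausted: switch to fallback
.onErrorReturn("default") // Fallback failed too: emit a default
.subscribe();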
Spring WebFlux
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
Reactive Controller
@RestController
@RequestMapping("/api/users")
public class UserController {
@Autowired
private UserService userService;
@GetMapping
public Flux<User> getAllUsers() {
return userService.findAll();
}
@GetMapping("/{id}")
public Mono<User> getUser(@PathVariable Long id) {
return userService.findById(id);
}
@PostMapping
public Mono<User> createUser(@RequestBody Mono<User> user) {
return user.flatMap(userService::save);
}
// Server-Sent Events (streaming)
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> streamUsers() {
return userService.findAll()
.delayElements(Duration.ofSeconds(1));
}
}
Reactive Repository
// With R2DBC (Reactive Relational Database Connectivity)
public interface UserRepository extends ReactiveCrudRepository<User, Long> {
Flux<User> findByAgeGreaterThan(int age);
Mono<User> findByEmail(String email);
}
// With MongoDB
public interface ProductRepository extends ReactiveMongoRepository<Product, String> {
Flux<Product> findByCategory(String category);
}
WebClient: Reactive HTTP
@Service
public class ApiClient {
private final WebClient webClient = WebClient.create("https://api.example.com");
public Mono<User> getUser(Long id) {
return webClient.get()
.uri("/users/{id}", id)
.retrieve()
.bodyToMono(User.class)
.timeout(Duration.ofSeconds(5))
.onErrorResume(e -> Mono.empty());
}
public Flux<Product> getProducts() {
return webClient.get()
.uri("/products")
.retrieve()
.bodyToFlux(Product.class);
}
public Mono<User> createUser(User user) {
return webClient.post()
.uri("/users")
.bodyValue(user)
.retrieve()
.bodyToMono(User.class);
}
}
When to Use Reactive
Good Fit:
- High concurrency (many simultaneous connections)
- Streaming data (real-time feeds, SSE)
- Microservices calling multiple external APIs
- I/O bound applications
Not Ideal:
- CPU-bound tasks (heavy computation)
- Simple CRUD backed by a blocking database driver (e.g. JDBC)
- Team unfamiliar with reactive patterns