1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| package cn.idea360.data.gateway.flux;
import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.springframework.http.MediaType; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Flux;
import java.nio.charset.StandardCharsets; import java.util.concurrent.CountDownLatch;
@Slf4j class FluxTest {
@DisplayName("WebClient请求stream") @Test void webClient() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); WebClient client = WebClient.create("http://localhost:2033"); Flux<String> result = client .get() .uri(uriBuilder -> uriBuilder .path("/api/flux/sink") .queryParam("startTime", "2024-08-16 13:00:00") .queryParam("endTime", "2024-08-16 13:59:59") .build()) .acceptCharset(StandardCharsets.UTF_8) .accept(MediaType.TEXT_EVENT_STREAM) .retrieve() .bodyToFlux(String.class); result .doOnComplete(() -> { log.info("Data fetching completed"); countDownLatch.countDown(); }) .subscribe(log::info, error -> log.error("Error occurred", error), () -> log.info("Subscription closed")); countDownLatch.await(); } }
|