From ba18865ff3aa6a2906c5188e0e72581e41348486 Mon Sep 17 00:00:00 2001 From: Tapan Avasthi Date: Wed, 27 Jul 2022 05:51:31 +0530 Subject: [PATCH] BAEL-4840: Read Flux into a single InputStream (#12500) Co-authored-by: Tapan Avasthi --- .../databuffer/DataBufferToInputStream.java | 94 +++++++++++++++++++ .../DataBufferToInputStreamUnitTest.java | 77 +++++++++++++++ .../src/test/resources/user-response.json | 72 ++++++++++++++ 3 files changed, 243 insertions(+) create mode 100644 spring-5-reactive-modules/spring-5-reactive-3/src/main/java/com/baeldung/databuffer/DataBufferToInputStream.java create mode 100644 spring-5-reactive-modules/spring-5-reactive-3/src/test/java/databuffer/DataBufferToInputStreamUnitTest.java create mode 100644 spring-5-reactive-modules/spring-5-reactive-3/src/test/resources/user-response.json diff --git a/spring-5-reactive-modules/spring-5-reactive-3/src/main/java/com/baeldung/databuffer/DataBufferToInputStream.java b/spring-5-reactive-modules/spring-5-reactive-3/src/main/java/com/baeldung/databuffer/DataBufferToInputStream.java new file mode 100644 index 0000000000..82f0658f51 --- /dev/null +++ b/spring-5-reactive-modules/spring-5-reactive-3/src/main/java/com/baeldung/databuffer/DataBufferToInputStream.java @@ -0,0 +1,94 @@ +package com.baeldung.databuffer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.web.reactive.function.BodyExtractors; +import org.springframework.web.reactive.function.client.WebClient; + +import reactor.core.publisher.Flux; + +import java.io.IOException; +import java.io.InputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; + +public class DataBufferToInputStream { + private static final Logger logger = LoggerFactory.getLogger(DataBufferToInputStream.class); + private static final String REQUEST_ENDPOINT = "https://gorest.co.in/public/v2/users"; + + private static WebClient getWebClient() { + WebClient.Builder webClientBuilder = WebClient.builder(); + return webClientBuilder.build(); + } + + public static InputStream getResponseAsInputStream(WebClient client, String url) throws IOException, InterruptedException { + + PipedOutputStream pipedOutputStream = new PipedOutputStream(); + PipedInputStream pipedInputStream = new PipedInputStream(1024 * 10); + pipedInputStream.connect(pipedOutputStream); + + Flux body = client.get() + .uri(url) + .exchangeToFlux(clientResponse -> { + return clientResponse.body(BodyExtractors.toDataBuffers()); + }) + .doOnError(error -> { + logger.error("error occurred while reading body", error); + }) + .doFinally(s -> { + try { + pipedOutputStream.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .doOnCancel(() -> { + logger.error("Get request is cancelled"); + }); + + DataBufferUtils.write(body, pipedOutputStream) + .log("Writing to output buffer") + .subscribe(); + return pipedInputStream; + } + + private static String readContentFromPipedInputStream(PipedInputStream stream) throws IOException { + StringBuffer contentStringBuffer = new StringBuffer(); + try { + Thread pipeReader = new Thread(() -> { + try { + contentStringBuffer.append(readContent(stream)); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + pipeReader.start(); + pipeReader.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + stream.close(); + } + + return String.valueOf(contentStringBuffer); + } + + private static String readContent(InputStream stream) throws IOException { + StringBuffer contentStringBuffer = new StringBuffer(); + byte[] tmp = new byte[stream.available()]; + int byteCount = stream.read(tmp, 0, tmp.length); + logger.info(String.format("read %d bytes from the stream\n", byteCount)); + contentStringBuffer.append(new String(tmp)); + return String.valueOf(contentStringBuffer); + } + + public static void main(String[] args) throws IOException, InterruptedException { + WebClient webClient = getWebClient(); + InputStream inputStream = getResponseAsInputStream(webClient, REQUEST_ENDPOINT); + Thread.sleep(3000); + String content = readContentFromPipedInputStream((PipedInputStream) inputStream); + logger.info("response content: \n{}", content.replace("}", "}\n")); + } +} diff --git a/spring-5-reactive-modules/spring-5-reactive-3/src/test/java/databuffer/DataBufferToInputStreamUnitTest.java b/spring-5-reactive-modules/spring-5-reactive-3/src/test/java/databuffer/DataBufferToInputStreamUnitTest.java new file mode 100644 index 0000000000..b885919bbb --- /dev/null +++ b/spring-5-reactive-modules/spring-5-reactive-3/src/test/java/databuffer/DataBufferToInputStreamUnitTest.java @@ -0,0 +1,77 @@ +package databuffer; + +import com.baeldung.databuffer.DataBufferToInputStream; + +import io.restassured.internal.util.IOUtils; + +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.springframework.http.HttpStatus; +import org.springframework.web.reactive.function.client.ClientResponse; +import org.springframework.web.reactive.function.client.ExchangeFunction; +import org.springframework.web.reactive.function.client.WebClient; + +import reactor.core.publisher.Mono; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; + +class DataBufferToInputStreamUnitTest { + private String getResponseStub() throws IOException { + InputStream inputStream = null; + BufferedReader reader = null; + String content = null; + try { + inputStream = this.getClass() + .getClassLoader() + .getResourceAsStream("user-response.json"); + if (inputStream != null) { + reader = new BufferedReader(new InputStreamReader(inputStream)); + content = reader.lines() + .collect(Collectors.joining(System.lineSeparator())); + } + } catch (Exception ex) { + throw new RuntimeException("exception caught while getting response stub"); + } finally { + reader.close(); + inputStream.close(); + } + return content; + } + + private InputStream getResponseStubAsInputStream() { + return this.getClass() + .getClassLoader() + .getResourceAsStream("user-response.json"); + } + + private WebClient getMockWebClient() throws IOException { + String content = getResponseStub(); + ClientResponse clientResponse = ClientResponse.create(HttpStatus.OK) + .header("Content-Type", "application/json") + .body(content) + .build(); + + ExchangeFunction exchangeFunction = clientRequest -> Mono.just(clientResponse); + + WebClient.Builder webClientBuilder = WebClient.builder() + .exchangeFunction(exchangeFunction); + WebClient webClient = webClientBuilder.build(); + return webClient; + } + + @Test + public void testResponseAsInputStream() throws IOException, InterruptedException { + String mockUrl = Mockito.anyString(); + WebClient mockWebClient = getMockWebClient(); + InputStream inputStream = DataBufferToInputStream.getResponseAsInputStream(mockWebClient, mockUrl); + byte[] expectedBytes = IOUtils.toByteArray(getResponseStubAsInputStream()); + byte[] actualBytes = IOUtils.toByteArray(inputStream); + assertArrayEquals(expectedBytes, actualBytes); + } +} \ No newline at end of file diff --git a/spring-5-reactive-modules/spring-5-reactive-3/src/test/resources/user-response.json b/spring-5-reactive-modules/spring-5-reactive-3/src/test/resources/user-response.json new file mode 100644 index 0000000000..33c03dc492 --- /dev/null +++ b/spring-5-reactive-modules/spring-5-reactive-3/src/test/resources/user-response.json @@ -0,0 +1,72 @@ +[ + { + "id": 2683, + "name": "Maheswar Kocchar", + "email": "maheswar_kocchar@kihn.info", + "gender": "male", + "status": "active" + }, + { + "id": 2680, + "name": "Lakshminath Khan", + "email": "lakshminath_khan@barrows-cormier.biz", + "gender": "female", + "status": "inactive" + }, + { + "id": 2679, + "name": "Tarun Arora", + "email": "tarun_arora@rolfson.net", + "gender": "female", + "status": "inactive" + }, + { + "id": 2678, + "name": "Agnivesh Dubashi", + "email": "dubashi_agnivesh@senger.name", + "gender": "male", + "status": "inactive" + }, + { + "id": 2677, + "name": "Dhanu Gowda", + "email": "gowda_dhanu@hayes.org", + "gender": "male", + "status": "active" + }, + { + "id": 2675, + "name": "Harinakshi Pilla Jr.", + "email": "pilla_jr_harinakshi@rutherford-monahan.com", + "gender": "female", + "status": "inactive" + }, + { + "id": 2673, + "name": "Kalpana Prajapat", + "email": "prajapat_kalpana@wilkinson-schaefer.net", + "gender": "female", + "status": "active" + }, + { + "id": 2672, + "name": "Chakradhar Jha", + "email": "jha_chakradhar@baumbach.info", + "gender": "male", + "status": "active" + }, + { + "id": 2670, + "name": "Divaakar Deshpande Jr.", + "email": "deshpande_jr_divaakar@mertz.info", + "gender": "female", + "status": "inactive" + }, + { + "id": 2669, + "name": "Prasanna Mehra", + "email": "prasanna_mehra@ruecker-larkin.name", + "gender": "female", + "status": "active" + } +] \ No newline at end of file