From af4ddaeb34ab56215c5437fddba22e0c5eb27ef9 Mon Sep 17 00:00:00 2001 From: k0l0ssus Date: Mon, 6 Aug 2018 21:20:02 -0400 Subject: [PATCH] Add files via upload --- jersey-client-rx/pom.xml | 21 +++++ .../samples/jerseyrx/ClientOrchestration.java | 94 +++++++++++-------- .../samples/jerseyrx/EmployeeDTO.java | 21 +++++ .../jerseyrx/ClientOrchestrationTest.java | 67 +++++++++++++ 4 files changed, 162 insertions(+), 41 deletions(-) create mode 100644 jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/EmployeeDTO.java create mode 100644 jersey-client-rx/src/test/java/com/baeldung/samples/jerseyrx/ClientOrchestrationTest.java diff --git a/jersey-client-rx/pom.xml b/jersey-client-rx/pom.xml index cc5f28c938..fb7494fab1 100644 --- a/jersey-client-rx/pom.xml +++ b/jersey-client-rx/pom.xml @@ -26,6 +26,27 @@ jersey-rx-client-rxjava2 2.27 + + com.github.tomakehurst + wiremock + 1.58 + test + + + org.junit.vintage + junit-vintage-engine + 5.2.0 + + + org.glassfish.jersey.media + jersey-media-json-jackson + 2.22 + + + com.fasterxml.jackson.jaxrs + jackson-jaxrs-json-provider + 2.4.1 + UTF-8 diff --git a/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/ClientOrchestration.java b/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/ClientOrchestration.java index 2b5c6bf965..5c145ca5d9 100644 --- a/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/ClientOrchestration.java +++ b/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/ClientOrchestration.java @@ -11,12 +11,12 @@ import java.util.logging.Level; import org.glassfish.jersey.client.rx.rxjava.RxObservableInvokerProvider; import org.glassfish.jersey.client.rx.rxjava.RxObservableInvoker; import java.util.logging.Logger; -import java.util.stream.Collectors; import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.client.InvocationCallback; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.GenericType; +import javax.ws.rs.core.MediaType; import org.glassfish.jersey.client.rx.rxjava2.RxFlowableInvoker; import org.glassfish.jersey.client.rx.rxjava2.RxFlowableInvokerProvider; import rx.Observable; @@ -28,7 +28,7 @@ import rx.Observable; public class ClientOrchestration { Client client = ClientBuilder.newClient(); - WebTarget userIdService = client.target("http://localhost:8080/serviceA/id?limit=10"); + WebTarget userIdService = client.target("http://localhost:8080/serviceA/id"); WebTarget nameService = client.target("http://localhost:8080/serviceA/{empId}/name"); WebTarget hashService = client.target("http://localhost:8080/serviceA/{comboIDandName}/hash"); @@ -46,30 +46,33 @@ public class ClientOrchestration { public void callBackOrchestrate() { logger.info("Orchestrating with the pyramid of doom"); - userIdService.request() + userIdService.request().accept(MediaType.APPLICATION_JSON) .async() - .get(new InvocationCallback>() { + .get(new InvocationCallback() { @Override - public void completed(List empIds) { + public void completed(EmployeeDTO empIdList) { + logger.info("[InvocationCallback] Got all the IDs " + empIdList.getEmpIds()); + List empIds = empIdList.getEmpIds(); CountDownLatch completionTracker = new CountDownLatch(empIds.size()); //used to keep track of the progress of the subsequent calls empIds.forEach((id) -> { //for each employee ID, get the name nameService.resolveTemplate("empId", id).request() .async() .get(new InvocationCallback() { + @Override public void completed(String response) { completionTracker.countDown(); - hashService.request().async().get(new InvocationCallback() { + hashService.resolveTemplate("comboIDandName", response + id).request().async().get(new InvocationCallback() { @Override public void completed(String response) { - logger.log(Level.INFO, "The hash output {0}", response); + logger.log(Level.INFO, "[InvocationCallback] The hash output {0}", response); } @Override public void failed(Throwable throwable) { completionTracker.countDown(); - logger.log(Level.WARNING, "An error has occurred in the hashing request step {0}", throwable.getMessage()); + logger.log(Level.WARNING, "[InvocationCallback] An error has occurred in the hashing request step {0}", throwable.getMessage()); } }); } @@ -77,14 +80,14 @@ public class ClientOrchestration { @Override public void failed(Throwable throwable) { completionTracker.countDown(); - logger.log(Level.WARNING, "An error has occurred in the username request step {0}", throwable.getMessage()); + logger.log(Level.WARNING, "[InvocationCallback] An error has occurred in the username request step {0}", throwable.getMessage()); } }); }); try { if (!completionTracker.await(10, TimeUnit.SECONDS)) { //wait for inner requests to complete in 10 seconds - logger.warning("Some requests didn't complete within the timeout"); + logger.warning("[InvocationCallback] Some requests didn't complete within the timeout"); } } catch (InterruptedException ex) { Logger.getLogger(ClientOrchestration.class.getName()).log(Level.SEVERE, null, ex); @@ -94,24 +97,25 @@ public class ClientOrchestration { @Override public void failed(Throwable throwable) { - //implement callback + logger.warning("Couldn't get the list of IDs"); } }); } public void rxOrchestrate() { logger.info("Orchestrating with a CompletionStage"); - CompletionStage> userIdStage = userIdService.request() + CompletionStage userIdStage = userIdService.request().accept(MediaType.APPLICATION_JSON) .rx() - .get(new GenericType>() { + .get(new GenericType() { }) .exceptionally((Throwable throwable) -> { - logger.warning("An error has occurred"); + logger.warning("[CompletionStage] An error has occurred"); return null; }); - userIdStage.thenAcceptAsync(listOfIds -> { - listOfIds.stream().forEach((Long id) -> { + userIdStage.thenAcceptAsync(empIdDto -> { + logger.info("[CompletionStage] Got all the IDs " + empIdDto.getEmpIds()); + empIdDto.getEmpIds().stream().forEach((Long id) -> { CompletableFuture completable = nameService.resolveTemplate("empId", id) .request() .rx() @@ -124,9 +128,9 @@ public class ClientOrchestration { .rx() .get(String.class) .toCompletableFuture() - .thenAcceptAsync(hashValue -> logger.log(Level.INFO, "The hash output {0}", hashValue)) + .thenAcceptAsync(hashValue -> logger.log(Level.INFO, "[CompletionFuture] The hash output {0}", hashValue)) .exceptionally((Throwable throwable) -> { - logger.log(Level.WARNING, "Hash computation failed for {0}", id); + logger.log(Level.WARNING, "[CompletionStage] Hash computation failed for {0}", id); return null; }); @@ -140,53 +144,61 @@ public class ClientOrchestration { public void observableJavaOrchestrate() { logger.info("Orchestrating with Observables"); - Observable> userIdObservable = userIdService.register(RxObservableInvokerProvider.class).request() + Observable userIdObservable = userIdService.register(RxObservableInvokerProvider.class).request() + .accept(MediaType.APPLICATION_JSON) .rx(RxObservableInvoker.class) - .get(new GenericType>() { + .get(new GenericType() { }); - userIdObservable.subscribe((List listOfIds) -> { - Observable.from(listOfIds).map(id - -> nameService.resolveTemplate("empId", id) + userIdObservable.subscribe((EmployeeDTO empIdList) -> { + logger.info("[Observable] Got all the IDs " + empIdList.getEmpIds()); + Observable.from(empIdList.getEmpIds()).subscribe(id + -> nameService.register(RxObservableInvokerProvider.class) + .resolveTemplate("empId", id) .request() .rx(RxObservableInvoker.class) .get(String.class) .asObservable() //gotten the name for the given empId - .doOnError(throwable -> logger.log(Level.WARNING, "An error has occurred in the username request step {0}", throwable.getMessage())) - .subscribe(userName -> hashService.resolveTemplate("comboIDandName", userName + id) + .doOnError(throwable -> logger.log(Level.WARNING, " [Observable] An error has occurred in the username request step {0}", throwable.getMessage())) + .subscribe(userName -> hashService + .register(RxObservableInvokerProvider.class) + .resolveTemplate("comboIDandName", userName + id) .request() .rx(RxObservableInvoker.class) .get(String.class) .asObservable() //gotten the hash value for empId+username - .doOnError(throwable -> logger.log(Level.WARNING, "An error has occurred in the hashing request step {0}", throwable.getMessage())) - .subscribe(hashValue -> logger.log(Level.INFO, "The hash output {0}", hashValue)))); + .doOnError(throwable -> logger.log(Level.WARNING, " [Observable]An error has occurred in the hashing request step {0}", throwable.getMessage())) + .subscribe(hashValue -> logger.log(Level.INFO, "[Observable] The hash output {0}", hashValue)))); }); } + public void flowableJavaOrchestrate() { logger.info("Orchestrating with Flowable"); - Flowable> userIdObservable = userIdService.register(RxFlowableInvokerProvider.class).request() + Flowable userIdObservable = userIdService.register(RxFlowableInvokerProvider.class) + .request() .rx(RxFlowableInvoker.class) - .get(new GenericType>() { + .get(new GenericType() { }); - Disposable subscribe = userIdObservable.subscribe((List listOfIds) -> { + Disposable subscribe = userIdObservable.subscribe((EmployeeDTO dto) -> { + List listOfIds = dto.getEmpIds(); Observable.from(listOfIds).map(id - -> nameService.resolveTemplate("empId", id) + -> nameService.register(RxFlowableInvokerProvider.class) + .resolveTemplate("empId", id) .request() - .rx(RxObservableInvoker.class) - .get(String.class) - .asObservable() //gotten the name for the given empId - .doOnError(throwable -> logger.log(Level.WARNING, "An error has occurred in the username request step {0}", throwable.getMessage())) - .subscribe(userName -> hashService.resolveTemplate("comboIDandName", userName + id) + .rx(RxFlowableInvoker.class) + .get(String.class) //gotten the name for the given empId + .doOnError(throwable -> logger.log(Level.WARNING, "[Flowable] An error has occurred in the username request step {0}", throwable.getMessage())) + .subscribe(userName -> hashService.register(RxFlowableInvokerProvider.class) + .resolveTemplate("comboIDandName", userName + id) .request() - .rx(RxObservableInvoker.class) - .get(String.class) - .asObservable() //gotten the hash value for empId+username - .doOnError(throwable -> logger.warning("An error has occurred in the hashing request step " + throwable.getMessage())) - .subscribe(hashValue -> logger.log(Level.INFO, "The hash output {0}", hashValue)))); + .rx(RxFlowableInvoker.class) + .get(String.class) //gotten the hash value for empId+username + .doOnError(throwable -> logger.warning(" [Flowable] An error has occurred in the hashing request step " + throwable.getMessage())) + .subscribe(hashValue -> logger.log(Level.INFO, "[Flowable] The hash output {0}", hashValue)))); }); } diff --git a/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/EmployeeDTO.java b/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/EmployeeDTO.java new file mode 100644 index 0000000000..ab3cfb54a2 --- /dev/null +++ b/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/EmployeeDTO.java @@ -0,0 +1,21 @@ +package com.baeldung.samples.jerseyrx; + +import java.util.List; + +/** + * + * @author SIGINT-X + */ +public class EmployeeDTO { + + private List empIds; + + public List getEmpIds() { + return empIds; + } + + public void setEmpIds(List empIds) { + this.empIds = empIds; + } + +} diff --git a/jersey-client-rx/src/test/java/com/baeldung/samples/jerseyrx/ClientOrchestrationTest.java b/jersey-client-rx/src/test/java/com/baeldung/samples/jerseyrx/ClientOrchestrationTest.java new file mode 100644 index 0000000000..4286e192c0 --- /dev/null +++ b/jersey-client-rx/src/test/java/com/baeldung/samples/jerseyrx/ClientOrchestrationTest.java @@ -0,0 +1,67 @@ +package com.baeldung.samples.jerseyrx; + +import com.github.tomakehurst.wiremock.WireMockServer; +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.configureFor; +import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; +import static com.github.tomakehurst.wiremock.client.WireMock.get; +import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; +import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * + * @author baeldung + */ +public class ClientOrchestrationTest { + + ClientOrchestration orchestrator = new ClientOrchestration(); + + String jsonIdList = "{\"empIds\":[1,2,3,4,5,6]}"; + + String[] nameList = new String[]{"n/a", "Thor", "Hulk", "BlackWidow", "BlackPanther", "TheTick", "Hawkeye"}; + + String[] hashResultList = new String[]{"roht1", "kluh2", "WodiwKcalb3", "RehtnapKclab4", "kciteht5", "eyekwah6"}; + + WireMockServer wireMockServer = new WireMockServer(); + + @Before + public void setup() { + wireMockServer.start(); + configureFor("localhost", 8080); + stubFor(get(urlEqualTo("/serviceA/id")).willReturn(aResponse().withBody(jsonIdList).withHeader("Content-Type", "application/json"))); + + stubFor(get(urlEqualTo("/serviceA/1/name")).willReturn(aResponse().withBody(nameList[1]))); + stubFor(get(urlEqualTo("/serviceA/2/name")).willReturn(aResponse().withBody(nameList[2]))); + stubFor(get(urlEqualTo("/serviceA/3/name")).willReturn(aResponse().withBody(nameList[3]))); + stubFor(get(urlEqualTo("/serviceA/4/name")).willReturn(aResponse().withBody(nameList[4]))); + stubFor(get(urlEqualTo("/serviceA/5/name")).willReturn(aResponse().withBody(nameList[5]))); + stubFor(get(urlEqualTo("/serviceA/6/name")).willReturn(aResponse().withBody(nameList[6]))); + + stubFor(get(urlEqualTo("/serviceA/Thor1/hash")).willReturn(aResponse().withBody(hashResultList[0]))); + stubFor(get(urlEqualTo("/serviceA/Hulk2/hash")).willReturn(aResponse().withBody(hashResultList[1]))); + stubFor(get(urlEqualTo("/serviceA/BlackWidow3/hash")).willReturn(aResponse().withBody(hashResultList[2]))); + stubFor(get(urlEqualTo("/serviceA/BlackPanther4/hash")).willReturn(aResponse().withBody(hashResultList[3]))); + stubFor(get(urlEqualTo("/serviceA/TheTick5/hash")).willReturn(aResponse().withBody(hashResultList[4]))); + stubFor(get(urlEqualTo("/serviceA/Hawkeye6/hash")).willReturn(aResponse().withBody(hashResultList[5]))); + + } + + @Test + public void hits() { + + orchestrator.callBackOrchestrate(); + orchestrator.rxOrchestrate(); + orchestrator.observableJavaOrchestrate(); + orchestrator.flowableJavaOrchestrate(); + + } + + @After + public void tearDown() { + + } + +}