diff --git a/jersey-client-rx/src/test/java/com/baeldung/samples/jerseyrx/ClientOrchestrationIntegrationTest.java b/jersey-client-rx/src/test/java/com/baeldung/samples/jerseyrx/ClientOrchestrationIntegrationTest.java index 10cdab7c7a..88a8d67a7d 100644 --- a/jersey-client-rx/src/test/java/com/baeldung/samples/jerseyrx/ClientOrchestrationIntegrationTest.java +++ b/jersey-client-rx/src/test/java/com/baeldung/samples/jerseyrx/ClientOrchestrationIntegrationTest.java @@ -81,11 +81,12 @@ public class ClientOrchestrationIntegrationTest { public void callBackOrchestrate() throws InterruptedException { List receivedHashValues = new ArrayList<>(); + final CountDownLatch completionTracker = new CountDownLatch(expectedHashValues.size()); // used to keep track of the progress of the subsequent calls + userIdService.request().accept(MediaType.APPLICATION_JSON).async().get(new InvocationCallback>() { @Override public void completed(List employeeIds) { logger.info("[CallbackExample] id-service result: {}", employeeIds); - CountDownLatch completionTracker = new CountDownLatch(employeeIds.size()); // used to keep track of the progress of the subsequent calls employeeIds.forEach((id) -> { // for each employee ID, get the name nameService.resolveTemplate("userId", id).request().async().get(new InvocationCallback() { @@ -94,17 +95,16 @@ public class ClientOrchestrationIntegrationTest { public void completed(String response) { logger.info("[CallbackExample] name-service result: {}", response); - completionTracker.countDown(); hashService.resolveTemplate("rawValue", response + id).request().async().get(new InvocationCallback() { @Override public void completed(String response) { logger.info("[CallbackExample] hash-service result: {}", response); receivedHashValues.add(response); + completionTracker.countDown(); } @Override public void failed(Throwable throwable) { - completionTracker.countDown(); logger.warn("[CallbackExample] An error has occurred in the hashing request step!", throwable); } }); @@ -112,21 +112,11 @@ public class ClientOrchestrationIntegrationTest { @Override public void failed(Throwable throwable) { - completionTracker.countDown(); logger.warn("[CallbackExample] An error has occurred in the username request step!", throwable); } }); }); - try { - // wait for inner requests to complete in 10 seconds - if (!completionTracker.await(10, TimeUnit.SECONDS)) { - logger.warn("[CallbackExample] Some requests didn't complete within the timeout"); - } - } catch (InterruptedException e) { - logger.error("Interrupted!", e); - } - } @Override @@ -136,7 +126,14 @@ public class ClientOrchestrationIntegrationTest { }); // wait for async calls to complete - Thread.sleep(1000); + try { + // wait for inner requests to complete in 10 seconds + if (!completionTracker.await(10, TimeUnit.SECONDS)) { + logger.warn("[CallbackExample] Some requests didn't complete within the timeout"); + } + } catch (InterruptedException e) { + logger.error("Interrupted!", e); + } assertThat(receivedHashValues).containsAll(expectedHashValues); } @@ -145,6 +142,8 @@ public class ClientOrchestrationIntegrationTest { public void rxOrchestrate() throws InterruptedException { List receivedHashValues = new ArrayList<>(); + final CountDownLatch completionTracker = new CountDownLatch(expectedHashValues.size()); // used to keep track of the progress of the subsequent calls + CompletionStage> userIdStage = userIdService.request().accept(MediaType.APPLICATION_JSON).rx().get(new GenericType>() { }).exceptionally((throwable) -> { logger.warn("[CompletionStageExample] An error has occurred"); @@ -161,8 +160,10 @@ public class ClientOrchestrationIntegrationTest { hashService.resolveTemplate("rawValue", userName + id).request().rx().get(String.class).toCompletableFuture().thenAcceptAsync(hashValue -> { logger.info("[CompletionStageExample] hash-service result: {}", hashValue); receivedHashValues.add(hashValue); + completionTracker.countDown(); }).exceptionally((throwable) -> { logger.warn("[CompletionStageExample] Hash computation failed for {}", id); + completionTracker.countDown(); return null; }); @@ -172,7 +173,14 @@ public class ClientOrchestrationIntegrationTest { }); // wait for async calls to complete - Thread.sleep(1000); + try { + // wait for inner requests to complete in 10 seconds + if (!completionTracker.await(10, TimeUnit.SECONDS)) { + logger.warn("[CallbackExample] Some requests didn't complete within the timeout"); + } + } catch (InterruptedException e) { + logger.error("Interrupted!", e); + } assertThat(receivedHashValues).containsAll(expectedHashValues); } @@ -181,13 +189,15 @@ public class ClientOrchestrationIntegrationTest { public void observableJavaOrchestrate() throws InterruptedException { List receivedHashValues = new ArrayList<>(); + final CountDownLatch completionTracker = new CountDownLatch(expectedHashValues.size()); // used to keep track of the progress of the subsequent calls + Observable> observableUserIdService = userIdService.register(RxObservableInvokerProvider.class).request().accept(MediaType.APPLICATION_JSON).rx(RxObservableInvoker.class).get(new GenericType>() { }).asObservable(); observableUserIdService.subscribe((List employeeIds) -> { logger.info("[ObservableExample] id-service result: {}", employeeIds); Observable.from(employeeIds).subscribe(id -> nameService.register(RxObservableInvokerProvider.class).resolveTemplate("userId", id).request().rx(RxObservableInvoker.class).get(String.class).asObservable() // gotten the name for the given - // userId + // userId .doOnError((throwable) -> { logger.warn("[ObservableExample] An error has occurred in the username request step {}", throwable.getMessage()); }).subscribe(userName -> { @@ -197,14 +207,22 @@ public class ClientOrchestrationIntegrationTest { .doOnError((throwable) -> { logger.warn("[ObservableExample] An error has occurred in the hashing request step {}", throwable.getMessage()); }).subscribe(hashValue -> { - logger.info("[ObservableExample] hash-service result: {}", hashValue); - receivedHashValues.add(hashValue); - }); + logger.info("[ObservableExample] hash-service result: {}", hashValue); + receivedHashValues.add(hashValue); + completionTracker.countDown(); + }); })); }); // wait for async calls to complete - Thread.sleep(1000); + try { + // wait for inner requests to complete in 10 seconds + if (!completionTracker.await(10, TimeUnit.SECONDS)) { + logger.warn("[CallbackExample] Some requests didn't complete within the timeout"); + } + } catch (InterruptedException e) { + logger.error("Interrupted!", e); + } assertThat(receivedHashValues).containsAll(expectedHashValues); } @@ -213,6 +231,8 @@ public class ClientOrchestrationIntegrationTest { public void flowableJavaOrchestrate() throws InterruptedException { List receivedHashValues = new ArrayList<>(); + final CountDownLatch completionTracker = new CountDownLatch(expectedHashValues.size()); // used to keep track of the progress of the subsequent calls + Flowable> userIdFlowable = userIdService.register(RxFlowableInvokerProvider.class).request().rx(RxFlowableInvoker.class).get(new GenericType>() { }); @@ -223,20 +243,28 @@ public class ClientOrchestrationIntegrationTest { .doOnError((throwable) -> { logger.warn("[FlowableExample] An error has occurred in the username request step {}", throwable.getMessage()); }).subscribe(userName -> { - logger.info("[FlowableExample] name-service result: {}", userName); - hashService.register(RxFlowableInvokerProvider.class).resolveTemplate("rawValue", userName + id).request().rx(RxFlowableInvoker.class).get(String.class) // gotten the hash value for userId+username - .doOnError((throwable) -> { - logger.warn(" [FlowableExample] An error has occurred in the hashing request step!", throwable); - }).subscribe(hashValue -> { - logger.info("[FlowableExample] hash-service result: {}", hashValue); - receivedHashValues.add(hashValue); - }); - }); + logger.info("[FlowableExample] name-service result: {}", userName); + hashService.register(RxFlowableInvokerProvider.class).resolveTemplate("rawValue", userName + id).request().rx(RxFlowableInvoker.class).get(String.class) // gotten the hash value for userId+username + .doOnError((throwable) -> { + logger.warn(" [FlowableExample] An error has occurred in the hashing request step!", throwable); + }).subscribe(hashValue -> { + logger.info("[FlowableExample] hash-service result: {}", hashValue); + receivedHashValues.add(hashValue); + completionTracker.countDown(); + }); + }); }); }); // wait for async calls to complete - Thread.sleep(1000); + try { + // wait for inner requests to complete in 10 seconds + if (!completionTracker.await(10, TimeUnit.SECONDS)) { + logger.warn("[CallbackExample] Some requests didn't complete within the timeout"); + } + } catch (InterruptedException e) { + logger.error("Interrupted!", e); + } assertThat(receivedHashValues).containsAll(expectedHashValues); }