BAEL-6611 add example code for completablefuture vs future vs rxjava comparison

This commit is contained in:
Sam Gardner 2023-07-25 14:05:05 +01:00
parent 508a5e414b
commit 686f39fc38
6 changed files with 130 additions and 0 deletions

View File

@ -25,6 +25,11 @@
<version>${avaitility.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava3</groupId>
<artifactId>rxjava</artifactId>
<version>3.1.6</version>
</dependency>
</dependencies>
<build>

View File

@ -0,0 +1,10 @@
package com.baeldung.concurrent.futurevscompletablefuturevsrxjava;
import java.util.concurrent.Callable;
public class ObjectCallable implements Callable<TestObject> {
@Override
public TestObject call() throws Exception {
return new TestObject();
}
}

View File

@ -0,0 +1,10 @@
package com.baeldung.concurrent.futurevscompletablefuturevsrxjava;
public class ObjectHydrator {
public TestObject hydrateTestObject(TestObject testObject){
testObject.setDataPointTwo(20);
return testObject;
}
}

View File

@ -0,0 +1,11 @@
package com.baeldung.concurrent.futurevscompletablefuturevsrxjava;
import java.util.function.Supplier;
public class ObjectSupplier implements Supplier<TestObject> {
@Override
public TestObject get() {
return new TestObject();
}
}

View File

@ -0,0 +1,32 @@
package com.baeldung.concurrent.futurevscompletablefuturevsrxjava;
public class TestObject {
private int dataPointOne;
private int dataPointTwo;
public TestObject() {
dataPointOne = 10;
}
public int getDataPointOne() {
return dataPointOne;
}
public void setDataPointOne(int dataPointOne) {
this.dataPointOne = dataPointOne;
}
public int getDataPointTwo() {
return dataPointTwo;
}
public void setDataPointTwo(int dataPointTwo) {
this.dataPointTwo = dataPointTwo;
}
@Override
public String toString() {
return "TestObject{" + "dataPointOne=" + dataPointOne + ", dataPointTwo=" + dataPointTwo + '}';
}
}

View File

@ -0,0 +1,62 @@
package com.baeldung.concurrent.futurevscompletablefuturevsrxjava;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.junit.Test;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subjects.PublishSubject;
public class FutureVsCompletableFutureVsRxJavaUnitTest {
@Test
public void whenRetrievingObjectWithBasicFuture_thenExpectOnlySingleDataPointSet() throws ExecutionException, InterruptedException {
ExecutorService exec = Executors.newSingleThreadExecutor();
Future<TestObject> future = exec.submit(new ObjectCallable());
TestObject retrievedObject = future.get();
assertEquals(10, retrievedObject.getDataPointOne());
assertEquals(0, retrievedObject.getDataPointTwo());
}
@Test
public void givenACompletableFuture_whenHydratingObjectAfterRetrieval_thenExpectBothDataPointsSet() throws ExecutionException, InterruptedException {
ExecutorService exec = Executors.newSingleThreadExecutor();
ObjectHydrator objectHydrator = new ObjectHydrator();
CompletableFuture<TestObject> future = CompletableFuture.supplyAsync(new ObjectSupplier(), exec)
.thenApply(objectHydrator::hydrateTestObject);
TestObject retrievedObject = future.get();
assertEquals(10, retrievedObject.getDataPointOne());
assertEquals(20, retrievedObject.getDataPointTwo());
}
@Test
public void givenAnObservable_whenRequestingData_thenItIsRetrieved() {
ObjectHydrator objectHydrator = new ObjectHydrator();
Observable<TestObject> observable = Observable.fromCallable(new ObjectCallable()).map(objectHydrator::hydrateTestObject);
observable.subscribe(System.out::println);
}
@Test
public void givenAnObservable_whenPushedData_thenItIsReceived() {
PublishSubject<Integer> source = PublishSubject.create();
Observable<Integer> observable = source.observeOn(Schedulers.computation());
observable.subscribe(System.out::println, (throwable) -> System.out.println("Error"), () -> System.out.println("Done"));
source.onNext(1);
source.onNext(2);
source.onNext(3);
source.onComplete();
}
}