BAEL-3858 Java Exchanger introduction (#9000)

This commit is contained in:
Dhrubajyoti Bhattacharjee 2020-05-15 22:17:08 +02:00 committed by GitHub
parent 821ed435f1
commit bccc9c836e
2 changed files with 146 additions and 0 deletions

View File

@ -0,0 +1,83 @@
package com.baeldung.exchanger;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutionException;
import org.junit.Test;
import static java.util.concurrent.CompletableFuture.runAsync;
public class ExchangerPipeLineManualTest {
private static final int BUFFER_SIZE = 100;
@Test
public void givenData_whenPassedThrough_thenCorrect() throws InterruptedException, ExecutionException {
Exchanger<Queue<String>> readerExchanger = new Exchanger<>();
Exchanger<Queue<String>> writerExchanger = new Exchanger<>();
int counter = 0;
Runnable reader = () -> {
Queue<String> readerBuffer = new ConcurrentLinkedQueue<>();
while (true) {
readerBuffer.add(UUID.randomUUID().toString());
if (readerBuffer.size() >= BUFFER_SIZE) {
try {
readerBuffer = readerExchanger.exchange(readerBuffer);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
}
};
Runnable processor = () -> {
Queue<String> processorBuffer = new ConcurrentLinkedQueue<>();
Queue<String> writterBuffer = new ConcurrentLinkedQueue<>();
try {
processorBuffer = readerExchanger.exchange(processorBuffer);
while (true) {
writterBuffer.add(processorBuffer.poll());
if (processorBuffer.isEmpty()) {
try {
processorBuffer = readerExchanger.exchange(processorBuffer);
writterBuffer = writerExchanger.exchange(writterBuffer);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
};
Runnable writer = () -> {
Queue<String> writterBuffer = new ConcurrentLinkedQueue<>();
try {
writterBuffer = writerExchanger.exchange(writterBuffer);
while (true) {
System.out.println(writterBuffer.poll());
if (writterBuffer.isEmpty()) {
writterBuffer = writerExchanger.exchange(writterBuffer);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
};
CompletableFuture.allOf(runAsync(reader), runAsync(processor), runAsync(writer)).get();
}
}

View File

@ -0,0 +1,63 @@
package com.baeldung.exchanger;
import static org.junit.Assert.assertEquals;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutionException;
import org.junit.Test;
import static java.util.concurrent.CompletableFuture.runAsync;
public class ExchangerUnitTest {
@Test
public void givenThreads_whenMessageExchanged_thenCorrect() {
Exchanger<String> exchanger = new Exchanger<>();
Runnable taskA = () -> {
try {
String message = exchanger.exchange("from A");
assertEquals("from B", message);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
};
Runnable taskB = () -> {
try {
String message = exchanger.exchange("from B");
assertEquals("from A", message);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
};
CompletableFuture.allOf(runAsync(taskA), runAsync(taskB)).join();
}
@Test
public void givenThread_WhenExchangedMessage_thenCorrect() throws InterruptedException, ExecutionException {
Exchanger<String> exchanger = new Exchanger<>();
Runnable runner = () -> {
try {
String message = exchanger.exchange("from runner");
assertEquals("to runner", message);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
};
CompletableFuture<Void> result = CompletableFuture.runAsync(runner);
String msg = exchanger.exchange("to runner");
assertEquals("from runner", msg);
result.join();
}
}