BAEL-4466 Smart Batching in Java (#14190)
* Tutorial code for BAEL-4466 Smart Batching in Java * Revert "Tutorial code for BAEL-4466 Smart Batching in Java" This reverts commit d8d4fa7a42b0806d1f0a64d53425a3a4337cb79d. * Tutorial code for BAEL-4466 Smart Batching in Java
This commit is contained in:
parent
91b0d6478b
commit
718e1998cb
@ -0,0 +1,72 @@
|
|||||||
|
package com.baeldung.smartbatching;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.nio.file.Paths;
|
||||||
|
import java.nio.file.StandardOpenOption;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author KPentaris
|
||||||
|
* @date 07/06/2023
|
||||||
|
* @project design-patterns-behavioral-2
|
||||||
|
*/
|
||||||
|
public class BatchingApp {
|
||||||
|
|
||||||
|
static void simpleProcessing() throws Exception {
|
||||||
|
final Path testPath = Paths.get("./testio.txt");
|
||||||
|
testPath.toFile().createNewFile();
|
||||||
|
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(100);
|
||||||
|
Set<Future> futures = new HashSet<>();
|
||||||
|
for (int i = 0; i < 50000; i++) {
|
||||||
|
futures.add(executorService.submit(() -> {
|
||||||
|
try {
|
||||||
|
Files.write(testPath, Collections.singleton(Thread.currentThread().getName()), StandardOpenOption.APPEND);
|
||||||
|
} catch (IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
long start = System.currentTimeMillis();
|
||||||
|
for (Future future : futures) {
|
||||||
|
future.get();
|
||||||
|
}
|
||||||
|
System.out.println("Time: " + (System.currentTimeMillis() - start));
|
||||||
|
executorService.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
static void batchedProcessing() throws Exception {
|
||||||
|
final Path testPath = Paths.get("./testio.txt");
|
||||||
|
testPath.toFile().createNewFile();
|
||||||
|
SmartBatcher batcher = new SmartBatcher(10, strings -> {
|
||||||
|
List<String> content = new ArrayList<>(strings);
|
||||||
|
content.add("-----Batch Operation-----");
|
||||||
|
try {
|
||||||
|
Files.write(testPath, content, StandardOpenOption.APPEND);
|
||||||
|
} catch (IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
for (int i = 0; i < 50000; i++) {
|
||||||
|
batcher.submit(Thread.currentThread().getName() + "-1");
|
||||||
|
}
|
||||||
|
long start = System.currentTimeMillis();
|
||||||
|
while (!batcher.finished()) {
|
||||||
|
Thread.sleep(10);
|
||||||
|
}
|
||||||
|
System.out.println("Time: " + (System.currentTimeMillis() - start));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
// simpleProcessing();
|
||||||
|
batchedProcessing();
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,59 @@
|
|||||||
|
package com.baeldung.smartbatching;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Queue;
|
||||||
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author KPentaris
|
||||||
|
* @date 07/06/2023
|
||||||
|
* @project design-patterns-behavioral-2
|
||||||
|
*/
|
||||||
|
public class MicroBatcher {
|
||||||
|
Queue<String> tasksQueue = new ConcurrentLinkedQueue<>();
|
||||||
|
Thread batchThread;
|
||||||
|
int executionThreshold;
|
||||||
|
int timeoutThreshold;
|
||||||
|
boolean working = false;
|
||||||
|
|
||||||
|
MicroBatcher(int executionThreshold, int timeoutThreshold, Consumer<List<String>> executionLogic) {
|
||||||
|
batchThread = new Thread(batchHandling(executionLogic));
|
||||||
|
batchThread.setDaemon(true);
|
||||||
|
batchThread.start();
|
||||||
|
this.executionThreshold = executionThreshold;
|
||||||
|
this.timeoutThreshold = timeoutThreshold;
|
||||||
|
}
|
||||||
|
|
||||||
|
void submit(String task) {
|
||||||
|
tasksQueue.add(task);
|
||||||
|
}
|
||||||
|
|
||||||
|
Runnable batchHandling(Consumer<List<String>> executionLogic) {
|
||||||
|
return () -> {
|
||||||
|
while (!batchThread.isInterrupted()) {
|
||||||
|
long startTime = System.currentTimeMillis();
|
||||||
|
while (tasksQueue.size() < executionThreshold && (System.currentTimeMillis() - startTime) < timeoutThreshold) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(100);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
return; // exit the external loop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
List<String> tasks = new ArrayList<>(executionThreshold);
|
||||||
|
while (tasksQueue.size() > 0 && tasks.size() < executionThreshold) {
|
||||||
|
tasks.add(tasksQueue.poll());
|
||||||
|
}
|
||||||
|
working = true;
|
||||||
|
executionLogic.accept(tasks);
|
||||||
|
working = false;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean finished() {
|
||||||
|
return tasksQueue.isEmpty() && !working;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,52 @@
|
|||||||
|
package com.baeldung.smartbatching;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author KPentaris
|
||||||
|
* @date 07/06/2023
|
||||||
|
* @project design-patterns-behavioral-2
|
||||||
|
*/
|
||||||
|
public class SmartBatcher {
|
||||||
|
BlockingQueue<String> tasksQueue = new LinkedBlockingQueue<>();
|
||||||
|
Thread batchThread;
|
||||||
|
int executionThreshold;
|
||||||
|
boolean working = false;
|
||||||
|
|
||||||
|
SmartBatcher(int executionThreshold, Consumer<List<String>> executionLogic) {
|
||||||
|
batchThread = new Thread(batchHandling(executionLogic));
|
||||||
|
batchThread.setDaemon(true);
|
||||||
|
batchThread.start();
|
||||||
|
this.executionThreshold = executionThreshold;
|
||||||
|
}
|
||||||
|
|
||||||
|
void submit(String task) {
|
||||||
|
tasksQueue.add(task);
|
||||||
|
}
|
||||||
|
|
||||||
|
Runnable batchHandling(Consumer<List<String>> executionLogic) {
|
||||||
|
return () -> {
|
||||||
|
while (!batchThread.isInterrupted()) {
|
||||||
|
List<String> tasks = new ArrayList<>(executionThreshold);
|
||||||
|
while (tasksQueue.drainTo(tasks, executionThreshold) == 0) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(100);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
return; // exit the external loop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
working = true;
|
||||||
|
executionLogic.accept(tasks);
|
||||||
|
working = false;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean finished() {
|
||||||
|
return tasksQueue.isEmpty() && !working;
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user