diff --git a/patterns-modules/design-patterns-behavioral-2/src/main/java/com/baeldung/smartbatching/BatchingApp.java b/patterns-modules/design-patterns-behavioral-2/src/main/java/com/baeldung/smartbatching/BatchingApp.java new file mode 100644 index 0000000000..8f95d5cbee --- /dev/null +++ b/patterns-modules/design-patterns-behavioral-2/src/main/java/com/baeldung/smartbatching/BatchingApp.java @@ -0,0 +1,72 @@ +package com.baeldung.smartbatching; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; + +/** + * @author KPentaris + * @date 07/06/2023 + * @project design-patterns-behavioral-2 + */ +public class BatchingApp { + + static void simpleProcessing() throws Exception { + final Path testPath = Paths.get("./testio.txt"); + testPath.toFile().createNewFile(); + ScheduledExecutorService executorService = Executors.newScheduledThreadPool(100); + Set futures = new HashSet<>(); + for (int i = 0; i < 50000; i++) { + futures.add(executorService.submit(() -> { + try { + Files.write(testPath, Collections.singleton(Thread.currentThread().getName()), StandardOpenOption.APPEND); + } catch (IOException e) { + e.printStackTrace(); + } + })); + } + long start = System.currentTimeMillis(); + for (Future future : futures) { + future.get(); + } + System.out.println("Time: " + (System.currentTimeMillis() - start)); + executorService.shutdown(); + } + + static void batchedProcessing() throws Exception { + final Path testPath = Paths.get("./testio.txt"); + testPath.toFile().createNewFile(); + SmartBatcher batcher = new SmartBatcher(10, strings -> { + List content = new ArrayList<>(strings); + content.add("-----Batch Operation-----"); + try { + Files.write(testPath, content, StandardOpenOption.APPEND); + } catch (IOException e) { + e.printStackTrace(); + } + }); + for (int i = 0; i < 50000; i++) { + batcher.submit(Thread.currentThread().getName() + "-1"); + } + long start = System.currentTimeMillis(); + while (!batcher.finished()) { + Thread.sleep(10); + } + System.out.println("Time: " + (System.currentTimeMillis() - start)); + } + + public static void main(String[] args) throws Exception { +// simpleProcessing(); + batchedProcessing(); + } +} diff --git a/patterns-modules/design-patterns-behavioral-2/src/main/java/com/baeldung/smartbatching/MicroBatcher.java b/patterns-modules/design-patterns-behavioral-2/src/main/java/com/baeldung/smartbatching/MicroBatcher.java new file mode 100644 index 0000000000..b6d171246a --- /dev/null +++ b/patterns-modules/design-patterns-behavioral-2/src/main/java/com/baeldung/smartbatching/MicroBatcher.java @@ -0,0 +1,59 @@ +package com.baeldung.smartbatching; + +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.function.Consumer; + +/** + * @author KPentaris + * @date 07/06/2023 + * @project design-patterns-behavioral-2 + */ +public class MicroBatcher { + Queue tasksQueue = new ConcurrentLinkedQueue<>(); + Thread batchThread; + int executionThreshold; + int timeoutThreshold; + boolean working = false; + + MicroBatcher(int executionThreshold, int timeoutThreshold, Consumer> executionLogic) { + batchThread = new Thread(batchHandling(executionLogic)); + batchThread.setDaemon(true); + batchThread.start(); + this.executionThreshold = executionThreshold; + this.timeoutThreshold = timeoutThreshold; + } + + void submit(String task) { + tasksQueue.add(task); + } + + Runnable batchHandling(Consumer> executionLogic) { + return () -> { + while (!batchThread.isInterrupted()) { + long startTime = System.currentTimeMillis(); + while (tasksQueue.size() < executionThreshold && (System.currentTimeMillis() - startTime) < timeoutThreshold) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + return; // exit the external loop + } + } + List tasks = new ArrayList<>(executionThreshold); + while (tasksQueue.size() > 0 && tasks.size() < executionThreshold) { + tasks.add(tasksQueue.poll()); + } + working = true; + executionLogic.accept(tasks); + working = false; + } + }; + } + + boolean finished() { + return tasksQueue.isEmpty() && !working; + } + +} diff --git a/patterns-modules/design-patterns-behavioral-2/src/main/java/com/baeldung/smartbatching/SmartBatcher.java b/patterns-modules/design-patterns-behavioral-2/src/main/java/com/baeldung/smartbatching/SmartBatcher.java new file mode 100644 index 0000000000..d4aab8e986 --- /dev/null +++ b/patterns-modules/design-patterns-behavioral-2/src/main/java/com/baeldung/smartbatching/SmartBatcher.java @@ -0,0 +1,52 @@ +package com.baeldung.smartbatching; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.Consumer; + +/** + * @author KPentaris + * @date 07/06/2023 + * @project design-patterns-behavioral-2 + */ +public class SmartBatcher { + BlockingQueue tasksQueue = new LinkedBlockingQueue<>(); + Thread batchThread; + int executionThreshold; + boolean working = false; + + SmartBatcher(int executionThreshold, Consumer> executionLogic) { + batchThread = new Thread(batchHandling(executionLogic)); + batchThread.setDaemon(true); + batchThread.start(); + this.executionThreshold = executionThreshold; + } + + void submit(String task) { + tasksQueue.add(task); + } + + Runnable batchHandling(Consumer> executionLogic) { + return () -> { + while (!batchThread.isInterrupted()) { + List tasks = new ArrayList<>(executionThreshold); + while (tasksQueue.drainTo(tasks, executionThreshold) == 0) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + return; // exit the external loop + } + } + working = true; + executionLogic.accept(tasks); + working = false; + } + }; + } + + boolean finished() { + return tasksQueue.isEmpty() && !working; + } +}