diff --git a/maven-api-impl/src/main/java/org/apache/maven/internal/impl/util/PhasingExecutor.java b/maven-api-impl/src/main/java/org/apache/maven/internal/impl/util/PhasingExecutor.java index 85d4ed5976..da7b9f7d64 100644 --- a/maven-api-impl/src/main/java/org/apache/maven/internal/impl/util/PhasingExecutor.java +++ b/maven-api-impl/src/main/java/org/apache/maven/internal/impl/util/PhasingExecutor.java @@ -20,47 +20,146 @@ package org.apache.maven.internal.impl.util; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * The phasing executor is a simple executor that allows to execute tasks in parallel - * and wait for all tasks to be executed before closing the executor. The tasks that are - * currently being executed are allowed to submit new tasks while the executor is closed. - * The executor implements {@link AutoCloseable} to allow using the executor with - * a try-with-resources statement. + * The phasing executor allows executing tasks in parallel and waiting for all tasks + * to be executed before fully closing the executor. Tasks can be submitted even after + * the close method has been called, allowing for use with try-with-resources. + * The {@link #phase()} method can be used to submit tasks and wait for them to be + * executed without closing the executor. * - * The {@link #phase()} method can be used to submit tasks and wait for them to be executed - * without closing the executor. + *
Example usage: + *
+ * try (PhasingExecutor executor = createExecutor()) { + * try (var phase = executor.phase()) { + * executor.execute(() -> { /* task 1 */ }); + * executor.execute(() -> { /* task 2 */ }); + * More tasks... + * } This will wait for all tasks in this phase to complete + * + * You can have multiple phases + * try (var anotherPhase = executor.phase()) { + * executor.execute(() -> { /* another task */ }); + * } + * } The executor will wait for all tasks to complete before shutting down + **/ public class PhasingExecutor implements Executor, AutoCloseable { + private static final AtomicInteger ID = new AtomicInteger(0); + private static final Logger LOGGER = LoggerFactory.getLogger(PhasingExecutor.class); + private final ExecutorService executor; - private final Phaser phaser = new Phaser(); + private final AtomicBoolean shutdownInitiated = new AtomicBoolean(false); + private final AtomicBoolean inPhase = new AtomicBoolean(false); + private final AtomicInteger activeTaskCount = new AtomicInteger(0); + private final AtomicInteger completedTaskCount = new AtomicInteger(0); + private final int id = ID.incrementAndGet(); + private final ReentrantLock lock = new ReentrantLock(); + private final Condition taskCompletionCondition = lock.newCondition(); public PhasingExecutor(ExecutorService executor) { this.executor = executor; - this.phaser.register(); + log("[{}][general] PhasingExecutor created."); } @Override public void execute(Runnable command) { - phaser.register(); - executor.submit(() -> { + activeTaskCount.incrementAndGet(); + log("[{}][task] Task submitted. Active tasks: {}", activeTaskCount.get()); + executor.execute(() -> { try { + log("[{}][task] Task executing. Active tasks: {}", activeTaskCount.get()); command.run(); } finally { - phaser.arriveAndDeregister(); + lock.lock(); + try { + completedTaskCount.incrementAndGet(); + activeTaskCount.decrementAndGet(); + log("[{}][task] Task completed. Active tasks: {}", activeTaskCount.get()); + taskCompletionCondition.signalAll(); + if (activeTaskCount.get() == 0 && shutdownInitiated.get()) { + log("[{}][task] Last task completed. Initiating executor shutdown."); + executor.shutdown(); + } + } finally { + lock.unlock(); + } } }); } public AutoCloseable phase() { - phaser.register(); - return () -> phaser.awaitAdvance(phaser.arriveAndDeregister()); + if (inPhase.getAndSet(true)) { + throw new IllegalStateException("Already in a phase"); + } + int phaseNumber = completedTaskCount.get(); + log("[{}][phase] Entering phase {}. Active tasks: {}", phaseNumber, activeTaskCount.get()); + return () -> { + try { + int tasksAtPhaseStart = completedTaskCount.get(); + log("[{}][phase] Closing phase {}. Waiting for all tasks to complete.", phaseNumber); + lock.lock(); + try { + while (activeTaskCount.get() > 0 + && completedTaskCount.get() - tasksAtPhaseStart < activeTaskCount.get()) { + taskCompletionCondition.await(100, TimeUnit.MILLISECONDS); + } + } finally { + lock.unlock(); + } + log("[{}][phase] Phase {} completed. Total completed tasks: {}", phaseNumber, completedTaskCount.get()); + } catch (InterruptedException e) { + log("[{}][phase] Phase {} was interrupted.", phaseNumber); + Thread.currentThread().interrupt(); + throw new RuntimeException("Phase interrupted", e); + } finally { + inPhase.set(false); + } + }; } @Override public void close() { - phaser.arriveAndAwaitAdvance(); - executor.shutdownNow(); + log("[{}][close] Closing PhasingExecutor. Active tasks: {}", activeTaskCount.get()); + if (shutdownInitiated.getAndSet(true)) { + log("[{}][close] Shutdown already initiated. Returning."); + return; + } + + lock.lock(); + try { + while (activeTaskCount.get() > 0) { + log("[{}][close] Waiting for {} active tasks to complete.", activeTaskCount.get()); + taskCompletionCondition.await(100, TimeUnit.MILLISECONDS); + } + } catch (InterruptedException e) { + log("[{}][close] Interrupted while waiting for tasks to complete."); + Thread.currentThread().interrupt(); + } finally { + lock.unlock(); + log("[{}][close] All tasks completed. Shutting down executor."); + executor.shutdown(); + } + log("[{}][close] PhasingExecutor closed. Total completed tasks: {}", completedTaskCount.get()); + } + + private void log(String message) { + LOGGER.debug(message, id); + } + + private void log(String message, Object o1) { + LOGGER.debug(message, id, o1); + } + + private void log(String message, Object o1, Object o2) { + LOGGER.debug(message, id, o1, o2); } }