Wrap Executor in TaskExecutor to never reject (#13622)

Make it so rejected tasks are execute right away on the caller thread.
Users of the API shouldn't have to worry about rejections when we don't
expose any upper limit to the task count that we put on the executor that
would help in sizing a queue for the executor.
This commit is contained in:
Armin Braun 2024-07-31 19:34:35 +02:00 committed by GitHub
parent b4a8810b7a
commit a1816e3f65
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 35 additions and 1 deletions

View File

@ -28,6 +28,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -53,7 +54,20 @@ public final class TaskExecutor {
* @param executor the executor to be used for running tasks concurrently
*/
public TaskExecutor(Executor executor) {
this.executor = Objects.requireNonNull(executor, "Executor is null");
Objects.requireNonNull(executor, "Executor is null");
this.executor =
r -> {
try {
executor.execute(r);
} catch (
@SuppressWarnings("unused")
RejectedExecutionException rejectedExecutionException) {
// execute directly on the current thread in case of rejection to ensure a rejecting
// executor only reduces parallelism and does not
// result in failure
r.run();
}
};
}
/**

View File

@ -363,4 +363,24 @@ public class TestTaskExecutor extends LuceneTestCase {
assertEquals(0, throwable.getSuppressed().length);
assertEquals(throwingTask, executedTasks.get());
}
public void testTaskRejectionDoesNotFailExecution() throws Exception {
try (ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1))) {
final int taskCount = 1000; // enough tasks to cause queuing and rejections on the executor
final ArrayList<Callable<Void>> callables = new ArrayList<>(taskCount);
final AtomicInteger executedTasks = new AtomicInteger(0);
for (int i = 0; i < taskCount; i++) {
callables.add(
() -> {
executedTasks.incrementAndGet();
return null;
});
}
final TaskExecutor taskExecutor = new TaskExecutor(threadPoolExecutor);
var res = taskExecutor.invokeAll(callables);
assertEquals(taskCount, res.size());
assertEquals(taskCount, executedTasks.get());
}
}
}