mirror of https://github.com/apache/druid.git
PrioritizedExecutorService: Properly wrap on direct calls to "execute". (#11956)
Usually, "execute" is called by methods defined in the superclass AbstractExecutorService, and the passed-in Runnable has been wrapped by newTaskFor inside a PrioritizedListenableFutureTask. But this method can also be called directly, and if so, the same wrapping is necessary for the delegate to get a Runnable that can be entered into a priority queue with the others.
This commit is contained in:
parent
a4cb1de87a
commit
d6507c9428
|
@ -192,7 +192,11 @@ public class PrioritizedExecutorService extends AbstractExecutorService implemen
|
|||
@Override
|
||||
public void execute(final Runnable runnable)
|
||||
{
|
||||
delegate.execute(runnable);
|
||||
if (runnable instanceof PrioritizedListenableFutureTask) {
|
||||
delegate.execute(runnable);
|
||||
} else {
|
||||
delegate.execute(newTaskFor(runnable, null));
|
||||
}
|
||||
}
|
||||
|
||||
public int getQueueSize()
|
||||
|
|
|
@ -41,6 +41,7 @@ import java.util.concurrent.ExecutionException;
|
|||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@RunWith(Parameterized.class)
|
||||
public class PrioritizedExecutorServiceTest
|
||||
|
@ -182,6 +183,41 @@ public class PrioritizedExecutorServiceTest
|
|||
Assert.assertEquals(expected, ImmutableList.copyOf(order));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExecuteRegularRunnable()
|
||||
{
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
Assert.assertThrows(
|
||||
"Class does not implemented PrioritizedRunnable",
|
||||
IllegalArgumentException.class,
|
||||
() -> exec.execute(latch::countDown)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExecutePrioritizedRunnable() throws InterruptedException
|
||||
{
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
exec.execute(
|
||||
new PrioritizedRunnable()
|
||||
{
|
||||
@Override
|
||||
public int getPriority()
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
);
|
||||
latch.await();
|
||||
}
|
||||
|
||||
// Make sure entries are processed FIFO
|
||||
@Test
|
||||
public void testOrderedExecutionEqualPriorityRunnable() throws ExecutionException, InterruptedException
|
||||
|
|
Loading…
Reference in New Issue