adjust broker parallel merge to help managed blocking be more well behaved (#14427)

This commit is contained in:
Clint Wylie 2023-06-15 00:10:31 -07:00 committed by GitHub
parent 5314db9f85
commit ca116cf886
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 98 additions and 97 deletions

View File

@ -800,6 +800,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
if (hasTimeout) {
final long thisTimeoutNanos = timeoutAtNanos - System.nanoTime();
if (thisTimeoutNanos < 0) {
item = null;
throw new QueryTimeoutException("QueuePusher timed out offering data");
}
success = queue.offer(item, thisTimeoutNanos, TimeUnit.NANOSECONDS);
@ -826,6 +827,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
ForkJoinPool.managedBlock(this);
}
catch (InterruptedException e) {
this.item = null;
throw new RuntimeException("Failed to offer result to output queue", e);
}
}
@ -930,6 +932,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
return batchYielder;
}
catch (InterruptedException e) {
batchYielder = Yielders.done(null, null);
throw new RuntimeException("Failed to load initial batch of results", e);
}
}
@ -1073,7 +1076,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
@Override
public boolean isReleasable()
{
return resultBatch != null && !resultBatch.isDrained();
return yielder.isDone() || (resultBatch != null && !resultBatch.isDrained());
}
@Override
@ -1144,6 +1147,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
if (hasTimeout) {
final long thisTimeoutNanos = timeoutAtNanos - System.nanoTime();
if (thisTimeoutNanos < 0) {
resultBatch = ResultBatch.TERMINAL;
throw new QueryTimeoutException("BlockingQueue cursor timed out waiting for data");
}
resultBatch = queue.poll(thisTimeoutNanos, TimeUnit.NANOSECONDS);

View File

@ -24,12 +24,11 @@ import org.apache.druid.common.guava.CombiningSequence;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.utils.JvmUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.util.ArrayList;
@ -63,9 +62,6 @@ public class ParallelMergeCombiningSequenceTest
private ForkJoinPool pool;
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Before
public void setup()
{
@ -243,14 +239,14 @@ public class ParallelMergeCombiningSequenceTest
}
@Test
public void testNone() throws Exception
public void testNone() throws IOException
{
List<Sequence<IntPair>> input = new ArrayList<>();
assertResult(input);
}
@Test
public void testEmpties() throws Exception
public void testEmpties() throws IOException
{
// below min threshold, so will merge serially
List<Sequence<IntPair>> input = new ArrayList<>();
@ -266,7 +262,7 @@ public class ParallelMergeCombiningSequenceTest
}
@Test
public void testEmptiesAndNonEmpty() throws Exception
public void testEmptiesAndNonEmpty() throws IOException
{
// below min threshold, so will merge serially
List<Sequence<IntPair>> input = new ArrayList<>();
@ -285,8 +281,9 @@ public class ParallelMergeCombiningSequenceTest
input.add(nonBlockingSequence(5));
assertResult(input);
}
@Test
public void testMergeCombineMetricsAccumulatorNPEOnBadExecutorPool() throws Exception
public void testMergeCombineMetricsAccumulatorNPEOnBadExecutorPool()
{
// below min threshold, so will merge serially
List<Sequence<IntPair>> input = new ArrayList<>();
@ -299,16 +296,19 @@ public class ParallelMergeCombiningSequenceTest
(t, e) -> LOG.error(e, "Unhandled exception in thread [%s]", t),
true
);
expectedException.expect(QueryTimeoutException.class);
expectedException.expectMessage(
"Query did not complete within configured timeout period"
Throwable t = Assert.assertThrows(
QueryTimeoutException.class,
() -> assertResultWithCustomPool(input, 10, 20, reportMetrics -> {}, customBadPool)
);
Assert.assertEquals(
"Query did not complete within configured timeout period. You can increase query timeout or tune the performance of query.",
t.getMessage()
);
assertResultWithCustomPool(input, 10, 20, reportMetrics -> {}, customBadPool);
customBadPool.shutdown();
}
@Test
public void testAllInSingleBatch() throws Exception
public void testAllInSingleBatch() throws IOException
{
// below min threshold, so will merge serially
List<Sequence<IntPair>> input = new ArrayList<>();
@ -343,7 +343,7 @@ public class ParallelMergeCombiningSequenceTest
}
@Test
public void testAllInSingleYield() throws Exception
public void testAllInSingleYield() throws IOException
{
// below min threshold, so will merge serially
List<Sequence<IntPair>> input = new ArrayList<>();
@ -379,7 +379,7 @@ public class ParallelMergeCombiningSequenceTest
@Test
public void testMultiBatchMultiYield() throws Exception
public void testMultiBatchMultiYield() throws IOException
{
// below min threshold, so will merge serially
List<Sequence<IntPair>> input = new ArrayList<>();
@ -412,7 +412,7 @@ public class ParallelMergeCombiningSequenceTest
}
@Test
public void testMixedSingleAndMultiYield() throws Exception
public void testMixedSingleAndMultiYield() throws IOException
{
// below min threshold, so will merge serially
List<Sequence<IntPair>> input = new ArrayList<>();
@ -431,9 +431,8 @@ public class ParallelMergeCombiningSequenceTest
}
@Test
public void testLongerSequencesJustForFun() throws Exception
public void testLongerSequencesJustForFun() throws IOException
{
List<Sequence<IntPair>> input = new ArrayList<>();
input.add(nonBlockingSequence(10_000));
input.add(nonBlockingSequence(9_001));
@ -457,23 +456,21 @@ public class ParallelMergeCombiningSequenceTest
}
@Test
public void testExceptionOnInputSequenceRead() throws Exception
public void testExceptionOnInputSequenceRead()
{
List<Sequence<IntPair>> input = new ArrayList<>();
input.add(explodingSequence(15));
input.add(nonBlockingSequence(25));
expectedException.expect(RuntimeException.class);
expectedException.expectMessage(
"exploded"
);
assertException(input);
Throwable t = Assert.assertThrows(RuntimeException.class, () -> assertException(input));
Assert.assertEquals("exploded", t.getMessage());
Assert.assertTrue(pool.awaitQuiescence(1, TimeUnit.SECONDS));
Assert.assertTrue(pool.isQuiescent());
}
@Test
public void testExceptionOnInputSequenceRead2() throws Exception
public void testExceptionOnInputSequenceRead2()
{
List<Sequence<IntPair>> input = new ArrayList<>();
input.add(nonBlockingSequence(5));
@ -481,15 +478,14 @@ public class ParallelMergeCombiningSequenceTest
input.add(explodingSequence(11));
input.add(nonBlockingSequence(12));
expectedException.expect(RuntimeException.class);
expectedException.expectMessage(
"exploded"
);
assertException(input);
Throwable t = Assert.assertThrows(RuntimeException.class, () -> assertException(input));
Assert.assertEquals("exploded", t.getMessage());
Assert.assertTrue(pool.awaitQuiescence(1, TimeUnit.SECONDS));
Assert.assertTrue(pool.isQuiescent());
}
@Test
public void testExceptionFirstResultFromSequence() throws Exception
public void testExceptionFirstResultFromSequence()
{
List<Sequence<IntPair>> input = new ArrayList<>();
input.add(explodingSequence(0));
@ -497,15 +493,14 @@ public class ParallelMergeCombiningSequenceTest
input.add(nonBlockingSequence(2));
input.add(nonBlockingSequence(2));
expectedException.expect(RuntimeException.class);
expectedException.expectMessage(
"exploded"
);
assertException(input);
Throwable t = Assert.assertThrows(RuntimeException.class, () -> assertException(input));
Assert.assertEquals("exploded", t.getMessage());
Assert.assertTrue(pool.awaitQuiescence(1, TimeUnit.SECONDS));
Assert.assertTrue(pool.isQuiescent());
}
@Test
public void testExceptionFirstResultFromMultipleSequence() throws Exception
public void testExceptionFirstResultFromMultipleSequence()
{
List<Sequence<IntPair>> input = new ArrayList<>();
input.add(explodingSequence(0));
@ -515,15 +510,14 @@ public class ParallelMergeCombiningSequenceTest
input.add(nonBlockingSequence(2));
input.add(nonBlockingSequence(2));
expectedException.expect(RuntimeException.class);
expectedException.expectMessage(
"exploded"
);
assertException(input);
Throwable t = Assert.assertThrows(RuntimeException.class, () -> assertException(input));
Assert.assertEquals("exploded", t.getMessage());
Assert.assertTrue(pool.awaitQuiescence(1, TimeUnit.SECONDS));
Assert.assertTrue(pool.isQuiescent());
}
@Test
public void testTimeoutExceptionDueToStalledInput() throws Exception
public void testTimeoutExceptionDueToStalledInput()
{
final int someSize = 2048;
List<Sequence<IntPair>> input = new ArrayList<>();
@ -531,21 +525,33 @@ public class ParallelMergeCombiningSequenceTest
input.add(nonBlockingSequence(someSize));
input.add(nonBlockingSequence(someSize));
input.add(blockingSequence(someSize, 400, 500, 1, 500, true));
expectedException.expect(QueryTimeoutException.class);
expectedException.expectMessage("Query did not complete within configured timeout period. " +
"You can increase query timeout or tune the performance of query");
assertException(
input,
ParallelMergeCombiningSequence.DEFAULT_TASK_SMALL_BATCH_NUM_ROWS,
ParallelMergeCombiningSequence.DEFAULT_TASK_INITIAL_YIELD_NUM_ROWS,
1000L,
0
Throwable t = Assert.assertThrows(
QueryTimeoutException.class,
() -> assertException(
input,
ParallelMergeCombiningSequence.DEFAULT_TASK_SMALL_BATCH_NUM_ROWS,
ParallelMergeCombiningSequence.DEFAULT_TASK_INITIAL_YIELD_NUM_ROWS,
1000L,
0
)
);
Assert.assertEquals("Query did not complete within configured timeout period. " +
"You can increase query timeout or tune the performance of query.", t.getMessage());
// these tests when run in java 11, 17 and maybe others in between 8 and 20 don't seem to correctly clean up the
// pool, however this behavior is flaky and doesn't always happen so we can't definitively assert that the pool is
// or isn't
if (JvmUtils.majorVersion() >= 20 || JvmUtils.majorVersion() < 9) {
Assert.assertTrue(pool.awaitQuiescence(3, TimeUnit.SECONDS));
// good result, we want the pool to always be idle if an exception occurred during processing
Assert.assertTrue(pool.isQuiescent());
}
}
@Test
public void testTimeoutExceptionDueToStalledReader() throws Exception
public void testTimeoutExceptionDueToStalledReader()
{
final int someSize = 2048;
List<Sequence<IntPair>> input = new ArrayList<>();
@ -554,14 +560,15 @@ public class ParallelMergeCombiningSequenceTest
input.add(nonBlockingSequence(someSize));
input.add(nonBlockingSequence(someSize));
expectedException.expect(QueryTimeoutException.class);
expectedException.expectMessage("Query did not complete within configured timeout period. " +
"You can increase query timeout or tune the performance of query");
assertException(input, 8, 64, 1000, 500);
Throwable t = Assert.assertThrows(QueryTimeoutException.class, () -> assertException(input, 8, 64, 1000, 1500));
Assert.assertEquals("Query did not complete within configured timeout period. " +
"You can increase query timeout or tune the performance of query.", t.getMessage());
Assert.assertTrue(pool.awaitQuiescence(1, TimeUnit.SECONDS));
Assert.assertTrue(pool.isQuiescent());
}
@Test
public void testGracefulCloseOfYielderCancelsPool() throws Exception
public void testGracefulCloseOfYielderCancelsPool() throws IOException
{
List<Sequence<IntPair>> input = new ArrayList<>();
@ -585,8 +592,7 @@ public class ParallelMergeCombiningSequenceTest
});
}
private void assertResult(List<Sequence<IntPair>> sequences) throws InterruptedException, IOException
private void assertResult(List<Sequence<IntPair>> sequences) throws IOException
{
assertResult(
sequences,
@ -597,7 +603,7 @@ public class ParallelMergeCombiningSequenceTest
}
private void assertResult(List<Sequence<IntPair>> sequences, int batchSize, int yieldAfter)
throws InterruptedException, IOException
throws IOException
{
assertResult(
sequences,
@ -662,13 +668,14 @@ public class ParallelMergeCombiningSequenceTest
// (though shouldn't actually matter even if it was...)
Assert.assertFalse(parallelMergeCombineSequence.getCancellationGizmo().isCancelled());
}
private void assertResult(
List<Sequence<IntPair>> sequences,
int batchSize,
int yieldAfter,
Consumer<ParallelMergeCombiningSequence.MergeCombineMetrics> reporter
)
throws InterruptedException, IOException
throws IOException
{
final CombiningSequence<IntPair> combiningSequence = CombiningSequence.create(
new MergeSequence<>(INT_PAIR_ORDERING, Sequences.simple(sequences)),
@ -706,10 +713,8 @@ public class ParallelMergeCombiningSequenceTest
Assert.assertTrue(combiningYielder.isDone());
Assert.assertTrue(parallelMergeCombineYielder.isDone());
while (pool.getRunningThreadCount() > 0) {
Thread.sleep(100);
}
Assert.assertEquals(0, pool.getRunningThreadCount());
Assert.assertTrue(pool.awaitQuiescence(1, TimeUnit.SECONDS));
Assert.assertTrue(pool.isQuiescent());
combiningYielder.close();
parallelMergeCombineYielder.close();
// cancellation trigger should not be set if sequence was fully yielded and close is called
@ -724,7 +729,7 @@ public class ParallelMergeCombiningSequenceTest
int closeYielderAfter,
Consumer<ParallelMergeCombiningSequence.MergeCombineMetrics> reporter
)
throws InterruptedException, IOException
throws IOException
{
final CombiningSequence<IntPair> combiningSequence = CombiningSequence.create(
new MergeSequence<>(INT_PAIR_ORDERING, Sequences.simple(sequences)),
@ -769,15 +774,10 @@ public class ParallelMergeCombiningSequenceTest
}
// trying to next the yielder creates sadness for you
final String expectedExceptionMsg = "Already closed";
try {
Assert.assertEquals(combiningYielder.get(), parallelMergeCombineYielder.get());
parallelMergeCombineYielder.next(parallelMergeCombineYielder.get());
// this should explode so the contradictory next statement should not be reached
Assert.assertTrue(false);
}
catch (RuntimeException rex) {
Assert.assertEquals(expectedExceptionMsg, rex.getMessage());
}
Assert.assertEquals(combiningYielder.get(), parallelMergeCombineYielder.get());
final Yielder<IntPair> finalYielder = parallelMergeCombineYielder;
Throwable t = Assert.assertThrows(RuntimeException.class, () -> finalYielder.next(finalYielder.get()));
Assert.assertEquals(expectedExceptionMsg, t.getMessage());
// cancellation gizmo of sequence should be cancelled, and also should contain our expected message
Assert.assertTrue(parallelMergeCombineSequence.getCancellationGizmo().isCancelled());
@ -786,16 +786,14 @@ public class ParallelMergeCombiningSequenceTest
parallelMergeCombineSequence.getCancellationGizmo().getRuntimeException().getMessage()
);
while (pool.getRunningThreadCount() > 0) {
Thread.sleep(100);
}
Assert.assertEquals(0, pool.getRunningThreadCount());
Assert.assertTrue(pool.awaitQuiescence(1, TimeUnit.SECONDS));
Assert.assertTrue(pool.isQuiescent());
Assert.assertFalse(combiningYielder.isDone());
Assert.assertFalse(parallelMergeCombineYielder.isDone());
}
private void assertException(List<Sequence<IntPair>> sequences) throws Exception
private void assertException(List<Sequence<IntPair>> sequences) throws Throwable
{
assertException(
sequences,
@ -813,9 +811,9 @@ public class ParallelMergeCombiningSequenceTest
long timeout,
int readDelayMillis
)
throws Exception
throws Throwable
{
try {
Throwable t = Assert.assertThrows(Exception.class, () -> {
final ParallelMergeCombiningSequence<IntPair> parallelMergeCombineSequence = new ParallelMergeCombiningSequence<>(
pool,
sequences,
@ -844,17 +842,16 @@ public class ParallelMergeCombiningSequenceTest
parallelMergeCombineYielder = parallelMergeCombineYielder.next(parallelMergeCombineYielder.get());
}
parallelMergeCombineYielder.close();
}
catch (Exception ex) {
sequences.forEach(sequence -> {
if (sequence instanceof ExplodingSequence) {
ExplodingSequence exploder = (ExplodingSequence) sequence;
Assert.assertEquals(1, exploder.getCloseCount());
}
});
LOG.warn(ex, "exception:");
throw ex;
}
});
sequences.forEach(sequence -> {
if (sequence instanceof ExplodingSequence) {
ExplodingSequence exploder = (ExplodingSequence) sequence;
Assert.assertEquals(1, exploder.getCloseCount());
}
});
LOG.warn(t, "exception:");
throw t;
}
public static class IntPair extends Pair<Integer, Integer>