From ca116cf88639cbe8f4705d43b4730dc550272111 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Thu, 15 Jun 2023 00:10:31 -0700 Subject: [PATCH] adjust broker parallel merge to help managed blocking be more well behaved (#14427) --- .../guava/ParallelMergeCombiningSequence.java | 6 +- .../ParallelMergeCombiningSequenceTest.java | 189 +++++++++--------- 2 files changed, 98 insertions(+), 97 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java b/processing/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java index 1f09fc9163d..1df3c9a9dc1 100644 --- a/processing/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java +++ b/processing/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java @@ -800,6 +800,7 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase if (hasTimeout) { final long thisTimeoutNanos = timeoutAtNanos - System.nanoTime(); if (thisTimeoutNanos < 0) { + item = null; throw new QueryTimeoutException("QueuePusher timed out offering data"); } success = queue.offer(item, thisTimeoutNanos, TimeUnit.NANOSECONDS); @@ -826,6 +827,7 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase ForkJoinPool.managedBlock(this); } catch (InterruptedException e) { + this.item = null; throw new RuntimeException("Failed to offer result to output queue", e); } } @@ -930,6 +932,7 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase return batchYielder; } catch (InterruptedException e) { + batchYielder = Yielders.done(null, null); throw new RuntimeException("Failed to load initial batch of results", e); } } @@ -1073,7 +1076,7 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase @Override public boolean isReleasable() { - return resultBatch != null && !resultBatch.isDrained(); + return yielder.isDone() || (resultBatch != null && !resultBatch.isDrained()); } @Override @@ -1144,6 +1147,7 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase if (hasTimeout) { final long thisTimeoutNanos = timeoutAtNanos - System.nanoTime(); if (thisTimeoutNanos < 0) { + resultBatch = ResultBatch.TERMINAL; throw new QueryTimeoutException("BlockingQueue cursor timed out waiting for data"); } resultBatch = queue.poll(thisTimeoutNanos, TimeUnit.NANOSECONDS); diff --git a/processing/src/test/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequenceTest.java b/processing/src/test/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequenceTest.java index 49bd361402c..ca34c364dca 100644 --- a/processing/src/test/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequenceTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequenceTest.java @@ -24,12 +24,11 @@ import org.apache.druid.common.guava.CombiningSequence; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.QueryTimeoutException; +import org.apache.druid.utils.JvmUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import java.io.IOException; import java.util.ArrayList; @@ -63,9 +62,6 @@ public class ParallelMergeCombiningSequenceTest private ForkJoinPool pool; - @Rule - public ExpectedException expectedException = ExpectedException.none(); - @Before public void setup() { @@ -243,14 +239,14 @@ public class ParallelMergeCombiningSequenceTest } @Test - public void testNone() throws Exception + public void testNone() throws IOException { List> input = new ArrayList<>(); assertResult(input); } @Test - public void testEmpties() throws Exception + public void testEmpties() throws IOException { // below min threshold, so will merge serially List> input = new ArrayList<>(); @@ -266,7 +262,7 @@ public class ParallelMergeCombiningSequenceTest } @Test - public void testEmptiesAndNonEmpty() throws Exception + public void testEmptiesAndNonEmpty() throws IOException { // below min threshold, so will merge serially List> input = new ArrayList<>(); @@ -285,8 +281,9 @@ public class ParallelMergeCombiningSequenceTest input.add(nonBlockingSequence(5)); assertResult(input); } + @Test - public void testMergeCombineMetricsAccumulatorNPEOnBadExecutorPool() throws Exception + public void testMergeCombineMetricsAccumulatorNPEOnBadExecutorPool() { // below min threshold, so will merge serially List> input = new ArrayList<>(); @@ -299,16 +296,19 @@ public class ParallelMergeCombiningSequenceTest (t, e) -> LOG.error(e, "Unhandled exception in thread [%s]", t), true ); - expectedException.expect(QueryTimeoutException.class); - expectedException.expectMessage( - "Query did not complete within configured timeout period" + Throwable t = Assert.assertThrows( + QueryTimeoutException.class, + () -> assertResultWithCustomPool(input, 10, 20, reportMetrics -> {}, customBadPool) + ); + Assert.assertEquals( + "Query did not complete within configured timeout period. You can increase query timeout or tune the performance of query.", + t.getMessage() ); - assertResultWithCustomPool(input, 10, 20, reportMetrics -> {}, customBadPool); customBadPool.shutdown(); } @Test - public void testAllInSingleBatch() throws Exception + public void testAllInSingleBatch() throws IOException { // below min threshold, so will merge serially List> input = new ArrayList<>(); @@ -343,7 +343,7 @@ public class ParallelMergeCombiningSequenceTest } @Test - public void testAllInSingleYield() throws Exception + public void testAllInSingleYield() throws IOException { // below min threshold, so will merge serially List> input = new ArrayList<>(); @@ -379,7 +379,7 @@ public class ParallelMergeCombiningSequenceTest @Test - public void testMultiBatchMultiYield() throws Exception + public void testMultiBatchMultiYield() throws IOException { // below min threshold, so will merge serially List> input = new ArrayList<>(); @@ -412,7 +412,7 @@ public class ParallelMergeCombiningSequenceTest } @Test - public void testMixedSingleAndMultiYield() throws Exception + public void testMixedSingleAndMultiYield() throws IOException { // below min threshold, so will merge serially List> input = new ArrayList<>(); @@ -431,9 +431,8 @@ public class ParallelMergeCombiningSequenceTest } @Test - public void testLongerSequencesJustForFun() throws Exception + public void testLongerSequencesJustForFun() throws IOException { - List> input = new ArrayList<>(); input.add(nonBlockingSequence(10_000)); input.add(nonBlockingSequence(9_001)); @@ -457,23 +456,21 @@ public class ParallelMergeCombiningSequenceTest } @Test - public void testExceptionOnInputSequenceRead() throws Exception + public void testExceptionOnInputSequenceRead() { List> input = new ArrayList<>(); input.add(explodingSequence(15)); input.add(nonBlockingSequence(25)); - - expectedException.expect(RuntimeException.class); - expectedException.expectMessage( - "exploded" - ); - assertException(input); + Throwable t = Assert.assertThrows(RuntimeException.class, () -> assertException(input)); + Assert.assertEquals("exploded", t.getMessage()); + Assert.assertTrue(pool.awaitQuiescence(1, TimeUnit.SECONDS)); + Assert.assertTrue(pool.isQuiescent()); } @Test - public void testExceptionOnInputSequenceRead2() throws Exception + public void testExceptionOnInputSequenceRead2() { List> input = new ArrayList<>(); input.add(nonBlockingSequence(5)); @@ -481,15 +478,14 @@ public class ParallelMergeCombiningSequenceTest input.add(explodingSequence(11)); input.add(nonBlockingSequence(12)); - expectedException.expect(RuntimeException.class); - expectedException.expectMessage( - "exploded" - ); - assertException(input); + Throwable t = Assert.assertThrows(RuntimeException.class, () -> assertException(input)); + Assert.assertEquals("exploded", t.getMessage()); + Assert.assertTrue(pool.awaitQuiescence(1, TimeUnit.SECONDS)); + Assert.assertTrue(pool.isQuiescent()); } @Test - public void testExceptionFirstResultFromSequence() throws Exception + public void testExceptionFirstResultFromSequence() { List> input = new ArrayList<>(); input.add(explodingSequence(0)); @@ -497,15 +493,14 @@ public class ParallelMergeCombiningSequenceTest input.add(nonBlockingSequence(2)); input.add(nonBlockingSequence(2)); - expectedException.expect(RuntimeException.class); - expectedException.expectMessage( - "exploded" - ); - assertException(input); + Throwable t = Assert.assertThrows(RuntimeException.class, () -> assertException(input)); + Assert.assertEquals("exploded", t.getMessage()); + Assert.assertTrue(pool.awaitQuiescence(1, TimeUnit.SECONDS)); + Assert.assertTrue(pool.isQuiescent()); } @Test - public void testExceptionFirstResultFromMultipleSequence() throws Exception + public void testExceptionFirstResultFromMultipleSequence() { List> input = new ArrayList<>(); input.add(explodingSequence(0)); @@ -515,15 +510,14 @@ public class ParallelMergeCombiningSequenceTest input.add(nonBlockingSequence(2)); input.add(nonBlockingSequence(2)); - expectedException.expect(RuntimeException.class); - expectedException.expectMessage( - "exploded" - ); - assertException(input); + Throwable t = Assert.assertThrows(RuntimeException.class, () -> assertException(input)); + Assert.assertEquals("exploded", t.getMessage()); + Assert.assertTrue(pool.awaitQuiescence(1, TimeUnit.SECONDS)); + Assert.assertTrue(pool.isQuiescent()); } @Test - public void testTimeoutExceptionDueToStalledInput() throws Exception + public void testTimeoutExceptionDueToStalledInput() { final int someSize = 2048; List> input = new ArrayList<>(); @@ -531,21 +525,33 @@ public class ParallelMergeCombiningSequenceTest input.add(nonBlockingSequence(someSize)); input.add(nonBlockingSequence(someSize)); input.add(blockingSequence(someSize, 400, 500, 1, 500, true)); - expectedException.expect(QueryTimeoutException.class); - expectedException.expectMessage("Query did not complete within configured timeout period. " + - "You can increase query timeout or tune the performance of query"); - assertException( - input, - ParallelMergeCombiningSequence.DEFAULT_TASK_SMALL_BATCH_NUM_ROWS, - ParallelMergeCombiningSequence.DEFAULT_TASK_INITIAL_YIELD_NUM_ROWS, - 1000L, - 0 + Throwable t = Assert.assertThrows( + QueryTimeoutException.class, + () -> assertException( + input, + ParallelMergeCombiningSequence.DEFAULT_TASK_SMALL_BATCH_NUM_ROWS, + ParallelMergeCombiningSequence.DEFAULT_TASK_INITIAL_YIELD_NUM_ROWS, + 1000L, + 0 + ) ); + Assert.assertEquals("Query did not complete within configured timeout period. " + + "You can increase query timeout or tune the performance of query.", t.getMessage()); + + + // these tests when run in java 11, 17 and maybe others in between 8 and 20 don't seem to correctly clean up the + // pool, however this behavior is flaky and doesn't always happen so we can't definitively assert that the pool is + // or isn't + if (JvmUtils.majorVersion() >= 20 || JvmUtils.majorVersion() < 9) { + Assert.assertTrue(pool.awaitQuiescence(3, TimeUnit.SECONDS)); + // good result, we want the pool to always be idle if an exception occurred during processing + Assert.assertTrue(pool.isQuiescent()); + } } @Test - public void testTimeoutExceptionDueToStalledReader() throws Exception + public void testTimeoutExceptionDueToStalledReader() { final int someSize = 2048; List> input = new ArrayList<>(); @@ -554,14 +560,15 @@ public class ParallelMergeCombiningSequenceTest input.add(nonBlockingSequence(someSize)); input.add(nonBlockingSequence(someSize)); - expectedException.expect(QueryTimeoutException.class); - expectedException.expectMessage("Query did not complete within configured timeout period. " + - "You can increase query timeout or tune the performance of query"); - assertException(input, 8, 64, 1000, 500); + Throwable t = Assert.assertThrows(QueryTimeoutException.class, () -> assertException(input, 8, 64, 1000, 1500)); + Assert.assertEquals("Query did not complete within configured timeout period. " + + "You can increase query timeout or tune the performance of query.", t.getMessage()); + Assert.assertTrue(pool.awaitQuiescence(1, TimeUnit.SECONDS)); + Assert.assertTrue(pool.isQuiescent()); } @Test - public void testGracefulCloseOfYielderCancelsPool() throws Exception + public void testGracefulCloseOfYielderCancelsPool() throws IOException { List> input = new ArrayList<>(); @@ -585,8 +592,7 @@ public class ParallelMergeCombiningSequenceTest }); } - - private void assertResult(List> sequences) throws InterruptedException, IOException + private void assertResult(List> sequences) throws IOException { assertResult( sequences, @@ -597,7 +603,7 @@ public class ParallelMergeCombiningSequenceTest } private void assertResult(List> sequences, int batchSize, int yieldAfter) - throws InterruptedException, IOException + throws IOException { assertResult( sequences, @@ -662,13 +668,14 @@ public class ParallelMergeCombiningSequenceTest // (though shouldn't actually matter even if it was...) Assert.assertFalse(parallelMergeCombineSequence.getCancellationGizmo().isCancelled()); } + private void assertResult( List> sequences, int batchSize, int yieldAfter, Consumer reporter ) - throws InterruptedException, IOException + throws IOException { final CombiningSequence combiningSequence = CombiningSequence.create( new MergeSequence<>(INT_PAIR_ORDERING, Sequences.simple(sequences)), @@ -706,10 +713,8 @@ public class ParallelMergeCombiningSequenceTest Assert.assertTrue(combiningYielder.isDone()); Assert.assertTrue(parallelMergeCombineYielder.isDone()); - while (pool.getRunningThreadCount() > 0) { - Thread.sleep(100); - } - Assert.assertEquals(0, pool.getRunningThreadCount()); + Assert.assertTrue(pool.awaitQuiescence(1, TimeUnit.SECONDS)); + Assert.assertTrue(pool.isQuiescent()); combiningYielder.close(); parallelMergeCombineYielder.close(); // cancellation trigger should not be set if sequence was fully yielded and close is called @@ -724,7 +729,7 @@ public class ParallelMergeCombiningSequenceTest int closeYielderAfter, Consumer reporter ) - throws InterruptedException, IOException + throws IOException { final CombiningSequence combiningSequence = CombiningSequence.create( new MergeSequence<>(INT_PAIR_ORDERING, Sequences.simple(sequences)), @@ -769,15 +774,10 @@ public class ParallelMergeCombiningSequenceTest } // trying to next the yielder creates sadness for you final String expectedExceptionMsg = "Already closed"; - try { - Assert.assertEquals(combiningYielder.get(), parallelMergeCombineYielder.get()); - parallelMergeCombineYielder.next(parallelMergeCombineYielder.get()); - // this should explode so the contradictory next statement should not be reached - Assert.assertTrue(false); - } - catch (RuntimeException rex) { - Assert.assertEquals(expectedExceptionMsg, rex.getMessage()); - } + Assert.assertEquals(combiningYielder.get(), parallelMergeCombineYielder.get()); + final Yielder finalYielder = parallelMergeCombineYielder; + Throwable t = Assert.assertThrows(RuntimeException.class, () -> finalYielder.next(finalYielder.get())); + Assert.assertEquals(expectedExceptionMsg, t.getMessage()); // cancellation gizmo of sequence should be cancelled, and also should contain our expected message Assert.assertTrue(parallelMergeCombineSequence.getCancellationGizmo().isCancelled()); @@ -786,16 +786,14 @@ public class ParallelMergeCombiningSequenceTest parallelMergeCombineSequence.getCancellationGizmo().getRuntimeException().getMessage() ); - while (pool.getRunningThreadCount() > 0) { - Thread.sleep(100); - } - Assert.assertEquals(0, pool.getRunningThreadCount()); + Assert.assertTrue(pool.awaitQuiescence(1, TimeUnit.SECONDS)); + Assert.assertTrue(pool.isQuiescent()); Assert.assertFalse(combiningYielder.isDone()); Assert.assertFalse(parallelMergeCombineYielder.isDone()); } - private void assertException(List> sequences) throws Exception + private void assertException(List> sequences) throws Throwable { assertException( sequences, @@ -813,9 +811,9 @@ public class ParallelMergeCombiningSequenceTest long timeout, int readDelayMillis ) - throws Exception + throws Throwable { - try { + Throwable t = Assert.assertThrows(Exception.class, () -> { final ParallelMergeCombiningSequence parallelMergeCombineSequence = new ParallelMergeCombiningSequence<>( pool, sequences, @@ -844,17 +842,16 @@ public class ParallelMergeCombiningSequenceTest parallelMergeCombineYielder = parallelMergeCombineYielder.next(parallelMergeCombineYielder.get()); } parallelMergeCombineYielder.close(); - } - catch (Exception ex) { - sequences.forEach(sequence -> { - if (sequence instanceof ExplodingSequence) { - ExplodingSequence exploder = (ExplodingSequence) sequence; - Assert.assertEquals(1, exploder.getCloseCount()); - } - }); - LOG.warn(ex, "exception:"); - throw ex; - } + }); + + sequences.forEach(sequence -> { + if (sequence instanceof ExplodingSequence) { + ExplodingSequence exploder = (ExplodingSequence) sequence; + Assert.assertEquals(1, exploder.getCloseCount()); + } + }); + LOG.warn(t, "exception:"); + throw t; } public static class IntPair extends Pair