From d7a15be9bc29dfa3863dce76a8b93770a62cab7f Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Wed, 8 Feb 2023 16:33:37 +0530 Subject: [PATCH] Add assertions for counters from reports (#13726) Adds assertions for counters to MSQ unit tests --- .../apache/druid/msq/exec/MSQInsertTest.java | 201 +++++++++++++++++- .../apache/druid/msq/exec/MSQReplaceTest.java | 127 ++++++++++- .../apache/druid/msq/exec/MSQSelectTest.java | 194 ++++++++++++++++- ...ilder.java => CounterSnapshotMatcher.java} | 40 +++- .../apache/druid/msq/test/MSQTestBase.java | 34 +-- 5 files changed, 551 insertions(+), 45 deletions(-) rename extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/{CounterSnapshotBuilder.java => CounterSnapshotMatcher.java} (50%) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java index f0719490363..efd9e5e9d9f 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java @@ -30,7 +30,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.msq.indexing.error.ColumnNameRestrictedFault; import org.apache.druid.msq.indexing.error.RowTooLargeFault; -import org.apache.druid.msq.test.CounterSnapshotBuilder; +import org.apache.druid.msq.test.CounterSnapshotMatcher; import org.apache.druid.msq.test.MSQTestBase; import org.apache.druid.msq.test.MSQTestFileUtils; import org.apache.druid.msq.util.MultiStageQueryContext; @@ -87,6 +87,10 @@ public class MSQInsertTest extends MSQTestBase @Test public void testInsertOnFoo1() { + List expectedRows = expectedFooRows(); + int expectedCounterRows = expectedRows.size(); + long[] expectedArray = createExpectedFrameArray(expectedCounterRows, 1); + RowSignature rowSignature = RowSignature.builder() .add("__time", ColumnType.LONG) .add("dim1", ColumnType.STRING) @@ -98,7 +102,32 @@ public class MSQInsertTest extends MSQTestBase .setQueryContext(context) .setExpectedRowSignature(rowSignature) .setExpectedSegment(expectedFooSegments()) - .setExpectedResultRows(expectedFooRows()) + .setExpectedResultRows(expectedRows) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(expectedCounterRows).frames(1), + 0, 0, "shuffle" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(expectedCounterRows).frames(1), + 1, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(expectedArray).frames(expectedArray), + 1, 0, "shuffle" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(expectedArray).frames(expectedArray), + 2, 0, "input0" + ) .verifyResults(); } @@ -135,11 +164,30 @@ public class MSQInsertTest extends MSQTestBase ))) .setExpectedResultRows(ImmutableList.of(new Object[]{1466985600000L, 20L})) .setExpectedCountersForStageWorkerChannel( - CounterSnapshotBuilder - .with().rows(20).bytes(toRead.length()).files(1).totalFiles(1) - .buildChannelCounter(), + CounterSnapshotMatcher + .with().rows(20).bytes(toRead.length()).files(1).totalFiles(1), 0, 0, "input0" ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(1).frames(1), + 0, 0, "shuffle" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(1).frames(1), + 1, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(1).frames(1), + 1, 0, "shuffle" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(1).frames(1), + 2, 0, "input0" + ) .verifyResults(); } @@ -166,6 +214,10 @@ public class MSQInsertTest extends MSQTestBase @Test public void testInsertOnFoo1WithTimeFunctionWithSequential() { + List expectedRows = expectedFooRows(); + int expectedCounterRows = expectedRows.size(); + long[] expectedArray = createExpectedFrameArray(expectedCounterRows, 1); + RowSignature rowSignature = RowSignature.builder() .add("__time", ColumnType.LONG) .add("dim1", ColumnType.STRING) @@ -185,7 +237,32 @@ public class MSQInsertTest extends MSQTestBase .setExpectedRowSignature(rowSignature) .setQueryContext(MSQInsertTest.this.context) .setExpectedSegment(expectedFooSegments()) - .setExpectedResultRows(expectedFooRows()) + .setExpectedResultRows(expectedRows) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(expectedCounterRows).frames(1), + 0, 0, "shuffle" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(expectedCounterRows).frames(1), + 1, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(expectedArray).frames(expectedArray), + 1, 0, "shuffle" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(expectedArray).frames(expectedArray), + 2, 0, "input0" + ) .verifyResults(); } @@ -284,6 +361,10 @@ public class MSQInsertTest extends MSQTestBase @Test public void testRollUpOnFoo1UpOnFoo1() { + List expectedRows = expectedFooRows(); + int expectedCounterRows = expectedRows.size(); + long[] expectedArray = createExpectedFrameArray(expectedCounterRows, 1); + RowSignature rowSignature = RowSignature.builder() .add("__time", ColumnType.LONG) .add("dim1", ColumnType.STRING) @@ -299,7 +380,32 @@ public class MSQInsertTest extends MSQTestBase .addExpectedAggregatorFactory(new LongSumAggregatorFactory("cnt", "cnt")) .setExpectedRowSignature(rowSignature) .setExpectedSegment(expectedFooSegments()) - .setExpectedResultRows(expectedFooRows()) + .setExpectedResultRows(expectedRows) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(expectedCounterRows).frames(1), + 0, 0, "shuffle" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(expectedCounterRows).frames(1), + 1, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(expectedArray).frames(expectedArray), + 1, 0, "shuffle" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(expectedArray).frames(expectedArray), + 2, 0, "input0" + ) .verifyResults(); } @@ -307,6 +413,10 @@ public class MSQInsertTest extends MSQTestBase @Test public void testRollUpOnFoo1WithTimeFunction() { + List expectedRows = expectedFooRows(); + int expectedCounterRows = expectedRows.size(); + long[] expectedArray = createExpectedFrameArray(expectedCounterRows, 1); + RowSignature rowSignature = RowSignature.builder() .add("__time", ColumnType.LONG) .add("dim1", ColumnType.STRING) @@ -322,7 +432,32 @@ public class MSQInsertTest extends MSQTestBase .addExpectedAggregatorFactory(new LongSumAggregatorFactory("cnt", "cnt")) .setExpectedRowSignature(rowSignature) .setExpectedSegment(expectedFooSegments()) - .setExpectedResultRows(expectedFooRows()) + .setExpectedResultRows(expectedRows) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(expectedCounterRows).frames(1), + 0, 0, "shuffle" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(expectedCounterRows).frames(1), + 1, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(expectedArray).frames(expectedArray), + 1, 0, "shuffle" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(expectedArray).frames(expectedArray), + 2, 0, "input0" + ) .verifyResults(); } @@ -410,6 +545,31 @@ public class MSQInsertTest extends MSQTestBase 0 ))) .setExpectedResultRows(ImmutableList.of(new Object[]{1466985600000L, 20L})) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(20).bytes(toRead.length()).files(1).totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(1).frames(1), + 0, 0, "shuffle" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(1).frames(1), + 1, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(1).frames(1), + 1, 0, "shuffle" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(1).frames(1), + 2, 0, "input0" + ) .verifyResults(); } @@ -455,6 +615,31 @@ public class MSQInsertTest extends MSQTestBase new Object[]{1466985600000L, "Wikipedia", 1L}, new Object[]{1466985600000L, "Википедия", 1L} )) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(20).bytes(toRead.length()).files(1).totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(6).frames(1), + 0, 0, "shuffle" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(6).frames(1), + 1, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(6).frames(1), + 1, 0, "shuffle" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(6).frames(1), + 2, 0, "input0" + ) .verifyResults(); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java index e84179a4d27..56ccb0560d0 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java @@ -22,7 +22,7 @@ package org.apache.druid.msq.exec; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.msq.test.CounterSnapshotBuilder; +import org.apache.druid.msq.test.CounterSnapshotMatcher; import org.apache.druid.msq.test.MSQTestBase; import org.apache.druid.msq.test.MSQTestFileUtils; import org.apache.druid.segment.column.ColumnType; @@ -105,6 +105,21 @@ public class MSQReplaceTest extends MSQTestBase new Object[]{978480000000L, 6.0f} ) ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(1, 1, 1, 1, 1, 1).frames(1, 1, 1, 1, 1, 1), + 0, 0, "shuffle" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(1, 1, 1, 1, 1, 1).frames(1, 1, 1, 1, 1, 1), + 1, 0, "input0" + ) .verifyResults(); } @@ -134,6 +149,21 @@ public class MSQReplaceTest extends MSQTestBase 0 ))) .setExpectedResultRows(ImmutableList.of(new Object[]{946771200000L, 2.0f})) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(1).frames(1), + 0, 0, "shuffle" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(1).frames(1), + 1, 0, "input0" + ) .verifyResults(); } @@ -190,11 +220,25 @@ public class MSQReplaceTest extends MSQTestBase ) ) .setExpectedCountersForStageWorkerChannel( - CounterSnapshotBuilder - .with().rows(20).bytes(toRead.length()).files(1).totalFiles(1) - .buildChannelCounter(), + CounterSnapshotMatcher + .with().rows(20).bytes(toRead.length()).files(1).totalFiles(1), 0, 0, "input0" ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(3).frames(1), + 0, 0, "shuffle" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(3).frames(1), + 1, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(1, 1, 1).frames(1, 1, 1), + 1, 0, "shuffle" + ) .verifyResults(); } @@ -242,11 +286,20 @@ public class MSQReplaceTest extends MSQTestBase ) ) .setExpectedCountersForStageWorkerChannel( - CounterSnapshotBuilder - .with().rows(20).bytes(toRead.length()).files(1).totalFiles(1) - .buildChannelCounter(), + CounterSnapshotMatcher + .with().rows(20).bytes(toRead.length()).files(1).totalFiles(1), 0, 0, "input0" ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(4).frames(1), + 0, 0, "shuffle" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(4).frames(1), + 1, 0, "input0" + ) .verifyResults(); } @@ -300,6 +353,21 @@ public class MSQReplaceTest extends MSQTestBase ) ) .setExpectedSegment(ImmutableSet.of(SegmentId.of("foo", Intervals.ETERNITY, "test", 0))) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(6).frames(1), + 0, 0, "shuffle" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(6).frames(1), + 1, 0, "input0" + ) .verifyResults(); } @@ -341,6 +409,21 @@ public class MSQReplaceTest extends MSQTestBase SegmentId.of("foo", Intervals.of("2001-01-01T/P1M"), "test", 0) ) ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(3, 3).frames(1, 1), + 0, 0, "shuffle" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(3, 3).frames(1, 1), + 1, 0, "input0" + ) .verifyResults(); } @@ -380,6 +463,21 @@ public class MSQReplaceTest extends MSQTestBase "test", 0 ))) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(2).frames(1), + 0, 0, "shuffle" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(2).frames(1), + 1, 0, "input0" + ) .verifyResults(); } @@ -419,6 +517,21 @@ public class MSQReplaceTest extends MSQTestBase "test", 0 ))) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(2).frames(1), + 0, 0, "shuffle" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(2).frames(1), + 1, 0, "input0" + ) .verifyResults(); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java index bd4c43d3acb..b871fc4a6e2 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java @@ -33,7 +33,7 @@ import org.apache.druid.msq.indexing.ColumnMappings; import org.apache.druid.msq.indexing.MSQSpec; import org.apache.druid.msq.indexing.MSQTuningConfig; import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault; -import org.apache.druid.msq.test.CounterSnapshotBuilder; +import org.apache.druid.msq.test.CounterSnapshotMatcher; import org.apache.druid.msq.test.MSQTestBase; import org.apache.druid.msq.test.MSQTestFileUtils; import org.apache.druid.query.InlineDataSource; @@ -163,6 +163,21 @@ public class MSQSelectTest extends MSQTestBase ) .setQueryContext(context) .setExpectedRowSignature(resultSignature) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(6).frames(1), + 0, 0, "output" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(6).frames(1), + 0, 0, "shuffle" + ) .setExpectedResultRows(ImmutableList.of( new Object[]{1L, !useDefault ? "" : null}, new Object[]{1L, "10.1"}, @@ -207,7 +222,23 @@ public class MSQSelectTest extends MSQTestBase new Object[]{1L, "en"}, new Object[]{1L, "ru"}, new Object[]{1L, "he"} - )).verifyResults(); + )) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(3).frames(1), + 0, 0, "output" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(3).frames(1), + 0, 0, "shuffle" + ) + .verifyResults(); } @Test @@ -248,6 +279,21 @@ public class MSQSelectTest extends MSQTestBase .setExpectedRowSignature(rowSignature) .setExpectedResultRows(ImmutableList.of(new Object[]{1L, 6L})) .setQueryContext(context) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(1).frames(1), + 0, 0, "output" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(1).frames(1), + 0, 0, "shuffle" + ) .verifyResults(); } @@ -306,6 +352,21 @@ public class MSQSelectTest extends MSQTestBase new Object[]{1f, 1L} ) ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(6).frames(1), + 0, 0, "output" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(6).frames(1), + 0, 0, "shuffle" + ) .verifyResults(); } @@ -345,6 +406,21 @@ public class MSQSelectTest extends MSQTestBase .setExpectedRowSignature(resultSignature) .setExpectedResultRows(ImmutableList.of(new Object[]{6L})) .setQueryContext(context) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(6).frames(1), + 0, 0, "output" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(6).frames(1), + 0, 0, "shuffle" + ) .verifyResults(); } @@ -476,6 +552,21 @@ public class MSQSelectTest extends MSQTestBase .setExpectedRowSignature(resultSignature) .setExpectedResultRows(expectedResults) .setQueryContext(context) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(6).frames(1), + 0, 0, "output" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(6).frames(1), + 0, 0, "shuffle" + ) .verifyResults(); } @@ -594,6 +685,21 @@ public class MSQSelectTest extends MSQTestBase .setExpectedRowSignature(resultSignature) .setExpectedResultRows(expectedResults) .setQueryContext(context) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(6).frames(1), + 0, 0, "output" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(6).frames(1), + 0, 0, "shuffle" + ) .verifyResults(); } @@ -654,7 +760,23 @@ public class MSQSelectTest extends MSQTestBase new Object[]{2f, 2d}, new Object[]{1f, 1d} ) - ).verifyResults(); + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(6).frames(1), + 0, 0, "output" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(6).frames(1), + 0, 0, "shuffle" + ) + .verifyResults(); } @Test @@ -711,7 +833,23 @@ public class MSQSelectTest extends MSQTestBase new Object[]{5f, 5d}, new Object[]{4f, 4d} ) - ).verifyResults(); + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(6).frames(1), + 0, 0, "output" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(6).frames(1), + 0, 0, "shuffle" + ) + .verifyResults(); } @Test @@ -768,7 +906,23 @@ public class MSQSelectTest extends MSQTestBase new Object[]{5f, 5d}, new Object[]{4f, 4d} ) - ).verifyResults(); + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(6).frames(1), + 0, 0, "output" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(6).frames(1), + 0, 0, "shuffle" + ) + .verifyResults(); } @Test @@ -838,11 +992,20 @@ public class MSQSelectTest extends MSQTestBase .build() ) .setExpectedCountersForStageWorkerChannel( - CounterSnapshotBuilder - .with().rows(20).bytes(toRead.length()).files(1).totalFiles(1) - .buildChannelCounter(), + CounterSnapshotMatcher + .with().rows(20).bytes(toRead.length()).files(1).totalFiles(1), 0, 0, "input0" ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(1).frames(1), + 0, 0, "output" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(1).frames(1), + 0, 0, "shuffle" + ) .verifyResults(); } @@ -963,6 +1126,21 @@ public class MSQSelectTest extends MSQTestBase new Object[]{1L, "ru"}, new Object[]{1L, "he"} )) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(3).frames(1), + 0, 0, "output" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(3).frames(1), + 0, 0, "shuffle" + ) .verifyResults(); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CounterSnapshotBuilder.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CounterSnapshotMatcher.java similarity index 50% rename from extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CounterSnapshotBuilder.java rename to extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CounterSnapshotMatcher.java index 0d89c0734ff..d61be47bf80 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CounterSnapshotBuilder.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CounterSnapshotMatcher.java @@ -21,11 +21,12 @@ package org.apache.druid.msq.test; import org.apache.druid.msq.counters.ChannelCounters; import org.apache.druid.msq.counters.QueryCounterSnapshot; +import org.junit.Assert; /** * Utility class to build instances of {@link QueryCounterSnapshot} used in tests. */ -public class CounterSnapshotBuilder +public class CounterSnapshotMatcher { private long[] rows; private long[] bytes; @@ -33,43 +34,62 @@ public class CounterSnapshotBuilder private long[] files; private long[] totalFiles; - public static CounterSnapshotBuilder with() + public static CounterSnapshotMatcher with() { - return new CounterSnapshotBuilder(); + return new CounterSnapshotMatcher(); } - public CounterSnapshotBuilder rows(long... rows) + public CounterSnapshotMatcher rows(long... rows) { this.rows = rows; return this; } - public CounterSnapshotBuilder bytes(long... bytes) + public CounterSnapshotMatcher bytes(long... bytes) { this.bytes = bytes; return this; } - public CounterSnapshotBuilder frames(long... frames) + public CounterSnapshotMatcher frames(long... frames) { this.frames = frames; return this; } - public CounterSnapshotBuilder files(long... files) + public CounterSnapshotMatcher files(long... files) { this.files = files; return this; } - public CounterSnapshotBuilder totalFiles(long... totalFiles) + public CounterSnapshotMatcher totalFiles(long... totalFiles) { this.totalFiles = totalFiles; return this; } - public QueryCounterSnapshot buildChannelCounter() + /** + * Asserts that the matcher matches the queryCounterSnapshot parameter. If a parameter in this class is null, the + * match is not checked + */ + public void matchQuerySnapshot(String errorMessageFormat, QueryCounterSnapshot queryCounterSnapshot) { - return new ChannelCounters.Snapshot(rows, bytes, frames, files, totalFiles); + ChannelCounters.Snapshot channelCountersSnapshot = (ChannelCounters.Snapshot) queryCounterSnapshot; + if (rows != null) { + Assert.assertArrayEquals(errorMessageFormat, rows, channelCountersSnapshot.getRows()); + } + if (bytes != null) { + Assert.assertArrayEquals(errorMessageFormat, bytes, channelCountersSnapshot.getBytes()); + } + if (frames != null) { + Assert.assertArrayEquals(errorMessageFormat, frames, channelCountersSnapshot.getFrames()); + } + if (files != null) { + Assert.assertArrayEquals(errorMessageFormat, files, channelCountersSnapshot.getFiles()); + } + if (totalFiles != null) { + Assert.assertArrayEquals(errorMessageFormat, totalFiles, channelCountersSnapshot.getTotalFiles()); + } } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index 748ec5c3cdb..8277adca272 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -529,6 +529,16 @@ public class MSQTestBase extends BaseCalciteQueryTest } } + /** + * Creates an array of length and containing values decided by the parameters. + */ + protected long[] createExpectedFrameArray(int length, int value) + { + long[] array = new long[length]; + Arrays.fill(array, value); + return array; + } + @Nonnull private Supplier> getSupplierForSegment(SegmentId segmentId) { @@ -788,7 +798,7 @@ public class MSQTestBase extends BaseCalciteQueryTest protected Matcher expectedExecutionErrorMatcher = null; protected MSQFault expectedMSQFault = null; protected Class expectedMSQFaultClass = null; - protected final Map>> + protected final Map>> expectedStageWorkerChannelToCounters = new HashMap<>(); private boolean hasRun = false; @@ -857,7 +867,7 @@ public class MSQTestBase extends BaseCalciteQueryTest } public Builder setExpectedCountersForStageWorkerChannel( - QueryCounterSnapshot counterSnapshot, + CounterSnapshotMatcher counterSnapshot, int stage, int worker, String channel @@ -907,16 +917,16 @@ public class MSQTestBase extends BaseCalciteQueryTest final Map channelToCounters = counters.getMap(); expectedChannelToCounters.forEach( - (channel, counter) -> Assert.assertEquals( - StringUtils.format( - "Counter mismatch for stage [%d], worker [%d], channel [%s]", - stage, - worker, - channel - ), - counter, - channelToCounters.get(channel) - ) + (channel, counter) -> { + String errorMessageFormat = StringUtils.format( + "Counter mismatch for stage [%d], worker [%d], channel [%s]", + stage, + worker, + channel + ); + Assert.assertTrue(channelToCounters.containsKey(channel)); + counter.matchQuerySnapshot(errorMessageFormat, channelToCounters.get(channel)); + } ); }); });