Add assertions for counters from reports (#13726)

Adds assertions for counters to MSQ unit tests
This commit is contained in:
Adarsh Sanjeev 2023-02-08 16:33:37 +05:30 committed by GitHub
parent 34c04daa9f
commit d7a15be9bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 551 additions and 45 deletions

View File

@ -30,7 +30,7 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.msq.indexing.error.ColumnNameRestrictedFault;
import org.apache.druid.msq.indexing.error.RowTooLargeFault;
import org.apache.druid.msq.test.CounterSnapshotBuilder;
import org.apache.druid.msq.test.CounterSnapshotMatcher;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestFileUtils;
import org.apache.druid.msq.util.MultiStageQueryContext;
@ -87,6 +87,10 @@ public class MSQInsertTest extends MSQTestBase
@Test
public void testInsertOnFoo1()
{
List<Object[]> expectedRows = expectedFooRows();
int expectedCounterRows = expectedRows.size();
long[] expectedArray = createExpectedFrameArray(expectedCounterRows, 1);
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
@ -98,7 +102,32 @@ public class MSQInsertTest extends MSQTestBase
.setQueryContext(context)
.setExpectedRowSignature(rowSignature)
.setExpectedSegment(expectedFooSegments())
.setExpectedResultRows(expectedFooRows())
.setExpectedResultRows(expectedRows)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().totalFiles(1),
0, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(expectedCounterRows).frames(1),
0, 0, "shuffle"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(expectedCounterRows).frames(1),
1, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(expectedArray).frames(expectedArray),
1, 0, "shuffle"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(expectedArray).frames(expectedArray),
2, 0, "input0"
)
.verifyResults();
}
@ -135,11 +164,30 @@ public class MSQInsertTest extends MSQTestBase
)))
.setExpectedResultRows(ImmutableList.of(new Object[]{1466985600000L, 20L}))
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotBuilder
.with().rows(20).bytes(toRead.length()).files(1).totalFiles(1)
.buildChannelCounter(),
CounterSnapshotMatcher
.with().rows(20).bytes(toRead.length()).files(1).totalFiles(1),
0, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(1).frames(1),
0, 0, "shuffle"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(1).frames(1),
1, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(1).frames(1),
1, 0, "shuffle"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(1).frames(1),
2, 0, "input0"
)
.verifyResults();
}
@ -166,6 +214,10 @@ public class MSQInsertTest extends MSQTestBase
@Test
public void testInsertOnFoo1WithTimeFunctionWithSequential()
{
List<Object[]> expectedRows = expectedFooRows();
int expectedCounterRows = expectedRows.size();
long[] expectedArray = createExpectedFrameArray(expectedCounterRows, 1);
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
@ -185,7 +237,32 @@ public class MSQInsertTest extends MSQTestBase
.setExpectedRowSignature(rowSignature)
.setQueryContext(MSQInsertTest.this.context)
.setExpectedSegment(expectedFooSegments())
.setExpectedResultRows(expectedFooRows())
.setExpectedResultRows(expectedRows)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().totalFiles(1),
0, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(expectedCounterRows).frames(1),
0, 0, "shuffle"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(expectedCounterRows).frames(1),
1, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(expectedArray).frames(expectedArray),
1, 0, "shuffle"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(expectedArray).frames(expectedArray),
2, 0, "input0"
)
.verifyResults();
}
@ -284,6 +361,10 @@ public class MSQInsertTest extends MSQTestBase
@Test
public void testRollUpOnFoo1UpOnFoo1()
{
List<Object[]> expectedRows = expectedFooRows();
int expectedCounterRows = expectedRows.size();
long[] expectedArray = createExpectedFrameArray(expectedCounterRows, 1);
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
@ -299,7 +380,32 @@ public class MSQInsertTest extends MSQTestBase
.addExpectedAggregatorFactory(new LongSumAggregatorFactory("cnt", "cnt"))
.setExpectedRowSignature(rowSignature)
.setExpectedSegment(expectedFooSegments())
.setExpectedResultRows(expectedFooRows())
.setExpectedResultRows(expectedRows)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().totalFiles(1),
0, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(expectedCounterRows).frames(1),
0, 0, "shuffle"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(expectedCounterRows).frames(1),
1, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(expectedArray).frames(expectedArray),
1, 0, "shuffle"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(expectedArray).frames(expectedArray),
2, 0, "input0"
)
.verifyResults();
}
@ -307,6 +413,10 @@ public class MSQInsertTest extends MSQTestBase
@Test
public void testRollUpOnFoo1WithTimeFunction()
{
List<Object[]> expectedRows = expectedFooRows();
int expectedCounterRows = expectedRows.size();
long[] expectedArray = createExpectedFrameArray(expectedCounterRows, 1);
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
@ -322,7 +432,32 @@ public class MSQInsertTest extends MSQTestBase
.addExpectedAggregatorFactory(new LongSumAggregatorFactory("cnt", "cnt"))
.setExpectedRowSignature(rowSignature)
.setExpectedSegment(expectedFooSegments())
.setExpectedResultRows(expectedFooRows())
.setExpectedResultRows(expectedRows)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().totalFiles(1),
0, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(expectedCounterRows).frames(1),
0, 0, "shuffle"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(expectedCounterRows).frames(1),
1, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(expectedArray).frames(expectedArray),
1, 0, "shuffle"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(expectedArray).frames(expectedArray),
2, 0, "input0"
)
.verifyResults();
}
@ -410,6 +545,31 @@ public class MSQInsertTest extends MSQTestBase
0
)))
.setExpectedResultRows(ImmutableList.of(new Object[]{1466985600000L, 20L}))
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(20).bytes(toRead.length()).files(1).totalFiles(1),
0, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(1).frames(1),
0, 0, "shuffle"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(1).frames(1),
1, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(1).frames(1),
1, 0, "shuffle"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(1).frames(1),
2, 0, "input0"
)
.verifyResults();
}
@ -455,6 +615,31 @@ public class MSQInsertTest extends MSQTestBase
new Object[]{1466985600000L, "Wikipedia", 1L},
new Object[]{1466985600000L, "Википедия", 1L}
))
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(20).bytes(toRead.length()).files(1).totalFiles(1),
0, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(6).frames(1),
0, 0, "shuffle"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(6).frames(1),
1, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(6).frames(1),
1, 0, "shuffle"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(6).frames(1),
2, 0, "input0"
)
.verifyResults();
}

View File

@ -22,7 +22,7 @@ package org.apache.druid.msq.exec;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.msq.test.CounterSnapshotBuilder;
import org.apache.druid.msq.test.CounterSnapshotMatcher;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestFileUtils;
import org.apache.druid.segment.column.ColumnType;
@ -105,6 +105,21 @@ public class MSQReplaceTest extends MSQTestBase
new Object[]{978480000000L, 6.0f}
)
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().totalFiles(1),
0, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(1, 1, 1, 1, 1, 1).frames(1, 1, 1, 1, 1, 1),
0, 0, "shuffle"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(1, 1, 1, 1, 1, 1).frames(1, 1, 1, 1, 1, 1),
1, 0, "input0"
)
.verifyResults();
}
@ -134,6 +149,21 @@ public class MSQReplaceTest extends MSQTestBase
0
)))
.setExpectedResultRows(ImmutableList.of(new Object[]{946771200000L, 2.0f}))
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().totalFiles(1),
0, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(1).frames(1),
0, 0, "shuffle"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(1).frames(1),
1, 0, "input0"
)
.verifyResults();
}
@ -190,11 +220,25 @@ public class MSQReplaceTest extends MSQTestBase
)
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotBuilder
.with().rows(20).bytes(toRead.length()).files(1).totalFiles(1)
.buildChannelCounter(),
CounterSnapshotMatcher
.with().rows(20).bytes(toRead.length()).files(1).totalFiles(1),
0, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(3).frames(1),
0, 0, "shuffle"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(3).frames(1),
1, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(1, 1, 1).frames(1, 1, 1),
1, 0, "shuffle"
)
.verifyResults();
}
@ -242,11 +286,20 @@ public class MSQReplaceTest extends MSQTestBase
)
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotBuilder
.with().rows(20).bytes(toRead.length()).files(1).totalFiles(1)
.buildChannelCounter(),
CounterSnapshotMatcher
.with().rows(20).bytes(toRead.length()).files(1).totalFiles(1),
0, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(4).frames(1),
0, 0, "shuffle"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(4).frames(1),
1, 0, "input0"
)
.verifyResults();
}
@ -300,6 +353,21 @@ public class MSQReplaceTest extends MSQTestBase
)
)
.setExpectedSegment(ImmutableSet.of(SegmentId.of("foo", Intervals.ETERNITY, "test", 0)))
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().totalFiles(1),
0, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(6).frames(1),
0, 0, "shuffle"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(6).frames(1),
1, 0, "input0"
)
.verifyResults();
}
@ -341,6 +409,21 @@ public class MSQReplaceTest extends MSQTestBase
SegmentId.of("foo", Intervals.of("2001-01-01T/P1M"), "test", 0)
)
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().totalFiles(1),
0, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(3, 3).frames(1, 1),
0, 0, "shuffle"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(3, 3).frames(1, 1),
1, 0, "input0"
)
.verifyResults();
}
@ -380,6 +463,21 @@ public class MSQReplaceTest extends MSQTestBase
"test",
0
)))
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().totalFiles(1),
0, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(2).frames(1),
0, 0, "shuffle"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(2).frames(1),
1, 0, "input0"
)
.verifyResults();
}
@ -419,6 +517,21 @@ public class MSQReplaceTest extends MSQTestBase
"test",
0
)))
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().totalFiles(1),
0, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(2).frames(1),
0, 0, "shuffle"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(2).frames(1),
1, 0, "input0"
)
.verifyResults();
}

View File

@ -33,7 +33,7 @@ import org.apache.druid.msq.indexing.ColumnMappings;
import org.apache.druid.msq.indexing.MSQSpec;
import org.apache.druid.msq.indexing.MSQTuningConfig;
import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault;
import org.apache.druid.msq.test.CounterSnapshotBuilder;
import org.apache.druid.msq.test.CounterSnapshotMatcher;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestFileUtils;
import org.apache.druid.query.InlineDataSource;
@ -163,6 +163,21 @@ public class MSQSelectTest extends MSQTestBase
)
.setQueryContext(context)
.setExpectedRowSignature(resultSignature)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().totalFiles(1),
0, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(6).frames(1),
0, 0, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(6).frames(1),
0, 0, "shuffle"
)
.setExpectedResultRows(ImmutableList.of(
new Object[]{1L, !useDefault ? "" : null},
new Object[]{1L, "10.1"},
@ -207,7 +222,23 @@ public class MSQSelectTest extends MSQTestBase
new Object[]{1L, "en"},
new Object[]{1L, "ru"},
new Object[]{1L, "he"}
)).verifyResults();
))
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().totalFiles(1),
0, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(3).frames(1),
0, 0, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(3).frames(1),
0, 0, "shuffle"
)
.verifyResults();
}
@Test
@ -248,6 +279,21 @@ public class MSQSelectTest extends MSQTestBase
.setExpectedRowSignature(rowSignature)
.setExpectedResultRows(ImmutableList.of(new Object[]{1L, 6L}))
.setQueryContext(context)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().totalFiles(1),
0, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(1).frames(1),
0, 0, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(1).frames(1),
0, 0, "shuffle"
)
.verifyResults();
}
@ -306,6 +352,21 @@ public class MSQSelectTest extends MSQTestBase
new Object[]{1f, 1L}
)
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().totalFiles(1),
0, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(6).frames(1),
0, 0, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(6).frames(1),
0, 0, "shuffle"
)
.verifyResults();
}
@ -345,6 +406,21 @@ public class MSQSelectTest extends MSQTestBase
.setExpectedRowSignature(resultSignature)
.setExpectedResultRows(ImmutableList.of(new Object[]{6L}))
.setQueryContext(context)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().totalFiles(1),
0, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(6).frames(1),
0, 0, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(6).frames(1),
0, 0, "shuffle"
)
.verifyResults();
}
@ -476,6 +552,21 @@ public class MSQSelectTest extends MSQTestBase
.setExpectedRowSignature(resultSignature)
.setExpectedResultRows(expectedResults)
.setQueryContext(context)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().totalFiles(1),
0, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(6).frames(1),
0, 0, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(6).frames(1),
0, 0, "shuffle"
)
.verifyResults();
}
@ -594,6 +685,21 @@ public class MSQSelectTest extends MSQTestBase
.setExpectedRowSignature(resultSignature)
.setExpectedResultRows(expectedResults)
.setQueryContext(context)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().totalFiles(1),
0, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(6).frames(1),
0, 0, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(6).frames(1),
0, 0, "shuffle"
)
.verifyResults();
}
@ -654,7 +760,23 @@ public class MSQSelectTest extends MSQTestBase
new Object[]{2f, 2d},
new Object[]{1f, 1d}
)
).verifyResults();
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().totalFiles(1),
0, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(6).frames(1),
0, 0, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(6).frames(1),
0, 0, "shuffle"
)
.verifyResults();
}
@Test
@ -711,7 +833,23 @@ public class MSQSelectTest extends MSQTestBase
new Object[]{5f, 5d},
new Object[]{4f, 4d}
)
).verifyResults();
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().totalFiles(1),
0, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(6).frames(1),
0, 0, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(6).frames(1),
0, 0, "shuffle"
)
.verifyResults();
}
@Test
@ -768,7 +906,23 @@ public class MSQSelectTest extends MSQTestBase
new Object[]{5f, 5d},
new Object[]{4f, 4d}
)
).verifyResults();
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().totalFiles(1),
0, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(6).frames(1),
0, 0, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(6).frames(1),
0, 0, "shuffle"
)
.verifyResults();
}
@Test
@ -838,11 +992,20 @@ public class MSQSelectTest extends MSQTestBase
.build()
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotBuilder
.with().rows(20).bytes(toRead.length()).files(1).totalFiles(1)
.buildChannelCounter(),
CounterSnapshotMatcher
.with().rows(20).bytes(toRead.length()).files(1).totalFiles(1),
0, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(1).frames(1),
0, 0, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(1).frames(1),
0, 0, "shuffle"
)
.verifyResults();
}
@ -963,6 +1126,21 @@ public class MSQSelectTest extends MSQTestBase
new Object[]{1L, "ru"},
new Object[]{1L, "he"}
))
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().totalFiles(1),
0, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(3).frames(1),
0, 0, "output"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(3).frames(1),
0, 0, "shuffle"
)
.verifyResults();
}

View File

@ -21,11 +21,12 @@ package org.apache.druid.msq.test;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.counters.QueryCounterSnapshot;
import org.junit.Assert;
/**
* Utility class to build instances of {@link QueryCounterSnapshot} used in tests.
*/
public class CounterSnapshotBuilder
public class CounterSnapshotMatcher
{
private long[] rows;
private long[] bytes;
@ -33,43 +34,62 @@ public class CounterSnapshotBuilder
private long[] files;
private long[] totalFiles;
public static CounterSnapshotBuilder with()
public static CounterSnapshotMatcher with()
{
return new CounterSnapshotBuilder();
return new CounterSnapshotMatcher();
}
public CounterSnapshotBuilder rows(long... rows)
public CounterSnapshotMatcher rows(long... rows)
{
this.rows = rows;
return this;
}
public CounterSnapshotBuilder bytes(long... bytes)
public CounterSnapshotMatcher bytes(long... bytes)
{
this.bytes = bytes;
return this;
}
public CounterSnapshotBuilder frames(long... frames)
public CounterSnapshotMatcher frames(long... frames)
{
this.frames = frames;
return this;
}
public CounterSnapshotBuilder files(long... files)
public CounterSnapshotMatcher files(long... files)
{
this.files = files;
return this;
}
public CounterSnapshotBuilder totalFiles(long... totalFiles)
public CounterSnapshotMatcher totalFiles(long... totalFiles)
{
this.totalFiles = totalFiles;
return this;
}
public QueryCounterSnapshot buildChannelCounter()
/**
* Asserts that the matcher matches the queryCounterSnapshot parameter. If a parameter in this class is null, the
* match is not checked
*/
public void matchQuerySnapshot(String errorMessageFormat, QueryCounterSnapshot queryCounterSnapshot)
{
return new ChannelCounters.Snapshot(rows, bytes, frames, files, totalFiles);
ChannelCounters.Snapshot channelCountersSnapshot = (ChannelCounters.Snapshot) queryCounterSnapshot;
if (rows != null) {
Assert.assertArrayEquals(errorMessageFormat, rows, channelCountersSnapshot.getRows());
}
if (bytes != null) {
Assert.assertArrayEquals(errorMessageFormat, bytes, channelCountersSnapshot.getBytes());
}
if (frames != null) {
Assert.assertArrayEquals(errorMessageFormat, frames, channelCountersSnapshot.getFrames());
}
if (files != null) {
Assert.assertArrayEquals(errorMessageFormat, files, channelCountersSnapshot.getFiles());
}
if (totalFiles != null) {
Assert.assertArrayEquals(errorMessageFormat, totalFiles, channelCountersSnapshot.getTotalFiles());
}
}
}

View File

@ -529,6 +529,16 @@ public class MSQTestBase extends BaseCalciteQueryTest
}
}
/**
* Creates an array of length and containing values decided by the parameters.
*/
protected long[] createExpectedFrameArray(int length, int value)
{
long[] array = new long[length];
Arrays.fill(array, value);
return array;
}
@Nonnull
private Supplier<Pair<Segment, Closeable>> getSupplierForSegment(SegmentId segmentId)
{
@ -788,7 +798,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
protected Matcher<Throwable> expectedExecutionErrorMatcher = null;
protected MSQFault expectedMSQFault = null;
protected Class<? extends MSQFault> expectedMSQFaultClass = null;
protected final Map<Integer, Map<Integer, Map<String, QueryCounterSnapshot>>>
protected final Map<Integer, Map<Integer, Map<String, CounterSnapshotMatcher>>>
expectedStageWorkerChannelToCounters = new HashMap<>();
private boolean hasRun = false;
@ -857,7 +867,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
}
public Builder setExpectedCountersForStageWorkerChannel(
QueryCounterSnapshot counterSnapshot,
CounterSnapshotMatcher counterSnapshot,
int stage,
int worker,
String channel
@ -907,16 +917,16 @@ public class MSQTestBase extends BaseCalciteQueryTest
final Map<String, QueryCounterSnapshot> channelToCounters = counters.getMap();
expectedChannelToCounters.forEach(
(channel, counter) -> Assert.assertEquals(
StringUtils.format(
(channel, counter) -> {
String errorMessageFormat = StringUtils.format(
"Counter mismatch for stage [%d], worker [%d], channel [%s]",
stage,
worker,
channel
),
counter,
channelToCounters.get(channel)
)
);
Assert.assertTrue(channelToCounters.containsKey(channel));
counter.matchQuerySnapshot(errorMessageFormat, channelToCounters.get(channel));
}
);
});
});