more tests for LimitedBufferHashGrouper (#11654)

* more tests for LimitedBufferHashGrouper

* fix style
This commit is contained in:
Clint Wylie 2021-09-08 16:31:34 -07:00 committed by GitHub
parent fe1d8c206a
commit bbb86c8731
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 215 additions and 42 deletions

View File

@ -127,7 +127,7 @@ public class GrouperBufferComparatorUtils
Grouper.BufferComparator[] dimComparators,
boolean includeTimestamp,
boolean sortByDimsFirst,
int keyBufferTotalSize
int keySize
)
{
int dimCount = dimensions.size();
@ -150,7 +150,7 @@ public class GrouperBufferComparatorUtils
final StringComparator stringComparator = orderSpec.getDimensionComparator();
final ValueType valueType = aggregatorFactories[aggIndex].getType();
// Aggregators start after dimensions
final int aggOffset = keyBufferTotalSize + aggregatorOffsets[aggIndex];
final int aggOffset = keySize + aggregatorOffsets[aggIndex];
aggCount++;
@ -158,10 +158,12 @@ public class GrouperBufferComparatorUtils
throw new IAE("Cannot order by a non-numeric aggregator[%s]", orderSpec);
}
comparators.add(makeNullHandlingBufferComparatorForNumericData(
aggOffset,
makeNumericBufferComparator(valueType, aggOffset, true, stringComparator)
));
comparators.add(
makeNullHandlingBufferComparatorForNumericData(
aggOffset,
makeNumericBufferComparator(valueType, aggOffset, true, stringComparator)
)
);
needsReverses.add(needsReverse);
}
}

View File

@ -30,12 +30,7 @@ public class IntKeySerde implements Grouper.KeySerde<Integer>
{
public static final Grouper.KeySerde<Integer> INSTANCE = new IntKeySerde();
private IntKeySerde()
{
// No instantiation
}
private static final Grouper.BufferComparator KEY_COMPARATOR = new Grouper.BufferComparator()
public static final Grouper.BufferComparator KEY_COMPARATOR = new Grouper.BufferComparator()
{
@Override
public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)

View File

@ -27,8 +27,14 @@ import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.MapBasedRow;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.aggregation.AggregatorAdapters;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
import org.apache.druid.query.ordering.StringComparator;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
import org.junit.Rule;
@ -41,21 +47,22 @@ import java.util.List;
public class LimitedBufferHashGrouperTest extends InitializedNullHandlingTest
{
static final int LIMIT = 100;
static final int KEY_BASE = 100000;
static final int NUM_ROWS = 1000;
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void testLimitAndBufferSwapping()
{
final int limit = 100;
final int keyBase = 100000;
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final LimitedBufferHashGrouper<Integer> grouper = makeGrouper(columnSelectorFactory, 20000, 2, limit);
final int numRows = 1000;
final LimitedBufferHashGrouper<Integer> grouper = makeGrouper(columnSelectorFactory, 20000);
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L)));
for (int i = 0; i < numRows; i++) {
Assert.assertTrue(String.valueOf(i + keyBase), grouper.aggregate(i + keyBase).isOk());
for (int i = 0; i < NUM_ROWS; i++) {
Assert.assertTrue(String.valueOf(i + KEY_BASE), grouper.aggregate(i + KEY_BASE).isOk());
}
if (NullHandling.replaceWithDefault()) {
// bucket size is hash(int) + key(int) + aggs(2 longs) + heap offset(int) = 28 bytes
@ -94,7 +101,7 @@ public class LimitedBufferHashGrouperTest extends InitializedNullHandlingTest
// Since these keys are smaller, they will evict the previous 100 top entries
// First 100 of these new rows will be the expected results.
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 11L)));
for (int i = 0; i < numRows; i++) {
for (int i = 0; i < NUM_ROWS; i++) {
Assert.assertTrue(String.valueOf(i), grouper.aggregate(i).isOk());
}
@ -122,7 +129,7 @@ public class LimitedBufferHashGrouperTest extends InitializedNullHandlingTest
Assert.assertEquals(100, grouper.getLimit());
final List<Grouper.Entry<Integer>> expected = new ArrayList<>();
for (int i = 0; i < limit; i++) {
for (int i = 0; i < LIMIT; i++) {
expected.add(new Grouper.Entry<>(i, new Object[]{11L, 1L}));
}
@ -137,21 +144,18 @@ public class LimitedBufferHashGrouperTest extends InitializedNullHandlingTest
expectedException.expect(IAE.class);
expectedException.expectMessage("LimitedBufferHashGrouper initialized with insufficient buffer capacity");
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
makeGrouper(columnSelectorFactory, 10, 2, 100);
makeGrouper(columnSelectorFactory, 10);
}
@Test
public void testMinBufferSize()
{
final int limit = 100;
final int keyBase = 100000;
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final LimitedBufferHashGrouper<Integer> grouper = makeGrouper(columnSelectorFactory, 12120, 2, limit);
final int numRows = 1000;
final LimitedBufferHashGrouper<Integer> grouper = makeGrouper(columnSelectorFactory, 12120);
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L)));
for (int i = 0; i < numRows; i++) {
Assert.assertTrue(String.valueOf(i + keyBase), grouper.aggregate(i + keyBase).isOk());
for (int i = 0; i < NUM_ROWS; i++) {
Assert.assertTrue(String.valueOf(i + KEY_BASE), grouper.aggregate(i + KEY_BASE).isOk());
}
// With minimum buffer size, after the first swap, every new key added will result in a swap
@ -172,7 +176,7 @@ public class LimitedBufferHashGrouperTest extends InitializedNullHandlingTest
// Since these keys are smaller, they will evict the previous 100 top entries
// First 100 of these new rows will be the expected results.
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 11L)));
for (int i = 0; i < numRows; i++) {
for (int i = 0; i < NUM_ROWS; i++) {
Assert.assertTrue(String.valueOf(i), grouper.aggregate(i).isOk());
}
if (NullHandling.replaceWithDefault()) {
@ -189,7 +193,7 @@ public class LimitedBufferHashGrouperTest extends InitializedNullHandlingTest
Assert.assertEquals(100, grouper.getLimit());
final List<Grouper.Entry<Integer>> expected = new ArrayList<>();
for (int i = 0; i < limit; i++) {
for (int i = 0; i < LIMIT; i++) {
expected.add(new Grouper.Entry<>(i, new Object[]{11L, 1L}));
}
@ -204,28 +208,115 @@ public class LimitedBufferHashGrouperTest extends InitializedNullHandlingTest
expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("attempted to add offset after grouper was iterated");
final int limit = 100;
final int keyBase = 100000;
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final LimitedBufferHashGrouper<Integer> grouper = makeGrouper(columnSelectorFactory, 12120, 2, limit);
final int numRows = 1000;
final LimitedBufferHashGrouper<Integer> grouper = makeGrouper(columnSelectorFactory, 12120);
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L)));
for (int i = 0; i < numRows; i++) {
Assert.assertTrue(String.valueOf(i + keyBase), grouper.aggregate(i + keyBase).isOk());
for (int i = 0; i < NUM_ROWS; i++) {
Assert.assertTrue(String.valueOf(i + KEY_BASE), grouper.aggregate(i + KEY_BASE).isOk());
}
List<Grouper.Entry<Integer>> iterated = Lists.newArrayList(grouper.iterator(true));
Assert.assertEquals(limit, iterated.size());
Assert.assertEquals(LIMIT, iterated.size());
// an attempt to aggregate with a new key will explode after the grouper has been iterated
grouper.aggregate(keyBase + numRows + 1);
grouper.aggregate(KEY_BASE + NUM_ROWS + 1);
}
@Test
public void testIteratorOrderByDim()
{
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final LimitedBufferHashGrouper<Integer> grouper = makeGrouperWithOrderBy(
columnSelectorFactory,
"value",
OrderByColumnSpec.Direction.ASCENDING
);
for (int i = 0; i < NUM_ROWS; i++) {
// limited grouper iterator will always sort by keys in ascending order, even if the heap was sorted by values
// so, we aggregate with keys and values both descending so that the results are not re-ordered by key
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", NUM_ROWS - i + KEY_BASE)));
Assert.assertTrue(String.valueOf(NUM_ROWS - i + KEY_BASE), grouper.aggregate(NUM_ROWS - i + KEY_BASE).isOk());
}
List<Grouper.Entry<Integer>> iterated = Lists.newArrayList(grouper.iterator(true));
Assert.assertEquals(LIMIT, iterated.size());
for (int i = 0; i < LIMIT; i++) {
Assert.assertEquals(KEY_BASE + i + 1L, iterated.get(i).getValues()[0]);
}
}
@Test
public void testIteratorOrderByDimDesc()
{
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final LimitedBufferHashGrouper<Integer> grouper = makeGrouperWithOrderBy(
columnSelectorFactory,
"value",
OrderByColumnSpec.Direction.DESCENDING
);
for (int i = 0; i < NUM_ROWS; i++) {
// limited grouper iterator will always sort by keys in ascending order, even if the heap was sorted by values
// so, we aggregate with keys and values both ascending so that the results are not re-ordered by key
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", i + 1)));
Assert.assertTrue(String.valueOf(i + KEY_BASE), grouper.aggregate(i + KEY_BASE).isOk());
}
List<Grouper.Entry<Integer>> iterated = Lists.newArrayList(grouper.iterator(true));
Assert.assertEquals(LIMIT, iterated.size());
for (int i = 0; i < LIMIT; i++) {
Assert.assertEquals((long) NUM_ROWS - i, iterated.get(i).getValues()[0]);
}
}
@Test
public void testIteratorOrderByAggs()
{
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final LimitedBufferHashGrouper<Integer> grouper = makeGrouperWithOrderBy(
columnSelectorFactory,
"valueSum",
OrderByColumnSpec.Direction.ASCENDING
);
for (int i = 0; i < NUM_ROWS; i++) {
// limited grouper iterator will always sort by keys in ascending order, even if the heap was sorted by values
// so, we aggregate with keys and values both descending so that the results are not re-ordered by key
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", NUM_ROWS - i)));
Assert.assertTrue(String.valueOf(NUM_ROWS - i + KEY_BASE), grouper.aggregate(NUM_ROWS - i + KEY_BASE).isOk());
}
List<Grouper.Entry<Integer>> iterated = Lists.newArrayList(grouper.iterator(true));
Assert.assertEquals(LIMIT, iterated.size());
for (int i = 0; i < LIMIT; i++) {
Assert.assertEquals(i + 1L, iterated.get(i).getValues()[0]);
}
}
@Test
public void testIteratorOrderByAggsDesc()
{
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final LimitedBufferHashGrouper<Integer> grouper = makeGrouperWithOrderBy(
columnSelectorFactory,
"valueSum",
OrderByColumnSpec.Direction.DESCENDING
);
for (int i = 0; i < NUM_ROWS; i++) {
// limited grouper iterator will always sort by keys in ascending order, even if the heap was sorted by values
// so, we aggregate with keys descending and values asending so that the results are not re-ordered by key
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", i + 1)));
Assert.assertTrue(String.valueOf(NUM_ROWS - i + KEY_BASE), grouper.aggregate(NUM_ROWS - i + KEY_BASE).isOk());
}
List<Grouper.Entry<Integer>> iterated = Lists.newArrayList(grouper.iterator(true));
Assert.assertEquals(LIMIT, iterated.size());
for (int i = 0; i < LIMIT; i++) {
Assert.assertEquals((long) NUM_ROWS - i, iterated.get(i).getValues()[0]);
}
}
private static LimitedBufferHashGrouper<Integer> makeGrouper(
TestColumnSelectorFactory columnSelectorFactory,
int bufferSize,
int initialBuckets,
int limit
int bufferSize
)
{
LimitedBufferHashGrouper<Integer> grouper = new LimitedBufferHashGrouper<>(
@ -240,12 +331,97 @@ public class LimitedBufferHashGrouperTest extends InitializedNullHandlingTest
),
Integer.MAX_VALUE,
0.5f,
initialBuckets,
limit,
2,
LIMIT,
false
);
grouper.init();
return grouper;
}
private static LimitedBufferHashGrouper<Integer> makeGrouperWithOrderBy(
TestColumnSelectorFactory columnSelectorFactory,
String orderByColumn,
OrderByColumnSpec.Direction direction
)
{
final StringComparator stringComparator = "value".equals(orderByColumn)
? StringComparators.LEXICOGRAPHIC
: StringComparators.NUMERIC;
final DefaultLimitSpec orderBy = DefaultLimitSpec.builder()
.orderBy(
new OrderByColumnSpec(
orderByColumn,
direction,
stringComparator
)
)
.limit(LIMIT)
.build();
LimitedBufferHashGrouper<Integer> grouper = new LimitedBufferHashGrouper<>(
Suppliers.ofInstance(ByteBuffer.allocate(12120)),
new GroupByIshKeySerde(orderBy),
AggregatorAdapters.factorizeBuffered(
columnSelectorFactory,
ImmutableList.of(
new LongSumAggregatorFactory("valueSum", "value"),
new CountAggregatorFactory("count")
)
),
Integer.MAX_VALUE,
0.5f,
2,
LIMIT,
!orderBy.getColumns().get(0).getDimension().equals("value")
);
grouper.init();
return grouper;
}
/**
* key serde for more realistic ordering tests, similar to the {@link GroupByQueryEngineV2.GroupByEngineKeySerde} or
* {@link RowBasedGrouperHelper.RowBasedKeySerde} which are likely to be used in practice by the group-by engine,
* which also both use {@link GrouperBufferComparatorUtils} to make comparators
*/
private static class GroupByIshKeySerde extends IntKeySerde
{
private final DefaultLimitSpec orderBy;
public GroupByIshKeySerde(DefaultLimitSpec orderBy)
{
this.orderBy = orderBy;
}
@Override
public Grouper.BufferComparator bufferComparator()
{
return GrouperBufferComparatorUtils.bufferComparator(
false,
false,
1,
new Grouper.BufferComparator[] {KEY_COMPARATOR}
);
}
@Override
public Grouper.BufferComparator bufferComparatorWithAggregators(
AggregatorFactory[] aggregatorFactories,
int[] aggregatorOffsets
)
{
return GrouperBufferComparatorUtils.bufferComparatorWithAggregators(
aggregatorFactories,
aggregatorOffsets,
orderBy,
ImmutableList.of(DefaultDimensionSpec.of("value")),
new Grouper.BufferComparator[] {KEY_COMPARATOR},
false,
false,
Integer.BYTES
);
}
}
}