GroupBy: Reduce allocations by reusing entry and key holders. (#12474)

* GroupBy: Reduce allocations by reusing entry and key holders.

Two main changes:

1) Reuse Entry objects returned by various implementations of
   Grouper.iterator.

2) Reuse key objects contained within those Entry objects.

This is allowed by the contract, which states that entries must be
processed and immediately discarded. However, not all call sites
respected this, so this patch also updates those call sites.

One particularly sneaky way that the old code retained entries too long
is due to Guava's MergingIterator and CombiningIterator. Internally,
these both advance to the next value prior to returning the current
value. So, this patch addresses that in two ways:

1) For merging, we have our own implementation MergeIterator already,
   although it had the same problem. So, this patch updates our
   implementation to return the current item prior to advancing to the
   next item. It also adds a forbidden-api entry to ensure that this
   safer implementation is used instead of Guava's.

2) For combining, we address the problem in a different way: by copying
   the key when creating the new, combined entry.

* Attempt to fix test.

* Remove unused import.
This commit is contained in:
Gian Merlino 2022-04-28 23:21:13 -07:00 committed by GitHub
parent 42fa5c26e1
commit 529b983ad0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
50 changed files with 1076 additions and 536 deletions

View File

@ -5,6 +5,7 @@ com.fasterxml.jackson.databind.ObjectMapper#writeValue(com.fasterxml.jackson.cor
com.fasterxml.jackson.core.JsonGenerator#writeObject(java.lang.Object) @ Use JacksonUtils#writeObjectUsingSerializerProvider to allow SerializerProvider reuse
com.google.common.base.Charsets @ Use java.nio.charset.StandardCharsets instead
com.google.common.collect.Iterators#emptyIterator() @ Use java.util.Collections#emptyIterator()
com.google.common.collect.Iterators#mergeSorted(java.lang.Iterable,java.util.Comparator) @ Use org.apache.druid.java.util.common.collect.Utils#mergeSorted()
com.google.common.collect.Lists#newArrayList() @ Create java.util.ArrayList directly
com.google.common.collect.Lists#newLinkedList() @ Use ArrayList or ArrayDeque instead
com.google.common.collect.Lists#newLinkedList(java.lang.Iterable) @ Use ArrayList or ArrayDeque instead

View File

@ -43,14 +43,13 @@ public class CombiningIterable<T> implements Iterable<T>
* @return An Iterable that is the merge of all Iterables from in such that there is only one instance of
* equivalent objects.
*/
@SuppressWarnings("unchecked")
public static <T> CombiningIterable<T> createSplatted(
Iterable<? extends Iterable<T>> in,
Comparator<T> comparator
)
{
return create(
new MergeIterable<>(comparator, (Iterable<Iterable<T>>) in),
new MergeIterable<>(in, comparator),
comparator,
GuavaUtils::firstNonNull
);

View File

@ -21,6 +21,7 @@ package org.apache.druid.java.util.common;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import org.apache.druid.java.util.common.collect.Utils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
@ -52,7 +53,7 @@ public class CloseableIterators
final Closer closer = Closer.create();
iterators.forEach(closer::register);
final Iterator<T> innerIterator = Iterators.mergeSorted(iterators, comparator);
final Iterator<T> innerIterator = Utils.mergeSorted(iterators, comparator);
return wrap(innerIterator, closer);
}

View File

@ -20,9 +20,12 @@
package org.apache.druid.java.util.common.collect;
import org.apache.druid.java.util.common.guava.MergeIterator;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@ -89,4 +92,20 @@ public class Utils
return o.getClass().getName();
}
}
/**
* Like Guava's {@link com.google.common.collect.Iterators#mergeSorted(Iterable, Comparator)}, but avoids
* calling next() on any iterator prior to returning the value returned by the previous call to next(). This is
* important when merging iterators that reuse container objects across calls to next().
*
* If the Iterators are {@link org.apache.druid.java.util.common.parsers.CloseableIterator}, use
* {@link org.apache.druid.java.util.common.CloseableIterators#mergeSorted} instead.
*/
public static <T> Iterator<T> mergeSorted(
final Iterable<? extends Iterator<? extends T>> sortedIterators,
final Comparator<? super T> comparator
)
{
return new MergeIterator<>(sortedIterators, comparator);
}
}

View File

@ -28,12 +28,12 @@ import java.util.List;
*/
public class MergeIterable<T> implements Iterable<T>
{
private final Comparator<T> comparator;
private final Iterable<Iterable<T>> baseIterables;
private final Comparator<? super T> comparator;
private final Iterable<? extends Iterable<? extends T>> baseIterables;
public MergeIterable(
Comparator<T> comparator,
Iterable<Iterable<T>> baseIterables
Iterable<? extends Iterable<? extends T>> baseIterables,
Comparator<? super T> comparator
)
{
this.comparator = comparator;
@ -43,11 +43,11 @@ public class MergeIterable<T> implements Iterable<T>
@Override
public Iterator<T> iterator()
{
List<Iterator<T>> iterators = new ArrayList<>();
for (Iterable<T> baseIterable : baseIterables) {
List<Iterator<? extends T>> iterators = new ArrayList<>();
for (Iterable<? extends T> baseIterable : baseIterables) {
iterators.add(baseIterable.iterator());
}
return new MergeIterator<>(comparator, iterators);
return new MergeIterator<>(iterators, comparator);
}
}

View File

@ -24,34 +24,36 @@ import com.google.common.collect.PeekingIterator;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;
/**
*/
* Iterator that merges a collection of sorted iterators using a comparator.
*
* Similar to Guava's MergingIterator, but avoids calling next() on any iterator prior to returning the value
* returned by the previous call to next(). This is important when merging iterators that reuse container objects
* across calls to next().
*
* Used by {@link org.apache.druid.java.util.common.collect.Utils#mergeSorted(Iterable, Comparator)}.
*/
public class MergeIterator<T> implements Iterator<T>
{
private static final int PRIORITY_QUEUE_INITIAL_CAPACITY = 16;
private final PriorityQueue<PeekingIterator<T>> pQueue;
private PeekingIterator<T> currentIterator = null;
public MergeIterator(
final Comparator<T> comparator,
List<Iterator<T>> iterators
final Iterable<? extends Iterator<? extends T>> sortedIterators,
final Comparator<? super T> comparator
)
{
pQueue = new PriorityQueue<>(
16,
new Comparator<PeekingIterator<T>>()
{
@Override
public int compare(PeekingIterator<T> lhs, PeekingIterator<T> rhs)
{
return comparator.compare(lhs.peek(), rhs.peek());
}
}
PRIORITY_QUEUE_INITIAL_CAPACITY,
(lhs, rhs) -> comparator.compare(lhs.peek(), rhs.peek())
);
for (Iterator<T> iterator : iterators) {
for (final Iterator<? extends T> iterator : sortedIterators) {
final PeekingIterator<T> iter = Iterators.peekingIterator(iterator);
if (iter != null && iter.hasNext()) {
@ -64,7 +66,7 @@ public class MergeIterator<T> implements Iterator<T>
@Override
public boolean hasNext()
{
return !pQueue.isEmpty();
return !pQueue.isEmpty() || (currentIterator != null && currentIterator.hasNext());
}
@Override
@ -74,13 +76,19 @@ public class MergeIterator<T> implements Iterator<T>
throw new NoSuchElementException();
}
if (currentIterator != null) {
if (currentIterator.hasNext()) {
pQueue.add(currentIterator);
}
currentIterator = null;
}
PeekingIterator<T> retIt = pQueue.remove();
T retVal = retIt.next();
if (retIt.hasNext()) {
pQueue.add(retIt);
}
// Save currentIterator and add it back later, to avoid calling next() prior to returning the current value.
currentIterator = retIt;
return retVal;
}

View File

@ -21,7 +21,7 @@ package org.apache.druid.java.util.common.guava;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import junit.framework.Assert;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
@ -34,12 +34,12 @@ public class MergeIteratorTest
public void testSanity()
{
MergeIterator<Integer> iter = new MergeIterator<>(
Ordering.natural(),
Lists.newArrayList(
Arrays.asList(1, 3, 5, 7, 9).iterator(),
Arrays.asList(2, 8).iterator(),
Arrays.asList(4, 6, 8).iterator()
)
),
Ordering.natural()
);
Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 8, 9), Lists.newArrayList(iter));
@ -49,12 +49,12 @@ public class MergeIteratorTest
public void testScrewsUpOnOutOfOrder()
{
MergeIterator<Integer> iter = new MergeIterator<>(
Ordering.natural(),
Lists.newArrayList(
Arrays.asList(1, 3, 5, 4, 7, 9).iterator(),
Arrays.asList(2, 8).iterator(),
Arrays.asList(4, 6).iterator()
)
),
Ordering.natural()
);
Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5, 4, 6, 7, 8, 9), Lists.newArrayList(iter));

View File

@ -31,6 +31,7 @@ import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.groupby.epinephelinae.BufferHashGrouper;
import org.apache.druid.query.groupby.epinephelinae.Grouper;
import org.apache.druid.query.groupby.epinephelinae.GrouperTestUtil;
import org.apache.druid.query.groupby.epinephelinae.IntKey;
import org.apache.druid.query.groupby.epinephelinae.TestColumnSelectorFactory;
import org.junit.Assert;
import org.junit.Test;
@ -39,13 +40,13 @@ import java.nio.ByteBuffer;
public class BufferHashGrouperUsingSketchMergeAggregatorFactoryTest
{
private static BufferHashGrouper<Integer> makeGrouper(
private static BufferHashGrouper<IntKey> makeGrouper(
TestColumnSelectorFactory columnSelectorFactory,
int bufferSize,
int initialBuckets
)
{
final BufferHashGrouper<Integer> grouper = new BufferHashGrouper<>(
final BufferHashGrouper<IntKey> grouper = new BufferHashGrouper<>(
Suppliers.ofInstance(ByteBuffer.allocate(bufferSize)),
GrouperTestUtil.intKeySerde(),
AggregatorAdapters.factorizeBuffered(
@ -68,7 +69,7 @@ public class BufferHashGrouperUsingSketchMergeAggregatorFactoryTest
public void testGrowingBufferGrouper()
{
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final Grouper<Integer> grouper = makeGrouper(columnSelectorFactory, 100000, 2);
final Grouper<IntKey> grouper = makeGrouper(columnSelectorFactory, 100000, 2);
try {
final int expectedMaxSize = 5;
@ -79,14 +80,14 @@ public class BufferHashGrouperUsingSketchMergeAggregatorFactoryTest
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("sketch", sketchHolder)));
for (int i = 0; i < expectedMaxSize; i++) {
Assert.assertTrue(String.valueOf(i), grouper.aggregate(i).isOk());
Assert.assertTrue(String.valueOf(i), grouper.aggregate(new IntKey(i)).isOk());
}
updateSketch.update(3);
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("sketch", sketchHolder)));
for (int i = 0; i < expectedMaxSize; i++) {
Assert.assertTrue(String.valueOf(i), grouper.aggregate(i).isOk());
Assert.assertTrue(String.valueOf(i), grouper.aggregate(new IntKey(i)).isOk());
}
Object[] holders = Lists.newArrayList(grouper.iterator(true)).get(0).getValues();

View File

@ -138,10 +138,10 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
try {
return new MergeIterable<>(
ordering.nullsFirst(),
QueryContexts.hasTimeout(query) ?
future.get(QueryContexts.getTimeout(query), TimeUnit.MILLISECONDS) :
future.get()
future.get(),
ordering.nullsFirst()
).iterator();
}
catch (InterruptedException e) {

View File

@ -21,6 +21,7 @@ package org.apache.druid.query.groupby.epinephelinae;
import com.google.common.base.Supplier;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.aggregation.AggregatorAdapters;
@ -76,8 +77,8 @@ public abstract class AbstractBufferHashGrouper<KeyType> implements Grouper<KeyT
/**
* Called to check if it's possible to skip aggregation for a row.
*
* @param bucketOffset Offset of the bucket containing this row's entry in the hash table,
* within the buffer returned by hashTable.getTableBuffer()
* @param bucketOffset Offset of the bucket containing this row's entry in the hash table,
* within the buffer returned by hashTable.getTableBuffer()
*
* @return true if aggregation can be skipped, false otherwise.
*/
@ -171,15 +172,29 @@ public abstract class AbstractBufferHashGrouper<KeyType> implements Grouper<KeyT
aggregators.close();
}
protected Entry<KeyType> bucketEntryForOffset(final int bucketOffset)
/**
* Populate a {@link ReusableEntry} with values from a particular bucket.
*/
protected Entry<KeyType> populateBucketEntryForOffset(
final ReusableEntry<KeyType> reusableEntry,
final int bucketOffset
)
{
final ByteBuffer tableBuffer = hashTable.getTableBuffer();
final KeyType key = keySerde.fromByteBuffer(tableBuffer, bucketOffset + HASH_SIZE);
final Object[] values = new Object[aggregators.size()];
for (int i = 0; i < aggregators.size(); i++) {
values[i] = aggregators.get(tableBuffer, bucketOffset + baseAggregatorOffset, i);
keySerde.readFromByteBuffer(reusableEntry.getKey(), tableBuffer, bucketOffset + HASH_SIZE);
if (reusableEntry.getValues().length != aggregators.size()) {
throw new ISE(
"Expected entry with [%d] values but got [%d]",
aggregators.size(),
reusableEntry.getValues().length
);
}
return new Entry<>(key, values);
for (int i = 0; i < aggregators.size(); i++) {
reusableEntry.getValues()[i] = aggregators.get(tableBuffer, bucketOffset + baseAggregatorOffset, i);
}
return reusableEntry;
}
}

View File

@ -29,6 +29,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.aggregation.AggregatorAdapters;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.groupby.epinephelinae.collection.MemoryPointer;
import org.apache.druid.query.groupby.epinephelinae.column.GroupByColumnSelectorStrategy;
import javax.annotation.Nullable;
@ -283,11 +284,14 @@ public class BufferArrayGrouper implements VectorGrouper, IntGrouper
}
@Override
public CloseableIterator<Entry<Memory>> iterator()
public CloseableIterator<Entry<MemoryPointer>> iterator()
{
final CloseableIterator<Entry<Integer>> iterator = iterator(false);
final CloseableIterator<Entry<IntKey>> iterator = iterator(false);
final WritableMemory keyMemory = WritableMemory.allocate(Integer.BYTES);
return new CloseableIterator<Entry<Memory>>()
final MemoryPointer reusableKey = new MemoryPointer(keyMemory, 0);
final ReusableEntry<MemoryPointer> reusableEntry = new ReusableEntry<>(reusableKey, new Object[aggregators.size()]);
return new CloseableIterator<Entry<MemoryPointer>>()
{
@Override
public boolean hasNext()
@ -296,11 +300,12 @@ public class BufferArrayGrouper implements VectorGrouper, IntGrouper
}
@Override
public Entry<Memory> next()
public Entry<MemoryPointer> next()
{
final Entry<Integer> integerEntry = iterator.next();
keyMemory.putInt(0, integerEntry.getKey());
return new Entry<>(keyMemory, integerEntry.getValues());
final Entry<IntKey> integerEntry = iterator.next();
keyMemory.putInt(0, integerEntry.getKey().intValue());
reusableEntry.setValues(integerEntry.getValues());
return reusableEntry;
}
@Override
@ -312,14 +317,17 @@ public class BufferArrayGrouper implements VectorGrouper, IntGrouper
}
@Override
public CloseableIterator<Entry<Integer>> iterator(boolean sorted)
public CloseableIterator<Entry<IntKey>> iterator(boolean sorted)
{
if (sorted) {
throw new UnsupportedOperationException("sorted iterator is not supported yet");
}
return new CloseableIterator<Entry<Integer>>()
return new CloseableIterator<Entry<IntKey>>()
{
final ReusableEntry<IntKey> reusableEntry =
new ReusableEntry<>(new IntKey(0), new Object[aggregators.size()]);
// initialize to the first used slot
private int next = findNext(-1);
@ -330,7 +338,7 @@ public class BufferArrayGrouper implements VectorGrouper, IntGrouper
}
@Override
public Entry<Integer> next()
public Entry<IntKey> next()
{
if (next < 0) {
throw new NoSuchElementException();
@ -339,14 +347,14 @@ public class BufferArrayGrouper implements VectorGrouper, IntGrouper
final int current = next;
next = findNext(current);
final Object[] values = new Object[aggregators.size()];
final int recordOffset = current * recordSize;
for (int i = 0; i < aggregators.size(); i++) {
values[i] = aggregators.get(valBuffer, recordOffset, i);
reusableEntry.getValues()[i] = aggregators.get(valBuffer, recordOffset, i);
}
// shift by -1 since values are initially shifted by +1 so they are all positive and
// GroupByColumnSelectorStrategy.GROUP_BY_MISSING_VALUE is -1
return new Entry<>(current - 1, values);
reusableEntry.getKey().setValue(current - 1);
return reusableEntry;
}
@Override

View File

@ -220,6 +220,8 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyTyp
return new CloseableIterator<Entry<KeyType>>()
{
final ReusableEntry<KeyType> reusableEntry = ReusableEntry.create(keySerde, aggregators.size());
int curr = 0;
final int size = getSize();
@ -235,7 +237,7 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyTyp
if (curr >= size) {
throw new NoSuchElementException();
}
return bucketEntryForOffset(wrappedOffsets.get(curr++));
return populateBucketEntryForOffset(reusableEntry, wrappedOffsets.get(curr++));
}
@Override
@ -254,6 +256,8 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyTyp
// Unsorted iterator
return new CloseableIterator<Entry<KeyType>>()
{
final ReusableEntry<KeyType> reusableEntry = ReusableEntry.create(keySerde, aggregators.size());
int curr = 0;
final int size = getSize();
@ -270,7 +274,7 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyTyp
throw new NoSuchElementException();
}
final int offset = offsetList.get(curr);
final Entry<KeyType> entry = bucketEntryForOffset(offset);
final Entry<KeyType> entry = populateBucketEntryForOffset(reusableEntry, offset);
curr++;
return entry;

View File

@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.druid.collections.CombiningIterator;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.common.guava.GuavaUtils;
import org.apache.druid.java.util.common.CloseableIterators;
@ -316,21 +317,59 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
parallelSortAndGetGroupersIterator() :
getGroupersIterator(sorted);
// Parallel combine is used only when data is spilled. This is because ConcurrentGrouper uses two different modes
// depending on data is spilled or not. If data is not spilled, all inputs are completely aggregated and no more
// aggregation is required.
if (sorted && spilling && parallelCombiner != null) {
// First try to merge dictionaries generated by all underlying groupers. If it is merged successfully, the same
// merged dictionary is used for all combining threads
final List<String> dictionary = tryMergeDictionary();
if (dictionary != null) {
return parallelCombiner.combine(sortedIterators, dictionary);
}
}
if (sorted) {
final boolean fullyCombined = !spilling;
return sorted ?
CloseableIterators.mergeSorted(sortedIterators, keyObjComparator) :
CloseableIterators.concat(sortedIterators);
// Parallel combine is used only when data is not fully merged.
if (!fullyCombined && parallelCombiner != null) {
// First try to merge dictionaries generated by all underlying groupers. If it is merged successfully, the same
// merged dictionary is used for all combining threads. Otherwise, fall back to single-threaded merge.
final List<String> dictionary = tryMergeDictionary();
if (dictionary != null) {
// Parallel combiner both merges and combines. Return its result directly.
return parallelCombiner.combine(sortedIterators, dictionary);
}
}
// Single-threaded merge. Still needs to be combined.
final CloseableIterator<Entry<KeyType>> mergedIterator =
CloseableIterators.mergeSorted(sortedIterators, keyObjComparator);
if (fullyCombined) {
return mergedIterator;
} else {
final ReusableEntry<KeyType> reusableEntry =
ReusableEntry.create(keySerdeFactory.factorize(), aggregatorFactories.length);
return CloseableIterators.wrap(
new CombiningIterator<>(
mergedIterator,
keyObjComparator,
(entry1, entry2) -> {
if (entry2 == null) {
// Copy key and value because we cannot retain the originals. They may be updated in-place after
// this method returns.
reusableEntry.setKey(keySerdeFactory.copyKey(entry1.getKey()));
System.arraycopy(entry1.getValues(), 0, reusableEntry.getValues(), 0, entry1.getValues().length);
} else {
for (int i = 0; i < aggregatorFactories.length; i++) {
reusableEntry.getValues()[i] = aggregatorFactories[i].combine(
reusableEntry.getValues()[i],
entry2.getValues()[i]
);
}
}
return reusableEntry;
}
),
mergedIterator
);
}
} else {
// Cannot fully combine if the caller did not request a sorted iterator. Concat and return.
return CloseableIterators.concat(sortedIterators);
}
}
private boolean isParallelizable()

View File

@ -851,7 +851,7 @@ public class GroupByQueryEngineV2
}
}
private static class ArrayAggregateIterator extends GroupByEngineIterator<Integer>
private static class ArrayAggregateIterator extends GroupByEngineIterator<IntKey>
{
private final int cardinality;
@ -895,13 +895,13 @@ public class GroupByQueryEngineV2
}
@Override
protected void aggregateSingleValueDims(Grouper<Integer> grouper)
protected void aggregateSingleValueDims(Grouper<IntKey> grouper)
{
aggregateSingleValueDims((IntGrouper) grouper);
}
@Override
protected void aggregateMultiValueDims(Grouper<Integer> grouper)
protected void aggregateMultiValueDims(Grouper<IntKey> grouper)
{
aggregateMultiValueDims((IntGrouper) grouper);
}
@ -971,11 +971,12 @@ public class GroupByQueryEngineV2
}
@Override
protected void putToRow(Integer key, ResultRow resultRow)
protected void putToRow(IntKey key, ResultRow resultRow)
{
final int intKey = key.intValue();
if (dim != null) {
if (key != GroupByColumnSelectorStrategy.GROUP_BY_MISSING_VALUE) {
resultRow.set(dim.getResultRowPosition(), ((DimensionSelector) dim.getSelector()).lookupName(key));
if (intKey != GroupByColumnSelectorStrategy.GROUP_BY_MISSING_VALUE) {
resultRow.set(dim.getResultRowPosition(), ((DimensionSelector) dim.getSelector()).lookupName(intKey));
} else {
resultRow.set(dim.getResultRowPosition(), NullHandling.defaultStringValue());
}
@ -1019,6 +1020,12 @@ public class GroupByQueryEngineV2
return ImmutableList.of();
}
@Override
public ByteBuffer createKey()
{
return ByteBuffer.allocate(keySize);
}
@Override
public ByteBuffer toByteBuffer(ByteBuffer key)
{
@ -1026,11 +1033,14 @@ public class GroupByQueryEngineV2
}
@Override
public ByteBuffer fromByteBuffer(ByteBuffer buffer, int position)
public void readFromByteBuffer(ByteBuffer dstBuffer, ByteBuffer srcBuffer, int position)
{
final ByteBuffer dup = buffer.duplicate();
dup.position(position).limit(position + keySize);
return dup.slice();
dstBuffer.limit(keySize);
dstBuffer.position(0);
for (int i = 0; i < keySize; i++) {
dstBuffer.put(i, srcBuffer.get(position + i));
}
}
@Override

View File

@ -19,8 +19,6 @@
package org.apache.druid.query.groupby.epinephelinae;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.aggregation.AggregatorFactory;
@ -28,7 +26,6 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.ToIntFunction;
@ -122,69 +119,11 @@ public interface Grouper<KeyType> extends Closeable
*/
CloseableIterator<Entry<KeyType>> iterator(boolean sorted);
class Entry<T>
interface Entry<T>
{
final T key;
final Object[] values;
T getKey();
@JsonCreator
public Entry(
@JsonProperty("k") T key,
@JsonProperty("v") Object[] values
)
{
this.key = key;
this.values = values;
}
@JsonProperty("k")
public T getKey()
{
return key;
}
@JsonProperty("v")
public Object[] getValues()
{
return values;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Entry<?> entry = (Entry<?>) o;
if (!key.equals(entry.key)) {
return false;
}
// Probably incorrect - comparing Object[] arrays with Arrays.equals
return Arrays.equals(values, entry.values);
}
@Override
public int hashCode()
{
int result = key.hashCode();
result = 31 * result + Arrays.hashCode(values);
return result;
}
@Override
public String toString()
{
return "Entry{" +
"key=" + key +
", values=" + Arrays.toString(values) +
'}';
}
Object[] getValues();
}
interface KeySerdeFactory<T>
@ -206,6 +145,12 @@ public interface Grouper<KeyType> extends Closeable
*/
KeySerde<T> factorizeWithDictionary(List<String> dictionary);
/**
* Copies a key. Required if the key from an {@link Entry} from {@link #iterator} will be retained past the
* following call to next().
*/
T copyKey(T key);
/**
* Return an object that knows how to compare two serialized key instances. Will be called by the
* {@link #iterator(boolean)} method if sorting is enabled.
@ -253,15 +198,19 @@ public interface Grouper<KeyType> extends Closeable
@Nullable
ByteBuffer toByteBuffer(T key);
/**
* Create a reusable key that can be passed to {@link #readFromByteBuffer}.
*/
T createKey();
/**
* Deserialize a key from a buffer. Will be called by the {@link #iterator(boolean)} method.
*
* @param key object from {@link #createKey()}
* @param buffer buffer containing the key
* @param position key start position in the buffer
*
* @return key object
*/
T fromByteBuffer(ByteBuffer buffer, int position);
void readFromByteBuffer(T key, ByteBuffer buffer, int position);
/**
* Return an object that knows how to compare two serialized keys. Will be called by the
@ -276,15 +225,15 @@ public interface Grouper<KeyType> extends Closeable
* using the bufferComparator.
*
* @param aggregatorFactories Array of aggregators from a GroupByQuery
* @param aggregatorOffsets Offsets for each aggregator in aggregatorFactories pointing to their location
* within the grouping key + aggs buffer.
* @param aggregatorOffsets Offsets for each aggregator in aggregatorFactories pointing to their location
* within the grouping key + aggs buffer.
*
* @return comparator for keys + aggs
*/
BufferComparator bufferComparatorWithAggregators(AggregatorFactory[] aggregatorFactories, int[] aggregatorOffsets);
/**
* Reset the keySerde to its initial state. After this method is called, {@link #fromByteBuffer(ByteBuffer, int)}
* Reset the keySerde to its initial state. After this method is called, {@link #readFromByteBuffer}
* and {@link #bufferComparator()} may no longer work properly on previously-serialized keys.
*/
void reset();

View File

@ -32,6 +32,7 @@ import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.aggregation.AggregatorAdapters;
import org.apache.druid.query.groupby.epinephelinae.collection.HashTableUtils;
import org.apache.druid.query.groupby.epinephelinae.collection.MemoryOpenHashTable;
import org.apache.druid.query.groupby.epinephelinae.collection.MemoryPointer;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
@ -207,7 +208,7 @@ public class HashVectorGrouper implements VectorGrouper
}
@Override
public CloseableIterator<Grouper.Entry<Memory>> iterator()
public CloseableIterator<Grouper.Entry<MemoryPointer>> iterator()
{
if (!initialized) {
// it's possible for iterator() to be called before initialization when
@ -217,8 +218,11 @@ public class HashVectorGrouper implements VectorGrouper
final IntIterator baseIterator = hashTable.bucketIterator();
return new CloseableIterator<Grouper.Entry<Memory>>()
return new CloseableIterator<Grouper.Entry<MemoryPointer>>()
{
final MemoryPointer reusableKey = new MemoryPointer();
final ReusableEntry<MemoryPointer> reusableEntry = new ReusableEntry<>(reusableKey, new Object[aggregators.size()]);
@Override
public boolean hasNext()
{
@ -226,23 +230,19 @@ public class HashVectorGrouper implements VectorGrouper
}
@Override
public Grouper.Entry<Memory> next()
public Grouper.Entry<MemoryPointer> next()
{
final int bucket = baseIterator.nextInt();
final int bucketPosition = hashTable.bucketMemoryPosition(bucket);
final Memory keyMemory = hashTable.memory().region(
bucketPosition + hashTable.bucketKeyOffset(),
hashTable.keySize()
);
reusableKey.set(hashTable.memory(), bucketPosition + hashTable.bucketKeyOffset());
final Object[] values = new Object[aggregators.size()];
final int aggregatorsOffset = bucketPosition + hashTable.bucketValueOffset();
for (int i = 0; i < aggregators.size(); i++) {
values[i] = aggregators.get(hashTable.memory().getByteBuffer(), aggregatorsOffset, i);
reusableEntry.getValues()[i] = aggregators.get(hashTable.memory().getByteBuffer(), aggregatorsOffset, i);
}
return new Grouper.Entry<>(keyMemory, values);
return reusableEntry;
}
@Override

View File

@ -26,7 +26,7 @@ import java.util.function.ToIntFunction;
/**
* {@link Grouper} specialized for the primitive int type
*/
public interface IntGrouper extends Grouper<Integer>
public interface IntGrouper extends Grouper<IntKey>
{
default AggregateResult aggregate(int key)
{
@ -42,7 +42,7 @@ public interface IntGrouper extends Grouper<Integer>
*/
@Deprecated
@Override
default AggregateResult aggregate(Integer key)
default AggregateResult aggregate(IntKey key)
{
Preconditions.checkNotNull(key);
return aggregate(key.intValue());
@ -55,7 +55,7 @@ public interface IntGrouper extends Grouper<Integer>
*/
@Deprecated
@Override
default AggregateResult aggregate(Integer key, int keyHash)
default AggregateResult aggregate(IntKey key, int keyHash)
{
Preconditions.checkNotNull(key);
return aggregateKeyHash(keyHash);
@ -64,10 +64,10 @@ public interface IntGrouper extends Grouper<Integer>
@Override
IntGrouperHashFunction hashFunction();
interface IntGrouperHashFunction extends ToIntFunction<Integer>
interface IntGrouperHashFunction extends ToIntFunction<IntKey>
{
@Override
default int applyAsInt(Integer value)
default int applyAsInt(IntKey value)
{
return apply(value.intValue());
}

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.groupby.epinephelinae;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
public class IntKey
{
private int intValue;
@JsonCreator
public IntKey(final int intValue)
{
this.intValue = intValue;
}
@JsonValue
public int intValue()
{
return intValue;
}
public void setValue(final int intValue)
{
this.intValue = intValue;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
IntKey intKey = (IntKey) o;
return intValue == intKey.intValue;
}
@Override
public int hashCode()
{
return intValue;
}
@Override
public String toString()
{
return "IntKey{" +
"intValue=" + intValue +
'}';
}
}

View File

@ -294,6 +294,7 @@ public class LimitedBufferHashGrouper<KeyType> extends AbstractBufferHashGrouper
return new CloseableIterator<Entry<KeyType>>()
{
final ReusableEntry<KeyType> reusableEntry = ReusableEntry.create(keySerde, aggregators.size());
int curr = 0;
@Override
@ -305,7 +306,7 @@ public class LimitedBufferHashGrouper<KeyType> extends AbstractBufferHashGrouper
@Override
public Grouper.Entry<KeyType> next()
{
return bucketEntryForOffset(wrappedOffsets.get(curr++));
return populateBucketEntryForOffset(reusableEntry, wrappedOffsets.get(curr++));
}
@Override
@ -332,6 +333,7 @@ public class LimitedBufferHashGrouper<KeyType> extends AbstractBufferHashGrouper
offsetHeapIterableSize = initialHeapSize;
return new CloseableIterator<Entry<KeyType>>()
{
final ReusableEntry<KeyType> reusableEntry = ReusableEntry.create(keySerde, aggregators.size());
int curr = 0;
@Override
@ -347,7 +349,7 @@ public class LimitedBufferHashGrouper<KeyType> extends AbstractBufferHashGrouper
throw new NoSuchElementException();
}
final int offset = offsetHeap.removeMin();
final Grouper.Entry<KeyType> entry = bucketEntryForOffset(offset);
final Grouper.Entry<KeyType> entry = populateBucketEntryForOffset(reusableEntry, offset);
curr++;
// write out offset to end of heap, which is no longer used after removing min
offsetHeap.setAt(initialHeapSize - curr, offset);
@ -371,6 +373,7 @@ public class LimitedBufferHashGrouper<KeyType> extends AbstractBufferHashGrouper
// subsequent iterations just walk the buffer backwards
return new CloseableIterator<Entry<KeyType>>()
{
final ReusableEntry<KeyType> reusableEntry = ReusableEntry.create(keySerde, aggregators.size());
int curr = offsetHeapIterableSize - 1;
@Override
@ -386,7 +389,7 @@ public class LimitedBufferHashGrouper<KeyType> extends AbstractBufferHashGrouper
throw new NoSuchElementException();
}
final int offset = offsetHeap.getAt(curr);
final Grouper.Entry<KeyType> entry = bucketEntryForOffset(offset);
final Grouper.Entry<KeyType> entry = populateBucketEntryForOffset(reusableEntry, offset);
curr--;
return entry;

View File

@ -19,6 +19,7 @@
package org.apache.druid.query.groupby.epinephelinae;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
@ -119,6 +120,12 @@ public class LimitedTemporaryStorage implements Closeable
return maxBytesUsed;
}
@VisibleForTesting
long currentSize()
{
return bytesUsed.get();
}
@Override
public void close()
{

View File

@ -414,8 +414,8 @@ public class ParallelCombiner<KeyType>
while (mergedIterator.hasNext()) {
final Entry<KeyType> next = mergedIterator.next();
settableColumnSelectorFactory.set(next.values);
grouper.aggregate(next.key); // grouper always returns ok or throws an exception
settableColumnSelectorFactory.set(next.getValues());
grouper.aggregate(next.getKey()); // grouper always returns ok or throws an exception
settableColumnSelectorFactory.set(null);
}
}

View File

@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.groupby.epinephelinae;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.annotation.Nullable;
import java.util.Arrays;
/**
* A {@link Grouper.Entry} implementation that is reusable. {@link Grouper#iterator} allows reuse of Entries, and
* so this class helps implementations taking advantage of that.
*/
public class ReusableEntry<KeyType> implements Grouper.Entry<KeyType>
{
private KeyType key;
@Nullable
private Object[] values;
@JsonCreator
public ReusableEntry(
@JsonProperty("k") @Nullable final KeyType key,
@JsonProperty("v") @Nullable final Object[] values
)
{
this.key = key;
this.values = values;
}
/**
* Create a new instance based on a particular {@link Grouper.KeySerde} and a particular Object values size.
*/
public static <T> ReusableEntry<T> create(final Grouper.KeySerde<T> keySerde, final int numValues)
{
return new ReusableEntry<>(keySerde.createKey(), new Object[numValues]);
}
/**
* Returns the key, which enables modifying the key.
*/
@Override
@JsonProperty("k")
public KeyType getKey()
{
return key;
}
/**
* Returns the values array, which enables setting individual values.
*/
@Override
@JsonProperty("v")
public Object[] getValues()
{
return values;
}
/**
* Replaces the key completely.
*/
public void setKey(final KeyType key)
{
this.key = key;
}
/**
* Replaces the values array completely.
*/
public void setValues(final Object[] values)
{
this.values = values;
}
@Override
public String toString()
{
return "ReusableEntry{" +
"key=" + key +
", values=" + Arrays.toString(values) +
'}';
}
}

View File

@ -873,6 +873,15 @@ public class RowBasedGrouperHelper
);
}
@Override
public RowBasedKey copyKey(RowBasedKey key)
{
final int keyLength = key.getKey().length;
final Object[] keyCopy = new Object[keyLength];
System.arraycopy(key.getKey(), 0, keyCopy, 0, keyLength);
return new RowBasedKey(keyCopy);
}
@Override
public Comparator<Grouper.Entry<RowBasedKey>> objectComparator(boolean forceDefaultOrder)
{
@ -1285,29 +1294,30 @@ public class RowBasedGrouperHelper
}
@Override
public RowBasedKey fromByteBuffer(ByteBuffer buffer, int position)
public RowBasedKey createKey()
{
return new RowBasedKey(new Object[includeTimestamp ? dimCount + 1 : dimCount]);
}
@Override
public void readFromByteBuffer(final RowBasedKey key, final ByteBuffer buffer, final int position)
{
final int dimStart;
final Comparable[] key;
final int dimsPosition;
if (includeTimestamp) {
key = new Comparable[dimCount + 1];
key[0] = buffer.getLong(position);
key.getKey()[0] = buffer.getLong(position);
dimsPosition = position + Long.BYTES;
dimStart = 1;
} else {
key = new Comparable[dimCount];
dimsPosition = position;
dimStart = 0;
}
for (int i = dimStart; i < key.length; i++) {
for (int i = dimStart; i < key.getKey().length; i++) {
// Writes value from buffer to key[i]
serdeHelpers[i - dimStart].getFromByteBuffer(buffer, dimsPosition, i, key);
serdeHelpers[i - dimStart].getFromByteBuffer(buffer, dimsPosition, i, key.getKey());
}
return new RowBasedKey(key);
}
@Override
@ -1532,7 +1542,7 @@ public class RowBasedGrouperHelper
}
@Override
public void getFromByteBuffer(ByteBuffer buffer, int initialOffset, int dimValIdx, Comparable[] dimValues)
public void getFromByteBuffer(ByteBuffer buffer, int initialOffset, int dimValIdx, Object[] dimValues)
{
dimValues[dimValIdx] = listDictionary.get(buffer.getInt(initialOffset + keyBufferPosition));
}
@ -1584,7 +1594,7 @@ public class RowBasedGrouperHelper
}
@Override
public void getFromByteBuffer(ByteBuffer buffer, int initialOffset, int dimValIdx, Comparable[] dimValues)
public void getFromByteBuffer(ByteBuffer buffer, int initialOffset, int dimValIdx, Object[] dimValues)
{
dimValues[dimValIdx] = arrayDictionary.get(buffer.getInt(initialOffset + keyBufferPosition));
}
@ -1644,7 +1654,7 @@ public class RowBasedGrouperHelper
}
@Override
public void getFromByteBuffer(ByteBuffer buffer, int initialOffset, int dimValIdx, Comparable[] dimValues)
public void getFromByteBuffer(ByteBuffer buffer, int initialOffset, int dimValIdx, Object[] dimValues)
{
dimValues[dimValIdx] = dictionary.get(buffer.getInt(initialOffset + keyBufferPosition));
}
@ -1683,6 +1693,7 @@ public class RowBasedGrouperHelper
* this returns -1.
*
* @param s a string
*
* @return id for this string, or -1
*/
private int addToDictionary(final String s)
@ -1761,7 +1772,7 @@ public class RowBasedGrouperHelper
}
@Override
public void getFromByteBuffer(ByteBuffer buffer, int initialOffset, int dimValIdx, Comparable[] dimValues)
public void getFromByteBuffer(ByteBuffer buffer, int initialOffset, int dimValIdx, Object[] dimValues)
{
dimValues[dimValIdx] = buffer.getLong(initialOffset + keyBufferPosition);
}
@ -1806,7 +1817,7 @@ public class RowBasedGrouperHelper
}
@Override
public void getFromByteBuffer(ByteBuffer buffer, int initialOffset, int dimValIdx, Comparable[] dimValues)
public void getFromByteBuffer(ByteBuffer buffer, int initialOffset, int dimValIdx, Object[] dimValues)
{
dimValues[dimValIdx] = buffer.getFloat(initialOffset + keyBufferPosition);
}
@ -1851,7 +1862,7 @@ public class RowBasedGrouperHelper
}
@Override
public void getFromByteBuffer(ByteBuffer buffer, int initialOffset, int dimValIdx, Comparable[] dimValues)
public void getFromByteBuffer(ByteBuffer buffer, int initialOffset, int dimValIdx, Object[] dimValues)
{
dimValues[dimValIdx] = buffer.getDouble(initialOffset + keyBufferPosition);
}
@ -1903,7 +1914,7 @@ public class RowBasedGrouperHelper
}
@Override
public void getFromByteBuffer(ByteBuffer buffer, int initialOffset, int dimValIdx, Comparable[] dimValues)
public void getFromByteBuffer(ByteBuffer buffer, int initialOffset, int dimValIdx, Object[] dimValues)
{
if (buffer.get(initialOffset + keyBufferPosition) == NullHandling.IS_NULL_BYTE) {
dimValues[dimValIdx] = null;

View File

@ -57,7 +57,7 @@ interface RowBasedKeySerdeHelper
* @param dimValIdx Index within dimValues to store the value read from the buffer
* @param dimValues Output array containing grouping key values for a row
*/
void getFromByteBuffer(ByteBuffer buffer, int initialOffset, int dimValIdx, Comparable[] dimValues);
void getFromByteBuffer(ByteBuffer buffer, int initialOffset, int dimValIdx, Object[] dimValues);
/**
* Return a {@link BufferComparator} to compare keys stored in ByteBuffer.

View File

@ -256,16 +256,20 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
final Closer closer = Closer.create();
for (final File file : files) {
final MappingIterator<Entry<KeyType>> fileIterator = read(file, keySerde.keyClazz());
iterators.add(
CloseableIterators.withEmptyBaggage(
Iterators.transform(
fileIterator,
new Function<Entry<KeyType>, Entry<KeyType>>()
{
final ReusableEntry<KeyType> reusableEntry =
ReusableEntry.create(keySerde, aggregatorFactories.length);
@Override
public Entry<KeyType> apply(Entry<KeyType> entry)
{
final Object[] deserializedValues = new Object[entry.getValues().length];
final Object[] deserializedValues = reusableEntry.getValues();
for (int i = 0; i < deserializedValues.length; i++) {
deserializedValues[i] = aggregatorFactories[i].deserialize(entry.getValues()[i]);
if (deserializedValues[i] instanceof Integer) {
@ -273,7 +277,8 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
deserializedValues[i] = ((Integer) deserializedValues[i]).longValue();
}
}
return new Entry<>(entry.getKey(), deserializedValues);
reusableEntry.setKey(entry.getKey());
return reusableEntry;
}
}
)
@ -327,7 +332,7 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
try {
return spillMapper.readValues(
spillMapper.getFactory().createParser(new LZ4BlockInputStream(new FileInputStream(file))),
spillMapper.getTypeFactory().constructParametricType(Entry.class, keyClazz)
spillMapper.getTypeFactory().constructParametricType(ReusableEntry.class, keyClazz)
);
}
catch (IOException e) {

View File

@ -384,6 +384,8 @@ public class StreamingMergeSortedGrouper<KeyType> implements Grouper<KeyType>
return new CloseableIterator<Entry<KeyType>>()
{
final ReusableEntry<KeyType> reusableEntry = ReusableEntry.create(keySerde, aggregators.length);
{
// Wait for some data to be ready and initialize nextReadIndex.
increaseReadIndexTo(0);
@ -429,11 +431,10 @@ public class StreamingMergeSortedGrouper<KeyType> implements Grouper<KeyType>
// - an index of the array slot where the aggregation for the corresponding grouping key is done
// - an index of the array slot which is not read yet
final int recordOffset = recordSize * nextReadIndex;
final KeyType key = keySerde.fromByteBuffer(buffer, recordOffset);
keySerde.readFromByteBuffer(reusableEntry.getKey(), buffer, recordOffset);
final Object[] values = new Object[aggregators.length];
for (int i = 0; i < aggregators.length; i++) {
values[i] = aggregators[i].get(buffer, recordOffset + aggregatorOffsets[i]);
reusableEntry.getValues()[i] = aggregators[i].get(buffer, recordOffset + aggregatorOffsets[i]);
}
final int targetIndex = nextReadIndex == maxNumSlots - 1 ? 0 : nextReadIndex + 1;
@ -441,7 +442,7 @@ public class StreamingMergeSortedGrouper<KeyType> implements Grouper<KeyType>
// nextReadIndex.
increaseReadIndexTo(targetIndex);
return new Entry<>(key, values);
return reusableEntry;
}
/**

View File

@ -21,6 +21,7 @@ package org.apache.druid.query.groupby.epinephelinae;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.groupby.epinephelinae.collection.MemoryPointer;
import java.io.Closeable;
@ -62,7 +63,7 @@ public interface VectorGrouper extends Closeable
void close();
/**
* Iterate through entry buckets. Each bucket's key is a {@link Memory} object in native byte order.
* Iterate through entry buckets. Each bucket's key is a {@link MemoryPointer} object in native byte order.
* <p>
* After you are done with the iterator returned by this method, you should either call {@link #close()} (if you are
* done with the VectorGrouper) or {@link #reset()} (if you want to reuse it).
@ -71,5 +72,5 @@ public interface VectorGrouper extends Closeable
*
* @return entry iterator
*/
CloseableIterator<Grouper.Entry<Memory>> iterator();
CloseableIterator<Grouper.Entry<MemoryPointer>> iterator();
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.groupby.epinephelinae.collection;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.java.util.common.ISE;
import javax.annotation.Nullable;
/**
* Reusable pointer to a location in {@link Memory}. Allows returning slices of memory without using
* {@link Memory#region}, which leads to allocations.
*/
public class MemoryPointer
{
@Nullable
private Memory memory;
private long position;
public MemoryPointer()
{
this(null, 0);
}
public MemoryPointer(@Nullable Memory memory, long position)
{
this.memory = memory;
this.position = position;
}
public Memory memory()
{
if (memory == null) {
throw new ISE("Memory not set");
}
return memory;
}
public long position()
{
return position;
}
public void set(final Memory memory, final long position)
{
this.memory = memory;
this.position = position;
}
}

View File

@ -20,11 +20,11 @@
package org.apache.druid.query.groupby.epinephelinae.vector;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.DictionaryBuilding;
import org.apache.druid.query.groupby.epinephelinae.collection.MemoryPointer;
import org.apache.druid.segment.vector.VectorObjectSelector;
import java.util.ArrayList;
@ -92,13 +92,13 @@ public class DictionaryBuildingSingleValueStringGroupByVectorColumnSelector impl
@Override
public void writeKeyToResultRow(
final Memory keyMemory,
final MemoryPointer keyMemory,
final int keyOffset,
final ResultRow resultRow,
final int resultRowPosition
)
{
final int id = keyMemory.getInt(keyOffset);
final int id = keyMemory.memory().getInt(keyMemory.position() + keyOffset);
// GROUP_BY_MISSING_VALUE is used to indicate empty rows, which are omitted from the result map.
if (id != GROUP_BY_MISSING_VALUE) {
final String value = dictionary.get(id);

View File

@ -19,9 +19,9 @@
package org.apache.druid.query.groupby.epinephelinae.vector;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.collection.MemoryPointer;
import org.apache.druid.segment.vector.VectorValueSelector;
public class DoubleGroupByVectorColumnSelector implements GroupByVectorColumnSelector
@ -63,13 +63,13 @@ public class DoubleGroupByVectorColumnSelector implements GroupByVectorColumnSel
@Override
public void writeKeyToResultRow(
final Memory keyMemory,
final MemoryPointer keyMemory,
final int keyOffset,
final ResultRow resultRow,
final int resultRowPosition
)
{
resultRow.set(resultRowPosition, keyMemory.getDouble(keyOffset));
resultRow.set(resultRowPosition, keyMemory.memory().getDouble(keyMemory.position() + keyOffset));
}
@Override

View File

@ -19,9 +19,9 @@
package org.apache.druid.query.groupby.epinephelinae.vector;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.collection.MemoryPointer;
import org.apache.druid.segment.vector.VectorValueSelector;
public class FloatGroupByVectorColumnSelector implements GroupByVectorColumnSelector
@ -63,13 +63,13 @@ public class FloatGroupByVectorColumnSelector implements GroupByVectorColumnSele
@Override
public void writeKeyToResultRow(
final Memory keyMemory,
final MemoryPointer keyMemory,
final int keyOffset,
final ResultRow resultRow,
final int resultRowPosition
)
{
resultRow.set(resultRowPosition, keyMemory.getFloat(keyOffset));
resultRow.set(resultRowPosition, keyMemory.memory().getFloat(keyMemory.position() + keyOffset));
}
@Override

View File

@ -19,9 +19,9 @@
package org.apache.druid.query.groupby.epinephelinae.vector;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.collection.MemoryPointer;
/**
* Column processor for groupBy dimensions.
@ -64,11 +64,12 @@ public interface GroupByVectorColumnSelector
*
* @param keyMemory key memory
* @param keyOffset starting position for this key part within keyMemory
* (starting from {@link MemoryPointer#position})
* @param resultRow result row to receive key parts
* @param resultRowPosition position within the result row for this key part
*/
void writeKeyToResultRow(
Memory keyMemory,
MemoryPointer keyMemory,
int keyOffset,
ResultRow resultRow,
int resultRowPosition

View File

@ -19,9 +19,9 @@
package org.apache.druid.query.groupby.epinephelinae.vector;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.collection.MemoryPointer;
import org.apache.druid.segment.vector.VectorValueSelector;
public class LongGroupByVectorColumnSelector implements GroupByVectorColumnSelector
@ -63,13 +63,13 @@ public class LongGroupByVectorColumnSelector implements GroupByVectorColumnSelec
@Override
public void writeKeyToResultRow(
final Memory keyMemory,
final MemoryPointer keyMemory,
final int keyOffset,
final ResultRow resultRow,
final int resultRowPosition
)
{
resultRow.set(resultRowPosition, keyMemory.getLong(keyOffset));
resultRow.set(resultRowPosition, keyMemory.memory().getLong(keyMemory.position() + keyOffset));
}
@Override

View File

@ -19,9 +19,9 @@
package org.apache.druid.query.groupby.epinephelinae.vector;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.collection.MemoryPointer;
/**
* Treats all rows as null.
@ -49,7 +49,7 @@ public class NilGroupByVectorColumnSelector implements GroupByVectorColumnSelect
}
@Override
public void writeKeyToResultRow(Memory keyMemory, int keyOffset, ResultRow resultRow, int resultRowPosition)
public void writeKeyToResultRow(MemoryPointer keyMemory, int keyOffset, ResultRow resultRow, int resultRowPosition)
{
resultRow.set(resultRowPosition, null);
}

View File

@ -19,10 +19,10 @@
package org.apache.druid.query.groupby.epinephelinae.vector;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.collection.MemoryPointer;
import org.apache.druid.segment.vector.VectorValueSelector;
public class NullableDoubleGroupByVectorColumnSelector implements GroupByVectorColumnSelector
@ -69,16 +69,16 @@ public class NullableDoubleGroupByVectorColumnSelector implements GroupByVectorC
@Override
public void writeKeyToResultRow(
final Memory keyMemory,
final MemoryPointer keyMemory,
final int keyOffset,
final ResultRow resultRow,
final int resultRowPosition
)
{
if (keyMemory.getByte(keyOffset) == NullHandling.IS_NULL_BYTE) {
if (keyMemory.memory().getByte(keyMemory.position() + keyOffset) == NullHandling.IS_NULL_BYTE) {
resultRow.set(resultRowPosition, null);
} else {
resultRow.set(resultRowPosition, keyMemory.getDouble(keyOffset + 1));
resultRow.set(resultRowPosition, keyMemory.memory().getDouble(keyMemory.position() + keyOffset + 1));
}
}

View File

@ -19,10 +19,10 @@
package org.apache.druid.query.groupby.epinephelinae.vector;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.collection.MemoryPointer;
import org.apache.druid.segment.vector.VectorValueSelector;
public class NullableFloatGroupByVectorColumnSelector implements GroupByVectorColumnSelector
@ -69,16 +69,16 @@ public class NullableFloatGroupByVectorColumnSelector implements GroupByVectorCo
@Override
public void writeKeyToResultRow(
final Memory keyMemory,
final MemoryPointer keyMemory,
final int keyOffset,
final ResultRow resultRow,
final int resultRowPosition
)
{
if (keyMemory.getByte(keyOffset) == NullHandling.IS_NULL_BYTE) {
if (keyMemory.memory().getByte(keyMemory.position() + keyOffset) == NullHandling.IS_NULL_BYTE) {
resultRow.set(resultRowPosition, null);
} else {
resultRow.set(resultRowPosition, keyMemory.getFloat(keyOffset + 1));
resultRow.set(resultRowPosition, keyMemory.memory().getFloat(keyMemory.position() + keyOffset + 1));
}
}

View File

@ -19,10 +19,10 @@
package org.apache.druid.query.groupby.epinephelinae.vector;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.collection.MemoryPointer;
import org.apache.druid.segment.vector.VectorValueSelector;
public class NullableLongGroupByVectorColumnSelector implements GroupByVectorColumnSelector
@ -69,16 +69,16 @@ public class NullableLongGroupByVectorColumnSelector implements GroupByVectorCol
@Override
public void writeKeyToResultRow(
final Memory keyMemory,
final MemoryPointer keyMemory,
final int keyOffset,
final ResultRow resultRow,
final int resultRowPosition
)
{
if (keyMemory.getByte(keyOffset) == NullHandling.IS_NULL_BYTE) {
if (keyMemory.memory().getByte(keyMemory.position() + keyOffset) == NullHandling.IS_NULL_BYTE) {
resultRow.set(resultRowPosition, null);
} else {
resultRow.set(resultRowPosition, keyMemory.getLong(keyOffset + 1));
resultRow.set(resultRowPosition, keyMemory.memory().getLong(keyMemory.position() + keyOffset + 1));
}
}

View File

@ -19,9 +19,9 @@
package org.apache.druid.query.groupby.epinephelinae.vector;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.collection.MemoryPointer;
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
public class SingleValueStringGroupByVectorColumnSelector implements GroupByVectorColumnSelector
@ -63,13 +63,13 @@ public class SingleValueStringGroupByVectorColumnSelector implements GroupByVect
@Override
public void writeKeyToResultRow(
final Memory keyMemory,
final MemoryPointer keyMemory,
final int keyOffset,
final ResultRow resultRow,
final int resultRowPosition
)
{
final int id = keyMemory.getInt(keyOffset);
final int id = keyMemory.memory().getInt(keyMemory.position() + keyOffset);
resultRow.set(resultRowPosition, selector.lookupName(id));
}

View File

@ -21,7 +21,6 @@ package org.apache.druid.query.groupby.epinephelinae.vector;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Suppliers;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.BaseSequence;
@ -41,6 +40,7 @@ import org.apache.druid.query.groupby.epinephelinae.CloseableGrouperIterator;
import org.apache.druid.query.groupby.epinephelinae.GroupByQueryEngineV2;
import org.apache.druid.query.groupby.epinephelinae.HashVectorGrouper;
import org.apache.druid.query.groupby.epinephelinae.VectorGrouper;
import org.apache.druid.query.groupby.epinephelinae.collection.MemoryPointer;
import org.apache.druid.query.vector.VectorCursorGranularizer;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnProcessors;
@ -252,7 +252,7 @@ public class VectorGroupByEngine
private long selectorInternalFootprint = 0;
@Nullable
private CloseableGrouperIterator<Memory, ResultRow> delegate = null;
private CloseableGrouperIterator<MemoryPointer, ResultRow> delegate = null;
VectorGroupByEngineIterator(
final GroupByQuery query,
@ -374,7 +374,7 @@ public class VectorGroupByEngine
return grouper;
}
private CloseableGrouperIterator<Memory, ResultRow> initNewDelegate()
private CloseableGrouperIterator<MemoryPointer, ResultRow> initNewDelegate()
{
// Method must not be called unless there's a current bucketInterval.
assert bucketInterval != null;

View File

@ -34,8 +34,8 @@ import java.util.stream.IntStream;
* "greater" than the previous, in terms of {@link TimeAndDimsPointer#compareTo}). Equivalent points from different
* input iterators are _not_ deduplicated.
*
* Conceptually MergingRowIterator is an equivalent to {@link com.google.common.collect.Iterators#mergeSorted}, but for
* {@link RowIterator}s rather than simple {@link java.util.Iterator}s.
* Conceptually MergingRowIterator is an equivalent to {@link org.apache.druid.java.util.common.guava.MergeIterator},
* but for {@link RowIterator}s rather than simple {@link java.util.Iterator}s.
*
* Implementation detail: this class uses binary heap priority queue algorithm to sort pointers, but it also memoizes
* and keeps some extra info along the heap slots (see javadoc of {@link #equalToChild} field), that is essential for

View File

@ -22,9 +22,6 @@ package org.apache.druid.query.groupby.epinephelinae;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Ints;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.MapBasedRow;
import org.apache.druid.java.util.common.StringUtils;
@ -38,7 +35,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.List;
public class BufferArrayGrouperTest
@ -63,17 +59,15 @@ public class BufferArrayGrouperTest
grouper.aggregate(12);
grouper.aggregate(6);
final List<Entry<Integer>> expected = ImmutableList.of(
new Grouper.Entry<>(6, new Object[]{30L, 3L}),
new Grouper.Entry<>(10, new Object[]{10L, 1L}),
new Grouper.Entry<>(12, new Object[]{20L, 2L})
final List<Entry<IntKey>> expected = ImmutableList.of(
new ReusableEntry<>(new IntKey(6), new Object[]{30L, 3L}),
new ReusableEntry<>(new IntKey(10), new Object[]{10L, 1L}),
new ReusableEntry<>(new IntKey(12), new Object[]{20L, 2L})
);
final List<Entry<Integer>> unsortedEntries = Lists.newArrayList(grouper.iterator(false));
Assert.assertEquals(
expected,
Ordering.from((Comparator<Entry<Integer>>) (o1, o2) -> Ints.compare(o1.getKey(), o2.getKey()))
.sortedCopy(unsortedEntries)
GrouperTestUtil.assertEntriesEquals(
expected.iterator(),
grouper.iterator(false)
);
}

View File

@ -22,9 +22,6 @@ package org.apache.druid.query.groupby.epinephelinae;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Ints;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.MapBasedRow;
import org.apache.druid.query.aggregation.AggregatorAdapters;
@ -53,7 +50,7 @@ public class BufferHashGrouperTest
public void testSimple()
{
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final Grouper<Integer> grouper = new BufferHashGrouper<>(
final Grouper<IntKey> grouper = new BufferHashGrouper<>(
Suppliers.ofInstance(ByteBuffer.allocate(1000)),
GrouperTestUtil.intKeySerde(),
AggregatorAdapters.factorizeBuffered(
@ -71,34 +68,28 @@ public class BufferHashGrouperTest
grouper.init();
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L)));
grouper.aggregate(12);
grouper.aggregate(6);
grouper.aggregate(10);
grouper.aggregate(6);
grouper.aggregate(12);
grouper.aggregate(12);
grouper.aggregate(new IntKey(12));
grouper.aggregate(new IntKey(6));
grouper.aggregate(new IntKey(10));
grouper.aggregate(new IntKey(6));
grouper.aggregate(new IntKey(12));
grouper.aggregate(new IntKey(12));
final List<Grouper.Entry<Integer>> expected = ImmutableList.of(
new Grouper.Entry<>(6, new Object[]{20L, 2L}),
new Grouper.Entry<>(10, new Object[]{10L, 1L}),
new Grouper.Entry<>(12, new Object[]{30L, 3L})
final List<Grouper.Entry<IntKey>> expected = ImmutableList.of(
new ReusableEntry<>(new IntKey(6), new Object[]{20L, 2L}),
new ReusableEntry<>(new IntKey(10), new Object[]{10L, 1L}),
new ReusableEntry<>(new IntKey(12), new Object[]{30L, 3L})
);
final List<Grouper.Entry<Integer>> unsortedEntries = Lists.newArrayList(grouper.iterator(false));
final List<Grouper.Entry<Integer>> sortedEntries = Lists.newArrayList(grouper.iterator(true));
Assert.assertEquals(expected, sortedEntries);
Assert.assertEquals(
expected,
Ordering.from(
new Comparator<Grouper.Entry<Integer>>()
{
@Override
public int compare(Grouper.Entry<Integer> o1, Grouper.Entry<Integer> o2)
{
return Ints.compare(o1.getKey(), o2.getKey());
}
}
).sortedCopy(unsortedEntries)
GrouperTestUtil.assertEntriesEquals(expected.iterator(), grouper.iterator(true));
GrouperTestUtil.assertEntriesEquals(
expected.iterator(),
GrouperTestUtil.sortedEntries(
grouper.iterator(false) /* unsorted entries */,
k -> new IntKey(k.intValue()),
Comparator.comparing(IntKey::intValue)
).iterator()
);
}
@ -106,28 +97,28 @@ public class BufferHashGrouperTest
public void testGrowing()
{
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final Grouper<Integer> grouper = makeGrouper(columnSelectorFactory, 10000, 2, 0.75f);
final Grouper<IntKey> grouper = makeGrouper(columnSelectorFactory, 10000, 2, 0.75f);
final int expectedMaxSize = NullHandling.replaceWithDefault() ? 219 : 210;
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L)));
for (int i = 0; i < expectedMaxSize; i++) {
Assert.assertTrue(String.valueOf(i), grouper.aggregate(i).isOk());
Assert.assertTrue(String.valueOf(i), grouper.aggregate(new IntKey(i)).isOk());
}
Assert.assertFalse(grouper.aggregate(expectedMaxSize).isOk());
Assert.assertFalse(grouper.aggregate(new IntKey(expectedMaxSize)).isOk());
// Aggregate slightly different row
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 11L)));
for (int i = 0; i < expectedMaxSize; i++) {
Assert.assertTrue(String.valueOf(i), grouper.aggregate(i).isOk());
Assert.assertTrue(String.valueOf(i), grouper.aggregate(new IntKey(i)).isOk());
}
Assert.assertFalse(grouper.aggregate(expectedMaxSize).isOk());
Assert.assertFalse(grouper.aggregate(new IntKey(expectedMaxSize)).isOk());
final List<Grouper.Entry<Integer>> expected = new ArrayList<>();
final List<Grouper.Entry<IntKey>> expected = new ArrayList<>();
for (int i = 0; i < expectedMaxSize; i++) {
expected.add(new Grouper.Entry<>(i, new Object[]{21L, 2L}));
expected.add(new ReusableEntry<>(new IntKey(i), new Object[]{21L, 2L}));
}
Assert.assertEquals(expected, Lists.newArrayList(grouper.iterator(true)));
GrouperTestUtil.assertEntriesEquals(expected.iterator(), grouper.iterator(true));
}
@Test
@ -138,14 +129,14 @@ public class BufferHashGrouperTest
if (NullHandling.replaceWithDefault()) {
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
// the buffer size below is chosen to test integer overflow in ByteBufferHashTable.adjustTableWhenFull().
final Grouper<Integer> grouper = makeGrouper(columnSelectorFactory, 1_900_000_000, 2, 0.3f);
final Grouper<IntKey> grouper = makeGrouper(columnSelectorFactory, 1_900_000_000, 2, 0.3f);
final int expectedMaxSize = 15323979;
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L)));
for (int i = 0; i < expectedMaxSize; i++) {
Assert.assertTrue(String.valueOf(i), grouper.aggregate(i).isOk());
Assert.assertTrue(String.valueOf(i), grouper.aggregate(new IntKey(i)).isOk());
}
Assert.assertFalse(grouper.aggregate(expectedMaxSize).isOk());
Assert.assertFalse(grouper.aggregate(new IntKey(expectedMaxSize)).isOk());
}
}
@ -153,31 +144,31 @@ public class BufferHashGrouperTest
public void testNoGrowing()
{
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final Grouper<Integer> grouper = makeGrouper(columnSelectorFactory, 10000, Integer.MAX_VALUE, 0.75f);
final Grouper<IntKey> grouper = makeGrouper(columnSelectorFactory, 10000, Integer.MAX_VALUE, 0.75f);
final int expectedMaxSize = NullHandling.replaceWithDefault() ? 267 : 258;
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L)));
for (int i = 0; i < expectedMaxSize; i++) {
Assert.assertTrue(String.valueOf(i), grouper.aggregate(i).isOk());
Assert.assertTrue(String.valueOf(i), grouper.aggregate(new IntKey(i)).isOk());
}
Assert.assertFalse(grouper.aggregate(expectedMaxSize).isOk());
Assert.assertFalse(grouper.aggregate(new IntKey(expectedMaxSize)).isOk());
// Aggregate slightly different row
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 11L)));
for (int i = 0; i < expectedMaxSize; i++) {
Assert.assertTrue(String.valueOf(i), grouper.aggregate(i).isOk());
Assert.assertTrue(String.valueOf(i), grouper.aggregate(new IntKey(i)).isOk());
}
Assert.assertFalse(grouper.aggregate(expectedMaxSize).isOk());
Assert.assertFalse(grouper.aggregate(new IntKey(expectedMaxSize)).isOk());
final List<Grouper.Entry<Integer>> expected = new ArrayList<>();
final List<Grouper.Entry<IntKey>> expected = new ArrayList<>();
for (int i = 0; i < expectedMaxSize; i++) {
expected.add(new Grouper.Entry<>(i, new Object[]{21L, 2L}));
expected.add(new ReusableEntry<>(new IntKey(i), new Object[]{21L, 2L}));
}
Assert.assertEquals(expected, Lists.newArrayList(grouper.iterator(true)));
GrouperTestUtil.assertEntriesEquals(expected.iterator(), grouper.iterator(true));
}
private BufferHashGrouper<Integer> makeGrouper(
private BufferHashGrouper<IntKey> makeGrouper(
TestColumnSelectorFactory columnSelectorFactory,
int bufferSize,
int initialBuckets,
@ -186,7 +177,7 @@ public class BufferHashGrouperTest
{
final ByteBuffer buffer = ByteBuffer.allocateDirect(bufferSize);
final BufferHashGrouper<Integer> grouper = new BufferHashGrouper<>(
final BufferHashGrouper<IntKey> grouper = new BufferHashGrouper<>(
Suppliers.ofInstance(buffer),
GrouperTestUtil.intKeySerde(),
AggregatorAdapters.factorizeBuffered(

View File

@ -19,13 +19,16 @@
package org.apache.druid.query.groupby.epinephelinae;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.aggregation.AggregatorFactory;
@ -39,8 +42,10 @@ import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.junit.AfterClass;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@ -61,36 +66,61 @@ import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
@RunWith(Parameterized.class)
public class ConcurrentGrouperTest
public class ConcurrentGrouperTest extends InitializedNullHandlingTest
{
private static final ExecutorService SERVICE = Executors.newFixedThreadPool(8);
private static final TestResourceHolder TEST_RESOURCE_HOLDER = new TestResourceHolder(256);
private static final KeySerdeFactory<Long> KEY_SERDE_FACTORY = new TestKeySerdeFactory();
private static final KeySerdeFactory<LongKey> KEY_SERDE_FACTORY = new TestKeySerdeFactory();
private static final ColumnSelectorFactory NULL_FACTORY = new TestColumnSelectorFactory();
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private Supplier<ByteBuffer> bufferSupplier;
private final Supplier<ByteBuffer> bufferSupplier;
private final int concurrencyHint;
private final int parallelCombineThreads;
private final ExecutorService exec = Executors.newFixedThreadPool(8);
private final Closer closer = Closer.create();
@Parameters(name = "bufferSize={0}")
@Parameters(name = "bufferSize={0}, concurrencyHint={1}, parallelCombineThreads={2}")
public static Collection<Object[]> constructorFeeder()
{
return ImmutableList.of(
new Object[]{1024 * 32},
new Object[]{1024 * 1024}
);
final List<Object[]> constructors = new ArrayList<>();
for (final int bufferSize : new int[]{1024, 1024 * 32, 1024 * 1024}) {
for (final int concurrencyHint : new int[]{1, 8}) {
for (final int parallelCombineThreads : new int[]{0, 8}) {
if (parallelCombineThreads <= concurrencyHint) {
constructors.add(new Object[]{bufferSize, concurrencyHint, parallelCombineThreads});
}
}
}
}
return constructors;
}
@AfterClass
public static void teardown()
@Before
public void setUp()
{
SERVICE.shutdown();
TEST_RESOURCE_HOLDER.taken = false;
}
public ConcurrentGrouperTest(int bufferSize)
@After
public void tearDown() throws IOException
{
bufferSupplier = new Supplier<ByteBuffer>()
exec.shutdownNow();
closer.close();
}
public ConcurrentGrouperTest(
int bufferSize,
int concurrencyHint,
int parallelCombineThreads
)
{
this.concurrencyHint = concurrencyHint;
this.parallelCombineThreads = parallelCombineThreads;
this.bufferSupplier = new Supplier<ByteBuffer>()
{
private final AtomicBoolean called = new AtomicBoolean(false);
private ByteBuffer buffer;
@ -110,7 +140,81 @@ public class ConcurrentGrouperTest
@Test()
public void testAggregate() throws InterruptedException, ExecutionException, IOException
{
final ConcurrentGrouper<Long> grouper = new ConcurrentGrouper<>(
final LimitedTemporaryStorage temporaryStorage = new LimitedTemporaryStorage(
temporaryFolder.newFolder(),
1024 * 1024
);
final ConcurrentGrouper<LongKey> grouper = new ConcurrentGrouper<>(
bufferSupplier,
TEST_RESOURCE_HOLDER,
KEY_SERDE_FACTORY,
KEY_SERDE_FACTORY,
NULL_FACTORY,
new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
1024,
0.7f,
1,
temporaryStorage,
new DefaultObjectMapper(),
concurrencyHint,
null,
false,
MoreExecutors.listeningDecorator(exec),
0,
false,
0,
4,
parallelCombineThreads
);
closer.register(grouper);
grouper.init();
final int numRows = 1000;
Future<?>[] futures = new Future[concurrencyHint];
for (int i = 0; i < concurrencyHint; i++) {
futures[i] = exec.submit(() -> {
for (long j = 0; j < numRows; j++) {
if (!grouper.aggregate(new LongKey(j)).isOk()) {
throw new ISE("Grouper is full");
}
}
});
}
for (Future eachFuture : futures) {
eachFuture.get();
}
final List<Entry<LongKey>> expected = new ArrayList<>();
for (long i = 0; i < numRows; i++) {
expected.add(new ReusableEntry<>(new LongKey(i), new Object[]{(long) concurrencyHint}));
}
final CloseableIterator<Entry<LongKey>> iterator = closer.register(grouper.iterator(true));
if (parallelCombineThreads > 1 && temporaryStorage.currentSize() > 0) {
// Parallel combiner configured, and expected to actually be used due to thread-local merge (either explicitly
// configured, or due to spilling).
Assert.assertTrue(TEST_RESOURCE_HOLDER.taken);
} else {
Assert.assertFalse(TEST_RESOURCE_HOLDER.taken);
}
GrouperTestUtil.assertEntriesEquals(expected.iterator(), iterator);
}
@Test
public void testGrouperTimeout() throws Exception
{
if (concurrencyHint <= 1) {
// Can't parallel sort. Timeout is only applied during parallel sorting, so this test is not useful. Skip it.
return;
}
final ConcurrentGrouper<LongKey> grouper = new ConcurrentGrouper<>(
bufferSupplier,
TEST_RESOURCE_HOLDER,
KEY_SERDE_FACTORY,
@ -122,94 +226,28 @@ public class ConcurrentGrouperTest
1,
new LimitedTemporaryStorage(temporaryFolder.newFolder(), 1024 * 1024),
new DefaultObjectMapper(),
8,
concurrencyHint,
null,
false,
MoreExecutors.listeningDecorator(SERVICE),
0,
false,
0,
4,
8
);
grouper.init();
final int numRows = 1000;
Future<?>[] futures = new Future[8];
for (int i = 0; i < 8; i++) {
futures[i] = SERVICE.submit(new Runnable()
{
@Override
public void run()
{
for (long i = 0; i < numRows; i++) {
grouper.aggregate(i);
}
}
});
}
for (Future eachFuture : futures) {
eachFuture.get();
}
final CloseableIterator<Entry<Long>> iterator = grouper.iterator(true);
final List<Entry<Long>> actual = Lists.newArrayList(iterator);
iterator.close();
Assert.assertTrue(TEST_RESOURCE_HOLDER.taken);
final List<Entry<Long>> expected = new ArrayList<>();
for (long i = 0; i < numRows; i++) {
expected.add(new Entry<>(i, new Object[]{8L}));
}
Assert.assertEquals(expected, actual);
grouper.close();
}
@Test
public void testGrouperTimeout() throws Exception
{
final ConcurrentGrouper<Long> grouper = new ConcurrentGrouper<>(
bufferSupplier,
TEST_RESOURCE_HOLDER,
KEY_SERDE_FACTORY,
KEY_SERDE_FACTORY,
NULL_FACTORY,
new AggregatorFactory[] {new CountAggregatorFactory("cnt")},
1024,
0.7f,
1,
new LimitedTemporaryStorage(temporaryFolder.newFolder(), 1024 * 1024),
new DefaultObjectMapper(),
8,
null,
false,
MoreExecutors.listeningDecorator(SERVICE),
MoreExecutors.listeningDecorator(exec),
0,
true,
1,
4,
8
parallelCombineThreads
);
closer.register(grouper);
grouper.init();
final int numRows = 1000;
Future<?>[] futures = new Future[8];
Future<?>[] futures = new Future[concurrencyHint];
for (int i = 0; i < 8; i++) {
futures[i] = SERVICE.submit(new Runnable()
{
@Override
public void run()
{
for (long i = 0; i < numRows; i++) {
grouper.aggregate(i);
for (int i = 0; i < concurrencyHint; i++) {
futures[i] = exec.submit(() -> {
for (long j = 0; j < numRows; j++) {
if (!grouper.aggregate(new LongKey(j)).isOk()) {
throw new ISE("Grouper is full");
}
}
});
@ -218,14 +256,13 @@ public class ConcurrentGrouperTest
for (Future eachFuture : futures) {
eachFuture.get();
}
try {
grouper.iterator(true);
}
catch (RuntimeException e) {
Assert.assertTrue(e instanceof QueryTimeoutException);
Assert.assertEquals("Query timeout", ((QueryTimeoutException) e).getErrorCode());
}
grouper.close();
final QueryTimeoutException e = Assert.assertThrows(
QueryTimeoutException.class,
() -> closer.register(grouper.iterator(true))
);
Assert.assertEquals("Query timeout", e.getErrorCode());
}
static class TestResourceHolder extends ReferenceCountingResourceHolder<ByteBuffer>
@ -245,7 +282,56 @@ public class ConcurrentGrouperTest
}
}
static class TestKeySerdeFactory implements KeySerdeFactory<Long>
static class LongKey
{
private long longValue;
@JsonCreator
public LongKey(final long longValue)
{
this.longValue = longValue;
}
@JsonValue
public long longValue()
{
return longValue;
}
public void setValue(final long longValue)
{
this.longValue = longValue;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
LongKey longKey = (LongKey) o;
return longValue == longKey.longValue;
}
@Override
public int hashCode()
{
return Long.hashCode(longValue);
}
@Override
public String toString()
{
return "LongKey{" +
"longValue=" + longValue +
'}';
}
}
static class TestKeySerdeFactory implements KeySerdeFactory<LongKey>
{
@Override
public long getMaxDictionarySize()
@ -254,9 +340,9 @@ public class ConcurrentGrouperTest
}
@Override
public KeySerde<Long> factorize()
public KeySerde<LongKey> factorize()
{
return new KeySerde<Long>()
return new KeySerde<LongKey>()
{
final ByteBuffer buffer = ByteBuffer.allocate(8);
@ -267,9 +353,9 @@ public class ConcurrentGrouperTest
}
@Override
public Class<Long> keyClazz()
public Class<LongKey> keyClazz()
{
return Long.class;
return LongKey.class;
}
@Override
@ -279,18 +365,24 @@ public class ConcurrentGrouperTest
}
@Override
public ByteBuffer toByteBuffer(Long key)
public ByteBuffer toByteBuffer(LongKey key)
{
buffer.rewind();
buffer.putLong(key);
buffer.putLong(key.longValue());
buffer.position(0);
return buffer;
}
@Override
public Long fromByteBuffer(ByteBuffer buffer, int position)
public LongKey createKey()
{
return buffer.getLong(position);
return new LongKey(0);
}
@Override
public void readFromByteBuffer(LongKey key, ByteBuffer buffer, int position)
{
key.setValue(buffer.getLong(position));
}
@Override
@ -323,22 +415,21 @@ public class ConcurrentGrouperTest
}
@Override
public KeySerde<Long> factorizeWithDictionary(List<String> dictionary)
public KeySerde<LongKey> factorizeWithDictionary(List<String> dictionary)
{
return factorize();
}
@Override
public Comparator<Grouper.Entry<Long>> objectComparator(boolean forceDefaultOrder)
public LongKey copyKey(LongKey key)
{
return new Comparator<Grouper.Entry<Long>>()
{
@Override
public int compare(Grouper.Entry<Long> o1, Grouper.Entry<Long> o2)
{
return o1.getKey().compareTo(o2.getKey());
}
};
return new LongKey(key.longValue());
}
@Override
public Comparator<Grouper.Entry<LongKey>> objectComparator(boolean forceDefaultOrder)
{
return Comparator.comparingLong(o -> o.getKey().longValue());
}
}

View File

@ -19,6 +19,16 @@
package org.apache.druid.query.groupby.epinephelinae;
import org.apache.druid.java.util.common.StringUtils;
import org.junit.Assert;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
public class GrouperTestUtil
{
private GrouperTestUtil()
@ -26,7 +36,7 @@ public class GrouperTestUtil
// No instantiation
}
public static Grouper.KeySerde<Integer> intKeySerde()
public static Grouper.KeySerde<IntKey> intKeySerde()
{
return IntKeySerde.INSTANCE;
}
@ -35,4 +45,68 @@ public class GrouperTestUtil
{
return new TestColumnSelectorFactory();
}
public static <T> List<Grouper.Entry<T>> sortedEntries(
final Iterator<Grouper.Entry<T>> entryIterator,
final Function<T, T> keyCopyFn,
final Comparator<T> keyComparator
)
{
final List<Grouper.Entry<T>> retVal = new ArrayList<>();
while (entryIterator.hasNext()) {
final Grouper.Entry<T> entry = entryIterator.next();
final Object[] valuesCopy = new Object[entry.getValues().length];
System.arraycopy(entry.getValues(), 0, valuesCopy, 0, entry.getValues().length);
final ReusableEntry<T> entryCopy = new ReusableEntry<>(keyCopyFn.apply(entry.getKey()), valuesCopy);
retVal.add(entryCopy);
}
retVal.sort(Comparator.comparing(Grouper.Entry::getKey, keyComparator));
return retVal;
}
public static <T> void assertEntriesEquals(
final Iterator<Grouper.Entry<T>> expectedEntries,
final Iterator<Grouper.Entry<T>> actualEntries
)
{
int i = 0;
while (expectedEntries.hasNext() && actualEntries.hasNext()) {
assertEntriesEqual(StringUtils.format("entry [%d]", i), expectedEntries.next(), actualEntries.next());
i++;
}
if (expectedEntries.hasNext()) {
Assert.fail(StringUtils.format("expected additional entry [%,d]", i));
}
if (actualEntries.hasNext()) {
Assert.fail(StringUtils.format("encountered too many entries [%,d]", i));
}
}
public static <T> void assertEntriesEqual(
final Grouper.Entry<T> expectedEntry,
final Grouper.Entry<T> actualEntry
)
{
assertEntriesEqual(null, expectedEntry, actualEntry);
}
public static <T> void assertEntriesEqual(
@Nullable final String message,
final Grouper.Entry<T> expectedEntry,
final Grouper.Entry<T> actualEntry
)
{
Assert.assertEquals(StringUtils.format("%s: key", message), expectedEntry.getKey(), actualEntry.getKey());
Assert.assertArrayEquals(
StringUtils.format("%s: values", message),
expectedEntry.getValues(),
actualEntry.getValues()
);
}
}

View File

@ -26,9 +26,9 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import java.nio.ByteBuffer;
import java.util.List;
public class IntKeySerde implements Grouper.KeySerde<Integer>
public class IntKeySerde implements Grouper.KeySerde<IntKey>
{
public static final Grouper.KeySerde<Integer> INSTANCE = new IntKeySerde();
public static final Grouper.KeySerde<IntKey> INSTANCE = new IntKeySerde();
public static final Grouper.BufferComparator KEY_COMPARATOR = new Grouper.BufferComparator()
{
@ -48,9 +48,9 @@ public class IntKeySerde implements Grouper.KeySerde<Integer>
}
@Override
public Class<Integer> keyClazz()
public Class<IntKey> keyClazz()
{
return Integer.class;
return IntKey.class;
}
@Override
@ -60,17 +60,23 @@ public class IntKeySerde implements Grouper.KeySerde<Integer>
}
@Override
public ByteBuffer toByteBuffer(Integer key)
public ByteBuffer toByteBuffer(IntKey key)
{
buf.putInt(0, key);
buf.putInt(0, key.intValue());
buf.position(0);
return buf;
}
@Override
public Integer fromByteBuffer(ByteBuffer buffer, int position)
public IntKey createKey()
{
return buffer.getInt(position);
return new IntKey(0);
}
@Override
public void readFromByteBuffer(IntKey key, ByteBuffer buffer, int position)
{
key.setValue(buffer.getInt(position));
}
@Override

View File

@ -26,6 +26,8 @@ import com.google.common.collect.Lists;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.MapBasedRow;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.aggregation.AggregatorAdapters;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
@ -43,6 +45,8 @@ import org.junit.rules.ExpectedException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
public class LimitedBufferHashGrouperTest extends InitializedNullHandlingTest
@ -58,11 +62,11 @@ public class LimitedBufferHashGrouperTest extends InitializedNullHandlingTest
public void testLimitAndBufferSwapping()
{
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final LimitedBufferHashGrouper<Integer> grouper = makeGrouper(columnSelectorFactory, 20000);
final LimitedBufferHashGrouper<IntKey> grouper = makeGrouper(columnSelectorFactory, 20000);
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L)));
for (int i = 0; i < NUM_ROWS; i++) {
Assert.assertTrue(String.valueOf(i + KEY_BASE), grouper.aggregate(i + KEY_BASE).isOk());
Assert.assertTrue(String.valueOf(i + KEY_BASE), grouper.aggregate(new IntKey(i + KEY_BASE)).isOk());
}
if (NullHandling.replaceWithDefault()) {
// bucket size is hash(int) + key(int) + aggs(2 longs) + heap offset(int) = 28 bytes
@ -102,7 +106,7 @@ public class LimitedBufferHashGrouperTest extends InitializedNullHandlingTest
// First 100 of these new rows will be the expected results.
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 11L)));
for (int i = 0; i < NUM_ROWS; i++) {
Assert.assertTrue(String.valueOf(i), grouper.aggregate(i).isOk());
Assert.assertTrue(String.valueOf(i), grouper.aggregate(new IntKey(i)).isOk());
}
if (NullHandling.replaceWithDefault()) {
@ -128,14 +132,14 @@ public class LimitedBufferHashGrouperTest extends InitializedNullHandlingTest
Assert.assertEquals(100, grouper.getLimit());
final List<Grouper.Entry<Integer>> expected = new ArrayList<>();
final List<Pair<Integer, List<Object>>> expected = new ArrayList<>();
for (int i = 0; i < LIMIT; i++) {
expected.add(new Grouper.Entry<>(i, new Object[]{11L, 1L}));
expected.add(Pair.of(i, ImmutableList.of(11L, 1L)));
}
Assert.assertEquals(expected, Lists.newArrayList(grouper.iterator(true)));
Assert.assertEquals(expected, entriesToList(grouper.iterator(true)));
// iterate again, even though the min-max offset heap has been destroyed, it is replaced with a reverse sorted array
Assert.assertEquals(expected, Lists.newArrayList(grouper.iterator(true)));
Assert.assertEquals(expected, entriesToList(grouper.iterator(true)));
}
@Test
@ -151,11 +155,11 @@ public class LimitedBufferHashGrouperTest extends InitializedNullHandlingTest
public void testMinBufferSize()
{
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final LimitedBufferHashGrouper<Integer> grouper = makeGrouper(columnSelectorFactory, 12120);
final LimitedBufferHashGrouper<IntKey> grouper = makeGrouper(columnSelectorFactory, 12120);
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L)));
for (int i = 0; i < NUM_ROWS; i++) {
Assert.assertTrue(String.valueOf(i + KEY_BASE), grouper.aggregate(i + KEY_BASE).isOk());
Assert.assertTrue(String.valueOf(i + KEY_BASE), grouper.aggregate(new IntKey(i + KEY_BASE)).isOk());
}
// With minimum buffer size, after the first swap, every new key added will result in a swap
@ -177,7 +181,7 @@ public class LimitedBufferHashGrouperTest extends InitializedNullHandlingTest
// First 100 of these new rows will be the expected results.
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 11L)));
for (int i = 0; i < NUM_ROWS; i++) {
Assert.assertTrue(String.valueOf(i), grouper.aggregate(i).isOk());
Assert.assertTrue(String.valueOf(i), grouper.aggregate(new IntKey(i)).isOk());
}
if (NullHandling.replaceWithDefault()) {
Assert.assertEquals(474, grouper.getGrowthCount());
@ -192,14 +196,14 @@ public class LimitedBufferHashGrouperTest extends InitializedNullHandlingTest
}
Assert.assertEquals(100, grouper.getLimit());
final List<Grouper.Entry<Integer>> expected = new ArrayList<>();
final List<Pair<Integer, List<Object>>> expected = new ArrayList<>();
for (int i = 0; i < LIMIT; i++) {
expected.add(new Grouper.Entry<>(i, new Object[]{11L, 1L}));
expected.add(Pair.of(i, ImmutableList.of(11L, 1L)));
}
Assert.assertEquals(expected, Lists.newArrayList(grouper.iterator(true)));
Assert.assertEquals(expected, entriesToList(grouper.iterator(true)));
// iterate again, even though the min-max offset heap has been destroyed, it is replaced with a reverse sorted array
Assert.assertEquals(expected, Lists.newArrayList(grouper.iterator(true)));
Assert.assertEquals(expected, entriesToList(grouper.iterator(true)));
}
@Test
@ -209,24 +213,24 @@ public class LimitedBufferHashGrouperTest extends InitializedNullHandlingTest
expectedException.expectMessage("attempted to add offset after grouper was iterated");
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final LimitedBufferHashGrouper<Integer> grouper = makeGrouper(columnSelectorFactory, 12120);
final LimitedBufferHashGrouper<IntKey> grouper = makeGrouper(columnSelectorFactory, 12120);
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L)));
for (int i = 0; i < NUM_ROWS; i++) {
Assert.assertTrue(String.valueOf(i + KEY_BASE), grouper.aggregate(i + KEY_BASE).isOk());
Assert.assertTrue(String.valueOf(i + KEY_BASE), grouper.aggregate(new IntKey(i + KEY_BASE)).isOk());
}
List<Grouper.Entry<Integer>> iterated = Lists.newArrayList(grouper.iterator(true));
List<Grouper.Entry<IntKey>> iterated = Lists.newArrayList(grouper.iterator(true));
Assert.assertEquals(LIMIT, iterated.size());
// an attempt to aggregate with a new key will explode after the grouper has been iterated
grouper.aggregate(KEY_BASE + NUM_ROWS + 1);
grouper.aggregate(new IntKey(KEY_BASE + NUM_ROWS + 1));
}
@Test
public void testIteratorOrderByDim()
{
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final LimitedBufferHashGrouper<Integer> grouper = makeGrouperWithOrderBy(
final LimitedBufferHashGrouper<IntKey> grouper = makeGrouperWithOrderBy(
columnSelectorFactory,
"value",
OrderByColumnSpec.Direction.ASCENDING
@ -236,20 +240,29 @@ public class LimitedBufferHashGrouperTest extends InitializedNullHandlingTest
// limited grouper iterator will always sort by keys in ascending order, even if the heap was sorted by values
// so, we aggregate with keys and values both descending so that the results are not re-ordered by key
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", NUM_ROWS - i + KEY_BASE)));
Assert.assertTrue(String.valueOf(NUM_ROWS - i + KEY_BASE), grouper.aggregate(NUM_ROWS - i + KEY_BASE).isOk());
Assert.assertTrue(
String.valueOf(NUM_ROWS - i + KEY_BASE),
grouper.aggregate(new IntKey(NUM_ROWS - i + KEY_BASE)).isOk()
);
}
List<Grouper.Entry<Integer>> iterated = Lists.newArrayList(grouper.iterator(true));
Assert.assertEquals(LIMIT, iterated.size());
for (int i = 0; i < LIMIT; i++) {
Assert.assertEquals(KEY_BASE + i + 1L, iterated.get(i).getValues()[0]);
final CloseableIterator<Grouper.Entry<IntKey>> iterator = grouper.iterator(true);
int i = 0;
while (iterator.hasNext()) {
final Grouper.Entry<IntKey> entry = iterator.next();
Assert.assertEquals(KEY_BASE + i + 1L, entry.getValues()[0]);
i++;
}
Assert.assertEquals(LIMIT, i);
}
@Test
public void testIteratorOrderByDimDesc()
{
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final LimitedBufferHashGrouper<Integer> grouper = makeGrouperWithOrderBy(
final LimitedBufferHashGrouper<IntKey> grouper = makeGrouperWithOrderBy(
columnSelectorFactory,
"value",
OrderByColumnSpec.Direction.DESCENDING
@ -259,12 +272,16 @@ public class LimitedBufferHashGrouperTest extends InitializedNullHandlingTest
// limited grouper iterator will always sort by keys in ascending order, even if the heap was sorted by values
// so, we aggregate with keys and values both ascending so that the results are not re-ordered by key
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", i + 1)));
Assert.assertTrue(String.valueOf(i + KEY_BASE), grouper.aggregate(i + KEY_BASE).isOk());
Assert.assertTrue(String.valueOf(i + KEY_BASE), grouper.aggregate(new IntKey(i + KEY_BASE)).isOk());
}
List<Grouper.Entry<Integer>> iterated = Lists.newArrayList(grouper.iterator(true));
Assert.assertEquals(LIMIT, iterated.size());
for (int i = 0; i < LIMIT; i++) {
Assert.assertEquals((long) NUM_ROWS - i, iterated.get(i).getValues()[0]);
final CloseableIterator<Grouper.Entry<IntKey>> iterator = grouper.iterator(true);
int i = 0;
while (iterator.hasNext()) {
final Grouper.Entry<IntKey> entry = iterator.next();
Assert.assertEquals((long) NUM_ROWS - i, entry.getValues()[0]);
i++;
}
}
@ -272,7 +289,7 @@ public class LimitedBufferHashGrouperTest extends InitializedNullHandlingTest
public void testIteratorOrderByAggs()
{
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final LimitedBufferHashGrouper<Integer> grouper = makeGrouperWithOrderBy(
final LimitedBufferHashGrouper<IntKey> grouper = makeGrouperWithOrderBy(
columnSelectorFactory,
"valueSum",
OrderByColumnSpec.Direction.ASCENDING
@ -282,20 +299,29 @@ public class LimitedBufferHashGrouperTest extends InitializedNullHandlingTest
// limited grouper iterator will always sort by keys in ascending order, even if the heap was sorted by values
// so, we aggregate with keys and values both descending so that the results are not re-ordered by key
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", NUM_ROWS - i)));
Assert.assertTrue(String.valueOf(NUM_ROWS - i + KEY_BASE), grouper.aggregate(NUM_ROWS - i + KEY_BASE).isOk());
Assert.assertTrue(
String.valueOf(NUM_ROWS - i + KEY_BASE),
grouper.aggregate(new IntKey(NUM_ROWS - i + KEY_BASE)).isOk()
);
}
List<Grouper.Entry<Integer>> iterated = Lists.newArrayList(grouper.iterator(true));
Assert.assertEquals(LIMIT, iterated.size());
for (int i = 0; i < LIMIT; i++) {
Assert.assertEquals(i + 1L, iterated.get(i).getValues()[0]);
final CloseableIterator<Grouper.Entry<IntKey>> iterator = grouper.iterator(true);
int i = 0;
while (iterator.hasNext()) {
final Grouper.Entry<IntKey> entry = iterator.next();
Assert.assertEquals(i + 1L, entry.getValues()[0]);
i++;
}
Assert.assertEquals(LIMIT, i);
}
@Test
public void testIteratorOrderByAggsDesc()
{
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final LimitedBufferHashGrouper<Integer> grouper = makeGrouperWithOrderBy(
final LimitedBufferHashGrouper<IntKey> grouper = makeGrouperWithOrderBy(
columnSelectorFactory,
"valueSum",
OrderByColumnSpec.Direction.DESCENDING
@ -305,21 +331,30 @@ public class LimitedBufferHashGrouperTest extends InitializedNullHandlingTest
// limited grouper iterator will always sort by keys in ascending order, even if the heap was sorted by values
// so, we aggregate with keys descending and values asending so that the results are not re-ordered by key
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", i + 1)));
Assert.assertTrue(String.valueOf(NUM_ROWS - i + KEY_BASE), grouper.aggregate(NUM_ROWS - i + KEY_BASE).isOk());
Assert.assertTrue(
String.valueOf(NUM_ROWS - i + KEY_BASE),
grouper.aggregate(new IntKey(NUM_ROWS - i + KEY_BASE)).isOk()
);
}
List<Grouper.Entry<Integer>> iterated = Lists.newArrayList(grouper.iterator(true));
Assert.assertEquals(LIMIT, iterated.size());
for (int i = 0; i < LIMIT; i++) {
Assert.assertEquals((long) NUM_ROWS - i, iterated.get(i).getValues()[0]);
final CloseableIterator<Grouper.Entry<IntKey>> iterator = grouper.iterator(true);
int i = 0;
while (iterator.hasNext()) {
final Grouper.Entry<IntKey> entry = iterator.next();
Assert.assertEquals((long) NUM_ROWS - i, entry.getValues()[0]);
i++;
}
Assert.assertEquals(LIMIT, i);
}
private static LimitedBufferHashGrouper<Integer> makeGrouper(
private static LimitedBufferHashGrouper<IntKey> makeGrouper(
TestColumnSelectorFactory columnSelectorFactory,
int bufferSize
)
{
LimitedBufferHashGrouper<Integer> grouper = new LimitedBufferHashGrouper<>(
LimitedBufferHashGrouper<IntKey> grouper = new LimitedBufferHashGrouper<>(
Suppliers.ofInstance(ByteBuffer.allocate(bufferSize)),
GrouperTestUtil.intKeySerde(),
AggregatorAdapters.factorizeBuffered(
@ -340,7 +375,7 @@ public class LimitedBufferHashGrouperTest extends InitializedNullHandlingTest
return grouper;
}
private static LimitedBufferHashGrouper<Integer> makeGrouperWithOrderBy(
private static LimitedBufferHashGrouper<IntKey> makeGrouperWithOrderBy(
TestColumnSelectorFactory columnSelectorFactory,
String orderByColumn,
OrderByColumnSpec.Direction direction
@ -360,7 +395,7 @@ public class LimitedBufferHashGrouperTest extends InitializedNullHandlingTest
.limit(LIMIT)
.build();
LimitedBufferHashGrouper<Integer> grouper = new LimitedBufferHashGrouper<>(
LimitedBufferHashGrouper<IntKey> grouper = new LimitedBufferHashGrouper<>(
Suppliers.ofInstance(ByteBuffer.allocate(12120)),
new GroupByIshKeySerde(orderBy),
AggregatorAdapters.factorizeBuffered(
@ -381,6 +416,23 @@ public class LimitedBufferHashGrouperTest extends InitializedNullHandlingTest
return grouper;
}
private static List<Pair<Integer, List<Object>>> entriesToList(final Iterator<Grouper.Entry<IntKey>> entryIterator)
{
final List<Pair<Integer, List<Object>>> retVal = new ArrayList<>();
while (entryIterator.hasNext()) {
final Grouper.Entry<IntKey> entry = entryIterator.next();
retVal.add(
Pair.of(
entry.getKey().intValue(),
ImmutableList.copyOf(Arrays.asList(entry.getValues()))
)
);
}
return retVal;
}
/**
* key serde for more realistic ordering tests, similar to the {@link GroupByQueryEngineV2.GroupByEngineKeySerde} or
* {@link RowBasedGrouperHelper.RowBasedKeySerde} which are likely to be used in practice by the group-by engine,
@ -402,7 +454,7 @@ public class LimitedBufferHashGrouperTest extends InitializedNullHandlingTest
false,
false,
1,
new Grouper.BufferComparator[] {KEY_COMPARATOR}
new Grouper.BufferComparator[]{KEY_COMPARATOR}
);
}
@ -417,7 +469,7 @@ public class LimitedBufferHashGrouperTest extends InitializedNullHandlingTest
aggregatorOffsets,
orderBy,
ImmutableList.of(DefaultDimensionSpec.of("value")),
new Grouper.BufferComparator[] {KEY_COMPARATOR},
new Grouper.BufferComparator[]{KEY_COMPARATOR},
false,
false,
Integer.BYTES

View File

@ -28,6 +28,7 @@ import org.apache.druid.query.groupby.epinephelinae.ConcurrentGrouperTest.TestKe
import org.apache.druid.query.groupby.epinephelinae.ConcurrentGrouperTest.TestResourceHolder;
import org.apache.druid.query.groupby.epinephelinae.Grouper.Entry;
import org.apache.druid.query.groupby.epinephelinae.Grouper.KeySerdeFactory;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
@ -38,19 +39,19 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
public class ParallelCombinerTest
public class ParallelCombinerTest extends InitializedNullHandlingTest
{
private static final int THREAD_NUM = 8;
private static final ExecutorService SERVICE = Execs.multiThreaded(THREAD_NUM, "parallel-combiner-test-%d");
private static final TestResourceHolder TEST_RESOURCE_HOLDER = new TestResourceHolder(512);
private static final KeySerdeFactory<Long> KEY_SERDE_FACTORY = new TestKeySerdeFactory();
private static final KeySerdeFactory<ConcurrentGrouperTest.LongKey> KEY_SERDE_FACTORY = new TestKeySerdeFactory();
private static final class TestIterator implements CloseableIterator<Entry<Long>>
private static final class TestIterator implements CloseableIterator<Entry<ConcurrentGrouperTest.LongKey>>
{
private final Iterator<Entry<Long>> innerIterator;
private final Iterator<Entry<ConcurrentGrouperTest.LongKey>> innerIterator;
private boolean closed;
TestIterator(Iterator<Entry<Long>> innerIterator)
TestIterator(Iterator<Entry<ConcurrentGrouperTest.LongKey>> innerIterator)
{
this.innerIterator = innerIterator;
}
@ -62,7 +63,7 @@ public class ParallelCombinerTest
}
@Override
public Entry<Long> next()
public Entry<ConcurrentGrouperTest.LongKey> next()
{
return innerIterator.next();
}
@ -90,7 +91,7 @@ public class ParallelCombinerTest
@Test
public void testCombine() throws IOException
{
final ParallelCombiner<Long> combiner = new ParallelCombiner<>(
final ParallelCombiner<ConcurrentGrouperTest.LongKey> combiner = new ParallelCombiner<>(
TEST_RESOURCE_HOLDER,
new AggregatorFactory[]{new CountAggregatorFactory("cnt").getCombiningFactory()},
KEY_SERDE_FACTORY,
@ -103,9 +104,9 @@ public class ParallelCombinerTest
);
final int numRows = 1000;
final List<Entry<Long>> baseIterator = new ArrayList<>(numRows);
final List<Entry<ConcurrentGrouperTest.LongKey>> baseIterator = new ArrayList<>(numRows);
for (long i = 0; i < numRows; i++) {
baseIterator.add(new Entry<>(i, new Object[]{i * 10}));
baseIterator.add(new ReusableEntry<>(new ConcurrentGrouperTest.LongKey(i), new Object[]{i * 10}));
}
final int leafNum = 8;
@ -114,10 +115,19 @@ public class ParallelCombinerTest
iterators.add(new TestIterator(baseIterator.iterator()));
}
try (final CloseableIterator<Entry<Long>> iterator = combiner.combine(iterators, new ArrayList<>())) {
try (final CloseableIterator<Entry<ConcurrentGrouperTest.LongKey>> iterator =
combiner.combine(iterators, new ArrayList<>())) {
long expectedKey = 0;
while (iterator.hasNext()) {
Assert.assertEquals(new Entry<>(expectedKey, new Object[]{expectedKey++ * leafNum * 10}), iterator.next());
final Entry<ConcurrentGrouperTest.LongKey> expectedEntry = new ReusableEntry<>(
new ConcurrentGrouperTest.LongKey(expectedKey),
new Object[]{expectedKey * leafNum * 10}
);
final Entry<ConcurrentGrouperTest.LongKey> actualEntry = iterator.next();
GrouperTestUtil.assertEntriesEqual(expectedEntry, actualEntry);
expectedKey++;
}
}

View File

@ -22,12 +22,10 @@ package org.apache.druid.query.groupby.epinephelinae;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Ints;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.MapBasedRow;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
@ -56,28 +54,27 @@ public class StreamingMergeSortedGrouperTest extends InitializedNullHandlingTest
public void testAggregate()
{
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final StreamingMergeSortedGrouper<Integer> grouper = newGrouper(columnSelectorFactory, 1024);
final StreamingMergeSortedGrouper<IntKey> grouper = newGrouper(columnSelectorFactory, 1024);
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L)));
grouper.aggregate(6);
grouper.aggregate(6);
grouper.aggregate(6);
grouper.aggregate(10);
grouper.aggregate(12);
grouper.aggregate(12);
grouper.aggregate(new IntKey(6));
grouper.aggregate(new IntKey(6));
grouper.aggregate(new IntKey(6));
grouper.aggregate(new IntKey(10));
grouper.aggregate(new IntKey(12));
grouper.aggregate(new IntKey(12));
grouper.finish();
final List<Entry<Integer>> expected = ImmutableList.of(
new Grouper.Entry<>(6, new Object[]{30L, 3L}),
new Grouper.Entry<>(10, new Object[]{10L, 1L}),
new Grouper.Entry<>(12, new Object[]{20L, 2L})
final List<Entry<IntKey>> expected = ImmutableList.of(
new ReusableEntry<>(new IntKey(6), new Object[]{30L, 3L}),
new ReusableEntry<>(new IntKey(10), new Object[]{10L, 1L}),
new ReusableEntry<>(new IntKey(12), new Object[]{20L, 2L})
);
final List<Entry<Integer>> unsortedEntries = Lists.newArrayList(grouper.iterator(true));
Assert.assertEquals(
expected,
unsortedEntries
GrouperTestUtil.assertEntriesEquals(
expected.iterator(),
grouper.iterator(true)
);
}
@ -85,7 +82,7 @@ public class StreamingMergeSortedGrouperTest extends InitializedNullHandlingTest
public void testEmptyIterator()
{
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final StreamingMergeSortedGrouper<Integer> grouper = newGrouper(columnSelectorFactory, 1024);
final StreamingMergeSortedGrouper<IntKey> grouper = newGrouper(columnSelectorFactory, 1024);
grouper.finish();
@ -108,11 +105,11 @@ public class StreamingMergeSortedGrouperTest extends InitializedNullHandlingTest
{
final ExecutorService exec = Execs.multiThreaded(2, "merge-sorted-grouper-test-%d");
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final StreamingMergeSortedGrouper<Integer> grouper = newGrouper(columnSelectorFactory, bufferSize);
final StreamingMergeSortedGrouper<IntKey> grouper = newGrouper(columnSelectorFactory, bufferSize);
final List<Entry<Integer>> expected = new ArrayList<>(1024);
final List<Entry<IntKey>> expected = new ArrayList<>(1024);
for (int i = 0; i < 1024; i++) {
expected.add(new Entry<>(i, new Object[]{100L, 10L}));
expected.add(new ReusableEntry<>(new IntKey(i), new Object[]{100L, 10L}));
}
try {
@ -121,24 +118,26 @@ public class StreamingMergeSortedGrouperTest extends InitializedNullHandlingTest
for (int i = 0; i < 1024; i++) {
for (int j = 0; j < 10; j++) {
grouper.aggregate(i);
grouper.aggregate(new IntKey(i));
}
}
grouper.finish();
});
final List<Entry<Integer>> unsortedEntries = Lists.newArrayList(grouper.iterator(true));
final List<Entry<Integer>> actual = Ordering.from((Comparator<Entry<Integer>>) (o1, o2) -> Ints.compare(
o1.getKey(),
o2.getKey()
))
.sortedCopy(unsortedEntries);
final CloseableIterator<Entry<IntKey>> grouperIterator = grouper.iterator();
if (!actual.equals(expected)) {
future.get(); // Check there is an exception occured
Assert.fail();
}
GrouperTestUtil.assertEntriesEquals(
expected.iterator(),
GrouperTestUtil.sortedEntries(
grouperIterator,
k -> new IntKey(k.intValue()),
Comparator.comparing(IntKey::intValue)
).iterator()
);
// Ensure future resolves successfully.
future.get();
}
finally {
exec.shutdownNow();
@ -164,22 +163,22 @@ public class StreamingMergeSortedGrouperTest extends InitializedNullHandlingTest
expectedException.expect(QueryTimeoutException.class);
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final StreamingMergeSortedGrouper<Integer> grouper = newGrouper(columnSelectorFactory, 100);
final StreamingMergeSortedGrouper<IntKey> grouper = newGrouper(columnSelectorFactory, 100);
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L)));
grouper.aggregate(6);
grouper.aggregate(new IntKey(6));
grouper.iterator();
}
private StreamingMergeSortedGrouper<Integer> newGrouper(
private StreamingMergeSortedGrouper<IntKey> newGrouper(
TestColumnSelectorFactory columnSelectorFactory,
int bufferSize
)
{
final ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
final StreamingMergeSortedGrouper<Integer> grouper = new StreamingMergeSortedGrouper<>(
final StreamingMergeSortedGrouper<IntKey> grouper = new StreamingMergeSortedGrouper<>(
Suppliers.ofInstance(buffer),
GrouperTestUtil.intKeySerde(),
columnSelectorFactory,

View File

@ -19,10 +19,10 @@
package org.apache.druid.segment;
import com.google.common.collect.Iterators;
import com.google.common.collect.UnmodifiableIterator;
import com.google.common.primitives.Longs;
import org.apache.druid.java.util.common.collect.Utils;
import org.apache.druid.segment.selector.settable.SettableLongColumnValueSelector;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
import org.junit.Test;
@ -36,7 +36,7 @@ import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class MergingRowIteratorTest
public class MergingRowIteratorTest extends InitializedNullHandlingTest
{
@Test
public void testEmpty()
@ -117,7 +117,7 @@ public class MergingRowIteratorTest
MergingRowIterator mergingRowIterator = new MergingRowIterator(
Stream.of(timestampSequences).map(TestRowIterator::new).collect(Collectors.toList())
);
UnmodifiableIterator<Long> mergedTimestamps = Iterators.mergeSorted(
Iterator<Long> mergedTimestamps = Utils.mergeSorted(
Stream.of(timestampSequences).map(List::iterator).collect(Collectors.toList()),
Comparator.naturalOrder()
);

View File

@ -2250,9 +2250,6 @@ public class CachingClusteredClientTest
for (int i = 0; i < numTimesToQuery; ++i) {
TestHelper.assertExpectedResults(
new MergeIterable(
query instanceof GroupByQuery
? ((GroupByQuery) query).getResultOrdering()
: Comparators.naturalNullsFirst(),
FunctionalIterable
.create(new RangeIterable(expectedResultsRangeStart, expectedResultsRangeEnd))
.transformCat(
@ -2273,7 +2270,10 @@ public class CachingClusteredClientTest
return retVal;
}
}
)
),
query instanceof GroupByQuery
? ((GroupByQuery) query).getResultOrdering()
: Comparators.naturalNullsFirst()
),
runner.run(
QueryPlus.wrap(