Add streaming aggregation as the last step of ConcurrentGrouper if data are spilled (#4704)

* Add steaming grouper

* Fix doc

* Use a single dictionary while combining

* Revert GroupByBenchmark

* Removed unused code

* More cleanup

* Remove unused config

* Fix some typos and bugs

* Refactor Groupers.mergeIterators()

* Add comments for combining tree

* Refactor buildCombineTree

* Refactor iterator

* Add ParallelCombiner

* Add ParallelCombinerTest

* Handle InterruptedException

* use AbstractPrioritizedCallable

* Address comments

* [maven-release-plugin] prepare release druid-0.11.0-sg

* [maven-release-plugin] prepare for next development iteration

* Address comments

* Revert "[maven-release-plugin] prepare for next development iteration"

This reverts commit 5c6b31e488.

* Revert "[maven-release-plugin] prepare release druid-0.11.0-sg"

This reverts commit 0f5c3a8b82.

* Fix build failure

* Change list to array

* rename sortableIds

* Address comments

* change to foreach loop

* Fix comment

* Revert keyEquals()

* Remove loop

* Address comments

* Fix build fail

* Address comments

* Remove unused imports

* Fix method name

* Split intermediate and leaf combine degrees

* Add comments to StreamingMergeSortedGrouper

* Add more comments and fix overflow

* Address comments

* ConcurrentGrouperTest cleanup

* add thread number configuration for parallel combining

* improve doc

* address comments

* fix build
This commit is contained in:
Jihoon Son 2017-10-18 15:24:08 +09:00 committed by Jonathan Wei
parent 43051829f2
commit 52d7f74226
26 changed files with 2745 additions and 603 deletions

View File

@ -0,0 +1,64 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.common.utils;
public class IntArrayUtils
{
/**
* Inverses the values of the given array with their indexes.
* For example, the result for [2, 0, 1] is [1, 2, 0] because
*
* a[0]: 2 => a[2]: 0
* a[1]: 0 => a[0]: 1
* a[2]: 1 => a[1]: 2
*/
public static void inverse(int[] a)
{
for (int i = 0; i < a.length; i++) {
if (a[i] >= 0) {
inverseLoop(a, i);
}
}
for (int i = 0; i < a.length; i++) {
a[i] = ~a[i];
}
}
private static void inverseLoop(int[] a, int startValue)
{
final int startIndex = a[startValue];
int nextIndex = startIndex;
int nextValue = startValue;
do {
final int curIndex = nextIndex;
final int curValue = nextValue;
nextValue = curIndex;
nextIndex = a[curIndex];
a[curIndex] = ~curValue;
} while (nextIndex != startIndex);
}
private IntArrayUtils() {}
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.common.utils;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class IntArrayUtilsTest
{
@Test
public void testInverse()
{
final int numVals = 10000;
final Random random = new Random(System.currentTimeMillis());
final int[] inverted = new int[numVals];
final int[] original = new int[numVals];
final List<Integer> ints = IntStream.range(0, numVals).boxed().collect(Collectors.toList());
Collections.shuffle(ints, random);
for (int i = 0; i < numVals; i++) {
inverted[i] = ints.get(i);
original[i] = inverted[i];
}
IntArrayUtils.inverse(inverted);
for (int i = 0; i < numVals; i++) {
Assert.assertEquals(i, inverted[original[i]]);
}
}
}

View File

@ -34,6 +34,7 @@ A useful formula for estimating direct memory usage follows:
`druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)`
The `+1` is a fuzzy parameter meant to account for the decompression and dictionary merging buffers and may need to be adjusted based on the characteristics of the data being ingested/queried.
Operators can ensure at least this amount of direct memory is available by providing `-XX:MaxDirectMemorySize=<VALUE>` at the command line.
## What is the intermediate computation buffer?
The intermediate computation buffer specifies a buffer size for the storage of intermediate results. The computation engine in both the Historical and Realtime nodes will use a scratch buffer of this size to do all of their intermediate computations off-heap. Larger values allow for more aggregations in a single pass over the data while smaller values can require more passes depending on the query that is being executed. The default size is 1073741824 bytes (1GB).

View File

@ -49,9 +49,9 @@ An example groupBy query object is shown below:
],
"intervals": [ "2012-01-01T00:00:00.000/2012-01-03T00:00:00.000" ],
"having": {
"type": "greaterThan",
"aggregation": "total_usage",
"value": 100
"type": "greaterThan",
"aggregation": "total_usage",
"value": 100
}
}
```
@ -180,7 +180,7 @@ disk space.
With groupBy v2, cluster operators should make sure that the off-heap hash tables and on-heap merging dictionaries
will not exceed available memory for the maximum possible concurrent query load (given by
druid.processing.numMergeBuffers).
druid.processing.numMergeBuffers). See [How much direct memory does Druid use?](../operations/performance-faq.html) for more details.
When using groupBy v1, all aggregation is done on-heap, and resource limits are done through the parameter
druid.query.groupBy.maxResults. This is a cap on the maximum number of results in a result set. Queries that exceed
@ -188,6 +188,31 @@ this limit will fail with a "Resource limit exceeded" error indicating they exce
operators should make sure that the on-heap aggregations will not exceed available JVM heap space for the expected
concurrent query load.
#### Performance tuning for groupBy v2
##### Limit pushdown optimization
Druid pushes down the `limit` spec in groupBy queries to the segments on historicals wherever possible to early prune unnecessary intermediate results and minimize the amount of data transferred to brokers. By default, this technique is applied only when all fields in the `orderBy` spec is a subset of the grouping keys. This is because the `limitPushDown` doesn't guarantee the exact results if the `orderBy` spec includes any fields that are not in the grouping keys. However, you can enable this technique even in such cases if you can sacrifice some accuracy for fast query processing like in topN queries. See `forceLimitPushDown` in [advanced groupBy v2 configurations](#groupby-v2-configurations).
##### Optimizing hash table
The groupBy v2 engine uses an open addressing hash table for aggregation. The hash table is initalized with a given initial bucket number and gradually grows on buffer full. On hash collisions, the linear probing technique is used.
The default number of initial buckets is 1024 and the default max load factor of the hash table is 0.7. If you can see too many collisions in the hash table, you can adjust these numbers. See `bufferGrouperInitialBuckets` and `bufferGrouperMaxLoadFactor` in [Advanced groupBy v2 configurations](#groupby-v2-configurations).
##### Parallel combine
Once a historical finishes aggregation using the hash table, it sorts aggregates and merge them before sending to the broker for N-way merge aggregation in the broker. By default, historicals use all their available processing threads (configured by `druid.processing.numThreads`) for aggregation, but use a single thread for sorting and merging aggregates which is an http thread to send data to brokers.
This is to prevent some heavy groupBy queries from blocking other queries. In Druid, the processing threads are shared between all submitted queries and they are _not interruptible_. It means, if a heavy query takes all available processing threads, all other queries might be blocked until the heavy query is finished. GroupBy queries usually take longer time than timeseries or topN queries, they should release processing threads as soon as possible.
However, you might care about the performance of some really heavy groupBy queries. Usually, the performance bottleneck of heavy groupBy queries is merging sorted aggregates. In such cases, you can use processing threads for it as well. This is called _parallel combine_. To enable parallel combine, see `numParallelCombineThreads` in [Advanced groupBy v2 configurations](#groupby-v2-configurations). Note that parallel combine can be enabled only when data is actually spilled (see [Memory tuning and resource limits](#memory-tuning-and-resource-limits)).
Once parallel combine is enabled, the groupBy v2 engine can create a combining tree for merging sorted aggregates. Each intermediate node of the tree is a thread merging aggregates from the child nodes. The leaf node threads read and merge aggregates from hash tables including spilled ones. Usually, leaf nodes are slower than intermediate nodes because they need to read data from disk. As a result, less threads are used for intermediate nodes by default. You can change the degree of intermeidate nodes. See `intermediateCombineDegree` in [Advanced groupBy v2 configurations](#groupby-v2-configurations).
#### Alternatives
There are some situations where other query types may be a better choice than groupBy.
@ -208,55 +233,87 @@ indexing mechanism, and runs the outer query on these materialized results. "v2"
inner query's results stream with off-heap fact map and on-heap string dictionary that can spill to disk. Both
strategy perform the outer query on the broker in a single-threaded fashion.
#### Server configuration
#### Configurations
When using the "v2" strategy, the following runtime properties apply:
This section describes the configurations for groupBy queries. You can set system-wide configurations by adding them to runtime properties or query-specific configurations by adding them to query contexts. All runtime properties are prefixed by `druid.query.groupBy`.
#### Commonly tuned configurations
##### Configurations for groupBy v2
Supported runtime properties:
|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.groupBy.defaultStrategy`|Default groupBy query strategy.|v2|
|`druid.query.groupBy.bufferGrouperInitialBuckets`|Initial number of buckets in the off-heap hash table used for grouping results. Set to 0 to use a reasonable default.|0|
|`druid.query.groupBy.bufferGrouperMaxLoadFactor`|Maximum load factor of the off-heap hash table used for grouping results. When the load factor exceeds this size, the table will be grown or spilled to disk. Set to 0 to use a reasonable default.|0|
|`druid.query.groupBy.maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for the string dictionary during merging. When the dictionary exceeds this size, a spill to disk will be triggered.|100000000|
|`druid.query.groupBy.maxOnDiskStorage`|Maximum amount of disk space to use, per-query, for spilling result sets to disk when either the merging buffer or the dictionary fills up. Queries that exceed this limit will fail. Set to zero to disable disk spilling.|0 (disabled)|
|`druid.query.groupBy.singleThreaded`|Merge results using a single thread.|false|
This may require allocating more direct memory. The amount of direct memory needed by Druid is at least
`druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)`. You can
ensure at least this amount of direct memory is available by providing `-XX:MaxDirectMemorySize=<VALUE>` at the command
line.
Supported query contexts:
When using the "v1" strategy, the following runtime properties apply:
|Key|Description|
|---|-----------|
|`maxMergingDictionarySize`|Can be used to lower the value of `druid.query.groupBy.maxMergingDictionarySize` for this query.|
|`maxOnDiskStorage`|Can be used to lower the value of `druid.query.groupBy.maxOnDiskStorage` for this query.|
#### Advanced configurations
##### Common configuragions for all groupBy strategies
Supported runtime properties:
|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.groupBy.defaultStrategy`|Default groupBy query strategy.|v2|
|`druid.query.groupBy.maxIntermediateRows`|Maximum number of intermediate rows for the per-segment grouping engine. This is a tuning parameter that does not impose a hard limit; rather, it potentially shifts merging work from the per-segment engine to the overall merging index. Queries that exceed this limit will not fail.|50000|
|`druid.query.groupBy.maxResults`|Maximum number of results. Queries that exceed this limit will fail.|500000|
|`druid.query.groupBy.singleThreaded`|Merge results using a single thread.|false|
#### Query context
Supported query contexts:
When using the "v2" strategy, the following query context parameters apply:
|Property|Description|
|--------|-----------|
|Key|Description|
|---|-----------|
|`groupByStrategy`|Overrides the value of `druid.query.groupBy.defaultStrategy` for this query.|
|`groupByIsSingleThreaded`|Overrides the value of `druid.query.groupBy.singleThreaded` for this query.|
|`bufferGrouperInitialBuckets`|Overrides the value of `druid.query.groupBy.bufferGrouperInitialBuckets` for this query.|
|`bufferGrouperMaxLoadFactor`|Overrides the value of `druid.query.groupBy.bufferGrouperMaxLoadFactor` for this query.|
|`maxMergingDictionarySize`|Can be used to lower the value of `druid.query.groupBy.maxMergingDictionarySize` for this query.|
|`maxOnDiskStorage`|Can be used to lower the value of `druid.query.groupBy.maxOnDiskStorage` for this query.|
|`sortByDimsFirst`|Sort the results first by dimension values and then by timestamp.|
|`forcePushDownLimit`|When all fields in the orderby are part of the grouping key, the broker will push limit application down to the historical nodes. When the sorting order uses fields that are not in the grouping key, applying this optimization can result in approximate results with unknown accuracy, so this optimization is disabled by default in that case. Enabling this context flag turns on limit push down for limit/orderbys that contain non-grouping key columns.|
|`forceHashAggregation`|Force to use hash-based aggregation.|
When using the "v1" strategy, the following query context parameters apply:
|Property|Description|
|--------|-----------|
|`groupByStrategy`|Overrides the value of `druid.query.groupBy.defaultStrategy` for this query.|
|`groupByIsSingleThreaded`|Overrides the value of `druid.query.groupBy.singleThreaded` for this query.|
|`maxIntermediateRows`|Can be used to lower the value of `druid.query.groupBy.maxIntermediateRows` for this query.|
|`maxResults`|Can be used to lower the value of `druid.query.groupBy.maxResults` for this query.|
|`useOffheap`|Set to true to store aggregations off-heap when merging results.|
##### GroupBy v2 configurations
Supported runtime properties:
|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.groupBy.bufferGrouperInitialBuckets`|Initial number of buckets in the off-heap hash table used for grouping results. Set to 0 to use a reasonable default (1024).|0|
|`druid.query.groupBy.bufferGrouperMaxLoadFactor`|Maximum load factor of the off-heap hash table used for grouping results. When the load factor exceeds this size, the table will be grown or spilled to disk. Set to 0 to use a reasonable default (0.7).|0|
|`druid.query.groupBy.forceHashAggregation`|Force to use hash-based aggregation.|false|
|`druid.query.groupBy.intermediateCombineDegree`|Number of intermediate nodes combined together in the combining tree. Higher degrees will need less threads which might be helpful to improve the query performance by reducing the overhead of too many threads if the server has sufficiently powerful cpu cores.|8|
|`druid.query.groupBy.numParallelCombineThreads`|Hint for the number of parallel combining threads. This should be larger than 1 to turn on the parallel combining feature. The actual number of threads used for parallel combining is min(`druid.query.groupBy.numParallelCombineThreads`, `druid.processing.numThreads`).|1 (disabled)|
Supported query contexts:
|Key|Description|Default|
|---|-----------|-------|
|`bufferGrouperInitialBuckets`|Overrides the value of `druid.query.groupBy.bufferGrouperInitialBuckets` for this query.|None|
|`bufferGrouperMaxLoadFactor`|Overrides the value of `druid.query.groupBy.bufferGrouperMaxLoadFactor` for this query.|None|
|`forceHashAggregation`|Overrides the value of `druid.query.groupBy.forceHashAggregation`|None|
|`intermediateCombineDegree`|Overrides the value of `druid.query.groupBy.intermediateCombineDegree`|None|
|`numParallelCombineThreads`|Overrides the value of `druid.query.groupBy.numParallelCombineThreads`|None|
|`sortByDimsFirst`|Sort the results first by dimension values and then by timestamp.|false|
|`forceLimitPushDown`|When all fields in the orderby are part of the grouping key, the broker will push limit application down to the historical nodes. When the sorting order uses fields that are not in the grouping key, applying this optimization can result in approximate results with unknown accuracy, so this optimization is disabled by default in that case. Enabling this context flag turns on limit push down for limit/orderbys that contain non-grouping key columns.|false|
##### GroupBy v1 configurations
Supported runtime properties:
|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.groupBy.maxIntermediateRows`|Maximum number of intermediate rows for the per-segment grouping engine. This is a tuning parameter that does not impose a hard limit; rather, it potentially shifts merging work from the per-segment engine to the overall merging index. Queries that exceed this limit will not fail.|50000|
|`druid.query.groupBy.maxResults`|Maximum number of results. Queries that exceed this limit will fail.|500000|
Supported query contexts:
|Key|Description|Default|
|---|-----------|-------|
|`maxIntermediateRows`|Can be used to lower the value of `druid.query.groupBy.maxIntermediateRows` for this query.|None|
|`maxResults`|Can be used to lower the value of `druid.query.groupBy.maxResults` for this query.|None|
|`useOffheap`|Set to true to store aggregations off-heap when merging results.|false|

View File

@ -0,0 +1,96 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.java.util.common;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import io.druid.java.util.common.io.Closer;
import io.druid.java.util.common.parsers.CloseableIterator;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
public class CloseableIterators
{
public static <T> CloseableIterator<T> concat(List<? extends CloseableIterator<? extends T>> iterators)
{
final Closer closer = Closer.create();
iterators.forEach(closer::register);
final Iterator<T> innerIterator = Iterators.concat(iterators.iterator());
return wrap(innerIterator, closer);
}
public static <T> CloseableIterator<T> mergeSorted(
List<? extends CloseableIterator<? extends T>> iterators,
Comparator<T> comparator
)
{
Preconditions.checkNotNull(comparator);
final Closer closer = Closer.create();
iterators.forEach(closer::register);
final Iterator<T> innerIterator = Iterators.mergeSorted(iterators, comparator);
return wrap(innerIterator, closer);
}
public static <T> CloseableIterator<T> wrap(Iterator<T> innerIterator, @Nullable Closeable closeable)
{
return new CloseableIterator<T>()
{
private boolean closed;
@Override
public boolean hasNext()
{
return innerIterator.hasNext();
}
@Override
public T next()
{
return innerIterator.next();
}
@Override
public void close() throws IOException
{
if (!closed) {
if (closeable != null) {
closeable.close();
}
closed = true;
}
}
};
}
public static <T> CloseableIterator<T> withEmptyBaggage(Iterator<T> innerIterator)
{
return wrap(innerIterator, null);
}
private CloseableIterators() {}
}

View File

@ -38,6 +38,8 @@ public class GroupByQueryConfig
private static final String CTX_KEY_MAX_ON_DISK_STORAGE = "maxOnDiskStorage";
private static final String CTX_KEY_MAX_MERGING_DICTIONARY_SIZE = "maxMergingDictionarySize";
private static final String CTX_KEY_FORCE_HASH_AGGREGATION = "forceHashAggregation";
private static final String CTX_KEY_INTERMEDIATE_COMBINE_DEGREE = "intermediateCombineDegree";
private static final String CTX_KEY_NUM_PARALLEL_COMBINE_THREADS = "numParallelCombineThreads";
@JsonProperty
private String defaultStrategy = GroupByStrategySelector.STRATEGY_V2;
@ -75,6 +77,12 @@ public class GroupByQueryConfig
@JsonProperty
private boolean forceHashAggregation = false;
@JsonProperty
private int intermediateCombineDegree = 8;
@JsonProperty
private int numParallelCombineThreads = 1;
public String getDefaultStrategy()
{
return defaultStrategy;
@ -144,7 +152,17 @@ public class GroupByQueryConfig
{
return forceHashAggregation;
}
public int getIntermediateCombineDegree()
{
return intermediateCombineDegree;
}
public int getNumParallelCombineThreads()
{
return numParallelCombineThreads;
}
public GroupByQueryConfig withOverrides(final GroupByQuery query)
{
final GroupByQueryConfig newConfig = new GroupByQueryConfig();
@ -180,6 +198,14 @@ public class GroupByQueryConfig
);
newConfig.forcePushDownLimit = query.getContextBoolean(CTX_KEY_FORCE_LIMIT_PUSH_DOWN, isForcePushDownLimit());
newConfig.forceHashAggregation = query.getContextBoolean(CTX_KEY_FORCE_HASH_AGGREGATION, isForceHashAggregation());
newConfig.intermediateCombineDegree = query.getContextValue(
CTX_KEY_INTERMEDIATE_COMBINE_DEGREE,
getIntermediateCombineDegree()
);
newConfig.numParallelCombineThreads = query.getContextValue(
CTX_KEY_NUM_PARALLEL_COMBINE_THREADS,
getNumParallelCombineThreads()
);
return newConfig;
}
@ -198,6 +224,8 @@ public class GroupByQueryConfig
", maxOnDiskStorage=" + maxOnDiskStorage +
", forcePushDownLimit=" + forcePushDownLimit +
", forceHashAggregation=" + forceHashAggregation +
", intermediateCombineDegree=" + intermediateCombineDegree +
", numParallelCombineThreads=" + numParallelCombineThreads +
'}';
}
}

View File

@ -21,6 +21,7 @@ package io.druid.query.groupby.epinephelinae;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import io.druid.java.util.common.parsers.CloseableIterator;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.aggregation.AggregatorFactory;
@ -28,9 +29,9 @@ import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.groupby.epinephelinae.column.GroupByColumnSelectorStrategy;
import io.druid.segment.ColumnSelectorFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
/**
@ -225,13 +226,13 @@ public class BufferArrayGrouper implements IntGrouper
}
@Override
public Iterator<Entry<Integer>> iterator(boolean sorted)
public CloseableIterator<Entry<Integer>> iterator(boolean sorted)
{
if (sorted) {
throw new UnsupportedOperationException("sorted iterator is not supported yet");
}
return new Iterator<Entry<Integer>>()
return new CloseableIterator<Entry<Integer>>()
{
int cur = -1;
boolean findNext = false;
@ -276,6 +277,12 @@ public class BufferArrayGrouper implements IntGrouper
}
return new Entry<>(cur - 1, values);
}
@Override
public void close() throws IOException
{
// do nothing
}
};
}
}

View File

@ -22,16 +22,18 @@ package io.druid.query.groupby.epinephelinae;
import com.google.common.base.Supplier;
import com.google.common.collect.Iterators;
import com.google.common.primitives.Ints;
import io.druid.java.util.common.parsers.CloseableIterator;
import io.druid.java.util.common.CloseableIterators;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.ColumnSelectorFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.AbstractList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
@ -167,12 +169,12 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyTyp
}
@Override
public Iterator<Entry<KeyType>> iterator(boolean sorted)
public CloseableIterator<Entry<KeyType>> iterator(boolean sorted)
{
if (!initialized) {
// it's possible for iterator() to be called before initialization when
// a nested groupBy's subquery has an empty result set (see testEmptySubquery() in GroupByQueryRunnerTest)
return Iterators.<Entry<KeyType>>emptyIterator();
return CloseableIterators.withEmptyBaggage(Iterators.<Entry<KeyType>>emptyIterator());
}
if (sorted) {
@ -225,7 +227,7 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyTyp
}
);
return new Iterator<Entry<KeyType>>()
return new CloseableIterator<Entry<KeyType>>()
{
int curr = 0;
final int size = getSize();
@ -250,10 +252,16 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyTyp
{
throw new UnsupportedOperationException();
}
@Override
public void close() throws IOException
{
// do nothing
}
};
} else {
// Unsorted iterator
return new Iterator<Entry<KeyType>>()
return new CloseableIterator<Entry<KeyType>>()
{
int curr = 0;
final int size = getSize();
@ -282,6 +290,12 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyTyp
{
throw new UnsupportedOperationException();
}
@Override
public void close() throws IOException
{
// do nothing
}
};
}
}

View File

@ -19,29 +19,33 @@
package io.druid.query.groupby.epinephelinae;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import io.druid.java.util.common.io.Closer;
import io.druid.java.util.common.parsers.CloseableIterator;
import io.druid.query.groupby.epinephelinae.Grouper.Entry;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.function.Function;
public class CloseableGrouperIterator<KeyType, T> implements Iterator<T>, Closeable
public class CloseableGrouperIterator<KeyType, T> implements CloseableIterator<T>
{
private final Function<Grouper.Entry<KeyType>, T> transformer;
private final Closeable closer;
private final Iterator<Grouper.Entry<KeyType>> iterator;
private final Function<Entry<KeyType>, T> transformer;
private final CloseableIterator<Entry<KeyType>> iterator;
private final Closer closer;
public CloseableGrouperIterator(
final Grouper<KeyType> grouper,
final boolean sorted,
final Function<Grouper.Entry<KeyType>, T> transformer,
final Closeable closer
final Closeable closeable
)
{
this.transformer = transformer;
this.closer = closer;
this.iterator = grouper.iterator(sorted);
this.closer = Closer.create();
closer.register(iterator);
closer.register(closeable);
}
@Override
@ -65,13 +69,11 @@ public class CloseableGrouperIterator<KeyType, T> implements Iterator<T>, Closea
@Override
public void close()
{
if (closer != null) {
try {
closer.close();
}
catch (IOException e) {
throw Throwables.propagate(e);
}
try {
closer.close();
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -23,21 +23,29 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import io.druid.collections.ResourceHolder;
import io.druid.java.util.common.CloseableIterators;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.parsers.CloseableIterator;
import io.druid.query.AbstractPrioritizedCallable;
import io.druid.query.QueryInterruptedException;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.orderby.DefaultLimitSpec;
import io.druid.segment.ColumnSelectorFactory;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@ -75,16 +83,64 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
private final DefaultLimitSpec limitSpec;
private final boolean sortHasNonGroupingFields;
private final Comparator<Grouper.Entry<KeyType>> keyObjComparator;
private final ListeningExecutorService grouperSorter;
private final ListeningExecutorService executor;
private final int priority;
private final boolean hasQueryTimeout;
private final long queryTimeoutAt;
private final long maxDictionarySizeForCombiner;
@Nullable
private final ParallelCombiner<KeyType> parallelCombiner;
private volatile boolean initialized = false;
public ConcurrentGrouper(
final GroupByQueryConfig groupByQueryConfig,
final Supplier<ByteBuffer> bufferSupplier,
final Supplier<ResourceHolder<ByteBuffer>> combineBufferSupplier,
final KeySerdeFactory<KeyType> keySerdeFactory,
final KeySerdeFactory<KeyType> combineKeySerdeFactory,
final ColumnSelectorFactory columnSelectorFactory,
final AggregatorFactory[] aggregatorFactories,
final LimitedTemporaryStorage temporaryStorage,
final ObjectMapper spillMapper,
final int concurrencyHint,
final DefaultLimitSpec limitSpec,
final boolean sortHasNonGroupingFields,
final ListeningExecutorService executor,
final int priority,
final boolean hasQueryTimeout,
final long queryTimeoutAt
)
{
this(
bufferSupplier,
combineBufferSupplier,
keySerdeFactory,
combineKeySerdeFactory,
columnSelectorFactory,
aggregatorFactories,
groupByQueryConfig.getBufferGrouperMaxSize(),
groupByQueryConfig.getBufferGrouperMaxLoadFactor(),
groupByQueryConfig.getBufferGrouperInitialBuckets(),
temporaryStorage,
spillMapper,
concurrencyHint,
limitSpec,
sortHasNonGroupingFields,
executor,
priority,
hasQueryTimeout,
queryTimeoutAt,
groupByQueryConfig.getIntermediateCombineDegree(),
groupByQueryConfig.getNumParallelCombineThreads()
);
}
ConcurrentGrouper(
final Supplier<ByteBuffer> bufferSupplier,
final Supplier<ResourceHolder<ByteBuffer>> combineBufferSupplier,
final KeySerdeFactory<KeyType> keySerdeFactory,
final KeySerdeFactory<KeyType> combineKeySerdeFactory,
final ColumnSelectorFactory columnSelectorFactory,
final AggregatorFactory[] aggregatorFactories,
final int bufferGrouperMaxSize,
@ -95,24 +151,24 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
final int concurrencyHint,
final DefaultLimitSpec limitSpec,
final boolean sortHasNonGroupingFields,
final ListeningExecutorService grouperSorter,
final ListeningExecutorService executor,
final int priority,
final boolean hasQueryTimeout,
final long queryTimeoutAt,
final int mergeBufferSize
final int intermediateCombineDegree,
final int numParallelCombineThreads
)
{
Preconditions.checkArgument(concurrencyHint > 0, "concurrencyHint > 0");
Preconditions.checkArgument(
concurrencyHint >= numParallelCombineThreads,
"numParallelCombineThreads[%s] cannot larger than concurrencyHint[%s]",
numParallelCombineThreads,
concurrencyHint
);
this.groupers = new ArrayList<>(concurrencyHint);
this.threadLocalGrouper = new ThreadLocal<SpillingGrouper<KeyType>>()
{
@Override
protected SpillingGrouper<KeyType> initialValue()
{
return groupers.get(threadNumber.getAndIncrement());
}
};
this.threadLocalGrouper = ThreadLocal.withInitial(() -> groupers.get(threadNumber.getAndIncrement()));
this.bufferSupplier = bufferSupplier;
this.columnSelectorFactory = columnSelectorFactory;
@ -127,10 +183,27 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
this.limitSpec = limitSpec;
this.sortHasNonGroupingFields = sortHasNonGroupingFields;
this.keyObjComparator = keySerdeFactory.objectComparator(sortHasNonGroupingFields);
this.grouperSorter = Preconditions.checkNotNull(grouperSorter);
this.executor = Preconditions.checkNotNull(executor);
this.priority = priority;
this.hasQueryTimeout = hasQueryTimeout;
this.queryTimeoutAt = queryTimeoutAt;
this.maxDictionarySizeForCombiner = combineKeySerdeFactory.getMaxDictionarySize();
if (numParallelCombineThreads > 1) {
this.parallelCombiner = new ParallelCombiner<>(
combineBufferSupplier,
getCombiningFactories(aggregatorFactories),
combineKeySerdeFactory,
executor,
sortHasNonGroupingFields,
Math.min(numParallelCombineThreads, concurrencyHint),
priority,
queryTimeoutAt,
intermediateCombineDegree
);
} else {
this.parallelCombiner = null;
}
}
@Override
@ -143,11 +216,9 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
final int sliceSize = (buffer.capacity() / concurrencyHint);
for (int i = 0; i < concurrencyHint; i++) {
final ByteBuffer slice = buffer.duplicate();
slice.position(sliceSize * i);
slice.limit(slice.position() + sliceSize);
final ByteBuffer slice = Groupers.getSlice(buffer, sliceSize, i);
final SpillingGrouper<KeyType> grouper = new SpillingGrouper<>(
Suppliers.ofInstance(slice.slice()),
Suppliers.ofInstance(slice),
keySerdeFactory,
columnSelectorFactory,
aggregatorFactories,
@ -222,15 +293,11 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
throw new ISE("Grouper is closed");
}
for (Grouper<KeyType> grouper : groupers) {
synchronized (grouper) {
grouper.reset();
}
}
groupers.forEach(Grouper::reset);
}
@Override
public Iterator<Entry<KeyType>> iterator(final boolean sorted)
public CloseableIterator<Entry<KeyType>> iterator(final boolean sorted)
{
if (!initialized) {
throw new ISE("Grouper is not initialized");
@ -240,28 +307,43 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
throw new ISE("Grouper is closed");
}
return Groupers.mergeIterators(
sorted && isParallelSortAvailable() ? parallelSortAndGetGroupersIterator() : getGroupersIterator(sorted),
sorted ? keyObjComparator : null
);
final List<CloseableIterator<Entry<KeyType>>> sortedIterators = sorted && isParallelizable() ?
parallelSortAndGetGroupersIterator() :
getGroupersIterator(sorted);
// Parallel combine is used only when data is spilled. This is because ConcurrentGrouper uses two different modes
// depending on data is spilled or not. If data is not spilled, all inputs are completely aggregated and no more
// aggregation is required.
if (sorted && spilling && parallelCombiner != null) {
// First try to merge dictionaries generated by all underlying groupers. If it is merged successfully, the same
// merged dictionary is used for all combining threads
final List<String> dictionary = tryMergeDictionary();
if (dictionary != null) {
return parallelCombiner.combine(sortedIterators, dictionary);
}
}
return sorted ?
CloseableIterators.mergeSorted(sortedIterators, keyObjComparator) :
CloseableIterators.concat(sortedIterators);
}
private boolean isParallelSortAvailable()
private boolean isParallelizable()
{
return concurrencyHint > 1;
}
private List<Iterator<Entry<KeyType>>> parallelSortAndGetGroupersIterator()
private List<CloseableIterator<Entry<KeyType>>> parallelSortAndGetGroupersIterator()
{
// The number of groupers is same with the number of processing threads in grouperSorter
final ListenableFuture<List<Iterator<Entry<KeyType>>>> future = Futures.allAsList(
// The number of groupers is same with the number of processing threads in the executor
final ListenableFuture<List<CloseableIterator<Entry<KeyType>>>> future = Futures.allAsList(
groupers.stream()
.map(grouper ->
grouperSorter.submit(
new AbstractPrioritizedCallable<Iterator<Entry<KeyType>>>(priority)
executor.submit(
new AbstractPrioritizedCallable<CloseableIterator<Entry<KeyType>>>(priority)
{
@Override
public Iterator<Entry<KeyType>> call() throws Exception
public CloseableIterator<Entry<KeyType>> call() throws Exception
{
return grouper.iterator(true);
}
@ -287,21 +369,47 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
}
}
private List<Iterator<Entry<KeyType>>> getGroupersIterator(boolean sorted)
private List<CloseableIterator<Entry<KeyType>>> getGroupersIterator(boolean sorted)
{
return groupers.stream()
.map(grouper -> grouper.iterator(sorted))
.collect(Collectors.toList());
}
/**
* Merge dictionaries of {@link Grouper.KeySerde}s of {@link Grouper}s. The result dictionary contains unique string
* keys.
*
* @return merged dictionary if its size does not exceed max dictionary size. Otherwise null.
*/
@Nullable
private List<String> tryMergeDictionary()
{
final Set<String> mergedDictionary = new HashSet<>();
long totalDictionarySize = 0L;
for (SpillingGrouper<KeyType> grouper : groupers) {
final List<String> dictionary = grouper.mergeAndGetDictionary();
for (String key : dictionary) {
if (mergedDictionary.add(key)) {
totalDictionarySize += RowBasedGrouperHelper.estimateStringKeySize(key);
if (totalDictionarySize > maxDictionarySizeForCombiner) {
return null;
}
}
}
}
return ImmutableList.copyOf(mergedDictionary);
}
@Override
public void close()
{
closed = true;
for (Grouper<KeyType> grouper : groupers) {
synchronized (grouper) {
grouper.close();
}
if (!closed) {
closed = true;
groupers.forEach(Grouper::close);
}
}
@ -309,4 +417,11 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
{
return keyHash % groupers.size();
}
private AggregatorFactory[] getCombiningFactories(AggregatorFactory[] aggregatorFactories)
{
final AggregatorFactory[] combiningFactories = new AggregatorFactory[aggregatorFactories.length];
Arrays.setAll(combiningFactories, i -> aggregatorFactories[i].getCombiningFactory());
return combiningFactories;
}
}

View File

@ -22,6 +22,7 @@ package io.druid.query.groupby.epinephelinae;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Predicates;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
@ -33,8 +34,10 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.collections.BlockingPool;
import io.druid.collections.NonBlockingPool;
import io.druid.collections.ReferenceCountingResourceHolder;
import io.druid.collections.Releaser;
import io.druid.collections.ResourceHolder;
import io.druid.data.input.Row;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
@ -80,6 +83,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<Row>
private final ListeningExecutorService exec;
private final QueryWatcher queryWatcher;
private final int concurrencyHint;
private final NonBlockingPool<ByteBuffer> processingBufferPool;
private final BlockingPool<ByteBuffer> mergeBufferPool;
private final ObjectMapper spillMapper;
private final String processingTmpDir;
@ -91,6 +95,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<Row>
QueryWatcher queryWatcher,
Iterable<QueryRunner<Row>> queryables,
int concurrencyHint,
NonBlockingPool<ByteBuffer> processingBufferPool,
BlockingPool<ByteBuffer> mergeBufferPool,
int mergeBufferSize,
ObjectMapper spillMapper,
@ -102,6 +107,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<Row>
this.queryWatcher = queryWatcher;
this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull()));
this.concurrencyHint = concurrencyHint;
this.processingBufferPool = processingBufferPool;
this.mergeBufferPool = mergeBufferPool;
this.spillMapper = spillMapper;
this.processingTmpDir = processingTmpDir;
@ -154,6 +160,22 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<Row>
final boolean hasTimeout = QueryContexts.hasTimeout(query);
final long timeoutAt = System.currentTimeMillis() + queryTimeout;
final Supplier<ResourceHolder<ByteBuffer>> combineBufferSupplier = new Supplier<ResourceHolder<ByteBuffer>>()
{
private boolean initialized;
private ResourceHolder<ByteBuffer> buffer;
@Override
public ResourceHolder<ByteBuffer> get()
{
if (!initialized) {
buffer = processingBufferPool.take();
initialized = true;
}
return buffer;
}
};
return new BaseSequence<>(
new BaseSequence.IteratorMaker<Row, CloseableGrouperIterator<RowBasedKey, Row>>()
{
@ -194,6 +216,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<Row>
null,
config,
Suppliers.ofInstance(mergeBufferHolder.get()),
combineBufferSupplier,
concurrencyHint,
temporaryStorage,
spillMapper,

View File

@ -22,6 +22,7 @@ package io.druid.query.groupby.epinephelinae;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import io.druid.collections.NonBlockingPool;
import io.druid.collections.ResourceHolder;
@ -732,6 +733,12 @@ public class GroupByQueryEngineV2
return ByteBuffer.class;
}
@Override
public List<String> getDictionary()
{
return ImmutableList.of();
}
@Override
public ByteBuffer toByteBuffer(ByteBuffer key)
{

View File

@ -22,13 +22,14 @@ package io.druid.query.groupby.epinephelinae;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import io.druid.java.util.common.parsers.CloseableIterator;
import io.druid.query.aggregation.AggregatorFactory;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.function.ToIntFunction;
/**
@ -100,10 +101,10 @@ public interface Grouper<KeyType> extends Closeable
/**
* Iterate through entries.
* <p>
* Once this method is called, writes are no longer safe. After you are done with the iterator returned by this
* method, you should either call {@link #close()} (if you are done with the Grouper), {@link #reset()} (if you
* want to reuse it), or {@link #iterator(boolean)} again if you want another iterator. This method is not thread-safe
* and must not be called by multiple threads concurrently.
* Some implementations allow writes even after this method is called. After you are done with the iterator
* returned by this method, you should either call {@link #close()} (if you are done with the Grouper) or
* {@link #reset()} (if you want to reuse it). Some implementations allow calling {@link #iterator(boolean)} again if
* you want another iterator. But, this method must not be called by multiple threads concurrently.
* <p>
* If "sorted" is true then the iterator will return sorted results. It will use KeyType's natural ordering on
* deserialized objects, and will use the {@link KeySerde#comparator()} on serialized objects. Woe be unto you
@ -116,7 +117,7 @@ public interface Grouper<KeyType> extends Closeable
*
* @return entry iterator
*/
Iterator<Entry<KeyType>> iterator(boolean sorted);
CloseableIterator<Entry<KeyType>> iterator(boolean sorted);
class Entry<T>
{
@ -186,10 +187,22 @@ public interface Grouper<KeyType> extends Closeable
interface KeySerdeFactory<T>
{
/**
* Create a new KeySerde, which may be stateful.
* Return max dictionary size threshold.
*
* @return max dictionary size
*/
long getMaxDictionarySize();
/**
* Create a new {@link KeySerde}, which may be stateful.
*/
KeySerde<T> factorize();
/**
* Create a new {@link KeySerde} with the given dictionary.
*/
KeySerde<T> factorizeWithDictionary(List<String> dictionary);
/**
* Return an object that knows how to compare two serialized key instances. Will be called by the
* {@link #iterator(boolean)} method if sorting is enabled.
@ -217,6 +230,11 @@ public interface Grouper<KeyType> extends Closeable
*/
Class<T> keyClazz();
/**
* Return the dictionary of this KeySerde. The return value should not be null.
*/
List<String> getDictionary();
/**
* Serialize a key. This will be called by the {@link #aggregate(Comparable)} method. The buffer will not
* be retained after the aggregate method returns, so reusing buffers is OK.

View File

@ -19,10 +19,7 @@
package io.druid.query.groupby.epinephelinae;
import com.google.common.collect.Iterators;
import java.util.Comparator;
import java.util.Iterator;
import java.nio.ByteBuffer;
public class Groupers
{
@ -72,25 +69,11 @@ public class Groupers
return keyHash | 0x80000000;
}
public static <KeyType> Iterator<Grouper.Entry<KeyType>> mergeIterators(
final Iterable<Iterator<Grouper.Entry<KeyType>>> iterators,
final Comparator<Grouper.Entry<KeyType>> keyTypeComparator
)
public static ByteBuffer getSlice(ByteBuffer buffer, int sliceSize, int i)
{
if (keyTypeComparator != null) {
return Iterators.mergeSorted(
iterators,
new Comparator<Grouper.Entry<KeyType>>()
{
@Override
public int compare(Grouper.Entry<KeyType> lhs, Grouper.Entry<KeyType> rhs)
{
return keyTypeComparator.compare(lhs, rhs);
}
}
);
} else {
return Iterators.concat(iterators.iterator());
}
final ByteBuffer slice = buffer.duplicate();
slice.position(sliceSize * i);
slice.limit(slice.position() + sliceSize);
return slice.slice();
}
}

View File

@ -22,16 +22,18 @@ package io.druid.query.groupby.epinephelinae;
import com.google.common.base.Supplier;
import com.google.common.collect.Iterators;
import com.google.common.primitives.Ints;
import io.druid.java.util.common.parsers.CloseableIterator;
import io.druid.java.util.common.CloseableIterators;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.ColumnSelectorFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.AbstractList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
@ -199,13 +201,13 @@ public class LimitedBufferHashGrouper<KeyType> extends AbstractBufferHashGrouper
}
@Override
public Iterator<Entry<KeyType>> iterator(boolean sorted)
public CloseableIterator<Entry<KeyType>> iterator(boolean sorted)
{
if (!initialized) {
// it's possible for iterator() to be called before initialization when
// a nested groupBy's subquery has an empty result set (see testEmptySubqueryWithLimitPushDown()
// in GroupByQueryRunnerTest)
return Iterators.<Entry<KeyType>>emptyIterator();
return CloseableIterators.withEmptyBaggage(Iterators.<Entry<KeyType>>emptyIterator());
}
if (sortHasNonGroupingFields) {
@ -251,7 +253,7 @@ public class LimitedBufferHashGrouper<KeyType> extends AbstractBufferHashGrouper
}
}
private Iterator<Grouper.Entry<KeyType>> makeDefaultOrderingIterator()
private CloseableIterator<Entry<KeyType>> makeDefaultOrderingIterator()
{
final int size = offsetHeap.getHeapSize();
@ -299,7 +301,7 @@ public class LimitedBufferHashGrouper<KeyType> extends AbstractBufferHashGrouper
}
);
return new Iterator<Grouper.Entry<KeyType>>()
return new CloseableIterator<Entry<KeyType>>()
{
int curr = 0;
@ -320,13 +322,19 @@ public class LimitedBufferHashGrouper<KeyType> extends AbstractBufferHashGrouper
{
throw new UnsupportedOperationException();
}
@Override
public void close() throws IOException
{
// do nothing
}
};
}
private Iterator<Grouper.Entry<KeyType>> makeHeapIterator()
private CloseableIterator<Entry<KeyType>> makeHeapIterator()
{
final int initialHeapSize = offsetHeap.getHeapSize();
return new Iterator<Grouper.Entry<KeyType>>()
return new CloseableIterator<Entry<KeyType>>()
{
int curr = 0;
@ -354,6 +362,12 @@ public class LimitedBufferHashGrouper<KeyType> extends AbstractBufferHashGrouper
{
throw new UnsupportedOperationException();
}
@Override
public void close() throws IOException
{
// do nothing
}
};
}

View File

@ -0,0 +1,489 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.groupby.epinephelinae;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import io.druid.collections.ResourceHolder;
import io.druid.java.util.common.CloseableIterators;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.io.Closer;
import io.druid.java.util.common.parsers.CloseableIterator;
import io.druid.query.AbstractPrioritizedCallable;
import io.druid.query.QueryInterruptedException;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.groupby.epinephelinae.Grouper.Entry;
import io.druid.query.groupby.epinephelinae.Grouper.KeySerdeFactory;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.ColumnValueSelector;
import io.druid.segment.DimensionSelector;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.column.ColumnCapabilities;
import it.unimi.dsi.fastutil.objects.Object2IntArrayMap;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* ParallelCombiner builds a combining tree which asynchronously aggregates input entries. Each node of the combining
* tree is a combining task executed in parallel which aggregates inputs from the child nodes.
*/
public class ParallelCombiner<KeyType>
{
// The combining tree created by this class can have two different degrees for intermediate nodes.
// The "leaf combine degree (LCD)" is the number of leaf nodes combined together, while the "intermediate combine
// degree (ICD)" is the number of non-leaf nodes combined together. The below picture shows an example where LCD = 2
// and ICD = 4.
//
// o <- non-leaf node
// / / \ \ <- ICD = 4
// o o o o <- non-leaf nodes
// / \ / \ / \ / \ <- LCD = 2
// o o o o o o o o <- leaf nodes
//
// The reason why we need two different degrees is to optimize the number of non-leaf nodes which are run by
// different threads at the same time. Note that the leaf nodes are sorted iterators of SpillingGroupers which
// generally returns multiple rows of the same grouping key which in turn should be combined, while the non-leaf nodes
// are iterators of StreamingMergeSortedGroupers and always returns a single row per grouping key. Generally, the
// performance will get better as LCD becomes low while ICD is some value larger than LCD because the amount of work
// each thread has to do can be properly tuned. The optimal values for LCD and ICD may vary with query and data. Here,
// we use a simple heuristic to avoid complex optimization. That is, ICD is fixed as a user-configurable value and the
// minimum LCD satisfying the memory restriction is searched. See findLeafCombineDegreeAndNumBuffers() for more
// details.
private static final int MINIMUM_LEAF_COMBINE_DEGREE = 2;
private final Supplier<ResourceHolder<ByteBuffer>> combineBufferSupplier;
private final AggregatorFactory[] combiningFactories;
private final KeySerdeFactory<KeyType> combineKeySerdeFactory;
private final ListeningExecutorService executor;
private final Comparator<Entry<KeyType>> keyObjComparator;
private final int concurrencyHint;
private final int priority;
private final long queryTimeoutAt;
// The default value is 8 which comes from an experiment. A non-leaf node will combine up to intermediateCombineDegree
// rows for the same grouping key.
private final int intermediateCombineDegree;
public ParallelCombiner(
Supplier<ResourceHolder<ByteBuffer>> combineBufferSupplier,
AggregatorFactory[] combiningFactories,
KeySerdeFactory<KeyType> combineKeySerdeFactory,
ListeningExecutorService executor,
boolean sortHasNonGroupingFields,
int concurrencyHint,
int priority,
long queryTimeoutAt,
int intermediateCombineDegree
)
{
this.combineBufferSupplier = combineBufferSupplier;
this.combiningFactories = combiningFactories;
this.combineKeySerdeFactory = combineKeySerdeFactory;
this.executor = executor;
this.keyObjComparator = combineKeySerdeFactory.objectComparator(sortHasNonGroupingFields);
this.concurrencyHint = concurrencyHint;
this.priority = priority;
this.intermediateCombineDegree = intermediateCombineDegree;
this.queryTimeoutAt = queryTimeoutAt;
}
/**
* Build a combining tree for the input iterators which combine input entries asynchronously. Each node in the tree
* is a combining task which iterates through child iterators, aggregates the inputs from those iterators, and returns
* an iterator for the result of aggregation.
* <p>
* This method is called when data is spilled and thus streaming combine is preferred to avoid too many disk accesses.
*
* @return an iterator of the root grouper of the combining tree
*/
public CloseableIterator<Entry<KeyType>> combine(
List<? extends CloseableIterator<Entry<KeyType>>> sortedIterators,
List<String> mergedDictionary
)
{
// CombineBuffer is initialized when this method is called and closed after the result iterator is done
final ResourceHolder<ByteBuffer> combineBufferHolder = combineBufferSupplier.get();
final ByteBuffer combineBuffer = combineBufferHolder.get();
final int minimumRequiredBufferCapacity = StreamingMergeSortedGrouper.requiredBufferCapacity(
combineKeySerdeFactory.factorizeWithDictionary(mergedDictionary),
combiningFactories
);
// We want to maximize the parallelism while the size of buffer slice is greater than the minimum buffer size
// required by StreamingMergeSortedGrouper. Here, we find the leafCombineDegree of the cominbing tree and the
// required number of buffers maximizing the parallelism.
final Pair<Integer, Integer> degreeAndNumBuffers = findLeafCombineDegreeAndNumBuffers(
combineBuffer,
minimumRequiredBufferCapacity,
concurrencyHint,
sortedIterators.size()
);
final int leafCombineDegree = degreeAndNumBuffers.lhs;
final int numBuffers = degreeAndNumBuffers.rhs;
final int sliceSize = combineBuffer.capacity() / numBuffers;
final Supplier<ByteBuffer> bufferSupplier = createCombineBufferSupplier(combineBuffer, numBuffers, sliceSize);
final Pair<List<CloseableIterator<Entry<KeyType>>>, List<Future>> combineIteratorAndFutures = buildCombineTree(
sortedIterators,
bufferSupplier,
combiningFactories,
leafCombineDegree,
mergedDictionary
);
final CloseableIterator<Entry<KeyType>> combineIterator = Iterables.getOnlyElement(combineIteratorAndFutures.lhs);
final List<Future> combineFutures = combineIteratorAndFutures.rhs;
final Closer closer = Closer.create();
closer.register(combineBufferHolder);
closer.register(() -> checkCombineFutures(combineFutures));
return CloseableIterators.wrap(combineIterator, closer);
}
private static void checkCombineFutures(List<Future> combineFutures)
{
for (Future future : combineFutures) {
try {
if (!future.isDone()) {
// Cancel futures if close() for the iterator is called early due to some reason (e.g., test failure)
future.cancel(true);
} else {
future.get();
}
}
catch (InterruptedException | CancellationException e) {
throw new QueryInterruptedException(e);
}
catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
}
private static Supplier<ByteBuffer> createCombineBufferSupplier(
ByteBuffer combineBuffer,
int numBuffers,
int sliceSize
)
{
return new Supplier<ByteBuffer>()
{
private int i = 0;
@Override
public ByteBuffer get()
{
if (i < numBuffers) {
return Groupers.getSlice(combineBuffer, sliceSize, i++);
} else {
throw new ISE("Requested number[%d] of buffer slices exceeds the planned one[%d]", i++, numBuffers);
}
}
};
}
/**
* Find a minimum size of the buffer slice and corresponding leafCombineDegree and number of slices. Note that each
* node in the combining tree is executed by different threads. This method assumes that combining the leaf nodes
* requires threads as many as possible, while combining intermediate nodes is not. See the comment on
* {@link #MINIMUM_LEAF_COMBINE_DEGREE} for more details.
*
* @param combineBuffer entire buffer used for combining tree
* @param requiredMinimumBufferCapacity minimum buffer capacity for {@link StreamingMergeSortedGrouper}
* @param numAvailableThreads number of available threads
* @param numLeafNodes number of leaf nodes of combining tree
*
* @return a pair of leafCombineDegree and number of buffers if found.
*/
private Pair<Integer, Integer> findLeafCombineDegreeAndNumBuffers(
ByteBuffer combineBuffer,
int requiredMinimumBufferCapacity,
int numAvailableThreads,
int numLeafNodes
)
{
for (int leafCombineDegree = MINIMUM_LEAF_COMBINE_DEGREE; leafCombineDegree <= numLeafNodes; leafCombineDegree++) {
final int requiredBufferNum = computeRequiredBufferNum(numLeafNodes, leafCombineDegree);
if (requiredBufferNum <= numAvailableThreads) {
final int expectedSliceSize = combineBuffer.capacity() / requiredBufferNum;
if (expectedSliceSize >= requiredMinimumBufferCapacity) {
return Pair.of(leafCombineDegree, requiredBufferNum);
}
}
}
throw new ISE(
"Cannot find a proper leaf combine degree for the combining tree. "
+ "Each node of the combining tree requires a buffer of [%d] bytes. "
+ "Try increasing druid.processing.buffer.sizeBytes for larger buffer or "
+ "druid.query.groupBy.intermediateCombineDegree for a smaller tree",
requiredMinimumBufferCapacity
);
}
/**
* Recursively compute the number of required buffers for a combining tree in a bottom-up manner. Since each node of
* the combining tree represents a combining task and each combining task requires one buffer, the number of required
* buffers is the number of nodes of the combining tree.
*
* @param numChildNodes number of child nodes
* @param combineDegree combine degree for the current level
*
* @return minimum number of buffers required for combining tree
*
* @see #buildCombineTree(List, Supplier, AggregatorFactory[], int, List)
*/
private int computeRequiredBufferNum(int numChildNodes, int combineDegree)
{
// numChildrenForLastNode used to determine that the last node is needed for the current level.
// Please see buildCombineTree() for more details.
final int numChildrenForLastNode = numChildNodes % combineDegree;
final int numCurLevelNodes = numChildNodes / combineDegree + (numChildrenForLastNode > 1 ? 1 : 0);
final int numChildOfParentNodes = numCurLevelNodes + (numChildrenForLastNode == 1 ? 1 : 0);
if (numChildOfParentNodes == 1) {
return numCurLevelNodes;
} else {
return numCurLevelNodes +
computeRequiredBufferNum(numChildOfParentNodes, intermediateCombineDegree);
}
}
/**
* Recursively build a combining tree in a bottom-up manner. Each node of the tree is a task that combines input
* iterators asynchronously.
*
* @param childIterators all iterators of the child level
* @param bufferSupplier combining buffer supplier
* @param combiningFactories array of combining aggregator factories
* @param combineDegree combining degree for the current level
* @param dictionary merged dictionary
*
* @return a pair of a list of iterators of the current level in the combining tree and a list of futures of all
* executed combining tasks
*/
private Pair<List<CloseableIterator<Entry<KeyType>>>, List<Future>> buildCombineTree(
List<? extends CloseableIterator<Entry<KeyType>>> childIterators,
Supplier<ByteBuffer> bufferSupplier,
AggregatorFactory[] combiningFactories,
int combineDegree,
List<String> dictionary
)
{
final int numChildLevelIterators = childIterators.size();
final List<CloseableIterator<Entry<KeyType>>> childIteratorsOfNextLevel = new ArrayList<>();
final List<Future> combineFutures = new ArrayList<>();
// The below algorithm creates the combining nodes of the current level. It first checks that the number of children
// to be combined together is 1. If it is, the intermediate combining node for that child is not needed. Instead, it
// can be directly connected to a node of the parent level. Here is an example of generated tree when
// numLeafNodes = 6 and leafCombineDegree = intermediateCombineDegree = 2. See the description of
// MINIMUM_LEAF_COMBINE_DEGREE for more details about leafCombineDegree and intermediateCombineDegree.
//
// o
// / \
// o \
// / \ \
// o o o
// / \ / \ / \
// o o o o o o
//
// We can expect that the aggregates can be combined as early as possible because the tree is built in a bottom-up
// manner.
for (int i = 0; i < numChildLevelIterators; i += combineDegree) {
if (i < numChildLevelIterators - 1) {
final List<? extends CloseableIterator<Entry<KeyType>>> subIterators = childIterators.subList(
i,
Math.min(i + combineDegree, numChildLevelIterators)
);
final Pair<CloseableIterator<Entry<KeyType>>, Future> iteratorAndFuture = runCombiner(
subIterators,
bufferSupplier.get(),
combiningFactories,
dictionary
);
childIteratorsOfNextLevel.add(iteratorAndFuture.lhs);
combineFutures.add(iteratorAndFuture.rhs);
} else {
// If there remains one child, it can be directly connected to a node of the parent level.
childIteratorsOfNextLevel.add(childIterators.get(i));
}
}
if (childIteratorsOfNextLevel.size() == 1) {
// This is the root
return Pair.of(childIteratorsOfNextLevel, combineFutures);
} else {
// Build the parent level iterators
final Pair<List<CloseableIterator<Entry<KeyType>>>, List<Future>> parentIteratorsAndFutures =
buildCombineTree(
childIteratorsOfNextLevel,
bufferSupplier,
combiningFactories,
intermediateCombineDegree,
dictionary
);
combineFutures.addAll(parentIteratorsAndFutures.rhs);
return Pair.of(parentIteratorsAndFutures.lhs, combineFutures);
}
}
private Pair<CloseableIterator<Entry<KeyType>>, Future> runCombiner(
List<? extends CloseableIterator<Entry<KeyType>>> iterators,
ByteBuffer combineBuffer,
AggregatorFactory[] combiningFactories,
List<String> dictionary
)
{
final SettableColumnSelectorFactory settableColumnSelectorFactory =
new SettableColumnSelectorFactory(combiningFactories);
final StreamingMergeSortedGrouper<KeyType> grouper = new StreamingMergeSortedGrouper<>(
Suppliers.ofInstance(combineBuffer),
combineKeySerdeFactory.factorizeWithDictionary(dictionary),
settableColumnSelectorFactory,
combiningFactories,
queryTimeoutAt
);
grouper.init(); // init() must be called before iterator(), so cannot be called inside the below callable.
final ListenableFuture future = executor.submit(
new AbstractPrioritizedCallable<Void>(priority)
{
@Override
public Void call() throws Exception
{
try (
CloseableIterator<Entry<KeyType>> mergedIterator = CloseableIterators.mergeSorted(
iterators,
keyObjComparator
)
) {
while (mergedIterator.hasNext()) {
final Entry<KeyType> next = mergedIterator.next();
settableColumnSelectorFactory.set(next.values);
grouper.aggregate(next.key); // grouper always returns ok or throws an exception
settableColumnSelectorFactory.set(null);
}
}
catch (IOException e) {
throw Throwables.propagate(e);
}
grouper.finish();
return null;
}
}
);
return new Pair<>(grouper.iterator(), future);
}
private static class SettableColumnSelectorFactory implements ColumnSelectorFactory
{
private static final int UNKNOWN_COLUMN_INDEX = -1;
private final Object2IntMap<String> columnIndexMap;
private Object[] values;
SettableColumnSelectorFactory(AggregatorFactory[] aggregatorFactories)
{
columnIndexMap = new Object2IntArrayMap<>(aggregatorFactories.length);
columnIndexMap.defaultReturnValue(UNKNOWN_COLUMN_INDEX);
for (int i = 0; i < aggregatorFactories.length; i++) {
columnIndexMap.put(aggregatorFactories[i].getName(), i);
}
}
public void set(Object[] values)
{
this.values = values;
}
private int checkAndGetColumnIndex(String columnName)
{
final int columnIndex = columnIndexMap.getInt(columnName);
Preconditions.checkState(
columnIndex != UNKNOWN_COLUMN_INDEX,
"Cannot find a proper column index for column[%s]",
columnName
);
return columnIndex;
}
@Override
public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
{
throw new UnsupportedOperationException();
}
@Override
public ColumnValueSelector makeColumnValueSelector(String columnName)
{
return new ObjectColumnSelector()
{
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// do nothing
}
@Override
public Class classOfObject()
{
return Object.class;
}
@Override
public Object getObject()
{
return values[checkAndGetColumnIndex(columnName)];
}
};
}
@Override
public ColumnCapabilities getColumnCapabilities(String column)
{
throw new UnsupportedOperationException();
}
}
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.groupby.epinephelinae;
import io.druid.query.groupby.epinephelinae.Grouper.BufferComparator;
import io.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBasedKey;
import java.nio.ByteBuffer;
interface RowBasedKeySerdeHelper
{
/**
* @return The size in bytes for a value of the column handled by this SerdeHelper.
*/
int getKeyBufferValueSize();
/**
* Read a value from RowBasedKey at `idx` and put the value at the current position of RowBasedKeySerde's keyBuffer.
* advancing the position by the size returned by getKeyBufferValueSize().
*
* If an internal resource limit has been reached and the value could not be added to the keyBuffer,
* (e.g., maximum dictionary size exceeded for Strings), this method returns false.
*
* @param key RowBasedKey containing the grouping key values for a row.
* @param idx Index of the grouping key column within that this SerdeHelper handles
*
* @return true if the value was added to the key, false otherwise
*/
boolean putToKeyBuffer(RowBasedKey key, int idx);
/**
* Read a value from a ByteBuffer containing a grouping key in the same format as RowBasedKeySerde's keyBuffer and
* put the value in `dimValues` at `dimValIdx`.
*
* The value to be read resides in the buffer at position (`initialOffset` + the SerdeHelper's keyBufferPosition).
*
* @param buffer ByteBuffer containing an array of grouping keys for a row
* @param initialOffset Offset where non-timestamp grouping key columns start, needed because timestamp is not
* always included in the buffer.
* @param dimValIdx Index within dimValues to store the value read from the buffer
* @param dimValues Output array containing grouping key values for a row
*/
void getFromByteBuffer(ByteBuffer buffer, int initialOffset, int dimValIdx, Comparable[] dimValues);
/**
* Return a {@link BufferComparator} to compare keys stored in ByteBuffer.
*/
BufferComparator getBufferComparator();
}

View File

@ -27,8 +27,10 @@ import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.common.CloseableIterators;
import io.druid.java.util.common.io.Closer;
import io.druid.java.util.common.logger.Logger;
import io.druid.java.util.common.parsers.CloseableIterator;
import io.druid.query.BaseQuery;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.groupby.orderby.DefaultLimitSpec;
@ -36,15 +38,16 @@ import io.druid.segment.ColumnSelectorFactory;
import net.jpountz.lz4.LZ4BlockInputStream;
import net.jpountz.lz4.LZ4BlockOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
/**
* Grouper based around a single underlying {@link BufferHashGrouper}. Not thread-safe.
@ -67,7 +70,7 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
private final Comparator<Grouper.Entry<KeyType>> defaultOrderKeyObjComparator;
private final List<File> files = Lists.newArrayList();
private final List<Closeable> closeables = Lists.newArrayList();
private final List<File> dictionaryFiles = Lists.newArrayList();
private final boolean sortHasNonGroupingFields;
private boolean spillingAllowed = false;
@ -164,7 +167,7 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
{
final AggregateResult result = grouper.aggregate(key, keyHash);
if (result.isOk() || temporaryStorage.maxSize() <= 0 || !spillingAllowed) {
if (result.isOk() || !spillingAllowed || temporaryStorage.maxSize() <= 0) {
return result;
} else {
// Warning: this can potentially block up a processing thread for a while.
@ -197,71 +200,115 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
deleteFiles();
}
/**
* Returns a dictionary of string keys added to this grouper. Note that the dictionary of keySerde is spilled on
* local storage whenever the inner grouper is spilled. If there are spilled dictionaries, this method loads them
* from disk and returns a merged dictionary.
*
* @return a dictionary which is a list of unique strings
*/
public List<String> mergeAndGetDictionary()
{
final Set<String> mergedDictionary = new HashSet<>();
mergedDictionary.addAll(keySerde.getDictionary());
for (File dictFile : dictionaryFiles) {
try (
final MappingIterator<String> dictIterator = spillMapper.readValues(
spillMapper.getFactory().createParser(new LZ4BlockInputStream(new FileInputStream(dictFile))),
spillMapper.getTypeFactory().constructType(String.class)
)
) {
while (dictIterator.hasNext()) {
mergedDictionary.add(dictIterator.next());
}
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
return new ArrayList<>(mergedDictionary);
}
public void setSpillingAllowed(final boolean spillingAllowed)
{
this.spillingAllowed = spillingAllowed;
}
@Override
public Iterator<Entry<KeyType>> iterator(final boolean sorted)
public CloseableIterator<Entry<KeyType>> iterator(final boolean sorted)
{
final List<Iterator<Entry<KeyType>>> iterators = new ArrayList<>(1 + files.size());
final List<CloseableIterator<Entry<KeyType>>> iterators = new ArrayList<>(1 + files.size());
iterators.add(grouper.iterator(sorted));
final Closer closer = Closer.create();
for (final File file : files) {
final MappingIterator<Entry<KeyType>> fileIterator = read(file, keySerde.keyClazz());
iterators.add(
Iterators.transform(
fileIterator,
new Function<Entry<KeyType>, Entry<KeyType>>()
{
@Override
public Entry<KeyType> apply(Entry<KeyType> entry)
{
final Object[] deserializedValues = new Object[entry.getValues().length];
for (int i = 0; i < deserializedValues.length; i++) {
deserializedValues[i] = aggregatorFactories[i].deserialize(entry.getValues()[i]);
if (deserializedValues[i] instanceof Integer) {
// Hack to satisfy the groupBy unit tests; perhaps we could do better by adjusting Jackson config.
deserializedValues[i] = ((Integer) deserializedValues[i]).longValue();
CloseableIterators.withEmptyBaggage(
Iterators.transform(
fileIterator,
new Function<Entry<KeyType>, Entry<KeyType>>()
{
@Override
public Entry<KeyType> apply(Entry<KeyType> entry)
{
final Object[] deserializedValues = new Object[entry.getValues().length];
for (int i = 0; i < deserializedValues.length; i++) {
deserializedValues[i] = aggregatorFactories[i].deserialize(entry.getValues()[i]);
if (deserializedValues[i] instanceof Integer) {
// Hack to satisfy the groupBy unit tests; perhaps we could do better by adjusting Jackson config.
deserializedValues[i] = ((Integer) deserializedValues[i]).longValue();
}
}
return new Entry<>(entry.getKey(), deserializedValues);
}
}
return new Entry<>(entry.getKey(), deserializedValues);
}
}
)
)
);
closeables.add(fileIterator);
closer.register(fileIterator);
}
final Iterator<Entry<KeyType>> baseIterator;
if (sortHasNonGroupingFields) {
return Groupers.mergeIterators(iterators, defaultOrderKeyObjComparator);
baseIterator = CloseableIterators.mergeSorted(iterators, defaultOrderKeyObjComparator);
} else {
return Groupers.mergeIterators(iterators, sorted ? keyObjComparator : null);
baseIterator = sorted ?
CloseableIterators.mergeSorted(iterators, keyObjComparator) :
CloseableIterators.concat(iterators);
}
return CloseableIterators.wrap(baseIterator, closer);
}
private void spill() throws IOException
{
final File outFile;
try (CloseableIterator<Entry<KeyType>> iterator = grouper.iterator(true)) {
files.add(spill(iterator));
dictionaryFiles.add(spill(keySerde.getDictionary().iterator()));
grouper.reset();
}
}
private <T> File spill(Iterator<T> iterator) throws IOException
{
try (
final LimitedTemporaryStorage.LimitedOutputStream out = temporaryStorage.createFile();
final LZ4BlockOutputStream compressedOut = new LZ4BlockOutputStream(out);
final JsonGenerator jsonGenerator = spillMapper.getFactory().createGenerator(compressedOut)
) {
outFile = out.getFile();
final Iterator<Entry<KeyType>> it = grouper.iterator(true);
while (it.hasNext()) {
while (iterator.hasNext()) {
BaseQuery.checkInterrupted();
jsonGenerator.writeObject(it.next());
jsonGenerator.writeObject(iterator.next());
}
}
files.add(outFile);
grouper.reset();
return out.getFile();
}
}
private MappingIterator<Entry<KeyType>> read(final File file, final Class<KeyType> keyClazz)
@ -279,10 +326,6 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
private void deleteFiles()
{
for (Closeable closeable : closeables) {
// CloseQuietly is OK on readable streams
CloseQuietly.close(closeable);
}
for (final File file : files) {
temporaryStorage.delete(file);
}

View File

@ -0,0 +1,516 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.groupby.epinephelinae;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger;
import io.druid.java.util.common.parsers.CloseableIterator;
import io.druid.query.QueryContexts;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.ColumnSelectorFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* A streaming grouper which can aggregate sorted inputs. This grouper can aggregate while its iterator is being
* consumed. The aggregation thread and the iterating thread can be different.
*
* This grouper is backed by an off-heap circular array. The reading thread is able to read data from an array slot
* only when aggregation for the grouping key correspoing to that slot is finished. Since the reading and writing
* threads cannot access the same array slot at the same time, they can read/write data without contention.
*
* This class uses the spinlock for waiting for at least one slot to become available when the array is empty or full.
* If the array is empty, the reading thread waits for the aggregation for an array slot is finished. If the array is
* full, the writing thread waits for the reading thread to read at least one aggregate from the array.
*/
public class StreamingMergeSortedGrouper<KeyType> implements Grouper<KeyType>
{
private static final Logger LOG = new Logger(StreamingMergeSortedGrouper.class);
private static final long DEFAULT_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(5); // default timeout for spinlock
// Threashold time for spinlocks in increaseWriteIndex() and increaseReadIndex(). The waiting thread calls
// Thread.yield() after this threadhold time elapses.
private static final long SPIN_FOR_TIMEOUT_THRESHOLD_NS = 1000L;
private final Supplier<ByteBuffer> bufferSupplier;
private final KeySerde<KeyType> keySerde;
private final BufferAggregator[] aggregators;
private final int[] aggregatorOffsets;
private final int keySize;
private final int recordSize; // size of (key + all aggregates)
// Timeout for the current query.
// The query must fail with a timeout exception if System.nanoTime() >= queryTimeoutAtNs. This is used in the
// spinlocks to prevent the writing thread from being blocked if the iterator of this grouper is not consumed due to
// some failures which potentially makes the whole system being paused.
private final long queryTimeoutAtNs;
private final boolean hasQueryTimeout;
// Below variables are initialized when init() is called.
private ByteBuffer buffer;
private int maxNumSlots;
private boolean initialized;
/**
* Indicate that this grouper consumed the last input or not. The writing thread must set this value to true by
* calling {@link #finish()} when it's done. This variable is always set by the writing thread and read by the
* reading thread.
*/
private volatile boolean finished;
/**
* Current write index of the array. This points to the array slot where the aggregation is currently performed. Its
* initial value is -1 which means any data are not written yet. Since it's assumed that the input is sorted by the
* grouping key, this variable is moved to the next slot whenever a new grouping key is found. Once it reaches the
* last slot of the array, it moves to the first slot.
*
* This is always moved ahead of {@link #nextReadIndex}. If the array is full, this variable
* cannot be moved until {@link #nextReadIndex} is moved. See {@link #increaseWriteIndex()} for more details. This
* variable is always incremented by the writing thread and read by both the writing and the reading threads.
*/
private volatile int curWriteIndex;
/**
* Next read index of the array. This points to the array slot which the reading thread will read next. Its initial
* value is -1 which means any data are not read yet. This variable can point an array slot only when the aggregation
* for that slot is finished. Once it reaches the last slot of the array, it moves to the first slot.
*
* This always follows {@link #curWriteIndex}. If the array is empty, this variable cannot be moved until the
* aggregation for at least one grouping key is finished which in turn {@link #curWriteIndex} is moved. See
* {@link #iterator()} for more details. This variable is always incremented by the reading thread and read by both
* the writing and the reading threads.
*/
private volatile int nextReadIndex;
/**
* Returns the minimum buffer capacity required for this grouper. This grouper keeps track read/write indexes
* and they cannot point the same array slot at the same time. Since the read/write indexes move circularly, one
* extra slot is needed in addition to the read/write slots. Finally, the required minimum buffer capacity is
* 3 * record size.
*
* @return required minimum buffer capacity
*/
public static <KeyType> int requiredBufferCapacity(
KeySerde<KeyType> keySerde,
AggregatorFactory[] aggregatorFactories
)
{
int recordSize = keySerde.keySize();
for (AggregatorFactory aggregatorFactory : aggregatorFactories) {
recordSize += aggregatorFactory.getMaxIntermediateSize();
}
return recordSize * 3;
}
StreamingMergeSortedGrouper(
final Supplier<ByteBuffer> bufferSupplier,
final KeySerde<KeyType> keySerde,
final ColumnSelectorFactory columnSelectorFactory,
final AggregatorFactory[] aggregatorFactories,
final long queryTimeoutAtMs
)
{
this.bufferSupplier = bufferSupplier;
this.keySerde = keySerde;
this.aggregators = new BufferAggregator[aggregatorFactories.length];
this.aggregatorOffsets = new int[aggregatorFactories.length];
this.keySize = keySerde.keySize();
int offset = keySize;
for (int i = 0; i < aggregatorFactories.length; i++) {
aggregators[i] = aggregatorFactories[i].factorizeBuffered(columnSelectorFactory);
aggregatorOffsets[i] = offset;
offset += aggregatorFactories[i].getMaxIntermediateSize();
}
this.recordSize = offset;
// queryTimeoutAtMs comes from System.currentTimeMillis(), but we should use System.nanoTime() to check timeout in
// this class. See increaseWriteIndex() and increaseReadIndex().
this.hasQueryTimeout = queryTimeoutAtMs != QueryContexts.NO_TIMEOUT;
final long timeoutNs = hasQueryTimeout ?
TimeUnit.MILLISECONDS.toNanos(queryTimeoutAtMs - System.currentTimeMillis()) :
QueryContexts.NO_TIMEOUT;
this.queryTimeoutAtNs = System.nanoTime() + timeoutNs;
}
@Override
public void init()
{
if (!initialized) {
buffer = bufferSupplier.get();
maxNumSlots = buffer.capacity() / recordSize;
Preconditions.checkState(
maxNumSlots > 2,
"Buffer[%s] should be large enough to store at least three records[%s]",
buffer.capacity(),
recordSize
);
reset();
initialized = true;
}
}
@Override
public boolean isInitialized()
{
return initialized;
}
@Override
public AggregateResult aggregate(KeyType key, int notUsed)
{
return aggregate(key);
}
@Override
public AggregateResult aggregate(KeyType key)
{
try {
final ByteBuffer keyBuffer = keySerde.toByteBuffer(key);
if (keyBuffer.remaining() != keySize) {
throw new IAE(
"keySerde.toByteBuffer(key).remaining[%s] != keySerde.keySize[%s], buffer was the wrong size?!",
keyBuffer.remaining(),
keySize
);
}
final int prevRecordOffset = curWriteIndex * recordSize;
if (curWriteIndex == -1 || !keyEquals(keyBuffer, buffer, prevRecordOffset)) {
// Initialize a new slot for the new key. This may be potentially blocked if the array is full until at least
// one slot becomes available.
initNewSlot(keyBuffer);
}
final int curRecordOffset = curWriteIndex * recordSize;
for (int i = 0; i < aggregatorOffsets.length; i++) {
aggregators[i].aggregate(buffer, curRecordOffset + aggregatorOffsets[i]);
}
return AggregateResult.ok();
}
catch (RuntimeException e) {
finished = true;
throw e;
}
}
/**
* Checks two keys contained in the given buffers are same.
*
* @param curKeyBuffer the buffer for the given key from {@link #aggregate(Object)}
* @param buffer the whole array buffer
* @param bufferOffset the key offset of the buffer
*
* @return true if the two buffers are same.
*/
private boolean keyEquals(ByteBuffer curKeyBuffer, ByteBuffer buffer, int bufferOffset)
{
// Since this method is frequently called per each input row, the compare performance matters.
int i = 0;
for (; i + Long.BYTES <= keySize; i += Long.BYTES) {
if (curKeyBuffer.getLong(i) != buffer.getLong(bufferOffset + i)) {
return false;
}
}
if (i + Integer.BYTES <= keySize) {
// This can be called at most once because we already compared using getLong() in the above.
if (curKeyBuffer.getInt(i) != buffer.getInt(bufferOffset + i)) {
return false;
}
i += Integer.BYTES;
}
for (; i < keySize; i++) {
if (curKeyBuffer.get(i) != buffer.get(bufferOffset + i)) {
return false;
}
}
return true;
}
/**
* Initialize a new slot for a new grouping key. This may be potentially blocked if the array is full until at least
* one slot becomes available.
*/
private void initNewSlot(ByteBuffer newKey)
{
// Wait if the array is full and increase curWriteIndex
increaseWriteIndex();
final int recordOffset = recordSize * curWriteIndex;
buffer.position(recordOffset);
buffer.put(newKey);
for (int i = 0; i < aggregators.length; i++) {
aggregators[i].init(buffer, recordOffset + aggregatorOffsets[i]);
}
}
/**
* Wait for {@link #nextReadIndex} to be moved if necessary and move {@link #curWriteIndex}.
*/
private void increaseWriteIndex()
{
final long startAtNs = System.nanoTime();
final long queryTimeoutAtNs = getQueryTimeoutAtNs(startAtNs);
final long spinTimeoutAtNs = startAtNs + SPIN_FOR_TIMEOUT_THRESHOLD_NS;
long timeoutNs = queryTimeoutAtNs - startAtNs;
long spinTimeoutNs = SPIN_FOR_TIMEOUT_THRESHOLD_NS;
// In the below, we check that the array is full and wait for at least one slot to become available.
//
// nextReadIndex is a volatile variable and the changes on it are continuously checked until they are seen in
// the while loop. See the following links.
// * http://docs.oracle.com/javase/specs/jls/se7/html/jls-8.html#jls-8.3.1.4
// * http://docs.oracle.com/javase/specs/jls/se7/html/jls-17.html#jls-17.4.5
// * https://stackoverflow.com/questions/11761552/detailed-semantics-of-volatile-regarding-timeliness-of-visibility
if (curWriteIndex == maxNumSlots - 1) {
// We additionally check that nextReadIndex is -1 here because the writing thread should wait for the reading
// thread to start reading only when the writing thread tries to overwrite the first slot for the first time.
// The below condition is checked in a while loop instead of using a lock to avoid frequent thread park.
while ((nextReadIndex == -1 || nextReadIndex == 0) && !Thread.currentThread().isInterrupted()) {
if (timeoutNs <= 0L) {
throw new RuntimeException(new TimeoutException());
}
// Thread.yield() should not be called from the very beginning
if (spinTimeoutNs <= 0L) {
Thread.yield();
}
long now = System.nanoTime();
timeoutNs = queryTimeoutAtNs - now;
spinTimeoutNs = spinTimeoutAtNs - now;
}
// Changes on nextReadIndex happens-before changing curWriteIndex.
curWriteIndex = 0;
} else {
final int nextWriteIndex = curWriteIndex + 1;
// The below condition is checked in a while loop instead of using a lock to avoid frequent thread park.
while ((nextWriteIndex == nextReadIndex) && !Thread.currentThread().isInterrupted()) {
if (timeoutNs <= 0L) {
throw new RuntimeException(new TimeoutException());
}
// Thread.yield() should not be called from the very beginning
if (spinTimeoutNs <= 0L) {
Thread.yield();
}
long now = System.nanoTime();
timeoutNs = queryTimeoutAtNs - now;
spinTimeoutNs = spinTimeoutAtNs - now;
}
// Changes on nextReadIndex happens-before changing curWriteIndex.
curWriteIndex = nextWriteIndex;
}
}
@Override
public void reset()
{
curWriteIndex = -1;
nextReadIndex = -1;
finished = false;
}
@Override
public void close()
{
for (BufferAggregator aggregator : aggregators) {
try {
aggregator.close();
}
catch (Exception e) {
LOG.warn(e, "Could not close aggregator [%s], skipping.", aggregator);
}
}
}
/**
* Signal that no more inputs are added. Must be called after {@link #aggregate(Object)} is called for the last input.
*/
public void finish()
{
increaseWriteIndex();
// Once finished is set, curWriteIndex must not be changed. This guarantees that the remaining number of items in
// the array is always decreased as the reading thread proceeds. See hasNext() and remaining() below.
finished = true;
}
/**
* Return a sorted iterator. This method can be called safely while writing, and the iterating thread and the writing
* thread can be different. The result iterator always returns sorted results. This method should be called only one
* time per grouper.
*
* @return a sorted iterator
*/
public CloseableIterator<Entry<KeyType>> iterator()
{
if (!initialized) {
throw new ISE("Grouper should be initialized first");
}
return new CloseableIterator<Entry<KeyType>>()
{
{
// Wait for some data to be ready and initialize nextReadIndex.
increaseReadIndexTo(0);
}
@Override
public boolean hasNext()
{
// If setting finished happens-before the below check, curWriteIndex isn't changed anymore and thus remainig()
// can be computed safely because nextReadIndex is changed only by the reading thread.
// Otherwise, hasNext() always returns true.
//
// The below line can be executed between increasing curWriteIndex and setting finished in
// StreamingMergeSortedGrouper.finish(), but it is also a valid case because there should be at least one slot
// which is not read yet before finished is set.
return !finished || remaining() > 0;
}
/**
* Calculate the number of remaining items in the array. Must be called only when
* {@link StreamingMergeSortedGrouper#finished} is true.
*
* @return the number of remaining items
*/
private int remaining()
{
if (curWriteIndex >= nextReadIndex) {
return curWriteIndex - nextReadIndex;
} else {
return (maxNumSlots - nextReadIndex) + curWriteIndex;
}
}
@Override
public Entry<KeyType> next()
{
if (!hasNext()) {
throw new NoSuchElementException();
}
// Here, nextReadIndex should be valid which means:
// - a valid array index which should be >= 0 and < maxNumSlots
// - an index of the array slot where the aggregation for the corresponding grouping key is done
// - an index of the array slot which is not read yet
final int recordOffset = recordSize * nextReadIndex;
final KeyType key = keySerde.fromByteBuffer(buffer, recordOffset);
final Object[] values = new Object[aggregators.length];
for (int i = 0; i < aggregators.length; i++) {
values[i] = aggregators[i].get(buffer, recordOffset + aggregatorOffsets[i]);
}
final int targetIndex = nextReadIndex == maxNumSlots - 1 ? 0 : nextReadIndex + 1;
// Wait if the array is empty until at least one slot becomes available for read, and then increase
// nextReadIndex.
increaseReadIndexTo(targetIndex);
return new Entry<>(key, values);
}
/**
* Wait for {@link StreamingMergeSortedGrouper#curWriteIndex} to be moved if necessary and move
* {@link StreamingMergeSortedGrouper#nextReadIndex}.
*
* @param target the target index {@link StreamingMergeSortedGrouper#nextReadIndex} will move to
*/
private void increaseReadIndexTo(int target)
{
// Check that the array is empty and wait for at least one slot to become available.
//
// curWriteIndex is a volatile variable and the changes on it are continuously checked until they are seen in
// the while loop. See the following links.
// * http://docs.oracle.com/javase/specs/jls/se7/html/jls-8.html#jls-8.3.1.4
// * http://docs.oracle.com/javase/specs/jls/se7/html/jls-17.html#jls-17.4.5
// * https://stackoverflow.com/questions/11761552/detailed-semantics-of-volatile-regarding-timeliness-of-visibility
final long startAtNs = System.nanoTime();
final long queryTimeoutAtNs = getQueryTimeoutAtNs(startAtNs);
final long spinTimeoutAtNs = startAtNs + SPIN_FOR_TIMEOUT_THRESHOLD_NS;
long timeoutNs = queryTimeoutAtNs - startAtNs;
long spinTimeoutNs = SPIN_FOR_TIMEOUT_THRESHOLD_NS;
// The below condition is checked in a while loop instead of using a lock to avoid frequent thread park.
while ((curWriteIndex == -1 || target == curWriteIndex) &&
!finished && !Thread.currentThread().isInterrupted()) {
if (timeoutNs <= 0L) {
throw new RuntimeException(new TimeoutException());
}
// Thread.yield() should not be called from the very beginning
if (spinTimeoutNs <= 0L) {
Thread.yield();
}
long now = System.nanoTime();
timeoutNs = queryTimeoutAtNs - now;
spinTimeoutNs = spinTimeoutAtNs - now;
}
// Changes on curWriteIndex happens-before changing nextReadIndex.
nextReadIndex = target;
}
@Override
public void close() throws IOException
{
// do nothing
}
};
}
private long getQueryTimeoutAtNs(long startAtNs)
{
return hasQueryTimeout ? queryTimeoutAtNs : startAtNs + DEFAULT_TIMEOUT_NS;
}
/**
* Return a sorted iterator. This method can be called safely while writing and iterating thread and writing thread
* can be different. The result iterator always returns sorted results. This method should be called only one time
* per grouper.
*
* @param sorted not used
*
* @return a sorted iterator
*/
@Override
public CloseableIterator<Entry<KeyType>> iterator(boolean sorted)
{
return iterator();
}
}

View File

@ -338,6 +338,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
queryWatcher,
queryRunners,
processingConfig.getNumThreads(),
bufferPool,
mergeBufferPool,
processingConfig.intermediateComputeSizeBytes(),
spillMapper,

View File

@ -296,6 +296,26 @@ public class GroupByQueryRunnerTest
return "v2SmallDictionary";
}
};
final GroupByQueryConfig v2ParallelCombineConfig = new GroupByQueryConfig()
{
@Override
public String getDefaultStrategy()
{
return GroupByStrategySelector.STRATEGY_V2;
}
@Override
public int getNumParallelCombineThreads()
{
return DEFAULT_PROCESSING_CONFIG.getNumThreads();
}
@Override
public String toString()
{
return "v2ParallelCombine";
}
};
v1Config.setMaxIntermediateRows(10000);
v1SingleThreadedConfig.setMaxIntermediateRows(10000);
@ -305,7 +325,8 @@ public class GroupByQueryRunnerTest
v1SingleThreadedConfig,
v2Config,
v2SmallBufferConfig,
v2SmallDictionaryConfig
v2SmallDictionaryConfig,
v2ParallelCombineConfig
);
}

View File

@ -20,14 +20,19 @@
package io.druid.query.groupby.epinephelinae;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.collections.ResourceHolder;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.parsers.CloseableIterator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.groupby.epinephelinae.Grouper.BufferComparator;
import io.druid.query.groupby.epinephelinae.Grouper.Entry;
import io.druid.query.groupby.epinephelinae.Grouper.KeySerde;
import io.druid.query.groupby.epinephelinae.Grouper.KeySerdeFactory;
import io.druid.segment.ColumnSelectorFactory;
@ -35,44 +40,171 @@ import io.druid.segment.ColumnValueSelector;
import io.druid.segment.DimensionSelector;
import io.druid.segment.column.ColumnCapabilities;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
@RunWith(Parameterized.class)
public class ConcurrentGrouperTest
{
private static final ExecutorService service = Executors.newFixedThreadPool(8);
private static final int BYTE_BUFFER_SIZE = 192;
private static final ExecutorService SERVICE = Executors.newFixedThreadPool(8);
private static final TestResourceHolder TEST_RESOURCE_HOLDER = new TestResourceHolder(256);
private static final KeySerdeFactory<Long> KEY_SERDE_FACTORY = new TestKeySerdeFactory();
private static final Supplier<ResourceHolder<ByteBuffer>> COMBINE_BUFFER_SUPPLIER = new TestBufferSupplier();
private static final ColumnSelectorFactory NULL_FACTORY = new TestColumnSelectorFactory();
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private Supplier<ByteBuffer> bufferSupplier;
@Parameters(name = "bufferSize={0}")
public static Collection<Object[]> constructorFeeder()
{
return ImmutableList.of(
new Object[]{1024 * 32},
new Object[]{1024 * 1024}
);
}
@AfterClass
public static void teardown()
{
service.shutdown();
SERVICE.shutdown();
}
private static final Supplier<ByteBuffer> bufferSupplier = new Supplier<ByteBuffer>()
public ConcurrentGrouperTest(int bufferSize)
{
private final AtomicBoolean called = new AtomicBoolean(false);
bufferSupplier = new Supplier<ByteBuffer>()
{
private final AtomicBoolean called = new AtomicBoolean(false);
private ByteBuffer buffer;
@Override
public ByteBuffer get()
{
if (called.compareAndSet(false, true)) {
buffer = ByteBuffer.allocate(bufferSize);
}
return buffer;
}
};
}
@Test()
public void testAggregate() throws InterruptedException, ExecutionException, IOException
{
final ConcurrentGrouper<Long> grouper = new ConcurrentGrouper<>(
bufferSupplier,
COMBINE_BUFFER_SUPPLIER,
KEY_SERDE_FACTORY,
KEY_SERDE_FACTORY,
NULL_FACTORY,
new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
1024,
0.7f,
1,
new LimitedTemporaryStorage(temporaryFolder.newFolder(), 1024 * 1024),
new DefaultObjectMapper(),
8,
null,
false,
MoreExecutors.listeningDecorator(SERVICE),
0,
false,
0,
4,
8
);
grouper.init();
final int numRows = 1000;
Future<?>[] futures = new Future[8];
for (int i = 0; i < 8; i++) {
futures[i] = SERVICE.submit(new Runnable()
{
@Override
public void run()
{
for (long i = 0; i < numRows; i++) {
grouper.aggregate(i);
}
}
});
}
for (Future eachFuture : futures) {
eachFuture.get();
}
final CloseableIterator<Entry<Long>> iterator = grouper.iterator(true);
final List<Entry<Long>> actual = Lists.newArrayList(iterator);
iterator.close();
Assert.assertTrue(!TEST_RESOURCE_HOLDER.taken || TEST_RESOURCE_HOLDER.closed);
final List<Entry<Long>> expected = new ArrayList<>();
for (long i = 0; i < numRows; i++) {
expected.add(new Entry<>(i, new Object[]{8L}));
}
Assert.assertEquals(expected, actual);
grouper.close();
}
static class TestResourceHolder implements ResourceHolder<ByteBuffer>
{
private boolean taken;
private boolean closed;
private ByteBuffer buffer;
TestResourceHolder(int bufferSize)
{
buffer = ByteBuffer.allocate(bufferSize);
}
@Override
public ByteBuffer get()
{
if (called.compareAndSet(false, true)) {
return ByteBuffer.allocate(BYTE_BUFFER_SIZE);
} else {
throw new IAE("should be called once");
}
taken = true;
return buffer;
}
};
private static final KeySerdeFactory<Long> keySerdeFactory = new KeySerdeFactory<Long>()
@Override
public void close()
{
closed = true;
}
}
static class TestKeySerdeFactory implements KeySerdeFactory<Long>
{
@Override
public long getMaxDictionarySize()
{
return 0;
}
@Override
public KeySerde<Long> factorize()
{
@ -92,6 +224,12 @@ public class ConcurrentGrouperTest
return Long.class;
}
@Override
public List<String> getDictionary()
{
return ImmutableList.of();
}
@Override
public ByteBuffer toByteBuffer(Long key)
{
@ -134,6 +272,12 @@ public class ConcurrentGrouperTest
};
}
@Override
public KeySerde<Long> factorizeWithDictionary(List<String> dictionary)
{
return factorize();
}
@Override
public Comparator<Grouper.Entry<Long>> objectComparator(boolean forceDefaultOrder)
{
@ -146,9 +290,24 @@ public class ConcurrentGrouperTest
}
};
}
};
}
private static final ColumnSelectorFactory null_factory = new ColumnSelectorFactory()
private static class TestBufferSupplier implements Supplier<ResourceHolder<ByteBuffer>>
{
private final AtomicBoolean called = new AtomicBoolean(false);
@Override
public ResourceHolder<ByteBuffer> get()
{
if (called.compareAndSet(false, true)) {
return TEST_RESOURCE_HOLDER;
} else {
throw new IAE("should be called once");
}
}
}
private static class TestColumnSelectorFactory implements ColumnSelectorFactory
{
@Override
public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
@ -167,51 +326,5 @@ public class ConcurrentGrouperTest
{
return null;
}
};
@Test(timeout = 5000L)
public void testAggregate() throws InterruptedException, ExecutionException
{
final ConcurrentGrouper<Long> grouper = new ConcurrentGrouper<>(
bufferSupplier,
keySerdeFactory,
null_factory,
new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
24,
0.7f,
1,
null,
null,
8,
null,
false,
MoreExecutors.listeningDecorator(Execs.multiThreaded(4, "concurrent-grouper-test-%d")),
0,
false,
0,
BYTE_BUFFER_SIZE
);
Future<?>[] futures = new Future[8];
for (int i = 0; i < 8; i++) {
futures[i] = service.submit(new Runnable()
{
@Override
public void run()
{
grouper.init();
for (long i = 0; i < 100; i++) {
grouper.aggregate(0L);
}
}
});
}
for (Future eachFuture : futures) {
eachFuture.get();
}
grouper.close();
}
}

View File

@ -19,11 +19,13 @@
package io.druid.query.groupby.epinephelinae;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Ints;
import io.druid.query.aggregation.AggregatorFactory;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.List;
public class IntKeySerde implements Grouper.KeySerde<Integer>
{
@ -66,6 +68,12 @@ public class IntKeySerde implements Grouper.KeySerde<Integer>
return Integer.class;
}
@Override
public List<String> getDictionary()
{
return ImmutableList.of();
}
@Override
public ByteBuffer toByteBuffer(Integer key)
{

View File

@ -0,0 +1,147 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.groupby.epinephelinae;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.collections.ResourceHolder;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.parsers.CloseableIterator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.groupby.epinephelinae.ConcurrentGrouperTest.TestKeySerdeFactory;
import io.druid.query.groupby.epinephelinae.ConcurrentGrouperTest.TestResourceHolder;
import io.druid.query.groupby.epinephelinae.Grouper.Entry;
import io.druid.query.groupby.epinephelinae.Grouper.KeySerdeFactory;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
public class ParallelCombinerTest
{
private static final int THREAD_NUM = 8;
private static final ExecutorService SERVICE = Execs.multiThreaded(THREAD_NUM, "parallel-combiner-test-%d");
private static final TestResourceHolder TEST_RESOURCE_HOLDER = new TestResourceHolder(512);
private static final KeySerdeFactory<Long> KEY_SERDE_FACTORY = new TestKeySerdeFactory();
private static final Supplier<ResourceHolder<ByteBuffer>> COMBINE_BUFFER_SUPPLIER =
new Supplier<ResourceHolder<ByteBuffer>>()
{
private final AtomicBoolean called = new AtomicBoolean(false);
@Override
public ResourceHolder<ByteBuffer> get()
{
if (called.compareAndSet(false, true)) {
return TEST_RESOURCE_HOLDER;
} else {
throw new IAE("should be called once");
}
}
};
private static final class TestIterator implements CloseableIterator<Entry<Long>>
{
private final Iterator<Entry<Long>> innerIterator;
private boolean closed;
TestIterator(Iterator<Entry<Long>> innerIterator)
{
this.innerIterator = innerIterator;
}
@Override
public boolean hasNext()
{
return innerIterator.hasNext();
}
@Override
public Entry<Long> next()
{
return innerIterator.next();
}
public boolean isClosed()
{
return closed;
}
@Override
public void close() throws IOException
{
if (!closed) {
closed = true;
}
}
}
@AfterClass
public static void teardown()
{
SERVICE.shutdownNow();
}
@Test
public void testCombine() throws IOException
{
final ParallelCombiner<Long> combiner = new ParallelCombiner<>(
COMBINE_BUFFER_SUPPLIER,
new AggregatorFactory[]{new CountAggregatorFactory("cnt").getCombiningFactory()},
KEY_SERDE_FACTORY,
MoreExecutors.listeningDecorator(SERVICE),
false,
THREAD_NUM,
0, // default priority
0, // default timeout
4
);
final int numRows = 1000;
final List<Entry<Long>> baseIterator = new ArrayList<>(numRows);
for (long i = 0; i < numRows; i++) {
baseIterator.add(new Entry<>(i, new Object[]{i * 10}));
}
final int leafNum = 8;
final List<TestIterator> iterators = new ArrayList<>(leafNum);
for (int i = 0; i < leafNum; i++) {
iterators.add(new TestIterator(baseIterator.iterator()));
}
try (final CloseableIterator<Entry<Long>> iterator = combiner.combine(iterators, new ArrayList<>())) {
long expectedKey = 0;
while (iterator.hasNext()) {
Assert.assertEquals(new Entry<>(expectedKey, new Object[]{expectedKey++ * leafNum * 10}), iterator.next());
}
}
iterators.forEach(it -> Assert.assertTrue(it.isClosed()));
}
}

View File

@ -0,0 +1,188 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.groupby.epinephelinae;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Ints;
import io.druid.data.input.MapBasedRow;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.groupby.epinephelinae.Grouper.Entry;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
public class StreamingMergeSortedGrouperTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void testAggregate()
{
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final StreamingMergeSortedGrouper<Integer> grouper = newGrouper(columnSelectorFactory, 1024);
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L)));
grouper.aggregate(6);
grouper.aggregate(6);
grouper.aggregate(6);
grouper.aggregate(10);
grouper.aggregate(12);
grouper.aggregate(12);
grouper.finish();
final List<Entry<Integer>> expected = ImmutableList.of(
new Grouper.Entry<>(6, new Object[]{30L, 3L}),
new Grouper.Entry<>(10, new Object[]{10L, 1L}),
new Grouper.Entry<>(12, new Object[]{20L, 2L})
);
final List<Entry<Integer>> unsortedEntries = Lists.newArrayList(grouper.iterator(true));
Assert.assertEquals(
expected,
unsortedEntries
);
}
@Test(timeout = 5000L)
public void testEmptyIterator()
{
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final StreamingMergeSortedGrouper<Integer> grouper = newGrouper(columnSelectorFactory, 1024);
grouper.finish();
Assert.assertTrue(!grouper.iterator(true).hasNext());
}
@Test(timeout = 5000L)
public void testStreamingAggregateWithLargeBuffer() throws ExecutionException, InterruptedException
{
testStreamingAggregate(1024);
}
@Test(timeout = 5000L)
public void testStreamingAggregateWithMinimumBuffer() throws ExecutionException, InterruptedException
{
testStreamingAggregate(60);
}
private void testStreamingAggregate(int bufferSize) throws ExecutionException, InterruptedException
{
final ExecutorService exec = Execs.multiThreaded(2, "merge-sorted-grouper-test-%d");
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final StreamingMergeSortedGrouper<Integer> grouper = newGrouper(columnSelectorFactory, bufferSize);
final List<Entry<Integer>> expected = new ArrayList<>(1024);
for (int i = 0; i < 1024; i++) {
expected.add(new Entry<>(i, new Object[]{100L, 10L}));
}
try {
final Future future = exec.submit(() -> {
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L)));
for (int i = 0; i < 1024; i++) {
for (int j = 0; j < 10; j++) {
grouper.aggregate(i);
}
}
grouper.finish();
});
final List<Entry<Integer>> unsortedEntries = Lists.newArrayList(grouper.iterator(true));
final List<Entry<Integer>> actual = Ordering.from((Comparator<Entry<Integer>>) (o1, o2) -> Ints.compare(o1.getKey(), o2.getKey()))
.sortedCopy(unsortedEntries);
if (!actual.equals(expected)) {
future.get(); // Check there is an exception occured
Assert.fail();
}
}
finally {
exec.shutdownNow();
}
}
@Test
public void testNotEnoughBuffer()
{
expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("Buffer[50] should be large enough to store at least three records[20]");
newGrouper(GrouperTestUtil.newColumnSelectorFactory(), 50);
}
@Test
public void testTimeout()
{
expectedException.expect(RuntimeException.class);
expectedException.expectCause(CoreMatchers.instanceOf(TimeoutException.class));
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final StreamingMergeSortedGrouper<Integer> grouper = newGrouper(columnSelectorFactory, 60);
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L)));
grouper.aggregate(6);
grouper.iterator();
}
private StreamingMergeSortedGrouper<Integer> newGrouper(
TestColumnSelectorFactory columnSelectorFactory,
int bufferSize
)
{
final ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
final StreamingMergeSortedGrouper<Integer> grouper = new StreamingMergeSortedGrouper<>(
Suppliers.ofInstance(buffer),
GrouperTestUtil.intKeySerde(),
columnSelectorFactory,
new AggregatorFactory[]{
new LongSumAggregatorFactory("valueSum", "value"),
new CountAggregatorFactory("count")
},
System.currentTimeMillis() + 1000L
);
grouper.init();
return grouper;
}
}