groupBy v2: Ignore timestamp completely when granularity = all, except for the final merge. (#3740)

* GroupByBenchmark: Add serde, spilling, all-gran benchmarks.

Also use more iterations.

* groupBy v2: Ignore timestamp completely when granularity = all, except for the final merge.

Specifically:

- Remove timestamp from RowBasedKey when not needed
- Set timestamp to null in MapBasedRows that are not part of the final merge.
This commit is contained in:
Gian Merlino 2016-12-06 16:17:32 -08:00 committed by Fangjin Yang
parent f995b1426f
commit b1bac9f2d3
6 changed files with 330 additions and 144 deletions

View File

@ -24,11 +24,11 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.hash.Hashing;
import com.google.common.io.Files;
import io.druid.benchmark.datagen.BenchmarkDataGenerator;
import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
import io.druid.benchmark.datagen.BenchmarkSchemas;
@ -39,6 +39,7 @@ import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.granularity.QueryGranularities;
import io.druid.granularity.QueryGranularity;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
@ -105,14 +106,14 @@ import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@Fork(jvmArgsPrepend = "-server", value = 1)
@Warmup(iterations = 10)
@Measurement(iterations = 25)
@Warmup(iterations = 15)
@Measurement(iterations = 30)
public class GroupByBenchmark
{
@Param({"4"})
private int numSegments;
@Param({"4"})
@Param({"2", "4"})
private int numProcessingThreads;
@Param({"-1"})
@ -127,6 +128,9 @@ public class GroupByBenchmark
@Param({"v1", "v2"})
private String defaultStrategy;
@Param({"all", "day"})
private String queryGranularity;
private static final Logger log = new Logger(GroupByBenchmark.class);
private static final int RNG_SEED = 9999;
private static final IndexMergerV9 INDEX_MERGER_V9;
@ -137,7 +141,7 @@ public class GroupByBenchmark
private IncrementalIndex anIncrementalIndex;
private List<QueryableIndex> queryableIndexes;
private QueryRunnerFactory factory;
private QueryRunnerFactory<Row, GroupByQuery> factory;
private BenchmarkSchemaInfo schemaInfo;
private GroupByQuery query;
@ -190,7 +194,7 @@ public class GroupByBenchmark
.setAggregatorSpecs(
queryAggs
)
.setGranularity(QueryGranularities.DAY)
.setGranularity(QueryGranularity.fromString(queryGranularity))
.build();
basicQueries.put("A", queryA);
@ -335,7 +339,7 @@ public class GroupByBenchmark
@Override
public long getMaxOnDiskStorage()
{
return 0L;
return 1_000_000_000L;
}
};
config.setSingleThreaded(false);
@ -475,29 +479,17 @@ public class GroupByBenchmark
}
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void queryMultiQueryableIndex(Blackhole blackhole) throws Exception
{
List<QueryRunner<Row>> singleSegmentRunners = Lists.newArrayList();
QueryToolChest toolChest = factory.getToolchest();
for (int i = 0; i < numSegments; i++) {
String segmentName = "qIndex" + i;
QueryRunner<Row> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
segmentName,
new QueryableIndexSegment(segmentName, queryableIndexes.get(i))
);
singleSegmentRunners.add(toolChest.preMergeQueryDecoration(runner));
}
QueryRunner theRunner = toolChest.postMergeQueryDecoration(
new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(factory.mergeRunners(executorService, singleSegmentRunners)),
toolChest
)
QueryToolChest<Row, GroupByQuery> toolChest = factory.getToolchest();
QueryRunner<Row> theRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
factory.mergeRunners(executorService, makeMultiRunners())
),
(QueryToolChest) toolChest
);
Sequence<Row> queryResult = theRunner.run(query, Maps.<String, Object>newHashMap());
@ -507,4 +499,70 @@ public class GroupByBenchmark
blackhole.consume(result);
}
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void queryMultiQueryableIndexWithSpilling(Blackhole blackhole) throws Exception
{
QueryToolChest<Row, GroupByQuery> toolChest = factory.getToolchest();
QueryRunner<Row> theRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
factory.mergeRunners(executorService, makeMultiRunners())
),
(QueryToolChest) toolChest
);
final GroupByQuery spillingQuery = query.withOverriddenContext(
ImmutableMap.<String, Object>of("bufferGrouperMaxSize", 4000)
);
Sequence<Row> queryResult = theRunner.run(spillingQuery, Maps.<String, Object>newHashMap());
List<Row> results = Sequences.toList(queryResult, Lists.<Row>newArrayList());
for (Row result : results) {
blackhole.consume(result);
}
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void queryMultiQueryableIndexWithSerde(Blackhole blackhole) throws Exception
{
QueryToolChest<Row, GroupByQuery> toolChest = factory.getToolchest();
QueryRunner<Row> theRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
new SerializingQueryRunner<>(
new DefaultObjectMapper(new SmileFactory()),
Row.class,
toolChest.mergeResults(
factory.mergeRunners(executorService, makeMultiRunners())
)
)
),
(QueryToolChest) toolChest
);
Sequence<Row> queryResult = theRunner.run(query, Maps.<String, Object>newHashMap());
List<Row> results = Sequences.toList(queryResult, Lists.<Row>newArrayList());
for (Row result : results) {
blackhole.consume(result);
}
}
private List<QueryRunner<Row>> makeMultiRunners()
{
List<QueryRunner<Row>> runners = Lists.newArrayList();
for (int i = 0; i < numSegments; i++) {
String segmentName = "qIndex" + i;
QueryRunner<Row> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
segmentName,
new QueryableIndexSegment(segmentName, queryableIndexes.get(i))
);
runners.add(factory.getToolchest().preMergeQueryDecoration(runner));
}
return runners;
}
}

View File

@ -0,0 +1,71 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.benchmark.query;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import java.util.Map;
public class SerializingQueryRunner<T> implements QueryRunner<T>
{
private final ObjectMapper smileMapper;
private final QueryRunner<T> baseRunner;
private final Class<T> clazz;
public SerializingQueryRunner(
ObjectMapper smileMapper,
Class<T> clazz,
QueryRunner<T> baseRunner
)
{
this.smileMapper = smileMapper;
this.clazz = clazz;
this.baseRunner = baseRunner;
}
@Override
public Sequence<T> run(
final Query<T> query,
final Map<String, Object> responseContext
)
{
return Sequences.map(
baseRunner.run(query, responseContext),
new Function<T, T>()
{
@Override
public T apply(T input)
{
try {
return smileMapper.readValue(smileMapper.writeValueAsBytes(input), clazz);
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}
);
}
}

View File

@ -32,6 +32,7 @@ import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.primitives.Longs;
import io.druid.data.input.Row;
import io.druid.granularity.QueryGranularities;
import io.druid.granularity.QueryGranularity;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
@ -285,7 +286,18 @@ public class GroupByQuery extends BaseQuery<Row>
final Comparator<Row> timeComparator = getTimeComparator(granular);
if (sortByDimsFirst) {
if (timeComparator == null) {
return Ordering.from(
new Comparator<Row>()
{
@Override
public int compare(Row lhs, Row rhs)
{
return compareDims(dimensions, lhs, rhs);
}
}
);
} else if (sortByDimsFirst) {
return Ordering.from(
new Comparator<Row>()
{
@ -323,7 +335,9 @@ public class GroupByQuery extends BaseQuery<Row>
private Comparator<Row> getTimeComparator(boolean granular)
{
if (granular) {
if (QueryGranularities.ALL.equals(granularity)) {
return null;
} else if (granular) {
return new Comparator<Row>()
{
@Override

View File

@ -19,7 +19,6 @@
package io.druid.query.groupby.epinephelinae;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
@ -135,12 +134,13 @@ public class BufferGrouper<KeyType> implements Grouper<KeyType>
return false;
}
Preconditions.checkArgument(
keyBuffer.remaining() == keySize,
if (keyBuffer.remaining() != keySize) {
throw new IAE(
"keySerde.toByteBuffer(key).remaining[%s] != keySerde.keySize[%s], buffer was the wrong size?!",
keyBuffer.remaining(),
keySize
);
}
int bucket = findBucket(
tableBuffer,

View File

@ -20,7 +20,7 @@
package io.druid.query.groupby.epinephelinae;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
@ -86,9 +86,9 @@ public class RowBasedGrouperHelper
Preconditions.checkArgument(concurrencyHint >= 1 || concurrencyHint == -1, "invalid concurrencyHint");
final GroupByQueryConfig querySpecificConfig = config.withOverrides(query);
final DateTime fudgeTimestamp = GroupByStrategyV2.getUniversalTimestamp(query);
final boolean includeTimestamp = GroupByStrategyV2.getUniversalTimestamp(query) == null;
final Grouper.KeySerdeFactory<RowBasedKey> keySerdeFactory = new RowBasedKeySerdeFactory(
fudgeTimestamp,
includeTimestamp,
query.getContextSortByDimsFirst(),
query.getDimensions().size(),
querySpecificConfig.getMaxMergingDictionarySize() / (concurrencyHint == -1 ? 1 : concurrencyHint)
@ -150,29 +150,45 @@ public class RowBasedGrouperHelper
return null;
}
long timestamp = row.getTimestampFromEpoch();
columnSelectorFactory.setRow(row);
final int dimStart;
final Comparable[] key;
if (includeTimestamp) {
key = new Comparable[query.getDimensions().size() + 1];
final long timestamp;
if (isInputRaw) {
if (query.getGranularity() instanceof AllGranularity) {
timestamp = query.getIntervals().get(0).getStartMillis();
} else {
timestamp = query.getGranularity().truncate(timestamp);
timestamp = query.getGranularity().truncate(row.getTimestampFromEpoch());
}
} else {
timestamp = row.getTimestampFromEpoch();
}
columnSelectorFactory.setRow(row);
final String[] dimensions = new String[query.getDimensions().size()];
for (int i = 0; i < dimensions.length; i++) {
key[0] = timestamp;
dimStart = 1;
} else {
key = new Comparable[query.getDimensions().size()];
dimStart = 0;
}
for (int i = dimStart; i < key.length; i++) {
final String value;
if (isInputRaw) {
IndexedInts index = dimensionSelectors[i].getRow();
value = index.size() == 0 ? "" : dimensionSelectors[i].lookupName(index.get(0));
IndexedInts index = dimensionSelectors[i - dimStart].getRow();
value = index.size() == 0 ? "" : dimensionSelectors[i - dimStart].lookupName(index.get(0));
} else {
value = (String) row.getRaw(query.getDimensions().get(i).getOutputName());
value = (String) row.getRaw(query.getDimensions().get(i - dimStart).getOutputName());
}
dimensions[i] = Strings.nullToEmpty(value);
key[i] = Strings.nullToEmpty(value);
}
final boolean didAggregate = theGrouper.aggregate(new RowBasedKey(timestamp, dimensions));
final boolean didAggregate = theGrouper.aggregate(new RowBasedKey(key));
if (!didAggregate) {
// null return means grouping resources were exhausted.
return null;
@ -192,6 +208,8 @@ public class RowBasedGrouperHelper
final Closeable closeable
)
{
final boolean includeTimestamp = GroupByStrategyV2.getUniversalTimestamp(query) == null;
return new CloseableGrouperIterator<>(
grouper,
true,
@ -202,11 +220,23 @@ public class RowBasedGrouperHelper
{
Map<String, Object> theMap = Maps.newLinkedHashMap();
// Get timestamp, maybe.
final DateTime timestamp;
final int dimStart;
if (includeTimestamp) {
timestamp = query.getGranularity().toDateTime(((long) (entry.getKey().getKey()[0])));
dimStart = 1;
} else {
timestamp = null;
dimStart = 0;
}
// Add dimensions.
for (int i = 0; i < entry.getKey().getDimensions().length; i++) {
for (int i = dimStart; i < entry.getKey().getKey().length; i++) {
theMap.put(
query.getDimensions().get(i).getOutputName(),
Strings.emptyToNull(entry.getKey().getDimensions()[i])
query.getDimensions().get(i - dimStart).getOutputName(),
Strings.emptyToNull((String) entry.getKey().getKey()[i])
);
}
@ -215,10 +245,7 @@ public class RowBasedGrouperHelper
theMap.put(query.getAggregatorSpecs().get(i).getName(), entry.getValues()[i]);
}
return new MapBasedRow(
query.getGranularity().toDateTime(entry.getKey().getTimestamp()),
theMap
);
return new MapBasedRow(timestamp, theMap);
}
},
closeable
@ -227,30 +254,28 @@ public class RowBasedGrouperHelper
static class RowBasedKey
{
private final long timestamp;
private final String[] dimensions;
private final Object[] key;
RowBasedKey(final Object[] key)
{
this.key = key;
}
@JsonCreator
public RowBasedKey(
// Using short key names to reduce serialized size when spilling to disk.
@JsonProperty("t") long timestamp,
@JsonProperty("d") String[] dimensions
)
public static RowBasedKey fromJsonArray(final Object[] key)
{
this.timestamp = timestamp;
this.dimensions = dimensions;
// Type info is lost during serde. We know we don't want ints as timestamps, so adjust.
if (key.length > 0 && key[0] instanceof Integer) {
key[0] = ((Integer) key[0]).longValue();
}
@JsonProperty("t")
public long getTimestamp()
{
return timestamp;
return new RowBasedKey(key);
}
@JsonProperty("d")
public String[] getDimensions()
@JsonValue
public Object[] getKey()
{
return dimensions;
return key;
}
@Override
@ -265,42 +290,32 @@ public class RowBasedGrouperHelper
RowBasedKey that = (RowBasedKey) o;
if (timestamp != that.timestamp) {
return false;
}
// Probably incorrect - comparing Object[] arrays with Arrays.equals
return Arrays.equals(dimensions, that.dimensions);
return Arrays.equals(key, that.key);
}
@Override
public int hashCode()
{
int result = (int) (timestamp ^ (timestamp >>> 32));
result = 31 * result + Arrays.hashCode(dimensions);
return result;
return Arrays.hashCode(key);
}
@Override
public String toString()
{
return "RowBasedKey{" +
"timestamp=" + timestamp +
", dimensions=" + Arrays.toString(dimensions) +
'}';
return Arrays.toString(key);
}
}
private static class RowBasedKeySerdeFactory implements Grouper.KeySerdeFactory<RowBasedKey>
{
private final DateTime fudgeTimestamp;
private final boolean includeTimestamp;
private final boolean sortByDimsFirst;
private final int dimCount;
private final long maxDictionarySize;
public RowBasedKeySerdeFactory(DateTime fudgeTimestamp, boolean sortByDimsFirst, int dimCount, long maxDictionarySize)
RowBasedKeySerdeFactory(boolean includeTimestamp, boolean sortByDimsFirst, int dimCount, long maxDictionarySize)
{
this.fudgeTimestamp = fudgeTimestamp;
this.includeTimestamp = includeTimestamp;
this.sortByDimsFirst = sortByDimsFirst;
this.dimCount = dimCount;
this.maxDictionarySize = maxDictionarySize;
@ -309,52 +324,59 @@ public class RowBasedGrouperHelper
@Override
public Grouper.KeySerde<RowBasedKey> factorize()
{
return new RowBasedKeySerde(fudgeTimestamp, sortByDimsFirst, dimCount, maxDictionarySize);
return new RowBasedKeySerde(includeTimestamp, sortByDimsFirst, dimCount, maxDictionarySize);
}
@Override
public Comparator<RowBasedKey> objectComparator()
{
if (includeTimestamp) {
if (sortByDimsFirst) {
return new Comparator<RowBasedKey>()
{
@Override
public int compare(
RowBasedKey row1, RowBasedKey row2
)
public int compare(RowBasedKey key1, RowBasedKey key2)
{
final int cmp = compareDimsInRows(row1, row2);
final int cmp = compareDimsInRows(key1, key2, 1);
if (cmp != 0) {
return cmp;
}
return Longs.compare(row1.getTimestamp(), row2.getTimestamp());
return Longs.compare((long) key1.getKey()[0], (long) key2.getKey()[0]);
}
};
} else {
return new Comparator<RowBasedKey>()
{
@Override
public int compare(
RowBasedKey row1, RowBasedKey row2
)
public int compare(RowBasedKey key1, RowBasedKey key2)
{
final int timeCompare = Longs.compare(row1.getTimestamp(), row2.getTimestamp());
final int timeCompare = Longs.compare((long) key1.getKey()[0], (long) key2.getKey()[0]);
if (timeCompare != 0) {
return timeCompare;
}
return compareDimsInRows(row1, row2);
return compareDimsInRows(key1, key2, 1);
}
};
}
} else {
return new Comparator<RowBasedKey>()
{
@Override
public int compare(RowBasedKey key1, RowBasedKey key2)
{
return compareDimsInRows(key1, key2, 0);
}
};
}
}
private static int compareDimsInRows(RowBasedKey row1, RowBasedKey row2)
private static int compareDimsInRows(RowBasedKey key1, RowBasedKey key2, int dimStart)
{
for (int i = 0; i < row1.getDimensions().length; i++) {
final int cmp = row1.getDimensions()[i].compareTo(row2.getDimensions()[i]);
for (int i = dimStart; i < key1.getKey().length; i++) {
final int cmp = ((String) key1.getKey()[i]).compareTo((String) key2.getKey()[i]);
if (cmp != 0) {
return cmp;
}
@ -369,7 +391,7 @@ public class RowBasedGrouperHelper
// Entry in dictionary, node pointer in reverseDictionary, hash + k/v/next pointer in reverseDictionary nodes
private static final int ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY = Longs.BYTES * 5 + Ints.BYTES;
private final DateTime fudgeTimestamp;
private final boolean includeTimestamp;
private final boolean sortByDimsFirst;
private final int dimCount;
private final int keySize;
@ -384,18 +406,18 @@ public class RowBasedGrouperHelper
// dictionary id -> its position if it were sorted by dictionary value
private int[] sortableIds = null;
public RowBasedKeySerde(
final DateTime fudgeTimestamp,
RowBasedKeySerde(
final boolean includeTimestamp,
final boolean sortByDimsFirst,
final int dimCount,
final long maxDictionarySize
)
{
this.fudgeTimestamp = fudgeTimestamp;
this.includeTimestamp = includeTimestamp;
this.sortByDimsFirst = sortByDimsFirst;
this.dimCount = dimCount;
this.maxDictionarySize = maxDictionarySize;
this.keySize = (fudgeTimestamp == null ? Longs.BYTES : 0) + dimCount * Ints.BYTES;
this.keySize = (includeTimestamp ? Longs.BYTES : 0) + dimCount * Ints.BYTES;
this.keyBuffer = ByteBuffer.allocate(keySize);
}
@ -416,12 +438,16 @@ public class RowBasedGrouperHelper
{
keyBuffer.rewind();
if (fudgeTimestamp == null) {
keyBuffer.putLong(key.getTimestamp());
final int dimStart;
if (includeTimestamp) {
keyBuffer.putLong((long) key.getKey()[0]);
dimStart = 1;
} else {
dimStart = 0;
}
for (int i = 0; i < key.getDimensions().length; i++) {
final int id = addToDictionary(key.getDimensions()[i]);
for (int i = dimStart; i < key.getKey().length; i++) {
final int id = addToDictionary((String) key.getKey()[i]);
if (id < 0) {
return null;
}
@ -435,13 +461,26 @@ public class RowBasedGrouperHelper
@Override
public RowBasedKey fromByteBuffer(ByteBuffer buffer, int position)
{
final long timestamp = fudgeTimestamp == null ? buffer.getLong(position) : fudgeTimestamp.getMillis();
final String[] dimensions = new String[dimCount];
final int dimsPosition = fudgeTimestamp == null ? position + Longs.BYTES : position;
for (int i = 0; i < dimensions.length; i++) {
dimensions[i] = dictionary.get(buffer.getInt(dimsPosition + (Ints.BYTES * i)));
final int dimStart;
final Comparable[] key;
final int dimsPosition;
if (includeTimestamp) {
key = new Comparable[dimCount + 1];
key[0] = buffer.getLong(position);
dimsPosition = position + Longs.BYTES;
dimStart = 1;
} else {
key = new Comparable[dimCount];
dimsPosition = position;
dimStart = 0;
}
return new RowBasedKey(timestamp, dimensions);
for (int i = dimStart; i < key.length; i++) {
key[i] = dictionary.get(buffer.getInt(dimsPosition + (Ints.BYTES * (i - dimStart))));
}
return new RowBasedKey(key);
}
@Override
@ -459,8 +498,7 @@ public class RowBasedGrouperHelper
}
}
if (fudgeTimestamp == null) {
if (includeTimestamp) {
if (sortByDimsFirst) {
return new Grouper.KeyComparator()
{
@ -505,7 +543,6 @@ public class RowBasedGrouperHelper
}
};
}
} else {
return new Grouper.KeyComparator()
{

View File

@ -61,6 +61,7 @@ import java.util.Map;
public class GroupByStrategyV2 implements GroupByStrategy
{
public static final String CTX_KEY_FUDGE_TIMESTAMP = "fudgeTimestamp";
public static final String CTX_KEY_OUTERMOST = "groupByOutermost";
private final DruidProcessingConfig processingConfig;
private final Supplier<GroupByQueryConfig> configSupplier;
@ -158,7 +159,8 @@ public class GroupByStrategyV2 implements GroupByStrategy
ImmutableMap.<String, Object>of(
"finalize", false,
GroupByQueryConfig.CTX_KEY_STRATEGY, GroupByStrategySelector.STRATEGY_V2,
CTX_KEY_FUDGE_TIMESTAMP, fudgeTimestamp == null ? "" : String.valueOf(fudgeTimestamp.getMillis())
CTX_KEY_FUDGE_TIMESTAMP, fudgeTimestamp == null ? "" : String.valueOf(fudgeTimestamp.getMillis()),
CTX_KEY_OUTERMOST, false
)
),
responseContext
@ -168,9 +170,13 @@ public class GroupByStrategyV2 implements GroupByStrategy
@Override
public Row apply(final Row row)
{
// Maybe apply postAggregators.
// Apply postAggregators and fudgeTimestamp if present and if this is the outermost mergeResults.
if (query.getPostAggregatorSpecs().isEmpty()) {
if (!query.getContextBoolean(CTX_KEY_OUTERMOST, true)) {
return row;
}
if (query.getPostAggregatorSpecs().isEmpty() && fudgeTimestamp == null) {
return row;
}
@ -186,7 +192,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
}
}
return new MapBasedRow(row.getTimestamp(), newMap);
return new MapBasedRow(fudgeTimestamp != null ? fudgeTimestamp : row.getTimestamp(), newMap);
}
}
)