groupBy: Omit timestamp from merge key when granularity = all. (#3416)

Fixes #3412.
Gian Merlino 2016-09-01 09:02:54 -07:00 committed by Slim
parent 6d25c5e053
commit 8ed1894488
2 changed files with 98 additions and 45 deletions
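
Context for the change: with granularity "all", every output row of the groupBy shares one "universal" timestamp, so writing that timestamp into every merge key costs 8 bytes per key plus a long comparison per merge. Below is a minimal, self-contained sketch of the layout idea (illustrative only, not Druid code; KeyLayoutSketch and toKey are invented names):

import java.nio.ByteBuffer;

// Illustrative sketch, not Druid code: why the merge key can drop its
// timestamp when granularity = all. If a single "universal" (fudge) timestamp
// covers the whole query, it is implied by the query and need not be stored
// or compared per key.
class KeyLayoutSketch
{
  static ByteBuffer toKey(Long fudgeTimestampMillis, long rowTimestamp, int[] dimIds)
  {
    // Timestamp occupies the key only when there is no universal timestamp.
    final int size = (fudgeTimestampMillis == null ? Long.BYTES : 0) + dimIds.length * Integer.BYTES;
    final ByteBuffer key = ByteBuffer.allocate(size);
    if (fudgeTimestampMillis == null) {
      key.putLong(rowTimestamp);
    }
    for (int id : dimIds) {
      key.putInt(id);
    }
    key.flip();
    return key;
  }

  public static void main(String[] args)
  {
    final int[] dims = {3, 7};
    System.out.println(toKey(null, 1472745774000L, dims).remaining());            // 16: 8-byte time + two ints
    System.out.println(toKey(1472745774000L, 1472745774000L, dims).remaining());  // 8: ints only
  }
}

Running main prints 16 and 8: dropping the implied timestamp halves the key for a two-dimension row.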

RowBasedGrouperHelper.java

@@ -41,6 +41,7 @@ import io.druid.query.dimension.DimensionSpec;
 import io.druid.query.extraction.ExtractionFn;
 import io.druid.query.groupby.GroupByQuery;
 import io.druid.query.groupby.GroupByQueryConfig;
+import io.druid.query.groupby.strategy.GroupByStrategyV2;
 import io.druid.segment.ColumnSelectorFactory;
 import io.druid.segment.DimensionSelector;
 import io.druid.segment.FloatColumnSelector;
@@ -49,6 +50,7 @@ import io.druid.segment.ObjectColumnSelector;
 import io.druid.segment.column.Column;
 import io.druid.segment.column.ColumnCapabilities;
 import io.druid.segment.data.IndexedInts;
+import org.joda.time.DateTime;

 import java.io.Closeable;
 import java.io.IOException;
@@ -78,7 +80,9 @@ public class RowBasedGrouperHelper
     Preconditions.checkArgument(concurrencyHint >= 1 || concurrencyHint == -1, "invalid concurrencyHint");

     final GroupByQueryConfig querySpecificConfig = config.withOverrides(query);
+    final DateTime fudgeTimestamp = GroupByStrategyV2.getUniversalTimestamp(query);
     final Grouper.KeySerdeFactory<RowBasedKey> keySerdeFactory = new RowBasedKeySerdeFactory(
+        fudgeTimestamp,
         query.getDimensions().size(),
         querySpecificConfig.getMaxMergingDictionarySize() / (concurrencyHint == -1 ? 1 : concurrencyHint)
     );
@@ -295,13 +299,15 @@ public class RowBasedGrouperHelper
     }
   }

-  static class RowBasedKeySerdeFactory implements Grouper.KeySerdeFactory<RowBasedKey>
+  private static class RowBasedKeySerdeFactory implements Grouper.KeySerdeFactory<RowBasedKey>
   {
+    private final DateTime fudgeTimestamp;
     private final int dimCount;
     private final long maxDictionarySize;

-    public RowBasedKeySerdeFactory(int dimCount, long maxDictionarySize)
+    public RowBasedKeySerdeFactory(DateTime fudgeTimestamp, int dimCount, long maxDictionarySize)
     {
+      this.fudgeTimestamp = fudgeTimestamp;
       this.dimCount = dimCount;
       this.maxDictionarySize = maxDictionarySize;
     }
@@ -309,15 +315,16 @@ public class RowBasedGrouperHelper
     @Override
     public Grouper.KeySerde<RowBasedKey> factorize()
     {
-      return new RowBasedKeySerde(dimCount, maxDictionarySize);
+      return new RowBasedKeySerde(fudgeTimestamp, dimCount, maxDictionarySize);
     }
   }

-  static class RowBasedKeySerde implements Grouper.KeySerde<RowBasedKey>
+  private static class RowBasedKeySerde implements Grouper.KeySerde<RowBasedKey>
   {
     // Entry in dictionary, node pointer in reverseDictionary, hash + k/v/next pointer in reverseDictionary nodes
     private static final int ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY = Longs.BYTES * 5 + Ints.BYTES;

+    private final DateTime fudgeTimestamp;
     private final int dimCount;
     private final int keySize;
     private final ByteBuffer keyBuffer;
@@ -331,11 +338,16 @@ public class RowBasedGrouperHelper
     // dictionary id -> its position if it were sorted by dictionary value
     private int[] sortableIds = null;

-    public RowBasedKeySerde(final int dimCount, final long maxDictionarySize)
+    public RowBasedKeySerde(
+        final DateTime fudgeTimestamp,
+        final int dimCount,
+        final long maxDictionarySize
+    )
     {
+      this.fudgeTimestamp = fudgeTimestamp;
       this.dimCount = dimCount;
       this.maxDictionarySize = maxDictionarySize;
-      this.keySize = Longs.BYTES + dimCount * Ints.BYTES;
+      this.keySize = (fudgeTimestamp == null ? Longs.BYTES : 0) + dimCount * Ints.BYTES;
       this.keyBuffer = ByteBuffer.allocate(keySize);
     }
@@ -355,7 +367,11 @@ public class RowBasedGrouperHelper
     public ByteBuffer toByteBuffer(RowBasedKey key)
     {
       keyBuffer.rewind();
-      keyBuffer.putLong(key.getTimestamp());
+
+      if (fudgeTimestamp == null) {
+        keyBuffer.putLong(key.getTimestamp());
+      }
+
       for (int i = 0; i < key.getDimensions().length; i++) {
         final int id = addToDictionary(key.getDimensions()[i]);
         if (id < 0) {
@@ -363,6 +379,7 @@
         }
         keyBuffer.putInt(id);
       }
+
       keyBuffer.flip();
       return keyBuffer;
     }
@@ -370,10 +387,11 @@
     @Override
     public RowBasedKey fromByteBuffer(ByteBuffer buffer, int position)
     {
-      final long timestamp = buffer.getLong(position);
+      final long timestamp = fudgeTimestamp == null ? buffer.getLong(position) : fudgeTimestamp.getMillis();
       final String[] dimensions = new String[dimCount];
+      final int dimsPosition = fudgeTimestamp == null ? position + Longs.BYTES : position;
       for (int i = 0; i < dimensions.length; i++) {
-        dimensions[i] = dictionary.get(buffer.getInt(position + Longs.BYTES + (Ints.BYTES * i)));
+        dimensions[i] = dictionary.get(buffer.getInt(dimsPosition + (Ints.BYTES * i)));
       }
       return new RowBasedKey(timestamp, dimensions);
     }
@@ -393,30 +411,52 @@ public class RowBasedGrouperHelper
         }
       }

-      return new Grouper.KeyComparator()
-      {
-        @Override
-        public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
-        {
-          final int timeCompare = Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition));
-          if (timeCompare != 0) {
-            return timeCompare;
-          }
-          for (int i = 0; i < dimCount; i++) {
-            final int cmp = Ints.compare(
-                sortableIds[lhsBuffer.getInt(lhsPosition + Longs.BYTES + (Ints.BYTES * i))],
-                sortableIds[rhsBuffer.getInt(rhsPosition + Longs.BYTES + (Ints.BYTES * i))]
-            );
-            if (cmp != 0) {
-              return cmp;
-            }
-          }
-          return 0;
-        }
-      };
+      if (fudgeTimestamp == null) {
+        return new Grouper.KeyComparator()
+        {
+          @Override
+          public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
+          {
+            final int timeCompare = Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition));
+            if (timeCompare != 0) {
+              return timeCompare;
+            }
+            for (int i = 0; i < dimCount; i++) {
+              final int cmp = Ints.compare(
+                  sortableIds[lhsBuffer.getInt(lhsPosition + Longs.BYTES + (Ints.BYTES * i))],
+                  sortableIds[rhsBuffer.getInt(rhsPosition + Longs.BYTES + (Ints.BYTES * i))]
+              );
+              if (cmp != 0) {
+                return cmp;
+              }
+            }
+            return 0;
+          }
+        };
+      } else {
+        return new Grouper.KeyComparator()
+        {
+          @Override
+          public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
+          {
+            for (int i = 0; i < dimCount; i++) {
+              final int cmp = Ints.compare(
+                  sortableIds[lhsBuffer.getInt(lhsPosition + (Ints.BYTES * i))],
+                  sortableIds[rhsBuffer.getInt(rhsPosition + (Ints.BYTES * i))]
+              );
+              if (cmp != 0) {
+                return cmp;
+              }
+            }
+            return 0;
+          }
+        };
+      }
     }

     @Override
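
Reading a key back mirrors the layout above. A companion sketch (same caveats: invented names, not the actual RowBasedKeySerde) of how fromByteBuffer-style deserialization recovers the timestamp and finds the dimension offsets:

import java.nio.ByteBuffer;

// Illustrative sketch, not Druid code: when the timestamp was omitted from
// the key, it is recovered from the query's universal timestamp, and the
// dimension ids start right at the key's base position.
class KeyReadSketch
{
  static long readTimestamp(Long fudgeTimestampMillis, ByteBuffer key, int position)
  {
    return fudgeTimestampMillis == null ? key.getLong(position) : fudgeTimestampMillis;
  }

  static int[] readDimIds(Long fudgeTimestampMillis, ByteBuffer key, int position, int dimCount)
  {
    // Skip the stored timestamp only when the key actually contains one.
    final int dimsPosition = fudgeTimestampMillis == null ? position + Long.BYTES : position;
    final int[] ids = new int[dimCount];
    for (int i = 0; i < dimCount; i++) {
      ids[i] = key.getInt(dimsPosition + Integer.BYTES * i);
    }
    return ids;
  }
}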

GroupByStrategyV2.java

@@ -35,13 +35,12 @@ import io.druid.collections.BlockingPool;
 import io.druid.collections.StupidPool;
 import io.druid.data.input.MapBasedRow;
 import io.druid.data.input.Row;
-import io.druid.granularity.AllGranularity;
+import io.druid.granularity.QueryGranularities;
 import io.druid.granularity.QueryGranularity;
 import io.druid.guice.annotations.Global;
 import io.druid.guice.annotations.Merging;
 import io.druid.guice.annotations.Smile;
 import io.druid.query.DruidProcessingConfig;
-import io.druid.query.NoopQueryRunner;
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
 import io.druid.query.QueryWatcher;
@@ -88,6 +87,29 @@ public class GroupByStrategyV2 implements GroupByStrategy
     this.queryWatcher = queryWatcher;
   }

+  /**
+   * If "query" has a single universal timestamp, return it. Otherwise return null. This is useful
+   * for keeping timestamps in sync across partial queries that may have different intervals.
+   *
+   * @param query the query
+   *
+   * @return universal timestamp, or null
+   */
+  public static DateTime getUniversalTimestamp(final GroupByQuery query)
+  {
+    final QueryGranularity gran = query.getGranularity();
+    final String timestampStringFromContext = query.getContextValue(CTX_KEY_FUDGE_TIMESTAMP, "");
+
+    if (!timestampStringFromContext.isEmpty()) {
+      return new DateTime(Long.parseLong(timestampStringFromContext));
+    } else if (QueryGranularities.ALL.equals(gran)) {
+      final long timeStart = query.getIntervals().get(0).getStartMillis();
+      return new DateTime(gran.iterable(timeStart, timeStart + 1).iterator().next());
+    } else {
+      return null;
+    }
+  }
+
   @Override
   public Sequence<Row> mergeResults(
       final QueryRunner<Row> baseRunner,
@@ -113,17 +135,8 @@ public class GroupByStrategyV2 implements GroupByStrategy
       }
     };

-    // Fudge timestamp, maybe. Necessary to keep timestamps in sync across partial queries.
-    final QueryGranularity gran = query.getGranularity();
-    final String fudgeTimestamp;
-    if (query.getContextValue(CTX_KEY_FUDGE_TIMESTAMP, "").isEmpty() && gran instanceof AllGranularity) {
-      final long timeStart = query.getIntervals().get(0).getStartMillis();
-      fudgeTimestamp = String.valueOf(
-          new DateTime(gran.iterable(timeStart, timeStart + 1).iterator().next()).getMillis()
-      );
-    } else {
-      fudgeTimestamp = query.getContextValue(CTX_KEY_FUDGE_TIMESTAMP, "");
-    }
+    // Fudge timestamp, maybe.
+    final DateTime fudgeTimestamp = getUniversalTimestamp(query);

     return query.applyLimit(
         Sequences.map(
@@ -132,7 +145,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
                 query.getDataSource(),
                 query.getQuerySegmentSpec(),
                 query.getDimFilter(),
-                gran,
+                query.getGranularity(),
                 query.getDimensions(),
                 query.getAggregatorSpecs(),
                 // Don't do post aggs until the end of this method.
@@ -145,7 +158,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
                 ImmutableMap.<String, Object>of(
                     "finalize", false,
                     GroupByQueryConfig.CTX_KEY_STRATEGY, GroupByStrategySelector.STRATEGY_V2,
-                    CTX_KEY_FUDGE_TIMESTAMP, fudgeTimestamp
+                    CTX_KEY_FUDGE_TIMESTAMP, fudgeTimestamp == null ? "" : String.valueOf(fudgeTimestamp.getMillis())
                 )
             ),
             responseContext
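
For reference, the decision getUniversalTimestamp encodes, modeled as a standalone sketch (simplified: the granularity check is reduced to a boolean, and the gran.iterable bucketing is elided):

import org.joda.time.DateTime;

// Illustrative sketch of the universal-timestamp rule: an explicit fudge
// timestamp in the query context wins; otherwise granularity "all" collapses
// all rows to the start of the first interval; any other granularity has no
// single universal timestamp, so null is returned and keys keep their own
// timestamps.
class UniversalTimestampSketch
{
  static DateTime universalTimestamp(String ctxFudgeTimestamp, boolean granularityIsAll, long firstIntervalStartMillis)
  {
    if (ctxFudgeTimestamp != null && !ctxFudgeTimestamp.isEmpty()) {
      return new DateTime(Long.parseLong(ctxFudgeTimestamp));
    } else if (granularityIsAll) {
      return new DateTime(firstIntervalStartMillis);
    } else {
      return null;
    }
  }
}

The context propagation in the last hunk closes the loop: the broker computes the timestamp once, sends it to historicals via CTX_KEY_FUDGE_TIMESTAMP, and every partial result then agrees on it, which is what lets the merge key drop it.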