Merge pull request #2107 from jon-wei/fix_smq

More efficient SegmentMetadataQuery
This commit is contained in:
Fangjin Yang 2015-12-18 16:40:47 -08:00
commit 7019d3c421
11 changed files with 86 additions and 17 deletions

View File

@ -19,6 +19,7 @@
package io.druid.common.guava;
import com.google.common.base.Function;
import com.google.common.collect.Ordering;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Sequence;
@ -37,25 +38,29 @@ public class CombiningSequence<T> implements Sequence<T>
public static <T> CombiningSequence<T> create(
Sequence<T> baseSequence,
Ordering<T> ordering,
BinaryFn<T, T, T> mergeFn
BinaryFn<T, T, T> mergeFn,
Function transformFn
)
{
return new CombiningSequence<T>(baseSequence, ordering, mergeFn);
return new CombiningSequence<T>(baseSequence, ordering, mergeFn, transformFn);
}
private final Sequence<T> baseSequence;
private final Ordering<T> ordering;
private final BinaryFn<T, T, T> mergeFn;
private final Function transformFn;
public CombiningSequence(
Sequence<T> baseSequence,
Ordering<T> ordering,
BinaryFn<T, T, T> mergeFn
BinaryFn<T, T, T> mergeFn,
Function transformFn
)
{
this.baseSequence = baseSequence;
this.ordering = ordering;
this.mergeFn = mergeFn;
this.transformFn = transformFn;
}
@Override
@ -117,6 +122,9 @@ public class CombiningSequence<T> implements Sequence<T>
@Override
public OutType get()
{
if (transformFn != null) {
return (OutType) transformFn.apply(retVal);
}
return retVal;
}

View File

@ -214,7 +214,8 @@ public class CombiningSequenceTest
return Pair.of(lhs.lhs, lhs.rhs + rhs.rhs);
}
}
},
null
);
List<Pair<Integer, Integer>> merged = Sequences.toList(seq, Lists.<Pair<Integer, Integer>>newArrayList());

View File

@ -30,7 +30,7 @@ There are several main parts to a segment metadata query:
|toInclude|A JSON Object representing what columns should be included in the result. Defaults to "all".|no|
|merge|Merge all individual segment metadata results into a single result|no|
|context|See [Context](../querying/query-context.html)|no|
|analysisTypes|A list of Strings specifying what column properties (e.g. cardinality, size) should be calculated and returned in the result. Defaults to ["cardinality", "size"]. See section [analysisTypes](#analysistypes) for more details.|no|
|analysisTypes|A list of Strings specifying what column properties (e.g. cardinality, size) should be calculated and returned in the result. Defaults to ["cardinality", "size", "interval"]. See section [analysisTypes](#analysistypes) for more details.|no|
The format of the result is:
@ -96,7 +96,7 @@ This is a list of properties that determines the amount of information returned
By default, all analysis types will be used. If a property is not needed, omitting it from this list will result in a more efficient query.
There are 2 types of column analyses:
There are 3 types of column analyses:
#### cardinality
@ -107,3 +107,7 @@ There are 2 types of column analyses:
* Estimated byte size for the segment columns if they were stored in a flat format
* Estimated total segment byte size in if it was stored in a flat format
#### interval
* If present, the SegmentMetadataQuery will return the list of intervals associated with the queried segments.

View File

@ -40,7 +40,7 @@ public abstract class ResultMergeQueryRunner<T> extends BySegmentSkippingQueryRu
@Override
public Sequence<T> doRun(QueryRunner<T> baseRunner, Query<T> query, Map<String, Object> context)
{
return CombiningSequence.create(baseRunner.run(query, context), makeOrdering(query), createMergeFn(query));
return CombiningSequence.create(baseRunner.run(query, context), makeOrdering(query), createMergeFn(query), null);
}
protected abstract Ordering<T> makeOrdering(Query<T> query);

View File

@ -281,4 +281,8 @@ public class SegmentAnalyzer
return analysisTypes.contains(SegmentMetadataQuery.AnalysisType.CARDINALITY);
}
private boolean analysisHasInterva(EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes) {
return analysisTypes.contains(SegmentMetadataQuery.AnalysisType.INTERVAL);
}
}

View File

@ -34,7 +34,9 @@ import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.nary.BinaryFn;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.collections.OrderedMergeSequence;
import io.druid.common.guava.CombiningSequence;
import io.druid.common.utils.JodaUtils;
import io.druid.data.input.Row;
import io.druid.query.CacheStrategy;
import io.druid.query.DruidMetrics;
import io.druid.query.Query;
@ -77,6 +79,36 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
{
return new ResultMergeQueryRunner<SegmentAnalysis>(runner)
{
private Function<SegmentAnalysis, SegmentAnalysis> transformFn = new Function<SegmentAnalysis, SegmentAnalysis>()
{
@Override
public SegmentAnalysis apply(SegmentAnalysis analysis)
{
return new SegmentAnalysis(
analysis.getId(),
JodaUtils.condenseIntervals(analysis.getIntervals()),
analysis.getColumns(),
analysis.getSize(),
analysis.getNumRows()
);
}
};
@Override
public Sequence<SegmentAnalysis> doRun(
QueryRunner<SegmentAnalysis> baseRunner,
Query<SegmentAnalysis> query,
Map<String, Object> context
)
{
return CombiningSequence.create(
baseRunner.run(query, context),
makeOrdering(query),
createMergeFn(query),
transformFn
);
}
@Override
protected Ordering<SegmentAnalysis> makeOrdering(Query<SegmentAnalysis> query)
{
@ -115,9 +147,11 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
return arg1;
}
List<Interval> newIntervals = JodaUtils.condenseIntervals(
Iterables.concat(arg1.getIntervals(), arg2.getIntervals())
);
List<Interval> newIntervals = null;
if (query.hasInterval()) {
newIntervals = arg1.getIntervals();
newIntervals.addAll(arg2.getIntervals());
}
final Map<String, ColumnAnalysis> leftColumns = arg1.getColumns();
final Map<String, ColumnAnalysis> rightColumns = arg2.getColumns();

View File

@ -45,9 +45,11 @@ import io.druid.query.metadata.metadata.SegmentMetadataQuery;
import io.druid.segment.QueryableIndex;
import io.druid.segment.Segment;
import io.druid.segment.StorageAdapter;
import org.joda.time.Interval;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
@ -117,12 +119,13 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
columns.put(columnName, column);
}
}
List<Interval> retIntervals = query.hasInterval() ? Arrays.asList(segment.getDataInterval()) : null;
return Sequences.simple(
Arrays.asList(
new SegmentAnalysis(
segment.getIdentifier(),
Arrays.asList(segment.getDataInterval()),
retIntervals,
columns,
totalSize,
numRows

View File

@ -50,7 +50,8 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
public enum AnalysisType
{
CARDINALITY,
SIZE;
SIZE,
INTERVAL;
@JsonValue
@Override
@ -77,7 +78,8 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
public static final EnumSet<AnalysisType> DEFAULT_ANALYSIS_TYPES = EnumSet.of(
AnalysisType.CARDINALITY,
AnalysisType.SIZE
AnalysisType.SIZE,
AnalysisType.INTERVAL
);
private final ColumnIncluderator toInclude;
@ -163,6 +165,11 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
return analysisTypes.contains(AnalysisType.SIZE);
}
public boolean hasInterval()
{
return analysisTypes.contains(AnalysisType.INTERVAL);
}
public byte[] getAnalysisTypesCacheKey()
{
int size = 1;
@ -259,6 +266,10 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
if (usingDefaultInterval != that.usingDefaultInterval) {
return false;
}
if (!analysisTypes.equals(that.analysisTypes)) {
return false;
}
return !(toInclude != null ? !toInclude.equals(that.toInclude) : that.toInclude != null);
}
@ -270,6 +281,7 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
result = 31 * result + (toInclude != null ? toInclude.hashCode() : 0);
result = 31 * result + (merge ? 1 : 0);
result = 31 * result + (usingDefaultInterval ? 1 : 0);
result = 31 * result + analysisTypes.hashCode();
return result;
}
}

View File

@ -54,7 +54,7 @@ public class SegmentMetadataQueryQueryToolChestTest
new SegmentMetadataQueryQueryToolChest(null).getCacheStrategy(query);
// Test cache key generation
byte[] expectedKey = {0x04, 0x01, (byte) 0xFF, 0x00, 0x01};
byte[] expectedKey = {0x04, 0x01, (byte) 0xFF, 0x00, 0x01, 0x02};
byte[] actualKey = strategy.computeCacheKey(query);
Assert.assertArrayEquals(expectedKey, actualKey);

View File

@ -1025,6 +1025,7 @@ public class RealtimePlumber implements Plumber
final File persistedFile = indexMerger.persist(
indexToPersist.getIndex(),
interval,
new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())),
metaData,
indexSpec

View File

@ -396,21 +396,23 @@ public class RealtimePlumberSchoolTest
Map<Long, Sink> sinks = restoredPlumber.getSinks();
Assert.assertEquals(1, sinks.size());
List<FireHydrant> hydrants = Lists.newArrayList(sinks.get(new Long(0)));
DateTime startTime = new DateTime("1970-01-01T00:00:00.000Z");
Interval expectedInterval = new Interval(startTime, new DateTime("1971-01-01T00:00:00.000Z"));
Assert.assertEquals(0, hydrants.get(0).getCount());
Assert.assertEquals(
new Interval(startTime, new DateTime("1970-01-01T00:00:00.001Z")),
expectedInterval,
hydrants.get(0).getSegment().getDataInterval()
);
Assert.assertEquals(2, hydrants.get(1).getCount());
Assert.assertEquals(
new Interval(startTime, new DateTime("1970-03-01T00:00:00.001Z")),
expectedInterval,
hydrants.get(1).getSegment().getDataInterval()
);
Assert.assertEquals(4, hydrants.get(2).getCount());
Assert.assertEquals(
new Interval(startTime, new DateTime("1970-05-01T00:00:00.001Z")),
expectedInterval,
hydrants.get(2).getSegment().getDataInterval()
);