diff --git a/common/src/main/java/io/druid/common/guava/CombiningSequence.java b/common/src/main/java/io/druid/common/guava/CombiningSequence.java index 568c73f76dc..7b463a67f2e 100644 --- a/common/src/main/java/io/druid/common/guava/CombiningSequence.java +++ b/common/src/main/java/io/druid/common/guava/CombiningSequence.java @@ -19,6 +19,7 @@ package io.druid.common.guava; +import com.google.common.base.Function; import com.google.common.collect.Ordering; import com.metamx.common.guava.Accumulator; import com.metamx.common.guava.Sequence; @@ -37,25 +38,29 @@ public class CombiningSequence implements Sequence public static CombiningSequence create( Sequence baseSequence, Ordering ordering, - BinaryFn mergeFn + BinaryFn mergeFn, + Function transformFn ) { - return new CombiningSequence(baseSequence, ordering, mergeFn); + return new CombiningSequence(baseSequence, ordering, mergeFn, transformFn); } private final Sequence baseSequence; private final Ordering ordering; private final BinaryFn mergeFn; + private final Function transformFn; public CombiningSequence( Sequence baseSequence, Ordering ordering, - BinaryFn mergeFn + BinaryFn mergeFn, + Function transformFn ) { this.baseSequence = baseSequence; this.ordering = ordering; this.mergeFn = mergeFn; + this.transformFn = transformFn; } @Override @@ -117,6 +122,9 @@ public class CombiningSequence implements Sequence @Override public OutType get() { + if (transformFn != null) { + return (OutType) transformFn.apply(retVal); + } return retVal; } diff --git a/common/src/test/java/io/druid/common/guava/CombiningSequenceTest.java b/common/src/test/java/io/druid/common/guava/CombiningSequenceTest.java index c280f15d991..2485e68436b 100644 --- a/common/src/test/java/io/druid/common/guava/CombiningSequenceTest.java +++ b/common/src/test/java/io/druid/common/guava/CombiningSequenceTest.java @@ -214,7 +214,8 @@ public class CombiningSequenceTest return Pair.of(lhs.lhs, lhs.rhs + rhs.rhs); } - } + }, + null ); List> merged = 
Sequences.toList(seq, Lists.>newArrayList()); diff --git a/docs/content/querying/segmentmetadataquery.md b/docs/content/querying/segmentmetadataquery.md index 5ae3649814c..7aa0ee8f6eb 100644 --- a/docs/content/querying/segmentmetadataquery.md +++ b/docs/content/querying/segmentmetadataquery.md @@ -30,7 +30,7 @@ There are several main parts to a segment metadata query: |toInclude|A JSON Object representing what columns should be included in the result. Defaults to "all".|no| |merge|Merge all individual segment metadata results into a single result|no| |context|See [Context](../querying/query-context.html)|no| -|analysisTypes|A list of Strings specifying what column properties (e.g. cardinality, size) should be calculated and returned in the result. Defaults to ["cardinality", "size"]. See section [analysisTypes](#analysistypes) for more details.|no| +|analysisTypes|A list of Strings specifying what column properties (e.g. cardinality, size) should be calculated and returned in the result. Defaults to ["cardinality", "size", "interval"]. See section [analysisTypes](#analysistypes) for more details.|no| The format of the result is: @@ -96,7 +96,7 @@ This is a list of properties that determines the amount of information returned By default, all analysis types will be used. If a property is not needed, omitting it from this list will result in a more efficient query. -There are 2 types of column analyses: +There are 3 types of column analyses: #### cardinality @@ -107,3 +107,7 @@ There are 2 types of column analyses: * Estimated byte size for the segment columns if they were stored in a flat format * Estimated total segment byte size in if it was stored in a flat format + +#### interval + +* If present, the SegmentMetadataQuery will return the list of intervals associated with the queried segments. 
diff --git a/processing/src/main/java/io/druid/query/ResultMergeQueryRunner.java b/processing/src/main/java/io/druid/query/ResultMergeQueryRunner.java index a52e7579125..76a39a83a99 100644 --- a/processing/src/main/java/io/druid/query/ResultMergeQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ResultMergeQueryRunner.java @@ -40,7 +40,7 @@ public abstract class ResultMergeQueryRunner extends BySegmentSkippingQueryRu @Override public Sequence doRun(QueryRunner baseRunner, Query query, Map context) { - return CombiningSequence.create(baseRunner.run(query, context), makeOrdering(query), createMergeFn(query)); + return CombiningSequence.create(baseRunner.run(query, context), makeOrdering(query), createMergeFn(query), null); } protected abstract Ordering makeOrdering(Query query); diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentAnalyzer.java b/processing/src/main/java/io/druid/query/metadata/SegmentAnalyzer.java index af0ef26213a..57cc7a934f4 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentAnalyzer.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentAnalyzer.java @@ -281,4 +281,8 @@ public class SegmentAnalyzer return analysisTypes.contains(SegmentMetadataQuery.AnalysisType.CARDINALITY); } + private boolean analysisHasInterval(EnumSet analysisTypes) { + return analysisTypes.contains(SegmentMetadataQuery.AnalysisType.INTERVAL); + } + } diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index e7f3ec96bde..dfe20e1bc32 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -34,7 +34,9 @@ import com.metamx.common.guava.Sequence; import com.metamx.common.guava.nary.BinaryFn; import 
com.metamx.emitter.service.ServiceMetricEvent; import io.druid.collections.OrderedMergeSequence; +import io.druid.common.guava.CombiningSequence; import io.druid.common.utils.JodaUtils; +import io.druid.data.input.Row; import io.druid.query.CacheStrategy; import io.druid.query.DruidMetrics; import io.druid.query.Query; @@ -77,6 +79,36 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest(runner) { + private Function transformFn = new Function() + { + @Override + public SegmentAnalysis apply(SegmentAnalysis analysis) + { + return new SegmentAnalysis( + analysis.getId(), + analysis.getIntervals() == null ? null : JodaUtils.condenseIntervals(analysis.getIntervals()), + analysis.getColumns(), + analysis.getSize(), + analysis.getNumRows() + ); + } + }; + + @Override + public Sequence doRun( + QueryRunner baseRunner, + Query query, + Map context + ) + { + return CombiningSequence.create( + baseRunner.run(query, context), + makeOrdering(query), + createMergeFn(query), + transformFn + ); + } + @Override protected Ordering makeOrdering(Query query) { @@ -115,9 +147,11 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest newIntervals = JodaUtils.condenseIntervals( - Iterables.concat(arg1.getIntervals(), arg2.getIntervals()) - ); + List newIntervals = null; + if (query.hasInterval()) { + newIntervals = JodaUtils.condenseIntervals( + Iterables.concat(arg1.getIntervals(), arg2.getIntervals())); + } final Map leftColumns = arg1.getColumns(); final Map rightColumns = arg2.getColumns(); diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java index ae35ab35341..fd88321d9f5 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java @@ -45,9 +45,11 @@ import io.druid.query.metadata.metadata.SegmentMetadataQuery; import 
io.druid.segment.QueryableIndex; import io.druid.segment.Segment; import io.druid.segment.StorageAdapter; +import org.joda.time.Interval; import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; @@ -117,12 +119,13 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory retIntervals = query.hasInterval() ? Arrays.asList(segment.getDataInterval()) : null; return Sequences.simple( Arrays.asList( new SegmentAnalysis( segment.getIdentifier(), - Arrays.asList(segment.getDataInterval()), + retIntervals, columns, totalSize, numRows diff --git a/processing/src/main/java/io/druid/query/metadata/metadata/SegmentMetadataQuery.java b/processing/src/main/java/io/druid/query/metadata/metadata/SegmentMetadataQuery.java index 43ec39c6895..881d52aed64 100644 --- a/processing/src/main/java/io/druid/query/metadata/metadata/SegmentMetadataQuery.java +++ b/processing/src/main/java/io/druid/query/metadata/metadata/SegmentMetadataQuery.java @@ -50,7 +50,8 @@ public class SegmentMetadataQuery extends BaseQuery public enum AnalysisType { CARDINALITY, - SIZE; + SIZE, + INTERVAL; @JsonValue @Override @@ -77,7 +78,8 @@ public class SegmentMetadataQuery extends BaseQuery public static final EnumSet DEFAULT_ANALYSIS_TYPES = EnumSet.of( AnalysisType.CARDINALITY, - AnalysisType.SIZE + AnalysisType.SIZE, + AnalysisType.INTERVAL ); private final ColumnIncluderator toInclude; @@ -163,6 +165,11 @@ public class SegmentMetadataQuery extends BaseQuery return analysisTypes.contains(AnalysisType.SIZE); } + public boolean hasInterval() + { + return analysisTypes.contains(AnalysisType.INTERVAL); + } + public byte[] getAnalysisTypesCacheKey() { int size = 1; @@ -259,6 +266,10 @@ public class SegmentMetadataQuery extends BaseQuery if (usingDefaultInterval != that.usingDefaultInterval) { return false; } + + if 
(!analysisTypes.equals(that.analysisTypes)) { + return false; + } return !(toInclude != null ? !toInclude.equals(that.toInclude) : that.toInclude != null); } @@ -270,6 +281,7 @@ public class SegmentMetadataQuery extends BaseQuery result = 31 * result + (toInclude != null ? toInclude.hashCode() : 0); result = 31 * result + (merge ? 1 : 0); result = 31 * result + (usingDefaultInterval ? 1 : 0); + result = 31 * result + analysisTypes.hashCode(); return result; } } diff --git a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java index a6923a424c9..86ca2fa7b69 100644 --- a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java +++ b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java @@ -54,7 +54,7 @@ public class SegmentMetadataQueryQueryToolChestTest new SegmentMetadataQueryQueryToolChest(null).getCacheStrategy(query); // Test cache key generation - byte[] expectedKey = {0x04, 0x01, (byte) 0xFF, 0x00, 0x01}; + byte[] expectedKey = {0x04, 0x01, (byte) 0xFF, 0x00, 0x01, 0x02}; byte[] actualKey = strategy.computeCacheKey(query); Assert.assertArrayEquals(expectedKey, actualKey); diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index 8d5641a4cb5..6d5c709c2f1 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -1025,6 +1025,7 @@ public class RealtimePlumber implements Plumber final File persistedFile = indexMerger.persist( indexToPersist.getIndex(), + interval, new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())), metaData, indexSpec diff --git 
a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index 9d05fac366c..c20451f6236 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -396,21 +396,23 @@ public class RealtimePlumberSchoolTest Map sinks = restoredPlumber.getSinks(); Assert.assertEquals(1, sinks.size()); + List hydrants = Lists.newArrayList(sinks.get(new Long(0))); DateTime startTime = new DateTime("1970-01-01T00:00:00.000Z"); + Interval expectedInterval = new Interval(startTime, new DateTime("1971-01-01T00:00:00.000Z")); Assert.assertEquals(0, hydrants.get(0).getCount()); Assert.assertEquals( - new Interval(startTime, new DateTime("1970-01-01T00:00:00.001Z")), + expectedInterval, hydrants.get(0).getSegment().getDataInterval() ); Assert.assertEquals(2, hydrants.get(1).getCount()); Assert.assertEquals( - new Interval(startTime, new DateTime("1970-03-01T00:00:00.001Z")), + expectedInterval, hydrants.get(1).getSegment().getDataInterval() ); Assert.assertEquals(4, hydrants.get(2).getCount()); Assert.assertEquals( - new Interval(startTime, new DateTime("1970-05-01T00:00:00.001Z")), + expectedInterval, hydrants.get(2).getSegment().getDataInterval() );