Merge pull request #2143 from gianm/fix-segment-metadata-intervals

SegmentMetadataQuery merging fixes.
Jonathan Wei 2015-12-22 18:23:25 -08:00
commit c3374b04e8
7 changed files with 143 additions and 73 deletions

View File

@@ -38,29 +38,25 @@ public class CombiningSequence<T> implements Sequence<T>
public static <T> CombiningSequence<T> create(
Sequence<T> baseSequence,
Ordering<T> ordering,
BinaryFn<T, T, T> mergeFn,
Function transformFn
BinaryFn<T, T, T> mergeFn
)
{
return new CombiningSequence<T>(baseSequence, ordering, mergeFn, transformFn);
return new CombiningSequence<T>(baseSequence, ordering, mergeFn);
}
private final Sequence<T> baseSequence;
private final Ordering<T> ordering;
private final BinaryFn<T, T, T> mergeFn;
private final Function transformFn;
public CombiningSequence(
Sequence<T> baseSequence,
Ordering<T> ordering,
BinaryFn<T, T, T> mergeFn,
Function transformFn
BinaryFn<T, T, T> mergeFn
)
{
this.baseSequence = baseSequence;
this.ordering = ordering;
this.mergeFn = mergeFn;
this.transformFn = transformFn;
}
@Override
@@ -122,9 +118,6 @@ public class CombiningSequence<T> implements Sequence<T>
@Override
public OutType get()
{
if (transformFn != null) {
return (OutType) transformFn.apply(retVal);
}
return retVal;
}
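Note (illustrative, not part of the diff): with the transform hook removed, CombiningSequence only merges adjacent elements that the Ordering treats as equal; any post-merge transformation is now applied by the caller, typically by wrapping the combined sequence in a MappedSequence as the toolchest change below does. A minimal sketch of that pattern, in which baseSequence, ordering, mergeFn, and postMergeFn are hypothetical placeholders:

// Sketch only; baseSequence, ordering, mergeFn and postMergeFn are placeholders, not names from this diff.
Sequence<SegmentAnalysis> combined = CombiningSequence.create(baseSequence, ordering, mergeFn);
Sequence<SegmentAnalysis> transformed = new MappedSequence<>(combined, postMergeFn);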

View File

@@ -214,8 +214,7 @@ public class CombiningSequenceTest
return Pair.of(lhs.lhs, lhs.rhs + rhs.rhs);
}
},
null
}
);
List<Pair<Integer, Integer>> merged = Sequences.toList(seq, Lists.<Pair<Integer, Integer>>newArrayList());

View File

@@ -55,6 +55,8 @@ import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@@ -79,9 +81,9 @@ public class Druids
/**
* A Builder for AndDimFilter.
*
* <p/>
* Required: fields() must be called before build()
*
* <p/>
* Usage example:
* <pre><code>
* AndDimFilter andDimFilter = Druids.newAndDimFilterBuilder()
@@ -125,9 +127,9 @@ public class Druids
/**
* A Builder for OrDimFilter.
*
* <p/>
* Required: fields() must be called before build()
*
* <p/>
* Usage example:
* <pre><code>
* OrDimFilter orDimFilter = Druids.newOrDimFilterBuilder()
@@ -180,9 +182,9 @@ public class Druids
/**
* A Builder for NotDimFilter.
*
* <p/>
* Required: field() must be called before build()
*
* <p/>
* Usage example:
* <pre><code>
* NotDimFilter notDimFilter = Druids.newNotDimFilterBuilder()
@@ -226,9 +228,9 @@ public class Druids
/**
* A Builder for SelectorDimFilter.
*
* <p/>
* Required: dimension() and value() must be called before build()
*
* <p/>
* Usage example:
* <pre><code>
* Selector selDimFilter = Druids.newSelectorDimFilterBuilder()
@@ -305,10 +307,10 @@ public class Druids
/**
* A Builder for TimeseriesQuery.
*
* <p/>
* Required: dataSource(), intervals(), and aggregators() must be called before build()
* Optional: filters(), granularity(), postAggregators(), and context() can be called before build()
*
* <p/>
* Usage example:
* <pre><code>
* TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
@@ -503,11 +505,11 @@ public class Druids
/**
* A Builder for SearchQuery.
*
* <p/>
* Required: dataSource(), intervals(), dimensions() and query() must be called before build()
*
* <p/>
* Optional: filters(), granularity(), and context() can be called before build()
*
* <p/>
* Usage example:
* <pre><code>
* SearchQuery query = Druids.newSearchQueryBuilder()
@@ -738,9 +740,9 @@ public class Druids
/**
* A Builder for TimeBoundaryQuery.
*
* <p/>
* Required: dataSource() must be called before build()
*
* <p/>
* Usage example:
* <pre><code>
* TimeBoundaryQuery query = new MaxTimeQueryBuilder()
@@ -834,9 +836,9 @@ public class Druids
/**
* A Builder for Result.
*
* <p/>
* Required: timestamp() and value() must be called before build()
*
* <p/>
* Usage example:
* <pre><code>
* Result&lt;T&gt; result = Druids.newResultBuilder()
@@ -900,9 +902,9 @@ public class Druids
/**
* A Builder for SegmentMetadataQuery.
*
* <p/>
* Required: dataSource(), intervals() must be called before build()
*
* <p/>
* Usage example:
* <pre><code>
* SegmentMetadataQuery query = new SegmentMetadataQueryBuilder()
@@ -918,6 +920,7 @@ public class Druids
private DataSource dataSource;
private QuerySegmentSpec querySegmentSpec;
private ColumnIncluderator toInclude;
private EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes;
private Boolean merge;
private Map<String, Object> context;
@@ -926,6 +929,7 @@ public class Druids
dataSource = null;
querySegmentSpec = null;
toInclude = null;
analysisTypes = null;
merge = null;
context = null;
}
@@ -938,17 +942,22 @@ public class Druids
toInclude,
merge,
context,
null,
analysisTypes,
false
);
}
public SegmentMetadataQueryBuilder copy(SegmentMetadataQueryBuilder builder)
{
final SegmentMetadataQuery.AnalysisType[] analysisTypesArray =
analysisTypes != null
? analysisTypes.toArray(new SegmentMetadataQuery.AnalysisType[analysisTypes.size()])
: null;
return new SegmentMetadataQueryBuilder()
.dataSource(builder.dataSource)
.intervals(builder.querySegmentSpec)
.toInclude(toInclude)
.analysisTypes(analysisTypesArray)
.merge(merge)
.context(builder.context);
}
@@ -989,6 +998,17 @@ public class Druids
return this;
}
public SegmentMetadataQueryBuilder analysisTypes(SegmentMetadataQuery.AnalysisType... analysisTypes)
{
if (analysisTypes == null) {
this.analysisTypes = null;
} else {
this.analysisTypes = analysisTypes.length == 0
? EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class)
: EnumSet.copyOf(Arrays.asList(analysisTypes));
}
return this;
}
public SegmentMetadataQueryBuilder merge(boolean merge)
{
@@ -1010,9 +1030,9 @@ public class Druids
/**
* A Builder for SelectQuery.
*
* <p/>
* Required: dataSource(), intervals() must be called before build()
*
* <p/>
* Usage example:
* <pre><code>
* SelectQuery query = new SelectQueryBuilder()
@@ -1164,9 +1184,9 @@ public class Druids
/**
* A Builder for DataSourceMetadataQuery.
*
* <p/>
* Required: dataSource() must be called before build()
*
* <p/>
* Usage example:
* <pre><code>
* DataSourceMetadataQueryBuilder query = new DataSourceMetadataQueryBuilder()
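Usage note (illustrative, not part of the diff): the new analysisTypes(...) builder method accepts a varargs list of SegmentMetadataQuery.AnalysisType values; calling it with no arguments yields an empty EnumSet (no analyses), while passing null defers to whatever default the query constructor applies. A hedged sketch of a caller, with placeholder dataSource and interval strings:

// Hypothetical usage sketch; "example_datasource" and the interval string are placeholders.
SegmentMetadataQuery query = Druids.newSegmentMetadataQueryBuilder()
                                   .dataSource("example_datasource")
                                   .intervals("2013/2014")
                                   .analysisTypes(SegmentMetadataQuery.AnalysisType.INTERVAL)
                                   .merge(true)
                                   .build();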

View File

@@ -40,7 +40,7 @@ public abstract class ResultMergeQueryRunner<T> extends BySegmentSkippingQueryRu
@Override
public Sequence<T> doRun(QueryRunner<T> baseRunner, Query<T> query, Map<String, Object> context)
{
return CombiningSequence.create(baseRunner.run(query, context), makeOrdering(query), createMergeFn(query), null);
return CombiningSequence.create(baseRunner.run(query, context), makeOrdering(query), createMergeFn(query));
}
protected abstract Ordering<T> makeOrdering(Query<T> query);

View File

@@ -59,7 +59,10 @@ public class SegmentAnalyzer
*/
private static final int NUM_BYTES_IN_TEXT_FLOAT = 8;
public Map<String, ColumnAnalysis> analyze(QueryableIndex index, EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes)
public Map<String, ColumnAnalysis> analyze(
QueryableIndex index,
EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes
)
{
Preconditions.checkNotNull(index, "Index cannot be null");
@@ -100,7 +103,10 @@ public class SegmentAnalyzer
return columns;
}
public Map<String, ColumnAnalysis> analyze(StorageAdapter adapter, EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes)
public Map<String, ColumnAnalysis> analyze(
StorageAdapter adapter,
EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes
)
{
Preconditions.checkNotNull(adapter, "Adapter cannot be null");
Map<String, ColumnAnalysis> columns = Maps.newTreeMap();
@@ -174,7 +180,11 @@ public class SegmentAnalyzer
return lengthBasedAnalysis(column, NUM_BYTES_IN_TEXT_FLOAT, analysisTypes);
}
private ColumnAnalysis lengthBasedAnalysis(Column column, final int numBytes, EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes)
private ColumnAnalysis lengthBasedAnalysis(
Column column,
final int numBytes,
EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes
)
{
final ColumnCapabilities capabilities = column.getCapabilities();
if (capabilities.hasMultipleValues()) {
@@ -273,16 +283,13 @@ public class SegmentAnalyzer
);
}
private boolean analysisHasSize(EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes) {
private boolean analysisHasSize(EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes)
{
return analysisTypes.contains(SegmentMetadataQuery.AnalysisType.SIZE);
}
private boolean analysisHasCardinality(EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes) {
private boolean analysisHasCardinality(EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes)
{
return analysisTypes.contains(SegmentMetadataQuery.AnalysisType.CARDINALITY);
}
private boolean analysisHasInterva(EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes) {
return analysisTypes.contains(SegmentMetadataQuery.AnalysisType.INTERVAL);
}
}
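For context (illustrative, not from the diff): the remaining helper predicates just test membership in the requested EnumSet, so callers control which per-column statistics get computed. A small sketch of that check, using java.util.EnumSet and the AnalysisType values shown above:

// Illustrative only: selecting which analyses to run and testing membership.
EnumSet<SegmentMetadataQuery.AnalysisType> requested =
    EnumSet.of(SegmentMetadataQuery.AnalysisType.SIZE, SegmentMetadataQuery.AnalysisType.CARDINALITY);
boolean wantSize = requested.contains(SegmentMetadataQuery.AnalysisType.SIZE);         // true
boolean wantInterval = requested.contains(SegmentMetadataQuery.AnalysisType.INTERVAL); // false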

View File

@@ -29,6 +29,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.guava.MappedSequence;
import com.metamx.common.guava.MergeSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.nary.BinaryFn;
@@ -63,6 +64,20 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
{
};
private static final byte[] SEGMENT_METADATA_CACHE_PREFIX = new byte[]{0x4};
private static final Function<SegmentAnalysis, SegmentAnalysis> MERGE_TRANSFORM_FN = new Function<SegmentAnalysis, SegmentAnalysis>()
{
@Override
public SegmentAnalysis apply(SegmentAnalysis analysis)
{
return new SegmentAnalysis(
analysis.getId(),
analysis.getIntervals() != null ? JodaUtils.condenseIntervals(analysis.getIntervals()) : null,
analysis.getColumns(),
analysis.getSize(),
analysis.getNumRows()
);
}
};
private final SegmentMetadataQueryConfig config;
@@ -79,21 +94,6 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
{
return new ResultMergeQueryRunner<SegmentAnalysis>(runner)
{
private Function<SegmentAnalysis, SegmentAnalysis> transformFn = new Function<SegmentAnalysis, SegmentAnalysis>()
{
@Override
public SegmentAnalysis apply(SegmentAnalysis analysis)
{
return new SegmentAnalysis(
analysis.getId(),
JodaUtils.condenseIntervals(analysis.getIntervals()),
analysis.getColumns(),
analysis.getSize(),
analysis.getNumRows()
);
}
};
@Override
public Sequence<SegmentAnalysis> doRun(
QueryRunner<SegmentAnalysis> baseRunner,
@@ -101,11 +101,13 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
Map<String, Object> context
)
{
return CombiningSequence.create(
baseRunner.run(query, context),
makeOrdering(query),
createMergeFn(query),
transformFn
return new MappedSequence<>(
CombiningSequence.create(
baseRunner.run(query, context),
makeOrdering(query),
createMergeFn(query)
),
MERGE_TRANSFORM_FN
);
}
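For context (not part of the diff): the static MERGE_TRANSFORM_FN above runs once over each merged SegmentAnalysis and condenses its interval list, now tolerating a null list. Assuming JodaUtils.condenseIntervals collapses overlapping or abutting intervals, as its name and the test change below suggest, the effect is roughly:

// Illustrative sketch: two identical intervals condense to a single interval.
// (Uses org.joda.time.Interval and io.druid.common.utils.JodaUtils.)
List<Interval> condensed = JodaUtils.condenseIntervals(
    Arrays.asList(Interval.parse("2013-01-01/2014-01-01"), Interval.parse("2013-01-01/2014-01-01"))
);
// condensed is expected to hold one interval covering 2013-01-01/2014-01-01.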

View File

@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.guava.Sequences;
import io.druid.common.utils.JodaUtils;
import io.druid.jackson.DefaultObjectMapper;
@@ -89,6 +90,7 @@ public class SegmentMetadataQueryTest
.dataSource("testing")
.intervals("2013/2014")
.toInclude(new ListColumnIncluderator(Arrays.asList("placement")))
.analysisTypes(null)
.merge(true)
.build();
@@ -127,10 +129,7 @@ public class SegmentMetadataQueryTest
{
SegmentAnalysis mergedSegmentAnalysis = new SegmentAnalysis(
"merged",
ImmutableList.of(
expectedSegmentAnalysis.getIntervals().get(0),
expectedSegmentAnalysis.getIntervals().get(0)
),
ImmutableList.of(expectedSegmentAnalysis.getIntervals().get(0)),
ImmutableMap.of(
"placement",
new ColumnAnalysis(
@@ -151,7 +150,7 @@ public class SegmentMetadataQueryTest
QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
factory.mergeRunners(
Executors.newCachedThreadPool(),
MoreExecutors.sameThreadExecutor(),
Lists.<QueryRunner<SegmentAnalysis>>newArrayList(singleSegmentQueryRunner, singleSegmentQueryRunner)
)
),
@@ -169,6 +168,56 @@ public class SegmentMetadataQueryTest
exec.shutdownNow();
}
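Side note (illustrative, not from the diff): the tests now merge the two runners on Guava's same-thread executor instead of a cached thread pool, so merging happens deterministically on the calling thread. A minimal sketch of that executor's behavior, using com.google.common.util.concurrent.MoreExecutors:

// Illustrative only: tasks submitted to sameThreadExecutor() run on the submitting thread.
ListeningExecutorService direct = MoreExecutors.sameThreadExecutor();
Future<String> name = direct.submit(new Callable<String>()
{
  @Override
  public String call()
  {
    return Thread.currentThread().getName();
  }
});
// name.get() returns the name of the thread that called submit().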
@Test
public void testSegmentMetadataQueryWithNoAnalysisTypesMerge()
{
SegmentAnalysis mergedSegmentAnalysis = new SegmentAnalysis(
"merged",
null,
ImmutableMap.of(
"placement",
new ColumnAnalysis(
ValueType.STRING.toString(),
0,
0,
null
)
),
0,
expectedSegmentAnalysis.getNumRows()*2
);
QueryToolChest toolChest = factory.getToolchest();
QueryRunner singleSegmentQueryRunner = toolChest.preMergeQueryDecoration(runner);
ExecutorService exec = Executors.newCachedThreadPool();
QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
factory.mergeRunners(
MoreExecutors.sameThreadExecutor(),
Lists.<QueryRunner<SegmentAnalysis>>newArrayList(singleSegmentQueryRunner, singleSegmentQueryRunner)
)
),
toolChest
);
TestHelper.assertExpectedObjects(
ImmutableList.of(mergedSegmentAnalysis),
myRunner.run(
Druids.newSegmentMetadataQueryBuilder()
.dataSource("testing")
.intervals("2013/2014")
.toInclude(new ListColumnIncluderator(Arrays.asList("placement")))
.analysisTypes()
.merge(true)
.build(),
Maps.newHashMap()
),
"failed SegmentMetadata merging query"
);
exec.shutdownNow();
}
@Test
public void testBySegmentResults()
{
@@ -188,7 +237,7 @@ public class SegmentMetadataQueryTest
QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
factory.mergeRunners(
Executors.newCachedThreadPool(),
MoreExecutors.sameThreadExecutor(),
//Note: It is essential to have at least 2 query runners merged to reproduce the regression bug described in
//https://github.com/druid-io/druid/pull/1172
//the bug surfaces only when ordering is used which happens only when you have 2 things to compare