SegmentMetadataQuery merging fixes.

- Fix merging when the INTERVALS analysisType is disabled, and add a test.
- Remove transformFn from CombiningSequence, use MappedSequence instead. transformFn did
  not work for "accumulate" anyway, which made the tests wrong (the intervals should have
  been condensed, but were not).
- Add analysisTypes to the Druids segmentMetadataQuery builder to make testing simpler
  (see the usage sketch after this list).
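
For reference, here is a minimal sketch of the new builder method in use. It is illustrative
only: the wrapper class, the "testing" datasource, and the "2013/2014" interval are assumptions,
and the package locations are taken from the Druid source tree of this era.

    import io.druid.query.Druids;
    import io.druid.query.metadata.metadata.SegmentMetadataQuery;

    public class AnalysisTypesSketch
    {
      public static void main(String[] args)
      {
        // Request only SIZE and CARDINALITY analyses; INTERVAL is deliberately left out,
        // which is the disabled-INTERVALS case the merging fix and the new test cover.
        SegmentMetadataQuery query = Druids.newSegmentMetadataQueryBuilder()
            .dataSource("testing")      // illustrative datasource name
            .intervals("2013/2014")     // illustrative interval
            .analysisTypes(
                SegmentMetadataQuery.AnalysisType.SIZE,
                SegmentMetadataQuery.AnalysisType.CARDINALITY
            )
            .merge(true)
            .build();

        System.out.println(query);
      }
    }

Calling analysisTypes() with no arguments requests an empty set (no analyses at all), while
passing null leaves the field unset, which the existing merge test relies on for the query's
default analysis types.
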
Gian Merlino 2015-12-22 01:21:56 -08:00
parent 7b5fd76058
commit 83f4130b5f
7 changed files with 143 additions and 73 deletions

View File

@@ -38,29 +38,25 @@ public class CombiningSequence<T> implements Sequence<T>
   public static <T> CombiningSequence<T> create(
       Sequence<T> baseSequence,
       Ordering<T> ordering,
-      BinaryFn<T, T, T> mergeFn,
-      Function transformFn
+      BinaryFn<T, T, T> mergeFn
   )
   {
-    return new CombiningSequence<T>(baseSequence, ordering, mergeFn, transformFn);
+    return new CombiningSequence<T>(baseSequence, ordering, mergeFn);
   }

   private final Sequence<T> baseSequence;
   private final Ordering<T> ordering;
   private final BinaryFn<T, T, T> mergeFn;
-  private final Function transformFn;

   public CombiningSequence(
       Sequence<T> baseSequence,
       Ordering<T> ordering,
-      BinaryFn<T, T, T> mergeFn,
-      Function transformFn
+      BinaryFn<T, T, T> mergeFn
   )
   {
     this.baseSequence = baseSequence;
     this.ordering = ordering;
     this.mergeFn = mergeFn;
-    this.transformFn = transformFn;
   }

   @Override
@@ -122,9 +118,6 @@ public class CombiningSequence<T> implements Sequence<T>
       @Override
       public OutType get()
       {
-        if (transformFn != null) {
-          return (OutType) transformFn.apply(retVal);
-        }
         return retVal;
       }

View File

@@ -214,8 +214,7 @@ public class CombiningSequenceTest
             return Pair.of(lhs.lhs, lhs.rhs + rhs.rhs);
           }
-        },
-        null
+        }
     );

     List<Pair<Integer, Integer>> merged = Sequences.toList(seq, Lists.<Pair<Integer, Integer>>newArrayList());

View File

@@ -55,6 +55,8 @@ import org.joda.time.DateTime;
 import org.joda.time.Interval;

 import javax.annotation.Nullable;

+import java.util.Arrays;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
@@ -79,9 +81,9 @@ public class Druids
   /**
    * A Builder for AndDimFilter.
-   *
+   * <p/>
    * Required: fields() must be called before build()
-   *
+   * <p/>
    * Usage example:
    * <pre><code>
    * AndDimFilter andDimFilter = Druids.newAndDimFilterBuilder()
@@ -125,9 +127,9 @@ public class Druids
   /**
    * A Builder for OrDimFilter.
-   *
+   * <p/>
    * Required: fields() must be called before build()
-   *
+   * <p/>
    * Usage example:
    * <pre><code>
    * OrDimFilter orDimFilter = Druids.newOrDimFilterBuilder()
@@ -180,9 +182,9 @@ public class Druids
   /**
    * A Builder for NotDimFilter.
-   *
+   * <p/>
    * Required: field() must be called before build()
-   *
+   * <p/>
    * Usage example:
    * <pre><code>
    * NotDimFilter notDimFilter = Druids.newNotDimFilterBuilder()
@@ -226,9 +228,9 @@ public class Druids
   /**
    * A Builder for SelectorDimFilter.
-   *
+   * <p/>
    * Required: dimension() and value() must be called before build()
-   *
+   * <p/>
    * Usage example:
    * <pre><code>
    * Selector selDimFilter = Druids.newSelectorDimFilterBuilder()
@@ -305,10 +307,10 @@ public class Druids
   /**
    * A Builder for TimeseriesQuery.
-   *
+   * <p/>
    * Required: dataSource(), intervals(), and aggregators() must be called before build()
    * Optional: filters(), granularity(), postAggregators(), and context() can be called before build()
-   *
+   * <p/>
    * Usage example:
    * <pre><code>
    * TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
@@ -503,11 +505,11 @@ public class Druids
   /**
    * A Builder for SearchQuery.
-   *
+   * <p/>
    * Required: dataSource(), intervals(), dimensions() and query() must be called before build()
-   *
+   * <p/>
    * Optional: filters(), granularity(), and context() can be called before build()
-   *
+   * <p/>
    * Usage example:
    * <pre><code>
    * SearchQuery query = Druids.newSearchQueryBuilder()
@@ -738,9 +740,9 @@ public class Druids
   /**
    * A Builder for TimeBoundaryQuery.
-   *
+   * <p/>
    * Required: dataSource() must be called before build()
-   *
+   * <p/>
    * Usage example:
    * <pre><code>
    * TimeBoundaryQuery query = new MaxTimeQueryBuilder()
@@ -834,9 +836,9 @@ public class Druids
   /**
    * A Builder for Result.
-   *
+   * <p/>
    * Required: timestamp() and value() must be called before build()
-   *
+   * <p/>
    * Usage example:
    * <pre><code>
    * Result&lt;T&gt; result = Druids.newResultBuilder()
@@ -900,9 +902,9 @@ public class Druids
   /**
    * A Builder for SegmentMetadataQuery.
-   *
+   * <p/>
    * Required: dataSource(), intervals() must be called before build()
-   *
+   * <p/>
    * Usage example:
    * <pre><code>
    * SegmentMetadataQuery query = new SegmentMetadataQueryBuilder()
@@ -918,6 +920,7 @@ public class Druids
     private DataSource dataSource;
     private QuerySegmentSpec querySegmentSpec;
     private ColumnIncluderator toInclude;
+    private EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes;
     private Boolean merge;
     private Map<String, Object> context;
@@ -926,6 +929,7 @@ public class Druids
       dataSource = null;
       querySegmentSpec = null;
       toInclude = null;
+      analysisTypes = null;
       merge = null;
       context = null;
     }
@@ -938,17 +942,22 @@ public class Druids
           toInclude,
           merge,
           context,
-          null,
+          analysisTypes,
           false
       );
     }

     public SegmentMetadataQueryBuilder copy(SegmentMetadataQueryBuilder builder)
     {
+      final SegmentMetadataQuery.AnalysisType[] analysisTypesArray =
+          analysisTypes != null
+          ? analysisTypes.toArray(new SegmentMetadataQuery.AnalysisType[analysisTypes.size()])
+          : null;
       return new SegmentMetadataQueryBuilder()
           .dataSource(builder.dataSource)
           .intervals(builder.querySegmentSpec)
           .toInclude(toInclude)
+          .analysisTypes(analysisTypesArray)
           .merge(merge)
           .context(builder.context);
     }
@@ -989,6 +998,17 @@ public class Druids
       return this;
     }

+    public SegmentMetadataQueryBuilder analysisTypes(SegmentMetadataQuery.AnalysisType... analysisTypes)
+    {
+      if (analysisTypes == null) {
+        this.analysisTypes = null;
+      } else {
+        this.analysisTypes = analysisTypes.length == 0
+                             ? EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class)
+                             : EnumSet.copyOf(Arrays.asList(analysisTypes));
+      }
+      return this;
+    }
+
     public SegmentMetadataQueryBuilder merge(boolean merge)
     {
@@ -1010,9 +1030,9 @@ public class Druids
   /**
    * A Builder for SelectQuery.
-   *
+   * <p/>
    * Required: dataSource(), intervals() must be called before build()
-   *
+   * <p/>
    * Usage example:
    * <pre><code>
    * SelectQuery query = new SelectQueryBuilder()
@@ -1164,9 +1184,9 @@ public class Druids
   /**
    * A Builder for DataSourceMetadataQuery.
-   *
+   * <p/>
    * Required: dataSource() must be called before build()
-   *
+   * <p/>
    * Usage example:
    * <pre><code>
    * DataSourceMetadataQueryBuilder query = new DataSourceMetadataQueryBuilder()

View File

@@ -40,7 +40,7 @@ public abstract class ResultMergeQueryRunner<T> extends BySegmentSkippingQueryRu
   @Override
   public Sequence<T> doRun(QueryRunner<T> baseRunner, Query<T> query, Map<String, Object> context)
   {
-    return CombiningSequence.create(baseRunner.run(query, context), makeOrdering(query), createMergeFn(query), null);
+    return CombiningSequence.create(baseRunner.run(query, context), makeOrdering(query), createMergeFn(query));
   }

   protected abstract Ordering<T> makeOrdering(Query<T> query);

View File

@@ -59,7 +59,10 @@ public class SegmentAnalyzer
    */
   private static final int NUM_BYTES_IN_TEXT_FLOAT = 8;

-  public Map<String, ColumnAnalysis> analyze(QueryableIndex index, EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes)
+  public Map<String, ColumnAnalysis> analyze(
+      QueryableIndex index,
+      EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes
+  )
   {
     Preconditions.checkNotNull(index, "Index cannot be null");
@@ -100,7 +103,10 @@ public class SegmentAnalyzer
     return columns;
   }

-  public Map<String, ColumnAnalysis> analyze(StorageAdapter adapter, EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes)
+  public Map<String, ColumnAnalysis> analyze(
+      StorageAdapter adapter,
+      EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes
+  )
   {
     Preconditions.checkNotNull(adapter, "Adapter cannot be null");

     Map<String, ColumnAnalysis> columns = Maps.newTreeMap();
@@ -174,7 +180,11 @@ public class SegmentAnalyzer
     return lengthBasedAnalysis(column, NUM_BYTES_IN_TEXT_FLOAT, analysisTypes);
   }

-  private ColumnAnalysis lengthBasedAnalysis(Column column, final int numBytes, EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes)
+  private ColumnAnalysis lengthBasedAnalysis(
+      Column column,
+      final int numBytes,
+      EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes
+  )
   {
     final ColumnCapabilities capabilities = column.getCapabilities();
     if (capabilities.hasMultipleValues()) {
@@ -273,16 +283,13 @@ public class SegmentAnalyzer
     );
   }

-  private boolean analysisHasSize(EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes) {
+  private boolean analysisHasSize(EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes)
+  {
     return analysisTypes.contains(SegmentMetadataQuery.AnalysisType.SIZE);
   }

-  private boolean analysisHasCardinality(EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes) {
+  private boolean analysisHasCardinality(EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes)
+  {
     return analysisTypes.contains(SegmentMetadataQuery.AnalysisType.CARDINALITY);
   }

-  private boolean analysisHasInterva(EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes) {
-    return analysisTypes.contains(SegmentMetadataQuery.AnalysisType.INTERVAL);
-  }
 }

View File

@@ -29,6 +29,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 import com.google.inject.Inject;
+import com.metamx.common.guava.MappedSequence;
 import com.metamx.common.guava.MergeSequence;
 import com.metamx.common.guava.Sequence;
 import com.metamx.common.guava.nary.BinaryFn;
@@ -63,6 +64,20 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
   {
   };

   private static final byte[] SEGMENT_METADATA_CACHE_PREFIX = new byte[]{0x4};

+  private static final Function<SegmentAnalysis, SegmentAnalysis> MERGE_TRANSFORM_FN = new Function<SegmentAnalysis, SegmentAnalysis>()
+  {
+    @Override
+    public SegmentAnalysis apply(SegmentAnalysis analysis)
+    {
+      return new SegmentAnalysis(
+          analysis.getId(),
+          analysis.getIntervals() != null ? JodaUtils.condenseIntervals(analysis.getIntervals()) : null,
+          analysis.getColumns(),
+          analysis.getSize(),
+          analysis.getNumRows()
+      );
+    }
+  };
+
   private final SegmentMetadataQueryConfig config;
@@ -79,21 +94,6 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
   {
     return new ResultMergeQueryRunner<SegmentAnalysis>(runner)
     {
-      private Function<SegmentAnalysis, SegmentAnalysis> transformFn = new Function<SegmentAnalysis, SegmentAnalysis>()
-      {
-        @Override
-        public SegmentAnalysis apply(SegmentAnalysis analysis)
-        {
-          return new SegmentAnalysis(
-              analysis.getId(),
-              JodaUtils.condenseIntervals(analysis.getIntervals()),
-              analysis.getColumns(),
-              analysis.getSize(),
-              analysis.getNumRows()
-          );
-        }
-      };
-
       @Override
       public Sequence<SegmentAnalysis> doRun(
           QueryRunner<SegmentAnalysis> baseRunner,
@@ -101,11 +101,13 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
           Map<String, Object> context
       )
       {
-        return CombiningSequence.create(
-            baseRunner.run(query, context),
-            makeOrdering(query),
-            createMergeFn(query),
-            transformFn
+        return new MappedSequence<>(
+            CombiningSequence.create(
+                baseRunner.run(query, context),
+                makeOrdering(query),
+                createMergeFn(query)
+            ),
+            MERGE_TRANSFORM_FN
         );
       }
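
To illustrate what MERGE_TRANSFORM_FN does now that it runs outside CombiningSequence: merging
two per-segment analyses concatenates their interval lists, and the transform condenses the
result exactly once, on the fully merged analysis. Below is a standalone sketch of that
condensing step; the class name and interval values are illustrative assumptions, while
condenseIntervals is the same io.druid.common.utils.JodaUtils helper used in the diff above.

    import com.google.common.collect.ImmutableList;
    import io.druid.common.utils.JodaUtils;
    import org.joda.time.Interval;

    public class CondenseSketch
    {
      public static void main(String[] args)
      {
        // Two segments of the same datasource report the same interval, so the naive
        // concatenation produced by merging holds it twice.
        ImmutableList<Interval> concatenated = ImmutableList.of(
            Interval.parse("2013-01-01/2014-01-01"),
            Interval.parse("2013-01-01/2014-01-01")
        );

        // condenseIntervals collapses duplicate and overlapping intervals into one,
        // which is why the updated test expects a single merged interval.
        System.out.println(JodaUtils.condenseIntervals(concatenated));
      }
    }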

View File

@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.metamx.common.guava.Sequences;
 import io.druid.common.utils.JodaUtils;
 import io.druid.jackson.DefaultObjectMapper;
@@ -89,6 +90,7 @@ public class SegmentMetadataQueryTest
         .dataSource("testing")
         .intervals("2013/2014")
         .toInclude(new ListColumnIncluderator(Arrays.asList("placement")))
+        .analysisTypes(null)
         .merge(true)
         .build();
@@ -127,10 +129,7 @@ public class SegmentMetadataQueryTest
   {
     SegmentAnalysis mergedSegmentAnalysis = new SegmentAnalysis(
         "merged",
-        ImmutableList.of(
-            expectedSegmentAnalysis.getIntervals().get(0),
-            expectedSegmentAnalysis.getIntervals().get(0)
-        ),
+        ImmutableList.of(expectedSegmentAnalysis.getIntervals().get(0)),
         ImmutableMap.of(
             "placement",
             new ColumnAnalysis(
@@ -151,7 +150,7 @@ public class SegmentMetadataQueryTest
     QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
         toolChest.mergeResults(
             factory.mergeRunners(
-                Executors.newCachedThreadPool(),
+                MoreExecutors.sameThreadExecutor(),
                 Lists.<QueryRunner<SegmentAnalysis>>newArrayList(singleSegmentQueryRunner, singleSegmentQueryRunner)
             )
         ),
@@ -169,6 +168,56 @@ public class SegmentMetadataQueryTest
     exec.shutdownNow();
   }

+  @Test
+  public void testSegmentMetadataQueryWithNoAnalysisTypesMerge()
+  {
+    SegmentAnalysis mergedSegmentAnalysis = new SegmentAnalysis(
+        "merged",
+        null,
+        ImmutableMap.of(
+            "placement",
+            new ColumnAnalysis(
+                ValueType.STRING.toString(),
+                0,
+                0,
+                null
+            )
+        ),
+        0,
+        expectedSegmentAnalysis.getNumRows() * 2
+    );
+
+    QueryToolChest toolChest = factory.getToolchest();
+
+    QueryRunner singleSegmentQueryRunner = toolChest.preMergeQueryDecoration(runner);
+    ExecutorService exec = Executors.newCachedThreadPool();
+    QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
+        toolChest.mergeResults(
+            factory.mergeRunners(
+                MoreExecutors.sameThreadExecutor(),
+                Lists.<QueryRunner<SegmentAnalysis>>newArrayList(singleSegmentQueryRunner, singleSegmentQueryRunner)
+            )
+        ),
+        toolChest
+    );
+
+    TestHelper.assertExpectedObjects(
+        ImmutableList.of(mergedSegmentAnalysis),
+        myRunner.run(
+            Druids.newSegmentMetadataQueryBuilder()
+                  .dataSource("testing")
+                  .intervals("2013/2014")
+                  .toInclude(new ListColumnIncluderator(Arrays.asList("placement")))
+                  .analysisTypes()
+                  .merge(true)
+                  .build(),
+            Maps.newHashMap()
+        ),
+        "failed SegmentMetadata merging query"
+    );
+
+    exec.shutdownNow();
+  }
+
   @Test
   public void testBySegmentResults()
   {
@@ -188,7 +237,7 @@ public class SegmentMetadataQueryTest
     QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
         toolChest.mergeResults(
             factory.mergeRunners(
-                Executors.newCachedThreadPool(),
+                MoreExecutors.sameThreadExecutor(),
                 //Note: It is essential to have atleast 2 query runners merged to reproduce the regression bug described in
                 //https://github.com/druid-io/druid/pull/1172
                 //the bug surfaces only when ordering is used which happens only when you have 2 things to compare