mirror of https://github.com/apache/druid.git
Added more complex test case with versioned segments
This commit is contained in:
parent
2686bfa394
commit
5f1e60324a
|
@ -20,6 +20,7 @@
|
|||
package io.druid.query.select;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.metamx.common.ISE;
|
||||
|
@ -33,11 +34,13 @@ import io.druid.segment.DimensionSelector;
|
|||
import io.druid.segment.LongColumnSelector;
|
||||
import io.druid.segment.ObjectColumnSelector;
|
||||
import io.druid.segment.Segment;
|
||||
import io.druid.segment.SegmentDesc;
|
||||
import io.druid.segment.StorageAdapter;
|
||||
import io.druid.segment.column.Column;
|
||||
import io.druid.segment.data.IndexedInts;
|
||||
import io.druid.segment.filter.Filters;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -69,6 +72,11 @@ public class SelectQueryEngine
|
|||
} else {
|
||||
metrics = query.getMetrics();
|
||||
}
|
||||
List<Interval> intervals = query.getQuerySegmentSpec().getIntervals();
|
||||
Preconditions.checkArgument(intervals.size() == 1, "Can only handle a single interval, got[%s]", intervals);
|
||||
|
||||
// should be rewritten with given interval
|
||||
final String segmentId = SegmentDesc.withInterval(segment.getIdentifier(), intervals.get(0));
|
||||
|
||||
return QueryRunnerHelper.makeCursorBasedQuery(
|
||||
adapter,
|
||||
|
@ -101,7 +109,7 @@ public class SelectQueryEngine
|
|||
metSelectors.put(metric, metricSelector);
|
||||
}
|
||||
|
||||
final PagingOffset offset = query.getPagingOffset(segment.getIdentifier());
|
||||
final PagingOffset offset = query.getPagingOffset(segmentId);
|
||||
|
||||
cursor.advanceTo(offset.startDelta());
|
||||
|
||||
|
@ -145,14 +153,14 @@ public class SelectQueryEngine
|
|||
|
||||
builder.addEntry(
|
||||
new EventHolder(
|
||||
segment.getIdentifier(),
|
||||
segmentId,
|
||||
lastOffset = offset.current(),
|
||||
theEvent
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
builder.finished(segment.getIdentifier(), lastOffset);
|
||||
builder.finished(segmentId, lastOffset);
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
|
|
|
@ -25,11 +25,12 @@ import com.google.common.base.Function;
|
|||
import com.google.common.base.Functions;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Ordering;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.primitives.Longs;
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.common.StringUtils;
|
||||
import com.metamx.common.guava.Comparators;
|
||||
import com.metamx.common.guava.nary.BinaryFn;
|
||||
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||
import io.druid.granularity.QueryGranularity;
|
||||
|
@ -53,11 +54,11 @@ import org.joda.time.Interval;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -273,39 +274,50 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
|
|||
if (paging == null || paging.isEmpty()) {
|
||||
return segments;
|
||||
}
|
||||
|
||||
final QueryGranularity granularity = query.getGranularity();
|
||||
|
||||
List<Interval> intervals = Lists.newArrayList(
|
||||
Iterables.transform(
|
||||
paging.keySet(),
|
||||
SegmentDesc.INTERVAL_EXTRACTOR
|
||||
)
|
||||
Iterables.transform(paging.keySet(), SegmentDesc.INTERVAL_EXTRACTOR)
|
||||
);
|
||||
Collections.sort(
|
||||
intervals, new Comparator<Interval>()
|
||||
{
|
||||
@Override
|
||||
public int compare(Interval o1, Interval o2)
|
||||
{
|
||||
return Longs.compare(o1.getStartMillis(), o2.getStartMillis());
|
||||
}
|
||||
}
|
||||
intervals, query.isDescending() ? Comparators.intervalsByEndThenStart()
|
||||
: Comparators.intervalsByStartThenEnd()
|
||||
);
|
||||
|
||||
TreeMap<Long, Long> granularThresholds = Maps.newTreeMap();
|
||||
for (Interval interval : intervals) {
|
||||
if (query.isDescending()) {
|
||||
long granularEnd = granularity.truncate(interval.getEndMillis());
|
||||
Long currentEnd = granularThresholds.get(granularEnd);
|
||||
if (currentEnd == null || interval.getEndMillis() > currentEnd) {
|
||||
granularThresholds.put(granularEnd, interval.getEndMillis());
|
||||
}
|
||||
} else {
|
||||
long granularStart = granularity.truncate(interval.getStartMillis());
|
||||
Long currentStart = granularThresholds.get(granularStart);
|
||||
if (currentStart == null || interval.getStartMillis() < currentStart) {
|
||||
granularThresholds.put(granularStart, interval.getStartMillis());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
List<T> queryIntervals = Lists.newArrayList(segments);
|
||||
|
||||
Iterator<T> it = queryIntervals.iterator();
|
||||
if (query.isDescending()) {
|
||||
final long lastEnd = intervals.get(intervals.size() - 1).getEndMillis();
|
||||
while (it.hasNext()) {
|
||||
T segment = it.next();
|
||||
if (segment.getInterval().getStartMillis() > lastEnd) {
|
||||
Interval interval = it.next().getInterval();
|
||||
Map.Entry<Long, Long> ceiling = granularThresholds.ceilingEntry(granularity.truncate(interval.getEndMillis()));
|
||||
if (ceiling == null || interval.getStartMillis() >= ceiling.getValue()) {
|
||||
it.remove();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
final long firstStart = intervals.get(0).getStartMillis();
|
||||
while (it.hasNext()) {
|
||||
T segment = it.next();
|
||||
if (segment.getInterval().getEndMillis() < firstStart) {
|
||||
Interval interval = it.next().getInterval();
|
||||
Map.Entry<Long, Long> floor = granularThresholds.floorEntry(granularity.truncate(interval.getStartMillis()));
|
||||
if (floor == null || interval.getEndMillis() <= floor.getValue()) {
|
||||
it.remove();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package io.druid.segment;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
|
@ -29,6 +30,8 @@ import org.joda.time.Interval;
|
|||
*/
|
||||
public class SegmentDesc
|
||||
{
|
||||
private static final Logger LOGGER = new Logger(SegmentDesc.class);
|
||||
|
||||
public static Function<String, Interval> INTERVAL_EXTRACTOR = new Function<String, Interval>()
|
||||
{
|
||||
@Override
|
||||
|
@ -57,6 +60,26 @@ public class SegmentDesc
|
|||
);
|
||||
}
|
||||
|
||||
public static String withInterval(final String identifier, Interval newInterval)
|
||||
{
|
||||
String[] splits = identifier.split(DataSegment.delimiter);
|
||||
if (splits.length < 4) {
|
||||
// happens for test segments which has invalid segment id.. ignore for now
|
||||
LOGGER.warn("Invalid segment identifier " + identifier);
|
||||
return identifier;
|
||||
}
|
||||
StringBuilder builder = new StringBuilder();
|
||||
builder.append(splits[0]).append(DataSegment.delimiter);
|
||||
builder.append(newInterval.getStart()).append(DataSegment.delimiter);
|
||||
builder.append(newInterval.getEnd()).append(DataSegment.delimiter);
|
||||
for (int i = 3; i < splits.length - 1; i++) {
|
||||
builder.append(splits[i]).append(DataSegment.delimiter);
|
||||
}
|
||||
builder.append(splits[splits.length - 1]);
|
||||
|
||||
return builder.toString();
|
||||
}
|
||||
|
||||
private final String dataSource;
|
||||
private final Interval interval;
|
||||
private final String version;
|
||||
|
|
|
@ -42,13 +42,15 @@ import io.druid.query.aggregation.post.ConstantPostAggregator;
|
|||
import io.druid.query.aggregation.post.FieldAccessPostAggregator;
|
||||
import io.druid.query.spec.MultipleIntervalSegmentSpec;
|
||||
import io.druid.query.spec.QuerySegmentSpec;
|
||||
import io.druid.query.spec.SpecificSegmentSpec;
|
||||
import io.druid.segment.IncrementalIndexSegment;
|
||||
import io.druid.segment.QueryableIndex;
|
||||
import io.druid.segment.QueryableIndexSegment;
|
||||
import io.druid.segment.Segment;
|
||||
import io.druid.segment.TestIndex;
|
||||
import io.druid.segment.incremental.IncrementalIndex;
|
||||
import io.druid.timeline.LogicalSegment;
|
||||
import io.druid.timeline.TimelineObjectHolder;
|
||||
import io.druid.timeline.VersionedIntervalTimeline;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
|
@ -442,29 +444,9 @@ public class QueryRunnerTestHelper
|
|||
}
|
||||
|
||||
public static <T> QueryRunner<T> makeFilteringQueryRunner(
|
||||
final List<Segment> segments,
|
||||
final QueryRunnerFactory<T, Query<T>> factory
|
||||
)
|
||||
{
|
||||
return makeQueryRunner(
|
||||
Lists.transform(
|
||||
segments, new Function<Segment, LogicalWrapper>()
|
||||
{
|
||||
@Override
|
||||
public LogicalWrapper apply(Segment segment)
|
||||
{
|
||||
return new LogicalWrapper(segment);
|
||||
}
|
||||
}
|
||||
), factory
|
||||
);
|
||||
}
|
||||
final VersionedIntervalTimeline<String, Segment> timeline,
|
||||
final QueryRunnerFactory<T, Query<T>> factory) {
|
||||
|
||||
private static <T> QueryRunner<T> makeQueryRunner(
|
||||
final List<LogicalWrapper> segments,
|
||||
final QueryRunnerFactory<T, Query<T>> factory
|
||||
)
|
||||
{
|
||||
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
|
||||
return new FinalizeResultsQueryRunner(
|
||||
toolChest.postMergeQueryDecoration(
|
||||
|
@ -475,14 +457,25 @@ public class QueryRunnerTestHelper
|
|||
@Override
|
||||
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
|
||||
{
|
||||
List<Sequence<T>> sequences = Lists.newArrayList();
|
||||
for (LogicalWrapper segment : toolChest.filterSegments(query, segments)) {
|
||||
sequences.add(factory.createRunner(segment.segment).run(query, responseContext));
|
||||
List<TimelineObjectHolder> segments = Lists.newArrayList();
|
||||
for (Interval interval : query.getIntervals()) {
|
||||
segments.addAll(timeline.lookup(interval));
|
||||
}
|
||||
return new MergeSequence<>(
|
||||
query.getResultOrdering(),
|
||||
Sequences.simple(sequences)
|
||||
);
|
||||
List<Sequence<T>> sequences = Lists.newArrayList();
|
||||
for (TimelineObjectHolder<String, Segment> holder : toolChest.filterSegments(query, segments)) {
|
||||
Segment segment = holder.getObject().getChunk(0).getObject();
|
||||
Query running = query.withQuerySegmentSpec(
|
||||
new SpecificSegmentSpec(
|
||||
new SegmentDescriptor(
|
||||
holder.getInterval(),
|
||||
holder.getVersion(),
|
||||
0
|
||||
)
|
||||
)
|
||||
);
|
||||
sequences.add(factory.createRunner(segment).run(running, responseContext));
|
||||
}
|
||||
return new MergeSequence<>(query.getResultOrdering(), Sequences.simple(sequences));
|
||||
}
|
||||
}
|
||||
)
|
||||
|
@ -492,29 +485,6 @@ public class QueryRunnerTestHelper
|
|||
);
|
||||
}
|
||||
|
||||
// wish Segment implements LogicalSegment
|
||||
private static class LogicalWrapper implements LogicalSegment
|
||||
{
|
||||
private final Segment segment;
|
||||
|
||||
private LogicalWrapper(Segment segment)
|
||||
{
|
||||
this.segment = segment;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Interval getInterval()
|
||||
{
|
||||
return segment.getDataInterval();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return segment.getIdentifier();
|
||||
}
|
||||
}
|
||||
|
||||
public static IntervalChunkingQueryRunnerDecorator NoopIntervalChunkingQueryRunnerDecorator()
|
||||
{
|
||||
return new IntervalChunkingQueryRunnerDecorator(null, null, null) {
|
||||
|
|
|
@ -26,12 +26,14 @@ import com.google.common.io.CharSource;
|
|||
import com.metamx.common.guava.Sequences;
|
||||
import io.druid.granularity.QueryGranularity;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import io.druid.query.Druids;
|
||||
import io.druid.query.QueryRunner;
|
||||
import io.druid.query.QueryRunnerFactory;
|
||||
import io.druid.query.QueryRunnerTestHelper;
|
||||
import io.druid.query.Result;
|
||||
import io.druid.query.TableDataSource;
|
||||
import io.druid.query.dimension.DefaultDimensionSpec;
|
||||
import io.druid.query.ordering.StringComparators;
|
||||
import io.druid.segment.IncrementalIndexSegment;
|
||||
import io.druid.segment.Segment;
|
||||
import io.druid.segment.TestIndex;
|
||||
|
@ -39,7 +41,10 @@ import io.druid.segment.incremental.IncrementalIndex;
|
|||
import io.druid.segment.incremental.IncrementalIndexSchema;
|
||||
import io.druid.segment.incremental.OnheapIncrementalIndex;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import io.druid.timeline.TimelineObjectHolder;
|
||||
import io.druid.timeline.VersionedIntervalTimeline;
|
||||
import io.druid.timeline.partition.NoneShardSpec;
|
||||
import io.druid.timeline.partition.SingleElementPartitionChunk;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.joda.time.DateTime;
|
||||
|
@ -50,7 +55,6 @@ import org.junit.BeforeClass;
|
|||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -70,50 +74,110 @@ public class MultiSegmentSelectQueryTest
|
|||
QueryRunnerTestHelper.NOOP_QUERYWATCHER
|
||||
);
|
||||
|
||||
// time modified version of druid.sample.tsv
|
||||
public static final String[] V_0112 = {
|
||||
"2011-01-12T00:00:00.000Z spot automotive preferred apreferred 100.000000",
|
||||
"2011-01-12T01:00:00.000Z spot business preferred bpreferred 100.000000",
|
||||
"2011-01-12T02:00:00.000Z spot entertainment preferred epreferred 100.000000",
|
||||
"2011-01-12T03:00:00.000Z spot health preferred hpreferred 100.000000",
|
||||
"2011-01-12T04:00:00.000Z spot mezzanine preferred mpreferred 100.000000",
|
||||
"2011-01-12T05:00:00.000Z spot news preferred npreferred 100.000000",
|
||||
"2011-01-12T06:00:00.000Z spot premium preferred ppreferred 100.000000",
|
||||
"2011-01-12T07:00:00.000Z spot technology preferred tpreferred 100.000000",
|
||||
"2011-01-12T08:00:00.000Z spot travel preferred tpreferred 100.000000",
|
||||
"2011-01-12T09:00:00.000Z total_market mezzanine preferred mpreferred 1000.000000",
|
||||
"2011-01-12T10:00:00.000Z total_market premium preferred ppreferred 1000.000000",
|
||||
"2011-01-12T11:00:00.000Z upfront mezzanine preferred mpreferred 800.000000 value",
|
||||
"2011-01-12T12:00:00.000Z upfront premium preferred ppreferred 800.000000 value"
|
||||
};
|
||||
public static final String[] V_0113 = {
|
||||
"2011-01-13T00:00:00.000Z spot automotive preferred apreferred 94.874713",
|
||||
"2011-01-13T01:00:00.000Z spot business preferred bpreferred 103.629399",
|
||||
"2011-01-13T02:00:00.000Z spot entertainment preferred epreferred 110.087299",
|
||||
"2011-01-13T03:00:00.000Z spot health preferred hpreferred 114.947403",
|
||||
"2011-01-13T04:00:00.000Z spot mezzanine preferred mpreferred 104.465767",
|
||||
"2011-01-13T05:00:00.000Z spot news preferred npreferred 102.851683",
|
||||
"2011-01-13T06:00:00.000Z spot premium preferred ppreferred 108.863011",
|
||||
"2011-01-13T07:00:00.000Z spot technology preferred tpreferred 111.356672",
|
||||
"2011-01-13T08:00:00.000Z spot travel preferred tpreferred 106.236928",
|
||||
"2011-01-13T09:00:00.000Z total_market mezzanine preferred mpreferred 1040.945505",
|
||||
"2011-01-13T10:00:00.000Z total_market premium preferred ppreferred 1689.012875",
|
||||
"2011-01-13T11:00:00.000Z upfront mezzanine preferred mpreferred 826.060182 value",
|
||||
"2011-01-13T12:00:00.000Z upfront premium preferred ppreferred 1564.617729 value"
|
||||
};
|
||||
|
||||
public static final String[] V_OVERRIDE = {
|
||||
"2011-01-12T04:00:00.000Z spot automotive preferred apreferred 999.000000",
|
||||
"2011-01-12T05:00:00.000Z spot business preferred bpreferred 999.000000",
|
||||
"2011-01-12T06:00:00.000Z spot entertainment preferred epreferred 999.000000",
|
||||
"2011-01-12T07:00:00.000Z spot health preferred hpreferred 999.000000"
|
||||
};
|
||||
|
||||
private static Segment segment0;
|
||||
private static Segment segment1;
|
||||
private static Segment segment_override; // this makes segment0 split into three logical segments
|
||||
|
||||
private static List<String> segmentIdentifiers;
|
||||
|
||||
private static QueryRunner runner;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() throws IOException
|
||||
{
|
||||
CharSource v_0112 = CharSource.wrap(StringUtils.join(SelectQueryRunnerTest.V_0112, "\n"));
|
||||
CharSource v_0113 = CharSource.wrap(StringUtils.join(SelectQueryRunnerTest.V_0113, "\n"));
|
||||
CharSource v_0112 = CharSource.wrap(StringUtils.join(V_0112, "\n"));
|
||||
CharSource v_0113 = CharSource.wrap(StringUtils.join(V_0113, "\n"));
|
||||
CharSource v_override = CharSource.wrap(StringUtils.join(V_OVERRIDE, "\n"));
|
||||
|
||||
IncrementalIndex index0 = TestIndex.loadIncrementalIndex(newIncrementalIndex("2011-01-12T00:00:00.000Z"), v_0112);
|
||||
IncrementalIndex index1 = TestIndex.loadIncrementalIndex(newIncrementalIndex("2011-01-13T00:00:00.000Z"), v_0113);
|
||||
IncrementalIndex index0 = TestIndex.loadIncrementalIndex(newIndex("2011-01-12T00:00:00.000Z"), v_0112);
|
||||
IncrementalIndex index1 = TestIndex.loadIncrementalIndex(newIndex("2011-01-13T00:00:00.000Z"), v_0113);
|
||||
IncrementalIndex index2 = TestIndex.loadIncrementalIndex(newIndex("2011-01-12T04:00:00.000Z"), v_override);
|
||||
|
||||
segment0 = new IncrementalIndexSegment(index0, makeIdentifier(index0));
|
||||
segment1 = new IncrementalIndexSegment(index1, makeIdentifier(index1));
|
||||
segment0 = new IncrementalIndexSegment(index0, makeIdentifier(index0, "v1"));
|
||||
segment1 = new IncrementalIndexSegment(index1, makeIdentifier(index1, "v1"));
|
||||
segment_override = new IncrementalIndexSegment(index2, makeIdentifier(index2, "v2"));
|
||||
|
||||
runner = QueryRunnerTestHelper.makeFilteringQueryRunner(Arrays.asList(segment0, segment1), factory);
|
||||
VersionedIntervalTimeline<String, Segment> timeline = new VersionedIntervalTimeline(StringComparators.LEXICOGRAPHIC);
|
||||
timeline.add(index0.getInterval(), "v1", new SingleElementPartitionChunk(segment0));
|
||||
timeline.add(index1.getInterval(), "v1", new SingleElementPartitionChunk(segment1));
|
||||
timeline.add(index2.getInterval(), "v2", new SingleElementPartitionChunk(segment_override));
|
||||
|
||||
segmentIdentifiers = Lists.newArrayList();
|
||||
for (TimelineObjectHolder<String, ?> holder : timeline.lookup(new Interval("2011-01-12/2011-01-14"))) {
|
||||
segmentIdentifiers.add(makeIdentifier(holder.getInterval(), holder.getVersion()));
|
||||
}
|
||||
|
||||
runner = QueryRunnerTestHelper.makeFilteringQueryRunner(timeline, factory);
|
||||
}
|
||||
|
||||
private static String makeIdentifier(IncrementalIndex index)
|
||||
private static String makeIdentifier(IncrementalIndex index, String version)
|
||||
{
|
||||
return makeIdentifier(index.getInterval(), version);
|
||||
}
|
||||
|
||||
private static String makeIdentifier(Interval interval, String version)
|
||||
{
|
||||
Interval interval = index.getInterval();
|
||||
return DataSegment.makeDataSegmentIdentifier(
|
||||
QueryRunnerTestHelper.dataSource,
|
||||
interval.getStart(),
|
||||
interval.getEnd(),
|
||||
"v",
|
||||
version,
|
||||
new NoneShardSpec()
|
||||
);
|
||||
}
|
||||
|
||||
private static IncrementalIndex newIncrementalIndex(String minTimeStamp) {
|
||||
return newIncrementalIndex(minTimeStamp, 10000);
|
||||
private static IncrementalIndex newIndex(String minTimeStamp)
|
||||
{
|
||||
return newIndex(minTimeStamp, 10000);
|
||||
}
|
||||
|
||||
private static IncrementalIndex newIncrementalIndex(String minTimeStamp, int maxRowCount)
|
||||
private static IncrementalIndex newIndex(String minTimeStamp, int maxRowCount)
|
||||
{
|
||||
final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
|
||||
.withMinTimestamp(new DateTime(minTimeStamp).getMillis())
|
||||
.withQueryGranularity(QueryGranularity.NONE)
|
||||
.withQueryGranularity(QueryGranularity.HOUR)
|
||||
.withMetrics(TestIndex.METRIC_AGGS)
|
||||
.build();
|
||||
return new OnheapIncrementalIndex(schema, maxRowCount);
|
||||
return new OnheapIncrementalIndex(schema, true, maxRowCount);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
@ -121,27 +185,53 @@ public class MultiSegmentSelectQueryTest
|
|||
{
|
||||
IOUtils.closeQuietly(segment0);
|
||||
IOUtils.closeQuietly(segment1);
|
||||
IOUtils.closeQuietly(segment_override);
|
||||
}
|
||||
|
||||
private final Druids.SelectQueryBuilder builder =
|
||||
Druids.newSelectQueryBuilder()
|
||||
.dataSource(new TableDataSource(QueryRunnerTestHelper.dataSource))
|
||||
.intervals(SelectQueryRunnerTest.I_0112_0114)
|
||||
.granularity(QueryRunnerTestHelper.allGran)
|
||||
.dimensionSpecs(DefaultDimensionSpec.toSpec(QueryRunnerTestHelper.dimensions))
|
||||
.pagingSpec(new PagingSpec(null, 3));
|
||||
|
||||
@Test
|
||||
public void testAllGranularityAscending()
|
||||
{
|
||||
SelectQuery query = builder.build();
|
||||
|
||||
for (int[] expected : new int[][]{
|
||||
{2, -1, -1, -1, 3}, {3, 1, -1, -1, 3}, {-1, 3, 0, -1, 3}, {-1, -1, 3, -1, 3}, {-1, -1, 4, 1, 3},
|
||||
{-1, -1, -1, 4, 3}, {-1, -1, -1, 7, 3}, {-1, -1, -1, 10, 3}, {-1, -1, -1, 12, 2}, {-1, -1, -1, 13, 0}
|
||||
}) {
|
||||
List<Result<SelectResultValue>> results = Sequences.toList(
|
||||
runner.run(query, ImmutableMap.of()),
|
||||
Lists.<Result<SelectResultValue>>newArrayList()
|
||||
);
|
||||
Assert.assertEquals(1, results.size());
|
||||
|
||||
SelectResultValue value = results.get(0).getValue();
|
||||
Map<String, Integer> pagingIdentifiers = value.getPagingIdentifiers();
|
||||
for (int i = 0; i < expected.length - 1; i++) {
|
||||
if (expected[i] >= 0) {
|
||||
Assert.assertEquals(expected[i], pagingIdentifiers.get(segmentIdentifiers.get(i)).intValue());
|
||||
}
|
||||
}
|
||||
Assert.assertEquals(expected[expected.length - 1], value.getEvents().size());
|
||||
|
||||
query = query.withPagingSpec(toNextPager(3, query.isDescending(), pagingIdentifiers));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAllGranularity()
|
||||
public void testAllGranularityDescending()
|
||||
{
|
||||
PagingSpec pagingSpec = new PagingSpec(null, 3);
|
||||
SelectQuery query = new SelectQuery(
|
||||
new TableDataSource(QueryRunnerTestHelper.dataSource),
|
||||
SelectQueryRunnerTest.I_0112_0114,
|
||||
false,
|
||||
null,
|
||||
QueryRunnerTestHelper.allGran,
|
||||
DefaultDimensionSpec.toSpec(QueryRunnerTestHelper.dimensions),
|
||||
Arrays.<String>asList(),
|
||||
pagingSpec,
|
||||
null
|
||||
);
|
||||
SelectQuery query = builder.descending(true).build();
|
||||
|
||||
for (int[] expected : new int[][]{
|
||||
{2, 0, 3}, {5, 0, 3}, {8, 0, 3}, {11, 0, 3}, {12, 1, 3},
|
||||
{0, 4, 3}, {0, 7, 3}, {0, 10, 3}, {0, 12, 2}, {0, 13, 0}
|
||||
{0, 0, 0, -3, 3}, {0, 0, 0, -6, 3}, {0, 0, 0, -9, 3}, {0, 0, 0, -12, 3}, {0, 0, -2, -13, 3},
|
||||
{0, 0, -5, 0, 3}, {0, -3, 0, 0, 3}, {-2, -4, 0, 0, 3}, {-4, 0, 0, 0, 2}, {-5, 0, 0, 0, 0}
|
||||
}) {
|
||||
List<Result<SelectResultValue>> results = Sequences.toList(
|
||||
runner.run(query, ImmutableMap.of()),
|
||||
|
@ -152,36 +242,25 @@ public class MultiSegmentSelectQueryTest
|
|||
SelectResultValue value = results.get(0).getValue();
|
||||
Map<String, Integer> pagingIdentifiers = value.getPagingIdentifiers();
|
||||
|
||||
if (expected[0] != 0) {
|
||||
Assert.assertEquals(expected[0], pagingIdentifiers.get(segment0.getIdentifier()).intValue());
|
||||
for (int i = 0; i < expected.length - 1; i++) {
|
||||
if (expected[i] < 0) {
|
||||
Assert.assertEquals(expected[i], pagingIdentifiers.get(segmentIdentifiers.get(i)).intValue());
|
||||
}
|
||||
}
|
||||
if (expected[1] != 0) {
|
||||
Assert.assertEquals(expected[1], pagingIdentifiers.get(segment1.getIdentifier()).intValue());
|
||||
}
|
||||
Assert.assertEquals(expected[2], value.getEvents().size());
|
||||
Assert.assertEquals(expected[expected.length - 1], value.getEvents().size());
|
||||
|
||||
query = query.withPagingSpec(toNextPager(3, pagingIdentifiers));
|
||||
query = query.withPagingSpec(toNextPager(3, query.isDescending(), pagingIdentifiers));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDayGranularity()
|
||||
public void testDayGranularityAscending()
|
||||
{
|
||||
PagingSpec pagingSpec = new PagingSpec(null, 3);
|
||||
SelectQuery query = new SelectQuery(
|
||||
new TableDataSource(QueryRunnerTestHelper.dataSource),
|
||||
SelectQueryRunnerTest.I_0112_0114,
|
||||
false,
|
||||
null,
|
||||
QueryRunnerTestHelper.dayGran,
|
||||
DefaultDimensionSpec.toSpec(QueryRunnerTestHelper.dimensions),
|
||||
Arrays.<String>asList(),
|
||||
pagingSpec,
|
||||
null
|
||||
);
|
||||
SelectQuery query = builder.granularity(QueryRunnerTestHelper.dayGran).build();
|
||||
|
||||
for (int[] expected : new int[][]{
|
||||
{2, 2, 3, 3}, {5, 5, 3, 3}, {8, 8, 3, 3}, {11, 11, 3, 3}, {12, 12, 1, 1}
|
||||
{2, -1, -1, 2, 3, 0, 0, 3}, {3, 1, -1, 5, 1, 2, 0, 3}, {-1, 3, 0, 8, 0, 2, 1, 3},
|
||||
{-1, -1, 3, 11, 0, 0, 3, 3}, {-1, -1, 4, 12, 0, 0, 1, 1}, {-1, -1, 5, 13, 0, 0, 0, 0}
|
||||
}) {
|
||||
List<Result<SelectResultValue>> results = Sequences.toList(
|
||||
runner.run(query, ImmutableMap.of()),
|
||||
|
@ -195,29 +274,57 @@ public class MultiSegmentSelectQueryTest
|
|||
Map<String, Integer> pagingIdentifiers0 = value0.getPagingIdentifiers();
|
||||
Map<String, Integer> pagingIdentifiers1 = value1.getPagingIdentifiers();
|
||||
|
||||
Assert.assertEquals(1, pagingIdentifiers0.size());
|
||||
Assert.assertEquals(1, pagingIdentifiers1.size());
|
||||
|
||||
if (expected[0] != 0) {
|
||||
Assert.assertEquals(expected[0], pagingIdentifiers0.get(segment0.getIdentifier()).intValue());
|
||||
for (int i = 0; i < 4; i++) {
|
||||
if (expected[i] >= 0) {
|
||||
Map<String, Integer> paging = i < 3 ? pagingIdentifiers0 : pagingIdentifiers1;
|
||||
Assert.assertEquals(expected[i], paging.get(segmentIdentifiers.get(i)).intValue());
|
||||
}
|
||||
}
|
||||
if (expected[1] != 0) {
|
||||
Assert.assertEquals(expected[1], pagingIdentifiers1.get(segment1.getIdentifier()).intValue());
|
||||
}
|
||||
Assert.assertEquals(expected[2], value0.getEvents().size());
|
||||
Assert.assertEquals(expected[3], value1.getEvents().size());
|
||||
|
||||
query = query.withPagingSpec(toNextPager(3, pagingIdentifiers0, pagingIdentifiers1));
|
||||
query = query.withPagingSpec(toNextPager(3, query.isDescending(), pagingIdentifiers0, pagingIdentifiers1));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDayGranularityDescending()
|
||||
{
|
||||
QueryGranularity granularity = QueryRunnerTestHelper.dayGran;
|
||||
SelectQuery query = builder.granularity(granularity).descending(true).build();
|
||||
|
||||
for (int[] expected : new int[][]{
|
||||
{0, 0, -3, -3, 0, 0, 3, 3}, {0, -1, -5, -6, 0, 1, 2, 3}, {0, -4, 0, -9, 0, 3, 0, 3},
|
||||
{-3, 0, 0, -12, 3, 0, 0, 3}, {-4, 0, 0, -13, 1, 0, 0, 1}, {-5, 0, 0, -14, 0, 0, 0, 0}
|
||||
}) {
|
||||
List<Result<SelectResultValue>> results = Sequences.toList(
|
||||
runner.run(query, ImmutableMap.of()),
|
||||
Lists.<Result<SelectResultValue>>newArrayList()
|
||||
);
|
||||
Assert.assertEquals(2, results.size());
|
||||
|
||||
SelectResultValue value0 = results.get(0).getValue();
|
||||
SelectResultValue value1 = results.get(1).getValue();
|
||||
|
||||
Map<String, Integer> pagingIdentifiers0 = value0.getPagingIdentifiers();
|
||||
Map<String, Integer> pagingIdentifiers1 = value1.getPagingIdentifiers();
|
||||
|
||||
for (int i = 0; i < 4; i++) {
|
||||
if (expected[i] < 0) {
|
||||
Map<String, Integer> paging = i < 3 ? pagingIdentifiers1 : pagingIdentifiers0;
|
||||
Assert.assertEquals(expected[i], paging.get(segmentIdentifiers.get(i)).intValue());
|
||||
}
|
||||
}
|
||||
|
||||
query = query.withPagingSpec(toNextPager(3, query.isDescending(), pagingIdentifiers0, pagingIdentifiers1));
|
||||
}
|
||||
}
|
||||
|
||||
@SafeVarargs
|
||||
private final PagingSpec toNextPager(int threshold, Map<String, Integer>... pagers)
|
||||
private final PagingSpec toNextPager(int threshold, boolean descending, Map<String, Integer>... pagers)
|
||||
{
|
||||
LinkedHashMap<String, Integer> next = Maps.newLinkedHashMap();
|
||||
for (Map<String, Integer> pager : pagers) {
|
||||
for (Map.Entry<String, Integer> entry : pager.entrySet()) {
|
||||
next.put(entry.getKey(), entry.getValue() + 1);
|
||||
next.put(entry.getKey(), descending ? entry.getValue() - 1 : entry.getValue() + 1);
|
||||
}
|
||||
}
|
||||
return new PagingSpec(next, threshold);
|
||||
|
|
Loading…
Reference in New Issue