mirror of https://github.com/apache/druid.git
Select query cannot span to next segment with paging
This commit is contained in:
parent
8f97b1e40c
commit
2686bfa394
|
@ -25,6 +25,8 @@ import io.druid.granularity.QueryGranularity;
|
|||
import io.druid.query.Result;
|
||||
import org.joda.time.DateTime;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class SelectBinaryFn
|
||||
|
@ -58,14 +60,22 @@ public class SelectBinaryFn
|
|||
return arg1;
|
||||
}
|
||||
|
||||
final List<EventHolder> arg1Val = arg1.getValue().getEvents();
|
||||
final List<EventHolder> arg2Val = arg2.getValue().getEvents();
|
||||
|
||||
if (arg1Val == null || arg1Val.isEmpty()) {
|
||||
return arg2;
|
||||
}
|
||||
|
||||
if (arg2Val == null || arg2Val.isEmpty()) {
|
||||
return arg1;
|
||||
}
|
||||
|
||||
final DateTime timestamp = (gran instanceof AllGranularity)
|
||||
? arg1.getTimestamp()
|
||||
: gran.toDateTime(gran.truncate(arg1.getTimestamp().getMillis()));
|
||||
|
||||
SelectResultValueBuilder builder = new SelectResultValueBuilder(timestamp, pagingSpec, descending);
|
||||
|
||||
SelectResultValue arg1Val = arg1.getValue();
|
||||
SelectResultValue arg2Val = arg2.getValue();
|
||||
SelectResultValueBuilder builder = new SelectResultValueBuilder.MergeBuilder(timestamp, pagingSpec, descending);
|
||||
|
||||
for (EventHolder event : arg1Val) {
|
||||
builder.addEntry(event);
|
||||
|
|
|
@ -173,6 +173,21 @@ public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
|
|||
);
|
||||
}
|
||||
|
||||
public SelectQuery withPagingSpec(PagingSpec pagingSpec)
|
||||
{
|
||||
return new SelectQuery(
|
||||
getDataSource(),
|
||||
getQuerySegmentSpec(),
|
||||
isDescending(),
|
||||
dimFilter,
|
||||
granularity,
|
||||
dimensions,
|
||||
metrics,
|
||||
pagingSpec,
|
||||
getContext()
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
|
|
|
@ -105,6 +105,7 @@ public class SelectQueryEngine
|
|||
|
||||
cursor.advanceTo(offset.startDelta());
|
||||
|
||||
int lastOffset = offset.startOffset();
|
||||
for (; !cursor.isDone() && offset.hasNext(); cursor.advance(), offset.next()) {
|
||||
final Map<String, Object> theEvent = Maps.newLinkedHashMap();
|
||||
theEvent.put(EventHolder.timestampKey, new DateTime(timestampColumnSelector.get()));
|
||||
|
@ -145,12 +146,14 @@ public class SelectQueryEngine
|
|||
builder.addEntry(
|
||||
new EventHolder(
|
||||
segment.getIdentifier(),
|
||||
offset.current(),
|
||||
lastOffset = offset.current(),
|
||||
theEvent
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
builder.finished(segment.getIdentifier(), lastOffset);
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,8 +23,11 @@ import com.fasterxml.jackson.core.type.TypeReference;
|
|||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Functions;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Ordering;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.primitives.Longs;
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.common.StringUtils;
|
||||
import com.metamx.common.guava.nary.BinaryFn;
|
||||
|
@ -42,11 +45,15 @@ import io.druid.query.ResultMergeQueryRunner;
|
|||
import io.druid.query.aggregation.MetricManipulationFn;
|
||||
import io.druid.query.dimension.DimensionSpec;
|
||||
import io.druid.query.filter.DimFilter;
|
||||
import io.druid.segment.SegmentDesc;
|
||||
import io.druid.timeline.LogicalSegment;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -257,4 +264,52 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
|
|||
{
|
||||
return intervalChunkingQueryRunnerDecorator.decorate(runner, this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends LogicalSegment> List<T> filterSegments(SelectQuery query, List<T> segments)
|
||||
{
|
||||
PagingSpec pagingSpec = query.getPagingSpec();
|
||||
Map<String, Integer> paging = pagingSpec.getPagingIdentifiers();
|
||||
if (paging == null || paging.isEmpty()) {
|
||||
return segments;
|
||||
}
|
||||
List<Interval> intervals = Lists.newArrayList(
|
||||
Iterables.transform(
|
||||
paging.keySet(),
|
||||
SegmentDesc.INTERVAL_EXTRACTOR
|
||||
)
|
||||
);
|
||||
Collections.sort(
|
||||
intervals, new Comparator<Interval>()
|
||||
{
|
||||
@Override
|
||||
public int compare(Interval o1, Interval o2)
|
||||
{
|
||||
return Longs.compare(o1.getStartMillis(), o2.getStartMillis());
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
List<T> queryIntervals = Lists.newArrayList(segments);
|
||||
|
||||
Iterator<T> it = queryIntervals.iterator();
|
||||
if (query.isDescending()) {
|
||||
final long lastEnd = intervals.get(intervals.size() - 1).getEndMillis();
|
||||
while (it.hasNext()) {
|
||||
T segment = it.next();
|
||||
if (segment.getInterval().getStartMillis() > lastEnd) {
|
||||
it.remove();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
final long firstStart = intervals.get(0).getStartMillis();
|
||||
while (it.hasNext()) {
|
||||
T segment = it.next();
|
||||
if (segment.getInterval().getEndMillis() < firstStart) {
|
||||
it.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
return queryIntervals;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -56,55 +56,76 @@ public class SelectResultValueBuilder
|
|||
}
|
||||
};
|
||||
|
||||
private final DateTime timestamp;
|
||||
private final PagingSpec pagingSpec;
|
||||
protected final DateTime timestamp;
|
||||
protected final PagingSpec pagingSpec;
|
||||
protected final boolean descending;
|
||||
|
||||
private Queue<EventHolder> pQueue = null;
|
||||
protected final Queue<EventHolder> pQueue;
|
||||
protected final Map<String, Integer> pagingIdentifiers;
|
||||
|
||||
public SelectResultValueBuilder(
|
||||
DateTime timestamp,
|
||||
PagingSpec pagingSpec,
|
||||
boolean descending
|
||||
)
|
||||
public SelectResultValueBuilder(DateTime timestamp, PagingSpec pagingSpec, boolean descending)
|
||||
{
|
||||
this.timestamp = timestamp;
|
||||
this.pagingSpec = pagingSpec;
|
||||
|
||||
instantiatePQueue(pagingSpec.getThreshold(), descending ? Comparators.inverse(comparator) : comparator);
|
||||
this.descending = descending;
|
||||
this.pagingIdentifiers = Maps.newLinkedHashMap();
|
||||
this.pQueue = instantiatePQueue();
|
||||
}
|
||||
|
||||
public void addEntry(
|
||||
EventHolder event
|
||||
)
|
||||
public void addEntry(EventHolder event)
|
||||
{
|
||||
pQueue.add(event);
|
||||
}
|
||||
|
||||
public void finished(String segmentId, int lastOffset)
|
||||
{
|
||||
pagingIdentifiers.put(segmentId, lastOffset);
|
||||
}
|
||||
|
||||
public Result<SelectResultValue> build()
|
||||
{
|
||||
// Pull out top aggregated values
|
||||
List<EventHolder> values = Lists.newArrayListWithCapacity(pQueue.size());
|
||||
Map<String, Integer> pagingIdentifiers = Maps.newLinkedHashMap();
|
||||
while (!pQueue.isEmpty()) {
|
||||
EventHolder event = pQueue.remove();
|
||||
pagingIdentifiers.put(event.getSegmentId(), event.getOffset());
|
||||
values.add(event);
|
||||
}
|
||||
|
||||
if (pagingIdentifiers.isEmpty()) {
|
||||
pagingIdentifiers.putAll(pagingSpec.getPagingIdentifiers());
|
||||
}
|
||||
|
||||
return new Result<SelectResultValue>(
|
||||
timestamp,
|
||||
new SelectResultValue(pagingIdentifiers, values)
|
||||
new SelectResultValue(pagingIdentifiers, getEventHolders())
|
||||
);
|
||||
}
|
||||
|
||||
private void instantiatePQueue(int threshold, final Comparator comparator)
|
||||
protected List<EventHolder> getEventHolders()
|
||||
{
|
||||
this.pQueue = threshold > 0
|
||||
? MinMaxPriorityQueue.orderedBy(comparator).maximumSize(threshold).create()
|
||||
: Queues.newArrayDeque();
|
||||
return Lists.newArrayList(pQueue);
|
||||
}
|
||||
|
||||
protected Queue<EventHolder> instantiatePQueue()
|
||||
{
|
||||
return Queues.newArrayDeque();
|
||||
}
|
||||
|
||||
public static class MergeBuilder extends SelectResultValueBuilder
|
||||
{
|
||||
public MergeBuilder(DateTime timestamp, PagingSpec pagingSpec, boolean descending)
|
||||
{
|
||||
super(timestamp, pagingSpec, descending);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Queue<EventHolder> instantiatePQueue()
|
||||
{
|
||||
int threshold = pagingSpec.getThreshold();
|
||||
return MinMaxPriorityQueue.orderedBy(descending ? Comparators.inverse(comparator) : comparator)
|
||||
.maximumSize(threshold > 0 ? threshold : Integer.MAX_VALUE)
|
||||
.create();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<EventHolder> getEventHolders()
|
||||
{
|
||||
final List<EventHolder> values = Lists.newArrayListWithCapacity(pQueue.size());
|
||||
while (!pQueue.isEmpty()) {
|
||||
EventHolder event = pQueue.remove();
|
||||
pagingIdentifiers.put(event.getSegmentId(), event.getOffset());
|
||||
values.add(event);
|
||||
}
|
||||
return values;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,85 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.segment;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
/**
|
||||
* identifier to DataSegment. wishfully included in DataSegment
|
||||
*/
|
||||
public class SegmentDesc
|
||||
{
|
||||
public static Function<String, Interval> INTERVAL_EXTRACTOR = new Function<String, Interval>()
|
||||
{
|
||||
@Override
|
||||
public Interval apply(String identifier)
|
||||
{
|
||||
return valueOf(identifier).getInterval();
|
||||
}
|
||||
};
|
||||
|
||||
// ignores shard spec
|
||||
public static SegmentDesc valueOf(final String identifier)
|
||||
{
|
||||
String[] splits = identifier.split(DataSegment.delimiter);
|
||||
if (splits.length < 4) {
|
||||
throw new IllegalArgumentException("Invalid identifier " + identifier);
|
||||
}
|
||||
String datasource = splits[0];
|
||||
DateTime start = new DateTime(splits[1]);
|
||||
DateTime end = new DateTime(splits[2]);
|
||||
String version = splits[3];
|
||||
|
||||
return new SegmentDesc(
|
||||
datasource,
|
||||
new Interval(start.getMillis(), end.getMillis()),
|
||||
version
|
||||
);
|
||||
}
|
||||
|
||||
private final String dataSource;
|
||||
private final Interval interval;
|
||||
private final String version;
|
||||
|
||||
public SegmentDesc(String dataSource, Interval interval, String version)
|
||||
{
|
||||
this.dataSource = dataSource;
|
||||
this.interval = interval;
|
||||
this.version = version;
|
||||
}
|
||||
|
||||
public String getDataSource()
|
||||
{
|
||||
return dataSource;
|
||||
}
|
||||
|
||||
public Interval getInterval()
|
||||
{
|
||||
return interval;
|
||||
}
|
||||
|
||||
public String getVersion()
|
||||
{
|
||||
return version;
|
||||
}
|
||||
}
|
|
@ -25,7 +25,9 @@ import com.google.common.collect.Iterables;
|
|||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.metamx.common.UOE;
|
||||
import com.metamx.common.guava.MergeSequence;
|
||||
import com.metamx.common.guava.Sequence;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
import io.druid.granularity.QueryGranularity;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||
|
@ -46,6 +48,7 @@ import io.druid.segment.QueryableIndexSegment;
|
|||
import io.druid.segment.Segment;
|
||||
import io.druid.segment.TestIndex;
|
||||
import io.druid.segment.incremental.IncrementalIndex;
|
||||
import io.druid.timeline.LogicalSegment;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
|
@ -438,6 +441,80 @@ public class QueryRunnerTestHelper
|
|||
);
|
||||
}
|
||||
|
||||
public static <T> QueryRunner<T> makeFilteringQueryRunner(
|
||||
final List<Segment> segments,
|
||||
final QueryRunnerFactory<T, Query<T>> factory
|
||||
)
|
||||
{
|
||||
return makeQueryRunner(
|
||||
Lists.transform(
|
||||
segments, new Function<Segment, LogicalWrapper>()
|
||||
{
|
||||
@Override
|
||||
public LogicalWrapper apply(Segment segment)
|
||||
{
|
||||
return new LogicalWrapper(segment);
|
||||
}
|
||||
}
|
||||
), factory
|
||||
);
|
||||
}
|
||||
|
||||
private static <T> QueryRunner<T> makeQueryRunner(
|
||||
final List<LogicalWrapper> segments,
|
||||
final QueryRunnerFactory<T, Query<T>> factory
|
||||
)
|
||||
{
|
||||
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
|
||||
return new FinalizeResultsQueryRunner(
|
||||
toolChest.postMergeQueryDecoration(
|
||||
toolChest.mergeResults(
|
||||
toolChest.preMergeQueryDecoration(
|
||||
new QueryRunner<T>()
|
||||
{
|
||||
@Override
|
||||
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
|
||||
{
|
||||
List<Sequence<T>> sequences = Lists.newArrayList();
|
||||
for (LogicalWrapper segment : toolChest.filterSegments(query, segments)) {
|
||||
sequences.add(factory.createRunner(segment.segment).run(query, responseContext));
|
||||
}
|
||||
return new MergeSequence<>(
|
||||
query.getResultOrdering(),
|
||||
Sequences.simple(sequences)
|
||||
);
|
||||
}
|
||||
}
|
||||
)
|
||||
)
|
||||
),
|
||||
toolChest
|
||||
);
|
||||
}
|
||||
|
||||
// wish Segment implements LogicalSegment
|
||||
private static class LogicalWrapper implements LogicalSegment
|
||||
{
|
||||
private final Segment segment;
|
||||
|
||||
private LogicalWrapper(Segment segment)
|
||||
{
|
||||
this.segment = segment;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Interval getInterval()
|
||||
{
|
||||
return segment.getDataInterval();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return segment.getIdentifier();
|
||||
}
|
||||
}
|
||||
|
||||
public static IntervalChunkingQueryRunnerDecorator NoopIntervalChunkingQueryRunnerDecorator()
|
||||
{
|
||||
return new IntervalChunkingQueryRunnerDecorator(null, null, null) {
|
||||
|
|
|
@ -27,6 +27,7 @@ import com.metamx.common.guava.Sequences;
|
|||
import io.druid.data.input.MapBasedRow;
|
||||
import io.druid.data.input.Row;
|
||||
import io.druid.query.FinalizeResultsQueryRunner;
|
||||
import io.druid.query.Query;
|
||||
import io.druid.query.QueryRunner;
|
||||
import io.druid.query.QueryRunnerFactory;
|
||||
import io.druid.query.QueryToolChest;
|
||||
|
@ -38,17 +39,17 @@ import java.util.Map;
|
|||
*/
|
||||
public class GroupByQueryRunnerTestHelper
|
||||
{
|
||||
public static Iterable<Row> runQuery(QueryRunnerFactory factory, QueryRunner runner, GroupByQuery query)
|
||||
public static <T> Iterable<T> runQuery(QueryRunnerFactory factory, QueryRunner runner, Query<T> query)
|
||||
{
|
||||
|
||||
QueryToolChest toolChest = factory.getToolchest();
|
||||
QueryRunner theRunner = new FinalizeResultsQueryRunner<>(
|
||||
QueryRunner<T> theRunner = new FinalizeResultsQueryRunner<>(
|
||||
toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner)),
|
||||
toolChest
|
||||
);
|
||||
|
||||
Sequence<Row> queryResult = theRunner.run(query, Maps.newHashMap());
|
||||
return Sequences.toList(queryResult, Lists.<Row>newArrayList());
|
||||
Sequence<T> queryResult = theRunner.run(query, Maps.<String, Object>newHashMap());
|
||||
return Sequences.toList(queryResult, Lists.<T>newArrayList());
|
||||
}
|
||||
|
||||
public static Row createExpectedRow(final String timestamp, Object... vals)
|
||||
|
|
|
@ -0,0 +1,225 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.select;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.io.CharSource;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
import io.druid.granularity.QueryGranularity;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import io.druid.query.QueryRunner;
|
||||
import io.druid.query.QueryRunnerFactory;
|
||||
import io.druid.query.QueryRunnerTestHelper;
|
||||
import io.druid.query.Result;
|
||||
import io.druid.query.TableDataSource;
|
||||
import io.druid.query.dimension.DefaultDimensionSpec;
|
||||
import io.druid.segment.IncrementalIndexSegment;
|
||||
import io.druid.segment.Segment;
|
||||
import io.druid.segment.TestIndex;
|
||||
import io.druid.segment.incremental.IncrementalIndex;
|
||||
import io.druid.segment.incremental.IncrementalIndexSchema;
|
||||
import io.druid.segment.incremental.OnheapIncrementalIndex;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import io.druid.timeline.partition.NoneShardSpec;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class MultiSegmentSelectQueryTest
|
||||
{
|
||||
private static final SelectQueryQueryToolChest toolChest = new SelectQueryQueryToolChest(
|
||||
new DefaultObjectMapper(),
|
||||
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
|
||||
);
|
||||
|
||||
private static final QueryRunnerFactory factory = new SelectQueryRunnerFactory(
|
||||
toolChest,
|
||||
new SelectQueryEngine(),
|
||||
QueryRunnerTestHelper.NOOP_QUERYWATCHER
|
||||
);
|
||||
|
||||
private static Segment segment0;
|
||||
private static Segment segment1;
|
||||
|
||||
private static QueryRunner runner;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() throws IOException
|
||||
{
|
||||
CharSource v_0112 = CharSource.wrap(StringUtils.join(SelectQueryRunnerTest.V_0112, "\n"));
|
||||
CharSource v_0113 = CharSource.wrap(StringUtils.join(SelectQueryRunnerTest.V_0113, "\n"));
|
||||
|
||||
IncrementalIndex index0 = TestIndex.loadIncrementalIndex(newIncrementalIndex("2011-01-12T00:00:00.000Z"), v_0112);
|
||||
IncrementalIndex index1 = TestIndex.loadIncrementalIndex(newIncrementalIndex("2011-01-13T00:00:00.000Z"), v_0113);
|
||||
|
||||
segment0 = new IncrementalIndexSegment(index0, makeIdentifier(index0));
|
||||
segment1 = new IncrementalIndexSegment(index1, makeIdentifier(index1));
|
||||
|
||||
runner = QueryRunnerTestHelper.makeFilteringQueryRunner(Arrays.asList(segment0, segment1), factory);
|
||||
}
|
||||
|
||||
private static String makeIdentifier(IncrementalIndex index)
|
||||
{
|
||||
Interval interval = index.getInterval();
|
||||
return DataSegment.makeDataSegmentIdentifier(
|
||||
QueryRunnerTestHelper.dataSource,
|
||||
interval.getStart(),
|
||||
interval.getEnd(),
|
||||
"v",
|
||||
new NoneShardSpec()
|
||||
);
|
||||
}
|
||||
|
||||
private static IncrementalIndex newIncrementalIndex(String minTimeStamp) {
|
||||
return newIncrementalIndex(minTimeStamp, 10000);
|
||||
}
|
||||
|
||||
private static IncrementalIndex newIncrementalIndex(String minTimeStamp, int maxRowCount)
|
||||
{
|
||||
final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
|
||||
.withMinTimestamp(new DateTime(minTimeStamp).getMillis())
|
||||
.withQueryGranularity(QueryGranularity.NONE)
|
||||
.withMetrics(TestIndex.METRIC_AGGS)
|
||||
.build();
|
||||
return new OnheapIncrementalIndex(schema, maxRowCount);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void clear()
|
||||
{
|
||||
IOUtils.closeQuietly(segment0);
|
||||
IOUtils.closeQuietly(segment1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAllGranularity()
|
||||
{
|
||||
PagingSpec pagingSpec = new PagingSpec(null, 3);
|
||||
SelectQuery query = new SelectQuery(
|
||||
new TableDataSource(QueryRunnerTestHelper.dataSource),
|
||||
SelectQueryRunnerTest.I_0112_0114,
|
||||
false,
|
||||
null,
|
||||
QueryRunnerTestHelper.allGran,
|
||||
DefaultDimensionSpec.toSpec(QueryRunnerTestHelper.dimensions),
|
||||
Arrays.<String>asList(),
|
||||
pagingSpec,
|
||||
null
|
||||
);
|
||||
|
||||
for (int[] expected : new int[][]{
|
||||
{2, 0, 3}, {5, 0, 3}, {8, 0, 3}, {11, 0, 3}, {12, 1, 3},
|
||||
{0, 4, 3}, {0, 7, 3}, {0, 10, 3}, {0, 12, 2}, {0, 13, 0}
|
||||
}) {
|
||||
List<Result<SelectResultValue>> results = Sequences.toList(
|
||||
runner.run(query, ImmutableMap.of()),
|
||||
Lists.<Result<SelectResultValue>>newArrayList()
|
||||
);
|
||||
Assert.assertEquals(1, results.size());
|
||||
|
||||
SelectResultValue value = results.get(0).getValue();
|
||||
Map<String, Integer> pagingIdentifiers = value.getPagingIdentifiers();
|
||||
|
||||
if (expected[0] != 0) {
|
||||
Assert.assertEquals(expected[0], pagingIdentifiers.get(segment0.getIdentifier()).intValue());
|
||||
}
|
||||
if (expected[1] != 0) {
|
||||
Assert.assertEquals(expected[1], pagingIdentifiers.get(segment1.getIdentifier()).intValue());
|
||||
}
|
||||
Assert.assertEquals(expected[2], value.getEvents().size());
|
||||
|
||||
query = query.withPagingSpec(toNextPager(3, pagingIdentifiers));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDayGranularity()
|
||||
{
|
||||
PagingSpec pagingSpec = new PagingSpec(null, 3);
|
||||
SelectQuery query = new SelectQuery(
|
||||
new TableDataSource(QueryRunnerTestHelper.dataSource),
|
||||
SelectQueryRunnerTest.I_0112_0114,
|
||||
false,
|
||||
null,
|
||||
QueryRunnerTestHelper.dayGran,
|
||||
DefaultDimensionSpec.toSpec(QueryRunnerTestHelper.dimensions),
|
||||
Arrays.<String>asList(),
|
||||
pagingSpec,
|
||||
null
|
||||
);
|
||||
|
||||
for (int[] expected : new int[][]{
|
||||
{2, 2, 3, 3}, {5, 5, 3, 3}, {8, 8, 3, 3}, {11, 11, 3, 3}, {12, 12, 1, 1}
|
||||
}) {
|
||||
List<Result<SelectResultValue>> results = Sequences.toList(
|
||||
runner.run(query, ImmutableMap.of()),
|
||||
Lists.<Result<SelectResultValue>>newArrayList()
|
||||
);
|
||||
Assert.assertEquals(2, results.size());
|
||||
|
||||
SelectResultValue value0 = results.get(0).getValue();
|
||||
SelectResultValue value1 = results.get(1).getValue();
|
||||
|
||||
Map<String, Integer> pagingIdentifiers0 = value0.getPagingIdentifiers();
|
||||
Map<String, Integer> pagingIdentifiers1 = value1.getPagingIdentifiers();
|
||||
|
||||
Assert.assertEquals(1, pagingIdentifiers0.size());
|
||||
Assert.assertEquals(1, pagingIdentifiers1.size());
|
||||
|
||||
if (expected[0] != 0) {
|
||||
Assert.assertEquals(expected[0], pagingIdentifiers0.get(segment0.getIdentifier()).intValue());
|
||||
}
|
||||
if (expected[1] != 0) {
|
||||
Assert.assertEquals(expected[1], pagingIdentifiers1.get(segment1.getIdentifier()).intValue());
|
||||
}
|
||||
Assert.assertEquals(expected[2], value0.getEvents().size());
|
||||
Assert.assertEquals(expected[3], value1.getEvents().size());
|
||||
|
||||
query = query.withPagingSpec(toNextPager(3, pagingIdentifiers0, pagingIdentifiers1));
|
||||
}
|
||||
}
|
||||
|
||||
@SafeVarargs
|
||||
private final PagingSpec toNextPager(int threshold, Map<String, Integer>... pagers)
|
||||
{
|
||||
LinkedHashMap<String, Integer> next = Maps.newLinkedHashMap();
|
||||
for (Map<String, Integer> pager : pagers) {
|
||||
for (Map.Entry<String, Integer> entry : pager.entrySet()) {
|
||||
next.put(entry.getKey(), entry.getValue() + 1);
|
||||
}
|
||||
}
|
||||
return new PagingSpec(next, threshold);
|
||||
}
|
||||
}
|
|
@ -183,49 +183,66 @@ public class TestIndex
|
|||
.build();
|
||||
final IncrementalIndex retVal = new OnheapIncrementalIndex(schema, true, 10000);
|
||||
|
||||
final AtomicLong startTime = new AtomicLong();
|
||||
int lineCount;
|
||||
try {
|
||||
lineCount = source.readLines(
|
||||
new LineProcessor<Integer>()
|
||||
{
|
||||
StringInputRowParser parser = new StringInputRowParser(
|
||||
new DelimitedParseSpec(
|
||||
new TimestampSpec("ts", "iso", null),
|
||||
new DimensionsSpec(Arrays.asList(DIMENSIONS), null, null),
|
||||
"\t",
|
||||
"\u0001",
|
||||
Arrays.asList(COLUMNS)
|
||||
)
|
||||
);
|
||||
boolean runOnce = false;
|
||||
int lineCount = 0;
|
||||
|
||||
@Override
|
||||
public boolean processLine(String line) throws IOException
|
||||
{
|
||||
if (!runOnce) {
|
||||
startTime.set(System.currentTimeMillis());
|
||||
runOnce = true;
|
||||
}
|
||||
retVal.add(parser.parse(line));
|
||||
|
||||
++lineCount;
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer getResult()
|
||||
{
|
||||
return lineCount;
|
||||
}
|
||||
}
|
||||
);
|
||||
return loadIncrementalIndex(retVal, source);
|
||||
}
|
||||
catch (IOException e) {
|
||||
catch (Exception e) {
|
||||
realtimeIndex = null;
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static IncrementalIndex loadIncrementalIndex(
|
||||
final IncrementalIndex retVal,
|
||||
final CharSource source
|
||||
) throws IOException
|
||||
{
|
||||
final StringInputRowParser parser = new StringInputRowParser(
|
||||
new DelimitedParseSpec(
|
||||
new TimestampSpec("ts", "iso", null),
|
||||
new DimensionsSpec(Arrays.asList(DIMENSIONS), null, null),
|
||||
"\t",
|
||||
"\u0001",
|
||||
Arrays.asList(COLUMNS)
|
||||
)
|
||||
, "utf8"
|
||||
);
|
||||
return loadIncrementalIndex(retVal, source, parser);
|
||||
}
|
||||
|
||||
public static IncrementalIndex loadIncrementalIndex(
|
||||
final IncrementalIndex retVal,
|
||||
final CharSource source,
|
||||
final StringInputRowParser parser
|
||||
) throws IOException
|
||||
{
|
||||
final AtomicLong startTime = new AtomicLong();
|
||||
int lineCount = source.readLines(
|
||||
new LineProcessor<Integer>()
|
||||
{
|
||||
boolean runOnce = false;
|
||||
int lineCount = 0;
|
||||
|
||||
@Override
|
||||
public boolean processLine(String line) throws IOException
|
||||
{
|
||||
if (!runOnce) {
|
||||
startTime.set(System.currentTimeMillis());
|
||||
runOnce = true;
|
||||
}
|
||||
retVal.add(parser.parse(line));
|
||||
|
||||
++lineCount;
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer getResult()
|
||||
{
|
||||
return lineCount;
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
log.info("Loaded %,d lines in %,d millis.", lineCount, System.currentTimeMillis() - startTime.get());
|
||||
|
||||
|
|
Loading…
Reference in New Issue