From 2686bfa394241673111bfad12de18c334369ac9e Mon Sep 17 00:00:00 2001
From: "navis.ryu"
Date: Wed, 17 Feb 2016 16:22:43 +0900
Subject: [PATCH] Select query cannot span to next segment with paging

---
 .../io/druid/query/select/SelectBinaryFn.java      |  18 +-
 .../io/druid/query/select/SelectQuery.java         |  15 ++
 .../druid/query/select/SelectQueryEngine.java      |   5 +-
 .../query/select/SelectQueryQueryToolChest.java    |  55 +++++
 .../query/select/SelectResultValueBuilder.java     |  83 ++++---
 .../main/java/io/druid/segment/SegmentDesc.java    |  85 +++++++
 .../io/druid/query/QueryRunnerTestHelper.java      |  77 ++++++
 .../groupby/GroupByQueryRunnerTestHelper.java      |   9 +-
 .../query/select/MultiSegmentSelectQueryTest.java  | 225 ++++++++++++++++++
 .../src/test/java/io/druid/segment/TestIndex.java  |  93 +++++---
 10 files changed, 587 insertions(+), 78 deletions(-)
 create mode 100644 processing/src/main/java/io/druid/segment/SegmentDesc.java
 create mode 100644 processing/src/test/java/io/druid/query/select/MultiSegmentSelectQueryTest.java

diff --git a/processing/src/main/java/io/druid/query/select/SelectBinaryFn.java b/processing/src/main/java/io/druid/query/select/SelectBinaryFn.java
index 662d06db7e9..a80b6f3470c 100644
--- a/processing/src/main/java/io/druid/query/select/SelectBinaryFn.java
+++ b/processing/src/main/java/io/druid/query/select/SelectBinaryFn.java
@@ -25,6 +25,8 @@ import io.druid.granularity.QueryGranularity;
 import io.druid.query.Result;
 import org.joda.time.DateTime;
 
+import java.util.List;
+
 /**
  */
 public class SelectBinaryFn
@@ -58,14 +60,22 @@ public class SelectBinaryFn
       return arg1;
     }
 
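+    // An empty side contributes no events; return the other side unchanged so
+    // that its paging identifiers are preserved through the merge.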
+    final List<EventHolder> arg1Val = arg1.getValue().getEvents();
+    final List<EventHolder> arg2Val = arg2.getValue().getEvents();
+
+    if (arg1Val == null || arg1Val.isEmpty()) {
+      return arg2;
+    }
+
+    if (arg2Val == null || arg2Val.isEmpty()) {
+      return arg1;
+    }
+
     final DateTime timestamp = (gran instanceof AllGranularity)
                                ? arg1.getTimestamp()
                                : gran.toDateTime(gran.truncate(arg1.getTimestamp().getMillis()));
 
-    SelectResultValueBuilder builder = new SelectResultValueBuilder(timestamp, pagingSpec, descending);
-
-    SelectResultValue arg1Val = arg1.getValue();
-    SelectResultValue arg2Val = arg2.getValue();
+    SelectResultValueBuilder builder = new SelectResultValueBuilder.MergeBuilder(timestamp, pagingSpec, descending);
 
     for (EventHolder event : arg1Val) {
       builder.addEntry(event);
diff --git a/processing/src/main/java/io/druid/query/select/SelectQuery.java b/processing/src/main/java/io/druid/query/select/SelectQuery.java
index 2e39c0f547e..e36bb210562 100644
--- a/processing/src/main/java/io/druid/query/select/SelectQuery.java
+++ b/processing/src/main/java/io/druid/query/select/SelectQuery.java
@@ -173,6 +173,21 @@ public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
     );
   }
 
+  public SelectQuery withPagingSpec(PagingSpec pagingSpec)
+  {
+    return new SelectQuery(
+        getDataSource(),
+        getQuerySegmentSpec(),
+        isDescending(),
+        dimFilter,
+        granularity,
+        dimensions,
+        metrics,
+        pagingSpec,
+        getContext()
+    );
+  }
+
   @Override
   public String toString()
   {
diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java b/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java
index d6f1bee5f3e..4546a64ffd0 100644
--- a/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java
+++ b/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java
@@ -105,6 +105,7 @@ public class SelectQueryEngine
 
             cursor.advanceTo(offset.startDelta());
 
+            int lastOffset = offset.startOffset();
             for (; !cursor.isDone() && offset.hasNext(); cursor.advance(), offset.next()) {
               final Map<String, Object> theEvent = Maps.newLinkedHashMap();
               theEvent.put(EventHolder.timestampKey, new DateTime(timestampColumnSelector.get()));
@@ -145,12 +146,14 @@ public class SelectQueryEngine
               builder.addEntry(
                   new EventHolder(
                       segment.getIdentifier(),
-                      offset.current(),
+                      lastOffset = offset.current(),
                       theEvent
                   )
               );
             }
 
+            builder.finished(segment.getIdentifier(), lastOffset);
+
             return builder.build();
           }
         }
diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java
index eba425bd62b..0fdfb11b08e 100644
--- a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java
+++ b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java
@@ -23,8 +23,11 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
+import com.google.common.primitives.Longs;
 import com.google.inject.Inject;
 import com.metamx.common.StringUtils;
 import com.metamx.common.guava.nary.BinaryFn;
@@ -42,11 +45,15 @@ import io.druid.query.ResultMergeQueryRunner;
 import io.druid.query.aggregation.MetricManipulationFn;
 import io.druid.query.dimension.DimensionSpec;
 import io.druid.query.filter.DimFilter;
+import io.druid.segment.SegmentDesc;
+import io.druid.timeline.LogicalSegment;
 import org.joda.time.DateTime;
+import org.joda.time.Interval;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -257,4 +264,52 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResultValue>, SelectQuery>
       }
     };
   }
+
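+  /**
+   * Prunes segments that cannot contribute to the next page. For ascending
+   * queries, drops segments whose interval ends before the earliest interval
+   * already present in the paging identifiers; for descending queries, drops
+   * segments whose interval starts after the latest such interval.
+   */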
+  @Override
+  public <T extends LogicalSegment> List<T> filterSegments(SelectQuery query, List<T> segments)
+  {
+    PagingSpec pagingSpec = query.getPagingSpec();
+    Map<String, Integer> paging = pagingSpec.getPagingIdentifiers();
+    if (paging == null || paging.isEmpty()) {
+      return segments;
+    }
+    List<Interval> intervals = Lists.newArrayList(
+        Iterables.transform(
+            paging.keySet(),
+            SegmentDesc.INTERVAL_EXTRACTOR
+        )
+    );
+    Collections.sort(
+        intervals, new Comparator<Interval>()
+        {
+          @Override
+          public int compare(Interval o1, Interval o2)
+          {
+            return Longs.compare(o1.getStartMillis(), o2.getStartMillis());
+          }
+        }
+    );
+
+    List<T> queryIntervals = Lists.newArrayList(segments);
+
+    Iterator<T> it = queryIntervals.iterator();
+    if (query.isDescending()) {
+      final long lastEnd = intervals.get(intervals.size() - 1).getEndMillis();
+      while (it.hasNext()) {
+        T segment = it.next();
+        if (segment.getInterval().getStartMillis() > lastEnd) {
+          it.remove();
+        }
+      }
+    } else {
+      final long firstStart = intervals.get(0).getStartMillis();
+      while (it.hasNext()) {
+        T segment = it.next();
+        if (segment.getInterval().getEndMillis() < firstStart) {
+          it.remove();
+        }
+      }
+    }
+    return queryIntervals;
+  }
 }
diff --git a/processing/src/main/java/io/druid/query/select/SelectResultValueBuilder.java b/processing/src/main/java/io/druid/query/select/SelectResultValueBuilder.java
index 9891c87df4d..fba023caafa 100644
--- a/processing/src/main/java/io/druid/query/select/SelectResultValueBuilder.java
+++ b/processing/src/main/java/io/druid/query/select/SelectResultValueBuilder.java
@@ -56,55 +56,76 @@ public class SelectResultValueBuilder
     }
   };
 
-  private final DateTime timestamp;
-  private final PagingSpec pagingSpec;
+  protected final DateTime timestamp;
+  protected final PagingSpec pagingSpec;
+  protected final boolean descending;
 
-  private Queue<EventHolder> pQueue = null;
+  protected final Queue<EventHolder> pQueue;
+  protected final Map<String, Integer> pagingIdentifiers;
 
-  public SelectResultValueBuilder(
-      DateTime timestamp,
-      PagingSpec pagingSpec,
-      boolean descending
-  )
+  public SelectResultValueBuilder(DateTime timestamp, PagingSpec pagingSpec, boolean descending)
   {
     this.timestamp = timestamp;
     this.pagingSpec = pagingSpec;
-
-    instantiatePQueue(pagingSpec.getThreshold(), descending ? Comparators.inverse(comparator) : comparator);
+    this.descending = descending;
+    this.pagingIdentifiers = Maps.newLinkedHashMap();
+    this.pQueue = instantiatePQueue();
   }
 
-  public void addEntry(
-      EventHolder event
-  )
+  public void addEntry(EventHolder event)
   {
     pQueue.add(event);
   }
 
+  public void finished(String segmentId, int lastOffset)
+  {
+    pagingIdentifiers.put(segmentId, lastOffset);
+  }
+
   public Result<SelectResultValue> build()
   {
-    // Pull out top aggregated values
-    List<EventHolder> values = Lists.newArrayListWithCapacity(pQueue.size());
-    Map<String, Integer> pagingIdentifiers = Maps.newLinkedHashMap();
-    while (!pQueue.isEmpty()) {
-      EventHolder event = pQueue.remove();
-      pagingIdentifiers.put(event.getSegmentId(), event.getOffset());
-      values.add(event);
-    }
-
-    if (pagingIdentifiers.isEmpty()) {
-      pagingIdentifiers.putAll(pagingSpec.getPagingIdentifiers());
-    }
-
     return new Result<SelectResultValue>(
         timestamp,
-        new SelectResultValue(pagingIdentifiers, values)
+        new SelectResultValue(pagingIdentifiers, getEventHolders())
     );
   }
 
-  private void instantiatePQueue(int threshold, final Comparator comparator)
+  protected List<EventHolder> getEventHolders()
   {
-    this.pQueue = threshold > 0
-                  ? MinMaxPriorityQueue.orderedBy(comparator).maximumSize(threshold).create()
-                  : Queues.newArrayDeque();
+    return Lists.newArrayList(pQueue);
+  }
+
+  protected Queue<EventHolder> instantiatePQueue()
+  {
+    return Queues.newArrayDeque();
+  }
+
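+  /**
+   * Builder used when merging results from multiple segments: events are
+   * ranked in a threshold-capped priority queue, and the paging identifiers
+   * are rebuilt from the events that survive the cut.
+   */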
+  public static class MergeBuilder extends SelectResultValueBuilder
+  {
+    public MergeBuilder(DateTime timestamp, PagingSpec pagingSpec, boolean descending)
+    {
+      super(timestamp, pagingSpec, descending);
+    }
+
+    @Override
+    protected Queue<EventHolder> instantiatePQueue()
+    {
+      int threshold = pagingSpec.getThreshold();
+      return MinMaxPriorityQueue.orderedBy(descending ? Comparators.inverse(comparator) : comparator)
+                                .maximumSize(threshold > 0 ? threshold : Integer.MAX_VALUE)
+                                .create();
+    }
+
+    @Override
+    protected List<EventHolder> getEventHolders()
+    {
+      final List<EventHolder> values = Lists.newArrayListWithCapacity(pQueue.size());
+      while (!pQueue.isEmpty()) {
+        EventHolder event = pQueue.remove();
+        pagingIdentifiers.put(event.getSegmentId(), event.getOffset());
+        values.add(event);
+      }
+      return values;
+    }
+  }
 }
diff --git a/processing/src/main/java/io/druid/segment/SegmentDesc.java b/processing/src/main/java/io/druid/segment/SegmentDesc.java
new file mode 100644
index 00000000000..178c4fa03a1
--- /dev/null
+++ b/processing/src/main/java/io/druid/segment/SegmentDesc.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.segment;
+
+import com.google.common.base.Function;
+import io.druid.timeline.DataSegment;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+/**
+ * Decomposes a segment identifier back into its dataSource, interval and
+ * version. Ideally this would be part of DataSegment itself.
+ */
+public class SegmentDesc
+{
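+  // Maps a segment identifier to the time interval encoded in it.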
+  public static Function<String, Interval> INTERVAL_EXTRACTOR = new Function<String, Interval>()
+  {
+    @Override
+    public Interval apply(String identifier)
+    {
+      return valueOf(identifier).getInterval();
+    }
+  };
+
+  // ignores shard spec
+  public static SegmentDesc valueOf(final String identifier)
+  {
+    String[] splits = identifier.split(DataSegment.delimiter);
+    if (splits.length < 4) {
+      throw new IllegalArgumentException("Invalid identifier " + identifier);
+    }
+    String datasource = splits[0];
+    DateTime start = new DateTime(splits[1]);
+    DateTime end = new DateTime(splits[2]);
+    String version = splits[3];
+
+    return new SegmentDesc(
+        datasource,
+        new Interval(start.getMillis(), end.getMillis()),
+        version
+    );
+  }
+
+  private final String dataSource;
+  private final Interval interval;
+  private final String version;
+
+  public SegmentDesc(String dataSource, Interval interval, String version)
+  {
+    this.dataSource = dataSource;
+    this.interval = interval;
+    this.version = version;
+  }
+
+  public String getDataSource()
+  {
+    return dataSource;
+  }
+
+  public Interval getInterval()
+  {
+    return interval;
+  }
+
+  public String getVersion()
+  {
+    return version;
+  }
+}
diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java
index e7987d5f6d3..75ef8c9cd4b 100644
--- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java
+++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java
@@ -25,7 +25,9 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.metamx.common.UOE;
+import com.metamx.common.guava.MergeSequence;
 import com.metamx.common.guava.Sequence;
+import com.metamx.common.guava.Sequences;
 import io.druid.granularity.QueryGranularity;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.CountAggregatorFactory;
@@ -46,6 +48,7 @@ import io.druid.segment.QueryableIndexSegment;
 import io.druid.segment.Segment;
 import io.druid.segment.TestIndex;
 import io.druid.segment.incremental.IncrementalIndex;
+import io.druid.timeline.LogicalSegment;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
@@ -438,6 +441,80 @@ public class QueryRunnerTestHelper
     );
   }
 
+  public static <T> QueryRunner<T> makeFilteringQueryRunner(
+      final List<Segment> segments,
+      final QueryRunnerFactory<T, Query<T>> factory
+  )
+  {
+    return makeQueryRunner(
+        Lists.transform(
+            segments, new Function<Segment, LogicalWrapper>()
+            {
+              @Override
+              public LogicalWrapper apply(Segment segment)
+              {
+                return new LogicalWrapper(segment);
+              }
+            }
+        ), factory
+    );
+  }
+
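+  // Chains the full tool-chest decorations (pre-merge, merge, post-merge,
+  // finalize) around a runner that executes only the segments surviving
+  // filterSegments, merging their result sequences.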
+  private static <T> QueryRunner<T> makeQueryRunner(
+      final List<LogicalWrapper> segments,
+      final QueryRunnerFactory<T, Query<T>> factory
+  )
+  {
+    final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
+    return new FinalizeResultsQueryRunner<T>(
+        toolChest.postMergeQueryDecoration(
+            toolChest.mergeResults(
+                toolChest.preMergeQueryDecoration(
+                    new QueryRunner<T>()
+                    {
+                      @Override
+                      public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
+                      {
+                        List<Sequence<T>> sequences = Lists.newArrayList();
+                        for (LogicalWrapper segment : toolChest.filterSegments(query, segments)) {
+                          sequences.add(factory.createRunner(segment.segment).run(query, responseContext));
+                        }
+                        return new MergeSequence<>(
+                            query.getResultOrdering(),
+                            Sequences.simple(sequences)
+                        );
+                      }
+                    }
+                )
+            )
+        ),
+        toolChest
+    );
+  }
+
+  // it would be simpler if Segment implemented LogicalSegment directly
+  private static class LogicalWrapper implements LogicalSegment
+  {
+    private final Segment segment;
+
+    private LogicalWrapper(Segment segment)
+    {
+      this.segment = segment;
+    }
+
+    @Override
+    public Interval getInterval()
+    {
+      return segment.getDataInterval();
+    }
+
+    @Override
+    public String toString()
+    {
+      return segment.getIdentifier();
+    }
+  }
+
   public static IntervalChunkingQueryRunnerDecorator NoopIntervalChunkingQueryRunnerDecorator()
   {
     return new IntervalChunkingQueryRunnerDecorator(null, null, null)
     {
diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTestHelper.java
index 5e1053ed6ae..a1976bce776 100644
--- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTestHelper.java
+++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTestHelper.java
@@ -27,6 +27,7 @@ import com.metamx.common.guava.Sequences;
 import io.druid.data.input.MapBasedRow;
 import io.druid.data.input.Row;
 import io.druid.query.FinalizeResultsQueryRunner;
+import io.druid.query.Query;
 import io.druid.query.QueryRunner;
 import io.druid.query.QueryRunnerFactory;
 import io.druid.query.QueryToolChest;
@@ -38,17 +39,17 @@ import java.util.Map;
  */
 public class GroupByQueryRunnerTestHelper
 {
-  public static Iterable<Row> runQuery(QueryRunnerFactory factory, QueryRunner runner, GroupByQuery query)
+  public static <T> Iterable<T> runQuery(QueryRunnerFactory factory, QueryRunner runner, Query<T> query)
   {
     QueryToolChest toolChest = factory.getToolchest();
-    QueryRunner theRunner = new FinalizeResultsQueryRunner<>(
+    QueryRunner<T> theRunner = new FinalizeResultsQueryRunner<>(
         toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner)),
         toolChest
     );
 
-    Sequence<Row> queryResult = theRunner.run(query, Maps.newHashMap());
-    return Sequences.toList(queryResult, Lists.<Row>newArrayList());
+    Sequence<T> queryResult = theRunner.run(query, Maps.newHashMap());
+    return Sequences.toList(queryResult, Lists.<T>newArrayList());
   }
 
   public static Row createExpectedRow(final String timestamp, Object... vals)
diff --git a/processing/src/test/java/io/druid/query/select/MultiSegmentSelectQueryTest.java b/processing/src/test/java/io/druid/query/select/MultiSegmentSelectQueryTest.java
new file mode 100644
index 00000000000..fcb838c00b0
--- /dev/null
+++ b/processing/src/test/java/io/druid/query/select/MultiSegmentSelectQueryTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.select;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.CharSource;
+import com.metamx.common.guava.Sequences;
+import io.druid.granularity.QueryGranularity;
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.query.QueryRunner;
+import io.druid.query.QueryRunnerFactory;
+import io.druid.query.QueryRunnerTestHelper;
+import io.druid.query.Result;
+import io.druid.query.TableDataSource;
+import io.druid.query.dimension.DefaultDimensionSpec;
+import io.druid.segment.IncrementalIndexSegment;
+import io.druid.segment.Segment;
+import io.druid.segment.TestIndex;
+import io.druid.segment.incremental.IncrementalIndex;
+import io.druid.segment.incremental.IncrementalIndexSchema;
+import io.druid.segment.incremental.OnheapIncrementalIndex;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.NoneShardSpec;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ */
+public class MultiSegmentSelectQueryTest
+{
+  private static final SelectQueryQueryToolChest toolChest = new SelectQueryQueryToolChest(
+      new DefaultObjectMapper(),
+      QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
+  );
+
+  private static final QueryRunnerFactory factory = new SelectQueryRunnerFactory(
+      toolChest,
+      new SelectQueryEngine(),
+      QueryRunnerTestHelper.NOOP_QUERYWATCHER
+  );
+
+  private static Segment segment0;
+  private static Segment segment1;
+
+  private static QueryRunner runner;
+
+  @BeforeClass
+  public static void setup() throws IOException
+  {
+    CharSource v_0112 = CharSource.wrap(StringUtils.join(SelectQueryRunnerTest.V_0112, "\n"));
+    CharSource v_0113 = CharSource.wrap(StringUtils.join(SelectQueryRunnerTest.V_0113, "\n"));
+
+    IncrementalIndex index0 = TestIndex.loadIncrementalIndex(newIncrementalIndex("2011-01-12T00:00:00.000Z"), v_0112);
+    IncrementalIndex index1 = TestIndex.loadIncrementalIndex(newIncrementalIndex("2011-01-13T00:00:00.000Z"), v_0113);
+
+    segment0 = new IncrementalIndexSegment(index0, makeIdentifier(index0));
+    segment1 = new IncrementalIndexSegment(index1, makeIdentifier(index1));
+
+    runner = QueryRunnerTestHelper.makeFilteringQueryRunner(Arrays.asList(segment0, segment1), factory);
+  }
+
+  private static String makeIdentifier(IncrementalIndex index)
+  {
+    Interval interval = index.getInterval();
+    return DataSegment.makeDataSegmentIdentifier(
+        QueryRunnerTestHelper.dataSource,
+        interval.getStart(),
+        interval.getEnd(),
+        "v",
+        new NoneShardSpec()
+    );
+  }
+
+  private static IncrementalIndex newIncrementalIndex(String minTimeStamp)
+  {
+    return newIncrementalIndex(minTimeStamp, 10000);
+  }
+
+  private static IncrementalIndex newIncrementalIndex(String minTimeStamp, int maxRowCount)
+  {
+    final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
+        .withMinTimestamp(new DateTime(minTimeStamp).getMillis())
+        .withQueryGranularity(QueryGranularity.NONE)
+        .withMetrics(TestIndex.METRIC_AGGS)
+        .build();
+    return new OnheapIncrementalIndex(schema, maxRowCount);
+  }
+
+  @AfterClass
+  public static void clear()
+  {
+    IOUtils.closeQuietly(segment0);
+    IOUtils.closeQuietly(segment1);
+  }
+
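+  // Each expected triple is {last offset in segment0, last offset in segment1,
+  // number of events in the page}; a zero offset means that segment is not
+  // checked against the page's paging identifiers.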
+  @Test
+  public void testAllGranularity()
+  {
+    PagingSpec pagingSpec = new PagingSpec(null, 3);
+    SelectQuery query = new SelectQuery(
+        new TableDataSource(QueryRunnerTestHelper.dataSource),
+        SelectQueryRunnerTest.I_0112_0114,
+        false,
+        null,
+        QueryRunnerTestHelper.allGran,
+        DefaultDimensionSpec.toSpec(QueryRunnerTestHelper.dimensions),
+        Arrays.<String>asList(),
+        pagingSpec,
+        null
+    );
+
+    for (int[] expected : new int[][]{
+        {2, 0, 3}, {5, 0, 3}, {8, 0, 3}, {11, 0, 3}, {12, 1, 3},
+        {0, 4, 3}, {0, 7, 3}, {0, 10, 3}, {0, 12, 2}, {0, 13, 0}
+    }) {
+      List<Result<SelectResultValue>> results = Sequences.toList(
+          runner.run(query, ImmutableMap.<String, Object>of()),
+          Lists.<Result<SelectResultValue>>newArrayList()
+      );
+      Assert.assertEquals(1, results.size());
+
+      SelectResultValue value = results.get(0).getValue();
+      Map<String, Integer> pagingIdentifiers = value.getPagingIdentifiers();
+
+      if (expected[0] != 0) {
+        Assert.assertEquals(expected[0], pagingIdentifiers.get(segment0.getIdentifier()).intValue());
+      }
+      if (expected[1] != 0) {
+        Assert.assertEquals(expected[1], pagingIdentifiers.get(segment1.getIdentifier()).intValue());
+      }
+      Assert.assertEquals(expected[2], value.getEvents().size());
+
+      query = query.withPagingSpec(toNextPager(3, pagingIdentifiers));
+    }
+  }
+
+  @Test
+  public void testDayGranularity()
+  {
+    PagingSpec pagingSpec = new PagingSpec(null, 3);
+    SelectQuery query = new SelectQuery(
+        new TableDataSource(QueryRunnerTestHelper.dataSource),
+        SelectQueryRunnerTest.I_0112_0114,
+        false,
+        null,
+        QueryRunnerTestHelper.dayGran,
+        DefaultDimensionSpec.toSpec(QueryRunnerTestHelper.dimensions),
+        Arrays.<String>asList(),
+        pagingSpec,
+        null
+    );
+
+    for (int[] expected : new int[][]{
+        {2, 2, 3, 3}, {5, 5, 3, 3}, {8, 8, 3, 3}, {11, 11, 3, 3}, {12, 12, 1, 1}
+    }) {
+      List<Result<SelectResultValue>> results = Sequences.toList(
+          runner.run(query, ImmutableMap.<String, Object>of()),
+          Lists.<Result<SelectResultValue>>newArrayList()
+      );
+      Assert.assertEquals(2, results.size());
+
+      SelectResultValue value0 = results.get(0).getValue();
+      SelectResultValue value1 = results.get(1).getValue();
+
+      Map<String, Integer> pagingIdentifiers0 = value0.getPagingIdentifiers();
+      Map<String, Integer> pagingIdentifiers1 = value1.getPagingIdentifiers();
+
+      Assert.assertEquals(1, pagingIdentifiers0.size());
+      Assert.assertEquals(1, pagingIdentifiers1.size());
+
+      if (expected[0] != 0) {
+        Assert.assertEquals(expected[0], pagingIdentifiers0.get(segment0.getIdentifier()).intValue());
+      }
+      if (expected[1] != 0) {
+        Assert.assertEquals(expected[1], pagingIdentifiers1.get(segment1.getIdentifier()).intValue());
+      }
+      Assert.assertEquals(expected[2], value0.getEvents().size());
+      Assert.assertEquals(expected[3], value1.getEvents().size());
+
+      query = query.withPagingSpec(toNextPager(3, pagingIdentifiers0, pagingIdentifiers1));
+    }
+  }
+
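+  // Advances every paging identifier by one row so the next query resumes
+  // immediately after the last event already returned.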
+  @SafeVarargs
+  private final PagingSpec toNextPager(int threshold, Map<String, Integer>... pagers)
+  {
+    LinkedHashMap<String, Integer> next = Maps.newLinkedHashMap();
+    for (Map<String, Integer> pager : pagers) {
+      for (Map.Entry<String, Integer> entry : pager.entrySet()) {
+        next.put(entry.getKey(), entry.getValue() + 1);
+      }
+    }
+    return new PagingSpec(next, threshold);
+  }
+}
diff --git a/processing/src/test/java/io/druid/segment/TestIndex.java b/processing/src/test/java/io/druid/segment/TestIndex.java
index c6ecc351c29..7aa22aa238e 100644
--- a/processing/src/test/java/io/druid/segment/TestIndex.java
+++ b/processing/src/test/java/io/druid/segment/TestIndex.java
@@ -183,49 +183,66 @@ public class TestIndex
         .build();
     final IncrementalIndex retVal = new OnheapIncrementalIndex(schema, true, 10000);
 
-    final AtomicLong startTime = new AtomicLong();
-    int lineCount;
     try {
-      lineCount = source.readLines(
-          new LineProcessor<Integer>()
-          {
-            StringInputRowParser parser = new StringInputRowParser(
-                new DelimitedParseSpec(
-                    new TimestampSpec("ts", "iso", null),
-                    new DimensionsSpec(Arrays.asList(DIMENSIONS), null, null),
-                    "\t",
-                    "\u0001",
-                    Arrays.asList(COLUMNS)
-                )
-            );
-            boolean runOnce = false;
-            int lineCount = 0;
-
-            @Override
-            public boolean processLine(String line) throws IOException
-            {
-              if (!runOnce) {
-                startTime.set(System.currentTimeMillis());
-                runOnce = true;
-              }
-              retVal.add(parser.parse(line));
-
-              ++lineCount;
-              return true;
-            }
-
-            @Override
-            public Integer getResult()
-            {
-              return lineCount;
-            }
-          }
-      );
+      return loadIncrementalIndex(retVal, source);
     }
-    catch (IOException e) {
+    catch (Exception e) {
       realtimeIndex = null;
       throw Throwables.propagate(e);
     }
+  }
+
+  public static IncrementalIndex loadIncrementalIndex(
+      final IncrementalIndex retVal,
+      final CharSource source
+  ) throws IOException
+  {
+    final StringInputRowParser parser = new StringInputRowParser(
+        new DelimitedParseSpec(
+            new TimestampSpec("ts", "iso", null),
+            new DimensionsSpec(Arrays.asList(DIMENSIONS), null, null),
+            "\t",
+            "\u0001",
+            Arrays.asList(COLUMNS)
+        ),
+        "utf8"
+    );
+    return loadIncrementalIndex(retVal, source, parser);
+  }
+
+  public static IncrementalIndex loadIncrementalIndex(
+      final IncrementalIndex retVal,
+      final CharSource source,
+      final StringInputRowParser parser
+  ) throws IOException
+  {
+    final AtomicLong startTime = new AtomicLong();
+    int lineCount = source.readLines(
+        new LineProcessor<Integer>()
+        {
+          boolean runOnce = false;
+          int lineCount = 0;
+
+          @Override
+          public boolean processLine(String line) throws IOException
+          {
+            if (!runOnce) {
+              startTime.set(System.currentTimeMillis());
+              runOnce = true;
+            }
+            retVal.add(parser.parse(line));
+
+            ++lineCount;
+            return true;
+          }
+
+          @Override
+          public Integer getResult()
+          {
+            return lineCount;
+          }
+        }
+    );
 
     log.info("Loaded %,d lines in %,d millis.", lineCount, System.currentTimeMillis() - startTime.get());