diff --git a/processing/src/main/java/io/druid/query/ReportTimelineMissingIntervalQueryRunner.java b/processing/src/main/java/io/druid/query/ReportTimelineMissingIntervalQueryRunner.java deleted file mode 100644 index 2d700d00333..00000000000 --- a/processing/src/main/java/io/druid/query/ReportTimelineMissingIntervalQueryRunner.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012, 2013 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package io.druid.query; - -import com.google.common.collect.Lists; -import com.metamx.common.guava.Sequence; -import com.metamx.common.guava.Sequences; -import org.joda.time.Interval; - -import java.util.List; -import java.util.Map; - -/** - */ -public class ReportTimelineMissingIntervalQueryRunner implements QueryRunner -{ - private final Interval interval; - - public ReportTimelineMissingIntervalQueryRunner(Interval interval) - { - this.interval = interval; - } - - @Override - public Sequence run( - Query query, Map responseContext - ) - { - List missingIntervals = (List) responseContext.get(Result.MISSING_INTERVALS_KEY); - if (missingIntervals == null) { - missingIntervals = Lists.newArrayList(); - responseContext.put(Result.MISSING_INTERVALS_KEY, missingIntervals); - } - missingIntervals.add(interval); - return Sequences.empty(); - } -} diff --git a/processing/src/main/java/io/druid/query/RetryQueryRunner.java b/processing/src/main/java/io/druid/query/RetryQueryRunner.java index 17d62a06fe1..1cf8593bf63 100644 --- a/processing/src/main/java/io/druid/query/RetryQueryRunner.java +++ b/processing/src/main/java/io/druid/query/RetryQueryRunner.java @@ -21,7 +21,6 @@ package io.druid.query; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Function; import com.google.common.collect.Lists; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; @@ -29,10 +28,8 @@ import com.metamx.common.guava.Yielder; import com.metamx.common.guava.YieldingAccumulator; import com.metamx.common.guava.YieldingSequenceBase; import com.metamx.emitter.EmittingLogger; -import io.druid.query.spec.MultipleIntervalSegmentSpec; import io.druid.query.spec.MultipleSpecificSegmentSpec; import io.druid.segment.SegmentMissingException; -import org.joda.time.Interval; import java.util.List; import java.util.Map; @@ -60,10 +57,10 @@ public class RetryQueryRunner implements QueryRunner } @Override - public Sequence run(final Query query, final Map responseContext) + public Sequence run(final Query query, final Map context) { final List> listOfSequences = Lists.newArrayList(); - listOfSequences.add(baseRunner.run(query, responseContext)); + listOfSequences.add(baseRunner.run(query, context)); return new YieldingSequenceBase() { @@ -72,100 +69,48 @@ public class RetryQueryRunner implements QueryRunner OutType initValue, YieldingAccumulator accumulator ) { - // Try to find missing segments - doRetryLogic( - responseContext, - Result.MISSING_SEGMENTS_KEY, - new TypeReference>() - { - }, - new Function, Query>() - { - @Override - public Query apply(List input) - { - return query.withQuerySegmentSpec( - new MultipleSpecificSegmentSpec( - input - ) - ); - } - }, - listOfSequences - ); + final List missingSegments = getMissingSegments(context); - // Try to find missing intervals - doRetryLogic( - responseContext, - Result.MISSING_INTERVALS_KEY, - new TypeReference>() - { - }, - new Function, Query>() - { - @Override - public Query apply(List input) - { - return query.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec( - input - ) - ); - } - }, - listOfSequences - ); + if (!missingSegments.isEmpty()) { + for (int i = 0; i < config.getNumTries(); i++) { + log.info("[%,d] missing segments found. Retry attempt [%,d]", missingSegments.size(), i); + + context.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList()); + final Query retryQuery = query.withQuerySegmentSpec( + new MultipleSpecificSegmentSpec( + missingSegments + ) + ); + Sequence retrySequence = baseRunner.run(retryQuery, context); + listOfSequences.add(retrySequence); + if (getMissingSegments(context).isEmpty()) { + break; + } + } + + final List finalMissingSegs = getMissingSegments(context); + if (!config.isReturnPartialResults() && !finalMissingSegs.isEmpty()) { + throw new SegmentMissingException("No results found for segments[%s]", finalMissingSegs); + } + } return toolChest.mergeSequencesUnordered(Sequences.simple(listOfSequences)).toYielder(initValue, accumulator); } }; } - private void doRetryLogic( - final Map responseContext, - final String key, - final TypeReference> typeReference, - final Function, Query> function, - final List> listOfSequences - ) + private List getMissingSegments(final Map context) { - final List missingItems = getMissingItems(responseContext, key, typeReference); - - if (!missingItems.isEmpty()) { - for (int i = 0; i < config.getNumTries(); i++) { - log.info("[%,d] missing items found. Retry attempt [%,d]", missingItems.size(), i); - - responseContext.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList()); - final Query retryQuery = function.apply(missingItems); - Sequence retrySequence = baseRunner.run(retryQuery, responseContext); - listOfSequences.add(retrySequence); - if (getMissingItems(responseContext, key, typeReference).isEmpty()) { - break; - } - } - - final List finalMissingItems = getMissingItems(responseContext, key, typeReference); - if (!config.isReturnPartialResults() && !finalMissingItems.isEmpty()) { - throw new SegmentMissingException("No results found for items[%s]", finalMissingItems); - } - } - } - - private List getMissingItems( - final Map context, - final String key, - final TypeReference> typeReference - ) - { - final Object maybeMissing = context.get(key); - if (maybeMissing == null) { + final Object maybeMissingSegments = context.get(Result.MISSING_SEGMENTS_KEY); + if (maybeMissingSegments == null) { return Lists.newArrayList(); } return jsonMapper.convertValue( - maybeMissing, - typeReference + maybeMissingSegments, + new TypeReference>() + { + } ); } } - diff --git a/processing/src/test/java/io/druid/query/RetryQueryRunnerTest.java b/processing/src/test/java/io/druid/query/RetryQueryRunnerTest.java index 3b5e16261b7..ec3a95a4cca 100644 --- a/processing/src/test/java/io/druid/query/RetryQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/RetryQueryRunnerTest.java @@ -51,9 +51,9 @@ public class RetryQueryRunnerTest new QueryRunner>() { @Override - public Sequence> run(Query query, Map responseContext) + public Sequence> run(Query query, Map context) { - ((List) responseContext.get(Result.MISSING_SEGMENTS_KEY)).add( + ((List) context.get(Result.MISSING_SEGMENTS_KEY)).add( new SegmentDescriptor( new Interval( 178888, @@ -96,44 +96,6 @@ public class RetryQueryRunnerTest Assert.assertTrue("Should return an empty sequence as a result", ((List) actualResults).size() == 0); } - @Test - public void testRunWithMissingInterval() throws Exception - { - Map context = new MapMaker().makeMap(); - context.put(Result.MISSING_INTERVALS_KEY, Lists.newArrayList()); - RetryQueryRunner> runner = new RetryQueryRunner<>( - new ReportTimelineMissingIntervalQueryRunner>(new Interval("2013/2014")), - (QueryToolChest) new TimeseriesQueryQueryToolChest( - new QueryConfig() - ), - new RetryQueryRunnerConfig() - { - @Override - public int getNumTries() - { - return 0; - } - - @Override - public boolean isReturnPartialResults() - { - return true; - } - }, - jsonMapper - ); - - Iterable> actualResults = Sequences.toList( - runner.run(query, context), - Lists.>newArrayList() - ); - - Assert.assertTrue( - "Should have one entry in the list of missing segments", - ((List) context.get(Result.MISSING_INTERVALS_KEY)).size() == 1 - ); - Assert.assertTrue("Should return an empty sequence as a result", ((List) actualResults).size() == 0); - } @Test public void testRetry() throws Exception @@ -147,11 +109,11 @@ public class RetryQueryRunnerTest @Override public Sequence> run( Query> query, - Map responseContext + Map context ) { - if ((int) responseContext.get("count") == 0) { - ((List) responseContext.get(Result.MISSING_SEGMENTS_KEY)).add( + if ((int) context.get("count") == 0) { + ((List) context.get(Result.MISSING_SEGMENTS_KEY)).add( new SegmentDescriptor( new Interval( 178888, @@ -159,7 +121,7 @@ public class RetryQueryRunnerTest ), "test", 1 ) ); - responseContext.put("count", 1); + context.put("count", 1); return Sequences.empty(); } else { return Sequences.simple( @@ -214,11 +176,11 @@ public class RetryQueryRunnerTest @Override public Sequence> run( Query> query, - Map responseContext + Map context ) { - if ((int) responseContext.get("count") < 3) { - ((List) responseContext.get(Result.MISSING_SEGMENTS_KEY)).add( + if ((int) context.get("count") < 3) { + ((List) context.get(Result.MISSING_SEGMENTS_KEY)).add( new SegmentDescriptor( new Interval( 178888, @@ -226,7 +188,7 @@ public class RetryQueryRunnerTest ), "test", 1 ) ); - responseContext.put("count", (int) responseContext.get("count") + 1); + context.put("count", (int) context.get("count") + 1); return Sequences.empty(); } else { return Sequences.simple( @@ -280,10 +242,10 @@ public class RetryQueryRunnerTest @Override public Sequence> run( Query> query, - Map responseContext + Map context ) { - ((List) responseContext.get(Result.MISSING_SEGMENTS_KEY)).add( + ((List) context.get(Result.MISSING_SEGMENTS_KEY)).add( new SegmentDescriptor( new Interval( 178888, @@ -299,9 +261,12 @@ public class RetryQueryRunnerTest ), new RetryQueryRunnerConfig() { - public int getNumTries() { return 1; } + private int numTries = 1; + private boolean returnPartialResults = false; - public boolean returnPartialResults() { return false; } + public int getNumTries() { return numTries; } + + public boolean returnPartialResults() { return returnPartialResults; } }, jsonMapper ); @@ -316,34 +281,4 @@ public class RetryQueryRunnerTest ((List) context.get(Result.MISSING_SEGMENTS_KEY)).size() == 1 ); } - - @Test(expected = SegmentMissingException.class) - public void testIntervalMissingCausesException() throws Exception - { - Map context = new MapMaker().makeMap(); - context.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList()); - RetryQueryRunner> runner = new RetryQueryRunner<>( - new ReportTimelineMissingIntervalQueryRunner>(new Interval("2013/2014")), - (QueryToolChest) new TimeseriesQueryQueryToolChest( - new QueryConfig() - ), - new RetryQueryRunnerConfig() - { - public int getNumTries() { return 1; } - - public boolean returnPartialResults() { return false; } - }, - jsonMapper - ); - - Iterable> actualResults = Sequences.toList( - runner.run(query, context), - Lists.>newArrayList() - ); - - Assert.assertTrue( - "Should have one entry in the list of missing segments", - ((List) context.get(Result.MISSING_INTERVALS_KEY)).size() == 1 - ); - } } diff --git a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java index 4a0d04bc44d..292cad4b26e 100644 --- a/server/src/main/java/io/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java @@ -47,7 +47,6 @@ import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.query.QuerySegmentWalker; import io.druid.query.QueryToolChest; import io.druid.query.ReferenceCountingSegmentQueryRunner; -import io.druid.query.ReportTimelineMissingIntervalQueryRunner; import io.druid.query.ReportTimelineMissingSegmentQueryRunner; import io.druid.query.SegmentDescriptor; import io.druid.query.TableDataSource; @@ -265,57 +264,47 @@ public class ServerManager implements QuerySegmentWalker FunctionalIterable> queryRunners = FunctionalIterable .create(intervals) .transformCat( - new Function>>() + new Function>>() { @Override - public Iterable> apply(final Interval interval) + public Iterable> apply(Interval input) { - Iterable> holders = timeline.lookup(interval); - - if (holders == null) { - return Arrays.>asList(new ReportTimelineMissingIntervalQueryRunner(interval)); + return timeline.lookup(input); + } + } + ) + .transformCat( + new Function, Iterable>>() + { + @Override + public Iterable> apply( + @Nullable + final TimelineObjectHolder holder + ) + { + if (holder == null) { + return null; } return FunctionalIterable - .create(holders) - .transformCat( - new Function, Iterable>>() + .create(holder.getObject()) + .transform( + new Function, QueryRunner>() { @Override - public Iterable> apply( - @Nullable final TimelineObjectHolder holder - ) + public QueryRunner apply(PartitionChunk input) { - if (holder == null) { - return Arrays.>asList( - new ReportTimelineMissingIntervalQueryRunner( - interval - ) - ); - } + return buildAndDecorateQueryRunner( + factory, + toolChest, + input.getObject(), + new SegmentDescriptor( + holder.getInterval(), + holder.getVersion(), + input.getChunkNumber() + ) - return FunctionalIterable - .create(holder.getObject()) - .transform( - new Function, QueryRunner>() - { - @Override - public QueryRunner apply(PartitionChunk input) - { - return buildAndDecorateQueryRunner( - factory, - toolChest, - input.getObject(), - new SegmentDescriptor( - holder.getInterval(), - holder.getVersion(), - input.getChunkNumber() - ) - - ); - } - } - ); + ); } } ); @@ -349,7 +338,9 @@ public class ServerManager implements QuerySegmentWalker String dataSourceName = getDataSourceName(query.getDataSource()); - final VersionedIntervalTimeline timeline = dataSources.get(dataSourceName); + final VersionedIntervalTimeline timeline = dataSources.get( + dataSourceName + ); if (timeline == null) { return new NoopQueryRunner();