From 3d9d989a9f4d95b7e2e2bf0b39d7ab41548c12a5 Mon Sep 17 00:00:00 2001 From: fjy Date: Thu, 20 Nov 2014 12:04:37 -0800 Subject: [PATCH] A set of fixes to retry the query for missing intervals in the timeline --- .../io/druid/query/BySegmentQueryRunner.java | 6 +- .../query/BySegmentSkippingQueryRunner.java | 6 +- .../query/ChainedExecutionQueryRunner.java | 11 +- .../io/druid/query/ConcatQueryRunner.java | 4 +- .../query/FinalizeResultsQueryRunner.java | 4 +- .../query/GroupByParallelQueryRunner.java | 6 +- .../query/IntervalChunkingQueryRunner.java | 6 +- .../query/MetricsEmittingQueryRunner.java | 6 +- .../java/io/druid/query/NoopQueryRunner.java | 2 +- .../main/java/io/druid/query/QueryRunner.java | 2 +- .../ReferenceCountingSegmentQueryRunner.java | 4 +- ...ortTimelineMissingIntervalQueryRunner.java | 54 ++++++++ ...portTimelineMissingSegmentQueryRunner.java | 53 ++++++++ .../src/main/java/io/druid/query/Result.java | 3 + .../java/io/druid/query/RetryQueryRunner.java | 119 +++++++++++++----- .../io/druid/query/SubqueryQueryRunner.java | 6 +- .../java/io/druid/query/TimewarpOperator.java | 4 +- .../java/io/druid/query/UnionQueryRunner.java | 6 +- .../groupby/GroupByQueryQueryToolChest.java | 10 +- .../groupby/GroupByQueryRunnerFactory.java | 8 +- .../SegmentMetadataQueryRunnerFactory.java | 6 +- .../search/SearchQueryQueryToolChest.java | 6 +- .../druid/query/search/SearchQueryRunner.java | 2 +- .../select/SelectQueryRunnerFactory.java | 2 +- .../spec/SpecificSegmentQueryRunner.java | 10 +- .../TimeBoundaryQueryRunnerFactory.java | 2 +- .../TimeseriesQueryRunnerFactory.java | 2 +- .../query/topn/TopNQueryQueryToolChest.java | 6 +- .../query/topn/TopNQueryRunnerFactory.java | 2 +- .../ChainedExecutionQueryRunnerTest.java | 2 +- .../io/druid/query/RetryQueryRunnerTest.java | 118 +++++++++++++---- .../io/druid/query/TimewarpOperatorTest.java | 6 +- .../query/groupby/GroupByQueryRunnerTest.java | 12 +- .../GroupByTimeseriesQueryRunnerTest.java | 4 +- .../spec/SpecificSegmentQueryRunnerTest.java | 8 +- .../TimeBoundaryQueryRunnerTest.java | 4 +- .../TimeSeriesUnionQueryRunnerTest.java | 3 +- .../druid/client/CachingClusteredClient.java | 6 +- .../io/druid/client/CachingQueryRunner.java | 6 +- .../bridge/BridgeQuerySegmentWalker.java | 2 +- .../server/coordination/ServerManager.java | 102 ++++++++------- .../druid/client/CachingQueryRunnerTest.java | 4 +- .../coordination/ServerManagerTest.java | 4 +- 43 files changed, 438 insertions(+), 201 deletions(-) create mode 100644 processing/src/main/java/io/druid/query/ReportTimelineMissingIntervalQueryRunner.java create mode 100644 processing/src/main/java/io/druid/query/ReportTimelineMissingSegmentQueryRunner.java diff --git a/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java b/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java index c1cf7886bfc..ec780942485 100644 --- a/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java +++ b/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java @@ -49,11 +49,11 @@ public class BySegmentQueryRunner implements QueryRunner @Override @SuppressWarnings("unchecked") - public Sequence run(final Query query, Map context) + public Sequence run(final Query query, Map responseContext) { if (query.getContextBySegment(false)) { - final Sequence baseSequence = base.run(query, context); + final Sequence baseSequence = base.run(query, responseContext); final List results = Sequences.toList(baseSequence, Lists.newArrayList()); return Sequences.simple( Arrays.asList( @@ -68,6 +68,6 @@ public class BySegmentQueryRunner implements QueryRunner ) ); } - return base.run(query, context); + return base.run(query, responseContext); } } diff --git a/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java b/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java index 5f9651f5222..303abebb575 100644 --- a/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java +++ b/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java @@ -37,13 +37,13 @@ public abstract class BySegmentSkippingQueryRunner implements QueryRunner } @Override - public Sequence run(Query query, Map context) + public Sequence run(Query query, Map responseContext) { if (query.getContextBySegment(false)) { - return baseRunner.run(query, context); + return baseRunner.run(query, responseContext); } - return doRun(baseRunner, query, context); + return doRun(baseRunner, query, responseContext); } protected abstract Sequence doRun(QueryRunner baseRunner, Query query, Map context); diff --git a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java index e5edfd3d4cf..196e6e5cb39 100644 --- a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java @@ -20,7 +20,6 @@ package io.druid.query; import com.google.common.base.Function; -import com.google.common.base.Predicates; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -89,12 +88,12 @@ public class ChainedExecutionQueryRunner implements QueryRunner // since it already implements ListeningExecutorService this.exec = MoreExecutors.listeningDecorator(exec); this.ordering = ordering; - this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull())); + this.queryables = Iterables.unmodifiableIterable(queryables); this.queryWatcher = queryWatcher; } @Override - public Sequence run(final Query query, final Map context) + public Sequence run(final Query query, final Map responseContext) { final int priority = query.getContextPriority(0); @@ -125,11 +124,7 @@ public class ChainedExecutionQueryRunner implements QueryRunner public Iterable call() throws Exception { try { - if (input == null) { - throw new ISE("Input is null?! How is this possible?!"); - } - - Sequence result = input.run(query, context); + Sequence result = input.run(query, responseContext); if (result == null) { throw new ISE("Got a null result! Segments are missing!"); } diff --git a/processing/src/main/java/io/druid/query/ConcatQueryRunner.java b/processing/src/main/java/io/druid/query/ConcatQueryRunner.java index 74c4a6481f5..0870ea87cf3 100644 --- a/processing/src/main/java/io/druid/query/ConcatQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ConcatQueryRunner.java @@ -39,7 +39,7 @@ public class ConcatQueryRunner implements QueryRunner } @Override - public Sequence run(final Query query, final Map context) + public Sequence run(final Query query, final Map responseContext) { return Sequences.concat( Sequences.map( @@ -49,7 +49,7 @@ public class ConcatQueryRunner implements QueryRunner @Override public Sequence apply(final QueryRunner input) { - return input.run(query, context); + return input.run(query, responseContext); } } ) diff --git a/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java b/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java index 1794024b60c..d9c27c02dfc 100644 --- a/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java +++ b/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java @@ -49,7 +49,7 @@ public class FinalizeResultsQueryRunner implements QueryRunner } @Override - public Sequence run(final Query query, Map context) + public Sequence run(final Query query, Map responseContext) { final boolean isBySegment = query.getContextBySegment(false); final boolean shouldFinalize = query.getContextFinalize(true); @@ -102,7 +102,7 @@ public class FinalizeResultsQueryRunner implements QueryRunner return Sequences.map( - baseRunner.run(queryToRun, context), + baseRunner.run(queryToRun, responseContext), finalizerFn ); diff --git a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java index 5997518dddd..1ec7c596132 100644 --- a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java +++ b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java @@ -78,7 +78,7 @@ public class GroupByParallelQueryRunner implements QueryRunner } @Override - public Sequence run(final Query queryParam, final Map context) + public Sequence run(final Query queryParam, final Map responseContext) { final GroupByQuery query = (GroupByQuery) queryParam; final Pair> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair( @@ -111,10 +111,10 @@ public class GroupByParallelQueryRunner implements QueryRunner { try { if (bySegment) { - input.run(queryParam, context) + input.run(queryParam, responseContext) .accumulate(bySegmentAccumulatorPair.lhs, bySegmentAccumulatorPair.rhs); } else { - input.run(queryParam, context) + input.run(queryParam, responseContext) .accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs); } diff --git a/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java b/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java index 557420aa377..59ec44ba8e7 100644 --- a/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java +++ b/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java @@ -49,10 +49,10 @@ public class IntervalChunkingQueryRunner implements QueryRunner } @Override - public Sequence run(final Query query, final Map context) + public Sequence run(final Query query, final Map responseContext) { if (period.getMillis() == 0) { - return baseRunner.run(query, context); + return baseRunner.run(query, responseContext); } return Sequences.concat( @@ -76,7 +76,7 @@ public class IntervalChunkingQueryRunner implements QueryRunner { return baseRunner.run( query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(singleInterval))), - context + responseContext ); } } diff --git a/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java b/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java index 5a8005185a7..a2f6863d326 100644 --- a/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java +++ b/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java @@ -84,7 +84,7 @@ public class MetricsEmittingQueryRunner implements QueryRunner } @Override - public Sequence run(final Query query, final Map context) + public Sequence run(final Query query, final Map responseContext) { final ServiceMetricEvent.Builder builder = builderFn.apply(query); String queryId = query.getId(); @@ -102,7 +102,7 @@ public class MetricsEmittingQueryRunner implements QueryRunner long startTime = System.currentTimeMillis(); try { - retVal = queryRunner.run(query, context).accumulate(outType, accumulator); + retVal = queryRunner.run(query, responseContext).accumulate(outType, accumulator); } catch (RuntimeException e) { builder.setUser10("failed"); @@ -132,7 +132,7 @@ public class MetricsEmittingQueryRunner implements QueryRunner long startTime = System.currentTimeMillis(); try { - retVal = queryRunner.run(query, context).toYielder(initValue, accumulator); + retVal = queryRunner.run(query, responseContext).toYielder(initValue, accumulator); } catch (RuntimeException e) { builder.setUser10("failed"); diff --git a/processing/src/main/java/io/druid/query/NoopQueryRunner.java b/processing/src/main/java/io/druid/query/NoopQueryRunner.java index d2f3863ab62..63b8ffd3a4e 100644 --- a/processing/src/main/java/io/druid/query/NoopQueryRunner.java +++ b/processing/src/main/java/io/druid/query/NoopQueryRunner.java @@ -30,7 +30,7 @@ import java.util.Map; public class NoopQueryRunner implements QueryRunner { @Override - public Sequence run(Query query, Map context) + public Sequence run(Query query, Map responseContext) { return Sequences.empty(); } diff --git a/processing/src/main/java/io/druid/query/QueryRunner.java b/processing/src/main/java/io/druid/query/QueryRunner.java index d7a3f8af36f..9ed07edb915 100644 --- a/processing/src/main/java/io/druid/query/QueryRunner.java +++ b/processing/src/main/java/io/druid/query/QueryRunner.java @@ -27,5 +27,5 @@ import java.util.Map; */ public interface QueryRunner { - public Sequence run(Query query, Map context); + public Sequence run(Query query, Map responseContext); } \ No newline at end of file diff --git a/processing/src/main/java/io/druid/query/ReferenceCountingSegmentQueryRunner.java b/processing/src/main/java/io/druid/query/ReferenceCountingSegmentQueryRunner.java index 736c60f76ab..fc393ab2a0c 100644 --- a/processing/src/main/java/io/druid/query/ReferenceCountingSegmentQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ReferenceCountingSegmentQueryRunner.java @@ -45,11 +45,11 @@ public class ReferenceCountingSegmentQueryRunner implements QueryRunner } @Override - public Sequence run(final Query query, Map context) + public Sequence run(final Query query, Map responseContext) { final Closeable closeable = adapter.increment(); try { - final Sequence baseSequence = factory.createRunner(adapter).run(query, context); + final Sequence baseSequence = factory.createRunner(adapter).run(query, responseContext); return new ResourceClosingSequence(baseSequence, closeable); } diff --git a/processing/src/main/java/io/druid/query/ReportTimelineMissingIntervalQueryRunner.java b/processing/src/main/java/io/druid/query/ReportTimelineMissingIntervalQueryRunner.java new file mode 100644 index 00000000000..2d700d00333 --- /dev/null +++ b/processing/src/main/java/io/druid/query/ReportTimelineMissingIntervalQueryRunner.java @@ -0,0 +1,54 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query; + +import com.google.common.collect.Lists; +import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Sequences; +import org.joda.time.Interval; + +import java.util.List; +import java.util.Map; + +/** + */ +public class ReportTimelineMissingIntervalQueryRunner implements QueryRunner +{ + private final Interval interval; + + public ReportTimelineMissingIntervalQueryRunner(Interval interval) + { + this.interval = interval; + } + + @Override + public Sequence run( + Query query, Map responseContext + ) + { + List missingIntervals = (List) responseContext.get(Result.MISSING_INTERVALS_KEY); + if (missingIntervals == null) { + missingIntervals = Lists.newArrayList(); + responseContext.put(Result.MISSING_INTERVALS_KEY, missingIntervals); + } + missingIntervals.add(interval); + return Sequences.empty(); + } +} diff --git a/processing/src/main/java/io/druid/query/ReportTimelineMissingSegmentQueryRunner.java b/processing/src/main/java/io/druid/query/ReportTimelineMissingSegmentQueryRunner.java new file mode 100644 index 00000000000..e41419f75ae --- /dev/null +++ b/processing/src/main/java/io/druid/query/ReportTimelineMissingSegmentQueryRunner.java @@ -0,0 +1,53 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query; + +import com.google.common.collect.Lists; +import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Sequences; + +import java.util.List; +import java.util.Map; + +/** + */ +public class ReportTimelineMissingSegmentQueryRunner implements QueryRunner +{ + private final SegmentDescriptor descriptor; + + public ReportTimelineMissingSegmentQueryRunner(SegmentDescriptor descriptor) + { + this.descriptor = descriptor; + } + + @Override + public Sequence run( + Query query, Map responseContext + ) + { + List missingSegments = (List) responseContext.get(Result.MISSING_SEGMENTS_KEY); + if (missingSegments == null) { + missingSegments = Lists.newArrayList(); + responseContext.put(Result.MISSING_SEGMENTS_KEY, missingSegments); + } + missingSegments.add(descriptor); + return Sequences.empty(); + } +} diff --git a/processing/src/main/java/io/druid/query/Result.java b/processing/src/main/java/io/druid/query/Result.java index bb299d51c78..c1625620c18 100644 --- a/processing/src/main/java/io/druid/query/Result.java +++ b/processing/src/main/java/io/druid/query/Result.java @@ -27,6 +27,9 @@ import org.joda.time.DateTime; */ public class Result implements Comparable> { + public static String MISSING_SEGMENTS_KEY = "missingSegments"; + public static String MISSING_INTERVALS_KEY = "missingIntervals"; + private final DateTime timestamp; private final T value; diff --git a/processing/src/main/java/io/druid/query/RetryQueryRunner.java b/processing/src/main/java/io/druid/query/RetryQueryRunner.java index 9f6fd5d474a..17d62a06fe1 100644 --- a/processing/src/main/java/io/druid/query/RetryQueryRunner.java +++ b/processing/src/main/java/io/druid/query/RetryQueryRunner.java @@ -21,6 +21,7 @@ package io.druid.query; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; import com.google.common.collect.Lists; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; @@ -28,15 +29,16 @@ import com.metamx.common.guava.Yielder; import com.metamx.common.guava.YieldingAccumulator; import com.metamx.common.guava.YieldingSequenceBase; import com.metamx.emitter.EmittingLogger; +import io.druid.query.spec.MultipleIntervalSegmentSpec; import io.druid.query.spec.MultipleSpecificSegmentSpec; import io.druid.segment.SegmentMissingException; +import org.joda.time.Interval; import java.util.List; import java.util.Map; public class RetryQueryRunner implements QueryRunner { - public static String MISSING_SEGMENTS_KEY = "missingSegments"; private static final EmittingLogger log = new EmittingLogger(RetryQueryRunner.class); private final QueryRunner baseRunner; @@ -58,10 +60,10 @@ public class RetryQueryRunner implements QueryRunner } @Override - public Sequence run(final Query query, final Map context) + public Sequence run(final Query query, final Map responseContext) { final List> listOfSequences = Lists.newArrayList(); - listOfSequences.add(baseRunner.run(query, context)); + listOfSequences.add(baseRunner.run(query, responseContext)); return new YieldingSequenceBase() { @@ -70,48 +72,99 @@ public class RetryQueryRunner implements QueryRunner OutType initValue, YieldingAccumulator accumulator ) { - final List missingSegments = getMissingSegments(context); + // Try to find missing segments + doRetryLogic( + responseContext, + Result.MISSING_SEGMENTS_KEY, + new TypeReference>() + { + }, + new Function, Query>() + { + @Override + public Query apply(List input) + { + return query.withQuerySegmentSpec( + new MultipleSpecificSegmentSpec( + input + ) + ); + } + }, + listOfSequences + ); - if (!missingSegments.isEmpty()) { - for (int i = 0; i < config.getNumTries(); i++) { - log.info("[%,d] missing segments found. Retry attempt [%,d]", missingSegments.size(), i); - - context.put(MISSING_SEGMENTS_KEY, Lists.newArrayList()); - final Query retryQuery = query.withQuerySegmentSpec( - new MultipleSpecificSegmentSpec( - missingSegments - ) - ); - Sequence retrySequence = baseRunner.run(retryQuery, context); - listOfSequences.add(retrySequence); - if (getMissingSegments(context).isEmpty()) { - break; - } - } - - final List finalMissingSegs = getMissingSegments(context); - if (!config.isReturnPartialResults() && !finalMissingSegs.isEmpty()) { - throw new SegmentMissingException("No results found for segments[%s]", finalMissingSegs); - } - } + // Try to find missing intervals + doRetryLogic( + responseContext, + Result.MISSING_INTERVALS_KEY, + new TypeReference>() + { + }, + new Function, Query>() + { + @Override + public Query apply(List input) + { + return query.withQuerySegmentSpec( + new MultipleIntervalSegmentSpec( + input + ) + ); + } + }, + listOfSequences + ); return toolChest.mergeSequencesUnordered(Sequences.simple(listOfSequences)).toYielder(initValue, accumulator); } }; } - private List getMissingSegments(final Map context) + private void doRetryLogic( + final Map responseContext, + final String key, + final TypeReference> typeReference, + final Function, Query> function, + final List> listOfSequences + ) { - final Object maybeMissingSegments = context.get(MISSING_SEGMENTS_KEY); - if (maybeMissingSegments == null) { + final List missingItems = getMissingItems(responseContext, key, typeReference); + + if (!missingItems.isEmpty()) { + for (int i = 0; i < config.getNumTries(); i++) { + log.info("[%,d] missing items found. Retry attempt [%,d]", missingItems.size(), i); + + responseContext.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList()); + final Query retryQuery = function.apply(missingItems); + Sequence retrySequence = baseRunner.run(retryQuery, responseContext); + listOfSequences.add(retrySequence); + if (getMissingItems(responseContext, key, typeReference).isEmpty()) { + break; + } + } + + final List finalMissingItems = getMissingItems(responseContext, key, typeReference); + if (!config.isReturnPartialResults() && !finalMissingItems.isEmpty()) { + throw new SegmentMissingException("No results found for items[%s]", finalMissingItems); + } + } + } + + private List getMissingItems( + final Map context, + final String key, + final TypeReference> typeReference + ) + { + final Object maybeMissing = context.get(key); + if (maybeMissing == null) { return Lists.newArrayList(); } return jsonMapper.convertValue( - maybeMissingSegments, - new TypeReference>() - { - } + maybeMissing, + typeReference ); } } diff --git a/processing/src/main/java/io/druid/query/SubqueryQueryRunner.java b/processing/src/main/java/io/druid/query/SubqueryQueryRunner.java index d16a660e25a..983779cf073 100644 --- a/processing/src/main/java/io/druid/query/SubqueryQueryRunner.java +++ b/processing/src/main/java/io/druid/query/SubqueryQueryRunner.java @@ -39,13 +39,13 @@ public class SubqueryQueryRunner implements QueryRunner } @Override - public Sequence run(final Query query, Map context) + public Sequence run(final Query query, Map responseContext) { DataSource dataSource = query.getDataSource(); if (dataSource instanceof QueryDataSource) { - return run((Query) ((QueryDataSource) dataSource).getQuery(), context); + return run((Query) ((QueryDataSource) dataSource).getQuery(), responseContext); } else { - return baseRunner.run(query, context); + return baseRunner.run(query, responseContext); } } } diff --git a/processing/src/main/java/io/druid/query/TimewarpOperator.java b/processing/src/main/java/io/druid/query/TimewarpOperator.java index 2f26f6890f1..ce3fff5fcda 100644 --- a/processing/src/main/java/io/druid/query/TimewarpOperator.java +++ b/processing/src/main/java/io/druid/query/TimewarpOperator.java @@ -81,7 +81,7 @@ public class TimewarpOperator implements PostProcessingOperator return new QueryRunner() { @Override - public Sequence run(final Query query, final Map context) + public Sequence run(final Query query, final Map responseContext) { final long offset = computeOffset(now); @@ -93,7 +93,7 @@ public class TimewarpOperator implements PostProcessingOperator return Sequences.map( baseRunner.run( query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(modifiedInterval))), - context + responseContext ), new Function() { diff --git a/processing/src/main/java/io/druid/query/UnionQueryRunner.java b/processing/src/main/java/io/druid/query/UnionQueryRunner.java index 2426bda9310..42b65d48da0 100644 --- a/processing/src/main/java/io/druid/query/UnionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/UnionQueryRunner.java @@ -44,7 +44,7 @@ public class UnionQueryRunner implements QueryRunner } @Override - public Sequence run(final Query query, final Map context) + public Sequence run(final Query query, final Map responseContext) { DataSource dataSource = query.getDataSource(); if (dataSource instanceof UnionDataSource) { @@ -59,7 +59,7 @@ public class UnionQueryRunner implements QueryRunner { return baseRunner.run( query.withDataSource(singleSource), - context + responseContext ); } } @@ -67,7 +67,7 @@ public class UnionQueryRunner implements QueryRunner ) ); } else { - return baseRunner.run(query, context); + return baseRunner.run(query, responseContext); } } diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 790d83482c3..e35c876971d 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -111,16 +111,18 @@ public class GroupByQueryQueryToolChest extends QueryToolChest() { @Override - public Sequence run(Query input, Map context) + public Sequence run(Query input, Map responseContext) { if (input.getContextBySegment(false)) { - return runner.run(input, context); + return runner.run(input, responseContext); } if (Boolean.valueOf(input.getContextValue(GROUP_BY_MERGE_KEY, "true"))) { - return mergeGroupByResults(((GroupByQuery) input).withOverriddenContext(NO_MERGE_CONTEXT), runner, context); + return mergeGroupByResults(((GroupByQuery) input).withOverriddenContext(NO_MERGE_CONTEXT), runner, + responseContext + ); } - return runner.run(input, context); + return runner.run(input, responseContext); } }; } diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java index cd86d16a92b..72cdcb27b73 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java @@ -108,7 +108,7 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory() { @Override - public Sequence run(final Query query, final Map context) + public Sequence run(final Query query, final Map responseContext) { final GroupByQuery queryParam = (GroupByQuery) query; final Pair> indexAccumulatorPair = GroupByQueryHelper @@ -128,13 +128,13 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory run(Query input, Map context) + public Sequence run(Query input, Map responseContext) { if (!(input instanceof GroupByQuery)) { throw new ISE("Got a [%s] which isn't a %s", input.getClass(), GroupByQuery.class); diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java index ac46c49d2a2..c0efdcdd502 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java @@ -77,7 +77,7 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory() { @Override - public Sequence run(Query inQ, Map context) + public Sequence run(Query inQ, Map responseContext) { SegmentMetadataQuery query = (SegmentMetadataQuery) inQ; @@ -138,7 +138,7 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory run( final Query query, - final Map context + final Map responseContext ) { final int priority = query.getContextPriority(0); @@ -148,7 +148,7 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory call() throws Exception { - return input.run(query, context); + return input.run(query, responseContext); } } ); diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java index 79d9686cee8..f4c38c87507 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java @@ -280,7 +280,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest> run( Query> input, - Map context + Map responseContext ) { if (!(input instanceof SearchQuery)) { @@ -289,13 +289,13 @@ public class SearchQueryQueryToolChest extends QueryToolChest, Result>() { @Override diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryRunner.java b/processing/src/main/java/io/druid/query/search/SearchQueryRunner.java index ec2c576e0e3..2b91847da57 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryRunner.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryRunner.java @@ -73,7 +73,7 @@ public class SearchQueryRunner implements QueryRunner> @Override public Sequence> run( final Query> input, - Map context + Map responseContext ) { if (!(input instanceof SearchQuery)) { diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java index 5210a56ae6a..7efb8d4e2df 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java @@ -92,7 +92,7 @@ public class SelectQueryRunnerFactory @Override public Sequence> run( Query> input, - Map context + Map responseContext ) { if (!(input instanceof SelectQuery)) { diff --git a/processing/src/main/java/io/druid/query/spec/SpecificSegmentQueryRunner.java b/processing/src/main/java/io/druid/query/spec/SpecificSegmentQueryRunner.java index b8b68f86895..65eb5e4d4e4 100644 --- a/processing/src/main/java/io/druid/query/spec/SpecificSegmentQueryRunner.java +++ b/processing/src/main/java/io/druid/query/spec/SpecificSegmentQueryRunner.java @@ -27,7 +27,7 @@ import com.metamx.common.guava.Yielder; import com.metamx.common.guava.YieldingAccumulator; import io.druid.query.Query; import io.druid.query.QueryRunner; -import io.druid.query.RetryQueryRunner; +import io.druid.query.Result; import io.druid.query.SegmentDescriptor; import io.druid.segment.SegmentMissingException; @@ -53,7 +53,7 @@ public class SpecificSegmentQueryRunner implements QueryRunner } @Override - public Sequence run(final Query input, final Map context) + public Sequence run(final Query input, final Map responseContext) { final Query query = input.withQuerySegmentSpec(specificSpec); @@ -67,7 +67,7 @@ public class SpecificSegmentQueryRunner implements QueryRunner @Override public Sequence call() throws Exception { - return base.run(query, context); + return base.run(query, responseContext); } } ); @@ -87,10 +87,10 @@ public class SpecificSegmentQueryRunner implements QueryRunner return baseSequence.accumulate(initValue, accumulator); } catch (SegmentMissingException e) { - List missingSegments = (List) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY); + List missingSegments = (List) responseContext.get(Result.MISSING_SEGMENTS_KEY); if (missingSegments == null) { missingSegments = Lists.newArrayList(); - context.put(RetryQueryRunner.MISSING_SEGMENTS_KEY, missingSegments); + responseContext.put(Result.MISSING_SEGMENTS_KEY, missingSegments); } missingSegments.add(specificSpec.getDescriptor()); return initValue; diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java index 7c302706b34..af6bd529c44 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java @@ -87,7 +87,7 @@ public class TimeBoundaryQueryRunnerFactory @Override public Sequence> run( Query> input, - Map context + Map responseContext ) { if (!(input instanceof TimeBoundaryQuery)) { diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java index 3da84f7ff24..322e779c385 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java @@ -93,7 +93,7 @@ public class TimeseriesQueryRunnerFactory @Override public Sequence> run( Query> input, - Map context + Map responseContext ) { if (!(input instanceof TimeseriesQuery)) { diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index b52a1c127d4..39244d090c0 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -414,7 +414,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest> run( Query> input, - Map context + Map responseContext ) { if (!(input instanceof TopNQuery)) { @@ -423,13 +423,13 @@ public class TopNQueryQueryToolChest extends QueryToolChest minTopNThreshold) { - return runner.run(query, context); + return runner.run(query, responseContext); } final boolean isBySegment = query.getContextBySegment(false); return Sequences.map( - runner.run(query.withThreshold(minTopNThreshold), context), + runner.run(query.withThreshold(minTopNThreshold), responseContext), new Function, Result>() { @Override diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java index 54d5286254b..d8e096f457d 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java @@ -67,7 +67,7 @@ public class TopNQueryRunnerFactory implements QueryRunnerFactory> run( Query> input, - Map context + Map responseContext ) { if (!(input instanceof TopNQuery)) { diff --git a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java index 489c36e799f..9ba455eaed8 100644 --- a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java @@ -279,7 +279,7 @@ public class ChainedExecutionQueryRunnerTest } @Override - public Sequence run(Query query, Map context) + public Sequence run(Query query, Map responseContext) { hasStarted = true; start.countDown(); diff --git a/processing/src/test/java/io/druid/query/RetryQueryRunnerTest.java b/processing/src/test/java/io/druid/query/RetryQueryRunnerTest.java index f79e9279026..3b5e16261b7 100644 --- a/processing/src/test/java/io/druid/query/RetryQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/RetryQueryRunnerTest.java @@ -46,14 +46,14 @@ public class RetryQueryRunnerTest public void testRunWithMissingSegments() throws Exception { Map context = new MapMaker().makeMap(); - context.put(RetryQueryRunner.MISSING_SEGMENTS_KEY, Lists.newArrayList()); + context.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList()); RetryQueryRunner> runner = new RetryQueryRunner<>( new QueryRunner>() { @Override - public Sequence> run(Query query, Map context) + public Sequence> run(Query query, Map responseContext) { - ((List) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY)).add( + ((List) responseContext.get(Result.MISSING_SEGMENTS_KEY)).add( new SegmentDescriptor( new Interval( 178888, @@ -70,7 +70,8 @@ public class RetryQueryRunnerTest new RetryQueryRunnerConfig() { @Override - public int getNumTries() { + public int getNumTries() + { return 0; } @@ -90,29 +91,67 @@ public class RetryQueryRunnerTest Assert.assertTrue( "Should have one entry in the list of missing segments", - ((List) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY)).size() == 1 + ((List) context.get(Result.MISSING_SEGMENTS_KEY)).size() == 1 ); Assert.assertTrue("Should return an empty sequence as a result", ((List) actualResults).size() == 0); } + @Test + public void testRunWithMissingInterval() throws Exception + { + Map context = new MapMaker().makeMap(); + context.put(Result.MISSING_INTERVALS_KEY, Lists.newArrayList()); + RetryQueryRunner> runner = new RetryQueryRunner<>( + new ReportTimelineMissingIntervalQueryRunner>(new Interval("2013/2014")), + (QueryToolChest) new TimeseriesQueryQueryToolChest( + new QueryConfig() + ), + new RetryQueryRunnerConfig() + { + @Override + public int getNumTries() + { + return 0; + } + + @Override + public boolean isReturnPartialResults() + { + return true; + } + }, + jsonMapper + ); + + Iterable> actualResults = Sequences.toList( + runner.run(query, context), + Lists.>newArrayList() + ); + + Assert.assertTrue( + "Should have one entry in the list of missing segments", + ((List) context.get(Result.MISSING_INTERVALS_KEY)).size() == 1 + ); + Assert.assertTrue("Should return an empty sequence as a result", ((List) actualResults).size() == 0); + } @Test public void testRetry() throws Exception { Map context = new MapMaker().makeMap(); context.put("count", 0); - context.put(RetryQueryRunner.MISSING_SEGMENTS_KEY, Lists.newArrayList()); + context.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList()); RetryQueryRunner> runner = new RetryQueryRunner<>( new QueryRunner>() { @Override public Sequence> run( Query> query, - Map context + Map responseContext ) { - if ((int) context.get("count") == 0) { - ((List) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY)).add( + if ((int) responseContext.get("count") == 0) { + ((List) responseContext.get(Result.MISSING_SEGMENTS_KEY)).add( new SegmentDescriptor( new Interval( 178888, @@ -120,7 +159,7 @@ public class RetryQueryRunnerTest ), "test", 1 ) ); - context.put("count", 1); + responseContext.put("count", 1); return Sequences.empty(); } else { return Sequences.simple( @@ -159,7 +198,7 @@ public class RetryQueryRunnerTest Assert.assertTrue("Should return a list with one element", ((List) actualResults).size() == 1); Assert.assertTrue( "Should have nothing in missingSegment list", - ((List) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY)).size() == 0 + ((List) context.get(Result.MISSING_SEGMENTS_KEY)).size() == 0 ); } @@ -168,18 +207,18 @@ public class RetryQueryRunnerTest { Map context = new MapMaker().makeMap(); context.put("count", 0); - context.put(RetryQueryRunner.MISSING_SEGMENTS_KEY, Lists.newArrayList()); + context.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList()); RetryQueryRunner> runner = new RetryQueryRunner<>( new QueryRunner>() { @Override public Sequence> run( Query> query, - Map context + Map responseContext ) { - if ((int) context.get("count") < 3) { - ((List) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY)).add( + if ((int) responseContext.get("count") < 3) { + ((List) responseContext.get(Result.MISSING_SEGMENTS_KEY)).add( new SegmentDescriptor( new Interval( 178888, @@ -187,7 +226,7 @@ public class RetryQueryRunnerTest ), "test", 1 ) ); - context.put("count", (int) context.get("count") + 1); + responseContext.put("count", (int) responseContext.get("count") + 1); return Sequences.empty(); } else { return Sequences.simple( @@ -226,7 +265,7 @@ public class RetryQueryRunnerTest Assert.assertTrue("Should return a list with one element", ((List) actualResults).size() == 1); Assert.assertTrue( "Should have nothing in missingSegment list", - ((List) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY)).size() == 0 + ((List) context.get(Result.MISSING_SEGMENTS_KEY)).size() == 0 ); } @@ -234,17 +273,17 @@ public class RetryQueryRunnerTest public void testException() throws Exception { Map context = new MapMaker().makeMap(); - context.put(RetryQueryRunner.MISSING_SEGMENTS_KEY, Lists.newArrayList()); + context.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList()); RetryQueryRunner> runner = new RetryQueryRunner<>( new QueryRunner>() { @Override public Sequence> run( Query> query, - Map context + Map responseContext ) { - ((List) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY)).add( + ((List) responseContext.get(Result.MISSING_SEGMENTS_KEY)).add( new SegmentDescriptor( new Interval( 178888, @@ -260,12 +299,9 @@ public class RetryQueryRunnerTest ), new RetryQueryRunnerConfig() { - private int numTries = 1; - private boolean returnPartialResults = false; + public int getNumTries() { return 1; } - public int getNumTries() { return numTries; } - - public boolean returnPartialResults() { return returnPartialResults; } + public boolean returnPartialResults() { return false; } }, jsonMapper ); @@ -277,7 +313,37 @@ public class RetryQueryRunnerTest Assert.assertTrue( "Should have one entry in the list of missing segments", - ((List) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY)).size() == 1 + ((List) context.get(Result.MISSING_SEGMENTS_KEY)).size() == 1 + ); + } + + @Test(expected = SegmentMissingException.class) + public void testIntervalMissingCausesException() throws Exception + { + Map context = new MapMaker().makeMap(); + context.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList()); + RetryQueryRunner> runner = new RetryQueryRunner<>( + new ReportTimelineMissingIntervalQueryRunner>(new Interval("2013/2014")), + (QueryToolChest) new TimeseriesQueryQueryToolChest( + new QueryConfig() + ), + new RetryQueryRunnerConfig() + { + public int getNumTries() { return 1; } + + public boolean returnPartialResults() { return false; } + }, + jsonMapper + ); + + Iterable> actualResults = Sequences.toList( + runner.run(query, context), + Lists.>newArrayList() + ); + + Assert.assertTrue( + "Should have one entry in the list of missing segments", + ((List) context.get(Result.MISSING_INTERVALS_KEY)).size() == 1 ); } } diff --git a/processing/src/test/java/io/druid/query/TimewarpOperatorTest.java b/processing/src/test/java/io/druid/query/TimewarpOperatorTest.java index 73f5c39dcb6..d183e7dd716 100644 --- a/processing/src/test/java/io/druid/query/TimewarpOperatorTest.java +++ b/processing/src/test/java/io/druid/query/TimewarpOperatorTest.java @@ -82,7 +82,7 @@ public class TimewarpOperatorTest @Override public Sequence> run( Query> query, - Map context + Map responseContext ) { return Sequences.simple( @@ -144,7 +144,7 @@ public class TimewarpOperatorTest @Override public Sequence> run( Query> query, - Map context + Map responseContext ) { return Sequences.simple( @@ -194,7 +194,7 @@ public class TimewarpOperatorTest @Override public Sequence> run( Query> query, - Map context + Map responseContext ) { return Sequences.simple( diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index e4bca779dbc..24e586a5d20 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -556,7 +556,7 @@ public class GroupByQueryRunnerTest { @Override public Sequence run( - Query query, Map context + Query query, Map responseContext ) { // simulate two daily segments @@ -566,7 +566,7 @@ public class GroupByQueryRunnerTest final Query query2 = query.withQuerySegmentSpec( new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04"))) ); - return Sequences.concat(runner.run(query1, context), runner.run(query2, context)); + return Sequences.concat(runner.run(query1, responseContext), runner.run(query2, responseContext)); } } ); @@ -752,7 +752,7 @@ public class GroupByQueryRunnerTest { @Override public Sequence run( - Query query, Map context + Query query, Map responseContext ) { // simulate two daily segments @@ -762,7 +762,7 @@ public class GroupByQueryRunnerTest final Query query2 = query.withQuerySegmentSpec( new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04"))) ); - return Sequences.concat(runner.run(query1, context), runner.run(query2, context)); + return Sequences.concat(runner.run(query1, responseContext), runner.run(query2, responseContext)); } } ); @@ -1109,7 +1109,7 @@ public class GroupByQueryRunnerTest { @Override public Sequence run( - Query query, Map context + Query query, Map responseContext ) { // simulate two daily segments @@ -1119,7 +1119,7 @@ public class GroupByQueryRunnerTest final Query query2 = query.withQuerySegmentSpec( new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04"))) ); - return Sequences.concat(runner.run(query1, context), runner.run(query2, context)); + return Sequences.concat(runner.run(query1, responseContext), runner.run(query2, responseContext)); } } ); diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java index 78c936e0a57..edd2a719381 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java @@ -95,7 +95,7 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest QueryRunner timeseriesRunner = new QueryRunner() { @Override - public Sequence run(Query query, Map metadata) + public Sequence run(Query query, Map responseContext) { TimeseriesQuery tsQuery = (TimeseriesQuery) query; @@ -109,7 +109,7 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest .setAggregatorSpecs(tsQuery.getAggregatorSpecs()) .setPostAggregatorSpecs(tsQuery.getPostAggregatorSpecs()) .build(), - metadata + responseContext ), new Function>() { diff --git a/processing/src/test/java/io/druid/query/spec/SpecificSegmentQueryRunnerTest.java b/processing/src/test/java/io/druid/query/spec/SpecificSegmentQueryRunnerTest.java index 01127e06876..afd9e87d307 100644 --- a/processing/src/test/java/io/druid/query/spec/SpecificSegmentQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/spec/SpecificSegmentQueryRunnerTest.java @@ -69,7 +69,7 @@ public class SpecificSegmentQueryRunnerTest new QueryRunner() { @Override - public Sequence run(Query query, Map context) + public Sequence run(Query query, Map responseContext) { return new Sequence() { @@ -112,7 +112,7 @@ public class SpecificSegmentQueryRunnerTest ); Sequences.toList(results, Lists.newArrayList()); - Object missingSegments = responseContext.get(RetryQueryRunner.MISSING_SEGMENTS_KEY); + Object missingSegments = responseContext.get(Result.MISSING_SEGMENTS_KEY); Assert.assertTrue(missingSegments != null); Assert.assertTrue(missingSegments instanceof List); @@ -149,7 +149,7 @@ public class SpecificSegmentQueryRunnerTest new QueryRunner() { @Override - public Sequence run(Query query, Map context) + public Sequence run(Query query, Map responseContext) { return Sequences.withEffect( Sequences.simple(Arrays.asList(value)), @@ -196,7 +196,7 @@ public class SpecificSegmentQueryRunnerTest Assert.assertTrue(1L == theVal.getValue().getLongMetric("rows")); - Object missingSegments = responseContext.get(RetryQueryRunner.MISSING_SEGMENTS_KEY); + Object missingSegments = responseContext.get(Result.MISSING_SEGMENTS_KEY); Assert.assertTrue(missingSegments != null); Assert.assertTrue(missingSegments instanceof List); diff --git a/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java index ba93287b828..156adc1b7aa 100644 --- a/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java @@ -93,7 +93,7 @@ public class TimeBoundaryQueryRunnerTest .bound(TimeBoundaryQuery.MAX_TIME) .build(); Map context = new MapMaker().makeMap(); - context.put(RetryQueryRunner.MISSING_SEGMENTS_KEY, Lists.newArrayList()); + context.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList()); Iterable> results = Sequences.toList( runner.run(timeBoundaryQuery, context), Lists.>newArrayList() @@ -115,7 +115,7 @@ public class TimeBoundaryQueryRunnerTest .bound(TimeBoundaryQuery.MIN_TIME) .build(); Map context = new MapMaker().makeMap(); - context.put(RetryQueryRunner.MISSING_SEGMENTS_KEY, Lists.newArrayList()); + context.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList()); Iterable> results = Sequences.toList( runner.run(timeBoundaryQuery, context), Lists.>newArrayList() diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java index 57b06a31ff5..146a4b6dbe0 100644 --- a/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java @@ -146,7 +146,8 @@ public class TimeSeriesUnionQueryRunnerTest { @Override public Sequence> run(Query> query, - Map context) + Map responseContext + ) { if (query.getDataSource().equals(new TableDataSource("ds1"))) { return Sequences.simple( diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index 8a43cf6ddfb..45b0ca0c25d 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -117,7 +117,7 @@ public class CachingClusteredClient implements QueryRunner } @Override - public Sequence run(final Query query, final Map context) + public Sequence run(final Query query, final Map responseContext) { final QueryToolChest> toolChest = warehouse.getToolChest(query); final CacheStrategy> strategy = toolChest.getCacheStrategy(query); @@ -319,13 +319,13 @@ public class CachingClusteredClient implements QueryRunner List intervals = segmentSpec.getIntervals(); if (!server.isAssignable() || !populateCache || isBySegment) { - resultSeqToAdd = clientQueryable.run(query.withQuerySegmentSpec(segmentSpec), context); + resultSeqToAdd = clientQueryable.run(query.withQuerySegmentSpec(segmentSpec), responseContext); } else { // this could be more efficient, since we only need to reorder results // for batches of segments with the same segment start time. resultSeqToAdd = toolChest.mergeSequencesUnordered( Sequences.map( - clientQueryable.run(rewrittenQuery.withQuerySegmentSpec(segmentSpec), context), + clientQueryable.run(rewrittenQuery.withQuerySegmentSpec(segmentSpec), responseContext), new Function>() { private final Function cacheFn = strategy.prepareForCache(); diff --git a/server/src/main/java/io/druid/client/CachingQueryRunner.java b/server/src/main/java/io/druid/client/CachingQueryRunner.java index f76dcbb9dd2..cb0616d98d3 100644 --- a/server/src/main/java/io/druid/client/CachingQueryRunner.java +++ b/server/src/main/java/io/druid/client/CachingQueryRunner.java @@ -73,7 +73,7 @@ public class CachingQueryRunner implements QueryRunner } @Override - public Sequence run(Query query, Map context) + public Sequence run(Query query, Map responseContext) { final CacheStrategy strategy = toolChest.getCacheStrategy(query); @@ -143,7 +143,7 @@ public class CachingQueryRunner implements QueryRunner return Sequences.withEffect( Sequences.map( - base.run(query, context), + base.run(query, responseContext), new Function() { @Override @@ -165,7 +165,7 @@ public class CachingQueryRunner implements QueryRunner MoreExecutors.sameThreadExecutor() ); } else { - return base.run(query, context); + return base.run(query, responseContext); } } diff --git a/server/src/main/java/io/druid/server/bridge/BridgeQuerySegmentWalker.java b/server/src/main/java/io/druid/server/bridge/BridgeQuerySegmentWalker.java index 4f260002ccb..ee17781d5ac 100644 --- a/server/src/main/java/io/druid/server/bridge/BridgeQuerySegmentWalker.java +++ b/server/src/main/java/io/druid/server/bridge/BridgeQuerySegmentWalker.java @@ -87,7 +87,7 @@ public class BridgeQuerySegmentWalker implements QuerySegmentWalker return new QueryRunner() { @Override - public Sequence run(Query query, Map context) + public Sequence run(Query query, Map responseContext) { try { Server instance = brokerSelector.pick(); diff --git a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java index f3d22df8fe6..4a0d04bc44d 100644 --- a/server/src/main/java/io/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java @@ -21,7 +21,6 @@ package io.druid.server.coordination; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; -import com.google.common.base.Predicates; import com.google.common.collect.Iterables; import com.google.common.collect.Ordering; import com.google.inject.Inject; @@ -48,6 +47,8 @@ import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.query.QuerySegmentWalker; import io.druid.query.QueryToolChest; import io.druid.query.ReferenceCountingSegmentQueryRunner; +import io.druid.query.ReportTimelineMissingIntervalQueryRunner; +import io.druid.query.ReportTimelineMissingSegmentQueryRunner; import io.druid.query.SegmentDescriptor; import io.druid.query.TableDataSource; import io.druid.query.spec.SpecificSegmentQueryRunner; @@ -261,62 +262,71 @@ public class ServerManager implements QuerySegmentWalker return new NoopQueryRunner(); } - FunctionalIterable> adapters = FunctionalIterable + FunctionalIterable> queryRunners = FunctionalIterable .create(intervals) .transformCat( - new Function>>() + new Function>>() { @Override - public Iterable> apply(Interval input) + public Iterable> apply(final Interval interval) { - return timeline.lookup(input); - } - } - ) - .transformCat( - new Function, Iterable>>() - { - @Override - public Iterable> apply( - @Nullable final TimelineObjectHolder holder - ) - { - if (holder == null) { - return null; + Iterable> holders = timeline.lookup(interval); + + if (holders == null) { + return Arrays.>asList(new ReportTimelineMissingIntervalQueryRunner(interval)); } return FunctionalIterable - .create(holder.getObject()) - .transform( - new Function, QueryRunner>() + .create(holders) + .transformCat( + new Function, Iterable>>() { @Override - public QueryRunner apply(PartitionChunk input) + public Iterable> apply( + @Nullable final TimelineObjectHolder holder + ) { - return buildAndDecorateQueryRunner( - factory, - toolChest, - input.getObject(), - new SegmentDescriptor( - holder.getInterval(), - holder.getVersion(), - input.getChunkNumber() - ) + if (holder == null) { + return Arrays.>asList( + new ReportTimelineMissingIntervalQueryRunner( + interval + ) + ); + } - ); + return FunctionalIterable + .create(holder.getObject()) + .transform( + new Function, QueryRunner>() + { + @Override + public QueryRunner apply(PartitionChunk input) + { + return buildAndDecorateQueryRunner( + factory, + toolChest, + input.getObject(), + new SegmentDescriptor( + holder.getInterval(), + holder.getVersion(), + input.getChunkNumber() + ) + + ); + } + } + ); } } - ) - .filter(Predicates.>notNull()); + ); } } - ) - .filter( - Predicates.>notNull() ); - - return new FinalizeResultsQueryRunner(toolChest.mergeResults(factory.mergeRunners(exec, adapters)), toolChest); + return new FinalizeResultsQueryRunner( + toolChest.mergeResults(factory.mergeRunners(exec, queryRunners)), + toolChest + ); } private String getDataSourceName(DataSource dataSource) @@ -345,7 +355,7 @@ public class ServerManager implements QuerySegmentWalker return new NoopQueryRunner(); } - FunctionalIterable> adapters = FunctionalIterable + FunctionalIterable> queryRunners = FunctionalIterable .create(specs) .transformCat( new Function>>() @@ -359,12 +369,12 @@ public class ServerManager implements QuerySegmentWalker ); if (entry == null) { - return null; + return Arrays.>asList(new ReportTimelineMissingSegmentQueryRunner(input)); } final PartitionChunk chunk = entry.getChunk(input.getPartitionNumber()); if (chunk == null) { - return null; + return Arrays.>asList(new ReportTimelineMissingSegmentQueryRunner(input)); } final ReferenceCountingSegment adapter = chunk.getObject(); @@ -373,12 +383,12 @@ public class ServerManager implements QuerySegmentWalker ); } } - ) - .filter( - Predicates.>notNull() ); - return new FinalizeResultsQueryRunner(toolChest.mergeResults(factory.mergeRunners(exec, adapters)), toolChest); + return new FinalizeResultsQueryRunner( + toolChest.mergeResults(factory.mergeRunners(exec, queryRunners)), + toolChest + ); } private QueryRunner buildAndDecorateQueryRunner( diff --git a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java index a358ec44798..4975e0b5f01 100644 --- a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java +++ b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java @@ -124,7 +124,7 @@ public class CachingQueryRunnerTest new QueryRunner() { @Override - public Sequence run(Query query, Map context) + public Sequence run(Query query, Map responseContext) { return resultSeq; } @@ -214,7 +214,7 @@ public class CachingQueryRunnerTest new QueryRunner() { @Override - public Sequence run(Query query, Map context) + public Sequence run(Query query, Map responseContext) { return Sequences.empty(); } diff --git a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java index 72a15d993ac..266cf88645e 100644 --- a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java +++ b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java @@ -685,9 +685,9 @@ public class ServerManagerTest } @Override - public Sequence run(Query query, Map context) + public Sequence run(Query query, Map responseContext) { - return new BlockingSequence(runner.run(query, context), waitLatch, waitYieldLatch, notifyLatch); + return new BlockingSequence(runner.run(query, responseContext), waitLatch, waitYieldLatch, notifyLatch); } }