diff --git a/processing/src/main/java/io/druid/query/RetryQueryRunner.java b/processing/src/main/java/io/druid/query/RetryQueryRunner.java
index 9f3921bd87d..3597b9b11ef 100644
--- a/processing/src/main/java/io/druid/query/RetryQueryRunner.java
+++ b/processing/src/main/java/io/druid/query/RetryQueryRunner.java
@@ -22,11 +22,12 @@ package io.druid.query;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
-import com.google.common.collect.MapMaker;
 import com.metamx.common.guava.Sequence;
+import com.metamx.common.guava.Sequences;
 import com.metamx.common.guava.Yielder;
 import com.metamx.common.guava.YieldingAccumulator;
 import com.metamx.common.guava.YieldingSequenceBase;
+import com.metamx.emitter.EmittingLogger;
 import io.druid.query.spec.MultipleSpecificSegmentSpec;
 import io.druid.segment.SegmentMissingException;
 
@@ -36,18 +37,22 @@ import java.util.Map;
 public class RetryQueryRunner<T> implements QueryRunner<T>
 {
   public static String MISSING_SEGMENTS_KEY = "missingSegments";
+  private static final EmittingLogger log = new EmittingLogger(RetryQueryRunner.class);
 
   private final QueryRunner<T> baseRunner;
+  private final QueryToolChest<T, Query<T>> toolChest;
   private final RetryQueryRunnerConfig config;
   private final ObjectMapper jsonMapper;
 
   public RetryQueryRunner(
       QueryRunner<T> baseRunner,
+      QueryToolChest<T, Query<T>> toolChest,
       RetryQueryRunnerConfig config,
       ObjectMapper jsonMapper
   )
   {
     this.baseRunner = baseRunner;
+    this.toolChest = toolChest;
     this.config = config;
     this.jsonMapper = jsonMapper;
   }
@@ -55,7 +60,8 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
   @Override
   public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
   {
-    final Sequence<T> returningSeq = baseRunner.run(query, context);
+    final List<Sequence<T>> listOfSequences = Lists.newArrayList();
+    listOfSequences.add(baseRunner.run(query, context));
 
     return new YieldingSequenceBase<T>()
     {
@@ -64,33 +70,32 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
           OutType initValue, YieldingAccumulator<OutType, T> accumulator
       )
       {
-        Yielder<OutType> yielder = returningSeq.toYielder(initValue, accumulator);
-
         final List<SegmentDescriptor> missingSegments = getMissingSegments(context);
-        if (missingSegments.isEmpty()) {
-          return yielder;
-        }
+        if (!missingSegments.isEmpty()) {
+          for (int i = 0; i < config.numTries(); i++) {
+            log.info("[%,d] missing segments found. Retry attempt [%,d]", missingSegments.size(), i);
 
-        for (int i = 0; i < config.numTries(); i++) {
-          context.put(MISSING_SEGMENTS_KEY, Lists.newArrayList());
-          final Query<T> retryQuery = query.withQuerySegmentSpec(
-              new MultipleSpecificSegmentSpec(
-                  missingSegments
-              )
-          );
-          yielder = baseRunner.run(retryQuery, context).toYielder(initValue, accumulator);
-          if (getMissingSegments(context).isEmpty()) {
-            break;
+            context.put(MISSING_SEGMENTS_KEY, Lists.newArrayList());
+            final Query<T> retryQuery = query.withQuerySegmentSpec(
+                new MultipleSpecificSegmentSpec(
+                    missingSegments
+                )
+            );
+            Sequence<T> retrySequence = baseRunner.run(retryQuery, context);
+            listOfSequences.add(retrySequence);
+            if (getMissingSegments(context).isEmpty()) {
+              break;
+            }
+          }
+
+          final List<SegmentDescriptor> finalMissingSegs = getMissingSegments(context);
+          if (!finalMissingSegs.isEmpty()) {
+            throw new SegmentMissingException("No results found for segments[%s]", finalMissingSegs);
           }
         }
 
-        final List<SegmentDescriptor> finalMissingSegs = getMissingSegments(context);
-        if (!config.returnPartialResults() && !finalMissingSegs.isEmpty()) {
-          throw new SegmentMissingException("No results found for segments[%s]", finalMissingSegs);
-        }
-
-        return yielder;
+        return toolChest.mergeSequencesUnordered(Sequences.simple(listOfSequences)).toYielder(initValue, accumulator);
       }
     };
   }
diff --git a/processing/src/main/java/io/druid/query/RetryQueryRunnerConfig.java b/processing/src/main/java/io/druid/query/RetryQueryRunnerConfig.java
index 2b8bb730b68..86f26c83be2 100644
--- a/processing/src/main/java/io/druid/query/RetryQueryRunnerConfig.java
+++ b/processing/src/main/java/io/druid/query/RetryQueryRunnerConfig.java
@@ -24,10 +24,10 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 public class RetryQueryRunnerConfig
 {
   @JsonProperty
-  private int numTries = 0;
-  @JsonProperty
-  private boolean returnPartialResults = false;
+  private int numTries = 1;
 
-  public int numTries() { return numTries; }
-  public boolean returnPartialResults() { return returnPartialResults; }
+  public int numTries()
+  {
+    return numTries;
+  }
 }
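For orientation, the control flow RetryQueryRunner has after this change can be summarized outside the diff. This is a minimal sketch and not part of the patch: the Round interface and runWithRetries helper are illustrative stand-ins for baseRunner.run() plus the shared response-context bookkeeping, and plain lists stand in for the Sequence merging done via toolChest.mergeSequencesUnordered.

import java.util.ArrayList;
import java.util.List;

class RetryFlowSketch<T>
{
  interface Round<R>
  {
    // Runs one pass over the given segments and appends any missing segment ids to missingOut.
    List<R> run(List<String> segments, List<String> missingOut);
  }

  List<T> runWithRetries(Round<T> round, List<String> segments, int numTries)
  {
    final List<T> gathered = new ArrayList<T>();
    final List<String> missing = new ArrayList<String>();

    // Initial run; its results are always kept, like listOfSequences.add(baseRunner.run(...)).
    gathered.addAll(round.run(segments, missing));

    // Re-query only the segments reported missing, up to numTries rounds (the default is now 1).
    for (int i = 0; i < numTries && !missing.isEmpty(); i++) {
      final List<String> retryThese = new ArrayList<String>(missing);
      missing.clear(); // like context.put(MISSING_SEGMENTS_KEY, Lists.newArrayList())
      gathered.addAll(round.run(retryThese, missing));
    }

    // If segments are still missing after the retries, the patch throws SegmentMissingException.
    if (!missing.isEmpty()) {
      throw new IllegalStateException("No results found for segments" + missing);
    }

    // The patch merges everything gathered with toolChest.mergeSequencesUnordered(...).
    return gathered;
  }
}

Note the behavioral change captured here: partial results are no longer an option (returnPartialResults is gone); the query either recovers all missing segments within numTries rounds or fails.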
diff --git a/processing/src/main/java/io/druid/query/spec/SpecificSegmentQueryRunner.java b/processing/src/main/java/io/druid/query/spec/SpecificSegmentQueryRunner.java
index b8b25922e26..b8b68f86895 100644
--- a/processing/src/main/java/io/druid/query/spec/SpecificSegmentQueryRunner.java
+++ b/processing/src/main/java/io/druid/query/spec/SpecificSegmentQueryRunner.java
@@ -23,7 +23,6 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.metamx.common.guava.Accumulator;
 import com.metamx.common.guava.Sequence;
-import com.metamx.common.guava.Sequences;
 import com.metamx.common.guava.Yielder;
 import com.metamx.common.guava.YieldingAccumulator;
 import io.druid.query.Query;
@@ -68,20 +67,7 @@ public class SpecificSegmentQueryRunner<T> implements QueryRunner<T>
           @Override
           public Sequence<T> call() throws Exception
           {
-            Sequence<T> returningSeq;
-            try {
-              returningSeq = base.run(query, context);
-            }
-            catch (SegmentMissingException e) {
-              List<SegmentDescriptor> missingSegments = (List<SegmentDescriptor>) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY);
-              if (missingSegments == null) {
-                missingSegments = Lists.newArrayList();
-                context.put(RetryQueryRunner.MISSING_SEGMENTS_KEY, missingSegments);
-              }
-              missingSegments.add(specificSpec.getDescriptor());
-              returningSeq = Sequences.empty();
-            }
-            return returningSeq;
+            return base.run(query, context);
           }
         }
     );
@@ -97,7 +83,18 @@ public class SpecificSegmentQueryRunner<T> implements QueryRunner<T>
               @Override
               public OutType call() throws Exception
               {
-                return baseSequence.accumulate(initValue, accumulator);
+                try {
+                  return baseSequence.accumulate(initValue, accumulator);
+                }
+                catch (SegmentMissingException e) {
+                  List<SegmentDescriptor> missingSegments = (List<SegmentDescriptor>) context.get(RetryQueryRunner.MISSING_SEGMENTS_KEY);
+                  if (missingSegments == null) {
+                    missingSegments = Lists.newArrayList();
+                    context.put(RetryQueryRunner.MISSING_SEGMENTS_KEY, missingSegments);
+                  }
+                  missingSegments.add(specificSpec.getDescriptor());
+                  return initValue;
+                }
               }
             }
         );
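Stepping back from the SpecificSegmentQueryRunner hunks above: the exception is now caught at accumulation time rather than at run() time, the runner appends its segment descriptor to a shared list stored in the per-query response context under RetryQueryRunner.MISSING_SEGMENTS_KEY, and it returns the accumulator value unchanged so the query keeps going until RetryQueryRunner decides whether to retry. A minimal sketch of just that bookkeeping (illustrative and not part of the patch; reportMissing is a hypothetical helper and a String stands in for SegmentDescriptor):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MissingSegmentReportSketch
{
  // Mirrors RetryQueryRunner.MISSING_SEGMENTS_KEY.
  static final String MISSING_SEGMENTS_KEY = "missingSegments";

  @SuppressWarnings("unchecked")
  static void reportMissing(Map<String, Object> responseContext, String segmentDescriptor)
  {
    // Lazily create the shared list the first time a missing segment is reported.
    List<String> missing = (List<String>) responseContext.get(MISSING_SEGMENTS_KEY);
    if (missing == null) {
      missing = new ArrayList<String>();
      responseContext.put(MISSING_SEGMENTS_KEY, missing);
    }
    missing.add(segmentDescriptor);
  }

  public static void main(String[] args)
  {
    final Map<String, Object> responseContext = new HashMap<String, Object>();
    reportMissing(responseContext, "2012-01-01/P1D_version_0"); // stand-in for a SegmentDescriptor
    System.out.println(responseContext); // {missingSegments=[2012-01-01/P1D_version_0]}
  }
}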
+ * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.spec; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.MoreExecutors; +import com.metamx.common.guava.Accumulator; +import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Sequences; +import com.metamx.common.guava.Yielder; +import com.metamx.common.guava.YieldingAccumulator; +import io.druid.granularity.QueryGranularity; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.Druids; +import io.druid.query.Query; +import io.druid.query.QueryRunner; +import io.druid.query.Result; +import io.druid.query.RetryQueryRunner; +import io.druid.query.SegmentDescriptor; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.CountAggregator; +import io.druid.query.aggregation.CountAggregatorFactory; +import io.druid.query.timeseries.TimeseriesQuery; +import io.druid.query.timeseries.TimeseriesResultBuilder; +import io.druid.query.timeseries.TimeseriesResultValue; +import io.druid.segment.SegmentMissingException; +import junit.framework.Assert; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +public class SpecificSegmentQueryRunnerTest +{ + @Test + public void testRetry() throws Exception + { + final ObjectMapper mapper = new DefaultObjectMapper(); + SegmentDescriptor descriptor = new SegmentDescriptor( + new Interval("2012-01-01T00:00:00Z/P1D"), + "version", + 0 + ); + + final SpecificSegmentQueryRunner queryRunner = new SpecificSegmentQueryRunner( + new QueryRunner() + { + @Override + public Sequence run(Query query, Map context) + { + return new Sequence() + { + @Override + public Object accumulate(Object initValue, Accumulator accumulator) + { + throw new SegmentMissingException("FAILSAUCE"); + } + + @Override + public Yielder toYielder( + Object initValue, YieldingAccumulator accumulator + ) + { + return null; + } + }; + + } + }, + new SpecificSegmentSpec( + descriptor + ) + ); + + final Map responseContext = Maps.newHashMap(); + TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() + .dataSource("foo") + .granularity(QueryGranularity.ALL) + .intervals(ImmutableList.of(new Interval("2012-01-01T00:00:00Z/P1D"))) + .aggregators( + ImmutableList.of( + new CountAggregatorFactory("rows") + ) + ) + .build(); + Sequence results = queryRunner.run( + query, + responseContext + ); + Sequences.toList(results, Lists.newArrayList()); + + Object missingSegments = responseContext.get(RetryQueryRunner.MISSING_SEGMENTS_KEY); + + Assert.assertTrue(missingSegments != null); + 
Assert.assertTrue(missingSegments instanceof List); + + Object segmentDesc = ((List) missingSegments).get(0); + + Assert.assertTrue(segmentDesc instanceof SegmentDescriptor); + + SegmentDescriptor newDesc = mapper.readValue(mapper.writeValueAsString(segmentDesc), SegmentDescriptor.class); + + Assert.assertEquals(descriptor, newDesc); + } + + @SuppressWarnings("unchecked") + @Test + public void testRetry2() throws Exception + { + final ObjectMapper mapper = new DefaultObjectMapper(); + SegmentDescriptor descriptor = new SegmentDescriptor( + new Interval("2012-01-01T00:00:00Z/P1D"), + "version", + 0 + ); + + TimeseriesResultBuilder builder = new TimeseriesResultBuilder( + new DateTime("2012-01-01T00:00:00Z") + ); + CountAggregator rows = new CountAggregator("rows"); + rows.aggregate(); + builder.addMetric(rows); + final Result value = builder.build(); + + final SpecificSegmentQueryRunner queryRunner = new SpecificSegmentQueryRunner( + new QueryRunner() + { + @Override + public Sequence run(Query query, Map context) + { + return Sequences.withEffect( + Sequences.simple(Arrays.asList(value)), + new Runnable() + { + @Override + public void run() + { + throw new SegmentMissingException("FAILSAUCE"); + } + }, + MoreExecutors.sameThreadExecutor() + ); + } + }, + new SpecificSegmentSpec( + descriptor + ) + ); + + final Map responseContext = Maps.newHashMap(); + TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() + .dataSource("foo") + .granularity(QueryGranularity.ALL) + .intervals(ImmutableList.of(new Interval("2012-01-01T00:00:00Z/P1D"))) + .aggregators( + ImmutableList.of( + new CountAggregatorFactory("rows") + ) + ) + .build(); + Sequence results = queryRunner.run( + query, + responseContext + ); + List> res = Sequences.toList( + results, + Lists.>newArrayList() + ); + + Assert.assertEquals(1, res.size()); + + Result theVal = res.get(0); + + Assert.assertTrue(1L == theVal.getValue().getLongMetric("rows")); + + Object missingSegments = responseContext.get(RetryQueryRunner.MISSING_SEGMENTS_KEY); + + Assert.assertTrue(missingSegments != null); + Assert.assertTrue(missingSegments instanceof List); + + Object segmentDesc = ((List) missingSegments).get(0); + + Assert.assertTrue(segmentDesc instanceof SegmentDescriptor); + + SegmentDescriptor newDesc = mapper.readValue(mapper.writeValueAsString(segmentDesc), SegmentDescriptor.class); + + Assert.assertEquals(descriptor, newDesc); + } +} \ No newline at end of file diff --git a/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java index f84607e659b..8eb670922ca 100644 --- a/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java @@ -101,6 +101,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker toolChest.preMergeQueryDecoration( new RetryQueryRunner( baseClient, + toolChest, retryConfig, objectMapper ) diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index b66b080f6b9..be09886f4a9 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -153,8 +153,8 @@ public class QueryResource log.debug("Got query [%s]", query); } - final Map context = new MapMaker().makeMap(); - final Sequence res = query.run(texasRanger, context); + final Map responseContext = new MapMaker().makeMap(); + final Sequence res = 
query.run(texasRanger, responseContext); final Sequence results; if (res == null) { results = Sequences.empty(); @@ -212,7 +212,7 @@ public class QueryResource isSmile ? APPLICATION_JSON : APPLICATION_SMILE ) .header("X-Druid-Query-Id", queryId) - .header("X-Druid-Response-Context", jsonMapper.writeValueAsString(context)) + .header("X-Druid-Response-Context", jsonMapper.writeValueAsString(responseContext)) .build(); } catch (Exception e) {
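The QueryResource rename above is what carries missing-segment information back to the caller: the per-query responseContext map is serialized with Jackson and returned in the X-Druid-Response-Context header. A rough sketch of what that header value can look like (illustrative only; the keys used for the serialized SegmentDescriptor here are simplified stand-ins, not the exact wire format):

import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ResponseContextHeaderSketch
{
  public static void main(String[] args) throws Exception
  {
    final ObjectMapper jsonMapper = new ObjectMapper();

    // Per-query response context, as populated by SpecificSegmentQueryRunner during the query.
    final Map<String, Object> responseContext = new HashMap<String, Object>();

    // Simplified stand-in for one serialized SegmentDescriptor (interval, version, partition).
    final Map<String, Object> segment = new HashMap<String, Object>();
    segment.put("interval", "2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z");
    segment.put("version", "version");
    segment.put("partitionNumber", 0);

    final List<Map<String, Object>> missingSegments = new ArrayList<Map<String, Object>>();
    missingSegments.add(segment);
    responseContext.put("missingSegments", missingSegments);

    // This string is the kind of value QueryResource puts into the X-Druid-Response-Context header,
    // e.g. {"missingSegments":[{"interval":"...","version":"version","partitionNumber":0}]}
    System.out.println(jsonMapper.writeValueAsString(responseContext));
  }
}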