From 35e080bbc14a0853f13dc3e29425c8719894d329 Mon Sep 17 00:00:00 2001 From: jisookim0513 Date: Thu, 19 Jun 2014 14:14:54 -0700 Subject: [PATCH] fix concurrency issue with the map; introduce new exception; add incomplete retry query runner test --- .../java/io/druid/query/RetryQueryRunner.java | 22 +++- .../druid/query/RetryQueryRunnerConfig.java | 2 + .../spec/SpecificSegmentQueryRunner.java | 4 +- .../timeseries/TimeseriesQueryEngine.java | 4 +- .../io/druid/query/topn/TopNQueryEngine.java | 4 +- ...tion.java => SegmentMissingException.java} | 6 +- .../io/druid/query/RetryQueryRunnerTest.java | 116 ++++++++++++++++++ .../java/io/druid/server/QueryResource.java | 9 +- 8 files changed, 149 insertions(+), 18 deletions(-) rename processing/src/main/java/io/druid/segment/{NullStorageAdapterException.java => SegmentMissingException.java} (85%) create mode 100644 processing/src/test/java/io/druid/query/RetryQueryRunnerTest.java diff --git a/processing/src/main/java/io/druid/query/RetryQueryRunner.java b/processing/src/main/java/io/druid/query/RetryQueryRunner.java index 34781cfbb07..0c60d630433 100644 --- a/processing/src/main/java/io/druid/query/RetryQueryRunner.java +++ b/processing/src/main/java/io/druid/query/RetryQueryRunner.java @@ -21,8 +21,8 @@ package io.druid.query; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; -import io.druid.query.spec.QuerySegmentSpec; -import io.druid.query.spec.SpecificSegmentSpec; +import io.druid.query.spec.MultipleSpecificSegmentSpec; +import io.druid.segment.SegmentMissingException; import java.util.Arrays; import java.util.List; @@ -30,6 +30,7 @@ import java.util.Map; public class RetryQueryRunner implements QueryRunner { + public static String missingSegments = "missingSegments"; private final QueryRunner baseRunner; private final QueryToolChest> toolChest; private final RetryQueryRunnerConfig config; @@ -46,23 +47,32 @@ public class RetryQueryRunner implements QueryRunner { Sequence returningSeq = baseRunner.run(query, context); - for (int i = config.numTries(); i > 0; i--) { - for (int j = ((List)context.get("missingSegments")).size(); j > 0; j--) { - QuerySegmentSpec segmentSpec = new SpecificSegmentSpec((SegmentDescriptor)((List) context.get("missingSegments")).remove(0)); + + for (int i = config.numTries(); i > 0 && !((List)context.get(missingSegments)).isEmpty(); i--) { + List segList= (List)context.get(missingSegments); + ((List)context.get(missingSegments)).clear(); returningSeq = toolChest.mergeSequences( Sequences.simple( Arrays.asList( returningSeq, baseRunner.run( - query.withQuerySegmentSpec(segmentSpec), + query.withQuerySegmentSpec(new MultipleSpecificSegmentSpec(segList)), context ) ) ) ); + } + + if (!config.returnPartialResults() && !((List)context.get(missingSegments)).isEmpty()) { + String failedSegments = ""; + for (SegmentDescriptor segment : (List) context.get("missingSegments")) { + failedSegments = failedSegments + segment.toString() + " "; } + throw new SegmentMissingException("The following segments are missing: " + failedSegments); } return returningSeq; } } + diff --git a/processing/src/main/java/io/druid/query/RetryQueryRunnerConfig.java b/processing/src/main/java/io/druid/query/RetryQueryRunnerConfig.java index 5b5ed2639b5..5759b2794bb 100644 --- a/processing/src/main/java/io/druid/query/RetryQueryRunnerConfig.java +++ b/processing/src/main/java/io/druid/query/RetryQueryRunnerConfig.java @@ -25,6 +25,8 @@ public class RetryQueryRunnerConfig { @JsonProperty private int numTries = 1; + private boolean returnPartialResults = false; public int numTries() { return numTries; } + public boolean returnPartialResults() { return returnPartialResults; } } diff --git a/processing/src/main/java/io/druid/query/spec/SpecificSegmentQueryRunner.java b/processing/src/main/java/io/druid/query/spec/SpecificSegmentQueryRunner.java index 80eaa28fa33..9a457b96e04 100644 --- a/processing/src/main/java/io/druid/query/spec/SpecificSegmentQueryRunner.java +++ b/processing/src/main/java/io/druid/query/spec/SpecificSegmentQueryRunner.java @@ -27,7 +27,7 @@ import com.metamx.common.guava.Yielder; import com.metamx.common.guava.YieldingAccumulator; import io.druid.query.Query; import io.druid.query.QueryRunner; -import io.druid.segment.NullStorageAdapterException; +import io.druid.segment.SegmentMissingException; import java.io.IOException; import java.util.List; @@ -67,7 +67,7 @@ public class SpecificSegmentQueryRunner implements QueryRunner Sequence returningSeq; try { returningSeq = base.run(query, context); - } catch (NullStorageAdapterException e) { + } catch (SegmentMissingException e) { ((List)context.get("missingSegments")).add(((SpecificSegmentSpec) specificSpec).getDescriptor()); returningSeq = Sequences.empty(); } diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryEngine.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryEngine.java index ee239c2c572..bcc3e13512f 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryEngine.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryEngine.java @@ -27,7 +27,7 @@ import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.PostAggregator; import io.druid.segment.Cursor; -import io.druid.segment.NullStorageAdapterException; +import io.druid.segment.SegmentMissingException; import io.druid.segment.StorageAdapter; import io.druid.segment.filter.Filters; @@ -40,7 +40,7 @@ public class TimeseriesQueryEngine public Sequence> process(final TimeseriesQuery query, final StorageAdapter adapter) { if (adapter == null) { - throw new NullStorageAdapterException( + throw new SegmentMissingException( "Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped." ); } diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryEngine.java b/processing/src/main/java/io/druid/query/topn/TopNQueryEngine.java index 1faf1fb7699..65f2be86580 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryEngine.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryEngine.java @@ -32,7 +32,7 @@ import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.filter.Filter; import io.druid.segment.Capabilities; import io.druid.segment.Cursor; -import io.druid.segment.NullStorageAdapterException; +import io.druid.segment.SegmentMissingException; import io.druid.segment.StorageAdapter; import io.druid.segment.filter.Filters; import org.joda.time.Interval; @@ -56,7 +56,7 @@ public class TopNQueryEngine public Sequence> query(final TopNQuery query, final StorageAdapter adapter) { if (adapter == null) { - throw new NullStorageAdapterException( + throw new SegmentMissingException( "Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped." ); } diff --git a/processing/src/main/java/io/druid/segment/NullStorageAdapterException.java b/processing/src/main/java/io/druid/segment/SegmentMissingException.java similarity index 85% rename from processing/src/main/java/io/druid/segment/NullStorageAdapterException.java rename to processing/src/main/java/io/druid/segment/SegmentMissingException.java index 8d2b967afff..aade5e560ca 100644 --- a/processing/src/main/java/io/druid/segment/NullStorageAdapterException.java +++ b/processing/src/main/java/io/druid/segment/SegmentMissingException.java @@ -19,9 +19,11 @@ package io.druid.segment; -public class NullStorageAdapterException extends IllegalStateException +import com.metamx.common.ISE; + +public class SegmentMissingException extends ISE { - public NullStorageAdapterException(String formatText, Object... arguments) { + public SegmentMissingException(String formatText, Object... arguments) { super(String.format(formatText, arguments)); } } diff --git a/processing/src/test/java/io/druid/query/RetryQueryRunnerTest.java b/processing/src/test/java/io/druid/query/RetryQueryRunnerTest.java new file mode 100644 index 00000000000..45da8bcad0e --- /dev/null +++ b/processing/src/test/java/io/druid/query/RetryQueryRunnerTest.java @@ -0,0 +1,116 @@ +package io.druid.query; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.base.Function; +import com.google.common.collect.Lists; +import com.google.common.collect.MapMaker; +import com.google.common.collect.Ordering; +import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Sequences; +import com.metamx.emitter.service.ServiceMetricEvent; +import io.druid.collections.OrderedMergeSequence; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.aggregation.MetricManipulationFn; +import io.druid.query.timeseries.TimeseriesQuery; +import io.druid.query.timeseries.TimeseriesResultValue; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +public class RetryQueryRunnerTest +{ + + final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() + .dataSource(QueryRunnerTestHelper.dataSource) + .granularity(QueryRunnerTestHelper.dayGran) + .intervals(QueryRunnerTestHelper.firstToThird) + .aggregators( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory( + "idx", + "index" + ), + QueryRunnerTestHelper.qualityUniques + ) + ) + .build(); + + + @Test + public void testRunWithMissingSegments() throws Exception + { + Map context = new MapMaker().makeMap(); + context.put("missingSegments", Lists.newArrayList()); + RetryQueryRunner runner = new RetryQueryRunner( + new QueryRunner() + { + @Override + public Sequence run(Query query, Map context) + { + ((List)context.get(RetryQueryRunner.missingSegments)).add(new SegmentDescriptor(new Interval(178888, 1999999), "test", 1)); + return Sequences.empty(); + } + }, + new QueryToolChest() + { + @Override + public QueryRunner mergeResults(QueryRunner runner) + { + return null; + } + + @Override + public Sequence mergeSequences(Sequence seqOfSequences) + { + return new OrderedMergeSequence>(getOrdering(), seqOfSequences); + } + + @Override + public ServiceMetricEvent.Builder makeMetricBuilder(Query query) + { + return null; + } + + @Override + public Function makePreComputeManipulatorFn( + Query query, MetricManipulationFn fn + ) + { + return null; + } + + @Override + public TypeReference getResultTypeReference() + { + return null; + } + + public Ordering> getOrdering() + { + return Ordering.natural(); + } + }, + new RetryQueryRunnerConfig() + { + private int numTries = 1; + private boolean returnPartialResults = true; + + public int numTries() { return numTries; } + public boolean returnPartialResults() { return returnPartialResults; } + } + ); + + Iterable> actualResults = Sequences.toList( + runner.run(query, context), + Lists.>newArrayList() + ); + + Assert.assertTrue("Should return an empty sequence as a result", ((List) actualResults).size() == 0); + } +} \ No newline at end of file diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index ebc33c9670f..353e7014e06 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -22,9 +22,11 @@ package io.druid.server; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; +import com.google.api.client.util.Lists; import com.google.common.base.Charsets; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.MapMaker; import com.google.common.io.ByteStreams; import com.google.inject.Inject; import com.metamx.common.guava.Sequence; @@ -57,9 +59,8 @@ import javax.ws.rs.core.Response; import javax.ws.rs.core.StreamingOutput; import java.io.IOException; import java.io.OutputStream; -import java.util.HashMap; -import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.UUID; /** @@ -142,8 +143,8 @@ public class QueryResource log.debug("Got query [%s]", query); } - HashMap context = new HashMap(); - context.put("missingSegments", new LinkedList()); + Map context = new MapMaker().makeMap(); + context.put("missingSegments", Lists.newArrayList()); Sequence results = query.run(texasRanger, context); if (results == null) {