diff --git a/processing/src/main/java/io/druid/query/spec/SpecificSegmentQueryRunner.java b/processing/src/main/java/io/druid/query/spec/SpecificSegmentQueryRunner.java index 763f1c186e6..1683b3de911 100644 --- a/processing/src/main/java/io/druid/query/spec/SpecificSegmentQueryRunner.java +++ b/processing/src/main/java/io/druid/query/spec/SpecificSegmentQueryRunner.java @@ -24,6 +24,7 @@ import com.google.common.collect.Lists; import io.druid.java.util.common.guava.Accumulator; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Yielder; +import io.druid.java.util.common.guava.Yielders; import io.druid.java.util.common.guava.YieldingAccumulator; import io.druid.query.Query; import io.druid.query.QueryRunner; @@ -87,12 +88,7 @@ public class SpecificSegmentQueryRunner implements QueryRunner return baseSequence.accumulate(initValue, accumulator); } catch (SegmentMissingException e) { - List missingSegments = (List) responseContext.get(Result.MISSING_SEGMENTS_KEY); - if (missingSegments == null) { - missingSegments = Lists.newArrayList(); - responseContext.put(Result.MISSING_SEGMENTS_KEY, missingSegments); - } - missingSegments.add(specificSpec.getDescriptor()); + appendMissingSegment(responseContext); return initValue; } } @@ -112,7 +108,13 @@ public class SpecificSegmentQueryRunner implements QueryRunner @Override public Yielder call() throws Exception { - return makeYielder(baseSequence.toYielder(initValue, accumulator)); + try { + return makeYielder(baseSequence.toYielder(initValue, accumulator)); + } + catch (SegmentMissingException e) { + appendMissingSegment(responseContext); + return Yielders.done(initValue, null); + } } } ); @@ -164,6 +166,16 @@ public class SpecificSegmentQueryRunner implements QueryRunner }; } + private void appendMissingSegment(Map responseContext) + { + List missingSegments = (List) responseContext.get(Result.MISSING_SEGMENTS_KEY); + if (missingSegments == null) { + missingSegments = Lists.newArrayList(); + responseContext.put(Result.MISSING_SEGMENTS_KEY, missingSegments); + } + missingSegments.add(specificSpec.getDescriptor()); + } + private RetType doNamed(Thread currThread, String currName, String newName, Callable toRun) { try { diff --git a/processing/src/test/java/io/druid/query/spec/SpecificSegmentQueryRunnerTest.java b/processing/src/test/java/io/druid/query/spec/SpecificSegmentQueryRunnerTest.java index 97f2b24f390..72c25ec9ee8 100644 --- a/processing/src/test/java/io/druid/query/spec/SpecificSegmentQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/spec/SpecificSegmentQueryRunnerTest.java @@ -83,7 +83,7 @@ public class SpecificSegmentQueryRunnerTest Object initValue, YieldingAccumulator accumulator ) { - return null; + throw new SegmentMissingException("FAILSAUCE"); } }; @@ -94,7 +94,8 @@ public class SpecificSegmentQueryRunnerTest ) ); - final Map responseContext = Maps.newHashMap(); + // from accumulate + Map responseContext = Maps.newHashMap(); TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() .dataSource("foo") .granularity(QueryGranularities.ALL) @@ -105,24 +106,26 @@ public class SpecificSegmentQueryRunnerTest ) ) .build(); - Sequence results = queryRunner.run( - query, - responseContext - ); + Sequence results = queryRunner.run(query, responseContext); Sequences.toList(results, Lists.newArrayList()); + validate(mapper, descriptor, responseContext); - Object missingSegments = responseContext.get(Result.MISSING_SEGMENTS_KEY); - - Assert.assertTrue(missingSegments != null); - Assert.assertTrue(missingSegments instanceof List); - - Object segmentDesc = ((List) missingSegments).get(0); - - Assert.assertTrue(segmentDesc instanceof SegmentDescriptor); - - SegmentDescriptor newDesc = mapper.readValue(mapper.writeValueAsString(segmentDesc), SegmentDescriptor.class); - - Assert.assertEquals(descriptor, newDesc); + // from toYielder + responseContext = Maps.newHashMap(); + results = queryRunner.run(query, responseContext); + results.toYielder( + null, new YieldingAccumulator() + { + final List lists = Lists.newArrayList(); + @Override + public Object accumulate(Object accumulated, Object in) + { + lists.add(in); + return in; + } + } + ); + validate(mapper, descriptor, responseContext); } @SuppressWarnings("unchecked") @@ -195,6 +198,12 @@ public class SpecificSegmentQueryRunnerTest Assert.assertTrue(1L == theVal.getValue().getLongMetric("rows")); + validate(mapper, descriptor, responseContext); + } + + private void validate(ObjectMapper mapper, SegmentDescriptor descriptor, Map responseContext) + throws java.io.IOException + { Object missingSegments = responseContext.get(Result.MISSING_SEGMENTS_KEY); Assert.assertTrue(missingSegments != null);