SpecificSegmentQueryRunner misses missing segments from toYielder() (#3617)

This commit is contained in:
Navis Ryu 2016-10-31 03:47:29 +09:00 committed by Fangjin Yang
parent 23a8e22836
commit 3fca3be9ea
2 changed files with 46 additions and 25 deletions

View File

@ -24,6 +24,7 @@ import com.google.common.collect.Lists;
import io.druid.java.util.common.guava.Accumulator; import io.druid.java.util.common.guava.Accumulator;
import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Yielder; import io.druid.java.util.common.guava.Yielder;
import io.druid.java.util.common.guava.Yielders;
import io.druid.java.util.common.guava.YieldingAccumulator; import io.druid.java.util.common.guava.YieldingAccumulator;
import io.druid.query.Query; import io.druid.query.Query;
import io.druid.query.QueryRunner; import io.druid.query.QueryRunner;
@ -87,12 +88,7 @@ public class SpecificSegmentQueryRunner<T> implements QueryRunner<T>
return baseSequence.accumulate(initValue, accumulator); return baseSequence.accumulate(initValue, accumulator);
} }
catch (SegmentMissingException e) { catch (SegmentMissingException e) {
List<SegmentDescriptor> missingSegments = (List<SegmentDescriptor>) responseContext.get(Result.MISSING_SEGMENTS_KEY); appendMissingSegment(responseContext);
if (missingSegments == null) {
missingSegments = Lists.newArrayList();
responseContext.put(Result.MISSING_SEGMENTS_KEY, missingSegments);
}
missingSegments.add(specificSpec.getDescriptor());
return initValue; return initValue;
} }
} }
@ -112,7 +108,13 @@ public class SpecificSegmentQueryRunner<T> implements QueryRunner<T>
@Override @Override
public Yielder<OutType> call() throws Exception public Yielder<OutType> call() throws Exception
{ {
return makeYielder(baseSequence.toYielder(initValue, accumulator)); try {
return makeYielder(baseSequence.toYielder(initValue, accumulator));
}
catch (SegmentMissingException e) {
appendMissingSegment(responseContext);
return Yielders.done(initValue, null);
}
} }
} }
); );
@ -164,6 +166,16 @@ public class SpecificSegmentQueryRunner<T> implements QueryRunner<T>
}; };
} }
private void appendMissingSegment(Map<String, Object> responseContext)
{
List<SegmentDescriptor> missingSegments = (List<SegmentDescriptor>) responseContext.get(Result.MISSING_SEGMENTS_KEY);
if (missingSegments == null) {
missingSegments = Lists.newArrayList();
responseContext.put(Result.MISSING_SEGMENTS_KEY, missingSegments);
}
missingSegments.add(specificSpec.getDescriptor());
}
private <RetType> RetType doNamed(Thread currThread, String currName, String newName, Callable<RetType> toRun) private <RetType> RetType doNamed(Thread currThread, String currName, String newName, Callable<RetType> toRun)
{ {
try { try {

View File

@ -83,7 +83,7 @@ public class SpecificSegmentQueryRunnerTest
Object initValue, YieldingAccumulator accumulator Object initValue, YieldingAccumulator accumulator
) )
{ {
return null; throw new SegmentMissingException("FAILSAUCE");
} }
}; };
@ -94,7 +94,8 @@ public class SpecificSegmentQueryRunnerTest
) )
); );
final Map<String, Object> responseContext = Maps.newHashMap(); // from accumulate
Map<String, Object> responseContext = Maps.newHashMap();
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource("foo") .dataSource("foo")
.granularity(QueryGranularities.ALL) .granularity(QueryGranularities.ALL)
@ -105,24 +106,26 @@ public class SpecificSegmentQueryRunnerTest
) )
) )
.build(); .build();
Sequence results = queryRunner.run( Sequence results = queryRunner.run(query, responseContext);
query,
responseContext
);
Sequences.toList(results, Lists.newArrayList()); Sequences.toList(results, Lists.newArrayList());
validate(mapper, descriptor, responseContext);
Object missingSegments = responseContext.get(Result.MISSING_SEGMENTS_KEY); // from toYielder
responseContext = Maps.newHashMap();
Assert.assertTrue(missingSegments != null); results = queryRunner.run(query, responseContext);
Assert.assertTrue(missingSegments instanceof List); results.toYielder(
null, new YieldingAccumulator()
Object segmentDesc = ((List) missingSegments).get(0); {
final List lists = Lists.newArrayList();
Assert.assertTrue(segmentDesc instanceof SegmentDescriptor); @Override
public Object accumulate(Object accumulated, Object in)
SegmentDescriptor newDesc = mapper.readValue(mapper.writeValueAsString(segmentDesc), SegmentDescriptor.class); {
lists.add(in);
Assert.assertEquals(descriptor, newDesc); return in;
}
}
);
validate(mapper, descriptor, responseContext);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -195,6 +198,12 @@ public class SpecificSegmentQueryRunnerTest
Assert.assertTrue(1L == theVal.getValue().getLongMetric("rows")); Assert.assertTrue(1L == theVal.getValue().getLongMetric("rows"));
validate(mapper, descriptor, responseContext);
}
private void validate(ObjectMapper mapper, SegmentDescriptor descriptor, Map<String, Object> responseContext)
throws java.io.IOException
{
Object missingSegments = responseContext.get(Result.MISSING_SEGMENTS_KEY); Object missingSegments = responseContext.get(Result.MISSING_SEGMENTS_KEY);
Assert.assertTrue(missingSegments != null); Assert.assertTrue(missingSegments != null);