diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index 1d163dce09d..4f998f7185f 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -12,6 +12,7 @@ import com.google.common.primitives.Ints; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.metamx.common.Granularity; +import com.metamx.common.ISE; import com.metamx.common.Pair; import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.guava.FunctionalIterable; @@ -30,6 +31,7 @@ import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.query.QueryToolChest; +import io.druid.query.ReportTimelineMissingSegmentQueryRunner; import io.druid.query.SegmentDescriptor; import io.druid.query.spec.SpecificSegmentQueryRunner; import io.druid.query.spec.SpecificSegmentSpec; @@ -243,7 +245,22 @@ public class RealtimePlumber implements Plumber @Override public QueryRunner apply(TimelineObjectHolder holder) { + if (holder == null) { + throw new ISE("No timeline entry at all!"); + } + final Sink theSink = holder.getObject().getChunk(0).getObject(); + + if (theSink == null) { + throw new ISE("Missing sink for timeline entry[%s]!", holder); + } + + final SegmentDescriptor descriptor = new SegmentDescriptor( + holder.getInterval(), + theSink.getSegment().getVersion(), + theSink.getSegment().getShardSpec().getPartitionNum() + ); + return new SpecificSegmentQueryRunner( new MetricsEmittingQueryRunner( emitter, @@ -257,6 +274,13 @@ public class RealtimePlumber implements Plumber @Override public QueryRunner apply(FireHydrant input) { + // It is possible that we got a query for a segment, and while that query + // is in the jetty queue, the segment is abandoned. Here, we need to retry + // the query for the segment. + if (input == null || input.getSegment() == null) { + return new ReportTimelineMissingSegmentQueryRunner(descriptor); + } + // Prevent the underlying segment from closing when its being iterated final Closeable closeable = input.getSegment().increment(); try { @@ -276,11 +300,7 @@ public class RealtimePlumber implements Plumber ) ).withWaitMeasuredFromNow(), new SpecificSegmentSpec( - new SegmentDescriptor( - holder.getInterval(), - theSink.getSegment().getVersion(), - theSink.getSegment().getShardSpec().getPartitionNum() - ) + descriptor ) ); }