Add retry capabilities for realtime logic

This commit is contained in:
fjy 2015-01-13 10:42:03 -08:00
parent 705e5ab0b1
commit 021a7de759
1 changed files with 25 additions and 5 deletions

View File

@ -12,6 +12,7 @@ import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.Granularity;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.FunctionalIterable;
@ -30,6 +31,7 @@ import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QueryToolChest;
import io.druid.query.ReportTimelineMissingSegmentQueryRunner;
import io.druid.query.SegmentDescriptor;
import io.druid.query.spec.SpecificSegmentQueryRunner;
import io.druid.query.spec.SpecificSegmentSpec;
@ -243,7 +245,22 @@ public class RealtimePlumber implements Plumber
@Override
public QueryRunner<T> apply(TimelineObjectHolder<String, Sink> holder)
{
if (holder == null) {
throw new ISE("No timeline entry at all!");
}
final Sink theSink = holder.getObject().getChunk(0).getObject();
if (theSink == null) {
throw new ISE("Missing sink for timeline entry[%s]!", holder);
}
final SegmentDescriptor descriptor = new SegmentDescriptor(
holder.getInterval(),
theSink.getSegment().getVersion(),
theSink.getSegment().getShardSpec().getPartitionNum()
);
return new SpecificSegmentQueryRunner<T>(
new MetricsEmittingQueryRunner<T>(
emitter,
@ -257,6 +274,13 @@ public class RealtimePlumber implements Plumber
@Override
public QueryRunner<T> apply(FireHydrant input)
{
// It is possible that we got a query for a segment, and while that query
// is in the jetty queue, the segment is abandoned. Here, we need to retry
// the query for the segment.
if (input == null || input.getSegment() == null) {
return new ReportTimelineMissingSegmentQueryRunner<T>(descriptor);
}
// Prevent the underlying segment from closing when its being iterated
final Closeable closeable = input.getSegment().increment();
try {
@ -276,11 +300,7 @@ public class RealtimePlumber implements Plumber
)
).withWaitMeasuredFromNow(),
new SpecificSegmentSpec(
new SegmentDescriptor(
holder.getInterval(),
theSink.getSegment().getVersion(),
theSink.getSegment().getShardSpec().getPartitionNum()
)
descriptor
)
);
}