Merge pull request #201 from metamx/fix-rt

Fix issue with realtime nodes selecting intervals that are too large in the results
This commit is contained in:
cheddar 2013-07-25 12:46:09 -07:00
commit 73cc47d79c
2 changed files with 36 additions and 7 deletions

View File

@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -37,6 +38,8 @@ import com.metamx.common.Pair;
import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.guava.FunctionalIterable;
import com.metamx.druid.Query; import com.metamx.druid.Query;
import com.metamx.druid.TimelineObjectHolder;
import com.metamx.druid.VersionedIntervalTimeline;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidServer; import com.metamx.druid.client.DruidServer;
import com.metamx.druid.client.ServerView; import com.metamx.druid.client.ServerView;
@ -50,6 +53,7 @@ import com.metamx.druid.index.v1.IndexGranularity;
import com.metamx.druid.index.v1.IndexIO; import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.index.v1.IndexMerger; import com.metamx.druid.index.v1.IndexMerger;
import com.metamx.druid.loading.DataSegmentPusher; import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.partition.SingleElementPartitionChunk;
import com.metamx.druid.query.MetricsEmittingQueryRunner; import com.metamx.druid.query.MetricsEmittingQueryRunner;
import com.metamx.druid.query.QueryRunner; import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryRunnerFactory; import com.metamx.druid.query.QueryRunnerFactory;
@ -186,6 +190,9 @@ public class RealtimePlumberSchool implements PlumberSchool
private volatile ScheduledExecutorService scheduledExecutor = null; private volatile ScheduledExecutorService scheduledExecutor = null;
private final Map<Long, Sink> sinks = Maps.newConcurrentMap(); private final Map<Long, Sink> sinks = Maps.newConcurrentMap();
private final VersionedIntervalTimeline<String, Sink> sinkTimeline = new VersionedIntervalTimeline<String, Sink>(
String.CASE_INSENSITIVE_ORDER
);
@Override @Override
public void startJob() public void startJob()
@ -219,6 +226,7 @@ public class RealtimePlumberSchool implements PlumberSchool
try { try {
segmentAnnouncer.announceSegment(retVal.getSegment()); segmentAnnouncer.announceSegment(retVal.getSegment());
sinks.put(truncatedTime, retVal); sinks.put(truncatedTime, retVal);
sinkTimeline.add(retVal.getInterval(), retVal.getVersion(), new SingleElementPartitionChunk<Sink>(retVal));
} }
catch (IOException e) { catch (IOException e) {
log.makeAlert(e, "Failed to announce new segment[%s]", schema.getDataSource()) log.makeAlert(e, "Failed to announce new segment[%s]", schema.getDataSource())
@ -247,17 +255,23 @@ public class RealtimePlumberSchool implements PlumberSchool
} }
}; };
List<TimelineObjectHolder<String, Sink>> querySinks = Lists.newArrayList();
for (Interval interval : query.getIntervals()) {
querySinks.addAll(sinkTimeline.lookup(interval));
}
return toolchest.mergeResults( return toolchest.mergeResults(
factory.mergeRunners( factory.mergeRunners(
EXEC, EXEC,
FunctionalIterable FunctionalIterable
.create(sinks.values()) .create(querySinks)
.transform( .transform(
new Function<Sink, QueryRunner<T>>() new Function<TimelineObjectHolder<String, Sink>, QueryRunner<T>>()
{ {
@Override @Override
public QueryRunner<T> apply(Sink input) public QueryRunner<T> apply(TimelineObjectHolder<String, Sink> holder)
{ {
final Sink theSink = holder.getObject().getChunk(0).getObject();
return new SpecificSegmentQueryRunner<T>( return new SpecificSegmentQueryRunner<T>(
new MetricsEmittingQueryRunner<T>( new MetricsEmittingQueryRunner<T>(
emitter, emitter,
@ -265,7 +279,7 @@ public class RealtimePlumberSchool implements PlumberSchool
factory.mergeRunners( factory.mergeRunners(
EXEC, EXEC,
Iterables.transform( Iterables.transform(
input, theSink,
new Function<FireHydrant, QueryRunner<T>>() new Function<FireHydrant, QueryRunner<T>>()
{ {
@Override @Override
@ -279,9 +293,9 @@ public class RealtimePlumberSchool implements PlumberSchool
), ),
new SpecificSegmentSpec( new SpecificSegmentSpec(
new SegmentDescriptor( new SegmentDescriptor(
input.getInterval(), holder.getInterval(),
input.getSegment().getVersion(), theSink.getSegment().getVersion(),
input.getSegment().getShardSpec().getPartitionNum() theSink.getSegment().getShardSpec().getPartitionNum()
) )
) )
); );
@ -442,6 +456,11 @@ public class RealtimePlumberSchool implements PlumberSchool
Sink currSink = new Sink(sinkInterval, schema, versioningPolicy.getVersion(sinkInterval), hydrants); Sink currSink = new Sink(sinkInterval, schema, versioningPolicy.getVersion(sinkInterval), hydrants);
sinks.put(sinkInterval.getStartMillis(), currSink); sinks.put(sinkInterval.getStartMillis(), currSink);
sinkTimeline.add(
currSink.getInterval(),
currSink.getVersion(),
new SingleElementPartitionChunk<Sink>(currSink)
);
segmentAnnouncer.announceSegment(currSink.getSegment()); segmentAnnouncer.announceSegment(currSink.getSegment());
} }
@ -490,6 +509,11 @@ public class RealtimePlumberSchool implements PlumberSchool
FileUtils.deleteDirectory(computePersistDir(schema, sink.getInterval())); FileUtils.deleteDirectory(computePersistDir(schema, sink.getInterval()));
log.info("Removing sinkKey %d for segment %s", sinkKey, sink.getSegment().getIdentifier()); log.info("Removing sinkKey %d for segment %s", sinkKey, sink.getSegment().getIdentifier());
sinks.remove(sinkKey); sinks.remove(sinkKey);
sinkTimeline.remove(
sink.getInterval(),
sink.getVersion(),
new SingleElementPartitionChunk<Sink>(sink)
);
synchronized (handoffCondition) { synchronized (handoffCondition) {
handoffCondition.notifyAll(); handoffCondition.notifyAll();

View File

@ -90,6 +90,11 @@ public class Sink implements Iterable<FireHydrant>
makeNewCurrIndex(interval.getStartMillis(), schema); makeNewCurrIndex(interval.getStartMillis(), schema);
} }
public String getVersion()
{
return version;
}
public Interval getInterval() public Interval getInterval()
{ {
return interval; return interval;