From a909a31bcc4bc07a511c42b58373a5e4475bd21f Mon Sep 17 00:00:00 2001 From: fjy Date: Wed, 24 Jul 2013 19:25:32 -0700 Subject: [PATCH] better select sinks --- .../plumber/RealtimePlumberSchool.java | 38 +++++++++++++++---- .../metamx/druid/realtime/plumber/Sink.java | 5 +++ 2 files changed, 36 insertions(+), 7 deletions(-) diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java index a429fbef9d5..0bd01226828 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; +import com.google.common.base.Predicates; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -37,6 +38,8 @@ import com.metamx.common.Pair; import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.guava.FunctionalIterable; import com.metamx.druid.Query; +import com.metamx.druid.TimelineObjectHolder; +import com.metamx.druid.VersionedIntervalTimeline; import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DruidServer; import com.metamx.druid.client.ServerView; @@ -50,6 +53,7 @@ import com.metamx.druid.index.v1.IndexGranularity; import com.metamx.druid.index.v1.IndexIO; import com.metamx.druid.index.v1.IndexMerger; import com.metamx.druid.loading.DataSegmentPusher; +import com.metamx.druid.partition.SingleElementPartitionChunk; import com.metamx.druid.query.MetricsEmittingQueryRunner; import com.metamx.druid.query.QueryRunner; import com.metamx.druid.query.QueryRunnerFactory; @@ -186,6 +190,9 @@ public class RealtimePlumberSchool implements PlumberSchool private volatile ScheduledExecutorService scheduledExecutor = null; private final Map sinks = Maps.newConcurrentMap(); + private final VersionedIntervalTimeline sinkTimeline = new VersionedIntervalTimeline( + String.CASE_INSENSITIVE_ORDER + ); @Override public void startJob() @@ -219,6 +226,7 @@ public class RealtimePlumberSchool implements PlumberSchool try { segmentAnnouncer.announceSegment(retVal.getSegment()); sinks.put(truncatedTime, retVal); + sinkTimeline.add(retVal.getInterval(), retVal.getVersion(), new SingleElementPartitionChunk(retVal)); } catch (IOException e) { log.makeAlert(e, "Failed to announce new segment[%s]", schema.getDataSource()) @@ -247,17 +255,23 @@ public class RealtimePlumberSchool implements PlumberSchool } }; + List> querySinks = Lists.newArrayList(); + for (Interval interval : query.getIntervals()) { + querySinks.addAll(sinkTimeline.lookup(interval)); + } + return toolchest.mergeResults( factory.mergeRunners( EXEC, FunctionalIterable - .create(sinks.values()) + .create(querySinks) .transform( - new Function>() + new Function, QueryRunner>() { @Override - public QueryRunner apply(Sink input) + public QueryRunner apply(TimelineObjectHolder holder) { + final Sink theSink = holder.getObject().getChunk(0).getObject(); return new SpecificSegmentQueryRunner( new MetricsEmittingQueryRunner( emitter, @@ -265,7 +279,7 @@ public class RealtimePlumberSchool implements PlumberSchool factory.mergeRunners( EXEC, Iterables.transform( - input, + theSink, new Function>() { @Override @@ -279,9 +293,9 @@ public class RealtimePlumberSchool implements PlumberSchool ), new SpecificSegmentSpec( new SegmentDescriptor( - input.getInterval(), - input.getSegment().getVersion(), - input.getSegment().getShardSpec().getPartitionNum() + holder.getInterval(), + theSink.getSegment().getVersion(), + theSink.getSegment().getShardSpec().getPartitionNum() ) ) ); @@ -442,6 +456,11 @@ public class RealtimePlumberSchool implements PlumberSchool Sink currSink = new Sink(sinkInterval, schema, versioningPolicy.getVersion(sinkInterval), hydrants); sinks.put(sinkInterval.getStartMillis(), currSink); + sinkTimeline.add( + currSink.getInterval(), + currSink.getVersion(), + new SingleElementPartitionChunk(currSink) + ); segmentAnnouncer.announceSegment(currSink.getSegment()); } @@ -490,6 +509,11 @@ public class RealtimePlumberSchool implements PlumberSchool FileUtils.deleteDirectory(computePersistDir(schema, sink.getInterval())); log.info("Removing sinkKey %d for segment %s", sinkKey, sink.getSegment().getIdentifier()); sinks.remove(sinkKey); + sinkTimeline.remove( + sink.getInterval(), + sink.getVersion(), + new SingleElementPartitionChunk(sink) + ); synchronized (handoffCondition) { handoffCondition.notifyAll(); diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/Sink.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/Sink.java index a5dd4ae38a1..a1823b6c09a 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/Sink.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/Sink.java @@ -90,6 +90,11 @@ public class Sink implements Iterable makeNewCurrIndex(interval.getStartMillis(), schema); } + public String getVersion() + { + return version; + } + public Interval getInterval() { return interval;