diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java index 6f1080b8ea3..64ec2b247c6 100644 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java +++ b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java @@ -28,6 +28,7 @@ import com.google.common.collect.Maps; import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import com.metamx.common.guava.CloseQuietly; +import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.emitter.EmittingLogger; @@ -51,9 +52,12 @@ import io.druid.segment.realtime.plumber.Plumber; import io.druid.segment.realtime.plumber.Plumbers; import org.joda.time.Interval; +import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -69,7 +73,7 @@ public class RealtimeManager implements QuerySegmentWalker /** * key=data source name,value=FireChiefs of all partition of that data source */ - private final Map> chiefs; + private final Map> chiefs; @Inject public RealtimeManager( @@ -90,12 +94,12 @@ public class RealtimeManager implements QuerySegmentWalker DataSchema schema = fireDepartment.getDataSchema(); final FireChief chief = new FireChief(fireDepartment); - List chiefs = this.chiefs.get(schema.getDataSource()); + Map chiefs = this.chiefs.get(schema.getDataSource()); if (chiefs == null) { - chiefs = new ArrayList(); + chiefs = new HashMap(); this.chiefs.put(schema.getDataSource(), chiefs); } - chiefs.add(chief); + chiefs.put(fireDepartment.getTuningConfig().getShardSpec().getPartitionNum(), chief); chief.setName( String.format( @@ -112,8 +116,8 @@ public class RealtimeManager implements QuerySegmentWalker @LifecycleStop public void stop() { - for (Iterable chiefs : this.chiefs.values()) { - for (FireChief chief : chiefs) { + for (Map chiefs : this.chiefs.values()) { + for (FireChief chief : chiefs.values()) { CloseQuietly.close(chief); } } @@ -121,12 +125,12 @@ public class RealtimeManager implements QuerySegmentWalker public FireDepartmentMetrics getMetrics(String datasource) { - List chiefs = this.chiefs.get(datasource); + Map chiefs = this.chiefs.get(datasource); if (chiefs == null) { return null; } FireDepartmentMetrics snapshot = null; - for (FireChief chief : chiefs) { + for (FireChief chief : chiefs.values()) { if (snapshot == null) { snapshot = chief.getMetrics().snapshot(); } else { @@ -139,30 +143,71 @@ public class RealtimeManager implements QuerySegmentWalker @Override public QueryRunner getQueryRunnerForIntervals(final Query query, Iterable intervals) { - return getQueryRunnerForSegments(query, null); + final QueryRunnerFactory> factory = conglomerate.findFactory(query); + final Iterable runners; + final List names = query.getDataSource().getNames(); + runners = Iterables.transform( + names, new Function() + { + @Override + public QueryRunner apply(String input) + { + Map chiefsOfDataSource = chiefs.get(input); + return chiefsOfDataSource == null ? new NoopQueryRunner() : factory.getToolchest().mergeResults( + factory.mergeRunners( + MoreExecutors.sameThreadExecutor(), + // Chaining query runners which wait on submitted chain query runners can make executor pools deadlock + Iterables.transform( + chiefsOfDataSource.values(), new Function>() + { + @Override + public QueryRunner apply(FireChief input) + { + return input.getQueryRunner(query); + } + } + ) + ) + ); + } + } + ); + return new UnionQueryRunner<>( + runners, conglomerate.findFactory(query).getToolchest() + ); } @Override - public QueryRunner getQueryRunnerForSegments(final Query query, Iterable specs) + public QueryRunner getQueryRunnerForSegments(final Query query, final Iterable specs) { final QueryRunnerFactory> factory = conglomerate.findFactory(query); + List runners = new ArrayList(); + for (String dataSource : query.getDataSource().getNames()) { + final Map dataSourceChiefs = RealtimeManager.this.chiefs.get(dataSource); + if (dataSourceChiefs == null) { + continue; + } - Iterable chiefsOfDataSource = chiefs.get(Iterables.getOnlyElement(query.getDataSource().getNames())); - return chiefsOfDataSource == null ? new NoopQueryRunner() : factory.getToolchest().mergeResults( - factory.mergeRunners( - MoreExecutors.sameThreadExecutor(), - // Chaining query runners which wait on submitted chain query runners can make executor pools deadlock - Iterables.transform( - chiefsOfDataSource, new Function>() - { - @Override - public QueryRunner apply(FireChief input) - { - return input.getQueryRunner(query); - } - } - ) - ) + QueryToolChest> toolchest = factory.getToolchest(); + Iterable> subRunners = Iterables.transform( + specs, + new Function>() + { + @Nullable + @Override + public QueryRunner apply(SegmentDescriptor spec) + { + FireChief retVal = dataSourceChiefs.get(spec.getPartitionNumber()); + return retVal == null ? new NoopQueryRunner() : retVal.getQueryRunner(query); + } + } + ); + runners.add( + toolchest.mergeResults(factory.mergeRunners(MoreExecutors.sameThreadExecutor(), subRunners)) + ); + } + return new UnionQueryRunner<>( + runners, conglomerate.findFactory(query).getToolchest() ); }