Allow realtime nodes to have multiple shards of the same datasource

This commit is contained in:
Eric Tschetter 2015-07-01 08:36:09 -07:00 committed by Bingkun Guo
parent 14a306c553
commit 68631d89e9
1 changed files with 71 additions and 26 deletions

View File

@ -28,6 +28,7 @@ import com.google.common.collect.Maps;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.guava.CloseQuietly; import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
@ -51,9 +52,12 @@ import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.Plumbers; import io.druid.segment.realtime.plumber.Plumbers;
import org.joda.time.Interval; import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -69,7 +73,7 @@ public class RealtimeManager implements QuerySegmentWalker
/** /**
* key=data source name,value=FireChiefs of all partition of that data source * key=data source name,value=FireChiefs of all partition of that data source
*/ */
private final Map<String, List<FireChief>> chiefs; private final Map<String, Map<Integer, FireChief>> chiefs;
@Inject @Inject
public RealtimeManager( public RealtimeManager(
@ -90,12 +94,12 @@ public class RealtimeManager implements QuerySegmentWalker
DataSchema schema = fireDepartment.getDataSchema(); DataSchema schema = fireDepartment.getDataSchema();
final FireChief chief = new FireChief(fireDepartment); final FireChief chief = new FireChief(fireDepartment);
List<FireChief> chiefs = this.chiefs.get(schema.getDataSource()); Map<Integer, FireChief> chiefs = this.chiefs.get(schema.getDataSource());
if (chiefs == null) { if (chiefs == null) {
chiefs = new ArrayList<FireChief>(); chiefs = new HashMap<Integer, FireChief>();
this.chiefs.put(schema.getDataSource(), chiefs); this.chiefs.put(schema.getDataSource(), chiefs);
} }
chiefs.add(chief); chiefs.put(fireDepartment.getTuningConfig().getShardSpec().getPartitionNum(), chief);
chief.setName( chief.setName(
String.format( String.format(
@ -112,8 +116,8 @@ public class RealtimeManager implements QuerySegmentWalker
@LifecycleStop @LifecycleStop
public void stop() public void stop()
{ {
for (Iterable<FireChief> chiefs : this.chiefs.values()) { for (Map<Integer, FireChief> chiefs : this.chiefs.values()) {
for (FireChief chief : chiefs) { for (FireChief chief : chiefs.values()) {
CloseQuietly.close(chief); CloseQuietly.close(chief);
} }
} }
@ -121,12 +125,12 @@ public class RealtimeManager implements QuerySegmentWalker
public FireDepartmentMetrics getMetrics(String datasource) public FireDepartmentMetrics getMetrics(String datasource)
{ {
List<FireChief> chiefs = this.chiefs.get(datasource); Map<Integer, FireChief> chiefs = this.chiefs.get(datasource);
if (chiefs == null) { if (chiefs == null) {
return null; return null;
} }
FireDepartmentMetrics snapshot = null; FireDepartmentMetrics snapshot = null;
for (FireChief chief : chiefs) { for (FireChief chief : chiefs.values()) {
if (snapshot == null) { if (snapshot == null) {
snapshot = chief.getMetrics().snapshot(); snapshot = chief.getMetrics().snapshot();
} else { } else {
@ -138,22 +142,23 @@ public class RealtimeManager implements QuerySegmentWalker
@Override @Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, Iterable<Interval> intervals) public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, Iterable<Interval> intervals)
{
return getQueryRunnerForSegments(query, null);
}
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, Iterable<SegmentDescriptor> specs)
{ {
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query); final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
final Iterable<QueryRunner> runners;
Iterable<FireChief> chiefsOfDataSource = chiefs.get(Iterables.getOnlyElement(query.getDataSource().getNames())); final List<String> names = query.getDataSource().getNames();
runners = Iterables.transform(
names, new Function<String, QueryRunner>()
{
@Override
public QueryRunner<T> apply(String input)
{
Map<Integer, FireChief> chiefsOfDataSource = chiefs.get(input);
return chiefsOfDataSource == null ? new NoopQueryRunner() : factory.getToolchest().mergeResults( return chiefsOfDataSource == null ? new NoopQueryRunner() : factory.getToolchest().mergeResults(
factory.mergeRunners( factory.mergeRunners(
MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor(),
// Chaining query runners which wait on submitted chain query runners can make executor pools deadlock // Chaining query runners which wait on submitted chain query runners can make executor pools deadlock
Iterables.transform( Iterables.transform(
chiefsOfDataSource, new Function<FireChief, QueryRunner<T>>() chiefsOfDataSource.values(), new Function<FireChief, QueryRunner<T>>()
{ {
@Override @Override
public QueryRunner<T> apply(FireChief input) public QueryRunner<T> apply(FireChief input)
@ -165,6 +170,46 @@ public class RealtimeManager implements QuerySegmentWalker
) )
); );
} }
}
);
return new UnionQueryRunner<>(
runners, conglomerate.findFactory(query).getToolchest()
);
}
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
{
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
List<QueryRunner> runners = new ArrayList();
for (String dataSource : query.getDataSource().getNames()) {
final Map<Integer, FireChief> dataSourceChiefs = RealtimeManager.this.chiefs.get(dataSource);
if (dataSourceChiefs == null) {
continue;
}
QueryToolChest<T, Query<T>> toolchest = factory.getToolchest();
Iterable<QueryRunner<T>> subRunners = Iterables.transform(
specs,
new Function<SegmentDescriptor, QueryRunner<T>>()
{
@Nullable
@Override
public QueryRunner<T> apply(SegmentDescriptor spec)
{
FireChief retVal = dataSourceChiefs.get(spec.getPartitionNumber());
return retVal == null ? new NoopQueryRunner<T>() : retVal.getQueryRunner(query);
}
}
);
runners.add(
toolchest.mergeResults(factory.mergeRunners(MoreExecutors.sameThreadExecutor(), subRunners))
);
}
return new UnionQueryRunner<>(
runners, conglomerate.findFactory(query).getToolchest()
);
}
private class FireChief extends Thread implements Closeable private class FireChief extends Thread implements Closeable
{ {