Fix #887: cannot query multiple partitions of one datasource on a realtime node.

Weinan Zhao 2014-11-20 16:36:57 +08:00
parent 2f08ab85fc
commit 5096cbc5e4
1 changed file with 35 additions and 15 deletions
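The bug in miniature, before reading the diff: the old chiefs map held a single FireChief per datasource, so registering a second partition silently replaced the first. A minimal self-contained sketch of that failure mode (plain strings stand in for the FireChief threads; hypothetical demo, not Druid code):

    import java.util.HashMap;
    import java.util.Map;

    // Map.put overwrites: only the last-registered partition of a
    // datasource stays queryable.
    public class OverwriteDemo
    {
      public static void main(String[] args)
      {
        Map<String, String> chiefs = new HashMap<String, String>();
        chiefs.put("events", "chief-for-partition-0");
        chiefs.put("events", "chief-for-partition-1"); // replaces partition 0
        System.out.println(chiefs); // prints {events=chief-for-partition-1}
      }
    }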

io/druid/segment/realtime/RealtimeManager.java

@@ -19,6 +19,7 @@
 package io.druid.segment.realtime;
 
+import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
@@ -29,6 +30,7 @@ import com.metamx.common.lifecycle.LifecycleStart;
 import com.metamx.common.lifecycle.LifecycleStop;
 import com.metamx.common.parsers.ParseException;
 import com.metamx.emitter.EmittingLogger;
 import io.druid.data.input.Firehose;
 import io.druid.data.input.InputRow;
 import io.druid.query.FinalizeResultsQueryRunner;
@@ -43,14 +45,17 @@ import io.druid.query.SegmentDescriptor;
 import io.druid.segment.indexing.DataSchema;
 import io.druid.segment.indexing.RealtimeTuningConfig;
 import io.druid.segment.realtime.plumber.Plumber;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.joda.time.Period;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executors;
 
 /**
  */
@@ -61,7 +66,10 @@ public class RealtimeManager implements QuerySegmentWalker
   private final List<FireDepartment> fireDepartments;
   private final QueryRunnerFactoryConglomerate conglomerate;
-  private final Map<String, FireChief> chiefs;
+  /**
+   * key = data source name, value = FireChiefs of all partitions of that data source
+   */
+  private final Map<String, List<FireChief>> chiefs;
 
   @Inject
   public RealtimeManager(
@@ -82,7 +90,12 @@ public class RealtimeManager implements QuerySegmentWalker
       DataSchema schema = fireDepartment.getDataSchema();
 
       final FireChief chief = new FireChief(fireDepartment);
-      chiefs.put(schema.getDataSource(), chief);
+      List<FireChief> chiefsOfDataSource = chiefs.get(schema.getDataSource());
+      if (chiefsOfDataSource == null) {
+        chiefsOfDataSource = new ArrayList<RealtimeManager.FireChief>();
+        chiefs.put(schema.getDataSource(), chiefsOfDataSource);
+      }
+      chiefsOfDataSource.add(chief);
 
       chief.setName(String.format("chief-%s", schema.getDataSource()));
       chief.setDaemon(true);
@@ -94,34 +107,41 @@ public class RealtimeManager implements QuerySegmentWalker
   @LifecycleStop
   public void stop()
   {
-    for (FireChief chief : chiefs.values()) {
-      CloseQuietly.close(chief);
+    for (Iterable<FireChief> chiefsOfDataSource : chiefs.values()) {
+      for (FireChief chief : chiefsOfDataSource) {
+        CloseQuietly.close(chief);
+      }
     }
   }
 
   public FireDepartmentMetrics getMetrics(String datasource)
   {
-    FireChief chief = chiefs.get(datasource);
-    if (chief == null) {
+    List<FireChief> chiefsOfDataSource = chiefs.get(datasource);
+    if (chiefsOfDataSource == null || chiefsOfDataSource.isEmpty()) {
       return null;
     }
-    return chief.getMetrics();
+    return chiefsOfDataSource.get(0).getMetrics();
   }
 
   @Override
-  public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
+  public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, Iterable<Interval> intervals)
   {
-    final FireChief chief = chiefs.get(getDataSourceName(query));
-    return chief == null ? new NoopQueryRunner<T>() : chief.getQueryRunner(query);
+    return getQueryRunnerForSegments(query, null);
   }
 
   @Override
-  public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
+  public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, Iterable<SegmentDescriptor> specs)
   {
-    final FireChief chief = chiefs.get(getDataSourceName(query));
-    return chief == null ? new NoopQueryRunner<T>() : chief.getQueryRunner(query);
+    final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
+    final Iterable<FireChief> chiefsOfDataSource = chiefs.get(getDataSourceName(query));
+    // The single-thread executor only submits the per-partition sub-queries rather than
+    // running them, because there is no clean way to reference the QueryExecutorService here.
+    return chiefsOfDataSource == null
+           ? new NoopQueryRunner<T>()
+           : factory.getToolchest().mergeResults(
+               factory.mergeRunners(
+                   Executors.newSingleThreadExecutor(),
+                   Iterables.transform(
+                       chiefsOfDataSource,
+                       new Function<FireChief, QueryRunner<T>>()
+                       {
+                         @Override
+                         public QueryRunner<T> apply(FireChief fireChief)
+                         {
+                           return fireChief.getQueryRunner(query);
+                         }
+                       }
+                   )
+               )
+           );
   }
 
   private <T> String getDataSourceName(Query<T> query)
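
For reference, the shape of the fix independent of Druid's types: keep a list-valued map from datasource to partition runners, then fan each query out across every partition and merge the results. A minimal sketch, with a hypothetical PartitionRunner interface standing in for a FireChief's query runner (not the Druid API; the real patch merges through the query toolchest rather than by concatenation):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class MultiPartitionQuerySketch
    {
      // Hypothetical stand-in for the query side of a FireChief.
      interface PartitionRunner
      {
        List<String> run(String query);
      }

      // Mirrors the new chiefs map: one datasource -> all of its partitions.
      private final Map<String, List<PartitionRunner>> runnersByDataSource =
          new HashMap<String, List<PartitionRunner>>();

      // Append instead of overwrite, so a second partition of the same
      // datasource no longer shadows the first.
      public void register(String dataSource, PartitionRunner runner)
      {
        List<PartitionRunner> runners = runnersByDataSource.get(dataSource);
        if (runners == null) {
          runners = new ArrayList<PartitionRunner>();
          runnersByDataSource.put(dataSource, runners);
        }
        runners.add(runner);
      }

      // Mirrors getQueryRunnerForSegments: fan out over every partition and
      // merge; Druid merges via the toolchest, here we simply concatenate.
      public List<String> query(String dataSource, String query)
      {
        List<String> merged = new ArrayList<String>();
        List<PartitionRunner> runners = runnersByDataSource.get(dataSource);
        if (runners == null) {
          return merged; // analogous to returning a NoopQueryRunner
        }
        for (PartitionRunner runner : runners) {
          merged.addAll(runner.run(query));
        }
        return merged;
      }

      public static void main(String[] args)
      {
        MultiPartitionQuerySketch sketch = new MultiPartitionQuerySketch();
        sketch.register("events", new PartitionRunner()
        {
          @Override
          public List<String> run(String query)
          {
            return Arrays.asList("rows-from-partition-0");
          }
        });
        sketch.register("events", new PartitionRunner()
        {
          @Override
          public List<String> run(String query)
          {
            return Arrays.asList("rows-from-partition-1");
          }
        });
        // Before the fix only one partition answered; now both contribute.
        System.out.println(sketch.query("events", "select *"));
      }
    }

Registering two partitions of "events" and querying once now returns rows from both, which is exactly what #887 reported as broken on the realtime node.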