Merge pull request #894 from zhaown/fix-query-realtime-with-multi-partition

bugfix for #887
This commit is contained in:
Fangjin Yang 2014-12-07 09:49:31 -07:00
commit c255c16c1a
3 changed files with 78 additions and 18 deletions

View File

@ -21,6 +21,8 @@ package io.druid.segment.realtime;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.base.Preconditions;
/**
*/
public class FireDepartmentMetrics
@ -79,4 +81,20 @@ public class FireDepartmentMetrics
retVal.rowOutputCount.set(rowOutputCount.get());
return retVal;
}
/**
* merge other FireDepartmentMetrics, will modify this object's data
*
* @return this object
*/
public FireDepartmentMetrics merge(FireDepartmentMetrics other)
{
Preconditions.checkNotNull(other, "Cannot merge a null FireDepartmentMetrics");
FireDepartmentMetrics otherSnapshot = other.snapshot();
processedCount.addAndGet(otherSnapshot.processed());
thrownAwayCount.addAndGet(otherSnapshot.thrownAway());
rowOutputCount.addAndGet(otherSnapshot.rowOutput());
unparseableCount.addAndGet(otherSnapshot.unparseable());
return this;
}
}

View File

@ -19,6 +19,8 @@
package io.druid.segment.realtime;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
@ -29,8 +31,10 @@ import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.parsers.ParseException;
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Firehose;
import io.druid.data.input.InputRow;
import io.druid.guice.annotations.Processing;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
@ -43,14 +47,17 @@ import io.druid.query.SegmentDescriptor;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.plumber.Plumber;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
/**
*/
@ -60,17 +67,24 @@ public class RealtimeManager implements QuerySegmentWalker
private final List<FireDepartment> fireDepartments;
private final QueryRunnerFactoryConglomerate conglomerate;
private ExecutorService executorService;
/**
* key=data source name,value=FireChiefs of all partition of that data source
*/
private final Map<String, List<FireChief>> chiefs;
private final Map<String, FireChief> chiefs;
@Inject
public RealtimeManager(
List<FireDepartment> fireDepartments,
QueryRunnerFactoryConglomerate conglomerate
QueryRunnerFactoryConglomerate conglomerate,
@JacksonInject @Processing ExecutorService executorService
)
{
this.fireDepartments = fireDepartments;
this.conglomerate = conglomerate;
this.executorService = executorService;
this.chiefs = Maps.newHashMap();
}
@ -82,7 +96,12 @@ public class RealtimeManager implements QuerySegmentWalker
DataSchema schema = fireDepartment.getDataSchema();
final FireChief chief = new FireChief(fireDepartment);
chiefs.put(schema.getDataSource(), chief);
List<FireChief> chiefs = this.chiefs.get(schema.getDataSource());
if (chiefs == null) {
chiefs = new ArrayList<RealtimeManager.FireChief>();
this.chiefs.put(schema.getDataSource(), chiefs);
}
chiefs.add(chief);
chief.setName(String.format("chief-%s", schema.getDataSource()));
chief.setDaemon(true);
@ -94,34 +113,56 @@ public class RealtimeManager implements QuerySegmentWalker
@LifecycleStop
public void stop()
{
for (FireChief chief : chiefs.values()) {
CloseQuietly.close(chief);
for (Iterable<FireChief> chiefs : this.chiefs.values()) {
for (FireChief chief : chiefs) {
CloseQuietly.close(chief);
}
}
}
public FireDepartmentMetrics getMetrics(String datasource)
{
FireChief chief = chiefs.get(datasource);
if (chief == null) {
{
List<FireChief> chiefs = this.chiefs.get(datasource);
if (chiefs == null) {
return null;
}
return chief.getMetrics();
FireDepartmentMetrics snapshot = null;
for (FireChief chief : chiefs) {
if (snapshot == null) {
snapshot = chief.getMetrics().snapshot();
} else {
snapshot.merge(chief.getMetrics());
}
}
return snapshot;
}
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
{
final FireChief chief = chiefs.get(getDataSourceName(query));
return chief == null ? new NoopQueryRunner<T>() : chief.getQueryRunner(query);
public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, Iterable<Interval> intervals)
{
return getQueryRunnerForSegments(query, null);
}
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, Iterable<SegmentDescriptor> specs)
{
final FireChief chief = chiefs.get(getDataSourceName(query));
return chief == null ? new NoopQueryRunner<T>() : chief.getQueryRunner(query);
QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
Iterable<FireChief> chiefsOfDataSource = chiefs.get(getDataSourceName(query));
return chiefsOfDataSource == null ? new NoopQueryRunner<T>() : factory.getToolchest().mergeResults(
factory.mergeRunners(
executorService,
Iterables.transform(
chiefsOfDataSource, new Function<FireChief, QueryRunner<T>>()
{
@Override
public QueryRunner<T> apply(FireChief input)
{
return input.getQueryRunner(query);
}
}
)
)
);
}
private <T> String getDataSourceName(Query<T> query)

View File

@ -118,6 +118,7 @@ public class RealtimeManagerTest
tuningConfig
)
),
null,
null
);
}