Modify according to fjy's review; apply reformatting according to eclipse_formatting.xml

Weinan Zhao 2014-11-26 00:14:40 +08:00
parent 5096cbc5e4
commit bcc5f86723
3 changed files with 83 additions and 42 deletions

FireDepartmentMetrics.java

@@ -21,6 +21,8 @@ package io.druid.segment.realtime;

 import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Preconditions;

 /**
  */
 public class FireDepartmentMetrics
@@ -79,4 +81,19 @@ public class FireDepartmentMetrics
     retVal.rowOutputCount.set(rowOutputCount.get());
     return retVal;
   }
+
+  /**
+   * Merges another FireDepartmentMetrics into this one; this object's counters are modified.
+   *
+   * @return this object
+   */
+  public FireDepartmentMetrics merge(FireDepartmentMetrics other)
+  {
+    Preconditions.checkNotNull(other, "Cannot merge a null FireDepartmentMetrics");
+    FireDepartmentMetrics otherSnapshot = other.snapshot();
+    processedCount.addAndGet(otherSnapshot.processed());
+    thrownAwayCount.addAndGet(otherSnapshot.thrownAway());
+    rowOutputCount.addAndGet(otherSnapshot.rowOutput());
+    unparseableCount.addAndGet(otherSnapshot.unparseable());
+    return this;
+  }
 }
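
As a side note, here is a minimal sketch of how the new merge() helper can be combined with snapshot() to fold per-partition metrics together. The incrementProcessed()/incrementUnparseable() mutators are assumed to already exist on FireDepartmentMetrics; they are not part of this diff.

import io.druid.segment.realtime.FireDepartmentMetrics;

public class FireDepartmentMetricsMergeSketch
{
  public static void main(String[] args)
  {
    // Metrics for two partitions of the same data source.
    FireDepartmentMetrics partition0 = new FireDepartmentMetrics();
    FireDepartmentMetrics partition1 = new FireDepartmentMetrics();

    partition0.incrementProcessed();   // assumed existing mutator
    partition0.incrementProcessed();
    partition1.incrementUnparseable(); // assumed existing mutator

    // Fold the partitions together on an immutable copy; merge() mutates and
    // returns the receiver, so calls can be chained.
    FireDepartmentMetrics total = partition0.snapshot().merge(partition1);

    System.out.println(total.processed());    // 2
    System.out.println(total.unparseable());  // 1
  }
}

This mirrors what the reworked getMetrics() in RealtimeManager (below) does for all FireChiefs of a data source.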

RealtimeManager.java

@@ -19,6 +19,7 @@
 package io.druid.segment.realtime;

+import com.fasterxml.jackson.annotation.JacksonInject;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
@@ -33,6 +34,7 @@ import com.metamx.emitter.EmittingLogger;
 import io.druid.data.input.Firehose;
 import io.druid.data.input.InputRow;
+import io.druid.guice.annotations.Processing;
 import io.druid.query.FinalizeResultsQueryRunner;
 import io.druid.query.NoopQueryRunner;
 import io.druid.query.Query;
@@ -55,7 +57,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;

 /**
  */
@@ -65,20 +67,24 @@ public class RealtimeManager implements QuerySegmentWalker
   private final List<FireDepartment> fireDepartments;
   private final QueryRunnerFactoryConglomerate conglomerate;
+  private ExecutorService executorService;

   /**
    * key = data source name, value = FireChiefs of all partitions of that data source
   */
   private final Map<String, List<FireChief>> chiefs;

   @Inject
   public RealtimeManager(
       List<FireDepartment> fireDepartments,
-      QueryRunnerFactoryConglomerate conglomerate
+      QueryRunnerFactoryConglomerate conglomerate,
+      @JacksonInject @Processing ExecutorService executorService
   )
   {
     this.fireDepartments = fireDepartments;
     this.conglomerate = conglomerate;
+    this.executorService = executorService;

     this.chiefs = Maps.newHashMap();
   }
@@ -89,60 +95,77 @@ public class RealtimeManager implements QuerySegmentWalker
     for (final FireDepartment fireDepartment : fireDepartments) {
       DataSchema schema = fireDepartment.getDataSchema();

       final FireChief chief = new FireChief(fireDepartment);
-      List<FireChief> chiefsOfDataSource = chiefs.get(schema.getDataSource());
-      if (chiefsOfDataSource == null){
-        chiefsOfDataSource = new ArrayList<RealtimeManager.FireChief>();
-        chiefs.put(schema.getDataSource(), chiefsOfDataSource);
-      }
-      chiefsOfDataSource.add(chief);
+      List<FireChief> chiefs = this.chiefs.get(schema.getDataSource());
+      if (chiefs == null)
+      {
+        chiefs = new ArrayList<RealtimeManager.FireChief>();
+        this.chiefs.put(schema.getDataSource(), chiefs);
+      }
+      chiefs.add(chief);

       chief.setName(String.format("chief-%s", schema.getDataSource()));
       chief.setDaemon(true);
       chief.init();
       chief.start();
     }
   }

   @LifecycleStop
   public void stop()
   {
-    for (Iterable<FireChief> chiefOfDatasource : chiefs.values()) {
-      for (FireChief chief: chiefOfDatasource) {
-        CloseQuietly.close(chief);
-      }
-    }
+    for (Iterable<FireChief> chiefs : this.chiefs.values())
+    {
+      for (FireChief chief : chiefs)
+      {
+        CloseQuietly.close(chief);
+      }
+    }
   }

   public FireDepartmentMetrics getMetrics(String datasource)
   {
-    List<FireChief> chiefsOfDatasource = chiefs.get(datasource);
-    if (chiefsOfDatasource == null || chiefsOfDatasource.size() == 0) {
-      return null;
-    }
-    return chiefsOfDatasource.get(0).getMetrics();
+    List<FireChief> chiefs = this.chiefs.get(datasource);
+    if (chiefs == null)
+    {
+      return null;
+    }
+    FireDepartmentMetrics snapshot = null;
+    for (FireChief chief : chiefs)
+    {
+      if (snapshot == null)
+      {
+        snapshot = chief.getMetrics().snapshot();
+      } else
+      {
+        snapshot.merge(chief.getMetrics());
+      }
+    }
+    return snapshot;
   }

   @Override
   public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, Iterable<Interval> intervals)
   {
     return getQueryRunnerForSegments(query, null);
   }

   @Override
   public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, Iterable<SegmentDescriptor> specs)
   {
     QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);

     Iterable<FireChief> chiefsOfDataSource = chiefs.get(getDataSourceName(query));
-    // The single-thread executor is only used to submit sub-queries, not to run them, because there is no clean way to reference the query ExecutorService here.
-    return chiefsOfDataSource == null? new NoopQueryRunner<T>() : factory.getToolchest().mergeResults(
-        factory.mergeRunners(Executors.newSingleThreadExecutor(),
-            Iterables.transform(chiefsOfDataSource, new Function<FireChief, QueryRunner<T>>() {
-              @Override
-              public QueryRunner<T> apply(FireChief input) {
-                return input.getQueryRunner(query);
-              }})));
+    return chiefsOfDataSource == null ? new NoopQueryRunner<T>() : factory.getToolchest().mergeResults(
+        factory.mergeRunners(executorService,
+            Iterables.transform(chiefsOfDataSource, new Function<FireChief, QueryRunner<T>>()
+            {
+              @Override
+              public QueryRunner<T> apply(FireChief input)
+              {
+                return input.getQueryRunner(query);
+              }
+            })));
   }

   private <T> String getDataSourceName(Query<T> query)
   {
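
For completeness, a minimal sketch of how the @Processing-annotated executor used above could be bound in a Guice module. This is an assumption for illustration only: Druid's own processing module normally provides this binding, and the constructor additionally relies on @JacksonInject; all names here other than Processing and ExecutorService are hypothetical.

import com.google.inject.Binder;
import com.google.inject.Module;
import io.druid.guice.annotations.Processing;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ProcessingExecutorSketchModule implements Module
{
  @Override
  public void configure(Binder binder)
  {
    // Bind a small pool under the @Processing annotation so constructors such as
    // RealtimeManager's can receive it via injection.
    binder.bind(ExecutorService.class)
          .annotatedWith(Processing.class)
          .toInstance(Executors.newFixedThreadPool(2));
  }
}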

RealtimeManagerTest.java

@@ -118,6 +118,7 @@ public class RealtimeManagerTest
             tuningConfig
         )
     ),
+    null,
     null
 );
   }
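
The updated test simply passes null for the new executor argument. Outside of tests, a rough sketch of constructing RealtimeManager directly with an explicit processing executor looks like the following; the import paths are assumed from the package declarations visible above, and in production the @Processing ExecutorService would come from the injector rather than be created inline.

import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.RealtimeManager;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RealtimeManagerWiringSketch
{
  public static RealtimeManager wire(
      List<FireDepartment> fireDepartments,       // assembled elsewhere
      QueryRunnerFactoryConglomerate conglomerate // from the injector or a test stub
  )
  {
    // Any executor works here; the manager uses it to merge per-partition
    // query runners in getQueryRunnerForSegments().
    ExecutorService processingExec = Executors.newFixedThreadPool(2);
    return new RealtimeManager(fireDepartments, conglomerate, processingExec);
  }
}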