From bcc5f86723c2bae4dbb32aded08056b04801c529 Mon Sep 17 00:00:00 2001 From: Weinan Zhao Date: Wed, 26 Nov 2014 00:14:40 +0800 Subject: [PATCH] Modify according to fjy's review, add reformatting according to eaclipse_formatting.xml --- .../realtime/FireDepartmentMetrics.java | 17 +++ .../segment/realtime/RealtimeManager.java | 107 +++++++++++------- .../segment/realtime/RealtimeManagerTest.java | 1 + 3 files changed, 83 insertions(+), 42 deletions(-) diff --git a/server/src/main/java/io/druid/segment/realtime/FireDepartmentMetrics.java b/server/src/main/java/io/druid/segment/realtime/FireDepartmentMetrics.java index e18034dd679..8a17c322d86 100644 --- a/server/src/main/java/io/druid/segment/realtime/FireDepartmentMetrics.java +++ b/server/src/main/java/io/druid/segment/realtime/FireDepartmentMetrics.java @@ -21,6 +21,8 @@ package io.druid.segment.realtime; import java.util.concurrent.atomic.AtomicLong; +import com.google.common.base.Preconditions; + /** */ public class FireDepartmentMetrics @@ -79,4 +81,19 @@ public class FireDepartmentMetrics retVal.rowOutputCount.set(rowOutputCount.get()); return retVal; } + + /** + * merge other FireDepartmentMetrics, will modify this object's data + * @return this object + */ + public FireDepartmentMetrics merge(FireDepartmentMetrics other) + { + Preconditions.checkNotNull(other, "Cannot merge a null FireDepartmentMetrics"); + FireDepartmentMetrics otherSnapshot = other.snapshot(); + processedCount.addAndGet(otherSnapshot.processed()); + thrownAwayCount.addAndGet(otherSnapshot.thrownAway()); + rowOutputCount.addAndGet(otherSnapshot.rowOutput()); + unparseableCount.addAndGet(otherSnapshot.unparseable()); + return this; + } } diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java index 435ab0a555b..f41398d1976 100644 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java +++ b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java @@ -19,6 +19,7 @@ package io.druid.segment.realtime; +import com.fasterxml.jackson.annotation.JacksonInject; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; @@ -33,6 +34,7 @@ import com.metamx.emitter.EmittingLogger; import io.druid.data.input.Firehose; import io.druid.data.input.InputRow; +import io.druid.guice.annotations.Processing; import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.NoopQueryRunner; import io.druid.query.Query; @@ -55,7 +57,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; /** */ @@ -65,20 +67,24 @@ public class RealtimeManager implements QuerySegmentWalker private final List fireDepartments; private final QueryRunnerFactoryConglomerate conglomerate; + private ExecutorService executorService; + + /** + * key=data source name,value=FireChiefs of all partition of that data source + */ + private final Map> chiefs; - /** - * key=data source name,value=FireChiefs of all partition of that data source - */ - private final Map> chiefs; @Inject public RealtimeManager( List fireDepartments, - QueryRunnerFactoryConglomerate conglomerate + QueryRunnerFactoryConglomerate conglomerate, + @JacksonInject @Processing ExecutorService executorService ) { this.fireDepartments = fireDepartments; this.conglomerate = conglomerate; + this.executorService = executorService; this.chiefs = Maps.newHashMap(); } @@ -89,60 +95,77 @@ public class RealtimeManager implements QuerySegmentWalker for (final FireDepartment fireDepartment : fireDepartments) { DataSchema schema = fireDepartment.getDataSchema(); - final FireChief chief = new FireChief(fireDepartment); - List chiefsOfDataSource = chiefs.get(schema.getDataSource()); - if (chiefsOfDataSource == null){ - chiefsOfDataSource = new ArrayList(); - chiefs.put(schema.getDataSource(), chiefsOfDataSource); - } - chiefsOfDataSource.add(chief); + final FireChief chief = new FireChief(fireDepartment); + List chiefs = this.chiefs.get(schema.getDataSource()); + if (chiefs == null) + { + chiefs = new ArrayList(); + this.chiefs.put(schema.getDataSource(), chiefs); + } + chiefs.add(chief); - chief.setName(String.format("chief-%s", schema.getDataSource())); - chief.setDaemon(true); - chief.init(); - chief.start(); + chief.setName(String.format("chief-%s", schema.getDataSource())); + chief.setDaemon(true); + chief.init(); + chief.start(); } } @LifecycleStop public void stop() { - for (Iterable chiefOfDatasource : chiefs.values()) { - for (FireChief chief: chiefOfDatasource) { - CloseQuietly.close(chief); - } - } + for (Iterable chiefs : this.chiefs.values()) + { + for (FireChief chief : chiefs) + { + CloseQuietly.close(chief); + } + } } public FireDepartmentMetrics getMetrics(String datasource) - { - List chiefsOfDatasource = chiefs.get(datasource); - if (chiefsOfDatasource == null || chiefsOfDatasource.size() == 0) { - return null; - } - return chiefsOfDatasource.get(0).getMetrics(); + { + List chiefs = this.chiefs.get(datasource); + if (chiefs == null) + { + return null; + } + FireDepartmentMetrics snapshot = null; + for (FireChief chief : chiefs) + { + if (snapshot == null) + { + snapshot = chief.getMetrics().snapshot(); + } else + { + snapshot.merge(chief.getMetrics()); + } + } + return snapshot; } @Override public QueryRunner getQueryRunnerForIntervals(final Query query, Iterable intervals) - { - return getQueryRunnerForSegments(query, null); + { + return getQueryRunnerForSegments(query, null); } @Override public QueryRunner getQueryRunnerForSegments(final Query query, Iterable specs) - { - QueryRunnerFactory> factory = conglomerate.findFactory(query); - Iterable chiefsOfDataSource = chiefs.get(getDataSourceName(query)); - // the SindleThreadExecutor is only used to submit sub queries, instead of run them. 'cause can't find decent way to referrence the QueryExecutorService - return chiefsOfDataSource == null? new NoopQueryRunner() : factory.getToolchest().mergeResults( - factory.mergeRunners(Executors.newSingleThreadExecutor(), - Iterables.transform(chiefsOfDataSource, new Function>() { - @Override - public QueryRunner apply(FireChief input) { - return input.getQueryRunner(query); - }}))); - } + { + QueryRunnerFactory> factory = conglomerate.findFactory(query); + Iterable chiefsOfDataSource = chiefs.get(getDataSourceName(query)); + return chiefsOfDataSource == null ? new NoopQueryRunner() : factory.getToolchest().mergeResults( + factory.mergeRunners(executorService, + Iterables.transform(chiefsOfDataSource, new Function>() + { + @Override + public QueryRunner apply(FireChief input) + { + return input.getQueryRunner(query); + } + }))); + } private String getDataSourceName(Query query) { diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index a2fdeadc9b1..294803beacd 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -118,6 +118,7 @@ public class RealtimeManagerTest tuningConfig ) ), + null, null ); }