use num processing threads for realtime query execution

This commit is contained in:
fjy 2013-09-13 13:54:41 -07:00
parent e03c623467
commit 1ec098c010
4 changed files with 54 additions and 18 deletions

View File

@ -189,6 +189,7 @@ public class RealtimeNode extends BaseServerNode<RealtimeNode>
injectables.put("segmentPublisher", getSegmentPublisher()); injectables.put("segmentPublisher", getSegmentPublisher());
injectables.put("serverView", getServerView()); injectables.put("serverView", getServerView());
injectables.put("serviceEmitter", getEmitter()); injectables.put("serviceEmitter", getEmitter());
injectables.put("queryExecutorService", getQueryExecutorService());
getJsonMapper().setInjectableValues( getJsonMapper().setInjectableValues(
new InjectableValues() new InjectableValues()

View File

@ -25,7 +25,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -92,7 +91,6 @@ import java.util.concurrent.ScheduledExecutorService;
public class RealtimePlumberSchool implements PlumberSchool public class RealtimePlumberSchool implements PlumberSchool
{ {
private static final EmittingLogger log = new EmittingLogger(RealtimePlumberSchool.class); private static final EmittingLogger log = new EmittingLogger(RealtimePlumberSchool.class);
private static final ListeningExecutorService EXEC = MoreExecutors.sameThreadExecutor();
private final Period windowPeriod; private final Period windowPeriod;
private final File basePersistDirectory; private final File basePersistDirectory;
@ -108,6 +106,7 @@ public class RealtimePlumberSchool implements PlumberSchool
private volatile DataSegmentAnnouncer segmentAnnouncer = null; private volatile DataSegmentAnnouncer segmentAnnouncer = null;
private volatile SegmentPublisher segmentPublisher = null; private volatile SegmentPublisher segmentPublisher = null;
private volatile ServerView serverView = null; private volatile ServerView serverView = null;
private volatile ExecutorService queryExecutorService = null;
@JsonCreator @JsonCreator
public RealtimePlumberSchool( public RealtimePlumberSchool(
@ -175,6 +174,12 @@ public class RealtimePlumberSchool implements PlumberSchool
this.emitter = emitter; this.emitter = emitter;
} }
@JacksonInject("queryExecutorService")
public void setQueryExecutorService(ExecutorService queryExecutorService)
{
this.queryExecutorService = queryExecutorService;
}
@Override @Override
public Plumber findPlumber(final Schema schema, final FireDepartmentMetrics metrics) public Plumber findPlumber(final Schema schema, final FireDepartmentMetrics metrics)
{ {
@ -262,7 +267,7 @@ public class RealtimePlumberSchool implements PlumberSchool
return toolchest.mergeResults( return toolchest.mergeResults(
factory.mergeRunners( factory.mergeRunners(
EXEC, queryExecutorService,
FunctionalIterable FunctionalIterable
.create(querySinks) .create(querySinks)
.transform( .transform(
@ -277,7 +282,7 @@ public class RealtimePlumberSchool implements PlumberSchool
emitter, emitter,
builderFn, builderFn,
factory.mergeRunners( factory.mergeRunners(
EXEC, MoreExecutors.sameThreadExecutor(),
Iterables.transform( Iterables.transform(
theSink, theSink,
new Function<FireHydrant, QueryRunner<T>>() new Function<FireHydrant, QueryRunner<T>>()

View File

@ -23,19 +23,24 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.metamx.common.concurrent.ExecutorServiceConfig;
import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.druid.collect.StupidPool; import com.metamx.druid.collect.StupidPool;
import com.metamx.druid.initialization.ServerInit; import com.metamx.druid.initialization.ServerInit;
import com.metamx.druid.query.DefaultQueryRunnerFactoryConglomerate; import com.metamx.druid.query.DefaultQueryRunnerFactoryConglomerate;
import com.metamx.druid.query.MetricsEmittingExecutorService;
import com.metamx.druid.query.PrioritizedExecutorService;
import com.metamx.druid.query.QueryRunnerFactory; import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate; import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.emitter.service.ServiceMetricEvent;
import org.skife.config.ConfigurationObjectFactory; import org.skife.config.ConfigurationObjectFactory;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.ExecutorService;
/** /**
*/ */
@ -45,6 +50,7 @@ public abstract class BaseServerNode<T extends QueryableNode> extends QueryableN
private DruidProcessingConfig processingConfig = null; private DruidProcessingConfig processingConfig = null;
private QueryRunnerFactoryConglomerate conglomerate = null; private QueryRunnerFactoryConglomerate conglomerate = null;
private StupidPool<ByteBuffer> computeScratchPool = null; private StupidPool<ByteBuffer> computeScratchPool = null;
private ExecutorService queryExecutorService = null;
public BaseServerNode( public BaseServerNode(
String nodeType, String nodeType,
@ -77,6 +83,12 @@ public abstract class BaseServerNode<T extends QueryableNode> extends QueryableN
return processingConfig; return processingConfig;
} }
public ExecutorService getQueryExecutorService()
{
initializeQueryExecutorService();
return queryExecutorService;
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public T setConglomerate(QueryRunnerFactoryConglomerate conglomerate) public T setConglomerate(QueryRunnerFactoryConglomerate conglomerate)
{ {
@ -98,6 +110,13 @@ public abstract class BaseServerNode<T extends QueryableNode> extends QueryableN
return (T) this; return (T) this;
} }
@SuppressWarnings("unchecked")
public T setQueryExecutorService(ExecutorService queryExecutorService)
{
checkFieldNotSetAndSet("queryExecutorService", queryExecutorService);
return (T) this;
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public T registerQueryRunnerFactory(Class<? extends Query> queryClazz, QueryRunnerFactory factory) public T registerQueryRunnerFactory(Class<? extends Query> queryClazz, QueryRunnerFactory factory)
{ {
@ -144,4 +163,24 @@ public abstract class BaseServerNode<T extends QueryableNode> extends QueryableN
); );
} }
} }
private void initializeQueryExecutorService()
{
if (queryExecutorService == null) {
final PrioritizedExecutorService innerExecutorService = PrioritizedExecutorService.create(
getLifecycle(),
getConfigFactory().buildWithReplacements(
ExecutorServiceConfig.class, ImmutableMap.of("base_path", "druid.processing")
)
);
setQueryExecutorService(
new MetricsEmittingExecutorService(
innerExecutorService,
getEmitter(),
new ServiceMetricEvent.Builder()
)
);
}
}
} }

View File

@ -97,21 +97,13 @@ public class ComputeNode extends BaseServerNode<ComputeNode>
final List<Monitor> monitors = getMonitors(); final List<Monitor> monitors = getMonitors();
final QueryRunnerFactoryConglomerate conglomerate = getConglomerate(); final QueryRunnerFactoryConglomerate conglomerate = getConglomerate();
final PrioritizedExecutorService innerExecutorService = PrioritizedExecutorService.create( final ServerManager serverManager = new ServerManager(
getLifecycle(), getSegmentLoader(),
getConfigFactory().buildWithReplacements( conglomerate,
ExecutorServiceConfig.class, ImmutableMap.of("base_path", "druid.processing")
)
);
final ExecutorService executorService = new MetricsEmittingExecutorService(
innerExecutorService,
emitter, emitter,
new ServiceMetricEvent.Builder() getQueryExecutorService()
); );
final ServerManager serverManager = new ServerManager(getSegmentLoader(), conglomerate, emitter, executorService);
final ZkCoordinator coordinator = new ZkCoordinator( final ZkCoordinator coordinator = new ZkCoordinator(
getJsonMapper(), getJsonMapper(),
getConfigFactory().build(ZkCoordinatorConfig.class), getConfigFactory().build(ZkCoordinatorConfig.class),
@ -206,5 +198,4 @@ public class ComputeNode extends BaseServerNode<ComputeNode>
return new ComputeNode(props, lifecycle, jsonMapper, smileMapper, configFactory); return new ComputeNode(props, lifecycle, jsonMapper, smileMapper, configFactory);
} }
} }
}
}