From 57468d39efe90d0c3c3d250d8513c4f460332a12 Mon Sep 17 00:00:00 2001 From: Fangjin Yang Date: Mon, 12 Nov 2012 16:14:48 -0800 Subject: [PATCH] reverting some of the last changes --- .../main/java/com/metamx/druid/BaseNode.java | 26 ++++ .../java/com/metamx/druid/QueryableNode.java | 9 +- .../druid/index/v1/serde/Registererer.java | 2 +- .../indexer/HadoopDruidIndexerConfig.java | 2 +- .../druid/indexer/HadoopDruidIndexerNode.java | 3 +- .../http/IndexerCoordinatorNode.java | 10 +- .../druid/merger/worker/http/WorkerNode.java | 8 +- .../java/com/metamx/druid/BaseServerNode.java | 124 ++++++++++++++++++ 8 files changed, 157 insertions(+), 27 deletions(-) create mode 100644 client/src/main/java/com/metamx/druid/BaseNode.java create mode 100644 server/src/main/java/com/metamx/druid/BaseServerNode.java diff --git a/client/src/main/java/com/metamx/druid/BaseNode.java b/client/src/main/java/com/metamx/druid/BaseNode.java new file mode 100644 index 00000000000..1943ce288c4 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/BaseNode.java @@ -0,0 +1,26 @@ +package com.metamx.druid; + +import com.metamx.common.lifecycle.Lifecycle; +import com.metamx.common.logger.Logger; +import org.codehaus.jackson.map.ObjectMapper; +import org.skife.config.ConfigurationObjectFactory; + +import java.util.Properties; + +/** + */ +@Deprecated +public abstract class BaseNode extends QueryableNode +{ + protected BaseNode( + Logger log, + Properties props, + Lifecycle lifecycle, + ObjectMapper jsonMapper, + ObjectMapper smileMapper, + ConfigurationObjectFactory configFactory + ) + { + super(log, props, lifecycle, jsonMapper, smileMapper, configFactory); + } +} diff --git a/client/src/main/java/com/metamx/druid/QueryableNode.java b/client/src/main/java/com/metamx/druid/QueryableNode.java index 95f3998b7be..31d15ee4cfd 100644 --- a/client/src/main/java/com/metamx/druid/QueryableNode.java +++ b/client/src/main/java/com/metamx/druid/QueryableNode.java @@ -173,18 +173,11 @@ public abstract class QueryableNode implements Register return (T) this; } - @SuppressWarnings("unchecked") - public T registerHandler(Registererer registererer) - { - registererer.registerSerde(); - return (T) this; - } - @Override public void registerHandlers(Registererer... registererers) { for (Registererer registererer : registererers) { - registererer.registerSerde(); + registererer.register(); registererer.registerSubType(jsonMapper); registererer.registerSubType(smileMapper); } diff --git a/index-common/src/main/java/com/metamx/druid/index/v1/serde/Registererer.java b/index-common/src/main/java/com/metamx/druid/index/v1/serde/Registererer.java index 6763df8c5ee..4a90f35a288 100644 --- a/index-common/src/main/java/com/metamx/druid/index/v1/serde/Registererer.java +++ b/index-common/src/main/java/com/metamx/druid/index/v1/serde/Registererer.java @@ -31,7 +31,7 @@ import org.codehaus.jackson.map.ObjectMapper; */ public interface Registererer { - public void registerSerde(); + public void register(); public void registerSubType(ObjectMapper jsonMapper); } diff --git a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java index 7755f700b6b..a6d7a878ae1 100644 --- a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java @@ -105,7 +105,7 @@ public class HadoopDruidIndexerConfig } ); for (Registererer registererer : registererers) { - registererer.registerSerde(); + registererer.register(); } } diff --git a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerNode.java b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerNode.java index 1822b83b15f..9236adcf454 100644 --- a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerNode.java +++ b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerNode.java @@ -6,7 +6,6 @@ import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.druid.RegisteringNode; import com.metamx.druid.index.v1.serde.Registererer; -import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.jsontype.NamedType; import org.joda.time.Interval; @@ -66,7 +65,7 @@ public class HadoopDruidIndexerNode implements RegisteringNode public void registerHandlers(Registererer... registererers) { for (Registererer registererer : registererers) { - registererer.registerSerde(); + registererer.register(); registererer.registerSubType(HadoopDruidIndexerConfig.jsonMapper); } } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java index 69dd93d107d..a4c33bf5090 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java @@ -185,18 +185,12 @@ public class IndexerCoordinatorNode implements RegisteringNode this.taskRunnerFactory = taskRunnerFactory; } - public IndexerCoordinatorNode registerHandler(Registererer registererer) - { - registererer.registerSerde(); - return this; - } - @Override public void registerHandlers(Registererer... registererers) { for (Registererer registererer : registererers) { - registererer.registerSerde(); - registererer.registerSubType(this.jsonMapper); + registererer.register(); + registererer.registerSubType(jsonMapper); } } diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java b/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java index 2849737a244..a32fcbbd676 100644 --- a/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java @@ -149,17 +149,11 @@ public class WorkerNode implements RegisteringNode return this; } - public WorkerNode registerHandler(Registererer registererer) - { - registererer.registerSerde(); - return this; - } - @Override public void registerHandlers(Registererer... registererers) { for (Registererer registererer : registererers) { - registererer.registerSerde(); + registererer.register(); registererer.registerSubType(jsonMapper); } } diff --git a/server/src/main/java/com/metamx/druid/BaseServerNode.java b/server/src/main/java/com/metamx/druid/BaseServerNode.java new file mode 100644 index 00000000000..0813762c2d4 --- /dev/null +++ b/server/src/main/java/com/metamx/druid/BaseServerNode.java @@ -0,0 +1,124 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.metamx.common.lifecycle.Lifecycle; +import com.metamx.common.logger.Logger; +import com.metamx.druid.collect.StupidPool; +import com.metamx.druid.initialization.ServerInit; +import com.metamx.druid.query.DefaultQueryRunnerFactoryConglomerate; +import com.metamx.druid.query.QueryRunnerFactory; +import com.metamx.druid.query.QueryRunnerFactoryConglomerate; +import com.metamx.druid.utils.PropUtils; +import org.codehaus.jackson.map.ObjectMapper; +import org.skife.config.ConfigurationObjectFactory; + +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.Properties; + +/** + */ +public abstract class BaseServerNode extends QueryableNode +{ + private final Map, QueryRunnerFactory> additionalFactories = Maps.newLinkedHashMap(); + private QueryRunnerFactoryConglomerate conglomerate = null; + private StupidPool computeScratchPool = null; + + public BaseServerNode( + Logger log, + Properties props, + Lifecycle lifecycle, + ObjectMapper jsonMapper, + ObjectMapper smileMapper, + ConfigurationObjectFactory configFactory + ) + { + super(log, props, lifecycle, jsonMapper, smileMapper, configFactory); + } + + public QueryRunnerFactoryConglomerate getConglomerate() + { + initializeQueryRunnerFactoryConglomerate(); + return conglomerate; + } + + public StupidPool getComputeScratchPool() + { + initializeComputeScratchPool(); + return computeScratchPool; + } + + @SuppressWarnings("unchecked") + public T setConglomerate(QueryRunnerFactoryConglomerate conglomerate) + { + checkFieldNotSetAndSet("conglomerate", conglomerate); + return (T) this; + } + + @SuppressWarnings("unchecked") + public T setComputeScratchPool(StupidPool computeScratchPool) + { + checkFieldNotSetAndSet("computeScratchPool", computeScratchPool); + return (T) this; + } + + @SuppressWarnings("unchecked") + public T registerQueryRunnerFactory(Class queryClazz, QueryRunnerFactory factory) + { + Preconditions.checkState( + conglomerate == null, + "Registering a QueryRunnerFactory only works when a separate conglomerate is not specified." + ); + Preconditions.checkState( + !additionalFactories.containsKey(queryClazz), "Registered factory for class[%s] multiple times", queryClazz + ); + additionalFactories.put(queryClazz, factory); + return (T) this; + } + + private void initializeComputeScratchPool() + { + if (computeScratchPool == null) { + setComputeScratchPool( + ServerInit.makeComputeScratchPool( + PropUtils.getPropertyAsInt(getProps(), "druid.computation.buffer.size", 1024 * 1024 * 1024) + ) + ); + } + } + + private void initializeQueryRunnerFactoryConglomerate() + { + if (conglomerate == null) { + final Map, QueryRunnerFactory> factories = ServerInit.initDefaultQueryTypes( + getConfigFactory(), getComputeScratchPool() + ); + + for (Map.Entry, QueryRunnerFactory> entry : additionalFactories.entrySet()) { + factories.put(entry.getKey(), entry.getValue()); + } + + setConglomerate(new DefaultQueryRunnerFactoryConglomerate(factories)); + } + } +}