From c20dccd0f4fcce2594cce561a3e69a61804c771d Mon Sep 17 00:00:00 2001 From: Fangjin Yang Date: Mon, 12 Nov 2012 13:58:43 -0800 Subject: [PATCH] modifying the way registering serdes works to hopefully be a bit easier to use --- .../druid/{BaseNode.java => QueryableNode.java} | 16 +++++++++++++--- .../java/com/metamx/druid/RegisteringNode.java | 10 ++++++++++ .../java/com/metamx/druid/http/BrokerNode.java | 4 ++-- .../druid/index/v1/serde/Registererer.java | 6 +++++- .../druid/indexer/HadoopDruidIndexerConfig.java | 2 +- .../druid/indexer/HadoopDruidIndexerNode.java | 14 +++++++++++++- .../metamx/druid/indexer/IndexGeneratorJob.java | 3 +-- .../coordinator/http/IndexerCoordinatorNode.java | 14 ++++++++++++-- .../druid/merger/worker/http/WorkerNode.java | 14 ++++++++++++-- .../com/metamx/druid/realtime/RealtimeNode.java | 4 ++-- .../{BaseServerNode.java => ServerNode.java} | 4 ++-- .../java/com/metamx/druid/http/ComputeNode.java | 4 ++-- 12 files changed, 75 insertions(+), 20 deletions(-) rename client/src/main/java/com/metamx/druid/{BaseNode.java => QueryableNode.java} (96%) create mode 100644 client/src/main/java/com/metamx/druid/RegisteringNode.java rename server/src/main/java/com/metamx/druid/{BaseServerNode.java => ServerNode.java} (97%) diff --git a/client/src/main/java/com/metamx/druid/BaseNode.java b/client/src/main/java/com/metamx/druid/QueryableNode.java similarity index 96% rename from client/src/main/java/com/metamx/druid/BaseNode.java rename to client/src/main/java/com/metamx/druid/QueryableNode.java index dc45cd6eef8..95f3998b7be 100644 --- a/client/src/main/java/com/metamx/druid/BaseNode.java +++ b/client/src/main/java/com/metamx/druid/QueryableNode.java @@ -62,7 +62,7 @@ import java.util.concurrent.ScheduledExecutorService; /** */ -public abstract class BaseNode +public abstract class QueryableNode implements RegisteringNode { private final Logger log; @@ -82,7 +82,7 @@ public abstract class BaseNode private boolean initialized = false; - public BaseNode( + public QueryableNode( Logger log, Properties props, Lifecycle lifecycle, @@ -176,10 +176,20 @@ public abstract class BaseNode @SuppressWarnings("unchecked") public T registerHandler(Registererer registererer) { - registererer.register(); + registererer.registerSerde(); return (T) this; } + @Override + public void registerHandlers(Registererer... registererers) + { + for (Registererer registererer : registererers) { + registererer.registerSerde(); + registererer.registerSubType(jsonMapper); + registererer.registerSubType(smileMapper); + } + } + public Lifecycle getLifecycle() { return lifecycle; diff --git a/client/src/main/java/com/metamx/druid/RegisteringNode.java b/client/src/main/java/com/metamx/druid/RegisteringNode.java new file mode 100644 index 00000000000..e3ef92a8f05 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/RegisteringNode.java @@ -0,0 +1,10 @@ +package com.metamx.druid; + +import com.metamx.druid.index.v1.serde.Registererer; + +/** + */ +public interface RegisteringNode +{ + public void registerHandlers(Registererer... registererers); +} diff --git a/client/src/main/java/com/metamx/druid/http/BrokerNode.java b/client/src/main/java/com/metamx/druid/http/BrokerNode.java index b8f1f5c4d31..0bcab531f4d 100644 --- a/client/src/main/java/com/metamx/druid/http/BrokerNode.java +++ b/client/src/main/java/com/metamx/druid/http/BrokerNode.java @@ -29,7 +29,7 @@ import com.metamx.common.ISE; import com.metamx.common.config.Config; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.logger.Logger; -import com.metamx.druid.BaseNode; +import com.metamx.druid.QueryableNode; import com.metamx.druid.client.BrokerServerView; import com.metamx.druid.client.CachingClusteredClient; import com.metamx.druid.client.ClientConfig; @@ -62,7 +62,7 @@ import java.util.Properties; /** */ -public class BrokerNode extends BaseNode +public class BrokerNode extends QueryableNode { private static final Logger log = new Logger(BrokerNode.class); diff --git a/index-common/src/main/java/com/metamx/druid/index/v1/serde/Registererer.java b/index-common/src/main/java/com/metamx/druid/index/v1/serde/Registererer.java index f560dfdc1e6..6763df8c5ee 100644 --- a/index-common/src/main/java/com/metamx/druid/index/v1/serde/Registererer.java +++ b/index-common/src/main/java/com/metamx/druid/index/v1/serde/Registererer.java @@ -19,6 +19,8 @@ package com.metamx.druid.index.v1.serde; +import org.codehaus.jackson.map.ObjectMapper; + /** * This is a "factory" interface for registering handlers in the system. It exists because I'm unaware of * another way to register the complex serdes in the MR jobs that run on Hadoop. As such, instances of this interface @@ -29,5 +31,7 @@ package com.metamx.druid.index.v1.serde; */ public interface Registererer { - public void register(); + public void registerSerde(); + + public void registerSubType(ObjectMapper jsonMapper); } diff --git a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java index a6d7a878ae1..7755f700b6b 100644 --- a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java @@ -105,7 +105,7 @@ public class HadoopDruidIndexerConfig } ); for (Registererer registererer : registererers) { - registererer.register(); + registererer.registerSerde(); } } diff --git a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerNode.java b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerNode.java index 5d7c281aef9..1822b83b15f 100644 --- a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerNode.java +++ b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerNode.java @@ -4,6 +4,9 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; +import com.metamx.druid.RegisteringNode; +import com.metamx.druid.index.v1.serde.Registererer; +import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.jsontype.NamedType; import org.joda.time.Interval; @@ -13,7 +16,7 @@ import java.util.List; /** */ -public class HadoopDruidIndexerNode +public class HadoopDruidIndexerNode implements RegisteringNode { public static Builder builder() { @@ -59,6 +62,15 @@ public class HadoopDruidIndexerNode return this; } + @Override + public void registerHandlers(Registererer... registererers) + { + for (Registererer registererer : registererers) { + registererer.registerSerde(); + registererer.registerSubType(HadoopDruidIndexerConfig.jsonMapper); + } + } + @LifecycleStart public void start() throws Exception { diff --git a/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java b/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java index e5612b0db7f..9914da18c13 100644 --- a/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java +++ b/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java @@ -253,7 +253,6 @@ public class IndexGeneratorJob implements Jobby public static class IndexGeneratorReducer extends Reducer { private HadoopDruidIndexerConfig config; - private final DefaultObjectMapper jsonMapper = new DefaultObjectMapper(); private List metricNames = Lists.newArrayList(); private Function timestampConverter; private Parser parser; @@ -573,7 +572,7 @@ public class IndexGeneratorJob implements Jobby final FSDataOutputStream descriptorOut = outputFS.create(descriptorPath); try { - jsonMapper.writeValue(descriptorOut, segment); + HadoopDruidIndexerConfig.jsonMapper.writeValue(descriptorOut, segment); } finally { descriptorOut.close(); diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java index d7ab574b644..69dd93d107d 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java @@ -36,6 +36,7 @@ import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; +import com.metamx.druid.RegisteringNode; import com.metamx.druid.db.DbConnector; import com.metamx.druid.db.DbConnectorConfig; import com.metamx.druid.http.GuiceServletConfig; @@ -108,7 +109,7 @@ import java.util.concurrent.ScheduledExecutorService; /** */ -public class IndexerCoordinatorNode +public class IndexerCoordinatorNode implements RegisteringNode { private static final Logger log = new Logger(IndexerCoordinatorNode.class); @@ -186,10 +187,19 @@ public class IndexerCoordinatorNode public IndexerCoordinatorNode registerHandler(Registererer registererer) { - registererer.register(); + registererer.registerSerde(); return this; } + @Override + public void registerHandlers(Registererer... registererers) + { + for (Registererer registererer : registererers) { + registererer.registerSerde(); + registererer.registerSubType(this.jsonMapper); + } + } + public void init() throws Exception { scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle); diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java b/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java index cc30b914367..2849737a244 100644 --- a/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java @@ -28,6 +28,7 @@ import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; +import com.metamx.druid.RegisteringNode; import com.metamx.druid.http.StatusServlet; import com.metamx.druid.index.v1.serde.Registererer; import com.metamx.druid.initialization.CuratorConfig; @@ -79,7 +80,7 @@ import java.util.concurrent.ScheduledExecutorService; /** */ -public class WorkerNode +public class WorkerNode implements RegisteringNode { private static final Logger log = new Logger(WorkerNode.class); @@ -150,10 +151,19 @@ public class WorkerNode public WorkerNode registerHandler(Registererer registererer) { - registererer.register(); + registererer.registerSerde(); return this; } + @Override + public void registerHandlers(Registererer... registererers) + { + for (Registererer registererer : registererers) { + registererer.registerSerde(); + registererer.registerSubType(jsonMapper); + } + } + public void init() throws Exception { initializeEmitter(); diff --git a/realtime/src/main/java/com/metamx/druid/realtime/RealtimeNode.java b/realtime/src/main/java/com/metamx/druid/realtime/RealtimeNode.java index 60d290992d5..8956151f58b 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/RealtimeNode.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/RealtimeNode.java @@ -28,7 +28,7 @@ import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; -import com.metamx.druid.BaseServerNode; +import com.metamx.druid.ServerNode; import com.metamx.druid.client.ClientConfig; import com.metamx.druid.client.ClientInventoryManager; import com.metamx.druid.client.MutableServerView; @@ -65,7 +65,7 @@ import java.util.Properties; /** */ -public class RealtimeNode extends BaseServerNode +public class RealtimeNode extends ServerNode { private static final Logger log = new Logger(RealtimeNode.class); diff --git a/server/src/main/java/com/metamx/druid/BaseServerNode.java b/server/src/main/java/com/metamx/druid/ServerNode.java similarity index 97% rename from server/src/main/java/com/metamx/druid/BaseServerNode.java rename to server/src/main/java/com/metamx/druid/ServerNode.java index fcb8a1ab46b..f12fdaefbcc 100644 --- a/server/src/main/java/com/metamx/druid/BaseServerNode.java +++ b/server/src/main/java/com/metamx/druid/ServerNode.java @@ -38,13 +38,13 @@ import java.util.Properties; /** */ -public abstract class BaseServerNode extends BaseNode +public abstract class ServerNode extends QueryableNode { private final Map, QueryRunnerFactory> additionalFactories = Maps.newLinkedHashMap(); private QueryRunnerFactoryConglomerate conglomerate = null; private StupidPool computeScratchPool = null; - public BaseServerNode( + public ServerNode( Logger log, Properties props, Lifecycle lifecycle, diff --git a/server/src/main/java/com/metamx/druid/http/ComputeNode.java b/server/src/main/java/com/metamx/druid/http/ComputeNode.java index 1369b0898d2..2c1531db3b0 100644 --- a/server/src/main/java/com/metamx/druid/http/ComputeNode.java +++ b/server/src/main/java/com/metamx/druid/http/ComputeNode.java @@ -28,7 +28,7 @@ import com.metamx.common.concurrent.ExecutorServices; import com.metamx.common.config.Config; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.logger.Logger; -import com.metamx.druid.BaseServerNode; +import com.metamx.druid.ServerNode; import com.metamx.druid.client.DruidServer; import com.metamx.druid.client.DruidServerConfig; import com.metamx.druid.coordination.ServerManager; @@ -59,7 +59,7 @@ import java.util.concurrent.ExecutorService; /** */ -public class ComputeNode extends BaseServerNode +public class ComputeNode extends ServerNode { private static final Logger log = new Logger(ComputeNode.class);