modifying the way registering serdes works to hopefully be a bit easier to use

This commit is contained in:
Fangjin Yang 2012-11-12 13:58:43 -08:00
parent c30f04e0d4
commit c20dccd0f4
12 changed files with 75 additions and 20 deletions

View File

@ -62,7 +62,7 @@ import java.util.concurrent.ScheduledExecutorService;
/**
*/
public abstract class BaseNode<T extends BaseNode>
public abstract class QueryableNode<T extends QueryableNode> implements RegisteringNode
{
private final Logger log;
@ -82,7 +82,7 @@ public abstract class BaseNode<T extends BaseNode>
private boolean initialized = false;
public BaseNode(
public QueryableNode(
Logger log,
Properties props,
Lifecycle lifecycle,
@ -176,10 +176,20 @@ public abstract class BaseNode<T extends BaseNode>
@SuppressWarnings("unchecked")
public T registerHandler(Registererer registererer)
{
registererer.register();
registererer.registerSerde();
return (T) this;
}
@Override
public void registerHandlers(Registererer... registererers)
{
for (Registererer registererer : registererers) {
registererer.registerSerde();
registererer.registerSubType(jsonMapper);
registererer.registerSubType(smileMapper);
}
}
public Lifecycle getLifecycle()
{
return lifecycle;

View File

@ -0,0 +1,10 @@
package com.metamx.druid;
import com.metamx.druid.index.v1.serde.Registererer;
/**
*/
public interface RegisteringNode
{
public void registerHandlers(Registererer... registererers);
}

View File

@ -29,7 +29,7 @@ import com.metamx.common.ISE;
import com.metamx.common.config.Config;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.BaseNode;
import com.metamx.druid.QueryableNode;
import com.metamx.druid.client.BrokerServerView;
import com.metamx.druid.client.CachingClusteredClient;
import com.metamx.druid.client.ClientConfig;
@ -62,7 +62,7 @@ import java.util.Properties;
/**
*/
public class BrokerNode extends BaseNode<BrokerNode>
public class BrokerNode extends QueryableNode<BrokerNode>
{
private static final Logger log = new Logger(BrokerNode.class);

View File

@ -19,6 +19,8 @@
package com.metamx.druid.index.v1.serde;
import org.codehaus.jackson.map.ObjectMapper;
/**
* This is a "factory" interface for registering handlers in the system. It exists because I'm unaware of
* another way to register the complex serdes in the MR jobs that run on Hadoop. As such, instances of this interface
@ -29,5 +31,7 @@ package com.metamx.druid.index.v1.serde;
*/
public interface Registererer
{
public void register();
public void registerSerde();
public void registerSubType(ObjectMapper jsonMapper);
}

View File

@ -105,7 +105,7 @@ public class HadoopDruidIndexerConfig
}
);
for (Registererer registererer : registererers) {
registererer.register();
registererer.registerSerde();
}
}

View File

@ -4,6 +4,9 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.RegisteringNode;
import com.metamx.druid.index.v1.serde.Registererer;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.jsontype.NamedType;
import org.joda.time.Interval;
@ -13,7 +16,7 @@ import java.util.List;
/**
*/
public class HadoopDruidIndexerNode
public class HadoopDruidIndexerNode implements RegisteringNode
{
public static Builder builder()
{
@ -59,6 +62,15 @@ public class HadoopDruidIndexerNode
return this;
}
@Override
public void registerHandlers(Registererer... registererers)
{
for (Registererer registererer : registererers) {
registererer.registerSerde();
registererer.registerSubType(HadoopDruidIndexerConfig.jsonMapper);
}
}
@LifecycleStart
public void start() throws Exception
{

View File

@ -253,7 +253,6 @@ public class IndexGeneratorJob implements Jobby
public static class IndexGeneratorReducer extends Reducer<BytesWritable, Text, BytesWritable, Text>
{
private HadoopDruidIndexerConfig config;
private final DefaultObjectMapper jsonMapper = new DefaultObjectMapper();
private List<String> metricNames = Lists.newArrayList();
private Function<String, DateTime> timestampConverter;
private Parser parser;
@ -573,7 +572,7 @@ public class IndexGeneratorJob implements Jobby
final FSDataOutputStream descriptorOut = outputFS.create(descriptorPath);
try {
jsonMapper.writeValue(descriptorOut, segment);
HadoopDruidIndexerConfig.jsonMapper.writeValue(descriptorOut, segment);
}
finally {
descriptorOut.close();

View File

@ -36,6 +36,7 @@ import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.RegisteringNode;
import com.metamx.druid.db.DbConnector;
import com.metamx.druid.db.DbConnectorConfig;
import com.metamx.druid.http.GuiceServletConfig;
@ -108,7 +109,7 @@ import java.util.concurrent.ScheduledExecutorService;
/**
*/
public class IndexerCoordinatorNode
public class IndexerCoordinatorNode implements RegisteringNode
{
private static final Logger log = new Logger(IndexerCoordinatorNode.class);
@ -186,10 +187,19 @@ public class IndexerCoordinatorNode
public IndexerCoordinatorNode registerHandler(Registererer registererer)
{
registererer.register();
registererer.registerSerde();
return this;
}
@Override
public void registerHandlers(Registererer... registererers)
{
for (Registererer registererer : registererers) {
registererer.registerSerde();
registererer.registerSubType(this.jsonMapper);
}
}
public void init() throws Exception
{
scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle);

View File

@ -28,6 +28,7 @@ import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.RegisteringNode;
import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.index.v1.serde.Registererer;
import com.metamx.druid.initialization.CuratorConfig;
@ -79,7 +80,7 @@ import java.util.concurrent.ScheduledExecutorService;
/**
*/
public class WorkerNode
public class WorkerNode implements RegisteringNode
{
private static final Logger log = new Logger(WorkerNode.class);
@ -150,10 +151,19 @@ public class WorkerNode
public WorkerNode registerHandler(Registererer registererer)
{
registererer.register();
registererer.registerSerde();
return this;
}
@Override
public void registerHandlers(Registererer... registererers)
{
for (Registererer registererer : registererers) {
registererer.registerSerde();
registererer.registerSubType(jsonMapper);
}
}
public void init() throws Exception
{
initializeEmitter();

View File

@ -28,7 +28,7 @@ import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.BaseServerNode;
import com.metamx.druid.ServerNode;
import com.metamx.druid.client.ClientConfig;
import com.metamx.druid.client.ClientInventoryManager;
import com.metamx.druid.client.MutableServerView;
@ -65,7 +65,7 @@ import java.util.Properties;
/**
*/
public class RealtimeNode extends BaseServerNode<RealtimeNode>
public class RealtimeNode extends ServerNode<RealtimeNode>
{
private static final Logger log = new Logger(RealtimeNode.class);

View File

@ -38,13 +38,13 @@ import java.util.Properties;
/**
*/
public abstract class BaseServerNode<T extends BaseNode> extends BaseNode<T>
public abstract class ServerNode<T extends QueryableNode> extends QueryableNode<T>
{
private final Map<Class<? extends Query>, QueryRunnerFactory> additionalFactories = Maps.newLinkedHashMap();
private QueryRunnerFactoryConglomerate conglomerate = null;
private StupidPool<ByteBuffer> computeScratchPool = null;
public BaseServerNode(
public ServerNode(
Logger log,
Properties props,
Lifecycle lifecycle,

View File

@ -28,7 +28,7 @@ import com.metamx.common.concurrent.ExecutorServices;
import com.metamx.common.config.Config;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.BaseServerNode;
import com.metamx.druid.ServerNode;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.client.DruidServerConfig;
import com.metamx.druid.coordination.ServerManager;
@ -59,7 +59,7 @@ import java.util.concurrent.ExecutorService;
/**
*/
public class ComputeNode extends BaseServerNode<ComputeNode>
public class ComputeNode extends ServerNode<ComputeNode>
{
private static final Logger log = new Logger(ComputeNode.class);