Merge pull request #28 from metamx/fjy

Some modifications on how to register things in Druid
This commit is contained in:
cheddar 2012-11-13 15:02:14 -08:00
commit 06ed0e7448
10 changed files with 492 additions and 395 deletions

View File

@ -19,70 +19,19 @@
package com.metamx.druid;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.http.RequestLogger;
import com.metamx.druid.index.v1.serde.Registererer;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ZkClientConfig;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Emitters;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.HttpClientConfig;
import com.metamx.http.client.HttpClientInit;
import com.metamx.metrics.JvmMonitor;
import com.metamx.metrics.Monitor;
import com.metamx.metrics.MonitorScheduler;
import com.metamx.metrics.MonitorSchedulerConfig;
import com.metamx.metrics.SysMonitor;
import com.metamx.phonebook.PhoneBook;
import org.I0Itec.zkclient.ZkClient;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.jsontype.NamedType;
import org.codehaus.jackson.smile.SmileFactory;
import org.mortbay.jetty.Server;
import org.skife.config.ConfigurationObjectFactory;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ScheduledExecutorService;
/**
*/
public abstract class BaseNode<T extends BaseNode>
@Deprecated
public abstract class BaseNode<T extends BaseNode> extends QueryableNode
{
private final Logger log;
private final Lifecycle lifecycle;
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
private final Properties props;
private final ConfigurationObjectFactory configFactory;
private PhoneBook phoneBook = null;
private ServiceEmitter emitter = null;
private List<Monitor> monitors = null;
private Server server = null;
private ZkClient zkClient;
private ScheduledExecutorFactory scheduledExecutorFactory;
private RequestLogger requestLogger;
private boolean initialized = false;
public BaseNode(
protected BaseNode(
Logger log,
Properties props,
Lifecycle lifecycle,
@ -91,323 +40,6 @@ public abstract class BaseNode<T extends BaseNode>
ConfigurationObjectFactory configFactory
)
{
this.log = log;
this.configFactory = configFactory;
this.props = props;
this.jsonMapper = jsonMapper;
this.lifecycle = lifecycle;
this.smileMapper = smileMapper;
Preconditions.checkNotNull(props, "props");
Preconditions.checkNotNull(lifecycle, "lifecycle");
Preconditions.checkNotNull(jsonMapper, "jsonMapper");
Preconditions.checkNotNull(smileMapper, "smileMapper");
Preconditions.checkNotNull(configFactory, "configFactory");
Preconditions.checkState(smileMapper.getJsonFactory() instanceof SmileFactory, "smileMapper should use smile.");
}
@SuppressWarnings("unchecked")
public T setZkClient(ZkClient zkClient)
{
checkFieldNotSetAndSet("zkClient", zkClient);
return (T) this;
}
@SuppressWarnings("unchecked")
public T setPhoneBook(PhoneBook phoneBook)
{
checkFieldNotSetAndSet("phoneBook", phoneBook);
return (T) this;
}
@SuppressWarnings("unchecked")
public T setEmitter(ServiceEmitter emitter)
{
checkFieldNotSetAndSet("emitter", emitter);
return (T) this;
}
@SuppressWarnings("unchecked")
public T setMonitors(List<Monitor> monitors)
{
checkFieldNotSetAndSet("monitors", monitors);
return (T) this;
}
@SuppressWarnings("unchecked")
public T setServer(Server server)
{
checkFieldNotSetAndSet("server", server);
return (T) this;
}
@SuppressWarnings("unchecked")
public T setScheduledExecutorFactory(ScheduledExecutorFactory factory)
{
checkFieldNotSetAndSet("scheduledExecutorFactory", factory);
return (T) this;
}
@SuppressWarnings("unchecked")
public T setRequestLogger(RequestLogger requestLogger)
{
checkFieldNotSetAndSet("requestLogger", requestLogger);
return (T) this;
}
@SuppressWarnings("unchecked")
public T registerJacksonSubtype(Class<?>... clazzes)
{
jsonMapper.registerSubtypes(clazzes);
smileMapper.registerSubtypes(clazzes);
return (T) this;
}
@SuppressWarnings("unchecked")
public T registerJacksonSubtype(NamedType... namedTypes)
{
jsonMapper.registerSubtypes(namedTypes);
smileMapper.registerSubtypes(namedTypes);
return (T) this;
}
@SuppressWarnings("unchecked")
public T registerHandler(Registererer registererer)
{
registererer.register();
return (T) this;
}
public Lifecycle getLifecycle()
{
return lifecycle;
}
public ObjectMapper getJsonMapper()
{
return jsonMapper;
}
public ObjectMapper getSmileMapper()
{
return smileMapper;
}
public Properties getProps()
{
return props;
}
public ConfigurationObjectFactory getConfigFactory()
{
return configFactory;
}
public ZkClient getZkClient()
{
initializeZkClient();
return zkClient;
}
public PhoneBook getPhoneBook()
{
initializePhoneBook();
return phoneBook;
}
public ServiceEmitter getEmitter()
{
initializeEmitter();
return emitter;
}
public List<Monitor> getMonitors()
{
initializeMonitors();
return monitors;
}
public Server getServer()
{
initializeServer();
return server;
}
public ScheduledExecutorFactory getScheduledExecutorFactory()
{
initializeScheduledExecutorFactory();
return scheduledExecutorFactory;
}
public RequestLogger getRequestLogger()
{
initializeRequestLogger();
return requestLogger;
}
private void initializeRequestLogger()
{
if (requestLogger == null) {
try {
setRequestLogger(Initialization.makeRequestLogger(getScheduledExecutorFactory(), getProps()));
}
catch (IOException e) {
throw Throwables.propagate(e);
}
lifecycle.addManagedInstance(requestLogger);
}
}
private void initializeScheduledExecutorFactory()
{
if (scheduledExecutorFactory == null) {
setScheduledExecutorFactory(ScheduledExecutors.createFactory(getLifecycle()));
}
}
private void initializeZkClient()
{
if (zkClient == null) {
setZkClient(Initialization.makeZkClient(configFactory.build(ZkClientConfig.class), lifecycle));
}
}
private void initializePhoneBook()
{
if (phoneBook == null) {
setPhoneBook(
Initialization.createPhoneBook(
jsonMapper,
getZkClient(),
"PhoneBook--%s",
lifecycle
)
);
}
}
private void initializeServer()
{
if (server == null) {
setServer(Initialization.makeJettyServer(configFactory.build(ServerConfig.class)));
lifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
log.info("Starting Jetty");
server.start();
}
@Override
public void stop()
{
log.info("Stopping Jetty");
try {
server.stop();
}
catch (Exception e) {
log.error(e, "Exception thrown while stopping Jetty");
}
}
}
);
}
}
private void initializeMonitors()
{
if (monitors == null) {
List<Monitor> theMonitors = Lists.newArrayList();
theMonitors.add(new JvmMonitor());
if (Boolean.parseBoolean(props.getProperty("druid.monitoring.monitorSystem", "false"))) {
theMonitors.add(new SysMonitor());
}
setMonitors(theMonitors);
}
}
private void initializeEmitter()
{
if (emitter == null) {
final HttpClient httpClient = HttpClientInit.createClient(
HttpClientConfig.builder().withNumConnections(1).build(), lifecycle
);
setEmitter(
new ServiceEmitter(
PropUtils.getProperty(props, "druid.service"),
PropUtils.getProperty(props, "druid.host"),
Emitters.create(props, httpClient, jsonMapper, lifecycle)
)
);
}
EmittingLogger.registerEmitter(emitter);
}
protected void init() throws Exception
{
doInit();
initialized = true;
}
protected abstract void doInit() throws Exception;
@LifecycleStart
public synchronized void start() throws Exception
{
if (! initialized) {
init();
}
lifecycle.start();
}
@LifecycleStop
public synchronized void stop()
{
lifecycle.stop();
}
protected ScheduledExecutorService startMonitoring(List<Monitor> monitors)
{
final ScheduledExecutorService globalScheduledExec = getScheduledExecutorFactory().create(1, "Global--%d");
final MonitorScheduler monitorScheduler = new MonitorScheduler(
getConfigFactory().build(MonitorSchedulerConfig.class),
globalScheduledExec,
getEmitter(),
monitors
);
getLifecycle().addManagedInstance(monitorScheduler);
return globalScheduledExec;
}
protected void checkFieldNotSetAndSet(String fieldName, Object value)
{
Class<?> theClazz = this.getClass();
while (theClazz != null && theClazz != Object.class) {
try {
final Field field = theClazz.getDeclaredField(fieldName);
field.setAccessible(true);
Preconditions.checkState(field.get(this) == null, "Cannot set %s once it has already been set.", fieldName);
field.set(this, value);
return;
}
catch (NoSuchFieldException e) {
// Perhaps it is inherited?
theClazz = theClazz.getSuperclass();
}
catch (IllegalAccessException e) {
throw Throwables.propagate(e);
}
}
throw new ISE("Unknown field[%s] on class[%s]", fieldName, this.getClass());
super(log, props, lifecycle, jsonMapper, smileMapper, configFactory);
}
}

View File

@ -0,0 +1,407 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.http.RequestLogger;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ZkClientConfig;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Emitters;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.HttpClientConfig;
import com.metamx.http.client.HttpClientInit;
import com.metamx.metrics.JvmMonitor;
import com.metamx.metrics.Monitor;
import com.metamx.metrics.MonitorScheduler;
import com.metamx.metrics.MonitorSchedulerConfig;
import com.metamx.metrics.SysMonitor;
import com.metamx.phonebook.PhoneBook;
import org.I0Itec.zkclient.ZkClient;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.jsontype.NamedType;
import org.codehaus.jackson.smile.SmileFactory;
import org.mortbay.jetty.Server;
import org.skife.config.ConfigurationObjectFactory;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ScheduledExecutorService;
/**
*/
public abstract class QueryableNode<T extends QueryableNode> extends RegisteringNode
{
private final Logger log;
private final Lifecycle lifecycle;
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
private final Properties props;
private final ConfigurationObjectFactory configFactory;
private PhoneBook phoneBook = null;
private ServiceEmitter emitter = null;
private List<Monitor> monitors = null;
private Server server = null;
private ZkClient zkClient;
private ScheduledExecutorFactory scheduledExecutorFactory;
private RequestLogger requestLogger;
private boolean initialized = false;
public QueryableNode(
Logger log,
Properties props,
Lifecycle lifecycle,
ObjectMapper jsonMapper,
ObjectMapper smileMapper,
ConfigurationObjectFactory configFactory
)
{
super(Arrays.asList(jsonMapper, smileMapper));
this.log = log;
this.configFactory = configFactory;
this.props = props;
this.jsonMapper = jsonMapper;
this.lifecycle = lifecycle;
this.smileMapper = smileMapper;
Preconditions.checkNotNull(props, "props");
Preconditions.checkNotNull(lifecycle, "lifecycle");
Preconditions.checkNotNull(jsonMapper, "jsonMapper");
Preconditions.checkNotNull(smileMapper, "smileMapper");
Preconditions.checkNotNull(configFactory, "configFactory");
Preconditions.checkState(smileMapper.getJsonFactory() instanceof SmileFactory, "smileMapper should use smile.");
}
@SuppressWarnings("unchecked")
public T setZkClient(ZkClient zkClient)
{
checkFieldNotSetAndSet("zkClient", zkClient);
return (T) this;
}
@SuppressWarnings("unchecked")
public T setPhoneBook(PhoneBook phoneBook)
{
checkFieldNotSetAndSet("phoneBook", phoneBook);
return (T) this;
}
@SuppressWarnings("unchecked")
public T setEmitter(ServiceEmitter emitter)
{
checkFieldNotSetAndSet("emitter", emitter);
return (T) this;
}
@SuppressWarnings("unchecked")
public T setMonitors(List<Monitor> monitors)
{
checkFieldNotSetAndSet("monitors", monitors);
return (T) this;
}
@SuppressWarnings("unchecked")
public T setServer(Server server)
{
checkFieldNotSetAndSet("server", server);
return (T) this;
}
@SuppressWarnings("unchecked")
public T setScheduledExecutorFactory(ScheduledExecutorFactory factory)
{
checkFieldNotSetAndSet("scheduledExecutorFactory", factory);
return (T) this;
}
@SuppressWarnings("unchecked")
public T setRequestLogger(RequestLogger requestLogger)
{
checkFieldNotSetAndSet("requestLogger", requestLogger);
return (T) this;
}
@SuppressWarnings("unchecked")
public T registerJacksonSubtype(Class<?>... clazzes)
{
jsonMapper.registerSubtypes(clazzes);
smileMapper.registerSubtypes(clazzes);
return (T) this;
}
@SuppressWarnings("unchecked")
public T registerJacksonSubtype(NamedType... namedTypes)
{
jsonMapper.registerSubtypes(namedTypes);
smileMapper.registerSubtypes(namedTypes);
return (T) this;
}
public Lifecycle getLifecycle()
{
return lifecycle;
}
public ObjectMapper getJsonMapper()
{
return jsonMapper;
}
public ObjectMapper getSmileMapper()
{
return smileMapper;
}
public Properties getProps()
{
return props;
}
public ConfigurationObjectFactory getConfigFactory()
{
return configFactory;
}
public ZkClient getZkClient()
{
initializeZkClient();
return zkClient;
}
public PhoneBook getPhoneBook()
{
initializePhoneBook();
return phoneBook;
}
public ServiceEmitter getEmitter()
{
initializeEmitter();
return emitter;
}
public List<Monitor> getMonitors()
{
initializeMonitors();
return monitors;
}
public Server getServer()
{
initializeServer();
return server;
}
public ScheduledExecutorFactory getScheduledExecutorFactory()
{
initializeScheduledExecutorFactory();
return scheduledExecutorFactory;
}
public RequestLogger getRequestLogger()
{
initializeRequestLogger();
return requestLogger;
}
private void initializeRequestLogger()
{
if (requestLogger == null) {
try {
setRequestLogger(Initialization.makeRequestLogger(getScheduledExecutorFactory(), getProps()));
}
catch (IOException e) {
throw Throwables.propagate(e);
}
lifecycle.addManagedInstance(requestLogger);
}
}
private void initializeScheduledExecutorFactory()
{
if (scheduledExecutorFactory == null) {
setScheduledExecutorFactory(ScheduledExecutors.createFactory(getLifecycle()));
}
}
private void initializeZkClient()
{
if (zkClient == null) {
setZkClient(Initialization.makeZkClient(configFactory.build(ZkClientConfig.class), lifecycle));
}
}
private void initializePhoneBook()
{
if (phoneBook == null) {
setPhoneBook(
Initialization.createPhoneBook(
jsonMapper,
getZkClient(),
"PhoneBook--%s",
lifecycle
)
);
}
}
private void initializeServer()
{
if (server == null) {
setServer(Initialization.makeJettyServer(configFactory.build(ServerConfig.class)));
lifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
log.info("Starting Jetty");
server.start();
}
@Override
public void stop()
{
log.info("Stopping Jetty");
try {
server.stop();
}
catch (Exception e) {
log.error(e, "Exception thrown while stopping Jetty");
}
}
}
);
}
}
private void initializeMonitors()
{
if (monitors == null) {
List<Monitor> theMonitors = Lists.newArrayList();
theMonitors.add(new JvmMonitor());
if (Boolean.parseBoolean(props.getProperty("druid.monitoring.monitorSystem", "false"))) {
theMonitors.add(new SysMonitor());
}
setMonitors(theMonitors);
}
}
private void initializeEmitter()
{
if (emitter == null) {
final HttpClient httpClient = HttpClientInit.createClient(
HttpClientConfig.builder().withNumConnections(1).build(), lifecycle
);
setEmitter(
new ServiceEmitter(
PropUtils.getProperty(props, "druid.service"),
PropUtils.getProperty(props, "druid.host"),
Emitters.create(props, httpClient, jsonMapper, lifecycle)
)
);
}
EmittingLogger.registerEmitter(emitter);
}
protected void init() throws Exception
{
doInit();
initialized = true;
}
protected abstract void doInit() throws Exception;
@LifecycleStart
public synchronized void start() throws Exception
{
if (! initialized) {
init();
}
lifecycle.start();
}
@LifecycleStop
public synchronized void stop()
{
lifecycle.stop();
}
protected ScheduledExecutorService startMonitoring(List<Monitor> monitors)
{
final ScheduledExecutorService globalScheduledExec = getScheduledExecutorFactory().create(1, "Global--%d");
final MonitorScheduler monitorScheduler = new MonitorScheduler(
getConfigFactory().build(MonitorSchedulerConfig.class),
globalScheduledExec,
getEmitter(),
monitors
);
getLifecycle().addManagedInstance(monitorScheduler);
return globalScheduledExec;
}
protected void checkFieldNotSetAndSet(String fieldName, Object value)
{
Class<?> theClazz = this.getClass();
while (theClazz != null && theClazz != Object.class) {
try {
final Field field = theClazz.getDeclaredField(fieldName);
field.setAccessible(true);
Preconditions.checkState(field.get(this) == null, "Cannot set %s once it has already been set.", fieldName);
field.set(this, value);
return;
}
catch (NoSuchFieldException e) {
// Perhaps it is inherited?
theClazz = theClazz.getSuperclass();
}
catch (IllegalAccessException e) {
throw Throwables.propagate(e);
}
}
throw new ISE("Unknown field[%s] on class[%s]", fieldName, this.getClass());
}
}

View File

@ -0,0 +1,59 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid;
import com.metamx.druid.index.v1.serde.Registererer;
import org.codehaus.jackson.map.ObjectMapper;
import java.util.Arrays;
import java.util.List;
/**
*/
public class RegisteringNode
{
public static void registerHandlers(Iterable<Registererer> registererers, Iterable<ObjectMapper> mappers)
{
for (Registererer registererer : registererers) {
if (!doneRegister) {
registererer.register();
}
for (ObjectMapper mapper : mappers) {
registererer.registerSubType(mapper);
}
}
doneRegister = true;
}
private static boolean doneRegister = false;
private final List<ObjectMapper> mappers;
public RegisteringNode(List<ObjectMapper> mappers)
{
this.mappers = mappers;
}
public RegisteringNode registerHandlers(Registererer... registererers)
{
registerHandlers(Arrays.asList(registererers), mappers);
return this;
}
}

View File

@ -29,7 +29,7 @@ import com.metamx.common.ISE;
import com.metamx.common.config.Config;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.BaseNode;
import com.metamx.druid.QueryableNode;
import com.metamx.druid.client.BrokerServerView;
import com.metamx.druid.client.CachingClusteredClient;
import com.metamx.druid.client.ClientConfig;
@ -62,7 +62,7 @@ import java.util.Properties;
/**
*/
public class BrokerNode extends BaseNode<BrokerNode>
public class BrokerNode extends QueryableNode<BrokerNode>
{
private static final Logger log = new Logger(BrokerNode.class);

View File

@ -19,6 +19,8 @@
package com.metamx.druid.index.v1.serde;
import org.codehaus.jackson.map.ObjectMapper;
/**
* This is a "factory" interface for registering handlers in the system. It exists because I'm unaware of
* another way to register the complex serdes in the MR jobs that run on Hadoop. As such, instances of this interface
@ -30,4 +32,6 @@ package com.metamx.druid.index.v1.serde;
public interface Registererer
{
public void register();
public void registerSubType(ObjectMapper jsonMapper);
}

View File

@ -33,6 +33,7 @@ import com.metamx.common.ISE;
import com.metamx.common.MapUtils;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger;
import com.metamx.druid.RegisteringNode;
import com.metamx.druid.index.v1.serde.Registererer;
import com.metamx.druid.indexer.data.DataSpec;
import com.metamx.druid.indexer.granularity.GranularitySpec;
@ -58,6 +59,7 @@ import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -104,9 +106,8 @@ public class HadoopDruidIndexerConfig
}
}
);
for (Registererer registererer : registererers) {
registererer.register();
}
RegisteringNode.registerHandlers(registererers, Arrays.asList(jsonMapper));
}
final HadoopDruidIndexerConfig retVal = jsonMapper.convertValue(argSpec, HadoopDruidIndexerConfig.class);

View File

@ -253,7 +253,6 @@ public class IndexGeneratorJob implements Jobby
public static class IndexGeneratorReducer extends Reducer<BytesWritable, Text, BytesWritable, Text>
{
private HadoopDruidIndexerConfig config;
private final DefaultObjectMapper jsonMapper = new DefaultObjectMapper();
private List<String> metricNames = Lists.newArrayList();
private Function<String, DateTime> timestampConverter;
private Parser parser;
@ -573,7 +572,7 @@ public class IndexGeneratorJob implements Jobby
final FSDataOutputStream descriptorOut = outputFS.create(descriptorPath);
try {
jsonMapper.writeValue(descriptorOut, segment);
HadoopDruidIndexerConfig.jsonMapper.writeValue(descriptorOut, segment);
}
finally {
descriptorOut.close();

View File

@ -36,13 +36,13 @@ import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.RegisteringNode;
import com.metamx.druid.db.DbConnector;
import com.metamx.druid.db.DbConnectorConfig;
import com.metamx.druid.http.GuiceServletConfig;
import com.metamx.druid.http.RedirectFilter;
import com.metamx.druid.http.RedirectInfo;
import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.index.v1.serde.Registererer;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
@ -100,6 +100,7 @@ import org.mortbay.jetty.servlet.ServletHolder;
import org.skife.config.ConfigurationObjectFactory;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
@ -108,7 +109,7 @@ import java.util.concurrent.ScheduledExecutorService;
/**
*/
public class IndexerCoordinatorNode
public class IndexerCoordinatorNode extends RegisteringNode
{
private static final Logger log = new Logger(IndexerCoordinatorNode.class);
@ -145,6 +146,8 @@ public class IndexerCoordinatorNode
ConfigurationObjectFactory configFactory
)
{
super(Arrays.asList(jsonMapper));
this.jsonMapper = jsonMapper;
this.lifecycle = lifecycle;
this.props = props;
@ -184,12 +187,6 @@ public class IndexerCoordinatorNode
this.taskRunnerFactory = taskRunnerFactory;
}
public IndexerCoordinatorNode registerHandler(Registererer registererer)
{
registererer.register();
return this;
}
public void init() throws Exception
{
scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle);

View File

@ -28,6 +28,7 @@ import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.RegisteringNode;
import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.index.v1.serde.Registererer;
import com.metamx.druid.initialization.CuratorConfig;
@ -71,6 +72,7 @@ import org.mortbay.jetty.servlet.ServletHolder;
import org.skife.config.ConfigurationObjectFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
@ -79,7 +81,7 @@ import java.util.concurrent.ScheduledExecutorService;
/**
*/
public class WorkerNode
public class WorkerNode extends RegisteringNode
{
private static final Logger log = new Logger(WorkerNode.class);
@ -112,6 +114,8 @@ public class WorkerNode
ConfigurationObjectFactory configFactory
)
{
super(Arrays.asList(jsonMapper));
this.jsonMapper = jsonMapper;
this.lifecycle = lifecycle;
this.props = props;
@ -148,12 +152,6 @@ public class WorkerNode
return this;
}
public WorkerNode registerHandler(Registererer registererer)
{
registererer.register();
return this;
}
public void init() throws Exception
{
initializeEmitter();

View File

@ -38,7 +38,7 @@ import java.util.Properties;
/**
*/
public abstract class BaseServerNode<T extends BaseNode> extends BaseNode<T>
public abstract class BaseServerNode<T extends QueryableNode> extends QueryableNode<T>
{
private final Map<Class<? extends Query>, QueryRunnerFactory> additionalFactories = Maps.newLinkedHashMap();
private QueryRunnerFactoryConglomerate conglomerate = null;