1) Refactor the announcement of segments to all exist inside the DataSegmentAnnouncer

2) Adjust the ExecutorNodes to expose the correct nodeType given the refactorings
This commit is contained in:
cheddar 2013-04-23 17:54:49 -05:00
parent 7370b0f2fc
commit f71b941a1a
26 changed files with 201 additions and 260 deletions

View File

@ -1,46 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import org.skife.config.ConfigurationObjectFactory;
import java.util.Properties;
/**
*/
@Deprecated
public abstract class BaseNode<T extends BaseNode> extends QueryableNode
{
protected BaseNode(
Logger log,
Properties props,
Lifecycle lifecycle,
ObjectMapper jsonMapper,
ObjectMapper smileMapper,
ConfigurationObjectFactory configFactory
)
{
super(log, props, lifecycle, jsonMapper, smileMapper, configFactory);
}
}

View File

@ -33,14 +33,19 @@ import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.client.DruidServerConfig;
import com.metamx.druid.client.ServerInventoryThingie;
import com.metamx.druid.client.ServerInventoryThingieConfig;
import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.coordination.CuratorDataSegmentAnnouncer;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.http.RequestLogger;
import com.metamx.druid.initialization.CuratorConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Emitters;
@ -78,12 +83,15 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
private final ObjectMapper smileMapper;
private final Properties props;
private final ConfigurationObjectFactory configFactory;
private final String nodeType;
private DruidServer druidServer = null;
private ServiceEmitter emitter = null;
private List<Monitor> monitors = null;
private Server server = null;
private CuratorFramework curator = null;
private Announcer announcer = null;
private DataSegmentAnnouncer announcer = null;
private ZkPathsConfig zkPaths = null;
private ScheduledExecutorFactory scheduledExecutorFactory = null;
private RequestLogger requestLogger = null;
private ServerInventoryThingie serverInventoryThingie = null;
@ -91,6 +99,7 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
private boolean initialized = false;
public QueryableNode(
String nodeType,
Logger log,
Properties props,
Lifecycle lifecycle,
@ -115,6 +124,13 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
Preconditions.checkNotNull(configFactory, "configFactory");
Preconditions.checkState(smileMapper.getJsonFactory() instanceof SmileFactory, "smileMapper should use smile.");
this.nodeType = nodeType;
}
public T setDruidServer(DruidServer druidServer)
{
checkFieldNotSetAndSet("druidServer", druidServer);
return (T) this;
}
@SuppressWarnings("unchecked")
@ -125,7 +141,7 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
}
@SuppressWarnings("unchecked")
public T setAnnouncer(Announcer announcer)
public T setAnnouncer(DataSegmentAnnouncer announcer)
{
checkFieldNotSetAndSet("announcer", announcer);
return (T) this;
@ -152,6 +168,13 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
return (T) this;
}
@SuppressWarnings("unchecked")
public T setZkPaths(ZkPathsConfig zkPaths)
{
checkFieldNotSetAndSet("zkPaths", zkPaths);
return (T) this;
}
@SuppressWarnings("unchecked")
public T setScheduledExecutorFactory(ScheduledExecutorFactory factory)
{
@ -214,13 +237,19 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
return configFactory;
}
public DruidServer getDruidServer()
{
initializeDruidServer();
return druidServer;
}
public CuratorFramework getCuratorFramework()
{
initializeCuratorFramework();
return curator;
}
public Announcer getAnnouncer()
public DataSegmentAnnouncer getAnnouncer()
{
initializeAnnouncer();
return announcer;
@ -244,6 +273,12 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
return server;
}
public ZkPathsConfig getZkPaths()
{
initializeZkPaths();
return zkPaths;
}
public ScheduledExecutorFactory getScheduledExecutorFactory()
{
initializeScheduledExecutorFactory();
@ -262,6 +297,13 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
return serverInventoryThingie;
}
private void initializeDruidServer()
{
if (druidServer == null) {
setDruidServer(new DruidServer(getConfigFactory().build(DruidServerConfig.class), nodeType));
}
}
private void initializeServerInventoryThingie()
{
if (serverInventoryThingie == null) {
@ -306,6 +348,13 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
}
}
private void initializeZkPaths()
{
if (zkPaths == null) {
setZkPaths(getConfigFactory().build(ZkPathsConfig.class));
}
}
private void initializeScheduledExecutorFactory()
{
if (scheduledExecutorFactory == null) {
@ -328,7 +377,15 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
private void initializeAnnouncer()
{
if (announcer == null) {
setAnnouncer(new Announcer(getCuratorFramework(), Execs.singleThreaded("Announcer-%s")));
setAnnouncer(
new CuratorDataSegmentAnnouncer(
getDruidServer(),
getZkPaths(),
new Announcer(getCuratorFramework(), Execs.singleThreaded("Announcer-%s")),
getJsonMapper()
)
);
lifecycle.addManagedInstance(getAnnouncer());
}
}

View File

@ -17,60 +17,50 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime;
package com.metamx.druid.coordination;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.curator.announcement.Announcer;
import com.netflix.curator.framework.CuratorFramework;
import com.metamx.druid.initialization.ZkPathsConfig;
import com.netflix.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.joda.time.DateTime;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
public class CuratorSegmentAnnouncer implements SegmentAnnouncer
public class CuratorDataSegmentAnnouncer implements DataSegmentAnnouncer
{
private static final Logger log = new Logger(CuratorSegmentAnnouncer.class);
private static final Logger log = new Logger(CuratorDataSegmentAnnouncer.class);
private final Object lock = new Object();
private final ZkSegmentAnnouncerConfig config;
private final DruidServer server;
private final ZkPathsConfig config;
private final Announcer announcer;
private final ObjectMapper jsonMapper;
private final String servedSegmentsLocation;
private volatile boolean started = false;
public CuratorSegmentAnnouncer(
ZkSegmentAnnouncerConfig config,
public CuratorDataSegmentAnnouncer(
DruidServer server,
ZkPathsConfig config,
Announcer announcer,
ObjectMapper jsonMapper
)
{
this.server = server;
this.config = config;
this.announcer = announcer;
this.jsonMapper = jsonMapper;
this.servedSegmentsLocation = ZKPaths.makePath(config.getServedSegmentsPath(), config.getServerName());
}
private Map<String, String> getStringProps()
{
return ImmutableMap.of(
"name", config.getServerName(),
"host", config.getHost(),
"maxSize", String.valueOf(config.getMaxSize()),
"type", config.getServerType()
);
this.servedSegmentsLocation = ZKPaths.makePath(config.getServedSegmentsPath(), server.getName());
}
@LifecycleStart
@ -81,9 +71,9 @@ public class CuratorSegmentAnnouncer implements SegmentAnnouncer
return;
}
log.info("Starting CuratorSegmentAnnouncer for server[%s] with config[%s]", config.getServerName(), config);
log.info("Starting CuratorDataSegmentAnnouncer for server[%s] with config[%s]", server.getName(), config);
try {
announcer.announce(makeAnnouncementPath(), jsonMapper.writeValueAsBytes(getStringProps()));
announcer.announce(makeAnnouncementPath(), jsonMapper.writeValueAsBytes(server));
}
catch (JsonProcessingException e) {
throw Throwables.propagate(e);
@ -101,7 +91,7 @@ public class CuratorSegmentAnnouncer implements SegmentAnnouncer
return;
}
log.info("Stopping CuratorSegmentAnnouncer with config[%s]", config);
log.info("Stopping CuratorDataSegmentAnnouncer with config[%s]", config);
announcer.unannounce(makeAnnouncementPath());
started = false;
@ -121,7 +111,7 @@ public class CuratorSegmentAnnouncer implements SegmentAnnouncer
}
private String makeAnnouncementPath() {
return ZKPaths.makePath(config.getAnnouncementsPath(), config.getServerName());
return ZKPaths.makePath(config.getAnnouncementsPath(), server.getName());
}
private String makeServedSegmentPath(DataSegment segment)

View File

@ -1,10 +1,10 @@
package com.metamx.druid.realtime;
package com.metamx.druid.coordination;
import com.metamx.druid.client.DataSegment;
import java.io.IOException;
public interface SegmentAnnouncer
public interface DataSegmentAnnouncer
{
public void announceSegment(DataSegment segment) throws IOException;
public void unannounceSegment(DataSegment segment) throws IOException;

View File

@ -96,7 +96,7 @@ public class BrokerNode extends QueryableNode<BrokerNode>
ConfigurationObjectFactory configFactory
)
{
super(log, props, lifecycle, jsonMapper, smileMapper, configFactory);
super("broker", log, props, lifecycle, jsonMapper, smileMapper, configFactory);
}
public QueryToolChestWarehouse getWarehouse()

View File

@ -4,10 +4,10 @@ import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.log.LogLevelAdjuster;
import com.metamx.druid.realtime.RealtimeNode;
import com.metamx.druid.realtime.SegmentAnnouncer;
import com.metamx.druid.realtime.SegmentPublisher;
import java.io.File;
@ -30,8 +30,8 @@ public class RealtimeStandaloneMain
RealtimeNode rn = RealtimeNode.builder().build();
lifecycle.addManagedInstance(rn);
SegmentAnnouncer dummySegmentAnnouncer =
new SegmentAnnouncer()
DataSegmentAnnouncer dummySegmentAnnouncer =
new DataSegmentAnnouncer()
{
@Override
public void announceSegment(DataSegment segment) throws IOException
@ -56,7 +56,7 @@ public class RealtimeStandaloneMain
};
// dummySegmentPublisher will not send updates to db because standalone demo has no db
rn.setSegmentAnnouncer(dummySegmentAnnouncer);
rn.setAnnouncer(dummySegmentAnnouncer);
rn.setSegmentPublisher(dummySegmentPublisher);
rn.setDataSegmentPusher(
new DataSegmentPusher()

View File

@ -4,10 +4,10 @@ import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.log.LogLevelAdjuster;
import com.metamx.druid.realtime.RealtimeNode;
import com.metamx.druid.realtime.SegmentAnnouncer;
import com.metamx.druid.realtime.SegmentPublisher;
import druid.examples.twitter.TwitterSpritzerFirehoseFactory;
@ -32,8 +32,8 @@ public class RealtimeStandaloneMain
// register the Firehose
rn.registerJacksonSubtype(new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"));
final SegmentAnnouncer dummySegmentAnnouncer =
new SegmentAnnouncer()
final DataSegmentAnnouncer dummySegmentAnnouncer =
new DataSegmentAnnouncer()
{
@Override
public void announceSegment(DataSegment segment) throws IOException
@ -58,8 +58,8 @@ public class RealtimeStandaloneMain
};
// dummySegmentPublisher will not send updates to db because standalone demo has no db
rn.setSegmentAnnouncer(dummySegmentAnnouncer);
rn.setSegmentPublisher(dummySegmentPublisher);
rn.setAnnouncer(dummySegmentAnnouncer);
rn.setDataSegmentPusher(
new DataSegmentPusher()
{

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.ServerView;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.loading.DataSegmentKiller;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.MMappedQueryableIndexFactory;
@ -35,7 +36,6 @@ import com.metamx.druid.merger.common.actions.TaskActionClientFactory;
import com.metamx.druid.merger.common.config.TaskConfig;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.realtime.SegmentAnnouncer;
import com.metamx.emitter.service.ServiceEmitter;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
@ -55,7 +55,7 @@ public class TaskToolbox
private final RestS3Service s3Client;
private final DataSegmentPusher segmentPusher;
private final DataSegmentKiller dataSegmentKiller;
private final SegmentAnnouncer segmentAnnouncer;
private final DataSegmentAnnouncer segmentAnnouncer;
private final ServerView newSegmentServerView;
private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
private final ObjectMapper objectMapper;
@ -68,7 +68,7 @@ public class TaskToolbox
RestS3Service s3Client,
DataSegmentPusher segmentPusher,
DataSegmentKiller dataSegmentKiller,
SegmentAnnouncer segmentAnnouncer,
DataSegmentAnnouncer segmentAnnouncer,
ServerView newSegmentServerView,
QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
ObjectMapper objectMapper
@ -112,7 +112,7 @@ public class TaskToolbox
return dataSegmentKiller;
}
public SegmentAnnouncer getSegmentAnnouncer()
public DataSegmentAnnouncer getSegmentAnnouncer()
{
return segmentAnnouncer;
}

View File

@ -21,13 +21,13 @@ package com.metamx.druid.merger.common;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.druid.client.ServerView;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.loading.DataSegmentKiller;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.merger.common.actions.TaskActionClientFactory;
import com.metamx.druid.merger.common.config.TaskConfig;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.realtime.SegmentAnnouncer;
import com.metamx.emitter.service.ServiceEmitter;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
@ -42,7 +42,7 @@ public class TaskToolboxFactory
private final RestS3Service s3Client;
private final DataSegmentPusher segmentPusher;
private final DataSegmentKiller dataSegmentKiller;
private final SegmentAnnouncer segmentAnnouncer;
private final DataSegmentAnnouncer segmentAnnouncer;
private final ServerView newSegmentServerView;
private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
private final ObjectMapper objectMapper;
@ -54,7 +54,7 @@ public class TaskToolboxFactory
RestS3Service s3Client,
DataSegmentPusher segmentPusher,
DataSegmentKiller dataSegmentKiller,
SegmentAnnouncer segmentAnnouncer,
DataSegmentAnnouncer segmentAnnouncer,
ServerView newSegmentServerView,
QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
ObjectMapper objectMapper
@ -72,11 +72,6 @@ public class TaskToolboxFactory
this.objectMapper = objectMapper;
}
public ObjectMapper getObjectMapper()
{
return objectMapper;
}
public TaskToolbox build(Task task)
{
return new TaskToolbox(

View File

@ -75,6 +75,12 @@ public abstract class AbstractTask implements Task
return groupId;
}
@Override
public String getNodeType()
{
return null;
}
@JsonProperty
@Override
public String getDataSource()

View File

@ -47,7 +47,7 @@ import com.metamx.druid.realtime.MinTimeFirehose;
import com.metamx.druid.realtime.plumber.Plumber;
import com.metamx.druid.realtime.plumber.RealtimePlumberSchool;
import com.metamx.druid.realtime.Schema;
import com.metamx.druid.realtime.SegmentAnnouncer;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.realtime.SegmentPublisher;
import com.metamx.druid.realtime.plumber.Sink;
import com.metamx.druid.realtime.plumber.VersioningPolicy;
@ -131,6 +131,12 @@ public class RealtimeIndexTask extends AbstractTask
return "index_realtime";
}
@Override
public String getNodeType()
{
return "realtime";
}
@Override
public <T> QueryRunner<T> getQueryRunner(Query<T> query)
{
@ -192,8 +198,8 @@ public class RealtimeIndexTask extends AbstractTask
// with the coordinator. Right now, we'll block/throw in whatever thread triggered the coordinator behavior,
// which will typically be either the main data processing loop or the persist thread.
// Wrap default SegmentAnnouncer such that we unlock intervals as we unannounce segments
final SegmentAnnouncer lockingSegmentAnnouncer = new SegmentAnnouncer()
// Wrap default DataSegmentAnnouncer such that we unlock intervals as we unannounce segments
final DataSegmentAnnouncer lockingSegmentAnnouncer = new DataSegmentAnnouncer()
{
@Override
public void announceSegment(final DataSegment segment) throws IOException

View File

@ -76,6 +76,14 @@ public interface Task
*/
public String getType();
/**
* Get the nodeType for if/when this task publishes on zookeeper.
*
* @return the nodeType to use when publishing the server to zookeeper. null if the task doesn't expect to
* publish to zookeeper.
*/
public String getNodeType();
/**
* Returns the datasource this task operates on. Each task can operate on only one datasource.
*/

View File

@ -170,6 +170,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
command.add(String.format("-Ddruid.port=%d", childPort));
command.add(config.getMainClass());
command.add(defaultNodeType(task));
command.add(taskFile.toString());
command.add(statusFile.toString());
@ -259,6 +260,12 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
}
}
private String defaultNodeType(Task task)
{
final String nodeType = task.getNodeType();
return nodeType == null ? "indexer-executor" : nodeType;
}
@LifecycleStop
public void stop()
{

View File

@ -173,7 +173,7 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
ConfigurationObjectFactory configFactory
)
{
super(log, props, lifecycle, jsonMapper, smileMapper, configFactory);
super("index-coordinator", log, props, lifecycle, jsonMapper, smileMapper, configFactory);
}
public IndexerCoordinatorNode setEmitter(ServiceEmitter emitter)

View File

@ -24,6 +24,8 @@ import com.metamx.common.logger.Logger;
import com.metamx.druid.log.LogLevelAdjuster;
import java.io.File;
import java.util.Arrays;
import java.util.Iterator;
/**
*/
@ -35,16 +37,22 @@ public class ExecutorMain
{
LogLevelAdjuster.register();
if (args.length != 2) {
log.info("Usage: ExecutorMain <task.json> <status.json>");
if (args.length != 3) {
log.info("Usage: ExecutorMain <nodeType> <task.json> <status.json>");
System.exit(2);
}
Iterator<String> arguments = Arrays.asList(args).iterator();
final String nodeType = arguments.next();
final String taskJsonFile = arguments.next();
final String statusJsonFile = arguments.next();
final ExecutorNode node = ExecutorNode.builder()
.build(
nodeType,
new ExecutorLifecycleFactory(
new File(args[0]),
new File(args[1]),
new File(taskJsonFile),
new File(statusJsonFile),
System.in
)
);

View File

@ -33,8 +33,6 @@ import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.BaseServerNode;
import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.http.QueryServlet;
import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.initialization.Initialization;
@ -53,9 +51,9 @@ import com.metamx.druid.merger.common.config.TaskConfig;
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
import com.metamx.druid.merger.coordinator.ExecutorServiceTaskRunner;
import com.metamx.druid.merger.worker.config.WorkerConfig;
import com.metamx.druid.realtime.CuratorSegmentAnnouncer;
import com.metamx.druid.realtime.RealtimeZkSegmentAnnouncerConfig;
import com.metamx.druid.realtime.SegmentAnnouncer;
import com.metamx.druid.coordination.CuratorDataSegmentAnnouncer;
import com.metamx.druid.realtime.RealtimeCuratorDataSegmentAnnouncerConfig;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Emitters;
@ -114,6 +112,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
private ExecutorLifecycle executorLifecycle = null;
public ExecutorNode(
String nodeType,
Properties props,
Lifecycle lifecycle,
ObjectMapper jsonMapper,
@ -122,7 +121,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
ExecutorLifecycleFactory executorLifecycleFactory
)
{
super(log, props, lifecycle, jsonMapper, smileMapper, configFactory);
super(nodeType, log, props, lifecycle, jsonMapper, smileMapper, configFactory);
this.lifecycle = lifecycle;
this.props = props;
@ -192,10 +191,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle);
final ScheduledExecutorService globalScheduledExec = scheduledExecutorFactory.create(1, "Global--%d");
final MonitorScheduler monitorScheduler = new MonitorScheduler(
configFactory.build(MonitorSchedulerConfig.class),
globalScheduledExec,
emitter,
monitors
configFactory.build(MonitorSchedulerConfig.class), globalScheduledExec, emitter, monitors
);
lifecycle.addManagedInstance(monitorScheduler);
@ -346,12 +342,6 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
{
if (taskToolboxFactory == null) {
final DataSegmentKiller dataSegmentKiller = new S3DataSegmentKiller(s3Service);
final SegmentAnnouncer segmentAnnouncer = new CuratorSegmentAnnouncer(
configFactory.build(RealtimeZkSegmentAnnouncerConfig.class),
getAnnouncer(),
getJsonMapper()
);
lifecycle.addManagedInstance(segmentAnnouncer);
taskToolboxFactory = new TaskToolboxFactory(
taskConfig,
new RemoteTaskActionClientFactory(
@ -369,7 +359,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
s3Service,
segmentPusher,
dataSegmentKiller,
segmentAnnouncer,
getAnnouncer(),
getServerInventoryThingie(),
getConglomerate(),
getJsonMapper()
@ -410,7 +400,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
.build()
)
)
);;
);
}
}
@ -446,7 +436,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
return this;
}
public ExecutorNode build(ExecutorLifecycleFactory executorLifecycleFactory)
public ExecutorNode build(String nodeType, ExecutorLifecycleFactory executorLifecycleFactory)
{
if (jsonMapper == null && smileMapper == null) {
jsonMapper = new DefaultObjectMapper();
@ -469,7 +459,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
configFactory = Config.createFactory(props);
}
return new ExecutorNode(props, lifecycle, jsonMapper, smileMapper, configFactory, executorLifecycleFactory);
return new ExecutorNode(nodeType, props, lifecycle, jsonMapper, smileMapper, configFactory, executorLifecycleFactory);
}
}
}

View File

@ -117,7 +117,7 @@ public class WorkerNode extends QueryableNode<WorkerNode>
ConfigurationObjectFactory configFactory
)
{
super(log, props, lifecycle, jsonMapper, smileMapper, configFactory);
super("indexer-worker", log, props, lifecycle, jsonMapper, smileMapper, configFactory);
}
public WorkerNode setHttpClient(HttpClient httpClient)

View File

@ -21,11 +21,6 @@ package com.metamx.druid.realtime;
/**
*/
public abstract class RealtimeZkSegmentAnnouncerConfig extends ZkSegmentAnnouncerConfig
public abstract class RealtimeCuratorDataSegmentAnnouncerConfig
{
@Override
public final String getServerType()
{
return "realtime";
}
}

View File

@ -35,9 +35,9 @@ import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.BaseServerNode;
import com.metamx.druid.client.ServerView;
import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.coordination.CuratorDataSegmentAnnouncer;
import com.metamx.druid.coordination.CuratorDataSegmentAnnouncerConfig;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.db.DbConnector;
import com.metamx.druid.db.DbConnectorConfig;
import com.metamx.druid.http.QueryServlet;
@ -73,7 +73,6 @@ public class RealtimeNode extends BaseServerNode<RealtimeNode>
private final Map<String, Object> injectablesMap = Maps.newLinkedHashMap();
private SegmentAnnouncer segmentAnnouncer = null;
private SegmentPublisher segmentPublisher = null;
private DataSegmentPusher dataSegmentPusher = null;
private List<FireDepartment> fireDepartments = null;
@ -88,34 +87,24 @@ public class RealtimeNode extends BaseServerNode<RealtimeNode>
ConfigurationObjectFactory configFactory
)
{
super(log, props, lifecycle, jsonMapper, smileMapper, configFactory);
}
public RealtimeNode setSegmentAnnouncer(SegmentAnnouncer segmentAnnouncer)
{
Preconditions.checkState(this.segmentAnnouncer == null, "Cannot set segmentAnnouncer once it has already been set.");
this.segmentAnnouncer = segmentAnnouncer;
return this;
super("realtime", log, props, lifecycle, jsonMapper, smileMapper, configFactory);
}
public RealtimeNode setSegmentPublisher(SegmentPublisher segmentPublisher)
{
Preconditions.checkState(this.segmentPublisher == null, "Cannot set segmentPublisher once it has already been set.");
this.segmentPublisher = segmentPublisher;
checkFieldNotSetAndSet("segmentPublisher", segmentPublisher);
return this;
}
public RealtimeNode setDataSegmentPusher(DataSegmentPusher dataSegmentPusher)
{
Preconditions.checkState(this.dataSegmentPusher == null, "Cannot set segmentPusher once it has already been set.");
this.dataSegmentPusher = dataSegmentPusher;
checkFieldNotSetAndSet("dataSegmentPusher", dataSegmentPusher);
return this;
}
public RealtimeNode setFireDepartments(List<FireDepartment> fireDepartments)
{
Preconditions.checkState(this.fireDepartments == null, "Cannot set fireDepartments once it has already been set.");
this.fireDepartments = fireDepartments;
checkFieldNotSetAndSet("fireDepartments", fireDepartments);
return this;
}
@ -126,12 +115,6 @@ public class RealtimeNode extends BaseServerNode<RealtimeNode>
return this;
}
public SegmentAnnouncer getSegmentAnnouncer()
{
initializeSegmentAnnouncer();
return segmentAnnouncer;
}
public SegmentPublisher getSegmentPublisher()
{
initializeSegmentPublisher();
@ -152,21 +135,17 @@ public class RealtimeNode extends BaseServerNode<RealtimeNode>
protected void doInit() throws Exception
{
initializeSegmentAnnouncer();
initializeSegmentPublisher();
initializeSegmentPusher();
initializeJacksonInjectables();
initializeFireDepartments();
final Lifecycle lifecycle = getLifecycle();
final ServiceEmitter emitter = getEmitter();
final QueryRunnerFactoryConglomerate conglomerate = getConglomerate();
final List<Monitor> monitors = getMonitors();
final List<FireDepartment> departments = getFireDepartments();
monitors.add(new RealtimeMetricsMonitor(fireDepartments));
monitors.add(new RealtimeMetricsMonitor(departments));
final RealtimeManager realtimeManager = new RealtimeManager(fireDepartments, conglomerate);
final RealtimeManager realtimeManager = new RealtimeManager(departments, conglomerate);
lifecycle.addManagedInstance(realtimeManager);
startMonitoring(monitors);
@ -208,9 +187,9 @@ public class RealtimeNode extends BaseServerNode<RealtimeNode>
}
injectables.put("queryRunnerFactoryConglomerate", getConglomerate());
injectables.put("segmentPusher", dataSegmentPusher);
injectables.put("segmentAnnouncer", segmentAnnouncer);
injectables.put("segmentPublisher", segmentPublisher);
injectables.put("segmentPusher", getDataSegmentPusher());
injectables.put("segmentAnnouncer", getAnnouncer());
injectables.put("segmentPublisher", getSegmentPublisher());
injectables.put("serverView", getServerInventoryThingie());
injectables.put("serviceEmitter", getEmitter());
@ -232,9 +211,11 @@ public class RealtimeNode extends BaseServerNode<RealtimeNode>
{
if (fireDepartments == null) {
try {
fireDepartments = getJsonMapper().readValue(
new File(PropUtils.getProperty(getProps(), "druid.realtime.specFile")),
new TypeReference<List<FireDepartment>>(){}
setFireDepartments(
getJsonMapper().<List<FireDepartment>>readValue(
new File(PropUtils.getProperty(getProps(), "druid.realtime.specFile")),
new TypeReference<List<FireDepartment>>(){}
)
);
}
catch (IOException e) {
@ -250,19 +231,6 @@ public class RealtimeNode extends BaseServerNode<RealtimeNode>
}
}
protected void initializeSegmentAnnouncer()
{
if (segmentAnnouncer == null) {
final ZkSegmentAnnouncerConfig zkSegmentAnnouncerConfig = getConfigFactory().build(RealtimeZkSegmentAnnouncerConfig.class);
segmentAnnouncer = new CuratorSegmentAnnouncer(
zkSegmentAnnouncerConfig,
getAnnouncer(),
getJsonMapper()
);
getLifecycle().addManagedInstance(segmentAnnouncer);
}
}
protected void initializeSegmentPublisher()
{
if (segmentPublisher == null) {

View File

@ -1,26 +0,0 @@
package com.metamx.druid.realtime;
import org.skife.config.Config;
import org.skife.config.Default;
public abstract class ZkSegmentAnnouncerConfig
{
@Config("druid.host")
public abstract String getServerName();
@Config("druid.host")
public abstract String getHost();
@Config("druid.server.maxSize")
@Default("0")
public abstract long getMaxSize();
public abstract String getServerType();
@Config("druid.zk.paths.announcementsPath")
public abstract String getAnnouncementsPath();
@Config("druid.zk.paths.servedSegmentsPath")
public abstract String getServedSegmentsPath();
}

View File

@ -40,6 +40,7 @@ import com.metamx.druid.Query;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.client.ServerView;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.guava.ThreadRenamingCallable;
import com.metamx.druid.guava.ThreadRenamingRunnable;
import com.metamx.druid.index.QueryableIndex;
@ -57,7 +58,6 @@ import com.metamx.druid.query.QueryToolChest;
import com.metamx.druid.realtime.FireDepartmentMetrics;
import com.metamx.druid.realtime.FireHydrant;
import com.metamx.druid.realtime.Schema;
import com.metamx.druid.realtime.SegmentAnnouncer;
import com.metamx.druid.realtime.SegmentPublisher;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
@ -97,7 +97,7 @@ public class RealtimePlumberSchool implements PlumberSchool
private volatile RejectionPolicyFactory rejectionPolicyFactory = null;
private volatile QueryRunnerFactoryConglomerate conglomerate = null;
private volatile DataSegmentPusher dataSegmentPusher = null;
private volatile SegmentAnnouncer segmentAnnouncer = null;
private volatile DataSegmentAnnouncer segmentAnnouncer = null;
private volatile SegmentPublisher segmentPublisher = null;
private volatile ServerView serverView = null;
@ -144,7 +144,7 @@ public class RealtimePlumberSchool implements PlumberSchool
}
@JacksonInject("segmentAnnouncer")
public void setSegmentAnnouncer(SegmentAnnouncer segmentAnnouncer)
public void setSegmentAnnouncer(DataSegmentAnnouncer segmentAnnouncer)
{
this.segmentAnnouncer = segmentAnnouncer;
}

View File

@ -47,6 +47,7 @@ public abstract class BaseServerNode<T extends QueryableNode> extends QueryableN
private StupidPool<ByteBuffer> computeScratchPool = null;
public BaseServerNode(
String nodeType,
Logger log,
Properties props,
Lifecycle lifecycle,
@ -55,7 +56,7 @@ public abstract class BaseServerNode<T extends QueryableNode> extends QueryableN
ConfigurationObjectFactory configFactory
)
{
super(log, props, lifecycle, jsonMapper, smileMapper, configFactory);
super(nodeType, log, props, lifecycle, jsonMapper, smileMapper, configFactory);
}
public QueryRunnerFactoryConglomerate getConglomerate()

View File

@ -0,0 +1,14 @@
package com.metamx.druid.coordination;
import org.skife.config.Config;
import org.skife.config.Default;
public abstract class CuratorDataSegmentAnnouncerConfig
{
@Config("druid.zk.paths.announcementsPath")
public abstract String getAnnouncementsPath();
@Config("druid.zk.paths.servedSegmentsPath")
public abstract String getServedSegmentsPath();
}

View File

@ -54,7 +54,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
private final ObjectMapper jsonMapper;
private final ZkCoordinatorConfig config;
private final DruidServer me;
private final Announcer announcer;
private final DataSegmentAnnouncer announcer;
private final CuratorFramework curator;
private final ServerManager serverManager;
private final ServiceEmitter emitter;
@ -69,7 +69,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
ObjectMapper jsonMapper,
ZkCoordinatorConfig config,
DruidServer me,
Announcer announcer,
DataSegmentAnnouncer announcer,
CuratorFramework curator,
ServerManager serverManager,
ServiceEmitter emitter
@ -112,11 +112,6 @@ public class ZkCoordinator implements DataSegmentChangeHandler
loadCache();
announcer.announce(
ZKPaths.makePath(config.getAnnounceLocation(), me.getName()),
jsonMapper.writeValueAsBytes(me.getStringProps())
);
loadQueueCache.getListenable().addListener(
new PathChildrenCacheListener()
{
@ -185,8 +180,6 @@ public class ZkCoordinator implements DataSegmentChangeHandler
try {
loadQueueCache.close();
curator.delete().guaranteed().forPath(ZKPaths.makePath(config.getAnnounceLocation(), me.getName()));
}
catch (Exception e) {
throw Throwables.propagate(e);
@ -248,36 +241,37 @@ public class ZkCoordinator implements DataSegmentChangeHandler
catch (IOException e) {
removeSegment(segment);
throw new SegmentLoadingException(
"Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
);
}
announcer.announce(makeSegmentPath(segment), jsonMapper.writeValueAsBytes(segment));
try {
announcer.announceSegment(segment);
}
catch (IOException e) {
removeSegment(segment);
throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier());
}
}
catch (SegmentLoadingException e) {
log.makeAlert(e, "Failed to load segment for dataSource")
.addData("segment", segment)
.emit();
}
catch (JsonProcessingException e) {
log.makeAlert(e, "WTF, exception writing into byte[]???")
.addData("segment", segment)
.emit();
}
}
@Override
public void removeSegment(DataSegment segment)
{
try {
announcer.unannounce(makeSegmentPath(segment));
serverManager.dropSegment(segment);
File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier());
if (!segmentInfoCacheFile.delete()) {
log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile);
}
announcer.unannounceSegment(segment);
}
catch (Exception e) {
log.makeAlert("Failed to remove segment")
@ -285,9 +279,4 @@ public class ZkCoordinator implements DataSegmentChangeHandler
.emit();
}
}
private String makeSegmentPath(DataSegment segment)
{
return ZKPaths.makePath(servedSegmentsLocation, segment.getIdentifier());
}
}

View File

@ -72,7 +72,6 @@ public class ComputeNode extends BaseServerNode<ComputeNode>
return new Builder();
}
private DruidServer druidServer;
private SegmentLoader segmentLoader;
public ComputeNode(
@ -83,7 +82,7 @@ public class ComputeNode extends BaseServerNode<ComputeNode>
ConfigurationObjectFactory configFactory
)
{
super(log, props, lifecycle, jsonMapper, smileMapper, configFactory);
super("historical", log, props, lifecycle, jsonMapper, smileMapper, configFactory);
}
public ComputeNode setSegmentLoader(SegmentLoader segmentLoader)
@ -93,19 +92,6 @@ public class ComputeNode extends BaseServerNode<ComputeNode>
return this;
}
public ComputeNode setDruidServer(DruidServer druidServer)
{
Preconditions.checkState(this.druidServer == null, "Cannot set druidServer once it has already been set.");
this.druidServer = druidServer;
return this;
}
public DruidServer getDruidServer()
{
initializeDruidServer();
return druidServer;
}
public SegmentLoader getSegmentLoader()
{
initializeSegmentLoader();
@ -178,13 +164,6 @@ public class ComputeNode extends BaseServerNode<ComputeNode>
}
}
private void initializeDruidServer()
{
if (druidServer == null) {
setDruidServer(new DruidServer(getConfigFactory().build(DruidServerConfig.class), "historical"));
}
}
public static class Builder
{
private ObjectMapper jsonMapper = null;

View File

@ -52,7 +52,7 @@ public class ZkCoordinatorTest
{
private ZkCoordinator zkCoordinator;
private ServerManager serverManager;
private Announcer announcer;
private DataSegmentAnnouncer announcer;
private CuratorFramework curator;
private File cacheDir;
private final ObjectMapper jsonMapper = new DefaultObjectMapper();