From f71b941a1a7dc52f9135e2dfbe87c930ae676799 Mon Sep 17 00:00:00 2001 From: cheddar Date: Tue, 23 Apr 2013 17:54:49 -0500 Subject: [PATCH] 1) Refactor the announcement of segments to all exist inside the DataSegmentAnnouncer 2) Adjust the ExecutorNodes to expose the correct nodeType given the refactorings --- .../main/java/com/metamx/druid/BaseNode.java | 46 ------------- .../java/com/metamx/druid/QueryableNode.java | 65 ++++++++++++++++-- .../CuratorDataSegmentAnnouncer.java | 42 +++++------- .../coordination/DataSegmentAnnouncer.java | 4 +- .../com/metamx/druid/http/BrokerNode.java | 2 +- .../examples/RealtimeStandaloneMain.java | 8 +-- .../examples/RealtimeStandaloneMain.java | 8 +-- .../druid/merger/common/TaskToolbox.java | 8 +-- .../merger/common/TaskToolboxFactory.java | 11 +-- .../merger/common/task/AbstractTask.java | 6 ++ .../merger/common/task/RealtimeIndexTask.java | 12 +++- .../metamx/druid/merger/common/task/Task.java | 8 +++ .../merger/coordinator/ForkingTaskRunner.java | 7 ++ .../http/IndexerCoordinatorNode.java | 2 +- .../merger/worker/executor/ExecutorMain.java | 16 +++-- .../merger/worker/executor/ExecutorNode.java | 30 +++----- .../druid/merger/worker/http/WorkerNode.java | 2 +- ...imeCuratorDataSegmentAnnouncerConfig.java} | 7 +- .../metamx/druid/realtime/RealtimeNode.java | 68 +++++-------------- .../realtime/ZkSegmentAnnouncerConfig.java | 26 ------- .../plumber/RealtimePlumberSchool.java | 6 +- .../java/com/metamx/druid/BaseServerNode.java | 3 +- .../CuratorDataSegmentAnnouncerConfig.java | 14 ++++ .../druid/coordination/ZkCoordinator.java | 35 ++++------ .../com/metamx/druid/http/ComputeNode.java | 23 +------ .../druid/coordination/ZkCoordinatorTest.java | 2 +- 26 files changed, 201 insertions(+), 260 deletions(-) delete mode 100644 client/src/main/java/com/metamx/druid/BaseNode.java rename realtime/src/main/java/com/metamx/druid/realtime/CuratorSegmentAnnouncer.java => client/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncer.java (72%) rename realtime/src/main/java/com/metamx/druid/realtime/SegmentAnnouncer.java => client/src/main/java/com/metamx/druid/coordination/DataSegmentAnnouncer.java (74%) rename realtime/src/main/java/com/metamx/druid/realtime/{RealtimeZkSegmentAnnouncerConfig.java => RealtimeCuratorDataSegmentAnnouncerConfig.java} (83%) delete mode 100644 realtime/src/main/java/com/metamx/druid/realtime/ZkSegmentAnnouncerConfig.java create mode 100644 server/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncerConfig.java diff --git a/client/src/main/java/com/metamx/druid/BaseNode.java b/client/src/main/java/com/metamx/druid/BaseNode.java deleted file mode 100644 index 8902d14a65f..00000000000 --- a/client/src/main/java/com/metamx/druid/BaseNode.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package com.metamx.druid; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.metamx.common.lifecycle.Lifecycle; -import com.metamx.common.logger.Logger; - -import org.skife.config.ConfigurationObjectFactory; - -import java.util.Properties; - -/** - */ -@Deprecated -public abstract class BaseNode extends QueryableNode -{ - protected BaseNode( - Logger log, - Properties props, - Lifecycle lifecycle, - ObjectMapper jsonMapper, - ObjectMapper smileMapper, - ConfigurationObjectFactory configFactory - ) - { - super(log, props, lifecycle, jsonMapper, smileMapper, configFactory); - } -} diff --git a/client/src/main/java/com/metamx/druid/QueryableNode.java b/client/src/main/java/com/metamx/druid/QueryableNode.java index 26706b6ffff..e7a7ca0595b 100644 --- a/client/src/main/java/com/metamx/druid/QueryableNode.java +++ b/client/src/main/java/com/metamx/druid/QueryableNode.java @@ -33,14 +33,19 @@ import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; +import com.metamx.druid.client.DruidServer; +import com.metamx.druid.client.DruidServerConfig; import com.metamx.druid.client.ServerInventoryThingie; import com.metamx.druid.client.ServerInventoryThingieConfig; import com.metamx.druid.concurrent.Execs; +import com.metamx.druid.coordination.CuratorDataSegmentAnnouncer; +import com.metamx.druid.coordination.DataSegmentAnnouncer; import com.metamx.druid.curator.announcement.Announcer; import com.metamx.druid.http.RequestLogger; import com.metamx.druid.initialization.CuratorConfig; import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.ServerConfig; +import com.metamx.druid.initialization.ZkPathsConfig; import com.metamx.druid.utils.PropUtils; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.core.Emitters; @@ -78,12 +83,15 @@ public abstract class QueryableNode extends Registering private final ObjectMapper smileMapper; private final Properties props; private final ConfigurationObjectFactory configFactory; + private final String nodeType; + private DruidServer druidServer = null; private ServiceEmitter emitter = null; private List monitors = null; private Server server = null; private CuratorFramework curator = null; - private Announcer announcer = null; + private DataSegmentAnnouncer announcer = null; + private ZkPathsConfig zkPaths = null; private ScheduledExecutorFactory scheduledExecutorFactory = null; private RequestLogger requestLogger = null; private ServerInventoryThingie serverInventoryThingie = null; @@ -91,6 +99,7 @@ public abstract class QueryableNode extends Registering private boolean initialized = false; public QueryableNode( + String nodeType, Logger log, Properties props, Lifecycle lifecycle, @@ -115,6 +124,13 @@ public abstract class QueryableNode extends Registering Preconditions.checkNotNull(configFactory, "configFactory"); Preconditions.checkState(smileMapper.getJsonFactory() instanceof SmileFactory, "smileMapper should use smile."); + this.nodeType = nodeType; + } + + public T setDruidServer(DruidServer druidServer) + { + checkFieldNotSetAndSet("druidServer", druidServer); + return (T) this; } @SuppressWarnings("unchecked") @@ -125,7 +141,7 @@ public abstract class QueryableNode extends Registering } @SuppressWarnings("unchecked") - public T setAnnouncer(Announcer announcer) + public T setAnnouncer(DataSegmentAnnouncer announcer) { checkFieldNotSetAndSet("announcer", announcer); return (T) this; @@ -152,6 +168,13 @@ public abstract class QueryableNode extends Registering return (T) this; } + @SuppressWarnings("unchecked") + public T setZkPaths(ZkPathsConfig zkPaths) + { + checkFieldNotSetAndSet("zkPaths", zkPaths); + return (T) this; + } + @SuppressWarnings("unchecked") public T setScheduledExecutorFactory(ScheduledExecutorFactory factory) { @@ -214,13 +237,19 @@ public abstract class QueryableNode extends Registering return configFactory; } + public DruidServer getDruidServer() + { + initializeDruidServer(); + return druidServer; + } + public CuratorFramework getCuratorFramework() { initializeCuratorFramework(); return curator; } - public Announcer getAnnouncer() + public DataSegmentAnnouncer getAnnouncer() { initializeAnnouncer(); return announcer; @@ -244,6 +273,12 @@ public abstract class QueryableNode extends Registering return server; } + public ZkPathsConfig getZkPaths() + { + initializeZkPaths(); + return zkPaths; + } + public ScheduledExecutorFactory getScheduledExecutorFactory() { initializeScheduledExecutorFactory(); @@ -262,6 +297,13 @@ public abstract class QueryableNode extends Registering return serverInventoryThingie; } + private void initializeDruidServer() + { + if (druidServer == null) { + setDruidServer(new DruidServer(getConfigFactory().build(DruidServerConfig.class), nodeType)); + } + } + private void initializeServerInventoryThingie() { if (serverInventoryThingie == null) { @@ -306,6 +348,13 @@ public abstract class QueryableNode extends Registering } } + private void initializeZkPaths() + { + if (zkPaths == null) { + setZkPaths(getConfigFactory().build(ZkPathsConfig.class)); + } + } + private void initializeScheduledExecutorFactory() { if (scheduledExecutorFactory == null) { @@ -328,7 +377,15 @@ public abstract class QueryableNode extends Registering private void initializeAnnouncer() { if (announcer == null) { - setAnnouncer(new Announcer(getCuratorFramework(), Execs.singleThreaded("Announcer-%s"))); + setAnnouncer( + new CuratorDataSegmentAnnouncer( + getDruidServer(), + getZkPaths(), + new Announcer(getCuratorFramework(), Execs.singleThreaded("Announcer-%s")), + getJsonMapper() + ) + ); + lifecycle.addManagedInstance(getAnnouncer()); } } diff --git a/realtime/src/main/java/com/metamx/druid/realtime/CuratorSegmentAnnouncer.java b/client/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncer.java similarity index 72% rename from realtime/src/main/java/com/metamx/druid/realtime/CuratorSegmentAnnouncer.java rename to client/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncer.java index da092daff9b..38d9981e29d 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/CuratorSegmentAnnouncer.java +++ b/client/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncer.java @@ -17,60 +17,50 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package com.metamx.druid.realtime; +package com.metamx.druid.coordination; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Joiner; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; +import com.metamx.druid.client.DruidServer; import com.metamx.druid.curator.announcement.Announcer; -import com.netflix.curator.framework.CuratorFramework; +import com.metamx.druid.initialization.ZkPathsConfig; import com.netflix.curator.utils.ZKPaths; -import org.apache.zookeeper.CreateMode; -import org.joda.time.DateTime; import java.io.IOException; -import java.util.Arrays; import java.util.Map; -public class CuratorSegmentAnnouncer implements SegmentAnnouncer +public class CuratorDataSegmentAnnouncer implements DataSegmentAnnouncer { - private static final Logger log = new Logger(CuratorSegmentAnnouncer.class); + private static final Logger log = new Logger(CuratorDataSegmentAnnouncer.class); private final Object lock = new Object(); - private final ZkSegmentAnnouncerConfig config; + private final DruidServer server; + private final ZkPathsConfig config; private final Announcer announcer; private final ObjectMapper jsonMapper; private final String servedSegmentsLocation; private volatile boolean started = false; - public CuratorSegmentAnnouncer( - ZkSegmentAnnouncerConfig config, + public CuratorDataSegmentAnnouncer( + DruidServer server, + ZkPathsConfig config, Announcer announcer, ObjectMapper jsonMapper ) { + this.server = server; this.config = config; this.announcer = announcer; this.jsonMapper = jsonMapper; - this.servedSegmentsLocation = ZKPaths.makePath(config.getServedSegmentsPath(), config.getServerName()); - } - - private Map getStringProps() - { - return ImmutableMap.of( - "name", config.getServerName(), - "host", config.getHost(), - "maxSize", String.valueOf(config.getMaxSize()), - "type", config.getServerType() - ); + this.servedSegmentsLocation = ZKPaths.makePath(config.getServedSegmentsPath(), server.getName()); } @LifecycleStart @@ -81,9 +71,9 @@ public class CuratorSegmentAnnouncer implements SegmentAnnouncer return; } - log.info("Starting CuratorSegmentAnnouncer for server[%s] with config[%s]", config.getServerName(), config); + log.info("Starting CuratorDataSegmentAnnouncer for server[%s] with config[%s]", server.getName(), config); try { - announcer.announce(makeAnnouncementPath(), jsonMapper.writeValueAsBytes(getStringProps())); + announcer.announce(makeAnnouncementPath(), jsonMapper.writeValueAsBytes(server)); } catch (JsonProcessingException e) { throw Throwables.propagate(e); @@ -101,7 +91,7 @@ public class CuratorSegmentAnnouncer implements SegmentAnnouncer return; } - log.info("Stopping CuratorSegmentAnnouncer with config[%s]", config); + log.info("Stopping CuratorDataSegmentAnnouncer with config[%s]", config); announcer.unannounce(makeAnnouncementPath()); started = false; @@ -121,7 +111,7 @@ public class CuratorSegmentAnnouncer implements SegmentAnnouncer } private String makeAnnouncementPath() { - return ZKPaths.makePath(config.getAnnouncementsPath(), config.getServerName()); + return ZKPaths.makePath(config.getAnnouncementsPath(), server.getName()); } private String makeServedSegmentPath(DataSegment segment) diff --git a/realtime/src/main/java/com/metamx/druid/realtime/SegmentAnnouncer.java b/client/src/main/java/com/metamx/druid/coordination/DataSegmentAnnouncer.java similarity index 74% rename from realtime/src/main/java/com/metamx/druid/realtime/SegmentAnnouncer.java rename to client/src/main/java/com/metamx/druid/coordination/DataSegmentAnnouncer.java index 823a2e2a547..699c4b1e8ce 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/SegmentAnnouncer.java +++ b/client/src/main/java/com/metamx/druid/coordination/DataSegmentAnnouncer.java @@ -1,10 +1,10 @@ -package com.metamx.druid.realtime; +package com.metamx.druid.coordination; import com.metamx.druid.client.DataSegment; import java.io.IOException; -public interface SegmentAnnouncer +public interface DataSegmentAnnouncer { public void announceSegment(DataSegment segment) throws IOException; public void unannounceSegment(DataSegment segment) throws IOException; diff --git a/client/src/main/java/com/metamx/druid/http/BrokerNode.java b/client/src/main/java/com/metamx/druid/http/BrokerNode.java index 3c6c0b8573c..ba1b4361f99 100644 --- a/client/src/main/java/com/metamx/druid/http/BrokerNode.java +++ b/client/src/main/java/com/metamx/druid/http/BrokerNode.java @@ -96,7 +96,7 @@ public class BrokerNode extends QueryableNode ConfigurationObjectFactory configFactory ) { - super(log, props, lifecycle, jsonMapper, smileMapper, configFactory); + super("broker", log, props, lifecycle, jsonMapper, smileMapper, configFactory); } public QueryToolChestWarehouse getWarehouse() diff --git a/examples/rand/src/main/java/druid/examples/RealtimeStandaloneMain.java b/examples/rand/src/main/java/druid/examples/RealtimeStandaloneMain.java index 5ecdb99e7ec..9249622da6f 100644 --- a/examples/rand/src/main/java/druid/examples/RealtimeStandaloneMain.java +++ b/examples/rand/src/main/java/druid/examples/RealtimeStandaloneMain.java @@ -4,10 +4,10 @@ import com.fasterxml.jackson.databind.jsontype.NamedType; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; +import com.metamx.druid.coordination.DataSegmentAnnouncer; import com.metamx.druid.loading.DataSegmentPusher; import com.metamx.druid.log.LogLevelAdjuster; import com.metamx.druid.realtime.RealtimeNode; -import com.metamx.druid.realtime.SegmentAnnouncer; import com.metamx.druid.realtime.SegmentPublisher; import java.io.File; @@ -30,8 +30,8 @@ public class RealtimeStandaloneMain RealtimeNode rn = RealtimeNode.builder().build(); lifecycle.addManagedInstance(rn); - SegmentAnnouncer dummySegmentAnnouncer = - new SegmentAnnouncer() + DataSegmentAnnouncer dummySegmentAnnouncer = + new DataSegmentAnnouncer() { @Override public void announceSegment(DataSegment segment) throws IOException @@ -56,7 +56,7 @@ public class RealtimeStandaloneMain }; // dummySegmentPublisher will not send updates to db because standalone demo has no db - rn.setSegmentAnnouncer(dummySegmentAnnouncer); + rn.setAnnouncer(dummySegmentAnnouncer); rn.setSegmentPublisher(dummySegmentPublisher); rn.setDataSegmentPusher( new DataSegmentPusher() diff --git a/examples/twitter/src/main/java/druid/examples/RealtimeStandaloneMain.java b/examples/twitter/src/main/java/druid/examples/RealtimeStandaloneMain.java index 449ab16ed47..98ae0f7de4c 100644 --- a/examples/twitter/src/main/java/druid/examples/RealtimeStandaloneMain.java +++ b/examples/twitter/src/main/java/druid/examples/RealtimeStandaloneMain.java @@ -4,10 +4,10 @@ import com.fasterxml.jackson.databind.jsontype.NamedType; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; +import com.metamx.druid.coordination.DataSegmentAnnouncer; import com.metamx.druid.loading.DataSegmentPusher; import com.metamx.druid.log.LogLevelAdjuster; import com.metamx.druid.realtime.RealtimeNode; -import com.metamx.druid.realtime.SegmentAnnouncer; import com.metamx.druid.realtime.SegmentPublisher; import druid.examples.twitter.TwitterSpritzerFirehoseFactory; @@ -32,8 +32,8 @@ public class RealtimeStandaloneMain // register the Firehose rn.registerJacksonSubtype(new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer")); - final SegmentAnnouncer dummySegmentAnnouncer = - new SegmentAnnouncer() + final DataSegmentAnnouncer dummySegmentAnnouncer = + new DataSegmentAnnouncer() { @Override public void announceSegment(DataSegment segment) throws IOException @@ -58,8 +58,8 @@ public class RealtimeStandaloneMain }; // dummySegmentPublisher will not send updates to db because standalone demo has no db - rn.setSegmentAnnouncer(dummySegmentAnnouncer); rn.setSegmentPublisher(dummySegmentPublisher); + rn.setAnnouncer(dummySegmentAnnouncer); rn.setDataSegmentPusher( new DataSegmentPusher() { diff --git a/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java b/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java index 1b5fa9afe04..77f8c0830fb 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Maps; import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.ServerView; +import com.metamx.druid.coordination.DataSegmentAnnouncer; import com.metamx.druid.loading.DataSegmentKiller; import com.metamx.druid.loading.DataSegmentPusher; import com.metamx.druid.loading.MMappedQueryableIndexFactory; @@ -35,7 +36,6 @@ import com.metamx.druid.merger.common.actions.TaskActionClientFactory; import com.metamx.druid.merger.common.config.TaskConfig; import com.metamx.druid.merger.common.task.Task; import com.metamx.druid.query.QueryRunnerFactoryConglomerate; -import com.metamx.druid.realtime.SegmentAnnouncer; import com.metamx.emitter.service.ServiceEmitter; import org.jets3t.service.impl.rest.httpclient.RestS3Service; @@ -55,7 +55,7 @@ public class TaskToolbox private final RestS3Service s3Client; private final DataSegmentPusher segmentPusher; private final DataSegmentKiller dataSegmentKiller; - private final SegmentAnnouncer segmentAnnouncer; + private final DataSegmentAnnouncer segmentAnnouncer; private final ServerView newSegmentServerView; private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate; private final ObjectMapper objectMapper; @@ -68,7 +68,7 @@ public class TaskToolbox RestS3Service s3Client, DataSegmentPusher segmentPusher, DataSegmentKiller dataSegmentKiller, - SegmentAnnouncer segmentAnnouncer, + DataSegmentAnnouncer segmentAnnouncer, ServerView newSegmentServerView, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, ObjectMapper objectMapper @@ -112,7 +112,7 @@ public class TaskToolbox return dataSegmentKiller; } - public SegmentAnnouncer getSegmentAnnouncer() + public DataSegmentAnnouncer getSegmentAnnouncer() { return segmentAnnouncer; } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/TaskToolboxFactory.java b/merger/src/main/java/com/metamx/druid/merger/common/TaskToolboxFactory.java index 0f4fc9f0bbc..044ba541976 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/TaskToolboxFactory.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/TaskToolboxFactory.java @@ -21,13 +21,13 @@ package com.metamx.druid.merger.common; import com.fasterxml.jackson.databind.ObjectMapper; import com.metamx.druid.client.ServerView; +import com.metamx.druid.coordination.DataSegmentAnnouncer; import com.metamx.druid.loading.DataSegmentKiller; import com.metamx.druid.loading.DataSegmentPusher; import com.metamx.druid.merger.common.actions.TaskActionClientFactory; import com.metamx.druid.merger.common.config.TaskConfig; import com.metamx.druid.merger.common.task.Task; import com.metamx.druid.query.QueryRunnerFactoryConglomerate; -import com.metamx.druid.realtime.SegmentAnnouncer; import com.metamx.emitter.service.ServiceEmitter; import org.jets3t.service.impl.rest.httpclient.RestS3Service; @@ -42,7 +42,7 @@ public class TaskToolboxFactory private final RestS3Service s3Client; private final DataSegmentPusher segmentPusher; private final DataSegmentKiller dataSegmentKiller; - private final SegmentAnnouncer segmentAnnouncer; + private final DataSegmentAnnouncer segmentAnnouncer; private final ServerView newSegmentServerView; private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate; private final ObjectMapper objectMapper; @@ -54,7 +54,7 @@ public class TaskToolboxFactory RestS3Service s3Client, DataSegmentPusher segmentPusher, DataSegmentKiller dataSegmentKiller, - SegmentAnnouncer segmentAnnouncer, + DataSegmentAnnouncer segmentAnnouncer, ServerView newSegmentServerView, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, ObjectMapper objectMapper @@ -72,11 +72,6 @@ public class TaskToolboxFactory this.objectMapper = objectMapper; } - public ObjectMapper getObjectMapper() - { - return objectMapper; - } - public TaskToolbox build(Task task) { return new TaskToolbox( diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java index fb2d60c8713..0ab75140303 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java @@ -75,6 +75,12 @@ public abstract class AbstractTask implements Task return groupId; } + @Override + public String getNodeType() + { + return null; + } + @JsonProperty @Override public String getDataSource() diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java index b0fccb793cd..d4be7db19d5 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java @@ -47,7 +47,7 @@ import com.metamx.druid.realtime.MinTimeFirehose; import com.metamx.druid.realtime.plumber.Plumber; import com.metamx.druid.realtime.plumber.RealtimePlumberSchool; import com.metamx.druid.realtime.Schema; -import com.metamx.druid.realtime.SegmentAnnouncer; +import com.metamx.druid.coordination.DataSegmentAnnouncer; import com.metamx.druid.realtime.SegmentPublisher; import com.metamx.druid.realtime.plumber.Sink; import com.metamx.druid.realtime.plumber.VersioningPolicy; @@ -131,6 +131,12 @@ public class RealtimeIndexTask extends AbstractTask return "index_realtime"; } + @Override + public String getNodeType() + { + return "realtime"; + } + @Override public QueryRunner getQueryRunner(Query query) { @@ -192,8 +198,8 @@ public class RealtimeIndexTask extends AbstractTask // with the coordinator. Right now, we'll block/throw in whatever thread triggered the coordinator behavior, // which will typically be either the main data processing loop or the persist thread. - // Wrap default SegmentAnnouncer such that we unlock intervals as we unannounce segments - final SegmentAnnouncer lockingSegmentAnnouncer = new SegmentAnnouncer() + // Wrap default DataSegmentAnnouncer such that we unlock intervals as we unannounce segments + final DataSegmentAnnouncer lockingSegmentAnnouncer = new DataSegmentAnnouncer() { @Override public void announceSegment(final DataSegment segment) throws IOException diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java b/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java index dec45d3b4db..71ae502c1d7 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java @@ -76,6 +76,14 @@ public interface Task */ public String getType(); + /** + * Get the nodeType for if/when this task publishes on zookeeper. + * + * @return the nodeType to use when publishing the server to zookeeper. null if the task doesn't expect to + * publish to zookeeper. + */ + public String getNodeType(); + /** * Returns the datasource this task operates on. Each task can operate on only one datasource. */ diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/ForkingTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/ForkingTaskRunner.java index 68d8ad12abf..08998345d04 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/ForkingTaskRunner.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/ForkingTaskRunner.java @@ -170,6 +170,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider command.add(String.format("-Ddruid.port=%d", childPort)); command.add(config.getMainClass()); + command.add(defaultNodeType(task)); command.add(taskFile.toString()); command.add(statusFile.toString()); @@ -259,6 +260,12 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider } } + private String defaultNodeType(Task task) + { + final String nodeType = task.getNodeType(); + return nodeType == null ? "indexer-executor" : nodeType; + } + @LifecycleStop public void stop() { diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java index 152b16a051f..82222bbe40c 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java @@ -173,7 +173,7 @@ public class IndexerCoordinatorNode extends QueryableNode "); + if (args.length != 3) { + log.info("Usage: ExecutorMain "); System.exit(2); } + Iterator arguments = Arrays.asList(args).iterator(); + final String nodeType = arguments.next(); + final String taskJsonFile = arguments.next(); + final String statusJsonFile = arguments.next(); + final ExecutorNode node = ExecutorNode.builder() .build( + nodeType, new ExecutorLifecycleFactory( - new File(args[0]), - new File(args[1]), + new File(taskJsonFile), + new File(statusJsonFile), System.in ) ); diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/executor/ExecutorNode.java b/merger/src/main/java/com/metamx/druid/merger/worker/executor/ExecutorNode.java index db4f40b6124..5a849714d6e 100644 --- a/merger/src/main/java/com/metamx/druid/merger/worker/executor/ExecutorNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/worker/executor/ExecutorNode.java @@ -33,8 +33,6 @@ import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.druid.BaseServerNode; -import com.metamx.druid.concurrent.Execs; -import com.metamx.druid.curator.announcement.Announcer; import com.metamx.druid.http.QueryServlet; import com.metamx.druid.http.StatusServlet; import com.metamx.druid.initialization.Initialization; @@ -53,9 +51,9 @@ import com.metamx.druid.merger.common.config.TaskConfig; import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory; import com.metamx.druid.merger.coordinator.ExecutorServiceTaskRunner; import com.metamx.druid.merger.worker.config.WorkerConfig; -import com.metamx.druid.realtime.CuratorSegmentAnnouncer; -import com.metamx.druid.realtime.RealtimeZkSegmentAnnouncerConfig; -import com.metamx.druid.realtime.SegmentAnnouncer; +import com.metamx.druid.coordination.CuratorDataSegmentAnnouncer; +import com.metamx.druid.realtime.RealtimeCuratorDataSegmentAnnouncerConfig; +import com.metamx.druid.coordination.DataSegmentAnnouncer; import com.metamx.druid.utils.PropUtils; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.core.Emitters; @@ -114,6 +112,7 @@ public class ExecutorNode extends BaseServerNode private ExecutorLifecycle executorLifecycle = null; public ExecutorNode( + String nodeType, Properties props, Lifecycle lifecycle, ObjectMapper jsonMapper, @@ -122,7 +121,7 @@ public class ExecutorNode extends BaseServerNode ExecutorLifecycleFactory executorLifecycleFactory ) { - super(log, props, lifecycle, jsonMapper, smileMapper, configFactory); + super(nodeType, log, props, lifecycle, jsonMapper, smileMapper, configFactory); this.lifecycle = lifecycle; this.props = props; @@ -192,10 +191,7 @@ public class ExecutorNode extends BaseServerNode final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle); final ScheduledExecutorService globalScheduledExec = scheduledExecutorFactory.create(1, "Global--%d"); final MonitorScheduler monitorScheduler = new MonitorScheduler( - configFactory.build(MonitorSchedulerConfig.class), - globalScheduledExec, - emitter, - monitors + configFactory.build(MonitorSchedulerConfig.class), globalScheduledExec, emitter, monitors ); lifecycle.addManagedInstance(monitorScheduler); @@ -346,12 +342,6 @@ public class ExecutorNode extends BaseServerNode { if (taskToolboxFactory == null) { final DataSegmentKiller dataSegmentKiller = new S3DataSegmentKiller(s3Service); - final SegmentAnnouncer segmentAnnouncer = new CuratorSegmentAnnouncer( - configFactory.build(RealtimeZkSegmentAnnouncerConfig.class), - getAnnouncer(), - getJsonMapper() - ); - lifecycle.addManagedInstance(segmentAnnouncer); taskToolboxFactory = new TaskToolboxFactory( taskConfig, new RemoteTaskActionClientFactory( @@ -369,7 +359,7 @@ public class ExecutorNode extends BaseServerNode s3Service, segmentPusher, dataSegmentKiller, - segmentAnnouncer, + getAnnouncer(), getServerInventoryThingie(), getConglomerate(), getJsonMapper() @@ -410,7 +400,7 @@ public class ExecutorNode extends BaseServerNode .build() ) ) - );; + ); } } @@ -446,7 +436,7 @@ public class ExecutorNode extends BaseServerNode return this; } - public ExecutorNode build(ExecutorLifecycleFactory executorLifecycleFactory) + public ExecutorNode build(String nodeType, ExecutorLifecycleFactory executorLifecycleFactory) { if (jsonMapper == null && smileMapper == null) { jsonMapper = new DefaultObjectMapper(); @@ -469,7 +459,7 @@ public class ExecutorNode extends BaseServerNode configFactory = Config.createFactory(props); } - return new ExecutorNode(props, lifecycle, jsonMapper, smileMapper, configFactory, executorLifecycleFactory); + return new ExecutorNode(nodeType, props, lifecycle, jsonMapper, smileMapper, configFactory, executorLifecycleFactory); } } } diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java b/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java index 3a8bcd416eb..23d80084747 100644 --- a/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java @@ -117,7 +117,7 @@ public class WorkerNode extends QueryableNode ConfigurationObjectFactory configFactory ) { - super(log, props, lifecycle, jsonMapper, smileMapper, configFactory); + super("indexer-worker", log, props, lifecycle, jsonMapper, smileMapper, configFactory); } public WorkerNode setHttpClient(HttpClient httpClient) diff --git a/realtime/src/main/java/com/metamx/druid/realtime/RealtimeZkSegmentAnnouncerConfig.java b/realtime/src/main/java/com/metamx/druid/realtime/RealtimeCuratorDataSegmentAnnouncerConfig.java similarity index 83% rename from realtime/src/main/java/com/metamx/druid/realtime/RealtimeZkSegmentAnnouncerConfig.java rename to realtime/src/main/java/com/metamx/druid/realtime/RealtimeCuratorDataSegmentAnnouncerConfig.java index e6b8ce9d0b6..36c15715556 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/RealtimeZkSegmentAnnouncerConfig.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/RealtimeCuratorDataSegmentAnnouncerConfig.java @@ -21,11 +21,6 @@ package com.metamx.druid.realtime; /** */ -public abstract class RealtimeZkSegmentAnnouncerConfig extends ZkSegmentAnnouncerConfig +public abstract class RealtimeCuratorDataSegmentAnnouncerConfig { - @Override - public final String getServerType() - { - return "realtime"; - } } diff --git a/realtime/src/main/java/com/metamx/druid/realtime/RealtimeNode.java b/realtime/src/main/java/com/metamx/druid/realtime/RealtimeNode.java index e5521847f7d..14a04a9d50a 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/RealtimeNode.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/RealtimeNode.java @@ -35,9 +35,9 @@ import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; import com.metamx.druid.BaseServerNode; -import com.metamx.druid.client.ServerView; -import com.metamx.druid.concurrent.Execs; -import com.metamx.druid.curator.announcement.Announcer; +import com.metamx.druid.coordination.CuratorDataSegmentAnnouncer; +import com.metamx.druid.coordination.CuratorDataSegmentAnnouncerConfig; +import com.metamx.druid.coordination.DataSegmentAnnouncer; import com.metamx.druid.db.DbConnector; import com.metamx.druid.db.DbConnectorConfig; import com.metamx.druid.http.QueryServlet; @@ -73,7 +73,6 @@ public class RealtimeNode extends BaseServerNode private final Map injectablesMap = Maps.newLinkedHashMap(); - private SegmentAnnouncer segmentAnnouncer = null; private SegmentPublisher segmentPublisher = null; private DataSegmentPusher dataSegmentPusher = null; private List fireDepartments = null; @@ -88,34 +87,24 @@ public class RealtimeNode extends BaseServerNode ConfigurationObjectFactory configFactory ) { - super(log, props, lifecycle, jsonMapper, smileMapper, configFactory); - } - - public RealtimeNode setSegmentAnnouncer(SegmentAnnouncer segmentAnnouncer) - { - Preconditions.checkState(this.segmentAnnouncer == null, "Cannot set segmentAnnouncer once it has already been set."); - this.segmentAnnouncer = segmentAnnouncer; - return this; + super("realtime", log, props, lifecycle, jsonMapper, smileMapper, configFactory); } public RealtimeNode setSegmentPublisher(SegmentPublisher segmentPublisher) { - Preconditions.checkState(this.segmentPublisher == null, "Cannot set segmentPublisher once it has already been set."); - this.segmentPublisher = segmentPublisher; + checkFieldNotSetAndSet("segmentPublisher", segmentPublisher); return this; } public RealtimeNode setDataSegmentPusher(DataSegmentPusher dataSegmentPusher) { - Preconditions.checkState(this.dataSegmentPusher == null, "Cannot set segmentPusher once it has already been set."); - this.dataSegmentPusher = dataSegmentPusher; + checkFieldNotSetAndSet("dataSegmentPusher", dataSegmentPusher); return this; } public RealtimeNode setFireDepartments(List fireDepartments) { - Preconditions.checkState(this.fireDepartments == null, "Cannot set fireDepartments once it has already been set."); - this.fireDepartments = fireDepartments; + checkFieldNotSetAndSet("fireDepartments", fireDepartments); return this; } @@ -126,12 +115,6 @@ public class RealtimeNode extends BaseServerNode return this; } - public SegmentAnnouncer getSegmentAnnouncer() - { - initializeSegmentAnnouncer(); - return segmentAnnouncer; - } - public SegmentPublisher getSegmentPublisher() { initializeSegmentPublisher(); @@ -152,21 +135,17 @@ public class RealtimeNode extends BaseServerNode protected void doInit() throws Exception { - initializeSegmentAnnouncer(); - initializeSegmentPublisher(); - initializeSegmentPusher(); initializeJacksonInjectables(); - initializeFireDepartments(); - final Lifecycle lifecycle = getLifecycle(); final ServiceEmitter emitter = getEmitter(); final QueryRunnerFactoryConglomerate conglomerate = getConglomerate(); final List monitors = getMonitors(); + final List departments = getFireDepartments(); - monitors.add(new RealtimeMetricsMonitor(fireDepartments)); + monitors.add(new RealtimeMetricsMonitor(departments)); - final RealtimeManager realtimeManager = new RealtimeManager(fireDepartments, conglomerate); + final RealtimeManager realtimeManager = new RealtimeManager(departments, conglomerate); lifecycle.addManagedInstance(realtimeManager); startMonitoring(monitors); @@ -208,9 +187,9 @@ public class RealtimeNode extends BaseServerNode } injectables.put("queryRunnerFactoryConglomerate", getConglomerate()); - injectables.put("segmentPusher", dataSegmentPusher); - injectables.put("segmentAnnouncer", segmentAnnouncer); - injectables.put("segmentPublisher", segmentPublisher); + injectables.put("segmentPusher", getDataSegmentPusher()); + injectables.put("segmentAnnouncer", getAnnouncer()); + injectables.put("segmentPublisher", getSegmentPublisher()); injectables.put("serverView", getServerInventoryThingie()); injectables.put("serviceEmitter", getEmitter()); @@ -232,9 +211,11 @@ public class RealtimeNode extends BaseServerNode { if (fireDepartments == null) { try { - fireDepartments = getJsonMapper().readValue( - new File(PropUtils.getProperty(getProps(), "druid.realtime.specFile")), - new TypeReference>(){} + setFireDepartments( + getJsonMapper().>readValue( + new File(PropUtils.getProperty(getProps(), "druid.realtime.specFile")), + new TypeReference>(){} + ) ); } catch (IOException e) { @@ -250,19 +231,6 @@ public class RealtimeNode extends BaseServerNode } } - protected void initializeSegmentAnnouncer() - { - if (segmentAnnouncer == null) { - final ZkSegmentAnnouncerConfig zkSegmentAnnouncerConfig = getConfigFactory().build(RealtimeZkSegmentAnnouncerConfig.class); - segmentAnnouncer = new CuratorSegmentAnnouncer( - zkSegmentAnnouncerConfig, - getAnnouncer(), - getJsonMapper() - ); - getLifecycle().addManagedInstance(segmentAnnouncer); - } - } - protected void initializeSegmentPublisher() { if (segmentPublisher == null) { diff --git a/realtime/src/main/java/com/metamx/druid/realtime/ZkSegmentAnnouncerConfig.java b/realtime/src/main/java/com/metamx/druid/realtime/ZkSegmentAnnouncerConfig.java deleted file mode 100644 index cfb5b57bf06..00000000000 --- a/realtime/src/main/java/com/metamx/druid/realtime/ZkSegmentAnnouncerConfig.java +++ /dev/null @@ -1,26 +0,0 @@ -package com.metamx.druid.realtime; - -import org.skife.config.Config; -import org.skife.config.Default; - -public abstract class ZkSegmentAnnouncerConfig -{ - @Config("druid.host") - public abstract String getServerName(); - - @Config("druid.host") - public abstract String getHost(); - - @Config("druid.server.maxSize") - @Default("0") - public abstract long getMaxSize(); - - public abstract String getServerType(); - - @Config("druid.zk.paths.announcementsPath") - public abstract String getAnnouncementsPath(); - - @Config("druid.zk.paths.servedSegmentsPath") - public abstract String getServedSegmentsPath(); - -} diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java index 7a3cf5e7265..a7dec6c642a 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java @@ -40,6 +40,7 @@ import com.metamx.druid.Query; import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DruidServer; import com.metamx.druid.client.ServerView; +import com.metamx.druid.coordination.DataSegmentAnnouncer; import com.metamx.druid.guava.ThreadRenamingCallable; import com.metamx.druid.guava.ThreadRenamingRunnable; import com.metamx.druid.index.QueryableIndex; @@ -57,7 +58,6 @@ import com.metamx.druid.query.QueryToolChest; import com.metamx.druid.realtime.FireDepartmentMetrics; import com.metamx.druid.realtime.FireHydrant; import com.metamx.druid.realtime.Schema; -import com.metamx.druid.realtime.SegmentAnnouncer; import com.metamx.druid.realtime.SegmentPublisher; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; @@ -97,7 +97,7 @@ public class RealtimePlumberSchool implements PlumberSchool private volatile RejectionPolicyFactory rejectionPolicyFactory = null; private volatile QueryRunnerFactoryConglomerate conglomerate = null; private volatile DataSegmentPusher dataSegmentPusher = null; - private volatile SegmentAnnouncer segmentAnnouncer = null; + private volatile DataSegmentAnnouncer segmentAnnouncer = null; private volatile SegmentPublisher segmentPublisher = null; private volatile ServerView serverView = null; @@ -144,7 +144,7 @@ public class RealtimePlumberSchool implements PlumberSchool } @JacksonInject("segmentAnnouncer") - public void setSegmentAnnouncer(SegmentAnnouncer segmentAnnouncer) + public void setSegmentAnnouncer(DataSegmentAnnouncer segmentAnnouncer) { this.segmentAnnouncer = segmentAnnouncer; } diff --git a/server/src/main/java/com/metamx/druid/BaseServerNode.java b/server/src/main/java/com/metamx/druid/BaseServerNode.java index dbbe286e270..efe2f2a19c6 100644 --- a/server/src/main/java/com/metamx/druid/BaseServerNode.java +++ b/server/src/main/java/com/metamx/druid/BaseServerNode.java @@ -47,6 +47,7 @@ public abstract class BaseServerNode extends QueryableN private StupidPool computeScratchPool = null; public BaseServerNode( + String nodeType, Logger log, Properties props, Lifecycle lifecycle, @@ -55,7 +56,7 @@ public abstract class BaseServerNode extends QueryableN ConfigurationObjectFactory configFactory ) { - super(log, props, lifecycle, jsonMapper, smileMapper, configFactory); + super(nodeType, log, props, lifecycle, jsonMapper, smileMapper, configFactory); } public QueryRunnerFactoryConglomerate getConglomerate() diff --git a/server/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncerConfig.java b/server/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncerConfig.java new file mode 100644 index 00000000000..049b7515fa3 --- /dev/null +++ b/server/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncerConfig.java @@ -0,0 +1,14 @@ +package com.metamx.druid.coordination; + +import org.skife.config.Config; +import org.skife.config.Default; + +public abstract class CuratorDataSegmentAnnouncerConfig +{ + @Config("druid.zk.paths.announcementsPath") + public abstract String getAnnouncementsPath(); + + @Config("druid.zk.paths.servedSegmentsPath") + public abstract String getServedSegmentsPath(); + +} diff --git a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java index 65de37bdf9f..17b925bef37 100644 --- a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java +++ b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java @@ -54,7 +54,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler private final ObjectMapper jsonMapper; private final ZkCoordinatorConfig config; private final DruidServer me; - private final Announcer announcer; + private final DataSegmentAnnouncer announcer; private final CuratorFramework curator; private final ServerManager serverManager; private final ServiceEmitter emitter; @@ -69,7 +69,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler ObjectMapper jsonMapper, ZkCoordinatorConfig config, DruidServer me, - Announcer announcer, + DataSegmentAnnouncer announcer, CuratorFramework curator, ServerManager serverManager, ServiceEmitter emitter @@ -112,11 +112,6 @@ public class ZkCoordinator implements DataSegmentChangeHandler loadCache(); - announcer.announce( - ZKPaths.makePath(config.getAnnounceLocation(), me.getName()), - jsonMapper.writeValueAsBytes(me.getStringProps()) - ); - loadQueueCache.getListenable().addListener( new PathChildrenCacheListener() { @@ -185,8 +180,6 @@ public class ZkCoordinator implements DataSegmentChangeHandler try { loadQueueCache.close(); - - curator.delete().guaranteed().forPath(ZKPaths.makePath(config.getAnnounceLocation(), me.getName())); } catch (Exception e) { throw Throwables.propagate(e); @@ -248,36 +241,37 @@ public class ZkCoordinator implements DataSegmentChangeHandler catch (IOException e) { removeSegment(segment); throw new SegmentLoadingException( - "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile + e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile ); } - announcer.announce(makeSegmentPath(segment), jsonMapper.writeValueAsBytes(segment)); + try { + announcer.announceSegment(segment); + } + catch (IOException e) { + removeSegment(segment); + throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier()); + } } catch (SegmentLoadingException e) { log.makeAlert(e, "Failed to load segment for dataSource") .addData("segment", segment) .emit(); } - catch (JsonProcessingException e) { - log.makeAlert(e, "WTF, exception writing into byte[]???") - .addData("segment", segment) - .emit(); - } } @Override public void removeSegment(DataSegment segment) { try { - announcer.unannounce(makeSegmentPath(segment)); - serverManager.dropSegment(segment); File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier()); if (!segmentInfoCacheFile.delete()) { log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile); } + + announcer.unannounceSegment(segment); } catch (Exception e) { log.makeAlert("Failed to remove segment") @@ -285,9 +279,4 @@ public class ZkCoordinator implements DataSegmentChangeHandler .emit(); } } - - private String makeSegmentPath(DataSegment segment) - { - return ZKPaths.makePath(servedSegmentsLocation, segment.getIdentifier()); - } } diff --git a/server/src/main/java/com/metamx/druid/http/ComputeNode.java b/server/src/main/java/com/metamx/druid/http/ComputeNode.java index 57735ab7709..3b591f83bd0 100644 --- a/server/src/main/java/com/metamx/druid/http/ComputeNode.java +++ b/server/src/main/java/com/metamx/druid/http/ComputeNode.java @@ -72,7 +72,6 @@ public class ComputeNode extends BaseServerNode return new Builder(); } - private DruidServer druidServer; private SegmentLoader segmentLoader; public ComputeNode( @@ -83,7 +82,7 @@ public class ComputeNode extends BaseServerNode ConfigurationObjectFactory configFactory ) { - super(log, props, lifecycle, jsonMapper, smileMapper, configFactory); + super("historical", log, props, lifecycle, jsonMapper, smileMapper, configFactory); } public ComputeNode setSegmentLoader(SegmentLoader segmentLoader) @@ -93,19 +92,6 @@ public class ComputeNode extends BaseServerNode return this; } - public ComputeNode setDruidServer(DruidServer druidServer) - { - Preconditions.checkState(this.druidServer == null, "Cannot set druidServer once it has already been set."); - this.druidServer = druidServer; - return this; - } - - public DruidServer getDruidServer() - { - initializeDruidServer(); - return druidServer; - } - public SegmentLoader getSegmentLoader() { initializeSegmentLoader(); @@ -178,13 +164,6 @@ public class ComputeNode extends BaseServerNode } } - private void initializeDruidServer() - { - if (druidServer == null) { - setDruidServer(new DruidServer(getConfigFactory().build(DruidServerConfig.class), "historical")); - } - } - public static class Builder { private ObjectMapper jsonMapper = null; diff --git a/server/src/test/java/com/metamx/druid/coordination/ZkCoordinatorTest.java b/server/src/test/java/com/metamx/druid/coordination/ZkCoordinatorTest.java index 39806d0c355..797253cbf02 100644 --- a/server/src/test/java/com/metamx/druid/coordination/ZkCoordinatorTest.java +++ b/server/src/test/java/com/metamx/druid/coordination/ZkCoordinatorTest.java @@ -52,7 +52,7 @@ public class ZkCoordinatorTest { private ZkCoordinator zkCoordinator; private ServerManager serverManager; - private Announcer announcer; + private DataSegmentAnnouncer announcer; private CuratorFramework curator; private File cacheDir; private final ObjectMapper jsonMapper = new DefaultObjectMapper();