diff --git a/client/src/main/java/com/metamx/druid/QueryableNode.java b/client/src/main/java/com/metamx/druid/QueryableNode.java index 1971bc5158e..f45b473fdfa 100644 --- a/client/src/main/java/com/metamx/druid/QueryableNode.java +++ b/client/src/main/java/com/metamx/druid/QueryableNode.java @@ -43,11 +43,11 @@ import com.metamx.druid.client.ServerView; import com.metamx.druid.client.SingleServerInventoryView; import com.metamx.druid.concurrent.Execs; import com.metamx.druid.coordination.AbstractDataSegmentAnnouncer; -import com.metamx.druid.coordination.BatchingCuratorDataSegmentAnnouncer; -import com.metamx.druid.coordination.CuratorDataSegmentAnnouncer; +import com.metamx.druid.coordination.BatchDataSegmentAnnouncer; import com.metamx.druid.coordination.DataSegmentAnnouncer; import com.metamx.druid.coordination.DruidServerMetadata; import com.metamx.druid.coordination.MultipleDataSegmentAnnouncerDataSegmentAnnouncer; +import com.metamx.druid.coordination.SingleDataSegmentAnnouncer; import com.metamx.druid.curator.announcement.Announcer; import com.metamx.druid.http.NoopRequestLogger; import com.metamx.druid.http.RequestLogger; @@ -456,7 +456,7 @@ public abstract class QueryableNode extends Registering final DataSegmentAnnouncer dataSegmentAnnouncer; if ("batch".equalsIgnoreCase(announcerType)) { - dataSegmentAnnouncer = new BatchingCuratorDataSegmentAnnouncer( + dataSegmentAnnouncer = new BatchDataSegmentAnnouncer( getDruidServerMetadata(), config, announcer, @@ -465,13 +465,13 @@ public abstract class QueryableNode extends Registering } else if ("legacy".equalsIgnoreCase(announcerType)) { dataSegmentAnnouncer = new MultipleDataSegmentAnnouncerDataSegmentAnnouncer( Arrays.asList( - new BatchingCuratorDataSegmentAnnouncer( + new BatchDataSegmentAnnouncer( getDruidServerMetadata(), config, announcer, getJsonMapper() ), - new CuratorDataSegmentAnnouncer( + new SingleDataSegmentAnnouncer( getDruidServerMetadata(), getZkPaths(), announcer, diff --git a/client/src/main/java/com/metamx/druid/client/BatchServerInventoryView.java b/client/src/main/java/com/metamx/druid/client/BatchServerInventoryView.java index cc178862928..301c4ebb16e 100644 --- a/client/src/main/java/com/metamx/druid/client/BatchServerInventoryView.java +++ b/client/src/main/java/com/metamx/druid/client/BatchServerInventoryView.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.MapMaker; import com.google.common.collect.Sets; import com.metamx.common.ISE; +import com.metamx.druid.curator.inventory.InventoryManagerConfig; import com.metamx.druid.initialization.ZkPathsConfig; import com.metamx.emitter.EmittingLogger; import org.apache.curator.framework.CuratorFramework; @@ -49,9 +50,27 @@ public class BatchServerInventoryView extends ServerInventoryView>() - { - } + config, + new InventoryManagerConfig() + { + @Override + public String getContainerPath() + { + return zkPaths.getAnnouncementsPath(); + } + + @Override + public String getInventoryPath() + { + return zkPaths.getLiveSegmentsPath(); + } + }, + curator, + exec, + jsonMapper, + new TypeReference>() + { + } ); } @@ -85,6 +104,7 @@ public class BatchServerInventoryView extends ServerInventoryView implements ServerView, public ServerInventoryView( final ServerInventoryViewConfig config, - final ZkPathsConfig zkPaths, + final InventoryManagerConfig inventoryManagerConfig, final CuratorFramework curator, final ExecutorService exec, final ObjectMapper jsonMapper, @@ -69,20 +68,7 @@ public abstract class ServerInventoryView implements ServerView, this.config = config; this.inventoryManager = new CuratorInventoryManager( curator, - new InventoryManagerConfig() - { - @Override - public String getContainerPath() - { - return zkPaths.getAnnouncementsPath(); - } - - @Override - public String getInventoryPath() - { - return zkPaths.getServedSegmentsPath(); - } - }, + inventoryManagerConfig, exec, new CuratorInventoryManagerStrategy() { @@ -246,6 +232,11 @@ public abstract class ServerInventoryView implements ServerView, segmentCallbacks.put(callback, exec); } + public InventoryManagerConfig getInventoryManagerConfig() + { + return inventoryManager.getConfig(); + } + protected void runSegmentCallbacks( final Function fn ) diff --git a/client/src/main/java/com/metamx/druid/client/SingleServerInventoryView.java b/client/src/main/java/com/metamx/druid/client/SingleServerInventoryView.java index 30bdc30e46e..f858fe7a3dc 100644 --- a/client/src/main/java/com/metamx/druid/client/SingleServerInventoryView.java +++ b/client/src/main/java/com/metamx/druid/client/SingleServerInventoryView.java @@ -21,6 +21,7 @@ package com.metamx.druid.client; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.metamx.druid.curator.inventory.InventoryManagerConfig; import com.metamx.druid.initialization.ZkPathsConfig; import org.apache.curator.framework.CuratorFramework; @@ -39,9 +40,27 @@ public class SingleServerInventoryView extends ServerInventoryView ) { super( - config, zkPaths, curator, exec, jsonMapper, new TypeReference() - { - } + config, + new InventoryManagerConfig() + { + @Override + public String getContainerPath() + { + return zkPaths.getAnnouncementsPath(); + } + + @Override + public String getInventoryPath() + { + return zkPaths.getServedSegmentsPath(); + } + }, + curator, + exec, + jsonMapper, + new TypeReference() + { + } ); } diff --git a/client/src/main/java/com/metamx/druid/coordination/AbstractDataSegmentAnnouncer.java b/client/src/main/java/com/metamx/druid/coordination/AbstractDataSegmentAnnouncer.java index 583910e3dac..1bb0b8114ee 100644 --- a/client/src/main/java/com/metamx/druid/coordination/AbstractDataSegmentAnnouncer.java +++ b/client/src/main/java/com/metamx/druid/coordination/AbstractDataSegmentAnnouncer.java @@ -86,7 +86,7 @@ public abstract class AbstractDataSegmentAnnouncer implements DataSegmentAnnounc return; } - log.info("Stopping CuratorDataSegmentAnnouncer with config[%s]", config); + log.info("Stopping %s with config[%s]", getClass(), config); announcer.unannounce(makeAnnouncementPath()); started = false; diff --git a/client/src/main/java/com/metamx/druid/coordination/BatchingCuratorDataSegmentAnnouncer.java b/client/src/main/java/com/metamx/druid/coordination/BatchDataSegmentAnnouncer.java similarity index 94% rename from client/src/main/java/com/metamx/druid/coordination/BatchingCuratorDataSegmentAnnouncer.java rename to client/src/main/java/com/metamx/druid/coordination/BatchDataSegmentAnnouncer.java index e8070e71ff7..5911ec2a642 100644 --- a/client/src/main/java/com/metamx/druid/coordination/BatchingCuratorDataSegmentAnnouncer.java +++ b/client/src/main/java/com/metamx/druid/coordination/BatchDataSegmentAnnouncer.java @@ -22,33 +22,26 @@ package com.metamx.druid.coordination; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Throwables; -import com.google.common.collect.Iterables; import com.google.common.collect.Maps; -import com.google.common.collect.MinMaxPriorityQueue; import com.google.common.collect.Sets; -import com.google.common.primitives.Ints; import com.metamx.common.ISE; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; import com.metamx.druid.curator.announcement.Announcer; import com.metamx.druid.initialization.ZkDataSegmentAnnouncerConfig; -import com.metamx.druid.initialization.ZkPathsConfig; import org.apache.curator.utils.ZKPaths; import org.joda.time.DateTime; import java.io.IOException; -import java.util.Comparator; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; /** */ -public class BatchingCuratorDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer +public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer { - private static final Logger log = new Logger(BatchingCuratorDataSegmentAnnouncer.class); + private static final Logger log = new Logger(BatchDataSegmentAnnouncer.class); private final ZkDataSegmentAnnouncerConfig config; private final Announcer announcer; @@ -58,7 +51,7 @@ public class BatchingCuratorDataSegmentAnnouncer extends AbstractDataSegmentAnno private final Set availableZNodes = Sets.newHashSet(); private final Map segmentLookup = Maps.newHashMap(); - public BatchingCuratorDataSegmentAnnouncer( + public BatchDataSegmentAnnouncer( DruidServerMetadata server, ZkDataSegmentAnnouncerConfig config, Announcer announcer, diff --git a/client/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncer.java b/client/src/main/java/com/metamx/druid/coordination/SingleDataSegmentAnnouncer.java similarity index 93% rename from client/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncer.java rename to client/src/main/java/com/metamx/druid/coordination/SingleDataSegmentAnnouncer.java index f9286d1a239..1757ccbe6af 100644 --- a/client/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncer.java +++ b/client/src/main/java/com/metamx/druid/coordination/SingleDataSegmentAnnouncer.java @@ -28,15 +28,15 @@ import org.apache.curator.utils.ZKPaths; import java.io.IOException; -public class CuratorDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer +public class SingleDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer { - private static final Logger log = new Logger(CuratorDataSegmentAnnouncer.class); + private static final Logger log = new Logger(SingleDataSegmentAnnouncer.class); private final Announcer announcer; private final ObjectMapper jsonMapper; private final String servedSegmentsLocation; - public CuratorDataSegmentAnnouncer( + public SingleDataSegmentAnnouncer( DruidServerMetadata server, ZkPathsConfig config, Announcer announcer, diff --git a/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java b/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java index 57981f91785..727f7704771 100644 --- a/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java +++ b/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java @@ -48,6 +48,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicReference; @@ -64,6 +65,7 @@ public class Announcer private final List> toAnnounce = Lists.newArrayList(); private final ConcurrentMap listeners = new MapMaker().makeMap(); private final ConcurrentMap> announcements = new MapMaker().makeMap(); + private final List parentsIBuilt = new CopyOnWriteArrayList(); private boolean started = false; @@ -114,6 +116,15 @@ public class Announcer unannounce(ZKPaths.makePath(basePath, announcementPath)); } } + + for (String parent : parentsIBuilt) { + try { + curator.delete().forPath(parent); + } + catch (Exception e) { + log.info(e, "Unable to delete parent[%s], boooo.", parent); + } + } } } @@ -136,10 +147,19 @@ public class Announcer final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path); final String parentPath = pathAndNode.getPath(); + boolean buildParentPath = false; ConcurrentMap subPaths = announcements.get(parentPath); if (subPaths == null) { + try { + if (curator.checkExists().forPath(parentPath) == null) { + buildParentPath = true; + } + } + catch (Exception e) { + log.debug(e, "Problem checking if the parent existed, ignoring."); + } // I don't have a watcher on this path yet, create a Map and start watching. announcements.putIfAbsent(parentPath, new MapMaker().makeMap()); @@ -208,17 +228,15 @@ public class Announcer } ); - try { - synchronized (toAnnounce) { - if (started) { - cache.start(); - listeners.put(parentPath, cache); + synchronized (toAnnounce) { + if (started) { + if (buildParentPath) { + createPath(parentPath); } + startCache(cache); + listeners.put(parentPath, cache); } } - catch (Exception e) { - throw Throwables.propagate(e); - } } } @@ -261,7 +279,7 @@ public class Announcer throw new ISE("Cannot update a path[%s] that hasn't been announced!", path); } - synchronized (subPaths) { + synchronized (toAnnounce) { try { byte[] oldBytes = subPaths.get(nodePath); @@ -320,4 +338,26 @@ public class Announcer throw Throwables.propagate(e); } } + + private void startCache(PathChildrenCache cache) + { + try { + cache.start(); + } + catch (Exception e) { + Closeables.closeQuietly(cache); + throw Throwables.propagate(e); + } + } + + private void createPath(String parentPath) + { + try { + curator.create().creatingParentsIfNeeded().forPath(parentPath); + parentsIBuilt.add(parentPath); + } + catch (Exception e) { + log.info(e, "Problem creating parentPath[%s], someone else created it first?", parentPath); + } + } } diff --git a/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManager.java b/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManager.java index 2f713f87554..7c35ad12618 100644 --- a/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManager.java +++ b/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManager.java @@ -135,6 +135,11 @@ public class CuratorInventoryManager } } + public InventoryManagerConfig getConfig() + { + return config; + } + public ContainerClass getInventoryValue(String containerKey) { final ContainerHolder containerHolder = containers.get(containerKey); diff --git a/client/src/test/java/com/metamx/druid/coordination/BatchingCuratorDataSegmentAnnouncerTest.java b/client/src/test/java/com/metamx/druid/coordination/BatchDataSegmentAnnouncerTest.java similarity index 97% rename from client/src/test/java/com/metamx/druid/coordination/BatchingCuratorDataSegmentAnnouncerTest.java rename to client/src/test/java/com/metamx/druid/coordination/BatchDataSegmentAnnouncerTest.java index dc093e53cfb..608fe850a74 100644 --- a/client/src/test/java/com/metamx/druid/coordination/BatchingCuratorDataSegmentAnnouncerTest.java +++ b/client/src/test/java/com/metamx/druid/coordination/BatchDataSegmentAnnouncerTest.java @@ -47,7 +47,7 @@ import java.util.Set; /** */ -public class BatchingCuratorDataSegmentAnnouncerTest +public class BatchDataSegmentAnnouncerTest { private static final String testBasePath = "/test"; private static final String testSegmentsPath = "/test/segments/id"; @@ -58,7 +58,7 @@ public class BatchingCuratorDataSegmentAnnouncerTest private ObjectMapper jsonMapper; private Announcer announcer; private SegmentReader segmentReader; - private BatchingCuratorDataSegmentAnnouncer segmentAnnouncer; + private BatchDataSegmentAnnouncer segmentAnnouncer; private Set testSegments; @Before @@ -84,7 +84,7 @@ public class BatchingCuratorDataSegmentAnnouncerTest announcer.start(); segmentReader = new SegmentReader(cf, jsonMapper); - segmentAnnouncer = new BatchingCuratorDataSegmentAnnouncer( + segmentAnnouncer = new BatchDataSegmentAnnouncer( new DruidServerMetadata( "id", "host", diff --git a/client/src/test/java/com/metamx/druid/curator/announcement/AnnouncerTest.java b/client/src/test/java/com/metamx/druid/curator/announcement/AnnouncerTest.java index 21c9ae2eee0..9f4276e164a 100644 --- a/client/src/test/java/com/metamx/druid/curator/announcement/AnnouncerTest.java +++ b/client/src/test/java/com/metamx/druid/curator/announcement/AnnouncerTest.java @@ -27,6 +27,8 @@ import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.CuratorEventType; import org.apache.curator.framework.api.CuratorListener; import org.apache.curator.test.KillSession; +import org.apache.curator.utils.ZKPaths; +import org.apache.zookeeper.data.Stat; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -60,7 +62,6 @@ public class AnnouncerTest extends CuratorTestBase public void testSanity() throws Exception { curator.start(); - curator.create().forPath("/somewhere"); Announcer announcer = new Announcer(curator, exec); final byte[] billy = "billy".getBytes(); @@ -163,4 +164,54 @@ public class AnnouncerTest extends CuratorTestBase announcer.stop(); } } + + @Test + public void testCleansUpItsLittleTurdlings() throws Exception + { + curator.start(); + Announcer announcer = new Announcer(curator, exec); + + final byte[] billy = "billy".getBytes(); + final String testPath = "/somewhere/test2"; + final String parent = ZKPaths.getPathAndNode(testPath).getPath(); + + announcer.start(); + + Assert.assertNull(curator.checkExists().forPath(parent)); + + announcer.announce(testPath, billy); + + Assert.assertNotNull(curator.checkExists().forPath(parent)); + + announcer.stop(); + + Assert.assertNull(curator.checkExists().forPath(parent)); + } + + @Test + public void testLeavesBehindTurdlingsThatAlreadyExisted() throws Exception + { + curator.start(); + Announcer announcer = new Announcer(curator, exec); + + final byte[] billy = "billy".getBytes(); + final String testPath = "/somewhere/test2"; + final String parent = ZKPaths.getPathAndNode(testPath).getPath(); + + curator.create().forPath(parent); + final Stat initialStat = curator.checkExists().forPath(parent); + + announcer.start(); + + Assert.assertEquals(initialStat.getMzxid(), curator.checkExists().forPath(parent).getMzxid()); + + announcer.announce(testPath, billy); + + Assert.assertEquals(initialStat.getMzxid(), curator.checkExists().forPath(parent).getMzxid()); + + announcer.stop(); + + Assert.assertEquals(initialStat.getMzxid(), curator.checkExists().forPath(parent).getMzxid()); + } + } diff --git a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java index 0146af28dd4..cb5c32730ca 100644 --- a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java +++ b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java @@ -50,14 +50,12 @@ public class ZkCoordinator implements DataSegmentChangeHandler private final ObjectMapper jsonMapper; private final ZkCoordinatorConfig config; + private final ZkPathsConfig zkPaths; private final DruidServerMetadata me; private final DataSegmentAnnouncer announcer; private final CuratorFramework curator; private final ServerManager serverManager; - private final String loadQueueLocation; - private final String servedSegmentsLocation; - private volatile PathChildrenCache loadQueueCache; private volatile boolean started; @@ -73,13 +71,11 @@ public class ZkCoordinator implements DataSegmentChangeHandler { this.jsonMapper = jsonMapper; this.config = config; + this.zkPaths = zkPaths; this.me = me; this.announcer = announcer; this.curator = curator; this.serverManager = serverManager; - - this.loadQueueLocation = ZKPaths.makePath(zkPaths.getLoadQueuePath(), me.getName()); - this.servedSegmentsLocation = ZKPaths.makePath(zkPaths.getServedSegmentsPath(), me.getName()); } @LifecycleStart @@ -91,6 +87,10 @@ public class ZkCoordinator implements DataSegmentChangeHandler return; } + final String loadQueueLocation = ZKPaths.makePath(zkPaths.getLoadQueuePath(), me.getName()); + final String servedSegmentsLocation = ZKPaths.makePath(zkPaths.getServedSegmentsPath(), me.getName()); + final String liveSegmentsLocation = ZKPaths.makePath(zkPaths.getLiveSegmentsPath(), me.getName()); + loadQueueCache = new PathChildrenCache( curator, loadQueueLocation, @@ -104,6 +104,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler curator.newNamespaceAwareEnsurePath(loadQueueLocation).ensure(curator.getZookeeperClient()); curator.newNamespaceAwareEnsurePath(servedSegmentsLocation).ensure(curator.getZookeeperClient()); + curator.newNamespaceAwareEnsurePath(liveSegmentsLocation).ensure(curator.getZookeeperClient()); if (config.isLoadFromSegmentCacheEnabled()) { loadCache(); diff --git a/server/src/main/java/com/metamx/druid/master/DruidMaster.java b/server/src/main/java/com/metamx/druid/master/DruidMaster.java index e14c3b5a7b8..592e76f0d06 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMaster.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMaster.java @@ -291,7 +291,7 @@ public class DruidMaster final String toLoadQueueSegPath = ZKPaths.makePath(ZKPaths.makePath(zkPaths.getLoadQueuePath(), to), segmentName); final String toServedSegPath = ZKPaths.makePath( - ZKPaths.makePath(zkPaths.getServedSegmentsPath(), to), segmentName + ZKPaths.makePath(serverInventoryView.getInventoryManagerConfig().getInventoryPath(), to), segmentName ); loadPeon.loadSegment( diff --git a/server/src/main/java/com/metamx/druid/utils/DruidSetup.java b/server/src/main/java/com/metamx/druid/utils/DruidSetup.java index 88f26351991..ef5185ac703 100644 --- a/server/src/main/java/com/metamx/druid/utils/DruidSetup.java +++ b/server/src/main/java/com/metamx/druid/utils/DruidSetup.java @@ -296,6 +296,7 @@ public class DruidSetup createPath(curator, zkPaths.getMasterPath(), out); createPath(curator, zkPaths.getLoadQueuePath(), out); createPath(curator, zkPaths.getServedSegmentsPath(), out); + createPath(curator, zkPaths.getLiveSegmentsPath(), out); createPath(curator, zkPaths.getPropertiesPath(), out); } diff --git a/server/src/test/java/com/metamx/druid/coordination/ZkCoordinatorTest.java b/server/src/test/java/com/metamx/druid/coordination/ZkCoordinatorTest.java index 1a34c75cdbc..746e7035cbb 100644 --- a/server/src/test/java/com/metamx/druid/coordination/ZkCoordinatorTest.java +++ b/server/src/test/java/com/metamx/druid/coordination/ZkCoordinatorTest.java @@ -93,7 +93,7 @@ public class ZkCoordinatorTest extends CuratorTestBase } }; - announcer = new CuratorDataSegmentAnnouncer( + announcer = new SingleDataSegmentAnnouncer( me, zkPaths, new Announcer(curator, Execs.singleThreaded("blah")), jsonMapper );