diff --git a/client/src/main/java/com/metamx/druid/QueryableNode.java b/client/src/main/java/com/metamx/druid/QueryableNode.java index c715de91f97..3236d1aa336 100644 --- a/client/src/main/java/com/metamx/druid/QueryableNode.java +++ b/client/src/main/java/com/metamx/druid/QueryableNode.java @@ -40,11 +40,11 @@ import com.metamx.druid.client.ServerInventoryViewConfig; import com.metamx.druid.client.ServerView; import com.metamx.druid.concurrent.Execs; import com.metamx.druid.coordination.AbstractDataSegmentAnnouncer; -import com.metamx.druid.coordination.BatchingCuratorDataSegmentAnnouncer; -import com.metamx.druid.coordination.CuratorDataSegmentAnnouncer; +import com.metamx.druid.coordination.BatchingDataSegmentAnnouncer; import com.metamx.druid.coordination.DataSegmentAnnouncer; import com.metamx.druid.coordination.DruidServerMetadata; import com.metamx.druid.coordination.MultipleDataSegmentAnnouncerDataSegmentAnnouncer; +import com.metamx.druid.coordination.SingleDataSegmentAnnouncer; import com.metamx.druid.curator.announcement.Announcer; import com.metamx.druid.http.NoopRequestLogger; import com.metamx.druid.http.RequestLogger; @@ -431,13 +431,13 @@ public abstract class QueryableNode extends Registering setAnnouncer( new MultipleDataSegmentAnnouncerDataSegmentAnnouncer( Arrays.asList( - new BatchingCuratorDataSegmentAnnouncer( + new BatchingDataSegmentAnnouncer( getDruidServerMetadata(), getConfigFactory().build(ZkDataSegmentAnnouncerConfig.class), announcer, getJsonMapper() ), - new CuratorDataSegmentAnnouncer(getDruidServerMetadata(), getZkPaths(), announcer, getJsonMapper()) + new SingleDataSegmentAnnouncer(getDruidServerMetadata(), getZkPaths(), announcer, getJsonMapper()) ) ) ); diff --git a/client/src/main/java/com/metamx/druid/coordination/AbstractDataSegmentAnnouncer.java b/client/src/main/java/com/metamx/druid/coordination/AbstractDataSegmentAnnouncer.java index 583910e3dac..1bb0b8114ee 100644 --- a/client/src/main/java/com/metamx/druid/coordination/AbstractDataSegmentAnnouncer.java +++ b/client/src/main/java/com/metamx/druid/coordination/AbstractDataSegmentAnnouncer.java @@ -86,7 +86,7 @@ public abstract class AbstractDataSegmentAnnouncer implements DataSegmentAnnounc return; } - log.info("Stopping CuratorDataSegmentAnnouncer with config[%s]", config); + log.info("Stopping %s with config[%s]", getClass(), config); announcer.unannounce(makeAnnouncementPath()); started = false; diff --git a/client/src/main/java/com/metamx/druid/coordination/BatchingCuratorDataSegmentAnnouncer.java b/client/src/main/java/com/metamx/druid/coordination/BatchingDataSegmentAnnouncer.java similarity index 94% rename from client/src/main/java/com/metamx/druid/coordination/BatchingCuratorDataSegmentAnnouncer.java rename to client/src/main/java/com/metamx/druid/coordination/BatchingDataSegmentAnnouncer.java index e8070e71ff7..6b098d41b7f 100644 --- a/client/src/main/java/com/metamx/druid/coordination/BatchingCuratorDataSegmentAnnouncer.java +++ b/client/src/main/java/com/metamx/druid/coordination/BatchingDataSegmentAnnouncer.java @@ -22,33 +22,26 @@ package com.metamx.druid.coordination; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Throwables; -import com.google.common.collect.Iterables; import com.google.common.collect.Maps; -import com.google.common.collect.MinMaxPriorityQueue; import com.google.common.collect.Sets; -import com.google.common.primitives.Ints; import com.metamx.common.ISE; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; import com.metamx.druid.curator.announcement.Announcer; import com.metamx.druid.initialization.ZkDataSegmentAnnouncerConfig; -import com.metamx.druid.initialization.ZkPathsConfig; import org.apache.curator.utils.ZKPaths; import org.joda.time.DateTime; import java.io.IOException; -import java.util.Comparator; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; /** */ -public class BatchingCuratorDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer +public class BatchingDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer { - private static final Logger log = new Logger(BatchingCuratorDataSegmentAnnouncer.class); + private static final Logger log = new Logger(BatchingDataSegmentAnnouncer.class); private final ZkDataSegmentAnnouncerConfig config; private final Announcer announcer; @@ -58,7 +51,7 @@ public class BatchingCuratorDataSegmentAnnouncer extends AbstractDataSegmentAnno private final Set availableZNodes = Sets.newHashSet(); private final Map segmentLookup = Maps.newHashMap(); - public BatchingCuratorDataSegmentAnnouncer( + public BatchingDataSegmentAnnouncer( DruidServerMetadata server, ZkDataSegmentAnnouncerConfig config, Announcer announcer, diff --git a/client/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncer.java b/client/src/main/java/com/metamx/druid/coordination/SingleDataSegmentAnnouncer.java similarity index 93% rename from client/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncer.java rename to client/src/main/java/com/metamx/druid/coordination/SingleDataSegmentAnnouncer.java index f9286d1a239..1757ccbe6af 100644 --- a/client/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncer.java +++ b/client/src/main/java/com/metamx/druid/coordination/SingleDataSegmentAnnouncer.java @@ -28,15 +28,15 @@ import org.apache.curator.utils.ZKPaths; import java.io.IOException; -public class CuratorDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer +public class SingleDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer { - private static final Logger log = new Logger(CuratorDataSegmentAnnouncer.class); + private static final Logger log = new Logger(SingleDataSegmentAnnouncer.class); private final Announcer announcer; private final ObjectMapper jsonMapper; private final String servedSegmentsLocation; - public CuratorDataSegmentAnnouncer( + public SingleDataSegmentAnnouncer( DruidServerMetadata server, ZkPathsConfig config, Announcer announcer, diff --git a/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java b/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java index 57981f91785..727f7704771 100644 --- a/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java +++ b/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java @@ -48,6 +48,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicReference; @@ -64,6 +65,7 @@ public class Announcer private final List> toAnnounce = Lists.newArrayList(); private final ConcurrentMap listeners = new MapMaker().makeMap(); private final ConcurrentMap> announcements = new MapMaker().makeMap(); + private final List parentsIBuilt = new CopyOnWriteArrayList(); private boolean started = false; @@ -114,6 +116,15 @@ public class Announcer unannounce(ZKPaths.makePath(basePath, announcementPath)); } } + + for (String parent : parentsIBuilt) { + try { + curator.delete().forPath(parent); + } + catch (Exception e) { + log.info(e, "Unable to delete parent[%s], boooo.", parent); + } + } } } @@ -136,10 +147,19 @@ public class Announcer final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path); final String parentPath = pathAndNode.getPath(); + boolean buildParentPath = false; ConcurrentMap subPaths = announcements.get(parentPath); if (subPaths == null) { + try { + if (curator.checkExists().forPath(parentPath) == null) { + buildParentPath = true; + } + } + catch (Exception e) { + log.debug(e, "Problem checking if the parent existed, ignoring."); + } // I don't have a watcher on this path yet, create a Map and start watching. announcements.putIfAbsent(parentPath, new MapMaker().makeMap()); @@ -208,17 +228,15 @@ public class Announcer } ); - try { - synchronized (toAnnounce) { - if (started) { - cache.start(); - listeners.put(parentPath, cache); + synchronized (toAnnounce) { + if (started) { + if (buildParentPath) { + createPath(parentPath); } + startCache(cache); + listeners.put(parentPath, cache); } } - catch (Exception e) { - throw Throwables.propagate(e); - } } } @@ -261,7 +279,7 @@ public class Announcer throw new ISE("Cannot update a path[%s] that hasn't been announced!", path); } - synchronized (subPaths) { + synchronized (toAnnounce) { try { byte[] oldBytes = subPaths.get(nodePath); @@ -320,4 +338,26 @@ public class Announcer throw Throwables.propagate(e); } } + + private void startCache(PathChildrenCache cache) + { + try { + cache.start(); + } + catch (Exception e) { + Closeables.closeQuietly(cache); + throw Throwables.propagate(e); + } + } + + private void createPath(String parentPath) + { + try { + curator.create().creatingParentsIfNeeded().forPath(parentPath); + parentsIBuilt.add(parentPath); + } + catch (Exception e) { + log.info(e, "Problem creating parentPath[%s], someone else created it first?", parentPath); + } + } } diff --git a/client/src/test/java/com/metamx/druid/coordination/BatchingCuratorDataSegmentAnnouncerTest.java b/client/src/test/java/com/metamx/druid/coordination/BatchingDataSegmentAnnouncerTest.java similarity index 97% rename from client/src/test/java/com/metamx/druid/coordination/BatchingCuratorDataSegmentAnnouncerTest.java rename to client/src/test/java/com/metamx/druid/coordination/BatchingDataSegmentAnnouncerTest.java index d4390587268..72b32f26c98 100644 --- a/client/src/test/java/com/metamx/druid/coordination/BatchingCuratorDataSegmentAnnouncerTest.java +++ b/client/src/test/java/com/metamx/druid/coordination/BatchingDataSegmentAnnouncerTest.java @@ -47,7 +47,7 @@ import java.util.Set; /** */ -public class BatchingCuratorDataSegmentAnnouncerTest +public class BatchingDataSegmentAnnouncerTest { private static final String testBasePath = "/test"; private static final String testSegmentsPath = "/test/segments/id"; @@ -58,7 +58,7 @@ public class BatchingCuratorDataSegmentAnnouncerTest private ObjectMapper jsonMapper; private Announcer announcer; private SegmentReader segmentReader; - private BatchingCuratorDataSegmentAnnouncer segmentAnnouncer; + private BatchingDataSegmentAnnouncer segmentAnnouncer; private Set testSegments; @Before @@ -84,7 +84,7 @@ public class BatchingCuratorDataSegmentAnnouncerTest announcer.start(); segmentReader = new SegmentReader(cf, jsonMapper); - segmentAnnouncer = new BatchingCuratorDataSegmentAnnouncer( + segmentAnnouncer = new BatchingDataSegmentAnnouncer( new DruidServerMetadata( "id", "host", diff --git a/client/src/test/java/com/metamx/druid/curator/announcement/AnnouncerTest.java b/client/src/test/java/com/metamx/druid/curator/announcement/AnnouncerTest.java index 21c9ae2eee0..9f4276e164a 100644 --- a/client/src/test/java/com/metamx/druid/curator/announcement/AnnouncerTest.java +++ b/client/src/test/java/com/metamx/druid/curator/announcement/AnnouncerTest.java @@ -27,6 +27,8 @@ import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.CuratorEventType; import org.apache.curator.framework.api.CuratorListener; import org.apache.curator.test.KillSession; +import org.apache.curator.utils.ZKPaths; +import org.apache.zookeeper.data.Stat; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -60,7 +62,6 @@ public class AnnouncerTest extends CuratorTestBase public void testSanity() throws Exception { curator.start(); - curator.create().forPath("/somewhere"); Announcer announcer = new Announcer(curator, exec); final byte[] billy = "billy".getBytes(); @@ -163,4 +164,54 @@ public class AnnouncerTest extends CuratorTestBase announcer.stop(); } } + + @Test + public void testCleansUpItsLittleTurdlings() throws Exception + { + curator.start(); + Announcer announcer = new Announcer(curator, exec); + + final byte[] billy = "billy".getBytes(); + final String testPath = "/somewhere/test2"; + final String parent = ZKPaths.getPathAndNode(testPath).getPath(); + + announcer.start(); + + Assert.assertNull(curator.checkExists().forPath(parent)); + + announcer.announce(testPath, billy); + + Assert.assertNotNull(curator.checkExists().forPath(parent)); + + announcer.stop(); + + Assert.assertNull(curator.checkExists().forPath(parent)); + } + + @Test + public void testLeavesBehindTurdlingsThatAlreadyExisted() throws Exception + { + curator.start(); + Announcer announcer = new Announcer(curator, exec); + + final byte[] billy = "billy".getBytes(); + final String testPath = "/somewhere/test2"; + final String parent = ZKPaths.getPathAndNode(testPath).getPath(); + + curator.create().forPath(parent); + final Stat initialStat = curator.checkExists().forPath(parent); + + announcer.start(); + + Assert.assertEquals(initialStat.getMzxid(), curator.checkExists().forPath(parent).getMzxid()); + + announcer.announce(testPath, billy); + + Assert.assertEquals(initialStat.getMzxid(), curator.checkExists().forPath(parent).getMzxid()); + + announcer.stop(); + + Assert.assertEquals(initialStat.getMzxid(), curator.checkExists().forPath(parent).getMzxid()); + } + } diff --git a/server/src/test/java/com/metamx/druid/coordination/ZkCoordinatorTest.java b/server/src/test/java/com/metamx/druid/coordination/ZkCoordinatorTest.java index 1a34c75cdbc..746e7035cbb 100644 --- a/server/src/test/java/com/metamx/druid/coordination/ZkCoordinatorTest.java +++ b/server/src/test/java/com/metamx/druid/coordination/ZkCoordinatorTest.java @@ -93,7 +93,7 @@ public class ZkCoordinatorTest extends CuratorTestBase } }; - announcer = new CuratorDataSegmentAnnouncer( + announcer = new SingleDataSegmentAnnouncer( me, zkPaths, new Announcer(curator, Execs.singleThreaded("blah")), jsonMapper );