diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/WorkerCuratorCoordinator.java b/indexing-service/src/main/java/io/druid/indexing/worker/WorkerCuratorCoordinator.java index 7d3c71e37aa..c92cd6e8efe 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/WorkerCuratorCoordinator.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/WorkerCuratorCoordinator.java @@ -103,7 +103,7 @@ public class WorkerCuratorCoordinator ImmutableMap.of("created", new DateTime().toString()) ); announcer.start(); - announcer.announce(getAnnouncementsPathForWorker(), jsonMapper.writeValueAsBytes(worker)); + announcer.announce(getAnnouncementsPathForWorker(), jsonMapper.writeValueAsBytes(worker), false); started = true; } @@ -117,8 +117,6 @@ public class WorkerCuratorCoordinator if (!started) { return; } - - announcer.unannounce(getAnnouncementsPathForWorker()); announcer.stop(); started = false; diff --git a/server/src/main/java/io/druid/curator/announcement/Announcer.java b/server/src/main/java/io/druid/curator/announcement/Announcer.java index 34c4203a6e7..102dbd8e7aa 100644 --- a/server/src/main/java/io/druid/curator/announcement/Announcer.java +++ b/server/src/main/java/io/druid/curator/announcement/Announcer.java @@ -23,7 +23,6 @@ import com.google.common.collect.MapMaker; import com.google.common.collect.Sets; import com.metamx.common.IAE; import com.metamx.common.ISE; -import com.metamx.common.Pair; import com.metamx.common.guava.CloseQuietly; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; @@ -62,8 +61,8 @@ public class Announcer private final CuratorFramework curator; private final PathChildrenCacheFactory factory; - private final List> toAnnounce = Lists.newArrayList(); - private final List> toUpdate = Lists.newArrayList(); + private final List toAnnounce = Lists.newArrayList(); + private final List toUpdate = Lists.newArrayList(); private final ConcurrentMap listeners = new MapMaker().makeMap(); private final ConcurrentMap> announcements = new MapMaker().makeMap(); private final List parentsIBuilt = new CopyOnWriteArrayList(); @@ -89,13 +88,13 @@ public class Announcer started = true; - for (Pair pair : toAnnounce) { - announce(pair.lhs, pair.rhs); + for (Announceable announceable : toAnnounce) { + announce(announceable.path, announceable.bytes, announceable.removeParentsIfCreated); } toAnnounce.clear(); - for (Pair pair : toUpdate) { - update(pair.lhs, pair.rhs); + for (Announceable announceable : toUpdate) { + update(announceable.path, announceable.bytes); } toUpdate.clear(); } @@ -144,17 +143,26 @@ public class Announcer } /** - * Announces the provided bytes at the given path. Announcement means that it will create an ephemeral node - * and monitor it to make sure that it always exists until it is unannounced or this object is closed. - * - * @param path The path to announce at - * @param bytes The payload to announce + * Like announce(path, bytes, true). */ public void announce(String path, byte[] bytes) + { + announce(path, bytes, true); + } + + /** + * Announces the provided bytes at the given path. Announcement means that it will create an ephemeral node + * and monitor it to make sure that it always exists until it is unannounced or this object is closed. + * + * @param path The path to announce at + * @param bytes The payload to announce + * @param removeParentIfCreated remove parent of "path" if we had created that parent + */ + public void announce(String path, byte[] bytes, boolean removeParentIfCreated) { synchronized (toAnnounce) { if (!started) { - toAnnounce.add(Pair.of(path, bytes)); + toAnnounce.add(new Announceable(path, bytes, removeParentIfCreated)); return; } } @@ -244,7 +252,7 @@ public class Announcer synchronized (toAnnounce) { if (started) { if (buildParentPath) { - createPath(parentPath); + createPath(parentPath, removeParentIfCreated); } startCache(cache); listeners.put(parentPath, cache); @@ -283,7 +291,8 @@ public class Announcer { synchronized (toAnnounce) { if (!started) { - toUpdate.add(Pair.of(path, bytes)); + // removeParentsIfCreated is not relevant for updates; use dummy value "false". + toUpdate.add(new Announceable(path, bytes, false)); return; } } @@ -327,7 +336,7 @@ public class Announcer /** * Unannounces an announcement created at path. Note that if all announcements get removed, the Announcer * will continue to have ZK watches on paths because clearing them out is a source of ugly race conditions. - * + *

* If you need to completely clear all the state of what is being watched and announced, stop() the Announcer. * * @param path the path to unannounce @@ -367,14 +376,31 @@ public class Announcer } } - private void createPath(String parentPath) + private void createPath(String parentPath, boolean removeParentsIfCreated) { try { curator.create().creatingParentsIfNeeded().forPath(parentPath); - parentsIBuilt.add(parentPath); + if (removeParentsIfCreated) { + parentsIBuilt.add(parentPath); + } + log.debug("Created parentPath[%s], %s remove on stop.", parentPath, removeParentsIfCreated ? "will" : "will not"); } catch (Exception e) { log.info(e, "Problem creating parentPath[%s], someone else created it first?", parentPath); } } + + private static class Announceable + { + final String path; + final byte[] bytes; + final boolean removeParentsIfCreated; + + public Announceable(String path, byte[] bytes, boolean removeParentsIfCreated) + { + this.path = path; + this.bytes = bytes; + this.removeParentsIfCreated = removeParentsIfCreated; + } + } } diff --git a/server/src/main/java/io/druid/server/coordination/AbstractDataSegmentAnnouncer.java b/server/src/main/java/io/druid/server/coordination/AbstractDataSegmentAnnouncer.java index 7aed45195e6..12223f70cd2 100644 --- a/server/src/main/java/io/druid/server/coordination/AbstractDataSegmentAnnouncer.java +++ b/server/src/main/java/io/druid/server/coordination/AbstractDataSegmentAnnouncer.java @@ -66,7 +66,7 @@ public abstract class AbstractDataSegmentAnnouncer implements DataSegmentAnnounc try { final String path = makeAnnouncementPath(); log.info("Announcing self[%s] at [%s]", server, path); - announcer.announce(path, jsonMapper.writeValueAsBytes(server)); + announcer.announce(path, jsonMapper.writeValueAsBytes(server), false); } catch (JsonProcessingException e) { throw Throwables.propagate(e); diff --git a/server/src/test/java/io/druid/curator/announcement/AnnouncerTest.java b/server/src/test/java/io/druid/curator/announcement/AnnouncerTest.java index 2dddc63fc10..624f898a7ae 100644 --- a/server/src/test/java/io/druid/curator/announcement/AnnouncerTest.java +++ b/server/src/test/java/io/druid/curator/announcement/AnnouncerTest.java @@ -192,7 +192,7 @@ public class AnnouncerTest extends CuratorTestBase try { Assert.assertNull(curator.checkExists().forPath(parent)); - announcer.announce(testPath, billy); + awaitAnnounce(announcer, testPath, billy, true); Assert.assertNotNull(curator.checkExists().forPath(parent)); } @@ -220,7 +220,7 @@ public class AnnouncerTest extends CuratorTestBase try { Assert.assertEquals(initialStat.getMzxid(), curator.checkExists().forPath(parent).getMzxid()); - announcer.announce(testPath, billy); + awaitAnnounce(announcer, testPath, billy, true); Assert.assertEquals(initialStat.getMzxid(), curator.checkExists().forPath(parent).getMzxid()); } @@ -231,4 +231,52 @@ public class AnnouncerTest extends CuratorTestBase Assert.assertEquals(initialStat.getMzxid(), curator.checkExists().forPath(parent).getMzxid()); } + @Test + public void testLeavesBehindTurdlingsWhenToldTo() throws Exception + { + curator.start(); + Announcer announcer = new Announcer(curator, exec); + + final byte[] billy = "billy".getBytes(); + final String testPath = "/somewhere/test2"; + final String parent = ZKPaths.getPathAndNode(testPath).getPath(); + + announcer.start(); + try { + Assert.assertNull(curator.checkExists().forPath(parent)); + + awaitAnnounce(announcer, testPath, billy, false); + + Assert.assertNotNull(curator.checkExists().forPath(parent)); + } + finally { + announcer.stop(); + } + + Assert.assertNotNull(curator.checkExists().forPath(parent)); + } + + private void awaitAnnounce( + final Announcer announcer, + final String path, + final byte[] bytes, + boolean removeParentsIfCreated + ) throws InterruptedException + { + final CountDownLatch latch = new CountDownLatch(1); + curator.getCuratorListenable().addListener( + new CuratorListener() + { + @Override + public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception + { + if (event.getType() == CuratorEventType.CREATE && event.getPath().equals(path)) { + latch.countDown(); + } + } + } + ); + announcer.announce(path, bytes, removeParentsIfCreated); + latch.await(); + } }