Historical and MiddleManager server announcements should not remove parents.

Removing parent paths causes watchers of the "announcements" path to get stuck
and stop seeing new updates.
This commit is contained in:
Gian Merlino 2015-10-26 23:09:38 -07:00
parent 4f746ddf63
commit 4b92752deb
4 changed files with 96 additions and 24 deletions

View File

@ -103,7 +103,7 @@ public class WorkerCuratorCoordinator
ImmutableMap.of("created", new DateTime().toString())
);
announcer.start();
announcer.announce(getAnnouncementsPathForWorker(), jsonMapper.writeValueAsBytes(worker));
announcer.announce(getAnnouncementsPathForWorker(), jsonMapper.writeValueAsBytes(worker), false);
started = true;
}
@ -117,8 +117,6 @@ public class WorkerCuratorCoordinator
if (!started) {
return;
}
announcer.unannounce(getAnnouncementsPathForWorker());
announcer.stop();
started = false;

View File

@ -23,7 +23,6 @@ import com.google.common.collect.MapMaker;
import com.google.common.collect.Sets;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
@ -62,8 +61,8 @@ public class Announcer
private final CuratorFramework curator;
private final PathChildrenCacheFactory factory;
private final List<Pair<String, byte[]>> toAnnounce = Lists.newArrayList();
private final List<Pair<String, byte[]>> toUpdate = Lists.newArrayList();
private final List<Announceable> toAnnounce = Lists.newArrayList();
private final List<Announceable> toUpdate = Lists.newArrayList();
private final ConcurrentMap<String, PathChildrenCache> listeners = new MapMaker().makeMap();
private final ConcurrentMap<String, ConcurrentMap<String, byte[]>> announcements = new MapMaker().makeMap();
private final List<String> parentsIBuilt = new CopyOnWriteArrayList<String>();
@ -89,13 +88,13 @@ public class Announcer
started = true;
for (Pair<String, byte[]> pair : toAnnounce) {
announce(pair.lhs, pair.rhs);
for (Announceable announceable : toAnnounce) {
announce(announceable.path, announceable.bytes, announceable.removeParentsIfCreated);
}
toAnnounce.clear();
for (Pair<String, byte[]> pair : toUpdate) {
update(pair.lhs, pair.rhs);
for (Announceable announceable : toUpdate) {
update(announceable.path, announceable.bytes);
}
toUpdate.clear();
}
@ -144,17 +143,26 @@ public class Announcer
}
/**
* Announces the provided bytes at the given path. Announcement means that it will create an ephemeral node
* and monitor it to make sure that it always exists until it is unannounced or this object is closed.
*
* @param path The path to announce at
* @param bytes The payload to announce
* Like announce(path, bytes, true).
*/
public void announce(String path, byte[] bytes)
{
announce(path, bytes, true);
}
/**
* Announces the provided bytes at the given path. Announcement means that it will create an ephemeral node
* and monitor it to make sure that it always exists until it is unannounced or this object is closed.
*
* @param path The path to announce at
* @param bytes The payload to announce
* @param removeParentIfCreated remove parent of "path" if we had created that parent
*/
public void announce(String path, byte[] bytes, boolean removeParentIfCreated)
{
synchronized (toAnnounce) {
if (!started) {
toAnnounce.add(Pair.of(path, bytes));
toAnnounce.add(new Announceable(path, bytes, removeParentIfCreated));
return;
}
}
@ -244,7 +252,7 @@ public class Announcer
synchronized (toAnnounce) {
if (started) {
if (buildParentPath) {
createPath(parentPath);
createPath(parentPath, removeParentIfCreated);
}
startCache(cache);
listeners.put(parentPath, cache);
@ -283,7 +291,8 @@ public class Announcer
{
synchronized (toAnnounce) {
if (!started) {
toUpdate.add(Pair.of(path, bytes));
// removeParentsIfCreated is not relevant for updates; use dummy value "false".
toUpdate.add(new Announceable(path, bytes, false));
return;
}
}
@ -327,7 +336,7 @@ public class Announcer
/**
* Unannounces an announcement created at path. Note that if all announcements get removed, the Announcer
* will continue to have ZK watches on paths because clearing them out is a source of ugly race conditions.
*
* <p/>
* If you need to completely clear all the state of what is being watched and announced, stop() the Announcer.
*
* @param path the path to unannounce
@ -367,14 +376,31 @@ public class Announcer
}
}
private void createPath(String parentPath)
private void createPath(String parentPath, boolean removeParentsIfCreated)
{
try {
curator.create().creatingParentsIfNeeded().forPath(parentPath);
parentsIBuilt.add(parentPath);
if (removeParentsIfCreated) {
parentsIBuilt.add(parentPath);
}
log.debug("Created parentPath[%s], %s remove on stop.", parentPath, removeParentsIfCreated ? "will" : "will not");
}
catch (Exception e) {
log.info(e, "Problem creating parentPath[%s], someone else created it first?", parentPath);
}
}
private static class Announceable
{
final String path;
final byte[] bytes;
final boolean removeParentsIfCreated;
public Announceable(String path, byte[] bytes, boolean removeParentsIfCreated)
{
this.path = path;
this.bytes = bytes;
this.removeParentsIfCreated = removeParentsIfCreated;
}
}
}

View File

@ -66,7 +66,7 @@ public abstract class AbstractDataSegmentAnnouncer implements DataSegmentAnnounc
try {
final String path = makeAnnouncementPath();
log.info("Announcing self[%s] at [%s]", server, path);
announcer.announce(path, jsonMapper.writeValueAsBytes(server));
announcer.announce(path, jsonMapper.writeValueAsBytes(server), false);
}
catch (JsonProcessingException e) {
throw Throwables.propagate(e);

View File

@ -192,7 +192,7 @@ public class AnnouncerTest extends CuratorTestBase
try {
Assert.assertNull(curator.checkExists().forPath(parent));
announcer.announce(testPath, billy);
awaitAnnounce(announcer, testPath, billy, true);
Assert.assertNotNull(curator.checkExists().forPath(parent));
}
@ -220,7 +220,7 @@ public class AnnouncerTest extends CuratorTestBase
try {
Assert.assertEquals(initialStat.getMzxid(), curator.checkExists().forPath(parent).getMzxid());
announcer.announce(testPath, billy);
awaitAnnounce(announcer, testPath, billy, true);
Assert.assertEquals(initialStat.getMzxid(), curator.checkExists().forPath(parent).getMzxid());
}
@ -231,4 +231,52 @@ public class AnnouncerTest extends CuratorTestBase
Assert.assertEquals(initialStat.getMzxid(), curator.checkExists().forPath(parent).getMzxid());
}
@Test
public void testLeavesBehindTurdlingsWhenToldTo() throws Exception
{
curator.start();
Announcer announcer = new Announcer(curator, exec);
final byte[] billy = "billy".getBytes();
final String testPath = "/somewhere/test2";
final String parent = ZKPaths.getPathAndNode(testPath).getPath();
announcer.start();
try {
Assert.assertNull(curator.checkExists().forPath(parent));
awaitAnnounce(announcer, testPath, billy, false);
Assert.assertNotNull(curator.checkExists().forPath(parent));
}
finally {
announcer.stop();
}
Assert.assertNotNull(curator.checkExists().forPath(parent));
}
private void awaitAnnounce(
final Announcer announcer,
final String path,
final byte[] bytes,
boolean removeParentsIfCreated
) throws InterruptedException
{
final CountDownLatch latch = new CountDownLatch(1);
curator.getCuratorListenable().addListener(
new CuratorListener()
{
@Override
public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception
{
if (event.getType() == CuratorEventType.CREATE && event.getPath().equals(path)) {
latch.countDown();
}
}
}
);
announcer.announce(path, bytes, removeParentsIfCreated);
latch.await();
}
}