Merge pull request #1868 from gianm/fix-announcements

Historical and MiddleManager server announcements should not remove parents.
This commit is contained in:
Fangjin Yang 2015-10-27 14:50:05 -07:00
commit ea2267e08c
4 changed files with 96 additions and 24 deletions

View File

@ -103,7 +103,7 @@ public class WorkerCuratorCoordinator
ImmutableMap.of("created", new DateTime().toString()) ImmutableMap.of("created", new DateTime().toString())
); );
announcer.start(); announcer.start();
announcer.announce(getAnnouncementsPathForWorker(), jsonMapper.writeValueAsBytes(worker)); announcer.announce(getAnnouncementsPathForWorker(), jsonMapper.writeValueAsBytes(worker), false);
started = true; started = true;
} }
@ -117,8 +117,6 @@ public class WorkerCuratorCoordinator
if (!started) { if (!started) {
return; return;
} }
announcer.unannounce(getAnnouncementsPathForWorker());
announcer.stop(); announcer.stop();
started = false; started = false;

View File

@ -23,7 +23,6 @@ import com.google.common.collect.MapMaker;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.metamx.common.IAE; import com.metamx.common.IAE;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.guava.CloseQuietly; import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
@ -62,8 +61,8 @@ public class Announcer
private final CuratorFramework curator; private final CuratorFramework curator;
private final PathChildrenCacheFactory factory; private final PathChildrenCacheFactory factory;
private final List<Pair<String, byte[]>> toAnnounce = Lists.newArrayList(); private final List<Announceable> toAnnounce = Lists.newArrayList();
private final List<Pair<String, byte[]>> toUpdate = Lists.newArrayList(); private final List<Announceable> toUpdate = Lists.newArrayList();
private final ConcurrentMap<String, PathChildrenCache> listeners = new MapMaker().makeMap(); private final ConcurrentMap<String, PathChildrenCache> listeners = new MapMaker().makeMap();
private final ConcurrentMap<String, ConcurrentMap<String, byte[]>> announcements = new MapMaker().makeMap(); private final ConcurrentMap<String, ConcurrentMap<String, byte[]>> announcements = new MapMaker().makeMap();
private final List<String> parentsIBuilt = new CopyOnWriteArrayList<String>(); private final List<String> parentsIBuilt = new CopyOnWriteArrayList<String>();
@ -89,13 +88,13 @@ public class Announcer
started = true; started = true;
for (Pair<String, byte[]> pair : toAnnounce) { for (Announceable announceable : toAnnounce) {
announce(pair.lhs, pair.rhs); announce(announceable.path, announceable.bytes, announceable.removeParentsIfCreated);
} }
toAnnounce.clear(); toAnnounce.clear();
for (Pair<String, byte[]> pair : toUpdate) { for (Announceable announceable : toUpdate) {
update(pair.lhs, pair.rhs); update(announceable.path, announceable.bytes);
} }
toUpdate.clear(); toUpdate.clear();
} }
@ -144,17 +143,26 @@ public class Announcer
} }
/** /**
* Announces the provided bytes at the given path. Announcement means that it will create an ephemeral node * Like announce(path, bytes, true).
* and monitor it to make sure that it always exists until it is unannounced or this object is closed.
*
* @param path The path to announce at
* @param bytes The payload to announce
*/ */
public void announce(String path, byte[] bytes) public void announce(String path, byte[] bytes)
{
announce(path, bytes, true);
}
/**
* Announces the provided bytes at the given path. Announcement means that it will create an ephemeral node
* and monitor it to make sure that it always exists until it is unannounced or this object is closed.
*
* @param path The path to announce at
* @param bytes The payload to announce
* @param removeParentIfCreated remove parent of "path" if we had created that parent
*/
public void announce(String path, byte[] bytes, boolean removeParentIfCreated)
{ {
synchronized (toAnnounce) { synchronized (toAnnounce) {
if (!started) { if (!started) {
toAnnounce.add(Pair.of(path, bytes)); toAnnounce.add(new Announceable(path, bytes, removeParentIfCreated));
return; return;
} }
} }
@ -244,7 +252,7 @@ public class Announcer
synchronized (toAnnounce) { synchronized (toAnnounce) {
if (started) { if (started) {
if (buildParentPath) { if (buildParentPath) {
createPath(parentPath); createPath(parentPath, removeParentIfCreated);
} }
startCache(cache); startCache(cache);
listeners.put(parentPath, cache); listeners.put(parentPath, cache);
@ -283,7 +291,8 @@ public class Announcer
{ {
synchronized (toAnnounce) { synchronized (toAnnounce) {
if (!started) { if (!started) {
toUpdate.add(Pair.of(path, bytes)); // removeParentsIfCreated is not relevant for updates; use dummy value "false".
toUpdate.add(new Announceable(path, bytes, false));
return; return;
} }
} }
@ -327,7 +336,7 @@ public class Announcer
/** /**
* Unannounces an announcement created at path. Note that if all announcements get removed, the Announcer * Unannounces an announcement created at path. Note that if all announcements get removed, the Announcer
* will continue to have ZK watches on paths because clearing them out is a source of ugly race conditions. * will continue to have ZK watches on paths because clearing them out is a source of ugly race conditions.
* * <p/>
* If you need to completely clear all the state of what is being watched and announced, stop() the Announcer. * If you need to completely clear all the state of what is being watched and announced, stop() the Announcer.
* *
* @param path the path to unannounce * @param path the path to unannounce
@ -367,14 +376,31 @@ public class Announcer
} }
} }
private void createPath(String parentPath) private void createPath(String parentPath, boolean removeParentsIfCreated)
{ {
try { try {
curator.create().creatingParentsIfNeeded().forPath(parentPath); curator.create().creatingParentsIfNeeded().forPath(parentPath);
parentsIBuilt.add(parentPath); if (removeParentsIfCreated) {
parentsIBuilt.add(parentPath);
}
log.debug("Created parentPath[%s], %s remove on stop.", parentPath, removeParentsIfCreated ? "will" : "will not");
} }
catch (Exception e) { catch (Exception e) {
log.info(e, "Problem creating parentPath[%s], someone else created it first?", parentPath); log.info(e, "Problem creating parentPath[%s], someone else created it first?", parentPath);
} }
} }
private static class Announceable
{
final String path;
final byte[] bytes;
final boolean removeParentsIfCreated;
public Announceable(String path, byte[] bytes, boolean removeParentsIfCreated)
{
this.path = path;
this.bytes = bytes;
this.removeParentsIfCreated = removeParentsIfCreated;
}
}
} }

View File

@ -66,7 +66,7 @@ public abstract class AbstractDataSegmentAnnouncer implements DataSegmentAnnounc
try { try {
final String path = makeAnnouncementPath(); final String path = makeAnnouncementPath();
log.info("Announcing self[%s] at [%s]", server, path); log.info("Announcing self[%s] at [%s]", server, path);
announcer.announce(path, jsonMapper.writeValueAsBytes(server)); announcer.announce(path, jsonMapper.writeValueAsBytes(server), false);
} }
catch (JsonProcessingException e) { catch (JsonProcessingException e) {
throw Throwables.propagate(e); throw Throwables.propagate(e);

View File

@ -192,7 +192,7 @@ public class AnnouncerTest extends CuratorTestBase
try { try {
Assert.assertNull(curator.checkExists().forPath(parent)); Assert.assertNull(curator.checkExists().forPath(parent));
announcer.announce(testPath, billy); awaitAnnounce(announcer, testPath, billy, true);
Assert.assertNotNull(curator.checkExists().forPath(parent)); Assert.assertNotNull(curator.checkExists().forPath(parent));
} }
@ -220,7 +220,7 @@ public class AnnouncerTest extends CuratorTestBase
try { try {
Assert.assertEquals(initialStat.getMzxid(), curator.checkExists().forPath(parent).getMzxid()); Assert.assertEquals(initialStat.getMzxid(), curator.checkExists().forPath(parent).getMzxid());
announcer.announce(testPath, billy); awaitAnnounce(announcer, testPath, billy, true);
Assert.assertEquals(initialStat.getMzxid(), curator.checkExists().forPath(parent).getMzxid()); Assert.assertEquals(initialStat.getMzxid(), curator.checkExists().forPath(parent).getMzxid());
} }
@ -231,4 +231,52 @@ public class AnnouncerTest extends CuratorTestBase
Assert.assertEquals(initialStat.getMzxid(), curator.checkExists().forPath(parent).getMzxid()); Assert.assertEquals(initialStat.getMzxid(), curator.checkExists().forPath(parent).getMzxid());
} }
@Test
public void testLeavesBehindTurdlingsWhenToldTo() throws Exception
{
curator.start();
Announcer announcer = new Announcer(curator, exec);
final byte[] billy = "billy".getBytes();
final String testPath = "/somewhere/test2";
final String parent = ZKPaths.getPathAndNode(testPath).getPath();
announcer.start();
try {
Assert.assertNull(curator.checkExists().forPath(parent));
awaitAnnounce(announcer, testPath, billy, false);
Assert.assertNotNull(curator.checkExists().forPath(parent));
}
finally {
announcer.stop();
}
Assert.assertNotNull(curator.checkExists().forPath(parent));
}
private void awaitAnnounce(
final Announcer announcer,
final String path,
final byte[] bytes,
boolean removeParentsIfCreated
) throws InterruptedException
{
final CountDownLatch latch = new CountDownLatch(1);
curator.getCuratorListenable().addListener(
new CuratorListener()
{
@Override
public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception
{
if (event.getType() == CuratorEventType.CREATE && event.getPath().equals(path)) {
latch.countDown();
}
}
}
);
announcer.announce(path, bytes, removeParentsIfCreated);
latch.await();
}
} }