Make the Announcer closing and unannouncing atomic.

This commit is contained in:
Charles Allen 2015-02-11 08:17:44 -08:00
parent c5e99bf6ec
commit bd500fcf4b
1 changed files with 16 additions and 5 deletions

View File

@ -32,6 +32,8 @@ import io.druid.curator.ShutdownNowIgnoringExecutorService;
import io.druid.curator.cache.PathChildrenCacheFactory; import io.druid.curator.cache.PathChildrenCacheFactory;
import io.druid.curator.cache.SimplePathChildrenCacheFactory; import io.druid.curator.cache.SimplePathChildrenCacheFactory;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.transaction.CuratorTransaction;
import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
@ -121,12 +123,21 @@ public class Announcer
} }
} }
for (String parent : parentsIBuilt) { if (!parentsIBuilt.isEmpty()) {
CuratorTransaction transaction = curator.inTransaction();
for (String parent : parentsIBuilt) {
try {
transaction = transaction.delete().forPath(parent).and();
}
catch (Exception e) {
log.info(e, "Unable to delete parent[%s], boooo.", parent);
}
}
try { try {
curator.delete().forPath(parent); ((CuratorTransactionFinal) transaction).commit();
} }
catch (Exception e) { catch (Exception e) {
log.info(e, "Unable to delete parent[%s], boooo.", parent); log.info(e, "Unable to commit transaction. Please feed the hamsters");
} }
} }
} }
@ -316,7 +327,7 @@ public class Announcer
/** /**
* Unannounces an announcement created at path. Note that if all announcements get removed, the Announcer * Unannounces an announcement created at path. Note that if all announcements get removed, the Announcer
* will continue to have ZK watches on paths because clearing them out is a source of ugly race conditions. * will continue to have ZK watches on paths because clearing them out is a source of ugly race conditions.
* *
* If you need to completely clear all the state of what is being watched and announced, stop() the Announcer. * If you need to completely clear all the state of what is being watched and announced, stop() the Announcer.
* *
* @param path the path to unannounce * @param path the path to unannounce
@ -335,7 +346,7 @@ public class Announcer
} }
try { try {
curator.delete().guaranteed().forPath(path); curator.inTransaction().delete().forPath(path).and().commit();
} }
catch (KeeperException.NoNodeException e) { catch (KeeperException.NoNodeException e) {
log.info("node[%s] didn't exist anyway...", path); log.info("node[%s] didn't exist anyway...", path);