diff --git a/server/src/main/java/io/druid/curator/inventory/CuratorInventoryManager.java b/server/src/main/java/io/druid/curator/inventory/CuratorInventoryManager.java index 2deb82c687b..f583ba06a21 100644 --- a/server/src/main/java/io/druid/curator/inventory/CuratorInventoryManager.java +++ b/server/src/main/java/io/druid/curator/inventory/CuratorInventoryManager.java @@ -81,7 +81,10 @@ public class CuratorInventoryManager this.containers = new MapMaker().makeMap(); this.uninitializedInventory = Sets.newConcurrentHashSet(); - this.cacheFactory = new SimplePathChildrenCacheFactory(true, true, new ShutdownNowIgnoringExecutorService(exec)); + //NOTE: cacheData is temporarily set to false and we get data directly from ZK on each event. + //this is a workaround to solve curator's out-of-order events problem + //https://issues.apache.org/jira/browse/CURATOR-191 + this.cacheFactory = new SimplePathChildrenCacheFactory(false, true, new ShutdownNowIgnoringExecutorService(exec)); } @LifecycleStart @@ -163,6 +166,15 @@ public class CuratorInventoryManager ); } + private byte[] getZkDataForNode(String path) { + try { + return curatorFramework.getData().decompressed().forPath(path); + } catch(Exception ex) { + log.warn(ex, "Exception while getting data for node %s", path); + return null; + } + } + private class ContainerHolder { private final AtomicReference container; @@ -206,9 +218,19 @@ public class CuratorInventoryManager case CHILD_ADDED: synchronized (lock) { final ChildData child = event.getData(); + + byte[] data = getZkDataForNode(child.getPath()); + if(data == null) { + log.info("Ignoring event: Type - %s , Path - %s , Version - %s", + event.getType(), + child.getPath(), + child.getStat().getVersion()); + return; + } + final String containerKey = ZKPaths.getNodeFromPath(child.getPath()); - final ContainerClass container = strategy.deserializeContainer(child.getData()); + final ContainerClass container = strategy.deserializeContainer(data); // This would normally be a race condition, but the only thing that should be mutating the containers // map is this listener, which should never run concurrently. If the same container is going to disappear @@ -257,9 +279,19 @@ public class CuratorInventoryManager case CHILD_UPDATED: synchronized (lock) { final ChildData child = event.getData(); + + byte[] data = getZkDataForNode(child.getPath()); + if (data == null) { + log.info("Ignoring event: Type - %s , Path - %s , Version - %s", + event.getType(), + child.getPath(), + child.getStat().getVersion()); + return; + } + final String containerKey = ZKPaths.getNodeFromPath(child.getPath()); - final ContainerClass container = strategy.deserializeContainer(child.getData()); + final ContainerClass container = strategy.deserializeContainer(data); log.debug("Container[%s] updated.", child.getPath()); ContainerHolder holder = containers.get(containerKey); @@ -335,10 +367,20 @@ public class CuratorInventoryManager switch (event.getType()) { case CHILD_ADDED: { final ChildData child = event.getData(); + + byte[] data = getZkDataForNode(child.getPath()); + if (data == null) { + log.info("Ignoring event: Type - %s , Path - %s , Version - %s", + event.getType(), + child.getPath(), + child.getStat().getVersion()); + return; + } + final String inventoryKey = ZKPaths.getNodeFromPath(child.getPath()); log.debug("CHILD_ADDED[%s] with version[%s]", child.getPath(), event.getData().getStat().getVersion()); - final InventoryClass addedInventory = strategy.deserializeInventory(child.getData()); + final InventoryClass addedInventory = strategy.deserializeInventory(data); synchronized (holder) { holder.setContainer(strategy.addInventory(holder.getContainer(), inventoryKey, addedInventory)); @@ -348,10 +390,20 @@ public class CuratorInventoryManager case CHILD_UPDATED: { final ChildData child = event.getData(); + + byte[] data = getZkDataForNode(child.getPath()); + if (data == null) { + log.info("Ignoring event: Type - %s , Path - %s , Version - %s", + event.getType(), + child.getPath(), + child.getStat().getVersion()); + return; + } + final String inventoryKey = ZKPaths.getNodeFromPath(child.getPath()); log.debug("CHILD_UPDATED[%s] with version[%s]", child.getPath(), event.getData().getStat().getVersion()); - final InventoryClass updatedInventory = strategy.deserializeInventory(child.getData()); + final InventoryClass updatedInventory = strategy.deserializeInventory(data); synchronized (holder) { holder.setContainer(strategy.updateInventory(holder.getContainer(), inventoryKey, updatedInventory));