mirror of https://github.com/apache/druid.git
on receiving ZK ADD/UPDATE events, get latest data from zookeeper instead of taking it from the event which might be stale due to event coming out of order etc
This commit is contained in:
parent
b167dcf82c
commit
dda2a62ff5
|
@ -81,7 +81,10 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
|
|||
this.containers = new MapMaker().makeMap();
|
||||
this.uninitializedInventory = Sets.newConcurrentHashSet();
|
||||
|
||||
this.cacheFactory = new SimplePathChildrenCacheFactory(true, true, new ShutdownNowIgnoringExecutorService(exec));
|
||||
//NOTE: cacheData is temporarily set to false and we get data directly from ZK on each event.
|
||||
//this is a workaround to solve curator's out-of-order events problem
|
||||
//https://issues.apache.org/jira/browse/CURATOR-191
|
||||
this.cacheFactory = new SimplePathChildrenCacheFactory(false, true, new ShutdownNowIgnoringExecutorService(exec));
|
||||
}
|
||||
|
||||
@LifecycleStart
|
||||
|
@ -163,6 +166,15 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
|
|||
);
|
||||
}
|
||||
|
||||
private byte[] getZkDataForNode(String path) {
|
||||
try {
|
||||
return curatorFramework.getData().decompressed().forPath(path);
|
||||
} catch(Exception ex) {
|
||||
log.warn(ex, "Exception while getting data for node %s", path);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private class ContainerHolder
|
||||
{
|
||||
private final AtomicReference<ContainerClass> container;
|
||||
|
@ -206,9 +218,19 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
|
|||
case CHILD_ADDED:
|
||||
synchronized (lock) {
|
||||
final ChildData child = event.getData();
|
||||
|
||||
byte[] data = getZkDataForNode(child.getPath());
|
||||
if(data == null) {
|
||||
log.info("Ignoring event: Type - %s , Path - %s , Version - %s",
|
||||
event.getType(),
|
||||
child.getPath(),
|
||||
child.getStat().getVersion());
|
||||
return;
|
||||
}
|
||||
|
||||
final String containerKey = ZKPaths.getNodeFromPath(child.getPath());
|
||||
|
||||
final ContainerClass container = strategy.deserializeContainer(child.getData());
|
||||
final ContainerClass container = strategy.deserializeContainer(data);
|
||||
|
||||
// This would normally be a race condition, but the only thing that should be mutating the containers
|
||||
// map is this listener, which should never run concurrently. If the same container is going to disappear
|
||||
|
@ -257,9 +279,19 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
|
|||
case CHILD_UPDATED:
|
||||
synchronized (lock) {
|
||||
final ChildData child = event.getData();
|
||||
|
||||
byte[] data = getZkDataForNode(child.getPath());
|
||||
if (data == null) {
|
||||
log.info("Ignoring event: Type - %s , Path - %s , Version - %s",
|
||||
event.getType(),
|
||||
child.getPath(),
|
||||
child.getStat().getVersion());
|
||||
return;
|
||||
}
|
||||
|
||||
final String containerKey = ZKPaths.getNodeFromPath(child.getPath());
|
||||
|
||||
final ContainerClass container = strategy.deserializeContainer(child.getData());
|
||||
final ContainerClass container = strategy.deserializeContainer(data);
|
||||
|
||||
log.debug("Container[%s] updated.", child.getPath());
|
||||
ContainerHolder holder = containers.get(containerKey);
|
||||
|
@ -335,10 +367,20 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
|
|||
switch (event.getType()) {
|
||||
case CHILD_ADDED: {
|
||||
final ChildData child = event.getData();
|
||||
|
||||
byte[] data = getZkDataForNode(child.getPath());
|
||||
if (data == null) {
|
||||
log.info("Ignoring event: Type - %s , Path - %s , Version - %s",
|
||||
event.getType(),
|
||||
child.getPath(),
|
||||
child.getStat().getVersion());
|
||||
return;
|
||||
}
|
||||
|
||||
final String inventoryKey = ZKPaths.getNodeFromPath(child.getPath());
|
||||
log.debug("CHILD_ADDED[%s] with version[%s]", child.getPath(), event.getData().getStat().getVersion());
|
||||
|
||||
final InventoryClass addedInventory = strategy.deserializeInventory(child.getData());
|
||||
final InventoryClass addedInventory = strategy.deserializeInventory(data);
|
||||
|
||||
synchronized (holder) {
|
||||
holder.setContainer(strategy.addInventory(holder.getContainer(), inventoryKey, addedInventory));
|
||||
|
@ -348,10 +390,20 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
|
|||
|
||||
case CHILD_UPDATED: {
|
||||
final ChildData child = event.getData();
|
||||
|
||||
byte[] data = getZkDataForNode(child.getPath());
|
||||
if (data == null) {
|
||||
log.info("Ignoring event: Type - %s , Path - %s , Version - %s",
|
||||
event.getType(),
|
||||
child.getPath(),
|
||||
child.getStat().getVersion());
|
||||
return;
|
||||
}
|
||||
|
||||
final String inventoryKey = ZKPaths.getNodeFromPath(child.getPath());
|
||||
log.debug("CHILD_UPDATED[%s] with version[%s]", child.getPath(), event.getData().getStat().getVersion());
|
||||
|
||||
final InventoryClass updatedInventory = strategy.deserializeInventory(child.getData());
|
||||
final InventoryClass updatedInventory = strategy.deserializeInventory(data);
|
||||
|
||||
synchronized (holder) {
|
||||
holder.setContainer(strategy.updateInventory(holder.getContainer(), inventoryKey, updatedInventory));
|
||||
|
|
Loading…
Reference in New Issue