mirror of https://github.com/apache/druid.git
Merge pull request #1161 from himanshug/zk_ood_updates
Fix to maintain correctness when out-of-order ZK updates are received
This commit is contained in:
commit
d8e199a3f5
|
@ -81,7 +81,10 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
|
||||||
this.containers = new MapMaker().makeMap();
|
this.containers = new MapMaker().makeMap();
|
||||||
this.uninitializedInventory = Sets.newConcurrentHashSet();
|
this.uninitializedInventory = Sets.newConcurrentHashSet();
|
||||||
|
|
||||||
this.cacheFactory = new SimplePathChildrenCacheFactory(true, true, new ShutdownNowIgnoringExecutorService(exec));
|
//NOTE: cacheData is temporarily set to false and we get data directly from ZK on each event.
|
||||||
|
//this is a workaround to solve curator's out-of-order events problem
|
||||||
|
//https://issues.apache.org/jira/browse/CURATOR-191
|
||||||
|
this.cacheFactory = new SimplePathChildrenCacheFactory(false, true, new ShutdownNowIgnoringExecutorService(exec));
|
||||||
}
|
}
|
||||||
|
|
||||||
@LifecycleStart
|
@LifecycleStart
|
||||||
|
@ -163,6 +166,15 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private byte[] getZkDataForNode(String path) {
|
||||||
|
try {
|
||||||
|
return curatorFramework.getData().decompressed().forPath(path);
|
||||||
|
} catch(Exception ex) {
|
||||||
|
log.warn(ex, "Exception while getting data for node %s", path);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private class ContainerHolder
|
private class ContainerHolder
|
||||||
{
|
{
|
||||||
private final AtomicReference<ContainerClass> container;
|
private final AtomicReference<ContainerClass> container;
|
||||||
|
@ -206,9 +218,19 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
|
||||||
case CHILD_ADDED:
|
case CHILD_ADDED:
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
final ChildData child = event.getData();
|
final ChildData child = event.getData();
|
||||||
|
|
||||||
|
byte[] data = getZkDataForNode(child.getPath());
|
||||||
|
if(data == null) {
|
||||||
|
log.info("Ignoring event: Type - %s , Path - %s , Version - %s",
|
||||||
|
event.getType(),
|
||||||
|
child.getPath(),
|
||||||
|
child.getStat().getVersion());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
final String containerKey = ZKPaths.getNodeFromPath(child.getPath());
|
final String containerKey = ZKPaths.getNodeFromPath(child.getPath());
|
||||||
|
|
||||||
final ContainerClass container = strategy.deserializeContainer(child.getData());
|
final ContainerClass container = strategy.deserializeContainer(data);
|
||||||
|
|
||||||
// This would normally be a race condition, but the only thing that should be mutating the containers
|
// This would normally be a race condition, but the only thing that should be mutating the containers
|
||||||
// map is this listener, which should never run concurrently. If the same container is going to disappear
|
// map is this listener, which should never run concurrently. If the same container is going to disappear
|
||||||
|
@ -257,9 +279,19 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
|
||||||
case CHILD_UPDATED:
|
case CHILD_UPDATED:
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
final ChildData child = event.getData();
|
final ChildData child = event.getData();
|
||||||
|
|
||||||
|
byte[] data = getZkDataForNode(child.getPath());
|
||||||
|
if (data == null) {
|
||||||
|
log.info("Ignoring event: Type - %s , Path - %s , Version - %s",
|
||||||
|
event.getType(),
|
||||||
|
child.getPath(),
|
||||||
|
child.getStat().getVersion());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
final String containerKey = ZKPaths.getNodeFromPath(child.getPath());
|
final String containerKey = ZKPaths.getNodeFromPath(child.getPath());
|
||||||
|
|
||||||
final ContainerClass container = strategy.deserializeContainer(child.getData());
|
final ContainerClass container = strategy.deserializeContainer(data);
|
||||||
|
|
||||||
log.debug("Container[%s] updated.", child.getPath());
|
log.debug("Container[%s] updated.", child.getPath());
|
||||||
ContainerHolder holder = containers.get(containerKey);
|
ContainerHolder holder = containers.get(containerKey);
|
||||||
|
@ -335,10 +367,20 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
|
||||||
switch (event.getType()) {
|
switch (event.getType()) {
|
||||||
case CHILD_ADDED: {
|
case CHILD_ADDED: {
|
||||||
final ChildData child = event.getData();
|
final ChildData child = event.getData();
|
||||||
|
|
||||||
|
byte[] data = getZkDataForNode(child.getPath());
|
||||||
|
if (data == null) {
|
||||||
|
log.info("Ignoring event: Type - %s , Path - %s , Version - %s",
|
||||||
|
event.getType(),
|
||||||
|
child.getPath(),
|
||||||
|
child.getStat().getVersion());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
final String inventoryKey = ZKPaths.getNodeFromPath(child.getPath());
|
final String inventoryKey = ZKPaths.getNodeFromPath(child.getPath());
|
||||||
log.debug("CHILD_ADDED[%s] with version[%s]", child.getPath(), event.getData().getStat().getVersion());
|
log.debug("CHILD_ADDED[%s] with version[%s]", child.getPath(), event.getData().getStat().getVersion());
|
||||||
|
|
||||||
final InventoryClass addedInventory = strategy.deserializeInventory(child.getData());
|
final InventoryClass addedInventory = strategy.deserializeInventory(data);
|
||||||
|
|
||||||
synchronized (holder) {
|
synchronized (holder) {
|
||||||
holder.setContainer(strategy.addInventory(holder.getContainer(), inventoryKey, addedInventory));
|
holder.setContainer(strategy.addInventory(holder.getContainer(), inventoryKey, addedInventory));
|
||||||
|
@ -348,10 +390,20 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
|
||||||
|
|
||||||
case CHILD_UPDATED: {
|
case CHILD_UPDATED: {
|
||||||
final ChildData child = event.getData();
|
final ChildData child = event.getData();
|
||||||
|
|
||||||
|
byte[] data = getZkDataForNode(child.getPath());
|
||||||
|
if (data == null) {
|
||||||
|
log.info("Ignoring event: Type - %s , Path - %s , Version - %s",
|
||||||
|
event.getType(),
|
||||||
|
child.getPath(),
|
||||||
|
child.getStat().getVersion());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
final String inventoryKey = ZKPaths.getNodeFromPath(child.getPath());
|
final String inventoryKey = ZKPaths.getNodeFromPath(child.getPath());
|
||||||
log.debug("CHILD_UPDATED[%s] with version[%s]", child.getPath(), event.getData().getStat().getVersion());
|
log.debug("CHILD_UPDATED[%s] with version[%s]", child.getPath(), event.getData().getStat().getVersion());
|
||||||
|
|
||||||
final InventoryClass updatedInventory = strategy.deserializeInventory(child.getData());
|
final InventoryClass updatedInventory = strategy.deserializeInventory(data);
|
||||||
|
|
||||||
synchronized (holder) {
|
synchronized (holder) {
|
||||||
holder.setContainer(strategy.updateInventory(holder.getContainer(), inventoryKey, updatedInventory));
|
holder.setContainer(strategy.updateInventory(holder.getContainer(), inventoryKey, updatedInventory));
|
||||||
|
|
Loading…
Reference in New Issue