mirror of https://github.com/apache/nifi.git
NIFI-259: Expose the ZK exception code in the event of a KeeperException within the ZooKeeperStateProvider.
This commit is contained in:
parent
447e401912
commit
a931e72787
|
@ -244,16 +244,17 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
|
||||||
ZKUtil.deleteRecursive(getZooKeeper(), getComponentPath(componentId));
|
ZKUtil.deleteRecursive(getZooKeeper(), getComponentPath(componentId));
|
||||||
} catch (final KeeperException ke) {
|
} catch (final KeeperException ke) {
|
||||||
// Node doesn't exist so just ignore
|
// Node doesn't exist so just ignore
|
||||||
if (Code.NONODE == ke.code()) {
|
final Code exceptionCode = ke.code();
|
||||||
|
if (Code.NONODE == exceptionCode) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (Code.SESSIONEXPIRED == ke.code()) {
|
if (Code.SESSIONEXPIRED == exceptionCode) {
|
||||||
invalidateClient();
|
invalidateClient();
|
||||||
onComponentRemoved(componentId);
|
onComponentRemoved(componentId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
throw new IOException("Unable to remove state for component with ID '" + componentId + "' from ZooKeeper", ke);
|
throw new IOException("Unable to remove state for component with ID '" + componentId + " with exception code " + exceptionCode, ke);
|
||||||
} catch (final InterruptedException e) {
|
} catch (final InterruptedException e) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
throw new IOException("Failed to remove state for component with ID '" + componentId + "' from ZooKeeper due to being interrupted", e);
|
throw new IOException("Failed to remove state for component with ID '" + componentId + "' from ZooKeeper due to being interrupted", e);
|
||||||
|
@ -331,7 +332,8 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
|
||||||
try {
|
try {
|
||||||
keeper.setData(path, data, version);
|
keeper.setData(path, data, version);
|
||||||
} catch (final KeeperException ke) {
|
} catch (final KeeperException ke) {
|
||||||
if (ke.code() == Code.NONODE) {
|
final Code exceptionCode = ke.code();
|
||||||
|
if (exceptionCode == Code.NONODE) {
|
||||||
createNode(path, data);
|
createNode(path, data);
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
|
@ -342,17 +344,18 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
throw new IOException("Failed to set cluster-wide state in ZooKeeper for component with ID " + componentId + " due to interruption", e);
|
throw new IOException("Failed to set cluster-wide state in ZooKeeper for component with ID " + componentId + " due to interruption", e);
|
||||||
} catch (final KeeperException ke) {
|
} catch (final KeeperException ke) {
|
||||||
if (Code.SESSIONEXPIRED == ke.code()) {
|
final Code exceptionCode = ke.code();
|
||||||
|
if (Code.SESSIONEXPIRED == exceptionCode) {
|
||||||
invalidateClient();
|
invalidateClient();
|
||||||
setState(stateValues, version, componentId);
|
setState(stateValues, version, componentId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (Code.NODEEXISTS == ke.code()) {
|
if (Code.NODEEXISTS == exceptionCode) {
|
||||||
setState(stateValues, version, componentId);
|
setState(stateValues, version, componentId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
throw new IOException("Failed to set cluster-wide state in ZooKeeper for component with ID " + componentId, ke);
|
throw new IOException("Failed to set cluster-wide state in ZooKeeper for component with ID " + componentId + " with exception code " + exceptionCode, ke);
|
||||||
} catch (final IOException ioe) {
|
} catch (final IOException ioe) {
|
||||||
throw new IOException("Failed to set cluster-wide state in ZooKeeper for component with ID " + componentId, ioe);
|
throw new IOException("Failed to set cluster-wide state in ZooKeeper for component with ID " + componentId, ioe);
|
||||||
}
|
}
|
||||||
|
@ -365,20 +368,21 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
|
||||||
} catch (final InterruptedException ie) {
|
} catch (final InterruptedException ie) {
|
||||||
throw new IOException("Failed to update cluster-wide state due to interruption", ie);
|
throw new IOException("Failed to update cluster-wide state due to interruption", ie);
|
||||||
} catch (final KeeperException ke) {
|
} catch (final KeeperException ke) {
|
||||||
if (ke.code() == Code.NONODE) {
|
final Code exceptionCode = ke.code();
|
||||||
|
if (Code.NONODE == exceptionCode) {
|
||||||
final String parentPath = StringUtils.substringBeforeLast(path, "/");
|
final String parentPath = StringUtils.substringBeforeLast(path, "/");
|
||||||
createNode(parentPath, null);
|
createNode(parentPath, null);
|
||||||
createNode(path, data);
|
createNode(path, data);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (Code.SESSIONEXPIRED == ke.code()) {
|
if (Code.SESSIONEXPIRED == exceptionCode) {
|
||||||
invalidateClient();
|
invalidateClient();
|
||||||
createNode(path, data);
|
createNode(path, data);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Node already exists. Node must have been created by "someone else". Just set the data.
|
// Node already exists. Node must have been created by "someone else". Just set the data.
|
||||||
if (ke.code() == Code.NODEEXISTS) {
|
if (Code.NODEEXISTS == exceptionCode) {
|
||||||
try {
|
try {
|
||||||
getZooKeeper().setData(path, data, -1);
|
getZooKeeper().setData(path, data, -1);
|
||||||
return;
|
return;
|
||||||
|
@ -392,8 +396,6 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
|
||||||
throw new IOException("Failed to update cluster-wide state due to interruption", ie);
|
throw new IOException("Failed to update cluster-wide state due to interruption", ie);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
throw ke;
|
throw ke;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -413,15 +415,16 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
throw new IOException("Failed to obtain value from ZooKeeper for component with ID " + componentId + ", due to interruption", e);
|
throw new IOException("Failed to obtain value from ZooKeeper for component with ID " + componentId + ", due to interruption", e);
|
||||||
} catch (final KeeperException ke) {
|
} catch (final KeeperException ke) {
|
||||||
if (ke.code() == Code.NONODE) {
|
final Code exceptionCode = ke.code();
|
||||||
|
if (Code.NONODE == exceptionCode) {
|
||||||
return new StandardStateMap(null, -1L);
|
return new StandardStateMap(null, -1L);
|
||||||
}
|
}
|
||||||
if (Code.SESSIONEXPIRED == ke.code()) {
|
if (Code.SESSIONEXPIRED == exceptionCode) {
|
||||||
invalidateClient();
|
invalidateClient();
|
||||||
return getState(componentId);
|
return getState(componentId);
|
||||||
}
|
}
|
||||||
|
|
||||||
throw new IOException("Failed to obtain value from ZooKeeper for component with ID " + componentId, ke);
|
throw new IOException("Failed to obtain value from ZooKeeper for component with ID " + componentId + " with exception code " + exceptionCode, ke);
|
||||||
} catch (final IOException ioe) {
|
} catch (final IOException ioe) {
|
||||||
// provide more context in the error message
|
// provide more context in the error message
|
||||||
throw new IOException("Failed to obtain value from ZooKeeper for component with ID " + componentId, ioe);
|
throw new IOException("Failed to obtain value from ZooKeeper for component with ID " + componentId, ioe);
|
||||||
|
|
Loading…
Reference in New Issue