diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc index e4df2b6943..14815b98c2 100644 --- a/nifi-docs/src/main/asciidoc/administration-guide.adoc +++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc @@ -410,9 +410,9 @@ If `CreatorOnly` is specified, then only the user that created the data is allow In order to use the `CreatorOnly` option, NiFi must provide some form of authentication. See the <> section below for more information on how to configure authentication. -If NiFi is configured to run in a standalone mode, the `cluster-state-provider` element need not be populated in the _state-management.xml_ -file and will actually be ignored if they are populated. However, the `local-state-provider` element must always be present and populated. -Additionally, if NiFi is run in a cluster, each node must also have the `cluster-state-provider` element present and properly configured. +If NiFi is configured to run in a standalone mode, the `cluster-provider` element need not be populated in the _state-management.xml_ +file and will actually be ignored if they are populated. However, the `local-provider` element must always be present and populated. +Additionally, if NiFi is run in a cluster, each node must also have the `cluster-provider` element present and properly configured. Otherwise, NiFi will fail to startup. While there are not many properties that need to be configured for these providers, they were externalized into a separate _state-providers.xml_ diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java index da46ed065c..fc691fb181 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java @@ -216,6 +216,11 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider { // see above explanation as to why this method is synchronized. public synchronized boolean replace(final StateMap oldValue, final Map newValue) throws IOException { + if (stateMap.getVersion() == -1L) { + // state has never been set so return false + return false; + } + if (stateMap != oldValue) { return false; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java index 27727a7d7d..acc0bc3f51 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java @@ -46,6 +46,7 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; +import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZKUtil; @@ -322,6 +323,28 @@ public class ZooKeeperStateProvider extends AbstractStateProvider { } private void setState(final Map stateValues, final int version, final String componentId) throws IOException { + try { + setState(stateValues, version, componentId, true); + } catch (final NoNodeException nne) { + // should never happen because we are passing 'true' for allowNodeCreation + throw new IOException("Unable to create Node in ZooKeeper to set state for component with ID " + componentId, nne); + } + } + + /** + * Sets the component state to the given stateValues if and only if the version is equal to the version currently + * tracked by ZooKeeper (or if the version is -1, in which case the state will be updated regardless of the version). + * + * @param stateValues the new values to set + * @param version the expected version of the ZNode + * @param componentId the ID of the component whose state is being updated + * @param allowNodeCreation if true and the corresponding ZNode does not exist in ZooKeeper, it will be created; if false + * and the corresponding node does not exist in ZooKeeper, a {@link KeeperException.NoNodeException} will be thrown + * + * @throws IOException if unable to communicate with ZooKeeper + * @throws NoNodeException if the corresponding ZNode does not exist in ZooKeeper and allowNodeCreation is set to false + */ + private void setState(final Map stateValues, final int version, final String componentId, final boolean allowNodeCreation) throws IOException, NoNodeException { verifyEnabled(); try { @@ -331,31 +354,31 @@ public class ZooKeeperStateProvider extends AbstractStateProvider { final ZooKeeper keeper = getZooKeeper(); try { keeper.setData(path, data, version); - } catch (final KeeperException ke) { - final Code exceptionCode = ke.code(); - if (exceptionCode == Code.NONODE) { + } catch (final NoNodeException nne) { + if (allowNodeCreation) { createNode(path, data); return; } else { - throw ke; + throw nne; } } } catch (final InterruptedException e) { Thread.currentThread().interrupt(); throw new IOException("Failed to set cluster-wide state in ZooKeeper for component with ID " + componentId + " due to interruption", e); + } catch (final NoNodeException nne) { + throw nne; } catch (final KeeperException ke) { - final Code exceptionCode = ke.code(); - if (Code.SESSIONEXPIRED == exceptionCode) { + if (Code.SESSIONEXPIRED == ke.code()) { invalidateClient(); - setState(stateValues, version, componentId); + setState(stateValues, version, componentId, allowNodeCreation); return; } - if (Code.NODEEXISTS == exceptionCode) { - setState(stateValues, version, componentId); + if (Code.NODEEXISTS == ke.code()) { + setState(stateValues, version, componentId, allowNodeCreation); return; } - throw new IOException("Failed to set cluster-wide state in ZooKeeper for component with ID " + componentId + " with exception code " + exceptionCode, ke); + throw new IOException("Failed to set cluster-wide state in ZooKeeper for component with ID " + componentId, ke); } catch (final IOException ioe) { throw new IOException("Failed to set cluster-wide state in ZooKeeper for component with ID " + componentId, ioe); } @@ -436,8 +459,10 @@ public class ZooKeeperStateProvider extends AbstractStateProvider { verifyEnabled(); try { - setState(newValue, (int) oldValue.getVersion(), componentId); + setState(newValue, (int) oldValue.getVersion(), componentId, false); return true; + } catch (final NoNodeException nne) { + return false; } catch (final IOException ioe) { final Throwable cause = ioe.getCause(); if (cause != null && cause instanceof KeeperException) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/AbstractTestStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/AbstractTestStateProvider.java index d724be0815..1cd1f37792 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/AbstractTestStateProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/AbstractTestStateProvider.java @@ -148,6 +148,45 @@ public abstract class AbstractTestStateProvider { assertTrue(stateMap.toMap().isEmpty()); } + @Test + public void testReplaceWithNonExistingValue() throws Exception { + final StateProvider provider = getProvider(); + StateMap stateMap = provider.getState(componentId); + assertNotNull(stateMap); + + final Map newValue = new HashMap<>(); + newValue.put("value", "value"); + + final boolean replaced = provider.replace(stateMap, newValue, componentId); + assertFalse(replaced); + } + + @Test + public void testReplaceWithNonExistingValueAndVersionGreaterThanNegativeOne() throws Exception { + final StateProvider provider = getProvider(); + final StateMap stateMap = new StateMap() { + @Override + public long getVersion() { + return 4; + } + + @Override + public String get(String key) { + return null; + } + + @Override + public Map toMap() { + return Collections.emptyMap(); + } + }; + + final Map newValue = new HashMap<>(); + newValue.put("value", "value"); + + final boolean replaced = provider.replace(stateMap, newValue, componentId); + assertFalse(replaced); + } protected abstract StateProvider getProvider(); }