NIFI-259: Fixed bug that caused StateProvider.replace to return true if the value had never been set, instead of false. Fixed typos in administration-guide

Signed-off-by: Aldrin Piri <aldrin@apache.org>
This commit is contained in:
Aldrin Piri 2016-02-01 16:44:56 -05:00
parent 35d2b921ea
commit 8a668fd344
4 changed files with 83 additions and 14 deletions

View File

@ -410,9 +410,9 @@ If `CreatorOnly` is specified, then only the user that created the data is allow
In order to use the `CreatorOnly` option, NiFi must provide some form of authentication. See the <<zk_access_control>> In order to use the `CreatorOnly` option, NiFi must provide some form of authentication. See the <<zk_access_control>>
section below for more information on how to configure authentication. section below for more information on how to configure authentication.
If NiFi is configured to run in a standalone mode, the `cluster-state-provider` element need not be populated in the _state-management.xml_ If NiFi is configured to run in a standalone mode, the `cluster-provider` element need not be populated in the _state-management.xml_
file and will actually be ignored if they are populated. However, the `local-state-provider` element must always be present and populated. file and will actually be ignored if they are populated. However, the `local-provider` element must always be present and populated.
Additionally, if NiFi is run in a cluster, each node must also have the `cluster-state-provider` element present and properly configured. Additionally, if NiFi is run in a cluster, each node must also have the `cluster-provider` element present and properly configured.
Otherwise, NiFi will fail to startup. Otherwise, NiFi will fail to startup.
While there are not many properties that need to be configured for these providers, they were externalized into a separate _state-providers.xml_ While there are not many properties that need to be configured for these providers, they were externalized into a separate _state-providers.xml_

View File

@ -216,6 +216,11 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider {
// see above explanation as to why this method is synchronized. // see above explanation as to why this method is synchronized.
public synchronized boolean replace(final StateMap oldValue, final Map<String, String> newValue) throws IOException { public synchronized boolean replace(final StateMap oldValue, final Map<String, String> newValue) throws IOException {
if (stateMap.getVersion() == -1L) {
// state has never been set so return false
return false;
}
if (stateMap != oldValue) { if (stateMap != oldValue) {
return false; return false;
} }

View File

@ -46,6 +46,7 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZKUtil; import org.apache.zookeeper.ZKUtil;
@ -322,6 +323,28 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
} }
private void setState(final Map<String, String> stateValues, final int version, final String componentId) throws IOException { private void setState(final Map<String, String> stateValues, final int version, final String componentId) throws IOException {
try {
setState(stateValues, version, componentId, true);
} catch (final NoNodeException nne) {
// should never happen because we are passing 'true' for allowNodeCreation
throw new IOException("Unable to create Node in ZooKeeper to set state for component with ID " + componentId, nne);
}
}
/**
* Sets the component state to the given stateValues if and only if the version is equal to the version currently
* tracked by ZooKeeper (or if the version is -1, in which case the state will be updated regardless of the version).
*
* @param stateValues the new values to set
* @param version the expected version of the ZNode
* @param componentId the ID of the component whose state is being updated
* @param allowNodeCreation if <code>true</code> and the corresponding ZNode does not exist in ZooKeeper, it will be created; if <code>false</code>
* and the corresponding node does not exist in ZooKeeper, a {@link KeeperException.NoNodeException} will be thrown
*
* @throws IOException if unable to communicate with ZooKeeper
* @throws NoNodeException if the corresponding ZNode does not exist in ZooKeeper and allowNodeCreation is set to <code>false</code>
*/
private void setState(final Map<String, String> stateValues, final int version, final String componentId, final boolean allowNodeCreation) throws IOException, NoNodeException {
verifyEnabled(); verifyEnabled();
try { try {
@ -331,31 +354,31 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
final ZooKeeper keeper = getZooKeeper(); final ZooKeeper keeper = getZooKeeper();
try { try {
keeper.setData(path, data, version); keeper.setData(path, data, version);
} catch (final KeeperException ke) { } catch (final NoNodeException nne) {
final Code exceptionCode = ke.code(); if (allowNodeCreation) {
if (exceptionCode == Code.NONODE) {
createNode(path, data); createNode(path, data);
return; return;
} else { } else {
throw ke; throw nne;
} }
} }
} catch (final InterruptedException e) { } catch (final InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
throw new IOException("Failed to set cluster-wide state in ZooKeeper for component with ID " + componentId + " due to interruption", e); throw new IOException("Failed to set cluster-wide state in ZooKeeper for component with ID " + componentId + " due to interruption", e);
} catch (final NoNodeException nne) {
throw nne;
} catch (final KeeperException ke) { } catch (final KeeperException ke) {
final Code exceptionCode = ke.code(); if (Code.SESSIONEXPIRED == ke.code()) {
if (Code.SESSIONEXPIRED == exceptionCode) {
invalidateClient(); invalidateClient();
setState(stateValues, version, componentId); setState(stateValues, version, componentId, allowNodeCreation);
return; return;
} }
if (Code.NODEEXISTS == exceptionCode) { if (Code.NODEEXISTS == ke.code()) {
setState(stateValues, version, componentId); setState(stateValues, version, componentId, allowNodeCreation);
return; return;
} }
throw new IOException("Failed to set cluster-wide state in ZooKeeper for component with ID " + componentId + " with exception code " + exceptionCode, ke); throw new IOException("Failed to set cluster-wide state in ZooKeeper for component with ID " + componentId, ke);
} catch (final IOException ioe) { } catch (final IOException ioe) {
throw new IOException("Failed to set cluster-wide state in ZooKeeper for component with ID " + componentId, ioe); throw new IOException("Failed to set cluster-wide state in ZooKeeper for component with ID " + componentId, ioe);
} }
@ -436,8 +459,10 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
verifyEnabled(); verifyEnabled();
try { try {
setState(newValue, (int) oldValue.getVersion(), componentId); setState(newValue, (int) oldValue.getVersion(), componentId, false);
return true; return true;
} catch (final NoNodeException nne) {
return false;
} catch (final IOException ioe) { } catch (final IOException ioe) {
final Throwable cause = ioe.getCause(); final Throwable cause = ioe.getCause();
if (cause != null && cause instanceof KeeperException) { if (cause != null && cause instanceof KeeperException) {

View File

@ -148,6 +148,45 @@ public abstract class AbstractTestStateProvider {
assertTrue(stateMap.toMap().isEmpty()); assertTrue(stateMap.toMap().isEmpty());
} }
@Test
public void testReplaceWithNonExistingValue() throws Exception {
final StateProvider provider = getProvider();
StateMap stateMap = provider.getState(componentId);
assertNotNull(stateMap);
final Map<String, String> newValue = new HashMap<>();
newValue.put("value", "value");
final boolean replaced = provider.replace(stateMap, newValue, componentId);
assertFalse(replaced);
}
@Test
public void testReplaceWithNonExistingValueAndVersionGreaterThanNegativeOne() throws Exception {
final StateProvider provider = getProvider();
final StateMap stateMap = new StateMap() {
@Override
public long getVersion() {
return 4;
}
@Override
public String get(String key) {
return null;
}
@Override
public Map<String, String> toMap() {
return Collections.emptyMap();
}
};
final Map<String, String> newValue = new HashMap<>();
newValue.put("value", "value");
final boolean replaced = provider.replace(stateMap, newValue, componentId);
assertFalse(replaced);
}
protected abstract StateProvider getProvider(); protected abstract StateProvider getProvider();
} }