NIFI-1626: Throw an Exception proactively if too much state is attempting to be stored via ZooKeeperStateProvider

NIFI-1626: Updated State Management section of Developer Guide

Signed-off-by: jpercivall <joepercivall@yahoo.com>
This commit is contained in:
Mark Payne 2016-03-16 13:19:33 -04:00 committed by jpercivall
parent 98395de74f
commit a7b97419e5
5 changed files with 118 additions and 27 deletions

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.Map;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.components.state.exception.StateTooLargeException;
/**
* <p>
@ -60,6 +61,7 @@ public interface StateManager {
* @param state the value to change the state to
* @param scope the scope to use when storing the state
*
* @throws StateTooLargeException if attempting to store more state than is allowed by the backing storage mechanism
* @throws IOException if unable to communicate with the underlying storage mechanism
*/
void setState(Map<String, String> state, Scope scope) throws IOException;
@ -84,6 +86,7 @@ public interface StateManager {
* @return <code>true</code> if the state was updated to the new value, <code>false</code> if the state's value was not
* equal to oldValue
*
* @throws StateTooLargeException if attempting to store more state than is allowed by the backing storage mechanism
* @throws IOException if unable to communicate with the underlying storage mechanism
*/
boolean replace(StateMap oldValue, Map<String, String> newValue, Scope scope) throws IOException;

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.components.state.exception;
import java.io.IOException;
/**
* Thrown when attempting to store state via the {@link StateManager} but the state being
* stored is larger than is allowed by the backing storage mechanism.
*/
public class StateTooLargeException extends IOException {
private static final long serialVersionUID = 1L;
public StateTooLargeException(final String message) {
super(message);
}
}

View File

@ -570,20 +570,28 @@ relied upon for critical business logic.
From the ProcessContext, ReportingContext, and ControllerServiceInitializationContext, components are
able to call the `getStateManager()` method. This State Manager is responsible for providing a simple API
for storing and retrieving state. As such, the API is designed to be quite similar to the ConcurrentMap
API, which most Java developers are already familiar with.
for storing and retrieving state. This mechanism is intended to provide developers with the ability to
very easily store a set of key/value pairs, retrieve those values, and update them atomically. The state
can be stored local to the node or across all nodes in a cluster. It is important to note, however, that
this mechanism is intended only to provide a mechanism for storing very 'simple' state. As such, the API
simply allows a `Map<String, String>` to be stored and retrieved and for the entire Map to be atomically
replaced. Moreover, the only implementation that is currently supported for storing cluster-wide state is
backed by ZooKeeper. As such, the entire State Map must be less than 1 MB in size, after being serialized.
Attempting to store more than this will result in an Exception being thrown. If the interactions required
by the Processor for managing state are more complex than this (e.g., large amounts of data must be stored
and retrieved, or individual keys must be stored and fetched individually) than a different mechanism should
be used (e.g., communicating with an external database).
[[state_scope]]
==== Scope
One very notable difference between the StateManager API and the ConcurrentMap API, however, is the presence
of a Scope object on each method call of the StateManager. This Scope will either be `Scope.NODE` or `Scope.CLUSTER`.
If NiFi is run in a cluster, this Scope provides important information to the framework about how the operation should
occur.
When communicating with the State Manager, all method calls require that a Scope be provided. This Scope will
either be `Scope.NODE` or `Scope.CLUSTER`. If NiFi is run in a cluster, this Scope provides important information
to the framework about how the operation should occur.
If state as stored using `Scope.CLUSTER`, then all nodes in the cluster will be communicating with the same
state storage mechanism, as if all nodes were to share a single ConcurrentMap. If state is stored and retrieved using
`Scope.NODE`, then each node will see a different representation of the state.
state storage mechanism. If state is stored and retrieved using `Scope.NODE`, then each node will see a different
representation of the state.
It is also worth noting that if NiFi is configured to run as a standalone instance, rather than running in a cluster,
a scope of `Scope.NODE` is always used. This is done in order to allow the developer of a NiFi component to write the code
@ -593,13 +601,12 @@ that the instance is clustered and write the code accordingly.
==== Storing and Retrieving State
State is stored using the StateManager's `set`, `replace`, `putIfAbsent`, `remove`, and `clear` methods. All of these methods,
with the exception of `clear` take as the first argument the key to be set. The key that is used is unique only to the same
instance of the component and for the same Scope. That is, if two Processors store a value using the key _My Key_, those Processors
will not conflict with each other, even if both Processors are of the same type (e.g., both are of type ListFile) and scope. Furthermore,
if a Processor stores a value with the key of _My Key_ using the `Scope.CLUSTER` scope, and then attempts to retrieve the value
using the `Scope.NODE` scope, the value retrieved will be `null`. Each Processor's state, then, is stored in isolation from other
Processors' state. A unique key can be thought of as a triple of <Processor Instance, Key, Scope>.
State is stored using the StateManager's `getState`, `setState`, `replace`, and `clear` methods. All of these methods
require that a Scope be provided. It should be noted that the state that is stored with the Local scope is entirely different
than state stored with a Cluster scope. If a Processor stores a value with the key of _My Key_ using the `Scope.CLUSTER` scope,
and then attempts to retrieve the value using the `Scope.NODE` scope, the value retrieved will be `null` (unless a value was
also stored with the same key using the `Scope.CLUSTER` scope). Each Processor's state, is stored in isolation from other
Processors' state.
It follows, then, that two Processors cannot share the same state. There are, however, some circumstances in which it is very
necessary to share state between two Processors of different types, or two Processors of the same type. This can be accomplished
@ -615,13 +622,9 @@ defined by the `StateManager` interface, that help developers to more easily dev
First, the `MockStateManager` implements the `StateManager` interface, so all of the state can be examined from within a unit test.
Additionally, the `MockStateManager` exposes a handful of `assert*` methods to perform assertions that the State is set as expected.
There are times, however, that state could be updated multiple times during the run of a single invocation of a Processor's `onTrigger`
method. In this case, inspecting the values after running the Processor may not be sufficient. Additionally, we must always remember at each
step to check the value of the stored state, which can become error-prone and burdensome for the developer.
The `MockStateManager` also provides the ability to indicate that the unit test should immediately fail if state is updated for a particular
`Scope`.
For these reasons, the `MockStateManager` provides an additional method, named `failIfStateSet`. This method instructs the State Manager that
the unit test should immediately fail if the state for a given key is ever set, or if it is set to a specific value. The `doNotFailIfStateSet`
method can then be used to instruct the Mock Framework to clear this state and allow state to be set to any value.

View File

@ -40,6 +40,7 @@ import org.apache.nifi.components.Validator;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.components.state.StateProviderInitializationContext;
import org.apache.nifi.components.state.exception.StateTooLargeException;
import org.apache.nifi.controller.state.StandardStateMap;
import org.apache.nifi.controller.state.providers.AbstractStateProvider;
import org.apache.nifi.processor.util.StandardValidators;
@ -62,6 +63,8 @@ import org.apache.zookeeper.data.Stat;
* consistency across configuration interactions.
*/
public class ZooKeeperStateProvider extends AbstractStateProvider {
private static final int ONE_MB = 1024 * 1024;
static final AllowableValue OPEN_TO_WORLD = new AllowableValue("Open", "Open", "ZNodes will be open to any ZooKeeper client.");
static final AllowableValue CREATOR_ONLY = new AllowableValue("CreatorOnly", "CreatorOnly",
"ZNodes will be accessible only by the creator. The creator will have full access to create, read, write, delete, and administer the ZNodes.");
@ -343,6 +346,7 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
*
* @throws IOException if unable to communicate with ZooKeeper
* @throws NoNodeException if the corresponding ZNode does not exist in ZooKeeper and allowNodeCreation is set to <code>false</code>
* @throws StateTooLargeException if the state to be stored exceeds the maximum size allowed by ZooKeeper (1 MB, after serialization)
*/
private void setState(final Map<String, String> stateValues, final int version, final String componentId, final boolean allowNodeCreation) throws IOException, NoNodeException {
verifyEnabled();
@ -350,13 +354,18 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
try {
final String path = getComponentPath(componentId);
final byte[] data = serialize(stateValues);
if (data.length > ONE_MB) {
throw new StateTooLargeException("Failed to set cluster-wide state in ZooKeeper for component with ID " + componentId
+ " because the state had " + stateValues.size() + " values, which serialized to " + data.length
+ " bytes, and the maximum allowed by ZooKeeper is 1 MB (" + ONE_MB + " bytes)");
}
final ZooKeeper keeper = getZooKeeper();
try {
keeper.setData(path, data, version);
} catch (final NoNodeException nne) {
if (allowNodeCreation) {
createNode(path, data);
createNode(path, data, componentId, stateValues);
return;
} else {
throw nne;
@ -379,14 +388,22 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
}
throw new IOException("Failed to set cluster-wide state in ZooKeeper for component with ID " + componentId, ke);
} catch (final StateTooLargeException stle) {
throw stle;
} catch (final IOException ioe) {
throw new IOException("Failed to set cluster-wide state in ZooKeeper for component with ID " + componentId, ioe);
}
}
private void createNode(final String path, final byte[] data) throws IOException, KeeperException {
private void createNode(final String path, final byte[] data, final String componentId, final Map<String, String> stateValues) throws IOException, KeeperException {
try {
if (data != null && data.length > ONE_MB) {
throw new StateTooLargeException("Failed to set cluster-wide state in ZooKeeper for component with ID " + componentId
+ " because the state had " + stateValues.size() + " values, which serialized to " + data.length
+ " bytes, and the maximum allowed by ZooKeeper is 1 MB (" + ONE_MB + " bytes)");
}
getZooKeeper().create(path, data, acl, CreateMode.PERSISTENT);
} catch (final InterruptedException ie) {
throw new IOException("Failed to update cluster-wide state due to interruption", ie);
@ -394,13 +411,13 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
final Code exceptionCode = ke.code();
if (Code.NONODE == exceptionCode) {
final String parentPath = StringUtils.substringBeforeLast(path, "/");
createNode(parentPath, null);
createNode(path, data);
createNode(parentPath, null, componentId, stateValues);
createNode(path, data, componentId, stateValues);
return;
}
if (Code.SESSIONEXPIRED == exceptionCode) {
invalidateClient();
createNode(path, data);
createNode(path, data, componentId, stateValues);
return;
}
@ -412,7 +429,7 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
} catch (final KeeperException ke1) {
// Node no longer exists -- it was removed by someone else. Go recreate the node.
if (ke1.code() == Code.NONODE) {
createNode(path, data);
createNode(path, data, componentId, stateValues);
return;
}
} catch (final InterruptedException ie) {

View File

@ -34,6 +34,7 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.state.StateProvider;
import org.apache.nifi.components.state.StateProviderInitializationContext;
import org.apache.nifi.components.state.exception.StateTooLargeException;
import org.apache.nifi.controller.state.providers.AbstractTestStateProvider;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
@ -202,4 +203,39 @@ public class TestZooKeeperStateProvider extends AbstractTestStateProvider {
authorizedProvider.shutdown();
}
}
@Test
public void testStateTooLargeExceptionThrown() {
final Map<String, String> state = new HashMap<>();
final StringBuilder sb = new StringBuilder();
// Build a string that is a little less than 64 KB, because that's
// the largest value available for DataOutputStream.writeUTF
for (int i = 0; i < 6500; i++) {
sb.append("0123456789");
}
for (int i = 0; i < 20; i++) {
state.put("numbers." + i, sb.toString());
}
try {
getProvider().setState(state, componentId);
Assert.fail("Expected StateTooLargeException");
} catch (final StateTooLargeException stle) {
// expected behavior.
} catch (final Exception e) {
Assert.fail("Expected StateTooLargeException but " + e.getClass() + " was thrown", e);
}
try {
getProvider().replace(getProvider().getState(componentId), state, componentId);
Assert.fail("Expected StateTooLargeException");
} catch (final StateTooLargeException stle) {
// expected behavior.
} catch (final Exception e) {
Assert.fail("Expected StateTooLargeException");
}
}
}