NIFI-259: Extending the StateProvider interface to provide a getSupportedScopes method and implemented this based on the capabilities of each of its implementations. Used supported scope to evaluated configurations at startup and prevent issues when trying to make use of state

This commit is contained in:
Aldrin Piri 2016-02-01 15:26:05 -05:00
parent 257eca9c46
commit 447e401912
4 changed files with 32 additions and 2 deletions

View File

@ -124,4 +124,10 @@ public interface StateProvider extends ConfigurableComponent {
* @return <code>true</code> if the provider is enabled, <code>false</code> otherwise. * @return <code>true</code> if the provider is enabled, <code>false</code> otherwise.
*/ */
boolean isEnabled(); boolean isEnabled();
/**
* Provides a listing of {@link Scope}s supported by the StateProvider
* @return the {@link Scope}s supported by the configuration
*/
Scope[] getSupportedScopes();
} }

View File

@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentMap;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue; import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.PropertyValue;
@ -159,6 +160,12 @@ public class StandardStateManagerProvider implements StateManagerProvider {
throw new RuntimeException("Cannot create " + providerDescription + " of type " + providerClassName, e); throw new RuntimeException("Cannot create " + providerDescription + " of type " + providerClassName, e);
} }
if (!ArrayUtils.contains(provider.getSupportedScopes(), scope)) {
throw new RuntimeException("Cannot use " + providerDescription + " ("+providerClassName+") as it only supports scope(s) " + ArrayUtils.toString(provider.getSupportedScopes()) + " but " +
"instance"
+ " is configured to use scope " + scope);
}
final Map<PropertyDescriptor, PropertyValue> propertyMap = new HashMap<>(); final Map<PropertyDescriptor, PropertyValue> propertyMap = new HashMap<>();
final Map<PropertyDescriptor, String> propertyStringMap = new HashMap<>(); final Map<PropertyDescriptor, String> propertyStringMap = new HashMap<>();
for (final PropertyDescriptor descriptor : provider.getPropertyDescriptors()) { for (final PropertyDescriptor descriptor : provider.getPropertyDescriptors()) {

View File

@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap; import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.components.state.StateProviderInitializationContext; import org.apache.nifi.components.state.StateProviderInitializationContext;
import org.apache.nifi.controller.state.StandardStateMap; import org.apache.nifi.controller.state.StandardStateMap;
@ -48,7 +49,7 @@ import org.wali.UpdateType;
import org.wali.WriteAheadRepository; import org.wali.WriteAheadRepository;
/** /**
* Provides state management for local (node-only) state, backed by a write-ahead log * Provides state management for local (standalone) state, backed by a write-ahead log
*/ */
public class WriteAheadLocalStateProvider extends AbstractStateProvider { public class WriteAheadLocalStateProvider extends AbstractStateProvider {
private static final Logger logger = LoggerFactory.getLogger(WriteAheadLocalStateProvider.class); private static final Logger logger = LoggerFactory.getLogger(WriteAheadLocalStateProvider.class);
@ -180,6 +181,11 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider {
componentProviders.remove(componentId); componentProviders.remove(componentId);
} }
@Override
public Scope[] getSupportedScopes() {
return new Scope[]{Scope.LOCAL};
}
private static class ComponentProvider { private static class ComponentProvider {
private final AtomicLong versionGenerator; private final AtomicLong versionGenerator;
private final WriteAheadRepository<StateMapUpdate> wal; private final WriteAheadRepository<StateMapUpdate> wal;

View File

@ -37,6 +37,7 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator; import org.apache.nifi.components.Validator;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap; import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.components.state.StateProviderInitializationContext; import org.apache.nifi.components.state.StateProviderInitializationContext;
import org.apache.nifi.controller.state.StandardStateMap; import org.apache.nifi.controller.state.StandardStateMap;
@ -54,6 +55,11 @@ import org.apache.zookeeper.client.ConnectStringParser;
import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.data.Stat;
/**
* ZooKeeperStateProvider utilizes a ZooKeeper based store, whether provided internally via configuration and enabling of the {@link org.apache.nifi.controller.state.server.ZooKeeperStateServer}
* or through an externally configured location. This implementation caters to a clustered NiFi environment and accordingly only provides {@link Scope#CLUSTER} scoping to enforce
* consistency across configuration interactions.
*/
public class ZooKeeperStateProvider extends AbstractStateProvider { public class ZooKeeperStateProvider extends AbstractStateProvider {
static final AllowableValue OPEN_TO_WORLD = new AllowableValue("Open", "Open", "ZNodes will be open to any ZooKeeper client."); static final AllowableValue OPEN_TO_WORLD = new AllowableValue("Open", "Open", "ZNodes will be open to any ZooKeeper client.");
static final AllowableValue CREATOR_ONLY = new AllowableValue("CreatorOnly", "CreatorOnly", static final AllowableValue CREATOR_ONLY = new AllowableValue("CreatorOnly", "CreatorOnly",
@ -228,7 +234,7 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
private void verifyEnabled() throws IOException { private void verifyEnabled() throws IOException {
if (!isEnabled()) { if (!isEnabled()) {
throw new IOException("Cannot update or retrieve cluster state because node is no longer connected to a cluster"); throw new IOException("Cannot update or retrieve cluster state because node is no longer connected to a cluster.");
} }
} }
@ -254,6 +260,11 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
} }
} }
@Override
public Scope[] getSupportedScopes() {
return new Scope[]{Scope.CLUSTER};
}
@Override @Override
public void setState(final Map<String, String> state, final String componentId) throws IOException { public void setState(final Map<String, String> state, final String componentId) throws IOException {
setState(state, -1, componentId); setState(state, -1, componentId);