From 447e401912100ab9ed73d442e1e8c0c0bb229725 Mon Sep 17 00:00:00 2001 From: Aldrin Piri Date: Mon, 1 Feb 2016 15:26:05 -0500 Subject: [PATCH] NIFI-259: Extending the StateProvider interface to provide a getSupportedScopes method and implemented this based on the capabilities of each of its implementations. Used supported scope to evaluated configurations at startup and prevent issues when trying to make use of state --- .../apache/nifi/components/state/StateProvider.java | 6 ++++++ .../state/manager/StandardStateManagerProvider.java | 7 +++++++ .../local/WriteAheadLocalStateProvider.java | 8 +++++++- .../providers/zookeeper/ZooKeeperStateProvider.java | 13 ++++++++++++- 4 files changed, 32 insertions(+), 2 deletions(-) diff --git a/nifi-api/src/main/java/org/apache/nifi/components/state/StateProvider.java b/nifi-api/src/main/java/org/apache/nifi/components/state/StateProvider.java index e0243f326d..e1e4352721 100644 --- a/nifi-api/src/main/java/org/apache/nifi/components/state/StateProvider.java +++ b/nifi-api/src/main/java/org/apache/nifi/components/state/StateProvider.java @@ -124,4 +124,10 @@ public interface StateProvider extends ConfigurableComponent { * @return true if the provider is enabled, false otherwise. */ boolean isEnabled(); + + /** + * Provides a listing of {@link Scope}s supported by the StateProvider + * @return the {@link Scope}s supported by the configuration + */ + Scope[] getSupportedScopes(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java index b7671f6c79..ef46f55dfc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java @@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentMap; import javax.net.ssl.SSLContext; +import org.apache.commons.lang3.ArrayUtils; import org.apache.nifi.attribute.expression.language.StandardPropertyValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; @@ -159,6 +160,12 @@ public class StandardStateManagerProvider implements StateManagerProvider { throw new RuntimeException("Cannot create " + providerDescription + " of type " + providerClassName, e); } + if (!ArrayUtils.contains(provider.getSupportedScopes(), scope)) { + throw new RuntimeException("Cannot use " + providerDescription + " ("+providerClassName+") as it only supports scope(s) " + ArrayUtils.toString(provider.getSupportedScopes()) + " but " + + "instance" + + " is configured to use scope " + scope); + } + final Map propertyMap = new HashMap<>(); final Map propertyStringMap = new HashMap<>(); for (final PropertyDescriptor descriptor : provider.getPropertyDescriptors()) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java index 7d16a10fbe..da46ed065c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java @@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateMap; import org.apache.nifi.components.state.StateProviderInitializationContext; import org.apache.nifi.controller.state.StandardStateMap; @@ -48,7 +49,7 @@ import org.wali.UpdateType; import org.wali.WriteAheadRepository; /** - * Provides state management for local (node-only) state, backed by a write-ahead log + * Provides state management for local (standalone) state, backed by a write-ahead log */ public class WriteAheadLocalStateProvider extends AbstractStateProvider { private static final Logger logger = LoggerFactory.getLogger(WriteAheadLocalStateProvider.class); @@ -180,6 +181,11 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider { componentProviders.remove(componentId); } + @Override + public Scope[] getSupportedScopes() { + return new Scope[]{Scope.LOCAL}; + } + private static class ComponentProvider { private final AtomicLong versionGenerator; private final WriteAheadRepository wal; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java index 8ce74b435a..5c8b4c4eda 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java @@ -37,6 +37,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; +import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateMap; import org.apache.nifi.components.state.StateProviderInitializationContext; import org.apache.nifi.controller.state.StandardStateMap; @@ -54,6 +55,11 @@ import org.apache.zookeeper.client.ConnectStringParser; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; +/** + * ZooKeeperStateProvider utilizes a ZooKeeper based store, whether provided internally via configuration and enabling of the {@link org.apache.nifi.controller.state.server.ZooKeeperStateServer} + * or through an externally configured location. This implementation caters to a clustered NiFi environment and accordingly only provides {@link Scope#CLUSTER} scoping to enforce + * consistency across configuration interactions. + */ public class ZooKeeperStateProvider extends AbstractStateProvider { static final AllowableValue OPEN_TO_WORLD = new AllowableValue("Open", "Open", "ZNodes will be open to any ZooKeeper client."); static final AllowableValue CREATOR_ONLY = new AllowableValue("CreatorOnly", "CreatorOnly", @@ -228,7 +234,7 @@ public class ZooKeeperStateProvider extends AbstractStateProvider { private void verifyEnabled() throws IOException { if (!isEnabled()) { - throw new IOException("Cannot update or retrieve cluster state because node is no longer connected to a cluster"); + throw new IOException("Cannot update or retrieve cluster state because node is no longer connected to a cluster."); } } @@ -254,6 +260,11 @@ public class ZooKeeperStateProvider extends AbstractStateProvider { } } + @Override + public Scope[] getSupportedScopes() { + return new Scope[]{Scope.CLUSTER}; + } + @Override public void setState(final Map state, final String componentId) throws IOException { setState(state, -1, componentId);