From aabd4a25d2cb8b2a6108be6c16315acee08ee712 Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Mon, 12 Jun 2017 15:53:20 -0400 Subject: [PATCH] NIFI-4043 Initial commit of nifi-redis-bundle NIFI-4061 Initial version of RedisStateProvider - Adding PropertyContext and updating existing contexts to extend it - Added embedded Redis for unit testing - Added wrapped StateProvider with NAR ClassLoader in StandardStateManagerProvider - Updating state-management.xml with config for RedisStateProvider - Renaming tests that use RedisServer to be IT tests so they don't run all the time This closes #1918. --- .../nifi/components/ValidationContext.java | 10 +- .../apache/nifi/context/PropertyContext.java | 41 ++ .../controller/AbstractControllerService.java | 18 +- .../nifi/controller/ConfigurationContext.java | 14 +- .../apache/nifi/processor/ProcessContext.java | 12 +- .../nifi/reporting/ReportingContext.java | 11 +- nifi-assembly/pom.xml | 10 + .../bootstrap/NotificationServiceManager.java | 6 + .../NotificationInitializationContext.java | 14 +- .../NotificationValidationContext.java | 10 + .../StateProviderInitializationContext.java | 12 +- .../nifi/util/MockConfigurationContext.java | 10 + .../apache/nifi/util/MockProcessContext.java | 9 + .../nifi/util/MockReportingContext.java | 10 + .../nifi/util/MockValidationContext.java | 10 + .../reporting/StandardReportingContext.java | 10 + .../scheduling/ConnectableProcessContext.java | 5 + .../service/StandardConfigurationContext.java | 10 + ...ardStateProviderInitializationContext.java | 10 + .../manager/StandardStateManagerProvider.java | 127 +++++- .../processor/StandardProcessContext.java | 10 + .../processor/StandardSchedulingContext.java | 10 + .../processor/StandardValidationContext.java | 10 + .../controller/TestStandardProcessorNode.java | 10 + .../TestWriteAheadLocalStateProvider.java | 10 + .../zookeeper/TestZooKeeperStateProvider.java | 10 + .../nifi/mock/MockConfigurationContext.java | 5 + .../apache/nifi/mock/MockProcessContext.java | 5 + .../main/resources/conf/state-management.xml | 68 +++ .../nifi-redis-extensions/pom.xml | 89 ++++ .../service/RedisConnectionPoolService.java | 93 ++++ ...RedisDistributedMapCacheClientService.java | 327 +++++++++++++ .../nifi/redis/state/RedisStateMap.java | 100 ++++ .../redis/state/RedisStateMapJsonSerDe.java | 85 ++++ .../nifi/redis/state/RedisStateMapSerDe.java | 44 ++ .../nifi/redis/state/RedisStateProvider.java | 299 ++++++++++++ .../apache/nifi/redis/util/RedisAction.java | 30 ++ .../apache/nifi/redis/util/RedisUtils.java | 428 ++++++++++++++++++ ...apache.nifi.components.state.StateProvider | 15 + ...g.apache.nifi.controller.ControllerService | 16 + .../redis/service/FakeRedisProcessor.java | 53 +++ ...RedisDistributedMapCacheClientService.java | 264 +++++++++++ .../TestRedisConnectionPoolService.java | 95 ++++ .../redis/state/ITRedisStateProvider.java | 318 +++++++++++++ .../state/TestRedisStateMapJsonSerDe.java | 79 ++++ .../nifi-redis-bundle/nifi-redis-nar/pom.xml | 46 ++ .../src/main/resources/META-INF/NOTICE | 32 ++ .../nifi-redis-service-api-nar/pom.xml | 46 ++ .../src/main/resources/META-INF/LICENSE | 239 ++++++++++ .../src/main/resources/META-INF/NOTICE | 33 ++ .../nifi-redis-service-api/pom.xml | 44 ++ .../nifi/redis/RedisConnectionPool.java | 44 ++ .../java/org/apache/nifi/redis/RedisType.java | 56 +++ nifi-nar-bundles/nifi-redis-bundle/pom.xml | 41 ++ .../standard/WaitNotifyProtocol.java | 10 +- nifi-nar-bundles/pom.xml | 1 + pom.xml | 12 + 57 files changed, 3372 insertions(+), 64 deletions(-) create mode 100644 nifi-api/src/main/java/org/apache/nifi/context/PropertyContext.java create mode 100644 nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml create mode 100644 nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisConnectionPoolService.java create mode 100644 nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java create mode 100644 nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateMap.java create mode 100644 nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateMapJsonSerDe.java create mode 100644 nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateMapSerDe.java create mode 100644 nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java create mode 100644 nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisAction.java create mode 100644 nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java create mode 100644 nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.state.StateProvider create mode 100644 nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService create mode 100644 nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/FakeRedisProcessor.java create mode 100644 nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/ITRedisDistributedMapCacheClientService.java create mode 100644 nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/TestRedisConnectionPoolService.java create mode 100644 nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/ITRedisStateProvider.java create mode 100644 nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/TestRedisStateMapJsonSerDe.java create mode 100644 nifi-nar-bundles/nifi-redis-bundle/nifi-redis-nar/pom.xml create mode 100644 nifi-nar-bundles/nifi-redis-bundle/nifi-redis-nar/src/main/resources/META-INF/NOTICE create mode 100644 nifi-nar-bundles/nifi-redis-bundle/nifi-redis-service-api-nar/pom.xml create mode 100644 nifi-nar-bundles/nifi-redis-bundle/nifi-redis-service-api-nar/src/main/resources/META-INF/LICENSE create mode 100644 nifi-nar-bundles/nifi-redis-bundle/nifi-redis-service-api-nar/src/main/resources/META-INF/NOTICE create mode 100644 nifi-nar-bundles/nifi-redis-bundle/nifi-redis-service-api/pom.xml create mode 100644 nifi-nar-bundles/nifi-redis-bundle/nifi-redis-service-api/src/main/java/org/apache/nifi/redis/RedisConnectionPool.java create mode 100644 nifi-nar-bundles/nifi-redis-bundle/nifi-redis-service-api/src/main/java/org/apache/nifi/redis/RedisType.java create mode 100644 nifi-nar-bundles/nifi-redis-bundle/pom.xml diff --git a/nifi-api/src/main/java/org/apache/nifi/components/ValidationContext.java b/nifi-api/src/main/java/org/apache/nifi/components/ValidationContext.java index 66a54fa956..444d1bd62a 100644 --- a/nifi-api/src/main/java/org/apache/nifi/components/ValidationContext.java +++ b/nifi-api/src/main/java/org/apache/nifi/components/ValidationContext.java @@ -18,11 +18,12 @@ package org.apache.nifi.components; import java.util.Map; +import org.apache.nifi.context.PropertyContext; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.expression.ExpressionLanguageCompiler; -public interface ValidationContext { +public interface ValidationContext extends PropertyContext { /** * @return the {@link ControllerServiceLookup} which can be used to obtain @@ -43,13 +44,6 @@ public interface ValidationContext { */ ExpressionLanguageCompiler newExpressionLanguageCompiler(); - /** - * @param property being validated - * @return a PropertyValue that encapsulates the value configured for the - * given PropertyDescriptor - */ - PropertyValue getProperty(PropertyDescriptor property); - /** * @param value to make a PropertyValue object for * @return a PropertyValue that represents the given value diff --git a/nifi-api/src/main/java/org/apache/nifi/context/PropertyContext.java b/nifi-api/src/main/java/org/apache/nifi/context/PropertyContext.java new file mode 100644 index 0000000000..2771927b48 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/context/PropertyContext.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.context; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; + +import java.util.Map; + +/** + * A context for retrieving a PropertyValue from a PropertyDescriptor. + */ +public interface PropertyContext { + + /** + * Retrieves the current value set for the given descriptor, if a value is + * set - else uses the descriptor to determine the appropriate default value + * + * @param descriptor to lookup the value of + * @return the property value of the given descriptor + */ + PropertyValue getProperty(PropertyDescriptor descriptor); + + + Map getAllProperties(); + +} diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/AbstractControllerService.java b/nifi-api/src/main/java/org/apache/nifi/controller/AbstractControllerService.java index 9762f3e7ff..95f0583be0 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/AbstractControllerService.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/AbstractControllerService.java @@ -23,7 +23,6 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.state.StateManager; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.reporting.InitializationException; public abstract class AbstractControllerService extends AbstractConfigurableComponent implements ControllerService { @@ -33,6 +32,7 @@ public abstract class AbstractControllerService extends AbstractConfigurableComp private ComponentLog logger; private StateManager stateManager; private volatile ConfigurationContext configurationContext; + private volatile boolean enabled = false; @Override public final void initialize(final ControllerServiceInitializationContext context) throws InitializationException { @@ -50,7 +50,7 @@ public abstract class AbstractControllerService extends AbstractConfigurableComp /** * @return the {@link ControllerServiceLookup} that was passed to the - * {@link #init(ProcessorInitializationContext)} method + * {@link #init(ControllerServiceInitializationContext)} method */ protected final ControllerServiceLookup getControllerServiceLookup() { return serviceLookup; @@ -66,6 +66,20 @@ public abstract class AbstractControllerService extends AbstractConfigurableComp protected void init(final ControllerServiceInitializationContext config) throws InitializationException { } + @OnEnabled + public final void enabled() { + this.enabled = true; + } + + @OnDisabled + public final void disabled() { + this.enabled = false; + } + + public boolean isEnabled() { + return this.enabled; + } + /** * @return the logger that has been provided to the component by the * framework in its initialize method diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/ConfigurationContext.java b/nifi-api/src/main/java/org/apache/nifi/controller/ConfigurationContext.java index 03965d4c7d..e6b3cb2613 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/ConfigurationContext.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/ConfigurationContext.java @@ -16,23 +16,17 @@ */ package org.apache.nifi.controller; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.context.PropertyContext; + import java.util.Map; import java.util.concurrent.TimeUnit; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.PropertyValue; - /** * This context is passed to ControllerServices and Reporting Tasks in order * to expose their configuration to them. */ -public interface ConfigurationContext { - - /** - * @param property to retrieve by name - * @return the configured value for the property with the given name - */ - PropertyValue getProperty(PropertyDescriptor property); +public interface ConfigurationContext extends PropertyContext { /** * @return an unmodifiable map of all configured properties for this diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java index c112e8a697..2bbe06a49f 100644 --- a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java +++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java @@ -22,6 +22,7 @@ import java.util.Set; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.context.PropertyContext; import org.apache.nifi.controller.ControllerServiceLookup; /** @@ -34,16 +35,7 @@ import org.apache.nifi.controller.ControllerServiceLookup; * thread-safe. *

*/ -public interface ProcessContext { - - /** - * Retrieves the current value set for the given descriptor, if a value is - * set - else uses the descriptor to determine the appropriate default value - * - * @param descriptor to lookup the value of - * @return the property value of the given descriptor - */ - PropertyValue getProperty(PropertyDescriptor descriptor); +public interface ProcessContext extends PropertyContext { /** * Retrieves the current value set for the given descriptor, if a value is diff --git a/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingContext.java b/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingContext.java index f1acfe3b25..8b3ad5629d 100644 --- a/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingContext.java +++ b/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingContext.java @@ -17,8 +17,8 @@ package org.apache.nifi.reporting; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.context.PropertyContext; import org.apache.nifi.controller.ControllerServiceLookup; import java.util.Map; @@ -29,7 +29,7 @@ import java.util.Map; * statistics, metrics, and monitoring information, as well as configuration * supplied by the user. */ -public interface ReportingContext { +public interface ReportingContext extends PropertyContext { /** * @return a Map of all known {@link PropertyDescriptor}s to their @@ -39,13 +39,6 @@ public interface ReportingContext { */ Map getProperties(); - /** - * @param propertyName descriptor of property to lookup the value of - * @return PropertyValue that represents the user-configured value for the given - * {@link PropertyDescriptor} - */ - PropertyValue getProperty(PropertyDescriptor propertyName); - /** * @return the {@link EventAccess} object that can be used to obtain * information about specific events and reports that have happened diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index bc9992cea0..8f73ec0e45 100755 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -476,6 +476,16 @@ nifi-hwx-schema-registry-nar nar + + org.apache.nifi + nifi-redis-service-api-nar + nar + + + org.apache.nifi + nifi-redis-nar + nar + diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NotificationServiceManager.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NotificationServiceManager.java index 6203a06ec6..6e917512d3 100644 --- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NotificationServiceManager.java +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NotificationServiceManager.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -378,6 +379,11 @@ public class NotificationServiceManager { return new StandardPropertyValue(value, null, variableRegistry); } + @Override + public Map getAllProperties() { + return Collections.unmodifiableMap(propertyValues); + } + @Override public String getIdentifier() { return serviceId; diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/NotificationInitializationContext.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/NotificationInitializationContext.java index 88e0445334..e505d0b79a 100644 --- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/NotificationInitializationContext.java +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/NotificationInitializationContext.java @@ -17,19 +17,9 @@ package org.apache.nifi.bootstrap.notification; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.context.PropertyContext; -public interface NotificationInitializationContext { - - /** - * Returns the configured value for the given PropertyDescriptor - * - * @param descriptor the property to fetch the value for - * @return the configured value for the given PropertyDescriptor, or the default value for the PropertyDescriptor - * if no value has been configured. - */ - PropertyValue getProperty(PropertyDescriptor descriptor); +public interface NotificationInitializationContext extends PropertyContext { /** * @return the identifier for the NotificationService diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/NotificationValidationContext.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/NotificationValidationContext.java index 99d3b2344e..6d3ef537a6 100644 --- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/NotificationValidationContext.java +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/NotificationValidationContext.java @@ -17,6 +17,7 @@ package org.apache.nifi.bootstrap.notification; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -75,6 +76,15 @@ public class NotificationValidationContext implements ValidationContext { return context.getProperties(); } + @Override + public Map getAllProperties() { + final Map propValueMap = new LinkedHashMap<>(); + for (final Map.Entry entry : getProperties().entrySet()) { + propValueMap.put(entry.getKey().getName(), entry.getValue()); + } + return propValueMap; + } + @Override public String getAnnotationData() { throw new UnsupportedOperationException(); diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/components/state/StateProviderInitializationContext.java b/nifi-framework-api/src/main/java/org/apache/nifi/components/state/StateProviderInitializationContext.java index 418249016e..5b90a1c4ed 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/components/state/StateProviderInitializationContext.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/components/state/StateProviderInitializationContext.java @@ -23,13 +23,14 @@ import javax.net.ssl.SSLContext; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.context.PropertyContext; import org.apache.nifi.logging.ComponentLog; /** * This interface defines an initialization context that is passed to a {@link StateProvider} when it * is initialized. */ -public interface StateProviderInitializationContext { +public interface StateProviderInitializationContext extends PropertyContext { /** * @return the identifier if the StateProvider */ @@ -40,15 +41,6 @@ public interface StateProviderInitializationContext { */ Map getProperties(); - /** - * Returns the configured value for the given property - * - * @param property the property to retrieve the value for - * - * @return the configured value for the property. - */ - PropertyValue getProperty(PropertyDescriptor property); - /** * @return the SSL Context that should be used to communicate with remote resources, * or null if no SSLContext has been configured diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockConfigurationContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockConfigurationContext.java index 74b84ad330..91d805e2e8 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockConfigurationContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockConfigurationContext.java @@ -17,6 +17,7 @@ package org.apache.nifi.util; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -69,6 +70,15 @@ public class MockConfigurationContext implements ConfigurationContext { return new HashMap<>(this.properties); } + @Override + public Map getAllProperties() { + final Map propValueMap = new LinkedHashMap<>(); + for (final Map.Entry entry : getProperties().entrySet()) { + propValueMap.put(entry.getKey().getName(), entry.getValue()); + } + return propValueMap; + } + private PropertyDescriptor getActualDescriptor(final PropertyDescriptor property) { if (service == null) { return property; diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java index 8cbe1ac50f..8651241e0c 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java @@ -212,6 +212,15 @@ public class MockProcessContext extends MockControllerServiceLookup implements S } } + @Override + public Map getAllProperties() { + final Map propValueMap = new LinkedHashMap<>(); + for (final Map.Entry entry : getProperties().entrySet()) { + propValueMap.put(entry.getKey().getName(), entry.getValue()); + } + return propValueMap; + } + /** * Validates the current properties, returning ValidationResults for any * invalid properties. All processor defined properties will be validated. diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java index 26ad59010e..f65bc3e3a0 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java @@ -19,6 +19,7 @@ package org.apache.nifi.util; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -58,6 +59,15 @@ public class MockReportingContext extends MockControllerServiceLookup implements return Collections.unmodifiableMap(properties); } + @Override + public Map getAllProperties() { + final Map propValueMap = new LinkedHashMap<>(); + for (final Map.Entry entry : getProperties().entrySet()) { + propValueMap.put(entry.getKey().getName(), entry.getValue()); + } + return propValueMap; + } + @Override public PropertyValue getProperty(final PropertyDescriptor property) { final String configuredValue = properties.get(property); diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java index fbd2a368bc..564ec54a87 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java @@ -17,6 +17,7 @@ package org.apache.nifi.util; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -88,6 +89,15 @@ public class MockValidationContext implements ValidationContext, ControllerServi return context.getProperties(); } + @Override + public Map getAllProperties() { + final Map propValueMap = new LinkedHashMap<>(); + for (final Map.Entry entry : getProperties().entrySet()) { + propValueMap.put(entry.getKey().getName(), entry.getValue()); + } + return propValueMap; + } + @Override public String getAnnotationData() { return context.getAnnotationData(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java index 8f8b2314be..62183cc25d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java @@ -40,6 +40,7 @@ import org.apache.nifi.reporting.Severity; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; @@ -107,6 +108,15 @@ public class StandardReportingContext implements ReportingContext, ControllerSer return Collections.unmodifiableMap(properties); } + @Override + public Map getAllProperties() { + final Map propValueMap = new LinkedHashMap<>(); + for (final Map.Entry entry : getProperties().entrySet()) { + propValueMap.put(entry.getKey().getName(), entry.getValue()); + } + return propValueMap; + } + @Override public PropertyValue getProperty(final PropertyDescriptor property) { final String configuredValue = properties.get(property); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java index 0d755b0b46..3116401486 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java @@ -194,6 +194,11 @@ public class ConnectableProcessContext implements ProcessContext { return null; } + @Override + public Map getAllProperties() { + return new HashMap<>(); + } + @Override public Map getProperties() { return new HashMap<>(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java index 61db8191b5..c188d75adf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java @@ -17,6 +17,7 @@ package org.apache.nifi.controller.service; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -81,6 +82,15 @@ public class StandardConfigurationContext implements ConfigurationContext { return component.getProperties(); } + @Override + public Map getAllProperties() { + final Map propValueMap = new LinkedHashMap<>(); + for (final Map.Entry entry : getProperties().entrySet()) { + propValueMap.put(entry.getKey().getName(), entry.getValue()); + } + return propValueMap; + } + @Override public String getSchedulingPeriod() { return schedulingPeriod; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateProviderInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateProviderInitializationContext.java index d86a120576..ace92c7735 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateProviderInitializationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateProviderInitializationContext.java @@ -19,6 +19,7 @@ package org.apache.nifi.controller.state; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import javax.net.ssl.SSLContext; @@ -46,6 +47,15 @@ public class StandardStateProviderInitializationContext implements StateProvider return Collections.unmodifiableMap(properties); } + @Override + public Map getAllProperties() { + final Map propValueMap = new LinkedHashMap<>(); + for (final Map.Entry entry : getProperties().entrySet()) { + propValueMap.put(entry.getKey().getName(), entry.getValue().getValue()); + } + return propValueMap; + } + @Override public PropertyValue getProperty(final PropertyDescriptor property) { return properties.get(property); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java index d63ae00b91..b365de25c8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java @@ -38,6 +38,7 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateManager; import org.apache.nifi.components.state.StateManagerProvider; +import org.apache.nifi.components.state.StateMap; import org.apache.nifi.components.state.StateProvider; import org.apache.nifi.components.state.StateProviderInitializationContext; import org.apache.nifi.controller.state.ConfigParseException; @@ -48,6 +49,7 @@ import org.apache.nifi.controller.state.config.StateProviderConfiguration; import org.apache.nifi.framework.security.util.SslContextFactory; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.processor.SimpleProcessLogger; import org.apache.nifi.processor.StandardValidationContext; import org.apache.nifi.registry.VariableRegistry; @@ -232,7 +234,7 @@ public class StandardStateManagerProvider implements StateManagerProvider{ Thread.currentThread().setContextClassLoader(detectedClassLoaderForType); final Class mgrClass = rawClass.asSubclass(StateProvider.class); - return mgrClass.newInstance(); + return withNarClassLoader(mgrClass.newInstance()); } finally { if (ctxClassLoader != null) { Thread.currentThread().setContextClassLoader(ctxClassLoader); @@ -240,6 +242,129 @@ public class StandardStateManagerProvider implements StateManagerProvider{ } } + /** + * Wrap the provider so that all method calls set the context class loader to the NAR's class loader before + * executing the actual provider. + * + * @param stateProvider the base provider to wrap + * @return the wrapped provider + */ + private static StateProvider withNarClassLoader(final StateProvider stateProvider) { + return new StateProvider() { + @Override + public void initialize(StateProviderInitializationContext context) throws IOException { + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + stateProvider.initialize(context); + } + } + + @Override + public void shutdown() { + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + stateProvider.shutdown(); + } + } + + @Override + public void setState(Map state, String componentId) throws IOException { + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + stateProvider.setState(state, componentId); + } + } + + @Override + public StateMap getState(String componentId) throws IOException { + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + return stateProvider.getState(componentId); + } + } + + @Override + public boolean replace(StateMap oldValue, Map newValue, String componentId) throws IOException { + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + return stateProvider.replace(oldValue, newValue, componentId); + } + } + + @Override + public void clear(String componentId) throws IOException { + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + stateProvider.clear(componentId); + } + } + + @Override + public void onComponentRemoved(String componentId) throws IOException { + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + stateProvider.onComponentRemoved(componentId); + } + } + + @Override + public void enable() { + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + stateProvider.enable(); + } + } + + @Override + public void disable() { + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + stateProvider.disable(); + } + } + + @Override + public boolean isEnabled() { + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + return stateProvider.isEnabled(); + } + } + + @Override + public Scope[] getSupportedScopes() { + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + return stateProvider.getSupportedScopes(); + } + } + + @Override + public Collection validate(ValidationContext context) { + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + return stateProvider.validate(context); + } + } + + @Override + public PropertyDescriptor getPropertyDescriptor(String name) { + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + return stateProvider.getPropertyDescriptor(name); + } + } + + @Override + public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + stateProvider.onPropertyModified(descriptor, oldValue, newValue); + } + } + + @Override + public List getPropertyDescriptors() { + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + return stateProvider.getPropertyDescriptors(); + } + } + + @Override + public String getIdentifier() { + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + return stateProvider.getIdentifier(); + } + } + }; + } + /** * Returns the State Manager that has been created for the given component ID, or null if none exists * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java index 83906e2851..2714392691 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java @@ -19,6 +19,7 @@ package org.apache.nifi.processor; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -125,6 +126,15 @@ public class StandardProcessContext implements ProcessContext, ControllerService return procNode.getProperties(); } + @Override + public Map getAllProperties() { + final Map propValueMap = new LinkedHashMap<>(); + for (final Map.Entry entry : getProperties().entrySet()) { + propValueMap.put(entry.getKey().getName(), entry.getValue()); + } + return propValueMap; + } + @Override public String encrypt(final String unencrypted) { return encryptor.encrypt(unencrypted); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java index 86518b8036..1f5cfeed77 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processor; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -98,6 +99,15 @@ public class StandardSchedulingContext implements SchedulingContext { return processContext.getProperties(); } + @Override + public Map getAllProperties() { + final Map propValueMap = new LinkedHashMap<>(); + for (final Map.Entry entry : getProperties().entrySet()) { + propValueMap.put(entry.getKey().getName(), entry.getValue()); + } + return propValueMap; + } + @Override public String encrypt(final String unencrypted) { return processContext.encrypt(unencrypted); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java index dfc7965aad..662169c570 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java @@ -19,6 +19,7 @@ package org.apache.nifi.processor; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -118,6 +119,15 @@ public class StandardValidationContext implements ValidationContext { return Collections.unmodifiableMap(properties); } + @Override + public Map getAllProperties() { + final Map propValueMap = new LinkedHashMap<>(); + for (final Map.Entry entry : getProperties().entrySet()) { + propValueMap.put(entry.getKey().getName(), entry.getValue()); + } + return propValueMap; + } + @Override public String getAnnotationData() { return annotationData; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java index c248d25788..32b6f539cf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java @@ -62,6 +62,7 @@ import java.net.URL; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.Map; import java.util.Set; @@ -512,6 +513,15 @@ public class TestStandardProcessorNode { return Collections.unmodifiableMap(properties); } + @Override + public Map getAllProperties() { + final Map propValueMap = new LinkedHashMap<>(); + for (final Map.Entry entry : getProperties().entrySet()) { + propValueMap.put(entry.getKey().getName(), entry.getValue()); + } + return propValueMap; + } + @Override public String getAnnotationData() { return null; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java index d2e4a0519f..41b7d9d69d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java @@ -20,6 +20,7 @@ package org.apache.nifi.controller.state.providers.local; import java.io.IOException; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; @@ -61,6 +62,15 @@ public class TestWriteAheadLocalStateProvider extends AbstractTestStateProvider return Collections.unmodifiableMap(properties); } + @Override + public Map getAllProperties() { + final Map propValueMap = new LinkedHashMap<>(); + for (final Map.Entry entry : getProperties().entrySet()) { + propValueMap.put(entry.getKey().getName(), entry.getValue().getValue()); + } + return propValueMap; + } + @Override public PropertyValue getProperty(final PropertyDescriptor property) { final PropertyValue prop = properties.get(property); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java index d09ee1f4e7..037b35026d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java @@ -18,6 +18,7 @@ package org.apache.nifi.controller.state.providers.zookeeper; import java.io.IOException; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import javax.net.ssl.SSLContext; @@ -75,6 +76,15 @@ public class TestZooKeeperStateProvider extends AbstractTestStateProvider { return propValueMap; } + @Override + public Map getAllProperties() { + final Map propValueMap = new LinkedHashMap<>(); + for (final Map.Entry entry : getProperties().entrySet()) { + propValueMap.put(entry.getKey().getName(), entry.getValue().getValue()); + } + return propValueMap; + } + @Override public PropertyValue getProperty(final PropertyDescriptor property) { final String prop = properties.get(property); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockConfigurationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockConfigurationContext.java index d1e73fb7e2..d9a1b37888 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockConfigurationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockConfigurationContext.java @@ -31,6 +31,11 @@ public class MockConfigurationContext implements ConfigurationContext { return null; } + @Override + public Map getAllProperties() { + return Collections.emptyMap(); + } + @Override public Map getProperties() { return Collections.emptyMap(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockProcessContext.java index cf2e2cf43c..cb173247fe 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockProcessContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockProcessContext.java @@ -64,6 +64,11 @@ public class MockProcessContext implements ProcessContext { return Collections.emptyMap(); } + @Override + public Map getAllProperties() { + return Collections.emptyMap(); + } + @Override public String encrypt(String unencrypted) { return unencrypted; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml index d7631c2261..dcd7ee6c81 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml @@ -63,4 +63,72 @@ 10 seconds Open + + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml new file mode 100644 index 0000000000..e59220df2f --- /dev/null +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml @@ -0,0 +1,89 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-redis-bundle + 1.4.0-SNAPSHOT + + + nifi-redis-extensions + jar + + + + + org.apache.nifi + nifi-distributed-cache-client-service-api + provided + + + org.apache.nifi + nifi-redis-service-api + 1.4.0-SNAPSHOT + provided + + + org.springframework.data + spring-data-redis + ${spring.data.redis.version} + provided + + + redis.clients + jedis + 2.9.0 + provided + + + + com.fasterxml.jackson.core + jackson-databind + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-utils + + + + org.apache.nifi + nifi-mock + test + + + org.slf4j + slf4j-simple + test + + + junit + junit + test + + + com.github.kstyrc + embedded-redis + 0.6 + test + + + diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisConnectionPoolService.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisConnectionPoolService.java new file mode 100644 index 0000000000..68169f9c39 --- /dev/null +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisConnectionPoolService.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.redis.service; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.redis.RedisConnectionPool; +import org.apache.nifi.redis.RedisType; +import org.apache.nifi.redis.util.RedisUtils; +import org.springframework.data.redis.connection.RedisConnection; +import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; + +import java.util.Collection; +import java.util.List; + +@Tags({"redis", "cache"}) +@CapabilityDescription("A service that provides connections to Redis.") +public class RedisConnectionPoolService extends AbstractControllerService implements RedisConnectionPool { + + private volatile PropertyContext context; + private volatile RedisType redisType; + private volatile JedisConnectionFactory connectionFactory; + + @Override + protected List getSupportedPropertyDescriptors() { + return RedisUtils.REDIS_CONNECTION_PROPERTY_DESCRIPTORS; + } + + @Override + protected Collection customValidate(ValidationContext validationContext) { + return RedisUtils.validate(validationContext); + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + this.context = context; + + final String redisMode = context.getProperty(RedisUtils.REDIS_MODE).getValue(); + this.redisType = RedisType.fromDisplayName(redisMode); + } + + @OnDisabled + public void onDisabled() { + if (connectionFactory != null) { + connectionFactory.destroy(); + connectionFactory = null; + redisType = null; + context = null; + } + } + + @Override + public RedisType getRedisType() { + return redisType; + } + + @Override + public RedisConnection getConnection() { + if (connectionFactory == null) { + synchronized (this) { + if (connectionFactory == null) { + connectionFactory = RedisUtils.createConnectionFactory(context, getLogger()); + } + } + } + + return connectionFactory.getConnection(); + } + + +} diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java new file mode 100644 index 0000000000..94b195c213 --- /dev/null +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.redis.service; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.distributed.cache.client.AtomicCacheEntry; +import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.redis.RedisConnectionPool; +import org.apache.nifi.redis.RedisType; +import org.apache.nifi.redis.util.RedisAction; +import org.apache.nifi.util.Tuple; +import org.springframework.data.redis.connection.RedisConnection; +import org.springframework.data.redis.core.Cursor; +import org.springframework.data.redis.core.ScanOptions; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +@Tags({ "redis", "distributed", "cache", "map" }) +@CapabilityDescription("An implementation of DistributedMapCacheClient that uses Redis as the backing cache. This service relies on " + + "the WATCH, MULTI, and EXEC commands in Redis, which are not fully supported when Redis is clustered. As a result, this service " + + "can only be used with a Redis Connection Pool that is configured for standalone or sentinel mode. Sentinel mode can be used to " + + "provide high-availability configurations.") +public class RedisDistributedMapCacheClientService extends AbstractControllerService implements AtomicDistributedMapCacheClient { + + public static final PropertyDescriptor REDIS_CONNECTION_POOL = new PropertyDescriptor.Builder() + .name("redis-connection-pool") + .displayName("Redis Connection Pool") + .identifiesControllerService(RedisConnectionPool.class) + .required(true) + .build(); + + static final List PROPERTY_DESCRIPTORS; + static { + final List props = new ArrayList<>(); + props.add(REDIS_CONNECTION_POOL); + PROPERTY_DESCRIPTORS = Collections.unmodifiableList(props); + } + + private volatile RedisConnectionPool redisConnectionPool; + + @Override + protected List getSupportedPropertyDescriptors() { + return PROPERTY_DESCRIPTORS; + } + + @Override + protected Collection customValidate(ValidationContext validationContext) { + final List results = new ArrayList<>(); + + final RedisConnectionPool redisConnectionPool = validationContext.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class); + if (redisConnectionPool != null) { + final RedisType redisType = redisConnectionPool.getRedisType(); + if (redisType != null && redisType == RedisType.CLUSTER) { + results.add(new ValidationResult.Builder() + .subject(REDIS_CONNECTION_POOL.getDisplayName()) + .valid(false) + .explanation(REDIS_CONNECTION_POOL.getDisplayName() + + " is configured in clustered mode, and this service requires a non-clustered Redis") + .build()); + } + } + + return results; + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + this.redisConnectionPool = context.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class); + } + + @OnDisabled + public void onDisabled() { + this.redisConnectionPool = null; + } + + @Override + public boolean putIfAbsent(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { + return withConnection(redisConnection -> { + final Tuple kv = serialize(key, value, keySerializer, valueSerializer); + return redisConnection.setNX(kv.getKey(), kv.getValue()); + }); + } + + @Override + public V getAndPutIfAbsent(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer, final Deserializer valueDeserializer) throws IOException { + return withConnection(redisConnection -> { + final Tuple kv = serialize(key, value, keySerializer, valueSerializer); + do { + // start a watch on the key and retrieve the current value + redisConnection.watch(kv.getKey()); + final byte[] existingValue = redisConnection.get(kv.getKey()); + + // start a transaction and perform the put-if-absent + redisConnection.multi(); + redisConnection.setNX(kv.getKey(), kv.getValue()); + + // execute the transaction + final List results = redisConnection.exec(); + + // if the results list was empty, then the transaction failed (i.e. key was modified after we started watching), so keep looping to retry + // if the results list has results, then the transaction succeeded and it should have the result of the setNX operation + if (results.size() > 0) { + final Object firstResult = results.get(0); + if (firstResult instanceof Boolean) { + final Boolean absent = (Boolean) firstResult; + return absent ? null : valueDeserializer.deserialize(existingValue); + } else { + // this shouldn't really happen, but just in case there is a non-boolean result then bounce out of the loop + throw new IOException("Unexpected result from Redis transaction: Expected Boolean result, but got " + + firstResult.getClass().getName() + " with value " + firstResult.toString()); + } + } + } while (isEnabled()); + + return null; + }); + } + + + @Override + public boolean containsKey(final K key, final Serializer keySerializer) throws IOException { + return withConnection(redisConnection -> { + final byte[] k = serialize(key, keySerializer); + return redisConnection.exists(k); + }); + } + + @Override + public void put(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { + withConnection(redisConnection -> { + final Tuple kv = serialize(key, value, keySerializer, valueSerializer); + redisConnection.set(kv.getKey(), kv.getValue()); + return null; + }); + } + + @Override + public V get(final K key, final Serializer keySerializer, final Deserializer valueDeserializer) throws IOException { + return withConnection(redisConnection -> { + final byte[] k = serialize(key, keySerializer); + final byte[] v = redisConnection.get(k); + return valueDeserializer.deserialize(v); + }); + } + + @Override + public void close() throws IOException { + // nothing to do + } + + @Override + public boolean remove(final K key, final Serializer keySerializer) throws IOException { + return withConnection(redisConnection -> { + final byte[] k = serialize(key, keySerializer); + final long numRemoved = redisConnection.del(k); + return numRemoved > 0; + }); + } + + @Override + public long removeByPattern(final String regex) throws IOException { + return withConnection(redisConnection -> { + long deletedCount = 0; + final List batchKeys = new ArrayList<>(); + + // delete keys in batches of 1000 using the cursor + final Cursor cursor = redisConnection.scan(ScanOptions.scanOptions().count(100).match(regex).build()); + while (cursor.hasNext()) { + batchKeys.add(cursor.next()); + + if (batchKeys.size() == 1000) { + deletedCount += redisConnection.del(getKeys(batchKeys)); + batchKeys.clear(); + } + } + + // delete any left-over keys if some were added to the batch but never reached 1000 + if (batchKeys.size() > 0) { + deletedCount += redisConnection.del(getKeys(batchKeys)); + batchKeys.clear(); + } + + return deletedCount; + }); + } + + /** + * Convert the list of all keys to an array. + */ + private byte[][] getKeys(final List keys) { + final byte[][] allKeysArray = new byte[keys.size()][]; + for (int i=0; i < keys.size(); i++) { + allKeysArray[i] = keys.get(i); + } + return allKeysArray; + } + + // ----------------- Methods from AtomicDistributedMapCacheClient ------------------------ + + @Override + public AtomicCacheEntry fetch(final K key, final Serializer keySerializer, final Deserializer valueDeserializer) throws IOException { + return withConnection(redisConnection -> { + final byte[] k = serialize(key, keySerializer); + + final byte[] v = redisConnection.get(k); + if (v == null) { + return null; + } + + // for Redis we are going to use the raw bytes of the value as the revision + return new AtomicCacheEntry<>(key, valueDeserializer.deserialize(v), v); + }); + } + + @Override + public boolean replace(final AtomicCacheEntry entry, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { + return withConnection(redisConnection -> { + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + + keySerializer.serialize(entry.getKey(), out); + final byte[] k = out.toByteArray(); + out.reset(); + + valueSerializer.serialize(entry.getValue(), out); + final byte[] newVal = out.toByteArray(); + + // the revision of the cache entry holds the value of the key from a previous fetch + final byte[] prevVal = entry.getRevision().orElse(null); + + boolean replaced = false; + + // start a watch on the key and retrieve the current value + redisConnection.watch(k); + final byte[] currValue = redisConnection.get(k); + + // start a transaction + redisConnection.multi(); + + // compare-and-set + if (Arrays.equals(prevVal, currValue)) { + // if we use set(k, newVal) then the results list will always have size == 0 b/c when convertPipelineAndTxResults is set to true, + // status responses like "OK" are skipped over, so by using getSet we can rely on the results list to know if the transaction succeeded + redisConnection.getSet(k, newVal); + } + + // execute the transaction + final List results = redisConnection.exec(); + + // if we have a result then the replace succeeded + if (results.size() > 0) { + replaced = true; + } + + return replaced; + }); + } + + // ----------------- END Methods from AtomicDistributedMapCacheClient ------------------------ + + private Tuple serialize(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + + keySerializer.serialize(key, out); + final byte[] k = out.toByteArray(); + + out.reset(); + + valueSerializer.serialize(value, out); + final byte[] v = out.toByteArray(); + + return new Tuple<>(k, v); + } + + private byte[] serialize(final K key, final Serializer keySerializer) throws IOException { + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + + keySerializer.serialize(key, out); + return out.toByteArray(); + } + + private T withConnection(final RedisAction action) throws IOException { + RedisConnection redisConnection = null; + try { + redisConnection = redisConnectionPool.getConnection(); + return action.execute(redisConnection); + } finally { + if (redisConnection != null) { + try { + redisConnection.close(); + } catch (Exception e) { + getLogger().warn("Error closing connection: " + e.getMessage(), e); + } + } + } + } + +} diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateMap.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateMap.java new file mode 100644 index 0000000000..068b7a603e --- /dev/null +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateMap.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.redis.state; + +import org.apache.nifi.components.state.StateMap; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; + +/** + * A StateMap implementation for RedisStateProvider. + */ +public class RedisStateMap implements StateMap { + + public static final Long DEFAULT_VERSION = new Long(-1); + public static final Integer DEFAULT_ENCODING = new Integer(1); + + private final Long version; + private final Integer encodingVersion; + private final Map stateValues; + + private RedisStateMap(final Builder builder) { + this.version = builder.version == null ? DEFAULT_VERSION : builder.version; + this.encodingVersion = builder.encodingVersion == null ? DEFAULT_ENCODING : builder.encodingVersion; + this.stateValues = Collections.unmodifiableMap(new TreeMap<>(builder.stateValues)); + Objects.requireNonNull(version, "Version must be non-null"); + Objects.requireNonNull(encodingVersion, "Encoding Version must be non-null"); + Objects.requireNonNull(stateValues, "State Values must be non-null"); + } + + @Override + public long getVersion() { + return version; + } + + @Override + public String get(String key) { + return stateValues.get(key); + } + + @Override + public Map toMap() { + return stateValues; + } + + public Integer getEncodingVersion() { + return encodingVersion; + } + + public static class Builder { + + private Long version; + private Integer encodingVersion; + private Map stateValues = new TreeMap<>(); + + public Builder version(final Long version) { + this.version = version; + return this; + } + + public Builder encodingVersion(final Integer encodingVersion) { + this.encodingVersion = encodingVersion; + return this; + } + + public Builder stateValue(final String name, String value) { + stateValues.put(name, value); + return this; + } + + public Builder stateValues(final Map stateValues) { + this.stateValues.clear(); + if (stateValues != null) { + this.stateValues.putAll(stateValues); + } + return this; + } + + public RedisStateMap build() { + return new RedisStateMap(this); + } + } + +} diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateMapJsonSerDe.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateMapJsonSerDe.java new file mode 100644 index 0000000000..170e7ac0a4 --- /dev/null +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateMapJsonSerDe.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.redis.state; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Map; + +/** + * A RedisStateMapSerDe that uses JSON as the underlying representation. + */ +public class RedisStateMapJsonSerDe implements RedisStateMapSerDe { + + public static final String FIELD_VERSION = "version"; + public static final String FIELD_ENCODING = "encodingVersion"; + public static final String FIELD_STATE_VALUES = "stateValues"; + + private final JsonFactory jsonFactory = new JsonFactory(new ObjectMapper()); + + @Override + public byte[] serialize(final RedisStateMap stateMap) throws IOException { + if (stateMap == null) { + return null; + } + + try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) { + final JsonGenerator jsonGenerator = jsonFactory.createGenerator(out); + jsonGenerator.writeStartObject(); + jsonGenerator.writeNumberField(FIELD_VERSION, stateMap.getVersion()); + jsonGenerator.writeNumberField(FIELD_ENCODING, stateMap.getEncodingVersion()); + + jsonGenerator.writeObjectFieldStart(FIELD_STATE_VALUES); + for (Map.Entry entry : stateMap.toMap().entrySet()) { + jsonGenerator.writeStringField(entry.getKey(), entry.getValue()); + } + jsonGenerator.writeEndObject(); + + jsonGenerator.writeEndObject(); + jsonGenerator.flush(); + + return out.toByteArray(); + } + } + + @Override + public RedisStateMap deserialize(final byte[] data) throws IOException { + if (data == null || data.length == 0) { + return null; + } + + final RedisStateMap.Builder builder = new RedisStateMap.Builder(); + + try (final JsonParser jsonParser = jsonFactory.createParser(data)) { + final JsonNode rootNode = jsonParser.readValueAsTree(); + builder.version(rootNode.get(FIELD_VERSION).asLong()); + builder.encodingVersion(rootNode.get(FIELD_ENCODING).asInt()); + + final JsonNode stateValuesNode = rootNode.get(FIELD_STATE_VALUES); + stateValuesNode.fields().forEachRemaining(e -> builder.stateValue(e.getKey(), e.getValue().asText())); + } + + return builder.build(); + } + +} diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateMapSerDe.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateMapSerDe.java new file mode 100644 index 0000000000..ce3067dc18 --- /dev/null +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateMapSerDe.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.redis.state; + +import java.io.IOException; + +/** + * Provides serialization/deserialization of a RedisStateMap. + */ +public interface RedisStateMapSerDe { + + /** + * Serializes the given RedisStateMap. + * + * @param stateMap the RedisStateMap to serialize + * @return the serialized bytes or null if stateMap is null + * @throws IOException if an error occurs when serializing + */ + byte[] serialize(RedisStateMap stateMap) throws IOException; + + /** + * Deserializes the given bytes to a RedisStateMap. + * + * @param data bytes previously stored by RedisStateProvider + * @return a RedisStateMap or null if data is null or length 0 + * @throws IOException if an error occurs when deserializing + */ + RedisStateMap deserialize(byte[] data) throws IOException; + +} diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java new file mode 100644 index 0000000000..c23bb81f14 --- /dev/null +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.redis.state; + +import org.apache.nifi.components.AbstractConfigurableComponent; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.components.state.StateProvider; +import org.apache.nifi.components.state.StateProviderInitializationContext; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.redis.RedisType; +import org.apache.nifi.redis.util.RedisAction; +import org.apache.nifi.redis.util.RedisUtils; +import org.springframework.data.redis.connection.RedisConnection; +import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * A StateProvider backed by Redis. + */ +public class RedisStateProvider extends AbstractConfigurableComponent implements StateProvider { + + static final int ENCODING_VERSION = 1; + + public static final PropertyDescriptor KEY_PREFIX = new PropertyDescriptor.Builder() + .name("Key Prefix") + .displayName("Key Prefix") + .description("The prefix for each key stored by this state provider. When sharing a single Redis across multiple NiFi instances, " + + "setting a unique value for the Key Prefix will make it easier to identify which instances the keys came from.") + .required(true) + .defaultValue("nifi/components/") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final List STATE_PROVIDER_PROPERTIES; + static { + final List props = new ArrayList<>(RedisUtils.REDIS_CONNECTION_PROPERTY_DESCRIPTORS); + props.add(KEY_PREFIX); + STATE_PROVIDER_PROPERTIES = Collections.unmodifiableList(props); + } + + private String identifier; + private String keyPrefix; + private ComponentLog logger; + private PropertyContext context; + + private volatile boolean enabled; + private volatile JedisConnectionFactory connectionFactory; + + private final RedisStateMapSerDe serDe = new RedisStateMapJsonSerDe(); + + @Override + public final void initialize(final StateProviderInitializationContext context) throws IOException { + this.context = context; + this.identifier = context.getIdentifier(); + this.logger = context.getLogger(); + + String keyPrefix = context.getProperty(KEY_PREFIX).getValue(); + if (!keyPrefix.endsWith("/")) { + keyPrefix = keyPrefix + "/"; + } + this.keyPrefix = keyPrefix; + } + + @Override + protected List getSupportedPropertyDescriptors() { + return STATE_PROVIDER_PROPERTIES; + } + + @Override + protected Collection customValidate(ValidationContext validationContext) { + final List results = new ArrayList<>(RedisUtils.validate(validationContext)); + + final RedisType redisType = RedisType.fromDisplayName(validationContext.getProperty(RedisUtils.REDIS_MODE).getValue()); + if (redisType != null && redisType == RedisType.CLUSTER) { + results.add(new ValidationResult.Builder() + .subject(RedisUtils.REDIS_MODE.getDisplayName()) + .valid(false) + .explanation(RedisUtils.REDIS_MODE.getDisplayName() + + " is configured in clustered mode, and this service requires a non-clustered Redis") + .build()); + } + + return results; + } + + @Override + public String getIdentifier() { + return identifier; + } + + @Override + public void enable() { + enabled = true; + } + + @Override + public void disable() { + enabled = false; + } + + @Override + public boolean isEnabled() { + return enabled; + } + + @Override + public void shutdown() { + if (connectionFactory != null) { + connectionFactory.destroy(); + connectionFactory = null; + } + } + + @Override + public void setState(final Map state, final String componentId) throws IOException { + verifyEnabled(); + + final StateMap currStateMap = getState(componentId); + + int attempted = 0; + boolean updated = false; + + while (!updated && attempted < 20) { + updated = replace(currStateMap, state, componentId, true); + attempted++; + } + + if (!updated) { + throw new IOException("Unable to update state due to concurrent modifications"); + } + } + + @Override + public StateMap getState(final String componentId) throws IOException { + return withConnection(redisConnection -> { + final byte[] key = getComponentKey(componentId).getBytes(StandardCharsets.UTF_8); + final byte[] value = redisConnection.get(key); + + final RedisStateMap stateMap = serDe.deserialize(value); + if (stateMap == null) { + return new RedisStateMap.Builder().encodingVersion(ENCODING_VERSION).build(); + } else { + return stateMap; + } + }); + } + + @Override + public boolean replace(final StateMap oldValue, final Map newValue, final String componentId) throws IOException { + return replace(oldValue, newValue, componentId, false); + } + + private boolean replace(final StateMap oldValue, final Map newValue, final String componentId, final boolean allowReplaceMissing) throws IOException { + return withConnection(redisConnection -> { + + boolean replaced = false; + + // start a watch on the key and retrieve the current value + final byte[] key = getComponentKey(componentId).getBytes(StandardCharsets.UTF_8); + redisConnection.watch(key); + + final long prevVersion = oldValue == null ? -1L : oldValue.getVersion(); + + final byte[] currValue = redisConnection.get(key); + final RedisStateMap currStateMap = serDe.deserialize(currValue); + final long currVersion = currStateMap == null ? -1L : currStateMap.getVersion(); + + // the replace API expects that you can't call replace on a non-existing value, so unwatch and return + if (!allowReplaceMissing && currVersion == -1) { + redisConnection.unwatch(); + return false; + } + + // start a transaction + redisConnection.multi(); + + // compare-and-set + if (prevVersion == currVersion) { + // build the new RedisStateMap incrementing the version, using latest encoding, and using the passed in values + final RedisStateMap newStateMap = new RedisStateMap.Builder() + .version(currVersion + 1) + .encodingVersion(ENCODING_VERSION) + .stateValues(newValue) + .build(); + + // if we use set(k, newVal) then the results list will always have size == 0 b/c when convertPipelineAndTxResults is set to true, + // status responses like "OK" are skipped over, so by using getSet we can rely on the results list to know if the transaction succeeded + redisConnection.getSet(key, serDe.serialize(newStateMap)); + } + + // execute the transaction + final List results = redisConnection.exec(); + + // if we have a result then the replace succeeded + if (results.size() > 0) { + replaced = true; + } + + return replaced; + }); + } + + @Override + public void clear(final String componentId) throws IOException { + int attempted = 0; + boolean updated = false; + + while (!updated && attempted < 20) { + final StateMap currStateMap = getState(componentId); + updated = replace(currStateMap, Collections.emptyMap(), componentId, true); + + final String result = updated ? "successful" : "unsuccessful"; + logger.debug("Attempt # {} to clear state for component {} was {}", new Object[] { attempted + 1, componentId, result}); + + attempted++; + } + + if (!updated) { + throw new IOException("Unable to update state due to concurrent modifications"); + } + } + + @Override + public void onComponentRemoved(final String componentId) throws IOException { + withConnection(redisConnection -> { + final byte[] key = getComponentKey(componentId).getBytes(StandardCharsets.UTF_8); + redisConnection.del(key); + return true; + }); + } + + @Override + public Scope[] getSupportedScopes() { + return new Scope[] {Scope.CLUSTER}; + } + + private String getComponentKey(final String componentId) { + return keyPrefix + componentId; + } + + private void verifyEnabled() throws IOException { + if (!isEnabled()) { + throw new IOException("Cannot update or retrieve cluster state because node is no longer connected to a cluster."); + } + } + + // visible for testing + synchronized RedisConnection getRedis() { + if (connectionFactory == null) { + connectionFactory = RedisUtils.createConnectionFactory(context, logger); + } + + return connectionFactory.getConnection(); + } + + private T withConnection(final RedisAction action) throws IOException { + RedisConnection redisConnection = null; + try { + redisConnection = getRedis(); + return action.execute(redisConnection); + } finally { + if (redisConnection != null) { + try { + redisConnection.close(); + } catch (Exception e) { + logger.warn("Error closing connection: " + e.getMessage(), e); + } + } + } + } + +} diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisAction.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisAction.java new file mode 100644 index 0000000000..ada0c6e181 --- /dev/null +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisAction.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.redis.util; + +import org.springframework.data.redis.connection.RedisConnection; + +import java.io.IOException; + +/** + * An action to be executed with a RedisConnection. + */ +public interface RedisAction { + + T execute(RedisConnection redisConnection) throws IOException; + +} diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java new file mode 100644 index 0000000000..229c438595 --- /dev/null +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.redis.util; + +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.redis.RedisType; +import org.apache.nifi.util.StringUtils; +import org.springframework.data.redis.connection.RedisClusterConfiguration; +import org.springframework.data.redis.connection.RedisSentinelConfiguration; +import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; +import redis.clients.jedis.JedisPoolConfig; +import redis.clients.jedis.JedisShardInfo; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class RedisUtils { + + // These properties are shared between the connection pool controller service and the state provider, the name + // is purposely set to be more human-readable since that will be referenced in state-management.xml + + public static final AllowableValue REDIS_MODE_STANDALONE = new AllowableValue(RedisType.STANDALONE.getDisplayName(), RedisType.STANDALONE.getDisplayName(), RedisType.STANDALONE.getDescription()); + public static final AllowableValue REDIS_MODE_SENTINEL = new AllowableValue(RedisType.SENTINEL.getDisplayName(), RedisType.SENTINEL.getDisplayName(), RedisType.SENTINEL.getDescription()); + public static final AllowableValue REDIS_MODE_CLUSTER = new AllowableValue(RedisType.CLUSTER.getDisplayName(), RedisType.CLUSTER.getDisplayName(), RedisType.CLUSTER.getDescription()); + + public static final PropertyDescriptor REDIS_MODE = new PropertyDescriptor.Builder() + .name("Redis Mode") + .displayName("Redis Mode") + .description("The type of Redis being communicated with - standalone, sentinel, or clustered.") + .allowableValues(REDIS_MODE_STANDALONE, REDIS_MODE_SENTINEL, REDIS_MODE_CLUSTER) + .defaultValue(REDIS_MODE_STANDALONE.getValue()) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .build(); + + public static final PropertyDescriptor CONNECTION_STRING = new PropertyDescriptor.Builder() + .name("Connection String") + .displayName("Connection String") + .description("The connection string for Redis. In a standalone instance this value will be of the form hostname:port. " + + "In a sentinel instance this value will be the comma-separated list of sentinels, such as host1:port1,host2:port2,host3:port3. " + + "In a clustered instance this value will be the comma-separated list of cluster masters, such as host1:port,host2:port,host3:port.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor DATABASE = new PropertyDescriptor.Builder() + .name("Database Index") + .displayName("Database Index") + .description("The database index to be used by connections created from this connection pool. " + + "See the databases property in redis.conf, by default databases 0-15 will be available.") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .defaultValue("0") + .expressionLanguageSupported(true) + .required(true) + .build(); + + public static final PropertyDescriptor COMMUNICATION_TIMEOUT = new PropertyDescriptor.Builder() + .name("Communication Timeout") + .displayName("Communication Timeout") + .description("The timeout to use when attempting to communicate with Redis.") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("10 seconds") + .required(true) + .build(); + + public static final PropertyDescriptor CLUSTER_MAX_REDIRECTS = new PropertyDescriptor.Builder() + .name("Cluster Max Redirects") + .displayName("Cluster Max Redirects") + .description("The maximum number of redirects that can be performed when clustered.") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .defaultValue("5") + .required(true) + .build(); + + public static final PropertyDescriptor SENTINEL_MASTER = new PropertyDescriptor.Builder() + .name("Sentinel Master") + .displayName("Sentinel Master") + .description("The name of the sentinel master, require when Mode is set to Sentinel") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() + .name("Password") + .displayName("Password") + .description("The password used to authenticate to the Redis server. See the requirepass property in redis.conf.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(true) + .sensitive(true) + .build(); + + public static final PropertyDescriptor POOL_MAX_TOTAL = new PropertyDescriptor.Builder() + .name("Pool - Max Total") + .displayName("Pool - Max Total") + .description("The maximum number of connections that can be allocated by the pool (checked out to clients, or idle awaiting checkout). " + + "A negative value indicates that there is no limit.") + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .defaultValue("8") + .required(true) + .build(); + + public static final PropertyDescriptor POOL_MAX_IDLE = new PropertyDescriptor.Builder() + .name("Pool - Max Idle") + .displayName("Pool - Max Idle") + .description("The maximum number of idle connections that can be held in the pool, or a negative value if there is no limit.") + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .defaultValue("8") + .required(true) + .build(); + + public static final PropertyDescriptor POOL_MIN_IDLE = new PropertyDescriptor.Builder() + .name("Pool - Min Idle") + .displayName("Pool - Min Idle") + .description("The target for the minimum number of idle connections to maintain in the pool. If the configured value of Min Idle is " + + "greater than the configured value for Max Idle, then the value of Max Idle will be used instead.") + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .defaultValue("0") + .required(true) + .build(); + + public static final PropertyDescriptor POOL_BLOCK_WHEN_EXHAUSTED = new PropertyDescriptor.Builder() + .name("Pool - Block When Exhausted") + .displayName("Pool - Block When Exhausted") + .description("Whether or not clients should block and wait when trying to obtain a connection from the pool when the pool has no available connections. " + + "Setting this to false means an error will occur immediately when a client requests a connection and none are available.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .allowableValues("true", "false") + .defaultValue("true") + .required(true) + .build(); + + public static final PropertyDescriptor POOL_MAX_WAIT_TIME = new PropertyDescriptor.Builder() + .name("Pool - Max Wait Time") + .displayName("Pool - Max Wait Time") + .description("The amount of time to wait for an available connection when Block When Exhausted is set to true.") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("10 seconds") + .required(true) + .build(); + + public static final PropertyDescriptor POOL_MIN_EVICTABLE_IDLE_TIME = new PropertyDescriptor.Builder() + .name("Pool - Min Evictable Idle Time") + .displayName("Pool - Min Evictable Idle Time") + .description("The minimum amount of time an object may sit idle in the pool before it is eligible for eviction.") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("60 seconds") + .required(true) + .build(); + + public static final PropertyDescriptor POOL_TIME_BETWEEN_EVICTION_RUNS = new PropertyDescriptor.Builder() + .name("Pool - Time Between Eviction Runs") + .displayName("Pool - Time Between Eviction Runs") + .description("The amount of time between attempting to evict idle connections from the pool.") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("30 seconds") + .required(true) + .build(); + + public static final PropertyDescriptor POOL_NUM_TESTS_PER_EVICTION_RUN = new PropertyDescriptor.Builder() + .name("Pool - Num Tests Per Eviction Run") + .displayName("Pool - Num Tests Per Eviction Run") + .description("The number of connections to tests per eviction attempt. A negative value indicates to test all connections.") + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .defaultValue("-1") + .required(true) + .build(); + + public static final PropertyDescriptor POOL_TEST_ON_CREATE = new PropertyDescriptor.Builder() + .name("Pool - Test On Create") + .displayName("Pool - Test On Create") + .description("Whether or not connections should be tested upon creation.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .allowableValues("true", "false") + .defaultValue("false") + .required(true) + .build(); + + public static final PropertyDescriptor POOL_TEST_ON_BORROW = new PropertyDescriptor.Builder() + .name("Pool - Test On Borrow") + .displayName("Pool - Test On Borrow") + .description("Whether or not connections should be tested upon borrowing from the pool.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .allowableValues("true", "false") + .defaultValue("false") + .required(true) + .build(); + + public static final PropertyDescriptor POOL_TEST_ON_RETURN = new PropertyDescriptor.Builder() + .name("Pool - Test On Return") + .displayName("Pool - Test On Return") + .description("Whether or not connections should be tested upon returning to the pool.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .allowableValues("true", "false") + .defaultValue("false") + .required(true) + .build(); + + public static final PropertyDescriptor POOL_TEST_WHILE_IDLE = new PropertyDescriptor.Builder() + .name("Pool - Test While Idle") + .displayName("Pool - Test While Idle") + .description("Whether or not connections should be tested while idle.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .allowableValues("true", "false") + .defaultValue("true") + .required(true) + .build(); + + public static final List REDIS_CONNECTION_PROPERTY_DESCRIPTORS; + static { + final List props = new ArrayList<>(); + props.add(RedisUtils.REDIS_MODE); + props.add(RedisUtils.CONNECTION_STRING); + props.add(RedisUtils.DATABASE); + props.add(RedisUtils.COMMUNICATION_TIMEOUT); + props.add(RedisUtils.CLUSTER_MAX_REDIRECTS); + props.add(RedisUtils.SENTINEL_MASTER); + props.add(RedisUtils.PASSWORD); + props.add(RedisUtils.POOL_MAX_TOTAL); + props.add(RedisUtils.POOL_MAX_IDLE); + props.add(RedisUtils.POOL_MIN_IDLE); + props.add(RedisUtils.POOL_BLOCK_WHEN_EXHAUSTED); + props.add(RedisUtils.POOL_MAX_WAIT_TIME); + props.add(RedisUtils.POOL_MIN_EVICTABLE_IDLE_TIME); + props.add(RedisUtils.POOL_TIME_BETWEEN_EVICTION_RUNS); + props.add(RedisUtils.POOL_NUM_TESTS_PER_EVICTION_RUN); + props.add(RedisUtils.POOL_TEST_ON_CREATE); + props.add(RedisUtils.POOL_TEST_ON_BORROW); + props.add(RedisUtils.POOL_TEST_ON_RETURN); + props.add(RedisUtils.POOL_TEST_WHILE_IDLE); + REDIS_CONNECTION_PROPERTY_DESCRIPTORS = Collections.unmodifiableList(props); + } + + + public static JedisConnectionFactory createConnectionFactory(final PropertyContext context, final ComponentLog logger) { + final String redisMode = context.getProperty(RedisUtils.REDIS_MODE).getValue(); + final String connectionString = context.getProperty(RedisUtils.CONNECTION_STRING).evaluateAttributeExpressions().getValue(); + final Integer dbIndex = context.getProperty(RedisUtils.DATABASE).evaluateAttributeExpressions().asInteger(); + final String password = context.getProperty(RedisUtils.PASSWORD).evaluateAttributeExpressions().getValue(); + final Integer timeout = context.getProperty(RedisUtils.COMMUNICATION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); + final JedisPoolConfig poolConfig = createJedisPoolConfig(context); + + JedisConnectionFactory connectionFactory; + + if (RedisUtils.REDIS_MODE_STANDALONE.getValue().equals(redisMode)) { + final JedisShardInfo jedisShardInfo = createJedisShardInfo(connectionString, timeout, password); + + logger.info("Connecting to Redis in standalone mode at " + connectionString); + connectionFactory = new JedisConnectionFactory(jedisShardInfo); + + } else if (RedisUtils.REDIS_MODE_SENTINEL.getValue().equals(redisMode)) { + final String[] sentinels = connectionString.split("[,]"); + final String sentinelMaster = context.getProperty(RedisUtils.SENTINEL_MASTER).evaluateAttributeExpressions().getValue(); + final RedisSentinelConfiguration sentinelConfiguration = new RedisSentinelConfiguration(sentinelMaster, new HashSet<>(getTrimmedValues(sentinels))); + final JedisShardInfo jedisShardInfo = createJedisShardInfo(sentinels[0], timeout, password); + + logger.info("Connecting to Redis in sentinel mode..."); + logger.info("Redis master = " + sentinelMaster); + + for (final String sentinel : sentinels) { + logger.info("Redis sentinel at " + sentinel); + } + + connectionFactory = new JedisConnectionFactory(sentinelConfiguration, poolConfig); + connectionFactory.setShardInfo(jedisShardInfo); + + } else { + final String[] clusterNodes = connectionString.split("[,]"); + final Integer maxRedirects = context.getProperty(RedisUtils.CLUSTER_MAX_REDIRECTS).asInteger(); + + final RedisClusterConfiguration clusterConfiguration = new RedisClusterConfiguration(getTrimmedValues(clusterNodes)); + clusterConfiguration.setMaxRedirects(maxRedirects); + + logger.info("Connecting to Redis in clustered mode..."); + for (final String clusterNode : clusterNodes) { + logger.info("Redis cluster node at " + clusterNode); + } + + connectionFactory = new JedisConnectionFactory(clusterConfiguration, poolConfig); + } + + connectionFactory.setUsePool(true); + connectionFactory.setPoolConfig(poolConfig); + connectionFactory.setDatabase(dbIndex); + connectionFactory.setTimeout(timeout); + + if (!StringUtils.isBlank(password)) { + connectionFactory.setPassword(password); + } + + // need to call this to initialize the pool/connections + connectionFactory.afterPropertiesSet(); + return connectionFactory; + } + + private static List getTrimmedValues(final String[] values) { + final List trimmedValues = new ArrayList<>(); + for (final String value : values) { + trimmedValues.add(value.trim()); + } + return trimmedValues; + } + + private static JedisShardInfo createJedisShardInfo(final String hostAndPort, final Integer timeout, final String password) { + final String[] hostAndPortSplit = hostAndPort.split("[:]"); + final String host = hostAndPortSplit[0].trim(); + final Integer port = Integer.parseInt(hostAndPortSplit[1].trim()); + + final JedisShardInfo jedisShardInfo = new JedisShardInfo(host, port); + jedisShardInfo.setConnectionTimeout(timeout); + jedisShardInfo.setSoTimeout(timeout); + + if (!StringUtils.isEmpty(password)) { + jedisShardInfo.setPassword(password); + } + + return jedisShardInfo; + } + + private static JedisPoolConfig createJedisPoolConfig(final PropertyContext context) { + final JedisPoolConfig poolConfig = new JedisPoolConfig(); + poolConfig.setMaxTotal(context.getProperty(RedisUtils.POOL_MAX_TOTAL).asInteger()); + poolConfig.setMaxIdle(context.getProperty(RedisUtils.POOL_MAX_IDLE).asInteger()); + poolConfig.setMinIdle(context.getProperty(RedisUtils.POOL_MIN_IDLE).asInteger()); + poolConfig.setBlockWhenExhausted(context.getProperty(RedisUtils.POOL_BLOCK_WHEN_EXHAUSTED).asBoolean()); + poolConfig.setMaxWaitMillis(context.getProperty(RedisUtils.POOL_MAX_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS)); + poolConfig.setMinEvictableIdleTimeMillis(context.getProperty(RedisUtils.POOL_MIN_EVICTABLE_IDLE_TIME).asTimePeriod(TimeUnit.MILLISECONDS)); + poolConfig.setTimeBetweenEvictionRunsMillis(context.getProperty(RedisUtils.POOL_TIME_BETWEEN_EVICTION_RUNS).asTimePeriod(TimeUnit.MILLISECONDS)); + poolConfig.setNumTestsPerEvictionRun(context.getProperty(RedisUtils.POOL_NUM_TESTS_PER_EVICTION_RUN).asInteger()); + poolConfig.setTestOnCreate(context.getProperty(RedisUtils.POOL_TEST_ON_CREATE).asBoolean()); + poolConfig.setTestOnBorrow(context.getProperty(RedisUtils.POOL_TEST_ON_BORROW).asBoolean()); + poolConfig.setTestOnReturn(context.getProperty(RedisUtils.POOL_TEST_ON_RETURN).asBoolean()); + poolConfig.setTestWhileIdle(context.getProperty(RedisUtils.POOL_TEST_WHILE_IDLE).asBoolean()); + return poolConfig; + } + + public static List validate(ValidationContext validationContext) { + final List results = new ArrayList<>(); + + final String redisMode = validationContext.getProperty(RedisUtils.REDIS_MODE).getValue(); + final String connectionString = validationContext.getProperty(RedisUtils.CONNECTION_STRING).evaluateAttributeExpressions().getValue(); + final Integer dbIndex = validationContext.getProperty(RedisUtils.DATABASE).evaluateAttributeExpressions().asInteger(); + + if (StringUtils.isBlank(connectionString)) { + results.add(new ValidationResult.Builder() + .subject(RedisUtils.CONNECTION_STRING.getDisplayName()) + .valid(false) + .explanation("Connection String cannot be blank") + .build()); + } else if (RedisUtils.REDIS_MODE_STANDALONE.getValue().equals(redisMode)) { + final String[] hostAndPort = connectionString.split("[:]"); + if (hostAndPort == null || hostAndPort.length != 2 || StringUtils.isBlank(hostAndPort[0]) || StringUtils.isBlank(hostAndPort[1]) || !isInteger(hostAndPort[1])) { + results.add(new ValidationResult.Builder() + .subject(RedisUtils.CONNECTION_STRING.getDisplayName()) + .input(connectionString) + .valid(false) + .explanation("Standalone Connection String must be in the form host:port") + .build()); + } + } else { + for (final String connection : connectionString.split("[,]")) { + final String[] hostAndPort = connection.split("[:]"); + if (hostAndPort == null || hostAndPort.length != 2 || StringUtils.isBlank(hostAndPort[0]) || StringUtils.isBlank(hostAndPort[1]) || !isInteger(hostAndPort[1])) { + results.add(new ValidationResult.Builder() + .subject(RedisUtils.CONNECTION_STRING.getDisplayName()) + .input(connection) + .valid(false) + .explanation("Connection String must be in the form host:port,host:port,host:port,etc.") + .build()); + } + } + } + + if (RedisUtils.REDIS_MODE_CLUSTER.getValue().equals(redisMode) && dbIndex > 0) { + results.add(new ValidationResult.Builder() + .subject(RedisUtils.DATABASE.getDisplayName()) + .valid(false) + .explanation("Database Index must be 0 when using clustered Redis") + .build()); + } + + if (RedisUtils.REDIS_MODE_SENTINEL.getValue().equals(redisMode)) { + final String sentinelMaster = validationContext.getProperty(RedisUtils.SENTINEL_MASTER).evaluateAttributeExpressions().getValue(); + if (StringUtils.isEmpty(sentinelMaster)) { + results.add(new ValidationResult.Builder() + .subject(RedisUtils.SENTINEL_MASTER.getDisplayName()) + .valid(false) + .explanation("Sentinel Master must be provided when Mode is Sentinel") + .build()); + } + } + + return results; + } + + private static boolean isInteger(final String number) { + try { + Integer.parseInt(number); + return true; + } catch (Exception e) { + return false; + } + } + +} diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.state.StateProvider b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.state.StateProvider new file mode 100644 index 0000000000..c7445ab3f5 --- /dev/null +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.state.StateProvider @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.redis.state.RedisStateProvider \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService new file mode 100644 index 0000000000..5d4073fdd5 --- /dev/null +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.redis.service.RedisConnectionPoolService +org.apache.nifi.redis.service.RedisDistributedMapCacheClientService \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/FakeRedisProcessor.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/FakeRedisProcessor.java new file mode 100644 index 0000000000..7057c90423 --- /dev/null +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/FakeRedisProcessor.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.redis.service; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.redis.RedisConnectionPool; + +import java.util.Collections; +import java.util.List; + +/** + * Fake processor used for testing RedisConnectionPoolService. + */ +public class FakeRedisProcessor extends AbstractProcessor { + + public static final PropertyDescriptor REDIS_SERVICE = new PropertyDescriptor.Builder() + .name("redis-service") + .displayName("Redis Service") + .identifiesControllerService(RedisConnectionPool.class) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .build(); + + @Override + protected List getSupportedPropertyDescriptors() { + return Collections.singletonList(REDIS_SERVICE); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + + } + +} diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/ITRedisDistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/ITRedisDistributedMapCacheClientService.java new file mode 100644 index 0000000000..9d43e67177 --- /dev/null +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/ITRedisDistributedMapCacheClientService.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.redis.service; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.distributed.cache.client.AtomicCacheEntry; +import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.distributed.cache.client.exception.DeserializationException; +import org.apache.nifi.distributed.cache.client.exception.SerializationException; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.redis.util.RedisUtils; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import redis.embedded.RedisServer; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.StandardSocketOptions; +import java.nio.channels.SocketChannel; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * This is an integration test that is meant to be run against a real Redis instance. + */ +public class ITRedisDistributedMapCacheClientService { + + private TestRedisProcessor proc; + private TestRunner testRunner; + private RedisServer redisServer; + private RedisConnectionPoolService redisConnectionPool; + private RedisDistributedMapCacheClientService redisMapCacheClientService; + private int redisPort; + + @Before + public void setup() throws IOException { + this.redisPort = getAvailablePort(); + + this.redisServer = new RedisServer(redisPort); + redisServer.start(); + + proc = new TestRedisProcessor(); + testRunner = TestRunners.newTestRunner(proc); + } + + private int getAvailablePort() throws IOException { + try (SocketChannel socket = SocketChannel.open()) { + socket.setOption(StandardSocketOptions.SO_REUSEADDR, true); + socket.bind(new InetSocketAddress("localhost", 0)); + return socket.socket().getLocalPort(); + } + } + + @After + public void teardown() throws IOException { + if (redisServer != null) { + redisServer.stop(); + } + } + + @Test + public void testStandaloneRedis() throws InitializationException { + try { + // create, configure, and enable the RedisConnectionPool service + redisConnectionPool = new RedisConnectionPoolService(); + testRunner.addControllerService("redis-connection-pool", redisConnectionPool); + testRunner.setProperty(redisConnectionPool, RedisUtils.CONNECTION_STRING, "localhost:" + redisPort); + + // uncomment this to test using a different database index than the default 0 + //testRunner.setProperty(redisConnectionPool, RedisUtils.DATABASE, "1"); + + // uncomment this to test using a password to authenticate to redis + //testRunner.setProperty(redisConnectionPool, RedisUtils.PASSWORD, "foobared"); + + testRunner.enableControllerService(redisConnectionPool); + + setupRedisMapCacheClientService(); + executeProcessor(); + } finally { + if (redisConnectionPool != null) { + redisConnectionPool.onDisabled(); + } + } + } + + private void setupRedisMapCacheClientService() throws InitializationException { + // create, configure, and enable the RedisDistributedMapCacheClient service + redisMapCacheClientService = new RedisDistributedMapCacheClientService(); + testRunner.addControllerService("redis-map-cache-client", redisMapCacheClientService); + testRunner.setProperty(redisMapCacheClientService, RedisDistributedMapCacheClientService.REDIS_CONNECTION_POOL, "redis-connection-pool"); + testRunner.enableControllerService(redisMapCacheClientService); + testRunner.setProperty(TestRedisProcessor.REDIS_MAP_CACHE, "redis-map-cache-client"); + } + + private void executeProcessor() { + // queue a flow file to trigger the processor and executeProcessor it + testRunner.enqueue("trigger"); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(TestRedisProcessor.REL_SUCCESS, 1); + } + + /** + * Test processor that exercises RedisDistributedMapCacheClient. + */ + private static class TestRedisProcessor extends AbstractProcessor { + + public static final PropertyDescriptor REDIS_MAP_CACHE = new PropertyDescriptor.Builder() + .name("redis-map-cache") + .displayName("Redis Map Cache") + .identifiesControllerService(AtomicDistributedMapCacheClient.class) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").build(); + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").build(); + + @Override + protected List getSupportedPropertyDescriptors() { + return Collections.singletonList(REDIS_MAP_CACHE); + } + + @Override + public Set getRelationships() { + return new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + final Serializer stringSerializer = new StringSerializer(); + final Deserializer stringDeserializer = new StringDeserializer(); + + final AtomicDistributedMapCacheClient cacheClient = context.getProperty(REDIS_MAP_CACHE).asControllerService(AtomicDistributedMapCacheClient.class); + + try { + final long timestamp = System.currentTimeMillis(); + final String key = "test-redis-processor-" + timestamp; + final String value = "the time is " + timestamp; + + // verify the key doesn't exists, put the key/value, then verify it exists + Assert.assertFalse(cacheClient.containsKey(key, stringSerializer)); + cacheClient.put(key, value, stringSerializer, stringSerializer); + Assert.assertTrue(cacheClient.containsKey(key, stringSerializer)); + + // verify get returns the expected value we set above + final String retrievedValue = cacheClient.get(key, stringSerializer, stringDeserializer); + Assert.assertEquals(value, retrievedValue); + + // verify remove removes the entry and contains key returns false after + Assert.assertTrue(cacheClient.remove(key, stringSerializer)); + Assert.assertFalse(cacheClient.containsKey(key, stringSerializer)); + + // verify putIfAbsent works the first time and returns false the second time + Assert.assertTrue(cacheClient.putIfAbsent(key, value, stringSerializer, stringSerializer)); + Assert.assertFalse(cacheClient.putIfAbsent(key, "some other value", stringSerializer, stringSerializer)); + Assert.assertEquals(value, cacheClient.get(key, stringSerializer, stringDeserializer)); + + // verify that getAndPutIfAbsent returns the existing value and doesn't modify it in the cache + final String getAndPutIfAbsentResult = cacheClient.getAndPutIfAbsent(key, value, stringSerializer, stringSerializer, stringDeserializer); + Assert.assertEquals(value, getAndPutIfAbsentResult); + Assert.assertEquals(value, cacheClient.get(key, stringSerializer, stringDeserializer)); + + // verify that getAndPutIfAbsent on a key that doesn't exist returns null + final String keyThatDoesntExist = key + "_DOES_NOT_EXIST"; + Assert.assertFalse(cacheClient.containsKey(keyThatDoesntExist, stringSerializer)); + final String getAndPutIfAbsentResultWhenDoesntExist = cacheClient.getAndPutIfAbsent(keyThatDoesntExist, value, stringSerializer, stringSerializer, stringDeserializer); + Assert.assertEquals(null, getAndPutIfAbsentResultWhenDoesntExist); + Assert.assertEquals(value, cacheClient.get(keyThatDoesntExist, stringSerializer, stringDeserializer)); + + // verify atomic fetch returns the correct entry + final AtomicCacheEntry entry = cacheClient.fetch(key, stringSerializer, stringDeserializer); + Assert.assertEquals(key, entry.getKey()); + Assert.assertEquals(value, entry.getValue()); + Assert.assertTrue(Arrays.equals(value.getBytes(StandardCharsets.UTF_8), entry.getRevision().orElse(null))); + + final AtomicCacheEntry notLatestEntry = new AtomicCacheEntry<>(entry.getKey(), entry.getValue(), "not previous".getBytes(StandardCharsets.UTF_8)); + + // verify atomic replace does not replace when previous value is not equal + Assert.assertFalse(cacheClient.replace(notLatestEntry, stringSerializer, stringSerializer)); + Assert.assertEquals(value, cacheClient.get(key, stringSerializer, stringDeserializer)); + + // verify atomic replace does replace when previous value is equal + final String replacementValue = "this value has been replaced"; + entry.setValue(replacementValue); + Assert.assertTrue(cacheClient.replace(entry, stringSerializer, stringSerializer)); + Assert.assertEquals(replacementValue, cacheClient.get(key, stringSerializer, stringDeserializer)); + + // verify atomic replace does replace no value previous existed + final String replaceKeyDoesntExist = key + "_REPLACE_DOES_NOT_EXIST"; + final AtomicCacheEntry entryDoesNotExist = new AtomicCacheEntry<>(replaceKeyDoesntExist, replacementValue, null); + Assert.assertTrue(cacheClient.replace(entryDoesNotExist, stringSerializer, stringSerializer)); + Assert.assertEquals(replacementValue, cacheClient.get(replaceKeyDoesntExist, stringSerializer, stringDeserializer)); + + final int numToDelete = 2000; + for (int i=0; i < numToDelete; i++) { + cacheClient.put(key + "-" + i, value, stringSerializer, stringSerializer); + } + + Assert.assertTrue(cacheClient.removeByPattern("test-redis-processor-*") >= numToDelete); + Assert.assertFalse(cacheClient.containsKey(key, stringSerializer)); + + session.transfer(flowFile, REL_SUCCESS); + } catch (final Exception e) { + getLogger().error("Routing to failure due to: " + e.getMessage(), e); + session.transfer(flowFile, REL_FAILURE); + } + } + } + + private static class StringSerializer implements Serializer { + @Override + public void serialize(String value, OutputStream output) throws SerializationException, IOException { + if (value != null) { + output.write(value.getBytes(StandardCharsets.UTF_8)); + } + } + } + + private static class StringDeserializer implements Deserializer { + @Override + public String deserialize(byte[] input) throws DeserializationException, IOException { + return input == null ? null : new String(input, StandardCharsets.UTF_8); + } + } +} diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/TestRedisConnectionPoolService.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/TestRedisConnectionPoolService.java new file mode 100644 index 0000000000..2017c3b0ec --- /dev/null +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/TestRedisConnectionPoolService.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.redis.service; + +import org.apache.nifi.redis.RedisConnectionPool; +import org.apache.nifi.redis.util.RedisUtils; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +public class TestRedisConnectionPoolService { + + private TestRunner testRunner; + private FakeRedisProcessor proc; + private RedisConnectionPool redisService; + + @Before + public void setup() throws InitializationException { + proc = new FakeRedisProcessor(); + testRunner = TestRunners.newTestRunner(proc); + + redisService = new RedisConnectionPoolService(); + testRunner.addControllerService("redis-service", redisService); + } + + @Test + public void testValidateConnectionString() { + testRunner.assertNotValid(redisService); + + testRunner.setProperty(redisService, RedisUtils.CONNECTION_STRING, " "); + testRunner.assertNotValid(redisService); + + testRunner.setProperty(redisService, RedisUtils.CONNECTION_STRING, "${redis.connection}"); + testRunner.assertNotValid(redisService); + + testRunner.setVariable("redis.connection", "localhost:6379"); + testRunner.assertValid(redisService); + + testRunner.setProperty(redisService, RedisUtils.CONNECTION_STRING, "localhost"); + testRunner.assertNotValid(redisService); + + testRunner.setProperty(redisService, RedisUtils.CONNECTION_STRING, "localhost:a"); + testRunner.assertNotValid(redisService); + + testRunner.setProperty(redisService, RedisUtils.CONNECTION_STRING, "localhost:6379"); + testRunner.assertValid(redisService); + + // standalone can only have one host:port pair + testRunner.setProperty(redisService, RedisUtils.CONNECTION_STRING, "localhost:6379,localhost:6378"); + testRunner.assertNotValid(redisService); + + // cluster can have multiple host:port pairs + testRunner.setProperty(redisService, RedisUtils.REDIS_MODE, RedisUtils.REDIS_MODE_CLUSTER.getValue()); + testRunner.assertValid(redisService); + + testRunner.setProperty(redisService, RedisUtils.CONNECTION_STRING, "localhost:6379,localhost"); + testRunner.assertNotValid(redisService); + + testRunner.setProperty(redisService, RedisUtils.CONNECTION_STRING, "local:host:6379,localhost:6378"); + testRunner.assertNotValid(redisService); + + testRunner.setProperty(redisService, RedisUtils.CONNECTION_STRING, "localhost:a,localhost:b"); + testRunner.assertNotValid(redisService); + + testRunner.setProperty(redisService, RedisUtils.CONNECTION_STRING, "localhost :6379, localhost :6378, localhost:6377"); + testRunner.assertValid(redisService); + } + + @Test + public void testValidateSentinelMasterRequiredInSentinelMode() { + testRunner.setProperty(redisService, RedisUtils.REDIS_MODE, RedisUtils.REDIS_MODE_SENTINEL.getValue()); + testRunner.setProperty(redisService, RedisUtils.CONNECTION_STRING, "localhost:6379,localhost:6378"); + testRunner.assertNotValid(redisService); + + testRunner.setProperty(redisService, RedisUtils.SENTINEL_MASTER, "mymaster"); + testRunner.assertValid(redisService); + } + +} diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/ITRedisStateProvider.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/ITRedisStateProvider.java new file mode 100644 index 0000000000..6a5fb825b7 --- /dev/null +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/ITRedisStateProvider.java @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.redis.state; + +import org.apache.nifi.attribute.expression.language.StandardPropertyValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.components.state.StateProvider; +import org.apache.nifi.components.state.StateProviderInitializationContext; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.redis.util.RedisUtils; +import org.apache.nifi.util.MockComponentLog; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import redis.embedded.RedisServer; + +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.StandardSocketOptions; +import java.nio.channels.SocketChannel; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * NOTE: These test cases should be kept in-sync with AbstractTestStateProvider which is in the framework + * and couldn't be extended here. + */ +public class ITRedisStateProvider { + + protected final String componentId = "111111111-1111-1111-1111-111111111111"; + + private RedisServer redisServer; + private RedisStateProvider provider; + + @Before + public void setup() throws Exception { + final int redisPort = getAvailablePort(); + + this.redisServer = new RedisServer(redisPort); + redisServer.start(); + + final Map properties = new HashMap<>(); + properties.put(RedisUtils.CONNECTION_STRING, "localhost:" + redisPort); + this.provider = createProvider(properties); + } + + @After + public void teardown() throws IOException { + if (provider != null) { + try { + provider.clear(componentId); + } catch (IOException e) { + } + provider.disable(); + provider.shutdown(); + } + + if (redisServer != null) { + redisServer.stop(); + } + } + + public StateProvider getProvider() { + return provider; + } + + @Test + public void testSetAndGet() throws IOException { + getProvider().setState(Collections.singletonMap("testSetAndGet", "value"), componentId); + assertEquals("value", getProvider().getState(componentId).get("testSetAndGet")); + } + + @Test + public void testReplaceSuccessful() throws IOException { + final String key = "testReplaceSuccessful"; + final StateProvider provider = getProvider(); + + StateMap map = provider.getState(componentId); + assertNotNull(map); + assertEquals(-1, map.getVersion()); + + assertNotNull(map.toMap()); + assertTrue(map.toMap().isEmpty()); + provider.setState(Collections.singletonMap(key, "value1"), componentId); + + map = provider.getState(componentId); + assertNotNull(map); + assertEquals(0, map.getVersion()); + assertEquals("value1", map.get(key)); + assertEquals("value1", map.toMap().get(key)); + + final Map newMap = new HashMap<>(map.toMap()); + newMap.put(key, "value2"); + assertTrue(provider.replace(map, newMap, componentId)); + + map = provider.getState(componentId); + assertEquals("value2", map.get(key)); + assertEquals(1L, map.getVersion()); + } + + @Test + public void testReplaceWithWrongVersion() throws IOException { + final String key = "testReplaceWithWrongVersion"; + final StateProvider provider = getProvider(); + provider.setState(Collections.singletonMap(key, "value1"), componentId); + + StateMap stateMap = provider.getState(componentId); + assertNotNull(stateMap); + assertEquals("value1", stateMap.get(key)); + assertEquals(0, stateMap.getVersion()); + + provider.setState(Collections.singletonMap(key, "intermediate value"), componentId); + + assertFalse(provider.replace(stateMap, Collections.singletonMap(key, "value2"), componentId)); + stateMap = provider.getState(componentId); + assertEquals(key, stateMap.toMap().keySet().iterator().next()); + assertEquals(1, stateMap.toMap().size()); + assertEquals("intermediate value", stateMap.get(key)); + assertEquals(1, stateMap.getVersion()); + } + + + @Test + public void testToMap() throws IOException { + final String key = "testKeySet"; + final StateProvider provider = getProvider(); + Map map = provider.getState(componentId).toMap(); + assertNotNull(map); + assertTrue(map.isEmpty()); + + provider.setState(Collections.singletonMap(key, "value"), componentId); + map = provider.getState(componentId).toMap(); + assertNotNull(map); + assertEquals(1, map.size()); + assertEquals("value", map.get(key)); + + provider.setState(Collections. emptyMap(), componentId); + + final StateMap stateMap = provider.getState(componentId); + map = stateMap.toMap(); + assertNotNull(map); + assertTrue(map.isEmpty()); + assertEquals(1, stateMap.getVersion()); + } + + @Test + public void testClear() throws IOException { + final StateProvider provider = getProvider(); + StateMap stateMap = provider.getState(componentId); + assertNotNull(stateMap); + assertEquals(-1L, stateMap.getVersion()); + assertTrue(stateMap.toMap().isEmpty()); + + provider.setState(Collections.singletonMap("testClear", "value"), componentId); + + stateMap = provider.getState(componentId); + assertNotNull(stateMap); + assertEquals(0, stateMap.getVersion()); + assertEquals("value", stateMap.get("testClear")); + + provider.clear(componentId); + + stateMap = provider.getState(componentId); + assertNotNull(stateMap); + assertEquals(1L, stateMap.getVersion()); + assertTrue(stateMap.toMap().isEmpty()); + } + + @Test + public void testReplaceWithNonExistingValue() throws Exception { + final StateProvider provider = getProvider(); + StateMap stateMap = provider.getState(componentId); + assertNotNull(stateMap); + + final Map newValue = new HashMap<>(); + newValue.put("value", "value"); + + final boolean replaced = provider.replace(stateMap, newValue, componentId); + assertFalse(replaced); + } + + @Test + public void testReplaceWithNonExistingValueAndVersionGreaterThanNegativeOne() throws Exception { + final StateProvider provider = getProvider(); + final StateMap stateMap = new StateMap() { + @Override + public long getVersion() { + return 4; + } + + @Override + public String get(String key) { + return null; + } + + @Override + public Map toMap() { + return Collections.emptyMap(); + } + }; + + final Map newValue = new HashMap<>(); + newValue.put("value", "value"); + + final boolean replaced = provider.replace(stateMap, newValue, componentId); + assertFalse(replaced); + } + + @Test + public void testOnComponentRemoved() throws IOException, InterruptedException { + final StateProvider provider = getProvider(); + final Map newValue = new HashMap<>(); + newValue.put("value", "value"); + + provider.setState(newValue, componentId); + final StateMap stateMap = provider.getState(componentId); + assertEquals(0L, stateMap.getVersion()); + + provider.onComponentRemoved(componentId); + + // wait for the background process to complete + Thread.sleep(1000L); + + final StateMap stateMapAfterRemoval = provider.getState(componentId); + + // version should be -1 because the state has been removed entirely. + assertEquals(-1L, stateMapAfterRemoval.getVersion()); + } + + + private void initializeProvider(final RedisStateProvider provider, final Map properties) throws IOException { + provider.initialize(new StateProviderInitializationContext() { + @Override + public String getIdentifier() { + return "Unit Test Provider Initialization Context"; + } + + @Override + public Map getProperties() { + final Map propValueMap = new HashMap<>(); + for (final Map.Entry entry : properties.entrySet()) { + propValueMap.put(entry.getKey(), new StandardPropertyValue(entry.getValue(), null)); + } + return propValueMap; + } + + @Override + public Map getAllProperties() { + final Map propValueMap = new LinkedHashMap<>(); + for (final Map.Entry entry : properties.entrySet()) { + propValueMap.put(entry.getKey().getName(), entry.getValue()); + } + return propValueMap; + } + + @Override + public PropertyValue getProperty(final PropertyDescriptor property) { + String prop = properties.get(property); + + if (prop == null) { + prop = property.getDefaultValue(); + } + + return new StandardPropertyValue(prop, null); + } + + @Override + public SSLContext getSSLContext() { + return null; + } + + @Override + public ComponentLog getLogger() { + return new MockComponentLog("Unit Test RedisStateProvider", provider); + } + }); + } + + private RedisStateProvider createProvider(final Map properties) throws Exception { + final RedisStateProvider provider = new RedisStateProvider(); + initializeProvider(provider, properties); + provider.enable(); + return provider; + } + + private int getAvailablePort() throws IOException { + try (SocketChannel socket = SocketChannel.open()) { + socket.setOption(StandardSocketOptions.SO_REUSEADDR, true); + socket.bind(new InetSocketAddress("localhost", 0)); + return socket.socket().getLocalPort(); + } + } + +} diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/TestRedisStateMapJsonSerDe.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/TestRedisStateMapJsonSerDe.java new file mode 100644 index 0000000000..f147045919 --- /dev/null +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/TestRedisStateMapJsonSerDe.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.redis.state; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +public class TestRedisStateMapJsonSerDe { + + private RedisStateMapSerDe serDe; + + @Before + public void setup() { + serDe = new RedisStateMapJsonSerDe(); + } + + @Test + public void testSerializeDeserialize() throws IOException { + final RedisStateMap stateMap = new RedisStateMap.Builder() + .version(2L) + .encodingVersion(3) + .stateValue("field1", "value1") + .stateValue("field2", "value2") + .stateValue("field3", "value3") + .build(); + + final byte[] serialized = serDe.serialize(stateMap); + Assert.assertNotNull(serialized); + + final RedisStateMap deserialized = serDe.deserialize(serialized); + Assert.assertNotNull(deserialized); + Assert.assertEquals(stateMap.getVersion(), deserialized.getVersion()); + Assert.assertEquals(stateMap.getEncodingVersion(), deserialized.getEncodingVersion()); + Assert.assertEquals(stateMap.toMap(), deserialized.toMap()); + } + + @Test + public void testSerializeWhenNull() throws IOException { + Assert.assertNull(serDe.serialize(null)); + } + + @Test + public void testDeserializeWhenNull() throws IOException { + Assert.assertNull(serDe.deserialize(null)); + } + + @Test + public void testDefaultSerialization() throws IOException { + final RedisStateMap stateMap = new RedisStateMap.Builder().build(); + + final byte[] serialized = serDe.serialize(stateMap); + Assert.assertNotNull(serialized); + + final RedisStateMap deserialized = serDe.deserialize(serialized); + Assert.assertNotNull(deserialized); + Assert.assertEquals(RedisStateMap.DEFAULT_VERSION.longValue(), stateMap.getVersion()); + Assert.assertEquals(RedisStateMap.DEFAULT_ENCODING, stateMap.getEncodingVersion()); + Assert.assertNotNull(deserialized.toMap()); + Assert.assertEquals(0, deserialized.toMap().size()); + } + +} diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-nar/pom.xml b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-nar/pom.xml new file mode 100644 index 0000000000..de1f5d7854 --- /dev/null +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-nar/pom.xml @@ -0,0 +1,46 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-redis-bundle + 1.4.0-SNAPSHOT + + + nifi-redis-nar + 1.4.0-SNAPSHOT + nar + + true + true + + + + + org.apache.nifi + nifi-redis-extensions + 1.4.0-SNAPSHOT + + + org.apache.nifi + nifi-redis-service-api-nar + nar + + + + diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000..3a1c566191 --- /dev/null +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,32 @@ +Apache NiFi +Copyright 2014-2017 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +****************** +Apache Software License v2 +****************** + + (ASLv2) Jackson JSON processor + The following NOTICE information applies: + # Jackson JSON processor + + Jackson is a high-performance, Free/Open Source JSON processing library. + It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has + been in development since 2007. + It is currently developed by a community of developers, as well as supported + commercially by FasterXML.com. + + ## Licensing + + Jackson core and extension components may licensed under different licenses. + To find the details that apply to this artifact see the accompanying LICENSE file. + For more information, including possible other licensing options, contact + FasterXML.com (http://fasterxml.com). + + ## Credits + + A list of contributors may be found from CREDITS file, which is included + in some artifacts (usually source distributions); but is always available + from the source code management (SCM) system project uses. \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-service-api-nar/pom.xml b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-service-api-nar/pom.xml new file mode 100644 index 0000000000..2e0d8c52c9 --- /dev/null +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-service-api-nar/pom.xml @@ -0,0 +1,46 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-redis-bundle + 1.4.0-SNAPSHOT + + + nifi-redis-service-api-nar + 1.4.0-SNAPSHOT + nar + + true + true + + + + + org.apache.nifi + nifi-standard-services-api-nar + nar + + + org.apache.nifi + nifi-redis-service-api + 1.4.0-SNAPSHOT + + + + diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-service-api-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-service-api-nar/src/main/resources/META-INF/LICENSE new file mode 100644 index 0000000000..958de4d6bc --- /dev/null +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-service-api-nar/src/main/resources/META-INF/LICENSE @@ -0,0 +1,239 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +APACHE NIFI SUBCOMPONENTS: + +The Apache NiFi project contains subcomponents with separate copyright +notices and license terms. Your use of the source code for the these +subcomponents is subject to the terms and conditions of the following +licenses. + + The binary distribution of this product bundles 'ParaNamer' and 'Paranamer Core' + which is available under a BSD style license. + + Copyright (c) 2006 Paul Hammant & ThoughtWorks Inc + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions + are met: + 1. Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + 2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + 3. Neither the name of the copyright holders nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-service-api-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-service-api-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000..b8e3136ba1 --- /dev/null +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-service-api-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,33 @@ +Apache NiFi +Copyright 2014-2017 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +****************** +Apache Software License v2 +****************** + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Apache Commons Pool + The following NOTICE information applies: + Apache Commons Pool + Copyright 1999-2009 The Apache Software Foundation. + +************************ +The MIT License +************************ + +The following binary components are provided under the MIT License. See project link for details. + + (MIT License) Jedis (redis.clients:jedis:2.9.0 - https://github.com/xetorthio/jedis) + + +***************** +Public Domain +***************** + +The following binary components are provided to the 'Public Domain'. See project link for details. + + (Public Domain) AOP Alliance 1.0 (http://aopalliance.sourceforge.net/) \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-service-api/pom.xml b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-service-api/pom.xml new file mode 100644 index 0000000000..973b6af18a --- /dev/null +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-service-api/pom.xml @@ -0,0 +1,44 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-redis-bundle + 1.4.0-SNAPSHOT + + + nifi-redis-service-api + jar + + + + org.springframework.data + spring-data-redis + ${spring.data.redis.version} + + + redis.clients + jedis + 2.9.0 + + + org.apache.nifi + nifi-api + + + diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-service-api/src/main/java/org/apache/nifi/redis/RedisConnectionPool.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-service-api/src/main/java/org/apache/nifi/redis/RedisConnectionPool.java new file mode 100644 index 0000000000..f0e3e163b5 --- /dev/null +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-service-api/src/main/java/org/apache/nifi/redis/RedisConnectionPool.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.redis; + +import org.apache.nifi.controller.ControllerService; +import org.springframework.data.redis.connection.RedisConnection; + +/** + * A service that provides connections to Redis using spring-data-redis. + */ +public interface RedisConnectionPool extends ControllerService { + + /** + * Obtains a RedisConnection instance from the pool. + * + * NOTE: Clients are responsible for ensuring the close() method of the connection is called to return it to the pool. + * + * @return a RedisConnection instance + */ + RedisConnection getConnection(); + + /** + * Some Redis operations are only supported in a specific mode. Clients should use this method to ensure + * the connection pool they are using supports their required operations. + * + * @return the type of Redis instance (i.e. standalone, clustered, sentinel) + */ + RedisType getRedisType(); + +} diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-service-api/src/main/java/org/apache/nifi/redis/RedisType.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-service-api/src/main/java/org/apache/nifi/redis/RedisType.java new file mode 100644 index 0000000000..c7fb5142eb --- /dev/null +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-service-api/src/main/java/org/apache/nifi/redis/RedisType.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.redis; + +/** + * Possible types of Redis instances. + */ +public enum RedisType { + + STANDALONE("Standalone", "A single standalone Redis instance."), + + SENTINEL("Sentinel", "Redis Sentinel which provides high-availability. Described further at https://redis.io/topics/sentinel"), + + CLUSTER("Cluster", "Clustered Redis which provides sharding and replication. Described further at https://redis.io/topics/cluster-spec"); + + private final String displayName; + private final String description; + + RedisType(final String displayName, final String description) { + this.displayName = displayName; + this.description = description; + } + + public String getDisplayName() { + return displayName; + } + + public String getDescription() { + return description; + } + + public static RedisType fromDisplayName(final String displayName) { + for (RedisType redisType : values()) { + if (redisType.getDisplayName().equals(displayName)) { + return redisType; + } + } + + throw new IllegalArgumentException("Unknown RedisType: " + displayName); + } + +} diff --git a/nifi-nar-bundles/nifi-redis-bundle/pom.xml b/nifi-nar-bundles/nifi-redis-bundle/pom.xml new file mode 100644 index 0000000000..5d36212771 --- /dev/null +++ b/nifi-nar-bundles/nifi-redis-bundle/pom.xml @@ -0,0 +1,41 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-nar-bundles + 1.4.0-SNAPSHOT + + + org.apache.nifi + nifi-redis-bundle + 1.4.0-SNAPSHOT + pom + + + 1.8.3.RELEASE + + + + nifi-redis-service-api + nifi-redis-service-api-nar + nifi-redis-extensions + nifi-redis-nar + + + diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java index cd55d27a61..61834553ff 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java @@ -49,8 +49,14 @@ public class WaitNotifyProtocol { private static final int REPLACE_RETRY_WAIT_MILLIS = 10; private static final ObjectMapper objectMapper = new ObjectMapper(); - private static final Serializer stringSerializer = (value, output) -> output.write(value.getBytes(StandardCharsets.UTF_8)); - private final Deserializer stringDeserializer = input -> new String(input, StandardCharsets.UTF_8); + + private static final Serializer stringSerializer = (value, output) -> { + if (value != null ) { + output.write(value.getBytes(StandardCharsets.UTF_8)); + } + }; + + private final Deserializer stringDeserializer = input -> input == null ? null : new String(input, StandardCharsets.UTF_8); public static class Signal { diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index e8392d403b..e087f4a9b1 100755 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -83,6 +83,7 @@ nifi-cybersecurity-bundle nifi-parquet-bundle nifi-extension-utils + nifi-redis-bundle diff --git a/pom.xml b/pom.xml index bdab77a7a8..9aa9b883d9 100644 --- a/pom.xml +++ b/pom.xml @@ -1434,6 +1434,18 @@ nar + org.apache.nifi + nifi-redis-service-api-nar + 1.4.0-SNAPSHOT + nar + + + org.apache.nifi + nifi-redis-nar + 1.4.0-SNAPSHOT + nar + + org.apache.nifi nifi-properties 1.4.0-SNAPSHOT