NIFI-4043 Initial commit of nifi-redis-bundle

NIFI-4061 Initial version of RedisStateProvider
- Adding PropertyContext and updating existing contexts to extend it
- Added embedded Redis for unit testing
- Added wrapped StateProvider with NAR ClassLoader in StandardStateManagerProvider
- Updating state-management.xml with config for RedisStateProvider
- Renaming tests that use RedisServer to be IT tests so they don't run all the time

This closes #1918.
This commit is contained in:
Bryan Bende 2017-06-12 15:53:20 -04:00 committed by Mark Payne
parent 6bc6f955c0
commit aabd4a25d2
57 changed files with 3372 additions and 64 deletions

View File

@ -18,11 +18,12 @@ package org.apache.nifi.components;
import java.util.Map;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.expression.ExpressionLanguageCompiler;
public interface ValidationContext {
public interface ValidationContext extends PropertyContext {
/**
* @return the {@link ControllerServiceLookup} which can be used to obtain
@ -43,13 +44,6 @@ public interface ValidationContext {
*/
ExpressionLanguageCompiler newExpressionLanguageCompiler();
/**
* @param property being validated
* @return a PropertyValue that encapsulates the value configured for the
* given PropertyDescriptor
*/
PropertyValue getProperty(PropertyDescriptor property);
/**
* @param value to make a PropertyValue object for
* @return a PropertyValue that represents the given value

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.context;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import java.util.Map;
/**
* A context for retrieving a PropertyValue from a PropertyDescriptor.
*/
public interface PropertyContext {
/**
* Retrieves the current value set for the given descriptor, if a value is
* set - else uses the descriptor to determine the appropriate default value
*
* @param descriptor to lookup the value of
* @return the property value of the given descriptor
*/
PropertyValue getProperty(PropertyDescriptor descriptor);
Map<String,String> getAllProperties();
}

View File

@ -23,7 +23,6 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.reporting.InitializationException;
public abstract class AbstractControllerService extends AbstractConfigurableComponent implements ControllerService {
@ -33,6 +32,7 @@ public abstract class AbstractControllerService extends AbstractConfigurableComp
private ComponentLog logger;
private StateManager stateManager;
private volatile ConfigurationContext configurationContext;
private volatile boolean enabled = false;
@Override
public final void initialize(final ControllerServiceInitializationContext context) throws InitializationException {
@ -50,7 +50,7 @@ public abstract class AbstractControllerService extends AbstractConfigurableComp
/**
* @return the {@link ControllerServiceLookup} that was passed to the
* {@link #init(ProcessorInitializationContext)} method
* {@link #init(ControllerServiceInitializationContext)} method
*/
protected final ControllerServiceLookup getControllerServiceLookup() {
return serviceLookup;
@ -66,6 +66,20 @@ public abstract class AbstractControllerService extends AbstractConfigurableComp
protected void init(final ControllerServiceInitializationContext config) throws InitializationException {
}
@OnEnabled
public final void enabled() {
this.enabled = true;
}
@OnDisabled
public final void disabled() {
this.enabled = false;
}
public boolean isEnabled() {
return this.enabled;
}
/**
* @return the logger that has been provided to the component by the
* framework in its initialize method

View File

@ -16,23 +16,17 @@
*/
package org.apache.nifi.controller;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.context.PropertyContext;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
/**
* This context is passed to ControllerServices and Reporting Tasks in order
* to expose their configuration to them.
*/
public interface ConfigurationContext {
/**
* @param property to retrieve by name
* @return the configured value for the property with the given name
*/
PropertyValue getProperty(PropertyDescriptor property);
public interface ConfigurationContext extends PropertyContext {
/**
* @return an unmodifiable map of all configured properties for this

View File

@ -22,6 +22,7 @@ import java.util.Set;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ControllerServiceLookup;
/**
@ -34,16 +35,7 @@ import org.apache.nifi.controller.ControllerServiceLookup;
* thread-safe.
* </p>
*/
public interface ProcessContext {
/**
* Retrieves the current value set for the given descriptor, if a value is
* set - else uses the descriptor to determine the appropriate default value
*
* @param descriptor to lookup the value of
* @return the property value of the given descriptor
*/
PropertyValue getProperty(PropertyDescriptor descriptor);
public interface ProcessContext extends PropertyContext {
/**
* Retrieves the current value set for the given descriptor, if a value is

View File

@ -17,8 +17,8 @@
package org.apache.nifi.reporting;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ControllerServiceLookup;
import java.util.Map;
@ -29,7 +29,7 @@ import java.util.Map;
* statistics, metrics, and monitoring information, as well as configuration
* supplied by the user.
*/
public interface ReportingContext {
public interface ReportingContext extends PropertyContext {
/**
* @return a Map of all known {@link PropertyDescriptor}s to their
@ -39,13 +39,6 @@ public interface ReportingContext {
*/
Map<PropertyDescriptor, String> getProperties();
/**
* @param propertyName descriptor of property to lookup the value of
* @return PropertyValue that represents the user-configured value for the given
* {@link PropertyDescriptor}
*/
PropertyValue getProperty(PropertyDescriptor propertyName);
/**
* @return the {@link EventAccess} object that can be used to obtain
* information about specific events and reports that have happened

View File

@ -476,6 +476,16 @@
<artifactId>nifi-hwx-schema-registry-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-redis-service-api-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-redis-nar</artifactId>
<type>nar</type>
</dependency>
</dependencies>
<profiles>
<profile>

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -378,6 +379,11 @@ public class NotificationServiceManager {
return new StandardPropertyValue(value, null, variableRegistry);
}
@Override
public Map<String,String> getAllProperties() {
return Collections.unmodifiableMap(propertyValues);
}
@Override
public String getIdentifier() {
return serviceId;

View File

@ -17,19 +17,9 @@
package org.apache.nifi.bootstrap.notification;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.context.PropertyContext;
public interface NotificationInitializationContext {
/**
* Returns the configured value for the given PropertyDescriptor
*
* @param descriptor the property to fetch the value for
* @return the configured value for the given PropertyDescriptor, or the default value for the PropertyDescriptor
* if no value has been configured.
*/
PropertyValue getProperty(PropertyDescriptor descriptor);
public interface NotificationInitializationContext extends PropertyContext {
/**
* @return the identifier for the NotificationService

View File

@ -17,6 +17,7 @@
package org.apache.nifi.bootstrap.notification;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -75,6 +76,15 @@ public class NotificationValidationContext implements ValidationContext {
return context.getProperties();
}
@Override
public Map<String, String> getAllProperties() {
final Map<String,String> propValueMap = new LinkedHashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
propValueMap.put(entry.getKey().getName(), entry.getValue());
}
return propValueMap;
}
@Override
public String getAnnotationData() {
throw new UnsupportedOperationException();

View File

@ -23,13 +23,14 @@ import javax.net.ssl.SSLContext;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.logging.ComponentLog;
/**
* This interface defines an initialization context that is passed to a {@link StateProvider} when it
* is initialized.
*/
public interface StateProviderInitializationContext {
public interface StateProviderInitializationContext extends PropertyContext {
/**
* @return the identifier if the StateProvider
*/
@ -40,15 +41,6 @@ public interface StateProviderInitializationContext {
*/
Map<PropertyDescriptor, PropertyValue> getProperties();
/**
* Returns the configured value for the given property
*
* @param property the property to retrieve the value for
*
* @return the configured value for the property.
*/
PropertyValue getProperty(PropertyDescriptor property);
/**
* @return the SSL Context that should be used to communicate with remote resources,
* or <code>null</code> if no SSLContext has been configured

View File

@ -17,6 +17,7 @@
package org.apache.nifi.util;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -69,6 +70,15 @@ public class MockConfigurationContext implements ConfigurationContext {
return new HashMap<>(this.properties);
}
@Override
public Map<String, String> getAllProperties() {
final Map<String,String> propValueMap = new LinkedHashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
propValueMap.put(entry.getKey().getName(), entry.getValue());
}
return propValueMap;
}
private PropertyDescriptor getActualDescriptor(final PropertyDescriptor property) {
if (service == null) {
return property;

View File

@ -212,6 +212,15 @@ public class MockProcessContext extends MockControllerServiceLookup implements S
}
}
@Override
public Map<String, String> getAllProperties() {
final Map<String,String> propValueMap = new LinkedHashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
propValueMap.put(entry.getKey().getName(), entry.getValue());
}
return propValueMap;
}
/**
* Validates the current properties, returning ValidationResults for any
* invalid properties. All processor defined properties will be validated.

View File

@ -19,6 +19,7 @@ package org.apache.nifi.util;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -58,6 +59,15 @@ public class MockReportingContext extends MockControllerServiceLookup implements
return Collections.unmodifiableMap(properties);
}
@Override
public Map<String, String> getAllProperties() {
final Map<String,String> propValueMap = new LinkedHashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
propValueMap.put(entry.getKey().getName(), entry.getValue());
}
return propValueMap;
}
@Override
public PropertyValue getProperty(final PropertyDescriptor property) {
final String configuredValue = properties.get(property);

View File

@ -17,6 +17,7 @@
package org.apache.nifi.util;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -88,6 +89,15 @@ public class MockValidationContext implements ValidationContext, ControllerServi
return context.getProperties();
}
@Override
public Map<String, String> getAllProperties() {
final Map<String,String> propValueMap = new LinkedHashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
propValueMap.put(entry.getKey().getName(), entry.getValue());
}
return propValueMap;
}
@Override
public String getAnnotationData() {
return context.getAnnotationData();

View File

@ -40,6 +40,7 @@ import org.apache.nifi.reporting.Severity;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
@ -107,6 +108,15 @@ public class StandardReportingContext implements ReportingContext, ControllerSer
return Collections.unmodifiableMap(properties);
}
@Override
public Map<String, String> getAllProperties() {
final Map<String,String> propValueMap = new LinkedHashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
propValueMap.put(entry.getKey().getName(), entry.getValue());
}
return propValueMap;
}
@Override
public PropertyValue getProperty(final PropertyDescriptor property) {
final String configuredValue = properties.get(property);

View File

@ -194,6 +194,11 @@ public class ConnectableProcessContext implements ProcessContext {
return null;
}
@Override
public Map<String, String> getAllProperties() {
return new HashMap<>();
}
@Override
public Map<PropertyDescriptor, String> getProperties() {
return new HashMap<>();

View File

@ -17,6 +17,7 @@
package org.apache.nifi.controller.service;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -81,6 +82,15 @@ public class StandardConfigurationContext implements ConfigurationContext {
return component.getProperties();
}
@Override
public Map<String, String> getAllProperties() {
final Map<String,String> propValueMap = new LinkedHashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
propValueMap.put(entry.getKey().getName(), entry.getValue());
}
return propValueMap;
}
@Override
public String getSchedulingPeriod() {
return schedulingPeriod;

View File

@ -19,6 +19,7 @@ package org.apache.nifi.controller.state;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.net.ssl.SSLContext;
@ -46,6 +47,15 @@ public class StandardStateProviderInitializationContext implements StateProvider
return Collections.unmodifiableMap(properties);
}
@Override
public Map<String,String> getAllProperties() {
final Map<String,String> propValueMap = new LinkedHashMap<>();
for (final Map.Entry<PropertyDescriptor, PropertyValue> entry : getProperties().entrySet()) {
propValueMap.put(entry.getKey().getName(), entry.getValue().getValue());
}
return propValueMap;
}
@Override
public PropertyValue getProperty(final PropertyDescriptor property) {
return properties.get(property);

View File

@ -38,6 +38,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.components.state.StateProvider;
import org.apache.nifi.components.state.StateProviderInitializationContext;
import org.apache.nifi.controller.state.ConfigParseException;
@ -48,6 +49,7 @@ import org.apache.nifi.controller.state.config.StateProviderConfiguration;
import org.apache.nifi.framework.security.util.SslContextFactory;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.processor.StandardValidationContext;
import org.apache.nifi.registry.VariableRegistry;
@ -232,7 +234,7 @@ public class StandardStateManagerProvider implements StateManagerProvider{
Thread.currentThread().setContextClassLoader(detectedClassLoaderForType);
final Class<? extends StateProvider> mgrClass = rawClass.asSubclass(StateProvider.class);
return mgrClass.newInstance();
return withNarClassLoader(mgrClass.newInstance());
} finally {
if (ctxClassLoader != null) {
Thread.currentThread().setContextClassLoader(ctxClassLoader);
@ -240,6 +242,129 @@ public class StandardStateManagerProvider implements StateManagerProvider{
}
}
/**
* Wrap the provider so that all method calls set the context class loader to the NAR's class loader before
* executing the actual provider.
*
* @param stateProvider the base provider to wrap
* @return the wrapped provider
*/
private static StateProvider withNarClassLoader(final StateProvider stateProvider) {
return new StateProvider() {
@Override
public void initialize(StateProviderInitializationContext context) throws IOException {
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
stateProvider.initialize(context);
}
}
@Override
public void shutdown() {
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
stateProvider.shutdown();
}
}
@Override
public void setState(Map<String, String> state, String componentId) throws IOException {
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
stateProvider.setState(state, componentId);
}
}
@Override
public StateMap getState(String componentId) throws IOException {
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
return stateProvider.getState(componentId);
}
}
@Override
public boolean replace(StateMap oldValue, Map<String, String> newValue, String componentId) throws IOException {
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
return stateProvider.replace(oldValue, newValue, componentId);
}
}
@Override
public void clear(String componentId) throws IOException {
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
stateProvider.clear(componentId);
}
}
@Override
public void onComponentRemoved(String componentId) throws IOException {
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
stateProvider.onComponentRemoved(componentId);
}
}
@Override
public void enable() {
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
stateProvider.enable();
}
}
@Override
public void disable() {
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
stateProvider.disable();
}
}
@Override
public boolean isEnabled() {
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
return stateProvider.isEnabled();
}
}
@Override
public Scope[] getSupportedScopes() {
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
return stateProvider.getSupportedScopes();
}
}
@Override
public Collection<ValidationResult> validate(ValidationContext context) {
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
return stateProvider.validate(context);
}
}
@Override
public PropertyDescriptor getPropertyDescriptor(String name) {
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
return stateProvider.getPropertyDescriptor(name);
}
}
@Override
public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
stateProvider.onPropertyModified(descriptor, oldValue, newValue);
}
}
@Override
public List<PropertyDescriptor> getPropertyDescriptors() {
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
return stateProvider.getPropertyDescriptors();
}
}
@Override
public String getIdentifier() {
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
return stateProvider.getIdentifier();
}
}
};
}
/**
* Returns the State Manager that has been created for the given component ID, or <code>null</code> if none exists
*

View File

@ -19,6 +19,7 @@ package org.apache.nifi.processor;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -125,6 +126,15 @@ public class StandardProcessContext implements ProcessContext, ControllerService
return procNode.getProperties();
}
@Override
public Map<String, String> getAllProperties() {
final Map<String,String> propValueMap = new LinkedHashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
propValueMap.put(entry.getKey().getName(), entry.getValue());
}
return propValueMap;
}
@Override
public String encrypt(final String unencrypted) {
return encryptor.encrypt(unencrypted);

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.processor;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -98,6 +99,15 @@ public class StandardSchedulingContext implements SchedulingContext {
return processContext.getProperties();
}
@Override
public Map<String, String> getAllProperties() {
final Map<String,String> propValueMap = new LinkedHashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
propValueMap.put(entry.getKey().getName(), entry.getValue());
}
return propValueMap;
}
@Override
public String encrypt(final String unencrypted) {
return processContext.encrypt(unencrypted);

View File

@ -19,6 +19,7 @@ package org.apache.nifi.processor;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -118,6 +119,15 @@ public class StandardValidationContext implements ValidationContext {
return Collections.unmodifiableMap(properties);
}
@Override
public Map<String, String> getAllProperties() {
final Map<String,String> propValueMap = new LinkedHashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
propValueMap.put(entry.getKey().getName(), entry.getValue());
}
return propValueMap;
}
@Override
public String getAnnotationData() {
return annotationData;

View File

@ -62,6 +62,7 @@ import java.net.URL;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
@ -512,6 +513,15 @@ public class TestStandardProcessorNode {
return Collections.unmodifiableMap(properties);
}
@Override
public Map<String, String> getAllProperties() {
final Map<String,String> propValueMap = new LinkedHashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
propValueMap.put(entry.getKey().getName(), entry.getValue());
}
return propValueMap;
}
@Override
public String getAnnotationData() {
return null;

View File

@ -20,6 +20,7 @@ package org.apache.nifi.controller.state.providers.local;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
@ -61,6 +62,15 @@ public class TestWriteAheadLocalStateProvider extends AbstractTestStateProvider
return Collections.unmodifiableMap(properties);
}
@Override
public Map<String,String> getAllProperties() {
final Map<String,String> propValueMap = new LinkedHashMap<>();
for (final Map.Entry<PropertyDescriptor, PropertyValue> entry : getProperties().entrySet()) {
propValueMap.put(entry.getKey().getName(), entry.getValue().getValue());
}
return propValueMap;
}
@Override
public PropertyValue getProperty(final PropertyDescriptor property) {
final PropertyValue prop = properties.get(property);

View File

@ -18,6 +18,7 @@ package org.apache.nifi.controller.state.providers.zookeeper;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.net.ssl.SSLContext;
@ -75,6 +76,15 @@ public class TestZooKeeperStateProvider extends AbstractTestStateProvider {
return propValueMap;
}
@Override
public Map<String,String> getAllProperties() {
final Map<String,String> propValueMap = new LinkedHashMap<>();
for (final Map.Entry<PropertyDescriptor, PropertyValue> entry : getProperties().entrySet()) {
propValueMap.put(entry.getKey().getName(), entry.getValue().getValue());
}
return propValueMap;
}
@Override
public PropertyValue getProperty(final PropertyDescriptor property) {
final String prop = properties.get(property);

View File

@ -31,6 +31,11 @@ public class MockConfigurationContext implements ConfigurationContext {
return null;
}
@Override
public Map<String, String> getAllProperties() {
return Collections.emptyMap();
}
@Override
public Map<PropertyDescriptor, String> getProperties() {
return Collections.emptyMap();

View File

@ -64,6 +64,11 @@ public class MockProcessContext implements ProcessContext {
return Collections.emptyMap();
}
@Override
public Map<String, String> getAllProperties() {
return Collections.emptyMap();
}
@Override
public String encrypt(String unencrypted) {
return unencrypted;

View File

@ -63,4 +63,72 @@
<property name="Session Timeout">10 seconds</property>
<property name="Access Control">Open</property>
</cluster-provider>
<!--
Cluster State Provider that stores state in Redis. This can be used as an alternative to the ZooKeeper State Provider.
This provider requires the following properties:
Redis Mode - The type of Redis instance:
- Standalone
- Sentinel
- Cluster (currently not supported for state-management due to use of WATCH command which Redis does not support in clustered mode)
Connection String - The connection string for Redis.
- In a standalone instance this value will be of the form hostname:port.
- In a sentinel instance this value will be the comma-separated list of sentinels, such as host1:port1,host2:port2,host3:port3.
- In a clustered instance this value will be the comma-separated list of cluster masters, such as host1:port,host2:port,host3:port.
This provider has the following optional properties:
Key Prefix - The prefix for each key stored by this state provider. When sharing a single Redis across multiple NiFi instances, setting a unique
value for the Key Prefix will make it easier to identify which instances the keys came from (default nifi/components/).
Database Index - The database index to be used by connections created from this connection pool.
See the databases property in redis.conf, by default databases 0-15 will be available.
Communication Timeout - The timeout to use when attempting to communicate with Redis.
Cluster Max Redirects - The maximum number of redirects that can be performed when clustered.
Sentinel Master - The name of the sentinel master, require when Mode is set to Sentinel.
Password - The password used to authenticate to the Redis server. See the requirepass property in redis.conf.
Pool - Max Total - The maximum number of connections that can be allocated by the pool (checked out to clients, or idle awaiting checkout).
A negative value indicates that there is no limit.
Pool - Max Idle - The maximum number of idle connections that can be held in the pool, or a negative value if there is no limit.
Pool - Min Idle - The target for the minimum number of idle connections to maintain in the pool. If the configured value of Min Idle is
greater than the configured value for Max Idle, then the value of Max Idle will be used instead.
Pool - Block When Exhausted - Whether or not clients should block and wait when trying to obtain a connection from the pool when the pool
has no available connections. Setting this to false means an error will occur immediately when a client requests a connection and
none are available.
Pool - Max Wait Time - The amount of time to wait for an available connection when Block When Exhausted is set to true.
Pool - Min Evictable Idle Time - The minimum amount of time an object may sit idle in the pool before it is eligible for eviction.
Pool - Time Between Eviction Runs - The amount of time between attempting to evict idle connections from the pool.
Pool - Num Tests Per Eviction Run - The number of connections to tests per eviction attempt. A negative value indicates to test all connections.
Pool - Test On Create - Whether or not connections should be tested upon creation (default false).
Pool - Test On Borrow - Whether or not connections should be tested upon borrowing from the pool (default false).
Pool - Test On Return - Whether or not connections should be tested upon returning to the pool (default false).
Pool - Test While Idle - Whether or not connections should be tested while idle (default true).
<cluster-provider>
<id>redis-provider</id>
<class>org.apache.nifi.redis.state.RedisStateProvider</class>
<property name="Redis Mode">Standalone</property>
<property name="Connection String">localhost:6379</property>
</cluster-provider>
-->
</stateManagement>

View File

@ -0,0 +1,89 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-redis-bundle</artifactId>
<version>1.4.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-redis-extensions</artifactId>
<packaging>jar</packaging>
<dependencies>
<!-- Provided deps from nifi-redis-service-api -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-distributed-cache-client-service-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-redis-service-api</artifactId>
<version>1.4.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>${spring.data.redis.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
<scope>provided</scope>
</dependency>
<!-- End Provided deps from nifi-redis-service-api -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.kstyrc</groupId>
<artifactId>embedded-redis</artifactId>
<version>0.6</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.redis.service;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.redis.RedisConnectionPool;
import org.apache.nifi.redis.RedisType;
import org.apache.nifi.redis.util.RedisUtils;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import java.util.Collection;
import java.util.List;
@Tags({"redis", "cache"})
@CapabilityDescription("A service that provides connections to Redis.")
public class RedisConnectionPoolService extends AbstractControllerService implements RedisConnectionPool {
private volatile PropertyContext context;
private volatile RedisType redisType;
private volatile JedisConnectionFactory connectionFactory;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return RedisUtils.REDIS_CONNECTION_PROPERTY_DESCRIPTORS;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
return RedisUtils.validate(validationContext);
}
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
this.context = context;
final String redisMode = context.getProperty(RedisUtils.REDIS_MODE).getValue();
this.redisType = RedisType.fromDisplayName(redisMode);
}
@OnDisabled
public void onDisabled() {
if (connectionFactory != null) {
connectionFactory.destroy();
connectionFactory = null;
redisType = null;
context = null;
}
}
@Override
public RedisType getRedisType() {
return redisType;
}
@Override
public RedisConnection getConnection() {
if (connectionFactory == null) {
synchronized (this) {
if (connectionFactory == null) {
connectionFactory = RedisUtils.createConnectionFactory(context, getLogger());
}
}
}
return connectionFactory.getConnection();
}
}

View File

@ -0,0 +1,327 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.redis.service;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.redis.RedisConnectionPool;
import org.apache.nifi.redis.RedisType;
import org.apache.nifi.redis.util.RedisAction;
import org.apache.nifi.util.Tuple;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.ScanOptions;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@Tags({ "redis", "distributed", "cache", "map" })
@CapabilityDescription("An implementation of DistributedMapCacheClient that uses Redis as the backing cache. This service relies on " +
"the WATCH, MULTI, and EXEC commands in Redis, which are not fully supported when Redis is clustered. As a result, this service " +
"can only be used with a Redis Connection Pool that is configured for standalone or sentinel mode. Sentinel mode can be used to " +
"provide high-availability configurations.")
public class RedisDistributedMapCacheClientService extends AbstractControllerService implements AtomicDistributedMapCacheClient<byte[]> {
public static final PropertyDescriptor REDIS_CONNECTION_POOL = new PropertyDescriptor.Builder()
.name("redis-connection-pool")
.displayName("Redis Connection Pool")
.identifiesControllerService(RedisConnectionPool.class)
.required(true)
.build();
static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
static {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(REDIS_CONNECTION_POOL);
PROPERTY_DESCRIPTORS = Collections.unmodifiableList(props);
}
private volatile RedisConnectionPool redisConnectionPool;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTY_DESCRIPTORS;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();
final RedisConnectionPool redisConnectionPool = validationContext.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);
if (redisConnectionPool != null) {
final RedisType redisType = redisConnectionPool.getRedisType();
if (redisType != null && redisType == RedisType.CLUSTER) {
results.add(new ValidationResult.Builder()
.subject(REDIS_CONNECTION_POOL.getDisplayName())
.valid(false)
.explanation(REDIS_CONNECTION_POOL.getDisplayName()
+ " is configured in clustered mode, and this service requires a non-clustered Redis")
.build());
}
}
return results;
}
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
this.redisConnectionPool = context.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);
}
@OnDisabled
public void onDisabled() {
this.redisConnectionPool = null;
}
@Override
public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
return withConnection(redisConnection -> {
final Tuple<byte[],byte[]> kv = serialize(key, value, keySerializer, valueSerializer);
return redisConnection.setNX(kv.getKey(), kv.getValue());
});
}
@Override
public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final Deserializer<V> valueDeserializer) throws IOException {
return withConnection(redisConnection -> {
final Tuple<byte[],byte[]> kv = serialize(key, value, keySerializer, valueSerializer);
do {
// start a watch on the key and retrieve the current value
redisConnection.watch(kv.getKey());
final byte[] existingValue = redisConnection.get(kv.getKey());
// start a transaction and perform the put-if-absent
redisConnection.multi();
redisConnection.setNX(kv.getKey(), kv.getValue());
// execute the transaction
final List<Object> results = redisConnection.exec();
// if the results list was empty, then the transaction failed (i.e. key was modified after we started watching), so keep looping to retry
// if the results list has results, then the transaction succeeded and it should have the result of the setNX operation
if (results.size() > 0) {
final Object firstResult = results.get(0);
if (firstResult instanceof Boolean) {
final Boolean absent = (Boolean) firstResult;
return absent ? null : valueDeserializer.deserialize(existingValue);
} else {
// this shouldn't really happen, but just in case there is a non-boolean result then bounce out of the loop
throw new IOException("Unexpected result from Redis transaction: Expected Boolean result, but got "
+ firstResult.getClass().getName() + " with value " + firstResult.toString());
}
}
} while (isEnabled());
return null;
});
}
@Override
public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
return withConnection(redisConnection -> {
final byte[] k = serialize(key, keySerializer);
return redisConnection.exists(k);
});
}
@Override
public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
withConnection(redisConnection -> {
final Tuple<byte[],byte[]> kv = serialize(key, value, keySerializer, valueSerializer);
redisConnection.set(kv.getKey(), kv.getValue());
return null;
});
}
@Override
public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
return withConnection(redisConnection -> {
final byte[] k = serialize(key, keySerializer);
final byte[] v = redisConnection.get(k);
return valueDeserializer.deserialize(v);
});
}
@Override
public void close() throws IOException {
// nothing to do
}
@Override
public <K> boolean remove(final K key, final Serializer<K> keySerializer) throws IOException {
return withConnection(redisConnection -> {
final byte[] k = serialize(key, keySerializer);
final long numRemoved = redisConnection.del(k);
return numRemoved > 0;
});
}
@Override
public long removeByPattern(final String regex) throws IOException {
return withConnection(redisConnection -> {
long deletedCount = 0;
final List<byte[]> batchKeys = new ArrayList<>();
// delete keys in batches of 1000 using the cursor
final Cursor<byte[]> cursor = redisConnection.scan(ScanOptions.scanOptions().count(100).match(regex).build());
while (cursor.hasNext()) {
batchKeys.add(cursor.next());
if (batchKeys.size() == 1000) {
deletedCount += redisConnection.del(getKeys(batchKeys));
batchKeys.clear();
}
}
// delete any left-over keys if some were added to the batch but never reached 1000
if (batchKeys.size() > 0) {
deletedCount += redisConnection.del(getKeys(batchKeys));
batchKeys.clear();
}
return deletedCount;
});
}
/**
* Convert the list of all keys to an array.
*/
private byte[][] getKeys(final List<byte[]> keys) {
final byte[][] allKeysArray = new byte[keys.size()][];
for (int i=0; i < keys.size(); i++) {
allKeysArray[i] = keys.get(i);
}
return allKeysArray;
}
// ----------------- Methods from AtomicDistributedMapCacheClient ------------------------
@Override
public <K, V> AtomicCacheEntry<K, V, byte[]> fetch(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
return withConnection(redisConnection -> {
final byte[] k = serialize(key, keySerializer);
final byte[] v = redisConnection.get(k);
if (v == null) {
return null;
}
// for Redis we are going to use the raw bytes of the value as the revision
return new AtomicCacheEntry<>(key, valueDeserializer.deserialize(v), v);
});
}
@Override
public <K, V> boolean replace(final AtomicCacheEntry<K, V, byte[]> entry, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
return withConnection(redisConnection -> {
final ByteArrayOutputStream out = new ByteArrayOutputStream();
keySerializer.serialize(entry.getKey(), out);
final byte[] k = out.toByteArray();
out.reset();
valueSerializer.serialize(entry.getValue(), out);
final byte[] newVal = out.toByteArray();
// the revision of the cache entry holds the value of the key from a previous fetch
final byte[] prevVal = entry.getRevision().orElse(null);
boolean replaced = false;
// start a watch on the key and retrieve the current value
redisConnection.watch(k);
final byte[] currValue = redisConnection.get(k);
// start a transaction
redisConnection.multi();
// compare-and-set
if (Arrays.equals(prevVal, currValue)) {
// if we use set(k, newVal) then the results list will always have size == 0 b/c when convertPipelineAndTxResults is set to true,
// status responses like "OK" are skipped over, so by using getSet we can rely on the results list to know if the transaction succeeded
redisConnection.getSet(k, newVal);
}
// execute the transaction
final List<Object> results = redisConnection.exec();
// if we have a result then the replace succeeded
if (results.size() > 0) {
replaced = true;
}
return replaced;
});
}
// ----------------- END Methods from AtomicDistributedMapCacheClient ------------------------
private <K, V> Tuple<byte[],byte[]> serialize(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
final ByteArrayOutputStream out = new ByteArrayOutputStream();
keySerializer.serialize(key, out);
final byte[] k = out.toByteArray();
out.reset();
valueSerializer.serialize(value, out);
final byte[] v = out.toByteArray();
return new Tuple<>(k, v);
}
private <K> byte[] serialize(final K key, final Serializer<K> keySerializer) throws IOException {
final ByteArrayOutputStream out = new ByteArrayOutputStream();
keySerializer.serialize(key, out);
return out.toByteArray();
}
private <T> T withConnection(final RedisAction<T> action) throws IOException {
RedisConnection redisConnection = null;
try {
redisConnection = redisConnectionPool.getConnection();
return action.execute(redisConnection);
} finally {
if (redisConnection != null) {
try {
redisConnection.close();
} catch (Exception e) {
getLogger().warn("Error closing connection: " + e.getMessage(), e);
}
}
}
}
}

View File

@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.redis.state;
import org.apache.nifi.components.state.StateMap;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
/**
* A StateMap implementation for RedisStateProvider.
*/
public class RedisStateMap implements StateMap {
public static final Long DEFAULT_VERSION = new Long(-1);
public static final Integer DEFAULT_ENCODING = new Integer(1);
private final Long version;
private final Integer encodingVersion;
private final Map<String,String> stateValues;
private RedisStateMap(final Builder builder) {
this.version = builder.version == null ? DEFAULT_VERSION : builder.version;
this.encodingVersion = builder.encodingVersion == null ? DEFAULT_ENCODING : builder.encodingVersion;
this.stateValues = Collections.unmodifiableMap(new TreeMap<>(builder.stateValues));
Objects.requireNonNull(version, "Version must be non-null");
Objects.requireNonNull(encodingVersion, "Encoding Version must be non-null");
Objects.requireNonNull(stateValues, "State Values must be non-null");
}
@Override
public long getVersion() {
return version;
}
@Override
public String get(String key) {
return stateValues.get(key);
}
@Override
public Map<String, String> toMap() {
return stateValues;
}
public Integer getEncodingVersion() {
return encodingVersion;
}
public static class Builder {
private Long version;
private Integer encodingVersion;
private Map<String,String> stateValues = new TreeMap<>();
public Builder version(final Long version) {
this.version = version;
return this;
}
public Builder encodingVersion(final Integer encodingVersion) {
this.encodingVersion = encodingVersion;
return this;
}
public Builder stateValue(final String name, String value) {
stateValues.put(name, value);
return this;
}
public Builder stateValues(final Map<String,String> stateValues) {
this.stateValues.clear();
if (stateValues != null) {
this.stateValues.putAll(stateValues);
}
return this;
}
public RedisStateMap build() {
return new RedisStateMap(this);
}
}
}

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.redis.state;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
/**
* A RedisStateMapSerDe that uses JSON as the underlying representation.
*/
public class RedisStateMapJsonSerDe implements RedisStateMapSerDe {
public static final String FIELD_VERSION = "version";
public static final String FIELD_ENCODING = "encodingVersion";
public static final String FIELD_STATE_VALUES = "stateValues";
private final JsonFactory jsonFactory = new JsonFactory(new ObjectMapper());
@Override
public byte[] serialize(final RedisStateMap stateMap) throws IOException {
if (stateMap == null) {
return null;
}
try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
final JsonGenerator jsonGenerator = jsonFactory.createGenerator(out);
jsonGenerator.writeStartObject();
jsonGenerator.writeNumberField(FIELD_VERSION, stateMap.getVersion());
jsonGenerator.writeNumberField(FIELD_ENCODING, stateMap.getEncodingVersion());
jsonGenerator.writeObjectFieldStart(FIELD_STATE_VALUES);
for (Map.Entry<String,String> entry : stateMap.toMap().entrySet()) {
jsonGenerator.writeStringField(entry.getKey(), entry.getValue());
}
jsonGenerator.writeEndObject();
jsonGenerator.writeEndObject();
jsonGenerator.flush();
return out.toByteArray();
}
}
@Override
public RedisStateMap deserialize(final byte[] data) throws IOException {
if (data == null || data.length == 0) {
return null;
}
final RedisStateMap.Builder builder = new RedisStateMap.Builder();
try (final JsonParser jsonParser = jsonFactory.createParser(data)) {
final JsonNode rootNode = jsonParser.readValueAsTree();
builder.version(rootNode.get(FIELD_VERSION).asLong());
builder.encodingVersion(rootNode.get(FIELD_ENCODING).asInt());
final JsonNode stateValuesNode = rootNode.get(FIELD_STATE_VALUES);
stateValuesNode.fields().forEachRemaining(e -> builder.stateValue(e.getKey(), e.getValue().asText()));
}
return builder.build();
}
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.redis.state;
import java.io.IOException;
/**
* Provides serialization/deserialization of a RedisStateMap.
*/
public interface RedisStateMapSerDe {
/**
* Serializes the given RedisStateMap.
*
* @param stateMap the RedisStateMap to serialize
* @return the serialized bytes or null if stateMap is null
* @throws IOException if an error occurs when serializing
*/
byte[] serialize(RedisStateMap stateMap) throws IOException;
/**
* Deserializes the given bytes to a RedisStateMap.
*
* @param data bytes previously stored by RedisStateProvider
* @return a RedisStateMap or null if data is null or length 0
* @throws IOException if an error occurs when deserializing
*/
RedisStateMap deserialize(byte[] data) throws IOException;
}

View File

@ -0,0 +1,299 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.redis.state;
import org.apache.nifi.components.AbstractConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.components.state.StateProvider;
import org.apache.nifi.components.state.StateProviderInitializationContext;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.redis.RedisType;
import org.apache.nifi.redis.util.RedisAction;
import org.apache.nifi.redis.util.RedisUtils;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* A StateProvider backed by Redis.
*/
public class RedisStateProvider extends AbstractConfigurableComponent implements StateProvider {
static final int ENCODING_VERSION = 1;
public static final PropertyDescriptor KEY_PREFIX = new PropertyDescriptor.Builder()
.name("Key Prefix")
.displayName("Key Prefix")
.description("The prefix for each key stored by this state provider. When sharing a single Redis across multiple NiFi instances, " +
"setting a unique value for the Key Prefix will make it easier to identify which instances the keys came from.")
.required(true)
.defaultValue("nifi/components/")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
static final List<PropertyDescriptor> STATE_PROVIDER_PROPERTIES;
static {
final List<PropertyDescriptor> props = new ArrayList<>(RedisUtils.REDIS_CONNECTION_PROPERTY_DESCRIPTORS);
props.add(KEY_PREFIX);
STATE_PROVIDER_PROPERTIES = Collections.unmodifiableList(props);
}
private String identifier;
private String keyPrefix;
private ComponentLog logger;
private PropertyContext context;
private volatile boolean enabled;
private volatile JedisConnectionFactory connectionFactory;
private final RedisStateMapSerDe serDe = new RedisStateMapJsonSerDe();
@Override
public final void initialize(final StateProviderInitializationContext context) throws IOException {
this.context = context;
this.identifier = context.getIdentifier();
this.logger = context.getLogger();
String keyPrefix = context.getProperty(KEY_PREFIX).getValue();
if (!keyPrefix.endsWith("/")) {
keyPrefix = keyPrefix + "/";
}
this.keyPrefix = keyPrefix;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return STATE_PROVIDER_PROPERTIES;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>(RedisUtils.validate(validationContext));
final RedisType redisType = RedisType.fromDisplayName(validationContext.getProperty(RedisUtils.REDIS_MODE).getValue());
if (redisType != null && redisType == RedisType.CLUSTER) {
results.add(new ValidationResult.Builder()
.subject(RedisUtils.REDIS_MODE.getDisplayName())
.valid(false)
.explanation(RedisUtils.REDIS_MODE.getDisplayName()
+ " is configured in clustered mode, and this service requires a non-clustered Redis")
.build());
}
return results;
}
@Override
public String getIdentifier() {
return identifier;
}
@Override
public void enable() {
enabled = true;
}
@Override
public void disable() {
enabled = false;
}
@Override
public boolean isEnabled() {
return enabled;
}
@Override
public void shutdown() {
if (connectionFactory != null) {
connectionFactory.destroy();
connectionFactory = null;
}
}
@Override
public void setState(final Map<String, String> state, final String componentId) throws IOException {
verifyEnabled();
final StateMap currStateMap = getState(componentId);
int attempted = 0;
boolean updated = false;
while (!updated && attempted < 20) {
updated = replace(currStateMap, state, componentId, true);
attempted++;
}
if (!updated) {
throw new IOException("Unable to update state due to concurrent modifications");
}
}
@Override
public StateMap getState(final String componentId) throws IOException {
return withConnection(redisConnection -> {
final byte[] key = getComponentKey(componentId).getBytes(StandardCharsets.UTF_8);
final byte[] value = redisConnection.get(key);
final RedisStateMap stateMap = serDe.deserialize(value);
if (stateMap == null) {
return new RedisStateMap.Builder().encodingVersion(ENCODING_VERSION).build();
} else {
return stateMap;
}
});
}
@Override
public boolean replace(final StateMap oldValue, final Map<String, String> newValue, final String componentId) throws IOException {
return replace(oldValue, newValue, componentId, false);
}
private boolean replace(final StateMap oldValue, final Map<String, String> newValue, final String componentId, final boolean allowReplaceMissing) throws IOException {
return withConnection(redisConnection -> {
boolean replaced = false;
// start a watch on the key and retrieve the current value
final byte[] key = getComponentKey(componentId).getBytes(StandardCharsets.UTF_8);
redisConnection.watch(key);
final long prevVersion = oldValue == null ? -1L : oldValue.getVersion();
final byte[] currValue = redisConnection.get(key);
final RedisStateMap currStateMap = serDe.deserialize(currValue);
final long currVersion = currStateMap == null ? -1L : currStateMap.getVersion();
// the replace API expects that you can't call replace on a non-existing value, so unwatch and return
if (!allowReplaceMissing && currVersion == -1) {
redisConnection.unwatch();
return false;
}
// start a transaction
redisConnection.multi();
// compare-and-set
if (prevVersion == currVersion) {
// build the new RedisStateMap incrementing the version, using latest encoding, and using the passed in values
final RedisStateMap newStateMap = new RedisStateMap.Builder()
.version(currVersion + 1)
.encodingVersion(ENCODING_VERSION)
.stateValues(newValue)
.build();
// if we use set(k, newVal) then the results list will always have size == 0 b/c when convertPipelineAndTxResults is set to true,
// status responses like "OK" are skipped over, so by using getSet we can rely on the results list to know if the transaction succeeded
redisConnection.getSet(key, serDe.serialize(newStateMap));
}
// execute the transaction
final List<Object> results = redisConnection.exec();
// if we have a result then the replace succeeded
if (results.size() > 0) {
replaced = true;
}
return replaced;
});
}
@Override
public void clear(final String componentId) throws IOException {
int attempted = 0;
boolean updated = false;
while (!updated && attempted < 20) {
final StateMap currStateMap = getState(componentId);
updated = replace(currStateMap, Collections.emptyMap(), componentId, true);
final String result = updated ? "successful" : "unsuccessful";
logger.debug("Attempt # {} to clear state for component {} was {}", new Object[] { attempted + 1, componentId, result});
attempted++;
}
if (!updated) {
throw new IOException("Unable to update state due to concurrent modifications");
}
}
@Override
public void onComponentRemoved(final String componentId) throws IOException {
withConnection(redisConnection -> {
final byte[] key = getComponentKey(componentId).getBytes(StandardCharsets.UTF_8);
redisConnection.del(key);
return true;
});
}
@Override
public Scope[] getSupportedScopes() {
return new Scope[] {Scope.CLUSTER};
}
private String getComponentKey(final String componentId) {
return keyPrefix + componentId;
}
private void verifyEnabled() throws IOException {
if (!isEnabled()) {
throw new IOException("Cannot update or retrieve cluster state because node is no longer connected to a cluster.");
}
}
// visible for testing
synchronized RedisConnection getRedis() {
if (connectionFactory == null) {
connectionFactory = RedisUtils.createConnectionFactory(context, logger);
}
return connectionFactory.getConnection();
}
private <T> T withConnection(final RedisAction<T> action) throws IOException {
RedisConnection redisConnection = null;
try {
redisConnection = getRedis();
return action.execute(redisConnection);
} finally {
if (redisConnection != null) {
try {
redisConnection.close();
} catch (Exception e) {
logger.warn("Error closing connection: " + e.getMessage(), e);
}
}
}
}
}

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.redis.util;
import org.springframework.data.redis.connection.RedisConnection;
import java.io.IOException;
/**
* An action to be executed with a RedisConnection.
*/
public interface RedisAction<T> {
T execute(RedisConnection redisConnection) throws IOException;
}

View File

@ -0,0 +1,428 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.redis.util;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.redis.RedisType;
import org.apache.nifi.util.StringUtils;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.RedisSentinelConfiguration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisShardInfo;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class RedisUtils {
// These properties are shared between the connection pool controller service and the state provider, the name
// is purposely set to be more human-readable since that will be referenced in state-management.xml
public static final AllowableValue REDIS_MODE_STANDALONE = new AllowableValue(RedisType.STANDALONE.getDisplayName(), RedisType.STANDALONE.getDisplayName(), RedisType.STANDALONE.getDescription());
public static final AllowableValue REDIS_MODE_SENTINEL = new AllowableValue(RedisType.SENTINEL.getDisplayName(), RedisType.SENTINEL.getDisplayName(), RedisType.SENTINEL.getDescription());
public static final AllowableValue REDIS_MODE_CLUSTER = new AllowableValue(RedisType.CLUSTER.getDisplayName(), RedisType.CLUSTER.getDisplayName(), RedisType.CLUSTER.getDescription());
public static final PropertyDescriptor REDIS_MODE = new PropertyDescriptor.Builder()
.name("Redis Mode")
.displayName("Redis Mode")
.description("The type of Redis being communicated with - standalone, sentinel, or clustered.")
.allowableValues(REDIS_MODE_STANDALONE, REDIS_MODE_SENTINEL, REDIS_MODE_CLUSTER)
.defaultValue(REDIS_MODE_STANDALONE.getValue())
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true)
.build();
public static final PropertyDescriptor CONNECTION_STRING = new PropertyDescriptor.Builder()
.name("Connection String")
.displayName("Connection String")
.description("The connection string for Redis. In a standalone instance this value will be of the form hostname:port. " +
"In a sentinel instance this value will be the comma-separated list of sentinels, such as host1:port1,host2:port2,host3:port3. " +
"In a clustered instance this value will be the comma-separated list of cluster masters, such as host1:port,host2:port,host3:port.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor DATABASE = new PropertyDescriptor.Builder()
.name("Database Index")
.displayName("Database Index")
.description("The database index to be used by connections created from this connection pool. " +
"See the databases property in redis.conf, by default databases 0-15 will be available.")
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.defaultValue("0")
.expressionLanguageSupported(true)
.required(true)
.build();
public static final PropertyDescriptor COMMUNICATION_TIMEOUT = new PropertyDescriptor.Builder()
.name("Communication Timeout")
.displayName("Communication Timeout")
.description("The timeout to use when attempting to communicate with Redis.")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("10 seconds")
.required(true)
.build();
public static final PropertyDescriptor CLUSTER_MAX_REDIRECTS = new PropertyDescriptor.Builder()
.name("Cluster Max Redirects")
.displayName("Cluster Max Redirects")
.description("The maximum number of redirects that can be performed when clustered.")
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.defaultValue("5")
.required(true)
.build();
public static final PropertyDescriptor SENTINEL_MASTER = new PropertyDescriptor.Builder()
.name("Sentinel Master")
.displayName("Sentinel Master")
.description("The name of the sentinel master, require when Mode is set to Sentinel")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
.name("Password")
.displayName("Password")
.description("The password used to authenticate to the Redis server. See the requirepass property in redis.conf.")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(true)
.sensitive(true)
.build();
public static final PropertyDescriptor POOL_MAX_TOTAL = new PropertyDescriptor.Builder()
.name("Pool - Max Total")
.displayName("Pool - Max Total")
.description("The maximum number of connections that can be allocated by the pool (checked out to clients, or idle awaiting checkout). " +
"A negative value indicates that there is no limit.")
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.defaultValue("8")
.required(true)
.build();
public static final PropertyDescriptor POOL_MAX_IDLE = new PropertyDescriptor.Builder()
.name("Pool - Max Idle")
.displayName("Pool - Max Idle")
.description("The maximum number of idle connections that can be held in the pool, or a negative value if there is no limit.")
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.defaultValue("8")
.required(true)
.build();
public static final PropertyDescriptor POOL_MIN_IDLE = new PropertyDescriptor.Builder()
.name("Pool - Min Idle")
.displayName("Pool - Min Idle")
.description("The target for the minimum number of idle connections to maintain in the pool. If the configured value of Min Idle is " +
"greater than the configured value for Max Idle, then the value of Max Idle will be used instead.")
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.defaultValue("0")
.required(true)
.build();
public static final PropertyDescriptor POOL_BLOCK_WHEN_EXHAUSTED = new PropertyDescriptor.Builder()
.name("Pool - Block When Exhausted")
.displayName("Pool - Block When Exhausted")
.description("Whether or not clients should block and wait when trying to obtain a connection from the pool when the pool has no available connections. " +
"Setting this to false means an error will occur immediately when a client requests a connection and none are available.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.allowableValues("true", "false")
.defaultValue("true")
.required(true)
.build();
public static final PropertyDescriptor POOL_MAX_WAIT_TIME = new PropertyDescriptor.Builder()
.name("Pool - Max Wait Time")
.displayName("Pool - Max Wait Time")
.description("The amount of time to wait for an available connection when Block When Exhausted is set to true.")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("10 seconds")
.required(true)
.build();
public static final PropertyDescriptor POOL_MIN_EVICTABLE_IDLE_TIME = new PropertyDescriptor.Builder()
.name("Pool - Min Evictable Idle Time")
.displayName("Pool - Min Evictable Idle Time")
.description("The minimum amount of time an object may sit idle in the pool before it is eligible for eviction.")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("60 seconds")
.required(true)
.build();
public static final PropertyDescriptor POOL_TIME_BETWEEN_EVICTION_RUNS = new PropertyDescriptor.Builder()
.name("Pool - Time Between Eviction Runs")
.displayName("Pool - Time Between Eviction Runs")
.description("The amount of time between attempting to evict idle connections from the pool.")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("30 seconds")
.required(true)
.build();
public static final PropertyDescriptor POOL_NUM_TESTS_PER_EVICTION_RUN = new PropertyDescriptor.Builder()
.name("Pool - Num Tests Per Eviction Run")
.displayName("Pool - Num Tests Per Eviction Run")
.description("The number of connections to tests per eviction attempt. A negative value indicates to test all connections.")
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.defaultValue("-1")
.required(true)
.build();
public static final PropertyDescriptor POOL_TEST_ON_CREATE = new PropertyDescriptor.Builder()
.name("Pool - Test On Create")
.displayName("Pool - Test On Create")
.description("Whether or not connections should be tested upon creation.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
public static final PropertyDescriptor POOL_TEST_ON_BORROW = new PropertyDescriptor.Builder()
.name("Pool - Test On Borrow")
.displayName("Pool - Test On Borrow")
.description("Whether or not connections should be tested upon borrowing from the pool.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
public static final PropertyDescriptor POOL_TEST_ON_RETURN = new PropertyDescriptor.Builder()
.name("Pool - Test On Return")
.displayName("Pool - Test On Return")
.description("Whether or not connections should be tested upon returning to the pool.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
public static final PropertyDescriptor POOL_TEST_WHILE_IDLE = new PropertyDescriptor.Builder()
.name("Pool - Test While Idle")
.displayName("Pool - Test While Idle")
.description("Whether or not connections should be tested while idle.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.allowableValues("true", "false")
.defaultValue("true")
.required(true)
.build();
public static final List<PropertyDescriptor> REDIS_CONNECTION_PROPERTY_DESCRIPTORS;
static {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(RedisUtils.REDIS_MODE);
props.add(RedisUtils.CONNECTION_STRING);
props.add(RedisUtils.DATABASE);
props.add(RedisUtils.COMMUNICATION_TIMEOUT);
props.add(RedisUtils.CLUSTER_MAX_REDIRECTS);
props.add(RedisUtils.SENTINEL_MASTER);
props.add(RedisUtils.PASSWORD);
props.add(RedisUtils.POOL_MAX_TOTAL);
props.add(RedisUtils.POOL_MAX_IDLE);
props.add(RedisUtils.POOL_MIN_IDLE);
props.add(RedisUtils.POOL_BLOCK_WHEN_EXHAUSTED);
props.add(RedisUtils.POOL_MAX_WAIT_TIME);
props.add(RedisUtils.POOL_MIN_EVICTABLE_IDLE_TIME);
props.add(RedisUtils.POOL_TIME_BETWEEN_EVICTION_RUNS);
props.add(RedisUtils.POOL_NUM_TESTS_PER_EVICTION_RUN);
props.add(RedisUtils.POOL_TEST_ON_CREATE);
props.add(RedisUtils.POOL_TEST_ON_BORROW);
props.add(RedisUtils.POOL_TEST_ON_RETURN);
props.add(RedisUtils.POOL_TEST_WHILE_IDLE);
REDIS_CONNECTION_PROPERTY_DESCRIPTORS = Collections.unmodifiableList(props);
}
public static JedisConnectionFactory createConnectionFactory(final PropertyContext context, final ComponentLog logger) {
final String redisMode = context.getProperty(RedisUtils.REDIS_MODE).getValue();
final String connectionString = context.getProperty(RedisUtils.CONNECTION_STRING).evaluateAttributeExpressions().getValue();
final Integer dbIndex = context.getProperty(RedisUtils.DATABASE).evaluateAttributeExpressions().asInteger();
final String password = context.getProperty(RedisUtils.PASSWORD).evaluateAttributeExpressions().getValue();
final Integer timeout = context.getProperty(RedisUtils.COMMUNICATION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final JedisPoolConfig poolConfig = createJedisPoolConfig(context);
JedisConnectionFactory connectionFactory;
if (RedisUtils.REDIS_MODE_STANDALONE.getValue().equals(redisMode)) {
final JedisShardInfo jedisShardInfo = createJedisShardInfo(connectionString, timeout, password);
logger.info("Connecting to Redis in standalone mode at " + connectionString);
connectionFactory = new JedisConnectionFactory(jedisShardInfo);
} else if (RedisUtils.REDIS_MODE_SENTINEL.getValue().equals(redisMode)) {
final String[] sentinels = connectionString.split("[,]");
final String sentinelMaster = context.getProperty(RedisUtils.SENTINEL_MASTER).evaluateAttributeExpressions().getValue();
final RedisSentinelConfiguration sentinelConfiguration = new RedisSentinelConfiguration(sentinelMaster, new HashSet<>(getTrimmedValues(sentinels)));
final JedisShardInfo jedisShardInfo = createJedisShardInfo(sentinels[0], timeout, password);
logger.info("Connecting to Redis in sentinel mode...");
logger.info("Redis master = " + sentinelMaster);
for (final String sentinel : sentinels) {
logger.info("Redis sentinel at " + sentinel);
}
connectionFactory = new JedisConnectionFactory(sentinelConfiguration, poolConfig);
connectionFactory.setShardInfo(jedisShardInfo);
} else {
final String[] clusterNodes = connectionString.split("[,]");
final Integer maxRedirects = context.getProperty(RedisUtils.CLUSTER_MAX_REDIRECTS).asInteger();
final RedisClusterConfiguration clusterConfiguration = new RedisClusterConfiguration(getTrimmedValues(clusterNodes));
clusterConfiguration.setMaxRedirects(maxRedirects);
logger.info("Connecting to Redis in clustered mode...");
for (final String clusterNode : clusterNodes) {
logger.info("Redis cluster node at " + clusterNode);
}
connectionFactory = new JedisConnectionFactory(clusterConfiguration, poolConfig);
}
connectionFactory.setUsePool(true);
connectionFactory.setPoolConfig(poolConfig);
connectionFactory.setDatabase(dbIndex);
connectionFactory.setTimeout(timeout);
if (!StringUtils.isBlank(password)) {
connectionFactory.setPassword(password);
}
// need to call this to initialize the pool/connections
connectionFactory.afterPropertiesSet();
return connectionFactory;
}
private static List<String> getTrimmedValues(final String[] values) {
final List<String> trimmedValues = new ArrayList<>();
for (final String value : values) {
trimmedValues.add(value.trim());
}
return trimmedValues;
}
private static JedisShardInfo createJedisShardInfo(final String hostAndPort, final Integer timeout, final String password) {
final String[] hostAndPortSplit = hostAndPort.split("[:]");
final String host = hostAndPortSplit[0].trim();
final Integer port = Integer.parseInt(hostAndPortSplit[1].trim());
final JedisShardInfo jedisShardInfo = new JedisShardInfo(host, port);
jedisShardInfo.setConnectionTimeout(timeout);
jedisShardInfo.setSoTimeout(timeout);
if (!StringUtils.isEmpty(password)) {
jedisShardInfo.setPassword(password);
}
return jedisShardInfo;
}
private static JedisPoolConfig createJedisPoolConfig(final PropertyContext context) {
final JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(context.getProperty(RedisUtils.POOL_MAX_TOTAL).asInteger());
poolConfig.setMaxIdle(context.getProperty(RedisUtils.POOL_MAX_IDLE).asInteger());
poolConfig.setMinIdle(context.getProperty(RedisUtils.POOL_MIN_IDLE).asInteger());
poolConfig.setBlockWhenExhausted(context.getProperty(RedisUtils.POOL_BLOCK_WHEN_EXHAUSTED).asBoolean());
poolConfig.setMaxWaitMillis(context.getProperty(RedisUtils.POOL_MAX_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS));
poolConfig.setMinEvictableIdleTimeMillis(context.getProperty(RedisUtils.POOL_MIN_EVICTABLE_IDLE_TIME).asTimePeriod(TimeUnit.MILLISECONDS));
poolConfig.setTimeBetweenEvictionRunsMillis(context.getProperty(RedisUtils.POOL_TIME_BETWEEN_EVICTION_RUNS).asTimePeriod(TimeUnit.MILLISECONDS));
poolConfig.setNumTestsPerEvictionRun(context.getProperty(RedisUtils.POOL_NUM_TESTS_PER_EVICTION_RUN).asInteger());
poolConfig.setTestOnCreate(context.getProperty(RedisUtils.POOL_TEST_ON_CREATE).asBoolean());
poolConfig.setTestOnBorrow(context.getProperty(RedisUtils.POOL_TEST_ON_BORROW).asBoolean());
poolConfig.setTestOnReturn(context.getProperty(RedisUtils.POOL_TEST_ON_RETURN).asBoolean());
poolConfig.setTestWhileIdle(context.getProperty(RedisUtils.POOL_TEST_WHILE_IDLE).asBoolean());
return poolConfig;
}
public static List<ValidationResult> validate(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();
final String redisMode = validationContext.getProperty(RedisUtils.REDIS_MODE).getValue();
final String connectionString = validationContext.getProperty(RedisUtils.CONNECTION_STRING).evaluateAttributeExpressions().getValue();
final Integer dbIndex = validationContext.getProperty(RedisUtils.DATABASE).evaluateAttributeExpressions().asInteger();
if (StringUtils.isBlank(connectionString)) {
results.add(new ValidationResult.Builder()
.subject(RedisUtils.CONNECTION_STRING.getDisplayName())
.valid(false)
.explanation("Connection String cannot be blank")
.build());
} else if (RedisUtils.REDIS_MODE_STANDALONE.getValue().equals(redisMode)) {
final String[] hostAndPort = connectionString.split("[:]");
if (hostAndPort == null || hostAndPort.length != 2 || StringUtils.isBlank(hostAndPort[0]) || StringUtils.isBlank(hostAndPort[1]) || !isInteger(hostAndPort[1])) {
results.add(new ValidationResult.Builder()
.subject(RedisUtils.CONNECTION_STRING.getDisplayName())
.input(connectionString)
.valid(false)
.explanation("Standalone Connection String must be in the form host:port")
.build());
}
} else {
for (final String connection : connectionString.split("[,]")) {
final String[] hostAndPort = connection.split("[:]");
if (hostAndPort == null || hostAndPort.length != 2 || StringUtils.isBlank(hostAndPort[0]) || StringUtils.isBlank(hostAndPort[1]) || !isInteger(hostAndPort[1])) {
results.add(new ValidationResult.Builder()
.subject(RedisUtils.CONNECTION_STRING.getDisplayName())
.input(connection)
.valid(false)
.explanation("Connection String must be in the form host:port,host:port,host:port,etc.")
.build());
}
}
}
if (RedisUtils.REDIS_MODE_CLUSTER.getValue().equals(redisMode) && dbIndex > 0) {
results.add(new ValidationResult.Builder()
.subject(RedisUtils.DATABASE.getDisplayName())
.valid(false)
.explanation("Database Index must be 0 when using clustered Redis")
.build());
}
if (RedisUtils.REDIS_MODE_SENTINEL.getValue().equals(redisMode)) {
final String sentinelMaster = validationContext.getProperty(RedisUtils.SENTINEL_MASTER).evaluateAttributeExpressions().getValue();
if (StringUtils.isEmpty(sentinelMaster)) {
results.add(new ValidationResult.Builder()
.subject(RedisUtils.SENTINEL_MASTER.getDisplayName())
.valid(false)
.explanation("Sentinel Master must be provided when Mode is Sentinel")
.build());
}
}
return results;
}
private static boolean isInteger(final String number) {
try {
Integer.parseInt(number);
return true;
} catch (Exception e) {
return false;
}
}
}

View File

@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.redis.state.RedisStateProvider

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.redis.service.RedisConnectionPoolService
org.apache.nifi.redis.service.RedisDistributedMapCacheClientService

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.redis.service;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.redis.RedisConnectionPool;
import java.util.Collections;
import java.util.List;
/**
* Fake processor used for testing RedisConnectionPoolService.
*/
public class FakeRedisProcessor extends AbstractProcessor {
public static final PropertyDescriptor REDIS_SERVICE = new PropertyDescriptor.Builder()
.name("redis-service")
.displayName("Redis Service")
.identifiesControllerService(RedisConnectionPool.class)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true)
.build();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Collections.singletonList(REDIS_SERVICE);
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
}
}

View File

@ -0,0 +1,264 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.redis.service;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.redis.util.RedisUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import redis.embedded.RedisServer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* This is an integration test that is meant to be run against a real Redis instance.
*/
public class ITRedisDistributedMapCacheClientService {
private TestRedisProcessor proc;
private TestRunner testRunner;
private RedisServer redisServer;
private RedisConnectionPoolService redisConnectionPool;
private RedisDistributedMapCacheClientService redisMapCacheClientService;
private int redisPort;
@Before
public void setup() throws IOException {
this.redisPort = getAvailablePort();
this.redisServer = new RedisServer(redisPort);
redisServer.start();
proc = new TestRedisProcessor();
testRunner = TestRunners.newTestRunner(proc);
}
private int getAvailablePort() throws IOException {
try (SocketChannel socket = SocketChannel.open()) {
socket.setOption(StandardSocketOptions.SO_REUSEADDR, true);
socket.bind(new InetSocketAddress("localhost", 0));
return socket.socket().getLocalPort();
}
}
@After
public void teardown() throws IOException {
if (redisServer != null) {
redisServer.stop();
}
}
@Test
public void testStandaloneRedis() throws InitializationException {
try {
// create, configure, and enable the RedisConnectionPool service
redisConnectionPool = new RedisConnectionPoolService();
testRunner.addControllerService("redis-connection-pool", redisConnectionPool);
testRunner.setProperty(redisConnectionPool, RedisUtils.CONNECTION_STRING, "localhost:" + redisPort);
// uncomment this to test using a different database index than the default 0
//testRunner.setProperty(redisConnectionPool, RedisUtils.DATABASE, "1");
// uncomment this to test using a password to authenticate to redis
//testRunner.setProperty(redisConnectionPool, RedisUtils.PASSWORD, "foobared");
testRunner.enableControllerService(redisConnectionPool);
setupRedisMapCacheClientService();
executeProcessor();
} finally {
if (redisConnectionPool != null) {
redisConnectionPool.onDisabled();
}
}
}
private void setupRedisMapCacheClientService() throws InitializationException {
// create, configure, and enable the RedisDistributedMapCacheClient service
redisMapCacheClientService = new RedisDistributedMapCacheClientService();
testRunner.addControllerService("redis-map-cache-client", redisMapCacheClientService);
testRunner.setProperty(redisMapCacheClientService, RedisDistributedMapCacheClientService.REDIS_CONNECTION_POOL, "redis-connection-pool");
testRunner.enableControllerService(redisMapCacheClientService);
testRunner.setProperty(TestRedisProcessor.REDIS_MAP_CACHE, "redis-map-cache-client");
}
private void executeProcessor() {
// queue a flow file to trigger the processor and executeProcessor it
testRunner.enqueue("trigger");
testRunner.run();
testRunner.assertAllFlowFilesTransferred(TestRedisProcessor.REL_SUCCESS, 1);
}
/**
* Test processor that exercises RedisDistributedMapCacheClient.
*/
private static class TestRedisProcessor extends AbstractProcessor {
public static final PropertyDescriptor REDIS_MAP_CACHE = new PropertyDescriptor.Builder()
.name("redis-map-cache")
.displayName("Redis Map Cache")
.identifiesControllerService(AtomicDistributedMapCacheClient.class)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").build();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Collections.singletonList(REDIS_MAP_CACHE);
}
@Override
public Set<Relationship> getRelationships() {
return new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE));
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final ByteArrayOutputStream out = new ByteArrayOutputStream();
final Serializer<String> stringSerializer = new StringSerializer();
final Deserializer<String> stringDeserializer = new StringDeserializer();
final AtomicDistributedMapCacheClient cacheClient = context.getProperty(REDIS_MAP_CACHE).asControllerService(AtomicDistributedMapCacheClient.class);
try {
final long timestamp = System.currentTimeMillis();
final String key = "test-redis-processor-" + timestamp;
final String value = "the time is " + timestamp;
// verify the key doesn't exists, put the key/value, then verify it exists
Assert.assertFalse(cacheClient.containsKey(key, stringSerializer));
cacheClient.put(key, value, stringSerializer, stringSerializer);
Assert.assertTrue(cacheClient.containsKey(key, stringSerializer));
// verify get returns the expected value we set above
final String retrievedValue = cacheClient.get(key, stringSerializer, stringDeserializer);
Assert.assertEquals(value, retrievedValue);
// verify remove removes the entry and contains key returns false after
Assert.assertTrue(cacheClient.remove(key, stringSerializer));
Assert.assertFalse(cacheClient.containsKey(key, stringSerializer));
// verify putIfAbsent works the first time and returns false the second time
Assert.assertTrue(cacheClient.putIfAbsent(key, value, stringSerializer, stringSerializer));
Assert.assertFalse(cacheClient.putIfAbsent(key, "some other value", stringSerializer, stringSerializer));
Assert.assertEquals(value, cacheClient.get(key, stringSerializer, stringDeserializer));
// verify that getAndPutIfAbsent returns the existing value and doesn't modify it in the cache
final String getAndPutIfAbsentResult = cacheClient.getAndPutIfAbsent(key, value, stringSerializer, stringSerializer, stringDeserializer);
Assert.assertEquals(value, getAndPutIfAbsentResult);
Assert.assertEquals(value, cacheClient.get(key, stringSerializer, stringDeserializer));
// verify that getAndPutIfAbsent on a key that doesn't exist returns null
final String keyThatDoesntExist = key + "_DOES_NOT_EXIST";
Assert.assertFalse(cacheClient.containsKey(keyThatDoesntExist, stringSerializer));
final String getAndPutIfAbsentResultWhenDoesntExist = cacheClient.getAndPutIfAbsent(keyThatDoesntExist, value, stringSerializer, stringSerializer, stringDeserializer);
Assert.assertEquals(null, getAndPutIfAbsentResultWhenDoesntExist);
Assert.assertEquals(value, cacheClient.get(keyThatDoesntExist, stringSerializer, stringDeserializer));
// verify atomic fetch returns the correct entry
final AtomicCacheEntry<String,String,byte[]> entry = cacheClient.fetch(key, stringSerializer, stringDeserializer);
Assert.assertEquals(key, entry.getKey());
Assert.assertEquals(value, entry.getValue());
Assert.assertTrue(Arrays.equals(value.getBytes(StandardCharsets.UTF_8), entry.getRevision().orElse(null)));
final AtomicCacheEntry<String,String,byte[]> notLatestEntry = new AtomicCacheEntry<>(entry.getKey(), entry.getValue(), "not previous".getBytes(StandardCharsets.UTF_8));
// verify atomic replace does not replace when previous value is not equal
Assert.assertFalse(cacheClient.replace(notLatestEntry, stringSerializer, stringSerializer));
Assert.assertEquals(value, cacheClient.get(key, stringSerializer, stringDeserializer));
// verify atomic replace does replace when previous value is equal
final String replacementValue = "this value has been replaced";
entry.setValue(replacementValue);
Assert.assertTrue(cacheClient.replace(entry, stringSerializer, stringSerializer));
Assert.assertEquals(replacementValue, cacheClient.get(key, stringSerializer, stringDeserializer));
// verify atomic replace does replace no value previous existed
final String replaceKeyDoesntExist = key + "_REPLACE_DOES_NOT_EXIST";
final AtomicCacheEntry<String,String,byte[]> entryDoesNotExist = new AtomicCacheEntry<>(replaceKeyDoesntExist, replacementValue, null);
Assert.assertTrue(cacheClient.replace(entryDoesNotExist, stringSerializer, stringSerializer));
Assert.assertEquals(replacementValue, cacheClient.get(replaceKeyDoesntExist, stringSerializer, stringDeserializer));
final int numToDelete = 2000;
for (int i=0; i < numToDelete; i++) {
cacheClient.put(key + "-" + i, value, stringSerializer, stringSerializer);
}
Assert.assertTrue(cacheClient.removeByPattern("test-redis-processor-*") >= numToDelete);
Assert.assertFalse(cacheClient.containsKey(key, stringSerializer));
session.transfer(flowFile, REL_SUCCESS);
} catch (final Exception e) {
getLogger().error("Routing to failure due to: " + e.getMessage(), e);
session.transfer(flowFile, REL_FAILURE);
}
}
}
private static class StringSerializer implements Serializer<String> {
@Override
public void serialize(String value, OutputStream output) throws SerializationException, IOException {
if (value != null) {
output.write(value.getBytes(StandardCharsets.UTF_8));
}
}
}
private static class StringDeserializer implements Deserializer<String> {
@Override
public String deserialize(byte[] input) throws DeserializationException, IOException {
return input == null ? null : new String(input, StandardCharsets.UTF_8);
}
}
}

View File

@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.redis.service;
import org.apache.nifi.redis.RedisConnectionPool;
import org.apache.nifi.redis.util.RedisUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
public class TestRedisConnectionPoolService {
private TestRunner testRunner;
private FakeRedisProcessor proc;
private RedisConnectionPool redisService;
@Before
public void setup() throws InitializationException {
proc = new FakeRedisProcessor();
testRunner = TestRunners.newTestRunner(proc);
redisService = new RedisConnectionPoolService();
testRunner.addControllerService("redis-service", redisService);
}
@Test
public void testValidateConnectionString() {
testRunner.assertNotValid(redisService);
testRunner.setProperty(redisService, RedisUtils.CONNECTION_STRING, " ");
testRunner.assertNotValid(redisService);
testRunner.setProperty(redisService, RedisUtils.CONNECTION_STRING, "${redis.connection}");
testRunner.assertNotValid(redisService);
testRunner.setVariable("redis.connection", "localhost:6379");
testRunner.assertValid(redisService);
testRunner.setProperty(redisService, RedisUtils.CONNECTION_STRING, "localhost");
testRunner.assertNotValid(redisService);
testRunner.setProperty(redisService, RedisUtils.CONNECTION_STRING, "localhost:a");
testRunner.assertNotValid(redisService);
testRunner.setProperty(redisService, RedisUtils.CONNECTION_STRING, "localhost:6379");
testRunner.assertValid(redisService);
// standalone can only have one host:port pair
testRunner.setProperty(redisService, RedisUtils.CONNECTION_STRING, "localhost:6379,localhost:6378");
testRunner.assertNotValid(redisService);
// cluster can have multiple host:port pairs
testRunner.setProperty(redisService, RedisUtils.REDIS_MODE, RedisUtils.REDIS_MODE_CLUSTER.getValue());
testRunner.assertValid(redisService);
testRunner.setProperty(redisService, RedisUtils.CONNECTION_STRING, "localhost:6379,localhost");
testRunner.assertNotValid(redisService);
testRunner.setProperty(redisService, RedisUtils.CONNECTION_STRING, "local:host:6379,localhost:6378");
testRunner.assertNotValid(redisService);
testRunner.setProperty(redisService, RedisUtils.CONNECTION_STRING, "localhost:a,localhost:b");
testRunner.assertNotValid(redisService);
testRunner.setProperty(redisService, RedisUtils.CONNECTION_STRING, "localhost :6379, localhost :6378, localhost:6377");
testRunner.assertValid(redisService);
}
@Test
public void testValidateSentinelMasterRequiredInSentinelMode() {
testRunner.setProperty(redisService, RedisUtils.REDIS_MODE, RedisUtils.REDIS_MODE_SENTINEL.getValue());
testRunner.setProperty(redisService, RedisUtils.CONNECTION_STRING, "localhost:6379,localhost:6378");
testRunner.assertNotValid(redisService);
testRunner.setProperty(redisService, RedisUtils.SENTINEL_MASTER, "mymaster");
testRunner.assertValid(redisService);
}
}

View File

@ -0,0 +1,318 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.redis.state;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.components.state.StateProvider;
import org.apache.nifi.components.state.StateProviderInitializationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.redis.util.RedisUtils;
import org.apache.nifi.util.MockComponentLog;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import redis.embedded.RedisServer;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.SocketChannel;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
* NOTE: These test cases should be kept in-sync with AbstractTestStateProvider which is in the framework
* and couldn't be extended here.
*/
public class ITRedisStateProvider {
protected final String componentId = "111111111-1111-1111-1111-111111111111";
private RedisServer redisServer;
private RedisStateProvider provider;
@Before
public void setup() throws Exception {
final int redisPort = getAvailablePort();
this.redisServer = new RedisServer(redisPort);
redisServer.start();
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(RedisUtils.CONNECTION_STRING, "localhost:" + redisPort);
this.provider = createProvider(properties);
}
@After
public void teardown() throws IOException {
if (provider != null) {
try {
provider.clear(componentId);
} catch (IOException e) {
}
provider.disable();
provider.shutdown();
}
if (redisServer != null) {
redisServer.stop();
}
}
public StateProvider getProvider() {
return provider;
}
@Test
public void testSetAndGet() throws IOException {
getProvider().setState(Collections.singletonMap("testSetAndGet", "value"), componentId);
assertEquals("value", getProvider().getState(componentId).get("testSetAndGet"));
}
@Test
public void testReplaceSuccessful() throws IOException {
final String key = "testReplaceSuccessful";
final StateProvider provider = getProvider();
StateMap map = provider.getState(componentId);
assertNotNull(map);
assertEquals(-1, map.getVersion());
assertNotNull(map.toMap());
assertTrue(map.toMap().isEmpty());
provider.setState(Collections.singletonMap(key, "value1"), componentId);
map = provider.getState(componentId);
assertNotNull(map);
assertEquals(0, map.getVersion());
assertEquals("value1", map.get(key));
assertEquals("value1", map.toMap().get(key));
final Map<String, String> newMap = new HashMap<>(map.toMap());
newMap.put(key, "value2");
assertTrue(provider.replace(map, newMap, componentId));
map = provider.getState(componentId);
assertEquals("value2", map.get(key));
assertEquals(1L, map.getVersion());
}
@Test
public void testReplaceWithWrongVersion() throws IOException {
final String key = "testReplaceWithWrongVersion";
final StateProvider provider = getProvider();
provider.setState(Collections.singletonMap(key, "value1"), componentId);
StateMap stateMap = provider.getState(componentId);
assertNotNull(stateMap);
assertEquals("value1", stateMap.get(key));
assertEquals(0, stateMap.getVersion());
provider.setState(Collections.singletonMap(key, "intermediate value"), componentId);
assertFalse(provider.replace(stateMap, Collections.singletonMap(key, "value2"), componentId));
stateMap = provider.getState(componentId);
assertEquals(key, stateMap.toMap().keySet().iterator().next());
assertEquals(1, stateMap.toMap().size());
assertEquals("intermediate value", stateMap.get(key));
assertEquals(1, stateMap.getVersion());
}
@Test
public void testToMap() throws IOException {
final String key = "testKeySet";
final StateProvider provider = getProvider();
Map<String, String> map = provider.getState(componentId).toMap();
assertNotNull(map);
assertTrue(map.isEmpty());
provider.setState(Collections.singletonMap(key, "value"), componentId);
map = provider.getState(componentId).toMap();
assertNotNull(map);
assertEquals(1, map.size());
assertEquals("value", map.get(key));
provider.setState(Collections.<String, String> emptyMap(), componentId);
final StateMap stateMap = provider.getState(componentId);
map = stateMap.toMap();
assertNotNull(map);
assertTrue(map.isEmpty());
assertEquals(1, stateMap.getVersion());
}
@Test
public void testClear() throws IOException {
final StateProvider provider = getProvider();
StateMap stateMap = provider.getState(componentId);
assertNotNull(stateMap);
assertEquals(-1L, stateMap.getVersion());
assertTrue(stateMap.toMap().isEmpty());
provider.setState(Collections.singletonMap("testClear", "value"), componentId);
stateMap = provider.getState(componentId);
assertNotNull(stateMap);
assertEquals(0, stateMap.getVersion());
assertEquals("value", stateMap.get("testClear"));
provider.clear(componentId);
stateMap = provider.getState(componentId);
assertNotNull(stateMap);
assertEquals(1L, stateMap.getVersion());
assertTrue(stateMap.toMap().isEmpty());
}
@Test
public void testReplaceWithNonExistingValue() throws Exception {
final StateProvider provider = getProvider();
StateMap stateMap = provider.getState(componentId);
assertNotNull(stateMap);
final Map<String, String> newValue = new HashMap<>();
newValue.put("value", "value");
final boolean replaced = provider.replace(stateMap, newValue, componentId);
assertFalse(replaced);
}
@Test
public void testReplaceWithNonExistingValueAndVersionGreaterThanNegativeOne() throws Exception {
final StateProvider provider = getProvider();
final StateMap stateMap = new StateMap() {
@Override
public long getVersion() {
return 4;
}
@Override
public String get(String key) {
return null;
}
@Override
public Map<String, String> toMap() {
return Collections.emptyMap();
}
};
final Map<String, String> newValue = new HashMap<>();
newValue.put("value", "value");
final boolean replaced = provider.replace(stateMap, newValue, componentId);
assertFalse(replaced);
}
@Test
public void testOnComponentRemoved() throws IOException, InterruptedException {
final StateProvider provider = getProvider();
final Map<String, String> newValue = new HashMap<>();
newValue.put("value", "value");
provider.setState(newValue, componentId);
final StateMap stateMap = provider.getState(componentId);
assertEquals(0L, stateMap.getVersion());
provider.onComponentRemoved(componentId);
// wait for the background process to complete
Thread.sleep(1000L);
final StateMap stateMapAfterRemoval = provider.getState(componentId);
// version should be -1 because the state has been removed entirely.
assertEquals(-1L, stateMapAfterRemoval.getVersion());
}
private void initializeProvider(final RedisStateProvider provider, final Map<PropertyDescriptor, String> properties) throws IOException {
provider.initialize(new StateProviderInitializationContext() {
@Override
public String getIdentifier() {
return "Unit Test Provider Initialization Context";
}
@Override
public Map<PropertyDescriptor, PropertyValue> getProperties() {
final Map<PropertyDescriptor, PropertyValue> propValueMap = new HashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
propValueMap.put(entry.getKey(), new StandardPropertyValue(entry.getValue(), null));
}
return propValueMap;
}
@Override
public Map<String,String> getAllProperties() {
final Map<String,String> propValueMap = new LinkedHashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
propValueMap.put(entry.getKey().getName(), entry.getValue());
}
return propValueMap;
}
@Override
public PropertyValue getProperty(final PropertyDescriptor property) {
String prop = properties.get(property);
if (prop == null) {
prop = property.getDefaultValue();
}
return new StandardPropertyValue(prop, null);
}
@Override
public SSLContext getSSLContext() {
return null;
}
@Override
public ComponentLog getLogger() {
return new MockComponentLog("Unit Test RedisStateProvider", provider);
}
});
}
private RedisStateProvider createProvider(final Map<PropertyDescriptor, String> properties) throws Exception {
final RedisStateProvider provider = new RedisStateProvider();
initializeProvider(provider, properties);
provider.enable();
return provider;
}
private int getAvailablePort() throws IOException {
try (SocketChannel socket = SocketChannel.open()) {
socket.setOption(StandardSocketOptions.SO_REUSEADDR, true);
socket.bind(new InetSocketAddress("localhost", 0));
return socket.socket().getLocalPort();
}
}
}

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.redis.state;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
public class TestRedisStateMapJsonSerDe {
private RedisStateMapSerDe serDe;
@Before
public void setup() {
serDe = new RedisStateMapJsonSerDe();
}
@Test
public void testSerializeDeserialize() throws IOException {
final RedisStateMap stateMap = new RedisStateMap.Builder()
.version(2L)
.encodingVersion(3)
.stateValue("field1", "value1")
.stateValue("field2", "value2")
.stateValue("field3", "value3")
.build();
final byte[] serialized = serDe.serialize(stateMap);
Assert.assertNotNull(serialized);
final RedisStateMap deserialized = serDe.deserialize(serialized);
Assert.assertNotNull(deserialized);
Assert.assertEquals(stateMap.getVersion(), deserialized.getVersion());
Assert.assertEquals(stateMap.getEncodingVersion(), deserialized.getEncodingVersion());
Assert.assertEquals(stateMap.toMap(), deserialized.toMap());
}
@Test
public void testSerializeWhenNull() throws IOException {
Assert.assertNull(serDe.serialize(null));
}
@Test
public void testDeserializeWhenNull() throws IOException {
Assert.assertNull(serDe.deserialize(null));
}
@Test
public void testDefaultSerialization() throws IOException {
final RedisStateMap stateMap = new RedisStateMap.Builder().build();
final byte[] serialized = serDe.serialize(stateMap);
Assert.assertNotNull(serialized);
final RedisStateMap deserialized = serDe.deserialize(serialized);
Assert.assertNotNull(deserialized);
Assert.assertEquals(RedisStateMap.DEFAULT_VERSION.longValue(), stateMap.getVersion());
Assert.assertEquals(RedisStateMap.DEFAULT_ENCODING, stateMap.getEncodingVersion());
Assert.assertNotNull(deserialized.toMap());
Assert.assertEquals(0, deserialized.toMap().size());
}
}

View File

@ -0,0 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-redis-bundle</artifactId>
<version>1.4.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-redis-nar</artifactId>
<version>1.4.0-SNAPSHOT</version>
<packaging>nar</packaging>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
<source.skip>true</source.skip>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-redis-extensions</artifactId>
<version>1.4.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-redis-service-api-nar</artifactId>
<type>nar</type>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,32 @@
Apache NiFi
Copyright 2014-2017 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
******************
Apache Software License v2
******************
(ASLv2) Jackson JSON processor
The following NOTICE information applies:
# Jackson JSON processor
Jackson is a high-performance, Free/Open Source JSON processing library.
It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
been in development since 2007.
It is currently developed by a community of developers, as well as supported
commercially by FasterXML.com.
## Licensing
Jackson core and extension components may licensed under different licenses.
To find the details that apply to this artifact see the accompanying LICENSE file.
For more information, including possible other licensing options, contact
FasterXML.com (http://fasterxml.com).
## Credits
A list of contributors may be found from CREDITS file, which is included
in some artifacts (usually source distributions); but is always available
from the source code management (SCM) system project uses.

View File

@ -0,0 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-redis-bundle</artifactId>
<version>1.4.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-redis-service-api-nar</artifactId>
<version>1.4.0-SNAPSHOT</version>
<packaging>nar</packaging>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
<source.skip>true</source.skip>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-redis-service-api</artifactId>
<version>1.4.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,239 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
APACHE NIFI SUBCOMPONENTS:
The Apache NiFi project contains subcomponents with separate copyright
notices and license terms. Your use of the source code for the these
subcomponents is subject to the terms and conditions of the following
licenses.
The binary distribution of this product bundles 'ParaNamer' and 'Paranamer Core'
which is available under a BSD style license.
Copyright (c) 2006 Paul Hammant & ThoughtWorks Inc
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of the copyright holders nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -0,0 +1,33 @@
Apache NiFi
Copyright 2014-2017 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
******************
Apache Software License v2
******************
The following binary components are provided under the Apache Software License v2
(ASLv2) Apache Commons Pool
The following NOTICE information applies:
Apache Commons Pool
Copyright 1999-2009 The Apache Software Foundation.
************************
The MIT License
************************
The following binary components are provided under the MIT License. See project link for details.
(MIT License) Jedis (redis.clients:jedis:2.9.0 - https://github.com/xetorthio/jedis)
*****************
Public Domain
*****************
The following binary components are provided to the 'Public Domain'. See project link for details.
(Public Domain) AOP Alliance 1.0 (http://aopalliance.sourceforge.net/)

View File

@ -0,0 +1,44 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-redis-bundle</artifactId>
<version>1.4.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-redis-service-api</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>${spring.data.redis.version}</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.redis;
import org.apache.nifi.controller.ControllerService;
import org.springframework.data.redis.connection.RedisConnection;
/**
* A service that provides connections to Redis using spring-data-redis.
*/
public interface RedisConnectionPool extends ControllerService {
/**
* Obtains a RedisConnection instance from the pool.
*
* NOTE: Clients are responsible for ensuring the close() method of the connection is called to return it to the pool.
*
* @return a RedisConnection instance
*/
RedisConnection getConnection();
/**
* Some Redis operations are only supported in a specific mode. Clients should use this method to ensure
* the connection pool they are using supports their required operations.
*
* @return the type of Redis instance (i.e. standalone, clustered, sentinel)
*/
RedisType getRedisType();
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.redis;
/**
* Possible types of Redis instances.
*/
public enum RedisType {
STANDALONE("Standalone", "A single standalone Redis instance."),
SENTINEL("Sentinel", "Redis Sentinel which provides high-availability. Described further at https://redis.io/topics/sentinel"),
CLUSTER("Cluster", "Clustered Redis which provides sharding and replication. Described further at https://redis.io/topics/cluster-spec");
private final String displayName;
private final String description;
RedisType(final String displayName, final String description) {
this.displayName = displayName;
this.description = description;
}
public String getDisplayName() {
return displayName;
}
public String getDescription() {
return description;
}
public static RedisType fromDisplayName(final String displayName) {
for (RedisType redisType : values()) {
if (redisType.getDisplayName().equals(displayName)) {
return redisType;
}
}
throw new IllegalArgumentException("Unknown RedisType: " + displayName);
}
}

View File

@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>1.4.0-SNAPSHOT</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-redis-bundle</artifactId>
<version>1.4.0-SNAPSHOT</version>
<packaging>pom</packaging>
<properties>
<spring.data.redis.version>1.8.3.RELEASE</spring.data.redis.version>
</properties>
<modules>
<module>nifi-redis-service-api</module>
<module>nifi-redis-service-api-nar</module>
<module>nifi-redis-extensions</module>
<module>nifi-redis-nar</module>
</modules>
</project>

View File

@ -49,8 +49,14 @@ public class WaitNotifyProtocol {
private static final int REPLACE_RETRY_WAIT_MILLIS = 10;
private static final ObjectMapper objectMapper = new ObjectMapper();
private static final Serializer<String> stringSerializer = (value, output) -> output.write(value.getBytes(StandardCharsets.UTF_8));
private final Deserializer<String> stringDeserializer = input -> new String(input, StandardCharsets.UTF_8);
private static final Serializer<String> stringSerializer = (value, output) -> {
if (value != null ) {
output.write(value.getBytes(StandardCharsets.UTF_8));
}
};
private final Deserializer<String> stringDeserializer = input -> input == null ? null : new String(input, StandardCharsets.UTF_8);
public static class Signal {

View File

@ -83,6 +83,7 @@
<module>nifi-cybersecurity-bundle</module>
<module>nifi-parquet-bundle</module>
<module>nifi-extension-utils</module>
<module>nifi-redis-bundle</module>
</modules>
<build>

12
pom.xml
View File

@ -1434,6 +1434,18 @@
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-redis-service-api-nar</artifactId>
<version>1.4.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-redis-nar</artifactId>
<version>1.4.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-properties</artifactId>
<version>1.4.0-SNAPSHOT</version>