NIFI-11794 - Fix NPE + configure max attempts for Redis State Provider (#7473)

Signed-off-by: Otto Fowler<ottobackwards@gmail.com>

This closes #7473.
This commit is contained in:
Pierre Villard 2023-07-14 17:51:18 +02:00 committed by GitHub
parent e812951c57
commit 8a61d5bdbf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 21 additions and 3 deletions

View File

@ -105,6 +105,9 @@
nifi.properties. This means that a TLS-enabled Redis connection is only possible if the Apache NiFi instance is running in secure mode. nifi.properties. This means that a TLS-enabled Redis connection is only possible if the Apache NiFi instance is running in secure mode.
If this property is false, an insecure Redis connection will be used even if the Apache NiFi instance is secure (default false). If this property is false, an insecure Redis connection will be used even if the Apache NiFi instance is secure (default false).
Max Attempts - Maximum number of attempts when setting/clearing the state for a component. This number should be higher than the number of nodes
in the NiFi cluster to account for the case where each node may concurrently try to clear a state with a local scope (default is 20).
Pool - Max Total - The maximum number of connections that can be allocated by the pool (checked out to clients, or idle awaiting checkout). Pool - Max Total - The maximum number of connections that can be allocated by the pool (checked out to clients, or idle awaiting checkout).
A negative value indicates that there is no limit. A negative value indicates that there is no limit.

View File

@ -73,12 +73,22 @@ public class RedisStateProvider extends AbstractConfigurableComponent implements
.defaultValue("false") .defaultValue("false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR) .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build(); .build();
public static final PropertyDescriptor MAX_ATTEMPTS = new PropertyDescriptor.Builder()
.name("Max Attempts")
.displayName("Max Attempts")
.description("Maximum number of attempts when setting/clearing the state for a component. This number should be higher than the number of nodes "
+ "in the NiFi cluster to account for the case where each node may concurrently try to clear a state with a local scope.")
.required(true)
.defaultValue("20")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
static final List<PropertyDescriptor> STATE_PROVIDER_PROPERTIES; static final List<PropertyDescriptor> STATE_PROVIDER_PROPERTIES;
static { static {
final List<PropertyDescriptor> props = new ArrayList<>(RedisUtils.REDIS_CONNECTION_PROPERTY_DESCRIPTORS); final List<PropertyDescriptor> props = new ArrayList<>(RedisUtils.REDIS_CONNECTION_PROPERTY_DESCRIPTORS);
props.add(KEY_PREFIX); props.add(KEY_PREFIX);
props.add(ENABLE_TLS); props.add(ENABLE_TLS);
props.add(MAX_ATTEMPTS);
STATE_PROVIDER_PROPERTIES = Collections.unmodifiableList(props); STATE_PROVIDER_PROPERTIES = Collections.unmodifiableList(props);
} }
@ -90,6 +100,7 @@ public class RedisStateProvider extends AbstractConfigurableComponent implements
private String identifier; private String identifier;
private String keyPrefix; private String keyPrefix;
private int maxAttempts;
private ComponentLog logger; private ComponentLog logger;
private PropertyContext context; private PropertyContext context;
private SSLContext sslContext; private SSLContext sslContext;
@ -113,6 +124,8 @@ public class RedisStateProvider extends AbstractConfigurableComponent implements
keyPrefix = keyPrefix + "/"; keyPrefix = keyPrefix + "/";
} }
this.keyPrefix = keyPrefix; this.keyPrefix = keyPrefix;
this.maxAttempts = context.getProperty(MAX_ATTEMPTS).asInteger();
} }
@Override @Override
@ -184,7 +197,7 @@ public class RedisStateProvider extends AbstractConfigurableComponent implements
int attempted = 0; int attempted = 0;
boolean updated = false; boolean updated = false;
while (!updated && attempted < 20) { while (!updated && attempted < this.maxAttempts) {
updated = replace(currStateMap, state, componentId, true); updated = replace(currStateMap, state, componentId, true);
attempted++; attempted++;
} }
@ -257,7 +270,8 @@ public class RedisStateProvider extends AbstractConfigurableComponent implements
final List<Object> results = redisConnection.exec(); final List<Object> results = redisConnection.exec();
// if we have a result then the replace succeeded // if we have a result then the replace succeeded
if (results.size() > 0) { // results can be null if the transaction has been aborted
if (results != null && results.size() > 0) {
replaced = true; replaced = true;
} }
@ -270,7 +284,7 @@ public class RedisStateProvider extends AbstractConfigurableComponent implements
int attempted = 0; int attempted = 0;
boolean updated = false; boolean updated = false;
while (!updated && attempted < 20) { while (!updated && attempted < this.maxAttempts) {
final StateMap currStateMap = getState(componentId); final StateMap currStateMap = getState(componentId);
updated = replace(currStateMap, Collections.emptyMap(), componentId, true); updated = replace(currStateMap, Collections.emptyMap(), componentId, true);

View File

@ -45,6 +45,7 @@ public class TestRedisStateProvider {
// Set up mock state provider init context // Set up mock state provider init context
Mockito.when(context.getProperty(RedisStateProvider.KEY_PREFIX)).thenReturn(new MockPropertyValue("/nifi/components/")); Mockito.when(context.getProperty(RedisStateProvider.KEY_PREFIX)).thenReturn(new MockPropertyValue("/nifi/components/"));
Mockito.when(context.getProperty(RedisStateProvider.MAX_ATTEMPTS)).thenReturn(new MockPropertyValue("20"));
// Set up mock validation context // Set up mock validation context
Mockito.when(validationContext.getProperty(RedisUtils.CONNECTION_STRING)).thenReturn(new MockPropertyValue("localhost:6379")); Mockito.when(validationContext.getProperty(RedisUtils.CONNECTION_STRING)).thenReturn(new MockPropertyValue("localhost:6379"));