NIFI-8774: Fixed NullPointerExceptions

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #5208
This commit is contained in:
Mark Payne 2021-07-09 20:43:45 -04:00 committed by Matthew Burgess
parent 75de68e013
commit a4cfdbb695
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
2 changed files with 46 additions and 38 deletions

View File

@ -164,7 +164,6 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -229,7 +228,8 @@ public final class StandardProcessGroup implements ProcessGroup {
private volatile FlowFileOutboundPolicy flowFileOutboundPolicy = FlowFileOutboundPolicy.STREAM_WHEN_AVAILABLE;
private volatile BatchCounts batchCounts = new NoOpBatchCounts();
private final DataValve dataValve;
private final Map<String, String> niFiPropertiesBackPressure;
private final Long nifiPropertiesBackpressureCount;
private final String nifiPropertiesBackpressureSize;
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
@ -237,21 +237,15 @@ public final class StandardProcessGroup implements ProcessGroup {
private static final Logger LOG = LoggerFactory.getLogger(StandardProcessGroup.class);
private static final String DEFAULT_FLOWFILE_EXPIRATION = "0 sec";
private static final String DEFAULT_BACKPRESSURE_OBJECT = "0";
private static final String DEFAULT_BACKPRESSURE_DATA_SIZE = "0 GB";
private static final long DEFAULT_BACKPRESSURE_OBJECT = 10_000L;
private static final String DEFAULT_BACKPRESSURE_DATA_SIZE = "1 GB";
public StandardProcessGroup(final String id, final ControllerServiceProvider serviceProvider, final ProcessScheduler scheduler,
final PropertyEncryptor encryptor, final ExtensionManager extensionManager,
final StateManagerProvider stateManagerProvider, final FlowManager flowManager, final FlowRegistryClient flowRegistryClient,
final ReloadComponent reloadComponent, final MutableVariableRegistry variableRegistry, final NodeTypeProvider nodeTypeProvider) {
this(id, serviceProvider, scheduler, encryptor, extensionManager, stateManagerProvider, flowManager, flowRegistryClient,
reloadComponent, variableRegistry, nodeTypeProvider, null);
}
public StandardProcessGroup(final String id, final ControllerServiceProvider serviceProvider, final ProcessScheduler scheduler,
final PropertyEncryptor encryptor, final ExtensionManager extensionManager,
final StateManagerProvider stateManagerProvider, final FlowManager flowManager, final FlowRegistryClient flowRegistryClient,
final ReloadComponent reloadComponent, final MutableVariableRegistry variableRegistry, final NodeTypeProvider nodeTypeProvider,
final NiFiProperties niFiProperties) {
final NiFiProperties nifiProperties) {
this.id = id;
this.controllerServiceProvider = serviceProvider;
@ -276,14 +270,36 @@ public final class StandardProcessGroup implements ProcessGroup {
this.defaultFlowFileExpiration = new AtomicReference<>();
this.defaultBackPressureObjectThreshold = new AtomicReference<>();
this.defaultBackPressureDataSizeThreshold = new AtomicReference<>();
// save only the nifi properties needed, and account for the possibility those properties are missing
niFiPropertiesBackPressure = new ConcurrentHashMap<>();
niFiPropertiesBackPressure.put(NiFiProperties.BACKPRESSURE_COUNT,
niFiProperties.getProperty(NiFiProperties.BACKPRESSURE_COUNT) == null ? DEFAULT_BACKPRESSURE_OBJECT : niFiProperties.getProperty(NiFiProperties.BACKPRESSURE_COUNT));
niFiPropertiesBackPressure.put(NiFiProperties.BACKPRESSURE_SIZE,
niFiProperties.getProperty(NiFiProperties.BACKPRESSURE_SIZE) == null ? DEFAULT_BACKPRESSURE_DATA_SIZE : niFiProperties.getProperty(NiFiProperties.BACKPRESSURE_SIZE));
if (nifiProperties == null) {
nifiPropertiesBackpressureCount = DEFAULT_BACKPRESSURE_OBJECT;
nifiPropertiesBackpressureSize = DEFAULT_BACKPRESSURE_DATA_SIZE;
} else {
// Validate the property values.
Long count;
try {
final String explicitValue = nifiProperties.getProperty(NiFiProperties.BACKPRESSURE_COUNT, String.valueOf(DEFAULT_BACKPRESSURE_OBJECT));
count = Long.parseLong(explicitValue);
} catch (final Exception e) {
LOG.warn("nifi.properties has an invalid value for the '" + NiFiProperties.BACKPRESSURE_COUNT + "' property. Using default value instaed.");
count = DEFAULT_BACKPRESSURE_OBJECT;
}
nifiPropertiesBackpressureCount = count;
String size;
try {
size = nifiProperties.getProperty(NiFiProperties.BACKPRESSURE_SIZE, DEFAULT_BACKPRESSURE_DATA_SIZE);
DataUnit.parseDataSize(size, DataUnit.B);
} catch (final Exception e) {
LOG.warn("nifi.properties has an invalid value for the '" + NiFiProperties.BACKPRESSURE_SIZE + "' property. Using default value instaed.");
size = DEFAULT_BACKPRESSURE_DATA_SIZE;
}
nifiPropertiesBackpressureSize = size;
}
}
@Override
public ProcessGroup getParent() {
return parent.get();
@ -5642,15 +5658,9 @@ public final class StandardProcessGroup implements ProcessGroup {
public void setDefaultBackPressureObjectThreshold(final Long defaultBackPressureObjectThreshold) {
// use default if value not provided
if (defaultBackPressureObjectThreshold == null) {
this.defaultBackPressureObjectThreshold.set(Long.parseLong(niFiPropertiesBackPressure.get(NiFiProperties.BACKPRESSURE_COUNT)));
this.defaultBackPressureObjectThreshold.set(nifiPropertiesBackpressureCount);
} else {
// Validate field is numeric
Pattern pattern = Pattern.compile("(\\d+)");
if (pattern.matcher(String.valueOf(defaultBackPressureObjectThreshold)).matches()) {
this.defaultBackPressureObjectThreshold.set(defaultBackPressureObjectThreshold);
} else {
throw new IllegalArgumentException("The Default Back Pressure Object Threshold of the process group must be numeric.");
}
this.defaultBackPressureObjectThreshold.set(defaultBackPressureObjectThreshold);
}
}
@ -5659,28 +5669,23 @@ public final class StandardProcessGroup implements ProcessGroup {
// Use value in this object if it has been set. Otherwise, inherit from parent group; if at root group, obtain from nifi properties.
if (defaultBackPressureObjectThreshold.get() == null) {
if (isRootGroup()) {
return Long.parseLong(niFiPropertiesBackPressure.get(NiFiProperties.BACKPRESSURE_COUNT));
return nifiPropertiesBackpressureCount;
} else {
return getParent().getDefaultBackPressureObjectThreshold();
}
}
return defaultBackPressureObjectThreshold.get();
return defaultBackPressureObjectThreshold.get();
}
@Override
public void setDefaultBackPressureDataSizeThreshold(final String defaultBackPressureDataSizeThreshold) {
// use default if value not provided
if (StringUtils.isBlank(defaultBackPressureDataSizeThreshold)) {
this.defaultBackPressureDataSizeThreshold.set(niFiPropertiesBackPressure.get(NiFiProperties.BACKPRESSURE_SIZE));
this.defaultBackPressureDataSizeThreshold.set(nifiPropertiesBackpressureSize);
} else {
// Validate entry: must include size unit label
Pattern pattern = Pattern.compile(DataUnit.DATA_SIZE_REGEX);
String caseAdjustedSizeThreshold = defaultBackPressureDataSizeThreshold.toUpperCase();
if (pattern.matcher(caseAdjustedSizeThreshold).matches()) {
this.defaultBackPressureDataSizeThreshold.set(caseAdjustedSizeThreshold);
} else {
throw new IllegalArgumentException("The Default Back Pressure Data Size Threshold of the process group must contain a valid data size unit.");
}
DataUnit.parseDataSize(defaultBackPressureDataSizeThreshold, DataUnit.B);
this.defaultBackPressureDataSizeThreshold.set(defaultBackPressureDataSizeThreshold.toUpperCase());
}
}
@ -5689,11 +5694,12 @@ public final class StandardProcessGroup implements ProcessGroup {
// Use value in this object if it has been set. Otherwise, inherit from parent group; if at root group, obtain from nifi properties.
if (StringUtils.isEmpty(defaultBackPressureDataSizeThreshold.get())) {
if (isRootGroup()) {
return niFiPropertiesBackPressure.get(NiFiProperties.BACKPRESSURE_SIZE);
return nifiPropertiesBackpressureSize;
} else {
return parent.get().getDefaultBackPressureDataSizeThreshold();
}
}
return defaultBackPressureDataSizeThreshold.get();
}
}

View File

@ -216,7 +216,8 @@ public class StatelessFlowManager extends AbstractFlowManager implements FlowMan
statelessEngine.getFlowRegistryClient(),
statelessEngine.getReloadComponent(),
mutableVariableRegistry,
new StatelessNodeTypeProvider());
new StatelessNodeTypeProvider(),
null);
}
@Override
@ -242,6 +243,7 @@ public class StatelessFlowManager extends AbstractFlowManager implements FlowMan
.source(requireNonNull(source))
.destination(destination)
.flowFileQueueFactory(flowFileQueueFactory)
.processGroup(destination.getProcessGroup())
.build();
return connection;