NIFI-13952 - Flow Analysis Rule to restrict backpressure configuration

This commit is contained in:
Pierre Villard 2024-10-31 13:55:18 +01:00
parent 41edb12183
commit c90f00d0e7
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
7 changed files with 433 additions and 4 deletions

View File

@ -23,7 +23,20 @@
<artifactId>nifi-standard-rules</artifactId> <artifactId>nifi-standard-rules</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>
<dependencies> <dependencies />
</dependencies> <build>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes combine.children="append">
<exclude>src/test/resources/RestrictBackpressureSettings/RestrictBackpressureSettings_noViolation.json</exclude>
<exclude>src/test/resources/RestrictBackpressureSettings/RestrictBackpressureSettings.json</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project> </project>

View File

@ -0,0 +1,242 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.flowanalysis.rules;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flow.VersionedComponent;
import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.flowanalysis.AbstractFlowAnalysisRule;
import org.apache.nifi.flowanalysis.FlowAnalysisRuleContext;
import org.apache.nifi.flowanalysis.GroupAnalysisResult;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.util.StandardValidators;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@Tags({"connection", "backpressure"})
@CapabilityDescription("This rule will generate a violation if backpressure settings of a connection exceed configured thresholds. "
+ "Improper configuration of backpressure settings can lead to decreased performances because of excessive swapping as well as "
+ "to filled up content repository with too much in-flight data in NiFi.")
public class RestrictBackpressureSettings extends AbstractFlowAnalysisRule {
public static final PropertyDescriptor COUNT_MIN = new PropertyDescriptor.Builder()
.name("Minimum Backpressure Object Count Threshold")
.description("This is the minimum value that should be set for the Object Count backpressure setting on connections. "
+ "This can be used to prevent a user from setting a value of 0 which disables backpressure based on count.")
.required(true)
.addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
.defaultValue("1")
.build();
public static final PropertyDescriptor COUNT_MAX = new PropertyDescriptor.Builder()
.name("Maximum Backpressure Object Count Threshold")
.description("This is the maximum value that should be set for the Object Count backpressure setting on connections. "
+ "This can be used to prevent a user from setting a very high value that may be leading to a lot of swapping.")
.required(true)
.addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
.defaultValue("10000")
.build();
public static final PropertyDescriptor SIZE_MIN = new PropertyDescriptor.Builder()
.name("Minimum Backpressure Data Size Threshold")
.description("This is the minimum value that should be set for the Data Size backpressure setting on connections. "
+ "This can be used to prevent a user from setting a value of 0 which disables backpressure based on size.")
.required(true)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.defaultValue("1 MB")
.build();
public static final PropertyDescriptor SIZE_MAX = new PropertyDescriptor.Builder()
.name("Maximum Backpressure Data Size Threshold")
.description("This is the maximum value that should be set for the Data Size backpressure setting on connections. "
+ "This can be used to prevent a user from setting a very high value that may be filling up the content repo.")
.required(true)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.defaultValue("1 GB")
.build();
private static final List<PropertyDescriptor> PROPERTIES = List.of(
COUNT_MIN,
COUNT_MAX,
SIZE_MIN,
SIZE_MAX
);
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<ValidationResult>();
final long minCount = validationContext.getProperty(COUNT_MIN).asLong();
final long maxCount = validationContext.getProperty(COUNT_MAX).asLong();
final double minSize = validationContext.getProperty(SIZE_MIN).asDataSize(DataUnit.B);
final double maxSize = validationContext.getProperty(SIZE_MAX).asDataSize(DataUnit.B);
if (minCount > maxCount) {
results.add(
new ValidationResult.Builder()
.subject(COUNT_MIN.getName())
.valid(false)
.explanation("Value of '" + COUNT_MIN.getName() + "' cannot be strictly greater than '" + COUNT_MAX.getName() + "'")
.build());
}
if (Double.compare(minSize, maxSize) > 0) {
results.add(
new ValidationResult.Builder()
.subject(SIZE_MIN.getName())
.valid(false)
.explanation("Value of '" + SIZE_MIN.getName() + "' cannot be strictly greater than '" + SIZE_MAX.getName() + "'")
.build());
}
return results;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
@Override
public Collection<GroupAnalysisResult> analyzeProcessGroup(VersionedProcessGroup pg, FlowAnalysisRuleContext context) {
final Collection<GroupAnalysisResult> results = new HashSet<GroupAnalysisResult>();
final long minCount = context.getProperty(COUNT_MIN).asLong();
final long maxCount = context.getProperty(COUNT_MAX).asLong();
final double minSize = context.getProperty(SIZE_MIN).asDataSize(DataUnit.B);
final double maxSize = context.getProperty(SIZE_MAX).asDataSize(DataUnit.B);
// Map of all id/components to generate more human readable violations
final Map<String, VersionedComponent> idComponent = Stream.of(
pg.getFunnels().stream(),
pg.getProcessors().stream(),
pg.getInputPorts().stream(),
pg.getOutputPorts().stream()
).flatMap(c -> c)
.collect(Collectors.toMap(c -> c.getIdentifier(), Function.identity()));
pg.getConnections().stream().forEach(
connection -> {
if (connection.getBackPressureObjectThreshold() < minCount) {
results.add(buildViolation(connection,
idComponent.get(connection.getSource().getId()),
idComponent.get(connection.getDestination().getId()),
BackpressureViolationType.BP_COUNT_THRESHOLD_BELOW_LIMIT,
getViolationMessage(BackpressureViolationType.BP_COUNT_THRESHOLD_BELOW_LIMIT, connection.getBackPressureObjectThreshold().toString(), Long.toString(minCount))));
}
if (connection.getBackPressureObjectThreshold() > maxCount) {
results.add(buildViolation(connection,
idComponent.get(connection.getSource().getId()),
idComponent.get(connection.getDestination().getId()),
BackpressureViolationType.BP_COUNT_THRESHOLD_ABOVE_LIMIT,
getViolationMessage(BackpressureViolationType.BP_COUNT_THRESHOLD_ABOVE_LIMIT, connection.getBackPressureObjectThreshold().toString(), Long.toString(maxCount))));
}
final double sizeThreshold = DataUnit.parseDataSize(connection.getBackPressureDataSizeThreshold(), DataUnit.B);
if (Double.compare(sizeThreshold, minSize) < 0) {
results.add(buildViolation(connection,
idComponent.get(connection.getSource().getId()),
idComponent.get(connection.getDestination().getId()),
BackpressureViolationType.BP_SIZE_THRESHOLD_BELOW_LIMIT,
getViolationMessage(BackpressureViolationType.BP_SIZE_THRESHOLD_BELOW_LIMIT, connection.getBackPressureDataSizeThreshold(), context.getProperty(SIZE_MIN).getValue())));
}
if (Double.compare(sizeThreshold, maxSize) > 0) {
results.add(buildViolation(connection,
idComponent.get(connection.getSource().getId()),
idComponent.get(connection.getDestination().getId()),
BackpressureViolationType.BP_SIZE_THRESHOLD_ABOVE_LIMIT,
getViolationMessage(BackpressureViolationType.BP_SIZE_THRESHOLD_ABOVE_LIMIT, connection.getBackPressureDataSizeThreshold(), context.getProperty(SIZE_MAX).getValue())));
}
}
);
return results;
}
private GroupAnalysisResult buildViolation(final VersionedConnection connection, final VersionedComponent source,
final VersionedComponent destination, final BackpressureViolationType backpressureViolationType, final String violationMessage) {
if (!(source instanceof VersionedProcessor) && !(destination instanceof VersionedProcessor)) {
// connection between two components that are not processors and cannot be invalid, setting violation on connection
return GroupAnalysisResult.forComponent(connection,
connection.getIdentifier() + "_" + backpressureViolationType.getId(),
getLocationMessage(connection, source, destination) + violationMessage).build();
} else if (source instanceof VersionedProcessor) {
// defining violation on source processor
return GroupAnalysisResult.forComponent(source,
connection.getIdentifier() + "_" + backpressureViolationType.getId(),
getLocationMessage(connection, source, destination) + violationMessage).build();
} else {
// defining violation on destination processor
return GroupAnalysisResult.forComponent(destination,
connection.getIdentifier() + "_" + backpressureViolationType.getId(),
getLocationMessage(connection, source, destination) + violationMessage).build();
}
}
private String getLocationMessage(final VersionedConnection connection, final VersionedComponent source, final VersionedComponent destination) {
if (source == null || destination == null) {
return "The connection [" + connection.getIdentifier() + "] is violating the rule for backpressure settings. ";
}
return "The connection [" + connection.getIdentifier() + "] connecting " + source.getName() + " [" + source.getIdentifier() + "] to "
+ destination.getName() + " [" + destination.getIdentifier() + "] is violating the rule for backpressure settings. ";
}
private String getViolationMessage(final BackpressureViolationType backpressureViolationType, final String configured, final String limit) {
switch (backpressureViolationType) {
case BP_COUNT_THRESHOLD_ABOVE_LIMIT:
return "The connection is configured with a Backpressure Count Threshold of " + configured + " and it should be lesser or equal than " + limit + ".";
case BP_COUNT_THRESHOLD_BELOW_LIMIT:
return "The connection is configured with a Backpressure Count Threshold of " + configured + " and it should be greater or equal than " + limit + ".";
case BP_SIZE_THRESHOLD_ABOVE_LIMIT:
return "The connection is configured with a Backpressure Data Size Threshold of " + configured + " and it should be lesser or equal than " + limit + ".";
case BP_SIZE_THRESHOLD_BELOW_LIMIT:
return "The connection is configured with a Backpressure Data Size Threshold of " + configured + " and it should be greater or equal than " + limit + ".";
default:
return null;
}
}
private enum BackpressureViolationType {
BP_COUNT_THRESHOLD_BELOW_LIMIT("BackpressureCountThresholdTooLow"),
BP_COUNT_THRESHOLD_ABOVE_LIMIT("BackpressureCountThresholdTooHigh"),
BP_SIZE_THRESHOLD_BELOW_LIMIT("BackpressureSizeThresholdTooLow"),
BP_SIZE_THRESHOLD_ABOVE_LIMIT("BackpressureSizeThresholdTooHigh");
private String id;
BackpressureViolationType(String id) {
this.id = id;
}
public String getId() {
return this.id;
}
}
}

View File

@ -13,4 +13,5 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
org.apache.nifi.flowanalysis.rules.DisallowComponentType org.apache.nifi.flowanalysis.rules.DisallowComponentType
org.apache.nifi.flowanalysis.rules.RestrictBackpressureSettings

View File

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.flowanalysis.rules;
import static org.junit.jupiter.api.Assertions.assertIterableEquals;
import static org.mockito.ArgumentMatchers.any;
import java.io.FileInputStream;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flowanalysis.AbstractFlowAnalysisRule;
import org.apache.nifi.flowanalysis.FlowAnalysisRuleContext;
import org.apache.nifi.flowanalysis.GroupAnalysisResult;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.databind.ObjectMapper;
@ExtendWith(MockitoExtension.class)
public abstract class AbstractFlowAnalaysisRuleTest<T extends AbstractFlowAnalysisRule> {
private static final ObjectMapper FLOW_MAPPER = new ObjectMapper();
protected Map<PropertyDescriptor, PropertyValue> properties = new HashMap<PropertyDescriptor, PropertyValue>();
protected T rule;
@Mock
protected FlowAnalysisRuleContext flowAnalysisRuleContext;
@Mock
protected ConfigurationContext configurationContext;
@Mock
protected ValidationContext validationContext;
protected abstract T initializeRule();
@BeforeEach
public void setup() {
rule = initializeRule();
Mockito.lenient().when(flowAnalysisRuleContext.getProperty(any())).thenAnswer(invocation -> {
return properties.get(invocation.getArgument(0));
});
Mockito.lenient().when(configurationContext.getProperty(any())).thenAnswer(invocation -> {
return properties.get(invocation.getArgument(0));
});
Mockito.lenient().when(validationContext.getProperty(any())).thenAnswer(invocation -> {
return properties.get(invocation.getArgument(0));
});
}
protected void setProperty(PropertyDescriptor propertyDescriptor, String value) {
properties.put(propertyDescriptor, new StandardPropertyValue(value, null, null));
}
private VersionedProcessGroup getProcessGroup(String flowDefinition) throws Exception {
final RegisteredFlowSnapshot flowSnapshot = FLOW_MAPPER.readValue(new FileInputStream(flowDefinition), RegisteredFlowSnapshot.class);
return flowSnapshot.getFlowContents();
}
protected void testAnalyzeProcessGroup(String flowDefinition, List<String> expected) throws Exception {
final Collection<GroupAnalysisResult> actual = rule.analyzeProcessGroup(getProcessGroup(flowDefinition), flowAnalysisRuleContext);
assertIterableEquals(expected, actual.stream().map(r -> r.getComponent().get().getInstanceIdentifier()).sorted().toList());
}
}

View File

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.flowanalysis.rules;
import static org.junit.jupiter.api.Assertions.assertFalse;
import java.util.List;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
public class RestrictBackpressureSettingsTest extends AbstractFlowAnalaysisRuleTest<RestrictBackpressureSettings> {
@Override
protected RestrictBackpressureSettings initializeRule() {
return new RestrictBackpressureSettings();
}
@BeforeEach
@Override
public void setup() {
super.setup();
setProperty(RestrictBackpressureSettings.COUNT_MIN, RestrictBackpressureSettings.COUNT_MIN.getDefaultValue());
setProperty(RestrictBackpressureSettings.COUNT_MAX, RestrictBackpressureSettings.COUNT_MAX.getDefaultValue());
setProperty(RestrictBackpressureSettings.SIZE_MIN, RestrictBackpressureSettings.SIZE_MIN.getDefaultValue());
setProperty(RestrictBackpressureSettings.SIZE_MAX, RestrictBackpressureSettings.SIZE_MAX.getDefaultValue());
}
@Test
public void testWrongCountConfiguration() {
setProperty(RestrictBackpressureSettings.COUNT_MIN, "100");
setProperty(RestrictBackpressureSettings.COUNT_MAX, "10");
assertFalse(rule.customValidate(validationContext).isEmpty());
}
@Test
public void testWrongSizeConfiguration() {
setProperty(RestrictBackpressureSettings.SIZE_MIN, "1GB");
setProperty(RestrictBackpressureSettings.SIZE_MAX, "1MB");
assertFalse(rule.customValidate(validationContext).isEmpty());
}
@Test
public void testNoViolations() throws Exception {
testAnalyzeProcessGroup(
"src/test/resources/RestrictBackpressureSettings/RestrictBackpressureSettings_noViolation.json",
List.of()
);
}
@Test
public void testViolations() throws Exception {
testAnalyzeProcessGroup(
"src/test/resources/RestrictBackpressureSettings/RestrictBackpressureSettings.json",
List.of(
"e26f20c1-0192-1000-ff8b-bcf395c02076", // processor GenerateFlowFile connecting to UpdateAttribute
"e26f3857-0192-1000-776a-3d62e15f75dc", // processor UpdateAttribute connecting to funnel
"e26f3857-0192-1000-776a-3d62e15f75dc", // processor UpdateAttribute connecting from funnel
"e26fd0d5-0192-1000-ee3d-f90141590475", // connection from funnel to funnel
"e27073f8-0192-1000-cf43-9c41e69eadd2", // connection from output port to funnel
"e270eaa4-0192-1000-0622-8f9af5319328" // connection from funnel to input port
)
);
}
}