NIFI-12973 Add Process Group scope to Flow Analysis rules

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #8682.
This commit is contained in:
tpalfy 2024-04-22 18:52:09 +02:00 committed by Pierre Villard
parent b0ace45b55
commit bc75ef108c
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
14 changed files with 120 additions and 2 deletions

View File

@ -25,6 +25,8 @@ public class VersionedFlowAnalysisRule extends VersionedConfigurableExtension {
private ScheduledState scheduledState;
private EnforcementPolicy enforcementPolicy;
private String scope;
@Schema(description = "How to handle violations.")
public EnforcementPolicy getEnforcementPolicy() {
return enforcementPolicy;
@ -34,6 +36,14 @@ public class VersionedFlowAnalysisRule extends VersionedConfigurableExtension {
this.enforcementPolicy = enforcementPolicy;
}
public String getScope() {
return scope;
}
public void setScope(String scope) {
this.scope = scope;
}
@Override
public ComponentType getComponentType() {
return ComponentType.FLOW_ANALYSIS_RULE;

View File

@ -42,6 +42,7 @@ public class FlowAnalysisRuleDTO extends ComponentDTO {
private Boolean supportsSensitiveDynamicProperties;
private String enforcementPolicy;
private String scope;
private Map<String, String> properties;
private Map<String, PropertyDescriptorDTO> descriptors;
@ -208,6 +209,14 @@ public class FlowAnalysisRuleDTO extends ComponentDTO {
this.enforcementPolicy = enforcementPolicy;
}
public String getScope() {
return scope;
}
public void setScope(String scope) {
this.scope = scope;
}
/**
* @return flow analysis rule's properties
*/

View File

@ -77,6 +77,8 @@ public abstract class AbstractFlowAnalysisRuleNode extends AbstractComponentNode
private volatile String comment;
private EnforcementPolicy enforcementPolicy;
private String scope;
private volatile FlowAnalysisRuleState state = FlowAnalysisRuleState.DISABLED;
public AbstractFlowAnalysisRuleNode(final LoggableComponent<FlowAnalysisRule> flowAnalysisRule, final String id,
@ -114,6 +116,16 @@ public abstract class AbstractFlowAnalysisRuleNode extends AbstractComponentNode
this.enforcementPolicy = enforcementPolicy;
}
@Override
public String getScope() {
return scope;
}
@Override
public void setScope(String scope) {
this.scope = scope;
}
@Override
public ConfigurableComponent getComponent() {
return flowAnalysisRuleRef.get().getFlowAnalysisRule();

View File

@ -3624,6 +3624,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
flowAnalysisRule.setName(proposed.getName());
flowAnalysisRule.setComments(proposed.getComments());
flowAnalysisRule.setEnforcementPolicy(proposed.getEnforcementPolicy());
flowAnalysisRule.setScope(proposed.getScope());
if (!isEqual(flowAnalysisRule.getBundleCoordinate(), proposed.getBundle())) {
final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle());

View File

@ -427,6 +427,7 @@ public class NiFiRegistryFlowMapper {
versionedRule.setProperties(mapProperties(flowAnalysisRuleNode, serviceProvider));
versionedRule.setPropertyDescriptors(mapPropertyDescriptors(flowAnalysisRuleNode, serviceProvider, Collections.emptySet(), Collections.emptyMap()));
versionedRule.setEnforcementPolicy(flowAnalysisRuleNode.getEnforcementPolicy());
versionedRule.setScope(flowAnalysisRuleNode.getScope());
versionedRule.setType(flowAnalysisRuleNode.getCanonicalClassName());
versionedRule.setScheduledState(flowMappingOptions.getStateLookup().getState(flowAnalysisRuleNode));

View File

@ -38,6 +38,10 @@ public interface FlowAnalysisRuleNode extends ComponentNode {
*/
EnforcementPolicy getEnforcementPolicy();
String getScope();
void setScope(String scope);
void setFlowAnalysisRule(LoggableComponent<FlowAnalysisRule> flowAnalysisRule);
FlowAnalysisRuleContext getFlowAnalysisRuleContext();

View File

@ -591,7 +591,7 @@ public class FlowController implements ReportingTaskProvider, FlowAnalysisRulePr
ruleViolationsManager
);
if (flowAnalyzer != null) {
flowAnalyzer.initialize(controllerServiceProvider);
flowAnalyzer.initialize(flowManager, controllerServiceProvider);
}
final CronSchedulingAgent cronSchedulingAgent = new CronSchedulingAgent(this, timerDrivenEngineRef.get(), repositoryContextFactory);

View File

@ -694,6 +694,7 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
ruleNode.setName(flowAnalysisRule.getName());
ruleNode.setComments(flowAnalysisRule.getComments());
ruleNode.setEnforcementPolicy(flowAnalysisRule.getEnforcementPolicy());
ruleNode.setScope(flowAnalysisRule.getScope());
final Set<String> sensitiveDynamicPropertyNames = getSensitiveDynamicPropertyNames(ruleNode, flowAnalysisRule);
final Map<String, String> decryptedProperties = decryptProperties(flowAnalysisRule.getProperties(), controller.getEncryptor());

View File

@ -18,6 +18,7 @@ package org.apache.nifi.flowanalysis;
import org.apache.nifi.controller.FlowAnalysisRuleNode;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.flow.StandardFlowManager;
import org.apache.nifi.controller.flowanalysis.FlowAnalysisRuleProvider;
import org.apache.nifi.controller.flowanalysis.FlowAnalysisUtil;
import org.apache.nifi.controller.flowanalysis.FlowAnalyzer;
@ -29,6 +30,7 @@ import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
import org.apache.nifi.validation.RuleViolation;
@ -36,6 +38,7 @@ import org.apache.nifi.validation.RuleViolationsManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@ -57,6 +60,7 @@ public class StandardFlowAnalyzer implements FlowAnalyzer {
private final FlowAnalysisRuleProvider flowAnalysisRuleProvider;
private final ExtensionManager extensionManager;
private StandardFlowManager flowManager;
private ControllerServiceProvider controllerServiceProvider;
private volatile boolean flowAnalysisRequired;
@ -71,7 +75,11 @@ public class StandardFlowAnalyzer implements FlowAnalyzer {
this.extensionManager = extensionManager;
}
public void initialize(final ControllerServiceProvider controllerServiceProvider) {
public void initialize(
final StandardFlowManager flowManager,
final ControllerServiceProvider controllerServiceProvider
) {
this.flowManager = flowManager;
this.controllerServiceProvider = controllerServiceProvider;
}
@ -127,6 +135,7 @@ public class StandardFlowAnalyzer implements FlowAnalyzer {
Set<RuleViolation> violations = flowAnalysisRules.stream()
.filter(FlowAnalysisRuleNode::isEnabled)
.filter(ruleNode -> isWithinScope(ruleNode, component.getGroupIdentifier()))
.flatMap(flowAnalysisRuleNode -> {
String ruleId = flowAnalysisRuleNode.getIdentifier();
@ -195,6 +204,7 @@ public class StandardFlowAnalyzer implements FlowAnalyzer {
flowAnalysisRules.stream()
.filter(FlowAnalysisRuleNode::isEnabled)
.filter(ruleNode -> isWithinScope(ruleNode, groupId))
.forEach(flowAnalysisRuleNode -> {
String ruleId = flowAnalysisRuleNode.getIdentifier();
@ -250,6 +260,28 @@ public class StandardFlowAnalyzer implements FlowAnalyzer {
processGroup.getProcessGroups().forEach(childProcessGroup -> analyzeProcessGroup(childProcessGroup, flowAnalysisRules, groupViolations, componentToRuleViolations));
}
private boolean isWithinScope(FlowAnalysisRuleNode ruleNode, String groupIdentifier) {
final String ruleScope = ruleNode.getScope();
while (groupIdentifier != null) {
if (ruleScope == null || ruleScope.isBlank()) {
return true;
}
final HashSet<String> scopedProcessGroupIds = new HashSet<>(Arrays.asList(ruleScope.split("\\s*,\\s*")));
if (scopedProcessGroupIds.contains(groupIdentifier)) {
return true;
}
groupIdentifier = Optional.ofNullable(flowManager.getGroup(groupIdentifier))
.map(ProcessGroup::getParent)
.map(ProcessGroup::getIdentifier)
.orElse(null);
}
return false;
}
private String getDisplayName(VersionedComponent component) {
final String displayName;

View File

@ -44,6 +44,7 @@ import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
@ -59,6 +60,7 @@ public class FlowAnalysisRuleAuditor extends NiFiAuditor {
private static final String ANNOTATION_DATA = "Annotation Data";
private static final String EXTENSION_VERSION = "Extension Version";
private static final String ENFORCEMENT_POLICY = "Enforcement Policy";
private static final String SCOPE = "Scope";
/**
* Audits the creation of flow analysis rule via createFlowAnalysisRule().
@ -106,6 +108,7 @@ public class FlowAnalysisRuleAuditor extends NiFiAuditor {
final Map<String, String> values = extractConfiguredPropertyValues(flowAnalysisRule, flowAnalysisRuleDTO);
final FlowAnalysisRuleState state = flowAnalysisRule.getState();
final EnforcementPolicy enforcementPolicy = flowAnalysisRule.getEnforcementPolicy();
final String scope = flowAnalysisRule.getScope();
// update the flow analysis rule state
final FlowAnalysisRuleNode updatedFlowAnalysisRule = (FlowAnalysisRuleNode) proceedingJoinPoint.proceed();
@ -202,6 +205,25 @@ public class FlowAnalysisRuleAuditor extends NiFiAuditor {
actions.add(configurationAction);
}
final String updatedScope = flowAnalysisRule.getScope();
if (!Objects.equals(updatedScope, scope)) {
final FlowChangeConfigureDetails actionDetails = new FlowChangeConfigureDetails();
actionDetails.setName(SCOPE);
actionDetails.setValue(String.valueOf(updatedScope));
actionDetails.setPreviousValue(String.valueOf(scope));
final FlowChangeAction configurationAction = new FlowChangeAction();
configurationAction.setUserIdentity(user.getIdentity());
configurationAction.setOperation(Operation.Configure);
configurationAction.setTimestamp(actionTimestamp);
configurationAction.setSourceId(flowAnalysisRule.getIdentifier());
configurationAction.setSourceName(flowAnalysisRule.getName());
configurationAction.setSourceType(Component.FlowAnalysisRule);
configurationAction.setComponentDetails(ruleDetails);
configurationAction.setActionDetails(actionDetails);
actions.add(configurationAction);
}
// determine the new state
final FlowAnalysisRuleState updatedState = flowAnalysisRule.getState();

View File

@ -4986,6 +4986,7 @@ public final class DtoFactory {
final FlowAnalysisRuleDTO dto = new FlowAnalysisRuleDTO();
dto.setId(flowAnalysisRuleNode.getIdentifier());
dto.setEnforcementPolicy(flowAnalysisRuleNode.getEnforcementPolicy().name());
dto.setScope(flowAnalysisRuleNode.getScope());
dto.setName(flowAnalysisRuleNode.getName());
dto.setType(flowAnalysisRuleNode.getCanonicalClassName());
dto.setBundle(createBundleDto(bundleCoordinate));

View File

@ -304,6 +304,7 @@ public class StandardFlowAnalysisRuleDAO extends ComponentDAO implements FlowAna
final String name = flowAnalysisRuleDTO.getName();
final String comments = flowAnalysisRuleDTO.getComments();
final String enforcementPolicy = flowAnalysisRuleDTO.getEnforcementPolicy();
final String scope = flowAnalysisRuleDTO.getScope();
final Map<String, String> properties = flowAnalysisRuleDTO.getProperties();
flowAnalysisRule.pauseValidationTrigger(); // avoid triggering validation multiple times
@ -311,6 +312,9 @@ public class StandardFlowAnalysisRuleDAO extends ComponentDAO implements FlowAna
if (isNotNull(enforcementPolicy)) {
flowAnalysisRule.setEnforcementPolicy(EnforcementPolicy.valueOf(enforcementPolicy));
}
if (isNotNull(scope)) {
flowAnalysisRule.setScope(scope);
}
if (isNotNull(name)) {
flowAnalysisRule.setName(name);
}

View File

@ -61,6 +61,18 @@
<span id="read-only-flow-analysis-rule-enforcement-policy"></span>
</div>
</div>
<div class="setting">
<div class="setting-name">
Scope
<div class="fa fa-question-circle" alt="Info" title="A comma-separated list of ids of the process groups to which this rule should be limited to."></div>
</div>
<div class="flow-analysis-rule-editable setting-field">
<input type="text" id="flow-analysis-rule-scope" name="flow-analysis-rule-scope"/>
</div>
<div class="flow-analysis-rule-read-only setting-field hidden">
<span id="read-only-flow-analysis-rule-scope"></span>
</div>
</div>
</div>
<div class="clear"></div>
</div>

View File

@ -108,6 +108,10 @@
return true;
}
if ($('#flow-analysis-rule-scope').val() !== entity.component['scope']) {
return true;
}
// defer to the properties
return $('#flow-analysis-rule-properties').propertytable('isSaveRequired');
};
@ -127,6 +131,7 @@
flowAnalysisRuleDto['name'] = $('#flow-analysis-rule-name').val();
flowAnalysisRuleDto['comments'] = $('#flow-analysis-rule-comments').val();
flowAnalysisRuleDto['enforcementPolicy'] = enforcementPolicy;
flowAnalysisRuleDto['scope'] = $('#flow-analysis-rule-scope').val();
// set the properties
if ($.isEmptyObject(properties) === false) {
@ -475,6 +480,8 @@
}
});
$('#flow-analysis-rule-scope').val(flowAnalysisRule['scope']);
var buttons = [{
buttonText: 'Apply',
color: {
@ -650,6 +657,8 @@
}
nfCommon.populateField('read-only-flow-analysis-rule-enforcement-policy', enforcementPolicy);
nfCommon.populateField('read-only-flow-analysis-rule-scope', flowAnalysisRule['scope']);
var buttons = [{
buttonText: 'Ok',
color: {