mirror of
https://github.com/apache/nifi.git
synced 2025-02-07 10:38:33 +00:00
NIFI-12131 Fixed potential NPE related to FlowAnalyzer
Revamped RuleViolationsManager and FlowAnalyzer handling to make sure no issue occurs when these are not set. Fix ResourceNotFoundException: When determining the subject permissions for a rule violation the type of the subject is now known, so we try to lookup all possible types. Non-matching tpyes throw a ResourceNotFoundException exception though. Going to ignore those. This closes #7809 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
19b4be40aa
commit
9c425d273c
@ -59,6 +59,7 @@ import java.util.HashSet;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
@ -124,7 +125,7 @@ public abstract class AbstractFlowManager implements FlowManager {
|
||||
String identifier = group.getIdentifier();
|
||||
allProcessGroups.remove(identifier);
|
||||
|
||||
ruleViolationsManager.removeRuleViolationsForSubject(identifier);
|
||||
removeRuleViolationsForSubject(identifier);
|
||||
}
|
||||
|
||||
public void onProcessorAdded(final ProcessorNode procNode) {
|
||||
@ -137,7 +138,7 @@ public abstract class AbstractFlowManager implements FlowManager {
|
||||
allProcessors.remove(identifier);
|
||||
pythonBridge.onProcessorRemoved(identifier, procNode.getComponentType(), procNode.getBundleCoordinate().getVersion());
|
||||
|
||||
ruleViolationsManager.removeRuleViolationsForSubject(identifier);
|
||||
removeRuleViolationsForSubject(identifier);
|
||||
}
|
||||
|
||||
public Set<ProcessorNode> findAllProcessors(final Predicate<ProcessorNode> filter) {
|
||||
@ -197,7 +198,7 @@ public abstract class AbstractFlowManager implements FlowManager {
|
||||
flowFileEventRepository.purgeTransferEvents(identifier);
|
||||
allConnections.remove(identifier);
|
||||
|
||||
ruleViolationsManager.removeRuleViolationsForSubject(identifier);
|
||||
removeRuleViolationsForSubject(identifier);
|
||||
}
|
||||
|
||||
public Connection getConnection(final String id) {
|
||||
@ -353,7 +354,7 @@ public abstract class AbstractFlowManager implements FlowManager {
|
||||
flowFileEventRepository.purgeTransferEvents(identifier);
|
||||
allInputPorts.remove(identifier);
|
||||
|
||||
ruleViolationsManager.removeRuleViolationsForSubject(identifier);
|
||||
removeRuleViolationsForSubject(identifier);
|
||||
}
|
||||
|
||||
public Port getInputPort(final String id) {
|
||||
@ -369,7 +370,7 @@ public abstract class AbstractFlowManager implements FlowManager {
|
||||
flowFileEventRepository.purgeTransferEvents(identifier);
|
||||
allOutputPorts.remove(identifier);
|
||||
|
||||
ruleViolationsManager.removeRuleViolationsForSubject(identifier);
|
||||
removeRuleViolationsForSubject(identifier);
|
||||
}
|
||||
|
||||
public Port getOutputPort(final String id) {
|
||||
@ -385,7 +386,7 @@ public abstract class AbstractFlowManager implements FlowManager {
|
||||
flowFileEventRepository.purgeTransferEvents(identifier);
|
||||
allFunnels.remove(identifier);
|
||||
|
||||
ruleViolationsManager.removeRuleViolationsForSubject(identifier);
|
||||
removeRuleViolationsForSubject(identifier);
|
||||
}
|
||||
|
||||
public Funnel getFunnel(final String id) {
|
||||
@ -686,12 +687,18 @@ public abstract class AbstractFlowManager implements FlowManager {
|
||||
protected abstract Authorizable getParameterContextParent();
|
||||
|
||||
@Override
|
||||
public FlowAnalyzer getFlowAnalyzer() {
|
||||
return flowAnalyzer;
|
||||
public Optional<FlowAnalyzer> getFlowAnalyzer() {
|
||||
return Optional.ofNullable(flowAnalyzer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RuleViolationsManager getRuleViolationsManager() {
|
||||
return ruleViolationsManager;
|
||||
public Optional<RuleViolationsManager> getRuleViolationsManager() {
|
||||
return Optional.ofNullable(ruleViolationsManager);
|
||||
}
|
||||
|
||||
private void removeRuleViolationsForSubject(String identifier) {
|
||||
if (ruleViolationsManager != null) {
|
||||
ruleViolationsManager.removeRuleViolationsForSubject(identifier);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -835,7 +835,6 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
|
||||
|
||||
@Override
|
||||
protected void performFlowAnalysisOnThis() {
|
||||
Optional.ofNullable(getValidationContextFactory().getFlowAnalyzer())
|
||||
.ifPresent(flowAnalyzer -> flowAnalyzer.analyzeControllerService(this));
|
||||
getValidationContextFactory().getFlowAnalyzer().ifPresent(flowAnalyzer -> flowAnalyzer.analyzeControllerService(this));
|
||||
}
|
||||
}
|
||||
|
@ -683,7 +683,9 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
||||
extensionManager.removeInstanceClassLoader(serviceNode.getIdentifier());
|
||||
serviceCache.remove(serviceNode.getIdentifier());
|
||||
|
||||
flowManager.getRuleViolationsManager().removeRuleViolationsForSubject(serviceNode.getIdentifier());
|
||||
flowManager.getRuleViolationsManager().ifPresent(
|
||||
ruleViolationsManager -> ruleViolationsManager.removeRuleViolationsForSubject(serviceNode.getIdentifier())
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -26,6 +26,7 @@ import org.apache.nifi.parameter.ParameterContext;
|
||||
import org.apache.nifi.validation.RuleViolationsManager;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
public class StandardValidationContextFactory implements ValidationContextFactory {
|
||||
|
||||
@ -54,12 +55,12 @@ public class StandardValidationContextFactory implements ValidationContextFactor
|
||||
}
|
||||
|
||||
@Override
|
||||
public RuleViolationsManager getRuleViolationsManager() {
|
||||
return ruleViolationsManager;
|
||||
public Optional<RuleViolationsManager> getRuleViolationsManager() {
|
||||
return Optional.ofNullable(ruleViolationsManager);
|
||||
}
|
||||
|
||||
@Override
|
||||
public FlowAnalyzer getFlowAnalyzer() {
|
||||
return flowAnalyzer;
|
||||
public Optional<FlowAnalyzer> getFlowAnalyzer() {
|
||||
return Optional.ofNullable(flowAnalyzer);
|
||||
}
|
||||
}
|
@ -62,7 +62,6 @@ import org.apache.nifi.util.CharacterFilterUtils;
|
||||
import org.apache.nifi.util.FormatUtils;
|
||||
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
|
||||
import org.apache.nifi.validation.RuleViolation;
|
||||
import org.apache.nifi.validation.RuleViolationsManager;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -789,8 +788,7 @@ public abstract class AbstractComponentNode implements ComponentNode {
|
||||
|
||||
performFlowAnalysisOnThis();
|
||||
|
||||
RuleViolationsManager ruleViolationsManager = getValidationContextFactory().getRuleViolationsManager();
|
||||
if (ruleViolationsManager != null) {
|
||||
getValidationContextFactory().getRuleViolationsManager().ifPresent(ruleViolationsManager -> {
|
||||
Collection<RuleViolation> ruleViolations = ruleViolationsManager.getRuleViolationsForSubject(getIdentifier());
|
||||
for (RuleViolation ruleViolation : ruleViolations) {
|
||||
if (ruleViolation.getEnforcementPolicy() == EnforcementPolicy.ENFORCE) {
|
||||
@ -803,7 +801,7 @@ public abstract class AbstractComponentNode implements ComponentNode {
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
logger.debug("Computed validation errors with Validation Context {}; results = {}", validationContext, validationResults);
|
||||
|
||||
|
@ -36,7 +36,6 @@ import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
@ -286,8 +285,7 @@ public abstract class ProcessorNode extends AbstractComponentNode implements Con
|
||||
|
||||
@Override
|
||||
protected void performFlowAnalysisOnThis() {
|
||||
Optional.ofNullable(getValidationContextFactory().getFlowAnalyzer())
|
||||
.ifPresent(flowAnalyzer -> flowAnalyzer.analyzeProcessor(this));
|
||||
getValidationContextFactory().getFlowAnalyzer().ifPresent(flowAnalyzer -> flowAnalyzer.analyzeProcessor(this));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -23,13 +23,14 @@ import org.apache.nifi.parameter.ParameterContext;
|
||||
import org.apache.nifi.validation.RuleViolationsManager;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
public interface ValidationContextFactory {
|
||||
|
||||
ValidationContext newValidationContext(Map<PropertyDescriptor, PropertyConfiguration> properties, String annotationData, String groupId, String componentId, ParameterContext parameterContext,
|
||||
boolean validateConnections);
|
||||
|
||||
RuleViolationsManager getRuleViolationsManager();
|
||||
Optional<RuleViolationsManager> getRuleViolationsManager();
|
||||
|
||||
FlowAnalyzer getFlowAnalyzer();
|
||||
Optional<FlowAnalyzer> getFlowAnalyzer();
|
||||
}
|
||||
|
@ -435,7 +435,7 @@ public interface FlowManager extends ParameterProviderLookup {
|
||||
|
||||
Set<FlowAnalysisRuleNode> getAllFlowAnalysisRules();
|
||||
|
||||
FlowAnalyzer getFlowAnalyzer();
|
||||
Optional<FlowAnalyzer> getFlowAnalyzer();
|
||||
|
||||
RuleViolationsManager getRuleViolationsManager();
|
||||
Optional<RuleViolationsManager> getRuleViolationsManager();
|
||||
}
|
||||
|
@ -562,11 +562,15 @@ public class FlowController implements ReportingTaskProvider, FlowAnalysisRulePr
|
||||
repositoryContextFactory = new RepositoryContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository, stateManagerProvider);
|
||||
|
||||
this.flowAnalysisThreadPool = new FlowEngine(1, "Background Flow Analysis", true);
|
||||
if (ruleViolationsManager != null) {
|
||||
flowAnalyzer = new StandardFlowAnalyzer(
|
||||
ruleViolationsManager,
|
||||
this,
|
||||
extensionManager
|
||||
);
|
||||
} else {
|
||||
flowAnalyzer = null;
|
||||
}
|
||||
|
||||
flowManager = new StandardFlowManager(
|
||||
nifiProperties,
|
||||
@ -600,7 +604,9 @@ public class FlowController implements ReportingTaskProvider, FlowAnalysisRulePr
|
||||
flowAnalyzer,
|
||||
ruleViolationsManager
|
||||
);
|
||||
if (flowAnalyzer != null) {
|
||||
flowAnalyzer.initialize(controllerServiceProvider);
|
||||
}
|
||||
|
||||
final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), repositoryContextFactory);
|
||||
final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), repositoryContextFactory, this.nifiProperties);
|
||||
@ -1204,7 +1210,9 @@ public class FlowController implements ReportingTaskProvider, FlowAnalysisRulePr
|
||||
}
|
||||
};
|
||||
|
||||
if (flowAnalyzer != null) {
|
||||
new TriggerFlowAnalysisTask(flowAnalyzer, rootProcessGroupSupplier).run();
|
||||
}
|
||||
new TriggerValidationTask(flowManager, triggerIfValidating).run();
|
||||
|
||||
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
|
||||
@ -1288,11 +1296,12 @@ public class FlowController implements ReportingTaskProvider, FlowAnalysisRulePr
|
||||
}
|
||||
|
||||
private void scheduleBackgroundFlowAnalysis(Supplier<VersionedProcessGroup> rootProcessGroupSupplier) {
|
||||
if (flowAnalyzer != null) {
|
||||
try {
|
||||
final long scheduleMillis = parseDurationPropertyToMillis(NiFiProperties.BACKGROUND_FLOW_ANALYSIS_SCHEDULE);
|
||||
|
||||
flowAnalysisThreadPool.scheduleWithFixedDelay(
|
||||
new TriggerFlowAnalysisTask(flowManager.getFlowAnalyzer(), rootProcessGroupSupplier),
|
||||
new TriggerFlowAnalysisTask(flowAnalyzer, rootProcessGroupSupplier),
|
||||
scheduleMillis,
|
||||
scheduleMillis,
|
||||
TimeUnit.MILLISECONDS
|
||||
@ -1301,6 +1310,7 @@ public class FlowController implements ReportingTaskProvider, FlowAnalysisRulePr
|
||||
LOG.warn("Could not initialize TriggerFlowAnalysisTask.", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void scheduleLongRunningTaskMonitor() {
|
||||
longRunningTaskMonitorThreadPool.ifPresent(flowEngine -> {
|
||||
|
@ -351,8 +351,8 @@ public class StandardFlowManager extends AbstractFlowManager implements FlowMana
|
||||
.addClasspathUrls(additionalUrls)
|
||||
.kerberosConfig(flowController.createKerberosConfig(nifiProperties))
|
||||
.extensionManager(extensionManager)
|
||||
.flowAnalyzer(getFlowAnalyzer())
|
||||
.ruleViolationsManager(getRuleViolationsManager())
|
||||
.flowAnalyzer(getFlowAnalyzer().orElse(null))
|
||||
.ruleViolationsManager(getRuleViolationsManager().orElse(null))
|
||||
.classloaderIsolationKey(classloaderIsolationKey)
|
||||
.pythonBridge(flowController.getPythonBridge())
|
||||
.buildProcessor();
|
||||
@ -557,8 +557,8 @@ public class StandardFlowManager extends AbstractFlowManager implements FlowMana
|
||||
.kerberosConfig(flowController.createKerberosConfig(nifiProperties))
|
||||
.flowController(flowController)
|
||||
.extensionManager(extensionManager)
|
||||
.flowAnalyzer(getFlowAnalyzer())
|
||||
.ruleViolationsManager(getRuleViolationsManager())
|
||||
.flowAnalyzer(getFlowAnalyzer().orElse(null))
|
||||
.ruleViolationsManager(getRuleViolationsManager().orElse(null))
|
||||
.classloaderIsolationKey(classloaderIsolationKey)
|
||||
.buildFlowAnalysisRuleNode();
|
||||
|
||||
@ -693,7 +693,11 @@ public class StandardFlowManager extends AbstractFlowManager implements FlowMana
|
||||
|
||||
processScheduler.submitFrameworkTask(() -> flowController.getStateManagerProvider().onComponentRemoved(service.getIdentifier()));
|
||||
|
||||
processScheduler.submitFrameworkTask(() -> getRuleViolationsManager().removeRuleViolationsForSubject(service.getIdentifier()));
|
||||
getRuleViolationsManager().ifPresent(
|
||||
ruleViolationsManager -> processScheduler.submitFrameworkTask(
|
||||
() -> ruleViolationsManager.removeRuleViolationsForSubject(service.getIdentifier())
|
||||
)
|
||||
);
|
||||
|
||||
extensionManager.removeInstanceClassLoader(service.getIdentifier());
|
||||
|
||||
@ -720,8 +724,8 @@ public class StandardFlowManager extends AbstractFlowManager implements FlowMana
|
||||
.kerberosConfig(flowController.createKerberosConfig(nifiProperties))
|
||||
.stateManagerProvider(flowController.getStateManagerProvider())
|
||||
.extensionManager(extensionManager)
|
||||
.flowAnalyzer(getFlowAnalyzer())
|
||||
.ruleViolationsManager(getRuleViolationsManager())
|
||||
.flowAnalyzer(getFlowAnalyzer().orElse(null))
|
||||
.ruleViolationsManager(getRuleViolationsManager().orElse(null))
|
||||
.classloaderIsolationKey(classloaderIsolationKey)
|
||||
.buildControllerService();
|
||||
|
||||
|
@ -6307,7 +6307,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
||||
controllerFacade.getControllerServiceProvider()
|
||||
);
|
||||
|
||||
controllerFacade.getFlowManager().getFlowAnalyzer().analyzeProcessGroup(nonVersionedProcessGroup);
|
||||
controllerFacade.getFlowManager().getFlowAnalyzer().ifPresent(
|
||||
flowAnalyzer -> flowAnalyzer.analyzeProcessGroup(nonVersionedProcessGroup)
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -6408,12 +6410,16 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
||||
}
|
||||
|
||||
private Optional<AuthorizableHolder> findAuthorizableHolder(
|
||||
String id,
|
||||
Function<String, AuthorizableHolder>... lookupMethods
|
||||
final String id,
|
||||
final Function<String, AuthorizableHolder>... lookupMethods
|
||||
) {
|
||||
AuthorizableHolder authorizableHolder = null;
|
||||
for (Function<String, AuthorizableHolder> lookupMethod : lookupMethods) {
|
||||
try {
|
||||
authorizableHolder = lookupMethod.apply(id);
|
||||
} catch (ResourceNotFoundException e) {
|
||||
// We don't know beforehand what kind of component we are looking for. Ignore if one lookup fails.
|
||||
}
|
||||
if (authorizableHolder != null) {
|
||||
break;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user