NIFI-7460: Avoid NPE when a VersionedProcessor has a null value for autoTerminatedRelationships. Added additional logging and improved error handling around syncing with invalid flows

This commit is contained in:
Mark Payne 2020-05-15 14:25:51 -04:00 committed by Bryan Bende
parent 53a161234e
commit c51b9051a8
3 changed files with 74 additions and 35 deletions

View File

@ -374,7 +374,16 @@ public final class StandardProcessGroup implements ProcessGroup {
// update the vci counts for this child group
final VersionControlInformation vci = childGroup.getVersionControlInformation();
if (vci != null) {
switch (vci.getStatus().getState()) {
final VersionedFlowStatus flowStatus;
try {
flowStatus = vci.getStatus();
} catch (final Exception e) {
LOG.warn("Could not determine Version Control State for {}. Will consider state to be SYNC_FAILURE", this, e);
syncFailure++;
continue;
}
switch (flowStatus.getState()) {
case LOCALLY_MODIFIED:
locallyModified++;
break;
@ -1646,7 +1655,10 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
public String toString() {
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("identifier", getIdentifier()).toString();
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
.append("identifier", getIdentifier())
.append("name", getName())
.toString();
}
@Override
@ -3345,28 +3357,33 @@ public final class StandardProcessGroup implements ProcessGroup {
return new StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, syncFailureExplanation);
}
final boolean modified = isModified();
if (!modified) {
final VersionControlInformation vci = StandardProcessGroup.this.versionControlInfo.get();
if (vci.getFlowSnapshot() == null) {
return new StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, "Process Group has not yet been synchronized with Flow Registry");
try {
final boolean modified = isModified();
if (!modified) {
final VersionControlInformation vci = StandardProcessGroup.this.versionControlInfo.get();
if (vci.getFlowSnapshot() == null) {
return new StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, "Process Group has not yet been synchronized with Flow Registry");
}
}
final boolean stale = versionControlFields.isStale();
final VersionedFlowState flowState;
if (modified && stale) {
flowState = VersionedFlowState.LOCALLY_MODIFIED_AND_STALE;
} else if (modified) {
flowState = VersionedFlowState.LOCALLY_MODIFIED;
} else if (stale) {
flowState = VersionedFlowState.STALE;
} else {
flowState = VersionedFlowState.UP_TO_DATE;
}
return new StandardVersionedFlowStatus(flowState, flowState.getDescription());
} catch (final Exception e) {
LOG.warn("Could not correctly determine Versioned Flow Status for {}. Will consider state to be SYNC_FAILURE", this, e);
return new StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, "Could not properly determine flow status due to: " + e);
}
final boolean stale = versionControlFields.isStale();
final VersionedFlowState flowState;
if (modified && stale) {
flowState = VersionedFlowState.LOCALLY_MODIFIED_AND_STALE;
} else if (modified) {
flowState = VersionedFlowState.LOCALLY_MODIFIED;
} else if (stale) {
flowState = VersionedFlowState.STALE;
} else {
flowState = VersionedFlowState.UP_TO_DATE;
}
return new StandardVersionedFlowStatus(flowState, flowState.getDescription());
}
};
@ -4909,15 +4926,16 @@ public final class StandardProcessGroup implements ProcessGroup {
return null;
}
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(flowController.getExtensionManager());
final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, controllerServiceProvider, flowController.getFlowRegistryClient(), false);
try {
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(flowController.getExtensionManager());
final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, controllerServiceProvider, flowController.getFlowRegistryClient(), false);
final ComparableDataFlow currentFlow = new StandardComparableDataFlow("Local Flow", versionedGroup);
final ComparableDataFlow snapshotFlow = new StandardComparableDataFlow("Versioned Flow", vci.getFlowSnapshot());
final ComparableDataFlow currentFlow = new StandardComparableDataFlow("Local Flow", versionedGroup);
final ComparableDataFlow snapshotFlow = new StandardComparableDataFlow("Versioned Flow", vci.getFlowSnapshot());
final FlowComparator flowComparator = new StandardFlowComparator(snapshotFlow, currentFlow, getAncestorServiceIds(), new EvolvingDifferenceDescriptor());
final FlowComparison comparison = flowComparator.compare();
final Set<FlowDifference> differences = comparison.getDifferences().stream()
final FlowComparator flowComparator = new StandardFlowComparator(snapshotFlow, currentFlow, getAncestorServiceIds(), new EvolvingDifferenceDescriptor());
final FlowComparison comparison = flowComparator.compare();
final Set<FlowDifference> differences = comparison.getDifferences().stream()
.filter(difference -> difference.getDifferenceType() != DifferenceType.BUNDLE_CHANGED)
.filter(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS)
.filter(FlowDifferenceFilters.FILTER_PUBLIC_PORT_NAME_CHANGES)
@ -4927,8 +4945,11 @@ public final class StandardProcessGroup implements ProcessGroup {
.filter(diff -> !FlowDifferenceFilters.isScheduledStateNew(diff))
.collect(Collectors.toCollection(HashSet::new));
LOG.debug("There are {} differences between this Local Flow and the Versioned Flow: {}", differences.size(), differences);
return differences;
LOG.debug("There are {} differences between this Local Flow and the Versioned Flow: {}", differences.size(), differences);
return differences;
} catch (final RuntimeException e) {
throw new RuntimeException("Could not compute differences between local flow and Versioned Flow in NiFi Registry for " + this, e);
}
}

View File

@ -62,10 +62,16 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceRepository;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedFlowState;
import org.apache.nifi.registry.flow.VersionedFlowStatus;
import org.apache.nifi.remote.PublicPort;
import org.apache.nifi.remote.RemoteGroupPort;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StandardEventAccess implements UserAwareEventAccess {
private static final Logger logger = LoggerFactory.getLogger(StandardEventAccess.class);
private final FlowFileEventRepository flowFileEventRepository;
private final FlowController flowController;
private final StatusAnalyticsEngine statusAnalyticsEngine;
@ -552,8 +558,16 @@ public class StandardEventAccess implements UserAwareEventAccess {
status.setBytesTransferred(bytesTransferred);
final VersionControlInformation vci = group.getVersionControlInformation();
if (vci != null && vci.getStatus() != null && vci.getStatus().getState() != null) {
status.setVersionedFlowState(vci.getStatus().getState());
if (vci != null) {
try {
final VersionedFlowStatus flowStatus = vci.getStatus();
if (flowStatus != null && flowStatus.getState() != null) {
status.setVersionedFlowState(flowStatus.getState());
}
} catch (final Exception e) {
logger.warn("Failed to determine Version Control State for {}. Will consider state to be SYNC_FAILURE", group, e);
status.setVersionedFlowState(VersionedFlowState.SYNC_FAILURE);
}
}
return status;

View File

@ -36,6 +36,7 @@ import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessor;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
@ -196,8 +197,8 @@ public class FlowDifferenceFilters {
// Determine if this Flow Difference indicates that Processor B has all of the same Auto-Terminated Relationships as Processor A, plus some.
// If that is the case, then it may be that a new Relationship was added, defaulting to 'Auto-Terminated' and that Processor B is still auto-terminated.
// We want to be able to identify that case.
final Set<String> autoTerminatedA = processorA.getAutoTerminatedRelationships();
final Set<String> autoTerminatedB = processorB.getAutoTerminatedRelationships();
final Set<String> autoTerminatedA = replaceNull(processorA.getAutoTerminatedRelationships(), Collections.emptySet());
final Set<String> autoTerminatedB = replaceNull(processorB.getAutoTerminatedRelationships(), Collections.emptySet());
// If B is smaller than A, then B cannot possibly contain all of A. So use that as a first comparison to avoid the expense of #containsAll
if (autoTerminatedB.size() < autoTerminatedA.size() || !autoTerminatedB.containsAll(autoTerminatedA)) {
@ -233,6 +234,9 @@ public class FlowDifferenceFilters {
return true;
}
private static <T> T replaceNull(final T value, final T replacement) {
return value == null ? replacement : value;
}
/**
* Determines whether or not the given Process Group has a Connection whose source is the given Processor and that contains the given relationship