NIFI-5792: Remember created versioned flow information

- Before this fix, NiFi loses information about created versioned flow in case of subsequent snapshot creation failure, and NiFi API returned an error response
- This commit makes:
  - The created versioned Flow information is stored even if subsequent snapshot creation fails
  - NiFi API to return a successful 200 response in that case, but return versioned flow status as SYNC_FAILURE with an explanation. NiFi UI shows a popup error dialog with the explanation.
  - Versioned flow status will be LOCALLY_MODIFIED if the latest version is 0.

This closes #3134.

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Koji Kawamura 2018-11-06 14:08:23 +09:00 committed by Mark Payne
parent 94c2b1e76e
commit c5d0643d1d
3 changed files with 53 additions and 11 deletions

View File

@ -3057,6 +3057,10 @@ public final class StandardProcessGroup implements ProcessGroup {
}
private boolean isModified() {
if (versionControlInformation.getVersion() == 0) {
return true;
}
Set<FlowDifference> differences = versionControlFields.getFlowDifferences();
if (differences == null) {
differences = getModifications();
@ -3110,7 +3114,7 @@ public final class StandardProcessGroup implements ProcessGroup {
final VersionedFlowState flowState = versionControlInformation.getStatus().getState();
versionControlFields.setStale(flowState == VersionedFlowState.STALE || flowState == VersionedFlowState.LOCALLY_MODIFIED_AND_STALE);
versionControlFields.setLocallyModified(flowState == VersionedFlowState.LOCALLY_MODIFIED || flowState == VersionedFlowState.LOCALLY_MODIFIED_AND_STALE);
versionControlFields.setSyncFailureExplanation(null);
versionControlFields.setSyncFailureExplanation(flowState == VersionedFlowState.SYNC_FAILURE ? versionControlInformation.getStatus().getStateExplanation() : null);
writeLock.lock();
try {
@ -3268,7 +3272,7 @@ public final class StandardProcessGroup implements ProcessGroup {
}
final VersionedProcessGroup snapshot = vci.getFlowSnapshot();
if (snapshot == null) {
if (snapshot == null && vci.getVersion() > 0) {
// We have not yet obtained the snapshot from the Flow Registry, so we need to request the snapshot of our local version of the flow from the Flow Registry.
// This allows us to know whether or not the flow has been modified since it was last synced with the Flow Registry.
try {
@ -3301,8 +3305,13 @@ public final class StandardProcessGroup implements ProcessGroup {
vci.setRegistryName(flowRegistry.getName());
if (latestVersion == vci.getVersion()) {
LOG.debug("{} is currently at the most recent version ({}) of the flow that is under Version Control", this, latestVersion);
versionControlFields.setStale(false);
if (latestVersion == 0) {
LOG.debug("{} does not have any version in the Registry", this, latestVersion);
versionControlFields.setLocallyModified(true);
} else {
LOG.debug("{} is currently at the most recent version ({}) of the flow that is under Version Control", this, latestVersion);
}
} else {
LOG.info("{} is not the most recent version of the flow that is under Version Control; current version is {}; most recent version is {}",
this, vci.getVersion(), latestVersion);

View File

@ -3778,24 +3778,46 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final String registryId = requestEntity.getVersionedFlow().getRegistryId();
final VersionedFlowSnapshot registeredSnapshot;
final VersionedFlow registeredFlow;
final boolean registerNewFlow = versionedFlowDto.getFlowId() == null;
String action = "create the flow";
try {
// first, create the flow in the registry, if necessary
if (versionedFlowDto.getFlowId() == null) {
if (registerNewFlow) {
registeredFlow = registerVersionedFlow(registryId, versionedFlow);
} else {
registeredFlow = getVersionedFlow(registryId, versionedFlowDto.getBucketId(), versionedFlowDto.getFlowId());
}
action = "add the local flow to the Flow Registry as the first Snapshot";
// add first snapshot to the flow in the registry
registeredSnapshot = registerVersionedFlowSnapshot(registryId, registeredFlow, versionedProcessGroup, versionedFlowDto.getComments(), expectedVersion);
} catch (final NiFiRegistryException e) {
throw new IllegalArgumentException(e.getLocalizedMessage());
} catch (final IOException ioe) {
throw new IllegalStateException("Failed to communicate with Flow Registry when attempting to " + action);
throw new IllegalStateException("Failed to communicate with Flow Registry when attempting to create the flow");
}
try {
// add a snapshot to the flow in the registry
registeredSnapshot = registerVersionedFlowSnapshot(registryId, registeredFlow, versionedProcessGroup, versionedFlowDto.getComments(), expectedVersion);
} catch (final NiFiCoreException e) {
// If the flow has been created, but failed to add a snapshot,
// then we need to capture the created versioned flow information as a partial successful result.
if (registerNewFlow) {
logger.error("The flow has been created, but failed to add a snapshot. Returning the created flow information.", e);
final VersionControlInformationDTO vci = new VersionControlInformationDTO();
vci.setBucketId(registeredFlow.getBucketIdentifier());
vci.setBucketName(registeredFlow.getBucketName());
vci.setFlowId(registeredFlow.getIdentifier());
vci.setFlowName(registeredFlow.getName());
vci.setFlowDescription(registeredFlow.getDescription());
vci.setGroupId(groupId);
vci.setRegistryId(registryId);
vci.setRegistryName(getFlowRegistryName(registryId));
vci.setVersion(0);
vci.setState(VersionedFlowState.SYNC_FAILURE.name());
vci.setStateExplanation(e.getLocalizedMessage());
return createVersionControlComponentMappingEntity(groupId, versionedProcessGroup, vci);
}
throw e;
}
final Bucket bucket = registeredSnapshot.getBucket();
@ -3814,6 +3836,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
vci.setVersion(registeredSnapshot.getSnapshotMetadata().getVersion());
vci.setState(VersionedFlowState.UP_TO_DATE.name());
return createVersionControlComponentMappingEntity(groupId, versionedProcessGroup, vci);
}
private VersionControlComponentMappingEntity createVersionControlComponentMappingEntity(String groupId, InstantiatedVersionedProcessGroup versionedProcessGroup, VersionControlInformationDTO vci) {
final Map<String, String> mapping = dtoFactory.createVersionControlComponentMappingDto(versionedProcessGroup);
final Revision groupRevision = revisionManager.getRevision(groupId);

View File

@ -421,6 +421,13 @@
url: '../nifi-api/versions/process-groups/' + encodeURIComponent(processGroupId),
dataType: 'json',
contentType: 'application/json'
}).done(function (response) {
if ('SYNC_FAILURE' === response.versionControlInformation.state) {
nfDialog.showOkDialog({
headerText: 'Error',
dialogContent: response.versionControlInformation.stateExplanation
});
}
}).fail(nfErrorHandler.handleAjaxError);
};