NIFI-6328 Allowing force-commit when in LOCALLY_MODIFIED_AND_STALE state

NIFI-6328 Addressing review feedback
NIFI-6328 Additional validation on action

This closes #3499
This commit is contained in:
Bryan Bende 2019-04-02 15:25:55 -04:00 committed by Matt Gilman
parent 75fb34c8ee
commit 2102d8a0bd
No known key found for this signature in database
GPG Key ID: DF61EC19432AEE37
11 changed files with 132 additions and 28 deletions

View File

@ -23,12 +23,16 @@ import javax.xml.bind.annotation.XmlType;
@XmlType(name = "versionedFlow")
public class VersionedFlowDTO {
public static final String COMMIT_ACTION = "COMMIT";
public static final String FORCE_COMMIT_ACTION = "FORCE_COMMIT";
private String registryId = "default"; // placeholder for now.
private String bucketId;
private String flowId;
private String flowName;
private String description;
private String comments;
private String action;
@ApiModelProperty("The ID of the registry that the flow is tracked to")
public String getRegistryId() {
@ -83,4 +87,13 @@ public class VersionedFlowDTO {
public void setComments(String comments) {
this.comments = comments;
}
@ApiModelProperty(value = "The action being performed", allowableValues = COMMIT_ACTION + ", " + FORCE_COMMIT_ACTION)
public String getAction() {
return action;
}
public void setAction(String action) {
this.action = action;
}
}

View File

@ -16,13 +16,6 @@
*/
package org.apache.nifi.groups;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import org.apache.nifi.authorization.resource.ComponentAuthorizable;
import org.apache.nifi.components.VersionedComponent;
import org.apache.nifi.components.validation.ValidationStatus;
@ -46,6 +39,13 @@ import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.remote.RemoteGroupPort;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
/**
* <p>
* ProcessGroup objects are containers for processing entities, such as
@ -901,7 +901,7 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
*
* @throws IllegalStateException if the Process Group cannot currently be saved to a Flow Registry
*/
void verifyCanSaveToFlowRegistry(String registryId, String bucketId, String flowId);
void verifyCanSaveToFlowRegistry(String registryId, String bucketId, String flowId, String saveAction);
/**
* Adds the given template to this Process Group

View File

@ -121,6 +121,7 @@ import org.apache.nifi.util.ReflectionUtils;
import org.apache.nifi.util.SnippetUtils;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.api.dto.TemplateDTO;
import org.apache.nifi.web.api.dto.VersionedFlowDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -4730,7 +4731,7 @@ public final class StandardProcessGroup implements ProcessGroup {
}
@Override
public void verifyCanSaveToFlowRegistry(final String registryId, final String bucketId, final String flowId) {
public void verifyCanSaveToFlowRegistry(final String registryId, final String bucketId, final String flowId, final String saveAction) {
verifyNoDescendantsWithLocalModifications("be saved to a Flow Registry");
final StandardVersionControlInformation vci = versionControlInfo.get();
@ -4739,7 +4740,8 @@ public final class StandardProcessGroup implements ProcessGroup {
// Flow ID is the same. We want to publish the Process Group as the next version of the Flow.
// In order to do this, we have to ensure that the Process Group is 'current'.
final VersionedFlowState state = vci.getStatus().getState();
if (state == VersionedFlowState.STALE || state == VersionedFlowState.LOCALLY_MODIFIED_AND_STALE) {
if (state == VersionedFlowState.STALE
|| (state == VersionedFlowState.LOCALLY_MODIFIED_AND_STALE && VersionedFlowDTO.COMMIT_ACTION.equals(saveAction))) {
throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + getIdentifier()
+ " because the Process Group in the flow is not synchronized with the most recent version of the Flow in the Flow Registry. "
+ "In order to publish a new version of the Flow, the Process Group must first be in synch with the latest version in the Flow Registry.");

View File

@ -649,7 +649,7 @@ public class MockProcessGroup implements ProcessGroup {
}
@Override
public void verifyCanSaveToFlowRegistry(String registryId, String bucketId, String flowId) {
public void verifyCanSaveToFlowRegistry(String registryId, String bucketId, String flowId, String action) {
}
@Override

View File

@ -1439,10 +1439,11 @@ public interface NiFiServiceFacade {
* @param registryId the ID of the Flow Registry
* @param bucketId the ID of the bucket
* @param flowId the ID of the flow
* @param saveAction the save action being performed
*
* @throws IllegalStateException if the Process Group cannot be saved to the flow registry with the coordinates specified
*/
void verifyCanSaveToFlowRegistry(String groupId, String registryId, String bucketId, String flowId);
void verifyCanSaveToFlowRegistry(String groupId, String registryId, String bucketId, String flowId, String saveAction);
/**
* Verifies that the Process Group with the given identifier can have its local modifications reverted to the given VersionedFlowSnapshot

View File

@ -3753,15 +3753,19 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public VersionControlComponentMappingEntity registerFlowWithFlowRegistry(final String groupId, final StartVersionControlRequestEntity requestEntity) {
final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
final VersionedFlowDTO versionedFlowDto = requestEntity.getVersionedFlow();
final VersionControlInformation currentVci = processGroup.getVersionControlInformation();
final int expectedVersion = currentVci == null ? 1 : currentVci.getVersion() + 1;
int snapshotVersion;
if (VersionedFlowDTO.FORCE_COMMIT_ACTION.equals(versionedFlowDto.getAction())) {
snapshotVersion = -1;
} else {
final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
final VersionControlInformation currentVci = processGroup.getVersionControlInformation();
snapshotVersion = currentVci == null ? 1 : currentVci.getVersion() + 1;
}
// Create a VersionedProcessGroup snapshot of the flow as it is currently.
final InstantiatedVersionedProcessGroup versionedProcessGroup = createFlowSnapshot(groupId);
final VersionedFlowDTO versionedFlowDto = requestEntity.getVersionedFlow();
final String flowId = versionedFlowDto.getFlowId() == null ? UUID.randomUUID().toString() : versionedFlowDto.getFlowId();
final VersionedFlow versionedFlow = new VersionedFlow();
@ -3793,7 +3797,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
try {
// add a snapshot to the flow in the registry
registeredSnapshot = registerVersionedFlowSnapshot(registryId, registeredFlow, versionedProcessGroup, versionedFlowDto.getComments(), expectedVersion);
registeredSnapshot = registerVersionedFlowSnapshot(registryId, registeredFlow, versionedProcessGroup, versionedFlowDto.getComments(), snapshotVersion);
} catch (final NiFiCoreException e) {
// If the flow has been created, but failed to add a snapshot,
// then we need to capture the created versioned flow information as a partial successful result.
@ -4020,9 +4024,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
public void verifyCanSaveToFlowRegistry(final String groupId, final String registryId, final String bucketId, final String flowId) {
public void verifyCanSaveToFlowRegistry(final String groupId, final String registryId, final String bucketId, final String flowId, final String saveAction) {
final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
group.verifyCanSaveToFlowRegistry(registryId, bucketId, flowId);
group.verifyCanSaveToFlowRegistry(registryId, bucketId, flowId, saveAction);
}
@Override

View File

@ -468,6 +468,13 @@ public class VersionsResource extends ApplicationResource {
if (versionedFlowDto.getComments() != null && versionedFlowDto.getComments().length() > 65535) {
throw new IllegalArgumentException("Comments cannot exceed 65,535 characters");
}
if (StringUtils.isEmpty(versionedFlowDto.getAction())) {
throw new IllegalArgumentException("Action is required");
}
if (!VersionedFlowDTO.COMMIT_ACTION.equals(versionedFlowDto.getAction())
&& !VersionedFlowDTO.FORCE_COMMIT_ACTION.equals(versionedFlowDto.getAction())) {
throw new IllegalArgumentException("Action must be one of " + VersionedFlowDTO.COMMIT_ACTION + " or " + VersionedFlowDTO.FORCE_COMMIT_ACTION);
}
if (isDisconnectedFromCluster()) {
verifyDisconnectedNodeModification(requestEntity.isDisconnectedNodeAcknowledged());
@ -534,7 +541,8 @@ public class VersionsResource extends ApplicationResource {
final String registryId = versionedFlow.getRegistryId();
final String bucketId = versionedFlow.getBucketId();
final String flowId = versionedFlow.getFlowId();
serviceFacade.verifyCanSaveToFlowRegistry(groupId, registryId, bucketId, flowId);
final String action = versionedFlow.getAction();
serviceFacade.verifyCanSaveToFlowRegistry(groupId, registryId, bucketId, flowId, action);
},
(rev, flowEntity) -> {
// Register the current flow with the Flow Registry.

View File

@ -35,6 +35,7 @@
<div class="setting-name">Flow Name</div>
<div id="save-flow-version-registry-container" class="setting-field">
<span id="save-flow-version-process-group-id" class="hidden"></span>
<span id="save-flow-version-action" class="hidden"></span>
<input type="text" id="save-flow-version-name-field" class="setting-input hidden"/>
<div id="save-flow-version-name" class="hidden"></div>
<div id="save-flow-version-label"></div>

View File

@ -1304,15 +1304,37 @@
*/
saveFlowVersion: function (selection) {
if (selection.empty()) {
nfFlowVersion.showFlowVersionDialog(nfCanvasUtils.getGroupId());
nfFlowVersion.showFlowVersionDialog(nfCanvasUtils.getGroupId(), 'COMMIT');
} else if (selection.size() === 1) {
var selectionData = selection.datum();
if (nfCanvasUtils.isProcessGroup(selection)) {
nfFlowVersion.showFlowVersionDialog(selectionData.id);
nfFlowVersion.showFlowVersionDialog(selectionData.id, 'COMMIT');
}
}
},
/**
* Confirms force save and shows the flow version dialog.
*/
forceSaveFlowVersion: function (selection) {
nfDialog.showYesNoDialog({
headerText: 'Commit',
dialogContent: 'Committing will ignore available upgrades and commit local changes as the next version. Are you sure you want to proceed?',
noText: 'Cancel',
yesText: 'Yes',
yesHandler: function () {
if (selection.empty()) {
nfFlowVersion.showFlowVersionDialog(nfCanvasUtils.getGroupId(),'FORCE_COMMIT');
} else if (selection.size() === 1) {
var selectionData = selection.datum();
if (nfCanvasUtils.isProcessGroup(selection)) {
nfFlowVersion.showFlowVersionDialog(selectionData.id, 'FORCE_COMMIT');
}
};
}
});
},
/**
* Reverts local changes.
*/

View File

@ -504,6 +504,46 @@
return versionControlInformation.state === 'LOCALLY_MODIFIED';
};
/**
* Returns whether the process group support supports force commit.
*
* @param selection
* @returns {boolean}
*/
var supportsForceCommitFlowVersion = function (selection) {
// ensure this selection supports flow versioning above
if (supportsFlowVersioning(selection) === false) {
return false;
}
var versionControlInformation;
if (selection.empty()) {
// check bread crumbs for version control information in the current group
var breadcrumbEntities = nfNgBridge.injector.get('breadcrumbsCtrl').getBreadcrumbs();
if (breadcrumbEntities.length > 0) {
var breadcrumbEntity = breadcrumbEntities[breadcrumbEntities.length - 1];
if (breadcrumbEntity.permissions.canRead) {
versionControlInformation = breadcrumbEntity.breadcrumb.versionControlInformation;
} else {
return false;
}
} else {
return false;
}
} else {
var processGroupData = selection.datum();
versionControlInformation = processGroupData.component.versionControlInformation;
}
if (nfCommon.isUndefinedOrNull(versionControlInformation)) {
return false;
}
// check the selection for version control information
return versionControlInformation.state === 'LOCALLY_MODIFIED_AND_STALE';
};
/**
* Returns whether the process group supports revert local changes.
*
@ -798,6 +838,7 @@
{id: 'start-version-control-menu-item', condition: supportsStartFlowVersioning, menuItem: {clazz: 'fa fa-upload', text: 'Start version control', action: 'saveFlowVersion'}},
{separator: true},
{id: 'commit-menu-item', condition: supportsCommitFlowVersion, menuItem: {clazz: 'fa fa-upload', text: 'Commit local changes', action: 'saveFlowVersion'}},
{id: 'force-commit-menu-item', condition: supportsForceCommitFlowVersion, menuItem: {clazz: 'fa fa-upload', text: 'Commit local changes', action: 'forceSaveFlowVersion'}},
{id: 'local-changes-menu-item', condition: hasLocalChanges, menuItem: {clazz: 'fa', text: 'Show local changes', action: 'showLocalChanges'}},
{id: 'revert-menu-item', condition: hasLocalChanges, menuItem: {clazz: 'fa fa-undo', text: 'Revert local changes', action: 'revertLocalChanges'}},
{id: 'change-version-menu-item', condition: supportsChangeFlowVersion, menuItem: {clazz: 'fa', text: 'Change version', action: 'changeFlowVersion'}},

View File

@ -100,6 +100,7 @@
$('#save-flow-version-change-comments').val('');
$('#save-flow-version-process-group-id').removeData('versionControlInformation').removeData('revision').text('');
$('#save-flow-version-action').text('');
};
/**
@ -400,7 +401,8 @@
registryId: versionControlInformation.registryId,
bucketId: versionControlInformation.bucketId,
flowId: versionControlInformation.flowId,
comments: $('#save-flow-version-change-comments').val()
comments: $('#save-flow-version-change-comments').val(),
action: $('#save-flow-version-action').text()
}
} else {
var selectedRegistry = $('#save-flow-version-registry-combo').combo('getSelectedOption');
@ -411,7 +413,8 @@
bucketId: selectedBucket.value,
flowName: $('#save-flow-version-name-field').val(),
description: $('#save-flow-version-description-field').val(),
comments: $('#save-flow-version-change-comments').val()
comments: $('#save-flow-version-change-comments').val(),
action: $('#save-flow-version-action').text()
};
}
@ -1753,7 +1756,7 @@
*
* @param processGroupId
*/
showFlowVersionDialog: function (processGroupId) {
showFlowVersionDialog: function (processGroupId, action) {
var focusName = true;
return $.Deferred(function (deferred) {
@ -1764,7 +1767,12 @@
// update the registry and bucket visibility
$('#save-flow-version-registry').text(versionControlInformation.registryName).show();
$('#save-flow-version-bucket').text(versionControlInformation.bucketName).show();
$('#save-flow-version-label').text(versionControlInformation.version + 1);
if (action == 'COMMIT') {
$('#save-flow-version-label').text(versionControlInformation.version + 1).show();
} else {
$('#save-flow-version-label').hide();
}
$('#save-flow-version-name').text(versionControlInformation.flowName).show();
nfCommon.populateField('save-flow-version-description', versionControlInformation.flowDescription);
@ -1773,6 +1781,9 @@
// record the versionControlInformation
$('#save-flow-version-process-group-id').data('versionControlInformation', versionControlInformation);
// record the type of action (i.e. commit vs force-commit)
$('#save-flow-version-action').text(action);
// reposition the version label
$('#save-flow-version-label').css('margin-top', '-15px');
@ -1798,10 +1809,11 @@
}).show();
// set the initial version
$('#save-flow-version-label').text(1);
$('#save-flow-version-label').text(1).show();
$('#save-flow-version-name-field').show();
$('#save-flow-version-description-field').show();
$('#save-flow-version-action').text(action);
// reposition the version label
$('#save-flow-version-label').css('margin-top', '0');