NIFI-4436:

- Addressing PR feedback.
- Addressing two phase commit logic issue when changing the flow version.
This commit is contained in:
Matt Gilman 2018-01-03 17:16:57 -05:00 committed by Bryan Bende
parent 63544c880f
commit b3e1584ef4
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
2 changed files with 134 additions and 47 deletions

View File

@ -58,6 +58,7 @@ import org.apache.nifi.web.api.dto.VersionedFlowDTO;
import org.apache.nifi.web.api.dto.VersionedFlowUpdateRequestDTO;
import org.apache.nifi.web.api.entity.AffectedComponentEntity;
import org.apache.nifi.web.api.entity.CreateActiveRequestEntity;
import org.apache.nifi.web.api.entity.Entity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.StartVersionControlRequestEntity;
import org.apache.nifi.web.api.entity.VersionControlComponentMappingEntity;
@ -724,19 +725,19 @@ public class VersionsResource extends ApplicationResource {
throw new IllegalArgumentException("Process Group Revision must be specified.");
}
final VersionedFlowSnapshot flowSnapshot = requestEntity.getVersionedFlowSnapshot();
if (flowSnapshot == null) {
final VersionedFlowSnapshot requestFlowSnapshot = requestEntity.getVersionedFlowSnapshot();
if (requestFlowSnapshot == null) {
throw new IllegalArgumentException("Versioned Flow Snapshot must be supplied.");
}
final VersionedFlowSnapshotMetadata snapshotMetadata = flowSnapshot.getSnapshotMetadata();
if (snapshotMetadata == null) {
final VersionedFlowSnapshotMetadata requestSnapshotMetadata = requestFlowSnapshot.getSnapshotMetadata();
if (requestSnapshotMetadata == null) {
throw new IllegalArgumentException("Snapshot Metadata must be supplied.");
}
if (snapshotMetadata.getBucketIdentifier() == null) {
if (requestSnapshotMetadata.getBucketIdentifier() == null) {
throw new IllegalArgumentException("The Bucket ID must be supplied.");
}
if (snapshotMetadata.getFlowIdentifier() == null) {
if (requestSnapshotMetadata.getFlowIdentifier() == null) {
throw new IllegalArgumentException("The Flow ID must be supplied.");
}
@ -761,9 +762,12 @@ public class VersionsResource extends ApplicationResource {
// the client has explicitly indicated the dataflow that the Process Group should
// provide and provided the Revision to ensure that they have the most up-to-date
// view of the Process Group.
serviceFacade.verifyCanUpdate(groupId, flowSnapshot, true, false);
serviceFacade.verifyCanUpdate(groupId, requestFlowSnapshot, true, false);
},
(rev, entity) -> {
final VersionedFlowSnapshot flowSnapshot = entity.getVersionedFlowSnapshot();
final VersionedFlowSnapshotMetadata snapshotMetadata = flowSnapshot.getSnapshotMetadata();
final Bucket bucket = flowSnapshot.getBucket();
final VersionedFlow flow = flowSnapshot.getFlow();
@ -1000,7 +1004,7 @@ public class VersionsResource extends ApplicationResource {
})
public Response initiateVersionControlUpdate(
@ApiParam("The process group id.") @PathParam("id") final String groupId,
@ApiParam(value = "The controller service configuration details.", required = true) final VersionControlInformationEntity requestEntity) throws IOException {
@ApiParam(value = "The controller service configuration details.", required = true) final VersionControlInformationEntity requestEntity) {
// Verify the request
final RevisionDTO revisionDto = requestEntity.getProcessGroupRevision();
@ -1008,26 +1012,26 @@ public class VersionsResource extends ApplicationResource {
throw new IllegalArgumentException("Process Group Revision must be specified");
}
final VersionControlInformationDTO versionControlInfoDto = requestEntity.getVersionControlInformation();
if (versionControlInfoDto == null) {
final VersionControlInformationDTO requestVersionControlInfoDto = requestEntity.getVersionControlInformation();
if (requestVersionControlInfoDto == null) {
throw new IllegalArgumentException("Version Control Information must be supplied.");
}
if (versionControlInfoDto.getGroupId() == null) {
if (requestVersionControlInfoDto.getGroupId() == null) {
throw new IllegalArgumentException("The Process Group ID must be supplied.");
}
if (!versionControlInfoDto.getGroupId().equals(groupId)) {
if (!requestVersionControlInfoDto.getGroupId().equals(groupId)) {
throw new IllegalArgumentException("The Process Group ID in the request body does not match the Process Group ID of the requested resource.");
}
if (versionControlInfoDto.getBucketId() == null) {
if (requestVersionControlInfoDto.getBucketId() == null) {
throw new IllegalArgumentException("The Bucket ID must be supplied.");
}
if (versionControlInfoDto.getFlowId() == null) {
if (requestVersionControlInfoDto.getFlowId() == null) {
throw new IllegalArgumentException("The Flow ID must be supplied.");
}
if (versionControlInfoDto.getRegistryId() == null) {
if (requestVersionControlInfoDto.getRegistryId() == null) {
throw new IllegalArgumentException("The Registry ID must be supplied.");
}
if (versionControlInfoDto.getVersion() == null) {
if (requestVersionControlInfoDto.getVersion() == null) {
throw new IllegalArgumentException("The Version of the flow must be supplied.");
}
@ -1037,7 +1041,6 @@ public class VersionsResource extends ApplicationResource {
final boolean replicateRequest = isReplicateRequest();
final ComponentLifecycle componentLifecycle = replicateRequest ? clusterComponentLifecycle : localComponentLifecycle;
final NiFiUser user = NiFiUserUtils.getNiFiUser();
final String idGenerationSeed = getIdGenerationSeed().orElse(null);
// Workflow for this process:
@ -1082,11 +1085,14 @@ public class VersionsResource extends ApplicationResource {
// Step 1: Determine which components will be affected by updating the version
final Set<AffectedComponentEntity> affectedComponents = serviceFacade.getComponentsAffectedByVersionChange(groupId, flowSnapshot, user);
final URI exampleUri = getAbsolutePath();
// build a request wrapper
final InitiateChangeFlowVersionRequestWrapper requestWrapper = new InitiateChangeFlowVersionRequestWrapper(requestEntity, componentLifecycle, getAbsolutePath(), affectedComponents,
replicateRequest, flowSnapshot);
final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId);
return withWriteLock(
serviceFacade,
requestEntity,
requestWrapper,
requestRevision,
lookup -> {
// Step 2: Verify READ and WRITE permissions for user, for every component.
@ -1106,7 +1112,9 @@ public class VersionsResource extends ApplicationResource {
// Step 5: Verify that Process Group is not 'dirty'
serviceFacade.verifyCanUpdate(groupId, flowSnapshot, false, true);
},
(revision, processGroupEntity) -> {
(revision, wrapper) -> {
final String idGenerationSeed = getIdGenerationSeed().orElse(null);
// Create an asynchronous request that will occur in the background, because this request may
// result in stopping components, which can take an indeterminate amount of time.
final String requestId = UUID.randomUUID().toString();
@ -1115,8 +1123,9 @@ public class VersionsResource extends ApplicationResource {
// Submit the request to be performed in the background
final Consumer<AsynchronousWebRequest<VersionControlInformationEntity>> updateTask = vcur -> {
try {
final VersionControlInformationEntity updatedVersionControlEntity = updateFlowVersion(groupId, componentLifecycle, exampleUri,
affectedComponents, user, replicateRequest, processGroupEntity, flowSnapshot, request, idGenerationSeed, true, true);
final VersionControlInformationEntity updatedVersionControlEntity = updateFlowVersion(groupId, wrapper.getComponentLifecycle(), wrapper.getExampleUri(),
wrapper.getAffectedComponents(), user, wrapper.isReplicateRequest(), revision, wrapper.getVersionControlInformationEntity(), wrapper.getFlowSnapshot(), request,
idGenerationSeed, true, true);
vcur.markComplete(updatedVersionControlEntity);
} catch (final Exception e) {
@ -1139,7 +1148,7 @@ public class VersionsResource extends ApplicationResource {
updateRequestDto.setState(request.getState());
final VersionedFlowUpdateRequestEntity updateRequestEntity = new VersionedFlowUpdateRequestEntity();
final RevisionDTO groupRevision = dtoFactory.createRevisionDTO(revision);
final RevisionDTO groupRevision = serviceFacade.getProcessGroup(groupId).getRevision();
updateRequestEntity.setProcessGroupRevision(groupRevision);
updateRequestEntity.setRequest(updateRequestDto);
@ -1187,26 +1196,26 @@ public class VersionsResource extends ApplicationResource {
throw new IllegalArgumentException("Process Group Revision must be specified");
}
final VersionControlInformationDTO versionControlInfoDto = requestEntity.getVersionControlInformation();
if (versionControlInfoDto == null) {
final VersionControlInformationDTO requestVersionControlInfoDto = requestEntity.getVersionControlInformation();
if (requestVersionControlInfoDto == null) {
throw new IllegalArgumentException("Version Control Information must be supplied.");
}
if (versionControlInfoDto.getGroupId() == null) {
if (requestVersionControlInfoDto.getGroupId() == null) {
throw new IllegalArgumentException("The Process Group ID must be supplied.");
}
if (!versionControlInfoDto.getGroupId().equals(groupId)) {
if (!requestVersionControlInfoDto.getGroupId().equals(groupId)) {
throw new IllegalArgumentException("The Process Group ID in the request body does not match the Process Group ID of the requested resource.");
}
if (versionControlInfoDto.getBucketId() == null) {
if (requestVersionControlInfoDto.getBucketId() == null) {
throw new IllegalArgumentException("The Bucket ID must be supplied.");
}
if (versionControlInfoDto.getFlowId() == null) {
if (requestVersionControlInfoDto.getFlowId() == null) {
throw new IllegalArgumentException("The Flow ID must be supplied.");
}
if (versionControlInfoDto.getRegistryId() == null) {
if (requestVersionControlInfoDto.getRegistryId() == null) {
throw new IllegalArgumentException("The Registry ID must be supplied.");
}
if (versionControlInfoDto.getVersion() == null) {
if (requestVersionControlInfoDto.getVersion() == null) {
throw new IllegalArgumentException("The Version of the flow must be supplied.");
}
@ -1216,7 +1225,6 @@ public class VersionsResource extends ApplicationResource {
final boolean replicateRequest = isReplicateRequest();
final ComponentLifecycle componentLifecycle = replicateRequest ? clusterComponentLifecycle : localComponentLifecycle;
final NiFiUser user = NiFiUserUtils.getNiFiUser();
final String idGenerationSeed = getIdGenerationSeed().orElse(null);
// Step 0: Get the Versioned Flow Snapshot from the Flow Registry
final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(requestEntity.getVersionControlInformation(), true);
@ -1228,11 +1236,14 @@ public class VersionsResource extends ApplicationResource {
// Step 1: Determine which components will be affected by updating the version
final Set<AffectedComponentEntity> affectedComponents = serviceFacade.getComponentsAffectedByVersionChange(groupId, flowSnapshot, user);
final URI exampleUri = getAbsolutePath();
// build a request wrapper
final InitiateChangeFlowVersionRequestWrapper requestWrapper = new InitiateChangeFlowVersionRequestWrapper(requestEntity, componentLifecycle, getAbsolutePath(), affectedComponents,
replicateRequest, flowSnapshot);
final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId);
return withWriteLock(
serviceFacade,
requestEntity,
requestWrapper,
requestRevision,
lookup -> {
// Step 2: Verify READ and WRITE permissions for user, for every component.
@ -1251,7 +1262,10 @@ public class VersionsResource extends ApplicationResource {
// Step 4: Verify that Process Group is already under version control. If not, must start Version Control instead of updating flow
serviceFacade.verifyCanRevertLocalModifications(groupId, flowSnapshot);
},
(revision, processGroupEntity) -> {
(revision, wrapper) -> {
final VersionControlInformationEntity versionControlInformationEntity = wrapper.getVersionControlInformationEntity();
final VersionControlInformationDTO versionControlInformationDTO = versionControlInformationEntity.getVersionControlInformation();
// Ensure that the information passed in is correct
final VersionControlInformationEntity currentVersionEntity = serviceFacade.getVersionControlInformation(groupId);
if (currentVersionEntity == null) {
@ -1259,19 +1273,21 @@ public class VersionsResource extends ApplicationResource {
}
final VersionControlInformationDTO currentVersion = currentVersionEntity.getVersionControlInformation();
if (!currentVersion.getBucketId().equals(versionControlInfoDto.getBucketId())) {
if (!currentVersion.getBucketId().equals(versionControlInformationDTO.getBucketId())) {
throw new IllegalArgumentException("The Version Control Information provided does not match the flow that the Process Group is currently synchronized with.");
}
if (!currentVersion.getFlowId().equals(versionControlInfoDto.getFlowId())) {
if (!currentVersion.getFlowId().equals(versionControlInformationDTO.getFlowId())) {
throw new IllegalArgumentException("The Version Control Information provided does not match the flow that the Process Group is currently synchronized with.");
}
if (!currentVersion.getRegistryId().equals(versionControlInfoDto.getRegistryId())) {
if (!currentVersion.getRegistryId().equals(versionControlInformationDTO.getRegistryId())) {
throw new IllegalArgumentException("The Version Control Information provided does not match the flow that the Process Group is currently synchronized with.");
}
if (!currentVersion.getVersion().equals(versionControlInfoDto.getVersion())) {
if (!currentVersion.getVersion().equals(versionControlInformationDTO.getVersion())) {
throw new IllegalArgumentException("The Version Control Information provided does not match the flow that the Process Group is currently synchronized with.");
}
final String idGenerationSeed = getIdGenerationSeed().orElse(null);
// Create an asynchronous request that will occur in the background, because this request may
// result in stopping components, which can take an indeterminate amount of time.
final String requestId = UUID.randomUUID().toString();
@ -1280,8 +1296,9 @@ public class VersionsResource extends ApplicationResource {
// Submit the request to be performed in the background
final Consumer<AsynchronousWebRequest<VersionControlInformationEntity>> updateTask = vcur -> {
try {
final VersionControlInformationEntity updatedVersionControlEntity = updateFlowVersion(groupId, componentLifecycle, exampleUri,
affectedComponents, user, replicateRequest, processGroupEntity, flowSnapshot, request, idGenerationSeed, false, true);
final VersionControlInformationEntity updatedVersionControlEntity = updateFlowVersion(groupId, wrapper.getComponentLifecycle(), wrapper.getExampleUri(),
wrapper.getAffectedComponents(), user, wrapper.isReplicateRequest(), revision, versionControlInformationEntity, wrapper.getFlowSnapshot(), request,
idGenerationSeed, false, true);
vcur.markComplete(updatedVersionControlEntity);
} catch (final Exception e) {
@ -1303,8 +1320,9 @@ public class VersionsResource extends ApplicationResource {
updateRequestDto.setPercentCompleted(request.getPercentComplete());
updateRequestDto.setUri(generateResourceUri("versions", "revert-requests", requestId));
final VersionedFlowUpdateRequestEntity updateRequestEntity = new VersionedFlowUpdateRequestEntity();
final RevisionDTO groupRevision = dtoFactory.createRevisionDTO(revision);
final RevisionDTO groupRevision = serviceFacade.getProcessGroup(groupId).getRevision();
updateRequestEntity.setProcessGroupRevision(groupRevision);
updateRequestEntity.setRequest(updateRequestDto);
@ -1313,7 +1331,7 @@ public class VersionsResource extends ApplicationResource {
}
private VersionControlInformationEntity updateFlowVersion(final String groupId, final ComponentLifecycle componentLifecycle, final URI exampleUri,
final Set<AffectedComponentEntity> affectedComponents, final NiFiUser user, final boolean replicateRequest, final VersionControlInformationEntity requestEntity,
final Set<AffectedComponentEntity> affectedComponents, final NiFiUser user, final boolean replicateRequest, final Revision revision, final VersionControlInformationEntity requestEntity,
final VersionedFlowSnapshot flowSnapshot, final AsynchronousWebRequest<VersionControlInformationEntity> asyncRequest, final String idGenerationSeed,
final boolean verifyNotModified, final boolean updateDescendantVersionedFlows) throws LifecycleManagementException {
@ -1372,7 +1390,7 @@ public class VersionsResource extends ApplicationResource {
headers.put("content-type", MediaType.APPLICATION_JSON);
final VersionedFlowSnapshotEntity snapshotEntity = new VersionedFlowSnapshotEntity();
snapshotEntity.setProcessGroupRevision(requestEntity.getProcessGroupRevision());
snapshotEntity.setProcessGroupRevision(dtoFactory.createRevisionDTO(revision));
snapshotEntity.setRegistryId(requestEntity.getVersionControlInformation().getRegistryId());
snapshotEntity.setVersionedFlow(flowSnapshot);
snapshotEntity.setUpdateDescendantVersionedFlows(updateDescendantVersionedFlows);
@ -1408,8 +1426,6 @@ public class VersionsResource extends ApplicationResource {
serviceFacade.verifyCanUpdate(groupId, flowSnapshot, true, verifyNotModified);
// Step 11-12. Update Process Group to the new flow and update variable registry with any Variables that were added or removed
final RevisionDTO revisionDto = requestEntity.getProcessGroupRevision();
final Revision revision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), groupId);
final VersionControlInformationDTO requestVci = requestEntity.getVersionControlInformation();
final Bucket bucket = flowSnapshot.getBucket();
@ -1571,4 +1587,50 @@ public class VersionsResource extends ApplicationResource {
return updatePerformed;
}
}
private static class InitiateChangeFlowVersionRequestWrapper extends Entity {
final VersionControlInformationEntity versionControlInformationEntity;
final ComponentLifecycle componentLifecycle;
final URI exampleUri;
final Set<AffectedComponentEntity> affectedComponents;
final boolean replicateRequest;
final VersionedFlowSnapshot flowSnapshot;
public InitiateChangeFlowVersionRequestWrapper(final VersionControlInformationEntity versionControlInformationEntity, final ComponentLifecycle componentLifecycle,
final URI exampleUri, final Set<AffectedComponentEntity> affectedComponents, final boolean replicateRequest,
final VersionedFlowSnapshot flowSnapshot) {
this.versionControlInformationEntity = versionControlInformationEntity;
this.componentLifecycle = componentLifecycle;
this.exampleUri = exampleUri;
this.affectedComponents = affectedComponents;
this.replicateRequest = replicateRequest;
this.flowSnapshot = flowSnapshot;
}
public VersionControlInformationEntity getVersionControlInformationEntity() {
return versionControlInformationEntity;
}
public ComponentLifecycle getComponentLifecycle() {
return componentLifecycle;
}
public URI getExampleUri() {
return exampleUri;
}
public Set<AffectedComponentEntity> getAffectedComponents() {
return affectedComponents;
}
public boolean isReplicateRequest() {
return replicateRequest;
}
public VersionedFlowSnapshot getFlowSnapshot() {
return flowSnapshot;
}
}
}

View File

@ -214,8 +214,14 @@
});
}
// determine the max registry height
var windowHeight = $(window).height();
var registryOffset = $('#import-flow-version-registry-combo').offset();
var registryMaxHeight = windowHeight - registryOffset.top - 64;
// load the registries
registryCombo.combo({
maxHeight: registryMaxHeight,
options: registries,
select: function (selectedOption) {
selectRegistry(dialog, selectedOption, bucketCombo, flowCombo, selectBucket, bucketCheck)
@ -290,8 +296,14 @@
}
}
// determine the max bucket height
var windowHeight = $(window).height();
var bucketOffset = $('#import-flow-version-bucket-combo').offset();
var bucketMaxHeight = windowHeight - bucketOffset.top - 64;
// load the buckets
bucketCombo.combo('destroy').combo({
maxHeight: bucketMaxHeight,
options: buckets,
select: selectBucket
});
@ -890,8 +902,14 @@
});
}
// determine the max flow height
var windowHeight = $(window).height();
var flowOffset = $('#import-flow-version-name-combo').offset();
var flowMaxHeight = windowHeight - flowOffset.top - 64;
// load the buckets
$('#import-flow-version-name-combo').combo('destroy').combo({
maxHeight: flowMaxHeight,
options: versionedFlows,
select: function (selectedFlow) {
if (nfCommon.isDefinedAndNotNull(selectedFlow.value)) {
@ -1284,7 +1302,6 @@
if (nfCanvasUtils.getGroupId() === processGroupId) {
// if reverting/changing current PG... reload/refresh this group/canvas
// TODO consider implementing this differently
$.ajax({
type: 'GET',
url: '../nifi-api/flow/process-groups/' + encodeURIComponent(processGroupId),
@ -1295,6 +1312,14 @@
// update the component visibility
nfGraph.updateVisibility();
// update the breadcrumbs
var breadcrumbsCtrl = nfNgBridge.injector.get('breadcrumbsCtrl');
breadcrumbsCtrl.resetBreadcrumbs();
breadcrumbsCtrl.generateBreadcrumbs(response.processGroupFlow.breadcrumb);
// inform Angular app values have changed
nfNgBridge.digest();
}).fail(nfErrorHandler.handleAjaxError);
} else {
// if reverting selected PG... reload selected PG to update counts, etc