From b3e1584ef4f5b7de8cf8517a8b950125f82832cb Mon Sep 17 00:00:00 2001 From: Matt Gilman Date: Wed, 3 Jan 2018 17:16:57 -0500 Subject: [PATCH] NIFI-4436: - Addressing PR feedback. - Addressing two phase commit logic issue when changing the flow version. --- .../apache/nifi/web/api/VersionsResource.java | 154 ++++++++++++------ .../webapp/js/nf/canvas/nf-flow-version.js | 27 ++- 2 files changed, 134 insertions(+), 47 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java index af9a515928..53dc091c5b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java @@ -58,6 +58,7 @@ import org.apache.nifi.web.api.dto.VersionedFlowDTO; import org.apache.nifi.web.api.dto.VersionedFlowUpdateRequestDTO; import org.apache.nifi.web.api.entity.AffectedComponentEntity; import org.apache.nifi.web.api.entity.CreateActiveRequestEntity; +import org.apache.nifi.web.api.entity.Entity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; import org.apache.nifi.web.api.entity.StartVersionControlRequestEntity; import org.apache.nifi.web.api.entity.VersionControlComponentMappingEntity; @@ -724,19 +725,19 @@ public class VersionsResource extends ApplicationResource { throw new IllegalArgumentException("Process Group Revision must be specified."); } - final VersionedFlowSnapshot flowSnapshot = requestEntity.getVersionedFlowSnapshot(); - if (flowSnapshot == null) { + final VersionedFlowSnapshot requestFlowSnapshot = requestEntity.getVersionedFlowSnapshot(); + if (requestFlowSnapshot == null) { throw new IllegalArgumentException("Versioned Flow Snapshot must be supplied."); } - final VersionedFlowSnapshotMetadata snapshotMetadata = flowSnapshot.getSnapshotMetadata(); - if (snapshotMetadata == null) { + final VersionedFlowSnapshotMetadata requestSnapshotMetadata = requestFlowSnapshot.getSnapshotMetadata(); + if (requestSnapshotMetadata == null) { throw new IllegalArgumentException("Snapshot Metadata must be supplied."); } - if (snapshotMetadata.getBucketIdentifier() == null) { + if (requestSnapshotMetadata.getBucketIdentifier() == null) { throw new IllegalArgumentException("The Bucket ID must be supplied."); } - if (snapshotMetadata.getFlowIdentifier() == null) { + if (requestSnapshotMetadata.getFlowIdentifier() == null) { throw new IllegalArgumentException("The Flow ID must be supplied."); } @@ -761,9 +762,12 @@ public class VersionsResource extends ApplicationResource { // the client has explicitly indicated the dataflow that the Process Group should // provide and provided the Revision to ensure that they have the most up-to-date // view of the Process Group. - serviceFacade.verifyCanUpdate(groupId, flowSnapshot, true, false); + serviceFacade.verifyCanUpdate(groupId, requestFlowSnapshot, true, false); }, (rev, entity) -> { + final VersionedFlowSnapshot flowSnapshot = entity.getVersionedFlowSnapshot(); + final VersionedFlowSnapshotMetadata snapshotMetadata = flowSnapshot.getSnapshotMetadata(); + final Bucket bucket = flowSnapshot.getBucket(); final VersionedFlow flow = flowSnapshot.getFlow(); @@ -1000,7 +1004,7 @@ public class VersionsResource extends ApplicationResource { }) public Response initiateVersionControlUpdate( @ApiParam("The process group id.") @PathParam("id") final String groupId, - @ApiParam(value = "The controller service configuration details.", required = true) final VersionControlInformationEntity requestEntity) throws IOException { + @ApiParam(value = "The controller service configuration details.", required = true) final VersionControlInformationEntity requestEntity) { // Verify the request final RevisionDTO revisionDto = requestEntity.getProcessGroupRevision(); @@ -1008,26 +1012,26 @@ public class VersionsResource extends ApplicationResource { throw new IllegalArgumentException("Process Group Revision must be specified"); } - final VersionControlInformationDTO versionControlInfoDto = requestEntity.getVersionControlInformation(); - if (versionControlInfoDto == null) { + final VersionControlInformationDTO requestVersionControlInfoDto = requestEntity.getVersionControlInformation(); + if (requestVersionControlInfoDto == null) { throw new IllegalArgumentException("Version Control Information must be supplied."); } - if (versionControlInfoDto.getGroupId() == null) { + if (requestVersionControlInfoDto.getGroupId() == null) { throw new IllegalArgumentException("The Process Group ID must be supplied."); } - if (!versionControlInfoDto.getGroupId().equals(groupId)) { + if (!requestVersionControlInfoDto.getGroupId().equals(groupId)) { throw new IllegalArgumentException("The Process Group ID in the request body does not match the Process Group ID of the requested resource."); } - if (versionControlInfoDto.getBucketId() == null) { + if (requestVersionControlInfoDto.getBucketId() == null) { throw new IllegalArgumentException("The Bucket ID must be supplied."); } - if (versionControlInfoDto.getFlowId() == null) { + if (requestVersionControlInfoDto.getFlowId() == null) { throw new IllegalArgumentException("The Flow ID must be supplied."); } - if (versionControlInfoDto.getRegistryId() == null) { + if (requestVersionControlInfoDto.getRegistryId() == null) { throw new IllegalArgumentException("The Registry ID must be supplied."); } - if (versionControlInfoDto.getVersion() == null) { + if (requestVersionControlInfoDto.getVersion() == null) { throw new IllegalArgumentException("The Version of the flow must be supplied."); } @@ -1037,7 +1041,6 @@ public class VersionsResource extends ApplicationResource { final boolean replicateRequest = isReplicateRequest(); final ComponentLifecycle componentLifecycle = replicateRequest ? clusterComponentLifecycle : localComponentLifecycle; final NiFiUser user = NiFiUserUtils.getNiFiUser(); - final String idGenerationSeed = getIdGenerationSeed().orElse(null); // Workflow for this process: @@ -1082,11 +1085,14 @@ public class VersionsResource extends ApplicationResource { // Step 1: Determine which components will be affected by updating the version final Set affectedComponents = serviceFacade.getComponentsAffectedByVersionChange(groupId, flowSnapshot, user); - final URI exampleUri = getAbsolutePath(); + // build a request wrapper + final InitiateChangeFlowVersionRequestWrapper requestWrapper = new InitiateChangeFlowVersionRequestWrapper(requestEntity, componentLifecycle, getAbsolutePath(), affectedComponents, + replicateRequest, flowSnapshot); + final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId); return withWriteLock( serviceFacade, - requestEntity, + requestWrapper, requestRevision, lookup -> { // Step 2: Verify READ and WRITE permissions for user, for every component. @@ -1106,7 +1112,9 @@ public class VersionsResource extends ApplicationResource { // Step 5: Verify that Process Group is not 'dirty' serviceFacade.verifyCanUpdate(groupId, flowSnapshot, false, true); }, - (revision, processGroupEntity) -> { + (revision, wrapper) -> { + final String idGenerationSeed = getIdGenerationSeed().orElse(null); + // Create an asynchronous request that will occur in the background, because this request may // result in stopping components, which can take an indeterminate amount of time. final String requestId = UUID.randomUUID().toString(); @@ -1115,8 +1123,9 @@ public class VersionsResource extends ApplicationResource { // Submit the request to be performed in the background final Consumer> updateTask = vcur -> { try { - final VersionControlInformationEntity updatedVersionControlEntity = updateFlowVersion(groupId, componentLifecycle, exampleUri, - affectedComponents, user, replicateRequest, processGroupEntity, flowSnapshot, request, idGenerationSeed, true, true); + final VersionControlInformationEntity updatedVersionControlEntity = updateFlowVersion(groupId, wrapper.getComponentLifecycle(), wrapper.getExampleUri(), + wrapper.getAffectedComponents(), user, wrapper.isReplicateRequest(), revision, wrapper.getVersionControlInformationEntity(), wrapper.getFlowSnapshot(), request, + idGenerationSeed, true, true); vcur.markComplete(updatedVersionControlEntity); } catch (final Exception e) { @@ -1139,7 +1148,7 @@ public class VersionsResource extends ApplicationResource { updateRequestDto.setState(request.getState()); final VersionedFlowUpdateRequestEntity updateRequestEntity = new VersionedFlowUpdateRequestEntity(); - final RevisionDTO groupRevision = dtoFactory.createRevisionDTO(revision); + final RevisionDTO groupRevision = serviceFacade.getProcessGroup(groupId).getRevision(); updateRequestEntity.setProcessGroupRevision(groupRevision); updateRequestEntity.setRequest(updateRequestDto); @@ -1187,26 +1196,26 @@ public class VersionsResource extends ApplicationResource { throw new IllegalArgumentException("Process Group Revision must be specified"); } - final VersionControlInformationDTO versionControlInfoDto = requestEntity.getVersionControlInformation(); - if (versionControlInfoDto == null) { + final VersionControlInformationDTO requestVersionControlInfoDto = requestEntity.getVersionControlInformation(); + if (requestVersionControlInfoDto == null) { throw new IllegalArgumentException("Version Control Information must be supplied."); } - if (versionControlInfoDto.getGroupId() == null) { + if (requestVersionControlInfoDto.getGroupId() == null) { throw new IllegalArgumentException("The Process Group ID must be supplied."); } - if (!versionControlInfoDto.getGroupId().equals(groupId)) { + if (!requestVersionControlInfoDto.getGroupId().equals(groupId)) { throw new IllegalArgumentException("The Process Group ID in the request body does not match the Process Group ID of the requested resource."); } - if (versionControlInfoDto.getBucketId() == null) { + if (requestVersionControlInfoDto.getBucketId() == null) { throw new IllegalArgumentException("The Bucket ID must be supplied."); } - if (versionControlInfoDto.getFlowId() == null) { + if (requestVersionControlInfoDto.getFlowId() == null) { throw new IllegalArgumentException("The Flow ID must be supplied."); } - if (versionControlInfoDto.getRegistryId() == null) { + if (requestVersionControlInfoDto.getRegistryId() == null) { throw new IllegalArgumentException("The Registry ID must be supplied."); } - if (versionControlInfoDto.getVersion() == null) { + if (requestVersionControlInfoDto.getVersion() == null) { throw new IllegalArgumentException("The Version of the flow must be supplied."); } @@ -1216,7 +1225,6 @@ public class VersionsResource extends ApplicationResource { final boolean replicateRequest = isReplicateRequest(); final ComponentLifecycle componentLifecycle = replicateRequest ? clusterComponentLifecycle : localComponentLifecycle; final NiFiUser user = NiFiUserUtils.getNiFiUser(); - final String idGenerationSeed = getIdGenerationSeed().orElse(null); // Step 0: Get the Versioned Flow Snapshot from the Flow Registry final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(requestEntity.getVersionControlInformation(), true); @@ -1228,11 +1236,14 @@ public class VersionsResource extends ApplicationResource { // Step 1: Determine which components will be affected by updating the version final Set affectedComponents = serviceFacade.getComponentsAffectedByVersionChange(groupId, flowSnapshot, user); - final URI exampleUri = getAbsolutePath(); + // build a request wrapper + final InitiateChangeFlowVersionRequestWrapper requestWrapper = new InitiateChangeFlowVersionRequestWrapper(requestEntity, componentLifecycle, getAbsolutePath(), affectedComponents, + replicateRequest, flowSnapshot); + final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId); return withWriteLock( serviceFacade, - requestEntity, + requestWrapper, requestRevision, lookup -> { // Step 2: Verify READ and WRITE permissions for user, for every component. @@ -1251,7 +1262,10 @@ public class VersionsResource extends ApplicationResource { // Step 4: Verify that Process Group is already under version control. If not, must start Version Control instead of updating flow serviceFacade.verifyCanRevertLocalModifications(groupId, flowSnapshot); }, - (revision, processGroupEntity) -> { + (revision, wrapper) -> { + final VersionControlInformationEntity versionControlInformationEntity = wrapper.getVersionControlInformationEntity(); + final VersionControlInformationDTO versionControlInformationDTO = versionControlInformationEntity.getVersionControlInformation(); + // Ensure that the information passed in is correct final VersionControlInformationEntity currentVersionEntity = serviceFacade.getVersionControlInformation(groupId); if (currentVersionEntity == null) { @@ -1259,19 +1273,21 @@ public class VersionsResource extends ApplicationResource { } final VersionControlInformationDTO currentVersion = currentVersionEntity.getVersionControlInformation(); - if (!currentVersion.getBucketId().equals(versionControlInfoDto.getBucketId())) { + if (!currentVersion.getBucketId().equals(versionControlInformationDTO.getBucketId())) { throw new IllegalArgumentException("The Version Control Information provided does not match the flow that the Process Group is currently synchronized with."); } - if (!currentVersion.getFlowId().equals(versionControlInfoDto.getFlowId())) { + if (!currentVersion.getFlowId().equals(versionControlInformationDTO.getFlowId())) { throw new IllegalArgumentException("The Version Control Information provided does not match the flow that the Process Group is currently synchronized with."); } - if (!currentVersion.getRegistryId().equals(versionControlInfoDto.getRegistryId())) { + if (!currentVersion.getRegistryId().equals(versionControlInformationDTO.getRegistryId())) { throw new IllegalArgumentException("The Version Control Information provided does not match the flow that the Process Group is currently synchronized with."); } - if (!currentVersion.getVersion().equals(versionControlInfoDto.getVersion())) { + if (!currentVersion.getVersion().equals(versionControlInformationDTO.getVersion())) { throw new IllegalArgumentException("The Version Control Information provided does not match the flow that the Process Group is currently synchronized with."); } + final String idGenerationSeed = getIdGenerationSeed().orElse(null); + // Create an asynchronous request that will occur in the background, because this request may // result in stopping components, which can take an indeterminate amount of time. final String requestId = UUID.randomUUID().toString(); @@ -1280,8 +1296,9 @@ public class VersionsResource extends ApplicationResource { // Submit the request to be performed in the background final Consumer> updateTask = vcur -> { try { - final VersionControlInformationEntity updatedVersionControlEntity = updateFlowVersion(groupId, componentLifecycle, exampleUri, - affectedComponents, user, replicateRequest, processGroupEntity, flowSnapshot, request, idGenerationSeed, false, true); + final VersionControlInformationEntity updatedVersionControlEntity = updateFlowVersion(groupId, wrapper.getComponentLifecycle(), wrapper.getExampleUri(), + wrapper.getAffectedComponents(), user, wrapper.isReplicateRequest(), revision, versionControlInformationEntity, wrapper.getFlowSnapshot(), request, + idGenerationSeed, false, true); vcur.markComplete(updatedVersionControlEntity); } catch (final Exception e) { @@ -1303,8 +1320,9 @@ public class VersionsResource extends ApplicationResource { updateRequestDto.setPercentCompleted(request.getPercentComplete()); updateRequestDto.setUri(generateResourceUri("versions", "revert-requests", requestId)); + final VersionedFlowUpdateRequestEntity updateRequestEntity = new VersionedFlowUpdateRequestEntity(); - final RevisionDTO groupRevision = dtoFactory.createRevisionDTO(revision); + final RevisionDTO groupRevision = serviceFacade.getProcessGroup(groupId).getRevision(); updateRequestEntity.setProcessGroupRevision(groupRevision); updateRequestEntity.setRequest(updateRequestDto); @@ -1313,7 +1331,7 @@ public class VersionsResource extends ApplicationResource { } private VersionControlInformationEntity updateFlowVersion(final String groupId, final ComponentLifecycle componentLifecycle, final URI exampleUri, - final Set affectedComponents, final NiFiUser user, final boolean replicateRequest, final VersionControlInformationEntity requestEntity, + final Set affectedComponents, final NiFiUser user, final boolean replicateRequest, final Revision revision, final VersionControlInformationEntity requestEntity, final VersionedFlowSnapshot flowSnapshot, final AsynchronousWebRequest asyncRequest, final String idGenerationSeed, final boolean verifyNotModified, final boolean updateDescendantVersionedFlows) throws LifecycleManagementException { @@ -1372,7 +1390,7 @@ public class VersionsResource extends ApplicationResource { headers.put("content-type", MediaType.APPLICATION_JSON); final VersionedFlowSnapshotEntity snapshotEntity = new VersionedFlowSnapshotEntity(); - snapshotEntity.setProcessGroupRevision(requestEntity.getProcessGroupRevision()); + snapshotEntity.setProcessGroupRevision(dtoFactory.createRevisionDTO(revision)); snapshotEntity.setRegistryId(requestEntity.getVersionControlInformation().getRegistryId()); snapshotEntity.setVersionedFlow(flowSnapshot); snapshotEntity.setUpdateDescendantVersionedFlows(updateDescendantVersionedFlows); @@ -1408,8 +1426,6 @@ public class VersionsResource extends ApplicationResource { serviceFacade.verifyCanUpdate(groupId, flowSnapshot, true, verifyNotModified); // Step 11-12. Update Process Group to the new flow and update variable registry with any Variables that were added or removed - final RevisionDTO revisionDto = requestEntity.getProcessGroupRevision(); - final Revision revision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), groupId); final VersionControlInformationDTO requestVci = requestEntity.getVersionControlInformation(); final Bucket bucket = flowSnapshot.getBucket(); @@ -1571,4 +1587,50 @@ public class VersionsResource extends ApplicationResource { return updatePerformed; } } + + + private static class InitiateChangeFlowVersionRequestWrapper extends Entity { + final VersionControlInformationEntity versionControlInformationEntity; + final ComponentLifecycle componentLifecycle; + final URI exampleUri; + final Set affectedComponents; + final boolean replicateRequest; + final VersionedFlowSnapshot flowSnapshot; + + public InitiateChangeFlowVersionRequestWrapper(final VersionControlInformationEntity versionControlInformationEntity, final ComponentLifecycle componentLifecycle, + final URI exampleUri, final Set affectedComponents, final boolean replicateRequest, + final VersionedFlowSnapshot flowSnapshot) { + + this.versionControlInformationEntity = versionControlInformationEntity; + this.componentLifecycle = componentLifecycle; + this.exampleUri = exampleUri; + this.affectedComponents = affectedComponents; + this.replicateRequest = replicateRequest; + this.flowSnapshot = flowSnapshot; + } + + public VersionControlInformationEntity getVersionControlInformationEntity() { + return versionControlInformationEntity; + } + + public ComponentLifecycle getComponentLifecycle() { + return componentLifecycle; + } + + public URI getExampleUri() { + return exampleUri; + } + + public Set getAffectedComponents() { + return affectedComponents; + } + + public boolean isReplicateRequest() { + return replicateRequest; + } + + public VersionedFlowSnapshot getFlowSnapshot() { + return flowSnapshot; + } + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-flow-version.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-flow-version.js index b676f5b9ce..96c467ab20 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-flow-version.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-flow-version.js @@ -214,8 +214,14 @@ }); } + // determine the max registry height + var windowHeight = $(window).height(); + var registryOffset = $('#import-flow-version-registry-combo').offset(); + var registryMaxHeight = windowHeight - registryOffset.top - 64; + // load the registries registryCombo.combo({ + maxHeight: registryMaxHeight, options: registries, select: function (selectedOption) { selectRegistry(dialog, selectedOption, bucketCombo, flowCombo, selectBucket, bucketCheck) @@ -290,8 +296,14 @@ } } + // determine the max bucket height + var windowHeight = $(window).height(); + var bucketOffset = $('#import-flow-version-bucket-combo').offset(); + var bucketMaxHeight = windowHeight - bucketOffset.top - 64; + // load the buckets bucketCombo.combo('destroy').combo({ + maxHeight: bucketMaxHeight, options: buckets, select: selectBucket }); @@ -890,8 +902,14 @@ }); } + // determine the max flow height + var windowHeight = $(window).height(); + var flowOffset = $('#import-flow-version-name-combo').offset(); + var flowMaxHeight = windowHeight - flowOffset.top - 64; + // load the buckets $('#import-flow-version-name-combo').combo('destroy').combo({ + maxHeight: flowMaxHeight, options: versionedFlows, select: function (selectedFlow) { if (nfCommon.isDefinedAndNotNull(selectedFlow.value)) { @@ -1284,7 +1302,6 @@ if (nfCanvasUtils.getGroupId() === processGroupId) { // if reverting/changing current PG... reload/refresh this group/canvas - // TODO consider implementing this differently $.ajax({ type: 'GET', url: '../nifi-api/flow/process-groups/' + encodeURIComponent(processGroupId), @@ -1295,6 +1312,14 @@ // update the component visibility nfGraph.updateVisibility(); + + // update the breadcrumbs + var breadcrumbsCtrl = nfNgBridge.injector.get('breadcrumbsCtrl'); + breadcrumbsCtrl.resetBreadcrumbs(); + breadcrumbsCtrl.generateBreadcrumbs(response.processGroupFlow.breadcrumb); + + // inform Angular app values have changed + nfNgBridge.digest(); }).fail(nfErrorHandler.handleAjaxError); } else { // if reverting selected PG... reload selected PG to update counts, etc