diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java index 7ff78857f9..b866677b22 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java @@ -74,6 +74,7 @@ import org.apache.nifi.web.api.entity.ControllerServiceEntity; import org.apache.nifi.web.api.entity.ControllerServicesEntity; import org.apache.nifi.web.api.entity.CopySnippetRequestEntity; import org.apache.nifi.web.api.entity.CreateTemplateRequestEntity; +import org.apache.nifi.web.api.entity.Entity; import org.apache.nifi.web.api.entity.FlowEntity; import org.apache.nifi.web.api.entity.FunnelEntity; import org.apache.nifi.web.api.entity.FunnelsEntity; @@ -552,41 +553,43 @@ public class ProcessGroupResource extends ApplicationResource { public Response updateVariableRegistry( @Context final HttpServletRequest httpServletRequest, @ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId, - @ApiParam(value = "The process group configuration details.", required = true) final VariableRegistryEntity requestEntity) { + @ApiParam(value = "The variable registry configuration details.", required = true) final VariableRegistryEntity requestVariableRegistryEntity) { - if (requestEntity == null || requestEntity.getVariableRegistry() == null) { + if (requestVariableRegistryEntity == null || requestVariableRegistryEntity.getVariableRegistry() == null) { throw new IllegalArgumentException("Variable Registry details must be specified."); } - if (requestEntity.getProcessGroupRevision() == null) { + if (requestVariableRegistryEntity.getProcessGroupRevision() == null) { throw new IllegalArgumentException("Process Group Revision must be specified."); } // ensure the same id is being used - final VariableRegistryDTO registryDto = requestEntity.getVariableRegistry(); - if (!groupId.equals(registryDto.getProcessGroupId())) { + final VariableRegistryDTO requestRegistryDto = requestVariableRegistryEntity.getVariableRegistry(); + if (!groupId.equals(requestRegistryDto.getProcessGroupId())) { throw new IllegalArgumentException(String.format("The process group id (%s) in the request body does " - + "not equal the process group id of the requested resource (%s).", registryDto.getProcessGroupId(), groupId)); + + "not equal the process group id of the requested resource (%s).", requestRegistryDto.getProcessGroupId(), groupId)); } if (isReplicateRequest()) { - return replicate(HttpMethod.PUT, requestEntity); + return replicate(HttpMethod.PUT, requestVariableRegistryEntity); } // handle expects request (usually from the cluster manager) - final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId); + final Revision requestRevision = getRevision(requestVariableRegistryEntity.getProcessGroupRevision(), groupId); return withWriteLock( serviceFacade, - requestEntity, + requestVariableRegistryEntity, requestRevision, lookup -> { Authorizable authorizable = lookup.getProcessGroup(groupId).getAuthorizable(); authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); }, null, - (revision, processGroupEntity) -> { + (revision, variableRegistryEntity) -> { + final VariableRegistryDTO variableRegistry = variableRegistryEntity.getVariableRegistry(); + // update the process group - final VariableRegistryEntity entity = serviceFacade.updateVariableRegistry(revision, registryDto); + final VariableRegistryEntity entity = serviceFacade.updateVariableRegistry(revision, variableRegistry); return generateOkResponse(entity).build(); }); } @@ -597,7 +600,7 @@ public class ProcessGroupResource extends ApplicationResource { * * @param httpServletRequest request * @param groupId The id of the process group. - * @param requestEntity the Variable Registry Entity + * @param requestVariableRegistryEntity the Variable Registry Entity * @return A Variable Registry Entry. */ @POST @@ -620,13 +623,13 @@ public class ProcessGroupResource extends ApplicationResource { public Response submitUpdateVariableRegistryRequest( @Context final HttpServletRequest httpServletRequest, @ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId, - @ApiParam(value = "The process group configuration details.", required = true) final VariableRegistryEntity requestEntity) { + @ApiParam(value = "The variable registry configuration details.", required = true) final VariableRegistryEntity requestVariableRegistryEntity) { - if (requestEntity == null || requestEntity.getVariableRegistry() == null) { + if (requestVariableRegistryEntity == null || requestVariableRegistryEntity.getVariableRegistry() == null) { throw new IllegalArgumentException("Variable Registry details must be specified."); } - if (requestEntity.getProcessGroupRevision() == null) { + if (requestVariableRegistryEntity.getProcessGroupRevision() == null) { throw new IllegalArgumentException("Process Group Revision must be specified."); } @@ -641,14 +644,14 @@ public class ProcessGroupResource extends ApplicationResource { // 6. Re-Enable all previously Active Processors that Depended on the Controller Services // Determine the affected components (and their associated revisions) - final VariableRegistryEntity computedEntity = serviceFacade.populateAffectedComponents(requestEntity.getVariableRegistry()); + final VariableRegistryEntity computedEntity = serviceFacade.populateAffectedComponents(requestVariableRegistryEntity.getVariableRegistry()); final VariableRegistryDTO computedRegistryDto = computedEntity.getVariableRegistry(); if (computedRegistryDto == null) { throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId)); } - final Set allAffectedComponents = serviceFacade.getComponentsAffectedByVariableRegistryUpdate(requestEntity.getVariableRegistry()); - final Set activeAffectedComponents = serviceFacade.getActiveComponentsAffectedByVariableRegistryUpdate(requestEntity.getVariableRegistry()); + final Set allAffectedComponents = serviceFacade.getComponentsAffectedByVariableRegistryUpdate(requestVariableRegistryEntity.getVariableRegistry()); + final Set activeAffectedComponents = serviceFacade.getActiveComponentsAffectedByVariableRegistryUpdate(requestVariableRegistryEntity.getVariableRegistry()); final Map> activeAffectedComponentsByType = activeAffectedComponents.stream() .collect(Collectors.groupingBy(comp -> comp.getReferenceType())); @@ -698,7 +701,7 @@ public class ProcessGroupResource extends ApplicationResource { final Authentication authentication = new NiFiAuthenticationToken(new NiFiUserDetails(user)); SecurityContextHolder.getContext().setAuthentication(authentication); - updateVariableRegistryReplicated(groupId, originalUri, activeAffectedProcessors, activeAffectedServices, updateRequest, requestEntity); + updateVariableRegistryReplicated(groupId, originalUri, activeAffectedProcessors, activeAffectedServices, updateRequest, requestVariableRegistryEntity); } catch (final Exception e) { logger.error("Failed to update variable registry", e); @@ -721,15 +724,19 @@ public class ProcessGroupResource extends ApplicationResource { return Response.status(Status.ACCEPTED).location(location).entity(responseEntity).build(); } + final UpdateVariableRegistryRequestWrapper requestWrapper = + new UpdateVariableRegistryRequestWrapper(allAffectedComponents, activeAffectedProcessors, activeAffectedServices, requestVariableRegistryEntity); - final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId); + final Revision requestRevision = getRevision(requestVariableRegistryEntity.getProcessGroupRevision(), groupId); return withWriteLock( serviceFacade, - requestEntity, + requestWrapper, requestRevision, authorizeAccess, null, - (revision, varRegistryEntity) -> updateVariableRegistryLocal(groupId, allAffectedComponents, activeAffectedProcessors, activeAffectedServices, user, requestEntity) + (revision, wrapper) -> + updateVariableRegistryLocal(groupId, wrapper.getAllAffectedComponents(), wrapper.getActiveAffectedProcessors(), + wrapper.getActiveAffectedServices(), user, revision, wrapper.getVariableRegistryEntity()) ); } @@ -1088,7 +1095,7 @@ public class ProcessGroupResource extends ApplicationResource { } private Response updateVariableRegistryLocal(final String groupId, final Set affectedComponents, final List affectedProcessors, - final List affectedServices, final NiFiUser user, final VariableRegistryEntity requestEntity) { + final List affectedServices, final NiFiUser user, final Revision requestRevision, final VariableRegistryEntity requestEntity) { final Set affectedProcessorIds = affectedProcessors == null ? Collections.emptySet() : affectedProcessors.stream() .map(component -> component.getId()) @@ -1105,8 +1112,6 @@ public class ProcessGroupResource extends ApplicationResource { updateRequest.getIdentifyRelevantComponentsStep().setComplete(true); final Pause pause = createPause(updateRequest); - final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId); - final Runnable updateTask = new Runnable() { @Override public void run() { @@ -3474,6 +3479,38 @@ public class ProcessGroupResource extends ApplicationResource { ); } + private static class UpdateVariableRegistryRequestWrapper extends Entity { + private final Set allAffectedComponents; + private final List activeAffectedProcessors; + private final List activeAffectedServices; + private final VariableRegistryEntity variableRegistryEntity; + + public UpdateVariableRegistryRequestWrapper(final Set allAffectedComponents, final List activeAffectedProcessors, + final List activeAffectedServices, VariableRegistryEntity variableRegistryEntity) { + + this.allAffectedComponents = allAffectedComponents; + this.activeAffectedProcessors = activeAffectedProcessors; + this.activeAffectedServices = activeAffectedServices; + this.variableRegistryEntity = variableRegistryEntity; + } + + public Set getAllAffectedComponents() { + return allAffectedComponents; + } + + public List getActiveAffectedProcessors() { + return activeAffectedProcessors; + } + + public List getActiveAffectedServices() { + return activeAffectedServices; + } + + public VariableRegistryEntity getVariableRegistryEntity() { + return variableRegistryEntity; + } + } + // setters public void setServiceFacade(NiFiServiceFacade serviceFacade) {