NIFI-4733:

- Resolving logic issue in two phase commit when updating variable registry. This closes #2370
This commit is contained in:
Matt Gilman 2018-01-03 11:00:22 -05:00
parent f7f001eb9a
commit 7a8dbb8b15
No known key found for this signature in database
GPG Key ID: DF61EC19432AEE37
1 changed files with 62 additions and 25 deletions

View File

@ -74,6 +74,7 @@ import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.ControllerServicesEntity; import org.apache.nifi.web.api.entity.ControllerServicesEntity;
import org.apache.nifi.web.api.entity.CopySnippetRequestEntity; import org.apache.nifi.web.api.entity.CopySnippetRequestEntity;
import org.apache.nifi.web.api.entity.CreateTemplateRequestEntity; import org.apache.nifi.web.api.entity.CreateTemplateRequestEntity;
import org.apache.nifi.web.api.entity.Entity;
import org.apache.nifi.web.api.entity.FlowEntity; import org.apache.nifi.web.api.entity.FlowEntity;
import org.apache.nifi.web.api.entity.FunnelEntity; import org.apache.nifi.web.api.entity.FunnelEntity;
import org.apache.nifi.web.api.entity.FunnelsEntity; import org.apache.nifi.web.api.entity.FunnelsEntity;
@ -552,41 +553,43 @@ public class ProcessGroupResource extends ApplicationResource {
public Response updateVariableRegistry( public Response updateVariableRegistry(
@Context final HttpServletRequest httpServletRequest, @Context final HttpServletRequest httpServletRequest,
@ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId, @ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId,
@ApiParam(value = "The process group configuration details.", required = true) final VariableRegistryEntity requestEntity) { @ApiParam(value = "The variable registry configuration details.", required = true) final VariableRegistryEntity requestVariableRegistryEntity) {
if (requestEntity == null || requestEntity.getVariableRegistry() == null) { if (requestVariableRegistryEntity == null || requestVariableRegistryEntity.getVariableRegistry() == null) {
throw new IllegalArgumentException("Variable Registry details must be specified."); throw new IllegalArgumentException("Variable Registry details must be specified.");
} }
if (requestEntity.getProcessGroupRevision() == null) { if (requestVariableRegistryEntity.getProcessGroupRevision() == null) {
throw new IllegalArgumentException("Process Group Revision must be specified."); throw new IllegalArgumentException("Process Group Revision must be specified.");
} }
// ensure the same id is being used // ensure the same id is being used
final VariableRegistryDTO registryDto = requestEntity.getVariableRegistry(); final VariableRegistryDTO requestRegistryDto = requestVariableRegistryEntity.getVariableRegistry();
if (!groupId.equals(registryDto.getProcessGroupId())) { if (!groupId.equals(requestRegistryDto.getProcessGroupId())) {
throw new IllegalArgumentException(String.format("The process group id (%s) in the request body does " throw new IllegalArgumentException(String.format("The process group id (%s) in the request body does "
+ "not equal the process group id of the requested resource (%s).", registryDto.getProcessGroupId(), groupId)); + "not equal the process group id of the requested resource (%s).", requestRegistryDto.getProcessGroupId(), groupId));
} }
if (isReplicateRequest()) { if (isReplicateRequest()) {
return replicate(HttpMethod.PUT, requestEntity); return replicate(HttpMethod.PUT, requestVariableRegistryEntity);
} }
// handle expects request (usually from the cluster manager) // handle expects request (usually from the cluster manager)
final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId); final Revision requestRevision = getRevision(requestVariableRegistryEntity.getProcessGroupRevision(), groupId);
return withWriteLock( return withWriteLock(
serviceFacade, serviceFacade,
requestEntity, requestVariableRegistryEntity,
requestRevision, requestRevision,
lookup -> { lookup -> {
Authorizable authorizable = lookup.getProcessGroup(groupId).getAuthorizable(); Authorizable authorizable = lookup.getProcessGroup(groupId).getAuthorizable();
authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
}, },
null, null,
(revision, processGroupEntity) -> { (revision, variableRegistryEntity) -> {
final VariableRegistryDTO variableRegistry = variableRegistryEntity.getVariableRegistry();
// update the process group // update the process group
final VariableRegistryEntity entity = serviceFacade.updateVariableRegistry(revision, registryDto); final VariableRegistryEntity entity = serviceFacade.updateVariableRegistry(revision, variableRegistry);
return generateOkResponse(entity).build(); return generateOkResponse(entity).build();
}); });
} }
@ -597,7 +600,7 @@ public class ProcessGroupResource extends ApplicationResource {
* *
* @param httpServletRequest request * @param httpServletRequest request
* @param groupId The id of the process group. * @param groupId The id of the process group.
* @param requestEntity the Variable Registry Entity * @param requestVariableRegistryEntity the Variable Registry Entity
* @return A Variable Registry Entry. * @return A Variable Registry Entry.
*/ */
@POST @POST
@ -620,13 +623,13 @@ public class ProcessGroupResource extends ApplicationResource {
public Response submitUpdateVariableRegistryRequest( public Response submitUpdateVariableRegistryRequest(
@Context final HttpServletRequest httpServletRequest, @Context final HttpServletRequest httpServletRequest,
@ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId, @ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId,
@ApiParam(value = "The process group configuration details.", required = true) final VariableRegistryEntity requestEntity) { @ApiParam(value = "The variable registry configuration details.", required = true) final VariableRegistryEntity requestVariableRegistryEntity) {
if (requestEntity == null || requestEntity.getVariableRegistry() == null) { if (requestVariableRegistryEntity == null || requestVariableRegistryEntity.getVariableRegistry() == null) {
throw new IllegalArgumentException("Variable Registry details must be specified."); throw new IllegalArgumentException("Variable Registry details must be specified.");
} }
if (requestEntity.getProcessGroupRevision() == null) { if (requestVariableRegistryEntity.getProcessGroupRevision() == null) {
throw new IllegalArgumentException("Process Group Revision must be specified."); throw new IllegalArgumentException("Process Group Revision must be specified.");
} }
@ -641,14 +644,14 @@ public class ProcessGroupResource extends ApplicationResource {
// 6. Re-Enable all previously Active Processors that Depended on the Controller Services // 6. Re-Enable all previously Active Processors that Depended on the Controller Services
// Determine the affected components (and their associated revisions) // Determine the affected components (and their associated revisions)
final VariableRegistryEntity computedEntity = serviceFacade.populateAffectedComponents(requestEntity.getVariableRegistry()); final VariableRegistryEntity computedEntity = serviceFacade.populateAffectedComponents(requestVariableRegistryEntity.getVariableRegistry());
final VariableRegistryDTO computedRegistryDto = computedEntity.getVariableRegistry(); final VariableRegistryDTO computedRegistryDto = computedEntity.getVariableRegistry();
if (computedRegistryDto == null) { if (computedRegistryDto == null) {
throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId)); throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId));
} }
final Set<AffectedComponentEntity> allAffectedComponents = serviceFacade.getComponentsAffectedByVariableRegistryUpdate(requestEntity.getVariableRegistry()); final Set<AffectedComponentEntity> allAffectedComponents = serviceFacade.getComponentsAffectedByVariableRegistryUpdate(requestVariableRegistryEntity.getVariableRegistry());
final Set<AffectedComponentDTO> activeAffectedComponents = serviceFacade.getActiveComponentsAffectedByVariableRegistryUpdate(requestEntity.getVariableRegistry()); final Set<AffectedComponentDTO> activeAffectedComponents = serviceFacade.getActiveComponentsAffectedByVariableRegistryUpdate(requestVariableRegistryEntity.getVariableRegistry());
final Map<String, List<AffectedComponentDTO>> activeAffectedComponentsByType = activeAffectedComponents.stream() final Map<String, List<AffectedComponentDTO>> activeAffectedComponentsByType = activeAffectedComponents.stream()
.collect(Collectors.groupingBy(comp -> comp.getReferenceType())); .collect(Collectors.groupingBy(comp -> comp.getReferenceType()));
@ -698,7 +701,7 @@ public class ProcessGroupResource extends ApplicationResource {
final Authentication authentication = new NiFiAuthenticationToken(new NiFiUserDetails(user)); final Authentication authentication = new NiFiAuthenticationToken(new NiFiUserDetails(user));
SecurityContextHolder.getContext().setAuthentication(authentication); SecurityContextHolder.getContext().setAuthentication(authentication);
updateVariableRegistryReplicated(groupId, originalUri, activeAffectedProcessors, activeAffectedServices, updateRequest, requestEntity); updateVariableRegistryReplicated(groupId, originalUri, activeAffectedProcessors, activeAffectedServices, updateRequest, requestVariableRegistryEntity);
} catch (final Exception e) { } catch (final Exception e) {
logger.error("Failed to update variable registry", e); logger.error("Failed to update variable registry", e);
@ -721,15 +724,19 @@ public class ProcessGroupResource extends ApplicationResource {
return Response.status(Status.ACCEPTED).location(location).entity(responseEntity).build(); return Response.status(Status.ACCEPTED).location(location).entity(responseEntity).build();
} }
final UpdateVariableRegistryRequestWrapper requestWrapper =
new UpdateVariableRegistryRequestWrapper(allAffectedComponents, activeAffectedProcessors, activeAffectedServices, requestVariableRegistryEntity);
final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId); final Revision requestRevision = getRevision(requestVariableRegistryEntity.getProcessGroupRevision(), groupId);
return withWriteLock( return withWriteLock(
serviceFacade, serviceFacade,
requestEntity, requestWrapper,
requestRevision, requestRevision,
authorizeAccess, authorizeAccess,
null, null,
(revision, varRegistryEntity) -> updateVariableRegistryLocal(groupId, allAffectedComponents, activeAffectedProcessors, activeAffectedServices, user, requestEntity) (revision, wrapper) ->
updateVariableRegistryLocal(groupId, wrapper.getAllAffectedComponents(), wrapper.getActiveAffectedProcessors(),
wrapper.getActiveAffectedServices(), user, revision, wrapper.getVariableRegistryEntity())
); );
} }
@ -1088,7 +1095,7 @@ public class ProcessGroupResource extends ApplicationResource {
} }
private Response updateVariableRegistryLocal(final String groupId, final Set<AffectedComponentEntity> affectedComponents, final List<AffectedComponentDTO> affectedProcessors, private Response updateVariableRegistryLocal(final String groupId, final Set<AffectedComponentEntity> affectedComponents, final List<AffectedComponentDTO> affectedProcessors,
final List<AffectedComponentDTO> affectedServices, final NiFiUser user, final VariableRegistryEntity requestEntity) { final List<AffectedComponentDTO> affectedServices, final NiFiUser user, final Revision requestRevision, final VariableRegistryEntity requestEntity) {
final Set<String> affectedProcessorIds = affectedProcessors == null ? Collections.emptySet() : affectedProcessors.stream() final Set<String> affectedProcessorIds = affectedProcessors == null ? Collections.emptySet() : affectedProcessors.stream()
.map(component -> component.getId()) .map(component -> component.getId())
@ -1105,8 +1112,6 @@ public class ProcessGroupResource extends ApplicationResource {
updateRequest.getIdentifyRelevantComponentsStep().setComplete(true); updateRequest.getIdentifyRelevantComponentsStep().setComplete(true);
final Pause pause = createPause(updateRequest); final Pause pause = createPause(updateRequest);
final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId);
final Runnable updateTask = new Runnable() { final Runnable updateTask = new Runnable() {
@Override @Override
public void run() { public void run() {
@ -3474,6 +3479,38 @@ public class ProcessGroupResource extends ApplicationResource {
); );
} }
private static class UpdateVariableRegistryRequestWrapper extends Entity {
private final Set<AffectedComponentEntity> allAffectedComponents;
private final List<AffectedComponentDTO> activeAffectedProcessors;
private final List<AffectedComponentDTO> activeAffectedServices;
private final VariableRegistryEntity variableRegistryEntity;
public UpdateVariableRegistryRequestWrapper(final Set<AffectedComponentEntity> allAffectedComponents, final List<AffectedComponentDTO> activeAffectedProcessors,
final List<AffectedComponentDTO> activeAffectedServices, VariableRegistryEntity variableRegistryEntity) {
this.allAffectedComponents = allAffectedComponents;
this.activeAffectedProcessors = activeAffectedProcessors;
this.activeAffectedServices = activeAffectedServices;
this.variableRegistryEntity = variableRegistryEntity;
}
public Set<AffectedComponentEntity> getAllAffectedComponents() {
return allAffectedComponents;
}
public List<AffectedComponentDTO> getActiveAffectedProcessors() {
return activeAffectedProcessors;
}
public List<AffectedComponentDTO> getActiveAffectedServices() {
return activeAffectedServices;
}
public VariableRegistryEntity getVariableRegistryEntity() {
return variableRegistryEntity;
}
}
// setters // setters
public void setServiceFacade(NiFiServiceFacade serviceFacade) { public void setServiceFacade(NiFiServiceFacade serviceFacade) {