NIFI-8040: When changing version of a flow, stop processors that have a state of Starting in addition to those with a state of Running

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #5718.
This commit is contained in:
Mark Payne 2022-01-26 10:03:55 -05:00 committed by Pierre Villard
parent 0eff249870
commit 3ea9faccc6
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
2 changed files with 18 additions and 3 deletions

View File

@ -5056,7 +5056,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
if (localComponent.getComponentType() == org.apache.nifi.flow.ComponentType.CONTROLLER_SERVICE) {
final String serviceId = ((InstantiatedVersionedControllerService) localComponent).getInstanceIdentifier();
final String serviceId = localComponent.getInstanceIdentifier();
final ControllerServiceNode serviceNode = controllerServiceDAO.getControllerService(serviceId);
final List<ControllerServiceNode> referencingServices = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class);
@ -5171,6 +5171,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
dto.setId(connectable.getIdentifier());
dto.setReferenceType(connectable.getConnectableType().name());
dto.setState(connectable.getScheduledState().name());
dto.setName(connectable.getName());
final String groupId = connectable instanceof RemoteGroupPort ? ((RemoteGroupPort) connectable).getRemoteProcessGroup().getIdentifier() : connectable.getProcessGroupIdentifier();
dto.setProcessGroupId(groupId);

View File

@ -299,6 +299,20 @@ public abstract class FlowUpdateResource<T extends ProcessGroupDescriptorEntity,
return createUpdateRequestResponse(requestType, requestId, request, false);
}
private boolean isActive(final AffectedComponentDTO affectedComponentDto) {
final String state = affectedComponentDto.getState();
if ("Running".equalsIgnoreCase(state) || "Starting".equalsIgnoreCase(state)) {
return true;
}
final Integer threadCount = affectedComponentDto.getActiveThreadCount();
if (threadCount != null && threadCount > 0) {
return true;
}
return false;
}
/**
* Perform the specified flow update
*/
@ -318,8 +332,8 @@ public abstract class FlowUpdateResource<T extends ProcessGroupDescriptorEntity,
stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_OUTPUT_PORT);
final Set<AffectedComponentEntity> runningComponents = affectedComponents.stream()
.filter(dto -> stoppableReferenceTypes.contains(dto.getComponent().getReferenceType()))
.filter(dto -> "Running".equalsIgnoreCase(dto.getComponent().getState()))
.filter(entity -> stoppableReferenceTypes.contains(entity.getComponent().getReferenceType()))
.filter(entity -> isActive(entity.getComponent()))
.collect(Collectors.toSet());
logger.info("Stopping {} Processors", runningComponents.size());