mirror of https://github.com/apache/nifi.git
NIFI-8939: Ensure that when async/long-running flow updates are made,… (#5240)
* NIFI-8939: Ensure that when async/long-running flow updates are made, referencing controller services that are disabling are waited on but not attempted to be disabled * NIFI-8939: Ensure that when waiting for Controller Services to reach desired state, we use correct URI for fetch service state. There was a typo that resulted in not getting all controller services' states. This closes #5240
This commit is contained in:
parent
1d61be776b
commit
80456d681e
|
@ -330,16 +330,27 @@ public abstract class FlowUpdateResource<T extends ProcessGroupDescriptorEntity,
|
|||
}
|
||||
asyncRequest.markStepComplete();
|
||||
|
||||
// Steps 7-8. Disable enabled controller services that are affected
|
||||
final Set<AffectedComponentEntity> enabledServices = affectedComponents.stream()
|
||||
// Steps 7-8. Disable enabled controller services that are affected.
|
||||
// We don't want to disable services that are already disabling. But we need to wait for their state to transition from Disabling to Disabled.
|
||||
final Set<AffectedComponentEntity> servicesToWaitFor = affectedComponents.stream()
|
||||
.filter(dto -> AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE.equals(dto.getComponent().getReferenceType()))
|
||||
.filter(dto -> "Enabled".equalsIgnoreCase(dto.getComponent().getState()))
|
||||
.filter(dto -> {
|
||||
final String state = dto.getComponent().getState();
|
||||
return "Enabled".equalsIgnoreCase(state) || "Enabling".equalsIgnoreCase(state) || "Disabling".equalsIgnoreCase(state);
|
||||
})
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
final Set<AffectedComponentEntity> enabledServices = servicesToWaitFor.stream()
|
||||
.filter(dto -> {
|
||||
final String state = dto.getComponent().getState();
|
||||
return "Enabling".equalsIgnoreCase(state) || "Enabled".equalsIgnoreCase(state);
|
||||
})
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
logger.info("Disabling {} Controller Services", enabledServices.size());
|
||||
final CancellableTimedPause disableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
|
||||
asyncRequest.setCancelCallback(disableServicesPause::cancel);
|
||||
componentLifecycle.activateControllerServices(requestUri, groupId, enabledServices, ControllerServiceState.DISABLED, disableServicesPause, InvalidComponentAction.SKIP);
|
||||
componentLifecycle.activateControllerServices(requestUri, groupId, enabledServices, servicesToWaitFor, ControllerServiceState.DISABLED, disableServicesPause, InvalidComponentAction.SKIP);
|
||||
|
||||
if (asyncRequest.isCancelled()) {
|
||||
return;
|
||||
|
@ -413,7 +424,8 @@ public abstract class FlowUpdateResource<T extends ProcessGroupDescriptorEntity,
|
|||
logger.info("Successfully updated flow; re-enabling {} Controller Services", servicesToEnable.size());
|
||||
|
||||
try {
|
||||
componentLifecycle.activateControllerServices(requestUri, groupId, servicesToEnable, ControllerServiceState.ENABLED, enableServicesPause, InvalidComponentAction.SKIP);
|
||||
componentLifecycle.activateControllerServices(requestUri, groupId, servicesToEnable, servicesToEnable,
|
||||
ControllerServiceState.ENABLED, enableServicesPause, InvalidComponentAction.SKIP);
|
||||
} catch (final IllegalStateException ise) {
|
||||
// Component Lifecycle will re-enable the Controller Services only if they are valid. If IllegalStateException gets thrown, we need to provide
|
||||
// a more intelligent error message as to exactly what happened, rather than indicate that the flow could not be updated.
|
||||
|
|
|
@ -851,10 +851,13 @@ public class ParameterContextResource extends ApplicationResource {
|
|||
.filter(component -> "Running".equalsIgnoreCase(component.getComponent().getState()))
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
final Set<AffectedComponentEntity> enabledControllerServices = affectedComponents.stream()
|
||||
final Set<AffectedComponentEntity> servicesRequiringDisabledState = affectedComponents.stream()
|
||||
.filter(entity -> entity.getComponent() != null)
|
||||
.filter(dto -> AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE.equals(dto.getComponent().getReferenceType()))
|
||||
.filter(dto -> "Enabling".equalsIgnoreCase(dto.getComponent().getState()) || "Enabled".equalsIgnoreCase(dto.getComponent().getState()))
|
||||
.filter(dto -> {
|
||||
final String state = dto.getComponent().getState();
|
||||
return "Enabling".equalsIgnoreCase(state) || "Enabled".equalsIgnoreCase(state) || "Disabling".equalsIgnoreCase(state);
|
||||
})
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
stopProcessors(runningProcessors, asyncRequest, componentLifecycle, uri);
|
||||
|
@ -862,7 +865,16 @@ public class ParameterContextResource extends ApplicationResource {
|
|||
return null;
|
||||
}
|
||||
|
||||
disableControllerServices(enabledControllerServices, asyncRequest, componentLifecycle, uri);
|
||||
// We want to disable only those Controller Services that are currently enabled or enabling, but we need to wait for
|
||||
// services that are currently Disabling to become disabled before we are able to consider this step complete.
|
||||
final Set<AffectedComponentEntity> enabledControllerServices = servicesRequiringDisabledState.stream()
|
||||
.filter(dto -> {
|
||||
final String state = dto.getComponent().getState();
|
||||
return "Enabling".equalsIgnoreCase(state) || "Enabled".equalsIgnoreCase(state);
|
||||
})
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
disableControllerServices(enabledControllerServices, servicesRequiringDisabledState, asyncRequest, componentLifecycle, uri);
|
||||
if (asyncRequest.isCancelled()) {
|
||||
return null;
|
||||
}
|
||||
|
@ -878,7 +890,7 @@ public class ParameterContextResource extends ApplicationResource {
|
|||
} finally {
|
||||
// TODO: can almost certainly be refactored so that the same code is shared between VersionsResource and ParameterContextResource.
|
||||
if (!asyncRequest.isCancelled()) {
|
||||
enableControllerServices(enabledControllerServices, asyncRequest, componentLifecycle, uri);
|
||||
enableControllerServices(enabledControllerServices, enabledControllerServices, asyncRequest, componentLifecycle, uri);
|
||||
}
|
||||
|
||||
if (!asyncRequest.isCancelled()) {
|
||||
|
@ -993,17 +1005,19 @@ public class ParameterContextResource extends ApplicationResource {
|
|||
}
|
||||
}
|
||||
|
||||
private void disableControllerServices(final Set<AffectedComponentEntity> controllerServices, final AsynchronousWebRequest<?, ?> asyncRequest, final ComponentLifecycle componentLifecycle,
|
||||
final URI uri) throws LifecycleManagementException {
|
||||
private void disableControllerServices(final Set<AffectedComponentEntity> enabledControllerServices, final Set<AffectedComponentEntity> controllerServicesRequiringDisabledState,
|
||||
final AsynchronousWebRequest<?, ?> asyncRequest, final ComponentLifecycle componentLifecycle, final URI uri) throws LifecycleManagementException {
|
||||
|
||||
asyncRequest.markStepComplete();
|
||||
logger.info("Disabling {} Controller Services in order to update Parameter Context", controllerServices.size());
|
||||
logger.info("Disabling {} Controller Services in order to update Parameter Context", enabledControllerServices.size());
|
||||
final CancellableTimedPause disableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
|
||||
asyncRequest.setCancelCallback(disableServicesPause::cancel);
|
||||
componentLifecycle.activateControllerServices(uri, "root", controllerServices, ControllerServiceState.DISABLED, disableServicesPause, InvalidComponentAction.WAIT);
|
||||
componentLifecycle.activateControllerServices(uri, "root", enabledControllerServices, controllerServicesRequiringDisabledState, ControllerServiceState.DISABLED, disableServicesPause,
|
||||
InvalidComponentAction.WAIT);
|
||||
}
|
||||
|
||||
private void enableControllerServices(final Set<AffectedComponentEntity> controllerServices, final AsynchronousWebRequest<?, ?> asyncRequest, final ComponentLifecycle componentLifecycle,
|
||||
private void enableControllerServices(final Set<AffectedComponentEntity> controllerServices, final Set<AffectedComponentEntity> controllerServicesRequiringDisabledState,
|
||||
final AsynchronousWebRequest<?, ?> asyncRequest, final ComponentLifecycle componentLifecycle,
|
||||
final URI uri) throws LifecycleManagementException, ResumeFlowException {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Re-Enabling {} Controller Services: {}", controllerServices.size(), controllerServices);
|
||||
|
@ -1017,7 +1031,8 @@ public class ParameterContextResource extends ApplicationResource {
|
|||
final Set<AffectedComponentEntity> servicesToEnable = getUpdatedEntities(controllerServices);
|
||||
|
||||
try {
|
||||
componentLifecycle.activateControllerServices(uri, "root", servicesToEnable, ControllerServiceState.ENABLED, enableServicesPause, InvalidComponentAction.SKIP);
|
||||
componentLifecycle.activateControllerServices(uri, "root", servicesToEnable, controllerServicesRequiringDisabledState,
|
||||
ControllerServiceState.ENABLED, enableServicesPause, InvalidComponentAction.SKIP);
|
||||
asyncRequest.markStepComplete();
|
||||
} catch (final IllegalStateException ise) {
|
||||
// Component Lifecycle will re-enable the Controller Services only if they are valid. If IllegalStateException gets thrown, we need to provide
|
||||
|
|
|
@ -166,9 +166,13 @@ public class StandardParameterContextDAO implements ParameterContextDAO {
|
|||
verifyParameterUpdate(parameterDto, processor, currentContext.getName(), verifyComponentStates, processor.isRunning(), "Processor that is running");
|
||||
}
|
||||
|
||||
for (final ControllerServiceNode serviceNode : referenceManager.getControllerServicesReferencing(currentContext, parameterName)) {
|
||||
verifyParameterUpdate(parameterDto, serviceNode, currentContext.getName(), verifyComponentStates,
|
||||
serviceNode.getState() != ControllerServiceState.DISABLED, "Controller Service that is enabled");
|
||||
final Set<ControllerServiceNode> referencingServices = referenceManager.getControllerServicesReferencing(currentContext, parameterName);
|
||||
for (final ControllerServiceNode serviceNode : referencingServices) {
|
||||
final ControllerServiceState serviceState = serviceNode.getState();
|
||||
final boolean serviceActive = serviceState != ControllerServiceState.DISABLED;
|
||||
|
||||
verifyParameterUpdate(parameterDto, serviceNode, currentContext.getName(), verifyComponentStates, serviceActive,
|
||||
"Controller Service [id=" + serviceNode.getIdentifier() + "] with a state of " + serviceState + " (state expected to be DISABLED)");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -402,15 +402,20 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle
|
|||
|
||||
@Override
|
||||
public Set<AffectedComponentEntity> activateControllerServices(final URI originalUri, final String groupId, final Set<AffectedComponentEntity> affectedServices,
|
||||
final ControllerServiceState desiredState, final Pause pause, final InvalidComponentAction invalidComponentAction) throws LifecycleManagementException {
|
||||
final Set<AffectedComponentEntity> servicesRequiringDesiredState, final ControllerServiceState desiredState,
|
||||
final Pause pause, final InvalidComponentAction invalidComponentAction) throws LifecycleManagementException {
|
||||
|
||||
final Set<String> affectedServiceIds = affectedServices.stream()
|
||||
.map(ComponentEntity::getId)
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
final Set<String> idsOfServicesRequiringDesiredState = servicesRequiringDesiredState.stream()
|
||||
.map(ComponentEntity::getId)
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
final Map<String, Revision> serviceRevisionMap = getRevisions(groupId, affectedServiceIds);
|
||||
final Map<String, RevisionDTO> serviceRevisionDtoMap = serviceRevisionMap.entrySet().stream().collect(
|
||||
Collectors.toMap(Map.Entry::getKey, entry -> dtoFactory.createRevisionDTO(entry.getValue())));
|
||||
final Map<String, RevisionDTO> serviceRevisionDtoMap = serviceRevisionMap.entrySet().stream()
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, entry -> dtoFactory.createRevisionDTO(entry.getValue())));
|
||||
|
||||
final ActivateControllerServicesEntity activateServicesEntity = new ActivateControllerServicesEntity();
|
||||
activateServicesEntity.setComponents(serviceRevisionDtoMap);
|
||||
|
@ -431,7 +436,7 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle
|
|||
final NiFiUser user = NiFiUserUtils.getNiFiUser();
|
||||
|
||||
// If enabling services, validation must complete first
|
||||
if (desiredState == ControllerServiceState.ENABLED) {
|
||||
if (desiredState == ControllerServiceState.ENABLED && !affectedServiceIds.isEmpty()) {
|
||||
try {
|
||||
waitForControllerServiceValidation(user, originalUri, groupId, affectedServiceIds, pause);
|
||||
} catch (InterruptedException e) {
|
||||
|
@ -442,6 +447,7 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle
|
|||
|
||||
// Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves.
|
||||
try {
|
||||
if (!affectedServiceIds.isEmpty()) {
|
||||
final NodeResponse clusterResponse;
|
||||
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
|
||||
clusterResponse = getRequestReplicator().replicate(user, HttpMethod.PUT, controllerServicesUri, activateServicesEntity, headers).awaitMergedResponse();
|
||||
|
@ -455,8 +461,9 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle
|
|||
final String explanation = getResponseEntity(clusterResponse, String.class);
|
||||
throw new LifecycleManagementException("Failed to update Controller Services to a state of " + desiredState + " due to " + explanation);
|
||||
}
|
||||
}
|
||||
|
||||
final boolean serviceTransitioned = waitForControllerServiceStatus(user, originalUri, groupId, affectedServiceIds, desiredState, pause, invalidComponentAction);
|
||||
final boolean serviceTransitioned = waitForControllerServiceStatus(user, originalUri, groupId, idsOfServicesRequiringDesiredState, desiredState, pause, invalidComponentAction);
|
||||
|
||||
if (!serviceTransitioned) {
|
||||
throw new LifecycleManagementException("Failed while waiting for Controller Services to finish transitioning to a state of " + desiredState);
|
||||
|
@ -551,7 +558,7 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle
|
|||
URI groupUri;
|
||||
try {
|
||||
groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
|
||||
originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId + "/controller-services", "includeAncestorGroups=false,includeDescendantGroups=true", originalUri.getFragment());
|
||||
originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId + "/controller-services", "includeAncestorGroups=false&includeDescendantGroups=true", originalUri.getFragment());
|
||||
} catch (URISyntaxException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
|
|
@ -47,7 +47,8 @@ public interface ComponentLifecycle {
|
|||
*
|
||||
* @param exampleUri an URI to use as a base for the REST API
|
||||
* @param groupId the ID of the process group
|
||||
* @param services the controller services to enable or disable
|
||||
* @param servicesToUpdate the controller services to enable or disable
|
||||
* @param servicesRequiringDesiredState the controller services whose state must be transitioned to the desired state before returning
|
||||
* @param desiredState the desired state of the components
|
||||
* @param pause a pause that can be used to determine how long to wait between polling for task completion and that can also be used to cancel the operation
|
||||
* @param invalidComponentAction when waiting for a component to reach the specified desired state, indicates how the deal with a component that is invalid
|
||||
|
@ -56,6 +57,6 @@ public interface ComponentLifecycle {
|
|||
*
|
||||
* @throws IllegalStateException if any of the components given do not have a state that can be transitioned to the given desired state
|
||||
*/
|
||||
Set<AffectedComponentEntity> activateControllerServices(URI exampleUri, String groupId, Set<AffectedComponentEntity> services,
|
||||
Set<AffectedComponentEntity> activateControllerServices(URI exampleUri, String groupId, Set<AffectedComponentEntity> servicesToUpdate, Set<AffectedComponentEntity> servicesRequiringDesiredState,
|
||||
ControllerServiceState desiredState, Pause pause, InvalidComponentAction invalidComponentAction) throws LifecycleManagementException;
|
||||
}
|
||||
|
|
|
@ -74,22 +74,26 @@ public class LocalComponentLifecycle implements ComponentLifecycle {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Set<AffectedComponentEntity> activateControllerServices(final URI exampleUri, final String groupId, final Set<AffectedComponentEntity> services,
|
||||
final ControllerServiceState desiredState, final Pause pause, final InvalidComponentAction invalidComponentAction) throws LifecycleManagementException {
|
||||
public Set<AffectedComponentEntity> activateControllerServices(final URI exampleUri, final String groupId, final Set<AffectedComponentEntity> servicesToUpdate,
|
||||
final Set<AffectedComponentEntity> servicesRequiringDesiredState, final ControllerServiceState desiredState,
|
||||
final Pause pause, final InvalidComponentAction invalidComponentAction) throws LifecycleManagementException {
|
||||
|
||||
final Map<String, Revision> serviceRevisions = services.stream()
|
||||
final Map<String, Revision> serviceRevisions = servicesToUpdate.stream()
|
||||
.collect(Collectors.toMap(AffectedComponentEntity::getId, entity -> revisionManager.getRevision(entity.getId())));
|
||||
|
||||
final Map<String, AffectedComponentEntity> affectedServiceMap = services.stream()
|
||||
final Map<String, AffectedComponentEntity> affectedServiceMap = servicesToUpdate.stream()
|
||||
.collect(Collectors.toMap(AffectedComponentEntity::getId, Function.identity()));
|
||||
|
||||
final Map<String, AffectedComponentEntity> servicesToWaitFor = servicesRequiringDesiredState.stream()
|
||||
.collect(Collectors.toMap(AffectedComponentEntity::getId, Function.identity()));
|
||||
|
||||
if (desiredState == ControllerServiceState.ENABLED) {
|
||||
enableControllerServices(groupId, serviceRevisions, affectedServiceMap, pause, invalidComponentAction);
|
||||
enableControllerServices(groupId, serviceRevisions, affectedServiceMap, servicesToWaitFor, pause, invalidComponentAction);
|
||||
} else {
|
||||
disableControllerServices(groupId, serviceRevisions, affectedServiceMap, pause, invalidComponentAction);
|
||||
disableControllerServices(groupId, serviceRevisions, affectedServiceMap, servicesToWaitFor, pause, invalidComponentAction);
|
||||
}
|
||||
|
||||
return services.stream()
|
||||
return servicesRequiringDesiredState.stream()
|
||||
.map(componentEntity -> serviceFacade.getControllerService(componentEntity.getId()))
|
||||
.map(dtoFactory::createAffectedComponentEntity)
|
||||
.collect(Collectors.toSet());
|
||||
|
@ -278,7 +282,8 @@ public class LocalComponentLifecycle implements ComponentLifecycle {
|
|||
return true;
|
||||
}
|
||||
|
||||
private void enableControllerServices(final String processGroupId, final Map<String, Revision> serviceRevisions, final Map<String, AffectedComponentEntity> affectedServices, final Pause pause,
|
||||
private void enableControllerServices(final String processGroupId, final Map<String, Revision> serviceRevisions, final Map<String, AffectedComponentEntity> affectedServices,
|
||||
final Map<String, AffectedComponentEntity> servicesRequiringDesiredState, final Pause pause,
|
||||
final InvalidComponentAction invalidComponentAction) throws LifecycleManagementException {
|
||||
|
||||
if (serviceRevisions.isEmpty()) {
|
||||
|
@ -291,21 +296,29 @@ public class LocalComponentLifecycle implements ComponentLifecycle {
|
|||
|
||||
serviceFacade.verifyActivateControllerServices(processGroupId, ControllerServiceState.ENABLED, affectedServices.keySet());
|
||||
serviceFacade.activateControllerServices(processGroupId, ControllerServiceState.ENABLED, serviceRevisions);
|
||||
waitForControllerServiceState(processGroupId, affectedServices, ControllerServiceState.ENABLED, pause, invalidComponentAction);
|
||||
waitForControllerServiceState(processGroupId, servicesRequiringDesiredState, ControllerServiceState.ENABLED, pause, invalidComponentAction);
|
||||
}
|
||||
|
||||
private void disableControllerServices(final String processGroupId, final Map<String, Revision> serviceRevisions, final Map<String, AffectedComponentEntity> affectedServices, final Pause pause,
|
||||
private void disableControllerServices(final String processGroupId, final Map<String, Revision> serviceRevisions, final Map<String, AffectedComponentEntity> affectedServices,
|
||||
final Map<String, AffectedComponentEntity> servicesToWaitFor, final Pause pause,
|
||||
final InvalidComponentAction invalidComponentAction) throws LifecycleManagementException {
|
||||
|
||||
if (serviceRevisions.isEmpty()) {
|
||||
if (serviceRevisions.isEmpty() && servicesToWaitFor.isEmpty()) {
|
||||
logger.debug("No Controller Services to update or wait for state to become DISABLED");
|
||||
return;
|
||||
}
|
||||
|
||||
logger.debug("Disabling Controller Services with ID's {} from Process Group {}", serviceRevisions.keySet(), processGroupId);
|
||||
|
||||
if (!affectedServices.isEmpty()) {
|
||||
serviceFacade.verifyActivateControllerServices(processGroupId, ControllerServiceState.DISABLED, affectedServices.keySet());
|
||||
}
|
||||
|
||||
if (!serviceRevisions.isEmpty()) {
|
||||
serviceFacade.activateControllerServices(processGroupId, ControllerServiceState.DISABLED, serviceRevisions);
|
||||
waitForControllerServiceState(processGroupId, affectedServices, ControllerServiceState.DISABLED, pause, invalidComponentAction);
|
||||
}
|
||||
|
||||
waitForControllerServiceState(processGroupId, servicesToWaitFor, ControllerServiceState.DISABLED, pause, invalidComponentAction);
|
||||
}
|
||||
|
||||
static List<List<ControllerServiceNode>> determineEnablingOrder(final Map<String, ControllerServiceNode> serviceNodeMap) {
|
||||
|
|
|
@ -465,7 +465,7 @@ public class NiFiClientUtil {
|
|||
}
|
||||
}
|
||||
|
||||
public ActivateControllerServicesEntity disableControllerServices(final String groupId) throws NiFiClientException, IOException {
|
||||
public ActivateControllerServicesEntity disableControllerServices(final String groupId, final boolean recurse) throws NiFiClientException, IOException {
|
||||
final ActivateControllerServicesEntity activateControllerServicesEntity = new ActivateControllerServicesEntity();
|
||||
activateControllerServicesEntity.setId(groupId);
|
||||
activateControllerServicesEntity.setState(ActivateControllerServicesEntity.STATE_DISABLED);
|
||||
|
@ -473,6 +473,15 @@ public class NiFiClientUtil {
|
|||
final ActivateControllerServicesEntity activateControllerServices = nifiClient.getFlowClient().activateControllerServices(activateControllerServicesEntity);
|
||||
waitForControllerSerivcesDisabled(groupId);
|
||||
|
||||
if (recurse) {
|
||||
final ProcessGroupFlowEntity groupEntity = nifiClient.getFlowClient().getProcessGroup(groupId);
|
||||
final FlowDTO flowDto = groupEntity.getProcessGroupFlow().getFlow();
|
||||
for (final ProcessGroupEntity childGroupEntity : flowDto.getProcessGroups()) {
|
||||
final String childGroupId = childGroupEntity.getId();
|
||||
disableControllerServices(childGroupId, recurse);
|
||||
}
|
||||
}
|
||||
|
||||
return activateControllerServices;
|
||||
}
|
||||
|
||||
|
|
|
@ -119,7 +119,7 @@ public abstract class NiFiSystemIT {
|
|||
|
||||
protected void destroyFlow() throws NiFiClientException, IOException {
|
||||
getClientUtil().stopProcessGroupComponents("root");
|
||||
getClientUtil().disableControllerServices("root");
|
||||
getClientUtil().disableControllerServices("root", true);
|
||||
getClientUtil().stopTransmitting("root");
|
||||
getClientUtil().deleteAll("root");
|
||||
}
|
||||
|
|
|
@ -426,7 +426,7 @@ public class ParameterContextIT extends NiFiSystemIT {
|
|||
final ControllerServiceEntity serviceEntity = createControllerService(TEST_CS_PACKAGE + ".StandardSleepService", "root", NIFI_GROUP_ID, TEST_EXTENSIONS_ARTIFACT_ID, getNiFiVersion());
|
||||
|
||||
// Set service's sleep time to the parameter.
|
||||
serviceEntity.getComponent().setProperties(Collections.singletonMap("@OnEnablend Sleep Time", "#{sleep}"));
|
||||
serviceEntity.getComponent().setProperties(Collections.singletonMap("@OnEnabled Sleep Time", "#{sleep}"));
|
||||
getNifiClient().getControllerServicesClient().updateControllerService(serviceEntity);
|
||||
|
||||
// Enable the service. It should take 7 seconds for the service to fully enable.
|
||||
|
@ -442,6 +442,51 @@ public class ParameterContextIT extends NiFiSystemIT {
|
|||
getClientUtil().waitForParameterContextRequestToComplete(createdContextEntity.getId(), paramUpdateRequestEntity.getRequest().getRequestId());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParamChangeWhileReferencingControllerServiceDisabling() throws NiFiClientException, IOException, InterruptedException {
|
||||
testParamChangeWhileReferencingControllerServiceDisabling(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParamChangeWhileReferencingControllerServiceEnabled() throws NiFiClientException, IOException, InterruptedException {
|
||||
testParamChangeWhileReferencingControllerServiceDisabling(false);
|
||||
}
|
||||
|
||||
private void testParamChangeWhileReferencingControllerServiceDisabling(final boolean disableServiceBeforeUpdate) throws NiFiClientException, IOException, InterruptedException {
|
||||
final ParameterContextEntity createdContextEntity = createParameterContext("sleep", "7 sec");
|
||||
|
||||
// Set the Parameter Context on the root Process Group
|
||||
final ProcessGroupEntity childGroup = getClientUtil().createProcessGroup("child", "root");
|
||||
setParameterContext(childGroup.getId(), createdContextEntity);
|
||||
|
||||
final ControllerServiceEntity serviceEntity = createControllerService(TEST_CS_PACKAGE + ".StandardSleepService", childGroup.getId(),
|
||||
NIFI_GROUP_ID, TEST_EXTENSIONS_ARTIFACT_ID, getNiFiVersion());
|
||||
|
||||
// Set service's sleep time to the parameter.
|
||||
serviceEntity.getComponent().setProperties(Collections.singletonMap("@OnDisabled Sleep Time", "#{sleep}"));
|
||||
getNifiClient().getControllerServicesClient().updateControllerService(serviceEntity);
|
||||
|
||||
// Enable the service.
|
||||
getClientUtil().enableControllerService(serviceEntity);
|
||||
|
||||
// Wait for the service to reach of state of ENABLED.
|
||||
getClientUtil().waitForControllerServiceState(serviceEntity.getParentGroupId(), "ENABLED", Collections.emptyList());
|
||||
|
||||
if (disableServiceBeforeUpdate) {
|
||||
// Disable the service.
|
||||
getClientUtil().disableControllerService(serviceEntity);
|
||||
|
||||
// Wait for service to reach state of DISABLING but not DISABLED. We want to change the parameter that it references while it's disabling.
|
||||
getClientUtil().waitForControllerServiceState(serviceEntity.getParentGroupId(), "DISABLING", Collections.emptyList());
|
||||
}
|
||||
|
||||
// Change the parameter
|
||||
final ParameterContextUpdateRequestEntity paramUpdateRequestEntity = updateParameterContext(createdContextEntity, "sleep", "1 sec");
|
||||
|
||||
// Wait for the update to complete
|
||||
getClientUtil().waitForParameterContextRequestToComplete(createdContextEntity.getId(), paramUpdateRequestEntity.getRequest().getRequestId());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParamChangeWhileReferencingProcessorStartingButInvalid() throws NiFiClientException, IOException, InterruptedException {
|
||||
final ParameterContextEntity contextEntity = createParameterContext("clone", "true");
|
||||
|
|
|
@ -88,6 +88,7 @@
|
|||
<logger name="org.apache.nifi.processors.standard.LogMessage" level="INFO"/>
|
||||
<logger name="org.apache.nifi.controller.repository.StandardProcessSession" level="WARN" />
|
||||
<logger name="org.apache.nifi.connectable.LocalPort" level="DEBUG"/>
|
||||
<logger name="org.apache.nifi.web.util.ClusterReplicationComponentLifecycle" level="DEBUG" />
|
||||
|
||||
|
||||
<logger name="org.apache.zookeeper.ClientCnxn" level="ERROR" />
|
||||
|
|
|
@ -88,6 +88,7 @@
|
|||
<logger name="org.apache.nifi.processors.standard.LogMessage" level="INFO"/>
|
||||
<logger name="org.apache.nifi.controller.repository.StandardProcessSession" level="WARN" />
|
||||
<logger name="org.apache.nifi.connectable.LocalPort" level="DEBUG"/>
|
||||
<logger name="org.apache.nifi.web.util.ClusterReplicationComponentLifecycle" level="DEBUG" />
|
||||
|
||||
<logger name="org.apache.nifi.controller.StandardFlowSynchronizer" level="DEBUG" />
|
||||
<logger name="org.apache.nifi.controller.inheritance" level="DEBUG" />
|
||||
|
|
Loading…
Reference in New Issue