NIFI-8670: Fixed bug in which a Parameter Context Update would fail if an updated parameter was referenced by a Controller Service whose state was ENABLING. Created system test to verify and addressed bug that was encountered in SingleFlowFileConcurrencyIT. (#5137)

This commit is contained in:
markap14 2021-06-09 13:07:33 -04:00 committed by GitHub
parent 69c10f5a69
commit d44dec7345
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 70 additions and 21 deletions

View File

@ -331,8 +331,9 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
@Override
public void verifyCanEnable(final Set<ControllerServiceNode> ignoredReferences) {
if (getState() != ControllerServiceState.DISABLED) {
throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because it is not disabled");
final ControllerServiceState state = getState();
if (state != ControllerServiceState.DISABLED) {
throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because it is not disabled - it has a state of " + state);
}
}

View File

@ -910,16 +910,16 @@ public class FlowResource extends ApplicationResource {
OperationAuthorizable.authorizeOperation(authorizable, authorizer, NiFiUserUtils.getNiFiUser());
});
},
() -> serviceFacade.verifyActivateControllerServices(id, desiredState, requestComponentRevisions.keySet()),
() -> serviceFacade.verifyActivateControllerServices(id, desiredState, requestComponentRevisions.keySet()),
(revisions, scheduleComponentsEntity) -> {
final ControllerServiceState serviceState = ControllerServiceState.valueOf(scheduleComponentsEntity.getState());
final ControllerServiceState serviceState = ControllerServiceState.valueOf(scheduleComponentsEntity.getState());
final Map<String, RevisionDTO> componentsToSchedule = scheduleComponentsEntity.getComponents();
final Map<String, Revision> componentRevisions =
componentsToSchedule.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> getRevision(e.getValue(), e.getKey())));
// update the controller services
final ActivateControllerServicesEntity entity = serviceFacade.activateControllerServices(id, serviceState, componentRevisions);
final ActivateControllerServicesEntity entity = serviceFacade.activateControllerServices(id, serviceState, componentRevisions);
return generateOkResponse(entity).build();
}
);

View File

@ -323,7 +323,7 @@ public abstract class FlowUpdateResource<T extends ProcessGroupDescriptorEntity,
logger.info("Stopping {} Processors", runningComponents.size());
final CancellableTimedPause stopComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
asyncRequest.setCancelCallback(stopComponentsPause::cancel);
componentLifecycle.scheduleComponents(requestUri, groupId, runningComponents, ScheduledState.STOPPED, stopComponentsPause, InvalidComponentAction.SKIP);
componentLifecycle.scheduleComponents(requestUri, groupId, runningComponents, ScheduledState.STOPPED, stopComponentsPause, InvalidComponentAction.WAIT);
if (asyncRequest.isCancelled()) {
return;
@ -339,7 +339,7 @@ public abstract class FlowUpdateResource<T extends ProcessGroupDescriptorEntity,
logger.info("Disabling {} Controller Services", enabledServices.size());
final CancellableTimedPause disableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
asyncRequest.setCancelCallback(disableServicesPause::cancel);
componentLifecycle.activateControllerServices(requestUri, groupId, enabledServices, ControllerServiceState.DISABLED, disableServicesPause, InvalidComponentAction.SKIP);
componentLifecycle.activateControllerServices(requestUri, groupId, enabledServices, ControllerServiceState.DISABLED, disableServicesPause, InvalidComponentAction.WAIT);
if (asyncRequest.isCancelled()) {
return;

View File

@ -854,7 +854,7 @@ public class ParameterContextResource extends ApplicationResource {
final Set<AffectedComponentEntity> enabledControllerServices = affectedComponents.stream()
.filter(entity -> entity.getComponent() != null)
.filter(dto -> AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE.equals(dto.getComponent().getReferenceType()))
.filter(dto -> "Enabled".equalsIgnoreCase(dto.getComponent().getState()))
.filter(dto -> "Enabling".equalsIgnoreCase(dto.getComponent().getState()) || "Enabled".equalsIgnoreCase(dto.getComponent().getState()))
.collect(Collectors.toSet());
stopProcessors(runningProcessors, asyncRequest, componentLifecycle, uri);
@ -965,7 +965,7 @@ public class ParameterContextResource extends ApplicationResource {
logger.info("Stopping {} Processors in order to update Parameter Context", processors.size());
final CancellableTimedPause stopComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
asyncRequest.setCancelCallback(stopComponentsPause::cancel);
componentLifecycle.scheduleComponents(uri, "root", processors, ScheduledState.STOPPED, stopComponentsPause, InvalidComponentAction.SKIP);
componentLifecycle.scheduleComponents(uri, "root", processors, ScheduledState.STOPPED, stopComponentsPause, InvalidComponentAction.WAIT);
}
private void restartProcessors(final Set<AffectedComponentEntity> processors, final AsynchronousWebRequest<?, ?> asyncRequest, final ComponentLifecycle componentLifecycle, final URI uri)
@ -1000,7 +1000,7 @@ public class ParameterContextResource extends ApplicationResource {
logger.info("Disabling {} Controller Services in order to update Parameter Context", controllerServices.size());
final CancellableTimedPause disableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
asyncRequest.setCancelCallback(disableServicesPause::cancel);
componentLifecycle.activateControllerServices(uri, "root", controllerServices, ControllerServiceState.DISABLED, disableServicesPause, InvalidComponentAction.SKIP);
componentLifecycle.activateControllerServices(uri, "root", controllerServices, ControllerServiceState.DISABLED, disableServicesPause, InvalidComponentAction.WAIT);
}
private void enableControllerServices(final Set<AffectedComponentEntity> controllerServices, final AsynchronousWebRequest<?, ?> asyncRequest, final ComponentLifecycle componentLifecycle,

View File

@ -572,10 +572,19 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle
}
final String validationStatus = serviceDto.getValidationStatus();
final boolean desiredStateReached = desiredStateName.equals(serviceDto.getState());
logger.debug("ControllerService[id={}, name={}] now has a state of {} with a Validation Status of {}; desired state = {}; invalid component action is {}; desired state reached = {}",
serviceDto.getId(), serviceDto.getName(), serviceDto.getState(), validationStatus, desiredState, invalidComponentAction, desiredStateReached);
if (desiredStateReached) {
continue;
}
// The desired state for this component has not yet been reached. Check how we should handle this based on the validation status.
if (ControllerServiceDTO.INVALID.equals(validationStatus)) {
switch (invalidComponentAction) {
case WAIT:
allReachedDesiredState = false;
break;
case SKIP:
continue;
@ -585,10 +594,7 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle
}
}
if (!desiredStateName.equalsIgnoreCase(serviceDto.getState())) {
allReachedDesiredState = false;
break;
}
allReachedDesiredState = false;
}
if (allReachedDesiredState) {

View File

@ -391,11 +391,20 @@ public class LocalComponentLifecycle implements ComponentLifecycle {
}
final ControllerServiceDTO serviceDto = serviceEntity.getComponent();
final boolean desiredStateReached = desiredStateName.equals(serviceDto.getState());
final String validationStatus = serviceDto.getValidationStatus();
logger.debug("ControllerService[id={}, name={}] now has a state of {} with a Validation Status of {}; desired state = {}; invalid component action is {}; desired state reached = {}",
serviceDto.getId(), serviceDto.getName(), serviceDto.getState(), validationStatus, desiredState, invalidComponentAction, desiredStateReached);
if (desiredStateReached) {
continue;
}
// The desired state for this component has not yet been reached. Check how we should handle this based on the validation status.
if (ControllerServiceDTO.INVALID.equals(validationStatus)) {
switch (invalidComponentAction) {
case WAIT:
allReachedDesiredState = false;
break;
case SKIP:
continue;
@ -405,10 +414,7 @@ public class LocalComponentLifecycle implements ComponentLifecycle {
}
}
if (!desiredStateName.equals(serviceDto.getState())) {
allReachedDesiredState = false;
break;
}
allReachedDesiredState = false;
}
if (allReachedDesiredState) {

View File

@ -244,7 +244,11 @@ public class NiFiClientUtil {
while (true) {
final ParameterContextUpdateRequestEntity entity = nifiClient.getParamContextClient().getParamContextUpdateRequest(contextId, requestId);
if (entity.getRequest().isComplete()) {
return;
if (entity.getRequest().getFailureReason() == null) {
return;
}
throw new RuntimeException("Parameter Context Update failed: " + entity.getRequest().getFailureReason());
}
Thread.sleep(100L);

View File

@ -307,6 +307,7 @@ public abstract class NiFiSystemIT {
try {
return getNifiClient().getFlowClient().getConnectionStatus(connectionId, true);
} catch (final Exception e) {
e.printStackTrace();
Assert.fail("Failed to obtain connection status");
return null;
}

View File

@ -416,6 +416,31 @@ public class ParameterContextIT extends NiFiSystemIT {
}
}
@Test
public void testParamChangeWhileReferencingControllerServiceEnabling() throws NiFiClientException, IOException, InterruptedException {
final ParameterContextEntity createdContextEntity = createParameterContext("sleep", "7 sec");
// Set the Parameter Context on the root Process Group
setParameterContext("root", createdContextEntity);
final ControllerServiceEntity serviceEntity = createControllerService(TEST_CS_PACKAGE + ".StandardSleepService", "root", NIFI_GROUP_ID, TEST_EXTENSIONS_ARTIFACT_ID, getNiFiVersion());
// Set service's sleep time to the parameter.
serviceEntity.getComponent().setProperties(Collections.singletonMap("@OnEnablend Sleep Time", "#{sleep}"));
getNifiClient().getControllerServicesClient().updateControllerService(serviceEntity);
// Enable the service. It should take 7 seconds for the service to fully enable.
getClientUtil().enableControllerService(serviceEntity);
// Wait for the service to reach of state of ENABLING but not enabled. We want to change the parameter that it references while it's enabling.
getClientUtil().waitForControllerServiceState(serviceEntity.getParentGroupId(), "ENABLING", Collections.emptyList());
// While the service is enabling, change the parameter
final ParameterContextUpdateRequestEntity paramUpdateRequestEntity = updateParameterContext(createdContextEntity, "sleep", "1 sec");
// Wait for the update to complete
getClientUtil().waitForParameterContextRequestToComplete(createdContextEntity.getId(), paramUpdateRequestEntity.getRequest().getRequestId());
}
@Test
public void testProcessorRestartedWhenParameterChanged() throws NiFiClientException, IOException, InterruptedException {

View File

@ -199,9 +199,15 @@ public class SingleFlowFileConcurrencyIT extends NiFiSystemIT {
waitForQueueCount(inputToSecondOut.getId(), 1);
assertEquals(1, getConnectionQueueSize(inputToSecondOut.getId()));
// Stop processor so that it won't generate data upon restart
getNifiClient().getProcessorClient().stopProcessor(generate);
// Everything is queued up at an Output Port so the first Output Port should run and its queue should become empty.
waitForQueueCount(inputToOutput.getId(), 0);
// Wait a bit before shutting down so that nifi has a chance to save the changes to the flow
Thread.sleep(2000L);
// Restart nifi.
getNiFiInstance().stop();
getNiFiInstance().start(true);