From d44dec7345bea5671b6858927dc71f8318c592d4 Mon Sep 17 00:00:00 2001 From: markap14 Date: Wed, 9 Jun 2021 13:07:33 -0400 Subject: [PATCH] NIFI-8670: Fixed bug in which a Parameter Context Update would fail if an updated parameter was referenced by a Controller Service whose state was ENABLING. Created system test to verify and addressed bug that was encountered in SingleFlowFileConcurrencyIT. (#5137) --- .../StandardControllerServiceNode.java | 5 ++-- .../org/apache/nifi/web/api/FlowResource.java | 6 ++--- .../nifi/web/api/FlowUpdateResource.java | 4 +-- .../web/api/ParameterContextResource.java | 6 ++--- .../ClusterReplicationComponentLifecycle.java | 16 ++++++++---- .../web/util/LocalComponentLifecycle.java | 16 ++++++++---- .../nifi/tests/system/NiFiClientUtil.java | 6 ++++- .../nifi/tests/system/NiFiSystemIT.java | 1 + .../system/parameters/ParameterContextIT.java | 25 +++++++++++++++++++ .../pg/SingleFlowFileConcurrencyIT.java | 6 +++++ 10 files changed, 70 insertions(+), 21 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java index 17e97eca01..b2b98a66f8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java @@ -331,8 +331,9 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme @Override public void verifyCanEnable(final Set ignoredReferences) { - if (getState() != ControllerServiceState.DISABLED) { - throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because it is not disabled"); + final ControllerServiceState state = getState(); + if (state != ControllerServiceState.DISABLED) { + throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because it is not disabled - it has a state of " + state); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java index 3483bd355d..0816c42c0b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java @@ -910,16 +910,16 @@ public class FlowResource extends ApplicationResource { OperationAuthorizable.authorizeOperation(authorizable, authorizer, NiFiUserUtils.getNiFiUser()); }); }, - () -> serviceFacade.verifyActivateControllerServices(id, desiredState, requestComponentRevisions.keySet()), + () -> serviceFacade.verifyActivateControllerServices(id, desiredState, requestComponentRevisions.keySet()), (revisions, scheduleComponentsEntity) -> { - final ControllerServiceState serviceState = ControllerServiceState.valueOf(scheduleComponentsEntity.getState()); + final ControllerServiceState serviceState = ControllerServiceState.valueOf(scheduleComponentsEntity.getState()); final Map componentsToSchedule = scheduleComponentsEntity.getComponents(); final Map componentRevisions = componentsToSchedule.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> getRevision(e.getValue(), e.getKey()))); // update the controller services - final ActivateControllerServicesEntity entity = serviceFacade.activateControllerServices(id, serviceState, componentRevisions); + final ActivateControllerServicesEntity entity = serviceFacade.activateControllerServices(id, serviceState, componentRevisions); return generateOkResponse(entity).build(); } ); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java index 63a38f59ba..1dbccbd3d9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java @@ -323,7 +323,7 @@ public abstract class FlowUpdateResource enabledControllerServices = affectedComponents.stream() .filter(entity -> entity.getComponent() != null) .filter(dto -> AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE.equals(dto.getComponent().getReferenceType())) - .filter(dto -> "Enabled".equalsIgnoreCase(dto.getComponent().getState())) + .filter(dto -> "Enabling".equalsIgnoreCase(dto.getComponent().getState()) || "Enabled".equalsIgnoreCase(dto.getComponent().getState())) .collect(Collectors.toSet()); stopProcessors(runningProcessors, asyncRequest, componentLifecycle, uri); @@ -965,7 +965,7 @@ public class ParameterContextResource extends ApplicationResource { logger.info("Stopping {} Processors in order to update Parameter Context", processors.size()); final CancellableTimedPause stopComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS); asyncRequest.setCancelCallback(stopComponentsPause::cancel); - componentLifecycle.scheduleComponents(uri, "root", processors, ScheduledState.STOPPED, stopComponentsPause, InvalidComponentAction.SKIP); + componentLifecycle.scheduleComponents(uri, "root", processors, ScheduledState.STOPPED, stopComponentsPause, InvalidComponentAction.WAIT); } private void restartProcessors(final Set processors, final AsynchronousWebRequest asyncRequest, final ComponentLifecycle componentLifecycle, final URI uri) @@ -1000,7 +1000,7 @@ public class ParameterContextResource extends ApplicationResource { logger.info("Disabling {} Controller Services in order to update Parameter Context", controllerServices.size()); final CancellableTimedPause disableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS); asyncRequest.setCancelCallback(disableServicesPause::cancel); - componentLifecycle.activateControllerServices(uri, "root", controllerServices, ControllerServiceState.DISABLED, disableServicesPause, InvalidComponentAction.SKIP); + componentLifecycle.activateControllerServices(uri, "root", controllerServices, ControllerServiceState.DISABLED, disableServicesPause, InvalidComponentAction.WAIT); } private void enableControllerServices(final Set controllerServices, final AsynchronousWebRequest asyncRequest, final ComponentLifecycle componentLifecycle, diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java index 6794040c26..f61b26da26 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java @@ -572,10 +572,19 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle } final String validationStatus = serviceDto.getValidationStatus(); + final boolean desiredStateReached = desiredStateName.equals(serviceDto.getState()); + + logger.debug("ControllerService[id={}, name={}] now has a state of {} with a Validation Status of {}; desired state = {}; invalid component action is {}; desired state reached = {}", + serviceDto.getId(), serviceDto.getName(), serviceDto.getState(), validationStatus, desiredState, invalidComponentAction, desiredStateReached); + + if (desiredStateReached) { + continue; + } + + // The desired state for this component has not yet been reached. Check how we should handle this based on the validation status. if (ControllerServiceDTO.INVALID.equals(validationStatus)) { switch (invalidComponentAction) { case WAIT: - allReachedDesiredState = false; break; case SKIP: continue; @@ -585,10 +594,7 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle } } - if (!desiredStateName.equalsIgnoreCase(serviceDto.getState())) { - allReachedDesiredState = false; - break; - } + allReachedDesiredState = false; } if (allReachedDesiredState) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java index 473ede465f..34cf29f8de 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java @@ -391,11 +391,20 @@ public class LocalComponentLifecycle implements ComponentLifecycle { } final ControllerServiceDTO serviceDto = serviceEntity.getComponent(); + final boolean desiredStateReached = desiredStateName.equals(serviceDto.getState()); + final String validationStatus = serviceDto.getValidationStatus(); + logger.debug("ControllerService[id={}, name={}] now has a state of {} with a Validation Status of {}; desired state = {}; invalid component action is {}; desired state reached = {}", + serviceDto.getId(), serviceDto.getName(), serviceDto.getState(), validationStatus, desiredState, invalidComponentAction, desiredStateReached); + + if (desiredStateReached) { + continue; + } + + // The desired state for this component has not yet been reached. Check how we should handle this based on the validation status. if (ControllerServiceDTO.INVALID.equals(validationStatus)) { switch (invalidComponentAction) { case WAIT: - allReachedDesiredState = false; break; case SKIP: continue; @@ -405,10 +414,7 @@ public class LocalComponentLifecycle implements ComponentLifecycle { } } - if (!desiredStateName.equals(serviceDto.getState())) { - allReachedDesiredState = false; - break; - } + allReachedDesiredState = false; } if (allReachedDesiredState) { diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java index feb7bda40c..2f7cdc541c 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java @@ -244,7 +244,11 @@ public class NiFiClientUtil { while (true) { final ParameterContextUpdateRequestEntity entity = nifiClient.getParamContextClient().getParamContextUpdateRequest(contextId, requestId); if (entity.getRequest().isComplete()) { - return; + if (entity.getRequest().getFailureReason() == null) { + return; + } + + throw new RuntimeException("Parameter Context Update failed: " + entity.getRequest().getFailureReason()); } Thread.sleep(100L); diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java index c27f429751..08348eb35f 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java @@ -307,6 +307,7 @@ public abstract class NiFiSystemIT { try { return getNifiClient().getFlowClient().getConnectionStatus(connectionId, true); } catch (final Exception e) { + e.printStackTrace(); Assert.fail("Failed to obtain connection status"); return null; } diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/parameters/ParameterContextIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/parameters/ParameterContextIT.java index 1949d45f8c..2b61f90fed 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/parameters/ParameterContextIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/parameters/ParameterContextIT.java @@ -416,6 +416,31 @@ public class ParameterContextIT extends NiFiSystemIT { } } + @Test + public void testParamChangeWhileReferencingControllerServiceEnabling() throws NiFiClientException, IOException, InterruptedException { + final ParameterContextEntity createdContextEntity = createParameterContext("sleep", "7 sec"); + + // Set the Parameter Context on the root Process Group + setParameterContext("root", createdContextEntity); + + final ControllerServiceEntity serviceEntity = createControllerService(TEST_CS_PACKAGE + ".StandardSleepService", "root", NIFI_GROUP_ID, TEST_EXTENSIONS_ARTIFACT_ID, getNiFiVersion()); + + // Set service's sleep time to the parameter. + serviceEntity.getComponent().setProperties(Collections.singletonMap("@OnEnablend Sleep Time", "#{sleep}")); + getNifiClient().getControllerServicesClient().updateControllerService(serviceEntity); + + // Enable the service. It should take 7 seconds for the service to fully enable. + getClientUtil().enableControllerService(serviceEntity); + + // Wait for the service to reach of state of ENABLING but not enabled. We want to change the parameter that it references while it's enabling. + getClientUtil().waitForControllerServiceState(serviceEntity.getParentGroupId(), "ENABLING", Collections.emptyList()); + + // While the service is enabling, change the parameter + final ParameterContextUpdateRequestEntity paramUpdateRequestEntity = updateParameterContext(createdContextEntity, "sleep", "1 sec"); + + // Wait for the update to complete + getClientUtil().waitForParameterContextRequestToComplete(createdContextEntity.getId(), paramUpdateRequestEntity.getRequest().getRequestId()); + } @Test public void testProcessorRestartedWhenParameterChanged() throws NiFiClientException, IOException, InterruptedException { diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java index a35541d2f4..18f9f9ed4f 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java @@ -199,9 +199,15 @@ public class SingleFlowFileConcurrencyIT extends NiFiSystemIT { waitForQueueCount(inputToSecondOut.getId(), 1); assertEquals(1, getConnectionQueueSize(inputToSecondOut.getId())); + // Stop processor so that it won't generate data upon restart + getNifiClient().getProcessorClient().stopProcessor(generate); + // Everything is queued up at an Output Port so the first Output Port should run and its queue should become empty. waitForQueueCount(inputToOutput.getId(), 0); + // Wait a bit before shutting down so that nifi has a chance to save the changes to the flow + Thread.sleep(2000L); + // Restart nifi. getNiFiInstance().stop(); getNiFiInstance().start(true);