NIFI-8731: If a processor is running but made invalid due to a parameter update, it ends up in a state of STARTING. If the parameter is then updated again, it transitions the state to STOPPING but the processor is not fully stopped yet. At that point, the parameter is updated and the processor is attempted to be started again. Fixed this by keeping the number of active threads to >= 1 if processor is STOPPING in order to convey that it is not fully stopped. Also addressed a few minor bugs discovered in the process: when stopping a processor, if status == invalid, it should be skipped instead of waiting for the status to become stopped since it never will be. In the DTO's run status use Stopped instead of Invalid if there is at least 1 active thread / if stopping but not stopped. When considering if a processor has transitioned to the desired state for parameter updates, do not consider validation status if still transitioning to stopped or if the desired state has already been reached. Added new system tests to verify behavior. (#5180)

This closes #5180
This commit is contained in:
markap14 2021-06-25 12:40:49 -04:00 committed by GitHub
parent 1313ee3d90
commit 88cc232f15
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 134 additions and 40 deletions

View File

@ -1025,7 +1025,29 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
@Override
public int getActiveThreadCount() {
return processScheduler.getActiveThreadCount(this);
final int activeThreadCount = processScheduler.getActiveThreadCount(this);
// When getScheduledState() is called, we map the 'physical' state of STOPPING to STOPPED. This is done in order to maintain
// backward compatibility because the UI and other clients will not know of the (relatively newer) 'STOPPING' state.
// Because of there previously was no STOPPING state, the way to determine of a processor had truly stopped was to check if its
// Scheduled State was STOPPED AND it had no active threads.
//
// Also, we can have a situation in which a processor is started while invalid. Before the processor becomes valid, it can be stopped.
// In this situation, the processor state will become STOPPING until the background thread checks the state, calls any necessary lifecycle methods,
// and finally updates the state to STOPPED. In the interim, we have a situation where a call to getScheduledState() returns STOPPED and there are no
// active threads, which the client will interpret as the processor being fully stopped. However, in this situation, an attempt to update the processor, etc.
// will fail because the processor is not truly fully stopped.
//
// To prevent this situation, we return 1 for the number of active tasks when the processor is considered STOPPING. In doing this, we ensure that the condition
// of (getScheduledState() == STOPPED and activeThreads == 0) never happens while the processor is still stopping.
//
// This probably is calling for a significant refactoring / rethinking of this class. It would make sense, for example, to extract some of the logic into a separate
// StateTransition class as we've done with Controller Services. That would at least more cleanly encapsulate this logic. However, this is a simple enough work around for the time being.
if (activeThreadCount == 0 && getPhysicalScheduledState() == ScheduledState.STOPPING) {
return 1;
}
return activeThreadCount;
}
List<Connection> getIncomingNonLoopConnections() {
@ -1389,7 +1411,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
}
private void run(ScheduledExecutorService taskScheduler, long administrativeYieldMillis, long timeoutMillis, Supplier<ProcessContext> processContextFactory,
SchedulingAgentCallback schedulingAgentCallback, boolean failIfStopping, ScheduledState desiredSate, ScheduledState scheduledState) {
SchedulingAgentCallback schedulingAgentCallback, boolean failIfStopping, ScheduledState desiredState, ScheduledState scheduledState) {
final Processor processor = processorRef.get().getProcessor();
final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
@ -1403,10 +1425,10 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
if (currentState == ScheduledState.STOPPED) {
starting = this.scheduledState.compareAndSet(ScheduledState.STOPPED, scheduledState);
if (starting) {
desiredState = desiredSate;
this.desiredState = desiredState;
}
} else if (currentState == ScheduledState.STOPPING && !failIfStopping) {
desiredState = desiredSate;
this.desiredState = desiredState;
return;
} else {
starting = false;

View File

@ -2513,7 +2513,7 @@ public final class StandardProcessGroup implements ProcessGroup {
if (procNode.isRunning()) {
throw new IllegalStateException("Processor " + procNode.getIdentifier() + " cannot be removed because it is running");
}
final int activeThreadCount = scheduler.getActiveThreadCount(procNode);
final int activeThreadCount = procNode.getActiveThreadCount();
if (activeThreadCount != 0) {
throw new IllegalStateException("Processor " + procNode.getIdentifier() + " cannot be removed because it still has " + activeThreadCount + " active threads");
}

View File

@ -621,7 +621,7 @@ public abstract class AbstractEventAccess implements EventAccess {
status.setRunStatus(RunStatus.Running);
} else if (procNode.getValidationStatus() == ValidationStatus.VALIDATING) {
status.setRunStatus(RunStatus.Validating);
} else if (procNode.getValidationStatus() == ValidationStatus.INVALID) {
} else if (procNode.getValidationStatus() == ValidationStatus.INVALID && procNode.getActiveThreadCount() == 0) {
status.setRunStatus(RunStatus.Invalid);
} else {
status.setRunStatus(RunStatus.Stopped);
@ -629,7 +629,7 @@ public abstract class AbstractEventAccess implements EventAccess {
status.setExecutionNode(procNode.getExecutionNode());
status.setTerminatedThreadCount(procNode.getTerminatedThreadCount());
status.setActiveThreadCount(processScheduler.getActiveThreadCount(procNode));
status.setActiveThreadCount(procNode.getActiveThreadCount());
return status;
}

View File

@ -323,7 +323,7 @@ public abstract class FlowUpdateResource<T extends ProcessGroupDescriptorEntity,
logger.info("Stopping {} Processors", runningComponents.size());
final CancellableTimedPause stopComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
asyncRequest.setCancelCallback(stopComponentsPause::cancel);
componentLifecycle.scheduleComponents(requestUri, groupId, runningComponents, ScheduledState.STOPPED, stopComponentsPause, InvalidComponentAction.WAIT);
componentLifecycle.scheduleComponents(requestUri, groupId, runningComponents, ScheduledState.STOPPED, stopComponentsPause, InvalidComponentAction.SKIP);
if (asyncRequest.isCancelled()) {
return;
@ -339,7 +339,7 @@ public abstract class FlowUpdateResource<T extends ProcessGroupDescriptorEntity,
logger.info("Disabling {} Controller Services", enabledServices.size());
final CancellableTimedPause disableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
asyncRequest.setCancelCallback(disableServicesPause::cancel);
componentLifecycle.activateControllerServices(requestUri, groupId, enabledServices, ControllerServiceState.DISABLED, disableServicesPause, InvalidComponentAction.WAIT);
componentLifecycle.activateControllerServices(requestUri, groupId, enabledServices, ControllerServiceState.DISABLED, disableServicesPause, InvalidComponentAction.SKIP);
if (asyncRequest.isCancelled()) {
return;

View File

@ -965,7 +965,7 @@ public class ParameterContextResource extends ApplicationResource {
logger.info("Stopping {} Processors in order to update Parameter Context", processors.size());
final CancellableTimedPause stopComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
asyncRequest.setCancelCallback(stopComponentsPause::cancel);
componentLifecycle.scheduleComponents(uri, "root", processors, ScheduledState.STOPPED, stopComponentsPause, InvalidComponentAction.WAIT);
componentLifecycle.scheduleComponents(uri, "root", processors, ScheduledState.STOPPED, stopComponentsPause, InvalidComponentAction.SKIP);
}
private void restartProcessors(final Set<AffectedComponentEntity> processors, final AsynchronousWebRequest<?, ?> asyncRequest, final ComponentLifecycle componentLifecycle, final URI uri)

View File

@ -337,20 +337,34 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle
private boolean isProcessorActionComplete(final ProcessorsRunStatusDetailsEntity runStatusDetailsEntity, final Map<String, AffectedComponentEntity> affectedComponents,
final ScheduledState desiredState, final InvalidComponentAction invalidComponentAction) throws LifecycleManagementException {
final String desiredStateName = desiredState.name();
updateAffectedProcessors(runStatusDetailsEntity.getRunStatusDetails(), affectedComponents);
boolean allReachedDesiredState = true;
for (final ProcessorRunStatusDetailsEntity entity : runStatusDetailsEntity.getRunStatusDetails()) {
final ProcessorRunStatusDetailsDTO runStatusDetailsDto = entity.getRunStatusDetails();
if (!affectedComponents.containsKey(runStatusDetailsDto.getId())) {
continue;
}
if (ProcessorRunStatusDetailsDTO.INVALID.equals(runStatusDetailsDto.getRunStatus())) {
final boolean desiredStateReached = isDesiredProcessorStateReached(runStatusDetailsDto, desiredState);
logger.debug("Processor[id={}, name={}] now has a state of {} with {} Active Threads, Validation Errors: {}; desired state = {}; invalid component action: {}; desired state reached = {}",
runStatusDetailsDto.getId(), runStatusDetailsDto.getName(), runStatusDetailsDto.getRunStatus(), runStatusDetailsDto.getActiveThreadCount(), runStatusDetailsDto.getValidationErrors(),
desiredState, invalidComponentAction, desiredStateReached);
if (desiredStateReached) {
continue;
}
// If the desired state is stopped and there are active threads, return false. We don't consider the validation status in this case.
if (desiredState == ScheduledState.STOPPED && runStatusDetailsDto.getActiveThreadCount() != 0) {
return false;
}
if (ProcessorRunStatusDetailsDTO.INVALID.equalsIgnoreCase(runStatusDetailsDto.getRunStatus())) {
switch (invalidComponentAction) {
case WAIT:
return false;
break;
case SKIP:
continue;
case FAIL:
@ -359,22 +373,33 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle
}
}
final String runStatus = runStatusDetailsDto.getRunStatus();
final boolean stateMatches = desiredStateName.equalsIgnoreCase(runStatus);
if (!stateMatches) {
return false;
}
allReachedDesiredState = false;
}
if (desiredState == ScheduledState.STOPPED && runStatusDetailsDto.getActiveThreadCount() != 0) {
return false;
}
if (allReachedDesiredState) {
logger.debug("All {} Processors of interest now have the desired state of {}", runStatusDetailsEntity.getRunStatusDetails().size(), desiredState);
return true;
}
return false;
}
private boolean isDesiredProcessorStateReached(final ProcessorRunStatusDetailsDTO runStatusDetailsDto, final ScheduledState desiredState) {
final String runStatus = runStatusDetailsDto.getRunStatus();
final boolean stateMatches = desiredState.name().equalsIgnoreCase(runStatus);
if (!stateMatches) {
return false;
}
if (desiredState == ScheduledState.STOPPED && runStatusDetailsDto.getActiveThreadCount() != 0) {
return false;
}
return true;
}
@Override
public Set<AffectedComponentEntity> activateControllerServices(final URI originalUri, final String groupId, final Set<AffectedComponentEntity> affectedServices,
final ControllerServiceState desiredState, final Pause pause, final InvalidComponentAction invalidComponentAction) throws LifecycleManagementException {
@ -582,7 +607,7 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle
}
// The desired state for this component has not yet been reached. Check how we should handle this based on the validation status.
if (ControllerServiceDTO.INVALID.equals(validationStatus)) {
if (ControllerServiceDTO.INVALID.equalsIgnoreCase(validationStatus)) {
switch (invalidComponentAction) {
case WAIT:
break;

View File

@ -27,7 +27,7 @@ import org.apache.nifi.web.api.dto.AffectedComponentDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.DtoFactory;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
import org.apache.nifi.web.api.dto.ProcessorRunStatusDetailsDTO;
import org.apache.nifi.web.api.entity.AffectedComponentEntity;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
@ -217,11 +217,10 @@ public class LocalComponentLifecycle implements ComponentLifecycle {
});
}
private boolean isProcessorActionComplete(final Set<ProcessorEntity> processorEntities, final Map<String, AffectedComponentEntity> affectedComponents, final ScheduledState desiredState,
final InvalidComponentAction invalidComponentAction) throws LifecycleManagementException {
final String desiredStateName = desiredState.name();
updateAffectedProcessors(processorEntities, affectedComponents);
for (final ProcessorEntity entity : processorEntities) {
@ -229,13 +228,27 @@ public class LocalComponentLifecycle implements ComponentLifecycle {
continue;
}
final ProcessorStatusDTO status = entity.getStatus();
final boolean desiredStateReached = isDesiredProcessorStateReached(entity, desiredState);
logger.debug("Processor[id={}, name={}] now has a state of {} with {} Active Threads, Validation Errors: {}; desired state = {}; invalid component action: {}; desired state reached = {}",
entity.getId(), entity.getComponent().getName(), entity.getStatus().getRunStatus(), entity.getStatus().getAggregateSnapshot().getActiveThreadCount(),
entity.getComponent().getValidationErrors(), desiredState, invalidComponentAction, desiredStateReached);
if (ProcessorDTO.INVALID.equals(entity.getComponent().getValidationStatus())) {
if (desiredStateReached) {
continue;
}
// If the desired state is stopped and there are active threads, return false. We don't consider the validation status in this case.
if (desiredState == ScheduledState.STOPPED && entity.getStatus().getAggregateSnapshot().getActiveThreadCount() != 0) {
return false;
}
if (ProcessorRunStatusDetailsDTO.INVALID.equalsIgnoreCase(entity.getComponent().getValidationStatus())) {
switch (invalidComponentAction) {
case WAIT:
return false;
break;
case SKIP:
logger.debug("Processor[id={}, name={}] is invalid. Skipping over this processor when looking for Desired State of {} because Invalid Component Action = SKIP",
entity.getId(), entity.getComponent().getName(), desiredState);
continue;
case FAIL:
final String action = desiredState == ScheduledState.RUNNING ? "start" : "stop";
@ -243,20 +256,27 @@ public class LocalComponentLifecycle implements ComponentLifecycle {
}
}
final String runStatus = status.getAggregateSnapshot().getRunStatus();
final boolean stateMatches = desiredStateName.equalsIgnoreCase(runStatus);
if (!stateMatches) {
return false;
}
if (desiredState == ScheduledState.STOPPED && status.getAggregateSnapshot().getActiveThreadCount() != 0) {
return false;
}
return false;
}
return true;
}
private boolean isDesiredProcessorStateReached(final ProcessorEntity processorEntity, final ScheduledState desiredState) {
final String runStatus = processorEntity.getStatus().getRunStatus();
final boolean stateMatches = desiredState.name().equalsIgnoreCase(runStatus);
if (!stateMatches) {
return false;
}
final Integer activeThreadCount = processorEntity.getStatus().getAggregateSnapshot().getActiveThreadCount();
if (desiredState == ScheduledState.STOPPED && activeThreadCount != 0) {
return false;
}
return true;
}
private void enableControllerServices(final String processGroupId, final Map<String, Revision> serviceRevisions, final Map<String, AffectedComponentEntity> affectedServices, final Pause pause,
final InvalidComponentAction invalidComponentAction) throws LifecycleManagementException {
@ -402,7 +422,7 @@ public class LocalComponentLifecycle implements ComponentLifecycle {
}
// The desired state for this component has not yet been reached. Check how we should handle this based on the validation status.
if (ControllerServiceDTO.INVALID.equals(validationStatus)) {
if (ControllerServiceDTO.INVALID.equalsIgnoreCase(validationStatus)) {
switch (invalidComponentAction) {
case WAIT:
break;

View File

@ -149,7 +149,7 @@ public class StatelessProcessScheduler implements ProcessScheduler {
logger.info("Stopping {}", procNode);
final ProcessContext processContext = processContextFactory.createProcessContext(procNode);
final LifecycleState lifecycleState = new LifecycleState();
lifecycleState.setScheduled(true);
lifecycleState.setScheduled(false);
return procNode.stop(this, this.componentLifeCycleThreadPool, processContext, schedulingAgent, lifecycleState);
}

View File

@ -442,6 +442,33 @@ public class ParameterContextIT extends NiFiSystemIT {
getClientUtil().waitForParameterContextRequestToComplete(createdContextEntity.getId(), paramUpdateRequestEntity.getRequest().getRequestId());
}
@Test
public void testParamChangeWhileReferencingProcessorStartingButInvalid() throws NiFiClientException, IOException, InterruptedException {
final ParameterContextEntity contextEntity = createParameterContext("clone", "true");
// Set the Parameter Context on the root Process Group
setParameterContext("root", contextEntity);
// Create simple dataflow: GenerateFlowFile -> SplitByLine -> <auto-terminate>
// Set SplitByLine to use a parameter for the "Use Clone" property such that it's valid.
ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");
ProcessorEntity splitByLine = getClientUtil().createProcessor("SplitByLine");
getClientUtil().updateProcessorProperties(splitByLine, Collections.singletonMap("Use Clone", "#{clone}"));
getClientUtil().setAutoTerminatedRelationships(splitByLine, Collections.singleton("success"));
getClientUtil().createConnection(generate, splitByLine, "success");
getNifiClient().getProcessorClient().startProcessor(splitByLine);
// Change parameter to an invalid value. This will result in the processor being stopped, becoming invalid, and then being transitioned to a 'starting' state while invalid.
final ParameterContextUpdateRequestEntity updateToInvalidRequestEntity = updateParameterContext(contextEntity, "clone", "invalid");
getClientUtil().waitForParameterContextRequestToComplete(contextEntity.getId(), updateToInvalidRequestEntity.getRequest().getRequestId());
// Change back to a valid value and wait for the update to complete
final ParameterContextUpdateRequestEntity updateToValidRequestEntity = updateParameterContext(contextEntity, "clone", "true");
getClientUtil().waitForParameterContextRequestToComplete(contextEntity.getId(), updateToValidRequestEntity.getRequest().getRequestId());
}
@Test
public void testProcessorRestartedWhenParameterChanged() throws NiFiClientException, IOException, InterruptedException {
testProcessorRestartedWhenParameterChanged("#{name}");