NIFI-5204: Ensure that verifyCanStop throws ISE if component is disabled

NIFI-5204: If processor joins cluster and inherits 'disabled' state but is still stopping, ensure that the state becomes disabled when the processor finishes stopping
This closes #2713
This commit is contained in:
Mark Payne 2018-05-16 16:06:00 -04:00 committed by Matt Gilman
parent f5108ea839
commit 2afbf96381
No known key found for this signature in database
GPG Key ID: DF61EC19432AEE37
4 changed files with 57 additions and 16 deletions

View File

@ -229,20 +229,12 @@ public abstract class ProcessorNode extends AbstractComponentNode implements Con
* that this processor can be started. This is idempotent operation and will
* result in the WARN message if processor can not be enabled.
*/
public void enable() {
if (!this.scheduledState.compareAndSet(ScheduledState.DISABLED, ScheduledState.STOPPED)) {
logger.warn("Processor cannot be enabled because it is not disabled");
}
}
public abstract void enable();
/**
* Will set the state of the processor to DISABLED which essentially implies
* that this processor can NOT be started. This is idempotent operation and
* will result in the WARN message if processor can not be enabled.
*/
public void disable() {
if (!this.scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.DISABLED)) {
logger.warn("Processor cannot be disabled because its state is set to " + this.scheduledState);
}
}
public abstract void disable();
}

View File

@ -1268,6 +1268,31 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
}
}
@Override
public void enable() {
desiredState = ScheduledState.STOPPED;
final boolean updated = scheduledState.compareAndSet(ScheduledState.DISABLED, ScheduledState.STOPPED);
if (updated) {
LOG.info("{} enabled so ScheduledState transitioned from DISABLED to STOPPED", this);
} else {
LOG.info("{} enabled but not currently DISABLED so set desired state to STOPPED; current state is {}", this, scheduledState.get());
}
}
@Override
public void disable() {
desiredState = ScheduledState.DISABLED;
final boolean updated = scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.DISABLED);
if (updated) {
LOG.info("{} disabled so ScheduledState transitioned from STOPPED to DISABLED", this);
} else {
LOG.info("{} disabled but not currently STOPPED so set desired state to DISABLED; current state is {}", this, scheduledState.get());
}
}
/**
* Will idempotently start the processor using the following sequence: <i>
* <ul>
@ -1453,11 +1478,12 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
deactivateThread();
}
if (scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) {
if (desiredState == ScheduledState.RUNNING && scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) {
LOG.debug("Successfully completed the @OnScheduled methods of {}; will now start triggering processor to run", processor);
schedulingAgentCallback.trigger(); // callback provided by StandardProcessScheduler to essentially initiate component's onTrigger() cycle
} else {
LOG.debug("Successfully invoked @OnScheduled methods of {} but scheduled state is no longer STARTING so will stop processor now", processor);
LOG.info("Successfully invoked @OnScheduled methods of {} but scheduled state is no longer STARTING so will stop processor now; current state = {}, desired state = {}",
processor, scheduledState.get(), desiredState);
// can only happen if stopProcessor was called before service was transitioned to RUNNING state
activateThread();
@ -1470,6 +1496,13 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
}
scheduledState.set(ScheduledState.STOPPED);
if (desiredState == ScheduledState.DISABLED) {
final boolean disabled = scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.DISABLED);
if (disabled) {
LOG.info("After stopping {}, determined that Desired State is DISABLED so disabled processor", processor);
}
}
}
} finally {
schedulingAgentCallback.onTaskComplete();
@ -1603,8 +1636,19 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
// the Processor is to be running. However, if the Processor is already in the process of stopping, we cannot immediately
// start running the Processor. As a result, we check here, since the Processor is stopped, and then immediately start the
// Processor if need be.
if (desiredState == ScheduledState.RUNNING) {
final ScheduledState desired = StandardProcessorNode.this.desiredState;
if (desired == ScheduledState.RUNNING) {
LOG.info("Finished stopping {} but desired state is now RUNNING so will start processor", this);
processScheduler.startProcessor(StandardProcessorNode.this, true);
} else if (desired == ScheduledState.DISABLED) {
final boolean updated = scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.DISABLED);
if (updated) {
LOG.info("Finished stopping {} but desired state is now DISABLED so disabled processor", this);
} else {
LOG.info("Finished stopping {} but desired state is now DISABLED. Scheduled State could not be transitioned from STOPPED to DISABLED, "
+ "though, so will allow the other thread to finish state transition. Current state is {}", this, scheduledState.get());
}
}
} else {
// Not all of the active threads have finished. Try again in 100 milliseconds.

View File

@ -2671,6 +2671,10 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
public void verifyCanStop(Connectable connectable) {
final ScheduledState state = connectable.getScheduledState();
if (state == ScheduledState.DISABLED) {
throw new IllegalStateException("Cannot stop component with id " + connectable + " because it is currently disabled.");
}
}
@Override

View File

@ -33,6 +33,8 @@ import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@ -41,8 +43,6 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
@ -200,6 +200,7 @@ public class DebugFlow extends AbstractProcessor {
.required(true)
.defaultValue("0 sec")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor ON_STOPPED_FAIL = new PropertyDescriptor.Builder()
.name("Fail When @OnStopped called")
@ -339,7 +340,7 @@ public class DebugFlow extends AbstractProcessor {
@OnStopped
public void onStopped(final ProcessContext context) throws InterruptedException {
sleep(context.getProperty(ON_STOPPED_SLEEP_TIME).asTimePeriod(TimeUnit.MILLISECONDS),
sleep(context.getProperty(ON_STOPPED_SLEEP_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS),
context.getProperty(IGNORE_INTERRUPTS).asBoolean());
fail(context.getProperty(ON_STOPPED_FAIL).asBoolean(), OnStopped.class);