mirror of https://github.com/apache/nifi.git
NIFI-5944: When components are started on NiFi startup, if they are invalid, don't fail immediately and give up. Instead, keep attempting to start the component when it becomes valid.
This closes #3259
This commit is contained in:
parent
3e52ae952d
commit
706cf7dcff
|
@ -373,9 +373,8 @@ public abstract class AbstractComponentNode implements ComponentNode {
|
|||
}
|
||||
|
||||
@Override
|
||||
public final void performValidation() {
|
||||
boolean replaced = false;
|
||||
do {
|
||||
public final ValidationStatus performValidation() {
|
||||
while (true) {
|
||||
final ValidationState validationState = getValidationState();
|
||||
|
||||
final ValidationContext validationContext = getValidationContext();
|
||||
|
@ -391,8 +390,11 @@ public abstract class AbstractComponentNode implements ComponentNode {
|
|||
|
||||
final ValidationStatus status = results.isEmpty() ? ValidationStatus.VALID : ValidationStatus.INVALID;
|
||||
final ValidationState updatedState = new ValidationState(status, results);
|
||||
replaced = replaceValidationState(validationState, updatedState);
|
||||
} while (!replaced);
|
||||
final boolean replaced = replaceValidationState(validationState, updatedState);
|
||||
if (replaced) {
|
||||
return status;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected Collection<ValidationResult> computeValidationErrors(final ValidationContext validationContext) {
|
||||
|
|
|
@ -179,7 +179,7 @@ public interface ComponentNode extends ComponentAuthorizable {
|
|||
/**
|
||||
* Asynchronously begins the validation process
|
||||
*/
|
||||
public abstract void performValidation();
|
||||
public abstract ValidationStatus performValidation();
|
||||
|
||||
/**
|
||||
* Returns a {@link List} of all {@link PropertyDescriptor}s that this
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package org.apache.nifi.controller;
|
||||
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||
import org.apache.nifi.components.validation.ValidationStatus;
|
||||
import org.apache.nifi.components.validation.ValidationTrigger;
|
||||
import org.apache.nifi.connectable.Connectable;
|
||||
import org.apache.nifi.controller.scheduling.LifecycleState;
|
||||
|
@ -144,7 +145,13 @@ public abstract class ProcessorNode extends AbstractComponentNode implements Con
|
|||
public ScheduledState getScheduledState() {
|
||||
ScheduledState sc = this.scheduledState.get();
|
||||
if (sc == ScheduledState.STARTING) {
|
||||
return ScheduledState.RUNNING;
|
||||
final ValidationStatus validationStatus = getValidationStatus();
|
||||
|
||||
if (validationStatus == ValidationStatus.INVALID) {
|
||||
return ScheduledState.STOPPED;
|
||||
} else {
|
||||
return ScheduledState.RUNNING;
|
||||
}
|
||||
} else if (sc == ScheduledState.STOPPING) {
|
||||
return ScheduledState.STOPPED;
|
||||
}
|
||||
|
@ -240,4 +247,12 @@ public abstract class ProcessorNode extends AbstractComponentNode implements Con
|
|||
* will result in the WARN message if processor can not be enabled.
|
||||
*/
|
||||
public abstract void disable();
|
||||
|
||||
/**
|
||||
* Returns the Scheduled State that is desired for this Processor. This may vary from the current state if the Processor is not
|
||||
* currently valid, is in the process of stopping but should then transition to Running, etc.
|
||||
*
|
||||
* @return the desired state for this Processor
|
||||
*/
|
||||
public abstract ScheduledState getDesiredState();
|
||||
}
|
||||
|
|
|
@ -912,7 +912,6 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
|
|||
|
||||
try {
|
||||
if (connectable instanceof ProcessorNode) {
|
||||
((ProcessorNode) connectable).getValidationStatus(5, TimeUnit.SECONDS);
|
||||
connectable.getProcessGroup().startProcessor((ProcessorNode) connectable, true);
|
||||
} else {
|
||||
startConnectable(connectable);
|
||||
|
@ -1242,7 +1241,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
|
|||
return ScheduledState.RUNNING;
|
||||
}
|
||||
|
||||
return procNode.getScheduledState();
|
||||
return procNode.getDesiredState();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1699,7 +1698,6 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
|
|||
throw new IllegalStateException("Cannot start reporting task " + reportingTaskNode.getIdentifier() + " because the controller is terminated");
|
||||
}
|
||||
|
||||
reportingTaskNode.performValidation(); // ensure that the reporting task has completed its validation before attempting to start it
|
||||
reportingTaskNode.verifyCanStart();
|
||||
reportingTaskNode.reloadAdditionalResourcesIfNecessary();
|
||||
processScheduler.schedule(reportingTaskNode);
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.nifi.components.ConfigurableComponent;
|
|||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.components.validation.ValidationState;
|
||||
import org.apache.nifi.components.validation.ValidationStatus;
|
||||
import org.apache.nifi.components.validation.ValidationTrigger;
|
||||
import org.apache.nifi.connectable.Connectable;
|
||||
import org.apache.nifi.connectable.ConnectableType;
|
||||
|
@ -139,7 +140,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
|||
private final ProcessScheduler processScheduler;
|
||||
private long runNanos = 0L;
|
||||
private volatile long yieldNanos;
|
||||
private volatile ScheduledState desiredState;
|
||||
private volatile ScheduledState desiredState = ScheduledState.STOPPED;
|
||||
private volatile LogLevel bulletinLevel = LogLevel.WARN;
|
||||
|
||||
private SchedulingStrategy schedulingStrategy; // guarded by read/write lock
|
||||
|
@ -1343,13 +1344,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
|||
public void start(final ScheduledExecutorService taskScheduler, final long administrativeYieldMillis, final long timeoutMillis, final ProcessContext processContext,
|
||||
final SchedulingAgentCallback schedulingAgentCallback, final boolean failIfStopping) {
|
||||
|
||||
switch (getValidationStatus()) {
|
||||
case INVALID:
|
||||
throw new IllegalStateException("Processor " + this.getName() + " is not in a valid state due to " + this.getValidationErrors());
|
||||
case VALIDATING:
|
||||
throw new IllegalStateException("Processor " + this.getName() + " cannot be started because its validation is still being performed");
|
||||
}
|
||||
|
||||
final Processor processor = processorRef.get().getProcessor();
|
||||
final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
|
||||
|
||||
|
@ -1487,6 +1481,25 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
|||
|
||||
// Create a task to invoke the @OnScheduled annotation of the processor
|
||||
final Callable<Void> startupTask = () -> {
|
||||
final ScheduledState currentScheduleState = scheduledState.get();
|
||||
if (currentScheduleState == ScheduledState.STOPPING || currentScheduleState == ScheduledState.STOPPED) {
|
||||
LOG.debug("{} is stopped. Will not call @OnScheduled lifecycle methods or begin trigger onTrigger() method", StandardProcessorNode.this);
|
||||
schedulingAgentCallback.onTaskComplete();
|
||||
return null;
|
||||
}
|
||||
|
||||
final ValidationStatus validationStatus = getValidationStatus();
|
||||
if (validationStatus != ValidationStatus.VALID) {
|
||||
LOG.debug("Cannot start {} because Processor is currently not valid; will try again after 5 seconds", StandardProcessorNode.this);
|
||||
|
||||
// re-initiate the entire process
|
||||
final Runnable initiateStartTask = () -> initiateStart(taskScheduler, administrativeYieldMillis, timeoutMilis, processContext, schedulingAgentCallback);
|
||||
taskScheduler.schedule(initiateStartTask, 5, TimeUnit.SECONDS);
|
||||
|
||||
schedulingAgentCallback.onTaskComplete();
|
||||
return null;
|
||||
}
|
||||
|
||||
LOG.debug("Invoking @OnScheduled methods of {}", processor);
|
||||
|
||||
// Now that the task has been scheduled, set the timeout
|
||||
|
@ -1696,6 +1709,10 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
|||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScheduledState getDesiredState() {
|
||||
return desiredState;
|
||||
}
|
||||
|
||||
private void monitorAsyncTask(final Future<?> taskFuture, final Future<?> monitoringFuture, final long completionTimestamp) {
|
||||
if (taskFuture.isDone()) {
|
||||
|
|
|
@ -16,16 +16,11 @@
|
|||
*/
|
||||
package org.apache.nifi.controller.reporting;
|
||||
|
||||
import java.net.URL;
|
||||
import java.util.Collection;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.nifi.annotation.configuration.DefaultSchedule;
|
||||
import org.apache.nifi.bundle.BundleCoordinate;
|
||||
import org.apache.nifi.components.ConfigurableComponent;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.components.validation.ValidationStatus;
|
||||
import org.apache.nifi.components.validation.ValidationTrigger;
|
||||
import org.apache.nifi.controller.AbstractComponentNode;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
|
@ -51,6 +46,12 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.core.annotation.AnnotationUtils;
|
||||
|
||||
import java.net.URL;
|
||||
import java.util.Collection;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
public abstract class AbstractReportingTaskNode extends AbstractComponentNode implements ReportingTaskNode {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(AbstractReportingTaskNode.class);
|
||||
|
@ -176,7 +177,7 @@ public abstract class AbstractReportingTaskNode extends AbstractComponentNode im
|
|||
|
||||
@Override
|
||||
public boolean isValidationNecessary() {
|
||||
return !processScheduler.isScheduled(this);
|
||||
return !processScheduler.isScheduled(this) || getValidationStatus() != ValidationStatus.VALID;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -21,6 +21,7 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
|
|||
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
|
||||
import org.apache.nifi.components.state.StateManager;
|
||||
import org.apache.nifi.components.state.StateManagerProvider;
|
||||
import org.apache.nifi.components.validation.ValidationStatus;
|
||||
import org.apache.nifi.connectable.Connectable;
|
||||
import org.apache.nifi.connectable.Funnel;
|
||||
import org.apache.nifi.connectable.Port;
|
||||
|
@ -186,13 +187,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
|
|||
throw new IllegalStateException("Reporting Task " + taskNode.getName() + " cannot be started because it has " + activeThreadCount + " threads still running");
|
||||
}
|
||||
|
||||
switch (taskNode.getValidationStatus()) {
|
||||
case INVALID:
|
||||
throw new IllegalStateException("Reporting Task " + taskNode.getName() + " is not in a valid state for the following reasons: " + taskNode.getValidationErrors());
|
||||
case VALIDATING:
|
||||
throw new IllegalStateException("Reporting Task " + taskNode.getName() + " cannot be scheduled because it is in the process of validating its configuration");
|
||||
}
|
||||
|
||||
final SchedulingAgent agent = getSchedulingAgent(taskNode.getSchedulingStrategy());
|
||||
lifecycleState.setScheduled(true);
|
||||
|
||||
|
@ -216,6 +210,13 @@ public final class StandardProcessScheduler implements ProcessScheduler {
|
|||
return;
|
||||
}
|
||||
|
||||
final ValidationStatus validationStatus = taskNode.getValidationStatus();
|
||||
if (validationStatus != ValidationStatus.VALID) {
|
||||
LOG.debug("Cannot schedule {} to run because it is currently invalid. Will try again in 5 seconds", taskNode);
|
||||
componentLifeCycleThreadPool.schedule(this, 5, TimeUnit.SECONDS);
|
||||
return;
|
||||
}
|
||||
|
||||
try (final NarCloseable x = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), reportingTask.getClass(), reportingTask.getIdentifier())) {
|
||||
ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, reportingTask, taskNode.getConfigurationContext());
|
||||
}
|
||||
|
@ -231,8 +232,11 @@ public final class StandardProcessScheduler implements ProcessScheduler {
|
|||
+ "ReportingTask and will attempt to schedule it again after {}",
|
||||
new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e);
|
||||
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, reportingTask, taskNode.getConfigurationContext());
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, reportingTask, taskNode.getConfigurationContext());
|
||||
|
||||
try (final NarCloseable x = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), reportingTask.getClass(), reportingTask.getIdentifier())) {
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, reportingTask, taskNode.getConfigurationContext());
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, reportingTask, taskNode.getConfigurationContext());
|
||||
}
|
||||
|
||||
componentLifeCycleThreadPool.schedule(this, administrativeYieldMillis, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
|
|
@ -30,7 +30,7 @@ public interface ScheduledStateLookup {
|
|||
public static final ScheduledStateLookup IDENTITY_LOOKUP = new ScheduledStateLookup() {
|
||||
@Override
|
||||
public ScheduledState getScheduledState(final ProcessorNode procNode) {
|
||||
return procNode.getScheduledState();
|
||||
return procNode.getDesiredState();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -16,25 +16,6 @@
|
|||
*/
|
||||
package org.apache.nifi.controller.service;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.net.URL;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.annotation.behavior.Restricted;
|
||||
import org.apache.nifi.annotation.documentation.DeprecationNotice;
|
||||
|
@ -47,8 +28,7 @@ import org.apache.nifi.authorization.resource.ResourceType;
|
|||
import org.apache.nifi.bundle.BundleCoordinate;
|
||||
import org.apache.nifi.components.ConfigurableComponent;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.components.validation.ValidationState;
|
||||
import org.apache.nifi.components.validation.ValidationStatus;
|
||||
import org.apache.nifi.components.validation.ValidationTrigger;
|
||||
import org.apache.nifi.controller.AbstractComponentNode;
|
||||
import org.apache.nifi.controller.ComponentNode;
|
||||
|
@ -71,6 +51,24 @@ import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.net.URL;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
public class StandardControllerServiceNode extends AbstractComponentNode implements ControllerServiceNode {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(StandardControllerServiceNode.class);
|
||||
|
@ -319,14 +317,6 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
|
|||
if (getState() != ControllerServiceState.DISABLED) {
|
||||
throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because it is not disabled");
|
||||
}
|
||||
|
||||
final ValidationState validationState = getValidationState();
|
||||
switch (validationState.getStatus()) {
|
||||
case INVALID:
|
||||
throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because it is not valid: " + validationState.getValidationErrors());
|
||||
case VALIDATING:
|
||||
throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because its validation has not yet completed");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -334,11 +324,6 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
|
|||
if (getState() != ControllerServiceState.DISABLED) {
|
||||
throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because it is not disabled");
|
||||
}
|
||||
|
||||
final Collection<ValidationResult> validationErrors = getValidationErrors(ignoredReferences);
|
||||
if (ignoredReferences != null && !validationErrors.isEmpty()) {
|
||||
throw new IllegalStateException("Controller Service with ID " + getIdentifier() + " cannot be enabled because it is not currently valid: " + validationErrors);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -389,8 +374,11 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
|
|||
case DISABLED:
|
||||
case DISABLING:
|
||||
return true;
|
||||
case ENABLED:
|
||||
case ENABLING:
|
||||
// If enabling and currently not valid, then we must trigger validation to occur. This allows the #enable method
|
||||
// to continue running in the background and complete enabling when the service becomes valid.
|
||||
return getValidationStatus() != ValidationStatus.VALID;
|
||||
case ENABLED:
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
|
@ -398,7 +386,7 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
|
|||
|
||||
/**
|
||||
* Will atomically enable this service by invoking its @OnEnabled operation.
|
||||
* It uses CAS operation on {@link #stateRef} to transition this service
|
||||
* It uses CAS operation on {@link #stateTransition} to transition this service
|
||||
* from DISABLED to ENABLING state. If such transition succeeds the service
|
||||
* will be marked as 'active' (see {@link ControllerServiceNode#isActive()}).
|
||||
* If such transition doesn't succeed then no enabling logic will be
|
||||
|
@ -429,6 +417,20 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
|
|||
scheduler.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (!isActive()) {
|
||||
LOG.debug("{} is no longer active so will not attempt to enable it", StandardControllerServiceNode.this);
|
||||
stateTransition.disable();
|
||||
return;
|
||||
}
|
||||
|
||||
final ValidationStatus validationStatus = getValidationStatus();
|
||||
if (validationStatus != ValidationStatus.VALID) {
|
||||
LOG.debug("Cannot enable {} because it is not currently valid. Will try again in 5 seconds", StandardControllerServiceNode.this);
|
||||
scheduler.schedule(this, 5, TimeUnit.SECONDS);
|
||||
future.completeExceptionally(new RuntimeException(this + " cannot be enabled because it is not currently valid. Will try again in 5 seconds."));
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getExtensionManager(), getControllerServiceImplementation().getClass(), getIdentifier())) {
|
||||
ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, getControllerServiceImplementation(), configContext);
|
||||
|
@ -446,7 +448,7 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
|
|||
invokeDisable(configContext);
|
||||
stateTransition.disable();
|
||||
} else {
|
||||
LOG.debug("Successfully enabled {}", service);
|
||||
LOG.info("Successfully enabled {}", service);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
future.completeExceptionally(e);
|
||||
|
@ -478,7 +480,7 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
|
|||
|
||||
/**
|
||||
* Will atomically disable this service by invoking its @OnDisabled operation.
|
||||
* It uses CAS operation on {@link #stateRef} to transition this service
|
||||
* It uses CAS operation on {@link #stateTransition} to transition this service
|
||||
* from ENABLED to DISABLING state. If such transition succeeds the service
|
||||
* will be de-activated (see {@link ControllerServiceNode#isActive()}).
|
||||
* If such transition doesn't succeed (the service is still in ENABLING state)
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.nifi.bundle.Bundle;
|
|||
import org.apache.nifi.bundle.BundleCoordinate;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.state.StateManagerProvider;
|
||||
import org.apache.nifi.components.validation.ValidationStatus;
|
||||
import org.apache.nifi.components.validation.ValidationTrigger;
|
||||
import org.apache.nifi.controller.AbstractControllerService;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
|
@ -92,6 +93,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
@ -342,6 +344,8 @@ public class TestStandardProcessScheduler {
|
|||
final ControllerServiceNode serviceNode = flowManager.createControllerService(SimpleTestService.class.getName(),
|
||||
"1", systemBundle.getBundleDetails().getCoordinate(), null, false, true);
|
||||
|
||||
serviceNode.performValidation();
|
||||
|
||||
assertFalse(serviceNode.isActive());
|
||||
final SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation();
|
||||
final ExecutorService executor = Executors.newCachedThreadPool();
|
||||
|
@ -361,10 +365,10 @@ public class TestStandardProcessScheduler {
|
|||
}
|
||||
});
|
||||
}
|
||||
// need to sleep a while since we are emulating async invocations on
|
||||
// method that is also internally async
|
||||
Thread.sleep(500);
|
||||
|
||||
executor.shutdown();
|
||||
executor.awaitTermination(10, TimeUnit.SECONDS);
|
||||
|
||||
assertFalse(asyncFailed.get());
|
||||
assertEquals(1, ts.enableInvocationCount());
|
||||
}
|
||||
|
@ -399,10 +403,9 @@ public class TestStandardProcessScheduler {
|
|||
}
|
||||
});
|
||||
}
|
||||
// need to sleep a while since we are emulating async invocations on
|
||||
// method that is also internally async
|
||||
Thread.sleep(500);
|
||||
|
||||
executor.shutdown();
|
||||
executor.awaitTermination(10, TimeUnit.SECONDS);
|
||||
assertFalse(asyncFailed.get());
|
||||
assertEquals(0, ts.disableInvocationCount());
|
||||
}
|
||||
|
@ -419,8 +422,10 @@ public class TestStandardProcessScheduler {
|
|||
final ControllerServiceNode serviceNode = flowManager.createControllerService(SimpleTestService.class.getName(),
|
||||
"1", systemBundle.getBundleDetails().getCoordinate(), null, false, true);
|
||||
|
||||
assertSame(ValidationStatus.VALID, serviceNode.performValidation());
|
||||
|
||||
final SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation();
|
||||
scheduler.enableControllerService(serviceNode);
|
||||
scheduler.enableControllerService(serviceNode).get();
|
||||
assertTrue(serviceNode.isActive());
|
||||
final ExecutorService executor = Executors.newCachedThreadPool();
|
||||
|
||||
|
@ -441,8 +446,8 @@ public class TestStandardProcessScheduler {
|
|||
}
|
||||
// need to sleep a while since we are emulating async invocations on
|
||||
// method that is also internally async
|
||||
Thread.sleep(500);
|
||||
executor.shutdown();
|
||||
executor.awaitTermination(10, TimeUnit.SECONDS); // change to seconds.
|
||||
assertFalse(asyncFailed.get());
|
||||
assertEquals(1, ts.disableInvocationCount());
|
||||
}
|
||||
|
@ -453,9 +458,17 @@ public class TestStandardProcessScheduler {
|
|||
|
||||
final ControllerServiceNode serviceNode = flowManager.createControllerService(FailingService.class.getName(),
|
||||
"1", systemBundle.getBundleDetails().getCoordinate(), null, false, true);
|
||||
scheduler.enableControllerService(serviceNode);
|
||||
Thread.sleep(1000);
|
||||
serviceNode.performValidation();
|
||||
|
||||
final Future<?> future = scheduler.enableControllerService(serviceNode);
|
||||
try {
|
||||
future.get();
|
||||
} catch (final Exception e) {
|
||||
// Expected behavior because the FailingService throws Exception when attempting to enable
|
||||
}
|
||||
|
||||
scheduler.shutdown();
|
||||
|
||||
/*
|
||||
* Because it was never disabled it will remain active since its
|
||||
* enabling is being retried. This may actually be a bug in the
|
||||
|
@ -528,14 +541,20 @@ public class TestStandardProcessScheduler {
|
|||
|
||||
final ControllerServiceNode serviceNode = flowManager.createControllerService(LongEnablingService.class.getName(),
|
||||
"1", systemBundle.getBundleDetails().getCoordinate(), null, false, true);
|
||||
|
||||
final LongEnablingService ts = (LongEnablingService) serviceNode.getControllerServiceImplementation();
|
||||
ts.setLimit(Long.MAX_VALUE);
|
||||
|
||||
serviceNode.performValidation();
|
||||
scheduler.enableControllerService(serviceNode);
|
||||
Thread.sleep(100);
|
||||
|
||||
assertTrue(serviceNode.isActive());
|
||||
final long maxTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
|
||||
while (ts.enableInvocationCount() != 1 && System.nanoTime() <= maxTime) {
|
||||
Thread.sleep(1L);
|
||||
}
|
||||
assertEquals(1, ts.enableInvocationCount());
|
||||
|
||||
Thread.sleep(1000);
|
||||
scheduler.disableControllerService(serviceNode);
|
||||
assertFalse(serviceNode.isActive());
|
||||
assertEquals(ControllerServiceState.DISABLING, serviceNode.getState());
|
||||
|
|
|
@ -21,6 +21,7 @@ import org.apache.nifi.bundle.Bundle;
|
|||
import org.apache.nifi.bundle.BundleCoordinate;
|
||||
import org.apache.nifi.components.state.StateManager;
|
||||
import org.apache.nifi.components.state.StateManagerProvider;
|
||||
import org.apache.nifi.components.validation.ValidationStatus;
|
||||
import org.apache.nifi.components.validation.ValidationTrigger;
|
||||
import org.apache.nifi.controller.ExtensionBuilder;
|
||||
import org.apache.nifi.controller.FlowController;
|
||||
|
@ -73,6 +74,7 @@ import java.util.concurrent.TimeUnit;
|
|||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class TestStandardControllerServiceProvider {
|
||||
|
@ -197,8 +199,8 @@ public class TestStandardControllerServiceProvider {
|
|||
provider.disableControllerService(serviceNode);
|
||||
}
|
||||
|
||||
@Test(timeout = 1000000)
|
||||
public void testEnableDisableWithReference() {
|
||||
@Test(timeout = 10000)
|
||||
public void testEnableDisableWithReference() throws InterruptedException {
|
||||
final ProcessGroup group = new MockProcessGroup(controller);
|
||||
final FlowController controller = Mockito.mock(FlowController.class);
|
||||
final FlowManager flowManager = Mockito.mock(FlowManager.class);
|
||||
|
@ -221,17 +223,24 @@ public class TestStandardControllerServiceProvider {
|
|||
|
||||
try {
|
||||
provider.enableControllerService(serviceNodeA);
|
||||
Assert.fail("Was able to enable Service A but Service B is disabled.");
|
||||
} catch (final IllegalStateException expected) {
|
||||
}
|
||||
|
||||
assertSame(ControllerServiceState.ENABLING, serviceNodeA.getState());
|
||||
|
||||
serviceNodeB.performValidation();
|
||||
serviceNodeB.getValidationStatus(5, TimeUnit.SECONDS);
|
||||
assertSame(ValidationStatus.VALID, serviceNodeB.getValidationStatus(5, TimeUnit.SECONDS));
|
||||
provider.enableControllerService(serviceNodeB);
|
||||
|
||||
serviceNodeA.performValidation();
|
||||
serviceNodeA.getValidationStatus(5, TimeUnit.SECONDS);
|
||||
provider.enableControllerService(serviceNodeA);
|
||||
assertSame(ValidationStatus.VALID, serviceNodeA.getValidationStatus(5, TimeUnit.SECONDS));
|
||||
|
||||
final long maxTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
|
||||
// Wait for Service A to become ENABLED. This will happen in a background thread after approximately 5 seconds, now that Service A is valid.
|
||||
while (serviceNodeA.getState() != ControllerServiceState.ENABLED && System.nanoTime() <= maxTime) {
|
||||
Thread.sleep(5L);
|
||||
}
|
||||
assertSame(ControllerServiceState.ENABLED, serviceNodeA.getState());
|
||||
|
||||
try {
|
||||
provider.disableControllerService(serviceNodeB);
|
||||
|
|
Loading…
Reference in New Issue