NIFI-1164 Fixed race condition and refactored

Changed ControllerServiceNode by adding enable(..), disable(..) and isActive() operations. See javadocs for more details in both ControllerServiceNode and StandardControllerServiceNode

Refactored service enable/disable logic in StandardProcessScheduler and StandardControllerServiceNode . Below are some of the notes:
- No need for resetting class loader since its going to derive from the class loader of the service. In other words any classes that aren’t loaded and will be loaded within the scope of the already loaded service will be loaded by the class lower of that service
- No need to control 'scheduleState.isScheduled()’ since the logic has changed to use CAS operation on state update and the service state change is now atomic.
- Removed Thread.sleep(..) and while(true) loop in favor of rescheduling re-tries achieving better thread utilization since the thread that would normally block in Thread.sleep(..) is now reused.
- Added tests and validated that the race condition no longer happening

Added additional logic that allows the initiation of the service disabling while it is in ENABLING state. See javadoc of StandardProcessScheduler.enable/disable for more details.

NIFI-1164 polishing
This commit is contained in:
Oleg Zhurakousky 2015-12-15 13:21:00 -05:00
parent ebcefaac23
commit 909c0decd6
7 changed files with 430 additions and 179 deletions

View File

@ -17,9 +17,11 @@
package org.apache.nifi.controller.service;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.Heartbeater;
public interface ControllerServiceNode extends ConfiguredComponent {
@ -53,11 +55,31 @@ public interface ControllerServiceNode extends ConfiguredComponent {
*/
ControllerServiceState getState();
/**
* Updates the state of the Controller Service to the provided new state
* @param state the state to set the service to
/**
* Will enable this service. Enabling of the service typically means
* invoking it's operation that is annotated with @OnEnabled.
*
* @param scheduler
* implementation of {@link ScheduledExecutorService} used to
* initiate service enabling task as well as its re-tries
* @param administrativeYieldMillis
* the amount of milliseconds to wait for administrative yield
* @param heartbeater
* the instance of {@link Heartbeater}
*/
void setState(ControllerServiceState state);
void enable(ScheduledExecutorService scheduler, long administrativeYieldMillis, Heartbeater heartbeater);
/**
* Will disable this service. Disabling of the service typically means
* invoking it's operation that is annotated with @OnDisabled.
*
* @param scheduler
* implementation of {@link ScheduledExecutorService} used to
* initiate service disabling task
* @param heartbeater
* the instance of {@link Heartbeater}
*/
void disable(ScheduledExecutorService scheduler, Heartbeater heartbeater);
/**
* @return the ControllerServiceReference that describes which components are referencing this Controller Service
@ -111,4 +133,17 @@ public interface ControllerServiceNode extends ConfiguredComponent {
void verifyCanDelete();
void verifyCanUpdate();
/**
* Returns 'true' if this service is active. The service is considered to be
* active if and only if it's
* {@link #enable(ScheduledExecutorService, long, Heartbeater)} operation
* has been invoked and the service has been transitioned to ENABLING state.
* The service will also remain 'active' after its been transitioned to
* ENABLED state.
* <br>
* The service will be de-activated upon invocation of
* {@link #disable(ScheduledExecutorService, Heartbeater)}.
*/
boolean isActive();
}

View File

@ -19,23 +19,15 @@ package org.apache.nifi.controller.scheduling;
import static java.util.Objects.requireNonNull;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
@ -54,8 +46,6 @@ import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.annotation.OnConfigured;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.logging.ComponentLog;
@ -89,7 +79,9 @@ public final class StandardProcessScheduler implements ProcessScheduler {
private final ScheduledExecutorService frameworkTaskExecutor;
private final ConcurrentMap<SchedulingStrategy, SchedulingAgent> strategyAgentMap = new ConcurrentHashMap<>();
// thread pool for starting/stopping components
private final ExecutorService componentLifeCycleThreadPool = new ThreadPoolExecutor(25, 50, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(5000));
private final ScheduledExecutorService componentLifeCycleThreadPool = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
private final StringEncryptor encryptor;
public StandardProcessScheduler(final Heartbeater heartbeater, final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor) {
@ -625,145 +617,20 @@ public final class StandardProcessScheduler implements ProcessScheduler {
@Override
public void enableControllerService(final ControllerServiceNode service) {
service.setState(ControllerServiceState.ENABLING);
final ScheduleState scheduleState = getScheduleState(service);
final Runnable enableRunnable = new Runnable() {
@Override
public void run() {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
final long lastStopTime = scheduleState.getLastStopTime();
final ConfigurationContext configContext = new StandardConfigurationContext(service, controllerServiceProvider, null);
while (true) {
try {
synchronized (scheduleState) {
// if no longer enabled, then we're finished. This can happen, for example,
// if the @OnEnabled method throws an Exception and the user disables the service
// while we're administratively yielded.
//
// we also check if the schedule state's last stop time is equal to what it was before.
// if not, then means that the service has been disabled and enabled again, so we should just
// bail; another thread will be responsible for invoking the @OnEnabled methods.
if (!scheduleState.isScheduled() || scheduleState.getLastStopTime() != lastStopTime) {
return;
}
ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, service.getControllerServiceImplementation(), configContext);
heartbeater.heartbeat();
service.setState(ControllerServiceState.ENABLED);
return;
}
} catch (final Exception e) {
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service);
componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
LOG.error("Failed to invoke @OnEnabled method of {} due to {}", service.getControllerServiceImplementation(), cause.toString());
if (LOG.isDebugEnabled()) {
LOG.error("", cause);
}
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext);
Thread.sleep(administrativeYieldMillis);
continue;
}
}
} catch (final Throwable t) {
final Throwable cause = t instanceof InvocationTargetException ? t.getCause() : t;
final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service);
componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
LOG.error("Failed to invoke @OnEnabled method on {} due to {}", service.getControllerServiceImplementation(), cause.toString());
if (LOG.isDebugEnabled()) {
LOG.error("", cause);
}
}
}
};
scheduleState.setScheduled(true);
componentLifeCycleThreadPool.execute(enableRunnable);
service.enable(this.componentLifeCycleThreadPool, this.administrativeYieldMillis, this.heartbeater);
}
@Override
public void disableControllerService(final ControllerServiceNode service) {
disableControllerServices(Collections.singletonList(service));
service.disable(this.componentLifeCycleThreadPool, this.heartbeater);
}
@Override
public void disableControllerServices(final List<ControllerServiceNode> services) {
if (requireNonNull(services).isEmpty()) {
return;
}
final List<ControllerServiceNode> servicesToDisable = new ArrayList<>(services.size());
for (final ControllerServiceNode serviceToDisable : services) {
if (serviceToDisable.getState() == ControllerServiceState.DISABLED || serviceToDisable.getState() == ControllerServiceState.DISABLING) {
continue;
}
servicesToDisable.add(serviceToDisable);
}
if (servicesToDisable.isEmpty()) {
return;
}
// ensure that all controller services can be disabled.
for (final ControllerServiceNode serviceNode : servicesToDisable) {
final Set<ControllerServiceNode> ignoredReferences = new HashSet<>(services);
ignoredReferences.remove(serviceNode);
serviceNode.verifyCanDisable(ignoredReferences);
}
// mark services as disabling
for (final ControllerServiceNode serviceNode : servicesToDisable) {
serviceNode.setState(ControllerServiceState.DISABLING);
final ScheduleState scheduleState = getScheduleState(serviceNode);
synchronized (scheduleState) {
scheduleState.setScheduled(false);
if (!requireNonNull(services).isEmpty()) {
for (ControllerServiceNode controllerServiceNode : services) {
this.disableControllerService(controllerServiceNode);
}
}
final Queue<ControllerServiceNode> nodes = new LinkedList<>(servicesToDisable);
final Runnable disableRunnable = new Runnable() {
@Override
public void run() {
ControllerServiceNode service;
while ((service = nodes.poll()) != null) {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
final ConfigurationContext configContext = new StandardConfigurationContext(service, controllerServiceProvider, null);
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext);
} catch (final Exception e) {
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service);
componentLog.error("Failed to invoke @OnDisabled method due to {}", cause);
LOG.error("Failed to invoke @OnDisabled method of {} due to {}", service.getControllerServiceImplementation(), cause.toString());
if (LOG.isDebugEnabled()) {
LOG.error("", cause);
}
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext);
try {
Thread.sleep(administrativeYieldMillis);
} catch (final InterruptedException ie) {
}
} finally {
service.setState(ControllerServiceState.DISABLED);
heartbeater.heartbeat();
}
}
}
}
};
componentLifeCycleThreadPool.execute(disableRunnable);
}
}

View File

@ -16,28 +16,41 @@
*/
package org.apache.nifi.controller.service;
import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractConfiguredComponent;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.Heartbeater;
import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.annotation.OnConfigured;
import org.apache.nifi.controller.exception.ComponentLifeCycleException;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StandardControllerServiceNode extends AbstractConfiguredComponent implements ControllerServiceNode {
private static final Logger LOG = LoggerFactory.getLogger(StandardControllerServiceNode.class);
private final ControllerService proxedControllerService;
private final ControllerService implementation;
private final ControllerServiceProvider serviceProvider;
@ -51,12 +64,15 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
private final Set<ConfiguredComponent> referencingComponents = new HashSet<>();
private String comment;
private final AtomicBoolean active;
public StandardControllerServiceNode(final ControllerService proxiedControllerService, final ControllerService implementation, final String id,
final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) {
super(implementation, id, validationContextFactory, serviceProvider);
this.proxedControllerService = proxiedControllerService;
this.implementation = implementation;
this.serviceProvider = serviceProvider;
this.active = new AtomicBoolean();
}
@Override
@ -146,8 +162,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
@Override
public void verifyCanDisable(final Set<ControllerServiceNode> ignoreReferences) {
final ControllerServiceState state = getState();
if (state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING) {
if (!this.isActive()) {
throw new IllegalStateException("Cannot disable " + getControllerServiceImplementation() + " because it is not enabled");
}
@ -229,7 +244,106 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
}
@Override
public void setState(final ControllerServiceState state) {
this.stateRef.set(state);
public boolean isActive() {
return this.active.get();
}
/**
* Will atomically enable this service by invoking its @OnEnabled operation.
* It uses CAS operation on {@link #stateRef} to transition this service
* from DISABLED to ENABLING state. If such transition succeeds the service
* will be marked as 'active' (see {@link ControllerServiceNode#isActive()}).
* If such transition doesn't succeed then no enabling logic will be
* performed and the method will exit. In other words it is safe to invoke
* this operation multiple times and from multiple threads.
* <br>
* This operation will also perform re-try of service enabling in the event
* of exception being thrown by previous invocation of @OnEnabled.
* <br>
* Upon successful invocation of @OnEnabled this service will be transitioned to
* ENABLED state.
* <br>
* In the event where enabling took longer then expected by the user and such user
* initiated disable operation, this service will be automatically disabled as soon
* as it reached ENABLED state.
*/
@Override
public void enable(final ScheduledExecutorService scheduler, final long administrativeYieldMillis, final Heartbeater heartbeater) {
if (this.stateRef.compareAndSet(ControllerServiceState.DISABLED, ControllerServiceState.ENABLING)){
this.active.set(true);
final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null);
scheduler.execute(new Runnable() {
@Override
public void run() {
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, getControllerServiceImplementation(), configContext);
if (stateRef.compareAndSet(ControllerServiceState.ENABLING, ControllerServiceState.ENABLED)) {
heartbeater.heartbeat();
} else {
LOG.debug("Disabling service " + this + " after it has been enabled due to disable action being initiated.");
// Can only happen if user initiated DISABLE operation before service finished enabling. It's state will be
// set to DISABLING (see disable() operation)
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext);
stateRef.set(ControllerServiceState.DISABLED);
}
} catch (Exception e) {
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this);
componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
LOG.error("Failed to invoke @OnEnabled method of {} due to {}", getControllerServiceImplementation(), cause.toString());
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext);
if (isActive()) {
scheduler.schedule(this, administrativeYieldMillis, TimeUnit.MILLISECONDS);
}
else {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext);
stateRef.set(ControllerServiceState.DISABLED);
}
}
}
});
}
}
/**
* Will atomically disable this service by invoking its @OnDisabled operation.
* It uses CAS operation on {@link #stateRef} to transition this service
* from ENABLED to DISABLING state. If such transition succeeds the service
* will be de-activated (see {@link ControllerServiceNode#isActive()}).
* If such transition doesn't succeed (the service is still in ENABLING state)
* then the service will still be transitioned to DISABLING state to ensure that
* no other transition could happen on this service. However in such event
* (e.g., its @OnEnabled finally succeeded), the {@link #enable(ScheduledExecutorService, long, Heartbeater)}
* operation will initiate service disabling javadoc for (see {@link #enable(ScheduledExecutorService, long, Heartbeater)}
* <br>
* Upon successful invocation of @OnDisabled this service will be transitioned to
* DISABLED state.
*/
@Override
public void disable(final ScheduledExecutorService scheduler, final Heartbeater heartbeater) {
this.active.set(false); // de-activating regardless of CAS operation
// that follows since this operation will always result in service state being DISABLING
if (this.stateRef.compareAndSet(ControllerServiceState.ENABLED, ControllerServiceState.DISABLING)) {
final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null);
scheduler.execute(new Runnable() {
@Override
public void run() {
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, StandardControllerServiceNode.this.getControllerServiceImplementation(), configContext);
} catch (Exception e) {
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this);
componentLog.error("Failed to invoke @OnDisabled method due to {}", cause);
LOG.error("Failed to invoke @OnDisabled method of {} due to {}", getControllerServiceImplementation(), cause.toString());
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext);
} finally {
stateRef.set(ControllerServiceState.DISABLED);
heartbeater.heartbeat();
}
}
});
} else {
this.stateRef.compareAndSet(ControllerServiceState.ENABLING, ControllerServiceState.DISABLING);
}
}
}

View File

@ -203,14 +203,11 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
// Get a list of all Controller Services that need to be disabled, in the order that they need to be
// disabled.
final List<ControllerServiceNode> toDisable = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
final Set<ControllerServiceNode> serviceSet = new HashSet<>(toDisable);
for (final ControllerServiceNode nodeToDisable : toDisable) {
final ControllerServiceState state = nodeToDisable.getState();
if (state != ControllerServiceState.DISABLED && state != ControllerServiceState.DISABLING) {
nodeToDisable.verifyCanDisable(serviceSet);
}
nodeToDisable.verifyCanDisable(serviceSet);
}
Collections.reverse(toDisable);
@ -319,14 +316,6 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
logger.info("Will enable {} Controller Services", servicesToEnable.size());
}
// Mark all services that are configured to be enabled as 'ENABLING'. This allows Processors, reporting tasks
// to be valid so that they can be scheduled.
for (final List<ControllerServiceNode> branch : branches) {
for (final ControllerServiceNode nodeToEnable : branch) {
nodeToEnable.setState(ControllerServiceState.ENABLING);
}
}
final Set<ControllerServiceNode> enabledNodes = Collections.synchronizedSet(new HashSet<ControllerServiceNode>());
final ExecutorService executor = Executors.newFixedThreadPool(Math.min(10, branches.size()));
for (final List<ControllerServiceNode> branch : branches) {
@ -422,6 +411,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
@Override
public void disableControllerService(final ControllerServiceNode serviceNode) {
serviceNode.verifyCanDisable();
processScheduler.disableControllerService(serviceNode);
}
@ -545,23 +535,20 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
}
private void enableReferencingServices(final ControllerServiceNode serviceNode, final List<ControllerServiceNode> recursiveReferences) {
if (serviceNode.getState() != ControllerServiceState.ENABLED && serviceNode.getState() != ControllerServiceState.ENABLING) {
if (!serviceNode.isActive()) {
serviceNode.verifyCanEnable(new HashSet<>(recursiveReferences));
}
final Set<ControllerServiceNode> ifEnabled = new HashSet<>();
final List<ControllerServiceNode> toEnable = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
for (final ControllerServiceNode nodeToEnable : toEnable) {
final ControllerServiceState state = nodeToEnable.getState();
if (state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING) {
for (final ControllerServiceNode nodeToEnable : recursiveReferences) {
if (!nodeToEnable.isActive()) {
nodeToEnable.verifyCanEnable(ifEnabled);
ifEnabled.add(nodeToEnable);
}
}
for (final ControllerServiceNode nodeToEnable : toEnable) {
final ControllerServiceState state = nodeToEnable.getState();
if (state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING) {
for (final ControllerServiceNode nodeToEnable : recursiveReferences) {
if (!nodeToEnable.isActive()) {
enableControllerService(nodeToEnable);
}
}
@ -606,11 +593,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
final Set<ControllerServiceNode> serviceSet = new HashSet<>(toDisable);
for (final ControllerServiceNode nodeToDisable : toDisable) {
final ControllerServiceState state = nodeToDisable.getState();
if (state != ControllerServiceState.DISABLED && state != ControllerServiceState.DISABLING) {
nodeToDisable.verifyCanDisable(serviceSet);
}
nodeToDisable.verifyCanDisable(serviceSet);
}
}

View File

@ -66,8 +66,7 @@ public class StandardControllerServiceReference implements ControllerServiceRefe
if (component instanceof ControllerServiceNode) {
serviceNodes.add((ControllerServiceNode) component);
final ControllerServiceState state = ((ControllerServiceNode) component).getState();
if (state != ControllerServiceState.DISABLED) {
if (((ControllerServiceNode) component).isActive()) {
activeReferences.add(component);
}
} else if (isRunning(component)) {

View File

@ -16,18 +16,31 @@
*/
package org.apache.nifi.controller.scheduling;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.Heartbeater;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.StandardProcessorNode;
@ -36,6 +49,7 @@ import org.apache.nifi.controller.reporting.StandardReportingInitializationConte
import org.apache.nifi.controller.reporting.StandardReportingTaskNode;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.controller.service.StandardControllerServiceProvider;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
@ -116,7 +130,13 @@ public class TestStandardProcessScheduler {
Thread.sleep(1000L);
scheduler.stopProcessor(procNode);
assertTrue(service.isActive());
assertTrue(service.getState() == ControllerServiceState.ENABLING);
scheduler.disableControllerService(service);
assertTrue(service.getState() == ControllerServiceState.DISABLING);
assertFalse(service.isActive());
Thread.sleep(1000);
assertTrue(service.getState() == ControllerServiceState.DISABLED);
}
@ -169,4 +189,237 @@ public class TestStandardProcessScheduler {
throw new IllegalStateException(e);
}
}
/**
* Validates the atomic nature of ControllerServiceNode.enable() method
* which must only trigger @OnEnabled once, regardless of how many threads
* may have a reference to the underlying ProcessScheduler and
* ControllerServiceNode.
*/
@Test
public void validateServiceEnablementLogicHappensOnlyOnce() throws Exception {
final ProcessScheduler scheduler = createScheduler();
StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(),
"1", false);
assertFalse(serviceNode.isActive());
SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation();
ExecutorService executor = Executors.newCachedThreadPool();
final AtomicBoolean asyncFailed = new AtomicBoolean();
for (int i = 0; i < 1000; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
scheduler.enableControllerService(serviceNode);
assertTrue(serviceNode.isActive());
} catch (Exception e) {
e.printStackTrace();
asyncFailed.set(true);
}
}
});
}
// need to sleep a while since we are emulating async invocations on
// method that is also internally async
Thread.sleep(500);
executor.shutdown();
assertFalse(asyncFailed.get());
assertEquals(1, ts.enableInvocationCount());
}
/**
* Validates the atomic nature of ControllerServiceNode.disable(..) method
* which must never trigger @OnDisabled, regardless of how many threads may
* have a reference to the underlying ProcessScheduler and
* ControllerServiceNode.
*/
@Test
public void validateDisabledServiceCantBeDisabled() throws Exception {
final ProcessScheduler scheduler = createScheduler();
StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(),
"1", false);
SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation();
ExecutorService executor = Executors.newCachedThreadPool();
final AtomicBoolean asyncFailed = new AtomicBoolean();
for (int i = 0; i < 1000; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
scheduler.disableControllerService(serviceNode);
assertFalse(serviceNode.isActive());
} catch (Exception e) {
e.printStackTrace();
asyncFailed.set(true);
}
}
});
}
// need to sleep a while since we are emulating async invocations on
// method that is also internally async
Thread.sleep(500);
executor.shutdown();
assertFalse(asyncFailed.get());
assertEquals(0, ts.disableInvocationCount());
}
/**
* Validates the atomic nature of ControllerServiceNode.disable() method
* which must only trigger @OnDisabled once, regardless of how many threads
* may have a reference to the underlying ProcessScheduler and
* ControllerServiceNode.
*/
@Test
public void validateEnabledServiceCanOnlyBeDisabledOnce() throws Exception {
final ProcessScheduler scheduler = createScheduler();
StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(),
"1", false);
SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation();
scheduler.enableControllerService(serviceNode);
assertTrue(serviceNode.isActive());
ExecutorService executor = Executors.newCachedThreadPool();
final AtomicBoolean asyncFailed = new AtomicBoolean();
for (int i = 0; i < 1000; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
scheduler.disableControllerService(serviceNode);
assertFalse(serviceNode.isActive());
} catch (Exception e) {
e.printStackTrace();
asyncFailed.set(true);
}
}
});
}
// need to sleep a while since we are emulating async invocations on
// method that is also internally async
Thread.sleep(500);
executor.shutdown();
assertFalse(asyncFailed.get());
assertEquals(1, ts.disableInvocationCount());
}
/**
* Validates that service that is infinitely blocking in @OnEnabled can
* still have DISABLE operation initiated. The service itself will be set to
* DISABLING state at which point UI and all will know that such service can
* not be transitioned any more into any other state until it finishes
* enabling (which will never happen in our case thus should be addressed by
* user). However, regardless of user's mistake NiFi will remain
* functioning.
*/
@Test
public void validateNeverEnablingServiceCanStillBeDisabled() throws Exception {
final ProcessScheduler scheduler = createScheduler();
StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(),
"1", false);
LongEnablingService ts = (LongEnablingService) serviceNode.getControllerServiceImplementation();
ts.setLimit(Long.MAX_VALUE);
scheduler.enableControllerService(serviceNode);
Thread.sleep(100);
assertTrue(serviceNode.isActive());
assertEquals(1, ts.enableInvocationCount());
Thread.sleep(1000);
scheduler.disableControllerService(serviceNode);
assertFalse(serviceNode.isActive());
assertEquals(ControllerServiceState.DISABLING, serviceNode.getState());
assertEquals(0, ts.disableInvocationCount());
}
/**
* Validates that the service that is currently in ENABLING state can be
* disabled and that its @OnDisabled operation will be invoked as soon
* as @OnEnable finishes.
*/
@Test
public void validateLongEnablingServiceCanStillBeDisabled() throws Exception {
final ProcessScheduler scheduler = createScheduler();
StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(),
"1", false);
LongEnablingService ts = (LongEnablingService) serviceNode.getControllerServiceImplementation();
ts.setLimit(3000);
scheduler.enableControllerService(serviceNode);
Thread.sleep(100);
assertTrue(serviceNode.isActive());
assertEquals(1, ts.enableInvocationCount());
Thread.sleep(100);
scheduler.disableControllerService(serviceNode);
assertFalse(serviceNode.isActive());
assertEquals(ControllerServiceState.DISABLING, serviceNode.getState());
assertEquals(0, ts.disableInvocationCount());
// wait a bit. . . Enabling will finish and @OnDisabled will be invoked
// automatically
Thread.sleep(3000);
assertEquals(ControllerServiceState.DISABLED, serviceNode.getState());
assertEquals(1, ts.disableInvocationCount());
}
public static class SimpleTestService extends AbstractControllerService {
private final AtomicInteger enableCounter = new AtomicInteger();
private final AtomicInteger disableCounter = new AtomicInteger();
@OnEnabled
public void enable(ConfigurationContext context) {
this.enableCounter.incrementAndGet();
}
@OnDisabled
public void disable(ConfigurationContext context) {
this.disableCounter.incrementAndGet();
}
public int enableInvocationCount() {
return this.enableCounter.get();
}
public int disableInvocationCount() {
return this.disableCounter.get();
}
}
public static class LongEnablingService extends AbstractControllerService {
private final AtomicInteger enableCounter = new AtomicInteger();
private final AtomicInteger disableCounter = new AtomicInteger();
private volatile long limit;
@OnEnabled
public void enable(ConfigurationContext context) throws Exception {
this.enableCounter.incrementAndGet();
Thread.sleep(limit);
}
@OnDisabled
public void disable(ConfigurationContext context) {
this.disableCounter.incrementAndGet();
}
public int enableInvocationCount() {
return this.enableCounter.get();
}
public int disableInvocationCount() {
return this.disableCounter.get();
}
public void setLimit(long limit) {
this.limit = limit;
}
}
private ProcessScheduler createScheduler() {
return new StandardProcessScheduler(mock(Heartbeater.class), null, null);
}
}

View File

@ -120,7 +120,7 @@ public class TestStandardControllerServiceProvider {
@Test(timeout = 10000)
public void testConcurrencyWithEnablingReferencingServicesGraph() {
final ProcessScheduler scheduler = createScheduler();
for (int i = 0; i < 1000; i++) {
for (int i = 0; i < 10000; i++) {
testEnableReferencingServicesGraph(scheduler);
}
}