Merge branch 'NIFI-1164' of https://github.com/olegz/nifi into NIFI-1164

This commit is contained in:
Mark Payne 2015-12-31 15:22:09 -05:00
commit 71544cd22b
7 changed files with 531 additions and 179 deletions

View File

@ -17,9 +17,11 @@
package org.apache.nifi.controller.service;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.Heartbeater;
public interface ControllerServiceNode extends ConfiguredComponent {
@ -53,11 +55,31 @@ public interface ControllerServiceNode extends ConfiguredComponent {
*/
ControllerServiceState getState();
/**
* Updates the state of the Controller Service to the provided new state
* @param state the state to set the service to
/**
* Will enable this service. Enabling of the service typically means
* invoking it's operation that is annotated with @OnEnabled.
*
* @param scheduler
* implementation of {@link ScheduledExecutorService} used to
* initiate service enabling task as well as its re-tries
* @param administrativeYieldMillis
* the amount of milliseconds to wait for administrative yield
* @param heartbeater
* the instance of {@link Heartbeater}
*/
void setState(ControllerServiceState state);
void enable(ScheduledExecutorService scheduler, long administrativeYieldMillis, Heartbeater heartbeater);
/**
* Will disable this service. Disabling of the service typically means
* invoking it's operation that is annotated with @OnDisabled.
*
* @param scheduler
* implementation of {@link ScheduledExecutorService} used to
* initiate service disabling task
* @param heartbeater
* the instance of {@link Heartbeater}
*/
void disable(ScheduledExecutorService scheduler, Heartbeater heartbeater);
/**
* @return the ControllerServiceReference that describes which components are referencing this Controller Service
@ -111,4 +133,16 @@ public interface ControllerServiceNode extends ConfiguredComponent {
void verifyCanDelete();
void verifyCanUpdate();
/**
* Returns 'true' if this service is active. The service is considered to be
* active if and only if it's
* {@link #enable(ScheduledExecutorService, long, Heartbeater)} operation
* has been invoked and the service has been transitioned to ENABLING state.
* The service will also remain 'active' after its been transitioned to
* ENABLED state. <br>
* The service will be de-activated upon invocation of
* {@link #disable(ScheduledExecutorService, Heartbeater)}.
*/
boolean isActive();
}

View File

@ -19,23 +19,15 @@ package org.apache.nifi.controller.scheduling;
import static java.util.Objects.requireNonNull;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
@ -54,8 +46,6 @@ import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.annotation.OnConfigured;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.logging.ComponentLog;
@ -89,7 +79,9 @@ public final class StandardProcessScheduler implements ProcessScheduler {
private final ScheduledExecutorService frameworkTaskExecutor;
private final ConcurrentMap<SchedulingStrategy, SchedulingAgent> strategyAgentMap = new ConcurrentHashMap<>();
// thread pool for starting/stopping components
private final ExecutorService componentLifeCycleThreadPool = new ThreadPoolExecutor(25, 50, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(5000));
private final ScheduledExecutorService componentLifeCycleThreadPool = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
private final StringEncryptor encryptor;
public StandardProcessScheduler(final Heartbeater heartbeater, final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor) {
@ -625,145 +617,20 @@ public final class StandardProcessScheduler implements ProcessScheduler {
@Override
public void enableControllerService(final ControllerServiceNode service) {
service.setState(ControllerServiceState.ENABLING);
final ScheduleState scheduleState = getScheduleState(service);
final Runnable enableRunnable = new Runnable() {
@Override
public void run() {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
final long lastStopTime = scheduleState.getLastStopTime();
final ConfigurationContext configContext = new StandardConfigurationContext(service, controllerServiceProvider, null);
while (true) {
try {
synchronized (scheduleState) {
// if no longer enabled, then we're finished. This can happen, for example,
// if the @OnEnabled method throws an Exception and the user disables the service
// while we're administratively yielded.
//
// we also check if the schedule state's last stop time is equal to what it was before.
// if not, then means that the service has been disabled and enabled again, so we should just
// bail; another thread will be responsible for invoking the @OnEnabled methods.
if (!scheduleState.isScheduled() || scheduleState.getLastStopTime() != lastStopTime) {
return;
}
ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, service.getControllerServiceImplementation(), configContext);
heartbeater.heartbeat();
service.setState(ControllerServiceState.ENABLED);
return;
}
} catch (final Exception e) {
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service);
componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
LOG.error("Failed to invoke @OnEnabled method of {} due to {}", service.getControllerServiceImplementation(), cause.toString());
if (LOG.isDebugEnabled()) {
LOG.error("", cause);
}
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext);
Thread.sleep(administrativeYieldMillis);
continue;
}
}
} catch (final Throwable t) {
final Throwable cause = t instanceof InvocationTargetException ? t.getCause() : t;
final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service);
componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
LOG.error("Failed to invoke @OnEnabled method on {} due to {}", service.getControllerServiceImplementation(), cause.toString());
if (LOG.isDebugEnabled()) {
LOG.error("", cause);
}
}
}
};
scheduleState.setScheduled(true);
componentLifeCycleThreadPool.execute(enableRunnable);
service.enable(this.componentLifeCycleThreadPool, this.administrativeYieldMillis, this.heartbeater);
}
@Override
public void disableControllerService(final ControllerServiceNode service) {
disableControllerServices(Collections.singletonList(service));
service.disable(this.componentLifeCycleThreadPool, this.heartbeater);
}
@Override
public void disableControllerServices(final List<ControllerServiceNode> services) {
if (requireNonNull(services).isEmpty()) {
return;
}
final List<ControllerServiceNode> servicesToDisable = new ArrayList<>(services.size());
for (final ControllerServiceNode serviceToDisable : services) {
if (serviceToDisable.getState() == ControllerServiceState.DISABLED || serviceToDisable.getState() == ControllerServiceState.DISABLING) {
continue;
}
servicesToDisable.add(serviceToDisable);
}
if (servicesToDisable.isEmpty()) {
return;
}
// ensure that all controller services can be disabled.
for (final ControllerServiceNode serviceNode : servicesToDisable) {
final Set<ControllerServiceNode> ignoredReferences = new HashSet<>(services);
ignoredReferences.remove(serviceNode);
serviceNode.verifyCanDisable(ignoredReferences);
}
// mark services as disabling
for (final ControllerServiceNode serviceNode : servicesToDisable) {
serviceNode.setState(ControllerServiceState.DISABLING);
final ScheduleState scheduleState = getScheduleState(serviceNode);
synchronized (scheduleState) {
scheduleState.setScheduled(false);
if (!requireNonNull(services).isEmpty()) {
for (ControllerServiceNode controllerServiceNode : services) {
this.disableControllerService(controllerServiceNode);
}
}
final Queue<ControllerServiceNode> nodes = new LinkedList<>(servicesToDisable);
final Runnable disableRunnable = new Runnable() {
@Override
public void run() {
ControllerServiceNode service;
while ((service = nodes.poll()) != null) {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
final ConfigurationContext configContext = new StandardConfigurationContext(service, controllerServiceProvider, null);
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext);
} catch (final Exception e) {
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service);
componentLog.error("Failed to invoke @OnDisabled method due to {}", cause);
LOG.error("Failed to invoke @OnDisabled method of {} due to {}", service.getControllerServiceImplementation(), cause.toString());
if (LOG.isDebugEnabled()) {
LOG.error("", cause);
}
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext);
try {
Thread.sleep(administrativeYieldMillis);
} catch (final InterruptedException ie) {
}
} finally {
service.setState(ControllerServiceState.DISABLED);
heartbeater.heartbeat();
}
}
}
}
};
componentLifeCycleThreadPool.execute(disableRunnable);
}
}

View File

@ -16,28 +16,41 @@
*/
package org.apache.nifi.controller.service;
import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractConfiguredComponent;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.Heartbeater;
import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.annotation.OnConfigured;
import org.apache.nifi.controller.exception.ComponentLifeCycleException;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StandardControllerServiceNode extends AbstractConfiguredComponent implements ControllerServiceNode {
private static final Logger LOG = LoggerFactory.getLogger(StandardControllerServiceNode.class);
private final ControllerService proxedControllerService;
private final ControllerService implementation;
private final ControllerServiceProvider serviceProvider;
@ -51,12 +64,15 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
private final Set<ConfiguredComponent> referencingComponents = new HashSet<>();
private String comment;
private final AtomicBoolean active;
public StandardControllerServiceNode(final ControllerService proxiedControllerService, final ControllerService implementation, final String id,
final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) {
super(implementation, id, validationContextFactory, serviceProvider);
this.proxedControllerService = proxiedControllerService;
this.implementation = implementation;
this.serviceProvider = serviceProvider;
this.active = new AtomicBoolean();
}
@Override
@ -146,8 +162,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
@Override
public void verifyCanDisable(final Set<ControllerServiceNode> ignoreReferences) {
final ControllerServiceState state = getState();
if (state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING) {
if (!this.isActive()) {
throw new IllegalStateException("Cannot disable " + getControllerServiceImplementation() + " because it is not enabled");
}
@ -229,7 +244,123 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
}
@Override
public void setState(final ControllerServiceState state) {
this.stateRef.set(state);
public boolean isActive() {
return this.active.get();
}
/**
* Will atomically enable this service by invoking its @OnEnabled operation.
* It uses CAS operation on {@link #stateRef} to transition this service
* from DISABLED to ENABLING state. If such transition succeeds the service
* will be marked as 'active' (see {@link ControllerServiceNode#isActive()}).
* If such transition doesn't succeed then no enabling logic will be
* performed and the method will exit. In other words it is safe to invoke
* this operation multiple times and from multiple threads.
* <br>
* This operation will also perform re-try of service enabling in the event
* of exception being thrown by previous invocation of @OnEnabled.
* <br>
* Upon successful invocation of @OnEnabled this service will be transitioned to
* ENABLED state.
* <br>
* In the event where enabling took longer then expected by the user and such user
* initiated disable operation, this service will be automatically disabled as soon
* as it reached ENABLED state.
*/
@Override
public void enable(final ScheduledExecutorService scheduler, final long administrativeYieldMillis,
final Heartbeater heartbeater) {
if (this.stateRef.compareAndSet(ControllerServiceState.DISABLED, ControllerServiceState.ENABLING)) {
this.active.set(true);
final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null);
scheduler.execute(new Runnable() {
@Override
public void run() {
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, getControllerServiceImplementation(), configContext);
boolean shouldEnable = false;
synchronized (active) {
shouldEnable = active.get() && stateRef.compareAndSet(ControllerServiceState.ENABLING, ControllerServiceState.ENABLED);
}
if (shouldEnable) {
heartbeater.heartbeat();
} else {
LOG.debug("Disabling service " + this + " after it has been enabled due to disable action being initiated.");
// Can only happen if user initiated DISABLE operation before service finished enabling. It's state will be
// set to DISABLING (see disable() operation)
invokeDisable(configContext, heartbeater);
stateRef.set(ControllerServiceState.DISABLED);
}
} catch (Exception e) {
invokeDisable(configContext, heartbeater);
if (isActive()) {
scheduler.schedule(this, administrativeYieldMillis, TimeUnit.MILLISECONDS);
}
else {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext);
stateRef.set(ControllerServiceState.DISABLED);
}
}
}
});
}
}
/**
* Will atomically disable this service by invoking its @OnDisabled operation.
* It uses CAS operation on {@link #stateRef} to transition this service
* from ENABLED to DISABLING state. If such transition succeeds the service
* will be de-activated (see {@link ControllerServiceNode#isActive()}).
* If such transition doesn't succeed (the service is still in ENABLING state)
* then the service will still be transitioned to DISABLING state to ensure that
* no other transition could happen on this service. However in such event
* (e.g., its @OnEnabled finally succeeded), the {@link #enable(ScheduledExecutorService, long, Heartbeater)}
* operation will initiate service disabling javadoc for (see {@link #enable(ScheduledExecutorService, long, Heartbeater)}
* <br>
* Upon successful invocation of @OnDisabled this service will be transitioned to
* DISABLED state.
*/
@Override
public void disable(ScheduledExecutorService scheduler, final Heartbeater heartbeater) {
/*
* The reason for synchronization is to ensure consistency of the
* service state when another thread is in the middle of enabling this
* service since it will attempt to transition service state from
* ENABLING to ENABLED but only if it's active.
*/
synchronized (this.active) {
this.active.set(false);
}
if (this.stateRef.compareAndSet(ControllerServiceState.ENABLED, ControllerServiceState.DISABLING)) {
final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null);
scheduler.execute(new Runnable() {
@Override
public void run() {
try {
invokeDisable(configContext, heartbeater);
} finally {
stateRef.set(ControllerServiceState.DISABLED);
heartbeater.heartbeat();
}
}
});
} else {
this.stateRef.compareAndSet(ControllerServiceState.ENABLING, ControllerServiceState.DISABLING);
}
}
/**
*
*/
private void invokeDisable(ConfigurationContext configContext, Heartbeater heartbeater) {
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, StandardControllerServiceNode.this.getControllerServiceImplementation(), configContext);
} catch (Exception e) {
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this);
componentLog.error("Failed to invoke @OnDisabled method due to {}", cause);
LOG.error("Failed to invoke @OnDisabled method of {} due to {}", getControllerServiceImplementation(), cause.toString());
}
}
}

View File

@ -203,14 +203,11 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
// Get a list of all Controller Services that need to be disabled, in the order that they need to be
// disabled.
final List<ControllerServiceNode> toDisable = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
final Set<ControllerServiceNode> serviceSet = new HashSet<>(toDisable);
for (final ControllerServiceNode nodeToDisable : toDisable) {
final ControllerServiceState state = nodeToDisable.getState();
if (state != ControllerServiceState.DISABLED && state != ControllerServiceState.DISABLING) {
nodeToDisable.verifyCanDisable(serviceSet);
}
nodeToDisable.verifyCanDisable(serviceSet);
}
Collections.reverse(toDisable);
@ -319,14 +316,6 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
logger.info("Will enable {} Controller Services", servicesToEnable.size());
}
// Mark all services that are configured to be enabled as 'ENABLING'. This allows Processors, reporting tasks
// to be valid so that they can be scheduled.
for (final List<ControllerServiceNode> branch : branches) {
for (final ControllerServiceNode nodeToEnable : branch) {
nodeToEnable.setState(ControllerServiceState.ENABLING);
}
}
final Set<ControllerServiceNode> enabledNodes = Collections.synchronizedSet(new HashSet<ControllerServiceNode>());
final ExecutorService executor = Executors.newFixedThreadPool(Math.min(10, branches.size()));
for (final List<ControllerServiceNode> branch : branches) {
@ -422,6 +411,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
@Override
public void disableControllerService(final ControllerServiceNode serviceNode) {
serviceNode.verifyCanDisable();
processScheduler.disableControllerService(serviceNode);
}
@ -545,23 +535,20 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
}
private void enableReferencingServices(final ControllerServiceNode serviceNode, final List<ControllerServiceNode> recursiveReferences) {
if (serviceNode.getState() != ControllerServiceState.ENABLED && serviceNode.getState() != ControllerServiceState.ENABLING) {
if (!serviceNode.isActive()) {
serviceNode.verifyCanEnable(new HashSet<>(recursiveReferences));
}
final Set<ControllerServiceNode> ifEnabled = new HashSet<>();
final List<ControllerServiceNode> toEnable = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
for (final ControllerServiceNode nodeToEnable : toEnable) {
final ControllerServiceState state = nodeToEnable.getState();
if (state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING) {
for (final ControllerServiceNode nodeToEnable : recursiveReferences) {
if (!nodeToEnable.isActive()) {
nodeToEnable.verifyCanEnable(ifEnabled);
ifEnabled.add(nodeToEnable);
}
}
for (final ControllerServiceNode nodeToEnable : toEnable) {
final ControllerServiceState state = nodeToEnable.getState();
if (state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING) {
for (final ControllerServiceNode nodeToEnable : recursiveReferences) {
if (!nodeToEnable.isActive()) {
enableControllerService(nodeToEnable);
}
}
@ -606,11 +593,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
final Set<ControllerServiceNode> serviceSet = new HashSet<>(toDisable);
for (final ControllerServiceNode nodeToDisable : toDisable) {
final ControllerServiceState state = nodeToDisable.getState();
if (state != ControllerServiceState.DISABLED && state != ControllerServiceState.DISABLING) {
nodeToDisable.verifyCanDisable(serviceSet);
}
nodeToDisable.verifyCanDisable(serviceSet);
}
}

View File

@ -66,8 +66,7 @@ public class StandardControllerServiceReference implements ControllerServiceRefe
if (component instanceof ControllerServiceNode) {
serviceNodes.add((ControllerServiceNode) component);
final ControllerServiceState state = ((ControllerServiceNode) component).getState();
if (state != ControllerServiceState.DISABLED) {
if (((ControllerServiceNode) component).isActive()) {
activeReferences.add(component);
}
} else if (isRunning(component)) {

View File

@ -16,18 +16,30 @@
*/
package org.apache.nifi.controller.scheduling;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.Heartbeater;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.StandardProcessorNode;
@ -36,6 +48,8 @@ import org.apache.nifi.controller.reporting.StandardReportingInitializationConte
import org.apache.nifi.controller.reporting.StandardReportingTaskNode;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.controller.service.StandardControllerServiceNode;
import org.apache.nifi.controller.service.StandardControllerServiceProvider;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
@ -116,7 +130,13 @@ public class TestStandardProcessScheduler {
Thread.sleep(1000L);
scheduler.stopProcessor(procNode);
assertTrue(service.isActive());
assertTrue(service.getState() == ControllerServiceState.ENABLING);
scheduler.disableControllerService(service);
assertTrue(service.getState() == ControllerServiceState.DISABLING);
assertFalse(service.isActive());
Thread.sleep(1000);
assertTrue(service.getState() == ControllerServiceState.DISABLED);
}
@ -169,4 +189,322 @@ public class TestStandardProcessScheduler {
throw new IllegalStateException(e);
}
}
/**
* Validates the atomic nature of ControllerServiceNode.enable() method
* which must only trigger @OnEnabled once, regardless of how many threads
* may have a reference to the underlying ProcessScheduler and
* ControllerServiceNode.
*/
@Test
public void validateServiceEnablementLogicHappensOnlyOnce() throws Exception {
final ProcessScheduler scheduler = createScheduler();
StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(),
"1", false);
assertFalse(serviceNode.isActive());
SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation();
ExecutorService executor = Executors.newCachedThreadPool();
final AtomicBoolean asyncFailed = new AtomicBoolean();
for (int i = 0; i < 1000; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
scheduler.enableControllerService(serviceNode);
assertTrue(serviceNode.isActive());
} catch (Exception e) {
e.printStackTrace();
asyncFailed.set(true);
}
}
});
}
// need to sleep a while since we are emulating async invocations on
// method that is also internally async
Thread.sleep(500);
executor.shutdown();
assertFalse(asyncFailed.get());
assertEquals(1, ts.enableInvocationCount());
}
/**
* Validates the atomic nature of ControllerServiceNode.disable(..) method
* which must never trigger @OnDisabled, regardless of how many threads may
* have a reference to the underlying ProcessScheduler and
* ControllerServiceNode.
*/
@Test
public void validateDisabledServiceCantBeDisabled() throws Exception {
final ProcessScheduler scheduler = createScheduler();
StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(),
"1", false);
SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation();
ExecutorService executor = Executors.newCachedThreadPool();
final AtomicBoolean asyncFailed = new AtomicBoolean();
for (int i = 0; i < 1000; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
scheduler.disableControllerService(serviceNode);
assertFalse(serviceNode.isActive());
} catch (Exception e) {
e.printStackTrace();
asyncFailed.set(true);
}
}
});
}
// need to sleep a while since we are emulating async invocations on
// method that is also internally async
Thread.sleep(500);
executor.shutdown();
assertFalse(asyncFailed.get());
assertEquals(0, ts.disableInvocationCount());
}
/**
* Validates the atomic nature of ControllerServiceNode.disable() method
* which must only trigger @OnDisabled once, regardless of how many threads
* may have a reference to the underlying ProcessScheduler and
* ControllerServiceNode.
*/
@Test
public void validateEnabledServiceCanOnlyBeDisabledOnce() throws Exception {
final ProcessScheduler scheduler = createScheduler();
StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(),
"1", false);
SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation();
scheduler.enableControllerService(serviceNode);
assertTrue(serviceNode.isActive());
ExecutorService executor = Executors.newCachedThreadPool();
final AtomicBoolean asyncFailed = new AtomicBoolean();
for (int i = 0; i < 1000; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
scheduler.disableControllerService(serviceNode);
assertFalse(serviceNode.isActive());
} catch (Exception e) {
e.printStackTrace();
asyncFailed.set(true);
}
}
});
}
// need to sleep a while since we are emulating async invocations on
// method that is also internally async
Thread.sleep(500);
executor.shutdown();
assertFalse(asyncFailed.get());
assertEquals(1, ts.disableInvocationCount());
}
@Test
public void validateDisablingOfTheFailedService() throws Exception {
final ProcessScheduler scheduler = createScheduler();
StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
final ControllerServiceNode serviceNode = provider.createControllerService(FailingService.class.getName(),
"1", false);
scheduler.enableControllerService(serviceNode);
Thread.sleep(1000);
scheduler.shutdown();
/*
* Because it was never disabled it will remain active since its
* enabling is being retried. This may actually be a bug in the
* scheduler since it probably has to shut down all components (disable
* services, shut down processors etc) before shutting down itself
*/
assertTrue(serviceNode.isActive());
assertTrue(serviceNode.getState() == ControllerServiceState.ENABLING);
}
/**
* Validates that in multi threaded environment enabling service can still
* be disabled. This test is set up in such way that disabling of the
* service could be initiated by both disable and enable methods. In other
* words it tests two conditions in
* {@link StandardControllerServiceNode#disable(java.util.concurrent.ScheduledExecutorService, Heartbeater)}
* where the disabling of the service can be initiated right there (if
* ENABLED), or if service is still enabling its disabling will be deferred
* to the logic in
* {@link StandardControllerServiceNode#enable(java.util.concurrent.ScheduledExecutorService, long, Heartbeater)}
* IN any even the resulting state of the service is DISABLED
*/
@Test
public void validateEnabledDisableMultiThread() throws Exception {
final ProcessScheduler scheduler = createScheduler();
StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 200; i++) {
final ControllerServiceNode serviceNode = provider
.createControllerService(RandomShortDelayEnablingService.class.getName(), "1", false);
executor.execute(new Runnable() {
@Override
public void run() {
scheduler.enableControllerService(serviceNode);
}
});
Thread.sleep(2); // ensure that enable gets initiated before disable
executor.execute(new Runnable() {
@Override
public void run() {
scheduler.disableControllerService(serviceNode);
}
});
Thread.sleep(25);
assertFalse(serviceNode.isActive());
assertTrue(serviceNode.getState() == ControllerServiceState.DISABLED);
}
// need to sleep a while since we are emulating async invocations on
// method that is also internally async
Thread.sleep(500);
executor.shutdown();
executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
}
/**
* Validates that service that is infinitely blocking in @OnEnabled can
* still have DISABLE operation initiated. The service itself will be set to
* DISABLING state at which point UI and all will know that such service can
* not be transitioned any more into any other state until it finishes
* enabling (which will never happen in our case thus should be addressed by
* user). However, regardless of user's mistake NiFi will remain
* functioning.
*/
@Test
public void validateNeverEnablingServiceCanStillBeDisabled() throws Exception {
final ProcessScheduler scheduler = createScheduler();
StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(),
"1", false);
LongEnablingService ts = (LongEnablingService) serviceNode.getControllerServiceImplementation();
ts.setLimit(Long.MAX_VALUE);
scheduler.enableControllerService(serviceNode);
Thread.sleep(100);
assertTrue(serviceNode.isActive());
assertEquals(1, ts.enableInvocationCount());
Thread.sleep(1000);
scheduler.disableControllerService(serviceNode);
assertFalse(serviceNode.isActive());
assertEquals(ControllerServiceState.DISABLING, serviceNode.getState());
assertEquals(0, ts.disableInvocationCount());
}
/**
* Validates that the service that is currently in ENABLING state can be
* disabled and that its @OnDisabled operation will be invoked as soon
* as @OnEnable finishes.
*/
@Test
public void validateLongEnablingServiceCanStillBeDisabled() throws Exception {
final ProcessScheduler scheduler = createScheduler();
StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(),
"1", false);
LongEnablingService ts = (LongEnablingService) serviceNode.getControllerServiceImplementation();
ts.setLimit(3000);
scheduler.enableControllerService(serviceNode);
Thread.sleep(100);
assertTrue(serviceNode.isActive());
assertEquals(1, ts.enableInvocationCount());
Thread.sleep(100);
scheduler.disableControllerService(serviceNode);
assertFalse(serviceNode.isActive());
assertEquals(ControllerServiceState.DISABLING, serviceNode.getState());
assertEquals(0, ts.disableInvocationCount());
// wait a bit. . . Enabling will finish and @OnDisabled will be invoked
// automatically
Thread.sleep(3000);
assertEquals(ControllerServiceState.DISABLED, serviceNode.getState());
assertEquals(1, ts.disableInvocationCount());
}
public static class FailingService extends AbstractControllerService {
@OnEnabled
public void enable(ConfigurationContext context) {
throw new RuntimeException("intentional");
}
}
public static class RandomShortDelayEnablingService extends AbstractControllerService {
private final Random random = new Random();
@OnEnabled
public void enable(ConfigurationContext context) {
try {
Thread.sleep(random.nextInt(20));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public static class SimpleTestService extends AbstractControllerService {
private final AtomicInteger enableCounter = new AtomicInteger();
private final AtomicInteger disableCounter = new AtomicInteger();
@OnEnabled
public void enable(ConfigurationContext context) {
this.enableCounter.incrementAndGet();
}
@OnDisabled
public void disable(ConfigurationContext context) {
this.disableCounter.incrementAndGet();
}
public int enableInvocationCount() {
return this.enableCounter.get();
}
public int disableInvocationCount() {
return this.disableCounter.get();
}
}
public static class LongEnablingService extends AbstractControllerService {
private final AtomicInteger enableCounter = new AtomicInteger();
private final AtomicInteger disableCounter = new AtomicInteger();
private volatile long limit;
@OnEnabled
public void enable(ConfigurationContext context) throws Exception {
this.enableCounter.incrementAndGet();
Thread.sleep(limit);
}
@OnDisabled
public void disable(ConfigurationContext context) {
this.disableCounter.incrementAndGet();
}
public int enableInvocationCount() {
return this.enableCounter.get();
}
public int disableInvocationCount() {
return this.disableCounter.get();
}
public void setLimit(long limit) {
this.limit = limit;
}
}
private ProcessScheduler createScheduler() {
return new StandardProcessScheduler(mock(Heartbeater.class), null, null);
}
}

View File

@ -120,7 +120,7 @@ public class TestStandardControllerServiceProvider {
@Test(timeout = 10000)
public void testConcurrencyWithEnablingReferencingServicesGraph() {
final ProcessScheduler scheduler = createScheduler();
for (int i = 0; i < 1000; i++) {
for (int i = 0; i < 10000; i++) {
testEnableReferencingServicesGraph(scheduler);
}
}