diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
index 481a23181c..b229af006a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
@@ -17,9 +17,11 @@
package org.apache.nifi.controller.service;
import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.Heartbeater;
public interface ControllerServiceNode extends ConfiguredComponent {
@@ -53,11 +55,31 @@ public interface ControllerServiceNode extends ConfiguredComponent {
*/
ControllerServiceState getState();
- /**
- * Updates the state of the Controller Service to the provided new state
- * @param state the state to set the service to
+ /**
+ * Will enable this service. Enabling of the service typically means
+ * invoking it's operation that is annotated with @OnEnabled.
+ *
+ * @param scheduler
+ * implementation of {@link ScheduledExecutorService} used to
+ * initiate service enabling task as well as its re-tries
+ * @param administrativeYieldMillis
+ * the amount of milliseconds to wait for administrative yield
+ * @param heartbeater
+ * the instance of {@link Heartbeater}
*/
- void setState(ControllerServiceState state);
+ void enable(ScheduledExecutorService scheduler, long administrativeYieldMillis, Heartbeater heartbeater);
+
+ /**
+ * Will disable this service. Disabling of the service typically means
+ * invoking it's operation that is annotated with @OnDisabled.
+ *
+ * @param scheduler
+ * implementation of {@link ScheduledExecutorService} used to
+ * initiate service disabling task
+ * @param heartbeater
+ * the instance of {@link Heartbeater}
+ */
+ void disable(ScheduledExecutorService scheduler, Heartbeater heartbeater);
/**
* @return the ControllerServiceReference that describes which components are referencing this Controller Service
@@ -111,4 +133,17 @@ public interface ControllerServiceNode extends ConfiguredComponent {
void verifyCanDelete();
void verifyCanUpdate();
+
+ /**
+ * Returns 'true' if this service is active. The service is considered to be
+ * active if and only if it's
+ * {@link #enable(ScheduledExecutorService, long, Heartbeater)} operation
+ * has been invoked and the service has been transitioned to ENABLING state.
+ * The service will also remain 'active' after its been transitioned to
+ * ENABLED state.
+ *
+ * The service will be de-activated upon invocation of
+ * {@link #disable(ScheduledExecutorService, Heartbeater)}.
+ */
+ boolean isActive();
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index ddd4d5c374..82fc812368 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -19,23 +19,15 @@ package org.apache.nifi.controller.scheduling;
import static java.util.Objects.requireNonNull;
import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import org.apache.nifi.annotation.lifecycle.OnDisabled;
-import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
@@ -54,8 +46,6 @@ import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.annotation.OnConfigured;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
-import org.apache.nifi.controller.service.ControllerServiceState;
-import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.logging.ComponentLog;
@@ -89,7 +79,9 @@ public final class StandardProcessScheduler implements ProcessScheduler {
private final ScheduledExecutorService frameworkTaskExecutor;
private final ConcurrentMap strategyAgentMap = new ConcurrentHashMap<>();
// thread pool for starting/stopping components
- private final ExecutorService componentLifeCycleThreadPool = new ThreadPoolExecutor(25, 50, 5, TimeUnit.SECONDS, new LinkedBlockingQueue(5000));
+
+ private final ScheduledExecutorService componentLifeCycleThreadPool = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
+
private final StringEncryptor encryptor;
public StandardProcessScheduler(final Heartbeater heartbeater, final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor) {
@@ -625,145 +617,20 @@ public final class StandardProcessScheduler implements ProcessScheduler {
@Override
public void enableControllerService(final ControllerServiceNode service) {
- service.setState(ControllerServiceState.ENABLING);
- final ScheduleState scheduleState = getScheduleState(service);
-
- final Runnable enableRunnable = new Runnable() {
- @Override
- public void run() {
- try (final NarCloseable x = NarCloseable.withNarLoader()) {
- final long lastStopTime = scheduleState.getLastStopTime();
- final ConfigurationContext configContext = new StandardConfigurationContext(service, controllerServiceProvider, null);
-
- while (true) {
- try {
- synchronized (scheduleState) {
- // if no longer enabled, then we're finished. This can happen, for example,
- // if the @OnEnabled method throws an Exception and the user disables the service
- // while we're administratively yielded.
- //
- // we also check if the schedule state's last stop time is equal to what it was before.
- // if not, then means that the service has been disabled and enabled again, so we should just
- // bail; another thread will be responsible for invoking the @OnEnabled methods.
- if (!scheduleState.isScheduled() || scheduleState.getLastStopTime() != lastStopTime) {
- return;
- }
-
- ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, service.getControllerServiceImplementation(), configContext);
- heartbeater.heartbeat();
- service.setState(ControllerServiceState.ENABLED);
- return;
- }
- } catch (final Exception e) {
- final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
-
- final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service);
- componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
- LOG.error("Failed to invoke @OnEnabled method of {} due to {}", service.getControllerServiceImplementation(), cause.toString());
- if (LOG.isDebugEnabled()) {
- LOG.error("", cause);
- }
-
- ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext);
- Thread.sleep(administrativeYieldMillis);
- continue;
- }
- }
- } catch (final Throwable t) {
- final Throwable cause = t instanceof InvocationTargetException ? t.getCause() : t;
- final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service);
- componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
-
- LOG.error("Failed to invoke @OnEnabled method on {} due to {}", service.getControllerServiceImplementation(), cause.toString());
- if (LOG.isDebugEnabled()) {
- LOG.error("", cause);
- }
- }
- }
- };
-
- scheduleState.setScheduled(true);
- componentLifeCycleThreadPool.execute(enableRunnable);
+ service.enable(this.componentLifeCycleThreadPool, this.administrativeYieldMillis, this.heartbeater);
}
-
@Override
public void disableControllerService(final ControllerServiceNode service) {
- disableControllerServices(Collections.singletonList(service));
+ service.disable(this.componentLifeCycleThreadPool, this.heartbeater);
}
@Override
public void disableControllerServices(final List services) {
- if (requireNonNull(services).isEmpty()) {
- return;
- }
-
- final List servicesToDisable = new ArrayList<>(services.size());
- for (final ControllerServiceNode serviceToDisable : services) {
- if (serviceToDisable.getState() == ControllerServiceState.DISABLED || serviceToDisable.getState() == ControllerServiceState.DISABLING) {
- continue;
- }
-
- servicesToDisable.add(serviceToDisable);
- }
-
- if (servicesToDisable.isEmpty()) {
- return;
- }
-
- // ensure that all controller services can be disabled.
- for (final ControllerServiceNode serviceNode : servicesToDisable) {
- final Set ignoredReferences = new HashSet<>(services);
- ignoredReferences.remove(serviceNode);
- serviceNode.verifyCanDisable(ignoredReferences);
- }
-
- // mark services as disabling
- for (final ControllerServiceNode serviceNode : servicesToDisable) {
- serviceNode.setState(ControllerServiceState.DISABLING);
-
- final ScheduleState scheduleState = getScheduleState(serviceNode);
- synchronized (scheduleState) {
- scheduleState.setScheduled(false);
+ if (!requireNonNull(services).isEmpty()) {
+ for (ControllerServiceNode controllerServiceNode : services) {
+ this.disableControllerService(controllerServiceNode);
}
}
-
- final Queue nodes = new LinkedList<>(servicesToDisable);
- final Runnable disableRunnable = new Runnable() {
- @Override
- public void run() {
- ControllerServiceNode service;
- while ((service = nodes.poll()) != null) {
- try (final NarCloseable x = NarCloseable.withNarLoader()) {
- final ConfigurationContext configContext = new StandardConfigurationContext(service, controllerServiceProvider, null);
-
- try {
- ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext);
- } catch (final Exception e) {
- final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
- final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service);
- componentLog.error("Failed to invoke @OnDisabled method due to {}", cause);
-
- LOG.error("Failed to invoke @OnDisabled method of {} due to {}", service.getControllerServiceImplementation(), cause.toString());
- if (LOG.isDebugEnabled()) {
- LOG.error("", cause);
- }
-
- ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext);
- try {
- Thread.sleep(administrativeYieldMillis);
- } catch (final InterruptedException ie) {
- }
- } finally {
- service.setState(ControllerServiceState.DISABLED);
- heartbeater.heartbeat();
- }
- }
- }
- }
- };
-
- componentLifeCycleThreadPool.execute(disableRunnable);
}
-
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index 8e7f1f5f4f..ce4a767e94 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -16,28 +16,41 @@
*/
package org.apache.nifi.controller.service;
+import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractConfiguredComponent;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.Heartbeater;
import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.annotation.OnConfigured;
import org.apache.nifi.controller.exception.ComponentLifeCycleException;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class StandardControllerServiceNode extends AbstractConfiguredComponent implements ControllerServiceNode {
+ private static final Logger LOG = LoggerFactory.getLogger(StandardControllerServiceNode.class);
+
private final ControllerService proxedControllerService;
private final ControllerService implementation;
private final ControllerServiceProvider serviceProvider;
@@ -51,12 +64,15 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
private final Set referencingComponents = new HashSet<>();
private String comment;
+ private final AtomicBoolean active;
+
public StandardControllerServiceNode(final ControllerService proxiedControllerService, final ControllerService implementation, final String id,
final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) {
super(implementation, id, validationContextFactory, serviceProvider);
this.proxedControllerService = proxiedControllerService;
this.implementation = implementation;
this.serviceProvider = serviceProvider;
+ this.active = new AtomicBoolean();
}
@Override
@@ -146,8 +162,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
@Override
public void verifyCanDisable(final Set ignoreReferences) {
- final ControllerServiceState state = getState();
- if (state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING) {
+ if (!this.isActive()) {
throw new IllegalStateException("Cannot disable " + getControllerServiceImplementation() + " because it is not enabled");
}
@@ -229,7 +244,106 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
}
@Override
- public void setState(final ControllerServiceState state) {
- this.stateRef.set(state);
+ public boolean isActive() {
+ return this.active.get();
+ }
+
+ /**
+ * Will atomically enable this service by invoking its @OnEnabled operation.
+ * It uses CAS operation on {@link #stateRef} to transition this service
+ * from DISABLED to ENABLING state. If such transition succeeds the service
+ * will be marked as 'active' (see {@link ControllerServiceNode#isActive()}).
+ * If such transition doesn't succeed then no enabling logic will be
+ * performed and the method will exit. In other words it is safe to invoke
+ * this operation multiple times and from multiple threads.
+ *
+ * This operation will also perform re-try of service enabling in the event
+ * of exception being thrown by previous invocation of @OnEnabled.
+ *
+ * Upon successful invocation of @OnEnabled this service will be transitioned to
+ * ENABLED state.
+ *
+ * In the event where enabling took longer then expected by the user and such user
+ * initiated disable operation, this service will be automatically disabled as soon
+ * as it reached ENABLED state.
+ */
+ @Override
+ public void enable(final ScheduledExecutorService scheduler, final long administrativeYieldMillis, final Heartbeater heartbeater) {
+ if (this.stateRef.compareAndSet(ControllerServiceState.DISABLED, ControllerServiceState.ENABLING)){
+ this.active.set(true);
+ final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null);
+ scheduler.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, getControllerServiceImplementation(), configContext);
+ if (stateRef.compareAndSet(ControllerServiceState.ENABLING, ControllerServiceState.ENABLED)) {
+ heartbeater.heartbeat();
+ } else {
+ LOG.debug("Disabling service " + this + " after it has been enabled due to disable action being initiated.");
+ // Can only happen if user initiated DISABLE operation before service finished enabling. It's state will be
+ // set to DISABLING (see disable() operation)
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext);
+ stateRef.set(ControllerServiceState.DISABLED);
+ }
+ } catch (Exception e) {
+ final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
+ final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this);
+ componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
+ LOG.error("Failed to invoke @OnEnabled method of {} due to {}", getControllerServiceImplementation(), cause.toString());
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext);
+ if (isActive()) {
+ scheduler.schedule(this, administrativeYieldMillis, TimeUnit.MILLISECONDS);
+ }
+ else {
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext);
+ stateRef.set(ControllerServiceState.DISABLED);
+ }
+ }
+ }
+ });
+ }
+ }
+
+ /**
+ * Will atomically disable this service by invoking its @OnDisabled operation.
+ * It uses CAS operation on {@link #stateRef} to transition this service
+ * from ENABLED to DISABLING state. If such transition succeeds the service
+ * will be de-activated (see {@link ControllerServiceNode#isActive()}).
+ * If such transition doesn't succeed (the service is still in ENABLING state)
+ * then the service will still be transitioned to DISABLING state to ensure that
+ * no other transition could happen on this service. However in such event
+ * (e.g., its @OnEnabled finally succeeded), the {@link #enable(ScheduledExecutorService, long, Heartbeater)}
+ * operation will initiate service disabling javadoc for (see {@link #enable(ScheduledExecutorService, long, Heartbeater)}
+ *
+ * Upon successful invocation of @OnDisabled this service will be transitioned to
+ * DISABLED state.
+ */
+ @Override
+ public void disable(final ScheduledExecutorService scheduler, final Heartbeater heartbeater) {
+ this.active.set(false); // de-activating regardless of CAS operation
+ // that follows since this operation will always result in service state being DISABLING
+ if (this.stateRef.compareAndSet(ControllerServiceState.ENABLED, ControllerServiceState.DISABLING)) {
+ final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null);
+ scheduler.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, StandardControllerServiceNode.this.getControllerServiceImplementation(), configContext);
+ } catch (Exception e) {
+ final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
+ final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this);
+ componentLog.error("Failed to invoke @OnDisabled method due to {}", cause);
+ LOG.error("Failed to invoke @OnDisabled method of {} due to {}", getControllerServiceImplementation(), cause.toString());
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext);
+ } finally {
+ stateRef.set(ControllerServiceState.DISABLED);
+ heartbeater.heartbeat();
+ }
+ }
+ });
+ } else {
+ this.stateRef.compareAndSet(ControllerServiceState.ENABLING, ControllerServiceState.DISABLING);
+ }
}
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index 1f1a1c0f43..6561eb8ee5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -203,14 +203,11 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
// Get a list of all Controller Services that need to be disabled, in the order that they need to be
// disabled.
final List toDisable = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
+
final Set serviceSet = new HashSet<>(toDisable);
for (final ControllerServiceNode nodeToDisable : toDisable) {
- final ControllerServiceState state = nodeToDisable.getState();
-
- if (state != ControllerServiceState.DISABLED && state != ControllerServiceState.DISABLING) {
- nodeToDisable.verifyCanDisable(serviceSet);
- }
+ nodeToDisable.verifyCanDisable(serviceSet);
}
Collections.reverse(toDisable);
@@ -319,14 +316,6 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
logger.info("Will enable {} Controller Services", servicesToEnable.size());
}
- // Mark all services that are configured to be enabled as 'ENABLING'. This allows Processors, reporting tasks
- // to be valid so that they can be scheduled.
- for (final List branch : branches) {
- for (final ControllerServiceNode nodeToEnable : branch) {
- nodeToEnable.setState(ControllerServiceState.ENABLING);
- }
- }
-
final Set enabledNodes = Collections.synchronizedSet(new HashSet());
final ExecutorService executor = Executors.newFixedThreadPool(Math.min(10, branches.size()));
for (final List branch : branches) {
@@ -422,6 +411,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
@Override
public void disableControllerService(final ControllerServiceNode serviceNode) {
+ serviceNode.verifyCanDisable();
processScheduler.disableControllerService(serviceNode);
}
@@ -545,23 +535,20 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
}
private void enableReferencingServices(final ControllerServiceNode serviceNode, final List recursiveReferences) {
- if (serviceNode.getState() != ControllerServiceState.ENABLED && serviceNode.getState() != ControllerServiceState.ENABLING) {
+ if (!serviceNode.isActive()) {
serviceNode.verifyCanEnable(new HashSet<>(recursiveReferences));
}
final Set ifEnabled = new HashSet<>();
- final List toEnable = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
- for (final ControllerServiceNode nodeToEnable : toEnable) {
- final ControllerServiceState state = nodeToEnable.getState();
- if (state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING) {
+ for (final ControllerServiceNode nodeToEnable : recursiveReferences) {
+ if (!nodeToEnable.isActive()) {
nodeToEnable.verifyCanEnable(ifEnabled);
ifEnabled.add(nodeToEnable);
}
}
- for (final ControllerServiceNode nodeToEnable : toEnable) {
- final ControllerServiceState state = nodeToEnable.getState();
- if (state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING) {
+ for (final ControllerServiceNode nodeToEnable : recursiveReferences) {
+ if (!nodeToEnable.isActive()) {
enableControllerService(nodeToEnable);
}
}
@@ -606,11 +593,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
final Set serviceSet = new HashSet<>(toDisable);
for (final ControllerServiceNode nodeToDisable : toDisable) {
- final ControllerServiceState state = nodeToDisable.getState();
-
- if (state != ControllerServiceState.DISABLED && state != ControllerServiceState.DISABLING) {
- nodeToDisable.verifyCanDisable(serviceSet);
- }
+ nodeToDisable.verifyCanDisable(serviceSet);
}
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
index 701adcf29f..d2f3833750 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
@@ -66,8 +66,7 @@ public class StandardControllerServiceReference implements ControllerServiceRefe
if (component instanceof ControllerServiceNode) {
serviceNodes.add((ControllerServiceNode) component);
- final ControllerServiceState state = ((ControllerServiceNode) component).getState();
- if (state != ControllerServiceState.DISABLED) {
+ if (((ControllerServiceNode) component).isActive()) {
activeReferences.add(component);
}
} else if (isRunning(component)) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
index 5575a23cf7..1f655de99d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
@@ -16,18 +16,31 @@
*/
package org.apache.nifi.controller.scheduling;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.LockSupport;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.Heartbeater;
+import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.StandardProcessorNode;
@@ -36,6 +49,7 @@ import org.apache.nifi.controller.reporting.StandardReportingInitializationConte
import org.apache.nifi.controller.reporting.StandardReportingTaskNode;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.controller.service.StandardControllerServiceProvider;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
@@ -116,7 +130,13 @@ public class TestStandardProcessScheduler {
Thread.sleep(1000L);
scheduler.stopProcessor(procNode);
+ assertTrue(service.isActive());
+ assertTrue(service.getState() == ControllerServiceState.ENABLING);
scheduler.disableControllerService(service);
+ assertTrue(service.getState() == ControllerServiceState.DISABLING);
+ assertFalse(service.isActive());
+ Thread.sleep(1000);
+ assertTrue(service.getState() == ControllerServiceState.DISABLED);
}
@@ -169,4 +189,237 @@ public class TestStandardProcessScheduler {
throw new IllegalStateException(e);
}
}
+ /**
+ * Validates the atomic nature of ControllerServiceNode.enable() method
+ * which must only trigger @OnEnabled once, regardless of how many threads
+ * may have a reference to the underlying ProcessScheduler and
+ * ControllerServiceNode.
+ */
+ @Test
+ public void validateServiceEnablementLogicHappensOnlyOnce() throws Exception {
+ final ProcessScheduler scheduler = createScheduler();
+ StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
+ final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(),
+ "1", false);
+ assertFalse(serviceNode.isActive());
+ SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation();
+ ExecutorService executor = Executors.newCachedThreadPool();
+
+ final AtomicBoolean asyncFailed = new AtomicBoolean();
+ for (int i = 0; i < 1000; i++) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ scheduler.enableControllerService(serviceNode);
+ assertTrue(serviceNode.isActive());
+ } catch (Exception e) {
+ e.printStackTrace();
+ asyncFailed.set(true);
+ }
+ }
+ });
+ }
+ // need to sleep a while since we are emulating async invocations on
+ // method that is also internally async
+ Thread.sleep(500);
+ executor.shutdown();
+ assertFalse(asyncFailed.get());
+ assertEquals(1, ts.enableInvocationCount());
+ }
+
+ /**
+ * Validates the atomic nature of ControllerServiceNode.disable(..) method
+ * which must never trigger @OnDisabled, regardless of how many threads may
+ * have a reference to the underlying ProcessScheduler and
+ * ControllerServiceNode.
+ */
+ @Test
+ public void validateDisabledServiceCantBeDisabled() throws Exception {
+ final ProcessScheduler scheduler = createScheduler();
+ StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
+ final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(),
+ "1", false);
+ SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation();
+ ExecutorService executor = Executors.newCachedThreadPool();
+
+ final AtomicBoolean asyncFailed = new AtomicBoolean();
+ for (int i = 0; i < 1000; i++) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ scheduler.disableControllerService(serviceNode);
+ assertFalse(serviceNode.isActive());
+ } catch (Exception e) {
+ e.printStackTrace();
+ asyncFailed.set(true);
+ }
+ }
+ });
+ }
+ // need to sleep a while since we are emulating async invocations on
+ // method that is also internally async
+ Thread.sleep(500);
+ executor.shutdown();
+ assertFalse(asyncFailed.get());
+ assertEquals(0, ts.disableInvocationCount());
+ }
+
+ /**
+ * Validates the atomic nature of ControllerServiceNode.disable() method
+ * which must only trigger @OnDisabled once, regardless of how many threads
+ * may have a reference to the underlying ProcessScheduler and
+ * ControllerServiceNode.
+ */
+ @Test
+ public void validateEnabledServiceCanOnlyBeDisabledOnce() throws Exception {
+ final ProcessScheduler scheduler = createScheduler();
+ StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
+ final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(),
+ "1", false);
+ SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation();
+ scheduler.enableControllerService(serviceNode);
+ assertTrue(serviceNode.isActive());
+ ExecutorService executor = Executors.newCachedThreadPool();
+
+ final AtomicBoolean asyncFailed = new AtomicBoolean();
+ for (int i = 0; i < 1000; i++) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ scheduler.disableControllerService(serviceNode);
+ assertFalse(serviceNode.isActive());
+ } catch (Exception e) {
+ e.printStackTrace();
+ asyncFailed.set(true);
+ }
+ }
+ });
+ }
+ // need to sleep a while since we are emulating async invocations on
+ // method that is also internally async
+ Thread.sleep(500);
+ executor.shutdown();
+ assertFalse(asyncFailed.get());
+ assertEquals(1, ts.disableInvocationCount());
+ }
+
+ /**
+ * Validates that service that is infinitely blocking in @OnEnabled can
+ * still have DISABLE operation initiated. The service itself will be set to
+ * DISABLING state at which point UI and all will know that such service can
+ * not be transitioned any more into any other state until it finishes
+ * enabling (which will never happen in our case thus should be addressed by
+ * user). However, regardless of user's mistake NiFi will remain
+ * functioning.
+ */
+ @Test
+ public void validateNeverEnablingServiceCanStillBeDisabled() throws Exception {
+ final ProcessScheduler scheduler = createScheduler();
+ StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
+ final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(),
+ "1", false);
+ LongEnablingService ts = (LongEnablingService) serviceNode.getControllerServiceImplementation();
+ ts.setLimit(Long.MAX_VALUE);
+ scheduler.enableControllerService(serviceNode);
+ Thread.sleep(100);
+ assertTrue(serviceNode.isActive());
+ assertEquals(1, ts.enableInvocationCount());
+
+ Thread.sleep(1000);
+ scheduler.disableControllerService(serviceNode);
+ assertFalse(serviceNode.isActive());
+ assertEquals(ControllerServiceState.DISABLING, serviceNode.getState());
+ assertEquals(0, ts.disableInvocationCount());
+ }
+
+ /**
+ * Validates that the service that is currently in ENABLING state can be
+ * disabled and that its @OnDisabled operation will be invoked as soon
+ * as @OnEnable finishes.
+ */
+ @Test
+ public void validateLongEnablingServiceCanStillBeDisabled() throws Exception {
+ final ProcessScheduler scheduler = createScheduler();
+ StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
+ final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(),
+ "1", false);
+ LongEnablingService ts = (LongEnablingService) serviceNode.getControllerServiceImplementation();
+ ts.setLimit(3000);
+ scheduler.enableControllerService(serviceNode);
+ Thread.sleep(100);
+ assertTrue(serviceNode.isActive());
+ assertEquals(1, ts.enableInvocationCount());
+
+ Thread.sleep(100);
+ scheduler.disableControllerService(serviceNode);
+ assertFalse(serviceNode.isActive());
+ assertEquals(ControllerServiceState.DISABLING, serviceNode.getState());
+ assertEquals(0, ts.disableInvocationCount());
+ // wait a bit. . . Enabling will finish and @OnDisabled will be invoked
+ // automatically
+ Thread.sleep(3000);
+ assertEquals(ControllerServiceState.DISABLED, serviceNode.getState());
+ assertEquals(1, ts.disableInvocationCount());
+ }
+
+ public static class SimpleTestService extends AbstractControllerService {
+
+ private final AtomicInteger enableCounter = new AtomicInteger();
+ private final AtomicInteger disableCounter = new AtomicInteger();
+
+ @OnEnabled
+ public void enable(ConfigurationContext context) {
+ this.enableCounter.incrementAndGet();
+ }
+
+ @OnDisabled
+ public void disable(ConfigurationContext context) {
+ this.disableCounter.incrementAndGet();
+ }
+
+ public int enableInvocationCount() {
+ return this.enableCounter.get();
+ }
+
+ public int disableInvocationCount() {
+ return this.disableCounter.get();
+ }
+ }
+
+ public static class LongEnablingService extends AbstractControllerService {
+ private final AtomicInteger enableCounter = new AtomicInteger();
+ private final AtomicInteger disableCounter = new AtomicInteger();
+
+ private volatile long limit;
+
+ @OnEnabled
+ public void enable(ConfigurationContext context) throws Exception {
+ this.enableCounter.incrementAndGet();
+ Thread.sleep(limit);
+ }
+
+ @OnDisabled
+ public void disable(ConfigurationContext context) {
+ this.disableCounter.incrementAndGet();
+ }
+
+ public int enableInvocationCount() {
+ return this.enableCounter.get();
+ }
+
+ public int disableInvocationCount() {
+ return this.disableCounter.get();
+ }
+
+ public void setLimit(long limit) {
+ this.limit = limit;
+ }
+ }
+
+ private ProcessScheduler createScheduler() {
+ return new StandardProcessScheduler(mock(Heartbeater.class), null, null);
+ }
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
index 5cd3648a1c..240000919b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
@@ -120,7 +120,7 @@ public class TestStandardControllerServiceProvider {
@Test(timeout = 10000)
public void testConcurrencyWithEnablingReferencingServicesGraph() {
final ProcessScheduler scheduler = createScheduler();
- for (int i = 0; i < 1000; i++) {
+ for (int i = 0; i < 10000; i++) {
testEnableReferencingServicesGraph(scheduler);
}
}