NIFI-839: When disabling controller services, do so one-at-a-time in the process scheduler, so that we can ensure that all are disable-able and then disable them atomically

This commit is contained in:
Mark Payne 2015-08-11 15:03:39 -04:00
parent 8e5347156f
commit e409b6c5c1
6 changed files with 227 additions and 74 deletions

View File

@ -16,6 +16,8 @@
*/
package org.apache.nifi.controller;
import java.util.List;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
@ -162,6 +164,12 @@ public interface ProcessScheduler {
*/
void enableControllerService(ControllerServiceNode service);
/**
* Disables all of the given Controller Services in the order provided by the List
* @param services the controller services to disable
*/
void disableControllerServices(List<ControllerServiceNode> services);
/**
* Disables the Controller Service so that it can be updated
*

View File

@ -23,18 +23,57 @@ import org.apache.nifi.controller.ControllerService;
public interface ControllerServiceNode extends ConfiguredComponent {
/**
* <p>
* Returns a proxy implementation of the Controller Service that this ControllerServiceNode
* encapsulates. The object returned by this method may be passed to other components, as
* required. Invocations of methods on the object returned will be intercepted and the service's
* state will be verified before the underlying implementation's method is called.
* </p>
*
* @return a proxied ControllerService that can be addressed outside of the framework.
*/
ControllerService getProxiedControllerService();
/**
* <p>
* Returns the actual implementation of the Controller Service that this ControllerServiceNode
* encapsulates. This direct implementation should <strong>NEVER</strong> be passed to another
* pluggable component. This implementation should be addressed only by the framework itself.
* If providing the controller service to another pluggable component, provide it with the
* proxied entity obtained via {@link #getProxiedControllerService()}
* </p>
*
* @return the actual implementation of the Controller Service
*/
ControllerService getControllerServiceImplementation();
/**
* @return the current state of the Controller Service
*/
ControllerServiceState getState();
/**
* Updates the state of the Controller Service to the provided new state
* @param state the state to set the service to
*/
void setState(ControllerServiceState state);
/**
* @return the ControllerServiceReference that describes which components are referencing this Controller Service
*/
ControllerServiceReference getReferences();
/**
* Indicates that the given component is now referencing this Controller Service
* @param referringComponent the component referencing this service
*/
void addReference(ConfiguredComponent referringComponent);
/**
* Indicates that the given component is no longer referencing this Controller Service
* @param referringComponent the component that is no longer referencing this service
*/
void removeReference(ConfiguredComponent referringComponent);
void setComments(String comment);

View File

@ -19,7 +19,12 @@ package org.apache.nifi.controller.scheduling;
import static java.util.Objects.requireNonNull;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -670,47 +675,84 @@ public final class StandardProcessScheduler implements ProcessScheduler {
componentLifeCycleThreadPool.execute(enableRunnable);
}
@Override
public void disableControllerService(final ControllerServiceNode service) {
service.verifyCanDisable();
disableControllerServices(Collections.singletonList(service));
}
final ScheduleState state = getScheduleState(requireNonNull(service));
@Override
public void disableControllerServices(final List<ControllerServiceNode> services) {
if (requireNonNull(services).isEmpty()) {
return;
}
final List<ControllerServiceNode> servicesToDisable = new ArrayList<>(services.size());
for (final ControllerServiceNode serviceToDisable : services) {
if (serviceToDisable.getState() == ControllerServiceState.DISABLED || serviceToDisable.getState() == ControllerServiceState.DISABLING) {
continue;
}
servicesToDisable.add(serviceToDisable);
}
if (servicesToDisable.isEmpty()) {
return;
}
// ensure that all controller services can be disabled.
for (final ControllerServiceNode serviceNode : servicesToDisable) {
final Set<ControllerServiceNode> ignoredReferences = new HashSet<>(services);
ignoredReferences.remove(serviceNode);
serviceNode.verifyCanDisable(ignoredReferences);
}
// mark services as disabling
for (final ControllerServiceNode serviceNode : servicesToDisable) {
serviceNode.setState(ControllerServiceState.DISABLING);
final ScheduleState scheduleState = getScheduleState(serviceNode);
synchronized (scheduleState) {
scheduleState.setScheduled(false);
}
}
final Queue<ControllerServiceNode> nodes = new LinkedList<>(servicesToDisable);
final Runnable disableRunnable = new Runnable() {
@Override
public void run() {
synchronized (state) {
state.setScheduled(false);
}
ControllerServiceNode service;
while ((service = nodes.poll()) != null) {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
final ConfigurationContext configContext = new StandardConfigurationContext(service, controllerServiceProvider, null);
try (final NarCloseable x = NarCloseable.withNarLoader()) {
final ConfigurationContext configContext = new StandardConfigurationContext(service, controllerServiceProvider, null);
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext);
} catch (final Exception e) {
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service);
componentLog.error("Failed to invoke @OnDisabled method due to {}", cause);
LOG.error("Failed to invoke @OnDisabled method of {} due to {}", service.getControllerServiceImplementation(), cause.toString());
if (LOG.isDebugEnabled()) {
LOG.error("", cause);
}
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext);
try {
Thread.sleep(administrativeYieldMillis);
} catch (final InterruptedException ie) {
ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext);
} catch (final Exception e) {
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service);
componentLog.error("Failed to invoke @OnDisabled method due to {}", cause);
LOG.error("Failed to invoke @OnDisabled method of {} due to {}", service.getControllerServiceImplementation(), cause.toString());
if (LOG.isDebugEnabled()) {
LOG.error("", cause);
}
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext);
try {
Thread.sleep(administrativeYieldMillis);
} catch (final InterruptedException ie) {
}
} finally {
service.setState(ControllerServiceState.DISABLED);
heartbeater.heartbeat();
}
} finally {
service.setState(ControllerServiceState.DISABLED);
heartbeater.heartbeat();
}
}
}
};
service.setState(ControllerServiceState.DISABLING);
componentLifeCycleThreadPool.execute(disableRunnable);
}
}

View File

@ -153,11 +153,17 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
final ControllerServiceReference references = getReferences();
final Set<ConfiguredComponent> activeReferences = new HashSet<>();
for (final ConfiguredComponent activeReference : references.getActiveReferences()) {
if (!ignoreReferences.contains(activeReference)) {
throw new IllegalStateException(implementation + " cannot be disabled because it is referenced by at least one component that is currently running");
activeReferences.add(activeReference);
}
}
if (!activeReferences.isEmpty()) {
throw new IllegalStateException(implementation + " cannot be disabled because it is referenced by " + activeReferences.size() +
" components that are currently running: " + activeReferences);
}
}
@Override

View File

@ -214,13 +214,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
}
Collections.reverse(toDisable);
for (final ControllerServiceNode nodeToDisable : toDisable) {
final ControllerServiceState state = nodeToDisable.getState();
if (state != ControllerServiceState.DISABLED && state != ControllerServiceState.DISABLING) {
disableControllerService(nodeToDisable);
}
}
processScheduler.disableControllerServices(toDisable);
}
@Override
@ -428,7 +422,6 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
@Override
public void disableControllerService(final ControllerServiceNode serviceNode) {
serviceNode.verifyCanDisable();
processScheduler.disableControllerService(serviceNode);
}

View File

@ -20,22 +20,28 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.nifi.controller.Heartbeater;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.StandardProcessorNode;
import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
import org.apache.nifi.controller.service.mock.DummyProcessor;
import org.apache.nifi.controller.service.mock.ServiceA;
import org.apache.nifi.controller.service.mock.ServiceB;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.StandardProcessGroup;
import org.apache.nifi.processor.StandardProcessorInitializationContext;
import org.apache.nifi.processor.StandardValidationContextFactory;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
@ -43,29 +49,14 @@ import org.mockito.stubbing.Answer;
public class TestStandardControllerServiceProvider {
@BeforeClass
public static void setNiFiProps() {
System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties");
}
private ProcessScheduler createScheduler() {
final ProcessScheduler scheduler = Mockito.mock(ProcessScheduler.class);
Mockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(final InvocationOnMock invocation) throws Throwable {
final ControllerServiceNode node = (ControllerServiceNode) invocation.getArguments()[0];
node.verifyCanEnable();
node.setState(ControllerServiceState.ENABLED);
return null;
}
}).when(scheduler).enableControllerService(Mockito.any(ControllerServiceNode.class));
Mockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(final InvocationOnMock invocation) throws Throwable {
final ControllerServiceNode node = (ControllerServiceNode) invocation.getArguments()[0];
node.verifyCanDisable();
node.setState(ControllerServiceState.DISABLED);
return null;
}
}).when(scheduler).disableControllerService(Mockito.any(ControllerServiceNode.class));
return scheduler;
final Heartbeater heartbeater = Mockito.mock(Heartbeater.class);
return new StandardProcessScheduler(heartbeater, null, null);
}
@Test
@ -78,7 +69,7 @@ public class TestStandardControllerServiceProvider {
provider.disableControllerService(serviceNode);
}
@Test
@Test(timeout=10000)
public void testEnableDisableWithReference() {
final ProcessScheduler scheduler = createScheduler();
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
@ -104,10 +95,22 @@ public class TestStandardControllerServiceProvider {
}
provider.disableControllerService(serviceNodeA);
waitForServiceState(serviceNodeA, ControllerServiceState.DISABLED);
provider.disableControllerService(serviceNodeB);
waitForServiceState(serviceNodeB, ControllerServiceState.DISABLED);
}
@Test
private void waitForServiceState(final ControllerServiceNode service, final ControllerServiceState desiredState) {
while (service.getState() != desiredState) {
try {
Thread.sleep(50L);
} catch (final InterruptedException e) {
}
}
}
@Test(timeout=10000)
public void testEnableReferencingServicesGraph() {
final ProcessScheduler scheduler = createScheduler();
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
@ -137,12 +140,20 @@ public class TestStandardControllerServiceProvider {
provider.enableControllerService(serviceNode4);
provider.enableReferencingServices(serviceNode4);
assertEquals(ControllerServiceState.ENABLED, serviceNode3.getState());
assertEquals(ControllerServiceState.ENABLED, serviceNode2.getState());
assertEquals(ControllerServiceState.ENABLED, serviceNode1.getState());
// Verify that the services are either ENABLING or ENABLED, and wait for all of them to become ENABLED.
// Note that we set a timeout of 10 seconds, in case a bug occurs and the services never become ENABLED.
final Set<ControllerServiceState> validStates = new HashSet<>();
validStates.add(ControllerServiceState.ENABLED);
validStates.add(ControllerServiceState.ENABLING);
while (serviceNode3.getState() != ControllerServiceState.ENABLED || serviceNode2.getState() != ControllerServiceState.ENABLED || serviceNode1.getState() != ControllerServiceState.ENABLED) {
assertTrue(validStates.contains(serviceNode3.getState()));
assertTrue(validStates.contains(serviceNode2.getState()));
assertTrue(validStates.contains(serviceNode1.getState()));
}
}
@Test
@Test(timeout=10000)
public void testStartStopReferencingComponents() {
final ProcessScheduler scheduler = createScheduler();
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
@ -208,9 +219,17 @@ public class TestStandardControllerServiceProvider {
provider.enableReferencingServices(serviceNode4);
provider.scheduleReferencingComponents(serviceNode4);
assertEquals(ControllerServiceState.ENABLED, serviceNode3.getState());
assertEquals(ControllerServiceState.ENABLED, serviceNode2.getState());
assertEquals(ControllerServiceState.ENABLED, serviceNode1.getState());
final Set<ControllerServiceState> enableStates = new HashSet<>();
enableStates.add(ControllerServiceState.ENABLED);
enableStates.add(ControllerServiceState.ENABLING);
while (serviceNode3.getState() != ControllerServiceState.ENABLED
|| serviceNode2.getState() != ControllerServiceState.ENABLED
|| serviceNode1.getState() != ControllerServiceState.ENABLED) {
assertTrue(enableStates.contains(serviceNode3.getState()));
assertTrue(enableStates.contains(serviceNode2.getState()));
assertTrue(enableStates.contains(serviceNode1.getState()));
}
assertTrue(procNodeA.isRunning());
assertTrue(procNodeB.isRunning());
@ -218,18 +237,32 @@ public class TestStandardControllerServiceProvider {
provider.unscheduleReferencingComponents(serviceNode4);
assertFalse(procNodeA.isRunning());
assertFalse(procNodeB.isRunning());
assertEquals(ControllerServiceState.ENABLED, serviceNode3.getState());
assertEquals(ControllerServiceState.ENABLED, serviceNode2.getState());
assertEquals(ControllerServiceState.ENABLED, serviceNode1.getState());
while (serviceNode3.getState() != ControllerServiceState.ENABLED
|| serviceNode2.getState() != ControllerServiceState.ENABLED
|| serviceNode1.getState() != ControllerServiceState.ENABLED) {
assertTrue(enableStates.contains(serviceNode3.getState()));
assertTrue(enableStates.contains(serviceNode2.getState()));
assertTrue(enableStates.contains(serviceNode1.getState()));
}
provider.disableReferencingServices(serviceNode4);
assertEquals(ControllerServiceState.DISABLED, serviceNode3.getState());
assertEquals(ControllerServiceState.DISABLED, serviceNode2.getState());
assertEquals(ControllerServiceState.DISABLED, serviceNode1.getState());
final Set<ControllerServiceState> disableStates = new HashSet<>();
disableStates.add(ControllerServiceState.DISABLED);
disableStates.add(ControllerServiceState.DISABLING);
// Wait for the services to be disabled.
while (serviceNode3.getState() != ControllerServiceState.DISABLED
|| serviceNode2.getState() != ControllerServiceState.DISABLED
|| serviceNode1.getState() != ControllerServiceState.DISABLED) {
assertTrue(disableStates.contains(serviceNode3.getState()));
assertTrue(disableStates.contains(serviceNode2.getState()));
assertTrue(disableStates.contains(serviceNode1.getState()));
}
assertEquals(ControllerServiceState.ENABLED, serviceNode4.getState());
provider.disableControllerService(serviceNode4);
assertEquals(ControllerServiceState.DISABLED, serviceNode4.getState());
assertTrue(disableStates.contains(serviceNode4.getState()));
}
@Test
@ -376,4 +409,36 @@ public class TestStandardControllerServiceProvider {
assertTrue(ordered.get(1) == serviceNode3);
}
private ProcessorNode createProcessor(final ProcessScheduler scheduler, final ControllerServiceProvider serviceProvider) {
final ProcessorNode procNode = new StandardProcessorNode(new DummyProcessor(), UUID.randomUUID().toString(),
new StandardValidationContextFactory(serviceProvider), scheduler, serviceProvider);
final ProcessGroup group = new StandardProcessGroup(UUID.randomUUID().toString(), serviceProvider, scheduler, null, null);
group.addProcessor(procNode);
procNode.setProcessGroup(group);
return procNode;
}
@Test
public void testEnableReferencingComponents() {
final ProcessScheduler scheduler = createScheduler();
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(null, null);
final ControllerServiceNode serviceNode = provider.createControllerService(ServiceA.class.getName(), "1", false);
final ProcessorNode procNode = createProcessor(scheduler, provider);
serviceNode.addReference(procNode);
procNode.setScheduledState(ScheduledState.STOPPED);
provider.unscheduleReferencingComponents(serviceNode);
assertEquals(ScheduledState.STOPPED, procNode.getScheduledState());
procNode.setScheduledState(ScheduledState.RUNNING);
provider.unscheduleReferencingComponents(serviceNode);
assertEquals(ScheduledState.STOPPED, procNode.getScheduledState());
procNode.setScheduledState(ScheduledState.DISABLED);
provider.unscheduleReferencingComponents(serviceNode);
assertEquals(ScheduledState.DISABLED, procNode.getScheduledState());
}
}