NIFI-1164 addressed PR comment

Added isActive check to the StandardControllerServiceNode:280 to ensure that
the IF statement can only have a chance to succeed if service is active. The service
will be indiscriminately deactivated as soon as disable(..) operation is invoked. This itself will
eliminate the race condition discovered by Mark

NIFI-1164 addressed PR comments
fixed the race condition described by Mark during disable call

NIFI-1164 polished javadoc
This commit is contained in:
Oleg Zhurakousky 2015-12-28 11:11:58 -05:00
parent 909c0decd6
commit 0d09054d9f
3 changed files with 130 additions and 21 deletions

View File

@ -66,8 +66,9 @@ public interface ControllerServiceNode extends ConfiguredComponent {
* the amount of milliseconds to wait for administrative yield
* @param heartbeater
* the instance of {@link Heartbeater}
* @return 'true' if service was enabled
*/
void enable(ScheduledExecutorService scheduler, long administrativeYieldMillis, Heartbeater heartbeater);
boolean enable(ScheduledExecutorService scheduler, long administrativeYieldMillis, Heartbeater heartbeater);
/**
* Will disable this service. Disabling of the service typically means
@ -78,8 +79,9 @@ public interface ControllerServiceNode extends ConfiguredComponent {
* initiate service disabling task
* @param heartbeater
* the instance of {@link Heartbeater}
* @return 'true' if service was disabled
*/
void disable(ScheduledExecutorService scheduler, Heartbeater heartbeater);
boolean disable(ScheduledExecutorService scheduler, Heartbeater heartbeater);
/**
* @return the ControllerServiceReference that describes which components are referencing this Controller Service
@ -140,8 +142,7 @@ public interface ControllerServiceNode extends ConfiguredComponent {
* {@link #enable(ScheduledExecutorService, long, Heartbeater)} operation
* has been invoked and the service has been transitioned to ENABLING state.
* The service will also remain 'active' after its been transitioned to
* ENABLED state.
* <br>
* ENABLED state. <br>
* The service will be de-activated upon invocation of
* {@link #disable(ScheduledExecutorService, Heartbeater)}.
*/

View File

@ -268,8 +268,8 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
* as it reached ENABLED state.
*/
@Override
public void enable(final ScheduledExecutorService scheduler, final long administrativeYieldMillis, final Heartbeater heartbeater) {
if (this.stateRef.compareAndSet(ControllerServiceState.DISABLED, ControllerServiceState.ENABLING)){
public boolean enable(final ScheduledExecutorService scheduler, final long administrativeYieldMillis, final Heartbeater heartbeater) {
if (this.stateRef.compareAndSet(ControllerServiceState.DISABLED, ControllerServiceState.ENABLING)) {
this.active.set(true);
final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null);
scheduler.execute(new Runnable() {
@ -277,13 +277,17 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
public void run() {
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, getControllerServiceImplementation(), configContext);
if (stateRef.compareAndSet(ControllerServiceState.ENABLING, ControllerServiceState.ENABLED)) {
boolean shouldEnable = false;
synchronized (configContext) {
shouldEnable = active.get() && stateRef.compareAndSet(ControllerServiceState.ENABLING, ControllerServiceState.ENABLED);
}
if (shouldEnable) {
heartbeater.heartbeat();
} else {
LOG.debug("Disabling service " + this + " after it has been enabled due to disable action being initiated.");
// Can only happen if user initiated DISABLE operation before service finished enabling. It's state will be
// set to DISABLING (see disable() operation)
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext);
invokeDisable(configContext, heartbeater);
stateRef.set(ControllerServiceState.DISABLED);
}
} catch (Exception e) {
@ -291,7 +295,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this);
componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
LOG.error("Failed to invoke @OnEnabled method of {} due to {}", getControllerServiceImplementation(), cause.toString());
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext);
invokeDisable(configContext, heartbeater);
if (isActive()) {
scheduler.schedule(this, administrativeYieldMillis, TimeUnit.MILLISECONDS);
}
@ -303,6 +307,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
}
});
}
return this.active.get();
}
/**
@ -320,22 +325,24 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
* DISABLED state.
*/
@Override
public void disable(final ScheduledExecutorService scheduler, final Heartbeater heartbeater) {
this.active.set(false); // de-activating regardless of CAS operation
// that follows since this operation will always result in service state being DISABLING
public boolean disable(final ScheduledExecutorService scheduler, final Heartbeater heartbeater) {
/*
* The reason for synchronization is to ensure consistency of the
* service state when another thread is in the middle of enabling this
* service since it will attempt to transition service state from
* ENABLING to ENABLED but only if it's active.
*/
synchronized (this.active) {
this.active.set(false);
}
if (this.stateRef.compareAndSet(ControllerServiceState.ENABLED, ControllerServiceState.DISABLING)) {
final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null);
scheduler.execute(new Runnable() {
@Override
public void run() {
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, StandardControllerServiceNode.this.getControllerServiceImplementation(), configContext);
} catch (Exception e) {
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this);
componentLog.error("Failed to invoke @OnDisabled method due to {}", cause);
LOG.error("Failed to invoke @OnDisabled method of {} due to {}", getControllerServiceImplementation(), cause.toString());
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext);
invokeDisable(configContext, heartbeater);
} finally {
stateRef.set(ControllerServiceState.DISABLED);
heartbeater.heartbeat();
@ -345,5 +352,21 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
} else {
this.stateRef.compareAndSet(ControllerServiceState.ENABLING, ControllerServiceState.DISABLING);
}
return !this.active.get();
}
/**
*
*/
private void invokeDisable(ConfigurationContext configContext, Heartbeater heartbeater) {
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, StandardControllerServiceNode.this.getControllerServiceImplementation(), configContext);
} catch (Exception e) {
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this);
componentLog.error("Failed to invoke @OnDisabled method due to {}", cause);
LOG.error("Failed to invoke @OnDisabled method of {} due to {}", getControllerServiceImplementation(), cause.toString());
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext);
}
}
}

View File

@ -24,14 +24,13 @@ import static org.mockito.Mockito.mock;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
@ -50,6 +49,7 @@ import org.apache.nifi.controller.reporting.StandardReportingTaskNode;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.controller.service.StandardControllerServiceNode;
import org.apache.nifi.controller.service.StandardControllerServiceProvider;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
@ -306,6 +306,71 @@ public class TestStandardProcessScheduler {
assertEquals(1, ts.disableInvocationCount());
}
@Test
public void validateDisablingOfTheFailedService() throws Exception {
final ProcessScheduler scheduler = createScheduler();
StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
final ControllerServiceNode serviceNode = provider.createControllerService(FailingService.class.getName(),
"1", false);
scheduler.enableControllerService(serviceNode);
Thread.sleep(1000);
scheduler.shutdown();
/*
* Because it was never disabled it will remain active since its
* enabling is being retried. This may actually be a bug in the
* scheduler since it probably has to shut down all components (disable
* services, shut down processors etc) before shutting down itself
*/
assertTrue(serviceNode.isActive());
assertTrue(serviceNode.getState() == ControllerServiceState.ENABLING);
}
/**
* Validates that in multi threaded environment enabling service can still
* be disabled. This test is set up in such way that disabling of the
* service could be initiated by both disable and enable methods. In other
* words it tests two conditions in
* {@link StandardControllerServiceNode#disable(java.util.concurrent.ScheduledExecutorService, Heartbeater)}
* where the disabling of the service can be initiated right there (if
* ENABLED), or if service is still enabling its disabling will be deferred
* to the logic in
* {@link StandardControllerServiceNode#enable(java.util.concurrent.ScheduledExecutorService, long, Heartbeater)}
* IN any even the resulting state of the service is DISABLED
*/
@Test
public void validateEnabledDisableMultiThread() throws Exception {
final ProcessScheduler scheduler = createScheduler();
StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 200; i++) {
final ControllerServiceNode serviceNode = provider
.createControllerService(RandomShortDelayEnablingService.class.getName(), "1", false);
executor.execute(new Runnable() {
@Override
public void run() {
scheduler.enableControllerService(serviceNode);
}
});
Thread.sleep(2); // ensure that enable gets initiated before disable
executor.execute(new Runnable() {
@Override
public void run() {
scheduler.disableControllerService(serviceNode);
}
});
Thread.sleep(25);
assertFalse(serviceNode.isActive());
assertTrue(serviceNode.getState() == ControllerServiceState.DISABLED);
}
// need to sleep a while since we are emulating async invocations on
// method that is also internally async
Thread.sleep(500);
executor.shutdown();
executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
}
/**
* Validates that service that is infinitely blocking in @OnEnabled can
* still have DISABLE operation initiated. The service itself will be set to
@ -365,6 +430,26 @@ public class TestStandardProcessScheduler {
assertEquals(1, ts.disableInvocationCount());
}
public static class FailingService extends AbstractControllerService {
@OnEnabled
public void enable(ConfigurationContext context) {
throw new RuntimeException("intentional");
}
}
public static class RandomShortDelayEnablingService extends AbstractControllerService {
private final Random random = new Random();
@OnEnabled
public void enable(ConfigurationContext context) {
try {
Thread.sleep(random.nextInt(20));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public static class SimpleTestService extends AbstractControllerService {
private final AtomicInteger enableCounter = new AtomicInteger();