NIFI-1606: Run the onComponentRemoved logic of state providers in a background thread

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
Mark Payne 2016-03-08 13:21:00 -05:00 committed by joewitt
parent 07d4d7005b
commit 5a8b2cf7f1
3 changed files with 36 additions and 8 deletions

View File

@ -38,6 +38,7 @@ import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
@ -46,12 +47,12 @@ import org.apache.nifi.connectable.LocalPort;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.Snippet;
import org.apache.nifi.controller.exception.ComponentLifeCycleException;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.encrypt.StringEncryptor;
@ -73,7 +74,7 @@ public final class StandardProcessGroup implements ProcessGroup {
private final AtomicReference<Position> position;
private final AtomicReference<String> comments;
private final ProcessScheduler scheduler;
private final StandardProcessScheduler scheduler;
private final ControllerServiceProvider controllerServiceProvider;
private final FlowController flowController;
@ -93,8 +94,8 @@ public final class StandardProcessGroup implements ProcessGroup {
private static final Logger LOG = LoggerFactory.getLogger(StandardProcessGroup.class);
public StandardProcessGroup(final String id, final ControllerServiceProvider serviceProvider, final ProcessScheduler scheduler, final NiFiProperties nifiProps, final StringEncryptor encryptor,
final FlowController flowController) {
public StandardProcessGroup(final String id, final ControllerServiceProvider serviceProvider, final StandardProcessScheduler scheduler,
final NiFiProperties nifiProps, final StringEncryptor encryptor, final FlowController flowController) {
this.id = id;
this.controllerServiceProvider = serviceProvider;
this.parent = new AtomicReference<>();
@ -729,7 +730,13 @@ public final class StandardProcessGroup implements ProcessGroup {
processors.remove(id);
LogRepositoryFactory.getRepository(processor.getIdentifier()).removeAllObservers();
flowController.getStateManagerProvider().onComponentRemoved(processor.getIdentifier());
final StateManagerProvider stateManagerProvider = flowController.getStateManagerProvider();
scheduler.submitFrameworkTask(new Runnable() {
@Override
public void run() {
stateManagerProvider.onComponentRemoved(processor.getIdentifier());
}
});
// must copy to avoid a concurrent modification
final Set<Connection> copy = new HashSet<>(processor.getConnections());

View File

@ -75,7 +75,7 @@ public class TestStandardControllerServiceProvider {
System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties");
}
private ProcessScheduler createScheduler() {
private StandardProcessScheduler createScheduler() {
final Heartbeater heartbeater = Mockito.mock(Heartbeater.class);
return new StandardProcessScheduler(heartbeater, null, null, stateManagerProvider);
}
@ -331,7 +331,7 @@ public class TestStandardControllerServiceProvider {
assertTrue(ordered.get(1) == serviceNode3);
}
private ProcessorNode createProcessor(final ProcessScheduler scheduler, final ControllerServiceProvider serviceProvider) {
private ProcessorNode createProcessor(final StandardProcessScheduler scheduler, final ControllerServiceProvider serviceProvider) {
final ProcessorNode procNode = new StandardProcessorNode(new DummyProcessor(), UUID.randomUUID().toString(),
new StandardValidationContextFactory(serviceProvider), scheduler, serviceProvider);
@ -344,7 +344,7 @@ public class TestStandardControllerServiceProvider {
@Test
public void testEnableReferencingComponents() {
final ProcessScheduler scheduler = createScheduler();
final StandardProcessScheduler scheduler = createScheduler();
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(null, null, stateManagerProvider);
final ControllerServiceNode serviceNode = provider.createControllerService(ServiceA.class.getName(), "1", false);

View File

@ -188,5 +188,26 @@ public abstract class AbstractTestStateProvider {
assertFalse(replaced);
}
@Test
public void testOnComponentRemoved() throws IOException, InterruptedException {
final StateProvider provider = getProvider();
final Map<String, String> newValue = new HashMap<>();
newValue.put("value", "value");
provider.setState(newValue, componentId);
final StateMap stateMap = provider.getState(componentId);
assertEquals(0L, stateMap.getVersion());
provider.onComponentRemoved(componentId);
// wait for the background process to complete
Thread.sleep(1000L);
final StateMap stateMapAfterRemoval = provider.getState(componentId);
// version should be -1 because the state has been removed entirely.
assertEquals(-1L, stateMapAfterRemoval.getVersion());
}
protected abstract StateProvider getProvider();
}