mirror of https://github.com/apache/nifi.git
NIFI-1464 addressed PR comments from @apiri and @markap14
This commit is contained in:
parent
0c5b1c27f2
commit
f53f45def3
|
@ -102,7 +102,7 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen
|
|||
|
||||
/**
|
||||
* Will start the {@link Processor} represented by this
|
||||
* {@link ProcessorNode}. Starting processor typically means invoking it's
|
||||
* {@link ProcessorNode}. Starting processor typically means invoking its
|
||||
* operation that is annotated with @OnScheduled and then executing a
|
||||
* callback provided by the {@link ProcessScheduler} to which typically
|
||||
* initiates
|
||||
|
@ -126,7 +126,7 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen
|
|||
|
||||
/**
|
||||
* Will stop the {@link Processor} represented by this {@link ProcessorNode}.
|
||||
* Stopping processor typically means invoking it's operation that is
|
||||
* Stopping processor typically means invoking its operation that is
|
||||
* annotated with @OnUnschedule and then @OnStopped.
|
||||
*
|
||||
* @param scheduler
|
||||
|
|
|
@ -1374,8 +1374,10 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
|||
}
|
||||
});
|
||||
|
||||
long onScheduleTimeout = Long.parseLong(NiFiProperties.getInstance()
|
||||
.getProperty(NiFiProperties.PROCESSOR_START_TIMEOUT, String.valueOf(Long.MAX_VALUE)));
|
||||
String timeoutString = NiFiProperties.getInstance().getProperty(NiFiProperties.PROCESSOR_START_TIMEOUT);
|
||||
long onScheduleTimeout = timeoutString == null ? Long.MAX_VALUE
|
||||
: FormatUtils.getTimeDuration(timeoutString.trim(), TimeUnit.MILLISECONDS);
|
||||
|
||||
try {
|
||||
executionResult.get(onScheduleTimeout, TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
|
|
|
@ -23,7 +23,7 @@ import org.apache.nifi.controller.ReportingTaskNode;
|
|||
* Base implementation of the {@link SchedulingAgent} which encapsulates the
|
||||
* updates to the {@link ScheduleState} based on invoked operation and then
|
||||
* delegates to the corresponding 'do' methods. For example; By invoking
|
||||
* {@link #schedule(Connectable, ScheduleState)} the the
|
||||
* {@link #schedule(Connectable, ScheduleState)} the
|
||||
* {@link ScheduleState#setScheduled(boolean)} with value 'true' will be
|
||||
* invoked.
|
||||
*
|
||||
|
|
|
@ -457,7 +457,6 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
|||
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
|
||||
final Set<String> identifiers = new HashSet<>();
|
||||
for (final Map.Entry<String, ControllerServiceNode> entry : controllerServices.entrySet()) {
|
||||
Class<? extends ControllerService> c = entry.getValue().getProxiedControllerService().getClass();
|
||||
if (requireNonNull(serviceType).isAssignableFrom(entry.getValue().getProxiedControllerService().getClass())) {
|
||||
identifiers.add(entry.getKey());
|
||||
}
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
package org.apache.nifi.controller.service;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.beans.PropertyDescriptor;
|
||||
|
@ -41,15 +40,11 @@ import org.apache.nifi.controller.service.mock.ServiceA;
|
|||
import org.apache.nifi.controller.service.mock.ServiceB;
|
||||
import org.apache.nifi.groups.ProcessGroup;
|
||||
import org.apache.nifi.groups.StandardProcessGroup;
|
||||
import org.apache.nifi.processor.StandardProcessorInitializationContext;
|
||||
import org.apache.nifi.processor.StandardValidationContextFactory;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
public class TestStandardControllerServiceProvider {
|
||||
private static StateManagerProvider stateManagerProvider = new StateManagerProvider() {
|
||||
|
@ -191,119 +186,6 @@ public class TestStandardControllerServiceProvider {
|
|||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 10000)
|
||||
@Ignore // this may be obsolete since TestProcessorLifecycle covers this
|
||||
// scenario without mocks
|
||||
public void testStartStopReferencingComponents() {
|
||||
final ProcessScheduler scheduler = createScheduler();
|
||||
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, stateManagerProvider);
|
||||
|
||||
// build a graph of reporting tasks and controller services with dependencies as such:
|
||||
//
|
||||
// Processor P1 -> A -> B -> D
|
||||
// Processor P2 -> C ---^----^
|
||||
//
|
||||
// In other words, Processor P1 references Controller Service A, which references B, which references D.
|
||||
// AND
|
||||
// Processor P2 references Controller Service C, which references B and D.
|
||||
//
|
||||
// So we have to verify that if D is enabled, when we enable its referencing services,
|
||||
// we enable C and B, even if we attempt to enable C before B... i.e., if we try to enable C, we cannot do so
|
||||
// until B is first enabled so ensure that we enable B first.
|
||||
final ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", false);
|
||||
final ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceA.class.getName(), "2", false);
|
||||
final ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", false);
|
||||
final ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", false);
|
||||
|
||||
final ProcessGroup mockProcessGroup = Mockito.mock(ProcessGroup.class);
|
||||
Mockito.doAnswer(new Answer<Object>() {
|
||||
@Override
|
||||
public Object answer(InvocationOnMock invocation) throws Throwable {
|
||||
final ProcessorNode procNode = (ProcessorNode) invocation.getArguments()[0];
|
||||
procNode.verifyCanStart();
|
||||
// procNode.setScheduledState(ScheduledState.RUNNING);
|
||||
return null;
|
||||
}
|
||||
}).when(mockProcessGroup).startProcessor(Mockito.any(ProcessorNode.class));
|
||||
|
||||
Mockito.doAnswer(new Answer<Object>() {
|
||||
@Override
|
||||
public Object answer(final InvocationOnMock invocation) throws Throwable {
|
||||
final ProcessorNode procNode = (ProcessorNode) invocation.getArguments()[0];
|
||||
procNode.verifyCanStop();
|
||||
// procNode.setScheduledState(ScheduledState.STOPPED);
|
||||
return null;
|
||||
}
|
||||
}).when(mockProcessGroup).stopProcessor(Mockito.any(ProcessorNode.class));
|
||||
|
||||
final String id1 = UUID.randomUUID().toString();
|
||||
final ProcessorNode procNodeA = new StandardProcessorNode(new DummyProcessor(), id1,
|
||||
new StandardValidationContextFactory(provider), scheduler, provider);
|
||||
procNodeA.getProcessor().initialize(new StandardProcessorInitializationContext(id1, null, provider));
|
||||
procNodeA.setProperty(DummyProcessor.SERVICE.getName(), "1");
|
||||
procNodeA.setProcessGroup(mockProcessGroup);
|
||||
|
||||
final String id2 = UUID.randomUUID().toString();
|
||||
final ProcessorNode procNodeB = new StandardProcessorNode(new DummyProcessor(), id2,
|
||||
new StandardValidationContextFactory(provider), scheduler, provider);
|
||||
procNodeB.getProcessor().initialize(new StandardProcessorInitializationContext(id2, null, provider));
|
||||
procNodeB.setProperty(DummyProcessor.SERVICE.getName(), "3");
|
||||
procNodeB.setProcessGroup(mockProcessGroup);
|
||||
|
||||
serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
|
||||
serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4");
|
||||
serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
|
||||
serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4");
|
||||
|
||||
provider.enableControllerService(serviceNode4);
|
||||
provider.enableReferencingServices(serviceNode4);
|
||||
provider.scheduleReferencingComponents(serviceNode4);
|
||||
|
||||
final Set<ControllerServiceState> enableStates = new HashSet<>();
|
||||
enableStates.add(ControllerServiceState.ENABLED);
|
||||
enableStates.add(ControllerServiceState.ENABLING);
|
||||
|
||||
while (serviceNode3.getState() != ControllerServiceState.ENABLED
|
||||
|| serviceNode2.getState() != ControllerServiceState.ENABLED
|
||||
|| serviceNode1.getState() != ControllerServiceState.ENABLED) {
|
||||
assertTrue(enableStates.contains(serviceNode3.getState()));
|
||||
assertTrue(enableStates.contains(serviceNode2.getState()));
|
||||
assertTrue(enableStates.contains(serviceNode1.getState()));
|
||||
}
|
||||
assertTrue(procNodeA.isRunning());
|
||||
assertTrue(procNodeB.isRunning());
|
||||
|
||||
// stop processors and verify results.
|
||||
provider.unscheduleReferencingComponents(serviceNode4);
|
||||
assertFalse(procNodeA.isRunning());
|
||||
assertFalse(procNodeB.isRunning());
|
||||
while (serviceNode3.getState() != ControllerServiceState.ENABLED
|
||||
|| serviceNode2.getState() != ControllerServiceState.ENABLED
|
||||
|| serviceNode1.getState() != ControllerServiceState.ENABLED) {
|
||||
assertTrue(enableStates.contains(serviceNode3.getState()));
|
||||
assertTrue(enableStates.contains(serviceNode2.getState()));
|
||||
assertTrue(enableStates.contains(serviceNode1.getState()));
|
||||
}
|
||||
|
||||
provider.disableReferencingServices(serviceNode4);
|
||||
final Set<ControllerServiceState> disableStates = new HashSet<>();
|
||||
disableStates.add(ControllerServiceState.DISABLED);
|
||||
disableStates.add(ControllerServiceState.DISABLING);
|
||||
|
||||
// Wait for the services to be disabled.
|
||||
while (serviceNode3.getState() != ControllerServiceState.DISABLED
|
||||
|| serviceNode2.getState() != ControllerServiceState.DISABLED
|
||||
|| serviceNode1.getState() != ControllerServiceState.DISABLED) {
|
||||
assertTrue(disableStates.contains(serviceNode3.getState()));
|
||||
assertTrue(disableStates.contains(serviceNode2.getState()));
|
||||
assertTrue(disableStates.contains(serviceNode1.getState()));
|
||||
}
|
||||
|
||||
assertEquals(ControllerServiceState.ENABLED, serviceNode4.getState());
|
||||
|
||||
provider.disableControllerService(serviceNode4);
|
||||
assertTrue(disableStates.contains(serviceNode4.getState()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOrderingOfServices() {
|
||||
|
@ -476,9 +358,5 @@ public class TestStandardControllerServiceProvider {
|
|||
// procNode.setScheduledState(ScheduledState.RUNNING);
|
||||
provider.unscheduleReferencingComponents(serviceNode);
|
||||
assertEquals(ScheduledState.STOPPED, procNode.getScheduledState());
|
||||
|
||||
// procNode.setScheduledState(ScheduledState.DISABLED);
|
||||
// provider.unscheduleReferencingComponents(serviceNode);
|
||||
// assertEquals(ScheduledState.DISABLED, procNode.getScheduledState());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue