mirror of https://github.com/apache/nifi.git
NIFI-1464 refactored the latest commit
This commit is contained in:
parent
48af0bfbc5
commit
1c22f3f012
|
@ -113,17 +113,6 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen
|
|||
*/
|
||||
@Override
|
||||
public ScheduledState getScheduledState() {
|
||||
return this.scheduledState.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the logical state of this processor. Logical state ignores
|
||||
* transition states such as STOPPING and STARTING rounding it up to the
|
||||
* next logical state of STOPPED and RUNNING respectively.
|
||||
*
|
||||
* @return the logical state of this processor [DISABLED, STOPPED, RUNNING]
|
||||
*/
|
||||
public ScheduledState getLogicalScheduledState() {
|
||||
ScheduledState sc = this.scheduledState.get();
|
||||
if (sc == ScheduledState.STARTING) {
|
||||
return ScheduledState.RUNNING;
|
||||
|
@ -133,6 +122,17 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen
|
|||
return sc;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the physical state of this processor which includes transition
|
||||
* states such as STOPPING and STARTING.
|
||||
*
|
||||
* @return the physical state of this processor [DISABLED, STOPPED, RUNNING,
|
||||
* STARTIING, STOPIING]
|
||||
*/
|
||||
public ScheduledState getPhysicalScheduledState() {
|
||||
return this.scheduledState.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Will start the {@link Processor} represented by this
|
||||
* {@link ProcessorNode}. Starting processor typically means invoking its
|
||||
|
|
|
@ -311,7 +311,7 @@ public class StandardFlowSerializer implements FlowSerializer {
|
|||
addTextElement(element, "yieldPeriod", processor.getYieldPeriod());
|
||||
addTextElement(element, "bulletinLevel", processor.getBulletinLevel().toString());
|
||||
addTextElement(element, "lossTolerant", String.valueOf(processor.isLossTolerant()));
|
||||
addTextElement(element, "scheduledState", processor.getLogicalScheduledState().name());
|
||||
addTextElement(element, "scheduledState", processor.getScheduledState().name());
|
||||
addTextElement(element, "schedulingStrategy", processor.getSchedulingStrategy().name());
|
||||
addTextElement(element, "runDurationNanos", processor.getRunDuration(TimeUnit.NANOSECONDS));
|
||||
|
||||
|
|
|
@ -98,16 +98,16 @@ public class TestProcessorLifecycle {
|
|||
UUID.randomUUID().toString());
|
||||
|
||||
assertEquals(ScheduledState.STOPPED, testProcNode.getScheduledState());
|
||||
assertEquals(ScheduledState.STOPPED, testProcNode.getLogicalScheduledState());
|
||||
assertEquals(ScheduledState.STOPPED, testProcNode.getPhysicalScheduledState());
|
||||
// validates idempotency
|
||||
for (int i = 0; i < 2; i++) {
|
||||
testProcNode.enable();
|
||||
}
|
||||
assertEquals(ScheduledState.STOPPED, testProcNode.getScheduledState());
|
||||
assertEquals(ScheduledState.STOPPED, testProcNode.getLogicalScheduledState());
|
||||
assertEquals(ScheduledState.STOPPED, testProcNode.getPhysicalScheduledState());
|
||||
testProcNode.disable();
|
||||
assertEquals(ScheduledState.DISABLED, testProcNode.getScheduledState());
|
||||
assertEquals(ScheduledState.DISABLED, testProcNode.getLogicalScheduledState());
|
||||
assertEquals(ScheduledState.DISABLED, testProcNode.getPhysicalScheduledState());
|
||||
fc.shutdown(true);
|
||||
}
|
||||
|
||||
|
@ -121,17 +121,17 @@ public class TestProcessorLifecycle {
|
|||
UUID.randomUUID().toString());
|
||||
testProcNode.setProperty("P", "hello");
|
||||
assertEquals(ScheduledState.STOPPED, testProcNode.getScheduledState());
|
||||
assertEquals(ScheduledState.STOPPED, testProcNode.getLogicalScheduledState());
|
||||
assertEquals(ScheduledState.STOPPED, testProcNode.getPhysicalScheduledState());
|
||||
// validates idempotency
|
||||
for (int i = 0; i < 2; i++) {
|
||||
testProcNode.disable();
|
||||
}
|
||||
assertEquals(ScheduledState.DISABLED, testProcNode.getScheduledState());
|
||||
assertEquals(ScheduledState.DISABLED, testProcNode.getLogicalScheduledState());
|
||||
assertEquals(ScheduledState.DISABLED, testProcNode.getPhysicalScheduledState());
|
||||
|
||||
ProcessScheduler ps = fc.getProcessScheduler();
|
||||
ps.startProcessor(testProcNode);
|
||||
assertEquals(ScheduledState.DISABLED, testProcNode.getLogicalScheduledState());
|
||||
assertEquals(ScheduledState.DISABLED, testProcNode.getPhysicalScheduledState());
|
||||
|
||||
fc.shutdown(true);
|
||||
}
|
||||
|
@ -322,13 +322,13 @@ public class TestProcessorLifecycle {
|
|||
|
||||
ps.startProcessor(testProcNode);
|
||||
Thread.sleep(1000);
|
||||
assertTrue(testProcNode.getScheduledState() == ScheduledState.STARTING);
|
||||
assertTrue(testProcNode.getLogicalScheduledState() == ScheduledState.RUNNING);
|
||||
assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING);
|
||||
assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING);
|
||||
|
||||
ps.stopProcessor(testProcNode);
|
||||
Thread.sleep(100);
|
||||
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPING);
|
||||
assertTrue(testProcNode.getLogicalScheduledState() == ScheduledState.STOPPED);
|
||||
assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STOPPING);
|
||||
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
|
||||
Thread.sleep(1000);
|
||||
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
|
||||
|
||||
|
@ -359,9 +359,9 @@ public class TestProcessorLifecycle {
|
|||
|
||||
ps.startProcessor(testProcNode);
|
||||
Thread.sleep(1000);
|
||||
assertTrue(testProcNode.getScheduledState() == ScheduledState.STARTING);
|
||||
assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING);
|
||||
Thread.sleep(1000);
|
||||
assertTrue(testProcNode.getScheduledState() == ScheduledState.STARTING);
|
||||
assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING);
|
||||
Thread.sleep(100);
|
||||
assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING);
|
||||
ps.stopProcessor(testProcNode);
|
||||
|
@ -392,12 +392,12 @@ public class TestProcessorLifecycle {
|
|||
|
||||
ps.startProcessor(testProcNode);
|
||||
Thread.sleep(1000);
|
||||
assertTrue(testProcNode.getScheduledState() == ScheduledState.STARTING);
|
||||
assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING);
|
||||
Thread.sleep(1000);
|
||||
assertTrue(testProcNode.getScheduledState() == ScheduledState.STARTING);
|
||||
assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING);
|
||||
ps.stopProcessor(testProcNode);
|
||||
Thread.sleep(100);
|
||||
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPING);
|
||||
assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STOPPING);
|
||||
Thread.sleep(500);
|
||||
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
|
||||
fc.shutdown(true);
|
||||
|
@ -422,12 +422,12 @@ public class TestProcessorLifecycle {
|
|||
|
||||
ps.startProcessor(testProcNode);
|
||||
Thread.sleep(1000);
|
||||
assertTrue(testProcNode.getScheduledState() == ScheduledState.STARTING);
|
||||
assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING);
|
||||
Thread.sleep(1000);
|
||||
assertTrue(testProcNode.getScheduledState() == ScheduledState.STARTING);
|
||||
assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING);
|
||||
ps.stopProcessor(testProcNode);
|
||||
Thread.sleep(100);
|
||||
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPING);
|
||||
assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STOPPING);
|
||||
Thread.sleep(4000);
|
||||
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
|
||||
fc.shutdown(true);
|
||||
|
@ -452,17 +452,17 @@ public class TestProcessorLifecycle {
|
|||
|
||||
ps.startProcessor(testProcNode);
|
||||
Thread.sleep(1000);
|
||||
assertTrue(testProcNode.getScheduledState() == ScheduledState.STARTING);
|
||||
assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING);
|
||||
Thread.sleep(1000);
|
||||
assertTrue(testProcNode.getScheduledState() == ScheduledState.STARTING);
|
||||
assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING);
|
||||
ps.disableProcessor(testProcNode); // no effect
|
||||
Thread.sleep(100);
|
||||
assertTrue(testProcNode.getScheduledState() == ScheduledState.STARTING);
|
||||
assertTrue(testProcNode.getLogicalScheduledState() == ScheduledState.RUNNING);
|
||||
assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING);
|
||||
assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING);
|
||||
ps.stopProcessor(testProcNode);
|
||||
Thread.sleep(100);
|
||||
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPING);
|
||||
assertTrue(testProcNode.getLogicalScheduledState() == ScheduledState.STOPPED);
|
||||
assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STOPPING);
|
||||
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
|
||||
Thread.sleep(4000);
|
||||
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
|
||||
fc.shutdown(true);
|
||||
|
|
|
@ -1644,7 +1644,7 @@ public final class DtoFactory {
|
|||
|
||||
dto.setType(node.getProcessor().getClass().getCanonicalName());
|
||||
dto.setName(node.getName());
|
||||
dto.setState(node.getLogicalScheduledState().toString());
|
||||
dto.setState(node.getScheduledState().toString());
|
||||
|
||||
// build the relationship dtos
|
||||
final List<RelationshipDTO> relationships = new ArrayList<>();
|
||||
|
|
Loading…
Reference in New Issue