From 48af0bfbc5cafe06e003b2bd086b970cf49b5025 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Fri, 26 Feb 2016 12:42:48 -0500 Subject: [PATCH] NIFI-1464 addressed latest PR comments NIFI-1464 polishing --- .../apache/nifi/controller/ProcessorNode.java | 55 +++++++++++++++++ .../controller/StandardFlowSerializer.java | 4 +- .../controller/StandardProcessorNode.java | 9 +-- .../scheduling/StandardProcessScheduler.java | 8 +-- .../scheduling/TestProcessorLifecycle.java | 61 ++++++++++++++++++- .../apache/nifi/web/api/dto/DtoFactory.java | 47 +++++++------- 6 files changed, 143 insertions(+), 41 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java index ff7977db5d..a428349e1f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java @@ -21,6 +21,7 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.connectable.Connectable; @@ -31,12 +32,19 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Relationship; import org.apache.nifi.scheduling.SchedulingStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class ProcessorNode extends AbstractConfiguredComponent implements Connectable { + private static final Logger logger = LoggerFactory.getLogger(ProcessorNode.class); + + protected final AtomicReference scheduledState; + public ProcessorNode(final Processor processor, final String id, final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) { super(processor, id, validationContextFactory, serviceProvider); + this.scheduledState = new AtomicReference<>(ScheduledState.STOPPED); } public abstract boolean isIsolated(); @@ -100,6 +108,31 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen */ public abstract void verifyCanStart(Set ignoredReferences); + /** + * + */ + @Override + public ScheduledState getScheduledState() { + return this.scheduledState.get(); + } + + /** + * Returns the logical state of this processor. Logical state ignores + * transition states such as STOPPING and STARTING rounding it up to the + * next logical state of STOPPED and RUNNING respectively. + * + * @return the logical state of this processor [DISABLED, STOPPED, RUNNING] + */ + public ScheduledState getLogicalScheduledState() { + ScheduledState sc = this.scheduledState.get(); + if (sc == ScheduledState.STARTING) { + return ScheduledState.RUNNING; + } else if (sc == ScheduledState.STOPPING) { + return ScheduledState.STOPPED; + } + return sc; + } + /** * Will start the {@link Processor} represented by this * {@link ProcessorNode}. Starting processor typically means invoking its @@ -145,4 +178,26 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen */ public abstract void stop(ScheduledExecutorService scheduler, T processContext, Callable activeThreadMonitorCallback); + + /** + * Will set the state of the processor to STOPPED which essentially implies + * that this processor can be started. This is idempotent operation and will + * result in the WARN message if processor can not be enabled. + */ + public void enable() { + if (!this.scheduledState.compareAndSet(ScheduledState.DISABLED, ScheduledState.STOPPED)) { + logger.warn("Processor cannot be enabled because it is not disabled"); + } + } + + /** + * Will set the state of the processor to DISABLED which essentially implies + * that this processor can NOT be started. This is idempotent operation and + * will result in the WARN message if processor can not be enabled. + */ + public void disable() { + if (!this.scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.DISABLED)) { + logger.warn("Processor cannot be disabled because its state is set to " + this.scheduledState); + } + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java index 1ee85a212a..e9f958cdd0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java @@ -255,7 +255,7 @@ public class StandardFlowSerializer implements FlowSerializer { addTextElement(element, "comments", port.getComments()); addTextElement(element, "scheduledState", port.getScheduledState().name()); addTextElement(element, "maxConcurrentTasks", port.getMaxConcurrentTasks()); - addTextElement(element, "useCompression", String.valueOf(((RemoteGroupPort) port).isUseCompression())); + addTextElement(element, "useCompression", String.valueOf(port.isUseCompression())); parentElement.appendChild(element); } @@ -311,7 +311,7 @@ public class StandardFlowSerializer implements FlowSerializer { addTextElement(element, "yieldPeriod", processor.getYieldPeriod()); addTextElement(element, "bulletinLevel", processor.getBulletinLevel().toString()); addTextElement(element, "lossTolerant", String.valueOf(processor.isLossTolerant())); - addTextElement(element, "scheduledState", processor.getScheduledState().name()); + addTextElement(element, "scheduledState", processor.getLogicalScheduledState().name()); addTextElement(element, "schedulingStrategy", processor.getSchedulingStrategy().name()); addTextElement(element, "runDurationNanos", processor.getRunDuration(TimeUnit.NANOSECONDS)); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index dac56b5f36..2a0819cf92 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -106,7 +106,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable private final AtomicReference> incomingConnectionsRef; private final AtomicBoolean isolated; private final AtomicBoolean lossTolerant; - private final AtomicReference scheduledState; private final AtomicReference comments; private final AtomicReference position; private final AtomicReference annotationData; @@ -145,7 +144,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable destinations = new HashMap<>(); connections = new HashMap<>(); incomingConnectionsRef = new AtomicReference>(new ArrayList()); - scheduledState = new AtomicReference<>(ScheduledState.STOPPED); lossTolerant = new AtomicBoolean(false); final Set emptySetOfRelationships = new HashSet<>(); undefinedRelationshipsToTerminate = new AtomicReference<>(emptySetOfRelationships); @@ -213,11 +211,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable this.comments.set(comments); } - @Override - public ScheduledState getScheduledState() { - return this.scheduledState.get(); - } - @Override public Position getPosition() { return position.get(); @@ -1276,7 +1269,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable }; taskScheduler.execute(startProcRunnable); } else { - LOG.warn("Can not start Processor since it's already in the process of being started"); + LOG.warn("Can not start Processor since it's already in the process of being started or it is DISABLED"); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index 84d667f6cf..bbaa23b507 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -473,16 +473,12 @@ public final class StandardProcessScheduler implements ProcessScheduler { @Override public synchronized void enableProcessor(final ProcessorNode procNode) { - if (procNode.getScheduledState() != ScheduledState.DISABLED) { - throw new IllegalStateException("Processor cannot be enabled because it is not disabled"); - } + procNode.enable(); } @Override public synchronized void disableProcessor(final ProcessorNode procNode) { - if (procNode.getScheduledState() != ScheduledState.STOPPED) { - throw new IllegalStateException("Processor cannot be disabled because its state is set to " + procNode.getScheduledState()); - } + procNode.disable(); } public synchronized void enableReportingTask(final ReportingTaskNode taskNode) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java index 2cce14d895..d78009c8c3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java @@ -89,6 +89,53 @@ public class TestProcessorLifecycle { FileUtils.deleteDirectory(new File("./target/content_repository")); } + @Test + public void validateEnableOperation() throws Exception { + FlowController fc = this.buildFlowControllerForTest(); + ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); + this.setControllerRootGroup(fc, testGroup); + final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), + UUID.randomUUID().toString()); + + assertEquals(ScheduledState.STOPPED, testProcNode.getScheduledState()); + assertEquals(ScheduledState.STOPPED, testProcNode.getLogicalScheduledState()); + // validates idempotency + for (int i = 0; i < 2; i++) { + testProcNode.enable(); + } + assertEquals(ScheduledState.STOPPED, testProcNode.getScheduledState()); + assertEquals(ScheduledState.STOPPED, testProcNode.getLogicalScheduledState()); + testProcNode.disable(); + assertEquals(ScheduledState.DISABLED, testProcNode.getScheduledState()); + assertEquals(ScheduledState.DISABLED, testProcNode.getLogicalScheduledState()); + fc.shutdown(true); + } + + + @Test + public void validateDisableOperation() throws Exception { + FlowController fc = this.buildFlowControllerForTest(); + ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); + this.setControllerRootGroup(fc, testGroup); + final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), + UUID.randomUUID().toString()); + testProcNode.setProperty("P", "hello"); + assertEquals(ScheduledState.STOPPED, testProcNode.getScheduledState()); + assertEquals(ScheduledState.STOPPED, testProcNode.getLogicalScheduledState()); + // validates idempotency + for (int i = 0; i < 2; i++) { + testProcNode.disable(); + } + assertEquals(ScheduledState.DISABLED, testProcNode.getScheduledState()); + assertEquals(ScheduledState.DISABLED, testProcNode.getLogicalScheduledState()); + + ProcessScheduler ps = fc.getProcessScheduler(); + ps.startProcessor(testProcNode); + assertEquals(ScheduledState.DISABLED, testProcNode.getLogicalScheduledState()); + + fc.shutdown(true); + } + /** * Will validate the idempotent nature of processor start operation which * can be called multiple times without any side-effects. @@ -276,10 +323,12 @@ public class TestProcessorLifecycle { ps.startProcessor(testProcNode); Thread.sleep(1000); assertTrue(testProcNode.getScheduledState() == ScheduledState.STARTING); + assertTrue(testProcNode.getLogicalScheduledState() == ScheduledState.RUNNING); ps.stopProcessor(testProcNode); Thread.sleep(100); assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPING); + assertTrue(testProcNode.getLogicalScheduledState() == ScheduledState.STOPPED); Thread.sleep(1000); assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); @@ -360,7 +409,7 @@ public class TestProcessorLifecycle { */ @Test public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyInterruptable() throws Exception { - NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_START_TIMEOUT, "5000"); + NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_START_TIMEOUT, "5 sec"); FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); @@ -390,7 +439,7 @@ public class TestProcessorLifecycle { */ @Test public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyUninterruptable() throws Exception { - NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_START_TIMEOUT, "5000"); + NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_START_TIMEOUT, "5 sec"); FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); @@ -406,9 +455,14 @@ public class TestProcessorLifecycle { assertTrue(testProcNode.getScheduledState() == ScheduledState.STARTING); Thread.sleep(1000); assertTrue(testProcNode.getScheduledState() == ScheduledState.STARTING); + ps.disableProcessor(testProcNode); // no effect + Thread.sleep(100); + assertTrue(testProcNode.getScheduledState() == ScheduledState.STARTING); + assertTrue(testProcNode.getLogicalScheduledState() == ScheduledState.RUNNING); ps.stopProcessor(testProcNode); Thread.sleep(100); assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPING); + assertTrue(testProcNode.getLogicalScheduledState() == ScheduledState.STOPPED); Thread.sleep(4000); assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); fc.shutdown(true); @@ -435,6 +489,9 @@ public class TestProcessorLifecycle { ps.startProcessor(testProcNode); Thread.sleep(1000); assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING); + ps.disableProcessor(testProcNode); + Thread.sleep(100); + assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING); ps.stopProcessor(testProcNode); Thread.sleep(500); assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 652fd04362..64ffdaf2c2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -16,6 +16,29 @@ */ package org.apache.nifi.web.api.dto; +import java.text.Collator; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; + +import javax.ws.rs.WebApplicationException; + import org.apache.commons.lang3.StringUtils; import org.apache.nifi.action.Action; import org.apache.nifi.action.component.details.ComponentDetails; @@ -128,28 +151,6 @@ import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO; import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.StatusDTO; -import javax.ws.rs.WebApplicationException; -import java.text.Collator; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.concurrent.TimeUnit; - public final class DtoFactory { @SuppressWarnings("rawtypes") @@ -1643,7 +1644,7 @@ public final class DtoFactory { dto.setType(node.getProcessor().getClass().getCanonicalName()); dto.setName(node.getName()); - dto.setState(node.getScheduledState().toString()); + dto.setState(node.getLogicalScheduledState().toString()); // build the relationship dtos final List relationships = new ArrayList<>();