NIFI-1464 addressed latest PR comments

NIFI-1464 polishing
This commit is contained in:
Oleg Zhurakousky 2016-02-26 12:42:48 -05:00 committed by Mark Payne
parent f53f45def3
commit 48af0bfbc5
6 changed files with 143 additions and 41 deletions

View File

@ -21,6 +21,7 @@ import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.connectable.Connectable;
@ -31,12 +32,19 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class ProcessorNode extends AbstractConfiguredComponent implements Connectable {
private static final Logger logger = LoggerFactory.getLogger(ProcessorNode.class);
protected final AtomicReference<ScheduledState> scheduledState;
public ProcessorNode(final Processor processor, final String id,
final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) {
super(processor, id, validationContextFactory, serviceProvider);
this.scheduledState = new AtomicReference<>(ScheduledState.STOPPED);
}
public abstract boolean isIsolated();
@ -100,6 +108,31 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen
*/
public abstract void verifyCanStart(Set<ControllerServiceNode> ignoredReferences);
/**
*
*/
@Override
public ScheduledState getScheduledState() {
return this.scheduledState.get();
}
/**
* Returns the logical state of this processor. Logical state ignores
* transition states such as STOPPING and STARTING rounding it up to the
* next logical state of STOPPED and RUNNING respectively.
*
* @return the logical state of this processor [DISABLED, STOPPED, RUNNING]
*/
public ScheduledState getLogicalScheduledState() {
ScheduledState sc = this.scheduledState.get();
if (sc == ScheduledState.STARTING) {
return ScheduledState.RUNNING;
} else if (sc == ScheduledState.STOPPING) {
return ScheduledState.STOPPED;
}
return sc;
}
/**
* Will start the {@link Processor} represented by this
* {@link ProcessorNode}. Starting processor typically means invoking its
@ -145,4 +178,26 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen
*/
public abstract <T extends ProcessContext & ControllerServiceLookup> void stop(ScheduledExecutorService scheduler,
T processContext, Callable<Boolean> activeThreadMonitorCallback);
/**
* Will set the state of the processor to STOPPED which essentially implies
* that this processor can be started. This is idempotent operation and will
* result in the WARN message if processor can not be enabled.
*/
public void enable() {
if (!this.scheduledState.compareAndSet(ScheduledState.DISABLED, ScheduledState.STOPPED)) {
logger.warn("Processor cannot be enabled because it is not disabled");
}
}
/**
* Will set the state of the processor to DISABLED which essentially implies
* that this processor can NOT be started. This is idempotent operation and
* will result in the WARN message if processor can not be enabled.
*/
public void disable() {
if (!this.scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.DISABLED)) {
logger.warn("Processor cannot be disabled because its state is set to " + this.scheduledState);
}
}
}

View File

@ -255,7 +255,7 @@ public class StandardFlowSerializer implements FlowSerializer {
addTextElement(element, "comments", port.getComments());
addTextElement(element, "scheduledState", port.getScheduledState().name());
addTextElement(element, "maxConcurrentTasks", port.getMaxConcurrentTasks());
addTextElement(element, "useCompression", String.valueOf(((RemoteGroupPort) port).isUseCompression()));
addTextElement(element, "useCompression", String.valueOf(port.isUseCompression()));
parentElement.appendChild(element);
}
@ -311,7 +311,7 @@ public class StandardFlowSerializer implements FlowSerializer {
addTextElement(element, "yieldPeriod", processor.getYieldPeriod());
addTextElement(element, "bulletinLevel", processor.getBulletinLevel().toString());
addTextElement(element, "lossTolerant", String.valueOf(processor.isLossTolerant()));
addTextElement(element, "scheduledState", processor.getScheduledState().name());
addTextElement(element, "scheduledState", processor.getLogicalScheduledState().name());
addTextElement(element, "schedulingStrategy", processor.getSchedulingStrategy().name());
addTextElement(element, "runDurationNanos", processor.getRunDuration(TimeUnit.NANOSECONDS));

View File

@ -106,7 +106,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
private final AtomicReference<List<Connection>> incomingConnectionsRef;
private final AtomicBoolean isolated;
private final AtomicBoolean lossTolerant;
private final AtomicReference<ScheduledState> scheduledState;
private final AtomicReference<String> comments;
private final AtomicReference<Position> position;
private final AtomicReference<String> annotationData;
@ -145,7 +144,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
destinations = new HashMap<>();
connections = new HashMap<>();
incomingConnectionsRef = new AtomicReference<List<Connection>>(new ArrayList<Connection>());
scheduledState = new AtomicReference<>(ScheduledState.STOPPED);
lossTolerant = new AtomicBoolean(false);
final Set<Relationship> emptySetOfRelationships = new HashSet<>();
undefinedRelationshipsToTerminate = new AtomicReference<>(emptySetOfRelationships);
@ -213,11 +211,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
this.comments.set(comments);
}
@Override
public ScheduledState getScheduledState() {
return this.scheduledState.get();
}
@Override
public Position getPosition() {
return position.get();
@ -1276,7 +1269,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
};
taskScheduler.execute(startProcRunnable);
} else {
LOG.warn("Can not start Processor since it's already in the process of being started");
LOG.warn("Can not start Processor since it's already in the process of being started or it is DISABLED");
}
}

View File

@ -473,16 +473,12 @@ public final class StandardProcessScheduler implements ProcessScheduler {
@Override
public synchronized void enableProcessor(final ProcessorNode procNode) {
if (procNode.getScheduledState() != ScheduledState.DISABLED) {
throw new IllegalStateException("Processor cannot be enabled because it is not disabled");
}
procNode.enable();
}
@Override
public synchronized void disableProcessor(final ProcessorNode procNode) {
if (procNode.getScheduledState() != ScheduledState.STOPPED) {
throw new IllegalStateException("Processor cannot be disabled because its state is set to " + procNode.getScheduledState());
}
procNode.disable();
}
public synchronized void enableReportingTask(final ReportingTaskNode taskNode) {

View File

@ -89,6 +89,53 @@ public class TestProcessorLifecycle {
FileUtils.deleteDirectory(new File("./target/content_repository"));
}
@Test
public void validateEnableOperation() throws Exception {
FlowController fc = this.buildFlowControllerForTest();
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup);
final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(),
UUID.randomUUID().toString());
assertEquals(ScheduledState.STOPPED, testProcNode.getScheduledState());
assertEquals(ScheduledState.STOPPED, testProcNode.getLogicalScheduledState());
// validates idempotency
for (int i = 0; i < 2; i++) {
testProcNode.enable();
}
assertEquals(ScheduledState.STOPPED, testProcNode.getScheduledState());
assertEquals(ScheduledState.STOPPED, testProcNode.getLogicalScheduledState());
testProcNode.disable();
assertEquals(ScheduledState.DISABLED, testProcNode.getScheduledState());
assertEquals(ScheduledState.DISABLED, testProcNode.getLogicalScheduledState());
fc.shutdown(true);
}
@Test
public void validateDisableOperation() throws Exception {
FlowController fc = this.buildFlowControllerForTest();
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup);
final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(),
UUID.randomUUID().toString());
testProcNode.setProperty("P", "hello");
assertEquals(ScheduledState.STOPPED, testProcNode.getScheduledState());
assertEquals(ScheduledState.STOPPED, testProcNode.getLogicalScheduledState());
// validates idempotency
for (int i = 0; i < 2; i++) {
testProcNode.disable();
}
assertEquals(ScheduledState.DISABLED, testProcNode.getScheduledState());
assertEquals(ScheduledState.DISABLED, testProcNode.getLogicalScheduledState());
ProcessScheduler ps = fc.getProcessScheduler();
ps.startProcessor(testProcNode);
assertEquals(ScheduledState.DISABLED, testProcNode.getLogicalScheduledState());
fc.shutdown(true);
}
/**
* Will validate the idempotent nature of processor start operation which
* can be called multiple times without any side-effects.
@ -276,10 +323,12 @@ public class TestProcessorLifecycle {
ps.startProcessor(testProcNode);
Thread.sleep(1000);
assertTrue(testProcNode.getScheduledState() == ScheduledState.STARTING);
assertTrue(testProcNode.getLogicalScheduledState() == ScheduledState.RUNNING);
ps.stopProcessor(testProcNode);
Thread.sleep(100);
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPING);
assertTrue(testProcNode.getLogicalScheduledState() == ScheduledState.STOPPED);
Thread.sleep(1000);
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
@ -360,7 +409,7 @@ public class TestProcessorLifecycle {
*/
@Test
public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyInterruptable() throws Exception {
NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_START_TIMEOUT, "5000");
NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_START_TIMEOUT, "5 sec");
FlowController fc = this.buildFlowControllerForTest();
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup);
@ -390,7 +439,7 @@ public class TestProcessorLifecycle {
*/
@Test
public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyUninterruptable() throws Exception {
NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_START_TIMEOUT, "5000");
NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_START_TIMEOUT, "5 sec");
FlowController fc = this.buildFlowControllerForTest();
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup);
@ -406,9 +455,14 @@ public class TestProcessorLifecycle {
assertTrue(testProcNode.getScheduledState() == ScheduledState.STARTING);
Thread.sleep(1000);
assertTrue(testProcNode.getScheduledState() == ScheduledState.STARTING);
ps.disableProcessor(testProcNode); // no effect
Thread.sleep(100);
assertTrue(testProcNode.getScheduledState() == ScheduledState.STARTING);
assertTrue(testProcNode.getLogicalScheduledState() == ScheduledState.RUNNING);
ps.stopProcessor(testProcNode);
Thread.sleep(100);
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPING);
assertTrue(testProcNode.getLogicalScheduledState() == ScheduledState.STOPPED);
Thread.sleep(4000);
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
fc.shutdown(true);
@ -435,6 +489,9 @@ public class TestProcessorLifecycle {
ps.startProcessor(testProcNode);
Thread.sleep(1000);
assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING);
ps.disableProcessor(testProcNode);
Thread.sleep(100);
assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING);
ps.stopProcessor(testProcNode);
Thread.sleep(500);
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);

View File

@ -16,6 +16,29 @@
*/
package org.apache.nifi.web.api.dto;
import java.text.Collator;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.WebApplicationException;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.component.details.ComponentDetails;
@ -128,28 +151,6 @@ import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
import org.apache.nifi.web.api.dto.status.StatusDTO;
import javax.ws.rs.WebApplicationException;
import java.text.Collator;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
public final class DtoFactory {
@SuppressWarnings("rawtypes")
@ -1643,7 +1644,7 @@ public final class DtoFactory {
dto.setType(node.getProcessor().getClass().getCanonicalName());
dto.setName(node.getName());
dto.setState(node.getScheduledState().toString());
dto.setState(node.getLogicalScheduledState().toString());
// build the relationship dtos
final List<RelationshipDTO> relationships = new ArrayList<>();