NIFI-2996

- validate processors only when they are in STOPPED state
- report validation errors via REST API on processors/services/tasks/ports only when they are in the STOPPED state
- This closes #1192
This commit is contained in:
Mike Moser 2016-11-09 12:38:04 -05:00 committed by Matt Gilman
parent fb9cbccc38
commit 15af764dd8
7 changed files with 114 additions and 69 deletions

View File

@ -2722,16 +2722,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
status.setFlowFilesRemoved(entry.getFlowFilesRemoved());
}
// determine the run status and get any validation errors... must check
// is valid when not disabled since a processors validity could change due
// to environmental conditions (property configured with a file path and
// the file being externally removed)
// Determine the run status and get any validation error... only validating while STOPPED
// is a trade-off we are willing to make, even though processor validity could change due to
// environmental conditions (property configured with a file path and the file being externally
// removed). This saves on validation costs that would be unnecessary most of the time.
if (ScheduledState.DISABLED.equals(procNode.getScheduledState())) {
status.setRunStatus(RunStatus.Disabled);
} else if (!procNode.isValid()) {
status.setRunStatus(RunStatus.Invalid);
} else if (ScheduledState.RUNNING.equals(procNode.getScheduledState())) {
status.setRunStatus(RunStatus.Running);
} else if (!procNode.isValid()) {
status.setRunStatus(RunStatus.Invalid);
} else {
status.setRunStatus(RunStatus.Stopped);
}

View File

@ -987,50 +987,54 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
public Collection<ValidationResult> getValidationErrors() {
final List<ValidationResult> results = new ArrayList<>();
try {
final ValidationContext validationContext = this.getValidationContextFactory()
.newValidationContext(getProperties(), getAnnotationData(), getProcessGroup().getIdentifier(), getIdentifier());
// Processors may go invalid while RUNNING, but only validating while STOPPED is a trade-off
// we are willing to make in order to save on validation costs that would be unnecessary most of the time.
if (getScheduledState() == ScheduledState.STOPPED) {
final ValidationContext validationContext = this.getValidationContextFactory()
.newValidationContext(getProperties(), getAnnotationData(), getProcessGroup().getIdentifier(), getIdentifier());
final Collection<ValidationResult> validationResults;
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getProcessor().getClass(), processor.getIdentifier())) {
validationResults = getProcessor().validate(validationContext);
}
final Collection<ValidationResult> validationResults;
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getProcessor().getClass(), processor.getIdentifier())) {
validationResults = getProcessor().validate(validationContext);
}
for (final ValidationResult result : validationResults) {
if (!result.isValid()) {
results.add(result);
for (final ValidationResult result : validationResults) {
if (!result.isValid()) {
results.add(result);
}
}
}
for (final Relationship relationship : getUndefinedRelationships()) {
if (!isAutoTerminated(relationship)) {
final ValidationResult error = new ValidationResult.Builder()
.explanation("Relationship '" + relationship.getName()
+ "' is not connected to any component and is not auto-terminated")
.subject("Relationship " + relationship.getName()).valid(false).build();
results.add(error);
for (final Relationship relationship : getUndefinedRelationships()) {
if (!isAutoTerminated(relationship)) {
final ValidationResult error = new ValidationResult.Builder()
.explanation("Relationship '" + relationship.getName()
+ "' is not connected to any component and is not auto-terminated")
.subject("Relationship " + relationship.getName()).valid(false).build();
results.add(error);
}
}
}
switch (getInputRequirement()) {
case INPUT_ALLOWED:
break;
case INPUT_FORBIDDEN: {
final int incomingConnCount = getIncomingNonLoopConnections().size();
if (incomingConnCount != 0) {
results.add(new ValidationResult.Builder().explanation(
"Processor does not allow upstream connections but currently has " + incomingConnCount)
.subject("Upstream Connections").valid(false).build());
switch (getInputRequirement()) {
case INPUT_ALLOWED:
break;
case INPUT_FORBIDDEN: {
final int incomingConnCount = getIncomingNonLoopConnections().size();
if (incomingConnCount != 0) {
results.add(new ValidationResult.Builder().explanation(
"Processor does not allow upstream connections but currently has " + incomingConnCount)
.subject("Upstream Connections").valid(false).build());
}
break;
}
case INPUT_REQUIRED: {
if (getIncomingNonLoopConnections().isEmpty()) {
results.add(new ValidationResult.Builder()
.explanation("Processor requires an upstream connection but currently has none")
.subject("Upstream Connections").valid(false).build());
}
break;
}
}
break;
}
case INPUT_REQUIRED: {
if (getIncomingNonLoopConnections().isEmpty()) {
results.add(new ValidationResult.Builder()
.explanation("Processor requires an upstream connection but currently has none")
.subject("Upstream Connections").valid(false).build());
}
break;
}
}
} catch (final Throwable t) {
results.add(new ValidationResult.Builder().explanation("Failed to run validation due to " + t.toString())

View File

@ -34,6 +34,7 @@ import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.FormatUtils;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@ -241,4 +242,13 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
public String getProcessGroupIdentifier() {
return null;
}
@Override
public Collection<ValidationResult> getValidationErrors(Set<String> serviceIdentifiersNotToValidate) {
Collection<ValidationResult> results = null;
if (getScheduledState() == ScheduledState.STOPPED) {
results = super.getValidationErrors(serviceIdentifiersNotToValidate);
}
return results != null ? results : Collections.emptySet();
}
}

View File

@ -438,4 +438,13 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
final ProcessGroup procGroup = getProcessGroup();
return procGroup == null ? null : procGroup.getIdentifier();
}
@Override
public Collection<ValidationResult> getValidationErrors(Set<String> serviceIdentifiersNotToValidate) {
Collection<ValidationResult> results = null;
if (stateRef.get() == ControllerServiceState.DISABLED) {
results = super.getValidationErrors(serviceIdentifiersNotToValidate);
}
return results != null ? results : Collections.emptySet();
}
}

View File

@ -117,6 +117,22 @@ public class TestStandardProcessorNode {
assertEquals(1, processor.onStoppedCount);
}
@Test
public void testDisabledValidationErrors() {
final ModifiesClasspathNoAnnotationProcessor processor = new ModifiesClasspathNoAnnotationProcessor();
final StandardProcessorNode procNode = createProcessorNode(processor);
// Set a property to an invalid value
final Map<String, String> properties = new HashMap<>();
properties.put(ModifiesClasspathNoAnnotationProcessor.CLASSPATH_RESOURCE.getName(), "");
procNode.setProperties(properties);
Assert.assertTrue(procNode.getValidationErrors().size() > 0);
// Disabled processors skip property validation
procNode.disable();
Assert.assertFalse(procNode.getValidationErrors().size() > 0);
}
@Test
public void testSinglePropertyDynamicallyModifiesClasspath() throws MalformedURLException {
final PropertyDescriptor classpathProp = new PropertyDescriptor.Builder().name("Classpath Resources")

View File

@ -34,6 +34,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.groups.ProcessGroup;
@ -367,7 +368,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived);
final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
final String dataSize = FormatUtils.formatDataSize(bytesReceived);
logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[]{
logger.info("{} Successfully received {} ({}) from {} in {} milliseconds at a rate of {}", new Object[]{
this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate});
}
@ -381,31 +382,33 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
@Override
public boolean isValid() {
return getValidationErrors().isEmpty();
return targetExists.get()
&& (getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT ? !getConnections(Relationship.ANONYMOUS).isEmpty() : true);
}
@Override
public Collection<ValidationResult> getValidationErrors() {
final Collection<ValidationResult> validationErrors = new ArrayList<>();
ValidationResult error = null;
if (!targetExists.get()) {
error = new ValidationResult.Builder()
.explanation(String.format("Remote instance indicates that port '%s' no longer exists.", getName()))
.subject(String.format("Remote port '%s'", getName()))
.valid(false)
.build();
} else if (getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT && getConnections(Relationship.ANONYMOUS).isEmpty()) {
error = new ValidationResult.Builder()
.explanation(String.format("Port '%s' has no outbound connections", getName()))
.subject(String.format("Remote port '%s'", getName()))
.valid(false)
.build();
}
if (getScheduledState() == ScheduledState.STOPPED) {
ValidationResult error = null;
if (!targetExists.get()) {
error = new ValidationResult.Builder()
.explanation(String.format("Remote instance indicates that port '%s' no longer exists.", getName()))
.subject(String.format("Remote port '%s'", getName()))
.valid(false)
.build();
} else if (getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT && getConnections(Relationship.ANONYMOUS).isEmpty()) {
error = new ValidationResult.Builder()
.explanation(String.format("Port '%s' has no outbound connections", getName()))
.subject(String.format("Remote port '%s'", getName()))
.valid(false)
.build();
}
if (error != null) {
validationErrors.add(error);
if (error != null) {
validationErrors.add(error);
}
}
return validationErrors;
}

View File

@ -30,6 +30,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.controller.AbstractPort;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.groups.ProcessGroup;
@ -273,13 +274,15 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
@Override
public Collection<ValidationResult> getValidationErrors() {
final Collection<ValidationResult> validationErrors = new ArrayList<>();
if (!isValid()) {
final ValidationResult error = new ValidationResult.Builder()
.explanation(String.format("Output connection for port '%s' is not defined.", getName()))
.subject(String.format("Port '%s'", getName()))
.valid(false)
.build();
validationErrors.add(error);
if (getScheduledState() == ScheduledState.STOPPED) {
if (!isValid()) {
final ValidationResult error = new ValidationResult.Builder()
.explanation(String.format("Output connection for port '%s' is not defined.", getName()))
.subject(String.format("Port '%s'", getName()))
.valid(false)
.build();
validationErrors.add(error);
}
}
return validationErrors;
}