enable EL on FF level

This commit is contained in:
Marco Gaido 2018-01-29 16:20:30 +01:00 committed by Pierre Villard
parent 836c324258
commit ba192d22da
2 changed files with 28 additions and 33 deletions

View File

@ -23,7 +23,6 @@ import java.io.OutputStream;
import java.io.Reader; import java.io.Reader;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -39,11 +38,8 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractProcessor;
@ -185,8 +181,6 @@ public class ValidateCsv extends AbstractProcessor {
private List<PropertyDescriptor> properties; private List<PropertyDescriptor> properties;
private Set<Relationship> relationships; private Set<Relationship> relationships;
private final AtomicReference<CellProcessor[]> processors = new AtomicReference<CellProcessor[]>();
private final AtomicReference<CsvPreference> preference = new AtomicReference<CsvPreference>();
@Override @Override
protected void init(final ProcessorInitializationContext context) { protected void init(final ProcessorInitializationContext context) {
@ -215,32 +209,14 @@ public class ValidateCsv extends AbstractProcessor {
return properties; return properties;
} }
@Override public CsvPreference getPreference(final ProcessContext context, final FlowFile flowFile) {
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
String schema = validationContext.getProperty(SCHEMA).evaluateAttributeExpressions().getValue();
try {
this.parseSchema(schema);
} catch (Exception e) {
final List<ValidationResult> problems = new ArrayList<>(1);
problems.add(new ValidationResult.Builder().subject(SCHEMA.getName())
.input(schema)
.valid(false)
.explanation("Error while parsing the schema: " + e.getMessage())
.build());
return problems;
}
return super.customValidate(validationContext);
}
@OnScheduled
public void setPreference(final ProcessContext context) {
// When going from the UI to Java, the characters are escaped so that what you // When going from the UI to Java, the characters are escaped so that what you
// input is transferred over to Java as is. So when you type the characters "\" // input is transferred over to Java as is. So when you type the characters "\"
// and "n" into the UI the Java string will end up being those two characters // and "n" into the UI the Java string will end up being those two characters
// not the interpreted value "\n". // not the interpreted value "\n".
final String msgDemarcator = context.getProperty(END_OF_LINE_CHARACTER).evaluateAttributeExpressions().getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); final String msgDemarcator = context.getProperty(END_OF_LINE_CHARACTER).evaluateAttributeExpressions(flowFile).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
this.preference.set(new CsvPreference.Builder(context.getProperty(QUOTE_CHARACTER).evaluateAttributeExpressions().getValue().charAt(0), return new CsvPreference.Builder(context.getProperty(QUOTE_CHARACTER).evaluateAttributeExpressions(flowFile).getValue().charAt(0),
context.getProperty(DELIMITER_CHARACTER).evaluateAttributeExpressions().getValue().charAt(0), msgDemarcator).build()); context.getProperty(DELIMITER_CHARACTER).evaluateAttributeExpressions(flowFile).getValue().charAt(0), msgDemarcator).build();
} }
/** /**
@ -248,15 +224,15 @@ public class ValidateCsv extends AbstractProcessor {
* to a list of cell processors used to validate the CSV data. * to a list of cell processors used to validate the CSV data.
* @param schema Schema to parse * @param schema Schema to parse
*/ */
private void parseSchema(String schema) { private CellProcessor[] parseSchema(String schema) {
List<CellProcessor> processorsList = new ArrayList<CellProcessor>(); List<CellProcessor> processorsList = new ArrayList<>();
String remaining = schema; String remaining = schema;
while(remaining.length() > 0) { while(remaining.length() > 0) {
remaining = setProcessor(remaining, processorsList); remaining = setProcessor(remaining, processorsList);
} }
this.processors.set(processorsList.toArray(new CellProcessor[processorsList.size()])); return processorsList.toArray(new CellProcessor[processorsList.size()]);
} }
private String setProcessor(String remaining, List<CellProcessor> processorsList) { private String setProcessor(String remaining, List<CellProcessor> processorsList) {
@ -435,10 +411,11 @@ public class ValidateCsv extends AbstractProcessor {
return; return;
} }
final CsvPreference csvPref = this.preference.get(); final CsvPreference csvPref = getPreference(context, flowFile);
final boolean header = context.getProperty(HEADER).asBoolean(); final boolean header = context.getProperty(HEADER).asBoolean();
final ComponentLog logger = getLogger(); final ComponentLog logger = getLogger();
final CellProcessor[] cellProcs = this.processors.get(); final String schema = context.getProperty(SCHEMA).evaluateAttributeExpressions(flowFile).getValue();
final CellProcessor[] cellProcs = this.parseSchema(schema);
final boolean isWholeFFValidation = context.getProperty(VALIDATION_STRATEGY).getValue().equals(VALIDATE_WHOLE_FLOWFILE.getValue()); final boolean isWholeFFValidation = context.getProperty(VALIDATION_STRATEGY).getValue().equals(VALIDATE_WHOLE_FLOWFILE.getValue());
final AtomicReference<Boolean> valid = new AtomicReference<Boolean>(true); final AtomicReference<Boolean> valid = new AtomicReference<Boolean>(true);

View File

@ -313,4 +313,22 @@ public class TestValidateCsv {
runner.assertValid(); runner.assertValid();
} }
@Test
public void testMultipleRuns() {
final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
runner.setProperty(ValidateCsv.HEADER, "false");
runner.setProperty(ValidateCsv.VALIDATION_STRATEGY, ValidateCsv.VALIDATE_LINES_INDIVIDUALLY);
runner.setProperty(ValidateCsv.SCHEMA, "Unique()");
runner.enqueue("John\r\nBob\r\nTom");
runner.enqueue("John\r\nBob\r\nTom");
runner.run(2);
runner.assertTransferCount(ValidateCsv.REL_VALID, 2);
runner.assertTransferCount(ValidateCsv.REL_INVALID, 0);
}
} }