NIFI-2465 - InferAvroSchema EL support based on incoming FlowFiles

This closes #863.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Simon Elliston Ball 2016-08-15 17:54:50 +01:00 committed by Bryan Bende
parent 26f5c496d1
commit b0122c6a73
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
2 changed files with 91 additions and 22 deletions

View File

@ -95,7 +95,7 @@ public class InferAvroSchema
.subject(subject)
.input(input)
.explanation("Only non-null single characters are supported")
.valid(input.length() == 1 && input.charAt(0) != 0)
.valid(input.length() == 1 && input.charAt(0) != 0 || context.isExpressionLanguagePresent(input))
.build();
}
};
@ -175,6 +175,7 @@ public class InferAvroSchema
public static final PropertyDescriptor DELIMITER = new PropertyDescriptor.Builder()
.name("CSV delimiter")
.description("Delimiter character for CSV records")
.expressionLanguageSupported(true)
.addValidator(CHAR_VALIDATOR)
.defaultValue(",")
.build();
@ -212,6 +213,7 @@ public class InferAvroSchema
.description("Character encoding of CSV data.")
.required(true)
.defaultValue("UTF-8")
.expressionLanguageSupported(true)
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.build();
@ -391,14 +393,14 @@ public class InferAvroSchema
}
//Prepares the CSVProperties for kite
final CSVProperties props = new CSVProperties.Builder()
.delimiter(context.getProperty(DELIMITER).getValue())
.escape(context.getProperty(ESCAPE_STRING).evaluateAttributeExpressions().getValue())
.quote(context.getProperty(QUOTE_STRING).evaluateAttributeExpressions().getValue())
CSVProperties props = new CSVProperties.Builder()
.charset(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue())
.delimiter(context.getProperty(DELIMITER).evaluateAttributeExpressions(inputFlowFile).getValue())
.quote(context.getProperty(QUOTE_STRING).evaluateAttributeExpressions(inputFlowFile).getValue())
.escape(context.getProperty(ESCAPE_STRING).evaluateAttributeExpressions(inputFlowFile).getValue())
.linesToSkip(context.getProperty(HEADER_LINE_SKIP_COUNT).evaluateAttributeExpressions(inputFlowFile).asInteger())
.header(header.get())
.hasHeader(hasHeader.get())
.linesToSkip(context.getProperty(HEADER_LINE_SKIP_COUNT).evaluateAttributeExpressions().asInteger())
.charset(context.getProperty(CHARSET).getValue())
.build();
final AtomicReference<String> avroSchema = new AtomicReference<>();
@ -408,7 +410,7 @@ public class InferAvroSchema
public void process(InputStream in) throws IOException {
avroSchema.set(CSVUtil
.inferSchema(
context.getProperty(RECORD_NAME).evaluateAttributeExpressions().getValue(), in, props)
context.getProperty(RECORD_NAME).evaluateAttributeExpressions(inputFlowFile).getValue(), in, props)
.toString(context.getProperty(PRETTY_AVRO_OUTPUT).asBoolean()));
}
});
@ -435,8 +437,8 @@ public class InferAvroSchema
@Override
public void process(InputStream in) throws IOException {
Schema as = JsonUtil.inferSchema(
in, context.getProperty(RECORD_NAME).evaluateAttributeExpressions().getValue(),
context.getProperty(NUM_RECORDS_TO_ANALYZE).evaluateAttributeExpressions().asInteger());
in, context.getProperty(RECORD_NAME).evaluateAttributeExpressions(inputFlowFile).getValue(),
context.getProperty(NUM_RECORDS_TO_ANALYZE).evaluateAttributeExpressions(inputFlowFile).asInteger());
avroSchema.set(as.toString(context.getProperty(PRETTY_AVRO_OUTPUT).asBoolean()));
}

View File

@ -106,9 +106,7 @@ public class TestInferAvroSchema {
MockFlowFile data = runner.getFlowFilesForRelationship(InferAvroSchema.REL_SUCCESS).get(0);
String avroSchema = data.getAttribute(InferAvroSchema.AVRO_SCHEMA_ATTRIBUTE_NAME);
String knownSchema = new String(unix2PlatformSpecificLineEndings(
new File("src/test/resources/Shapes.json.avro")),
StandardCharsets.UTF_8);
String knownSchema = new String(unix2PlatformSpecificLineEndings(new File("src/test/resources/Shapes.json.avro")), StandardCharsets.UTF_8);
Assert.assertEquals(avroSchema, knownSchema);
// Since that avro schema is written to an attribute this should be teh same as the original
@ -223,9 +221,77 @@ public class TestInferAvroSchema {
flowFile.assertContentEquals(new File("src/test/resources/Shapes_Header_TabDelimited.csv").toPath());
flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "text/csv");
}
@Test
public void specifyCSVparametersInExpressionLanguage() throws Exception {
runner.setProperty(InferAvroSchema.DELIMITER, "${csv.delimiter}");
runner.setProperty(InferAvroSchema.ESCAPE_STRING, "${csv.escape}");
runner.setProperty(InferAvroSchema.QUOTE_STRING, "${csv.quote}");
runner.setProperty(InferAvroSchema.CHARSET, "${csv.charset}");
runner.setProperty(InferAvroSchema.GET_CSV_HEADER_DEFINITION_FROM_INPUT, "true");
runner.assertValid();
@SuppressWarnings("serial")
Map<String, String> attributes = new HashMap<String, String>() {
{
put("csv.delimiter",",");
put("csv.escape", "\\");
put("csv.quote", "\"");
put("csv.charset", "UTF-8");
put(CoreAttributes.MIME_TYPE.key(), "text/csv");
}
};
runner.enqueue(new File("src/test/resources/Shapes_Header.csv").toPath(), attributes);
runner.run();
runner.assertTransferCount(InferAvroSchema.REL_UNSUPPORTED_CONTENT, 0);
runner.assertTransferCount(InferAvroSchema.REL_FAILURE, 0);
runner.assertTransferCount(InferAvroSchema.REL_ORIGINAL, 1);
runner.assertTransferCount(InferAvroSchema.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(InferAvroSchema.REL_SUCCESS).get(0);
flowFile.assertContentEquals(unix2PlatformSpecificLineEndings(new File("src/test/resources/Shapes_header.csv.avro")));
flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");
}
@Test
public void specifyJsonParametersInExpressionLanguage() throws Exception {
runner.assertValid();
runner.setProperty(InferAvroSchema.INPUT_CONTENT_TYPE, InferAvroSchema.USE_MIME_TYPE);
// Purposely set to True to test that none of the JSON file is read which would cause issues.
runner.setProperty(InferAvroSchema.GET_CSV_HEADER_DEFINITION_FROM_INPUT, "true");
runner.setProperty(InferAvroSchema.SCHEMA_DESTINATION, InferAvroSchema.DESTINATION_ATTRIBUTE);
runner.setProperty(InferAvroSchema.RECORD_NAME, "${record.name}");
runner.setProperty(InferAvroSchema.NUM_RECORDS_TO_ANALYZE, "${records.analyze}");
Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
attributes.put("record.name", "myrecord");
attributes.put("records.analyze", "2");
runner.enqueue(new File("src/test/resources/Shapes.json").toPath(), attributes);
runner.run();
runner.assertTransferCount(InferAvroSchema.REL_UNSUPPORTED_CONTENT, 0);
runner.assertTransferCount(InferAvroSchema.REL_FAILURE, 0);
runner.assertTransferCount(InferAvroSchema.REL_ORIGINAL, 1);
runner.assertTransferCount(InferAvroSchema.REL_SUCCESS, 1);
MockFlowFile data = runner.getFlowFilesForRelationship(InferAvroSchema.REL_SUCCESS).get(0);
String avroSchema = data.getAttribute(InferAvroSchema.AVRO_SCHEMA_ATTRIBUTE_NAME);
Assert.assertTrue(avroSchema.contains("\"name\" : \"myrecord\""));
// Since that avro schema is written to an attribute this should be teh same as the original
data.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json");
}
static byte[] unix2PlatformSpecificLineEndings(final File file) throws IOException {
try ( final BufferedInputStream in = new BufferedInputStream(new FileInputStream(file));
final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
try (final BufferedInputStream in = new BufferedInputStream(new FileInputStream(file)); final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
byte eol[] = System.lineSeparator().getBytes(StandardCharsets.UTF_8);
int justRead;
while ((justRead = in.read()) != -1) {
@ -238,4 +304,5 @@ public class TestInferAvroSchema {
return out.toByteArray();
}
}
}