From b0122c6a73846eba4dcbff402c19b2cf1ded7951 Mon Sep 17 00:00:00 2001 From: Simon Elliston Ball Date: Mon, 15 Aug 2016 17:54:50 +0100 Subject: [PATCH] NIFI-2465 - InferAvroSchema EL support based on incoming FlowFiles This closes #863. Signed-off-by: Bryan Bende --- .../nifi/processors/kite/InferAvroSchema.java | 22 +++-- .../processors/kite/TestInferAvroSchema.java | 91 ++++++++++++++++--- 2 files changed, 91 insertions(+), 22 deletions(-) diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchema.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchema.java index 1923785293..aad48ae02d 100644 --- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchema.java +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchema.java @@ -95,7 +95,7 @@ public class InferAvroSchema .subject(subject) .input(input) .explanation("Only non-null single characters are supported") - .valid(input.length() == 1 && input.charAt(0) != 0) + .valid(input.length() == 1 && input.charAt(0) != 0 || context.isExpressionLanguagePresent(input)) .build(); } }; @@ -175,6 +175,7 @@ public class InferAvroSchema public static final PropertyDescriptor DELIMITER = new PropertyDescriptor.Builder() .name("CSV delimiter") .description("Delimiter character for CSV records") + .expressionLanguageSupported(true) .addValidator(CHAR_VALIDATOR) .defaultValue(",") .build(); @@ -212,6 +213,7 @@ public class InferAvroSchema .description("Character encoding of CSV data.") .required(true) .defaultValue("UTF-8") + .expressionLanguageSupported(true) .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) .build(); @@ -391,14 +393,14 @@ public class InferAvroSchema } //Prepares the CSVProperties for kite - final CSVProperties props = new 
CSVProperties.Builder() - .delimiter(context.getProperty(DELIMITER).getValue()) - .escape(context.getProperty(ESCAPE_STRING).evaluateAttributeExpressions().getValue()) - .quote(context.getProperty(QUOTE_STRING).evaluateAttributeExpressions().getValue()) + CSVProperties props = new CSVProperties.Builder() + .charset(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue()) + .delimiter(context.getProperty(DELIMITER).evaluateAttributeExpressions(inputFlowFile).getValue()) + .quote(context.getProperty(QUOTE_STRING).evaluateAttributeExpressions(inputFlowFile).getValue()) + .escape(context.getProperty(ESCAPE_STRING).evaluateAttributeExpressions(inputFlowFile).getValue()) + .linesToSkip(context.getProperty(HEADER_LINE_SKIP_COUNT).evaluateAttributeExpressions(inputFlowFile).asInteger()) .header(header.get()) .hasHeader(hasHeader.get()) - .linesToSkip(context.getProperty(HEADER_LINE_SKIP_COUNT).evaluateAttributeExpressions().asInteger()) - .charset(context.getProperty(CHARSET).getValue()) .build(); final AtomicReference avroSchema = new AtomicReference<>(); @@ -408,7 +410,7 @@ public class InferAvroSchema public void process(InputStream in) throws IOException { avroSchema.set(CSVUtil .inferSchema( - context.getProperty(RECORD_NAME).evaluateAttributeExpressions().getValue(), in, props) + context.getProperty(RECORD_NAME).evaluateAttributeExpressions(inputFlowFile).getValue(), in, props) .toString(context.getProperty(PRETTY_AVRO_OUTPUT).asBoolean())); } }); @@ -435,8 +437,8 @@ public class InferAvroSchema @Override public void process(InputStream in) throws IOException { Schema as = JsonUtil.inferSchema( - in, context.getProperty(RECORD_NAME).evaluateAttributeExpressions().getValue(), - context.getProperty(NUM_RECORDS_TO_ANALYZE).evaluateAttributeExpressions().asInteger()); + in, context.getProperty(RECORD_NAME).evaluateAttributeExpressions(inputFlowFile).getValue(), + 
context.getProperty(NUM_RECORDS_TO_ANALYZE).evaluateAttributeExpressions(inputFlowFile).asInteger()); avroSchema.set(as.toString(context.getProperty(PRETTY_AVRO_OUTPUT).asBoolean())); } diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchema.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchema.java index 125a631da6..171a64a892 100644 --- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchema.java +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchema.java @@ -49,7 +49,7 @@ public class TestInferAvroSchema { public void setup() { runner = TestRunners.newTestRunner(InferAvroSchema.class); - //Prepare the common setup. + // Prepare the common setup. runner.assertNotValid(); runner.setProperty(InferAvroSchema.INPUT_CONTENT_TYPE, InferAvroSchema.USE_MIME_TYPE); @@ -90,7 +90,7 @@ public class TestInferAvroSchema { runner.setProperty(InferAvroSchema.INPUT_CONTENT_TYPE, InferAvroSchema.USE_MIME_TYPE); - //Purposely set to True to test that none of the JSON file is read which would cause issues. + // Purposely set to True to test that none of the JSON file is read which would cause issues. 
runner.setProperty(InferAvroSchema.GET_CSV_HEADER_DEFINITION_FROM_INPUT, "true"); runner.setProperty(InferAvroSchema.SCHEMA_DESTINATION, InferAvroSchema.DESTINATION_ATTRIBUTE); @@ -106,12 +106,10 @@ public class TestInferAvroSchema { MockFlowFile data = runner.getFlowFilesForRelationship(InferAvroSchema.REL_SUCCESS).get(0); String avroSchema = data.getAttribute(InferAvroSchema.AVRO_SCHEMA_ATTRIBUTE_NAME); - String knownSchema = new String(unix2PlatformSpecificLineEndings( - new File("src/test/resources/Shapes.json.avro")), - StandardCharsets.UTF_8); + String knownSchema = new String(unix2PlatformSpecificLineEndings(new File("src/test/resources/Shapes.json.avro")), StandardCharsets.UTF_8); Assert.assertEquals(avroSchema, knownSchema); - //Since that avro schema is written to an attribute this should be teh same as the original + // Since that avro schema is written to an attribute this should be the same as the original data.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); } @@ -120,7 +118,7 @@ public class TestInferAvroSchema { runner.assertValid(); - //Read in the header + // Read in the header StringWriter writer = new StringWriter(); IOUtils.copy((Files.newInputStream(Paths.get("src/test/resources/ShapesHeader.csv"), StandardOpenOption.READ)), writer, "UTF-8"); runner.setProperty(InferAvroSchema.CSV_HEADER_DEFINITION, writer.toString()); @@ -168,7 +166,7 @@ public class TestInferAvroSchema { } @Test - public void inferSchemaFromEmptyContent() throws Exception { + public void inferSchemaFromEmptyContent() throws Exception { runner.assertValid(); Map attributes = new HashMap<>(); @@ -223,13 +221,81 @@ public class TestInferAvroSchema { flowFile.assertContentEquals(new File("src/test/resources/Shapes_Header_TabDelimited.csv").toPath()); flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "text/csv"); } - static byte [] unix2PlatformSpecificLineEndings(final File file) throws IOException { - try ( final BufferedInputStream in = 
new BufferedInputStream(new FileInputStream(file)); - final ByteArrayOutputStream out = new ByteArrayOutputStream()) { + + + @Test + public void specifyCSVparametersInExpressionLanguage() throws Exception { + runner.setProperty(InferAvroSchema.DELIMITER, "${csv.delimiter}"); + runner.setProperty(InferAvroSchema.ESCAPE_STRING, "${csv.escape}"); + runner.setProperty(InferAvroSchema.QUOTE_STRING, "${csv.quote}"); + runner.setProperty(InferAvroSchema.CHARSET, "${csv.charset}"); + runner.setProperty(InferAvroSchema.GET_CSV_HEADER_DEFINITION_FROM_INPUT, "true"); + + runner.assertValid(); + + @SuppressWarnings("serial") + Map attributes = new HashMap() { + { + put("csv.delimiter",","); + put("csv.escape", "\\"); + put("csv.quote", "\""); + put("csv.charset", "UTF-8"); + put(CoreAttributes.MIME_TYPE.key(), "text/csv"); + } + }; + + runner.enqueue(new File("src/test/resources/Shapes_Header.csv").toPath(), attributes); + + runner.run(); + runner.assertTransferCount(InferAvroSchema.REL_UNSUPPORTED_CONTENT, 0); + runner.assertTransferCount(InferAvroSchema.REL_FAILURE, 0); + runner.assertTransferCount(InferAvroSchema.REL_ORIGINAL, 1); + runner.assertTransferCount(InferAvroSchema.REL_SUCCESS, 1); + + MockFlowFile flowFile = runner.getFlowFilesForRelationship(InferAvroSchema.REL_SUCCESS).get(0); + flowFile.assertContentEquals(unix2PlatformSpecificLineEndings(new File("src/test/resources/Shapes_header.csv.avro"))); + flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary"); + + } + + @Test + public void specifyJsonParametersInExpressionLanguage() throws Exception { + runner.assertValid(); + runner.setProperty(InferAvroSchema.INPUT_CONTENT_TYPE, InferAvroSchema.USE_MIME_TYPE); + + // Purposely set to True to test that none of the JSON file is read which would cause issues. 
+ runner.setProperty(InferAvroSchema.GET_CSV_HEADER_DEFINITION_FROM_INPUT, "true"); + runner.setProperty(InferAvroSchema.SCHEMA_DESTINATION, InferAvroSchema.DESTINATION_ATTRIBUTE); + runner.setProperty(InferAvroSchema.RECORD_NAME, "${record.name}"); + runner.setProperty(InferAvroSchema.NUM_RECORDS_TO_ANALYZE, "${records.analyze}"); + + Map attributes = new HashMap<>(); + attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json"); + attributes.put("record.name", "myrecord"); + attributes.put("records.analyze", "2"); + runner.enqueue(new File("src/test/resources/Shapes.json").toPath(), attributes); + + runner.run(); + runner.assertTransferCount(InferAvroSchema.REL_UNSUPPORTED_CONTENT, 0); + runner.assertTransferCount(InferAvroSchema.REL_FAILURE, 0); + runner.assertTransferCount(InferAvroSchema.REL_ORIGINAL, 1); + runner.assertTransferCount(InferAvroSchema.REL_SUCCESS, 1); + + MockFlowFile data = runner.getFlowFilesForRelationship(InferAvroSchema.REL_SUCCESS).get(0); + String avroSchema = data.getAttribute(InferAvroSchema.AVRO_SCHEMA_ATTRIBUTE_NAME); + Assert.assertTrue(avroSchema.contains("\"name\" : \"myrecord\"")); + + // Since that avro schema is written to an attribute this should be the same as the original + data.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); + } + + + static byte[] unix2PlatformSpecificLineEndings(final File file) throws IOException { + try (final BufferedInputStream in = new BufferedInputStream(new FileInputStream(file)); final ByteArrayOutputStream out = new ByteArrayOutputStream()) { byte eol[] = System.lineSeparator().getBytes(StandardCharsets.UTF_8); int justRead; while ((justRead = in.read()) != -1) { - if (justRead == '\n'){ + if (justRead == '\n') { out.write(eol); } else { out.write(justRead); @@ -238,4 +304,5 @@ public class TestInferAvroSchema { return out.toByteArray(); } } + }