diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/pom.xml b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/pom.xml index 989f762b1b..68ce57ca12 100644 --- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/pom.xml +++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/pom.xml @@ -39,6 +39,10 @@ com.fasterxml.jackson.core jackson-databind + + commons-codec + commons-codec + org.apache.nifi nifi-mock @@ -64,6 +68,7 @@ src/test/resources/user.avsc + src/test/resources/array.avsc diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ExtractAvroMetadata.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ExtractAvroMetadata.java new file mode 100644 index 0000000000..48aad7dcef --- /dev/null +++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ExtractAvroMetadata.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.avro; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaNormalization; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.codec.binary.Hex; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; + +@SideEffectFree +@SupportsBatching +@Tags({ "avro", "schema", "metadata" }) +@CapabilityDescription("Extracts metadata from the header of an Avro datafile.") +@WritesAttributes({ + @WritesAttribute(attribute = "schema.type", description = "The type of the schema (i.e. record, enum, etc.)."), + @WritesAttribute(attribute = "schema.name", description = "Contains the name when the type is a record, enum or fixed, " + + "otherwise contains the name of the primitive type."), + @WritesAttribute(attribute = "schema.fingerprint", description = "The result of the Fingerprint Algorithm as a Hex string."), + @WritesAttribute(attribute = "item.count", description = "The total number of items in the datafile, only written if Count Items " + + "is set to true.") +}) +public class ExtractAvroMetadata extends AbstractProcessor { + + static final AllowableValue CRC_64_AVRO = new AllowableValue("CRC-64-AVRO"); + static final AllowableValue MD5 = new AllowableValue("MD5"); + static final AllowableValue SHA_256 = new AllowableValue("SHA-256"); + + static final PropertyDescriptor FINGERPRINT_ALGORITHM = new PropertyDescriptor.Builder() + .name("Fingerprint Algorithm") + .description("The algorithm used to generate the schema fingerprint. Available choices are based on the Avro recommended practices for " + + "fingerprint generation.") + .allowableValues(CRC_64_AVRO, MD5, SHA_256) + .defaultValue(CRC_64_AVRO.getValue()) + .required(true) + .build(); + + static final PropertyDescriptor METADATA_KEYS = new PropertyDescriptor.Builder() + .name("Metadata Keys") + .description("A comma-separated list of keys indicating key/value pairs to extract from the Avro file header. The key 'avro.schema' can " + + "be used to extract the full schema in JSON format, and 'avro.codec' can be used to extract the codec name if one exists.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .build(); + + static final PropertyDescriptor COUNT_ITEMS = new PropertyDescriptor.Builder() + .name("Count Items") + .description("If true the number of items in the datafile will be counted and stored in a FlowFile attribute 'item.count'. The counting is done " + + "by reading blocks and getting the number of items for each block, thus avoiding de-serializing. The items being counted will be the top-level " + + "items in the datafile. For example, with a schema of type record the items will be the records, and for a schema of type Array the items will " + + "be the arrays (not the number of entries in each array).") + .allowableValues("true", "false") + .defaultValue("false") + .required(true) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("A FlowFile is routed to this relationship after metadata has been extracted.") + .build(); + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("A FlowFile is routed to this relationship if it cannot be parsed as Avro or metadata cannot be extracted for any reason") + .build(); + + static final String SCHEMA_TYPE_ATTR = "schema.type"; + static final String SCHEMA_NAME_ATTR = "schema.name"; + static final String SCHEMA_FINGERPRINT_ATTR = "schema.fingerprint"; + static final String ITEM_COUNT_ATTR = "item.count"; + + private List properties; + private Set relationships; + + @Override + protected void init(ProcessorInitializationContext context) { + super.init(context); + + final List properties = new ArrayList<>(); + properties.add(FINGERPRINT_ALGORITHM); + properties.add(METADATA_KEYS); + properties.add(COUNT_ITEMS); + this.properties = Collections.unmodifiableList(properties); + + final Set relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final Map avroMetadata = new HashMap<>(); + final Set requestedMetadataKeys = new HashSet<>(); + + final boolean countRecords = context.getProperty(COUNT_ITEMS).asBoolean(); + final String fingerprintAlgorithm = context.getProperty(FINGERPRINT_ALGORITHM).getValue(); + final String metadataKeysValue = context.getProperty(METADATA_KEYS).getValue(); + + if (!StringUtils.isEmpty(metadataKeysValue)) { + final String[] keys = metadataKeysValue.split("\\s*,\\s*"); + for (final String key : keys) { + requestedMetadataKeys.add(key.trim()); + } + } + + try { + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(InputStream rawIn) throws IOException { + try (final InputStream in = new BufferedInputStream(rawIn); + final DataFileStream reader = new DataFileStream<>(in, new GenericDatumReader())) { + + final Schema schema = reader.getSchema(); + if (schema == null) { + throw new ProcessException("Avro schema was null"); + } + + for (String key : reader.getMetaKeys()) { + if (requestedMetadataKeys.contains(key)) { + avroMetadata.put(key, reader.getMetaString(key)); + } + } + + try { + final byte[] rawFingerprint = SchemaNormalization.parsingFingerprint(fingerprintAlgorithm, schema); + avroMetadata.put(SCHEMA_FINGERPRINT_ATTR, Hex.encodeHexString(rawFingerprint)); + avroMetadata.put(SCHEMA_TYPE_ATTR, schema.getType().getName()); + avroMetadata.put(SCHEMA_NAME_ATTR, schema.getName()); + } catch (NoSuchAlgorithmException e) { + // shouldn't happen since allowable values are valid algorithms + throw new ProcessException(e); + } + + if (countRecords) { + long recordCount = reader.getBlockCount(); + try { + while (reader.nextBlock() != null) { + recordCount += reader.getBlockCount(); + } + } catch (NoSuchElementException e) { + // happens at end of file + } + avroMetadata.put(ITEM_COUNT_ATTR, String.valueOf(recordCount)); + } + } + } + }); + } catch (final ProcessException pe) { + getLogger().error("Failed to extract Avro metadata for {} due to {}; transferring to failure", new Object[] {flowFile, pe}); + session.transfer(flowFile, REL_FAILURE); + return; + } + + flowFile = session.putAllAttributes(flowFile, avroMetadata); + session.transfer(flowFile, REL_SUCCESS); + } + +} diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 67a6cd324b..192ec00f39 100644 --- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -12,4 +12,5 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -org.apache.nifi.processors.avro.ConvertAvroToJSON \ No newline at end of file +org.apache.nifi.processors.avro.ConvertAvroToJSON +org.apache.nifi.processors.avro.ExtractAvroMetadata \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/AvroTestUtil.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/AvroTestUtil.java new file mode 100644 index 0000000000..1315b18385 --- /dev/null +++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/AvroTestUtil.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.avro; + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.nifi.stream.io.ByteArrayOutputStream; + +import java.io.IOException; + +public class AvroTestUtil { + + public static ByteArrayOutputStream serializeAvroRecord(final Schema schema, final DatumWriter datumWriter, final GenericRecord... users) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try (DataFileWriter dataFileWriter = new DataFileWriter<>(datumWriter)) { + dataFileWriter.create(schema, out); + for (final GenericRecord user : users) { + dataFileWriter.append(user); + } + } + return out; + } + +} diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java index 2d84202bcb..cfd26c187b 100644 --- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java +++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java @@ -16,22 +16,20 @@ */ package org.apache.nifi.processors.avro; -import java.io.File; -import java.io.IOException; - import org.apache.avro.Schema; -import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumWriter; -import org.apache.nifi.processors.avro.ConvertAvroToJSON; import org.apache.nifi.stream.io.ByteArrayOutputStream; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Test; +import java.io.File; +import java.io.IOException; + public class TestConvertAvroToJSON { @Test @@ -44,7 +42,7 @@ public class TestConvertAvroToJSON { user1.put("favorite_number", 256); final DatumWriter datumWriter = new GenericDatumWriter<>(schema); - final ByteArrayOutputStream out1 = serializeAvroRecord(schema, datumWriter, user1); + final ByteArrayOutputStream out1 = AvroTestUtil.serializeAvroRecord(schema, datumWriter, user1); runner.enqueue(out1.toByteArray()); runner.run(); @@ -69,7 +67,7 @@ public class TestConvertAvroToJSON { user2.put("favorite_color", "red"); final DatumWriter datumWriter = new GenericDatumWriter<>(schema); - final ByteArrayOutputStream out1 = serializeAvroRecord(schema, datumWriter, user1, user2); + final ByteArrayOutputStream out1 = AvroTestUtil.serializeAvroRecord(schema, datumWriter, user1, user2); runner.enqueue(out1.toByteArray()); runner.run(); @@ -87,16 +85,4 @@ public class TestConvertAvroToJSON { runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_FAILURE, 1); } - private ByteArrayOutputStream serializeAvroRecord(final Schema schema, final DatumWriter datumWriter, final GenericRecord... users) throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - DataFileWriter dataFileWriter = new DataFileWriter(datumWriter); - dataFileWriter.create(schema, out); - for (final GenericRecord user : users) { - dataFileWriter.append(user); - } - - dataFileWriter.close(); - return out; - } - } diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestExtractAvroMetadata.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestExtractAvroMetadata.java new file mode 100644 index 0000000000..474b34c8a4 --- /dev/null +++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestExtractAvroMetadata.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.avro; + +import org.apache.avro.Schema; +import org.apache.avro.file.CodecFactory; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.nifi.stream.io.ByteArrayOutputStream; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; + +public class TestExtractAvroMetadata { + + static final String AVRO_SCHEMA_ATTR = "avro.schema"; + static final String AVRO_CODEC_ATTR = "avro.codec"; + + @Test + public void testDefaultExtraction() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new ExtractAvroMetadata()); + + final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc")); + final ByteArrayOutputStream out = getOutputStreamWithOneUser(schema); + runner.enqueue(out.toByteArray()); + runner.run(); + + runner.assertAllFlowFilesTransferred(ExtractAvroMetadata.REL_SUCCESS, 1); + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ExtractAvroMetadata.REL_SUCCESS).get(0); + flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_FINGERPRINT_ATTR, "b2d1d8d3de2833ce"); + flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_TYPE_ATTR, Schema.Type.RECORD.getName()); + flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_NAME_ATTR, "User"); + flowFile.assertAttributeNotExists(AVRO_SCHEMA_ATTR); + flowFile.assertAttributeNotExists(ExtractAvroMetadata.ITEM_COUNT_ATTR); + } + + @Test + public void testExtractionWithItemCount() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new ExtractAvroMetadata()); + runner.setProperty(ExtractAvroMetadata.COUNT_ITEMS, "true"); + + final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc")); + final ByteArrayOutputStream out = getOutputStreamWithMultipleUsers(schema, 6000); // creates 2 blocks + runner.enqueue(out.toByteArray()); + runner.run(); + + runner.assertAllFlowFilesTransferred(ExtractAvroMetadata.REL_SUCCESS, 1); + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ExtractAvroMetadata.REL_SUCCESS).get(0); + flowFile.assertAttributeEquals(ExtractAvroMetadata.ITEM_COUNT_ATTR, "6000"); + } + + @Test + public void testExtractionWithZeroUsers() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new ExtractAvroMetadata()); + runner.setProperty(ExtractAvroMetadata.COUNT_ITEMS, "true"); + + final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc")); + final ByteArrayOutputStream out = getOutputStreamWithMultipleUsers(schema, 0); + runner.enqueue(out.toByteArray()); + runner.run(); + + runner.assertAllFlowFilesTransferred(ExtractAvroMetadata.REL_SUCCESS, 1); + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ExtractAvroMetadata.REL_SUCCESS).get(0); + flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_FINGERPRINT_ATTR, "b2d1d8d3de2833ce"); + flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_TYPE_ATTR, Schema.Type.RECORD.getName()); + flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_NAME_ATTR, "User"); + flowFile.assertAttributeEquals(ExtractAvroMetadata.ITEM_COUNT_ATTR, "0"); + } + + @Test + public void testExtractionWithMD5() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new ExtractAvroMetadata()); + runner.setProperty(ExtractAvroMetadata.FINGERPRINT_ALGORITHM, ExtractAvroMetadata.MD5); + + final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc")); + final ByteArrayOutputStream out = getOutputStreamWithOneUser(schema); + runner.enqueue(out.toByteArray()); + runner.run(); + + runner.assertAllFlowFilesTransferred(ExtractAvroMetadata.REL_SUCCESS, 1); + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ExtractAvroMetadata.REL_SUCCESS).get(0); + flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_FINGERPRINT_ATTR, "3c6a7bee8994be20314dd28c6a3af4f2"); + flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_TYPE_ATTR, Schema.Type.RECORD.getName()); + flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_NAME_ATTR, "User"); + flowFile.assertAttributeNotExists(AVRO_SCHEMA_ATTR); + } + + @Test + public void testExtractionWithSHA256() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new ExtractAvroMetadata()); + runner.setProperty(ExtractAvroMetadata.FINGERPRINT_ALGORITHM, ExtractAvroMetadata.SHA_256); + + final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc")); + final ByteArrayOutputStream out = getOutputStreamWithOneUser(schema); + runner.enqueue(out.toByteArray()); + runner.run(); + + runner.assertAllFlowFilesTransferred(ExtractAvroMetadata.REL_SUCCESS, 1); + + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ExtractAvroMetadata.REL_SUCCESS).get(0); + flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_FINGERPRINT_ATTR, "683f8f51ecd208038f4f0d39820ee9dd0ef3e32a3bee9371de0a2016d501b113"); + flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_TYPE_ATTR, Schema.Type.RECORD.getName()); + flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_NAME_ATTR, "User"); + flowFile.assertAttributeNotExists(AVRO_SCHEMA_ATTR); + } + + @Test + public void testExtractionWithMetadataKey() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new ExtractAvroMetadata()); + runner.setProperty(ExtractAvroMetadata.METADATA_KEYS, AVRO_SCHEMA_ATTR); // test dynamic attribute avro.schema + + final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc")); + final ByteArrayOutputStream out = getOutputStreamWithOneUser(schema); + runner.enqueue(out.toByteArray()); + runner.run(); + + runner.assertAllFlowFilesTransferred(ExtractAvroMetadata.REL_SUCCESS, 1); + + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ExtractAvroMetadata.REL_SUCCESS).get(0); + flowFile.assertAttributeExists(ExtractAvroMetadata.SCHEMA_FINGERPRINT_ATTR); + flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_TYPE_ATTR, Schema.Type.RECORD.getName()); + flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_NAME_ATTR, "User"); + flowFile.assertAttributeEquals(AVRO_SCHEMA_ATTR, schema.toString()); + } + + @Test + public void testExtractionWithMetadataKeysWhitespace() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new ExtractAvroMetadata()); + runner.setProperty(ExtractAvroMetadata.METADATA_KEYS, "foo, bar, " + AVRO_SCHEMA_ATTR); // test dynamic attribute avro.schema + + final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc")); + final ByteArrayOutputStream out = getOutputStreamWithOneUser(schema); + runner.enqueue(out.toByteArray()); + runner.run(); + + runner.assertAllFlowFilesTransferred(ExtractAvroMetadata.REL_SUCCESS, 1); + + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ExtractAvroMetadata.REL_SUCCESS).get(0); + flowFile.assertAttributeExists(ExtractAvroMetadata.SCHEMA_FINGERPRINT_ATTR); + flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_TYPE_ATTR, Schema.Type.RECORD.getName()); + flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_NAME_ATTR, "User"); + flowFile.assertAttributeEquals(AVRO_SCHEMA_ATTR, schema.toString()); + } + + @Test + public void testExtractionWithNonRecordSchema() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new ExtractAvroMetadata()); + runner.setProperty(ExtractAvroMetadata.COUNT_ITEMS, "true"); + + final Schema schema = new Schema.Parser().parse(new File("src/test/resources/array.avsc")); + + final GenericData.Array data = new GenericData.Array<>(schema, Arrays.asList("one", "two", "three")); + final DatumWriter> datumWriter = new GenericDatumWriter<>(schema); + + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + final DataFileWriter> dataFileWriter = new DataFileWriter<>(datumWriter); + dataFileWriter.create(schema, out); + dataFileWriter.append(data); + dataFileWriter.append(data); + dataFileWriter.close(); + + runner.enqueue(out.toByteArray()); + runner.run(); + + runner.assertAllFlowFilesTransferred(ExtractAvroMetadata.REL_SUCCESS, 1); + + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ExtractAvroMetadata.REL_SUCCESS).get(0); + flowFile.assertAttributeExists(ExtractAvroMetadata.SCHEMA_FINGERPRINT_ATTR); + flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_TYPE_ATTR, Schema.Type.ARRAY.getName()); + flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_NAME_ATTR, "array"); + flowFile.assertAttributeEquals(ExtractAvroMetadata.ITEM_COUNT_ATTR, "2"); // number of arrays, not elements + } + + @Test + public void testExtractionWithCodec() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new ExtractAvroMetadata()); + runner.setProperty(ExtractAvroMetadata.METADATA_KEYS, AVRO_CODEC_ATTR); // test dynamic attribute avro.codec + + final Schema schema = new Schema.Parser().parse(new File("src/test/resources/array.avsc")); + + final GenericData.Array data = new GenericData.Array<>(schema, Arrays.asList("one", "two", "three")); + final DatumWriter> datumWriter = new GenericDatumWriter<>(schema); + + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + final DataFileWriter> dataFileWriter = new DataFileWriter<>(datumWriter); + dataFileWriter.setCodec(CodecFactory.deflateCodec(1)); + dataFileWriter.create(schema, out); + dataFileWriter.append(data); + dataFileWriter.close(); + + runner.enqueue(out.toByteArray()); + runner.run(); + + runner.assertAllFlowFilesTransferred(ExtractAvroMetadata.REL_SUCCESS, 1); + + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ExtractAvroMetadata.REL_SUCCESS).get(0); + flowFile.assertAttributeEquals("avro.codec", "deflate"); + } + + @Test + public void testExtractionWithBadInput() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new ExtractAvroMetadata()); + + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + out.write("not avro".getBytes("UTF-8")); + out.flush(); + + runner.enqueue(out.toByteArray()); + runner.run(); + runner.assertAllFlowFilesTransferred(ExtractAvroMetadata.REL_FAILURE, 1); + } + + private ByteArrayOutputStream getOutputStreamWithOneUser(Schema schema) throws IOException { + final GenericRecord user = new GenericData.Record(schema); + user.put("name", "Alyssa"); + user.put("favorite_number", 256); + + final DatumWriter datumWriter = new GenericDatumWriter<>(schema); + return AvroTestUtil.serializeAvroRecord(schema, datumWriter, user); + } + + private ByteArrayOutputStream getOutputStreamWithMultipleUsers(Schema schema, int numUsers) throws IOException { + final GenericRecord[] users = new GenericRecord[numUsers]; + for (int i=0; i < numUsers; i++) { + final GenericRecord user = new GenericData.Record(schema); + user.put("name", "user" + i); + user.put("favorite_number", i); + users[i] = user; + } + + final DatumWriter datumWriter = new GenericDatumWriter<>(schema); + return AvroTestUtil.serializeAvroRecord(schema, datumWriter, users); + } +} diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/resources/array.avsc b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/resources/array.avsc new file mode 100644 index 0000000000..dace49a482 --- /dev/null +++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/resources/array.avsc @@ -0,0 +1 @@ +{"type": "array", "items": "string"} \ No newline at end of file