NIFI-912 Initial version of ExtractAvroMetadata processor.

- Adding optional ability to extract record count
- Renaming record.count to item.count for clarity, and updating documentation
- Adding a test case with 0 records
- Removing validators from properties that use allowable values
This commit is contained in:
Bryan Bende 2015-08-31 18:03:50 -04:00
parent 3d4ce34529
commit 1007999415
7 changed files with 528 additions and 20 deletions

View File

@ -39,6 +39,10 @@
<groupId>com.fasterxml.jackson.core</groupId> <groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId> <artifactId>jackson-databind</artifactId>
</dependency> </dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId> <artifactId>nifi-mock</artifactId>
@ -64,6 +68,7 @@
<configuration> <configuration>
<excludes combine.children="append"> <excludes combine.children="append">
<exclude>src/test/resources/user.avsc</exclude> <exclude>src/test/resources/user.avsc</exclude>
<exclude>src/test/resources/array.avsc</exclude>
</excludes> </excludes>
</configuration> </configuration>
</plugin> </plugin>

View File

@ -0,0 +1,219 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.avro;
import org.apache.avro.Schema;
import org.apache.avro.SchemaNormalization;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
@SideEffectFree
@SupportsBatching
@Tags({ "avro", "schema", "metadata" })
@CapabilityDescription("Extracts metadata from the header of an Avro datafile.")
@WritesAttributes({
@WritesAttribute(attribute = "schema.type", description = "The type of the schema (i.e. record, enum, etc.)."),
@WritesAttribute(attribute = "schema.name", description = "Contains the name when the type is a record, enum or fixed, " +
"otherwise contains the name of the primitive type."),
@WritesAttribute(attribute = "schema.fingerprint", description = "The result of the Fingerprint Algorithm as a Hex string."),
@WritesAttribute(attribute = "item.count", description = "The total number of items in the datafile, only written if Count Items " +
"is set to true.")
})
public class ExtractAvroMetadata extends AbstractProcessor {
static final AllowableValue CRC_64_AVRO = new AllowableValue("CRC-64-AVRO");
static final AllowableValue MD5 = new AllowableValue("MD5");
static final AllowableValue SHA_256 = new AllowableValue("SHA-256");
static final PropertyDescriptor FINGERPRINT_ALGORITHM = new PropertyDescriptor.Builder()
.name("Fingerprint Algorithm")
.description("The algorithm used to generate the schema fingerprint. Available choices are based on the Avro recommended practices for " +
"fingerprint generation.")
.allowableValues(CRC_64_AVRO, MD5, SHA_256)
.defaultValue(CRC_64_AVRO.getValue())
.required(true)
.build();
static final PropertyDescriptor METADATA_KEYS = new PropertyDescriptor.Builder()
.name("Metadata Keys")
.description("A comma-separated list of keys indicating key/value pairs to extract from the Avro file header. The key 'avro.schema' can " +
"be used to extract the full schema in JSON format, and 'avro.codec' can be used to extract the codec name if one exists.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.build();
static final PropertyDescriptor COUNT_ITEMS = new PropertyDescriptor.Builder()
.name("Count Items")
.description("If true the number of items in the datafile will be counted and stored in a FlowFile attribute 'item.count'. The counting is done " +
"by reading blocks and getting the number of items for each block, thus avoiding de-serializing. The items being counted will be the top-level " +
"items in the datafile. For example, with a schema of type record the items will be the records, and for a schema of type Array the items will " +
"be the arrays (not the number of entries in each array).")
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("A FlowFile is routed to this relationship after metadata has been extracted.")
.build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("A FlowFile is routed to this relationship if it cannot be parsed as Avro or metadata cannot be extracted for any reason")
.build();
static final String SCHEMA_TYPE_ATTR = "schema.type";
static final String SCHEMA_NAME_ATTR = "schema.name";
static final String SCHEMA_FINGERPRINT_ATTR = "schema.fingerprint";
static final String ITEM_COUNT_ATTR = "item.count";
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
@Override
protected void init(ProcessorInitializationContext context) {
super.init(context);
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(FINGERPRINT_ALGORITHM);
properties.add(METADATA_KEYS);
properties.add(COUNT_ITEMS);
this.properties = Collections.unmodifiableList(properties);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final Map<String,String> avroMetadata = new HashMap<>();
final Set<String> requestedMetadataKeys = new HashSet<>();
final boolean countRecords = context.getProperty(COUNT_ITEMS).asBoolean();
final String fingerprintAlgorithm = context.getProperty(FINGERPRINT_ALGORITHM).getValue();
final String metadataKeysValue = context.getProperty(METADATA_KEYS).getValue();
if (!StringUtils.isEmpty(metadataKeysValue)) {
final String[] keys = metadataKeysValue.split("\\s*,\\s*");
for (final String key : keys) {
requestedMetadataKeys.add(key.trim());
}
}
try {
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(InputStream rawIn) throws IOException {
try (final InputStream in = new BufferedInputStream(rawIn);
final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
final Schema schema = reader.getSchema();
if (schema == null) {
throw new ProcessException("Avro schema was null");
}
for (String key : reader.getMetaKeys()) {
if (requestedMetadataKeys.contains(key)) {
avroMetadata.put(key, reader.getMetaString(key));
}
}
try {
final byte[] rawFingerprint = SchemaNormalization.parsingFingerprint(fingerprintAlgorithm, schema);
avroMetadata.put(SCHEMA_FINGERPRINT_ATTR, Hex.encodeHexString(rawFingerprint));
avroMetadata.put(SCHEMA_TYPE_ATTR, schema.getType().getName());
avroMetadata.put(SCHEMA_NAME_ATTR, schema.getName());
} catch (NoSuchAlgorithmException e) {
// shouldn't happen since allowable values are valid algorithms
throw new ProcessException(e);
}
if (countRecords) {
long recordCount = reader.getBlockCount();
try {
while (reader.nextBlock() != null) {
recordCount += reader.getBlockCount();
}
} catch (NoSuchElementException e) {
// happens at end of file
}
avroMetadata.put(ITEM_COUNT_ATTR, String.valueOf(recordCount));
}
}
}
});
} catch (final ProcessException pe) {
getLogger().error("Failed to extract Avro metadata for {} due to {}; transferring to failure", new Object[] {flowFile, pe});
session.transfer(flowFile, REL_FAILURE);
return;
}
flowFile = session.putAllAttributes(flowFile, avroMetadata);
session.transfer(flowFile, REL_SUCCESS);
}
}

View File

@ -13,3 +13,4 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
org.apache.nifi.processors.avro.ConvertAvroToJSON org.apache.nifi.processors.avro.ConvertAvroToJSON
org.apache.nifi.processors.avro.ExtractAvroMetadata

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.avro;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import java.io.IOException;
public class AvroTestUtil {
public static ByteArrayOutputStream serializeAvroRecord(final Schema schema, final DatumWriter<GenericRecord> datumWriter, final GenericRecord... users) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
dataFileWriter.create(schema, out);
for (final GenericRecord user : users) {
dataFileWriter.append(user);
}
}
return out;
}
}

View File

@ -16,22 +16,20 @@
*/ */
package org.apache.nifi.processors.avro; package org.apache.nifi.processors.avro;
import java.io.File;
import java.io.IOException;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter; import org.apache.avro.io.DatumWriter;
import org.apache.nifi.processors.avro.ConvertAvroToJSON;
import org.apache.nifi.stream.io.ByteArrayOutputStream; import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.junit.Test; import org.junit.Test;
import java.io.File;
import java.io.IOException;
public class TestConvertAvroToJSON { public class TestConvertAvroToJSON {
@Test @Test
@ -44,7 +42,7 @@ public class TestConvertAvroToJSON {
user1.put("favorite_number", 256); user1.put("favorite_number", 256);
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema); final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
final ByteArrayOutputStream out1 = serializeAvroRecord(schema, datumWriter, user1); final ByteArrayOutputStream out1 = AvroTestUtil.serializeAvroRecord(schema, datumWriter, user1);
runner.enqueue(out1.toByteArray()); runner.enqueue(out1.toByteArray());
runner.run(); runner.run();
@ -69,7 +67,7 @@ public class TestConvertAvroToJSON {
user2.put("favorite_color", "red"); user2.put("favorite_color", "red");
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema); final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
final ByteArrayOutputStream out1 = serializeAvroRecord(schema, datumWriter, user1, user2); final ByteArrayOutputStream out1 = AvroTestUtil.serializeAvroRecord(schema, datumWriter, user1, user2);
runner.enqueue(out1.toByteArray()); runner.enqueue(out1.toByteArray());
runner.run(); runner.run();
@ -87,16 +85,4 @@ public class TestConvertAvroToJSON {
runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_FAILURE, 1); runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_FAILURE, 1);
} }
private ByteArrayOutputStream serializeAvroRecord(final Schema schema, final DatumWriter<GenericRecord> datumWriter, final GenericRecord... users) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
dataFileWriter.create(schema, out);
for (final GenericRecord user : users) {
dataFileWriter.append(user);
}
dataFileWriter.close();
return out;
}
} }

View File

@ -0,0 +1,256 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.avro;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
public class TestExtractAvroMetadata {
static final String AVRO_SCHEMA_ATTR = "avro.schema";
static final String AVRO_CODEC_ATTR = "avro.codec";
@Test
public void testDefaultExtraction() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new ExtractAvroMetadata());
final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
final ByteArrayOutputStream out = getOutputStreamWithOneUser(schema);
runner.enqueue(out.toByteArray());
runner.run();
runner.assertAllFlowFilesTransferred(ExtractAvroMetadata.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ExtractAvroMetadata.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_FINGERPRINT_ATTR, "b2d1d8d3de2833ce");
flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_TYPE_ATTR, Schema.Type.RECORD.getName());
flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_NAME_ATTR, "User");
flowFile.assertAttributeNotExists(AVRO_SCHEMA_ATTR);
flowFile.assertAttributeNotExists(ExtractAvroMetadata.ITEM_COUNT_ATTR);
}
@Test
public void testExtractionWithItemCount() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new ExtractAvroMetadata());
runner.setProperty(ExtractAvroMetadata.COUNT_ITEMS, "true");
final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
final ByteArrayOutputStream out = getOutputStreamWithMultipleUsers(schema, 6000); // creates 2 blocks
runner.enqueue(out.toByteArray());
runner.run();
runner.assertAllFlowFilesTransferred(ExtractAvroMetadata.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ExtractAvroMetadata.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals(ExtractAvroMetadata.ITEM_COUNT_ATTR, "6000");
}
@Test
public void testExtractionWithZeroUsers() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new ExtractAvroMetadata());
runner.setProperty(ExtractAvroMetadata.COUNT_ITEMS, "true");
final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
final ByteArrayOutputStream out = getOutputStreamWithMultipleUsers(schema, 0);
runner.enqueue(out.toByteArray());
runner.run();
runner.assertAllFlowFilesTransferred(ExtractAvroMetadata.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ExtractAvroMetadata.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_FINGERPRINT_ATTR, "b2d1d8d3de2833ce");
flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_TYPE_ATTR, Schema.Type.RECORD.getName());
flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_NAME_ATTR, "User");
flowFile.assertAttributeEquals(ExtractAvroMetadata.ITEM_COUNT_ATTR, "0");
}
@Test
public void testExtractionWithMD5() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new ExtractAvroMetadata());
runner.setProperty(ExtractAvroMetadata.FINGERPRINT_ALGORITHM, ExtractAvroMetadata.MD5);
final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
final ByteArrayOutputStream out = getOutputStreamWithOneUser(schema);
runner.enqueue(out.toByteArray());
runner.run();
runner.assertAllFlowFilesTransferred(ExtractAvroMetadata.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ExtractAvroMetadata.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_FINGERPRINT_ATTR, "3c6a7bee8994be20314dd28c6a3af4f2");
flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_TYPE_ATTR, Schema.Type.RECORD.getName());
flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_NAME_ATTR, "User");
flowFile.assertAttributeNotExists(AVRO_SCHEMA_ATTR);
}
@Test
public void testExtractionWithSHA256() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new ExtractAvroMetadata());
runner.setProperty(ExtractAvroMetadata.FINGERPRINT_ALGORITHM, ExtractAvroMetadata.SHA_256);
final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
final ByteArrayOutputStream out = getOutputStreamWithOneUser(schema);
runner.enqueue(out.toByteArray());
runner.run();
runner.assertAllFlowFilesTransferred(ExtractAvroMetadata.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ExtractAvroMetadata.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_FINGERPRINT_ATTR, "683f8f51ecd208038f4f0d39820ee9dd0ef3e32a3bee9371de0a2016d501b113");
flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_TYPE_ATTR, Schema.Type.RECORD.getName());
flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_NAME_ATTR, "User");
flowFile.assertAttributeNotExists(AVRO_SCHEMA_ATTR);
}
@Test
public void testExtractionWithMetadataKey() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new ExtractAvroMetadata());
runner.setProperty(ExtractAvroMetadata.METADATA_KEYS, AVRO_SCHEMA_ATTR); // test dynamic attribute avro.schema
final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
final ByteArrayOutputStream out = getOutputStreamWithOneUser(schema);
runner.enqueue(out.toByteArray());
runner.run();
runner.assertAllFlowFilesTransferred(ExtractAvroMetadata.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ExtractAvroMetadata.REL_SUCCESS).get(0);
flowFile.assertAttributeExists(ExtractAvroMetadata.SCHEMA_FINGERPRINT_ATTR);
flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_TYPE_ATTR, Schema.Type.RECORD.getName());
flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_NAME_ATTR, "User");
flowFile.assertAttributeEquals(AVRO_SCHEMA_ATTR, schema.toString());
}
@Test
public void testExtractionWithMetadataKeysWhitespace() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new ExtractAvroMetadata());
runner.setProperty(ExtractAvroMetadata.METADATA_KEYS, "foo, bar, " + AVRO_SCHEMA_ATTR); // test dynamic attribute avro.schema
final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
final ByteArrayOutputStream out = getOutputStreamWithOneUser(schema);
runner.enqueue(out.toByteArray());
runner.run();
runner.assertAllFlowFilesTransferred(ExtractAvroMetadata.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ExtractAvroMetadata.REL_SUCCESS).get(0);
flowFile.assertAttributeExists(ExtractAvroMetadata.SCHEMA_FINGERPRINT_ATTR);
flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_TYPE_ATTR, Schema.Type.RECORD.getName());
flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_NAME_ATTR, "User");
flowFile.assertAttributeEquals(AVRO_SCHEMA_ATTR, schema.toString());
}
@Test
public void testExtractionWithNonRecordSchema() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new ExtractAvroMetadata());
runner.setProperty(ExtractAvroMetadata.COUNT_ITEMS, "true");
final Schema schema = new Schema.Parser().parse(new File("src/test/resources/array.avsc"));
final GenericData.Array<String> data = new GenericData.Array<>(schema, Arrays.asList("one", "two", "three"));
final DatumWriter<GenericData.Array<String>> datumWriter = new GenericDatumWriter<>(schema);
final ByteArrayOutputStream out = new ByteArrayOutputStream();
final DataFileWriter<GenericData.Array<String>> dataFileWriter = new DataFileWriter<>(datumWriter);
dataFileWriter.create(schema, out);
dataFileWriter.append(data);
dataFileWriter.append(data);
dataFileWriter.close();
runner.enqueue(out.toByteArray());
runner.run();
runner.assertAllFlowFilesTransferred(ExtractAvroMetadata.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ExtractAvroMetadata.REL_SUCCESS).get(0);
flowFile.assertAttributeExists(ExtractAvroMetadata.SCHEMA_FINGERPRINT_ATTR);
flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_TYPE_ATTR, Schema.Type.ARRAY.getName());
flowFile.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_NAME_ATTR, "array");
flowFile.assertAttributeEquals(ExtractAvroMetadata.ITEM_COUNT_ATTR, "2"); // number of arrays, not elements
}
@Test
public void testExtractionWithCodec() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new ExtractAvroMetadata());
runner.setProperty(ExtractAvroMetadata.METADATA_KEYS, AVRO_CODEC_ATTR); // test dynamic attribute avro.codec
final Schema schema = new Schema.Parser().parse(new File("src/test/resources/array.avsc"));
final GenericData.Array<String> data = new GenericData.Array<>(schema, Arrays.asList("one", "two", "three"));
final DatumWriter<GenericData.Array<String>> datumWriter = new GenericDatumWriter<>(schema);
final ByteArrayOutputStream out = new ByteArrayOutputStream();
final DataFileWriter<GenericData.Array<String>> dataFileWriter = new DataFileWriter<>(datumWriter);
dataFileWriter.setCodec(CodecFactory.deflateCodec(1));
dataFileWriter.create(schema, out);
dataFileWriter.append(data);
dataFileWriter.close();
runner.enqueue(out.toByteArray());
runner.run();
runner.assertAllFlowFilesTransferred(ExtractAvroMetadata.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ExtractAvroMetadata.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals("avro.codec", "deflate");
}
@Test
public void testExtractionWithBadInput() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new ExtractAvroMetadata());
final ByteArrayOutputStream out = new ByteArrayOutputStream();
out.write("not avro".getBytes("UTF-8"));
out.flush();
runner.enqueue(out.toByteArray());
runner.run();
runner.assertAllFlowFilesTransferred(ExtractAvroMetadata.REL_FAILURE, 1);
}
private ByteArrayOutputStream getOutputStreamWithOneUser(Schema schema) throws IOException {
final GenericRecord user = new GenericData.Record(schema);
user.put("name", "Alyssa");
user.put("favorite_number", 256);
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
return AvroTestUtil.serializeAvroRecord(schema, datumWriter, user);
}
private ByteArrayOutputStream getOutputStreamWithMultipleUsers(Schema schema, int numUsers) throws IOException {
final GenericRecord[] users = new GenericRecord[numUsers];
for (int i=0; i < numUsers; i++) {
final GenericRecord user = new GenericData.Record(schema);
user.put("name", "user" + i);
user.put("favorite_number", i);
users[i] = user;
}
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
return AvroTestUtil.serializeAvroRecord(schema, datumWriter, users);
}
}

View File

@ -0,0 +1 @@
{"type": "array", "items": "string"}