NIFI-4192: Add more Avro/metadata merge options to MergeContent

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2067.
This commit is contained in:
Matt Burgess 2017-08-08 13:32:17 -04:00 committed by Pierre Villard
parent 67819e5019
commit 9c4fdd4ef3
2 changed files with 306 additions and 17 deletions

View File

@ -142,6 +142,33 @@ public class MergeContent extends BinFiles {
public static final String SEGMENT_COUNT_ATTRIBUTE = "segment.count";
public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
public static final AllowableValue METADATA_STRATEGY_USE_FIRST = new AllowableValue("Use First Metadata", "Use First Metadata",
"For any input format that supports metadata (Avro, e.g.), the metadata for the first FlowFile in the bin will be set on the output FlowFile.");
public static final AllowableValue METADATA_STRATEGY_ALL_COMMON = new AllowableValue("Keep Only Common Metadata", "Keep Only Common Metadata",
"For any input format that supports metadata (Avro, e.g.), any FlowFile whose metadata values match those of the first FlowFile, any additional metadata "
+ "will be dropped but the FlowFile will be merged. Any FlowFile whose metadata values do not match those of the first FlowFile in the bin will not be merged.");
public static final AllowableValue METADATA_STRATEGY_IGNORE = new AllowableValue("Ignore Metadata", "Ignore Metadata",
"Ignores (does not transfer, compare, etc.) any metadata from a FlowFile whose content supports embedded metadata.");
public static final AllowableValue METADATA_STRATEGY_DO_NOT_MERGE = new AllowableValue("Do Not Merge Uncommon Metadata", "Do Not Merge Uncommon Metadata",
"For any input format that supports metadata (Avro, e.g.), any FlowFile whose metadata values do not match those of the first FlowFile in the bin will not be merged.");
public static final PropertyDescriptor METADATA_STRATEGY = new PropertyDescriptor.Builder()
.required(true)
.name("mergecontent-metadata-strategy")
.displayName("Metadata Strategy")
.description("For FlowFiles whose input format supports metadata (Avro, e.g.), this property determines which metadata should be added to the bundle. "
+ "If 'Use First Metadata' is selected, the metadata keys/values from the first FlowFile to be bundled will be used. If 'Keep Only Common Metadata' is selected, "
+ "only the metadata that exists on all FlowFiles in the bundle, with the same value, will be preserved. If 'Ignore Metadata' is selected, no metadata is transferred to "
+ "the outgoing bundled FlowFile. If 'Do Not Merge Uncommon Metadata' is selected, any FlowFile whose metadata values do not match those of the first bundled FlowFile "
+ "will not be merged.")
.allowableValues(METADATA_STRATEGY_USE_FIRST, METADATA_STRATEGY_ALL_COMMON, METADATA_STRATEGY_DO_NOT_MERGE, METADATA_STRATEGY_IGNORE)
.defaultValue(METADATA_STRATEGY_DO_NOT_MERGE.getValue())
.build();
public static final AllowableValue MERGE_STRATEGY_BIN_PACK = new AllowableValue(
"Bin-Packing Algorithm",
"Bin-Packing Algorithm",
@ -307,6 +334,7 @@ public class MergeContent extends BinFiles {
descriptors.add(MERGE_FORMAT);
descriptors.add(AttributeStrategyUtil.ATTRIBUTE_STRATEGY);
descriptors.add(CORRELATION_ATTRIBUTE_NAME);
descriptors.add(METADATA_STRATEGY);
descriptors.add(MIN_ENTRIES);
descriptors.add(MAX_ENTRIES);
descriptors.add(MIN_SIZE);
@ -861,6 +889,7 @@ public class MergeContent extends BinFiles {
final ProcessSession session = bin.getSession();
final List<FlowFile> contents = bin.getContents();
final String metadataStrategy = context.getProperty(METADATA_STRATEGY).getValue();
final Map<String, byte[]> metadata = new TreeMap<>();
final AtomicReference<Schema> schema = new AtomicReference<>(null);
final AtomicReference<String> inputCodec = new AtomicReference<>(null);
@ -883,11 +912,13 @@ public class MergeContent extends BinFiles {
// this is the first file - set up the writer, and store the
// Schema & metadata we'll use.
schema.set(reader.getSchema());
for (String key : reader.getMetaKeys()) {
if (!DataFileWriter.isReservedMeta(key)) {
byte[] metadatum = reader.getMeta(key);
metadata.put(key, metadatum);
writer.setMeta(key, metadatum);
if (!METADATA_STRATEGY_IGNORE.getValue().equals(metadataStrategy)) {
for (String key : reader.getMetaKeys()) {
if (!DataFileWriter.isReservedMeta(key)) {
byte[] metadatum = reader.getMeta(key);
metadata.put(key, metadatum);
writer.setMeta(key, metadatum);
}
}
}
inputCodec.set(reader.getMetaString(DataFileConstants.CODEC));
@ -905,19 +936,25 @@ public class MergeContent extends BinFiles {
unmerged.add(flowFile);
}
// check that we're appending to the same metadata
for (String key : reader.getMetaKeys()) {
if (!DataFileWriter.isReservedMeta(key)) {
byte[] metadatum = reader.getMeta(key);
byte[] writersMetadatum = metadata.get(key);
if (!Arrays.equals(metadatum, writersMetadatum)) {
getLogger().debug("Input file {} has different non-reserved metadata, not merging",
new Object[]{flowFile.getId()});
canMerge = false;
unmerged.add(flowFile);
if (METADATA_STRATEGY_DO_NOT_MERGE.getValue().equals(metadataStrategy)
|| METADATA_STRATEGY_ALL_COMMON.getValue().equals(metadataStrategy)) {
// check that we're appending to the same metadata
for (String key : reader.getMetaKeys()) {
if (!DataFileWriter.isReservedMeta(key)) {
byte[] metadatum = reader.getMeta(key);
byte[] writersMetadatum = metadata.get(key);
if (!Arrays.equals(metadatum, writersMetadatum)) {
// Ignore additional metadata if ALL_COMMON is the strategy, otherwise don't merge
if (!METADATA_STRATEGY_ALL_COMMON.getValue().equals(metadataStrategy) || writersMetadatum != null) {
getLogger().debug("Input file {} has different non-reserved metadata, not merging",
new Object[]{flowFile.getId()});
canMerge = false;
unmerged.add(flowFile);
}
}
}
}
}
} // else the metadata in the first FlowFile was either ignored or retained in the if-clause above
// check that we're appending to the same codec
String thisCodec = reader.getMetaString(DataFileConstants.CODEC);

View File

@ -197,6 +197,251 @@ public class TestMergeContent {
Assert.assertTrue(places.containsKey("Some Place"));
}
@Test
public void testAvroConcatWithDifferentMetadataDoNotMerge() throws IOException, InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
runner.setProperty(MergeContent.MAX_ENTRIES, "3");
runner.setProperty(MergeContent.MIN_ENTRIES, "3");
runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_AVRO);
runner.setProperty(MergeContent.METADATA_STRATEGY, MergeContent.METADATA_STRATEGY_DO_NOT_MERGE);
final Schema schema = new Schema.Parser().parse(new File("src/test/resources/TestMergeContent/user.avsc"));
final GenericRecord user1 = new GenericData.Record(schema);
user1.put("name", "Alyssa");
user1.put("favorite_number", 256);
final Map<String, String> userMeta1 = new HashMap<String, String>() {{
put("test_metadata1", "Test 1");
}};
final GenericRecord user2 = new GenericData.Record(schema);
user2.put("name", "Ben");
user2.put("favorite_number", 7);
user2.put("favorite_color", "red");
final Map<String, String> userMeta2 = new HashMap<String, String>() {{
put("test_metadata1", "Test 2"); // Test non-matching values
}};
final GenericRecord user3 = new GenericData.Record(schema);
user3.put("name", "John");
user3.put("favorite_number", 5);
user3.put("favorite_color", "blue");
final Map<String, String> userMeta3 = new HashMap<String, String>() {{
put("test_metadata1", "Test 1");
put("test_metadata2", "Test"); // Test unique
}};
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
final ByteArrayOutputStream out1 = serializeAvroRecord(schema, user1, datumWriter, userMeta1);
final ByteArrayOutputStream out2 = serializeAvroRecord(schema, user2, datumWriter, userMeta2);
final ByteArrayOutputStream out3 = serializeAvroRecord(schema, user3, datumWriter, userMeta3);
runner.enqueue(out1.toByteArray());
runner.enqueue(out2.toByteArray());
runner.enqueue(out3.toByteArray());
runner.run();
runner.assertQueueEmpty();
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
runner.assertTransferCount(MergeContent.REL_FAILURE, 2);
runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);
final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");
// create a reader for the merged content
byte[] data = runner.getContentAsByteArray(bundle);
final Map<String, GenericRecord> users = getGenericRecordMap(data, schema, "name");
Assert.assertEquals(1, users.size());
Assert.assertTrue(users.containsKey("Alyssa"));
}
@Test
public void testAvroConcatWithDifferentMetadataIgnore() throws IOException, InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
runner.setProperty(MergeContent.MAX_ENTRIES, "3");
runner.setProperty(MergeContent.MIN_ENTRIES, "3");
runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_AVRO);
runner.setProperty(MergeContent.METADATA_STRATEGY, MergeContent.METADATA_STRATEGY_IGNORE);
final Schema schema = new Schema.Parser().parse(new File("src/test/resources/TestMergeContent/user.avsc"));
final GenericRecord user1 = new GenericData.Record(schema);
user1.put("name", "Alyssa");
user1.put("favorite_number", 256);
final Map<String, String> userMeta1 = new HashMap<String, String>() {{
put("test_metadata1", "Test 1");
}};
final GenericRecord user2 = new GenericData.Record(schema);
user2.put("name", "Ben");
user2.put("favorite_number", 7);
user2.put("favorite_color", "red");
final Map<String, String> userMeta2 = new HashMap<String, String>() {{
put("test_metadata1", "Test 2"); // Test non-matching values
}};
final GenericRecord user3 = new GenericData.Record(schema);
user3.put("name", "John");
user3.put("favorite_number", 5);
user3.put("favorite_color", "blue");
final Map<String, String> userMeta3 = new HashMap<String, String>() {{
put("test_metadata1", "Test 1");
put("test_metadata2", "Test"); // Test unique
}};
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
final ByteArrayOutputStream out1 = serializeAvroRecord(schema, user1, datumWriter, userMeta1);
final ByteArrayOutputStream out2 = serializeAvroRecord(schema, user2, datumWriter, userMeta2);
final ByteArrayOutputStream out3 = serializeAvroRecord(schema, user3, datumWriter, userMeta3);
runner.enqueue(out1.toByteArray());
runner.enqueue(out2.toByteArray());
runner.enqueue(out3.toByteArray());
runner.run();
runner.assertQueueEmpty();
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);
final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");
// create a reader for the merged content
byte[] data = runner.getContentAsByteArray(bundle);
final Map<String, GenericRecord> users = getGenericRecordMap(data, schema, "name");
Assert.assertEquals(3, users.size());
Assert.assertTrue(users.containsKey("Alyssa"));
Assert.assertTrue(users.containsKey("Ben"));
Assert.assertTrue(users.containsKey("John"));
}
@Test
public void testAvroConcatWithDifferentMetadataUseFirst() throws IOException, InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
runner.setProperty(MergeContent.MAX_ENTRIES, "3");
runner.setProperty(MergeContent.MIN_ENTRIES, "3");
runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_AVRO);
runner.setProperty(MergeContent.METADATA_STRATEGY, MergeContent.METADATA_STRATEGY_USE_FIRST);
final Schema schema = new Schema.Parser().parse(new File("src/test/resources/TestMergeContent/user.avsc"));
final GenericRecord user1 = new GenericData.Record(schema);
user1.put("name", "Alyssa");
user1.put("favorite_number", 256);
final Map<String, String> userMeta1 = new HashMap<String, String>() {{
put("test_metadata1", "Test 1");
}};
final GenericRecord user2 = new GenericData.Record(schema);
user2.put("name", "Ben");
user2.put("favorite_number", 7);
user2.put("favorite_color", "red");
final Map<String, String> userMeta2 = new HashMap<String, String>() {{
put("test_metadata1", "Test 2"); // Test non-matching values
}};
final GenericRecord user3 = new GenericData.Record(schema);
user3.put("name", "John");
user3.put("favorite_number", 5);
user3.put("favorite_color", "blue");
final Map<String, String> userMeta3 = new HashMap<String, String>() {{
put("test_metadata1", "Test 1");
put("test_metadata2", "Test"); // Test unique
}};
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
final ByteArrayOutputStream out1 = serializeAvroRecord(schema, user1, datumWriter, userMeta1);
final ByteArrayOutputStream out2 = serializeAvroRecord(schema, user2, datumWriter, userMeta2);
final ByteArrayOutputStream out3 = serializeAvroRecord(schema, user3, datumWriter, userMeta3);
runner.enqueue(out1.toByteArray());
runner.enqueue(out2.toByteArray());
runner.enqueue(out3.toByteArray());
runner.run();
runner.assertQueueEmpty();
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);
final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");
// create a reader for the merged content
byte[] data = runner.getContentAsByteArray(bundle);
final Map<String, GenericRecord> users = getGenericRecordMap(data, schema, "name");
Assert.assertEquals(3, users.size());
Assert.assertTrue(users.containsKey("Alyssa"));
Assert.assertTrue(users.containsKey("Ben"));
Assert.assertTrue(users.containsKey("John"));
}
@Test
public void testAvroConcatWithDifferentMetadataKeepCommon() throws IOException, InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
runner.setProperty(MergeContent.MAX_ENTRIES, "3");
runner.setProperty(MergeContent.MIN_ENTRIES, "3");
runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_AVRO);
runner.setProperty(MergeContent.METADATA_STRATEGY, MergeContent.METADATA_STRATEGY_ALL_COMMON);
final Schema schema = new Schema.Parser().parse(new File("src/test/resources/TestMergeContent/user.avsc"));
final GenericRecord user1 = new GenericData.Record(schema);
user1.put("name", "Alyssa");
user1.put("favorite_number", 256);
final Map<String, String> userMeta1 = new HashMap<String, String>() {{
put("test_metadata1", "Test 1");
}};
final GenericRecord user2 = new GenericData.Record(schema);
user2.put("name", "Ben");
user2.put("favorite_number", 7);
user2.put("favorite_color", "red");
final Map<String, String> userMeta2 = new HashMap<String, String>() {{
put("test_metadata1", "Test 2"); // Test non-matching values
}};
final GenericRecord user3 = new GenericData.Record(schema);
user3.put("name", "John");
user3.put("favorite_number", 5);
user3.put("favorite_color", "blue");
final Map<String, String> userMeta3 = new HashMap<String, String>() {{
put("test_metadata1", "Test 1");
put("test_metadata2", "Test"); // Test unique
}};
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
final ByteArrayOutputStream out1 = serializeAvroRecord(schema, user1, datumWriter, userMeta1);
final ByteArrayOutputStream out2 = serializeAvroRecord(schema, user2, datumWriter, userMeta2);
final ByteArrayOutputStream out3 = serializeAvroRecord(schema, user3, datumWriter, userMeta3);
runner.enqueue(out1.toByteArray());
runner.enqueue(out2.toByteArray());
runner.enqueue(out3.toByteArray());
runner.run();
runner.assertQueueEmpty();
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
runner.assertTransferCount(MergeContent.REL_FAILURE, 1);
runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);
final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");
// create a reader for the merged content
byte[] data = runner.getContentAsByteArray(bundle);
final Map<String, GenericRecord> users = getGenericRecordMap(data, schema, "name");
Assert.assertEquals(2, users.size());
Assert.assertTrue(users.containsKey("Alyssa"));
Assert.assertTrue(users.containsKey("John"));
}
private Map<String, GenericRecord> getGenericRecordMap(byte[] data, Schema schema, String key) throws IOException {
// create a reader for the merged contet
DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
@ -213,8 +458,15 @@ public class TestMergeContent {
}
private ByteArrayOutputStream serializeAvroRecord(Schema schema, GenericRecord user2, DatumWriter<GenericRecord> datumWriter) throws IOException {
return serializeAvroRecord(schema, user2, datumWriter, null);
}
private ByteArrayOutputStream serializeAvroRecord(Schema schema, GenericRecord user2, DatumWriter<GenericRecord> datumWriter, Map<String, String> metadata) throws IOException {
ByteArrayOutputStream out2 = new ByteArrayOutputStream();
DataFileWriter<GenericRecord> dataFileWriter2 = new DataFileWriter<GenericRecord>(datumWriter);
DataFileWriter<GenericRecord> dataFileWriter2 = new DataFileWriter<>(datumWriter);
if (metadata != null) {
metadata.forEach(dataFileWriter2::setMeta);
}
dataFileWriter2.create(schema, out2);
dataFileWriter2.append(user2);
dataFileWriter2.close();