From b603cb955dcd1d3d9b5e374e5760f2f9b047bda9 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 26 Jun 2017 13:15:03 -0400 Subject: [PATCH] NIFI-4060: Initial implementation of MergeRecord NIFI-4060: Addressed threading issue with RecordBin being updated after it is completed; fixed issue that caused mime.type attribute not to be written properly if all incoming flowfiles already have a different value for that attribute NIFI-4060: Bug fixes; improved documentation; added a lot of debug information; updated StandardProcessSession to produce more accurate logs in case of a session being committed/rolled back with open input/output streams Signed-off-by: Matt Burgess This closes #1958 --- .../nifi/processor/util/bin/BinFiles.java | 14 +- .../record/CommaSeparatedRecordReader.java | 102 +++++ .../repository/StandardProcessSession.java | 19 +- .../processors/standard/MergeContent.java | 104 +---- .../nifi/processors/standard/MergeRecord.java | 358 +++++++++++++++ .../standard/merge/AttributeStrategy.java | 27 ++ .../standard/merge/AttributeStrategyUtil.java | 56 +++ .../merge/KeepCommonAttributeStrategy.java | 64 +++ .../merge/KeepUniqueAttributeStrategy.java | 58 +++ .../processors/standard/merge/RecordBin.java | 424 ++++++++++++++++++ .../standard/merge/RecordBinManager.java | 295 ++++++++++++ .../standard/merge/RecordBinThresholds.java | 69 +++ .../org.apache.nifi.processor.Processor | 1 + .../additionalDetails.html | 229 ++++++++++ .../processors/standard/TestMergeContent.java | 9 +- .../processors/standard/TestMergeRecord.java | 360 +++++++++++++++ 16 files changed, 2076 insertions(+), 113 deletions(-) create mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/AttributeStrategy.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/AttributeStrategyUtil.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/KeepCommonAttributeStrategy.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/KeepUniqueAttributeStrategy.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinThresholds.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.MergeRecord/additionalDetails.html create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
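Reviewer sketch (not part of the patch): a minimal view of how the new processor is expected to be driven in a test, using the CommaSeparatedRecordReader added below and the existing MockRecordWriter from nifi-mock-record-utils. The relationship constants REL_MERGED and REL_ORIGINAL are defined further down in MergeRecord.java, past the excerpt shown here; the MIN_RECORDS threshold is chosen so the bin completes only after both FlowFiles arrive. Assume this runs inside a test method that declares InitializationException.

    final TestRunner runner = TestRunners.newTestRunner(new MergeRecord());
    final CommaSeparatedRecordReader reader = new CommaSeparatedRecordReader();
    final MockRecordWriter writer = new MockRecordWriter("header", false);

    runner.addControllerService("reader", reader);
    runner.enableControllerService(reader);
    runner.addControllerService("writer", writer);
    runner.enableControllerService(writer);
    runner.setProperty(MergeRecord.RECORD_READER, "reader");
    runner.setProperty(MergeRecord.RECORD_WRITER, "writer");
    runner.setProperty(MergeRecord.MIN_RECORDS, "2"); // bin is incomplete until two records have been buffered

    runner.enqueue("Name, Age\nJohn, 35");
    runner.enqueue("Name, Age\nJane, 34");
    runner.run(2);

    runner.assertTransferCount(MergeRecord.REL_MERGED, 1);   // one combined FlowFile
    runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 2); // both inputs preserved on 'original'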
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java index b15d23b65a..7f79b708c5 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java @@ -131,9 +131,10 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor { * * @param context context * @param flowFile flowFile + * @param session the session for accessing the FlowFile * @return The appropriate group ID */ - protected abstract String getGroupId(final ProcessContext context, final FlowFile flowFile); + protected abstract String getGroupId(final ProcessContext context, final FlowFile flowFile, final ProcessSession session); /** * Performs any additional setup of the bin manager. Called during the OnScheduled phase. @@ -271,8 +272,15 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor { final Map<String, List<FlowFile>> flowFileGroups = new HashMap<>(); for (FlowFile flowFile : flowFiles) { flowFile = this.preprocessFlowFile(context, session, flowFile); - final String groupingIdentifier = getGroupId(context, flowFile); - flowFileGroups.computeIfAbsent(groupingIdentifier, id -> new ArrayList<>()).add(flowFile); + + try { + final String groupingIdentifier = getGroupId(context, flowFile, session); + flowFileGroups.computeIfAbsent(groupingIdentifier, id -> new ArrayList<>()).add(flowFile); + } catch (final Exception e) { + getLogger().error("Could not determine which Bin to add {} to; will route to failure", new Object[] {flowFile}, e); + session.transfer(flowFile, REL_FAILURE); + continue; + } } for (final Map.Entry<String, List<FlowFile>> entry : flowFileGroups.entrySet()) {
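A note on the signature change above: the new ProcessSession parameter lets a BinFiles subclass derive the group id from FlowFile content rather than attributes alone — MergeRecord uses it to read each FlowFile's record schema. A hypothetical subclass illustrating the pattern, and why binFlowFiles() now wraps the call in try/catch (any exception thrown here routes the FlowFile to failure):

    @Override
    protected String getGroupId(final ProcessContext context, final FlowFile flowFile, final ProcessSession session) {
        // Hypothetical grouping rule: bin by the first (header) line of the content.
        // Reading content is only possible now that the session is passed in.
        final AtomicReference<String> firstLine = new AtomicReference<>();
        session.read(flowFile, in -> {
            final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
            firstLine.set(reader.readLine()); // an IOException propagates to the caller and routes to REL_FAILURE
        });
        return firstLine.get();
    }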
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java new file mode 100644 index 0000000000..8973055b5b --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.serialization.record; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.SimpleRecordSchema; + +public class CommaSeparatedRecordReader extends AbstractControllerService implements RecordReaderFactory { + private int failAfterN; + private int recordCount = 0; + + public CommaSeparatedRecordReader() { + this(-1); + } + + public CommaSeparatedRecordReader(final int failAfterN) { + this.failAfterN = failAfterN; + } + + public void failAfter(final int failAfterN) { + this.failAfterN = failAfterN; + } + + @Override + public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException { + final BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + + final List<RecordField> fields = new ArrayList<>(); + + final String headerLine = reader.readLine(); + for (final String colName : headerLine.split(",")) { + fields.add(new RecordField(colName.trim(), RecordFieldType.STRING.getDataType())); + } + + return new RecordReader() { + + @Override + public void close() throws IOException { + reader.close(); + } + + @Override + public Record nextRecord() throws IOException, MalformedRecordException { + if (failAfterN > -1 && recordCount >= failAfterN) { + throw new MalformedRecordException("Intentional Unit Test Exception because " + recordCount + " records have been read"); + } + + final String nextLine = reader.readLine(); + if (nextLine == null) { + return null; + } + + recordCount++; + + final String[] values = nextLine.split(","); + final Map<String, Object> valueMap = new HashMap<>(); + int i = 0; + for (final RecordField field : fields) { + final String fieldName = field.getFieldName(); + valueMap.put(fieldName, values[i++].trim()); + } + + return new MapRecord(new SimpleRecordSchema(fields), valueMap); + } + + @Override + public RecordSchema getSchema() { + return new SimpleRecordSchema(fields); + } + }; + } +} \ No newline at end of file
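The mock reader's contract, for anyone writing tests against it: the first line of the content is the header, every following line is one record, and failAfter(n) makes nextRecord() throw a MalformedRecordException once n records have been read (useful for failure-path tests). A stand-alone sketch; the FlowFile and logger arguments are unused by this mock, so nulls suffice here:

    final CommaSeparatedRecordReader factory = new CommaSeparatedRecordReader();
    final InputStream in = new ByteArrayInputStream("Name, Age\nJohn, 35\nJane, 34".getBytes());

    final RecordReader reader = factory.createRecordReader(null, in, null);
    Record record;
    while ((record = reader.nextRecord()) != null) {
        System.out.println(record.getValue("Name")); // "John", then "Jane"
    }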
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index d34d8cf3f1..d2a6af67bf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -191,13 +191,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE processingStartTime = System.nanoTime(); } - private void closeStreams(final Map<FlowFile, ? extends Closeable> streamMap) { + private void closeStreams(final Map<FlowFile, ? extends Closeable> streamMap, final String action, final String streamType) { final Map<FlowFile, Closeable> openStreamCopy = new HashMap<>(streamMap); // avoid ConcurrentModificationException by creating a copy of the List for (final Map.Entry<FlowFile, Closeable> entry : openStreamCopy.entrySet()) { final FlowFile flowFile = entry.getKey(); final Closeable openStream = entry.getValue(); - LOG.warn("{} closing {} for {} because the session was committed without the stream being closed.", this, openStream, flowFile); + LOG.warn("{} closing {} for {} because the session was {} without the {} stream being closed.", this, openStream, flowFile, action, streamType); try { openStream.close(); @@ -212,8 +212,8 @@ resetWriteClaims(false); - closeStreams(openInputStreams); - closeStreams(openOutputStreams); + closeStreams(openInputStreams, "committed", "input"); + closeStreams(openOutputStreams, "committed", "output"); if (!readRecursionSet.isEmpty()) { throw new IllegalStateException(); @@ -914,8 +914,8 @@ deleteOnCommit.clear(); - closeStreams(openInputStreams); - closeStreams(openOutputStreams); + closeStreams(openInputStreams, "rolled back", "input"); + closeStreams(openOutputStreams, "rolled back", "output"); try { claimCache.reset(); @@ -2171,7 +2171,7 @@ throw new FlowFileAccessException("Failed to access ContentClaim for " + source.toString(), e); } - final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset(), false); + final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset(), true); final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize()); final ByteCountingInputStream countingStream = new ByteCountingInputStream(limitedIn); final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingStream, source, record.getCurrentClaim()); @@ -2470,7 +2470,10 @@ final long bytesWritten = countingOut.getBytesWritten(); StandardProcessSession.this.bytesWritten += bytesWritten; - openOutputStreams.remove(sourceFlowFile); + final OutputStream removed = openOutputStreams.remove(sourceFlowFile); + if (removed == null) { + LOG.error("Closed Session's OutputStream but there was no entry for it in the map; sourceFlowFile={}; map={}", sourceFlowFile, openOutputStreams); + } flush(); removeTemporaryClaim(record);
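For reviewers, the situation the reworded warning now disambiguates, seen from a processor's side — the old message claimed the session "was committed" even when it was actually rolled back with an open stream:

    final InputStream in = session.read(flowFile); // tracked in openInputStreams
    // ... processor forgets to close the stream ...
    session.rollback();
    // logs: "... because the session was rolled back without the input stream being closed."
    // a commit() in the same state logs "... was committed without the input stream being closed." instead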
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java index 3401d665a3..edbc03323a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java @@ -82,6 +82,8 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.bin.Bin; import org.apache.nifi.processor.util.bin.BinFiles; import org.apache.nifi.processor.util.bin.BinManager; +import org.apache.nifi.processors.standard.merge.AttributeStrategy; +import org.apache.nifi.processors.standard.merge.AttributeStrategyUtil; import org.apache.nifi.stream.io.NonCloseableOutputStream; import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.FlowFilePackager; @@ -126,7 +128,7 @@ import org.apache.nifi.util.FlowFilePackagerV3; @WritesAttribute(attribute = "merge.count", description = "The number of FlowFiles that were merged into this bundle"), @WritesAttribute(attribute = "merge.bin.age", description = "The age of the bin, in milliseconds, when it was merged and output. Effectively " + "this is the greatest amount of time that any FlowFile in this bundle remained waiting in this processor before it was output") }) -@SeeAlso(SegmentContent.class) +@SeeAlso({SegmentContent.class, MergeRecord.class}) public class MergeContent extends BinFiles { // preferred attributes @@ -201,8 +203,6 @@ MERGE_FORMAT_AVRO_VALUE, "The Avro contents of all FlowFiles will be concatenated together into a single FlowFile"); - public static final String ATTRIBUTE_STRATEGY_ALL_COMMON = "Keep Only Common Attributes"; - public static final String ATTRIBUTE_STRATEGY_ALL_UNIQUE = "Keep All Unique Attributes"; public static final String TAR_PERMISSIONS_ATTRIBUTE = "tar.permissions"; public static final String MERGE_COUNT_ATTRIBUTE = "merge.count"; @@ -224,16 +224,6 @@ .allowableValues(MERGE_FORMAT_TAR, MERGE_FORMAT_ZIP, MERGE_FORMAT_FLOWFILE_STREAM_V3, MERGE_FORMAT_FLOWFILE_STREAM_V2, MERGE_FORMAT_FLOWFILE_TAR_V1, MERGE_FORMAT_CONCAT, MERGE_FORMAT_AVRO) .defaultValue(MERGE_FORMAT_CONCAT.getValue()) .build(); - public static final PropertyDescriptor ATTRIBUTE_STRATEGY = new PropertyDescriptor.Builder() - .required(true) - .name("Attribute Strategy") - .description("Determines which FlowFile attributes should be added to the bundle. If 'Keep All Unique Attributes' is selected, any " - + "attribute on any FlowFile that gets bundled will be kept unless its value conflicts with the value from another FlowFile. " - + "If 'Keep Only Common Attributes' is selected, only the attributes that exist on all FlowFiles in the bundle, with the same " - + "value, will be preserved.") - .allowableValues(ATTRIBUTE_STRATEGY_ALL_COMMON, ATTRIBUTE_STRATEGY_ALL_UNIQUE) - .defaultValue(ATTRIBUTE_STRATEGY_ALL_COMMON) - .build(); public static final PropertyDescriptor CORRELATION_ATTRIBUTE_NAME = new PropertyDescriptor.Builder() .name("Correlation Attribute Name") @@ -315,7 +305,7 @@ final List<PropertyDescriptor> descriptors = new ArrayList<>(); descriptors.add(MERGE_STRATEGY); descriptors.add(MERGE_FORMAT); - descriptors.add(ATTRIBUTE_STRATEGY); + descriptors.add(AttributeStrategyUtil.ATTRIBUTE_STRATEGY); descriptors.add(CORRELATION_ATTRIBUTE_NAME); descriptors.add(MIN_ENTRIES); descriptors.add(MAX_ENTRIES); @@ -378,7 +368,7 @@ } @Override - protected String getGroupId(final ProcessContext context, final FlowFile flowFile) { + protected String getGroupId(final ProcessContext context, final FlowFile flowFile, final ProcessSession session) { final String correlationAttributeName = context.getProperty(CORRELATION_ATTRIBUTE_NAME) .evaluateAttributeExpressions(flowFile).getValue(); String groupId = correlationAttributeName == null ?
null : flowFile.getAttribute(correlationAttributeName); @@ -429,16 +419,7 @@ throw new AssertionError(); } - final AttributeStrategy attributeStrategy; - switch (context.getProperty(ATTRIBUTE_STRATEGY).getValue()) { - case ATTRIBUTE_STRATEGY_ALL_UNIQUE: - attributeStrategy = new KeepUniqueAttributeStrategy(); - break; - case ATTRIBUTE_STRATEGY_ALL_COMMON: - default: - attributeStrategy = new KeepCommonAttributeStrategy(); - break; - } + final AttributeStrategy attributeStrategy = AttributeStrategyUtil.strategyFor(context); final List<FlowFile> contents = bin.getContents(); final ProcessSession binSession = bin.getSession(); @@ -989,76 +970,7 @@ } } - private static class KeepUniqueAttributeStrategy implements AttributeStrategy { - @Override - public Map<String, String> getMergedAttributes(final List<FlowFile> flowFiles) { - final Map<String, String> newAttributes = new HashMap<>(); - final Set<String> conflicting = new HashSet<>(); - - for (final FlowFile flowFile : flowFiles) { - for (final Map.Entry<String, String> attributeEntry : flowFile.getAttributes().entrySet()) { - final String name = attributeEntry.getKey(); - final String value = attributeEntry.getValue(); - - final String existingValue = newAttributes.get(name); - if (existingValue != null && !existingValue.equals(value)) { - conflicting.add(name); - } else { - newAttributes.put(name, value); - } - } - } - - for (final String attributeToRemove : conflicting) { - newAttributes.remove(attributeToRemove); - } - - // Never copy the UUID from the parents - which could happen if we don't remove it and there is only 1 parent. - newAttributes.remove(CoreAttributes.UUID.key()); - return newAttributes; - } - } - - private static class KeepCommonAttributeStrategy implements AttributeStrategy { - - @Override - public Map<String, String> getMergedAttributes(final List<FlowFile> flowFiles) { - final Map<String, String> result = new HashMap<>(); - - //trivial cases - if (flowFiles == null || flowFiles.isEmpty()) { - return result; - } else if (flowFiles.size() == 1) { - result.putAll(flowFiles.iterator().next().getAttributes()); - } - - /* - * Start with the first attribute map and only put an entry to the - * resultant map if it is common to every map. - */ - final Map<String, String> firstMap = flowFiles.iterator().next().getAttributes(); - - outer: - for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) { - final String key = mapEntry.getKey(); - final String value = mapEntry.getValue(); - - for (final FlowFile flowFile : flowFiles) { - final Map<String, String> currMap = flowFile.getAttributes(); - final String curVal = currMap.get(key); - if (curVal == null || !curVal.equals(value)) { - continue outer; - } - } - result.put(key, value); - } - - // Never copy the UUID from the parents - which could happen if we don't remove it and there is only 1 parent.
- result.remove(CoreAttributes.UUID.key()); - return result; - } - } private static class FragmentComparator implements Comparator<FlowFile> { @@ -1079,8 +991,4 @@ List<FlowFile> getUnmergedFlowFiles(); } - private interface AttributeStrategy { - - Map<String, String> getMergedAttributes(List<FlowFile> flowFiles); - } }
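With the strategies extracted into the new merge package, any binning processor can expose and resolve them the same way MergeContent now does; the consuming pattern, mirroring the strategyFor(context) call in the hunk above (bin handling elided):

    descriptors.add(AttributeStrategyUtil.ATTRIBUTE_STRATEGY); // replaces the processor-local descriptor

    final AttributeStrategy attributeStrategy = AttributeStrategyUtil.strategyFor(context);
    final Map<String, String> mergedAttributes = attributeStrategy.getMergedAttributes(bin.getContents());
    FlowFile bundle = binSession.create(bin.getContents());
    bundle = binSession.putAllAttributes(bundle, mergedAttributes);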
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java new file mode 100644 index 0000000000..b0e3f48928 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java @@ -0,0 +1,358 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.avro.AvroTypeUtil; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.FragmentAttributes; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.FlowFileFilters; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.merge.AttributeStrategyUtil; +import org.apache.nifi.processors.standard.merge.RecordBinManager; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.RecordSchema; + + +@SideEffectFree +@TriggerWhenEmpty +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"merge", "record", "content", "correlation", "stream", "event"}) +@CapabilityDescription("This Processor merges together multiple record-oriented FlowFiles into a single FlowFile that contains all of the Records of the input FlowFiles. " + "This Processor works by creating 'bins' and then adding FlowFiles to these bins until they are full. Once a bin is full, all of the FlowFiles will be combined into " + "a single output FlowFile, and that FlowFile will be routed to the 'merged' Relationship. A bin will consist of potentially many 'like FlowFiles'. In order for two " + "FlowFiles to be considered 'like FlowFiles', they must have the same Schema (as identified by the Record Reader) and, if the <Correlation Attribute Name> property " + "is set, the same value for the specified attribute. See Processor Usage and Additional Details for more information.") +@ReadsAttributes({ + @ReadsAttribute(attribute = "fragment.identifier", description = "Applicable only if the <Merge Strategy> property is set to Defragment. " + "All FlowFiles with the same value for this attribute will be bundled together."), + @ReadsAttribute(attribute = "fragment.count", description = "Applicable only if the <Merge Strategy> property is set to Defragment. This " + "attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same " + "bundle must have the same value for this attribute. The value of this attribute indicates how many FlowFiles should be expected " + "in the given bundle."), +}) +@WritesAttributes({ + @WritesAttribute(attribute = "record.count", description = "The merged FlowFile will have a 'record.count' attribute indicating the number of records " + "that were written to the FlowFile."), + @WritesAttribute(attribute = "mime.type", description = "The MIME Type indicated by the Record Writer"), + @WritesAttribute(attribute = "merge.count", description = "The number of FlowFiles that were merged into this bundle"), + @WritesAttribute(attribute = "merge.bin.age", description = "The age of the bin, in milliseconds, when it was merged and output.
Effectively " + "this is the greatest amount of time that any FlowFile in this bundle remained waiting in this processor before it was output"), + @WritesAttribute(attribute = "<Attributes from Record Writer>", description = "Any Attribute that the configured Record Writer returns will be added to the FlowFile.") +}) +@SeeAlso({MergeContent.class, SplitRecord.class, PartitionRecord.class}) +public class MergeRecord extends AbstractSessionFactoryProcessor { + // attributes for defragmentation + public static final String FRAGMENT_ID_ATTRIBUTE = FragmentAttributes.FRAGMENT_ID.key(); + public static final String FRAGMENT_INDEX_ATTRIBUTE = FragmentAttributes.FRAGMENT_INDEX.key(); + public static final String FRAGMENT_COUNT_ATTRIBUTE = FragmentAttributes.FRAGMENT_COUNT.key(); + + public static final String MERGE_COUNT_ATTRIBUTE = "merge.count"; + public static final String MERGE_BIN_AGE_ATTRIBUTE = "merge.bin.age"; + + public static final AllowableValue MERGE_STRATEGY_BIN_PACK = new AllowableValue( + "Bin-Packing Algorithm", + "Bin-Packing Algorithm", + "Generates 'bins' of FlowFiles and fills each bin as full as possible. FlowFiles are placed into a bin based on their size and optionally " + "their attributes (if the <Correlation Attribute Name> property is set)"); + public static final AllowableValue MERGE_STRATEGY_DEFRAGMENT = new AllowableValue( + "Defragment", + "Defragment", + "Combines fragments that are associated by attributes back into a single cohesive FlowFile. If using this strategy, all FlowFiles must " + "have the attributes <fragment.identifier> and <fragment.count>. All FlowFiles with the same value for \"fragment.identifier\" " + "will be grouped together. All FlowFiles in this group must have the same value for the \"fragment.count\" attribute. The ordering of " + "the Records that are output is not guaranteed."); + + + public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() + .name("record-reader") + .displayName("Record Reader") + .description("Specifies the Controller Service to use for reading incoming data") + .identifiesControllerService(RecordReaderFactory.class) + .required(true) + .build(); + public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() + .name("record-writer") + .displayName("Record Writer") + .description("Specifies the Controller Service to use for writing out the records") + .identifiesControllerService(RecordSetWriterFactory.class) + .required(true) + .build(); + + public static final PropertyDescriptor MERGE_STRATEGY = new PropertyDescriptor.Builder() + .name("merge-strategy") + .displayName("Merge Strategy") + .description("Specifies the algorithm used to merge records. The 'Defragment' algorithm combines fragments that are associated by " + "attributes back into a single cohesive FlowFile. The 'Bin-Packing Algorithm' generates a FlowFile populated by arbitrarily " + "chosen FlowFiles") + .required(true) + .allowableValues(MERGE_STRATEGY_BIN_PACK, MERGE_STRATEGY_DEFRAGMENT) + .defaultValue(MERGE_STRATEGY_BIN_PACK.getValue()) + .build(); + public static final PropertyDescriptor CORRELATION_ATTRIBUTE_NAME = new PropertyDescriptor.Builder() + .name("correlation-attribute-name") + .displayName("Correlation Attribute Name") + .description("If specified, two FlowFiles will be binned together only if they have the same value for " + "this Attribute.
If not specified, FlowFiles are bundled by the order in which they are pulled from the queue.") + .required(false) + .expressionLanguageSupported(false) + .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR) + .defaultValue(null) + .build(); + public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder() + .name("min-bin-size") + .displayName("Minimum Bin Size") + .description("The minimum size for the bin") + .required(true) + .defaultValue("0 B") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .build(); + public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder() + .name("max-bin-size") + .displayName("Maximum Bin Size") + .description("The maximum size for the bundle. If not specified, there is no maximum. This is a 'soft limit' in that if a FlowFile is added to a bin, " + "all records in that FlowFile will be added, so this limit may be exceeded by up to the number of bytes in the last input FlowFile.") + .required(false) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .build(); + + public static final PropertyDescriptor MIN_RECORDS = new PropertyDescriptor.Builder() + .name("min-records") + .displayName("Minimum Number of Records") + .description("The minimum number of records to include in a bin") + .required(true) + .defaultValue("1") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + public static final PropertyDescriptor MAX_RECORDS = new PropertyDescriptor.Builder() + .name("max-records") + .displayName("Maximum Number of Records") + .description("The maximum number of Records to include in a bin. This is a 'soft limit' in that if a FlowFile is added to a bin, all records in that FlowFile will be added, " + "so this limit may be exceeded by up to the number of records in the last input FlowFile.") + .required(false) + .defaultValue("1000") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + public static final PropertyDescriptor MAX_BIN_COUNT = new PropertyDescriptor.Builder() + .name("max.bin.count") + .displayName("Maximum Number of Bins") + .description("Specifies the maximum number of bins that can be held in memory at any one time. " + "This number should not be smaller than the maximum number of concurrent threads for this Processor, " + "or the bins that are created will often consist only of a single incoming FlowFile.") + .defaultValue("10") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor MAX_BIN_AGE = new PropertyDescriptor.Builder() + .name("max-bin-age") + .displayName("Max Bin Age") + .description("The maximum age of a Bin that will trigger a Bin to be complete. Expected format is