diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FragmentAttributes.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FragmentAttributes.java new file mode 100644 index 0000000000..6f055da37b --- /dev/null +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FragmentAttributes.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.flowfile.attributes; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessSession; + +import java.util.HashMap; +import java.util.Map; + +/** + * This enum class contains flow file attribute keys commonly used among Split processors. + */ +public enum FragmentAttributes implements FlowFileAttributeKey { + + /** + * The number of bytes from the original FlowFile that were copied to this FlowFile, + * including header, if applicable, which is duplicated in each split FlowFile. + */ + FRAGMENT_SIZE("fragment.size"), + /** + * All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute. + */ + FRAGMENT_ID("fragment.identifier"), + /** + * A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile. + */ + FRAGMENT_INDEX("fragment.index"), + /** + * The number of split FlowFiles generated from the parent FlowFile. + */ + FRAGMENT_COUNT("fragment.count"), + /** + * The filename of the parent FlowFile. + */ + SEGMENT_ORIGINAL_FILENAME("segment.original.filename"); + + private final String key; + + FragmentAttributes(final String key) { + this.key = key; + } + + @Override + public String key() { + return key; + } + + public static FlowFile copyAttributesToOriginal(final ProcessSession processSession, final FlowFile originalFlowFile, + final String fragmentId, final int fragmentCount) { + final Map attributesToOriginal = new HashMap<>(); + if (fragmentId != null && fragmentId.length() > 0) { + attributesToOriginal.put(FRAGMENT_ID.key(), fragmentId); + } + attributesToOriginal.put(FRAGMENT_COUNT.key(), String.valueOf(fragmentCount)); + return processSession.putAllAttributes(originalFlowFile, attributesToOriginal); + } + +} diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java index 83964fa720..3a2891777e 100644 --- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java +++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java @@ -64,6 +64,12 @@ import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.stream.io.BufferedOutputStream; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.SEGMENT_ORIGINAL_FILENAME; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttributesToOriginal; + @SideEffectFree @SupportsBatching @Tags({ "avro", "split" }) @@ -217,13 +223,14 @@ public class SplitAvro extends AbstractProcessor { final String fragmentIdentifier = UUID.randomUUID().toString(); IntStream.range(0, splits.size()).forEach((i) -> { FlowFile split = splits.get(i); - split = session.putAttribute(split, "fragment.identifier", fragmentIdentifier); - split = session.putAttribute(split, "fragment.index", Integer.toString(i)); - split = session.putAttribute(split, "segment.original.filename", flowFile.getAttribute(CoreAttributes.FILENAME.key())); - split = session.putAttribute(split, "fragment.count", Integer.toString(splits.size())); + split = session.putAttribute(split, FRAGMENT_ID.key(), fragmentIdentifier); + split = session.putAttribute(split, FRAGMENT_INDEX.key(), Integer.toString(i)); + split = session.putAttribute(split, SEGMENT_ORIGINAL_FILENAME.key(), flowFile.getAttribute(CoreAttributes.FILENAME.key())); + split = session.putAttribute(split, FRAGMENT_COUNT.key(), Integer.toString(splits.size())); session.transfer(split, REL_SPLIT); }); - session.transfer(flowFile, REL_ORIGINAL); + final FlowFile originalFlowFile = copyAttributesToOriginal(session, flowFile, fragmentIdentifier, splits.size()); + session.transfer(originalFlowFile, REL_ORIGINAL); } catch (ProcessException e) { getLogger().error("Failed to split {} due to {}", new Object[]{flowFile, e.getMessage()}, e); session.transfer(flowFile, REL_FAILURE); diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestSplitAvro.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestSplitAvro.java index 32d43e306b..c17842d51d 100644 --- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestSplitAvro.java +++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestSplitAvro.java @@ -44,6 +44,8 @@ import java.util.HashMap; import java.util.List; import java.util.stream.IntStream; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID; import static org.junit.Assert.assertEquals; public class TestSplitAvro { @@ -116,6 +118,9 @@ public class TestSplitAvro { runner.assertTransferCount(SplitAvro.REL_SPLIT, 100); runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1); runner.assertTransferCount(SplitAvro.REL_FAILURE, 0); + final MockFlowFile originalFlowFile = runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL).get(0); + originalFlowFile.assertAttributeExists(FRAGMENT_ID.key()); + originalFlowFile.assertAttributeEquals(FRAGMENT_COUNT.key(), "100"); final List flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT); checkDataFileSplitSize(flowFiles, 1, true); final String fragmentIdentifier = flowFiles.get(0).getAttribute("fragment.identifier"); @@ -123,7 +128,7 @@ public class TestSplitAvro { MockFlowFile flowFile = flowFiles.get(i); assertEquals(i, Integer.parseInt(flowFile.getAttribute("fragment.index"))); assertEquals(fragmentIdentifier, flowFile.getAttribute("fragment.identifier")); - assertEquals(flowFiles.size(), Integer.parseInt(flowFile.getAttribute("fragment.count"))); + assertEquals(flowFiles.size(), Integer.parseInt(flowFile.getAttribute(FRAGMENT_COUNT.key()))); assertEquals(filename, flowFile.getAttribute("segment.original.filename")); }); } @@ -140,6 +145,7 @@ public class TestSplitAvro { runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1); runner.assertTransferCount(SplitAvro.REL_FAILURE, 0); + runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "5"); final List flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT); checkDataFileSplitSize(flowFiles, 20, true); } @@ -156,6 +162,7 @@ public class TestSplitAvro { runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1); runner.assertTransferCount(SplitAvro.REL_FAILURE, 0); + runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1"); final List flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT); checkDataFileSplitSize(flowFiles, 100, true); } @@ -172,6 +179,7 @@ public class TestSplitAvro { runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1); runner.assertTransferCount(SplitAvro.REL_FAILURE, 0); + runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "100"); final List flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT); checkDataFileSplitSize(flowFiles, 1, false); @@ -197,6 +205,7 @@ public class TestSplitAvro { runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1); runner.assertTransferCount(SplitAvro.REL_FAILURE, 0); + runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "100"); final List flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT); checkBareRecordsSplitSize(flowFiles, 1, true); @@ -215,6 +224,7 @@ public class TestSplitAvro { runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1); runner.assertTransferCount(SplitAvro.REL_FAILURE, 0); + runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "5"); final List flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT); checkBareRecordsSplitSize(flowFiles, 20, true); @@ -234,6 +244,7 @@ public class TestSplitAvro { runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1); runner.assertTransferCount(SplitAvro.REL_FAILURE, 0); + runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "5"); final List flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT); checkBareRecordsSplitSize(flowFiles, 20, false); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java index 2db7bcca87..1ce6fb5586 100755 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java @@ -65,6 +65,11 @@ import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.node.ArrayNode; import org.codehaus.jackson.node.JsonNodeFactory; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttributesToOriginal; + @SideEffectFree @SupportsBatching @SeeAlso(PutSQL.class) @@ -369,9 +374,9 @@ public class ConvertJSONToSQL extends AbstractProcessor { attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain"); attributes.put("sql.table", tableName); - attributes.put("fragment.identifier", fragmentIdentifier); - attributes.put("fragment.count", String.valueOf(arrayNode.size())); - attributes.put("fragment.index", String.valueOf(i)); + attributes.put(FRAGMENT_ID.key(), fragmentIdentifier); + attributes.put(FRAGMENT_COUNT.key(), String.valueOf(arrayNode.size())); + attributes.put(FRAGMENT_INDEX.key(), String.valueOf(i)); if (catalog != null) { attributes.put("sql.catalog", catalog); @@ -381,6 +386,7 @@ public class ConvertJSONToSQL extends AbstractProcessor { session.transfer(sqlFlowFile, REL_SQL); } + flowFile = copyAttributesToOriginal(session, flowFile, fragmentIdentifier, arrayNode.size()); session.transfer(flowFile, REL_ORIGINAL); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java index 16bb911d02..f18416ecb7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java @@ -67,6 +67,7 @@ import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.flowfile.attributes.FragmentAttributes; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; @@ -127,15 +128,15 @@ import org.apache.nifi.util.FlowFilePackagerV3; public class MergeContent extends BinFiles { // preferred attributes - public static final String FRAGMENT_ID_ATTRIBUTE = "fragment.identifier"; - public static final String FRAGMENT_INDEX_ATTRIBUTE = "fragment.index"; - public static final String FRAGMENT_COUNT_ATTRIBUTE = "fragment.count"; + public static final String FRAGMENT_ID_ATTRIBUTE = FragmentAttributes.FRAGMENT_ID.key(); + public static final String FRAGMENT_INDEX_ATTRIBUTE = FragmentAttributes.FRAGMENT_INDEX.key(); + public static final String FRAGMENT_COUNT_ATTRIBUTE = FragmentAttributes.FRAGMENT_COUNT.key(); // old style attributes public static final String SEGMENT_ID_ATTRIBUTE = "segment.identifier"; public static final String SEGMENT_INDEX_ATTRIBUTE = "segment.index"; public static final String SEGMENT_COUNT_ATTRIBUTE = "segment.count"; - public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename"; + public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key(); public static final AllowableValue MERGE_STRATEGY_BIN_PACK = new AllowableValue( "Bin-Packing Algorithm", diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java index 32cd249d2b..9d809efde2 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java @@ -17,8 +17,6 @@ package org.apache.nifi.processors.standard; import java.io.IOException; -import java.io.OutputStream; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -37,9 +35,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; -import org.apache.nifi.distributed.cache.client.Serializer; -import org.apache.nifi.distributed.cache.client.exception.SerializationException; +import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; import org.apache.nifi.expression.AttributeExpression.ResultType; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; @@ -49,7 +45,6 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer; @EventDriven @SupportsBatching @@ -67,7 +62,7 @@ public class Notify extends AbstractProcessor { .name("Distributed Cache Service") .description("The Controller Service that is used to cache release signals in order to release files queued at a corresponding Wait processor") .required(true) - .identifiesControllerService(DistributedMapCacheClient.class) + .identifiesControllerService(AtomicDistributedMapCacheClient.class) .build(); // Selects the FlowFile attribute or expression, whose value is used as cache key @@ -80,6 +75,18 @@ public class Notify extends AbstractProcessor { .expressionLanguageSupported(true) .build(); + public static final PropertyDescriptor SIGNAL_COUNTER_NAME = new PropertyDescriptor.Builder() + .name("Signal Counter Name") + .description("A value, or the results of an Attribute Expression Language statement, which will " + + "be evaluated against a FlowFile in order to determine the signal counter name. " + + "Signal counter name is useful when a corresponding Wait processor needs to know the number of occurrences " + + "of different types of events, such as success or failure, or destination data source names, etc.") + .required(true) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true)) + .expressionLanguageSupported(true) + .defaultValue(WaitNotifyProtocol.DEFAULT_COUNT_NAME) + .build(); + // Specifies an optional regex used to identify which attributes to cache public static final PropertyDescriptor ATTRIBUTE_CACHE_REGEX = new PropertyDescriptor.Builder() .name("Attribute Cache Regex") @@ -103,9 +110,6 @@ public class Notify extends AbstractProcessor { private final Set relationships; - private final Serializer keySerializer = new StringSerializer(); - private final Serializer> valueSerializer = new FlowFileAttributesSerializer(); - public Notify() { final Set rels = new HashSet<>(); rels.add(REL_SUCCESS); @@ -117,6 +121,7 @@ public class Notify extends AbstractProcessor { protected List getSupportedPropertyDescriptors() { final List descriptors = new ArrayList<>(); descriptors.add(RELEASE_SIGNAL_IDENTIFIER); + descriptors.add(SIGNAL_COUNTER_NAME); descriptors.add(DISTRIBUTED_CACHE_SERVICE); descriptors.add(ATTRIBUTE_CACHE_REGEX); return descriptors; @@ -137,11 +142,12 @@ public class Notify extends AbstractProcessor { final ComponentLog logger = getLogger(); - // cache key is computed from attribute 'RELEASE_SIGNAL_IDENTIFIER' with expression language support - final String cacheKey = context.getProperty(RELEASE_SIGNAL_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue(); + // Signal id is computed from attribute 'RELEASE_SIGNAL_IDENTIFIER' with expression language support + final String signalId = context.getProperty(RELEASE_SIGNAL_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue(); + final String counterName = context.getProperty(SIGNAL_COUNTER_NAME).evaluateAttributeExpressions(flowFile).getValue(); // if the computed value is null, or empty, we transfer the flow file to failure relationship - if (StringUtils.isBlank(cacheKey)) { + if (StringUtils.isBlank(signalId)) { logger.error("FlowFile {} has no attribute for given Release Signal Identifier", new Object[] {flowFile}); flowFile = session.penalize(flowFile); session.transfer(flowFile, REL_FAILURE); @@ -149,7 +155,8 @@ public class Notify extends AbstractProcessor { } // the cache client used to interact with the distributed cache - final DistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class); + final AtomicDistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(AtomicDistributedMapCacheClient.class); + final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache); try { final String attributeCacheRegex = (context.getProperty(ATTRIBUTE_CACHE_REGEX).isSet()) @@ -164,10 +171,12 @@ public class Notify extends AbstractProcessor { } if (logger.isDebugEnabled()) { - logger.debug("Cached release signal identifier {} from FlowFile {}", new Object[] {cacheKey, flowFile}); + logger.debug("Cached release signal identifier {} counterName {} from FlowFile {}", new Object[] {signalId, counterName, flowFile}); } - cache.put(cacheKey, attributesToCache, keySerializer, valueSerializer); + // In case of ConcurrentModificationException, just throw the exception so that processor can + // retry after yielding for a while. + protocol.notify(signalId, counterName, 1, attributesToCache); session.transfer(flowFile, REL_SUCCESS); } catch (final IOException e) { @@ -177,15 +186,4 @@ public class Notify extends AbstractProcessor { } } - /** - * Simple string serializer, used for serializing the cache key - */ - public static class StringSerializer implements Serializer { - - @Override - public void serialize(final String value, final OutputStream out) throws SerializationException, IOException { - out.write(value.getBytes(StandardCharsets.UTF_8)); - } - } - } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java index adfac058f5..76901fe42a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java @@ -29,6 +29,7 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.dbcp.DBCPService; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.FragmentAttributes; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.FlowFileFilter; import org.apache.nifi.processor.ProcessContext; @@ -161,9 +162,9 @@ public class PutSQL extends AbstractProcessor { private static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("sql\\.args\\.(\\d+)\\.type"); private static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+"); - private static final String FRAGMENT_ID_ATTR = "fragment.identifier"; - private static final String FRAGMENT_INDEX_ATTR = "fragment.index"; - private static final String FRAGMENT_COUNT_ATTR = "fragment.count"; + private static final String FRAGMENT_ID_ATTR = FragmentAttributes.FRAGMENT_ID.key(); + private static final String FRAGMENT_INDEX_ATTR = FragmentAttributes.FRAGMENT_INDEX.key(); + private static final String FRAGMENT_COUNT_ATTR = FragmentAttributes.FRAGMENT_COUNT.key(); private static final Pattern LONG_PATTERN = Pattern.compile("^\\d{1,19}$"); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java index 4e25d3c6a7..1fc1feb69a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java @@ -38,6 +38,7 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.flowfile.attributes.FragmentAttributes; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; @@ -80,11 +81,11 @@ public class SegmentContent extends AbstractProcessor { public static final String SEGMENT_ID = "segment.identifier"; public static final String SEGMENT_INDEX = "segment.index"; public static final String SEGMENT_COUNT = "segment.count"; - public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename"; + public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key(); - public static final String FRAGMENT_ID = "fragment.identifier"; - public static final String FRAGMENT_INDEX = "fragment.index"; - public static final String FRAGMENT_COUNT = "fragment.count"; + public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key(); + public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key(); + public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key(); public static final PropertyDescriptor SIZE = new PropertyDescriptor.Builder() .name("Segment Size") @@ -180,6 +181,7 @@ public class SegmentContent extends AbstractProcessor { } session.transfer(segmentSet, REL_SEGMENTS); + flowFile = FragmentAttributes.copyAttributesToOriginal(session, flowFile, segmentId, totalSegments); session.transfer(flowFile, REL_ORIGINAL); if (totalSegments <= 10) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java index 52b124e21a..d20fe8c7cb 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java @@ -50,6 +50,7 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.flowfile.attributes.FragmentAttributes; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; @@ -77,10 +78,10 @@ import org.apache.nifi.util.Tuple; public class SplitContent extends AbstractProcessor { // attribute keys - public static final String FRAGMENT_ID = "fragment.identifier"; - public static final String FRAGMENT_INDEX = "fragment.index"; - public static final String FRAGMENT_COUNT = "fragment.count"; - public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename"; + public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key(); + public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key(); + public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key(); + public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key(); static final AllowableValue HEX_FORMAT = new AllowableValue("Hexadecimal", "Hexadecimal", "The Byte Sequence will be interpreted as a hexadecimal representation of bytes"); static final AllowableValue UTF8_FORMAT = new AllowableValue("Text", "Text", "The Byte Sequence will be interpreted as UTF-8 Encoded text"); @@ -182,7 +183,7 @@ public class SplitContent extends AbstractProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) { - final FlowFile flowFile = session.get(); + FlowFile flowFile = session.get(); if (flowFile == null) { return; } @@ -285,8 +286,9 @@ public class SplitContent extends AbstractProcessor { splitList.add(finalSplit); } - finishFragmentAttributes(session, flowFile, splitList); + final String fragmentId = finishFragmentAttributes(session, flowFile, splitList); session.transfer(splitList, REL_SPLITS); + flowFile = FragmentAttributes.copyAttributesToOriginal(session, flowFile, fragmentId, splitList.size()); session.transfer(flowFile, REL_ORIGINAL); if (splitList.size() > 10) { @@ -302,8 +304,9 @@ public class SplitContent extends AbstractProcessor { * @param session session * @param source source * @param splits splits + * @return generated fragment identifier for the splits */ - private void finishFragmentAttributes(final ProcessSession session, final FlowFile source, final List splits) { + private String finishFragmentAttributes(final ProcessSession session, final FlowFile source, final List splits) { final String originalFilename = source.getAttribute(CoreAttributes.FILENAME.key()); final String fragmentId = UUID.randomUUID().toString(); @@ -319,6 +322,7 @@ public class SplitContent extends AbstractProcessor { FlowFile newFF = session.putAllAttributes(ff, attributes); splits.add(newFF); } + return fragmentId; } static class HexStringPropertyValidator implements Validator { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java index 003834ea00..e7df459ca4 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java @@ -56,6 +56,12 @@ import com.jayway.jsonpath.InvalidJsonException; import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.PathNotFoundException; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.SEGMENT_ORIGINAL_FILENAME; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttributesToOriginal; + @EventDriven @SideEffectFree @SupportsBatching @@ -166,7 +172,7 @@ public class SplitJson extends AbstractJsonPathProcessor { @Override public void onTrigger(final ProcessContext processContext, final ProcessSession processSession) { - final FlowFile original = processSession.get(); + FlowFile original = processSession.get(); if (original == null) { return; } @@ -202,8 +208,9 @@ public class SplitJson extends AbstractJsonPathProcessor { List resultList = (List) jsonPathResult; Map attributes = new HashMap<>(); - attributes.put("fragment.identifier", UUID.randomUUID().toString()); - attributes.put("fragment.count", Integer.toString(resultList.size())); + final String fragmentId = UUID.randomUUID().toString(); + attributes.put(FRAGMENT_ID.key(), fragmentId); + attributes.put(FRAGMENT_COUNT.key(), Integer.toString(resultList.size())); for (int i = 0; i < resultList.size(); i++) { Object resultSegment = resultList.get(i); @@ -213,11 +220,12 @@ public class SplitJson extends AbstractJsonPathProcessor { out.write(resultSegmentContent.getBytes(StandardCharsets.UTF_8)); } ); - attributes.put("segment.original.filename", split.getAttribute(CoreAttributes.FILENAME.key())); - attributes.put("fragment.index", Integer.toString(i)); + attributes.put(SEGMENT_ORIGINAL_FILENAME.key(), split.getAttribute(CoreAttributes.FILENAME.key())); + attributes.put(FRAGMENT_INDEX.key(), Integer.toString(i)); processSession.transfer(processSession.putAllAttributes(split, attributes), REL_SPLIT); } + original = copyAttributesToOriginal(processSession, original, fragmentId, resultList.size()); processSession.transfer(original, REL_ORIGINAL); logger.info("Split {} into {} FlowFiles", new Object[]{original, resultList.size()}); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java index ddb770db4d..e57841f636 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java @@ -48,6 +48,7 @@ import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.flowfile.attributes.FragmentAttributes; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; @@ -86,11 +87,11 @@ import org.apache.nifi.stream.io.util.TextLineDemarcator.OffsetInfo; public class SplitText extends AbstractProcessor { // attribute keys public static final String SPLIT_LINE_COUNT = "text.line.count"; - public static final String FRAGMENT_SIZE = "fragment.size"; - public static final String FRAGMENT_ID = "fragment.identifier"; - public static final String FRAGMENT_INDEX = "fragment.index"; - public static final String FRAGMENT_COUNT = "fragment.count"; - public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename"; + public static final String FRAGMENT_SIZE = FragmentAttributes.FRAGMENT_SIZE.key(); + public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key(); + public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key(); + public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key(); + public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key(); public static final PropertyDescriptor LINE_SPLIT_COUNT = new PropertyDescriptor.Builder() .name("Line Split Count") @@ -250,8 +251,10 @@ public class SplitText extends AbstractProcessor { if (error.get()){ processSession.transfer(sourceFlowFile, REL_FAILURE); } else { - List splitFlowFiles = this.generateSplitFlowFiles(sourceFlowFile, headerSplitInfoRef.get(), computedSplitsInfo, processSession); - processSession.transfer(sourceFlowFile, REL_ORIGINAL); + final String fragmentId = UUID.randomUUID().toString(); + List splitFlowFiles = this.generateSplitFlowFiles(fragmentId, sourceFlowFile, headerSplitInfoRef.get(), computedSplitsInfo, processSession); + final FlowFile originalFlowFile = FragmentAttributes.copyAttributesToOriginal(processSession, sourceFlowFile, fragmentId, splitFlowFiles.size()); + processSession.transfer(originalFlowFile, REL_ORIGINAL); if (!splitFlowFiles.isEmpty()) { processSession.transfer(splitFlowFiles, REL_SPLITS); } @@ -279,7 +282,8 @@ public class SplitText extends AbstractProcessor { * it signifies the header information and its contents will be included in * each and every computed split. */ - private List generateSplitFlowFiles(FlowFile sourceFlowFile, SplitInfo splitInfo, List computedSplitsInfo, ProcessSession processSession){ + private List generateSplitFlowFiles(final String fragmentId, FlowFile sourceFlowFile, SplitInfo splitInfo, + List computedSplitsInfo, ProcessSession processSession){ List splitFlowFiles = new ArrayList<>(); FlowFile headerFlowFile = null; long headerCrlfLength = 0; @@ -288,7 +292,6 @@ public class SplitText extends AbstractProcessor { headerCrlfLength = splitInfo.trimmedLength; } int fragmentIndex = 1; // set to 1 to preserve the existing behavior *only*. Perhaps should be deprecated to follow the 0,1,2... scheme - String fragmentId = UUID.randomUUID().toString(); if ((computedSplitsInfo.size() == 0) && (headerFlowFile != null)) { FlowFile splitFlowFile = processSession.clone(sourceFlowFile, 0, headerFlowFile.getSize() - headerCrlfLength); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java index 0f0032a9ca..502f7f3506 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java @@ -64,6 +64,12 @@ import org.xml.sax.Locator; import org.xml.sax.SAXException; import org.xml.sax.XMLReader; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.SEGMENT_ORIGINAL_FILENAME; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttributesToOriginal; + @EventDriven @SideEffectFree @SupportsBatching @@ -161,9 +167,9 @@ public class SplitXml extends AbstractProcessor { final XmlSplitterSaxParser parser = new XmlSplitterSaxParser(xmlTree -> { FlowFile split = session.create(original); split = session.write(split, out -> out.write(xmlTree.getBytes("UTF-8"))); - split = session.putAttribute(split, "fragment.identifier", fragmentIdentifier); - split = session.putAttribute(split, "fragment.index", Integer.toString(numberOfRecords.getAndIncrement())); - split = session.putAttribute(split, "segment.original.filename", split.getAttribute(CoreAttributes.FILENAME.key())); + split = session.putAttribute(split, FRAGMENT_ID.key(), fragmentIdentifier); + split = session.putAttribute(split, FRAGMENT_INDEX.key(), Integer.toString(numberOfRecords.getAndIncrement())); + split = session.putAttribute(split, SEGMENT_ORIGINAL_FILENAME.key(), split.getAttribute(CoreAttributes.FILENAME.key())); splits.add(split); }, depth); @@ -188,12 +194,13 @@ public class SplitXml extends AbstractProcessor { session.remove(splits); } else { splits.forEach((split) -> { - split = session.putAttribute(split, "fragment.count", Integer.toString(numberOfRecords.get())); + split = session.putAttribute(split, FRAGMENT_COUNT.key(), Integer.toString(numberOfRecords.get())); session.transfer(split, REL_SPLIT); }); - session.transfer(original, REL_ORIGINAL); - logger.info("Split {} into {} FlowFiles", new Object[]{original, splits.size()}); + final FlowFile originalToTransfer = copyAttributesToOriginal(session, original, fragmentIdentifier, numberOfRecords.get()); + session.transfer(originalToTransfer, REL_ORIGINAL); + logger.info("Split {} into {} FlowFiles", new Object[]{originalToTransfer, splits.size()}); } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java index 0437ed1599..18015fab7d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java @@ -53,6 +53,7 @@ import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.flowfile.attributes.FragmentAttributes; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; @@ -97,10 +98,10 @@ import org.apache.nifi.util.FlowFileUnpackagerV3; @SeeAlso(MergeContent.class) public class UnpackContent extends AbstractProcessor { // attribute keys - public static final String FRAGMENT_ID = "fragment.identifier"; - public static final String FRAGMENT_INDEX = "fragment.index"; - public static final String FRAGMENT_COUNT = "fragment.count"; - public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename"; + public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key(); + public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key(); + public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key(); + public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key(); public static final String AUTO_DETECT_FORMAT_NAME = "use mime.type attribute"; public static final String TAR_FORMAT_NAME = "tar"; @@ -262,6 +263,8 @@ public class UnpackContent extends AbstractProcessor { finishFragmentAttributes(session, flowFile, unpacked); } session.transfer(unpacked, REL_SUCCESS); + final String fragmentId = unpacked.size() > 0 ? unpacked.get(0).getAttribute(FRAGMENT_ID) : null; + flowFile = FragmentAttributes.copyAttributesToOriginal(session, flowFile, fragmentId, unpacked.size()); session.transfer(flowFile, REL_ORIGINAL); session.getProvenanceReporter().fork(flowFile, unpacked); logger.info("Unpacked {} into {} and transferred to success", new Object[]{flowFile, unpacked}); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java index 4a4f8033bd..0bd5ca68b1 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java @@ -17,8 +17,6 @@ package org.apache.nifi.processors.standard; import java.io.IOException; -import java.io.OutputStream; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -28,6 +26,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.EventDriven; @@ -35,15 +34,13 @@ import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.distributed.cache.client.Deserializer; -import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; -import org.apache.nifi.distributed.cache.client.Serializer; -import org.apache.nifi.distributed.cache.client.exception.SerializationException; +import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; import org.apache.nifi.expression.AttributeExpression.ResultType; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; @@ -53,19 +50,30 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer; +import org.apache.nifi.processors.standard.WaitNotifyProtocol.Signal; @EventDriven @SupportsBatching @Tags({"map", "cache", "wait", "hold", "distributed", "signal", "release"}) @InputRequirement(Requirement.INPUT_REQUIRED) @CapabilityDescription("Routes incoming FlowFiles to the 'wait' relationship until a matching release signal " - + "is stored in the distributed cache from a corresponding Notify processor. At this point, a waiting FlowFile is routed to " - + "the 'success' relationship, with attributes copied from the FlowFile that produced " - + "the release signal from the Notify processor. The release signal entry is then removed from " - + "the cache. Waiting FlowFiles will be routed to 'expired' if they exceed the Expiration Duration.") -@WritesAttribute(attribute = "wait.start.timestamp", description = "All FlowFiles will have an attribute 'wait.start.timestamp', which sets the " - + "initial epoch timestamp when the file first entered this processor. This is used to determine the expiration time of the FlowFile.") + + "is stored in the distributed cache from a corresponding Notify processor. " + + "When a matching release signal is identified, a waiting FlowFile is routed to the 'success' relationship, " + + "with attributes copied from the FlowFile that produced the release signal from the Notify processor. " + + "The release signal entry is then removed from the cache. Waiting FlowFiles will be routed to 'expired' if they exceed the Expiration Duration. " + + + "If you need to wait for more than one signal, specify the desired number of signals via the 'Target Signal Count' property. " + + "This is particularly useful with processors that split a source flow file into multiple fragments, such as SplitText. " + + "In order to wait for all fragments to be processed, connect the 'original' relationship to a Wait processor, and the 'splits' relationship to " + + "a corresponding Notify processor. Configure the Notify and Wait processors to use the '${fragment.identifier}' as the value " + + "of 'Release Signal Identifier', and specify '${fragment.count}' as the value of 'Target Signal Count' in the Wait processor." +) +@WritesAttributes({ + @WritesAttribute(attribute = "wait.start.timestamp", description = "All FlowFiles will have an attribute 'wait.start.timestamp', which sets the " + + "initial epoch timestamp when the file first entered this processor. This is used to determine the expiration time of the FlowFile."), + @WritesAttribute(attribute = "wait.counter.", description = "If a signal exists when the processor runs, " + + "each count value in the signal is copied.") +}) @SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer", "org.apache.nifi.processors.standard.Notify"}) public class Wait extends AbstractProcessor { @@ -77,7 +85,7 @@ public class Wait extends AbstractProcessor { .name("Distributed Cache Service") .description("The Controller Service that is used to check for release signals from a corresponding Notify processor") .required(true) - .identifiesControllerService(DistributedMapCacheClient.class) + .identifiesControllerService(AtomicDistributedMapCacheClient.class) .build(); // Selects the FlowFile attribute or expression, whose value is used as cache key @@ -90,6 +98,29 @@ public class Wait extends AbstractProcessor { .expressionLanguageSupported(true) .build(); + public static final PropertyDescriptor TARGET_SIGNAL_COUNT = new PropertyDescriptor.Builder() + .name("Target Signal Count") + .description("A value, or the results of an Attribute Expression Language statement, which will " + + "be evaluated against a FlowFile in order to determine the target signal count. " + + "This processor checks whether the signal count has reached this number. " + + "If Signal Counter Name is specified, this processor checks a particular counter, " + + "otherwise checks against total count in a signal.") + .required(true) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(true) + .defaultValue("1") + .build(); + + public static final PropertyDescriptor SIGNAL_COUNTER_NAME = new PropertyDescriptor.Builder() + .name("Signal Counter Name") + .description("A value, or the results of an Attribute Expression Language statement, which will " + + "be evaluated against a FlowFile in order to determine the signal counter name. " + + "If not specified, this processor checks the total count in a signal.") + .required(false) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true)) + .expressionLanguageSupported(true) + .build(); + // Selects the FlowFile attribute or expression, whose value is used as cache key public static final PropertyDescriptor EXPIRATION_DURATION = new PropertyDescriptor.Builder() .name("Expiration Duration") @@ -136,9 +167,6 @@ public class Wait extends AbstractProcessor { .build(); private final Set relationships; - private final Serializer keySerializer = new StringSerializer(); - private final Deserializer> valueDeserializer = new FlowFileAttributesSerializer(); - public Wait() { final Set rels = new HashSet<>(); rels.add(REL_SUCCESS); @@ -152,6 +180,8 @@ public class Wait extends AbstractProcessor { protected List getSupportedPropertyDescriptors() { final List descriptors = new ArrayList<>(); descriptors.add(RELEASE_SIGNAL_IDENTIFIER); + descriptors.add(TARGET_SIGNAL_COUNT); + descriptors.add(SIGNAL_COUNTER_NAME); descriptors.add(EXPIRATION_DURATION); descriptors.add(DISTRIBUTED_CACHE_SERVICE); descriptors.add(ATTRIBUTE_COPY_MODE); @@ -173,11 +203,11 @@ public class Wait extends AbstractProcessor { final ComponentLog logger = getLogger(); - // cache key is computed from attribute 'RELEASE_SIGNAL_IDENTIFIER' with expression language support - final String cacheKey = context.getProperty(RELEASE_SIGNAL_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue(); + // Signal id is computed from attribute 'RELEASE_SIGNAL_IDENTIFIER' with expression language support + final String signalId = context.getProperty(RELEASE_SIGNAL_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue(); // if the computed value is null, or empty, we transfer the flow file to failure relationship - if (StringUtils.isBlank(cacheKey)) { + if (StringUtils.isBlank(signalId)) { logger.error("FlowFile {} has no attribute for given Release Signal Identifier", new Object[] {flowFile}); flowFile = session.penalize(flowFile); session.transfer(flowFile, REL_FAILURE); @@ -185,9 +215,17 @@ public class Wait extends AbstractProcessor { } // the cache client used to interact with the distributed cache - final DistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class); + final AtomicDistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(AtomicDistributedMapCacheClient.class); + final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache); + String attributeCopyMode = context.getProperty(ATTRIBUTE_COPY_MODE).getValue(); + final boolean replaceOriginalAttributes = ATTRIBUTE_COPY_REPLACE.getValue().equals(attributeCopyMode); + + Signal signal = null; try { + // get notifying signal + signal = protocol.getSignal(signalId); + // check for expiration String waitStartTimestamp = flowFile.getAttribute(WAIT_START_TIMESTAMP); if (waitStartTimestamp == null) { @@ -201,6 +239,8 @@ public class Wait extends AbstractProcessor { } catch (NumberFormatException nfe) { logger.error("{} has an invalid value '{}' on FlowFile {}", new Object[] {WAIT_START_TIMESTAMP, waitStartTimestamp, flowFile}); flowFile = session.penalize(flowFile); + + flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes); session.transfer(flowFile, REL_FAILURE); return; } @@ -209,58 +249,83 @@ public class Wait extends AbstractProcessor { long now = System.currentTimeMillis(); if (now > (lWaitStartTimestamp + expirationDuration)) { logger.warn("FlowFile {} expired after {}ms", new Object[] {flowFile, (now - lWaitStartTimestamp)}); + flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes); session.transfer(flowFile, REL_EXPIRED); return; } - // get notifying flow file attributes - Map cachedAttributes = cache.get(cacheKey, keySerializer, valueDeserializer); - - if (cachedAttributes == null) { + if (signal == null) { + // If there's no signal yet, then we don't have to evaluate target counts. Return immediately. if (logger.isDebugEnabled()) { - logger.debug("No release signal yet for {} on FlowFile {}", new Object[] {cacheKey, flowFile}); + logger.debug("No release signal yet for {} on FlowFile {}", new Object[] {signalId, flowFile}); } session.transfer(flowFile, REL_WAIT); return; } - // copy over attributes from release signal flow file, if provided - if (!cachedAttributes.isEmpty()) { - cachedAttributes.remove("uuid"); - String attributeCopyMode = context.getProperty(ATTRIBUTE_COPY_MODE).getValue(); - if (ATTRIBUTE_COPY_REPLACE.getValue().equals(attributeCopyMode)) { - flowFile = session.putAllAttributes(flowFile, cachedAttributes); - } else { - Map attributesToCopy = new HashMap<>(); - for(Entry entry : cachedAttributes.entrySet()) { - // if the current flow file does *not* have the cached attribute, copy it - if (flowFile.getAttribute(entry.getKey()) == null) { - attributesToCopy.put(entry.getKey(), entry.getValue()); - } - } - flowFile = session.putAllAttributes(flowFile, attributesToCopy); + final String targetCounterName = context.getProperty(SIGNAL_COUNTER_NAME).evaluateAttributeExpressions(flowFile).getValue(); + final Long targetCount = Long.valueOf(context.getProperty(TARGET_SIGNAL_COUNT).evaluateAttributeExpressions(flowFile).getValue()); + final boolean reachedToTargetCount = StringUtils.isBlank(targetCounterName) + ? signal.isTotalCountReached(targetCount) + : signal.isCountReached(targetCounterName, targetCount); + + if (!reachedToTargetCount) { + if (logger.isDebugEnabled()) { + logger.debug("Release signal count {} hasn't reached {} for {} on FlowFile {}", + new Object[] {targetCounterName, targetCount, signalId, flowFile}); } + flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes); + session.transfer(flowFile, REL_WAIT); + return; } + + flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes); session.transfer(flowFile, REL_SUCCESS); - cache.remove(cacheKey, keySerializer); + protocol.complete(signalId); + + } catch (final NumberFormatException e) { + flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + logger.error("Failed to parse targetCount when processing {} due to {}", new Object[] {flowFile, e}); + } catch (final IOException e) { + flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes); flowFile = session.penalize(flowFile); session.transfer(flowFile, REL_FAILURE); logger.error("Unable to communicate with cache when processing {} due to {}", new Object[] {flowFile, e}); } } - /** - * Simple string serializer, used for serializing the cache key - */ - public static class StringSerializer implements Serializer { - - @Override - public void serialize(final String value, final OutputStream out) throws SerializationException, IOException { - out.write(value.getBytes(StandardCharsets.UTF_8)); + private FlowFile copySignalAttributes(final ProcessSession session, final FlowFile flowFile, final Signal signal, final boolean replaceOriginal) { + if (signal == null) { + return flowFile; } + + // copy over attributes from release signal flow file, if provided + final Map attributesToCopy; + if (replaceOriginal) { + attributesToCopy = new HashMap<>(signal.getAttributes()); + attributesToCopy.remove("uuid"); + } else { + // if the current flow file does *not* have the cached attribute, copy it + attributesToCopy = signal.getAttributes().entrySet().stream() + .filter(e -> flowFile.getAttribute(e.getKey()) == null) + .collect(Collectors.toMap(Entry::getKey, Entry::getValue)); + } + + // Copy counter attributes + final Map counts = signal.getCounts(); + final long totalCount = counts.entrySet().stream().mapToLong(e -> { + final Long count = e.getValue(); + attributesToCopy.put("wait.counter." + e.getKey(), String.valueOf(count)); + return count; + }).sum(); + attributesToCopy.put("wait.counter.total", String.valueOf(totalCount)); + + return session.putAllAttributes(flowFile, attributesToCopy); } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java new file mode 100644 index 0000000000..a74590a200 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient.CacheEntry; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.distributed.cache.client.exception.DeserializationException; +import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ConcurrentModificationException; +import java.util.HashMap; +import java.util.Map; + +/** + * This class provide a protocol for Wait and Notify processors to work together. + * Once AtomicDistributedMapCacheClient is passed to this protocol, components that wish to join the notification mechanism + * should only use methods provided by this protocol, instead of calling cache API directly. + */ +public class WaitNotifyProtocol { + + private static final Logger logger = LoggerFactory.getLogger(WaitNotifyProtocol.class); + + public static final String DEFAULT_COUNT_NAME = "default"; + private static final int MAX_REPLACE_RETRY_COUNT = 5; + private static final int REPLACE_RETRY_WAIT_MILLIS = 10; + + private static final ObjectMapper objectMapper = new ObjectMapper(); + private static final Serializer stringSerializer = (value, output) -> output.write(value.getBytes(StandardCharsets.UTF_8)); + private final Deserializer stringDeserializer = input -> new String(input, StandardCharsets.UTF_8); + + public static class Signal { + private Map counts = new HashMap<>(); + private Map attributes = new HashMap<>(); + + public Map getCounts() { + return counts; + } + + public void setCounts(Map counts) { + this.counts = counts; + } + + public Map getAttributes() { + return attributes; + } + + public void setAttributes(Map attributes) { + this.attributes = attributes; + } + + public boolean isTotalCountReached(final long targetCount) { + final long totalCount = counts.values().stream().mapToLong(Long::longValue).sum(); + return totalCount >= targetCount; + } + + public boolean isCountReached(final String counterName, final long targetCount) { + return getCount(counterName) >= targetCount; + } + + public long getCount(final String counterName) { + final Long count = counts.get(counterName); + return count != null ? count : -1; + } + + } + + private final AtomicDistributedMapCacheClient cache; + + public WaitNotifyProtocol(final AtomicDistributedMapCacheClient cache) { + this.cache = cache; + } + + /** + * Notify a signal to increase a counter. + * @param signalId a key in the underlying cache engine + * @param counterName specify count to update + * @param delta delta to update a counter + * @param attributes attributes to save in the cache entry + * @return A Signal instance, merged with an existing signal if any + * @throws IOException thrown when it failed interacting with the cache engine + * @throws ConcurrentModificationException thrown if other process is also updating the same signal and failed to update after few retry attempts + */ + public Signal notify(final String signalId, final String counterName, final int delta, final Map attributes) + throws IOException, ConcurrentModificationException { + + for (int i = 0; i < MAX_REPLACE_RETRY_COUNT; i++) { + + final CacheEntry existingEntry = cache.fetch(signalId, stringSerializer, stringDeserializer); + + Signal signal = getSignal(signalId); + if (signal == null) { + signal = new Signal(); + } + + if (attributes != null) { + signal.attributes.putAll(attributes); + } + + long count = signal.counts.containsKey(counterName) ? signal.counts.get(counterName) : 0; + count += delta; + signal.counts.put(counterName, count); + + final String signalJson = objectMapper.writeValueAsString(signal); + final long revision = existingEntry != null ? existingEntry.getRevision() : -1; + + + if (cache.replace(signalId, signalJson, stringSerializer, stringSerializer, revision)) { + return signal; + } + + long waitMillis = REPLACE_RETRY_WAIT_MILLIS * (i + 1); + logger.info("Waiting for {} ms to retry... {}.{}", waitMillis, signalId, counterName); + try { + Thread.sleep(waitMillis); + } catch (InterruptedException e) { + final String msg = String.format("Interrupted while waiting for retrying signal [%s] counter [%s].", signalId, counterName); + throw new ConcurrentModificationException(msg, e); + } + } + + final String msg = String.format("Failed to update signal [%s] counter [%s] after retrying %d times.", signalId, counterName, MAX_REPLACE_RETRY_COUNT); + throw new ConcurrentModificationException(msg); + } + + /** + * Retrieve a stored Signal in the cache engine. + * If a caller gets satisfied with the returned Signal state and finish waiting, it should call {@link #complete(String)} + * to complete the Wait Notify protocol. + * @param signalId a key in the underlying cache engine + * @return A Signal instance + * @throws IOException thrown when it failed interacting with the cache engine + * @throws DeserializationException thrown if the cache found is not in expected serialized format + */ + public Signal getSignal(final String signalId) throws IOException, DeserializationException { + + final CacheEntry entry = cache.fetch(signalId, stringSerializer, stringDeserializer); + + if (entry == null) { + // No signal found. + return null; + } + + final String value = entry.getValue(); + + try { + return objectMapper.readValue(value, Signal.class); + } catch (final JsonParseException jsonE) { + // Try to read it as FlowFileAttributes for backward compatibility. + try { + final Map attributes = new FlowFileAttributesSerializer().deserialize(value.getBytes(StandardCharsets.UTF_8)); + final Signal signal = new Signal(); + signal.setAttributes(attributes); + signal.getCounts().put(DEFAULT_COUNT_NAME, 1L); + return signal; + } catch (Exception attrE) { + final String msg = String.format("Cached value for %s was not a serialized Signal nor FlowFileAttributes. Error messages: \"%s\", \"%s\"", + signalId, jsonE.getMessage(), attrE.getMessage()); + throw new DeserializationException(msg); + } + } + } + + /** + * Finish protocol and remove the cache entry. + * @param signalId a key in the underlying cache engine + * @throws IOException thrown when it failed interacting with the cache engine + */ + public void complete(final String signalId) throws IOException { + cache.remove(signalId, stringSerializer); + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java index 7e95adab97..f808072a04 100755 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java @@ -37,6 +37,9 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID; + public class TestConvertJSONToSQL { static String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)"; @@ -70,6 +73,7 @@ public class TestConvertJSONToSQL { runner.run(); runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1"); runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); @@ -106,6 +110,7 @@ public class TestConvertJSONToSQL { runner.run(); runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1"); runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); @@ -140,6 +145,7 @@ public class TestConvertJSONToSQL { runner.run(); runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1"); runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); @@ -175,6 +181,7 @@ public class TestConvertJSONToSQL { runner.run(); runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1"); runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR)); @@ -210,6 +217,7 @@ public class TestConvertJSONToSQL { runner.run(); runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "5"); runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 5); final List mffs = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL); for (final MockFlowFile mff : mffs) { @@ -245,6 +253,7 @@ public class TestConvertJSONToSQL { runner.run(); runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "5"); runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 5); final List mffs = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL); for (final MockFlowFile mff : mffs) { @@ -279,6 +288,7 @@ public class TestConvertJSONToSQL { runner.run(); runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1"); runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR)); @@ -314,6 +324,7 @@ public class TestConvertJSONToSQL { runner.run(); runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1"); runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR)); @@ -349,6 +360,7 @@ public class TestConvertJSONToSQL { runner.run(); runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1"); runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); @@ -384,6 +396,7 @@ public class TestConvertJSONToSQL { runner.run(); runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1"); runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); @@ -420,6 +433,7 @@ public class TestConvertJSONToSQL { runner.run(); runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1"); runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); @@ -455,6 +469,9 @@ public class TestConvertJSONToSQL { runner.run(); runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + final MockFlowFile originalFlowFile = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0); + originalFlowFile.assertAttributeExists(FRAGMENT_ID.key()); + originalFlowFile.assertAttributeEquals(FRAGMENT_COUNT.key(), "1"); runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); @@ -590,6 +607,7 @@ public class TestConvertJSONToSQL { runner.run(); runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1"); runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); @@ -625,6 +643,7 @@ public class TestConvertJSONToSQL { runner.run(); runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1"); runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); @@ -687,6 +706,7 @@ public class TestConvertJSONToSQL { runner.run(); runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1"); runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); @@ -724,6 +744,7 @@ public class TestConvertJSONToSQL { runner.run(); runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1"); runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1); final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0); out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java index 37c4c436ae..e5c183f923 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java @@ -16,26 +16,29 @@ */ package org.apache.nifi.processors.standard; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; import org.apache.nifi.distributed.cache.client.Deserializer; -import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; import org.apache.nifi.distributed.cache.client.Serializer; -import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer; +import org.apache.nifi.distributed.cache.client.StandardCacheEntry; +import org.apache.nifi.processors.standard.WaitNotifyProtocol.Signal; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Before; import org.junit.Test; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + public class TestNotify { private TestRunner runner; @@ -66,8 +69,47 @@ public class TestNotify { runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 1); runner.clearTransferState(); - Map cachedAttributes = service.get("1", new Notify.StringSerializer(), new FlowFileAttributesSerializer()); + final Signal signal = new WaitNotifyProtocol(service).getSignal("1"); + Map cachedAttributes = signal.getAttributes(); assertEquals("value", cachedAttributes.get("key")); + assertTrue(signal.isTotalCountReached(1)); + } + + @Test + public void testNotifyCounters() throws InitializationException, IOException { + runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); + runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, ".*"); + runner.setProperty(Notify.SIGNAL_COUNTER_NAME, "${status}"); + + final Map props1 = new HashMap<>(); + props1.put("releaseSignalAttribute", "someDataProcessing"); + props1.put("key", "data1"); + props1.put("status", "success"); + runner.enqueue(new byte[]{}, props1); + + final Map props2 = new HashMap<>(); + props2.put("releaseSignalAttribute", "someDataProcessing"); + props2.put("key", "data2"); + props2.put("status", "success"); + runner.enqueue(new byte[]{}, props2); + + final Map props3 = new HashMap<>(); + props3.put("releaseSignalAttribute", "someDataProcessing"); + props3.put("key", "data3"); + props3.put("status", "failure"); + runner.enqueue(new byte[]{}, props3); + + runner.run(3); + + runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 3); + runner.clearTransferState(); + + final Signal signal = new WaitNotifyProtocol(service).getSignal("someDataProcessing"); + Map cachedAttributes = signal.getAttributes(); + assertEquals("Same attribute key will be overwritten by the latest signal", "data3", cachedAttributes.get("key")); + assertTrue(signal.isTotalCountReached(3)); + assertEquals(2, signal.getCount("success")); + assertEquals(1, signal.getCount("failure")); } @Test @@ -86,9 +128,11 @@ public class TestNotify { runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 1); runner.clearTransferState(); - Map cachedAttributes = service.get("1", new Notify.StringSerializer(), new FlowFileAttributesSerializer()); + final Signal signal = new WaitNotifyProtocol(service).getSignal("1"); + Map cachedAttributes = signal.getAttributes(); assertEquals("value", cachedAttributes.get("key1")); assertNull(cachedAttributes.get("other.key1")); + assertTrue(signal.isTotalCountReached(1)); } @Test @@ -121,11 +165,11 @@ public class TestNotify { service.setFailOnCalls(false); } - private class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient { - private final ConcurrentMap values = new ConcurrentHashMap<>(); + static class MockCacheClient extends AbstractControllerService implements AtomicDistributedMapCacheClient { + private final ConcurrentMap values = new ConcurrentHashMap<>(); private boolean failOnCalls = false; - public void setFailOnCalls(boolean failOnCalls){ + void setFailOnCalls(boolean failOnCalls){ this.failOnCalls = failOnCalls; } @@ -136,42 +180,47 @@ public class TestNotify { } } + private void unsupported() throws UnsupportedOperationException { + throw new UnsupportedOperationException("This method shouldn't be used from Notify processor."); + } + @Override public boolean putIfAbsent(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { - verifyNotFail(); - final Object retValue = values.putIfAbsent(key, value); - return (retValue == null); + unsupported(); + return false; } @Override @SuppressWarnings("unchecked") public V getAndPutIfAbsent(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer, final Deserializer valueDeserializer) throws IOException { - verifyNotFail(); - return (V) values.putIfAbsent(key, value); + unsupported(); + return null; } @Override public boolean containsKey(final K key, final Serializer keySerializer) throws IOException { - verifyNotFail(); - return values.containsKey(key); + unsupported(); + return false; } @Override public void put(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { - verifyNotFail(); - values.put(key, value); + unsupported(); } @Override @SuppressWarnings("unchecked") public V get(final K key, final Serializer keySerializer, final Deserializer valueDeserializer) throws IOException { verifyNotFail(); - if(values.containsKey(key)) { - return (V) values.get(key); - } else { + + final CacheEntry entry = values.get(key); + if (entry == null) { return null; } + + // This mock cache stores String as it is, without serializing, so it needs to convert it to byte[] first here. + return valueDeserializer.deserialize(((String)entry.getValue()).getBytes(StandardCharsets.UTF_8)); } @Override @@ -181,7 +230,28 @@ public class TestNotify { @Override public boolean remove(final K key, final Serializer serializer) throws IOException { verifyNotFail(); - values.remove(key); + return values.remove(key) != null; + } + + @Override + @SuppressWarnings("unchecked") + public CacheEntry fetch(K key, Serializer keySerializer, Deserializer valueDeserializer) throws IOException { + verifyNotFail(); + + return values.get(key); + } + + @Override + public boolean replace(K key, V value, Serializer keySerializer, Serializer valueSerializer, long revision) throws IOException { + verifyNotFail(); + + final CacheEntry existing = values.get(key); + if (existing != null && existing.getRevision() != revision) { + return false; + } + + values.put(key, new StandardCacheEntry<>(key, value, revision + 1)); + return true; } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSegmentContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSegmentContent.java index 5a883231cc..74fad06952 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSegmentContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSegmentContent.java @@ -37,6 +37,11 @@ public class TestSegmentContent { testRunner.enqueue(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9}); testRunner.run(); + testRunner.assertTransferCount(SegmentContent.REL_ORIGINAL, 1); + final MockFlowFile originalFlowFile = testRunner.getFlowFilesForRelationship(SegmentContent.REL_ORIGINAL).get(0); + originalFlowFile.assertAttributeExists(SegmentContent.FRAGMENT_ID); + originalFlowFile.assertAttributeEquals(SegmentContent.FRAGMENT_COUNT, "3"); + final List flowFiles = testRunner.getFlowFilesForRelationship(SegmentContent.REL_SEGMENTS); assertEquals(3, flowFiles.size()); @@ -57,6 +62,11 @@ public class TestSegmentContent { testRunner.enqueue(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9}); testRunner.run(); + testRunner.assertTransferCount(SegmentContent.REL_ORIGINAL, 1); + final MockFlowFile originalFlowFile = testRunner.getFlowFilesForRelationship(SegmentContent.REL_ORIGINAL).get(0); + originalFlowFile.assertAttributeExists(SegmentContent.FRAGMENT_ID); + originalFlowFile.assertAttributeEquals(SegmentContent.FRAGMENT_COUNT, "1"); + testRunner.assertTransferCount(SegmentContent.REL_SEGMENTS, 1); final MockFlowFile out1 = testRunner.getFlowFilesForRelationship(SegmentContent.REL_SEGMENTS).get(0); out1.assertContentEquals(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9}); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitContent.java index 6d9fba9b1b..0dec5a12c0 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitContent.java @@ -25,6 +25,9 @@ import org.apache.nifi.util.TestRunners; import org.junit.Test; +import static org.apache.nifi.processors.standard.SplitContent.FRAGMENT_COUNT; +import static org.apache.nifi.processors.standard.SplitContent.FRAGMENT_ID; + public class TestSplitContent { @Test @@ -39,6 +42,7 @@ public class TestSplitContent { runner.run(); runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "4"); runner.assertTransferCount(SplitContent.REL_SPLITS, 4); runner.assertQueueEmpty(); @@ -62,6 +66,7 @@ public class TestSplitContent { runner.run(); runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "4"); runner.assertTransferCount(SplitContent.REL_SPLITS, 4); runner.assertQueueEmpty(); @@ -76,6 +81,7 @@ public class TestSplitContent { runner.enqueue(input); runner.run(); runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "4"); runner.assertTransferCount(SplitContent.REL_SPLITS, 4); splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS); splits.get(0).assertContentEquals("This is a "); @@ -89,6 +95,7 @@ public class TestSplitContent { runner.enqueue(input); runner.run(); runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "4"); runner.assertTransferCount(SplitContent.REL_SPLITS, 4); splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS); splits.get(0).assertContentEquals("This is a test"); @@ -102,6 +109,7 @@ public class TestSplitContent { runner.enqueue("This is a test. This is another test. And this is yet another test. Finally this is the last test".getBytes()); runner.run(); runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "4"); runner.assertTransferCount(SplitContent.REL_SPLITS, 4); splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS); splits.get(0).assertContentEquals("This is a test"); @@ -115,6 +123,7 @@ public class TestSplitContent { runner.enqueue("This is a test. This is another test. And this is yet another test. Finally this is the last test".getBytes()); runner.run(); runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "5"); runner.assertTransferCount(SplitContent.REL_SPLITS, 5); splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS); splits.get(0).assertContentEquals("This is a "); @@ -138,6 +147,7 @@ public class TestSplitContent { runner.run(); runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "3"); runner.assertTransferCount(SplitContent.REL_SPLITS, 3); runner.assertQueueEmpty(); @@ -157,6 +167,7 @@ public class TestSplitContent { runner.run(); runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2"); runner.assertTransferCount(SplitContent.REL_SPLITS, 2); runner.assertQueueEmpty(); @@ -178,6 +189,7 @@ public class TestSplitContent { runner.run(); runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2"); runner.assertTransferCount(SplitContent.REL_SPLITS, 2); runner.assertQueueEmpty(); @@ -202,6 +214,9 @@ public class TestSplitContent { runner.run(); runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1); + final MockFlowFile originalFlowFile = runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0); + originalFlowFile.assertAttributeExists(FRAGMENT_ID); + originalFlowFile.assertAttributeEquals(FRAGMENT_COUNT, "2"); runner.assertTransferCount(SplitContent.REL_SPLITS, 2); runner.assertQueueEmpty(); @@ -227,6 +242,7 @@ public class TestSplitContent { runner.run(); runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2"); runner.assertTransferCount(SplitContent.REL_SPLITS, 2); runner.assertQueueEmpty(); @@ -249,6 +265,7 @@ public class TestSplitContent { runner.run(); runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "1"); runner.assertTransferCount(SplitContent.REL_SPLITS, 1); runner.assertQueueEmpty(); @@ -269,6 +286,7 @@ public class TestSplitContent { runner.run(); runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "1"); runner.assertTransferCount(SplitContent.REL_SPLITS, 1); runner.assertQueueEmpty(); @@ -289,6 +307,7 @@ public class TestSplitContent { runner.run(); runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "1"); runner.assertTransferCount(SplitContent.REL_SPLITS, 1); runner.assertQueueEmpty(); @@ -309,6 +328,7 @@ public class TestSplitContent { runner.run(); runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2"); runner.assertTransferCount(SplitContent.REL_SPLITS, 2); runner.assertQueueEmpty(); @@ -327,6 +347,7 @@ public class TestSplitContent { runner.run(); runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2"); runner.assertTransferCount(SplitContent.REL_SPLITS, 2); runner.assertQueueEmpty(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitJson.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitJson.java index 4e0d99927f..4efd82d9bd 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitJson.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitJson.java @@ -35,6 +35,11 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.HashMap; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.SEGMENT_ORIGINAL_FILENAME; + public class TestSplitJson { private static final Path JSON_SNIPPET = Paths.get("src/test/resources/TestJson/json-sample.json"); @@ -86,6 +91,7 @@ public class TestSplitJson { testRunner.run(); testRunner.assertTransferCount(SplitJson.REL_ORIGINAL, 1); + testRunner.getFlowFilesForRelationship(SplitJson.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1"); testRunner.assertTransferCount(SplitJson.REL_SPLIT, 1); testRunner.getFlowFilesForRelationship(SplitJson.REL_ORIGINAL).get(0).assertContentEquals(JSON_SNIPPET); testRunner.getFlowFilesForRelationship(SplitJson.REL_SPLIT).get(0).assertContentEquals("0"); @@ -102,6 +108,7 @@ public class TestSplitJson { int numSplitsExpected = 10; testRunner.assertTransferCount(SplitJson.REL_ORIGINAL, 1); + testRunner.getFlowFilesForRelationship(SplitJson.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), String.valueOf(numSplitsExpected)); testRunner.assertTransferCount(SplitJson.REL_SPLIT, numSplitsExpected); final MockFlowFile originalOut = testRunner.getFlowFilesForRelationship(SplitJson.REL_ORIGINAL).get(0); originalOut.assertContentEquals(JSON_SNIPPET); @@ -120,18 +127,21 @@ public class TestSplitJson { testRunner.run(); testRunner.assertTransferCount(SplitJson.REL_ORIGINAL, 1); + final MockFlowFile originalFlowFile = testRunner.getFlowFilesForRelationship(SplitJson.REL_ORIGINAL).get(0); + originalFlowFile.assertAttributeExists(FRAGMENT_ID.key()); + originalFlowFile.assertAttributeEquals(FRAGMENT_COUNT.key(), "7"); + originalFlowFile.assertContentEquals(JSON_SNIPPET); testRunner.assertTransferCount(SplitJson.REL_SPLIT, 7); - testRunner.getFlowFilesForRelationship(SplitJson.REL_ORIGINAL).get(0).assertContentEquals(JSON_SNIPPET); MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(SplitJson.REL_SPLIT).get(0); flowFile.assertContentEquals("{\"first\":\"Shaffer\",\"last\":\"Pearson\"}"); - flowFile.assertAttributeEquals("fragment.count", "7"); - flowFile.assertAttributeEquals("fragment.index", "0"); - flowFile.assertAttributeEquals("segment.original.filename", "test.json"); + flowFile.assertAttributeEquals(FRAGMENT_COUNT.key(), "7"); + flowFile.assertAttributeEquals(FRAGMENT_INDEX.key(), "0"); + flowFile.assertAttributeEquals(SEGMENT_ORIGINAL_FILENAME.key(), "test.json"); flowFile = testRunner.getFlowFilesForRelationship(SplitJson.REL_SPLIT).get(6); - flowFile.assertAttributeEquals("fragment.count", "7"); - flowFile.assertAttributeEquals("fragment.index", "6"); - flowFile.assertAttributeEquals("segment.original.filename", "test.json"); + flowFile.assertAttributeEquals(FRAGMENT_COUNT.key(), "7"); + flowFile.assertAttributeEquals(FRAGMENT_INDEX.key(), "6"); + flowFile.assertAttributeEquals(SEGMENT_ORIGINAL_FILENAME.key(), "test.json"); } @Test diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java index 3951e021e4..8e4c8815d6 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java @@ -50,6 +50,9 @@ public class TestSplitText { runner.assertTransferCount(SplitText.REL_FAILURE, 0); runner.assertTransferCount(SplitText.REL_ORIGINAL, 1); + final MockFlowFile originalFlowFile = runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0); + originalFlowFile.assertAttributeEquals(SplitText.FRAGMENT_COUNT, "3"); + originalFlowFile.assertAttributeExists(SplitText.FRAGMENT_ID); runner.assertTransferCount(SplitText.REL_SPLITS, 3); } @@ -80,6 +83,7 @@ public class TestSplitText { runner.assertTransferCount(SplitText.REL_FAILURE, 0); runner.assertTransferCount(SplitText.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals("fragment.count", "2"); runner.assertTransferCount(SplitText.REL_SPLITS, 2); final List splits = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS); @@ -87,7 +91,7 @@ public class TestSplitText { splits.get(0).assertAttributeEquals(SplitText.FRAGMENT_SIZE, "86"); splits.get(1).assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "3"); splits.get(1).assertAttributeEquals(SplitText.FRAGMENT_SIZE, "54"); - final String fragmentUUID = splits.get(0).getAttribute("fragment.identifier"); + final String fragmentUUID = splits.get(0).getAttribute(SplitText.FRAGMENT_ID); for (int i = 0; i < splits.size(); i++) { final MockFlowFile split = splits.get(i); split.assertAttributeEquals(SplitText.FRAGMENT_INDEX, String.valueOf(i+1)); @@ -110,6 +114,7 @@ public class TestSplitText { runner.assertTransferCount(SplitText.REL_FAILURE, 0); runner.assertTransferCount(SplitText.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals("fragment.count", "3"); runner.assertTransferCount(SplitText.REL_SPLITS, 3); final List splits = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS); @@ -119,7 +124,7 @@ public class TestSplitText { splits.get(1).assertAttributeEquals(SplitText.FRAGMENT_SIZE, "55"); splits.get(2).assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "1"); splits.get(2).assertAttributeEquals(SplitText.FRAGMENT_SIZE, "23"); - final String fragmentUUID = splits.get(0).getAttribute("fragment.identifier"); + final String fragmentUUID = splits.get(0).getAttribute(SplitText.FRAGMENT_ID); for (int i = 0; i < splits.size(); i++) { final MockFlowFile split = splits.get(i); split.assertAttributeEquals(SplitText.FRAGMENT_INDEX, String.valueOf(i + 1)); @@ -152,6 +157,7 @@ public class TestSplitText { runner.assertTransferCount(SplitText.REL_FAILURE, 0); runner.assertTransferCount(SplitText.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals("fragment.count", "6"); runner.assertTransferCount(SplitText.REL_SPLITS, 6); final List splits = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS); @@ -176,6 +182,7 @@ public class TestSplitText { runner.assertTransferCount(SplitText.REL_FAILURE, 0); runner.assertTransferCount(SplitText.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals("fragment.count", "1"); runner.assertTransferCount(SplitText.REL_SPLITS, 1); // repeat with header cou8nt versus header marker @@ -189,6 +196,7 @@ public class TestSplitText { runner.assertTransferCount(SplitText.REL_FAILURE, 0); runner.assertTransferCount(SplitText.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals("fragment.count", "1"); runner.assertTransferCount(SplitText.REL_SPLITS, 1); // repeat single header line with no newline characters @@ -202,6 +210,7 @@ public class TestSplitText { runner.assertTransferCount(SplitText.REL_FAILURE, 0); runner.assertTransferCount(SplitText.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals("fragment.count", "1"); runner.assertTransferCount(SplitText.REL_SPLITS, 1); } @@ -218,10 +227,11 @@ public class TestSplitText { runner.assertTransferCount(SplitText.REL_FAILURE, 0); runner.assertTransferCount(SplitText.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals(SplitText.FRAGMENT_COUNT, "2"); runner.assertTransferCount(SplitText.REL_SPLITS, 2); List splits = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS); - String fragmentUUID = splits.get(0).getAttribute("fragment.identifier"); + String fragmentUUID = splits.get(0).getAttribute(SplitText.FRAGMENT_ID); for (int i = 0; i < splits.size(); i++) { final MockFlowFile split = splits.get(i); split.assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "5"); @@ -244,10 +254,11 @@ public class TestSplitText { runner.assertTransferCount(SplitText.REL_FAILURE, 0); runner.assertTransferCount(SplitText.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals(SplitText.FRAGMENT_COUNT, "2"); runner.assertTransferCount(SplitText.REL_SPLITS, 2); splits = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS); - fragmentUUID = splits.get(0).getAttribute("fragment.identifier"); + fragmentUUID = splits.get(0).getAttribute(SplitText.FRAGMENT_ID); splits.get(0).assertContentEquals("Header Line #1\nHeader Line #2\nLine #1\nLine #2\nLine #3\nLine #4\nLine #5\n"); splits.get(1).assertContentEquals("Line #6\nLine #7\nLine #8\nLine #9\nLine #10"); splits.get(0).assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "7"); @@ -275,6 +286,7 @@ public class TestSplitText { runner.assertTransferCount(SplitText.REL_FAILURE, 0); runner.assertTransferCount(SplitText.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals(SplitText.FRAGMENT_COUNT, "4"); runner.assertTransferCount(SplitText.REL_SPLITS, 4); final List splits = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS); @@ -292,6 +304,7 @@ public class TestSplitText { runner.assertTransferCount(SplitText.REL_FAILURE, 0); runner.assertTransferCount(SplitText.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals(SplitText.FRAGMENT_COUNT, "2"); runner.assertTransferCount(SplitText.REL_SPLITS, 2); final List splitsWithNoHeader = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS); @@ -311,6 +324,7 @@ public class TestSplitText { runner.assertTransferCount(SplitText.REL_FAILURE, 0); runner.assertTransferCount(SplitText.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals(SplitText.FRAGMENT_COUNT, "4"); runner.assertTransferCount(SplitText.REL_SPLITS, 4); final List splits =runner.getFlowFilesForRelationship(SplitText.REL_SPLITS); @@ -328,6 +342,7 @@ public class TestSplitText { runner.assertTransferCount(SplitText.REL_FAILURE, 0); runner.assertTransferCount(SplitText.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals(SplitText.FRAGMENT_COUNT, "2"); runner.assertTransferCount(SplitText.REL_SPLITS, 2); final List splitsWithNoHeader =runner.getFlowFilesForRelationship(SplitText.REL_SPLITS); @@ -394,6 +409,7 @@ public class TestSplitText { runner.assertTransferCount(SplitText.REL_FAILURE, 0); runner.assertTransferCount(SplitText.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals(SplitText.FRAGMENT_COUNT, "4"); runner.assertTransferCount(SplitText.REL_SPLITS, 4); final List splits = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS); @@ -420,6 +436,7 @@ public class TestSplitText { runner.assertTransferCount(SplitText.REL_FAILURE, 0); runner.assertTransferCount(SplitText.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals(SplitText.FRAGMENT_COUNT, "11"); runner.assertTransferCount(SplitText.REL_SPLITS, 11); final List splits = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitXml.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitXml.java index 2157ab853a..281212f3bd 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitXml.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitXml.java @@ -37,6 +37,11 @@ import org.xml.sax.InputSource; import org.xml.sax.SAXException; import org.xml.sax.helpers.DefaultHandler; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.SEGMENT_ORIGINAL_FILENAME; + public class TestSplitXml { SAXParserFactory factory; @@ -67,15 +72,18 @@ public class TestSplitXml { }); runner.run(); runner.assertTransferCount(SplitXml.REL_ORIGINAL, 1); + final MockFlowFile originalFlowFile = runner.getFlowFilesForRelationship(SplitXml.REL_ORIGINAL).get(0); + originalFlowFile.assertAttributeExists(FRAGMENT_ID.key()); + originalFlowFile.assertAttributeEquals(FRAGMENT_COUNT.key(), "6"); runner.assertTransferCount(SplitXml.REL_SPLIT, 6); parseFlowFiles(runner.getFlowFilesForRelationship(SplitXml.REL_ORIGINAL)); parseFlowFiles(runner.getFlowFilesForRelationship(SplitXml.REL_SPLIT)); Arrays.asList(0, 1, 2, 3, 4, 5).forEach((index) -> { final MockFlowFile flowFile = runner.getFlowFilesForRelationship(SplitXml.REL_SPLIT).get(index); - flowFile.assertAttributeEquals("fragment.index", Integer.toString(index)); - flowFile.assertAttributeEquals("fragment.count", "6"); - flowFile.assertAttributeEquals("segment.original.filename", "test.xml"); + flowFile.assertAttributeEquals(FRAGMENT_INDEX.key(), Integer.toString(index)); + flowFile.assertAttributeEquals(FRAGMENT_COUNT.key(), "6"); + flowFile.assertAttributeEquals(SEGMENT_ORIGINAL_FILENAME.key(), "test.xml"); }); } @@ -86,6 +94,7 @@ public class TestSplitXml { runner.enqueue(Paths.get("src/test/resources/TestXml/xml-bundle-1")); runner.run(); runner.assertTransferCount(SplitXml.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(SplitXml.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "12"); runner.assertTransferCount(SplitXml.REL_SPLIT, 12); parseFlowFiles(runner.getFlowFilesForRelationship(SplitXml.REL_ORIGINAL)); @@ -99,6 +108,7 @@ public class TestSplitXml { runner.enqueue(Paths.get("src/test/resources/TestXml/xml-bundle-1")); runner.run(); runner.assertTransferCount(SplitXml.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(SplitXml.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "12"); runner.assertTransferCount(SplitXml.REL_SPLIT, 12); parseFlowFiles(runner.getFlowFilesForRelationship(SplitXml.REL_ORIGINAL)); @@ -118,6 +128,7 @@ public class TestSplitXml { runner.enqueue(Paths.get("src/test/resources/TestXml/namespace.xml")); runner.run(); runner.assertTransferCount(SplitXml.REL_ORIGINAL, 1); + runner.getFlowFilesForRelationship(SplitXml.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "2"); runner.assertTransferCount(SplitXml.REL_SPLIT, 2); parseFlowFiles(runner.getFlowFilesForRelationship(SplitXml.REL_ORIGINAL)); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java index e91a6e6f56..587d050ee6 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.processors.standard; +import static org.apache.nifi.processors.standard.SplitContent.FRAGMENT_COUNT; +import static org.apache.nifi.processors.standard.SplitContent.FRAGMENT_ID; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -57,10 +59,14 @@ public class TestUnpackContent { unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 4); unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2); + unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2"); + unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "2"); unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0); autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 4); autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2); + autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2"); + autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "2"); autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0); final List unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS); @@ -95,10 +101,14 @@ public class TestUnpackContent { unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2); unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2); + unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "1"); + unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "1"); unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0); autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2); autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2); + autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "1"); + autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "1"); autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0); List unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS); @@ -138,10 +148,14 @@ public class TestUnpackContent { unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 4); unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2); + unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2"); + unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "2"); unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0); autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 4); autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2); + autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2"); + autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "2"); autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0); final List unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS); @@ -174,10 +188,14 @@ public class TestUnpackContent { unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2); unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2); + unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "1"); + unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "1"); unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0); autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2); autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2); + autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "1"); + autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "1"); autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0); List unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS); @@ -211,6 +229,8 @@ public class TestUnpackContent { runner.assertTransferCount(UnpackContent.REL_SUCCESS, 4); runner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2); + runner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2"); + runner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "2"); runner.assertTransferCount(UnpackContent.REL_FAILURE, 0); final List unpacked = runner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS); @@ -235,6 +255,8 @@ public class TestUnpackContent { runner.assertTransferCount(UnpackContent.REL_SUCCESS, 4); runner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2); + runner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2"); + runner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "2"); runner.assertTransferCount(UnpackContent.REL_FAILURE, 0); final List unpacked = runner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS); @@ -258,6 +280,7 @@ public class TestUnpackContent { unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2); unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1); + unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2"); unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0); final List unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS); @@ -292,6 +315,9 @@ public class TestUnpackContent { unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2); unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1); + final MockFlowFile originalFlowFile = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0); + originalFlowFile.assertAttributeExists(FRAGMENT_ID); + originalFlowFile.assertAttributeEquals(FRAGMENT_COUNT, "2"); unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0); final List unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java index f42740b81c..e1117d53eb 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java @@ -24,14 +24,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.apache.nifi.controller.AbstractControllerService; -import org.apache.nifi.distributed.cache.client.Deserializer; -import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; -import org.apache.nifi.distributed.cache.client.Serializer; -import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer; +import org.apache.nifi.processors.standard.TestNotify.MockCacheClient; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; @@ -60,7 +53,7 @@ public class TestWait { final Map props = new HashMap<>(); props.put("releaseSignalAttribute", "1"); - runner.enqueue(new byte[] {},props); + runner.enqueue(new byte[]{}, props); runner.run(); @@ -76,7 +69,7 @@ public class TestWait { final Map props = new HashMap<>(); props.put("releaseSignalAttribute", "1"); - runner.enqueue(new byte[] {},props); + runner.enqueue(new byte[]{}, props); runner.run(); @@ -93,6 +86,45 @@ public class TestWait { runner.clearTransferState(); } + @Test + public void testCounterExpired() throws InitializationException, InterruptedException, IOException { + runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); + runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "5"); + runner.setProperty(Wait.EXPIRATION_DURATION, "100 ms"); + + final Map props = new HashMap<>(); + props.put("releaseSignalAttribute", "notification-id"); + runner.enqueue(new byte[]{}, props); + + runner.run(); + + runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); + MockFlowFile ff = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0); + + runner.clearTransferState(); + runner.enqueue(ff); + + final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service); + final Map signalAttributes = new HashMap<>(); + signalAttributes.put("signal-attr-1", "signal-attr-1-value"); + signalAttributes.put("signal-attr-2", "signal-attr-2-value"); + protocol.notify("notification-id", "counter-A", 1, signalAttributes); + protocol.notify("notification-id", "counter-B", 2, signalAttributes); + + Thread.sleep(101L); + runner.run(); + + runner.assertAllFlowFilesTransferred(Wait.REL_EXPIRED, 1); + ff = runner.getFlowFilesForRelationship(Wait.REL_EXPIRED).get(0); + // Even if wait didn't complete, signal attributes should be set + ff.assertAttributeEquals("wait.counter.total", "3"); + ff.assertAttributeEquals("wait.counter.counter-A", "1"); + ff.assertAttributeEquals("wait.counter.counter-B", "2"); + ff.assertAttributeEquals("signal-attr-1", "signal-attr-1-value"); + ff.assertAttributeEquals("signal-attr-2", "signal-attr-2-value"); + runner.clearTransferState(); + } + @Test public void testBadWaitStartTimestamp() throws InitializationException, InterruptedException { runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); @@ -101,7 +133,7 @@ public class TestWait { final Map props = new HashMap<>(); props.put("releaseSignalAttribute", "1"); props.put("wait.start.timestamp", "blue bunny"); - runner.enqueue(new byte[] {},props); + runner.enqueue(new byte[]{}, props); runner.run(); @@ -114,11 +146,12 @@ public class TestWait { runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); final Map props = new HashMap<>(); - runner.enqueue(new byte[] {},props); + runner.enqueue(new byte[]{}, props); runner.run(); runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1); + runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists("wait.counter.total"); runner.clearTransferState(); } @@ -129,12 +162,13 @@ public class TestWait { final Map props = new HashMap<>(); props.put("releaseSignalAttribute", "2"); - runner.enqueue(new byte[] {}, props); + runner.enqueue(new byte[]{}, props); runner.run(); //Expect the processor to receive an IO exception from the cache service and route to failure runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1); runner.assertTransferCount(Wait.REL_FAILURE, 1); + runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists("wait.counter.total"); service.setFailOnCalls(false); } @@ -146,7 +180,10 @@ public class TestWait { cachedAttributes.put("uuid", "notifyUuid"); cachedAttributes.put("notify.only", "notifyValue"); - service.put("key", cachedAttributes, new Wait.StringSerializer(), new FlowFileAttributesSerializer()); + // Setup existing cache entry. + final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service); + protocol.notify("key", "default", 1, cachedAttributes); + runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); runner.setProperty(Wait.ATTRIBUTE_COPY_MODE, Wait.ATTRIBUTE_COPY_REPLACE.getValue()); @@ -159,7 +196,7 @@ public class TestWait { runner.enqueue(flowFileContent.getBytes("UTF-8"), waitAttributes); // make sure the key is in the cache before Wait runs - assertNotNull(service.get("key", new Wait.StringSerializer(), new FlowFileAttributesSerializer())); + assertNotNull(protocol.getSignal("key")); runner.run(); @@ -180,7 +217,7 @@ public class TestWait { runner.clearTransferState(); // make sure Wait removed this key from the cache - assertNull(service.get("key", new Wait.StringSerializer(), new FlowFileAttributesSerializer())); + assertNull(protocol.getSignal("key")); } @Test @@ -190,7 +227,10 @@ public class TestWait { cachedAttributes.put("uuid", "notifyUuid"); cachedAttributes.put("notify.only", "notifyValue"); - service.put("key", cachedAttributes, new Wait.StringSerializer(), new FlowFileAttributesSerializer()); + // Setup existing cache entry. + final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service); + protocol.notify("key", "default", 1, cachedAttributes); + runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); runner.setProperty(Wait.ATTRIBUTE_COPY_MODE, Wait.ATTRIBUTE_COPY_KEEP_ORIGINAL.getValue()); @@ -221,70 +261,181 @@ public class TestWait { runner.clearTransferState(); } - private class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient { - private final ConcurrentMap values = new ConcurrentHashMap<>(); - private boolean failOnCalls = false; + @Test + public void testWaitForTotalCount() throws InitializationException, IOException { + Map cachedAttributes = new HashMap<>(); + cachedAttributes.put("both", "notifyValue"); + cachedAttributes.put("uuid", "notifyUuid"); + cachedAttributes.put("notify.only", "notifyValue"); - public void setFailOnCalls(boolean failOnCalls){ - this.failOnCalls = failOnCalls; - } + // Setup existing cache entry. + final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service); + protocol.notify("key", "counter-A", 1, cachedAttributes); + runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); + runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "${targetSignalCount}"); - private void verifyNotFail() throws IOException { - if (failOnCalls) { - throw new IOException("Could not call to remote service because Unit Test marked service unavailable"); - } - } + final Map waitAttributes = new HashMap<>(); + waitAttributes.put("releaseSignalAttribute", "key"); + waitAttributes.put("targetSignalCount", "3"); + waitAttributes.put("wait.only", "waitValue"); + waitAttributes.put("both", "waitValue"); + waitAttributes.put("uuid", UUID.randomUUID().toString()); + String flowFileContent = "content"; + runner.enqueue(flowFileContent.getBytes("UTF-8"), waitAttributes); - @Override - public boolean putIfAbsent(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { - verifyNotFail(); - final Object retValue = values.putIfAbsent(key, value); - return (retValue == null); - } + /* + * 1st iteration + */ + runner.run(); + runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); + MockFlowFile waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0); - @Override - @SuppressWarnings("unchecked") - public V getAndPutIfAbsent(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer, - final Deserializer valueDeserializer) throws IOException { - verifyNotFail(); - return (V) values.putIfAbsent(key, value); - } + /* + * 2nd iteration. + */ + runner.clearTransferState(); + runner.enqueue(waitingFlowFile); - @Override - public boolean containsKey(final K key, final Serializer keySerializer) throws IOException { - verifyNotFail(); - return values.containsKey(key); - } + // Notify with other counter. + protocol.notify("key", "counter-B", 1, cachedAttributes); - @Override - public void put(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { - verifyNotFail(); - values.put(key, value); - } + runner.run(); + runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); + // Still waiting since total count doesn't reach to 3. + waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0); - @Override - @SuppressWarnings("unchecked") - public V get(final K key, final Serializer keySerializer, final Deserializer valueDeserializer) throws IOException { - verifyNotFail(); - if(values.containsKey(key)) { - return (V) values.get(key); - } else { - return null; - } - } + /* + * 3rd iteration. + */ + runner.clearTransferState(); + runner.enqueue(waitingFlowFile); - @Override - public void close() throws IOException { - } + runner.run(); + runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); + // Still waiting since total count doesn't reach to 3. + waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0); - @Override - public boolean remove(final K key, final Serializer serializer) throws IOException { - verifyNotFail(); - values.remove(key); - return true; - } + /* + * 4th iteration. + */ + runner.clearTransferState(); + runner.enqueue(waitingFlowFile); + + // Notify with other counter. + protocol.notify("key", "counter-C", 1, cachedAttributes); + + runner.run(); + runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1); + + final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0); + + // show a new attribute was copied from the cache + assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only")); + // show that uuid was not overwritten + assertEquals(waitAttributes.get("uuid"), outputFlowFile.getAttribute("uuid")); + // show that the original attributes are still there + assertEquals("waitValue", outputFlowFile.getAttribute("wait.only")); + // show that the original attribute is kept + assertEquals("waitValue", outputFlowFile.getAttribute("both")); + runner.clearTransferState(); + + assertNull("The key no longer exist", protocol.getSignal("key")); } + @Test + public void testWaitForSpecificCount() throws InitializationException, IOException { + Map cachedAttributes = new HashMap<>(); + cachedAttributes.put("both", "notifyValue"); + cachedAttributes.put("uuid", "notifyUuid"); + cachedAttributes.put("notify.only", "notifyValue"); + + // Setup existing cache entry. + final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service); + protocol.notify("key", "counter-A", 1, cachedAttributes); + + runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); + runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "${targetSignalCount}"); + runner.setProperty(Wait.SIGNAL_COUNTER_NAME, "${signalCounterName}"); + + final Map waitAttributes = new HashMap<>(); + waitAttributes.put("releaseSignalAttribute", "key"); + waitAttributes.put("targetSignalCount", "2"); + waitAttributes.put("signalCounterName", "counter-B"); + waitAttributes.put("wait.only", "waitValue"); + waitAttributes.put("both", "waitValue"); + waitAttributes.put("uuid", UUID.randomUUID().toString()); + String flowFileContent = "content"; + runner.enqueue(flowFileContent.getBytes("UTF-8"), waitAttributes); + + /* + * 1st iteration + */ + runner.run(); + runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); + MockFlowFile waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0); + + /* + * 2nd iteration. + */ + runner.clearTransferState(); + runner.enqueue(waitingFlowFile); + + // Notify with target counter. + protocol.notify("key", "counter-B", 1, cachedAttributes); + + runner.run(); + runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); + // Still waiting since counter-B doesn't reach to 2. + waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0); + + /* + * 3rd iteration. + */ + runner.clearTransferState(); + runner.enqueue(waitingFlowFile); + + // Notify with other counter. + protocol.notify("key", "counter-C", 1, cachedAttributes); + + runner.run(); + runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); + // Still waiting since total count doesn't reach to 3. + waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0); + + /* + * 4th iteration. + */ + runner.clearTransferState(); + runner.enqueue(waitingFlowFile); + + // Notify with target counter. + protocol.notify("key", "counter-B", 1, cachedAttributes); + + runner.run(); + runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1); + + final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0); + + // show a new attribute was copied from the cache + assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only")); + // show that uuid was not overwritten + assertEquals(waitAttributes.get("uuid"), outputFlowFile.getAttribute("uuid")); + // show that the original attributes are still there + assertEquals("waitValue", outputFlowFile.getAttribute("wait.only")); + // show that the original attribute is kept + assertEquals("waitValue", outputFlowFile.getAttribute("both")); + + outputFlowFile.assertAttributeEquals("wait.counter.total", "4"); + outputFlowFile.assertAttributeEquals("wait.counter.counter-A", "1"); + outputFlowFile.assertAttributeEquals("wait.counter.counter-B", "2"); + outputFlowFile.assertAttributeEquals("wait.counter.counter-C", "1"); + + runner.clearTransferState(); + + assertNull("The key no longer exist", protocol.getSignal("key")); + + } + } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java new file mode 100644 index 0000000000..d4bc783033 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.activemq.util.ByteArrayOutputStream; +import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient.CacheEntry; +import org.apache.nifi.distributed.cache.client.StandardCacheEntry; +import org.apache.nifi.distributed.cache.client.exception.DeserializationException; +import org.apache.nifi.processors.standard.WaitNotifyProtocol.Signal; +import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer; +import org.junit.Before; +import org.junit.Test; +import org.mockito.stubbing.Answer; + +import java.nio.charset.StandardCharsets; +import java.util.ConcurrentModificationException; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; + +public class TestWaitNotifyProtocol { + + private final Map> cacheEntries = new HashMap<>(); + + private AtomicDistributedMapCacheClient cache; + private final Answer successfulReplace = invocation -> { + final String key = invocation.getArgumentAt(0, String.class); + final String value = invocation.getArgumentAt(1, String.class); + final Long revision = invocation.getArgumentAt(4, Long.class); + cacheEntries.put(key, new StandardCacheEntry<>(key, value, revision + 1)); + return true; + }; + + @Before + public void before() throws Exception { + cacheEntries.clear(); + + // Default mock implementations. + cache = mock(AtomicDistributedMapCacheClient.class); + doAnswer(invocation -> { + final CacheEntry entry = cacheEntries.get(invocation.getArguments()[0]); + return entry; + }).when(cache).fetch(any(), any(), any()); + } + + @Test + public void testNotifyRetryFailure() throws Exception { + + // replace always return false. + doAnswer(invocation -> false) + .when(cache).replace(any(), any(), any(), any(), anyLong()); + + final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache); + + final String signalId = "signal-id"; + try { + protocol.notify(signalId, "a", 1, null); + fail("Notify should fail after retrying few times."); + } catch (ConcurrentModificationException e) { + } + } + + @Test + public void testNotifyFirst() throws Exception { + + doAnswer(successfulReplace).when(cache).replace(any(), any(), any(), any(), anyLong()); + + final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache); + + final String signalId = "signal-id"; + final Signal signal = protocol.notify(signalId, "a", 1, null); + + assertNotNull(signal); + assertEquals(Long.valueOf(1), signal.getCounts().get("a")); + assertTrue(cacheEntries.containsKey("signal-id")); + + final CacheEntry cacheEntry = cacheEntries.get("signal-id"); + + assertEquals(0, cacheEntry.getRevision()); + assertEquals("{\"counts\":{\"a\":1},\"attributes\":{}}", cacheEntry.getValue()); + } + + @Test + public void testNotifyCounters() throws Exception { + + doAnswer(successfulReplace).when(cache).replace(any(), any(), any(), any(), anyLong()); + + final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache); + + final String signalId = "signal-id"; + + protocol.notify(signalId, "a", 1, null); + protocol.notify(signalId, "a", 1, null); + + CacheEntry cacheEntry = cacheEntries.get("signal-id"); + assertEquals(1, cacheEntry.getRevision()); + assertEquals("{\"counts\":{\"a\":2},\"attributes\":{}}", cacheEntry.getValue()); + + protocol.notify(signalId, "a", 10, null); + + cacheEntry = cacheEntries.get("signal-id"); + assertEquals(2, cacheEntry.getRevision()); + assertEquals("{\"counts\":{\"a\":12},\"attributes\":{}}", cacheEntry.getValue()); + + protocol.notify(signalId, "b", 2, null); + protocol.notify(signalId, "c", 3, null); + + cacheEntry = cacheEntries.get("signal-id"); + assertEquals(4, cacheEntry.getRevision()); + assertEquals("{\"counts\":{\"a\":12,\"b\":2,\"c\":3},\"attributes\":{}}", cacheEntry.getValue()); + + } + + @Test + public void testNotifyAttributes() throws Exception { + doAnswer(successfulReplace).when(cache).replace(any(), any(), any(), any(), anyLong()); + + final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache); + + final String signalId = "signal-id"; + + final Map attributeA1 = new HashMap<>(); + attributeA1.put("p1", "a1"); + attributeA1.put("p2", "a1"); + + protocol.notify(signalId, "a", 1, attributeA1); + + CacheEntry cacheEntry = cacheEntries.get("signal-id"); + assertEquals(0, cacheEntry.getRevision()); + assertEquals("{\"counts\":{\"a\":1},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a1\"}}", cacheEntry.getValue()); + + final Map attributeA2 = new HashMap<>(); + attributeA2.put("p2", "a2"); // Update p2 + attributeA2.put("p3", "a2"); // Add p3 + + // Notify again + protocol.notify(signalId, "a", 1, attributeA2); + + cacheEntry = cacheEntries.get("signal-id"); + assertEquals(1, cacheEntry.getRevision()); + assertEquals("Updated attributes should be merged correctly", + "{\"counts\":{\"a\":2},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a2\",\"p3\":\"a2\"}}", cacheEntry.getValue()); + + } + + @Test + public void testSignalCount() throws Exception { + doAnswer(successfulReplace).when(cache).replace(any(), any(), any(), any(), anyLong()); + + final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache); + + final String signalId = "signal-id"; + + Signal signal = protocol.getSignal(signalId); + assertNull("Should be null since there's no signal yet", signal); + + // First notification. + protocol.notify(signalId, "success", 1, null); + + signal = protocol.getSignal(signalId); + assertNotNull(signal); + assertEquals(1, signal.getCount("success")); + assertTrue(signal.isCountReached("success", 1)); + assertFalse(signal.isCountReached("success", 2)); + assertTrue(signal.isTotalCountReached(1)); + assertFalse(signal.isTotalCountReached(2)); + + // Notify again with different counter name. + protocol.notify(signalId, "failure", 1, null); + + signal = protocol.getSignal(signalId); + assertNotNull(signal); + assertEquals(1, signal.getCount("success")); + assertEquals(1, signal.getCount("failure")); + assertTrue(signal.isCountReached("failure", 1)); + assertFalse(signal.isCountReached("failure", 2)); + assertTrue(signal.isTotalCountReached(1)); + assertTrue(signal.isTotalCountReached(2)); + + } + + /** + * Test migration across NiFi version upgrade. + * Old version of Wait/Notify processors use FlowFileAttributesSerializer for cache entries. + * New version uses StringSerializer. WaitNotifyProtocol should be able to migrate old cache entries. + */ + @Test + public void testNiFiVersionUpgrade() throws Exception { + doAnswer(successfulReplace).when(cache).replace(any(), any(), any(), any(), anyLong()); + + // Simulate old cache entry. + final FlowFileAttributesSerializer attributesSerializer = new FlowFileAttributesSerializer(); + final Map cachedAttributes = new HashMap<>(); + cachedAttributes.put("key1", "value1"); + cachedAttributes.put("key2", "value2"); + cachedAttributes.put("key3", "value3"); + final ByteArrayOutputStream bos = new ByteArrayOutputStream(); + attributesSerializer.serialize(cachedAttributes, bos); + + final String signalId = "old-entry"; + cacheEntries.put(signalId, new StandardCacheEntry<>(signalId, new String(bos.toByteArray(), StandardCharsets.UTF_8), 0)); + + final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache); + final Signal signal = protocol.getSignal(signalId); + + assertEquals(1, signal.getCount(WaitNotifyProtocol.DEFAULT_COUNT_NAME)); + assertEquals("value1", signal.getAttributes().get("key1")); + assertEquals("value2", signal.getAttributes().get("key2")); + assertEquals("value3", signal.getAttributes().get("key3")); + + cacheEntries.put(signalId, new StandardCacheEntry<>(signalId, "UNSUPPORTED_FORMAT", 0)); + try { + protocol.getSignal(signalId); + fail("Should fail since cached value was not in expected format."); + } catch (DeserializationException e) { + } + + } + +} \ No newline at end of file