NIFI-3216: Add N signals to Wait/Notify

- Support counters at Wait/Notify processors so that NiFi flow can be
  configured to wait for N signals
- Extract Wait/Notify logic into WaitNotifyProtocol
- Added FragmentAttributes to manage commonly used fragment attributes
- Changed existing split processors to set 'fragment.identifier' and
  'fragment.count', so that Wait can use those to wait for all splits
  to be processed

This closes #1420.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Koji Kawamura 2017-01-13 16:52:30 +09:00 committed by Bryan Bende
parent e62eeb7563
commit 7f0171ffa2
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
25 changed files with 1205 additions and 241 deletions

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.flowfile.attributes;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import java.util.HashMap;
import java.util.Map;
/**
* This enum class contains flow file attribute keys commonly used among Split processors.
*/
public enum FragmentAttributes implements FlowFileAttributeKey {
/**
* The number of bytes from the original FlowFile that were copied to this FlowFile,
* including header, if applicable, which is duplicated in each split FlowFile.
*/
FRAGMENT_SIZE("fragment.size"),
/**
* All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute.
*/
FRAGMENT_ID("fragment.identifier"),
/**
* A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile.
*/
FRAGMENT_INDEX("fragment.index"),
/**
* The number of split FlowFiles generated from the parent FlowFile.
*/
FRAGMENT_COUNT("fragment.count"),
/**
* The filename of the parent FlowFile.
*/
SEGMENT_ORIGINAL_FILENAME("segment.original.filename");
private final String key;
FragmentAttributes(final String key) {
this.key = key;
}
@Override
public String key() {
return key;
}
public static FlowFile copyAttributesToOriginal(final ProcessSession processSession, final FlowFile originalFlowFile,
final String fragmentId, final int fragmentCount) {
final Map<String, String> attributesToOriginal = new HashMap<>();
if (fragmentId != null && fragmentId.length() > 0) {
attributesToOriginal.put(FRAGMENT_ID.key(), fragmentId);
}
attributesToOriginal.put(FRAGMENT_COUNT.key(), String.valueOf(fragmentCount));
return processSession.putAllAttributes(originalFlowFile, attributesToOriginal);
}
}

View File

@ -64,6 +64,12 @@ import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.BufferedOutputStream;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.SEGMENT_ORIGINAL_FILENAME;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttributesToOriginal;
@SideEffectFree
@SupportsBatching
@Tags({ "avro", "split" })
@ -217,13 +223,14 @@ public class SplitAvro extends AbstractProcessor {
final String fragmentIdentifier = UUID.randomUUID().toString();
IntStream.range(0, splits.size()).forEach((i) -> {
FlowFile split = splits.get(i);
split = session.putAttribute(split, "fragment.identifier", fragmentIdentifier);
split = session.putAttribute(split, "fragment.index", Integer.toString(i));
split = session.putAttribute(split, "segment.original.filename", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
split = session.putAttribute(split, "fragment.count", Integer.toString(splits.size()));
split = session.putAttribute(split, FRAGMENT_ID.key(), fragmentIdentifier);
split = session.putAttribute(split, FRAGMENT_INDEX.key(), Integer.toString(i));
split = session.putAttribute(split, SEGMENT_ORIGINAL_FILENAME.key(), flowFile.getAttribute(CoreAttributes.FILENAME.key()));
split = session.putAttribute(split, FRAGMENT_COUNT.key(), Integer.toString(splits.size()));
session.transfer(split, REL_SPLIT);
});
session.transfer(flowFile, REL_ORIGINAL);
final FlowFile originalFlowFile = copyAttributesToOriginal(session, flowFile, fragmentIdentifier, splits.size());
session.transfer(originalFlowFile, REL_ORIGINAL);
} catch (ProcessException e) {
getLogger().error("Failed to split {} due to {}", new Object[]{flowFile, e.getMessage()}, e);
session.transfer(flowFile, REL_FAILURE);

View File

@ -44,6 +44,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.stream.IntStream;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
import static org.junit.Assert.assertEquals;
public class TestSplitAvro {
@ -116,6 +118,9 @@ public class TestSplitAvro {
runner.assertTransferCount(SplitAvro.REL_SPLIT, 100);
runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
final MockFlowFile originalFlowFile = runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL).get(0);
originalFlowFile.assertAttributeExists(FRAGMENT_ID.key());
originalFlowFile.assertAttributeEquals(FRAGMENT_COUNT.key(), "100");
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
checkDataFileSplitSize(flowFiles, 1, true);
final String fragmentIdentifier = flowFiles.get(0).getAttribute("fragment.identifier");
@ -123,7 +128,7 @@ public class TestSplitAvro {
MockFlowFile flowFile = flowFiles.get(i);
assertEquals(i, Integer.parseInt(flowFile.getAttribute("fragment.index")));
assertEquals(fragmentIdentifier, flowFile.getAttribute("fragment.identifier"));
assertEquals(flowFiles.size(), Integer.parseInt(flowFile.getAttribute("fragment.count")));
assertEquals(flowFiles.size(), Integer.parseInt(flowFile.getAttribute(FRAGMENT_COUNT.key())));
assertEquals(filename, flowFile.getAttribute("segment.original.filename"));
});
}
@ -140,6 +145,7 @@ public class TestSplitAvro {
runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "5");
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
checkDataFileSplitSize(flowFiles, 20, true);
}
@ -156,6 +162,7 @@ public class TestSplitAvro {
runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
checkDataFileSplitSize(flowFiles, 100, true);
}
@ -172,6 +179,7 @@ public class TestSplitAvro {
runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "100");
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
checkDataFileSplitSize(flowFiles, 1, false);
@ -197,6 +205,7 @@ public class TestSplitAvro {
runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "100");
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
checkBareRecordsSplitSize(flowFiles, 1, true);
@ -215,6 +224,7 @@ public class TestSplitAvro {
runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "5");
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
checkBareRecordsSplitSize(flowFiles, 20, true);
@ -234,6 +244,7 @@ public class TestSplitAvro {
runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "5");
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
checkBareRecordsSplitSize(flowFiles, 20, false);

View File

@ -65,6 +65,11 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ArrayNode;
import org.codehaus.jackson.node.JsonNodeFactory;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttributesToOriginal;
@SideEffectFree
@SupportsBatching
@SeeAlso(PutSQL.class)
@ -369,9 +374,9 @@ public class ConvertJSONToSQL extends AbstractProcessor {
attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
attributes.put("sql.table", tableName);
attributes.put("fragment.identifier", fragmentIdentifier);
attributes.put("fragment.count", String.valueOf(arrayNode.size()));
attributes.put("fragment.index", String.valueOf(i));
attributes.put(FRAGMENT_ID.key(), fragmentIdentifier);
attributes.put(FRAGMENT_COUNT.key(), String.valueOf(arrayNode.size()));
attributes.put(FRAGMENT_INDEX.key(), String.valueOf(i));
if (catalog != null) {
attributes.put("sql.catalog", catalog);
@ -381,6 +386,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
session.transfer(sqlFlowFile, REL_SQL);
}
flowFile = copyAttributesToOriginal(session, flowFile, fragmentIdentifier, arrayNode.size());
session.transfer(flowFile, REL_ORIGINAL);
}

View File

@ -67,6 +67,7 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
@ -127,15 +128,15 @@ import org.apache.nifi.util.FlowFilePackagerV3;
public class MergeContent extends BinFiles {
// preferred attributes
public static final String FRAGMENT_ID_ATTRIBUTE = "fragment.identifier";
public static final String FRAGMENT_INDEX_ATTRIBUTE = "fragment.index";
public static final String FRAGMENT_COUNT_ATTRIBUTE = "fragment.count";
public static final String FRAGMENT_ID_ATTRIBUTE = FragmentAttributes.FRAGMENT_ID.key();
public static final String FRAGMENT_INDEX_ATTRIBUTE = FragmentAttributes.FRAGMENT_INDEX.key();
public static final String FRAGMENT_COUNT_ATTRIBUTE = FragmentAttributes.FRAGMENT_COUNT.key();
// old style attributes
public static final String SEGMENT_ID_ATTRIBUTE = "segment.identifier";
public static final String SEGMENT_INDEX_ATTRIBUTE = "segment.index";
public static final String SEGMENT_COUNT_ATTRIBUTE = "segment.count";
public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename";
public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
public static final AllowableValue MERGE_STRATEGY_BIN_PACK = new AllowableValue(
"Bin-Packing Algorithm",

View File

@ -17,8 +17,6 @@
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -37,9 +35,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
@ -49,7 +45,6 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer;
@EventDriven
@SupportsBatching
@ -67,7 +62,7 @@ public class Notify extends AbstractProcessor {
.name("Distributed Cache Service")
.description("The Controller Service that is used to cache release signals in order to release files queued at a corresponding Wait processor")
.required(true)
.identifiesControllerService(DistributedMapCacheClient.class)
.identifiesControllerService(AtomicDistributedMapCacheClient.class)
.build();
// Selects the FlowFile attribute or expression, whose value is used as cache key
@ -80,6 +75,18 @@ public class Notify extends AbstractProcessor {
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor SIGNAL_COUNTER_NAME = new PropertyDescriptor.Builder()
.name("Signal Counter Name")
.description("A value, or the results of an Attribute Expression Language statement, which will " +
"be evaluated against a FlowFile in order to determine the signal counter name. " +
"Signal counter name is useful when a corresponding Wait processor needs to know the number of occurrences " +
"of different types of events, such as success or failure, or destination data source names, etc.")
.required(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
.expressionLanguageSupported(true)
.defaultValue(WaitNotifyProtocol.DEFAULT_COUNT_NAME)
.build();
// Specifies an optional regex used to identify which attributes to cache
public static final PropertyDescriptor ATTRIBUTE_CACHE_REGEX = new PropertyDescriptor.Builder()
.name("Attribute Cache Regex")
@ -103,9 +110,6 @@ public class Notify extends AbstractProcessor {
private final Set<Relationship> relationships;
private final Serializer<String> keySerializer = new StringSerializer();
private final Serializer<Map<String, String>> valueSerializer = new FlowFileAttributesSerializer();
public Notify() {
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
@ -117,6 +121,7 @@ public class Notify extends AbstractProcessor {
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(RELEASE_SIGNAL_IDENTIFIER);
descriptors.add(SIGNAL_COUNTER_NAME);
descriptors.add(DISTRIBUTED_CACHE_SERVICE);
descriptors.add(ATTRIBUTE_CACHE_REGEX);
return descriptors;
@ -137,11 +142,12 @@ public class Notify extends AbstractProcessor {
final ComponentLog logger = getLogger();
// cache key is computed from attribute 'RELEASE_SIGNAL_IDENTIFIER' with expression language support
final String cacheKey = context.getProperty(RELEASE_SIGNAL_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
// Signal id is computed from attribute 'RELEASE_SIGNAL_IDENTIFIER' with expression language support
final String signalId = context.getProperty(RELEASE_SIGNAL_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
final String counterName = context.getProperty(SIGNAL_COUNTER_NAME).evaluateAttributeExpressions(flowFile).getValue();
// if the computed value is null, or empty, we transfer the flow file to failure relationship
if (StringUtils.isBlank(cacheKey)) {
if (StringUtils.isBlank(signalId)) {
logger.error("FlowFile {} has no attribute for given Release Signal Identifier", new Object[] {flowFile});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
@ -149,7 +155,8 @@ public class Notify extends AbstractProcessor {
}
// the cache client used to interact with the distributed cache
final DistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
final AtomicDistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(AtomicDistributedMapCacheClient.class);
final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
try {
final String attributeCacheRegex = (context.getProperty(ATTRIBUTE_CACHE_REGEX).isSet())
@ -164,10 +171,12 @@ public class Notify extends AbstractProcessor {
}
if (logger.isDebugEnabled()) {
logger.debug("Cached release signal identifier {} from FlowFile {}", new Object[] {cacheKey, flowFile});
logger.debug("Cached release signal identifier {} counterName {} from FlowFile {}", new Object[] {signalId, counterName, flowFile});
}
cache.put(cacheKey, attributesToCache, keySerializer, valueSerializer);
// In case of ConcurrentModificationException, just throw the exception so that processor can
// retry after yielding for a while.
protocol.notify(signalId, counterName, 1, attributesToCache);
session.transfer(flowFile, REL_SUCCESS);
} catch (final IOException e) {
@ -177,15 +186,4 @@ public class Notify extends AbstractProcessor {
}
}
/**
* Simple string serializer, used for serializing the cache key
*/
public static class StringSerializer implements Serializer<String> {
@Override
public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
out.write(value.getBytes(StandardCharsets.UTF_8));
}
}
}

View File

@ -29,6 +29,7 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.ProcessContext;
@ -161,9 +162,9 @@ public class PutSQL extends AbstractProcessor {
private static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("sql\\.args\\.(\\d+)\\.type");
private static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+");
private static final String FRAGMENT_ID_ATTR = "fragment.identifier";
private static final String FRAGMENT_INDEX_ATTR = "fragment.index";
private static final String FRAGMENT_COUNT_ATTR = "fragment.count";
private static final String FRAGMENT_ID_ATTR = FragmentAttributes.FRAGMENT_ID.key();
private static final String FRAGMENT_INDEX_ATTR = FragmentAttributes.FRAGMENT_INDEX.key();
private static final String FRAGMENT_COUNT_ATTR = FragmentAttributes.FRAGMENT_COUNT.key();
private static final Pattern LONG_PATTERN = Pattern.compile("^\\d{1,19}$");

View File

@ -38,6 +38,7 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
@ -80,11 +81,11 @@ public class SegmentContent extends AbstractProcessor {
public static final String SEGMENT_ID = "segment.identifier";
public static final String SEGMENT_INDEX = "segment.index";
public static final String SEGMENT_COUNT = "segment.count";
public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename";
public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
public static final String FRAGMENT_ID = "fragment.identifier";
public static final String FRAGMENT_INDEX = "fragment.index";
public static final String FRAGMENT_COUNT = "fragment.count";
public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key();
public static final PropertyDescriptor SIZE = new PropertyDescriptor.Builder()
.name("Segment Size")
@ -180,6 +181,7 @@ public class SegmentContent extends AbstractProcessor {
}
session.transfer(segmentSet, REL_SEGMENTS);
flowFile = FragmentAttributes.copyAttributesToOriginal(session, flowFile, segmentId, totalSegments);
session.transfer(flowFile, REL_ORIGINAL);
if (totalSegments <= 10) {

View File

@ -50,6 +50,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@ -77,10 +78,10 @@ import org.apache.nifi.util.Tuple;
public class SplitContent extends AbstractProcessor {
// attribute keys
public static final String FRAGMENT_ID = "fragment.identifier";
public static final String FRAGMENT_INDEX = "fragment.index";
public static final String FRAGMENT_COUNT = "fragment.count";
public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename";
public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key();
public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
static final AllowableValue HEX_FORMAT = new AllowableValue("Hexadecimal", "Hexadecimal", "The Byte Sequence will be interpreted as a hexadecimal representation of bytes");
static final AllowableValue UTF8_FORMAT = new AllowableValue("Text", "Text", "The Byte Sequence will be interpreted as UTF-8 Encoded text");
@ -182,7 +183,7 @@ public class SplitContent extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
final FlowFile flowFile = session.get();
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
@ -285,8 +286,9 @@ public class SplitContent extends AbstractProcessor {
splitList.add(finalSplit);
}
finishFragmentAttributes(session, flowFile, splitList);
final String fragmentId = finishFragmentAttributes(session, flowFile, splitList);
session.transfer(splitList, REL_SPLITS);
flowFile = FragmentAttributes.copyAttributesToOriginal(session, flowFile, fragmentId, splitList.size());
session.transfer(flowFile, REL_ORIGINAL);
if (splitList.size() > 10) {
@ -302,8 +304,9 @@ public class SplitContent extends AbstractProcessor {
* @param session session
* @param source source
* @param splits splits
* @return generated fragment identifier for the splits
*/
private void finishFragmentAttributes(final ProcessSession session, final FlowFile source, final List<FlowFile> splits) {
private String finishFragmentAttributes(final ProcessSession session, final FlowFile source, final List<FlowFile> splits) {
final String originalFilename = source.getAttribute(CoreAttributes.FILENAME.key());
final String fragmentId = UUID.randomUUID().toString();
@ -319,6 +322,7 @@ public class SplitContent extends AbstractProcessor {
FlowFile newFF = session.putAllAttributes(ff, attributes);
splits.add(newFF);
}
return fragmentId;
}
static class HexStringPropertyValidator implements Validator {

View File

@ -56,6 +56,12 @@ import com.jayway.jsonpath.InvalidJsonException;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.PathNotFoundException;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.SEGMENT_ORIGINAL_FILENAME;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttributesToOriginal;
@EventDriven
@SideEffectFree
@SupportsBatching
@ -166,7 +172,7 @@ public class SplitJson extends AbstractJsonPathProcessor {
@Override
public void onTrigger(final ProcessContext processContext, final ProcessSession processSession) {
final FlowFile original = processSession.get();
FlowFile original = processSession.get();
if (original == null) {
return;
}
@ -202,8 +208,9 @@ public class SplitJson extends AbstractJsonPathProcessor {
List resultList = (List) jsonPathResult;
Map<String, String> attributes = new HashMap<>();
attributes.put("fragment.identifier", UUID.randomUUID().toString());
attributes.put("fragment.count", Integer.toString(resultList.size()));
final String fragmentId = UUID.randomUUID().toString();
attributes.put(FRAGMENT_ID.key(), fragmentId);
attributes.put(FRAGMENT_COUNT.key(), Integer.toString(resultList.size()));
for (int i = 0; i < resultList.size(); i++) {
Object resultSegment = resultList.get(i);
@ -213,11 +220,12 @@ public class SplitJson extends AbstractJsonPathProcessor {
out.write(resultSegmentContent.getBytes(StandardCharsets.UTF_8));
}
);
attributes.put("segment.original.filename", split.getAttribute(CoreAttributes.FILENAME.key()));
attributes.put("fragment.index", Integer.toString(i));
attributes.put(SEGMENT_ORIGINAL_FILENAME.key(), split.getAttribute(CoreAttributes.FILENAME.key()));
attributes.put(FRAGMENT_INDEX.key(), Integer.toString(i));
processSession.transfer(processSession.putAllAttributes(split, attributes), REL_SPLIT);
}
original = copyAttributesToOriginal(processSession, original, fragmentId, resultList.size());
processSession.transfer(original, REL_ORIGINAL);
logger.info("Split {} into {} FlowFiles", new Object[]{original, resultList.size()});
}

View File

@ -48,6 +48,7 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
@ -86,11 +87,11 @@ import org.apache.nifi.stream.io.util.TextLineDemarcator.OffsetInfo;
public class SplitText extends AbstractProcessor {
// attribute keys
public static final String SPLIT_LINE_COUNT = "text.line.count";
public static final String FRAGMENT_SIZE = "fragment.size";
public static final String FRAGMENT_ID = "fragment.identifier";
public static final String FRAGMENT_INDEX = "fragment.index";
public static final String FRAGMENT_COUNT = "fragment.count";
public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename";
public static final String FRAGMENT_SIZE = FragmentAttributes.FRAGMENT_SIZE.key();
public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key();
public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
public static final PropertyDescriptor LINE_SPLIT_COUNT = new PropertyDescriptor.Builder()
.name("Line Split Count")
@ -250,8 +251,10 @@ public class SplitText extends AbstractProcessor {
if (error.get()){
processSession.transfer(sourceFlowFile, REL_FAILURE);
} else {
List<FlowFile> splitFlowFiles = this.generateSplitFlowFiles(sourceFlowFile, headerSplitInfoRef.get(), computedSplitsInfo, processSession);
processSession.transfer(sourceFlowFile, REL_ORIGINAL);
final String fragmentId = UUID.randomUUID().toString();
List<FlowFile> splitFlowFiles = this.generateSplitFlowFiles(fragmentId, sourceFlowFile, headerSplitInfoRef.get(), computedSplitsInfo, processSession);
final FlowFile originalFlowFile = FragmentAttributes.copyAttributesToOriginal(processSession, sourceFlowFile, fragmentId, splitFlowFiles.size());
processSession.transfer(originalFlowFile, REL_ORIGINAL);
if (!splitFlowFiles.isEmpty()) {
processSession.transfer(splitFlowFiles, REL_SPLITS);
}
@ -279,7 +282,8 @@ public class SplitText extends AbstractProcessor {
* it signifies the header information and its contents will be included in
* each and every computed split.
*/
private List<FlowFile> generateSplitFlowFiles(FlowFile sourceFlowFile, SplitInfo splitInfo, List<SplitInfo> computedSplitsInfo, ProcessSession processSession){
private List<FlowFile> generateSplitFlowFiles(final String fragmentId, FlowFile sourceFlowFile, SplitInfo splitInfo,
List<SplitInfo> computedSplitsInfo, ProcessSession processSession){
List<FlowFile> splitFlowFiles = new ArrayList<>();
FlowFile headerFlowFile = null;
long headerCrlfLength = 0;
@ -288,7 +292,6 @@ public class SplitText extends AbstractProcessor {
headerCrlfLength = splitInfo.trimmedLength;
}
int fragmentIndex = 1; // set to 1 to preserve the existing behavior *only*. Perhaps should be deprecated to follow the 0,1,2... scheme
String fragmentId = UUID.randomUUID().toString();
if ((computedSplitsInfo.size() == 0) && (headerFlowFile != null)) {
FlowFile splitFlowFile = processSession.clone(sourceFlowFile, 0, headerFlowFile.getSize() - headerCrlfLength);

View File

@ -64,6 +64,12 @@ import org.xml.sax.Locator;
import org.xml.sax.SAXException;
import org.xml.sax.XMLReader;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.SEGMENT_ORIGINAL_FILENAME;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttributesToOriginal;
@EventDriven
@SideEffectFree
@SupportsBatching
@ -161,9 +167,9 @@ public class SplitXml extends AbstractProcessor {
final XmlSplitterSaxParser parser = new XmlSplitterSaxParser(xmlTree -> {
FlowFile split = session.create(original);
split = session.write(split, out -> out.write(xmlTree.getBytes("UTF-8")));
split = session.putAttribute(split, "fragment.identifier", fragmentIdentifier);
split = session.putAttribute(split, "fragment.index", Integer.toString(numberOfRecords.getAndIncrement()));
split = session.putAttribute(split, "segment.original.filename", split.getAttribute(CoreAttributes.FILENAME.key()));
split = session.putAttribute(split, FRAGMENT_ID.key(), fragmentIdentifier);
split = session.putAttribute(split, FRAGMENT_INDEX.key(), Integer.toString(numberOfRecords.getAndIncrement()));
split = session.putAttribute(split, SEGMENT_ORIGINAL_FILENAME.key(), split.getAttribute(CoreAttributes.FILENAME.key()));
splits.add(split);
}, depth);
@ -188,12 +194,13 @@ public class SplitXml extends AbstractProcessor {
session.remove(splits);
} else {
splits.forEach((split) -> {
split = session.putAttribute(split, "fragment.count", Integer.toString(numberOfRecords.get()));
split = session.putAttribute(split, FRAGMENT_COUNT.key(), Integer.toString(numberOfRecords.get()));
session.transfer(split, REL_SPLIT);
});
session.transfer(original, REL_ORIGINAL);
logger.info("Split {} into {} FlowFiles", new Object[]{original, splits.size()});
final FlowFile originalToTransfer = copyAttributesToOriginal(session, original, fragmentIdentifier, numberOfRecords.get());
session.transfer(originalToTransfer, REL_ORIGINAL);
logger.info("Split {} into {} FlowFiles", new Object[]{originalToTransfer, splits.size()});
}
}

View File

@ -53,6 +53,7 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@ -97,10 +98,10 @@ import org.apache.nifi.util.FlowFileUnpackagerV3;
@SeeAlso(MergeContent.class)
public class UnpackContent extends AbstractProcessor {
// attribute keys
public static final String FRAGMENT_ID = "fragment.identifier";
public static final String FRAGMENT_INDEX = "fragment.index";
public static final String FRAGMENT_COUNT = "fragment.count";
public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename";
public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key();
public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
public static final String AUTO_DETECT_FORMAT_NAME = "use mime.type attribute";
public static final String TAR_FORMAT_NAME = "tar";
@ -262,6 +263,8 @@ public class UnpackContent extends AbstractProcessor {
finishFragmentAttributes(session, flowFile, unpacked);
}
session.transfer(unpacked, REL_SUCCESS);
final String fragmentId = unpacked.size() > 0 ? unpacked.get(0).getAttribute(FRAGMENT_ID) : null;
flowFile = FragmentAttributes.copyAttributesToOriginal(session, flowFile, fragmentId, unpacked.size());
session.transfer(flowFile, REL_ORIGINAL);
session.getProvenanceReporter().fork(flowFile, unpacked);
logger.info("Unpacked {} into {} and transferred to success", new Object[]{flowFile, unpacked});

View File

@ -17,8 +17,6 @@
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -28,6 +26,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
@ -35,15 +34,13 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
@ -53,19 +50,30 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer;
import org.apache.nifi.processors.standard.WaitNotifyProtocol.Signal;
@EventDriven
@SupportsBatching
@Tags({"map", "cache", "wait", "hold", "distributed", "signal", "release"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Routes incoming FlowFiles to the 'wait' relationship until a matching release signal "
+ "is stored in the distributed cache from a corresponding Notify processor. At this point, a waiting FlowFile is routed to "
+ "the 'success' relationship, with attributes copied from the FlowFile that produced "
+ "the release signal from the Notify processor. The release signal entry is then removed from "
+ "the cache. Waiting FlowFiles will be routed to 'expired' if they exceed the Expiration Duration.")
@WritesAttribute(attribute = "wait.start.timestamp", description = "All FlowFiles will have an attribute 'wait.start.timestamp', which sets the "
+ "initial epoch timestamp when the file first entered this processor. This is used to determine the expiration time of the FlowFile.")
+ "is stored in the distributed cache from a corresponding Notify processor. "
+ "When a matching release signal is identified, a waiting FlowFile is routed to the 'success' relationship, "
+ "with attributes copied from the FlowFile that produced the release signal from the Notify processor. "
+ "The release signal entry is then removed from the cache. Waiting FlowFiles will be routed to 'expired' if they exceed the Expiration Duration. "
+ "If you need to wait for more than one signal, specify the desired number of signals via the 'Target Signal Count' property. "
+ "This is particularly useful with processors that split a source flow file into multiple fragments, such as SplitText. "
+ "In order to wait for all fragments to be processed, connect the 'original' relationship to a Wait processor, and the 'splits' relationship to "
+ "a corresponding Notify processor. Configure the Notify and Wait processors to use the '${fragment.identifier}' as the value "
+ "of 'Release Signal Identifier', and specify '${fragment.count}' as the value of 'Target Signal Count' in the Wait processor."
)
@WritesAttributes({
@WritesAttribute(attribute = "wait.start.timestamp", description = "All FlowFiles will have an attribute 'wait.start.timestamp', which sets the "
+ "initial epoch timestamp when the file first entered this processor. This is used to determine the expiration time of the FlowFile."),
@WritesAttribute(attribute = "wait.counter.<counterName>", description = "If a signal exists when the processor runs, "
+ "each count value in the signal is copied.")
})
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer",
"org.apache.nifi.processors.standard.Notify"})
public class Wait extends AbstractProcessor {
@ -77,7 +85,7 @@ public class Wait extends AbstractProcessor {
.name("Distributed Cache Service")
.description("The Controller Service that is used to check for release signals from a corresponding Notify processor")
.required(true)
.identifiesControllerService(DistributedMapCacheClient.class)
.identifiesControllerService(AtomicDistributedMapCacheClient.class)
.build();
// Selects the FlowFile attribute or expression, whose value is used as cache key
@ -90,6 +98,29 @@ public class Wait extends AbstractProcessor {
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor TARGET_SIGNAL_COUNT = new PropertyDescriptor.Builder()
.name("Target Signal Count")
.description("A value, or the results of an Attribute Expression Language statement, which will " +
"be evaluated against a FlowFile in order to determine the target signal count. " +
"This processor checks whether the signal count has reached this number. " +
"If Signal Counter Name is specified, this processor checks a particular counter, " +
"otherwise checks against total count in a signal.")
.required(true)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(true)
.defaultValue("1")
.build();
public static final PropertyDescriptor SIGNAL_COUNTER_NAME = new PropertyDescriptor.Builder()
.name("Signal Counter Name")
.description("A value, or the results of an Attribute Expression Language statement, which will " +
"be evaluated against a FlowFile in order to determine the signal counter name. " +
"If not specified, this processor checks the total count in a signal.")
.required(false)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
.expressionLanguageSupported(true)
.build();
// Selects the FlowFile attribute or expression, whose value is used as cache key
public static final PropertyDescriptor EXPIRATION_DURATION = new PropertyDescriptor.Builder()
.name("Expiration Duration")
@ -136,9 +167,6 @@ public class Wait extends AbstractProcessor {
.build();
private final Set<Relationship> relationships;
private final Serializer<String> keySerializer = new StringSerializer();
private final Deserializer<Map<String, String>> valueDeserializer = new FlowFileAttributesSerializer();
public Wait() {
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
@ -152,6 +180,8 @@ public class Wait extends AbstractProcessor {
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(RELEASE_SIGNAL_IDENTIFIER);
descriptors.add(TARGET_SIGNAL_COUNT);
descriptors.add(SIGNAL_COUNTER_NAME);
descriptors.add(EXPIRATION_DURATION);
descriptors.add(DISTRIBUTED_CACHE_SERVICE);
descriptors.add(ATTRIBUTE_COPY_MODE);
@ -173,11 +203,11 @@ public class Wait extends AbstractProcessor {
final ComponentLog logger = getLogger();
// cache key is computed from attribute 'RELEASE_SIGNAL_IDENTIFIER' with expression language support
final String cacheKey = context.getProperty(RELEASE_SIGNAL_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
// Signal id is computed from attribute 'RELEASE_SIGNAL_IDENTIFIER' with expression language support
final String signalId = context.getProperty(RELEASE_SIGNAL_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
// if the computed value is null, or empty, we transfer the flow file to failure relationship
if (StringUtils.isBlank(cacheKey)) {
if (StringUtils.isBlank(signalId)) {
logger.error("FlowFile {} has no attribute for given Release Signal Identifier", new Object[] {flowFile});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
@ -185,9 +215,17 @@ public class Wait extends AbstractProcessor {
}
// the cache client used to interact with the distributed cache
final DistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
final AtomicDistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(AtomicDistributedMapCacheClient.class);
final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
String attributeCopyMode = context.getProperty(ATTRIBUTE_COPY_MODE).getValue();
final boolean replaceOriginalAttributes = ATTRIBUTE_COPY_REPLACE.getValue().equals(attributeCopyMode);
Signal signal = null;
try {
// get notifying signal
signal = protocol.getSignal(signalId);
// check for expiration
String waitStartTimestamp = flowFile.getAttribute(WAIT_START_TIMESTAMP);
if (waitStartTimestamp == null) {
@ -201,6 +239,8 @@ public class Wait extends AbstractProcessor {
} catch (NumberFormatException nfe) {
logger.error("{} has an invalid value '{}' on FlowFile {}", new Object[] {WAIT_START_TIMESTAMP, waitStartTimestamp, flowFile});
flowFile = session.penalize(flowFile);
flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
session.transfer(flowFile, REL_FAILURE);
return;
}
@ -209,58 +249,83 @@ public class Wait extends AbstractProcessor {
long now = System.currentTimeMillis();
if (now > (lWaitStartTimestamp + expirationDuration)) {
logger.warn("FlowFile {} expired after {}ms", new Object[] {flowFile, (now - lWaitStartTimestamp)});
flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
session.transfer(flowFile, REL_EXPIRED);
return;
}
// get notifying flow file attributes
Map<String, String> cachedAttributes = cache.get(cacheKey, keySerializer, valueDeserializer);
if (cachedAttributes == null) {
if (signal == null) {
// If there's no signal yet, then we don't have to evaluate target counts. Return immediately.
if (logger.isDebugEnabled()) {
logger.debug("No release signal yet for {} on FlowFile {}", new Object[] {cacheKey, flowFile});
logger.debug("No release signal yet for {} on FlowFile {}", new Object[] {signalId, flowFile});
}
session.transfer(flowFile, REL_WAIT);
return;
}
// copy over attributes from release signal flow file, if provided
if (!cachedAttributes.isEmpty()) {
cachedAttributes.remove("uuid");
String attributeCopyMode = context.getProperty(ATTRIBUTE_COPY_MODE).getValue();
if (ATTRIBUTE_COPY_REPLACE.getValue().equals(attributeCopyMode)) {
flowFile = session.putAllAttributes(flowFile, cachedAttributes);
} else {
Map<String, String> attributesToCopy = new HashMap<>();
for(Entry<String, String> entry : cachedAttributes.entrySet()) {
// if the current flow file does *not* have the cached attribute, copy it
if (flowFile.getAttribute(entry.getKey()) == null) {
attributesToCopy.put(entry.getKey(), entry.getValue());
}
}
flowFile = session.putAllAttributes(flowFile, attributesToCopy);
final String targetCounterName = context.getProperty(SIGNAL_COUNTER_NAME).evaluateAttributeExpressions(flowFile).getValue();
final Long targetCount = Long.valueOf(context.getProperty(TARGET_SIGNAL_COUNT).evaluateAttributeExpressions(flowFile).getValue());
final boolean reachedToTargetCount = StringUtils.isBlank(targetCounterName)
? signal.isTotalCountReached(targetCount)
: signal.isCountReached(targetCounterName, targetCount);
if (!reachedToTargetCount) {
if (logger.isDebugEnabled()) {
logger.debug("Release signal count {} hasn't reached {} for {} on FlowFile {}",
new Object[] {targetCounterName, targetCount, signalId, flowFile});
}
flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
session.transfer(flowFile, REL_WAIT);
return;
}
flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
session.transfer(flowFile, REL_SUCCESS);
cache.remove(cacheKey, keySerializer);
protocol.complete(signalId);
} catch (final NumberFormatException e) {
flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
logger.error("Failed to parse targetCount when processing {} due to {}", new Object[] {flowFile, e});
} catch (final IOException e) {
flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
logger.error("Unable to communicate with cache when processing {} due to {}", new Object[] {flowFile, e});
}
}
/**
* Simple string serializer, used for serializing the cache key
*/
public static class StringSerializer implements Serializer<String> {
@Override
public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
out.write(value.getBytes(StandardCharsets.UTF_8));
/**
 * Copies the attributes and counter values carried by a release {@link Signal} onto the
 * waiting FlowFile.
 * <p>
 * Counter values are always written as {@code wait.counter.<counterName>} attributes,
 * plus an aggregated {@code wait.counter.total}, regardless of the copy mode.
 *
 * @param session the process session used to update the FlowFile
 * @param flowFile the waiting FlowFile to receive the signal's attributes
 * @param signal the release signal; may be {@code null}, in which case the FlowFile is
 *               returned unchanged
 * @param replaceOriginal if {@code true}, signal attributes overwrite any existing FlowFile
 *                        attributes (except {@code uuid}); otherwise only attributes the
 *                        FlowFile does not already have are copied
 * @return the updated FlowFile
 */
private FlowFile copySignalAttributes(final ProcessSession session, final FlowFile flowFile, final Signal signal, final boolean replaceOriginal) {
    if (signal == null) {
        // No signal fetched yet; nothing to copy.
        return flowFile;
    }
    // copy over attributes from release signal flow file, if provided
    final Map<String, String> attributesToCopy;
    if (replaceOriginal) {
        attributesToCopy = new HashMap<>(signal.getAttributes());
        // never overwrite the FlowFile's own uuid
        attributesToCopy.remove("uuid");
    } else {
        // if the current flow file does *not* have the cached attribute, copy it
        attributesToCopy = signal.getAttributes().entrySet().stream()
                .filter(e -> flowFile.getAttribute(e.getKey()) == null)
                .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
    }
    // Copy counter attributes
    final Map<String, Long> counts = signal.getCounts();
    // Side effect inside mapToLong: each counter is exposed as an attribute while summing.
    final long totalCount = counts.entrySet().stream().mapToLong(e -> {
        final Long count = e.getValue();
        attributesToCopy.put("wait.counter." + e.getKey(), String.valueOf(count));
        return count;
    }).sum();
    attributesToCopy.put("wait.counter.total", String.valueOf(totalCount));
    return session.putAllAttributes(flowFile, attributesToCopy);
}
}

View File

@ -0,0 +1,193 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient.CacheEntry;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
/**
 * This class provides a protocol for Wait and Notify processors to work together.
 * Once an AtomicDistributedMapCacheClient is passed to this protocol, components that wish to join the notification mechanism
 * should only use methods provided by this protocol, instead of calling the cache API directly.
 */
public class WaitNotifyProtocol {

    private static final Logger logger = LoggerFactory.getLogger(WaitNotifyProtocol.class);

    /** Counter name used when a caller does not care about individual counters. */
    public static final String DEFAULT_COUNT_NAME = "default";

    private static final int MAX_REPLACE_RETRY_COUNT = 5;
    private static final int REPLACE_RETRY_WAIT_MILLIS = 10;

    // ObjectMapper is thread-safe once configured; cache it as a constant.
    private static final ObjectMapper objectMapper = new ObjectMapper();
    private static final Serializer<String> stringSerializer = (value, output) -> output.write(value.getBytes(StandardCharsets.UTF_8));
    private final Deserializer<String> stringDeserializer = input -> new String(input, StandardCharsets.UTF_8);

    /**
     * A Signal holds the accumulated counter values and the attributes carried by
     * notifications for a given signal id. Instances are serialized to/from JSON
     * when stored in the distributed cache, so the bean accessors must be kept.
     */
    public static class Signal {
        private Map<String, Long> counts = new HashMap<>();
        private Map<String, String> attributes = new HashMap<>();

        public Map<String, Long> getCounts() {
            return counts;
        }

        public void setCounts(Map<String, Long> counts) {
            this.counts = counts;
        }

        public Map<String, String> getAttributes() {
            return attributes;
        }

        public void setAttributes(Map<String, String> attributes) {
            this.attributes = attributes;
        }

        /**
         * @param targetCount the count to compare against
         * @return true if the sum of all counters has reached targetCount
         */
        public boolean isTotalCountReached(final long targetCount) {
            final long totalCount = counts.values().stream().mapToLong(Long::longValue).sum();
            return totalCount >= targetCount;
        }

        /**
         * @param counterName the counter to check
         * @param targetCount the count to compare against
         * @return true if the named counter has reached targetCount
         */
        public boolean isCountReached(final String counterName, final long targetCount) {
            return getCount(counterName) >= targetCount;
        }

        /**
         * @param counterName the counter to look up
         * @return the current value of the named counter, or -1 if the counter does not exist
         */
        public long getCount(final String counterName) {
            final Long count = counts.get(counterName);
            return count != null ? count : -1;
        }
    }

    private final AtomicDistributedMapCacheClient cache;

    public WaitNotifyProtocol(final AtomicDistributedMapCacheClient cache) {
        this.cache = cache;
    }

    /**
     * Notify a signal to increase a counter.
     * Updates are performed with optimistic locking: the cache entry is fetched once, its
     * revision captured, and the updated signal is written back with a conditional replace.
     * On contention the operation backs off (linearly increasing wait) and retries.
     * @param signalId a key in the underlying cache engine
     * @param counterName specify count to update
     * @param delta delta to update a counter
     * @param attributes attributes to save in the cache entry
     * @return A Signal instance, merged with an existing signal if any
     * @throws IOException thrown when it failed interacting with the cache engine
     * @throws ConcurrentModificationException thrown if other process is also updating the same signal and failed to update after few retry attempts
     */
    public Signal notify(final String signalId, final String counterName, final int delta, final Map<String, String> attributes)
            throws IOException, ConcurrentModificationException {

        for (int i = 0; i < MAX_REPLACE_RETRY_COUNT; i++) {
            // Fetch the current entry exactly once per attempt: its revision guards the
            // optimistic replace below, and its value is parsed into the Signal being updated.
            // (Previously the entry was fetched twice — once for the revision and once via
            // getSignal() — which was wasteful and could observe two different versions.)
            final CacheEntry<String, String> existingEntry = cache.fetch(signalId, stringSerializer, stringDeserializer);

            final Signal existing = toSignal(signalId, existingEntry);
            final Signal signal = existing != null ? existing : new Signal();

            if (attributes != null) {
                signal.attributes.putAll(attributes);
            }

            // Increment the named counter, creating it if absent.
            signal.counts.merge(counterName, (long) delta, Long::sum);

            final String signalJson = objectMapper.writeValueAsString(signal);
            final long revision = existingEntry != null ? existingEntry.getRevision() : -1;

            if (cache.replace(signalId, signalJson, stringSerializer, stringSerializer, revision)) {
                return signal;
            }

            // Another process updated the same signal concurrently; back off and retry.
            final long waitMillis = REPLACE_RETRY_WAIT_MILLIS * (i + 1);
            logger.info("Waiting for {} ms to retry... {}.{}", waitMillis, signalId, counterName);
            try {
                Thread.sleep(waitMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt status so callers can observe the interruption.
                Thread.currentThread().interrupt();
                final String msg = String.format("Interrupted while waiting for retrying signal [%s] counter [%s].", signalId, counterName);
                throw new ConcurrentModificationException(msg, e);
            }
        }

        final String msg = String.format("Failed to update signal [%s] counter [%s] after retrying %d times.", signalId, counterName, MAX_REPLACE_RETRY_COUNT);
        throw new ConcurrentModificationException(msg);
    }

    /**
     * Retrieve a stored Signal in the cache engine.
     * If a caller gets satisfied with the returned Signal state and finish waiting, it should call {@link #complete(String)}
     * to complete the Wait Notify protocol.
     * @param signalId a key in the underlying cache engine
     * @return A Signal instance, or null if no signal exists for the id
     * @throws IOException thrown when it failed interacting with the cache engine
     * @throws DeserializationException thrown if the cache found is not in expected serialized format
     */
    public Signal getSignal(final String signalId) throws IOException, DeserializationException {
        final CacheEntry<String, String> entry = cache.fetch(signalId, stringSerializer, stringDeserializer);
        return toSignal(signalId, entry);
    }

    /**
     * Parses a fetched cache entry into a Signal.
     * @param signalId the signal id, used only for error reporting
     * @param entry the fetched cache entry; may be null when no signal exists
     * @return the parsed Signal, or null if entry is null
     * @throws IOException thrown when JSON serialization fails unexpectedly
     * @throws DeserializationException thrown if the cached value is neither a serialized
     *         Signal nor legacy FlowFileAttributes
     */
    private Signal toSignal(final String signalId, final CacheEntry<String, String> entry) throws IOException, DeserializationException {
        if (entry == null) {
            // No signal found.
            return null;
        }

        final String value = entry.getValue();
        try {
            return objectMapper.readValue(value, Signal.class);
        } catch (final JsonParseException jsonE) {
            // Try to read it as FlowFileAttributes for backward compatibility.
            try {
                final Map<String, String> attributes = new FlowFileAttributesSerializer().deserialize(value.getBytes(StandardCharsets.UTF_8));
                final Signal signal = new Signal();
                signal.setAttributes(attributes);
                // Legacy entries represent a single notification.
                signal.getCounts().put(DEFAULT_COUNT_NAME, 1L);
                return signal;
            } catch (final Exception attrE) {
                final String msg = String.format("Cached value for %s was not a serialized Signal nor FlowFileAttributes. Error messages: \"%s\", \"%s\"",
                        signalId, jsonE.getMessage(), attrE.getMessage());
                throw new DeserializationException(msg);
            }
        }
    }

    /**
     * Finish protocol and remove the cache entry.
     * @param signalId a key in the underlying cache engine
     * @throws IOException thrown when it failed interacting with the cache engine
     */
    public void complete(final String signalId) throws IOException {
        cache.remove(signalId, stringSerializer);
    }
}

View File

@ -37,6 +37,9 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
public class TestConvertJSONToSQL {
static String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)";
@ -70,6 +73,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
@ -106,6 +110,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
@ -140,6 +145,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
@ -175,6 +181,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR));
@ -210,6 +217,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "5");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 5);
final List<MockFlowFile> mffs = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL);
for (final MockFlowFile mff : mffs) {
@ -245,6 +253,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "5");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 5);
final List<MockFlowFile> mffs = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL);
for (final MockFlowFile mff : mffs) {
@ -279,6 +288,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR));
@ -314,6 +324,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.VARCHAR));
@ -349,6 +360,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
@ -384,6 +396,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
@ -420,6 +433,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
@ -455,6 +469,9 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
final MockFlowFile originalFlowFile = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0);
originalFlowFile.assertAttributeExists(FRAGMENT_ID.key());
originalFlowFile.assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
@ -590,6 +607,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
@ -625,6 +643,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
@ -687,6 +706,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
@ -724,6 +744,7 @@ public class TestConvertJSONToSQL {
runner.run();
runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));

View File

@ -16,26 +16,29 @@
*/
package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer;
import org.apache.nifi.distributed.cache.client.StandardCacheEntry;
import org.apache.nifi.processors.standard.WaitNotifyProtocol.Signal;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class TestNotify {
private TestRunner runner;
@ -66,8 +69,47 @@ public class TestNotify {
runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 1);
runner.clearTransferState();
Map<String, String> cachedAttributes = service.get("1", new Notify.StringSerializer(), new FlowFileAttributesSerializer());
final Signal signal = new WaitNotifyProtocol(service).getSignal("1");
Map<String, String> cachedAttributes = signal.getAttributes();
assertEquals("value", cachedAttributes.get("key"));
assertTrue(signal.isTotalCountReached(1));
}
@Test
public void testNotifyCounters() throws InitializationException, IOException {
runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, ".*");
runner.setProperty(Notify.SIGNAL_COUNTER_NAME, "${status}");
final Map<String, String> props1 = new HashMap<>();
props1.put("releaseSignalAttribute", "someDataProcessing");
props1.put("key", "data1");
props1.put("status", "success");
runner.enqueue(new byte[]{}, props1);
final Map<String, String> props2 = new HashMap<>();
props2.put("releaseSignalAttribute", "someDataProcessing");
props2.put("key", "data2");
props2.put("status", "success");
runner.enqueue(new byte[]{}, props2);
final Map<String, String> props3 = new HashMap<>();
props3.put("releaseSignalAttribute", "someDataProcessing");
props3.put("key", "data3");
props3.put("status", "failure");
runner.enqueue(new byte[]{}, props3);
runner.run(3);
runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 3);
runner.clearTransferState();
final Signal signal = new WaitNotifyProtocol(service).getSignal("someDataProcessing");
Map<String, String> cachedAttributes = signal.getAttributes();
assertEquals("Same attribute key will be overwritten by the latest signal", "data3", cachedAttributes.get("key"));
assertTrue(signal.isTotalCountReached(3));
assertEquals(2, signal.getCount("success"));
assertEquals(1, signal.getCount("failure"));
}
@Test
@ -86,9 +128,11 @@ public class TestNotify {
runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 1);
runner.clearTransferState();
Map<String, String> cachedAttributes = service.get("1", new Notify.StringSerializer(), new FlowFileAttributesSerializer());
final Signal signal = new WaitNotifyProtocol(service).getSignal("1");
Map<String, String> cachedAttributes = signal.getAttributes();
assertEquals("value", cachedAttributes.get("key1"));
assertNull(cachedAttributes.get("other.key1"));
assertTrue(signal.isTotalCountReached(1));
}
@Test
@ -121,11 +165,11 @@ public class TestNotify {
service.setFailOnCalls(false);
}
private class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient {
private final ConcurrentMap<Object, Object> values = new ConcurrentHashMap<>();
static class MockCacheClient extends AbstractControllerService implements AtomicDistributedMapCacheClient {
private final ConcurrentMap<Object, CacheEntry> values = new ConcurrentHashMap<>();
private boolean failOnCalls = false;
public void setFailOnCalls(boolean failOnCalls){
void setFailOnCalls(boolean failOnCalls){
this.failOnCalls = failOnCalls;
}
@ -136,42 +180,47 @@ public class TestNotify {
}
}
private void unsupported() throws UnsupportedOperationException {
throw new UnsupportedOperationException("This method shouldn't be used from Notify processor.");
}
@Override
public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
verifyNotFail();
final Object retValue = values.putIfAbsent(key, value);
return (retValue == null);
unsupported();
return false;
}
@Override
@SuppressWarnings("unchecked")
public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
final Deserializer<V> valueDeserializer) throws IOException {
verifyNotFail();
return (V) values.putIfAbsent(key, value);
unsupported();
return null;
}
@Override
public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
verifyNotFail();
return values.containsKey(key);
unsupported();
return false;
}
@Override
public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
verifyNotFail();
values.put(key, value);
unsupported();
}
@Override
@SuppressWarnings("unchecked")
public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
verifyNotFail();
if(values.containsKey(key)) {
return (V) values.get(key);
} else {
final CacheEntry entry = values.get(key);
if (entry == null) {
return null;
}
// This mock cache stores String as it is, without serializing, so it needs to convert it to byte[] first here.
return valueDeserializer.deserialize(((String)entry.getValue()).getBytes(StandardCharsets.UTF_8));
}
@Override
@ -181,7 +230,28 @@ public class TestNotify {
@Override
public <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException {
verifyNotFail();
values.remove(key);
return values.remove(key) != null;
}
@Override
@SuppressWarnings("unchecked")
public <K, V> CacheEntry<K, V> fetch(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
verifyNotFail();
return values.get(key);
}
@Override
public <K, V> boolean replace(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, long revision) throws IOException {
verifyNotFail();
final CacheEntry existing = values.get(key);
if (existing != null && existing.getRevision() != revision) {
return false;
}
values.put(key, new StandardCacheEntry<>(key, value, revision + 1));
return true;
}
}

View File

@ -37,6 +37,11 @@ public class TestSegmentContent {
testRunner.enqueue(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9});
testRunner.run();
testRunner.assertTransferCount(SegmentContent.REL_ORIGINAL, 1);
final MockFlowFile originalFlowFile = testRunner.getFlowFilesForRelationship(SegmentContent.REL_ORIGINAL).get(0);
originalFlowFile.assertAttributeExists(SegmentContent.FRAGMENT_ID);
originalFlowFile.assertAttributeEquals(SegmentContent.FRAGMENT_COUNT, "3");
final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(SegmentContent.REL_SEGMENTS);
assertEquals(3, flowFiles.size());
@ -57,6 +62,11 @@ public class TestSegmentContent {
testRunner.enqueue(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9});
testRunner.run();
testRunner.assertTransferCount(SegmentContent.REL_ORIGINAL, 1);
final MockFlowFile originalFlowFile = testRunner.getFlowFilesForRelationship(SegmentContent.REL_ORIGINAL).get(0);
originalFlowFile.assertAttributeExists(SegmentContent.FRAGMENT_ID);
originalFlowFile.assertAttributeEquals(SegmentContent.FRAGMENT_COUNT, "1");
testRunner.assertTransferCount(SegmentContent.REL_SEGMENTS, 1);
final MockFlowFile out1 = testRunner.getFlowFilesForRelationship(SegmentContent.REL_SEGMENTS).get(0);
out1.assertContentEquals(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9});

View File

@ -25,6 +25,9 @@ import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import static org.apache.nifi.processors.standard.SplitContent.FRAGMENT_COUNT;
import static org.apache.nifi.processors.standard.SplitContent.FRAGMENT_ID;
public class TestSplitContent {
@Test
@ -39,6 +42,7 @@ public class TestSplitContent {
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "4");
runner.assertTransferCount(SplitContent.REL_SPLITS, 4);
runner.assertQueueEmpty();
@ -62,6 +66,7 @@ public class TestSplitContent {
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "4");
runner.assertTransferCount(SplitContent.REL_SPLITS, 4);
runner.assertQueueEmpty();
@ -76,6 +81,7 @@ public class TestSplitContent {
runner.enqueue(input);
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "4");
runner.assertTransferCount(SplitContent.REL_SPLITS, 4);
splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS);
splits.get(0).assertContentEquals("This is a ");
@ -89,6 +95,7 @@ public class TestSplitContent {
runner.enqueue(input);
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "4");
runner.assertTransferCount(SplitContent.REL_SPLITS, 4);
splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS);
splits.get(0).assertContentEquals("This is a test");
@ -102,6 +109,7 @@ public class TestSplitContent {
runner.enqueue("This is a test. This is another test. And this is yet another test. Finally this is the last test".getBytes());
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "4");
runner.assertTransferCount(SplitContent.REL_SPLITS, 4);
splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS);
splits.get(0).assertContentEquals("This is a test");
@ -115,6 +123,7 @@ public class TestSplitContent {
runner.enqueue("This is a test. This is another test. And this is yet another test. Finally this is the last test".getBytes());
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "5");
runner.assertTransferCount(SplitContent.REL_SPLITS, 5);
splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS);
splits.get(0).assertContentEquals("This is a ");
@ -138,6 +147,7 @@ public class TestSplitContent {
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "3");
runner.assertTransferCount(SplitContent.REL_SPLITS, 3);
runner.assertQueueEmpty();
@ -157,6 +167,7 @@ public class TestSplitContent {
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2");
runner.assertTransferCount(SplitContent.REL_SPLITS, 2);
runner.assertQueueEmpty();
@ -178,6 +189,7 @@ public class TestSplitContent {
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2");
runner.assertTransferCount(SplitContent.REL_SPLITS, 2);
runner.assertQueueEmpty();
@ -202,6 +214,9 @@ public class TestSplitContent {
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
final MockFlowFile originalFlowFile = runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0);
originalFlowFile.assertAttributeExists(FRAGMENT_ID);
originalFlowFile.assertAttributeEquals(FRAGMENT_COUNT, "2");
runner.assertTransferCount(SplitContent.REL_SPLITS, 2);
runner.assertQueueEmpty();
@ -227,6 +242,7 @@ public class TestSplitContent {
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2");
runner.assertTransferCount(SplitContent.REL_SPLITS, 2);
runner.assertQueueEmpty();
@ -249,6 +265,7 @@ public class TestSplitContent {
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "1");
runner.assertTransferCount(SplitContent.REL_SPLITS, 1);
runner.assertQueueEmpty();
@ -269,6 +286,7 @@ public class TestSplitContent {
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "1");
runner.assertTransferCount(SplitContent.REL_SPLITS, 1);
runner.assertQueueEmpty();
@ -289,6 +307,7 @@ public class TestSplitContent {
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "1");
runner.assertTransferCount(SplitContent.REL_SPLITS, 1);
runner.assertQueueEmpty();
@ -309,6 +328,7 @@ public class TestSplitContent {
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2");
runner.assertTransferCount(SplitContent.REL_SPLITS, 2);
runner.assertQueueEmpty();
@ -327,6 +347,7 @@ public class TestSplitContent {
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2");
runner.assertTransferCount(SplitContent.REL_SPLITS, 2);
runner.assertQueueEmpty();

View File

@ -35,6 +35,11 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.SEGMENT_ORIGINAL_FILENAME;
public class TestSplitJson {
private static final Path JSON_SNIPPET = Paths.get("src/test/resources/TestJson/json-sample.json");
@ -86,6 +91,7 @@ public class TestSplitJson {
testRunner.run();
testRunner.assertTransferCount(SplitJson.REL_ORIGINAL, 1);
testRunner.getFlowFilesForRelationship(SplitJson.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
testRunner.assertTransferCount(SplitJson.REL_SPLIT, 1);
testRunner.getFlowFilesForRelationship(SplitJson.REL_ORIGINAL).get(0).assertContentEquals(JSON_SNIPPET);
testRunner.getFlowFilesForRelationship(SplitJson.REL_SPLIT).get(0).assertContentEquals("0");
@ -102,6 +108,7 @@ public class TestSplitJson {
int numSplitsExpected = 10;
testRunner.assertTransferCount(SplitJson.REL_ORIGINAL, 1);
testRunner.getFlowFilesForRelationship(SplitJson.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), String.valueOf(numSplitsExpected));
testRunner.assertTransferCount(SplitJson.REL_SPLIT, numSplitsExpected);
final MockFlowFile originalOut = testRunner.getFlowFilesForRelationship(SplitJson.REL_ORIGINAL).get(0);
originalOut.assertContentEquals(JSON_SNIPPET);
@ -120,18 +127,21 @@ public class TestSplitJson {
testRunner.run();
testRunner.assertTransferCount(SplitJson.REL_ORIGINAL, 1);
final MockFlowFile originalFlowFile = testRunner.getFlowFilesForRelationship(SplitJson.REL_ORIGINAL).get(0);
originalFlowFile.assertAttributeExists(FRAGMENT_ID.key());
originalFlowFile.assertAttributeEquals(FRAGMENT_COUNT.key(), "7");
originalFlowFile.assertContentEquals(JSON_SNIPPET);
testRunner.assertTransferCount(SplitJson.REL_SPLIT, 7);
testRunner.getFlowFilesForRelationship(SplitJson.REL_ORIGINAL).get(0).assertContentEquals(JSON_SNIPPET);
MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(SplitJson.REL_SPLIT).get(0);
flowFile.assertContentEquals("{\"first\":\"Shaffer\",\"last\":\"Pearson\"}");
flowFile.assertAttributeEquals("fragment.count", "7");
flowFile.assertAttributeEquals("fragment.index", "0");
flowFile.assertAttributeEquals("segment.original.filename", "test.json");
flowFile.assertAttributeEquals(FRAGMENT_COUNT.key(), "7");
flowFile.assertAttributeEquals(FRAGMENT_INDEX.key(), "0");
flowFile.assertAttributeEquals(SEGMENT_ORIGINAL_FILENAME.key(), "test.json");
flowFile = testRunner.getFlowFilesForRelationship(SplitJson.REL_SPLIT).get(6);
flowFile.assertAttributeEquals("fragment.count", "7");
flowFile.assertAttributeEquals("fragment.index", "6");
flowFile.assertAttributeEquals("segment.original.filename", "test.json");
flowFile.assertAttributeEquals(FRAGMENT_COUNT.key(), "7");
flowFile.assertAttributeEquals(FRAGMENT_INDEX.key(), "6");
flowFile.assertAttributeEquals(SEGMENT_ORIGINAL_FILENAME.key(), "test.json");
}
@Test

View File

@ -50,6 +50,9 @@ public class TestSplitText {
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
final MockFlowFile originalFlowFile = runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0);
originalFlowFile.assertAttributeEquals(SplitText.FRAGMENT_COUNT, "3");
originalFlowFile.assertAttributeExists(SplitText.FRAGMENT_ID);
runner.assertTransferCount(SplitText.REL_SPLITS, 3);
}
@ -80,6 +83,7 @@ public class TestSplitText {
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals("fragment.count", "2");
runner.assertTransferCount(SplitText.REL_SPLITS, 2);
final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
@ -87,7 +91,7 @@ public class TestSplitText {
splits.get(0).assertAttributeEquals(SplitText.FRAGMENT_SIZE, "86");
splits.get(1).assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "3");
splits.get(1).assertAttributeEquals(SplitText.FRAGMENT_SIZE, "54");
final String fragmentUUID = splits.get(0).getAttribute("fragment.identifier");
final String fragmentUUID = splits.get(0).getAttribute(SplitText.FRAGMENT_ID);
for (int i = 0; i < splits.size(); i++) {
final MockFlowFile split = splits.get(i);
split.assertAttributeEquals(SplitText.FRAGMENT_INDEX, String.valueOf(i+1));
@ -110,6 +114,7 @@ public class TestSplitText {
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals("fragment.count", "3");
runner.assertTransferCount(SplitText.REL_SPLITS, 3);
final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
@ -119,7 +124,7 @@ public class TestSplitText {
splits.get(1).assertAttributeEquals(SplitText.FRAGMENT_SIZE, "55");
splits.get(2).assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "1");
splits.get(2).assertAttributeEquals(SplitText.FRAGMENT_SIZE, "23");
final String fragmentUUID = splits.get(0).getAttribute("fragment.identifier");
final String fragmentUUID = splits.get(0).getAttribute(SplitText.FRAGMENT_ID);
for (int i = 0; i < splits.size(); i++) {
final MockFlowFile split = splits.get(i);
split.assertAttributeEquals(SplitText.FRAGMENT_INDEX, String.valueOf(i + 1));
@ -152,6 +157,7 @@ public class TestSplitText {
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals("fragment.count", "6");
runner.assertTransferCount(SplitText.REL_SPLITS, 6);
final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
@ -176,6 +182,7 @@ public class TestSplitText {
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals("fragment.count", "1");
runner.assertTransferCount(SplitText.REL_SPLITS, 1);
// repeat with header cou8nt versus header marker
@ -189,6 +196,7 @@ public class TestSplitText {
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals("fragment.count", "1");
runner.assertTransferCount(SplitText.REL_SPLITS, 1);
// repeat single header line with no newline characters
@ -202,6 +210,7 @@ public class TestSplitText {
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals("fragment.count", "1");
runner.assertTransferCount(SplitText.REL_SPLITS, 1);
}
@ -218,10 +227,11 @@ public class TestSplitText {
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals(SplitText.FRAGMENT_COUNT, "2");
runner.assertTransferCount(SplitText.REL_SPLITS, 2);
List<MockFlowFile> splits = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
String fragmentUUID = splits.get(0).getAttribute("fragment.identifier");
String fragmentUUID = splits.get(0).getAttribute(SplitText.FRAGMENT_ID);
for (int i = 0; i < splits.size(); i++) {
final MockFlowFile split = splits.get(i);
split.assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "5");
@ -244,10 +254,11 @@ public class TestSplitText {
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals(SplitText.FRAGMENT_COUNT, "2");
runner.assertTransferCount(SplitText.REL_SPLITS, 2);
splits = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
fragmentUUID = splits.get(0).getAttribute("fragment.identifier");
fragmentUUID = splits.get(0).getAttribute(SplitText.FRAGMENT_ID);
splits.get(0).assertContentEquals("Header Line #1\nHeader Line #2\nLine #1\nLine #2\nLine #3\nLine #4\nLine #5\n");
splits.get(1).assertContentEquals("Line #6\nLine #7\nLine #8\nLine #9\nLine #10");
splits.get(0).assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "7");
@ -275,6 +286,7 @@ public class TestSplitText {
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals(SplitText.FRAGMENT_COUNT, "4");
runner.assertTransferCount(SplitText.REL_SPLITS, 4);
final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
@ -292,6 +304,7 @@ public class TestSplitText {
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals(SplitText.FRAGMENT_COUNT, "2");
runner.assertTransferCount(SplitText.REL_SPLITS, 2);
final List<MockFlowFile> splitsWithNoHeader = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
@ -311,6 +324,7 @@ public class TestSplitText {
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals(SplitText.FRAGMENT_COUNT, "4");
runner.assertTransferCount(SplitText.REL_SPLITS, 4);
final List<MockFlowFile> splits =runner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
@ -328,6 +342,7 @@ public class TestSplitText {
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals(SplitText.FRAGMENT_COUNT, "2");
runner.assertTransferCount(SplitText.REL_SPLITS, 2);
final List<MockFlowFile> splitsWithNoHeader =runner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
@ -394,6 +409,7 @@ public class TestSplitText {
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals(SplitText.FRAGMENT_COUNT, "4");
runner.assertTransferCount(SplitText.REL_SPLITS, 4);
final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
@ -420,6 +436,7 @@ public class TestSplitText {
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(SplitText.REL_ORIGINAL).get(0).assertAttributeEquals(SplitText.FRAGMENT_COUNT, "11");
runner.assertTransferCount(SplitText.REL_SPLITS, 11);
final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS);

View File

@ -37,6 +37,11 @@ import org.xml.sax.InputSource;
import org.xml.sax.SAXException;
import org.xml.sax.helpers.DefaultHandler;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX;
import static org.apache.nifi.flowfile.attributes.FragmentAttributes.SEGMENT_ORIGINAL_FILENAME;
public class TestSplitXml {
SAXParserFactory factory;
@ -67,15 +72,18 @@ public class TestSplitXml {
});
runner.run();
runner.assertTransferCount(SplitXml.REL_ORIGINAL, 1);
final MockFlowFile originalFlowFile = runner.getFlowFilesForRelationship(SplitXml.REL_ORIGINAL).get(0);
originalFlowFile.assertAttributeExists(FRAGMENT_ID.key());
originalFlowFile.assertAttributeEquals(FRAGMENT_COUNT.key(), "6");
runner.assertTransferCount(SplitXml.REL_SPLIT, 6);
parseFlowFiles(runner.getFlowFilesForRelationship(SplitXml.REL_ORIGINAL));
parseFlowFiles(runner.getFlowFilesForRelationship(SplitXml.REL_SPLIT));
Arrays.asList(0, 1, 2, 3, 4, 5).forEach((index) -> {
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(SplitXml.REL_SPLIT).get(index);
flowFile.assertAttributeEquals("fragment.index", Integer.toString(index));
flowFile.assertAttributeEquals("fragment.count", "6");
flowFile.assertAttributeEquals("segment.original.filename", "test.xml");
flowFile.assertAttributeEquals(FRAGMENT_INDEX.key(), Integer.toString(index));
flowFile.assertAttributeEquals(FRAGMENT_COUNT.key(), "6");
flowFile.assertAttributeEquals(SEGMENT_ORIGINAL_FILENAME.key(), "test.xml");
});
}
@ -86,6 +94,7 @@ public class TestSplitXml {
runner.enqueue(Paths.get("src/test/resources/TestXml/xml-bundle-1"));
runner.run();
runner.assertTransferCount(SplitXml.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(SplitXml.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "12");
runner.assertTransferCount(SplitXml.REL_SPLIT, 12);
parseFlowFiles(runner.getFlowFilesForRelationship(SplitXml.REL_ORIGINAL));
@ -99,6 +108,7 @@ public class TestSplitXml {
runner.enqueue(Paths.get("src/test/resources/TestXml/xml-bundle-1"));
runner.run();
runner.assertTransferCount(SplitXml.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(SplitXml.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "12");
runner.assertTransferCount(SplitXml.REL_SPLIT, 12);
parseFlowFiles(runner.getFlowFilesForRelationship(SplitXml.REL_ORIGINAL));
@ -118,6 +128,7 @@ public class TestSplitXml {
runner.enqueue(Paths.get("src/test/resources/TestXml/namespace.xml"));
runner.run();
runner.assertTransferCount(SplitXml.REL_ORIGINAL, 1);
runner.getFlowFilesForRelationship(SplitXml.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "2");
runner.assertTransferCount(SplitXml.REL_SPLIT, 2);
parseFlowFiles(runner.getFlowFilesForRelationship(SplitXml.REL_ORIGINAL));

View File

@ -16,6 +16,8 @@
*/
package org.apache.nifi.processors.standard;
import static org.apache.nifi.processors.standard.SplitContent.FRAGMENT_COUNT;
import static org.apache.nifi.processors.standard.SplitContent.FRAGMENT_ID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -57,10 +59,14 @@ public class TestUnpackContent {
unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 4);
unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2");
unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "2");
unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 4);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2");
autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "2");
autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
final List<MockFlowFile> unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
@ -95,10 +101,14 @@ public class TestUnpackContent {
unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2);
unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "1");
unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "1");
unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "1");
autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "1");
autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
List<MockFlowFile> unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
@ -138,10 +148,14 @@ public class TestUnpackContent {
unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 4);
unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2");
unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "2");
unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 4);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2");
autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "2");
autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
final List<MockFlowFile> unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
@ -174,10 +188,14 @@ public class TestUnpackContent {
unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2);
unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "1");
unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "1");
unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2);
autoUnpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "1");
autoUnpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "1");
autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
List<MockFlowFile> unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
@ -211,6 +229,8 @@ public class TestUnpackContent {
runner.assertTransferCount(UnpackContent.REL_SUCCESS, 4);
runner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
runner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2");
runner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "2");
runner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
final List<MockFlowFile> unpacked = runner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
@ -235,6 +255,8 @@ public class TestUnpackContent {
runner.assertTransferCount(UnpackContent.REL_SUCCESS, 4);
runner.assertTransferCount(UnpackContent.REL_ORIGINAL, 2);
runner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2");
runner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(1).assertAttributeEquals(FRAGMENT_COUNT, "2");
runner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
final List<MockFlowFile> unpacked = runner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
@ -258,6 +280,7 @@ public class TestUnpackContent {
unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2);
unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1);
unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2");
unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
final List<MockFlowFile> unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
@ -292,6 +315,9 @@ public class TestUnpackContent {
unpackRunner.assertTransferCount(UnpackContent.REL_SUCCESS, 2);
unpackRunner.assertTransferCount(UnpackContent.REL_ORIGINAL, 1);
final MockFlowFile originalFlowFile = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_ORIGINAL).get(0);
originalFlowFile.assertAttributeExists(FRAGMENT_ID);
originalFlowFile.assertAttributeEquals(FRAGMENT_COUNT, "2");
unpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
final List<MockFlowFile> unpacked = unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);

View File

@ -24,14 +24,7 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer;
import org.apache.nifi.processors.standard.TestNotify.MockCacheClient;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@ -60,7 +53,7 @@ public class TestWait {
final Map<String, String> props = new HashMap<>();
props.put("releaseSignalAttribute", "1");
runner.enqueue(new byte[] {},props);
runner.enqueue(new byte[]{}, props);
runner.run();
@ -76,7 +69,7 @@ public class TestWait {
final Map<String, String> props = new HashMap<>();
props.put("releaseSignalAttribute", "1");
runner.enqueue(new byte[] {},props);
runner.enqueue(new byte[]{}, props);
runner.run();
@ -93,6 +86,45 @@ public class TestWait {
runner.clearTransferState();
}
/**
 * Verifies that a FlowFile routed to REL_EXPIRED (wait window elapsed before the
 * target signal count of 5 was reached) still receives the accumulated counter
 * attributes and the attributes carried by the notify signals.
 */
@Test
public void testCounterExpired() throws InitializationException, InterruptedException, IOException {
    runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
    // Require 5 signals but only 3 will arrive, so the wait must expire.
    runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "5");
    runner.setProperty(Wait.EXPIRATION_DURATION, "100 ms");
    final Map<String, String> props = new HashMap<>();
    props.put("releaseSignalAttribute", "notification-id");
    runner.enqueue(new byte[]{}, props);
    runner.run();
    // First pass: no signal yet, so the FlowFile loops back to the wait relationship.
    runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
    MockFlowFile ff = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
    runner.clearTransferState();
    runner.enqueue(ff);
    // Deliver 3 signals (1 + 2) across two counters, still short of the target of 5.
    final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);
    final Map<String, String> signalAttributes = new HashMap<>();
    signalAttributes.put("signal-attr-1", "signal-attr-1-value");
    signalAttributes.put("signal-attr-2", "signal-attr-2-value");
    protocol.notify("notification-id", "counter-A", 1, signalAttributes);
    protocol.notify("notification-id", "counter-B", 2, signalAttributes);
    // Sleep just past the 100 ms expiration window.
    // NOTE(review): real-clock sleep makes this timing-sensitive on slow CI hosts — confirm acceptable.
    Thread.sleep(101L);
    runner.run();
    runner.assertAllFlowFilesTransferred(Wait.REL_EXPIRED, 1);
    ff = runner.getFlowFilesForRelationship(Wait.REL_EXPIRED).get(0);
    // Even if wait didn't complete, signal attributes should be set
    ff.assertAttributeEquals("wait.counter.total", "3");
    ff.assertAttributeEquals("wait.counter.counter-A", "1");
    ff.assertAttributeEquals("wait.counter.counter-B", "2");
    ff.assertAttributeEquals("signal-attr-1", "signal-attr-1-value");
    ff.assertAttributeEquals("signal-attr-2", "signal-attr-2-value");
    runner.clearTransferState();
}
@Test
public void testBadWaitStartTimestamp() throws InitializationException, InterruptedException {
runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
@ -101,7 +133,7 @@ public class TestWait {
final Map<String, String> props = new HashMap<>();
props.put("releaseSignalAttribute", "1");
props.put("wait.start.timestamp", "blue bunny");
runner.enqueue(new byte[] {},props);
runner.enqueue(new byte[]{}, props);
runner.run();
@ -114,11 +146,12 @@ public class TestWait {
runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
final Map<String, String> props = new HashMap<>();
runner.enqueue(new byte[] {},props);
runner.enqueue(new byte[]{}, props);
runner.run();
runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1);
runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists("wait.counter.total");
runner.clearTransferState();
}
@ -129,12 +162,13 @@ public class TestWait {
final Map<String, String> props = new HashMap<>();
props.put("releaseSignalAttribute", "2");
runner.enqueue(new byte[] {}, props);
runner.enqueue(new byte[]{}, props);
runner.run();
//Expect the processor to receive an IO exception from the cache service and route to failure
runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1);
runner.assertTransferCount(Wait.REL_FAILURE, 1);
runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists("wait.counter.total");
service.setFailOnCalls(false);
}
@ -146,7 +180,10 @@ public class TestWait {
cachedAttributes.put("uuid", "notifyUuid");
cachedAttributes.put("notify.only", "notifyValue");
service.put("key", cachedAttributes, new Wait.StringSerializer(), new FlowFileAttributesSerializer());
// Setup existing cache entry.
final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);
protocol.notify("key", "default", 1, cachedAttributes);
runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
runner.setProperty(Wait.ATTRIBUTE_COPY_MODE, Wait.ATTRIBUTE_COPY_REPLACE.getValue());
@ -159,7 +196,7 @@ public class TestWait {
runner.enqueue(flowFileContent.getBytes("UTF-8"), waitAttributes);
// make sure the key is in the cache before Wait runs
assertNotNull(service.get("key", new Wait.StringSerializer(), new FlowFileAttributesSerializer()));
assertNotNull(protocol.getSignal("key"));
runner.run();
@ -180,7 +217,7 @@ public class TestWait {
runner.clearTransferState();
// make sure Wait removed this key from the cache
assertNull(service.get("key", new Wait.StringSerializer(), new FlowFileAttributesSerializer()));
assertNull(protocol.getSignal("key"));
}
@Test
@ -190,7 +227,10 @@ public class TestWait {
cachedAttributes.put("uuid", "notifyUuid");
cachedAttributes.put("notify.only", "notifyValue");
service.put("key", cachedAttributes, new Wait.StringSerializer(), new FlowFileAttributesSerializer());
// Setup existing cache entry.
final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);
protocol.notify("key", "default", 1, cachedAttributes);
runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
runner.setProperty(Wait.ATTRIBUTE_COPY_MODE, Wait.ATTRIBUTE_COPY_KEEP_ORIGINAL.getValue());
@ -221,70 +261,181 @@ public class TestWait {
runner.clearTransferState();
}
private class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient {
private final ConcurrentMap<Object, Object> values = new ConcurrentHashMap<>();
private boolean failOnCalls = false;
@Test
public void testWaitForTotalCount() throws InitializationException, IOException {
Map<String, String> cachedAttributes = new HashMap<>();
cachedAttributes.put("both", "notifyValue");
cachedAttributes.put("uuid", "notifyUuid");
cachedAttributes.put("notify.only", "notifyValue");
public void setFailOnCalls(boolean failOnCalls){
this.failOnCalls = failOnCalls;
// Setup existing cache entry.
final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);
protocol.notify("key", "counter-A", 1, cachedAttributes);
runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "${targetSignalCount}");
final Map<String, String> waitAttributes = new HashMap<>();
waitAttributes.put("releaseSignalAttribute", "key");
waitAttributes.put("targetSignalCount", "3");
waitAttributes.put("wait.only", "waitValue");
waitAttributes.put("both", "waitValue");
waitAttributes.put("uuid", UUID.randomUUID().toString());
String flowFileContent = "content";
runner.enqueue(flowFileContent.getBytes("UTF-8"), waitAttributes);
/*
* 1st iteration
*/
runner.run();
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
MockFlowFile waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
/*
* 2nd iteration.
*/
runner.clearTransferState();
runner.enqueue(waitingFlowFile);
// Notify with other counter.
protocol.notify("key", "counter-B", 1, cachedAttributes);
runner.run();
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
// Still waiting since total count doesn't reach to 3.
waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
/*
* 3rd iteration.
*/
runner.clearTransferState();
runner.enqueue(waitingFlowFile);
runner.run();
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
// Still waiting since total count doesn't reach to 3.
waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
/*
* 4th iteration.
*/
runner.clearTransferState();
runner.enqueue(waitingFlowFile);
// Notify with other counter.
protocol.notify("key", "counter-C", 1, cachedAttributes);
runner.run();
runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
// show a new attribute was copied from the cache
assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only"));
// show that uuid was not overwritten
assertEquals(waitAttributes.get("uuid"), outputFlowFile.getAttribute("uuid"));
// show that the original attributes are still there
assertEquals("waitValue", outputFlowFile.getAttribute("wait.only"));
// show that the original attribute is kept
assertEquals("waitValue", outputFlowFile.getAttribute("both"));
runner.clearTransferState();
assertNull("The key no longer exist", protocol.getSignal("key"));
}
/**
 * Simulates cache-service unavailability: throws an {@link IOException}
 * when the test has flipped the failure flag via {@code setFailOnCalls}.
 */
private void verifyNotFail() throws IOException {
    if (!failOnCalls) {
        return;
    }
    throw new IOException("Could not call to remote service because Unit Test marked service unavailable");
}
@Test
public void testWaitForSpecificCount() throws InitializationException, IOException {
Map<String, String> cachedAttributes = new HashMap<>();
cachedAttributes.put("both", "notifyValue");
cachedAttributes.put("uuid", "notifyUuid");
cachedAttributes.put("notify.only", "notifyValue");
/**
 * Mock put-if-absent: delegates to the backing map and reports whether
 * this call actually created the entry (serializers are ignored).
 */
@Override
public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
    verifyNotFail();
    // ConcurrentMap#putIfAbsent returns null only when no prior mapping existed.
    return values.putIfAbsent(key, value) == null;
}
// Setup existing cache entry.
final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);
protocol.notify("key", "counter-A", 1, cachedAttributes);
/**
 * Mock get-and-put-if-absent: stores the value when the key is vacant and
 * returns the previous mapping (or null). Serializers are ignored.
 */
@Override
@SuppressWarnings("unchecked")
public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
    final Deserializer<V> valueDeserializer) throws IOException {
    verifyNotFail();
    final Object previous = values.putIfAbsent(key, value);
    return (V) previous;
}
runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "${targetSignalCount}");
runner.setProperty(Wait.SIGNAL_COUNTER_NAME, "${signalCounterName}");
/**
 * Mock containsKey: straight delegation to the backing map
 * (the key serializer is ignored).
 */
@Override
public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
    verifyNotFail();
    return values.containsKey(key);
}
final Map<String, String> waitAttributes = new HashMap<>();
waitAttributes.put("releaseSignalAttribute", "key");
waitAttributes.put("targetSignalCount", "2");
waitAttributes.put("signalCounterName", "counter-B");
waitAttributes.put("wait.only", "waitValue");
waitAttributes.put("both", "waitValue");
waitAttributes.put("uuid", UUID.randomUUID().toString());
String flowFileContent = "content";
runner.enqueue(flowFileContent.getBytes("UTF-8"), waitAttributes);
/**
 * Mock put: unconditionally stores the value, overwriting any existing
 * mapping (serializers are ignored).
 */
@Override
public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
    verifyNotFail();
    values.put(key, value);
}
/*
* 1st iteration
*/
runner.run();
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
MockFlowFile waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
/**
 * Mock get: returns the stored value for the key, or null when absent
 * (the deserializer is ignored).
 */
@Override
@SuppressWarnings("unchecked")
public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
    verifyNotFail();
    // ConcurrentHashMap never stores null values, so a plain get() already
    // yields null exactly when the key is missing — no containsKey pre-check needed.
    return (V) values.get(key);
}
/*
* 2nd iteration.
*/
runner.clearTransferState();
runner.enqueue(waitingFlowFile);
// No-op: the in-memory mock holds no external resources to release.
@Override
public void close() throws IOException {
}
// Notify with target counter.
protocol.notify("key", "counter-B", 1, cachedAttributes);
/**
 * Mock remove: evicts the key from the backing map.
 * NOTE(review): always reports {@code true}, even when the key was absent —
 * presumably adequate for these tests, but it does not mirror a real client
 * that would report whether an entry was actually removed; confirm callers
 * never rely on the return value.
 */
@Override
public <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException {
    verifyNotFail();
    values.remove(key);
    return true;
}
}
runner.run();
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
// Still waiting since counter-B doesn't reach to 2.
waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
/*
* 3rd iteration.
*/
runner.clearTransferState();
runner.enqueue(waitingFlowFile);
// Notify with other counter.
protocol.notify("key", "counter-C", 1, cachedAttributes);
runner.run();
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
// Still waiting since total count doesn't reach to 3.
waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
/*
* 4th iteration.
*/
runner.clearTransferState();
runner.enqueue(waitingFlowFile);
// Notify with target counter.
protocol.notify("key", "counter-B", 1, cachedAttributes);
runner.run();
runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
// show a new attribute was copied from the cache
assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only"));
// show that uuid was not overwritten
assertEquals(waitAttributes.get("uuid"), outputFlowFile.getAttribute("uuid"));
// show that the original attributes are still there
assertEquals("waitValue", outputFlowFile.getAttribute("wait.only"));
// show that the original attribute is kept
assertEquals("waitValue", outputFlowFile.getAttribute("both"));
outputFlowFile.assertAttributeEquals("wait.counter.total", "4");
outputFlowFile.assertAttributeEquals("wait.counter.counter-A", "1");
outputFlowFile.assertAttributeEquals("wait.counter.counter-B", "2");
outputFlowFile.assertAttributeEquals("wait.counter.counter-C", "1");
runner.clearTransferState();
assertNull("The key no longer exist", protocol.getSignal("key"));
}
}

View File

@ -0,0 +1,245 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient.CacheEntry;
import org.apache.nifi.distributed.cache.client.StandardCacheEntry;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.processors.standard.WaitNotifyProtocol.Signal;
import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer;
import org.junit.Before;
import org.junit.Test;
import org.mockito.stubbing.Answer;
import java.nio.charset.StandardCharsets;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
/**
 * Unit tests for WaitNotifyProtocol, exercising counter accumulation, attribute
 * merging, optimistic-replace retry failure, and migration of legacy cache
 * entries, against a Mockito-mocked AtomicDistributedMapCacheClient backed by
 * an in-memory map.
 */
public class TestWaitNotifyProtocol {

    // In-memory stand-in for the distributed cache, keyed by signal id.
    private final Map<String, CacheEntry<String, String>> cacheEntries = new HashMap<>();
    private AtomicDistributedMapCacheClient cache;

    // Mockito answer simulating a successful optimistic replace: stores the new
    // value under the key and bumps the supplied revision by one.
    private final Answer successfulReplace = invocation -> {
        final String key = invocation.getArgumentAt(0, String.class);
        final String value = invocation.getArgumentAt(1, String.class);
        final Long revision = invocation.getArgumentAt(4, Long.class);
        cacheEntries.put(key, new StandardCacheEntry<>(key, value, revision + 1));
        return true;
    };

    @Before
    public void before() throws Exception {
        cacheEntries.clear();

        // Default mock implementations.
        cache = mock(AtomicDistributedMapCacheClient.class);
        // fetch() always reads straight from the in-memory map.
        doAnswer(invocation -> {
            final CacheEntry<String, String> entry = cacheEntries.get(invocation.getArguments()[0]);
            return entry;
        }).when(cache).fetch(any(), any(), any());
    }

    /**
     * When the cache replace never succeeds, notify must give up after its
     * internal retries and surface a ConcurrentModificationException.
     */
    @Test
    public void testNotifyRetryFailure() throws Exception {

        // replace always return false.
        doAnswer(invocation -> false)
            .when(cache).replace(any(), any(), any(), any(), anyLong());

        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);

        final String signalId = "signal-id";
        try {
            protocol.notify(signalId, "a", 1, null);
            fail("Notify should fail after retrying few times.");
        } catch (ConcurrentModificationException e) {
            // expected: retries exhausted without a successful replace
        }
    }

    /**
     * The very first notify for a signal id must create a cache entry at
     * revision 0 with the counter initialized and empty attributes.
     */
    @Test
    public void testNotifyFirst() throws Exception {

        doAnswer(successfulReplace).when(cache).replace(any(), any(), any(), any(), anyLong());

        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);

        final String signalId = "signal-id";
        final Signal signal = protocol.notify(signalId, "a", 1, null);

        assertNotNull(signal);
        assertEquals(Long.valueOf(1), signal.getCounts().get("a"));
        assertTrue(cacheEntries.containsKey("signal-id"));

        final CacheEntry<String, String> cacheEntry = cacheEntries.get("signal-id");

        // Fresh entry starts at revision 0; value is the serialized Signal JSON.
        assertEquals(0, cacheEntry.getRevision());
        assertEquals("{\"counts\":{\"a\":1},\"attributes\":{}}", cacheEntry.getValue());
    }

    /**
     * Repeated notifies must accumulate deltas per counter name, bumping the
     * cache revision once per notify call.
     */
    @Test
    public void testNotifyCounters() throws Exception {

        doAnswer(successfulReplace).when(cache).replace(any(), any(), any(), any(), anyLong());

        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);

        final String signalId = "signal-id";

        protocol.notify(signalId, "a", 1, null);
        protocol.notify(signalId, "a", 1, null);

        CacheEntry<String, String> cacheEntry = cacheEntries.get("signal-id");
        assertEquals(1, cacheEntry.getRevision());
        assertEquals("{\"counts\":{\"a\":2},\"attributes\":{}}", cacheEntry.getValue());

        // A delta greater than 1 is added in a single notify.
        protocol.notify(signalId, "a", 10, null);

        cacheEntry = cacheEntries.get("signal-id");
        assertEquals(2, cacheEntry.getRevision());
        assertEquals("{\"counts\":{\"a\":12},\"attributes\":{}}", cacheEntry.getValue());

        // Distinct counter names are tracked independently within the same signal.
        protocol.notify(signalId, "b", 2, null);
        protocol.notify(signalId, "c", 3, null);

        cacheEntry = cacheEntries.get("signal-id");
        assertEquals(4, cacheEntry.getRevision());
        assertEquals("{\"counts\":{\"a\":12,\"b\":2,\"c\":3},\"attributes\":{}}", cacheEntry.getValue());
    }

    /**
     * Attributes passed with successive notifies must be merged: later values
     * overwrite matching keys while previously-set keys are retained.
     */
    @Test
    public void testNotifyAttributes() throws Exception {
        doAnswer(successfulReplace).when(cache).replace(any(), any(), any(), any(), anyLong());

        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);

        final String signalId = "signal-id";

        final Map<String, String> attributeA1 = new HashMap<>();
        attributeA1.put("p1", "a1");
        attributeA1.put("p2", "a1");

        protocol.notify(signalId, "a", 1, attributeA1);

        CacheEntry<String, String> cacheEntry = cacheEntries.get("signal-id");
        assertEquals(0, cacheEntry.getRevision());
        assertEquals("{\"counts\":{\"a\":1},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a1\"}}", cacheEntry.getValue());

        final Map<String, String> attributeA2 = new HashMap<>();
        attributeA2.put("p2", "a2"); // Update p2
        attributeA2.put("p3", "a2"); // Add p3

        // Notify again
        protocol.notify(signalId, "a", 1, attributeA2);

        cacheEntry = cacheEntries.get("signal-id");
        assertEquals(1, cacheEntry.getRevision());
        assertEquals("Updated attributes should be merged correctly",
            "{\"counts\":{\"a\":2},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a2\",\"p3\":\"a2\"}}", cacheEntry.getValue());
    }

    /**
     * Exercises the Signal count queries: per-counter counts, per-counter
     * threshold checks, and the total-count threshold across all counters.
     */
    @Test
    public void testSignalCount() throws Exception {
        doAnswer(successfulReplace).when(cache).replace(any(), any(), any(), any(), anyLong());

        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);

        final String signalId = "signal-id";

        Signal signal = protocol.getSignal(signalId);
        assertNull("Should be null since there's no signal yet", signal);

        // First notification.
        protocol.notify(signalId, "success", 1, null);

        signal = protocol.getSignal(signalId);
        assertNotNull(signal);
        assertEquals(1, signal.getCount("success"));
        assertTrue(signal.isCountReached("success", 1));
        assertFalse(signal.isCountReached("success", 2));
        assertTrue(signal.isTotalCountReached(1));
        assertFalse(signal.isTotalCountReached(2));

        // Notify again with different counter name.
        protocol.notify(signalId, "failure", 1, null);

        signal = protocol.getSignal(signalId);
        assertNotNull(signal);
        assertEquals(1, signal.getCount("success"));
        assertEquals(1, signal.getCount("failure"));
        assertTrue(signal.isCountReached("failure", 1));
        assertFalse(signal.isCountReached("failure", 2));
        // Total spans both counters: success(1) + failure(1) = 2.
        assertTrue(signal.isTotalCountReached(1));
        assertTrue(signal.isTotalCountReached(2));
    }

    /**
     * Test migration across NiFi version upgrade.
     * Old version of Wait/Notify processors use FlowFileAttributesSerializer for cache entries.
     * New version uses StringSerializer. WaitNotifyProtocol should be able to migrate old cache entries.
     */
    @Test
    public void testNiFiVersionUpgrade() throws Exception {
        doAnswer(successfulReplace).when(cache).replace(any(), any(), any(), any(), anyLong());

        // Simulate old cache entry.
        final FlowFileAttributesSerializer attributesSerializer = new FlowFileAttributesSerializer();
        final Map<String, String> cachedAttributes = new HashMap<>();
        cachedAttributes.put("key1", "value1");
        cachedAttributes.put("key2", "value2");
        cachedAttributes.put("key3", "value3");
        final ByteArrayOutputStream bos = new ByteArrayOutputStream();
        attributesSerializer.serialize(cachedAttributes, bos);

        final String signalId = "old-entry";
        cacheEntries.put(signalId, new StandardCacheEntry<>(signalId, new String(bos.toByteArray(), StandardCharsets.UTF_8), 0));

        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
        final Signal signal = protocol.getSignal(signalId);

        // A migrated legacy entry gets the default counter set to 1 and its
        // serialized attributes carried over intact.
        assertEquals(1, signal.getCount(WaitNotifyProtocol.DEFAULT_COUNT_NAME));
        assertEquals("value1", signal.getAttributes().get("key1"));
        assertEquals("value2", signal.getAttributes().get("key2"));
        assertEquals("value3", signal.getAttributes().get("key3"));

        cacheEntries.put(signalId, new StandardCacheEntry<>(signalId, "UNSUPPORTED_FORMAT", 0));
        try {
            protocol.getSignal(signalId);
            fail("Should fail since cached value was not in expected format.");
        } catch (DeserializationException e) {
            // expected: entry is neither valid JSON nor a legacy attribute map
        }
    }
}