NIFI-6047 Cleaned up code to allow tests to run against 1.13.0-snapshot

Removed DMC.
NIFI-6047 Started integrating changes from NIFI-6014.
NIFI-6047 Added DMC tests.
NIFI-6047 Added cache identifier recordpath test.
NIFI-6047 Added additional details.
NIFI-6047 Removed old additional details.
NIFI-6047 made some changes requested in a follow up review.
NIFI-6047 latest.
NIFI-6047 Finished updates
First round of code review cleanup
Latest
Removed EL from the dynamic properties.
Finished code review requested refactoring.
Checkstyle fix.
Removed a Java 11 API
NIFI-6047 Renamed processor to DeduplicateRecord

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #4646
This commit is contained in:
Mike Thomsen 2020-10-29 13:52:12 -04:00 committed by Matthew Burgess
parent 23132fb89f
commit df00cc6cb5
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
7 changed files with 685 additions and 658 deletions

View File

@ -22,94 +22,111 @@ import com.google.common.hash.Funnels;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.codec.digest.MessageDigestAlgorithms;
import org.apache.nifi.annotation.behavior.*;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.*;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.RecordPathResult;
import org.apache.nifi.record.path.util.RecordPathCache;
import org.apache.nifi.record.path.validation.RecordPathPropertyNameValidator;
import org.apache.nifi.record.path.validation.RecordPathValidator;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.*;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import java.io.*;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static java.util.stream.Collectors.toList;
import static org.apache.commons.codec.binary.StringUtils.getBytesUtf8;
import static org.apache.commons.lang3.StringUtils.*;
@EventDriven
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@SystemResourceConsideration(resource = SystemResource.MEMORY,
description = "The HashSet filter type will grow memory space proportionate to the number of unique records processed. " +
"The BloomFilter type will use constant memory regardless of the number of records processed.")
@Tags({"text", "record", "update", "change", "replace", "modify", "distinct", "unique",
"filter", "hash", "dupe", "duplicate", "dedupe"})
@CapabilityDescription("Caches records from each incoming FlowFile and determines if the record " +
"has already been seen. If so, routes the record to 'duplicate'. If the record is " +
"not determined to be a duplicate, it is routed to 'non-duplicate'."
description = "The HashSet filter type will grow memory space proportionate to the number of unique records processed. " +
"The BloomFilter type will use constant memory regardless of the number of records processed.")
@SystemResourceConsideration(resource = SystemResource.CPU,
description = "If a more advanced hash algorithm is chosen, the amount of time required to hash any particular " +
"record could increase substantially."
)
@Tags({"text", "record", "update", "change", "replace", "modify", "distinct", "unique",
"filter", "hash", "dupe", "duplicate", "dedupe"})
@CapabilityDescription("This processor attempts to deduplicate a record set in memory using either a hashset or a bloom filter. " +
"It operates on a per-file basis rather than across an entire data set that spans multiple files.")
@WritesAttribute(attribute = "record.count", description = "The number of records processed.")
@DynamicProperty(
name = "RecordPath",
value = "An expression language statement used to determine how the RecordPath is resolved. " +
"The following variables are availble: ${field.name}, ${field.value}, ${field.type}",
description = "The name of each user-defined property must be a valid RecordPath.")
name = "RecordPath",
value = "An expression language statement used to determine how the RecordPath is resolved. " +
"The following variables are availible: ${field.name}, ${field.value}, ${field.type}",
description = "The name of each user-defined property must be a valid RecordPath.")
@SeeAlso(classNames = {
"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService",
"org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer",
"org.apache.nifi.processors.standard.DetectDuplicate"
"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService",
"org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer",
"org.apache.nifi.processors.standard.DetectDuplicate"
})
public class DetectDuplicateRecord extends AbstractProcessor {
public class DeduplicateRecord extends AbstractProcessor {
public static final char JOIN_CHAR = '~';
private static final String FIELD_NAME = "field.name";
private static final String FIELD_VALUE = "field.value";
private static final String FIELD_TYPE = "field.type";
private volatile RecordPathCache recordPathCache;
private volatile List<String> recordPaths;
private volatile List<PropertyDescriptor> dynamicProperties;
// VALUES
static final AllowableValue NONE_ALGORITHM_VALUE = new AllowableValue("none", "None",
"Do not use a hashing algorithm. The value of resolved RecordPaths will be combined with tildes (~) to form the unique record key. " +
"This may use significantly more storage depending on the size and shape or your data.");
static final AllowableValue MD5_ALGORITHM_VALUE = new AllowableValue(MessageDigestAlgorithms.MD5, "MD5",
"The MD5 message-digest algorithm.");
static final AllowableValue SHA1_ALGORITHM_VALUE = new AllowableValue(MessageDigestAlgorithms.SHA_1, "SHA-1",
"The SHA-1 cryptographic hash algorithm.");
static final AllowableValue SHA256_ALGORITHM_VALUE = new AllowableValue(MessageDigestAlgorithms.SHA3_256, "SHA-256",
static final AllowableValue SHA256_ALGORITHM_VALUE = new AllowableValue(MessageDigestAlgorithms.SHA_256, "SHA-256",
"The SHA-256 cryptographic hash algorithm.");
static final AllowableValue SHA512_ALGORITHM_VALUE = new AllowableValue(MessageDigestAlgorithms.SHA3_512, "SHA-512",
static final AllowableValue SHA512_ALGORITHM_VALUE = new AllowableValue(MessageDigestAlgorithms.SHA_512, "SHA-512",
"The SHA-512 cryptographic hash algorithm.");
static final AllowableValue HASH_SET_VALUE = new AllowableValue("hash-set", "HashSet",
@ -139,6 +156,45 @@ public class DetectDuplicateRecord extends AbstractProcessor {
.required(true)
.build();
static final AllowableValue OPTION_SINGLE_FILE = new AllowableValue("single", "Single File");
static final AllowableValue OPTION_MULTIPLE_FILES = new AllowableValue("multiple", "Multiple Files");
static final PropertyDescriptor DEDUPLICATION_STRATEGY = new PropertyDescriptor.Builder()
.name("deduplication-strategy")
.displayName("Deduplication Strategy")
.description("The strategy to use for detecting and isolating duplicate records. The option for doing it " +
"across a single data file will operate in memory, whereas the one for going across the enter repository " +
"will require a distributed map cache.")
.allowableValues(OPTION_SINGLE_FILE, OPTION_MULTIPLE_FILES)
.defaultValue(OPTION_SINGLE_FILE.getValue())
.required(true)
.build();
static final PropertyDescriptor DISTRIBUTED_MAP_CACHE = new PropertyDescriptor.Builder()
.name("distributed-map-cache")
.displayName("Distributed Map Cache client")
.description("This configuration is required when the deduplication strategy is set to 'multiple files.' The map " +
"cache will be used to check a data source such as HBase or Redis for entries indicating that a record has " +
"been processed before. This option requires a downstream process that uses PutDistributedMapCache to write " +
"an entry to the cache data source once the record has been processed to indicate that it has been handled before.")
.identifiesControllerService(DistributedMapCacheClient.class)
.required(false)
.addValidator(Validator.VALID)
.dependsOn(DEDUPLICATION_STRATEGY, OPTION_MULTIPLE_FILES)
.build();
static final PropertyDescriptor CACHE_IDENTIFIER = new PropertyDescriptor.Builder()
.name("cache-identifier")
.displayName("Cache Identifier")
.description("This option defines a record path operation to use for defining the cache identifier. It can be used " +
"in addition to the hash settings. This field will have the expression language attribute \"record.hash.value\" " +
"available to it to use with it to generate the record path operation.")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.addValidator(Validator.VALID)
.dependsOn(DEDUPLICATION_STRATEGY, OPTION_MULTIPLE_FILES)
.build();
static final PropertyDescriptor INCLUDE_ZERO_RECORD_FLOWFILES = new PropertyDescriptor.Builder()
.name("include-zero-record-flowfiles")
.displayName("Include Zero Record FlowFiles")
@ -150,59 +206,16 @@ public class DetectDuplicateRecord extends AbstractProcessor {
.required(true)
.build();
static final PropertyDescriptor CACHE_IDENTIFIER = new PropertyDescriptor.Builder()
.name("cache-the-entry-identifier")
.displayName("Cache The Entry Identifier")
.description("When true this cause the processor to check for duplicates and cache the Entry Identifier. When false, "
+ "the processor would only check for duplicates and not cache the Entry Identifier, requiring another "
+ "processor to add identifiers to the distributed cache.")
.required(true)
.allowableValues("true", "false")
.defaultValue("true")
.build();
static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
.name("distributed-cache-service")
.displayName("Distributed Cache Service")
.description("The Controller Service that is used to cache unique records, used to determine duplicates")
.required(false)
.identifiesControllerService(DistributedMapCacheClient.class)
.build();
static final PropertyDescriptor CACHE_ENTRY_IDENTIFIER = new PropertyDescriptor.Builder()
.name("cache-entry-identifier")
.displayName("Cache Entry Identifier")
.description(
"A FlowFile attribute, or the results of an Attribute Expression Language statement, which will be evaluated " +
"against a FlowFile in order to determine the cached filter type value used to identify duplicates.")
.required(false)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
.defaultValue("${hash.value}")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor AGE_OFF_DURATION = new PropertyDescriptor.Builder()
.name("age-off-duration")
.displayName("Age Off Duration")
.description("Time interval to age off cached filter entries. When the cache expires, the entire filter and its values " +
"are destroyed. Leaving this value empty will cause the cached entries to never expire but may eventually be rotated " +
"out when the cache servers rotation policy automatically expires entries.")
.required(false)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
static final PropertyDescriptor RECORD_HASHING_ALGORITHM = new PropertyDescriptor.Builder()
.name("record-hashing-algorithm")
.displayName("Record Hashing Algorithm")
.description("The algorithm used to hash the combined set of resolved RecordPath values for cache storage.")
.allowableValues(
NONE_ALGORITHM_VALUE,
MD5_ALGORITHM_VALUE,
SHA1_ALGORITHM_VALUE,
SHA256_ALGORITHM_VALUE,
SHA512_ALGORITHM_VALUE
)
.defaultValue(SHA1_ALGORITHM_VALUE.getValue())
.defaultValue(SHA256_ALGORITHM_VALUE.getValue())
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.build();
@ -210,12 +223,16 @@ public class DetectDuplicateRecord extends AbstractProcessor {
static final PropertyDescriptor FILTER_TYPE = new PropertyDescriptor.Builder()
.name("filter-type")
.displayName("Filter Type")
.description("The filter used to determine whether a record has been seen before based on the matching RecordPath criteria.")
.description("The filter used to determine whether a record has been seen before based on the matching RecordPath " +
"criteria. If hash set is selected, a Java HashSet object will be used to deduplicate all encountered " +
"records. If the bloom filter option is selected, a bloom filter will be used. The bloom filter option is " +
"less memory intensive, but has a chance of having false positives.")
.allowableValues(
HASH_SET_VALUE,
BLOOM_FILTER_VALUE
)
.defaultValue(HASH_SET_VALUE.getValue())
.dependsOn(DEDUPLICATION_STRATEGY, OPTION_SINGLE_FILE)
.required(true)
.build();
@ -227,6 +244,7 @@ public class DetectDuplicateRecord extends AbstractProcessor {
.defaultValue("25000")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.dependsOn(FILTER_TYPE, BLOOM_FILTER_VALUE)
.required(true)
.build();
@ -269,20 +287,15 @@ public class DetectDuplicateRecord extends AbstractProcessor {
private Set<Relationship> relationships;
private final Serializer<String> keySerializer = new StringSerializer();
private final Serializer<CacheValue> cacheValueSerializer = new CacheValueSerializer();
private final Deserializer<CacheValue> cacheValueDeserializer = new CacheValueDeserializer();
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(DEDUPLICATION_STRATEGY);
descriptors.add(DISTRIBUTED_MAP_CACHE);
descriptors.add(CACHE_IDENTIFIER);
descriptors.add(RECORD_READER);
descriptors.add(RECORD_WRITER);
descriptors.add(INCLUDE_ZERO_RECORD_FLOWFILES);
descriptors.add(CACHE_IDENTIFIER);
descriptors.add(CACHE_ENTRY_IDENTIFIER);
descriptors.add(AGE_OFF_DURATION);
descriptors.add(DISTRIBUTED_CACHE_SERVICE);
descriptors.add(RECORD_HASHING_ALGORITHM);
descriptors.add(FILTER_TYPE);
descriptors.add(FILTER_CAPACITY_HINT);
@ -318,24 +331,20 @@ public class DetectDuplicateRecord extends AbstractProcessor {
"to access information about the field and the value of the field being evaluated.")
.required(false)
.dynamic(true)
.addValidator(new RecordPathValidator())
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(new RecordPathPropertyNameValidator())
.build();
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
protected Collection<ValidationResult> customValidate(final ValidationContext context) {
RecordPathValidator recordPathValidator = new RecordPathValidator();
final List<ValidationResult> validationResults = validationContext.getProperties().keySet().stream()
.filter(PropertyDescriptor::isDynamic)
.map(property -> recordPathValidator.validate(
"User-defined Properties",
property.getName(),
validationContext
)).collect(Collectors.toList());
List<ValidationResult> validationResults = new ArrayList<>();
if(validationContext.getProperty(BLOOM_FILTER_FPP).isSet()) {
final double falsePositiveProbability = validationContext.getProperty(BLOOM_FILTER_FPP).asDouble();
boolean useSingleFile = context.getProperty(DEDUPLICATION_STRATEGY).getValue().equals(OPTION_SINGLE_FILE.getValue());
if (useSingleFile && context.getProperty(BLOOM_FILTER_FPP).isSet()) {
final double falsePositiveProbability = context.getProperty(BLOOM_FILTER_FPP).asDouble();
if (falsePositiveProbability < 0 || falsePositiveProbability > 1) {
validationResults.add(
new ValidationResult.Builder()
@ -344,42 +353,62 @@ public class DetectDuplicateRecord extends AbstractProcessor {
.explanation("Valid values are 0.0 - 1.0 inclusive")
.valid(false).build());
}
}
if(validationContext.getProperty(CACHE_IDENTIFIER).asBoolean()) {
if(!validationContext.getProperty(DISTRIBUTED_CACHE_SERVICE).isSet())
} else if (!useSingleFile) {
if (!context.getProperty(DISTRIBUTED_MAP_CACHE).isSet()) {
validationResults.add(new ValidationResult.Builder()
.subject(DISTRIBUTED_CACHE_SERVICE.getName())
.explanation(DISTRIBUTED_CACHE_SERVICE.getName() + " is required when " + CACHE_IDENTIFIER.getName() + " is true.")
.valid(false).build());
if(!validationContext.getProperty(CACHE_ENTRY_IDENTIFIER).isSet())
validationResults.add(new ValidationResult.Builder()
.subject(CACHE_ENTRY_IDENTIFIER.getName())
.explanation(CACHE_ENTRY_IDENTIFIER.getName() + " is required when " + CACHE_IDENTIFIER.getName() + " is true.")
.valid(false).build());
if(!validationContext.getProperty(AGE_OFF_DURATION).isSet())
validationResults.add(new ValidationResult.Builder()
.subject(AGE_OFF_DURATION.getName())
.explanation(AGE_OFF_DURATION.getName() + " is required when " + CACHE_IDENTIFIER.getName() + " is true.")
.subject(DISTRIBUTED_MAP_CACHE.getName())
.explanation("Multiple files deduplication was chosen, but a distributed map cache client was " +
"not configured")
.valid(false).build());
}
}
return validationResults;
}
private DistributedMapCacheClient mapCacheClient;
private RecordReaderFactory readerFactory;
private RecordSetWriterFactory writerFactory;
private boolean useInMemoryStrategy;
@OnScheduled
public void compileRecordPaths(final ProcessContext context) {
final List<String> recordPaths = new ArrayList<>();
recordPaths.addAll(context.getProperties().keySet().stream()
public void onScheduled(final ProcessContext context) {
dynamicProperties = context.getProperties().keySet().stream()
.filter(PropertyDescriptor::isDynamic)
.map(PropertyDescriptor::getName)
.collect(toList()));
.collect(Collectors.toList());
recordPathCache = new RecordPathCache(recordPaths.size());
this.recordPaths = recordPaths;
int cacheSize = dynamicProperties.size();
recordPathCache = new RecordPathCache(cacheSize);
if (context.getProperty(DISTRIBUTED_MAP_CACHE).isSet()) {
mapCacheClient = context.getProperty(DISTRIBUTED_MAP_CACHE).asControllerService(DistributedMapCacheClient.class);
}
readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
String strategy = context.getProperty(DEDUPLICATION_STRATEGY).getValue();
useInMemoryStrategy = strategy.equals(OPTION_SINGLE_FILE.getValue());
}
private FilterWrapper getFilter(ProcessContext context) {
if (useInMemoryStrategy) {
boolean useHashSet = context.getProperty(FILTER_TYPE).getValue()
.equals(context.getProperty(HASH_SET_VALUE.getValue()));
final int filterCapacity = context.getProperty(FILTER_CAPACITY_HINT).asInteger();
return useHashSet
? new HashSetFilterWrapper(new HashSet<>(filterCapacity))
: new BloomFilterWrapper(BloomFilter.create(
Funnels.stringFunnel(Charset.defaultCharset()),
filterCapacity,
context.getProperty(BLOOM_FILTER_FPP).asDouble()
));
} else {
return new DistributedMapCacheClientWrapper(mapCacheClient);
}
}
@Override
@ -390,44 +419,29 @@ public class DetectDuplicateRecord extends AbstractProcessor {
}
final ComponentLog logger = getLogger();
final String cacheKey = context.getProperty(CACHE_ENTRY_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
if (isBlank(cacheKey)) {
logger.error("FlowFile {} has no attribute for given Cache Entry Identifier", new Object[]{flowFile});
session.transfer(session.penalize(flowFile), REL_FAILURE);
return;
}
FlowFile nonDuplicatesFlowFile = session.create(flowFile);
FlowFile duplicatesFlowFile = session.create(flowFile);
try {
final long now = System.currentTimeMillis();
final DistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
long index = 0;
final boolean shouldCacheIdentifier = context.getProperty(CACHE_IDENTIFIER).asBoolean();
final int filterCapacity = context.getProperty(FILTER_CAPACITY_HINT).asInteger();
Serializable serializableFilter = context.getProperty(FILTER_TYPE).getValue()
.equals(context.getProperty(HASH_SET_VALUE.getValue()))
? new HashSet<String>(filterCapacity)
: BloomFilter.create(
Funnels.stringFunnel(Charset.defaultCharset()),
filterCapacity,
context.getProperty(BLOOM_FILTER_FPP).asDouble());
WriteResult nonDuplicatesWriteResult = null;
WriteResult duplicatesWriteResult = null;
String duplicateMimeType = null;
String nonDuplicateMimeType = null;
if(shouldCacheIdentifier && cache.containsKey(cacheKey, keySerializer)) {
CacheValue cacheValue = cache.get(cacheKey, keySerializer, cacheValueDeserializer);
Long durationMS = context.getProperty(AGE_OFF_DURATION).asTimePeriod(TimeUnit.MILLISECONDS);
if(durationMS != null && (now >= cacheValue.getEntryTimeMS() + durationMS)) {
boolean status = cache.remove(cacheKey, keySerializer);
logger.debug("Removal of expired cached entry with key {} returned {}", new Object[]{cacheKey, status});
} else {
serializableFilter = cacheValue.getFilter();
}
}
final FilterWrapper filter = FilterWrapper.create(serializableFilter);
boolean error = false;
try (
final InputStream inputStream = session.read(flowFile);
final RecordReader reader = readerFactory.createRecordReader(flowFile, inputStream, logger);
final OutputStream nonDupeStream = session.write(nonDuplicatesFlowFile);
final OutputStream dupeStream = session.write(duplicatesFlowFile);
final RecordSetWriter nonDuplicatesWriter = writerFactory
.createWriter(getLogger(), writerFactory.getSchema(flowFile.getAttributes(), reader.getSchema()), nonDupeStream, nonDuplicatesFlowFile);
final RecordSetWriter duplicatesWriter = writerFactory
.createWriter(getLogger(), writerFactory.getSchema(flowFile.getAttributes(), reader.getSchema()), dupeStream, duplicatesFlowFile);
) {
final FilterWrapper filter = getFilter(context);
final String recordHashingAlgorithm = context.getProperty(RECORD_HASHING_ALGORITHM).getValue();
final MessageDigest messageDigest = recordHashingAlgorithm.equals(NONE_ALGORITHM_VALUE.getValue())
@ -435,14 +449,6 @@ public class DetectDuplicateRecord extends AbstractProcessor {
: DigestUtils.getDigest(recordHashingAlgorithm);
final Boolean matchWholeRecord = context.getProperties().keySet().stream().noneMatch(p -> p.isDynamic());
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final RecordReader reader = readerFactory.createRecordReader(flowFile.getAttributes(), session.read(flowFile), logger);
final RecordSchema writeSchema = writerFactory.getSchema(flowFile.getAttributes(), reader.getSchema());
final RecordSetWriter nonDuplicatesWriter = writerFactory.createWriter(getLogger(), writeSchema, session.write(nonDuplicatesFlowFile));
final RecordSetWriter duplicatesWriter = writerFactory.createWriter(getLogger(), writeSchema, session.write(duplicatesFlowFile));
nonDuplicatesWriter.beginRecordSet();
duplicatesWriter.beginRecordSet();
Record record;
@ -451,114 +457,125 @@ public class DetectDuplicateRecord extends AbstractProcessor {
String recordValue;
if (matchWholeRecord) {
recordValue = Joiner.on('~').join(record.getValues());
recordValue = Joiner.on(JOIN_CHAR).join(record.getValues());
} else {
final List<String> fieldValues = new ArrayList<>();
for (final String recordPathText : recordPaths) {
final PropertyValue recordPathPropertyValue = context.getProperty(recordPathText);
final RecordPath recordPath = recordPathCache.getCompiled(recordPathText);
final RecordPathResult result = recordPath.evaluate(record);
final List<FieldValue> selectedFields = result.getSelectedFields().collect(Collectors.toList());
if(recordPathPropertyValue.isExpressionLanguagePresent()) {
final Map<String, String> fieldVariables = new HashMap<>();
selectedFields.forEach(fieldVal -> {
fieldVariables.clear();
fieldVariables.put(FIELD_NAME, fieldVal.getField().getFieldName());
fieldVariables.put(FIELD_VALUE, DataTypeUtils.toString(fieldVal.getValue(), (String) null));
fieldVariables.put(FIELD_TYPE, fieldVal.getField().getDataType().getFieldType().name());
fieldValues.add(recordPathPropertyValue.evaluateAttributeExpressions(flowFile, fieldVariables).getValue());
});
} else {
fieldValues.add(recordPathPropertyValue.evaluateAttributeExpressions(flowFile).getValue());
}
fieldValues.addAll(selectedFields.stream()
.map(f -> recordPathPropertyValue.evaluateAttributeExpressions(flowFile).getValue())
.collect(toList())
);
}
recordValue = Joiner.on('~').join(fieldValues);
recordValue = executeDynamicRecordPaths(context, record, flowFile);
}
final String recordHash = messageDigest != null
String recordHash = messageDigest != null
? Hex.encodeHexString(messageDigest.digest(getBytesUtf8(recordValue)))
: recordValue;
messageDigest.reset();
if(filter.contains(recordHash)) {
if (!useInMemoryStrategy && context.getProperty(CACHE_IDENTIFIER).isSet()) {
Map<String, String> additional = new HashMap<>();
additional.put("record.hash.value", recordHash);
String rawPath = context.getProperty(CACHE_IDENTIFIER).evaluateAttributeExpressions(flowFile, additional).getValue();
RecordPath compiled = recordPathCache.getCompiled(rawPath);
RecordPathResult result = compiled.evaluate(record);
FieldValue fieldValue = result.getSelectedFields().findFirst().get();
if (fieldValue.getValue() == null) {
throw new ProcessException(String.format("The path \"%s\" failed to create an ID value at record index %d", rawPath, index));
}
recordHash = fieldValue.getValue().toString();
}
if (filter.contains(recordHash)) {
duplicatesWriter.write(record);
} else {
nonDuplicatesWriter.write(record);
filter.put(recordHash);
}
filter.put(recordHash);
index++;
}
final boolean includeZeroRecordFlowFiles = context.getProperty(INCLUDE_ZERO_RECORD_FLOWFILES).isSet()
? context.getProperty(INCLUDE_ZERO_RECORD_FLOWFILES).asBoolean()
: true;
duplicateMimeType = duplicatesWriter.getMimeType();
nonDuplicateMimeType = nonDuplicatesWriter.getMimeType();
// Route Non-Duplicates FlowFile
final WriteResult nonDuplicatesWriteResult = nonDuplicatesWriter.finishRecordSet();
nonDuplicatesWriter.close();
Map<String, String> attributes = new HashMap<>();
attributes.putAll(nonDuplicatesWriteResult.getAttributes());
attributes.put("record.count", String.valueOf(nonDuplicatesWriteResult.getRecordCount()));
attributes.put(CoreAttributes.MIME_TYPE.key(), nonDuplicatesWriter.getMimeType());
nonDuplicatesFlowFile = session.putAllAttributes(nonDuplicatesFlowFile, attributes);
logger.info("Successfully found {} unique records for {}", new Object[] {nonDuplicatesWriteResult.getRecordCount(), nonDuplicatesFlowFile});
if(!includeZeroRecordFlowFiles && nonDuplicatesWriteResult.getRecordCount() == 0) {
session.remove(nonDuplicatesFlowFile);
} else {
session.transfer(nonDuplicatesFlowFile, REL_NON_DUPLICATE);
}
nonDuplicatesWriteResult = nonDuplicatesWriter.finishRecordSet();
// Route Duplicates FlowFile
final WriteResult duplicatesWriteResult = duplicatesWriter.finishRecordSet();
duplicatesWriter.close();
attributes.clear();
attributes.putAll(duplicatesWriteResult.getAttributes());
attributes.put("record.count", String.valueOf(duplicatesWriteResult.getRecordCount()));
attributes.put(CoreAttributes.MIME_TYPE.key(), duplicatesWriter.getMimeType());
duplicatesFlowFile = session.putAllAttributes(nonDuplicatesFlowFile, attributes);
logger.info("Successfully found {} duplicate records for {}", new Object[] {duplicatesWriteResult.getRecordCount(), nonDuplicatesFlowFile});
if(!includeZeroRecordFlowFiles && duplicatesWriteResult.getRecordCount() == 0) {
session.remove(duplicatesFlowFile);
} else {
session.transfer(duplicatesFlowFile, REL_DUPLICATE);
}
session.adjustCounter("Records Processed",
nonDuplicatesWriteResult.getRecordCount() + duplicatesWriteResult.getRecordCount(), false);
if(shouldCacheIdentifier) {
CacheValue cacheValue = new CacheValue(serializableFilter, now);
cache.put(cacheKey, cacheValue, keySerializer, cacheValueSerializer);
}
session.transfer(flowFile, REL_ORIGINAL);
duplicatesWriteResult = duplicatesWriter.finishRecordSet();
} catch (final Exception e) {
logger.error("Failed in detecting duplicate records.", e);
session.remove(duplicatesFlowFile);
session.remove(nonDuplicatesFlowFile);
session.transfer(flowFile, REL_FAILURE);
return;
logger.error("Failed in detecting duplicate records at index " + index, e);
error = true;
} finally {
if (!error) {
final boolean includeZeroRecordFlowFiles = context.getProperty(INCLUDE_ZERO_RECORD_FLOWFILES).asBoolean();
session.adjustCounter("Records Processed",
nonDuplicatesWriteResult.getRecordCount() + duplicatesWriteResult.getRecordCount(), false);
sendOrRemove(session, duplicatesFlowFile, REL_DUPLICATE, duplicateMimeType,
includeZeroRecordFlowFiles, duplicatesWriteResult);
sendOrRemove(session, nonDuplicatesFlowFile, REL_NON_DUPLICATE, nonDuplicateMimeType,
includeZeroRecordFlowFiles, nonDuplicatesWriteResult);
session.transfer(flowFile, REL_ORIGINAL);
} else {
session.remove(duplicatesFlowFile);
session.remove(nonDuplicatesFlowFile);
session.transfer(flowFile, REL_FAILURE);
}
}
}
private void sendOrRemove(ProcessSession session,
FlowFile outputFlowFile,
Relationship targetRelationship,
String mimeType,
boolean includeZeroRecordFlowFiles,
WriteResult writeResult) {
if (!includeZeroRecordFlowFiles && writeResult.getRecordCount() == 0) {
session.remove(outputFlowFile);
} else {
Map<String, String> attributes = new HashMap<>();
attributes.putAll(writeResult.getAttributes());
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
attributes.put(CoreAttributes.MIME_TYPE.key(), mimeType);
outputFlowFile = session.putAllAttributes(outputFlowFile, attributes);
if (getLogger().isDebugEnabled()) {
getLogger().debug("Successfully found {} unique records for {}",
writeResult.getRecordCount(), outputFlowFile);
}
session.transfer(outputFlowFile, targetRelationship);
}
}
private String executeDynamicRecordPaths(ProcessContext context, Record record, FlowFile flowFile) {
final List<String> fieldValues = new ArrayList<>();
for (final PropertyDescriptor propertyDescriptor : dynamicProperties) {
final String value = context.getProperty(propertyDescriptor).evaluateAttributeExpressions(flowFile).getValue();
final RecordPath recordPath = recordPathCache.getCompiled(value);
final RecordPathResult result = recordPath.evaluate(record);
final List<FieldValue> selectedFields = result.getSelectedFields().collect(Collectors.toList());
fieldValues.add(propertyDescriptor.getName());
fieldValues.addAll(selectedFields.stream()
.map(f -> f.getValue().toString())
.collect(toList())
);
}
return Joiner.on(JOIN_CHAR).join(fieldValues);
}
private abstract static class FilterWrapper {
public static FilterWrapper create(Object filter) {
if(filter instanceof HashSet) {
if (filter instanceof HashSet) {
return new HashSetFilterWrapper((HashSet<String>) filter);
} else {
return new BloomFilterWrapper((BloomFilter<String>) filter);
}
}
public abstract boolean contains(String value);
public abstract void put(String value);
}
@ -600,6 +617,34 @@ public class DetectDuplicateRecord extends AbstractProcessor {
}
}
private static class DistributedMapCacheClientWrapper extends FilterWrapper {
private DistributedMapCacheClient client;
public DistributedMapCacheClientWrapper(DistributedMapCacheClient client) {
this.client = client;
}
@Override
public boolean contains(String value) {
try {
return client.containsKey(value, STRING_SERIALIZER);
} catch (IOException e) {
throw new ProcessException("Distributed Map lookup failed", e);
}
}
@Override
public void put(String value) {
/*
* This needs to be a noop because this process will be used upstream of the systems that would write the records
* that power the map cache.
*/
}
}
private static final Serializer<String> STRING_SERIALIZER = (value, output) -> output.write(value.getBytes(StandardCharsets.UTF_8));
private static final Serializer<Boolean> BOOLEAN_SERIALIZER = (value, output) -> output.write((byte) (value ? 1 : 0));
private static class CacheValue implements Serializable {
private final Serializable filter;
@ -618,31 +663,4 @@ public class DetectDuplicateRecord extends AbstractProcessor {
return entryTimeMS;
}
}
private static class CacheValueSerializer implements Serializer<CacheValue> {
@Override
public void serialize(CacheValue cacheValue, OutputStream outputStream) throws SerializationException, IOException {
new ObjectOutputStream(outputStream).writeObject(cacheValue);
}
}
private static class CacheValueDeserializer implements Deserializer<CacheValue> {
@Override
public CacheValue deserialize(byte[] bytes) throws DeserializationException, IOException {
try {
return (CacheValue) new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
return null;
}
}
private static class StringSerializer implements Serializer<String> {
@Override
public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
out.write(value.getBytes(StandardCharsets.UTF_8));
}
}
}

View File

@ -26,7 +26,7 @@ org.apache.nifi.processors.standard.CryptographicHashAttribute
org.apache.nifi.processors.standard.CryptographicHashContent
org.apache.nifi.processors.standard.DebugFlow
org.apache.nifi.processors.standard.DetectDuplicate
org.apache.nifi.processors.standard.DetectDuplicateRecord
org.apache.nifi.processors.standard.DeduplicateRecord
org.apache.nifi.processors.standard.DistributeLoad
org.apache.nifi.processors.standard.DuplicateFlowFile
org.apache.nifi.processors.standard.EncryptContent

View File

@ -0,0 +1,70 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8"/>
<title>DeduplicateRecords</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
</head>
<body>
<!-- Processor Documentation ================================================== -->
<h1>Overview</h1>
<p>This processor provides deduplication across either a single record set file, across several files or even across an entire data lake
using a DistributedMapCacheClient controller service. In the case of the former, it uses either a HashSet or a bloom
filter to provide extremely fast in-memory calculations with a high degree of accuracy. In the latter use case, it
will use the controller service to compare a generated hash against a map cache stored in one of the supported caching
options that Apache NiFi offers.</p>
<h2>Configuring single file deduplication</h2>
<p>Choose the "single file" option under the configuration property labeled "Deduplication Strategy." Then choose
whether to use a bloom filter or hash set. Be mindful to set size limits that are in line with the average size of the
record sets that you process.</p>
<h2>Configuring multi-file deduplication</h2>
<p>Select the "Multiple Files" option under "Deduplication Strategy" and then configure a DistributedMapCacheClient service.
It is possible to configure a cache identifier in multiple ways:</p>
<ol>
<li>Generate a hash of the entire record by specifying no dynamic properties.</li>
<li>Generate a hash using dynamic properties to specify particular fields to use.</li>
<li>Manually specify a single record path statement in the cache identifier property. Note:
<ul>
<li>This can be chained with #1 and #2 because it supports expression language and exposes the computed
hash from #1 or #2 as the EL variable <em>record.hash.value</em>. Example:
<em>concat('${some.var}', -, '${record.hash.value}')</em>
</li>
</ul>
</li>
</ol>
<h2>The role of dynamic properties</h2>
<p>Dynamic properties should have a human-readable name for the property name and a record path operation for the
value. The record path operations will be used to extract values from the record to assemble a unique identifier. Here is an example:</p>
<ul>
<li>firstName => /FirstName</li>
<li>lastName => /LastName</li>
</ul>
<p>Record:</p>
<pre>
{
"firstName": "John",
"lastName": "Smith"
}
</pre>
<p>Will yield an identifier that has "John" and "Smith" in it before a hash is generated from the final value.</p>
<p>If any record path is missing, it will cause an exception to be raised and the flowfile will be sent to the
failure relationship.</p>
</body>
</html>

View File

@ -1,96 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>DetectDuplicateRecord</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head>
<body>
<p>This processor makes use of the NiFi RecordPath Domain-Specific Language (DSL) to allow the user to
indicate which field(s) in the Record should be used to determine uniqueness. Users do this by adding
a User-defined Property to the Processor's configuration. The name of the User-defined Property must
be the RecordPath text that should be evaluated against each Record. All of the values identified by
the record paths are hashed together in the order they were specified to derive a unique value
representing a single Record. This hashed value is then optionally stored in the cache for
subsequent FlowFile processing.</p>
<p>If a RecordPath is given and does not match any field in an input Record, that Property will be
skipped and all other Properties will still be evaluated. If the RecordPath matches no fields the
record will be routed to the 'non-duplicate' relationship. If no User-defined Properties specifying
a RecordPath are defined, all field values of the record will be used.</p>
<p>After all RecordPath values are resolved, the values are combined in the order of the User-defined
Properties and hashed together using the specified hashing algorithm, ensuring constant space per record.</p>
<h2>Choosing a Filter Type</h2>
<p></p>
<h2>Examples</h2>
<p>Below, we lay out some examples in order to provide clarity about the Processor's behavior.
For all of the examples below, consider the example to operate on the following set of 2 (JSON) records:</p>
<code>
<pre>
[
{
"id": 1,
"name": "John",
"gender": "M",
},
{
"id": 2,
"name": "Susan",
"gender": "F",
},
{
"id": 3,
"name": "Megan",
"gender": "F",
},
{
"id": 2,
"name": "Jerry",
"gender": "M",
},
]
</pre>
</code>
<h3>Example 1: Matching on a Single Record Field</h3>
<p>A valid property RecordPath mapping would be <em>/id => ${field.value}</em>.</p>
<p>For a record set with JSON like that, the records will be evaluated against the <code>id</code> field
to determine uniqueness.</p>
<ul>
<li><strong>non-duplicate:</strong> John, Susan, Megan</li>
<li><strong>duplicate:</strong> Jerry</li>
</ul>
<h3>Example 2: Matching on Multiple Record Fields</h3>
<p>If we wanted to define these records to be unique based on the <code>id</code> and <code>gender</code> fields,
we would specify two RecordPath mappings: <em>/id => ${field.value}</em> and <em>/gender => ${field.value}</em>.</p>
<ul>
<li><strong>non-duplicate:</strong> John, Susan, Megan, Jerry</li>
<li><strong>duplicate:</strong> <em>None</em></li>
</ul>
<h3>Example 3: Matching on All Record Fields</h3>
<p>Do not define any RecordPath properties in the processor to use all fields by default.</p>
<p>For a record set with JSON like that, the records will be evaluated against the <code>id, name, gender</code>
fields to determine uniqueness.</p>
<ul>
<li><strong>non-duplicate:</strong> John, Susan, Megan, Jerry</li>
<li><strong>duplicate:</strong> <em>None</em></li>
</ul>
</body>
</html>

View File

@ -1,77 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard
import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.distributed.cache.client.Deserializer
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient
import org.apache.nifi.distributed.cache.client.Serializer
class MockCacheService extends AbstractControllerService implements DistributedMapCacheClient {
def map = [:]
@Override
def <K, V> boolean putIfAbsent(K k, V v, Serializer<K> serializer, Serializer<V> serializer1) throws IOException {
def retVal = map.containsKey(k)
if (retVal) {
false
} else {
map[k] = v
true
}
}
@Override
def <K, V> V getAndPutIfAbsent(K k, V v, Serializer<K> serializer, Serializer<V> serializer1, Deserializer<V> deserializer) throws IOException {
return null
}
@Override
def <K> boolean containsKey(K k, Serializer<K> serializer) throws IOException {
return map.containsKey(k)
}
@Override
def <K, V> void put(K k, V v, Serializer<K> serializer, Serializer<V> serializer1) throws IOException {
}
@Override
def <K, V> V get(K k, Serializer<K> serializer, Deserializer<V> deserializer) throws IOException {
return null
}
@Override
void close() throws IOException {
}
@Override
def <K> boolean remove(K k, Serializer<K> serializer) throws IOException {
return false
}
@Override
long removeByPattern(String s) throws IOException {
return 0
}
void assertContains(String key, String value) {
assert map.containsKey(key) && map[key] == value
}
}

View File

@ -0,0 +1,321 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class TestDeduplicateRecord {
private TestRunner runner;
private MockRecordParser reader;
private MockRecordWriter writer;
@BeforeEach
public void setup() throws InitializationException {
runner = TestRunners.newTestRunner(DeduplicateRecord.class);
// RECORD_READER, RECORD_WRITER
reader = new MockRecordParser();
writer = new MockRecordWriter("header", false);
runner.addControllerService("reader", reader);
runner.enableControllerService(reader);
runner.addControllerService("writer", writer);
runner.enableControllerService(writer);
runner.setProperty(DeduplicateRecord.RECORD_READER, "reader");
runner.setProperty(DeduplicateRecord.RECORD_WRITER, "writer");
runner.setProperty(DeduplicateRecord.RECORD_HASHING_ALGORITHM, DeduplicateRecord.SHA256_ALGORITHM_VALUE);
reader.addSchemaField("firstName", RecordFieldType.STRING);
reader.addSchemaField("middleName", RecordFieldType.STRING);
reader.addSchemaField("lastName", RecordFieldType.STRING);
// INCLUDE_ZERO_RECORD_FLOWFILES
runner.setProperty(DeduplicateRecord.INCLUDE_ZERO_RECORD_FLOWFILES, "true");
runner.assertValid();
}
void commonEnqueue() {
final Map<String, String> props = new HashMap<>();
props.put("hash.value", "1000");
runner.enqueue(new byte[]{}, props);
}
@Test
public void testInvalidRecordPathCausesValidationError() {
runner.setProperty(DeduplicateRecord.FILTER_TYPE, DeduplicateRecord.HASH_SET_VALUE);
runner.setProperty("middle_name", "//////middleName");
runner.assertNotValid();
}
@Test
public void testDetectDuplicatesHashSet() {
commonEnqueue();
runner.setProperty(DeduplicateRecord.FILTER_TYPE, DeduplicateRecord.HASH_SET_VALUE);
runner.setProperty("middle_name", "/middleName");
reader.addRecord("John", "Q", "Smith");
reader.addRecord("John", "Q", "Smith");
reader.addRecord("Jane", "X", "Doe");
runner.enqueue("");
runner.run();
doCountTests(0, 1, 1, 1, 2, 1);
}
@Test
public void testDetectDuplicatesBloomFilter() {
commonEnqueue();
runner.setProperty(DeduplicateRecord.FILTER_TYPE, DeduplicateRecord.BLOOM_FILTER_VALUE);
runner.setProperty(DeduplicateRecord.BLOOM_FILTER_FPP, "0.10");
runner.setProperty("middle_name", "/middleName");
reader.addRecord("John", "Q", "Smith");
reader.addRecord("John", "Q", "Smith");
reader.addRecord("Jane", "X", "Doe");
runner.enqueue("");
runner.run();
doCountTests(0, 1, 1, 1, 2, 1);
}
@Test
public void testNoDuplicatesHashSet() {
commonEnqueue();
runner.setProperty(DeduplicateRecord.FILTER_TYPE, DeduplicateRecord.HASH_SET_VALUE);
runner.setProperty("middle_name", "/middleName");
reader.addRecord("John", "Q", "Smith");
reader.addRecord("Jack", "Z", "Brown");
reader.addRecord("Jane", "X", "Doe");
runner.enqueue("");
runner.run();
doCountTests(0, 1, 1, 1, 3, 0);
}
@Test
public void testNoDuplicatesBloomFilter() {
commonEnqueue();
runner.setProperty(DeduplicateRecord.FILTER_TYPE, DeduplicateRecord.BLOOM_FILTER_VALUE);
runner.setProperty(DeduplicateRecord.BLOOM_FILTER_FPP, "0.10");
runner.setProperty("middle_name", "/middleName");
reader.addRecord("John", "Q", "Smith");
reader.addRecord("Jack", "Z", "Brown");
reader.addRecord("Jane", "X", "Doe");
runner.enqueue("");
runner.run();
doCountTests(0, 1, 1, 1, 3, 0);
}
@Test
public void testAllDuplicates() {
commonEnqueue();
reader.addRecord("John", "Q", "Smith");
reader.addRecord("John", "Q", "Smith");
reader.addRecord("John", "Q", "Smith");
runner.enqueue("");
runner.run();
doCountTests(0, 1, 1, 1, 1, 2);
}
@Test
public void testAllUnique() {
commonEnqueue();
reader.addRecord("John", "Q", "Smith");
reader.addRecord("Jack", "Z", "Brown");
reader.addRecord("Jane", "X", "Doe");
runner.enqueue("");
runner.run();
doCountTests(0, 1, 1, 1, 3, 0);
}
@Test
public void testCacheValueFromRecordPath() {
commonEnqueue();
reader.addRecord("John", "Q", "Smith");
reader.addRecord("Jack", "Z", "Brown");
reader.addRecord("Jack", "Z", "Brown");
runner.enqueue("");
runner.run();
doCountTests(0, 1, 1, 1, 2, 1);
}
/*
* These are all related to NIFI-6014
*/
@Test
public void testMultipleFileDeduplicationRequiresDMC() {
runner.setProperty(DeduplicateRecord.DEDUPLICATION_STRATEGY, DeduplicateRecord.OPTION_MULTIPLE_FILES.getValue());
runner.assertNotValid();
}
public static final String FIRST_KEY = DigestUtils.sha256Hex(String.join(String.valueOf(DeduplicateRecord.JOIN_CHAR), Arrays.asList(
"John", "Q", "Smith"
)));
public static final String SECOND_KEY = DigestUtils.sha256Hex(String.join(String.valueOf(DeduplicateRecord.JOIN_CHAR), Arrays.asList(
"Jack", "Z", "Brown"
)));
@Test
public void testDeduplicateWithDMC() throws Exception {
DistributedMapCacheClient dmc = new MockCacheService<>();
runner.addControllerService("dmc", dmc);
runner.setProperty(DeduplicateRecord.DISTRIBUTED_MAP_CACHE, "dmc");
runner.setProperty(DeduplicateRecord.DEDUPLICATION_STRATEGY, DeduplicateRecord.OPTION_MULTIPLE_FILES.getValue());
runner.enableControllerService(dmc);
runner.assertValid();
dmc.put(FIRST_KEY, true, null, null);
dmc.put(SECOND_KEY, true, null, null);
reader.addRecord("John", "Q", "Smith");
reader.addRecord("Jack", "Z", "Brown");
reader.addRecord("Jack", "Z", "Brown");
reader.addRecord("Jane", "X", "Doe");
runner.enqueue("");
runner.run();
doCountTests(0, 1, 1, 1, 1, 3);
}
@Test
public void testDeduplicateWithDMCAndCacheIdentifier() throws Exception {
DistributedMapCacheClient dmc = new MockCacheService<>();
runner.addControllerService("dmc", dmc);
runner.setProperty(DeduplicateRecord.DISTRIBUTED_MAP_CACHE, "dmc");
runner.setProperty(DeduplicateRecord.DEDUPLICATION_STRATEGY, DeduplicateRecord.OPTION_MULTIPLE_FILES.getValue());
runner.setProperty(DeduplicateRecord.CACHE_IDENTIFIER, "concat('${user.name}', '${record.hash.value}')");
runner.enableControllerService(dmc);
runner.assertValid();
dmc.put(String.format("john.smith-%s", FIRST_KEY), true, null, null);
dmc.put(String.format("john.smith-%s", SECOND_KEY), true, null, null);
reader.addRecord("John", "Q", "Smith");
reader.addRecord("Jack", "Z", "Brown");
reader.addRecord("Jack", "Z", "Brown");
reader.addRecord("Jane", "X", "Doe");
Map<String, String> attrs = new HashMap<>();
attrs.put("user.name", "john.smith-");
runner.enqueue("", attrs);
runner.run();
doCountTests(0, 1, 1, 1, 1, 3);
}
void doCountTests(int failure, int original, int duplicates, int notDuplicates, int notDupeCount, int dupeCount) {
runner.assertTransferCount(DeduplicateRecord.REL_DUPLICATE, duplicates);
runner.assertTransferCount(DeduplicateRecord.REL_NON_DUPLICATE, notDuplicates);
runner.assertTransferCount(DeduplicateRecord.REL_ORIGINAL, original);
runner.assertTransferCount(DeduplicateRecord.REL_FAILURE, failure);
List<MockFlowFile> duplicateFlowFile = runner.getFlowFilesForRelationship(DeduplicateRecord.REL_DUPLICATE);
if (duplicateFlowFile != null) {
assertEquals(String.valueOf(dupeCount), duplicateFlowFile.get(0).getAttribute("record.count"));
}
List<MockFlowFile> nonDuplicateFlowFile = runner.getFlowFilesForRelationship(DeduplicateRecord.REL_NON_DUPLICATE);
if (nonDuplicateFlowFile != null) {
assertEquals(String.valueOf(notDupeCount), nonDuplicateFlowFile.get(0).getAttribute("record.count"));
}
}
private static final class MockCacheService<K, V> extends AbstractControllerService implements DistributedMapCacheClient {
private Map storage;
public MockCacheService() {
storage = new HashMap<>();
}
@Override
public <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
return false;
}
@Override
public <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) throws IOException {
return null;
}
@Override
public <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException {
return storage.containsKey(key);
}
@Override
public <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
storage.put(key, value);
}
@Override
public <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
return null;
}
@Override
public void close() throws IOException {
}
@Override
public <K> boolean remove(K key, Serializer<K> serializer) throws IOException {
return false;
}
@Override
public long removeByPattern(String regex) throws IOException {
return 0;
}
}
}

View File

@ -1,209 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.util.*;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.nifi.processors.standard.DetectDuplicateRecord.*;
import static org.junit.Assert.assertEquals;
public class TestDetectDuplicateRecord {
private TestRunner runner;
private MockCacheService cache;
private MockRecordParser reader;
private MockRecordWriter writer;
@BeforeClass
public static void beforeClass() {
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.DetectDuplicateRecord", "debug");
System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestDetectDuplicateRecord", "debug");
}
@Before
public void setup() throws InitializationException {
runner = TestRunners.newTestRunner(DetectDuplicateRecord.class);
// RECORD_READER, RECORD_WRITER
reader = new MockRecordParser();
writer = new MockRecordWriter("header", false);
runner.addControllerService("reader", reader);
runner.enableControllerService(reader);
runner.addControllerService("writer", writer);
runner.enableControllerService(writer);
runner.setProperty(RECORD_READER, "reader");
runner.setProperty(RECORD_WRITER, "writer");
reader.addSchemaField("firstName", RecordFieldType.STRING);
reader.addSchemaField("middleName", RecordFieldType.STRING);
reader.addSchemaField("lastName", RecordFieldType.STRING);
// INCLUDE_ZERO_RECORD_FLOWFILES
runner.setProperty(INCLUDE_ZERO_RECORD_FLOWFILES, "true");
// CACHE_IDENTIFIER
runner.setProperty(CACHE_IDENTIFIER, "true");
// DISTRIBUTED_CACHE_SERVICE
cache = new MockCacheService();
runner.addControllerService("cache", cache);
runner.setProperty(DISTRIBUTED_CACHE_SERVICE, "cache");
runner.enableControllerService(cache);
// CACHE_ENTRY_IDENTIFIER
final Map<String, String> props = new HashMap<>();
props.put("hash.value", "1000");
runner.enqueue(new byte[]{}, props);
// AGE_OFF_DURATION
runner.setProperty(AGE_OFF_DURATION, "48 hours");
runner.assertValid();
}
@Test
public void testDetectDuplicatesHashSet() {
runner.setProperty(FILTER_TYPE, HASH_SET_VALUE);
runner.setProperty("/middleName", "${field.value}");
reader.addRecord("John", "Q", "Smith");
reader.addRecord("John", "Q", "Smith");
reader.addRecord("Jane", "X", "Doe");
runner.enqueue("");
runner.run();
doCountTests(0, 1, 1, 1, 2, 1);
}
@Test
public void testDetectDuplicatesBloomFilter() {
runner.setProperty(FILTER_TYPE, BLOOM_FILTER_VALUE);
runner.setProperty(BLOOM_FILTER_FPP, "0.10");
runner.setProperty("/middleName", "${field.value}");
reader.addRecord("John", "Q", "Smith");
reader.addRecord("John", "Q", "Smith");
reader.addRecord("Jane", "X", "Doe");
runner.enqueue("");
runner.run();
doCountTests(0, 1, 1, 1, 2, 1);
}
@Test
public void testNoDuplicatesHashSet() {
runner.setProperty(FILTER_TYPE, HASH_SET_VALUE);
runner.setProperty("/middleName", "${field.value}");
reader.addRecord("John", "Q", "Smith");
reader.addRecord("Jack", "Z", "Brown");
reader.addRecord("Jane", "X", "Doe");
runner.enqueue("");
runner.run();
doCountTests(0, 1, 1, 1, 3, 0);
}
@Test
public void testNoDuplicatesBloomFilter() {
runner.setProperty(FILTER_TYPE, BLOOM_FILTER_VALUE);
runner.setProperty(BLOOM_FILTER_FPP, "0.10");
runner.setProperty("/middleName", "${field.value}");
reader.addRecord("John", "Q", "Smith");
reader.addRecord("Jack", "Z", "Brown");
reader.addRecord("Jane", "X", "Doe");
runner.enqueue("");
runner.run();
doCountTests(0, 1, 1, 1, 3, 0);
}
@Test
public void testAllDuplicates() {
reader.addRecord("John", "Q", "Smith");
reader.addRecord("John", "Q", "Smith");
reader.addRecord("John", "Q", "Smith");
runner.enqueue("");
runner.run();
doCountTests(0, 1, 1, 0, 1, 2);
}
@Test
public void testAllUnique() {
reader.addRecord("John", "Q", "Smith");
reader.addRecord("Jack", "Z", "Brown");
reader.addRecord("Jane", "X", "Doe");
runner.enqueue("");
runner.run();
doCountTests(0, 1, 1, 1, 3, 0);
}
@Test
public void testCacheValueFromRecordPath() {
runner.setProperty(CACHE_ENTRY_IDENTIFIER, "Users");
reader.addRecord("John", "Q", "Smith");
reader.addRecord("Jack", "Z", "Brown");
reader.addRecord("Jane", "X", "Doe");
runner.enqueue("");
runner.run();
doCountTests(0, 1, 1, 1, 2, 1);
cache.assertContains("KEY", "VALUE"); // TODO: Get the tests running so you can see what the key/value is in serialized form
}
void doCountTests(int failure, int original, int duplicates, int notDuplicates, int notDupeCount, int dupeCount) {
runner.assertTransferCount(REL_DUPLICATE, duplicates);
runner.assertTransferCount(REL_NON_DUPLICATE, notDuplicates);
runner.assertTransferCount(REL_ORIGINAL, original);
runner.assertTransferCount(REL_FAILURE, failure);
List<MockFlowFile> duplicateFlowFile = runner.getFlowFilesForRelationship(REL_DUPLICATE);
if (duplicateFlowFile != null) {
assertEquals(String.valueOf(dupeCount), duplicateFlowFile.get(0).getAttribute("record.count"));
}
List<MockFlowFile> nonDuplicateFlowFile = runner.getFlowFilesForRelationship(REL_NON_DUPLICATE);
if (nonDuplicateFlowFile != null) {
assertEquals(String.valueOf(notDupeCount), nonDuplicateFlowFile.get(0).getAttribute("record.count"));
}
}
}