diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DetectDuplicateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DeduplicateRecord.java similarity index 53% rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DetectDuplicateRecord.java rename to nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DeduplicateRecord.java index 191a6755d3..b055a75273 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DetectDuplicateRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DeduplicateRecord.java @@ -22,94 +22,111 @@ import com.google.common.hash.Funnels; import org.apache.commons.codec.binary.Hex; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.codec.digest.MessageDigestAlgorithms; -import org.apache.nifi.annotation.behavior.*; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.SystemResource; +import org.apache.nifi.annotation.behavior.SystemResourceConsideration; +import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.components.*; -import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; import org.apache.nifi.distributed.cache.client.Serializer; -import org.apache.nifi.distributed.cache.client.exception.DeserializationException; -import org.apache.nifi.distributed.cache.client.exception.SerializationException; -import org.apache.nifi.expression.AttributeExpression.ResultType; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.*; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.record.path.FieldValue; import org.apache.nifi.record.path.RecordPath; import org.apache.nifi.record.path.RecordPathResult; import org.apache.nifi.record.path.util.RecordPathCache; -import org.apache.nifi.record.path.validation.RecordPathPropertyNameValidator; import org.apache.nifi.record.path.validation.RecordPathValidator; -import org.apache.nifi.schema.access.SchemaNotFoundException; -import org.apache.nifi.serialization.*; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; import org.apache.nifi.serialization.record.Record; -import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.serialization.record.util.DataTypeUtils; -import java.io.*; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.security.MessageDigest; -import java.util.*; -import java.util.concurrent.TimeUnit; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import static java.util.stream.Collectors.toList; import static org.apache.commons.codec.binary.StringUtils.getBytesUtf8; -import static org.apache.commons.lang3.StringUtils.*; @EventDriven @SupportsBatching @InputRequirement(Requirement.INPUT_REQUIRED) @SystemResourceConsideration(resource = SystemResource.MEMORY, - description = "The HashSet filter type will grow memory space proportionate to the number of unique records processed. " + - "The BloomFilter type will use constant memory regardless of the number of records processed.") -@Tags({"text", "record", "update", "change", "replace", "modify", "distinct", "unique", - "filter", "hash", "dupe", "duplicate", "dedupe"}) -@CapabilityDescription("Caches records from each incoming FlowFile and determines if the record " + - "has already been seen. If so, routes the record to 'duplicate'. If the record is " + - "not determined to be a duplicate, it is routed to 'non-duplicate'." + description = "The HashSet filter type will grow memory space proportionate to the number of unique records processed. " + + "The BloomFilter type will use constant memory regardless of the number of records processed.") +@SystemResourceConsideration(resource = SystemResource.CPU, + description = "If a more advanced hash algorithm is chosen, the amount of time required to hash any particular " + + "record could increase substantially." ) +@Tags({"text", "record", "update", "change", "replace", "modify", "distinct", "unique", + "filter", "hash", "dupe", "duplicate", "dedupe"}) +@CapabilityDescription("This processor attempts to deduplicate a record set in memory using either a hashset or a bloom filter. " + + "It operates on a per-file basis rather than across an entire data set that spans multiple files.") @WritesAttribute(attribute = "record.count", description = "The number of records processed.") @DynamicProperty( - name = "RecordPath", - value = "An expression language statement used to determine how the RecordPath is resolved. " + - "The following variables are availble: ${field.name}, ${field.value}, ${field.type}", - description = "The name of each user-defined property must be a valid RecordPath.") + name = "RecordPath", + value = "An expression language statement used to determine how the RecordPath is resolved. " + + "The following variables are availible: ${field.name}, ${field.value}, ${field.type}", + description = "The name of each user-defined property must be a valid RecordPath.") @SeeAlso(classNames = { - "org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", - "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer", - "org.apache.nifi.processors.standard.DetectDuplicate" + "org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", + "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer", + "org.apache.nifi.processors.standard.DetectDuplicate" }) -public class DetectDuplicateRecord extends AbstractProcessor { +public class DeduplicateRecord extends AbstractProcessor { + public static final char JOIN_CHAR = '~'; private static final String FIELD_NAME = "field.name"; private static final String FIELD_VALUE = "field.value"; private static final String FIELD_TYPE = "field.type"; private volatile RecordPathCache recordPathCache; - private volatile List recordPaths; + private volatile List dynamicProperties; // VALUES static final AllowableValue NONE_ALGORITHM_VALUE = new AllowableValue("none", "None", "Do not use a hashing algorithm. The value of resolved RecordPaths will be combined with tildes (~) to form the unique record key. " + "This may use significantly more storage depending on the size and shape or your data."); - static final AllowableValue MD5_ALGORITHM_VALUE = new AllowableValue(MessageDigestAlgorithms.MD5, "MD5", - "The MD5 message-digest algorithm."); - static final AllowableValue SHA1_ALGORITHM_VALUE = new AllowableValue(MessageDigestAlgorithms.SHA_1, "SHA-1", - "The SHA-1 cryptographic hash algorithm."); - static final AllowableValue SHA256_ALGORITHM_VALUE = new AllowableValue(MessageDigestAlgorithms.SHA3_256, "SHA-256", + static final AllowableValue SHA256_ALGORITHM_VALUE = new AllowableValue(MessageDigestAlgorithms.SHA_256, "SHA-256", "The SHA-256 cryptographic hash algorithm."); - static final AllowableValue SHA512_ALGORITHM_VALUE = new AllowableValue(MessageDigestAlgorithms.SHA3_512, "SHA-512", + static final AllowableValue SHA512_ALGORITHM_VALUE = new AllowableValue(MessageDigestAlgorithms.SHA_512, "SHA-512", "The SHA-512 cryptographic hash algorithm."); static final AllowableValue HASH_SET_VALUE = new AllowableValue("hash-set", "HashSet", @@ -139,6 +156,45 @@ public class DetectDuplicateRecord extends AbstractProcessor { .required(true) .build(); + static final AllowableValue OPTION_SINGLE_FILE = new AllowableValue("single", "Single File"); + static final AllowableValue OPTION_MULTIPLE_FILES = new AllowableValue("multiple", "Multiple Files"); + + static final PropertyDescriptor DEDUPLICATION_STRATEGY = new PropertyDescriptor.Builder() + .name("deduplication-strategy") + .displayName("Deduplication Strategy") + .description("The strategy to use for detecting and isolating duplicate records. The option for doing it " + + "across a single data file will operate in memory, whereas the one for going across the enter repository " + + "will require a distributed map cache.") + .allowableValues(OPTION_SINGLE_FILE, OPTION_MULTIPLE_FILES) + .defaultValue(OPTION_SINGLE_FILE.getValue()) + .required(true) + .build(); + + static final PropertyDescriptor DISTRIBUTED_MAP_CACHE = new PropertyDescriptor.Builder() + .name("distributed-map-cache") + .displayName("Distributed Map Cache client") + .description("This configuration is required when the deduplication strategy is set to 'multiple files.' The map " + + "cache will be used to check a data source such as HBase or Redis for entries indicating that a record has " + + "been processed before. This option requires a downstream process that uses PutDistributedMapCache to write " + + "an entry to the cache data source once the record has been processed to indicate that it has been handled before.") + .identifiesControllerService(DistributedMapCacheClient.class) + .required(false) + .addValidator(Validator.VALID) + .dependsOn(DEDUPLICATION_STRATEGY, OPTION_MULTIPLE_FILES) + .build(); + + static final PropertyDescriptor CACHE_IDENTIFIER = new PropertyDescriptor.Builder() + .name("cache-identifier") + .displayName("Cache Identifier") + .description("This option defines a record path operation to use for defining the cache identifier. It can be used " + + "in addition to the hash settings. This field will have the expression language attribute \"record.hash.value\" " + + "available to it to use with it to generate the record path operation.") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(false) + .addValidator(Validator.VALID) + .dependsOn(DEDUPLICATION_STRATEGY, OPTION_MULTIPLE_FILES) + .build(); + static final PropertyDescriptor INCLUDE_ZERO_RECORD_FLOWFILES = new PropertyDescriptor.Builder() .name("include-zero-record-flowfiles") .displayName("Include Zero Record FlowFiles") @@ -150,59 +206,16 @@ public class DetectDuplicateRecord extends AbstractProcessor { .required(true) .build(); - static final PropertyDescriptor CACHE_IDENTIFIER = new PropertyDescriptor.Builder() - .name("cache-the-entry-identifier") - .displayName("Cache The Entry Identifier") - .description("When true this cause the processor to check for duplicates and cache the Entry Identifier. When false, " - + "the processor would only check for duplicates and not cache the Entry Identifier, requiring another " - + "processor to add identifiers to the distributed cache.") - .required(true) - .allowableValues("true", "false") - .defaultValue("true") - .build(); - - static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder() - .name("distributed-cache-service") - .displayName("Distributed Cache Service") - .description("The Controller Service that is used to cache unique records, used to determine duplicates") - .required(false) - .identifiesControllerService(DistributedMapCacheClient.class) - .build(); - - static final PropertyDescriptor CACHE_ENTRY_IDENTIFIER = new PropertyDescriptor.Builder() - .name("cache-entry-identifier") - .displayName("Cache Entry Identifier") - .description( - "A FlowFile attribute, or the results of an Attribute Expression Language statement, which will be evaluated " + - "against a FlowFile in order to determine the cached filter type value used to identify duplicates.") - .required(false) - .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true)) - .defaultValue("${hash.value}") - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .build(); - - static final PropertyDescriptor AGE_OFF_DURATION = new PropertyDescriptor.Builder() - .name("age-off-duration") - .displayName("Age Off Duration") - .description("Time interval to age off cached filter entries. When the cache expires, the entire filter and its values " + - "are destroyed. Leaving this value empty will cause the cached entries to never expire but may eventually be rotated " + - "out when the cache servers rotation policy automatically expires entries.") - .required(false) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .build(); - static final PropertyDescriptor RECORD_HASHING_ALGORITHM = new PropertyDescriptor.Builder() .name("record-hashing-algorithm") .displayName("Record Hashing Algorithm") .description("The algorithm used to hash the combined set of resolved RecordPath values for cache storage.") .allowableValues( NONE_ALGORITHM_VALUE, - MD5_ALGORITHM_VALUE, - SHA1_ALGORITHM_VALUE, SHA256_ALGORITHM_VALUE, SHA512_ALGORITHM_VALUE ) - .defaultValue(SHA1_ALGORITHM_VALUE.getValue()) + .defaultValue(SHA256_ALGORITHM_VALUE.getValue()) .expressionLanguageSupported(ExpressionLanguageScope.NONE) .required(true) .build(); @@ -210,12 +223,16 @@ public class DetectDuplicateRecord extends AbstractProcessor { static final PropertyDescriptor FILTER_TYPE = new PropertyDescriptor.Builder() .name("filter-type") .displayName("Filter Type") - .description("The filter used to determine whether a record has been seen before based on the matching RecordPath criteria.") + .description("The filter used to determine whether a record has been seen before based on the matching RecordPath " + + "criteria. If hash set is selected, a Java HashSet object will be used to deduplicate all encountered " + + "records. If the bloom filter option is selected, a bloom filter will be used. The bloom filter option is " + + "less memory intensive, but has a chance of having false positives.") .allowableValues( HASH_SET_VALUE, BLOOM_FILTER_VALUE ) .defaultValue(HASH_SET_VALUE.getValue()) + .dependsOn(DEDUPLICATION_STRATEGY, OPTION_SINGLE_FILE) .required(true) .build(); @@ -227,6 +244,7 @@ public class DetectDuplicateRecord extends AbstractProcessor { .defaultValue("25000") .expressionLanguageSupported(ExpressionLanguageScope.NONE) .addValidator(StandardValidators.INTEGER_VALIDATOR) + .dependsOn(FILTER_TYPE, BLOOM_FILTER_VALUE) .required(true) .build(); @@ -269,20 +287,15 @@ public class DetectDuplicateRecord extends AbstractProcessor { private Set relationships; - private final Serializer keySerializer = new StringSerializer(); - private final Serializer cacheValueSerializer = new CacheValueSerializer(); - private final Deserializer cacheValueDeserializer = new CacheValueDeserializer(); - @Override protected void init(final ProcessorInitializationContext context) { final List descriptors = new ArrayList<>(); + descriptors.add(DEDUPLICATION_STRATEGY); + descriptors.add(DISTRIBUTED_MAP_CACHE); + descriptors.add(CACHE_IDENTIFIER); descriptors.add(RECORD_READER); descriptors.add(RECORD_WRITER); descriptors.add(INCLUDE_ZERO_RECORD_FLOWFILES); - descriptors.add(CACHE_IDENTIFIER); - descriptors.add(CACHE_ENTRY_IDENTIFIER); - descriptors.add(AGE_OFF_DURATION); - descriptors.add(DISTRIBUTED_CACHE_SERVICE); descriptors.add(RECORD_HASHING_ALGORITHM); descriptors.add(FILTER_TYPE); descriptors.add(FILTER_CAPACITY_HINT); @@ -318,24 +331,20 @@ public class DetectDuplicateRecord extends AbstractProcessor { "to access information about the field and the value of the field being evaluated.") .required(false) .dynamic(true) + .addValidator(new RecordPathValidator()) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(new RecordPathPropertyNameValidator()) .build(); } @Override - protected Collection customValidate(final ValidationContext validationContext) { + protected Collection customValidate(final ValidationContext context) { RecordPathValidator recordPathValidator = new RecordPathValidator(); - final List validationResults = validationContext.getProperties().keySet().stream() - .filter(PropertyDescriptor::isDynamic) - .map(property -> recordPathValidator.validate( - "User-defined Properties", - property.getName(), - validationContext - )).collect(Collectors.toList()); + List validationResults = new ArrayList<>(); - if(validationContext.getProperty(BLOOM_FILTER_FPP).isSet()) { - final double falsePositiveProbability = validationContext.getProperty(BLOOM_FILTER_FPP).asDouble(); + boolean useSingleFile = context.getProperty(DEDUPLICATION_STRATEGY).getValue().equals(OPTION_SINGLE_FILE.getValue()); + + if (useSingleFile && context.getProperty(BLOOM_FILTER_FPP).isSet()) { + final double falsePositiveProbability = context.getProperty(BLOOM_FILTER_FPP).asDouble(); if (falsePositiveProbability < 0 || falsePositiveProbability > 1) { validationResults.add( new ValidationResult.Builder() @@ -344,42 +353,62 @@ public class DetectDuplicateRecord extends AbstractProcessor { .explanation("Valid values are 0.0 - 1.0 inclusive") .valid(false).build()); } - } - - if(validationContext.getProperty(CACHE_IDENTIFIER).asBoolean()) { - if(!validationContext.getProperty(DISTRIBUTED_CACHE_SERVICE).isSet()) + } else if (!useSingleFile) { + if (!context.getProperty(DISTRIBUTED_MAP_CACHE).isSet()) { validationResults.add(new ValidationResult.Builder() - .subject(DISTRIBUTED_CACHE_SERVICE.getName()) - .explanation(DISTRIBUTED_CACHE_SERVICE.getName() + " is required when " + CACHE_IDENTIFIER.getName() + " is true.") - .valid(false).build()); - - if(!validationContext.getProperty(CACHE_ENTRY_IDENTIFIER).isSet()) - validationResults.add(new ValidationResult.Builder() - .subject(CACHE_ENTRY_IDENTIFIER.getName()) - .explanation(CACHE_ENTRY_IDENTIFIER.getName() + " is required when " + CACHE_IDENTIFIER.getName() + " is true.") - .valid(false).build()); - - if(!validationContext.getProperty(AGE_OFF_DURATION).isSet()) - validationResults.add(new ValidationResult.Builder() - .subject(AGE_OFF_DURATION.getName()) - .explanation(AGE_OFF_DURATION.getName() + " is required when " + CACHE_IDENTIFIER.getName() + " is true.") + .subject(DISTRIBUTED_MAP_CACHE.getName()) + .explanation("Multiple files deduplication was chosen, but a distributed map cache client was " + + "not configured") .valid(false).build()); + } } return validationResults; } + private DistributedMapCacheClient mapCacheClient; + private RecordReaderFactory readerFactory; + private RecordSetWriterFactory writerFactory; + + private boolean useInMemoryStrategy; + @OnScheduled - public void compileRecordPaths(final ProcessContext context) { - final List recordPaths = new ArrayList<>(); - - recordPaths.addAll(context.getProperties().keySet().stream() + public void onScheduled(final ProcessContext context) { + dynamicProperties = context.getProperties().keySet().stream() .filter(PropertyDescriptor::isDynamic) - .map(PropertyDescriptor::getName) - .collect(toList())); + .collect(Collectors.toList()); - recordPathCache = new RecordPathCache(recordPaths.size()); - this.recordPaths = recordPaths; + int cacheSize = dynamicProperties.size(); + + recordPathCache = new RecordPathCache(cacheSize); + + if (context.getProperty(DISTRIBUTED_MAP_CACHE).isSet()) { + mapCacheClient = context.getProperty(DISTRIBUTED_MAP_CACHE).asControllerService(DistributedMapCacheClient.class); + } + + readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); + writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); + + String strategy = context.getProperty(DEDUPLICATION_STRATEGY).getValue(); + + useInMemoryStrategy = strategy.equals(OPTION_SINGLE_FILE.getValue()); + } + + private FilterWrapper getFilter(ProcessContext context) { + if (useInMemoryStrategy) { + boolean useHashSet = context.getProperty(FILTER_TYPE).getValue() + .equals(context.getProperty(HASH_SET_VALUE.getValue())); + final int filterCapacity = context.getProperty(FILTER_CAPACITY_HINT).asInteger(); + return useHashSet + ? new HashSetFilterWrapper(new HashSet<>(filterCapacity)) + : new BloomFilterWrapper(BloomFilter.create( + Funnels.stringFunnel(Charset.defaultCharset()), + filterCapacity, + context.getProperty(BLOOM_FILTER_FPP).asDouble() + )); + } else { + return new DistributedMapCacheClientWrapper(mapCacheClient); + } } @Override @@ -390,44 +419,29 @@ public class DetectDuplicateRecord extends AbstractProcessor { } final ComponentLog logger = getLogger(); - final String cacheKey = context.getProperty(CACHE_ENTRY_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue(); - - if (isBlank(cacheKey)) { - logger.error("FlowFile {} has no attribute for given Cache Entry Identifier", new Object[]{flowFile}); - session.transfer(session.penalize(flowFile), REL_FAILURE); - return; - } FlowFile nonDuplicatesFlowFile = session.create(flowFile); FlowFile duplicatesFlowFile = session.create(flowFile); - try { - final long now = System.currentTimeMillis(); - final DistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class); + long index = 0; - final boolean shouldCacheIdentifier = context.getProperty(CACHE_IDENTIFIER).asBoolean(); - final int filterCapacity = context.getProperty(FILTER_CAPACITY_HINT).asInteger(); - Serializable serializableFilter = context.getProperty(FILTER_TYPE).getValue() - .equals(context.getProperty(HASH_SET_VALUE.getValue())) - ? new HashSet(filterCapacity) - : BloomFilter.create( - Funnels.stringFunnel(Charset.defaultCharset()), - filterCapacity, - context.getProperty(BLOOM_FILTER_FPP).asDouble()); + WriteResult nonDuplicatesWriteResult = null; + WriteResult duplicatesWriteResult = null; + String duplicateMimeType = null; + String nonDuplicateMimeType = null; - if(shouldCacheIdentifier && cache.containsKey(cacheKey, keySerializer)) { - CacheValue cacheValue = cache.get(cacheKey, keySerializer, cacheValueDeserializer); - Long durationMS = context.getProperty(AGE_OFF_DURATION).asTimePeriod(TimeUnit.MILLISECONDS); - - if(durationMS != null && (now >= cacheValue.getEntryTimeMS() + durationMS)) { - boolean status = cache.remove(cacheKey, keySerializer); - logger.debug("Removal of expired cached entry with key {} returned {}", new Object[]{cacheKey, status}); - } else { - serializableFilter = cacheValue.getFilter(); - } - } - - final FilterWrapper filter = FilterWrapper.create(serializableFilter); + boolean error = false; + try ( + final InputStream inputStream = session.read(flowFile); + final RecordReader reader = readerFactory.createRecordReader(flowFile, inputStream, logger); + final OutputStream nonDupeStream = session.write(nonDuplicatesFlowFile); + final OutputStream dupeStream = session.write(duplicatesFlowFile); + final RecordSetWriter nonDuplicatesWriter = writerFactory + .createWriter(getLogger(), writerFactory.getSchema(flowFile.getAttributes(), reader.getSchema()), nonDupeStream, nonDuplicatesFlowFile); + final RecordSetWriter duplicatesWriter = writerFactory + .createWriter(getLogger(), writerFactory.getSchema(flowFile.getAttributes(), reader.getSchema()), dupeStream, duplicatesFlowFile); + ) { + final FilterWrapper filter = getFilter(context); final String recordHashingAlgorithm = context.getProperty(RECORD_HASHING_ALGORITHM).getValue(); final MessageDigest messageDigest = recordHashingAlgorithm.equals(NONE_ALGORITHM_VALUE.getValue()) @@ -435,14 +449,6 @@ public class DetectDuplicateRecord extends AbstractProcessor { : DigestUtils.getDigest(recordHashingAlgorithm); final Boolean matchWholeRecord = context.getProperties().keySet().stream().noneMatch(p -> p.isDynamic()); - final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); - final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); - final RecordReader reader = readerFactory.createRecordReader(flowFile.getAttributes(), session.read(flowFile), logger); - - final RecordSchema writeSchema = writerFactory.getSchema(flowFile.getAttributes(), reader.getSchema()); - final RecordSetWriter nonDuplicatesWriter = writerFactory.createWriter(getLogger(), writeSchema, session.write(nonDuplicatesFlowFile)); - final RecordSetWriter duplicatesWriter = writerFactory.createWriter(getLogger(), writeSchema, session.write(duplicatesFlowFile)); - nonDuplicatesWriter.beginRecordSet(); duplicatesWriter.beginRecordSet(); Record record; @@ -451,114 +457,125 @@ public class DetectDuplicateRecord extends AbstractProcessor { String recordValue; if (matchWholeRecord) { - recordValue = Joiner.on('~').join(record.getValues()); + recordValue = Joiner.on(JOIN_CHAR).join(record.getValues()); } else { - final List fieldValues = new ArrayList<>(); - for (final String recordPathText : recordPaths) { - final PropertyValue recordPathPropertyValue = context.getProperty(recordPathText); - final RecordPath recordPath = recordPathCache.getCompiled(recordPathText); - final RecordPathResult result = recordPath.evaluate(record); - final List selectedFields = result.getSelectedFields().collect(Collectors.toList()); - - if(recordPathPropertyValue.isExpressionLanguagePresent()) { - final Map fieldVariables = new HashMap<>(); - selectedFields.forEach(fieldVal -> { - fieldVariables.clear(); - fieldVariables.put(FIELD_NAME, fieldVal.getField().getFieldName()); - fieldVariables.put(FIELD_VALUE, DataTypeUtils.toString(fieldVal.getValue(), (String) null)); - fieldVariables.put(FIELD_TYPE, fieldVal.getField().getDataType().getFieldType().name()); - - fieldValues.add(recordPathPropertyValue.evaluateAttributeExpressions(flowFile, fieldVariables).getValue()); - }); - } else { - fieldValues.add(recordPathPropertyValue.evaluateAttributeExpressions(flowFile).getValue()); - } - - fieldValues.addAll(selectedFields.stream() - .map(f -> recordPathPropertyValue.evaluateAttributeExpressions(flowFile).getValue()) - .collect(toList()) - ); - } - recordValue = Joiner.on('~').join(fieldValues); + recordValue = executeDynamicRecordPaths(context, record, flowFile); } - final String recordHash = messageDigest != null + String recordHash = messageDigest != null ? Hex.encodeHexString(messageDigest.digest(getBytesUtf8(recordValue))) : recordValue; + messageDigest.reset(); - if(filter.contains(recordHash)) { + if (!useInMemoryStrategy && context.getProperty(CACHE_IDENTIFIER).isSet()) { + Map additional = new HashMap<>(); + additional.put("record.hash.value", recordHash); + String rawPath = context.getProperty(CACHE_IDENTIFIER).evaluateAttributeExpressions(flowFile, additional).getValue(); + RecordPath compiled = recordPathCache.getCompiled(rawPath); + RecordPathResult result = compiled.evaluate(record); + FieldValue fieldValue = result.getSelectedFields().findFirst().get(); + if (fieldValue.getValue() == null) { + throw new ProcessException(String.format("The path \"%s\" failed to create an ID value at record index %d", rawPath, index)); + } + + recordHash = fieldValue.getValue().toString(); + } + + if (filter.contains(recordHash)) { duplicatesWriter.write(record); } else { nonDuplicatesWriter.write(record); + filter.put(recordHash); } - filter.put(recordHash); + index++; } - final boolean includeZeroRecordFlowFiles = context.getProperty(INCLUDE_ZERO_RECORD_FLOWFILES).isSet() - ? context.getProperty(INCLUDE_ZERO_RECORD_FLOWFILES).asBoolean() - : true; - + duplicateMimeType = duplicatesWriter.getMimeType(); + nonDuplicateMimeType = nonDuplicatesWriter.getMimeType(); // Route Non-Duplicates FlowFile - final WriteResult nonDuplicatesWriteResult = nonDuplicatesWriter.finishRecordSet(); - nonDuplicatesWriter.close(); - Map attributes = new HashMap<>(); - attributes.putAll(nonDuplicatesWriteResult.getAttributes()); - attributes.put("record.count", String.valueOf(nonDuplicatesWriteResult.getRecordCount())); - attributes.put(CoreAttributes.MIME_TYPE.key(), nonDuplicatesWriter.getMimeType()); - nonDuplicatesFlowFile = session.putAllAttributes(nonDuplicatesFlowFile, attributes); - logger.info("Successfully found {} unique records for {}", new Object[] {nonDuplicatesWriteResult.getRecordCount(), nonDuplicatesFlowFile}); - - if(!includeZeroRecordFlowFiles && nonDuplicatesWriteResult.getRecordCount() == 0) { - session.remove(nonDuplicatesFlowFile); - } else { - session.transfer(nonDuplicatesFlowFile, REL_NON_DUPLICATE); - } - + nonDuplicatesWriteResult = nonDuplicatesWriter.finishRecordSet(); // Route Duplicates FlowFile - final WriteResult duplicatesWriteResult = duplicatesWriter.finishRecordSet(); - duplicatesWriter.close(); - attributes.clear(); - attributes.putAll(duplicatesWriteResult.getAttributes()); - attributes.put("record.count", String.valueOf(duplicatesWriteResult.getRecordCount())); - attributes.put(CoreAttributes.MIME_TYPE.key(), duplicatesWriter.getMimeType()); - duplicatesFlowFile = session.putAllAttributes(nonDuplicatesFlowFile, attributes); - logger.info("Successfully found {} duplicate records for {}", new Object[] {duplicatesWriteResult.getRecordCount(), nonDuplicatesFlowFile}); - - if(!includeZeroRecordFlowFiles && duplicatesWriteResult.getRecordCount() == 0) { - session.remove(duplicatesFlowFile); - } else { - session.transfer(duplicatesFlowFile, REL_DUPLICATE); - } - - session.adjustCounter("Records Processed", - nonDuplicatesWriteResult.getRecordCount() + duplicatesWriteResult.getRecordCount(), false); - - if(shouldCacheIdentifier) { - CacheValue cacheValue = new CacheValue(serializableFilter, now); - cache.put(cacheKey, cacheValue, keySerializer, cacheValueSerializer); - } - - session.transfer(flowFile, REL_ORIGINAL); + duplicatesWriteResult = duplicatesWriter.finishRecordSet(); } catch (final Exception e) { - logger.error("Failed in detecting duplicate records.", e); - session.remove(duplicatesFlowFile); - session.remove(nonDuplicatesFlowFile); - session.transfer(flowFile, REL_FAILURE); - return; + logger.error("Failed in detecting duplicate records at index " + index, e); + error = true; + } finally { + if (!error) { + final boolean includeZeroRecordFlowFiles = context.getProperty(INCLUDE_ZERO_RECORD_FLOWFILES).asBoolean(); + + session.adjustCounter("Records Processed", + nonDuplicatesWriteResult.getRecordCount() + duplicatesWriteResult.getRecordCount(), false); + + sendOrRemove(session, duplicatesFlowFile, REL_DUPLICATE, duplicateMimeType, + includeZeroRecordFlowFiles, duplicatesWriteResult); + + sendOrRemove(session, nonDuplicatesFlowFile, REL_NON_DUPLICATE, nonDuplicateMimeType, + includeZeroRecordFlowFiles, nonDuplicatesWriteResult); + + session.transfer(flowFile, REL_ORIGINAL); + } else { + session.remove(duplicatesFlowFile); + session.remove(nonDuplicatesFlowFile); + session.transfer(flowFile, REL_FAILURE); + } } } + private void sendOrRemove(ProcessSession session, + FlowFile outputFlowFile, + Relationship targetRelationship, + String mimeType, + boolean includeZeroRecordFlowFiles, + WriteResult writeResult) { + if (!includeZeroRecordFlowFiles && writeResult.getRecordCount() == 0) { + session.remove(outputFlowFile); + } else { + Map attributes = new HashMap<>(); + attributes.putAll(writeResult.getAttributes()); + attributes.put("record.count", String.valueOf(writeResult.getRecordCount())); + attributes.put(CoreAttributes.MIME_TYPE.key(), mimeType); + outputFlowFile = session.putAllAttributes(outputFlowFile, attributes); + if (getLogger().isDebugEnabled()) { + getLogger().debug("Successfully found {} unique records for {}", + writeResult.getRecordCount(), outputFlowFile); + } + + session.transfer(outputFlowFile, targetRelationship); + } + } + + private String executeDynamicRecordPaths(ProcessContext context, Record record, FlowFile flowFile) { + final List fieldValues = new ArrayList<>(); + for (final PropertyDescriptor propertyDescriptor : dynamicProperties) { + final String value = context.getProperty(propertyDescriptor).evaluateAttributeExpressions(flowFile).getValue(); + final RecordPath recordPath = recordPathCache.getCompiled(value); + final RecordPathResult result = recordPath.evaluate(record); + final List selectedFields = result.getSelectedFields().collect(Collectors.toList()); + + fieldValues.add(propertyDescriptor.getName()); + + fieldValues.addAll(selectedFields.stream() + .map(f -> f.getValue().toString()) + .collect(toList()) + ); + } + + return Joiner.on(JOIN_CHAR).join(fieldValues); + } + private abstract static class FilterWrapper { public static FilterWrapper create(Object filter) { - if(filter instanceof HashSet) { + if (filter instanceof HashSet) { return new HashSetFilterWrapper((HashSet) filter); } else { return new BloomFilterWrapper((BloomFilter) filter); } } + public abstract boolean contains(String value); + public abstract void put(String value); } @@ -600,6 +617,34 @@ public class DetectDuplicateRecord extends AbstractProcessor { } } + private static class DistributedMapCacheClientWrapper extends FilterWrapper { + private DistributedMapCacheClient client; + + public DistributedMapCacheClientWrapper(DistributedMapCacheClient client) { + this.client = client; + } + + @Override + public boolean contains(String value) { + try { + return client.containsKey(value, STRING_SERIALIZER); + } catch (IOException e) { + throw new ProcessException("Distributed Map lookup failed", e); + } + } + + @Override + public void put(String value) { + /* + * This needs to be a noop because this process will be used upstream of the systems that would write the records + * that power the map cache. + */ + } + } + + private static final Serializer STRING_SERIALIZER = (value, output) -> output.write(value.getBytes(StandardCharsets.UTF_8)); + private static final Serializer BOOLEAN_SERIALIZER = (value, output) -> output.write((byte) (value ? 1 : 0)); + private static class CacheValue implements Serializable { private final Serializable filter; @@ -618,31 +663,4 @@ public class DetectDuplicateRecord extends AbstractProcessor { return entryTimeMS; } } - - private static class CacheValueSerializer implements Serializer { - @Override - public void serialize(CacheValue cacheValue, OutputStream outputStream) throws SerializationException, IOException { - new ObjectOutputStream(outputStream).writeObject(cacheValue); - } - } - - private static class CacheValueDeserializer implements Deserializer { - @Override - public CacheValue deserialize(byte[] bytes) throws DeserializationException, IOException { - try { - return (CacheValue) new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject(); - } catch (ClassNotFoundException e) { - e.printStackTrace(); - } - return null; - } - } - - private static class StringSerializer implements Serializer { - - @Override - public void serialize(final String value, final OutputStream out) throws SerializationException, IOException { - out.write(value.getBytes(StandardCharsets.UTF_8)); - } - } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 0324a5b71d..160d552158 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -26,7 +26,7 @@ org.apache.nifi.processors.standard.CryptographicHashAttribute org.apache.nifi.processors.standard.CryptographicHashContent org.apache.nifi.processors.standard.DebugFlow org.apache.nifi.processors.standard.DetectDuplicate -org.apache.nifi.processors.standard.DetectDuplicateRecord +org.apache.nifi.processors.standard.DeduplicateRecord org.apache.nifi.processors.standard.DistributeLoad org.apache.nifi.processors.standard.DuplicateFlowFile org.apache.nifi.processors.standard.EncryptContent diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.DeduplicateRecords/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.DeduplicateRecords/additionalDetails.html new file mode 100644 index 0000000000..07b1f64508 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.DeduplicateRecords/additionalDetails.html @@ -0,0 +1,70 @@ + + + + + + DeduplicateRecords + + + + + +

Overview

+

This processor provides deduplication across either a single record set file, across several files or even across an entire data lake + using a DistributedMapCacheClient controller service. In the case of the former, it uses either a HashSet or a bloom + filter to provide extremely fast in-memory calculations with a high degree of accuracy. In the latter use case, it + will use the controller service to compare a generated hash against a map cache stored in one of the supported caching + options that Apache NiFi offers.

+ +

Configuring single file deduplication

+

Choose the "single file" option under the configuration property labeled "Deduplication Strategy." Then choose + whether to use a bloom filter or hash set. Be mindful to set size limits that are in line with the average size of the + record sets that you process.

+ +

Configuring multi-file deduplication

+

Select the "Multiple Files" option under "Deduplication Strategy" and then configure a DistributedMapCacheClient service. + It is possible to configure a cache identifier in multiple ways:

+
    +
  1. Generate a hash of the entire record by specifying no dynamic properties.
  2. +
  3. Generate a hash using dynamic properties to specify particular fields to use.
  4. +
  5. Manually specify a single record path statement in the cache identifier property. Note: +
      +
    • This can be chained with #1 and #2 because it supports expression language and exposes the computed + hash from #1 or #2 as the EL variable record.hash.value. Example: + concat('${some.var}', -, '${record.hash.value}') +
    • +
    +
  6. +
+

The role of dynamic properties

+

Dynamic properties should have a human-readable name for the property name and a record path operation for the + value. The record path operations will be used to extract values from the record to assemble a unique identifier. Here is an example:

+
    +
  • firstName => /FirstName
  • +
  • lastName => /LastName
  • +
+

Record:

+
+        {
+            "firstName": "John",
+            "lastName": "Smith"
+        }
+    
+

Will yield an identifier that has "John" and "Smith" in it before a hash is generated from the final value.

+

If any record path is missing, it will cause an exception to be raised and the flowfile will be sent to the + failure relationship.

+ + diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org/apache/nifi/processors/standard/DetectDuplicateRecord/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org/apache/nifi/processors/standard/DetectDuplicateRecord/additionalDetails.html deleted file mode 100644 index 8f243022fb..0000000000 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org/apache/nifi/processors/standard/DetectDuplicateRecord/additionalDetails.html +++ /dev/null @@ -1,96 +0,0 @@ - - - - - - DetectDuplicateRecord - - - - -

This processor makes use of the NiFi RecordPath Domain-Specific Language (DSL) to allow the user to - indicate which field(s) in the Record should be used to determine uniqueness. Users do this by adding - a User-defined Property to the Processor's configuration. The name of the User-defined Property must - be the RecordPath text that should be evaluated against each Record. All of the values identified by - the record paths are hashed together in the order they were specified to derive a unique value - representing a single Record. This hashed value is then optionally stored in the cache for - subsequent FlowFile processing.

-

If a RecordPath is given and does not match any field in an input Record, that Property will be - skipped and all other Properties will still be evaluated. If the RecordPath matches no fields the - record will be routed to the 'non-duplicate' relationship. If no User-defined Properties specifying - a RecordPath are defined, all field values of the record will be used.

-

After all RecordPath values are resolved, the values are combined in the order of the User-defined - Properties and hashed together using the specified hashing algorithm, ensuring constant space per record.

- -

Choosing a Filter Type

-

-

Examples

-

Below, we lay out some examples in order to provide clarity about the Processor's behavior. - For all of the examples below, consider the example to operate on the following set of 2 (JSON) records:

- -
-        [
-            {
-                "id": 1,
-                "name": "John",
-                "gender": "M",
-            },
-            {
-                "id": 2,
-                "name": "Susan",
-                "gender": "F",
-            },
-            {
-                "id": 3,
-                "name": "Megan",
-                "gender": "F",
-            },
-            {
-                "id": 2,
-                "name": "Jerry",
-                "gender": "M",
-            },
-        ]
-    
-
- -

Example 1: Matching on a Single Record Field

-

A valid property RecordPath mapping would be /id => ${field.value}.

-

For a record set with JSON like that, the records will be evaluated against the id field - to determine uniqueness.

-
    -
  • non-duplicate: John, Susan, Megan
  • -
  • duplicate: Jerry
  • -
- -

Example 2: Matching on Multiple Record Fields

-

If we wanted to define these records to be unique based on the id and gender fields, - we would specify two RecordPath mappings: /id => ${field.value} and /gender => ${field.value}.

-
    -
  • non-duplicate: John, Susan, Megan, Jerry
  • -
  • duplicate: None
  • -
- -

Example 3: Matching on All Record Fields

-

Do not define any RecordPath properties in the processor to use all fields by default.

-

For a record set with JSON like that, the records will be evaluated against the id, name, gender - fields to determine uniqueness.

-
    -
  • non-duplicate: John, Susan, Megan, Jerry
  • -
  • duplicate: None
  • -
- - \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/MockCacheService.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/MockCacheService.groovy deleted file mode 100644 index d0f5d091e4..0000000000 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/MockCacheService.groovy +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.processors.standard - -import org.apache.nifi.controller.AbstractControllerService -import org.apache.nifi.distributed.cache.client.Deserializer -import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient -import org.apache.nifi.distributed.cache.client.Serializer - -class MockCacheService extends AbstractControllerService implements DistributedMapCacheClient { - def map = [:] - - @Override - def boolean putIfAbsent(K k, V v, Serializer serializer, Serializer serializer1) throws IOException { - def retVal = map.containsKey(k) - if (retVal) { - false - } else { - map[k] = v - true - } - } - - @Override - def V getAndPutIfAbsent(K k, V v, Serializer serializer, Serializer serializer1, Deserializer deserializer) throws IOException { - return null - } - - @Override - def boolean containsKey(K k, Serializer serializer) throws IOException { - return map.containsKey(k) - } - - @Override - def void put(K k, V v, Serializer serializer, Serializer serializer1) throws IOException { - - } - - @Override - def V get(K k, Serializer serializer, Deserializer deserializer) throws IOException { - return null - } - - @Override - void close() throws IOException { - - } - - @Override - def boolean remove(K k, Serializer serializer) throws IOException { - return false - } - - @Override - long removeByPattern(String s) throws IOException { - return 0 - } - - void assertContains(String key, String value) { - assert map.containsKey(key) && map[key] == value - } -} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDeduplicateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDeduplicateRecord.java new file mode 100644 index 0000000000..9de152feeb --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDeduplicateRecord.java @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.record.MockRecordParser; +import org.apache.nifi.serialization.record.MockRecordWriter; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestDeduplicateRecord { + + private TestRunner runner; + private MockRecordParser reader; + private MockRecordWriter writer; + + @BeforeEach + public void setup() throws InitializationException { + runner = TestRunners.newTestRunner(DeduplicateRecord.class); + + // RECORD_READER, RECORD_WRITER + reader = new MockRecordParser(); + writer = new MockRecordWriter("header", false); + + runner.addControllerService("reader", reader); + runner.enableControllerService(reader); + runner.addControllerService("writer", writer); + runner.enableControllerService(writer); + + runner.setProperty(DeduplicateRecord.RECORD_READER, "reader"); + runner.setProperty(DeduplicateRecord.RECORD_WRITER, "writer"); + runner.setProperty(DeduplicateRecord.RECORD_HASHING_ALGORITHM, DeduplicateRecord.SHA256_ALGORITHM_VALUE); + + reader.addSchemaField("firstName", RecordFieldType.STRING); + reader.addSchemaField("middleName", RecordFieldType.STRING); + reader.addSchemaField("lastName", RecordFieldType.STRING); + + // INCLUDE_ZERO_RECORD_FLOWFILES + runner.setProperty(DeduplicateRecord.INCLUDE_ZERO_RECORD_FLOWFILES, "true"); + + runner.assertValid(); + } + + void commonEnqueue() { + final Map props = new HashMap<>(); + props.put("hash.value", "1000"); + runner.enqueue(new byte[]{}, props); + } + + @Test + public void testInvalidRecordPathCausesValidationError() { + runner.setProperty(DeduplicateRecord.FILTER_TYPE, DeduplicateRecord.HASH_SET_VALUE); + runner.setProperty("middle_name", "//////middleName"); + runner.assertNotValid(); + } + + @Test + public void testDetectDuplicatesHashSet() { + commonEnqueue(); + + runner.setProperty(DeduplicateRecord.FILTER_TYPE, DeduplicateRecord.HASH_SET_VALUE); + runner.setProperty("middle_name", "/middleName"); + reader.addRecord("John", "Q", "Smith"); + reader.addRecord("John", "Q", "Smith"); + reader.addRecord("Jane", "X", "Doe"); + + runner.enqueue(""); + runner.run(); + + doCountTests(0, 1, 1, 1, 2, 1); + } + + @Test + public void testDetectDuplicatesBloomFilter() { + commonEnqueue(); + runner.setProperty(DeduplicateRecord.FILTER_TYPE, DeduplicateRecord.BLOOM_FILTER_VALUE); + runner.setProperty(DeduplicateRecord.BLOOM_FILTER_FPP, "0.10"); + runner.setProperty("middle_name", "/middleName"); + reader.addRecord("John", "Q", "Smith"); + reader.addRecord("John", "Q", "Smith"); + reader.addRecord("Jane", "X", "Doe"); + + runner.enqueue(""); + runner.run(); + + doCountTests(0, 1, 1, 1, 2, 1); + } + + @Test + public void testNoDuplicatesHashSet() { + commonEnqueue(); + runner.setProperty(DeduplicateRecord.FILTER_TYPE, DeduplicateRecord.HASH_SET_VALUE); + runner.setProperty("middle_name", "/middleName"); + reader.addRecord("John", "Q", "Smith"); + reader.addRecord("Jack", "Z", "Brown"); + reader.addRecord("Jane", "X", "Doe"); + + runner.enqueue(""); + runner.run(); + + doCountTests(0, 1, 1, 1, 3, 0); + } + + @Test + public void testNoDuplicatesBloomFilter() { + commonEnqueue(); + runner.setProperty(DeduplicateRecord.FILTER_TYPE, DeduplicateRecord.BLOOM_FILTER_VALUE); + runner.setProperty(DeduplicateRecord.BLOOM_FILTER_FPP, "0.10"); + runner.setProperty("middle_name", "/middleName"); + reader.addRecord("John", "Q", "Smith"); + reader.addRecord("Jack", "Z", "Brown"); + reader.addRecord("Jane", "X", "Doe"); + + runner.enqueue(""); + runner.run(); + + doCountTests(0, 1, 1, 1, 3, 0); + } + + @Test + public void testAllDuplicates() { + commonEnqueue(); + reader.addRecord("John", "Q", "Smith"); + reader.addRecord("John", "Q", "Smith"); + reader.addRecord("John", "Q", "Smith"); + + runner.enqueue(""); + runner.run(); + + doCountTests(0, 1, 1, 1, 1, 2); + } + + @Test + public void testAllUnique() { + commonEnqueue(); + reader.addRecord("John", "Q", "Smith"); + reader.addRecord("Jack", "Z", "Brown"); + reader.addRecord("Jane", "X", "Doe"); + + runner.enqueue(""); + runner.run(); + + doCountTests(0, 1, 1, 1, 3, 0); + } + + @Test + public void testCacheValueFromRecordPath() { + commonEnqueue(); + reader.addRecord("John", "Q", "Smith"); + reader.addRecord("Jack", "Z", "Brown"); + reader.addRecord("Jack", "Z", "Brown"); + + runner.enqueue(""); + runner.run(); + + doCountTests(0, 1, 1, 1, 2, 1); + } + + /* + * These are all related to NIFI-6014 + */ + + @Test + public void testMultipleFileDeduplicationRequiresDMC() { + runner.setProperty(DeduplicateRecord.DEDUPLICATION_STRATEGY, DeduplicateRecord.OPTION_MULTIPLE_FILES.getValue()); + runner.assertNotValid(); + } + + public static final String FIRST_KEY = DigestUtils.sha256Hex(String.join(String.valueOf(DeduplicateRecord.JOIN_CHAR), Arrays.asList( + "John", "Q", "Smith" + ))); + public static final String SECOND_KEY = DigestUtils.sha256Hex(String.join(String.valueOf(DeduplicateRecord.JOIN_CHAR), Arrays.asList( + "Jack", "Z", "Brown" + ))); + + @Test + public void testDeduplicateWithDMC() throws Exception { + DistributedMapCacheClient dmc = new MockCacheService<>(); + runner.addControllerService("dmc", dmc); + runner.setProperty(DeduplicateRecord.DISTRIBUTED_MAP_CACHE, "dmc"); + runner.setProperty(DeduplicateRecord.DEDUPLICATION_STRATEGY, DeduplicateRecord.OPTION_MULTIPLE_FILES.getValue()); + runner.enableControllerService(dmc); + runner.assertValid(); + + dmc.put(FIRST_KEY, true, null, null); + dmc.put(SECOND_KEY, true, null, null); + + reader.addRecord("John", "Q", "Smith"); + reader.addRecord("Jack", "Z", "Brown"); + reader.addRecord("Jack", "Z", "Brown"); + reader.addRecord("Jane", "X", "Doe"); + + runner.enqueue(""); + runner.run(); + + doCountTests(0, 1, 1, 1, 1, 3); + } + + @Test + public void testDeduplicateWithDMCAndCacheIdentifier() throws Exception { + DistributedMapCacheClient dmc = new MockCacheService<>(); + runner.addControllerService("dmc", dmc); + runner.setProperty(DeduplicateRecord.DISTRIBUTED_MAP_CACHE, "dmc"); + runner.setProperty(DeduplicateRecord.DEDUPLICATION_STRATEGY, DeduplicateRecord.OPTION_MULTIPLE_FILES.getValue()); + runner.setProperty(DeduplicateRecord.CACHE_IDENTIFIER, "concat('${user.name}', '${record.hash.value}')"); + runner.enableControllerService(dmc); + runner.assertValid(); + + dmc.put(String.format("john.smith-%s", FIRST_KEY), true, null, null); + dmc.put(String.format("john.smith-%s", SECOND_KEY), true, null, null); + + reader.addRecord("John", "Q", "Smith"); + reader.addRecord("Jack", "Z", "Brown"); + reader.addRecord("Jack", "Z", "Brown"); + reader.addRecord("Jane", "X", "Doe"); + + Map attrs = new HashMap<>(); + attrs.put("user.name", "john.smith-"); + + runner.enqueue("", attrs); + runner.run(); + + doCountTests(0, 1, 1, 1, 1, 3); + } + + void doCountTests(int failure, int original, int duplicates, int notDuplicates, int notDupeCount, int dupeCount) { + runner.assertTransferCount(DeduplicateRecord.REL_DUPLICATE, duplicates); + runner.assertTransferCount(DeduplicateRecord.REL_NON_DUPLICATE, notDuplicates); + runner.assertTransferCount(DeduplicateRecord.REL_ORIGINAL, original); + runner.assertTransferCount(DeduplicateRecord.REL_FAILURE, failure); + + List duplicateFlowFile = runner.getFlowFilesForRelationship(DeduplicateRecord.REL_DUPLICATE); + if (duplicateFlowFile != null) { + assertEquals(String.valueOf(dupeCount), duplicateFlowFile.get(0).getAttribute("record.count")); + } + + List nonDuplicateFlowFile = runner.getFlowFilesForRelationship(DeduplicateRecord.REL_NON_DUPLICATE); + if (nonDuplicateFlowFile != null) { + assertEquals(String.valueOf(notDupeCount), nonDuplicateFlowFile.get(0).getAttribute("record.count")); + } + } + + private static final class MockCacheService extends AbstractControllerService implements DistributedMapCacheClient { + private Map storage; + + public MockCacheService() { + storage = new HashMap<>(); + } + + @Override + public boolean putIfAbsent(K key, V value, Serializer keySerializer, Serializer valueSerializer) throws IOException { + return false; + } + + @Override + public V getAndPutIfAbsent(K key, V value, Serializer keySerializer, Serializer valueSerializer, Deserializer valueDeserializer) throws IOException { + return null; + } + + @Override + public boolean containsKey(K key, Serializer keySerializer) throws IOException { + return storage.containsKey(key); + } + + @Override + public void put(K key, V value, Serializer keySerializer, Serializer valueSerializer) throws IOException { + storage.put(key, value); + } + + @Override + public V get(K key, Serializer keySerializer, Deserializer valueDeserializer) throws IOException { + return null; + } + + @Override + public void close() throws IOException { + + } + + @Override + public boolean remove(K key, Serializer serializer) throws IOException { + return false; + } + + @Override + public long removeByPattern(String regex) throws IOException { + return 0; + } + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicateRecord.java deleted file mode 100644 index 6855857848..0000000000 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicateRecord.java +++ /dev/null @@ -1,209 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.standard; - -import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.serialization.record.MockRecordParser; -import org.apache.nifi.serialization.record.MockRecordWriter; -import org.apache.nifi.serialization.record.RecordFieldType; -import org.apache.nifi.util.*; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.apache.nifi.processors.standard.DetectDuplicateRecord.*; -import static org.junit.Assert.assertEquals; - -public class TestDetectDuplicateRecord { - - private TestRunner runner; - private MockCacheService cache; - private MockRecordParser reader; - private MockRecordWriter writer; - - @BeforeClass - public static void beforeClass() { - System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); - System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); - System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug"); - System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.DetectDuplicateRecord", "debug"); - System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestDetectDuplicateRecord", "debug"); - } - - @Before - public void setup() throws InitializationException { - runner = TestRunners.newTestRunner(DetectDuplicateRecord.class); - - // RECORD_READER, RECORD_WRITER - reader = new MockRecordParser(); - writer = new MockRecordWriter("header", false); - - runner.addControllerService("reader", reader); - runner.enableControllerService(reader); - runner.addControllerService("writer", writer); - runner.enableControllerService(writer); - - runner.setProperty(RECORD_READER, "reader"); - runner.setProperty(RECORD_WRITER, "writer"); - - reader.addSchemaField("firstName", RecordFieldType.STRING); - reader.addSchemaField("middleName", RecordFieldType.STRING); - reader.addSchemaField("lastName", RecordFieldType.STRING); - - // INCLUDE_ZERO_RECORD_FLOWFILES - runner.setProperty(INCLUDE_ZERO_RECORD_FLOWFILES, "true"); - - // CACHE_IDENTIFIER - runner.setProperty(CACHE_IDENTIFIER, "true"); - - // DISTRIBUTED_CACHE_SERVICE - cache = new MockCacheService(); - runner.addControllerService("cache", cache); - runner.setProperty(DISTRIBUTED_CACHE_SERVICE, "cache"); - runner.enableControllerService(cache); - - // CACHE_ENTRY_IDENTIFIER - final Map props = new HashMap<>(); - props.put("hash.value", "1000"); - runner.enqueue(new byte[]{}, props); - - // AGE_OFF_DURATION - runner.setProperty(AGE_OFF_DURATION, "48 hours"); - - runner.assertValid(); - } - - @Test - public void testDetectDuplicatesHashSet() { - runner.setProperty(FILTER_TYPE, HASH_SET_VALUE); - runner.setProperty("/middleName", "${field.value}"); - reader.addRecord("John", "Q", "Smith"); - reader.addRecord("John", "Q", "Smith"); - reader.addRecord("Jane", "X", "Doe"); - - runner.enqueue(""); - runner.run(); - - doCountTests(0, 1, 1, 1, 2, 1); - } - - @Test - public void testDetectDuplicatesBloomFilter() { - runner.setProperty(FILTER_TYPE, BLOOM_FILTER_VALUE); - runner.setProperty(BLOOM_FILTER_FPP, "0.10"); - runner.setProperty("/middleName", "${field.value}"); - reader.addRecord("John", "Q", "Smith"); - reader.addRecord("John", "Q", "Smith"); - reader.addRecord("Jane", "X", "Doe"); - - runner.enqueue(""); - runner.run(); - - doCountTests(0, 1, 1, 1, 2, 1); - } - - @Test - public void testNoDuplicatesHashSet() { - runner.setProperty(FILTER_TYPE, HASH_SET_VALUE); - runner.setProperty("/middleName", "${field.value}"); - reader.addRecord("John", "Q", "Smith"); - reader.addRecord("Jack", "Z", "Brown"); - reader.addRecord("Jane", "X", "Doe"); - - runner.enqueue(""); - runner.run(); - - doCountTests(0, 1, 1, 1, 3, 0); - } - - @Test - public void testNoDuplicatesBloomFilter() { - runner.setProperty(FILTER_TYPE, BLOOM_FILTER_VALUE); - runner.setProperty(BLOOM_FILTER_FPP, "0.10"); - runner.setProperty("/middleName", "${field.value}"); - reader.addRecord("John", "Q", "Smith"); - reader.addRecord("Jack", "Z", "Brown"); - reader.addRecord("Jane", "X", "Doe"); - - runner.enqueue(""); - runner.run(); - - doCountTests(0, 1, 1, 1, 3, 0); - } - - @Test - public void testAllDuplicates() { - reader.addRecord("John", "Q", "Smith"); - reader.addRecord("John", "Q", "Smith"); - reader.addRecord("John", "Q", "Smith"); - - runner.enqueue(""); - runner.run(); - - doCountTests(0, 1, 1, 0, 1, 2); - } - - @Test - public void testAllUnique() { - reader.addRecord("John", "Q", "Smith"); - reader.addRecord("Jack", "Z", "Brown"); - reader.addRecord("Jane", "X", "Doe"); - - runner.enqueue(""); - runner.run(); - - doCountTests(0, 1, 1, 1, 3, 0); - } - - - - @Test - public void testCacheValueFromRecordPath() { - runner.setProperty(CACHE_ENTRY_IDENTIFIER, "Users"); - reader.addRecord("John", "Q", "Smith"); - reader.addRecord("Jack", "Z", "Brown"); - reader.addRecord("Jane", "X", "Doe"); - - runner.enqueue(""); - runner.run(); - - doCountTests(0, 1, 1, 1, 2, 1); - - cache.assertContains("KEY", "VALUE"); // TODO: Get the tests running so you can see what the key/value is in serialized form - } - - void doCountTests(int failure, int original, int duplicates, int notDuplicates, int notDupeCount, int dupeCount) { - runner.assertTransferCount(REL_DUPLICATE, duplicates); - runner.assertTransferCount(REL_NON_DUPLICATE, notDuplicates); - runner.assertTransferCount(REL_ORIGINAL, original); - runner.assertTransferCount(REL_FAILURE, failure); - - List duplicateFlowFile = runner.getFlowFilesForRelationship(REL_DUPLICATE); - if (duplicateFlowFile != null) { - assertEquals(String.valueOf(dupeCount), duplicateFlowFile.get(0).getAttribute("record.count")); - } - - List nonDuplicateFlowFile = runner.getFlowFilesForRelationship(REL_NON_DUPLICATE); - if (nonDuplicateFlowFile != null) { - assertEquals(String.valueOf(notDupeCount), nonDuplicateFlowFile.get(0).getAttribute("record.count")); - } - } -}