mirror of https://github.com/apache/nifi.git
NIFI-9814: Add range sampling to SampleRecord
- Incorporated review comments. This closes #5878. Signed-off-by: Paul Grey <greyp@apache.org>
This commit is contained in:
parent
241d619138
commit
f2774c4924
|
@ -16,19 +16,19 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.standard;
|
package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.Range;
|
||||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||||
import org.apache.nifi.annotation.behavior.SideEffectFree;
|
import org.apache.nifi.annotation.behavior.SideEffectFree;
|
||||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||||
import org.apache.nifi.annotation.behavior.SystemResource;
|
import org.apache.nifi.annotation.behavior.SystemResource;
|
||||||
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
|
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
|
||||||
|
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||||
|
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
import org.apache.nifi.annotation.documentation.Tags;
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
import org.apache.nifi.components.AllowableValue;
|
import org.apache.nifi.components.AllowableValue;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.components.PropertyValue;
|
|
||||||
import org.apache.nifi.components.ValidationContext;
|
|
||||||
import org.apache.nifi.components.ValidationResult;
|
|
||||||
import org.apache.nifi.components.Validator;
|
import org.apache.nifi.components.Validator;
|
||||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
|
@ -46,12 +46,12 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
|
||||||
import org.apache.nifi.serialization.WriteResult;
|
import org.apache.nifi.serialization.WriteResult;
|
||||||
import org.apache.nifi.serialization.record.Record;
|
import org.apache.nifi.serialization.record.Record;
|
||||||
import org.apache.nifi.serialization.record.RecordSchema;
|
import org.apache.nifi.serialization.record.RecordSchema;
|
||||||
|
import org.apache.nifi.util.StringUtils;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
@ -59,24 +59,35 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.regex.Matcher;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
@EventDriven
|
@EventDriven
|
||||||
@SideEffectFree
|
@SideEffectFree
|
||||||
@SupportsBatching
|
@SupportsBatching
|
||||||
@Tags({"record", "sample"})
|
@Tags({"record", "sample", "reservoir", "range", "interval"})
|
||||||
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
|
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
|
||||||
@CapabilityDescription("Samples the records of a FlowFile based on a specified sampling strategy (such as Reservoir Sampling). The resulting "
|
@CapabilityDescription("Samples the records of a FlowFile based on a specified sampling strategy (such as Reservoir Sampling). The resulting "
|
||||||
+ "FlowFile may be of a fixed number of records (in the case of reservoir-based algorithms) or some subset of the total number of records "
|
+ "FlowFile may be of a fixed number of records (in the case of reservoir-based algorithms) or some subset of the total number of records "
|
||||||
+ "(in the case of probabilistic sampling), or a deterministic number of records (in the case of interval sampling).")
|
+ "(in the case of probabilistic sampling), or a deterministic number of records (in the case of interval sampling).")
|
||||||
@SystemResourceConsideration(resource = SystemResource.MEMORY)
|
@SystemResourceConsideration(resource = SystemResource.MEMORY)
|
||||||
|
@WritesAttributes({
|
||||||
|
@WritesAttribute(attribute = "mime.type", description = "The MIME type indicated by the record writer"),
|
||||||
|
@WritesAttribute(attribute = "record.count", description = "The number of records in the resulting flow file")
|
||||||
|
})
|
||||||
public class SampleRecord extends AbstractProcessor {
|
public class SampleRecord extends AbstractProcessor {
|
||||||
|
|
||||||
static final String INTERVAL_SAMPLING_KEY = "interval";
|
static final String INTERVAL_SAMPLING_KEY = "interval";
|
||||||
|
static final String RANGE_SAMPLING_KEY = "range";
|
||||||
static final String PROBABILISTIC_SAMPLING_KEY = "probabilistic";
|
static final String PROBABILISTIC_SAMPLING_KEY = "probabilistic";
|
||||||
static final String RESERVOIR_SAMPLING_KEY = "reservoir";
|
static final String RESERVOIR_SAMPLING_KEY = "reservoir";
|
||||||
|
|
||||||
static final AllowableValue INTERVAL_SAMPLING = new AllowableValue(INTERVAL_SAMPLING_KEY, "Interval Sampling",
|
static final AllowableValue INTERVAL_SAMPLING = new AllowableValue(INTERVAL_SAMPLING_KEY, "Interval Sampling",
|
||||||
"Selects every Nth record where N is the value of the 'Interval Value' property");
|
"Selects every Nth record where N is the value of the 'Interval Value' property");
|
||||||
|
static final AllowableValue RANGE_SAMPLING = new AllowableValue(RANGE_SAMPLING_KEY, "Range Sampling",
|
||||||
|
"Creates a sample of records based on the index (i.e. record number) of the records using the specified range. An example is '3,6-8,20-' which includes the third record, "
|
||||||
|
+ "the sixth, seventh and eighth record, and all records from the twentieth record on. Commas separate intervals that don't overlap, and an interval can be between two numbers "
|
||||||
|
+ "(i.e. 6-8) or up to a given number (i.e. -5), or from a number to the number of the last record (i.e. 20-).");
|
||||||
static final AllowableValue PROBABILISTIC_SAMPLING = new AllowableValue(PROBABILISTIC_SAMPLING_KEY, "Probabilistic Sampling",
|
static final AllowableValue PROBABILISTIC_SAMPLING = new AllowableValue(PROBABILISTIC_SAMPLING_KEY, "Probabilistic Sampling",
|
||||||
"Selects each record with probability P where P is the value of the 'Selection Probability' property");
|
"Selects each record with probability P where P is the value of the 'Selection Probability' property");
|
||||||
static final AllowableValue RESERVOIR_SAMPLING = new AllowableValue(RESERVOIR_SAMPLING_KEY, "Reservoir Sampling",
|
static final AllowableValue RESERVOIR_SAMPLING = new AllowableValue(RESERVOIR_SAMPLING_KEY, "Reservoir Sampling",
|
||||||
|
@ -84,6 +95,10 @@ public class SampleRecord extends AbstractProcessor {
|
||||||
+ "the value of the 'Reservoir Size' property. Note that if the value is very large it may cause memory issues as "
|
+ "the value of the 'Reservoir Size' property. Note that if the value is very large it may cause memory issues as "
|
||||||
+ "the reservoir is kept in-memory.");
|
+ "the reservoir is kept in-memory.");
|
||||||
|
|
||||||
|
private final static Pattern RANGE_PATTERN = Pattern.compile("^([0-9]+)?(-)?([0-9]+)?(,([0-9]+)?-?([0-9]+)?)*?");
|
||||||
|
private final static Pattern INTERVAL_PATTERN = Pattern.compile("([0-9]+)?(-)?([0-9]+)?(?:,|$)");
|
||||||
|
|
||||||
|
|
||||||
static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
|
static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
|
||||||
.name("record-reader")
|
.name("record-reader")
|
||||||
.displayName("Record Reader")
|
.displayName("Record Reader")
|
||||||
|
@ -102,7 +117,7 @@ public class SampleRecord extends AbstractProcessor {
|
||||||
.name("sample-record-sampling-strategy")
|
.name("sample-record-sampling-strategy")
|
||||||
.displayName("Sampling Strategy")
|
.displayName("Sampling Strategy")
|
||||||
.description("Specifies which method to use for sampling records from the incoming FlowFile")
|
.description("Specifies which method to use for sampling records from the incoming FlowFile")
|
||||||
.allowableValues(INTERVAL_SAMPLING, PROBABILISTIC_SAMPLING, RESERVOIR_SAMPLING)
|
.allowableValues(INTERVAL_SAMPLING, RANGE_SAMPLING, PROBABILISTIC_SAMPLING, RESERVOIR_SAMPLING)
|
||||||
.required(true)
|
.required(true)
|
||||||
.defaultValue(RESERVOIR_SAMPLING.getValue())
|
.defaultValue(RESERVOIR_SAMPLING.getValue())
|
||||||
.addValidator(Validator.VALID)
|
.addValidator(Validator.VALID)
|
||||||
|
@ -114,9 +129,21 @@ public class SampleRecord extends AbstractProcessor {
|
||||||
+ "used if Sampling Strategy is set to Interval Sampling. A value of zero (0) will cause no records to be included in the"
|
+ "used if Sampling Strategy is set to Interval Sampling. A value of zero (0) will cause no records to be included in the"
|
||||||
+ "outgoing FlowFile, a value of one (1) will cause all records to be included, and a value of two (2) will cause half the "
|
+ "outgoing FlowFile, a value of one (1) will cause all records to be included, and a value of two (2) will cause half the "
|
||||||
+ "records to be included, and so on.")
|
+ "records to be included, and so on.")
|
||||||
.required(false)
|
.required(true)
|
||||||
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
|
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
|
||||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||||
|
.dependsOn(SAMPLING_STRATEGY, INTERVAL_SAMPLING)
|
||||||
|
.build();
|
||||||
|
static final PropertyDescriptor SAMPLING_RANGE = new PropertyDescriptor.Builder()
|
||||||
|
.name("sample-record-range")
|
||||||
|
.displayName("Sampling Range")
|
||||||
|
.description("Specifies the range of records to include in the sample, from 1 to the total number of records. An example is '3,6-8,20-' which includes the third record, the sixth, "
|
||||||
|
+ "seventh and eighth records, and all records from the twentieth record on. Commas separate intervals that don't overlap, and an interval can be between two numbers "
|
||||||
|
+ "(i.e. 6-8) or up to a given number (i.e. -5), or from a number to the number of the last record (i.e. 20-). If this property is unset, all records will be included.")
|
||||||
|
.required(true)
|
||||||
|
.addValidator(Validator.VALID)
|
||||||
|
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||||
|
.dependsOn(SAMPLING_STRATEGY, RANGE_SAMPLING)
|
||||||
.build();
|
.build();
|
||||||
static final PropertyDescriptor SAMPLING_PROBABILITY = new PropertyDescriptor.Builder()
|
static final PropertyDescriptor SAMPLING_PROBABILITY = new PropertyDescriptor.Builder()
|
||||||
.name("sample-record-probability")
|
.name("sample-record-probability")
|
||||||
|
@ -124,18 +151,20 @@ public class SampleRecord extends AbstractProcessor {
|
||||||
.description("Specifies the probability (as a percent from 0-100) of a record being included in the outgoing FlowFile. This property is only "
|
.description("Specifies the probability (as a percent from 0-100) of a record being included in the outgoing FlowFile. This property is only "
|
||||||
+ "used if Sampling Strategy is set to Probabilistic Sampling. A value of zero (0) will cause no records to be included in the"
|
+ "used if Sampling Strategy is set to Probabilistic Sampling. A value of zero (0) will cause no records to be included in the"
|
||||||
+ "outgoing FlowFile, and a value of 100 will cause all records to be included in the outgoing FlowFile..")
|
+ "outgoing FlowFile, and a value of 100 will cause all records to be included in the outgoing FlowFile..")
|
||||||
.required(false)
|
.required(true)
|
||||||
.addValidator(StandardValidators.createLongValidator(0, 100, true))
|
.addValidator(StandardValidators.createLongValidator(0, 100, true))
|
||||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||||
|
.dependsOn(SAMPLING_STRATEGY, PROBABILISTIC_SAMPLING)
|
||||||
.build();
|
.build();
|
||||||
static final PropertyDescriptor RESERVOIR_SIZE = new PropertyDescriptor.Builder()
|
static final PropertyDescriptor RESERVOIR_SIZE = new PropertyDescriptor.Builder()
|
||||||
.name("sample-record-reservoir")
|
.name("sample-record-reservoir")
|
||||||
.displayName("Reservoir Size")
|
.displayName("Reservoir Size")
|
||||||
.description("Specifies the number of records to write to the outgoing FlowFile. This property is only used if Sampling Strategy is set to "
|
.description("Specifies the number of records to write to the outgoing FlowFile. This property is only used if Sampling Strategy is set to "
|
||||||
+ "reservoir-based strategies such as Reservoir Sampling or Weighted Random Sampling.")
|
+ "reservoir-based strategies such as Reservoir Sampling.")
|
||||||
.required(false)
|
.required(true)
|
||||||
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
|
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
|
||||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||||
|
.dependsOn(SAMPLING_STRATEGY, RESERVOIR_SAMPLING)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
static final PropertyDescriptor RANDOM_SEED = new PropertyDescriptor.Builder()
|
static final PropertyDescriptor RANDOM_SEED = new PropertyDescriptor.Builder()
|
||||||
|
@ -146,6 +175,7 @@ public class SampleRecord extends AbstractProcessor {
|
||||||
.required(false)
|
.required(false)
|
||||||
.addValidator(StandardValidators.LONG_VALIDATOR)
|
.addValidator(StandardValidators.LONG_VALIDATOR)
|
||||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||||
|
.dependsOn(SAMPLING_STRATEGY, PROBABILISTIC_SAMPLING, RESERVOIR_SAMPLING)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
public static final Relationship REL_ORIGINAL = new Relationship.Builder()
|
public static final Relationship REL_ORIGINAL = new Relationship.Builder()
|
||||||
|
@ -173,6 +203,7 @@ public class SampleRecord extends AbstractProcessor {
|
||||||
props.add(RECORD_WRITER_FACTORY);
|
props.add(RECORD_WRITER_FACTORY);
|
||||||
props.add(SAMPLING_STRATEGY);
|
props.add(SAMPLING_STRATEGY);
|
||||||
props.add(SAMPLING_INTERVAL);
|
props.add(SAMPLING_INTERVAL);
|
||||||
|
props.add(SAMPLING_RANGE);
|
||||||
props.add(SAMPLING_PROBABILITY);
|
props.add(SAMPLING_PROBABILITY);
|
||||||
props.add(RESERVOIR_SIZE);
|
props.add(RESERVOIR_SIZE);
|
||||||
props.add(RANDOM_SEED);
|
props.add(RANDOM_SEED);
|
||||||
|
@ -195,37 +226,6 @@ public class SampleRecord extends AbstractProcessor {
|
||||||
return properties;
|
return properties;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
|
|
||||||
|
|
||||||
final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
|
|
||||||
|
|
||||||
final String samplingStrategyValue = validationContext.getProperty(SAMPLING_STRATEGY).getValue();
|
|
||||||
if (INTERVAL_SAMPLING_KEY.equals(samplingStrategyValue)) {
|
|
||||||
final PropertyValue pd = validationContext.getProperty(SAMPLING_INTERVAL);
|
|
||||||
if (!pd.isSet()) {
|
|
||||||
results.add(new ValidationResult.Builder().subject(INTERVAL_SAMPLING.getDisplayName()).valid(false)
|
|
||||||
.explanation(SAMPLING_INTERVAL.getDisplayName() + " property must be set to use " + INTERVAL_SAMPLING.getDisplayName() + " strategy")
|
|
||||||
.build());
|
|
||||||
}
|
|
||||||
} else if (PROBABILISTIC_SAMPLING_KEY.equals(samplingStrategyValue)) {
|
|
||||||
final PropertyValue samplingProbabilityProperty = validationContext.getProperty(SAMPLING_PROBABILITY);
|
|
||||||
if (!samplingProbabilityProperty.isSet()) {
|
|
||||||
results.add(new ValidationResult.Builder().subject(PROBABILISTIC_SAMPLING.getDisplayName()).valid(false)
|
|
||||||
.explanation(SAMPLING_PROBABILITY.getDisplayName() + " property must be set to use " + PROBABILISTIC_SAMPLING.getDisplayName() + " strategy")
|
|
||||||
.build());
|
|
||||||
}
|
|
||||||
} else if (RESERVOIR_SAMPLING_KEY.equals(samplingStrategyValue)) {
|
|
||||||
final PropertyValue pd = validationContext.getProperty(RESERVOIR_SIZE);
|
|
||||||
if (!pd.isSet()) {
|
|
||||||
results.add(new ValidationResult.Builder().subject(RESERVOIR_SAMPLING.getDisplayName()).valid(false)
|
|
||||||
.explanation(RESERVOIR_SIZE.getDisplayName() + " property must be set to use " + RESERVOIR_SAMPLING.getDisplayName() + " strategy")
|
|
||||||
.build());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return results;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
|
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
|
||||||
FlowFile flowFile = session.get();
|
FlowFile flowFile = session.get();
|
||||||
|
@ -252,6 +252,9 @@ public class SampleRecord extends AbstractProcessor {
|
||||||
if (INTERVAL_SAMPLING_KEY.equals(samplingStrategyValue)) {
|
if (INTERVAL_SAMPLING_KEY.equals(samplingStrategyValue)) {
|
||||||
final int intervalValue = context.getProperty(SAMPLING_INTERVAL).evaluateAttributeExpressions(outFlowFile).asInteger();
|
final int intervalValue = context.getProperty(SAMPLING_INTERVAL).evaluateAttributeExpressions(outFlowFile).asInteger();
|
||||||
samplingStrategy = new IntervalSamplingStrategy(recordSetWriter, intervalValue);
|
samplingStrategy = new IntervalSamplingStrategy(recordSetWriter, intervalValue);
|
||||||
|
} else if (RANGE_SAMPLING_KEY.equals(samplingStrategyValue)) {
|
||||||
|
final String rangeExpression = context.getProperty(SAMPLING_RANGE).evaluateAttributeExpressions(outFlowFile).getValue();
|
||||||
|
samplingStrategy = new RangeSamplingStrategy(recordSetWriter, rangeExpression);
|
||||||
} else if (PROBABILISTIC_SAMPLING_KEY.equals(samplingStrategyValue)) {
|
} else if (PROBABILISTIC_SAMPLING_KEY.equals(samplingStrategyValue)) {
|
||||||
final int probabilityValue = context.getProperty(SAMPLING_PROBABILITY).evaluateAttributeExpressions(outFlowFile).asInteger();
|
final int probabilityValue = context.getProperty(SAMPLING_PROBABILITY).evaluateAttributeExpressions(outFlowFile).asInteger();
|
||||||
final Long randomSeed = context.getProperty(RANDOM_SEED).isSet()
|
final Long randomSeed = context.getProperty(RANDOM_SEED).isSet()
|
||||||
|
@ -332,6 +335,86 @@ public class SampleRecord extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static class RangeSamplingStrategy implements SamplingStrategy {
|
||||||
|
|
||||||
|
final RecordSetWriter writer;
|
||||||
|
final String rangeExpression;
|
||||||
|
int currentCount = 1;
|
||||||
|
final List<Range<Integer>> ranges = new ArrayList<>();
|
||||||
|
|
||||||
|
RangeSamplingStrategy(final RecordSetWriter writer, final String rangeExpression) {
|
||||||
|
this.writer = writer;
|
||||||
|
this.rangeExpression = rangeExpression;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init() throws IOException {
|
||||||
|
currentCount = 1;
|
||||||
|
ranges.clear();
|
||||||
|
writer.beginRecordSet();
|
||||||
|
Matcher validateRangeExpression = RANGE_PATTERN.matcher(rangeExpression);
|
||||||
|
if (!validateRangeExpression.matches()) {
|
||||||
|
throw new IOException(rangeExpression + " is not a valid range expression");
|
||||||
|
}
|
||||||
|
Integer startRange, endRange;
|
||||||
|
if (StringUtils.isEmpty(rangeExpression)) {
|
||||||
|
startRange = 0;
|
||||||
|
endRange = Integer.MAX_VALUE;
|
||||||
|
ranges.add(Range.between(startRange, endRange));
|
||||||
|
} else {
|
||||||
|
Matcher m = INTERVAL_PATTERN.matcher(rangeExpression);
|
||||||
|
while (m.find()) {
|
||||||
|
// groupCount will be 3, need to check nulls to see if it's a range or single number. Groups that are all null are ignored
|
||||||
|
if (m.group(1) == null && m.group(2) == null && m.group(3) == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (m.group(1) != null) {
|
||||||
|
startRange = Integer.parseInt(m.group(1));
|
||||||
|
} else if ("-".equals(m.group(2))) {
|
||||||
|
startRange = 0;
|
||||||
|
} else {
|
||||||
|
startRange = null;
|
||||||
|
}
|
||||||
|
if (m.group(3) != null) {
|
||||||
|
endRange = Integer.parseInt(m.group(3));
|
||||||
|
} else if ("-".equals(m.group(2))) {
|
||||||
|
endRange = Integer.MAX_VALUE;
|
||||||
|
} else {
|
||||||
|
endRange = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
final Range<Integer> range;
|
||||||
|
|
||||||
|
if (startRange != null && endRange == null) {
|
||||||
|
// Single value
|
||||||
|
range = Range.between(startRange, startRange);
|
||||||
|
} else {
|
||||||
|
range = Range.between(startRange, endRange);
|
||||||
|
}
|
||||||
|
ranges.add(range);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void sample(Record record) throws IOException {
|
||||||
|
// Check the current record number against the specified ranges
|
||||||
|
for (Range<Integer> range : ranges) {
|
||||||
|
if (range.contains(currentCount)) {
|
||||||
|
writer.write(record);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
currentCount++;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public WriteResult finish() throws IOException {
|
||||||
|
return writer.finishRecordSet();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static class ProbabilisticSamplingStrategy implements SamplingStrategy {
|
static class ProbabilisticSamplingStrategy implements SamplingStrategy {
|
||||||
final RecordSetWriter writer;
|
final RecordSetWriter writer;
|
||||||
final int probabilityValue;
|
final int probabilityValue;
|
||||||
|
|
|
@ -32,8 +32,10 @@ import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
|
@ -116,6 +118,82 @@ public class TestSampleRecord {
|
||||||
out.assertAttributeEquals("record.count", "0");
|
out.assertAttributeEquals("record.count", "0");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRangeSampling() throws InitializationException {
|
||||||
|
final MockRecordParser readerService = new MockRecordParser();
|
||||||
|
final MockRecordWriter writerService = new MockRecordWriter("header", false);
|
||||||
|
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(SampleRecord.class);
|
||||||
|
runner.addControllerService("reader", readerService);
|
||||||
|
runner.enableControllerService(readerService);
|
||||||
|
runner.addControllerService("writer", writerService);
|
||||||
|
runner.enableControllerService(writerService);
|
||||||
|
|
||||||
|
runner.setProperty(SampleRecord.RECORD_READER_FACTORY, "reader");
|
||||||
|
runner.setProperty(SampleRecord.RECORD_WRITER_FACTORY, "writer");
|
||||||
|
runner.setProperty(SampleRecord.SAMPLING_STRATEGY, SampleRecord.RANGE_SAMPLING_KEY);
|
||||||
|
runner.assertNotValid();
|
||||||
|
runner.setProperty(SampleRecord.SAMPLING_RANGE, "1,4-5,98-"); // 1, 4, 5, 98, 99, 100 -- one-based not zero based
|
||||||
|
|
||||||
|
readerService.addSchemaField("name", RecordFieldType.STRING);
|
||||||
|
readerService.addSchemaField("age", RecordFieldType.INT);
|
||||||
|
|
||||||
|
for (int i = 1; i <= 100; i++) {
|
||||||
|
readerService.addRecord(i, 5 + i);
|
||||||
|
}
|
||||||
|
runner.enqueue("");
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertTransferCount(SampleRecord.REL_SUCCESS, 1);
|
||||||
|
runner.assertTransferCount(SampleRecord.REL_ORIGINAL, 1);
|
||||||
|
MockFlowFile out = runner.getFlowFilesForRelationship(SampleRecord.REL_SUCCESS).get(0);
|
||||||
|
out.assertAttributeEquals("record.count", "6");
|
||||||
|
|
||||||
|
runner.clearTransferState();
|
||||||
|
runner.setProperty(SampleRecord.SAMPLING_RANGE, "3");
|
||||||
|
runner.enqueue("");
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertTransferCount(SampleRecord.REL_SUCCESS, 1);
|
||||||
|
runner.assertTransferCount(SampleRecord.REL_ORIGINAL, 1);
|
||||||
|
out = runner.getFlowFilesForRelationship(SampleRecord.REL_SUCCESS).get(0);
|
||||||
|
out.assertAttributeEquals("record.count", "1");
|
||||||
|
out.assertContentEquals("header\n3,8\n");
|
||||||
|
|
||||||
|
runner.clearTransferState();
|
||||||
|
runner.setProperty(SampleRecord.SAMPLING_RANGE, "-2");
|
||||||
|
runner.enqueue("");
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertTransferCount(SampleRecord.REL_SUCCESS, 1);
|
||||||
|
runner.assertTransferCount(SampleRecord.REL_ORIGINAL, 1);
|
||||||
|
out = runner.getFlowFilesForRelationship(SampleRecord.REL_SUCCESS).get(0);
|
||||||
|
out.assertAttributeEquals("record.count", "2");
|
||||||
|
out.assertContentEquals("header\n1,6\n2,7\n");
|
||||||
|
|
||||||
|
runner.clearTransferState();
|
||||||
|
runner.setProperty(SampleRecord.SAMPLING_RANGE, "${range}");
|
||||||
|
Map<String, String> attrs = Collections.singletonMap("range", "8,20");
|
||||||
|
runner.enqueue("", attrs);
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertTransferCount(SampleRecord.REL_SUCCESS, 1);
|
||||||
|
runner.assertTransferCount(SampleRecord.REL_ORIGINAL, 1);
|
||||||
|
out = runner.getFlowFilesForRelationship(SampleRecord.REL_SUCCESS).get(0);
|
||||||
|
out.assertAttributeEquals("record.count", "2");
|
||||||
|
out.assertContentEquals("header\n8,13\n20,25\n");
|
||||||
|
|
||||||
|
runner.clearTransferState();
|
||||||
|
runner.setProperty(SampleRecord.SAMPLING_RANGE, "");
|
||||||
|
runner.enqueue("");
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertTransferCount(SampleRecord.REL_SUCCESS, 1);
|
||||||
|
runner.assertTransferCount(SampleRecord.REL_ORIGINAL, 1);
|
||||||
|
out = runner.getFlowFilesForRelationship(SampleRecord.REL_SUCCESS).get(0);
|
||||||
|
out.assertAttributeEquals("record.count", "100");
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testProbabilisticSamplingWithSeed() throws InitializationException {
|
public void testProbabilisticSamplingWithSeed() throws InitializationException {
|
||||||
final MockRecordParser readerService = new MockRecordParser();
|
final MockRecordParser readerService = new MockRecordParser();
|
||||||
|
|
Loading…
Reference in New Issue