NIFI-6979 Add record.index field to UpdateRecord

This closes #3955

Signed-off-by: Mike Thomsen <mthomsen@apache.org>
This commit is contained in:
Shawn Weeks 2020-01-03 10:47:49 -06:00 committed by Mike Thomsen
parent c15876494a
commit b470db620b
No known key found for this signature in database
GPG Key ID: 88511C3D4CAD246F
4 changed files with 36 additions and 5 deletions

View File

@ -144,7 +144,7 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor {
return; return;
} }
firstRecord = AbstractRecordProcessor.this.process(firstRecord, original, context); firstRecord = AbstractRecordProcessor.this.process(firstRecord, original, context, 1L);
final RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, firstRecord.getSchema()); final RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, firstRecord.getSchema());
try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out, originalAttributes)) { try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out, originalAttributes)) {
@ -153,8 +153,9 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor {
writer.write(firstRecord); writer.write(firstRecord);
Record record; Record record;
long count = 1L;
while ((record = reader.nextRecord()) != null) { while ((record = reader.nextRecord()) != null) {
final Record processed = AbstractRecordProcessor.this.process(record, original, context); final Record processed = AbstractRecordProcessor.this.process(record, original, context, ++count);
writer.write(processed); writer.write(processed);
} }
@ -189,5 +190,5 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor {
getLogger().info("Successfully converted {} records for {}", new Object[] {count, flowFile}); getLogger().info("Successfully converted {} records for {}", new Object[] {count, flowFile});
} }
protected abstract Record process(Record record, FlowFile flowFile, ProcessContext context); protected abstract Record process(Record record, FlowFile flowFile, ProcessContext context, long count);
} }

View File

@ -59,7 +59,7 @@ public class ConvertRecord extends AbstractRecordProcessor {
} }
@Override @Override
protected Record process(final Record record, final FlowFile flowFile, final ProcessContext context) { protected Record process(final Record record, final FlowFile flowFile, final ProcessContext context, final long count) {
return record; return record;
} }

View File

@ -22,6 +22,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
@ -66,12 +67,15 @@ import java.util.stream.Stream;
+ "This Processor requires that at least one user-defined Property be added. The name of the Property should indicate a RecordPath that determines the field that should " + "This Processor requires that at least one user-defined Property be added. The name of the Property should indicate a RecordPath that determines the field that should "
+ "be updated. The value of the Property is either a replacement value (optionally making use of the Expression Language) or is itself a RecordPath that extracts a value from " + "be updated. The value of the Property is either a replacement value (optionally making use of the Expression Language) or is itself a RecordPath that extracts a value from "
+ "the Record. Whether the Property value is determined to be a RecordPath or a literal value depends on the configuration of the <Replacement Value Strategy> Property.") + "the Record. Whether the Property value is determined to be a RecordPath or a literal value depends on the configuration of the <Replacement Value Strategy> Property.")
@WritesAttribute(attribute = "record.index", description = "This attribute provides the current row index and is only available inside the literal value expression.")
@SeeAlso({ConvertRecord.class}) @SeeAlso({ConvertRecord.class})
public class UpdateRecord extends AbstractRecordProcessor { public class UpdateRecord extends AbstractRecordProcessor {
private static final String FIELD_NAME = "field.name"; private static final String FIELD_NAME = "field.name";
private static final String FIELD_VALUE = "field.value"; private static final String FIELD_VALUE = "field.value";
private static final String FIELD_TYPE = "field.type"; private static final String FIELD_TYPE = "field.type";
private static final String RECORD_INDEX = "record.index";
private volatile RecordPathCache recordPathCache; private volatile RecordPathCache recordPathCache;
private volatile List<String> recordPaths; private volatile List<String> recordPaths;
@ -142,7 +146,7 @@ public class UpdateRecord extends AbstractRecordProcessor {
} }
@Override @Override
protected Record process(Record record, final FlowFile flowFile, final ProcessContext context) { protected Record process(Record record, final FlowFile flowFile, final ProcessContext context, final long count) {
final boolean evaluateValueAsRecordPath = context.getProperty(REPLACEMENT_VALUE_STRATEGY).getValue().equals(RECORD_PATH_VALUES.getValue()); final boolean evaluateValueAsRecordPath = context.getProperty(REPLACEMENT_VALUE_STRATEGY).getValue().equals(RECORD_PATH_VALUES.getValue());
for (final String recordPathText : recordPaths) { for (final String recordPathText : recordPaths) {
@ -171,6 +175,7 @@ public class UpdateRecord extends AbstractRecordProcessor {
fieldVariables.put(FIELD_NAME, fieldVal.getField().getFieldName()); fieldVariables.put(FIELD_NAME, fieldVal.getField().getFieldName());
fieldVariables.put(FIELD_VALUE, DataTypeUtils.toString(fieldVal.getValue(), (String) null)); fieldVariables.put(FIELD_VALUE, DataTypeUtils.toString(fieldVal.getValue(), (String) null));
fieldVariables.put(FIELD_TYPE, fieldVal.getField().getDataType().getFieldType().name()); fieldVariables.put(FIELD_TYPE, fieldVal.getField().getDataType().getFieldType().name());
fieldVariables.put(RECORD_INDEX, String.valueOf(count));
final String evaluatedReplacementVal = replacementValue.evaluateAttributeExpressions(flowFile, fieldVariables).getValue(); final String evaluatedReplacementVal = replacementValue.evaluateAttributeExpressions(flowFile, fieldVariables).getValue();
fieldVal.updateValue(evaluatedReplacementVal, RecordFieldType.STRING.getDataType()); fieldVal.updateValue(evaluatedReplacementVal, RecordFieldType.STRING.getDataType());

View File

@ -117,6 +117,31 @@ public class TestUpdateRecord {
runner.assertAllFlowFilesTransferred(UpdateRecord.REL_FAILURE, 1); runner.assertAllFlowFilesTransferred(UpdateRecord.REL_FAILURE, 1);
} }
@Test
public void testLiteralReplacementRowIndexValueExpressionLanguage() throws InitializationException {
readerService = new MockRecordParser();
readerService.addSchemaField("id", RecordFieldType.LONG);
readerService.addSchemaField("name", RecordFieldType.STRING);
readerService.addSchemaField("age", RecordFieldType.INT);
runner.addControllerService("reader", readerService);
runner.enableControllerService(readerService);
runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.LITERAL_VALUES);
runner.setProperty("/id", "${record.index}");
runner.enqueue("");
readerService.addRecord(null, "John Doe", 35);
readerService.addRecord(null, "Jane Doe", 36);
readerService.addRecord(null, "John Smith", 37);
readerService.addRecord(null, "Jane Smith", 38);
runner.run();
runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0);
out.assertContentEquals("header\n1,John Doe,35\n2,Jane Doe,36\n3,John Smith,37\n4,Jane Smith,38\n");
}
@Test @Test
public void testReplaceWithMissingRecordPath() throws InitializationException { public void testReplaceWithMissingRecordPath() throws InitializationException {
readerService = new MockRecordParser(); readerService = new MockRecordParser();