mirror of https://github.com/apache/nifi.git
NIFI-6139: Add fragment attributes to PartitionRecord
This closes #3382. Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
parent
48a6c81fa2
commit
ce09b93ef1
|
@ -29,6 +29,7 @@ import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.UUID;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
@ -48,6 +49,7 @@ import org.apache.nifi.components.ValidationResult;
|
||||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||||
|
import org.apache.nifi.flowfile.attributes.FragmentAttributes;
|
||||||
import org.apache.nifi.processor.AbstractProcessor;
|
import org.apache.nifi.processor.AbstractProcessor;
|
||||||
import org.apache.nifi.processor.ProcessContext;
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
import org.apache.nifi.processor.ProcessSession;
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
|
@ -84,6 +86,11 @@ import org.apache.nifi.serialization.record.util.DataTypeUtils;
|
||||||
@WritesAttributes({
|
@WritesAttributes({
|
||||||
@WritesAttribute(attribute="record.count", description="The number of records in an outgoing FlowFile"),
|
@WritesAttribute(attribute="record.count", description="The number of records in an outgoing FlowFile"),
|
||||||
@WritesAttribute(attribute="mime.type", description="The MIME Type that the configured Record Writer indicates is appropriate"),
|
@WritesAttribute(attribute="mime.type", description="The MIME Type that the configured Record Writer indicates is appropriate"),
|
||||||
|
@WritesAttribute(attribute = "fragment.identifier", description = "All partitioned FlowFiles produced from the same parent FlowFile will have the same randomly "
|
||||||
|
+ "generated UUID added for this attribute"),
|
||||||
|
@WritesAttribute(attribute = "fragment.index", description = "A one-up number that indicates the ordering of the partitioned FlowFiles that were created from a single parent FlowFile"),
|
||||||
|
@WritesAttribute(attribute = "fragment.count", description = "The number of partitioned FlowFiles generated from the parent FlowFile"),
|
||||||
|
@WritesAttribute(attribute = "segment.original.filename ", description = "The filename of the parent FlowFile"),
|
||||||
@WritesAttribute(attribute="<dynamic property name>",
|
@WritesAttribute(attribute="<dynamic property name>",
|
||||||
description = "For each dynamic property that is added, an attribute may be added to the FlowFile. See the description for Dynamic Properties for more information.")
|
description = "For each dynamic property that is added, an attribute may be added to the FlowFile. See the description for Dynamic Properties for more information.")
|
||||||
})
|
})
|
||||||
|
@ -232,6 +239,8 @@ public class PartitionRecord extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
// For each RecordSetWriter, finish the record set and close the writer.
|
// For each RecordSetWriter, finish the record set and close the writer.
|
||||||
|
int fragmentIndex = 0;
|
||||||
|
final String fragmentId = UUID.randomUUID().toString();
|
||||||
for (final Map.Entry<RecordValueMap, RecordSetWriter> entry : writerMap.entrySet()) {
|
for (final Map.Entry<RecordValueMap, RecordSetWriter> entry : writerMap.entrySet()) {
|
||||||
final RecordValueMap valueMap = entry.getKey();
|
final RecordValueMap valueMap = entry.getKey();
|
||||||
final RecordSetWriter writer = entry.getValue();
|
final RecordSetWriter writer = entry.getValue();
|
||||||
|
@ -244,11 +253,16 @@ public class PartitionRecord extends AbstractProcessor {
|
||||||
attributes.putAll(writeResult.getAttributes());
|
attributes.putAll(writeResult.getAttributes());
|
||||||
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
|
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
|
||||||
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
|
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
|
||||||
|
attributes.put(FragmentAttributes.FRAGMENT_INDEX.key(), String.valueOf(fragmentIndex));
|
||||||
|
attributes.put(FragmentAttributes.FRAGMENT_ID.key(), fragmentId);
|
||||||
|
attributes.put(FragmentAttributes.FRAGMENT_COUNT.key(), String.valueOf(writerMap.size()));
|
||||||
|
attributes.put(FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key(), flowFile.getAttribute(CoreAttributes.FILENAME.key()));
|
||||||
|
|
||||||
FlowFile childFlowFile = valueMap.getFlowFile();
|
FlowFile childFlowFile = valueMap.getFlowFile();
|
||||||
childFlowFile = session.putAllAttributes(childFlowFile, attributes);
|
childFlowFile = session.putAllAttributes(childFlowFile, attributes);
|
||||||
|
|
||||||
session.adjustCounter("Record Processed", writeResult.getRecordCount(), false);
|
session.adjustCounter("Record Processed", writeResult.getRecordCount(), false);
|
||||||
|
fragmentIndex++;
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
|
|
|
@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
import org.apache.nifi.reporting.InitializationException;
|
import org.apache.nifi.reporting.InitializationException;
|
||||||
import org.apache.nifi.serialization.record.MockRecordParser;
|
import org.apache.nifi.serialization.record.MockRecordParser;
|
||||||
|
@ -80,6 +81,9 @@ public class TestPartitionRecord {
|
||||||
|
|
||||||
assertEquals(3L, out.stream().filter(ff -> ff.getAttribute("record.count").equals("1")).count());
|
assertEquals(3L, out.stream().filter(ff -> ff.getAttribute("record.count").equals("1")).count());
|
||||||
assertEquals(1L, out.stream().filter(ff -> ff.getAttribute("record.count").equals("2")).count());
|
assertEquals(1L, out.stream().filter(ff -> ff.getAttribute("record.count").equals("2")).count());
|
||||||
|
out.forEach(ff -> ff.assertAttributeEquals("fragment.count", "4"));
|
||||||
|
IntStream.of(1, 3).forEach((i) -> out.get(i).assertAttributeEquals("fragment.id", out.get(0).getAttribute("fragment.id")));
|
||||||
|
IntStream.of(0, 3).forEach((i) -> out.get(i).assertAttributeEquals("fragment.index", String.valueOf(i)));
|
||||||
|
|
||||||
out.stream().filter(ff -> ff.getAttribute("record.count").equals("2")).forEach(ff -> ff.assertContentEquals("Jake,49,\nJake,14,\n"));
|
out.stream().filter(ff -> ff.getAttribute("record.count").equals("2")).forEach(ff -> ff.assertContentEquals("Jake,49,\nJake,14,\n"));
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue