From ae9953db646f54f3731c56c8ff97503aacc041f3 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 18 May 2017 13:36:31 -0400 Subject: [PATCH] NIFI-3857: This closes #1825. Added PartitionRecord processor Signed-off-by: joewitt --- .../record/path/ArrayIndexFieldValue.java | 22 + .../nifi/record/path/MapEntryFieldValue.java | 23 + .../nifi/record/path/StandardFieldValue.java | 16 + .../AbstractRecordSetWriter.java | 9 +- .../record/util/DataTypeUtils.java | 30 ++ .../org/apache/nifi/avro/AvroTypeUtil.java | 40 +- .../record/MockRecordWriter.java | 13 +- .../script/ScriptedRecordSetWriter.java | 1 - .../standard/AbstractRecordProcessor.java | 40 +- .../standard/AbstractRouteRecord.java | 9 +- .../processors/standard/LookupRecord.java | 88 +++- .../processors/standard/PartitionRecord.java | 419 ++++++++++++++++++ .../nifi/processors/standard/SplitRecord.java | 2 - .../org.apache.nifi.processor.Processor | 1 + .../additionalDetails.html | 190 ++++++++ .../processors/standard/TestLookupRecord.java | 9 +- .../standard/TestPartitionRecord.java | 194 ++++++++ .../org/apache/nifi/lookup/LookupService.java | 2 +- .../nifi/lookup/RecordLookupService.java | 2 +- .../nifi/lookup/StringLookupService.java | 2 - .../additionalDetails.html | 125 +++++- .../WriteAvroResultWithExternalSchema.java | 5 +- .../nifi/avro/WriteAvroResultWithSchema.java | 6 +- .../org/apache/nifi/csv/WriteCSVResult.java | 5 +- .../org/apache/nifi/json/WriteJsonResult.java | 5 +- .../apache/nifi/text/FreeFormTextWriter.java | 5 +- 26 files changed, 1155 insertions(+), 108 deletions(-) create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.PartitionRecord/additionalDetails.html create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPartitionRecord.java diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/ArrayIndexFieldValue.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/ArrayIndexFieldValue.java index 6a94e4f61c..a753579a01 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/ArrayIndexFieldValue.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/ArrayIndexFieldValue.java @@ -52,4 +52,26 @@ public class ArrayIndexFieldValue extends StandardFieldValue { public void updateValue(final Object newValue) { getParentRecord().get().setArrayValue(getField().getFieldName(), getArrayIndex(), newValue); } + + @Override + public int hashCode() { + return Objects.hash(getValue(), getField(), getParent(), index); + } + + @Override + public boolean equals(final Object obj) { + if (obj == this) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof ArrayIndexFieldValue)) { + return false; + } + + final ArrayIndexFieldValue other = (ArrayIndexFieldValue) obj; + return Objects.equals(getValue(), other.getValue()) && Objects.equals(getField(), other.getField()) + && Objects.equals(getParent(), other.getParent()) && getArrayIndex() == other.getArrayIndex(); + } } diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/MapEntryFieldValue.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/MapEntryFieldValue.java index 
2553d9a680..f52af3d167 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/MapEntryFieldValue.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/MapEntryFieldValue.java @@ -17,6 +17,8 @@ package org.apache.nifi.record.path; +import java.util.Objects; + import org.apache.nifi.serialization.record.RecordField; public class MapEntryFieldValue extends StandardFieldValue { @@ -36,4 +38,25 @@ public class MapEntryFieldValue extends StandardFieldValue { getParentRecord().get().setMapValue(getField().getFieldName(), getMapKey(), newValue); } + @Override + public int hashCode() { + return Objects.hash(getValue(), getField(), getParent(), mapKey); + } + + @Override + public boolean equals(final Object obj) { + if (obj == this) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof MapEntryFieldValue)) { + return false; + } + + final MapEntryFieldValue other = (MapEntryFieldValue) obj; + return Objects.equals(getValue(), other.getValue()) && Objects.equals(getField(), other.getField()) + && Objects.equals(getParent(), other.getParent()) && Objects.equals(getMapKey(), other.getMapKey()); + } } diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java index b02deb4485..75644c504e 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java @@ -56,6 +56,22 @@ public class StandardFieldValue implements FieldValue { return Objects.hash(value, field, parent); } + @Override + public boolean equals(final Object obj) { + if (obj == this) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof StandardFieldValue)) { + return false; + } + + final StandardFieldValue other = (StandardFieldValue) obj; + return Objects.equals(getValue(), other.getValue()) && Objects.equals(getField(), other.getField()) && Objects.equals(getParent(), other.getParent()); + } + @Override public String toString() { if (value instanceof Object[]) { diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java index 5feb264669..6bf574f455 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java @@ -45,11 +45,16 @@ public abstract class AbstractRecordSetWriter implements RecordSetWriter { Record record; while ((record = recordSet.next()) != null) { write(record); - recordCount++; } return finishRecordSet(); } + @Override + public final WriteResult write(final Record record) throws IOException { + final Map attributes = writeRecord(record); + return WriteResult.of(++recordCount, attributes); + } + protected OutputStream getOutputStream() { return out; } @@ -102,4 +107,6 @@ public abstract class AbstractRecordSetWriter implements RecordSetWriter { protected Map onFinishRecordSet() throws IOException { return Collections.emptyMap(); } + + protected abstract Map writeRecord(Record record) throws IOException; } diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java index 1396ce183c..05c8281d90 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java @@ -174,6 +174,10 @@ public class DataTypeUtils { public static DataType chooseDataType(final Object value, final ChoiceDataType choiceType) { for (final DataType subType : choiceType.getPossibleSubTypes()) { if (isCompatibleDataType(value, subType)) { + if (subType.getFieldType() == RecordFieldType.CHOICE) { + return chooseDataType(value, (ChoiceDataType) subType); + } + return subType; } } @@ -893,4 +897,30 @@ public class DataTypeUtils { return new RecordField(fieldName, dataType, defaultValue, aliases); } + + public static boolean isScalarValue(final DataType dataType, final Object value) { + final RecordFieldType fieldType = dataType.getFieldType(); + + final RecordFieldType chosenType; + if (fieldType == RecordFieldType.CHOICE) { + final ChoiceDataType choiceDataType = (ChoiceDataType) dataType; + final DataType chosenDataType = chooseDataType(value, choiceDataType); + if (chosenDataType == null) { + return false; + } + + chosenType = chosenDataType.getFieldType(); + } else { + chosenType = fieldType; + } + + switch (chosenType) { + case ARRAY: + case MAP: + case RECORD: + return false; + } + + return true; + } } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java index daf4031249..19697d2945 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java @@ -24,8 +24,10 @@ import org.apache.avro.Schema; import org.apache.avro.Schema.Field; import org.apache.avro.Schema.Type; import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericData.Array; import org.apache.avro.generic.GenericFixed; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.specific.SpecificRecord; import org.apache.avro.util.Utf8; import org.apache.avro.JsonProperties; import org.apache.nifi.schema.access.SchemaNotFoundException; @@ -278,6 +280,7 @@ public class AvroTypeUtil { return convertToAvroObject(rawValue, fieldSchema, fieldSchema.getName()); } + @SuppressWarnings("unchecked") private static Object convertToAvroObject(final Object rawValue, final Schema fieldSchema, final String fieldName) { if (rawValue == null) { return null; @@ -465,9 +468,13 @@ public class AvroTypeUtil { final DataType desiredDataType = AvroTypeUtil.determineDataType(nonNullFieldSchema); try { final Object convertedValue = conversion.apply(nonNullFieldSchema); - if (DataTypeUtils.isCompatibleDataType(convertedValue, desiredDataType) - // For logical types those store with different type (e.g. 
BigDecimal as ByteBuffer), check compatibility using the original rawValue - || (nonNullFieldSchema.getLogicalType() != null && DataTypeUtils.isCompatibleDataType(originalValue, desiredDataType))) { + + if (isCompatibleDataType(convertedValue, desiredDataType)) { + return convertedValue; + } + + // For logical types that are stored with a different type (e.g., BigDecimal as ByteBuffer), check compatibility using the original rawValue + if (nonNullFieldSchema.getLogicalType() != null && DataTypeUtils.isCompatibleDataType(originalValue, desiredDataType)) { + return convertedValue; + } } catch (Exception e) { @@ -484,6 +491,33 @@ public class AvroTypeUtil { return null; } + private static boolean isCompatibleDataType(final Object value, final DataType dataType) { + if (value == null) { + return false; + } + + switch (dataType.getFieldType()) { + case RECORD: + if (value instanceof GenericRecord || value instanceof SpecificRecord) { + return true; + } + break; + case STRING: + if (value instanceof Utf8) { + return true; + } + break; + case ARRAY: + if (value instanceof Array) { + return true; + } + break; + } + + return DataTypeUtils.isCompatibleDataType(value, dataType); + } + + + /** * Convert an Avro object to normal Java objects for further processing. * The counterpart method, which converts a raw value to an Avro object, is {@link #convertToAvroObject(Object, Schema, String)} diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java index 525a51fa4e..1d6aafe199 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java @@ -63,12 +63,14 @@ public class MockRecordWriter extends AbstractControllerService implements Recor public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) { return new RecordSetWriter() { private int recordCount = 0; + private boolean headerWritten = false; @Override public WriteResult write(final RecordSet rs) throws IOException { - if (header != null) { + if (header != null && !headerWritten) { out.write(header.getBytes()); out.write("\n".getBytes()); + headerWritten = true; } int recordCount = 0; @@ -110,9 +112,14 @@ public class MockRecordWriter extends AbstractControllerService implements Recor @Override public WriteResult write(Record record) throws IOException { - if (header != null) { + if (++recordCount > failAfterN && failAfterN > -1) { + throw new IOException("Unit Test intentionally throwing IOException after " + failAfterN + " records were written"); + } + + if (header != null && !headerWritten) { out.write(header.getBytes()); out.write("\n".getBytes()); + headerWritten = true; } final int numCols = record.getSchema().getFieldCount(); @@ -135,8 +142,6 @@ public class MockRecordWriter extends AbstractControllerService implements Recor } out.write("\n".getBytes()); - recordCount++; - return WriteResult.of(1, Collections.emptyMap()); } diff --git
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java index b18e9de8ac..8553c32740 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java @@ -129,7 +129,6 @@ public class ScriptedRecordSetWriter extends AbstractScriptedRecordFactory extends AbstractProcessor { return; } - final T flowFileContext = getFlowFileContext(flowFile, context); + final T flowFileContext; + try { + flowFileContext = getFlowFileContext(flowFile, context); + } catch (final Exception e) { + getLogger().error("Failed to process {}; routing to failure", new Object[] {flowFile, e}); + session.transfer(flowFile, REL_FAILURE); + return; + } final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java index 26e78b4b59..583ea5181d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java @@ -18,6 +18,7 @@ package org.apache.nifi.processors.standard; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -36,11 +37,13 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.lookup.LookupService; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.record.path.FieldValue; import org.apache.nifi.record.path.RecordPath; import org.apache.nifi.record.path.RecordPathResult; @@ -60,19 +63,29 @@ import org.apache.nifi.util.Tuple; @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer"), @WritesAttribute(attribute = "record.count", description = "The number of records in the FlowFile") }) -@Tags({"lookup", "enrich", "route", "record", "csv", "json", "avro", "logs", "convert", "filter"}) +@Tags({"lookup", "enrichment", "route", "record", "csv", "json", "avro", "logs", "convert", "filter"}) @CapabilityDescription("Extracts a field from a Record and looks up its value in a LookupService. 
If a result is returned by the LookupService, " + "that result is optionally added to the Record. In this case, the processor functions as an Enrichment processor. Regardless, the Record is then " - + "routed to either the 'matched' relationship or 'unmatched' relationship, indicating whether or not a result was returned by the LookupService, " + + "routed to either the 'matched' relationship or 'unmatched' relationship (if the 'Routing Strategy' property is configured to do so), " + + "indicating whether or not a result was returned by the LookupService, " + "allowing the processor to also function as a Routing processor. If any record in the incoming FlowFile has multiple fields match the configured " - + "Lookup RecordPath or if no fields match, then that record will be routed to failure. If one or more fields match the Result RecordPath, all fields " - + "that match will be updated.") -@SeeAlso({ConvertRecord.class, SplitRecord.class}) + + "Lookup RecordPath or if no fields match, then that record will be routed to 'unmatched' (or 'success', depending on the configuration of the 'Routing Strategy' property). " + + "If one or more fields match the Result RecordPath, all fields " + + "that match will be updated. If there is no match in the configured LookupService, then no fields will be updated. I.e., it will not overwrite an existing value in the Record " + + "with a null value. Please note, however, that if the results returned by the LookupService are not accounted for in your schema (specifically, " + + "the schema that is configured for your Record Writer) then the fields will not be written out to the FlowFile.") +@SeeAlso(value = {ConvertRecord.class, SplitRecord.class}, classNames = {"org.apache.nifi.lookup.SimpleKeyValueLookupService", "org.apache.nifi.lookup.maxmind.IPLookupService"}) public class LookupRecord extends AbstractRouteRecord> { private volatile RecordPathCache recordPathCache = new RecordPathCache(25); private volatile LookupService lookupService; + static final AllowableValue ROUTE_TO_SUCCESS = new AllowableValue("route-to-success", "Route to 'success'", + "Records will be routed to a 'success' Relationship regardless of whether or not there is a match in the configured Lookup Service"); + static final AllowableValue ROUTE_TO_MATCHED_UNMATCHED = new AllowableValue("route-to-matched-unmatched", "Route to 'matched' or 'unmatched'", + "Records will be routed to either a 'matched' or an 'unmatched' Relationship depending on whether or not there was a match in the configured Lookup Service. 
" + + "A single input FlowFile may result in two different output FlowFiles."); + static final PropertyDescriptor LOOKUP_SERVICE = new PropertyDescriptor.Builder() .name("lookup-service") .displayName("Lookup Service") @@ -101,6 +114,16 @@ public class LookupRecord extends AbstractRouteRecord MATCHED_COLLECTION = Collections.singleton(REL_MATCHED); private static final Set UNMATCHED_COLLECTION = Collections.singleton(REL_UNMATCHED); - private static final Set FAILURE_COLLECTION = Collections.singleton(REL_FAILURE); + private static final Set SUCCESS_COLLECTION = Collections.singleton(REL_SUCCESS); + private volatile Set relationships = new HashSet<>(Arrays.asList(new Relationship[] {REL_SUCCESS, REL_FAILURE})); + private volatile boolean routeToMatchedUnmatched = false; @OnScheduled public void onScheduled(final ProcessContext context) { @@ -122,10 +151,6 @@ public class LookupRecord extends AbstractRouteRecord getRelationships() { - final Set relationships = new HashSet<>(); - relationships.add(REL_MATCHED); - relationships.add(REL_UNMATCHED); - relationships.add(REL_FAILURE); return relationships; } @@ -136,9 +161,32 @@ public class LookupRecord extends AbstractRouteRecord matchedUnmatchedRels = new HashSet<>(); + matchedUnmatchedRels.add(REL_MATCHED); + matchedUnmatchedRels.add(REL_UNMATCHED); + matchedUnmatchedRels.add(REL_FAILURE); + this.relationships = matchedUnmatchedRels; + + this.routeToMatchedUnmatched = true; + } else { + final Set successRels = new HashSet<>(); + successRels.add(REL_SUCCESS); + successRels.add(REL_FAILURE); + this.relationships = successRels; + + this.routeToMatchedUnmatched = false; + } + } + } + @Override protected Set route(final Record record, final RecordSchema writeSchema, final FlowFile flowFile, final ProcessContext context, final Tuple flowFileContext) { @@ -147,14 +195,17 @@ public class LookupRecord extends AbstractRouteRecord lookupFieldValues = lookupPathResult.getSelectedFields() .filter(fieldVal -> fieldVal.getValue() != null) .collect(Collectors.toList()); + if (lookupFieldValues.isEmpty()) { - getLogger().error("Lookup RecordPath did not match any fields in a record for {}; routing record to failure", new Object[] {flowFile}); - return FAILURE_COLLECTION; + final Set rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION; + getLogger().debug("Lookup RecordPath did not match any fields in a record for {}; routing record to " + rels, new Object[] {flowFile}); + return rels; } if (lookupFieldValues.size() > 1) { - getLogger().error("Lookup RecordPath matched {} fields in a record for {}; routing record to failure", new Object[] {lookupFieldValues.size(), flowFile}); - return FAILURE_COLLECTION; + final Set rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION; + getLogger().debug("Lookup RecordPath matched {} fields in a record for {}; routing record to " + rels, new Object[] {lookupFieldValues.size(), flowFile}); + return rels; } final FieldValue fieldValue = lookupFieldValues.get(0); @@ -164,12 +215,12 @@ public class LookupRecord extends AbstractRouteRecord rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION; + return rels; } // Ensure that the Record has the appropriate schema to account for the newly added values @@ -182,7 +233,8 @@ public class LookupRecord extends AbstractRouteRecord fieldVal.updateValue(replacementValue)); } - return MATCHED_COLLECTION; + final Set rels = routeToMatchedUnmatched ? 
MATCHED_COLLECTION : SUCCESS_COLLECTION; + return rels; } @Override diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java new file mode 100644 index 0000000000..e69cd72b1c --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java @@ -0,0 +1,419 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPath; +import org.apache.nifi.record.path.util.RecordPathCache; +import org.apache.nifi.record.path.validation.RecordPathValidator; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; +import 
org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.util.DataTypeUtils; + +@EventDriven +@SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Receives Record-oriented data (i.e., data that can be read by the configured Record Reader) and evaluates one or more RecordPaths against " + "each record in the incoming FlowFile. Each record is then grouped with other \"like records\" and a FlowFile is created for each group of \"like records.\" What it means for " + "two records to be \"like records\" is determined by user-defined properties. The user is required to enter at least one user-defined property whose value is a RecordPath. Two " + "records are considered alike if they have the same value for all configured RecordPaths. Because we know that all records in a given output FlowFile have the same value for the " + "fields that are specified by the RecordPath, an attribute is added for each field. See Additional Details on the Usage page for more information and examples.") +@DynamicProperty(name="The name given to the dynamic property is the name of the attribute that will be used to denote the value of the associated RecordPath.", + value="A RecordPath that points to a field in the Record.", + description="Each dynamic property represents a RecordPath that will be evaluated against each record in an incoming FlowFile. When the value of the RecordPath is determined " + "for a Record, an attribute is added to the outgoing FlowFile. The name of the attribute is the same as the name of this property. The value of the attribute is the same as " + "the value of the field in the Record that the RecordPath points to. Note that no attribute will be added if the value returned for the RecordPath is null or is not a scalar " + "value (i.e., the value is an Array, Map, or Record).", + supportsExpressionLanguage=true) +@WritesAttributes({ + @WritesAttribute(attribute="record.count", description="The number of records in an outgoing FlowFile"), + @WritesAttribute(attribute="mime.type", description="The MIME Type that the configured Record Writer indicates is appropriate"), + @WritesAttribute(attribute="<dynamic property name>", + description = "For each dynamic property that is added, an attribute may be added to the FlowFile.
See the description for Dynamic Properties for more information.") +}) +@Tags({"record", "partition", "recordpath", "rpath", "segment", "split", "group", "bin", "organize"}) +@SeeAlso({ConvertRecord.class, SplitRecord.class, UpdateRecord.class, QueryRecord.class}) + +public class PartitionRecord extends AbstractProcessor { + private final RecordPathCache recordPathCache = new RecordPathCache(25); + + static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() + .name("record-reader") + .displayName("Record Reader") + .description("Specifies the Controller Service to use for reading incoming data") + .identifiesControllerService(RecordReaderFactory.class) + .required(true) + .build(); + static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() + .name("record-writer") + .displayName("Record Writer") + .description("Specifies the Controller Service to use for writing out the records") + .identifiesControllerService(RecordSetWriterFactory.class) + .required(true) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles that are successfully partitioned will be routed to this relationship") + .build(); + static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("Once all records in an incoming FlowFile have been partitioned, the original FlowFile is routed to this relationship.") + .build(); + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("If a FlowFile cannot be partitioned from the configured input format to the configured output format, " + "the unchanged FlowFile will be routed to this relationship") + .build(); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(RECORD_READER); + properties.add(RECORD_WRITER); + return properties; + } + + @Override + public Set<Relationship> getRelationships() { + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + relationships.add(REL_ORIGINAL); + return relationships; + } + + @Override + protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { + final boolean hasDynamic = validationContext.getProperties().keySet().stream() + .anyMatch(prop -> prop.isDynamic()); + + if (hasDynamic) { + return Collections.emptyList(); + } + + return Collections.singleton(new ValidationResult.Builder() + .subject("User-defined Properties") + .valid(false) + .explanation("At least one RecordPath must be added to this processor by adding a user-defined property") + .build()); + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .dynamic(true) + .required(false) + .expressionLanguageSupported(true) + .addValidator(new RecordPathValidator()) + .build(); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); + final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); + final RecordSchema writeSchema; + try (final InputStream
rawIn = session.read(flowFile); + final InputStream in = new BufferedInputStream(rawIn)) { + writeSchema = writerFactory.getSchema(flowFile, in); + } catch (final Exception e) { + getLogger().error("Failed to partition records for {}; will route to failure", new Object[] {flowFile, e}); + session.transfer(flowFile, REL_FAILURE); + return; + } + + final Map<String, RecordPath> recordPaths; + try { + recordPaths = context.getProperties().keySet().stream() + .filter(prop -> prop.isDynamic()) + .collect(Collectors.toMap( + prop -> prop.getName(), + prop -> getRecordPath(context, prop, flowFile))); + } catch (final Exception e) { + getLogger().error("Failed to compile RecordPath for {}; routing to failure", new Object[] {flowFile, e}); + session.transfer(flowFile, REL_FAILURE); + return; + } + + final Map<RecordValueMap, RecordSetWriter> writerMap = new HashMap<>(); + + try (final InputStream in = session.read(flowFile)) { + final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger()); + + Record record; + while ((record = reader.nextRecord()) != null) { + final Map<String, List<ValueWrapper>> recordMap = new HashMap<>(); + + // Evaluate all of the RecordPaths for this Record + for (final Map.Entry<String, RecordPath> entry : recordPaths.entrySet()) { + final String propName = entry.getKey(); + final RecordPath recordPath = entry.getValue(); + + final Stream<FieldValue> fieldValueStream = recordPath.evaluate(record).getSelectedFields(); + final List<ValueWrapper> fieldValues = fieldValueStream + .map(fieldVal -> new ValueWrapper(fieldVal.getValue())) + .collect(Collectors.toList()); + recordMap.put(propName, fieldValues); + } + + final RecordValueMap recordValueMap = new RecordValueMap(recordMap); + + // Get the RecordSetWriter that contains the same values for all RecordPaths - or create one if none exists. + RecordSetWriter writer = writerMap.get(recordValueMap); + if (writer == null) { + final FlowFile childFlowFile = session.create(flowFile); + recordValueMap.setFlowFile(childFlowFile); + + final OutputStream out = session.write(childFlowFile); + + writer = writerFactory.createWriter(getLogger(), writeSchema, childFlowFile, out); + writer.beginRecordSet(); + writerMap.put(recordValueMap, writer); + } + + writer.write(record); + } + + // For each RecordSetWriter, finish the record set and close the writer.
+ for (final Map.Entry<RecordValueMap, RecordSetWriter> entry : writerMap.entrySet()) { + final RecordValueMap valueMap = entry.getKey(); + final RecordSetWriter writer = entry.getValue(); + + final WriteResult writeResult = writer.finishRecordSet(); + writer.close(); + + final Map<String, String> attributes = new HashMap<>(); + attributes.putAll(valueMap.getAttributes()); + attributes.putAll(writeResult.getAttributes()); + attributes.put("record.count", String.valueOf(writeResult.getRecordCount())); + attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); + + FlowFile childFlowFile = valueMap.getFlowFile(); + childFlowFile = session.putAllAttributes(childFlowFile, attributes); + + session.adjustCounter("Record Processed", writeResult.getRecordCount(), false); + } + + } catch (final Exception e) { + for (final Map.Entry<RecordValueMap, RecordSetWriter> entry : writerMap.entrySet()) { + final RecordValueMap valueMap = entry.getKey(); + final RecordSetWriter writer = entry.getValue(); + + try { + writer.close(); + } catch (final IOException e1) { + getLogger().warn("Failed to close Record Writer for {}; some resources may not be cleaned up appropriately", new Object[] {flowFile, e1}); + } + + session.remove(valueMap.getFlowFile()); + } + + + getLogger().error("Failed to partition {}", new Object[] {flowFile, e}); + session.transfer(flowFile, REL_FAILURE); + return; + } + + // Transfer the FlowFiles. We wait until the end to do this, in case any IOException is thrown above, + // because we want to ensure that we are able to remove the child flowfiles in case of a failure. + for (final RecordValueMap valueMap : writerMap.keySet()) { + session.transfer(valueMap.getFlowFile(), REL_SUCCESS); + } + + session.transfer(flowFile, REL_ORIGINAL); + } + + private RecordPath getRecordPath(final ProcessContext context, final PropertyDescriptor prop, final FlowFile flowFile) { + final String pathText = context.getProperty(prop).evaluateAttributeExpressions(flowFile).getValue(); + final RecordPath recordPath = recordPathCache.getCompiled(pathText); + return recordPath; + } + + /** + * We have this ValueWrapper class here because we want to use it as part of the key to a Map and we + * want to compare two values that may or may not be arrays. Since calling a.equals(b) returns false when a and b + * are arrays, we need to wrap our values in a class that can handle comparisons appropriately.
*/ + static class ValueWrapper { + private final Object value; + + public ValueWrapper(final Object value) { + this.value = value; + } + + public Object get() { + return value; + } + + @Override + public int hashCode() { + if (value == null) { + return 31; + } + + if (value instanceof Object[]) { + return 31 + Arrays.deepHashCode((Object[]) value); + } + + return 31 + value.hashCode(); + } + + @Override + public boolean equals(final Object obj) { + if (obj == this) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof ValueWrapper)) { + return false; + } + final ValueWrapper other = (ValueWrapper) obj; + if (value == null && other.value == null) { + return true; + } + if (value == null || other.value == null) { + return false; + } + if (value instanceof Object[] && other.value instanceof Object[]) { + return Arrays.equals((Object[]) value, (Object[]) other.value); + } + return value.equals(other.value); + } + } + + private static class RecordValueMap { + private final Map<String, List<ValueWrapper>> values; + private FlowFile flowFile; + + public RecordValueMap(final Map<String, List<ValueWrapper>> values) { + this.values = values; + } + + public Map<String, String> getAttributes() { + final Map<String, String> attributes = new HashMap<>(); + for (final Map.Entry<String, List<ValueWrapper>> entry : values.entrySet()) { + final List<ValueWrapper> values = entry.getValue(); + + // If there are no values or there are multiple values, don't create an attribute. + if (values.size() != 1) { + continue; + } + + // If value is null, don't create an attribute + final Object value = values.get(0).get(); + if (value == null) { + continue; + } + + // If value is not scalar, don't create an attribute + if (value instanceof Object[] || value instanceof Map || value instanceof Record) { + continue; + } + + // There exists a single value that is scalar. Create attribute using the property name as the attribute name + final String attributeValue = DataTypeUtils.toString(value, (String) null); + attributes.put(entry.getKey(), attributeValue); + } + + return attributes; + } + + public FlowFile getFlowFile() { + return flowFile; + } + + public void setFlowFile(final FlowFile flowFile) { + this.flowFile = flowFile; + } + + @Override + public int hashCode() { + return 41 + 37 * values.hashCode(); + } + + @Override + public boolean equals(final Object obj) { + if (obj == this) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof RecordValueMap)) { + return false; + } + final RecordValueMap other = (RecordValueMap) obj; + return values.equals(other.values); + } + + @Override + public String toString() { + return "RecordValueMap[" + values + "]"; + } + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java index 6463374400..853e88d2bb 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java @@ -27,7 +27,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; @@ -47,7 +46,6 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; -import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.MalformedRecordException; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 7ed3736b18..1034384e44 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -66,6 +66,7 @@ org.apache.nifi.processors.standard.MonitorActivity org.apache.nifi.processors.standard.Notify org.apache.nifi.processors.standard.ParseCEF org.apache.nifi.processors.standard.ParseSyslog +org.apache.nifi.processors.standard.PartitionRecord org.apache.nifi.processors.standard.PostHTTP org.apache.nifi.processors.standard.PutDatabaseRecord org.apache.nifi.processors.standard.PutDistributedMapCache diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.PartitionRecord/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.PartitionRecord/additionalDetails.html new file mode 100644 index 0000000000..637ac86642 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.PartitionRecord/additionalDetails.html @@ -0,0 +1,190 @@ + + + + + + PartitionRecord + + + + + +

+ PartitionRecord allows the user to separate out records in a FlowFile such that each outgoing FlowFile + consists only of records that are "alike." To define what it means for two records to be alike, the Processor + makes use of NiFi's RecordPath DSL. +

+ +

+ In order for the Processor to be valid, at least one user-defined property must be added to the Processor, and the value of each property must be a valid RecordPath. Expression Language is supported and will be evaluated before attempting to compile the RecordPath. However, if Expression Language is used, the Processor cannot validate the RecordPath beforehand, and FlowFiles may fail processing if the RecordPath turns out to be invalid when it is used.

+ +
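+ As a rough, hedged sketch of what this evaluation looks like internally (it uses the RecordPath API that this patch relies on; the record variable is hypothetical, and this is illustrative rather than the Processor's exact code):
+
+ // Compile a RecordPath and collect the values of the fields it selects from one Record
+ final RecordPath path = RecordPath.compile("/locations/home/state");
+ final List<Object> values = path.evaluate(record)     // 'record' is an already-parsed Record (assumed)
+         .getSelectedFields()                          // Stream of matching FieldValue objects
+         .map(FieldValue::getValue)
+         .collect(Collectors.toList());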

+ Once one or more RecordPaths have been added, those RecordPaths are evaluated against each Record in an incoming FlowFile. For Record A and Record B to be considered "like records," both must have the same value for every configured RecordPath. Only the values returned by the RecordPaths are held in Java's heap; the records themselves are written immediately to the FlowFile content. This means that for most cases, heap usage is not a concern. However, if a RecordPath points to a large Record field that is different for each record in a FlowFile, then heap usage may be an important consideration. In such cases, SplitRecord may be useful to split a large FlowFile into smaller FlowFiles before partitioning.

+ +
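+ Conceptually, partitioning groups records by the tuple of values that their RecordPaths evaluate to. A simplified sketch follows (the variable names are assumptions; note that the real Processor streams each record straight to its group's writer rather than buffering records in memory, which is exactly why heap usage stays low):
+
+ // Group records by their evaluated RecordPath values; each distinct key becomes one output FlowFile.
+ // Note: array values would need wrapping (cf. the Processor's ValueWrapper) for correct key equality.
+ final Map<List<Object>, List<Record>> partitions = new HashMap<>();
+ for (final Record rec : records) {                       // 'records' and 'configuredPaths' are assumed inputs
+     final List<Object> key = new ArrayList<>();
+     for (final RecordPath path : configuredPaths) {
+         path.evaluate(rec).getSelectedFields().forEach(fv -> key.add(fv.getValue()));
+     }
+     partitions.computeIfAbsent(key, k -> new ArrayList<>()).add(rec);
+ }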

+ Once a FlowFile has been written, we know that all of the Records within that FlowFile have the same value for the fields described by the configured RecordPaths. As a result, we can promote those values to FlowFile Attributes. We do so by looking at the name of the property to which each RecordPath belongs. For example, if we have a property named country with a value of /geo/country/name, then each outbound FlowFile will have an attribute named country whose value is that of the /geo/country/name field. These attributes make it very easy to perform tasks such as routing, or to reference the value in another Processor, for example to configure where the data should be sent. However, for any RecordPath whose value is not a scalar value (i.e., the value is of type Array, Map, or Record), no attribute will be added.

+ + + +
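+ For example, with a property named country whose value is /geo/country/name, a downstream test or Processor could rely on the promoted attribute directly (a hypothetical snippet using NiFi's test utilities, assuming a configured TestRunner):
+
+ // The property name becomes the attribute name on each partitioned FlowFile
+ final MockFlowFile partition = runner.getFlowFilesForRelationship(PartitionRecord.REL_SUCCESS).get(0);
+ partition.assertAttributeEquals("country", "US");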

Examples

+ +

+ To better understand how this Processor works, we will lay out a few examples. For the sake of these examples, let's assume that our input + data is JSON formatted and looks like this: +

+ + +
+[ {
+  "name": "John Doe",
+  "dob": "11/30/1976",
+  "favorites": [ "spaghetti", "basketball", "blue" ],
+  "locations": {
+    "home": {
+      "number": 123,
+      "street": "My Street",
+      "city": "New York",
+      "state": "NY",
+      "country": "US"
+    },
+    "work": {
+      "number": 321,
+      "street": "Your Street",
+      "city": "New York",
+      "state": "NY",
+      "country": "US"
+    }
+  }
+}, {
+  "name": "Jane Doe",
+  "dob": "10/04/1979",
+  "favorites": [ "spaghetti", "football", "red" ],
+  "locations": {
+    "home": {
+      "number": 123,
+      "street": "My Street",
+      "city": "New York",
+      "state": "NY",
+      "country": "US"
+    },
+    "work": {
+      "number": 456,
+      "street": "Our Street",
+      "city": "New York",
+      "state": "NY",
+      "country": "US"
+    }
+  }
+}, {
+  "name": "Jacob Doe",
+  "dob": "04/02/2012",
+  "favorites": [ "chocolate", "running", "yellow" ],
+  "locations": {
+    "home": {
+      "number": 123,
+      "street": "My Street",
+      "city": "New York",
+      "state": "NY",
+      "country": "US"
+    },
+    "work": null
+  }
+}, {
+  "name": "Janet Doe",
+  "dob": "02/14/2007",
+  "favorites": [ "spaghetti", "reading", "white" ],
+  "locations": {
+    "home": {
+      "number": 1111,
+      "street": "Far Away",
+      "city": "San Francisco",
+      "state": "CA",
+      "country": "US"
+    },
+    "work": null
+  }
+}]
+
+
+ + +

Example 1 - Partition By Simple Field

+ +

+ For a simple case, let's partition all of the records based on the state in which they live. We can add a property named state with a value of /locations/home/state. The result will be two outbound FlowFiles. The first will contain an attribute named state with a value of NY and will consist of 3 records: John Doe, Jane Doe, and Jacob Doe. The second FlowFile will consist of a single record, for Janet Doe, and will contain an attribute named state with a value of CA.

+ + +
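+ Expressed with NiFi's test framework, Example 1 might look like the following (a hedged sketch that assumes Record Reader and Record Writer controller services are already configured, as in the unit tests accompanying this patch; inputJson is a hypothetical variable holding the sample data above):
+
+ final TestRunner runner = TestRunners.newTestRunner(PartitionRecord.class);
+ // ... add and enable Record Reader / Record Writer controller services here ...
+ runner.setProperty("state", "/locations/home/state");          // user-defined property: attribute name -> RecordPath
+ runner.enqueue(inputJson.getBytes(StandardCharsets.UTF_8));
+ runner.run();
+
+ runner.assertTransferCount(PartitionRecord.REL_SUCCESS, 2);    // one FlowFile per distinct state (NY, CA)
+ runner.assertTransferCount(PartitionRecord.REL_ORIGINAL, 1);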

Example 2 - Partition By Nullable Value

+ +

+ In the above example, there are three different values for the work location. If we use a RecordPath of /locations/work/state + with a property name of state, then we will end up with two different FlowFiles. The first will contain records for John Doe and Jane Doe + because they have the same value for the given RecordPath. This FlowFile will have an attribute named state with a value of NY. +

+

+ The second FlowFile will contain the two records for Jacob Doe and Janet Doe, because the RecordPath will evaluate + to null for both of them. This FlowFile will have no state attribute (unless such an attribute existed on the incoming FlowFile, + in which case its value will be unaltered). +

+ + +
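+ The absence of the attribute can be verified in a test like so (a hypothetical continuation of the sketch from Example 1):
+
+ // Records whose RecordPath evaluates to null are grouped together, and no attribute is promoted for them
+ final List<MockFlowFile> out = runner.getFlowFilesForRelationship(PartitionRecord.REL_SUCCESS);
+ assertEquals(1L, out.stream().filter(ff -> !ff.getAttributes().containsKey("state")).count());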

Example 3 - Partition By Multiple Values

+ +

+ Now let's say that we want to partition records based on multiple different fields. We now add two properties to the PartitionRecord processor. + The first property is named home and has a value of /locations/home. The second property is named favorite.food + and has a value of /favorites[0] to reference the first element in the "favorites" array. +

+ +
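+ In test form, that configuration would look something like this (hedged sketch):
+
+ runner.setProperty("home", "/locations/home");           // points to a Record, so no attribute will be promoted
+ runner.setProperty("favorite.food", "/favorites[0]");    // scalar value, so it is promoted to an attribute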

+ This will result in three different FlowFiles being created. The first FlowFile will contain records for John Doe and Jane Doe. It will contain an attribute named "favorite.food" with a value of "spaghetti." However, because the "home" RecordPath points to a Record field, no "home" attribute will be added. Both of these records have the same value for the first element of the "favorites" array and the same home address. Janet Doe has the same value for the first element in the "favorites" array but a different home address. Similarly, Jacob Doe has the same home address but a different favorite food.

+ +

+ The second FlowFile will consist of a single record: Jacob Doe. This FlowFile will have an attribute named "favorite.food" with a value of "chocolate." + The third FlowFile will consist of a single record: Janet Doe. This FlowFile will have an attribute named "favorite.food" with a value of "spaghetti." +

+ + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java index 5f2cd6c8b4..d19ee43ce3 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java @@ -59,6 +59,7 @@ public class TestLookupRecord { runner.setProperty(LookupRecord.LOOKUP_SERVICE, "lookup"); runner.setProperty(LookupRecord.LOOKUP_RECORD_PATH, "/name"); runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/sport"); + runner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_MATCHED_UNMATCHED); recordReader.addSchemaField("name", RecordFieldType.STRING); recordReader.addSchemaField("age", RecordFieldType.INT); @@ -149,8 +150,8 @@ public class TestLookupRecord { runner.enqueue(""); runner.run(); - runner.assertAllFlowFilesTransferred(LookupRecord.REL_FAILURE, 1); - final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_FAILURE).get(0); + runner.assertAllFlowFilesTransferred(LookupRecord.REL_UNMATCHED, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_UNMATCHED).get(0); out.assertAttributeEquals("record.count", "3"); out.assertAttributeEquals("mime.type", "text/plain"); @@ -201,8 +202,8 @@ public class TestLookupRecord { runner.enqueue(""); runner.run(); - runner.assertAllFlowFilesTransferred(LookupRecord.REL_FAILURE, 1); - final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_FAILURE).get(0); + runner.assertAllFlowFilesTransferred(LookupRecord.REL_UNMATCHED, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_UNMATCHED).get(0); out.assertAttributeEquals("record.count", "3"); out.assertAttributeEquals("mime.type", "text/plain"); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPartitionRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPartitionRecord.java new file mode 100644 index 0000000000..99e63705a5 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPartitionRecord.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.standard; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.List; + +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.record.MockRecordParser; +import org.apache.nifi.serialization.record.MockRecordWriter; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +public class TestPartitionRecord { + + private TestRunner runner; + private MockRecordParser readerService; + private MockRecordWriter writerService; + + @Before + public void setup() throws InitializationException { + readerService = new MockRecordParser(); + writerService = new MockRecordWriter(null, false); + + runner = TestRunners.newTestRunner(PartitionRecord.class); + runner.addControllerService("reader", readerService); + runner.enableControllerService(readerService); + runner.addControllerService("writer", writerService); + runner.enableControllerService(writerService); + + runner.setProperty(PartitionRecord.RECORD_READER, "reader"); + runner.setProperty(PartitionRecord.RECORD_WRITER, "writer"); + + readerService.addSchemaField("name", RecordFieldType.STRING); + readerService.addSchemaField("age", RecordFieldType.INT); + readerService.addSchemaField("sports", RecordFieldType.ARRAY); + } + + @Test + public void groupByStringMixedNumberOfRecords() { + runner.setProperty("person-name", "/name"); + + readerService.addRecord("John", 28, null); + readerService.addRecord("Jake", 49, null); + readerService.addRecord("Mark", 19, null); + readerService.addRecord("Jane", 20, null); + readerService.addRecord("Jake", 14, null); + + runner.enqueue(new byte[0]); + + runner.run(); + + runner.assertTransferCount(PartitionRecord.REL_ORIGINAL, 1); + runner.assertTransferCount(PartitionRecord.REL_FAILURE, 0); + runner.assertTransferCount(PartitionRecord.REL_SUCCESS, 4); + + final List out = runner.getFlowFilesForRelationship(PartitionRecord.REL_SUCCESS); + + assertEquals(3L, out.stream().filter(ff -> ff.getAttribute("record.count").equals("1")).count()); + assertEquals(1L, out.stream().filter(ff -> ff.getAttribute("record.count").equals("2")).count()); + + out.stream().filter(ff -> ff.getAttribute("record.count").equals("2")).forEach(ff -> ff.assertContentEquals("Jake,49,\nJake,14,\n")); + + for (final String name : new String[] {"John", "Jake", "Mark", "Jane"}) { + assertEquals(1L, out.stream().filter(ff -> ff.getAttribute("person-name").equals(name)).count()); + } + } + + @Test + public void testGroupByIntAllRecordsTogether() { + runner.setProperty("age", "/age"); + + readerService.addRecord("John", 30, null); + readerService.addRecord("Jake", 30, null); + readerService.addRecord("Mark", 30, null); + readerService.addRecord("Jane", 30, null); + readerService.addRecord("Jake", 30, null); + + runner.enqueue(new byte[0]); + + runner.run(); + + runner.assertTransferCount(PartitionRecord.REL_ORIGINAL, 1); + runner.assertTransferCount(PartitionRecord.REL_FAILURE, 0); + runner.assertTransferCount(PartitionRecord.REL_SUCCESS, 1); + + final MockFlowFile out = runner.getFlowFilesForRelationship(PartitionRecord.REL_SUCCESS).get(0); + out.assertAttributeEquals("record.count", "5"); + out.assertContentEquals("John,30,\nJake,30,\nMark,30,\nJane,30,\nJake,30,\n"); + out.assertAttributeEquals("age", 
"30"); + } + + + @Test + public void testGroupByMultipleFields() { + runner.setProperty("age", "/age"); + runner.setProperty("name", "/name"); + + readerService.addRecord("John", 30, null); + readerService.addRecord("Jane", 30, null); + readerService.addRecord("John", 30, null); + readerService.addRecord("John", 31, null); + + runner.enqueue(new byte[0]); + + runner.run(); + + runner.assertTransferCount(PartitionRecord.REL_ORIGINAL, 1); + runner.assertTransferCount(PartitionRecord.REL_FAILURE, 0); + runner.assertTransferCount(PartitionRecord.REL_SUCCESS, 3); + + final List out = runner.getFlowFilesForRelationship(PartitionRecord.REL_SUCCESS); + assertEquals(1L, out.stream().filter(mff -> mff.isContentEqual("John,30,\nJohn,30,\n") && mff.isAttributeEqual("age", "30") && mff.isAttributeEqual("name", "John")).count()); + assertEquals(1L, out.stream().filter(mff -> mff.isContentEqual("Jane,30,\n") && mff.isAttributeEqual("age", "30") && mff.isAttributeEqual("name", "Jane")).count()); + assertEquals(1L, out.stream().filter(mff -> mff.isContentEqual("John,31,\n") && mff.isAttributeEqual("age", "31") && mff.isAttributeEqual("name", "John")).count()); + } + + + @Test + public void testGroupByArrayField() { + runner.setProperty("sports", "/sports"); + + readerService.addRecord("John", 30, new String[] {"baseball"}); + readerService.addRecord("Jane", 30, new String[] {"baseball"}); + readerService.addRecord("John", 30, new String[] {"basketball"}); + readerService.addRecord("John", 31, new String[] {"football"}); + + runner.enqueue(new byte[0]); + + runner.run(); + + runner.assertTransferCount(PartitionRecord.REL_ORIGINAL, 1); + runner.assertTransferCount(PartitionRecord.REL_FAILURE, 0); + runner.assertTransferCount(PartitionRecord.REL_SUCCESS, 3); + + final List out = runner.getFlowFilesForRelationship(PartitionRecord.REL_SUCCESS); + assertEquals(1L, out.stream().filter(mff -> mff.isContentEqual("John,30,[baseball]\nJane,30,[baseball]\n")).count()); + assertEquals(1L, out.stream().filter(mff -> mff.isContentEqual("John,30,[basketball]\n")).count()); + assertEquals(1L, out.stream().filter(mff -> mff.isContentEqual("John,31,[football]\n")).count()); + + // There should be no sports attribute because it's not a scalar value + assertTrue(out.stream().noneMatch(mff -> mff.getAttributes().containsKey("sports"))); + } + + @Test + public void testReadFailure() throws IOException { + runner.setProperty("sports", "/sports"); + readerService.failAfter(2); + + readerService.addRecord("John", 30, new String[] {"baseball"}); + readerService.addRecord("Jane", 30, new String[] {"baseball"}); + readerService.addRecord("John", 30, new String[] {"basketball"}); + readerService.addRecord("John", 31, new String[] {"football"}); + + runner.enqueue(new byte[0]); + + runner.run(); + + runner.assertAllFlowFilesTransferred(PartitionRecord.REL_FAILURE, 1); + runner.getFlowFilesForRelationship(PartitionRecord.REL_FAILURE).get(0).assertContentEquals(new byte[0]); + } + + + @Test + public void testValueWrapperEqualityWithArrays() { + final Object a = new String[] {"baseball"}; + final Object b = new String[] {"baseball"}; + + assertEquals(new PartitionRecord.ValueWrapper(a), new PartitionRecord.ValueWrapper(b)); + } + +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java index 2796ff586d..00258b63c8 100644 --- 
--- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java
@@ -29,7 +29,7 @@ public interface LookupService<T> extends ControllerService {
      * @param key the key to lookup
      * @return a value that corresponds to the given key
      *
-     * @throws if unable to lookup a value for the given key
+     * @throws LookupFailureException if unable to look up a value for the given key
      */
     Optional<T> lookup(String key) throws LookupFailureException;
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/RecordLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/RecordLookupService.java
index 57bc8e0bf2..fee28847e6 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/RecordLookupService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/RecordLookupService.java
@@ -29,7 +29,7 @@ public interface RecordLookupService extends LookupService<Record> {
      * @param key the key to lookup
      * @return an Optional Record that corresponds to the given key
      *
-     * @throws if unable to lookup a value for the given key
+     * @throws LookupFailureException if unable to look up a value for the given key
      */
     @Override
     Optional<Record> lookup(String key) throws LookupFailureException;
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/StringLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/StringLookupService.java
index 8f5199e9ee..be7d7c86ac 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/StringLookupService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/StringLookupService.java
@@ -26,8 +26,6 @@ public interface StringLookupService extends LookupService<String> {
      *
      * @param key the key to lookup
      * @return an Optional String that represents the value for the given key
-     *
-     * @throws if unable to lookup a value for the given key
      */
     @Override
     Optional<String> lookup(String key);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/docs/org.apache.nifi.lookup.maxmind.IPLookupService/additionalDetails.html b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/docs/org.apache.nifi.lookup.maxmind.IPLookupService/additionalDetails.html
index 8f64510706..af29da566e 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/docs/org.apache.nifi.lookup.maxmind.IPLookupService/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/docs/org.apache.nifi.lookup.maxmind.IPLookupService/additionalDetails.html
@@ -25,35 +25,40 @@

 The IPLookupService is powered by a MaxMind database and can return several different types of enrichment information about a given IP address. Below is the schema of the Record that is returned by this service (in Avro Schema format).
+ The schema is for a single record that consists of several fields: geo, isp,
+ domainName, connectionType, and anonymousIp. Each of these fields is nullable
+ and will be populated only if the IP address being looked up has the relevant information in the MaxMind database
+ and if the Controller Service is configured to return that information. Because each of these fields requires a separate
+ lookup in the database, it is advisable to retrieve only the fields that are actually needed.

- +
 {
-  "name": "ipEnrichment",
+  "name": "enrichmentRecord",
   "namespace": "nifi",
   "type": "record",
   "fields": [
     {
       "name": "geo",
-      "type": {
+      "type": ["null", {
         "name": "cityGeo",
         "type": "record",
         "fields": [
-          { "name": "city", "type": "string" },
-          { "name": "accuracy", "type": "int", "doc": "The radius, in kilometers, around the given location, where the IP address is believed to be" },
-          { "name": "metroCode", "type": "int" },
-          { "name": "timeZone", "type": "string" },
-          { "name": "latitude", "type": "double" },
-          { "name": "longitude", "type": "double" },
-          { "name": "country", "type": {
+          { "name": "city", "type": ["null", "string"] },
+          { "name": "accuracy", "type": ["null", "int"], "doc": "The radius, in kilometers, around the given location, where the IP address is believed to be" },
+          { "name": "metroCode", "type": ["null", "int"] },
+          { "name": "timeZone", "type": ["null", "string"] },
+          { "name": "latitude", "type": ["null", "double"] },
+          { "name": "longitude", "type": ["null", "double"] },
+          { "name": "country", "type": ["null", {
             "type": "record",
             "name": "country",
             "fields": [
               { "name": "name", "type": "string" },
               { "name": "isoCode", "type": "string" }
             ]
-          } },
+          }] },
             { "name": "subdivisions", "type": {
               "type": "array",
               "items": {
@@ -66,37 +71,109 @@
               }
             }
           },
-          { "name": "continent", "type": "string" },
-          { "name": "postalCode", "type": "string" }
+          { "name": "continent", "type": ["null", "string"] },
+          { "name": "postalCode", "type": ["null", "string"] }
         ]
-      }
+      }]
     },
     {
       "name": "isp",
-      "type": {
-          "name": "ispEnrich",
+      "type": ["null", {
+        "name": "ispEnrich",
         "type": "record",
         "fields": [
-          { "name": "name", "type": "string" },
-          { "name": "organization", "type": "string" },
-          { "name": "asn", "type": "int" },
-          { "name": "asnOrganization", "type": "string" }
+          { "name": "name", "type": ["null", "string"] },
+          { "name": "organization", "type": ["null", "string"] },
+          { "name": "asn", "type": ["null", "int"] },
+          { "name": "asnOrganization", "type": ["null", "string"] }
         ]
-    }
+      }]
     },
     {
       "name": "domainName",
-      "type": "string"
+      "type": ["null", "string"]
     },
     {
       "name": "connectionType",
-      "type": "string",
+      "type": ["null", "string"],
       "doc": "One of 'Dialup', 'Cable/DSL', 'Corporate', 'Cellular'"
+    },
+    {
+      "name": "anonymousIp",
+      "type": ["null", {
+        "name": "anonymousIpType",
+        "type": "record",
+        "fields": [
+          { "name": "anonymous", "type": "boolean" },
+          { "name": "anonymousVpn", "type": "boolean" },
+          { "name": "hostingProvider", "type": "boolean" },
+          { "name": "publicProxy", "type": "boolean" },
+          { "name": "torExitNode", "type": "boolean" }
+        ]
+      }]
     }
   ]
 }
 
- + +
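+
+ For illustration, a record returned by this service when only geo enrichment is requested might look like the
+ following (all values here are hypothetical, the geo record is abridged, and this example is not part of the
+ MaxMind documentation):
+
+{
+  "geo": {
+    "city": "Sydney",
+    "accuracy": 100,
+    "timeZone": "Australia/Sydney",
+    "latitude": -33.87,
+    "longitude": 151.21
+  },
+  "isp": null,
+  "domainName": null,
+  "connectionType": null,
+  "anonymousIp": null
+}
+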

+ While this schema is fairly complex, it is a single record with five fields. This makes it easy to update
+ an existing schema to accommodate this record: add a new field to the existing schema and paste the schema
+ above in as that field's type.
+

+ +

+ For example, suppose that we have an existing schema that is as simple as:
+

+ +
+
+
+{
+  "name": "ipRecord",
+  "namespace": "nifi",
+  "type": "record",
+  "fields": [
+    { "name": "ip", "type": "string" }
+  ]
+}
+
+
+
+ +

+ Now, let's suppose that we want to add a new field named enrichment to the above schema,
+ and that we want the new enrichment field to be nullable.
+ We can do so by copying and pasting the enrichment schema from above, as follows:
+

+ +
+
+
+{
+  "name": "ipRecord",
+  "namespace": "nifi",
+  "type": "record",
+  "fields": [
+    { "name": "ip", "type": "string" },
+    { "name": "enrichment", "type": ["null",
+
+
+      <Paste Enrichment Schema Here>
+
+
+    ]
+    }
+  ]
+}
+
+
+
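+
+ Once the enrichment schema has been pasted in, a record conforming to the updated schema might look like the
+ following (a hypothetical example; the nested geo record is elided for brevity):
+
+{
+  "ip": "203.0.113.10",
+  "enrichment": {
+    "geo": { ... },
+    "isp": null,
+    "domainName": "example.com",
+    "connectionType": "Cable/DSL",
+    "anonymousIp": null
+  }
+}
+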
+
+
+
+
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
index a3f9cb87ef..c1f000b418 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
@@ -30,7 +30,6 @@ import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.nifi.schema.access.SchemaAccessWriter;
 import org.apache.nifi.serialization.AbstractRecordSetWriter;
-import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
 
@@ -68,10 +67,10 @@ public class WriteAvroResultWithExternalSchema extends AbstractRecordSetWriter {
     }
 
     @Override
-    public WriteResult write(final Record record) throws IOException {
+    public Map<String, String> writeRecord(final Record record) throws IOException {
         final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, avroSchema);
         datumWriter.write(rec, encoder);
-        return WriteResult.of(1, schemaAccessWriter.getAttributes(recordSchema));
+        return schemaAccessWriter.getAttributes(recordSchema);
     }
 
     @Override
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java
index 9bfb4cff43..dd151180f1 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java
@@ -20,13 +20,13 @@ package org.apache.nifi.avro;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Collections;
+import java.util.Map;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.nifi.serialization.AbstractRecordSetWriter;
-import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.Record;
 
 public class WriteAvroResultWithSchema extends AbstractRecordSetWriter {
@@ -49,10 +49,10 @@ public class WriteAvroResultWithSchema extends AbstractRecordSetWriter {
     }
 
     @Override
-    public WriteResult write(final Record record) throws IOException {
+    public Map<String, String> writeRecord(final Record record) throws IOException {
         final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, schema);
         dataFileWriter.append(rec);
-        return WriteResult.of(1, Collections.emptyMap());
+        return Collections.emptyMap();
     }
 
     @Override
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
index 8475a50eca..f8998f92e4 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
@@ -27,7 +27,6 @@ import org.apache.commons.csv.CSVPrinter;
 import org.apache.nifi.schema.access.SchemaAccessWriter;
 import org.apache.nifi.serialization.AbstractRecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriter;
-import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
@@ -90,14 +89,14 @@ public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSet
     }
 
     @Override
-    public WriteResult write(final Record record) throws IOException {
+    public Map<String, String> writeRecord(final Record record) throws IOException {
         int i = 0;
         for (final RecordField recordField : recordSchema.getFields()) {
             fieldValues[i++] = record.getAsString(recordField, getFormat(recordField));
         }
 
         printer.printRecord(fieldValues);
-        return WriteResult.of(1, schemaWriter.getAttributes(recordSchema));
+        return schemaWriter.getAttributes(recordSchema);
     }
 
     @Override
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
index b73ecabca3..a41412f0dc 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
@@ -28,7 +28,6 @@ import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.schema.access.SchemaAccessWriter;
 import org.apache.nifi.serialization.AbstractRecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriter;
-import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
@@ -97,9 +96,9 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
     }
 
     @Override
-    public WriteResult write(final Record record) throws IOException {
+    public Map<String, String> writeRecord(final Record record) throws IOException {
         writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject());
-        return WriteResult.of(1, schemaAccess.getAttributes(recordSchema));
+        return schemaAccess.getAttributes(recordSchema);
     }
 
     private void writeRecord(final Record record, final RecordSchema writeSchema, final JsonGenerator generator, final GeneratorTask startTask, final GeneratorTask endTask)
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java
index 95f2a73717..7012504ca8 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java
@@ -29,7 +29,6 @@ import java.util.Map;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.serialization.AbstractRecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriter;
-import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordSchema;
@@ -60,9 +59,9 @@ public class FreeFormTextWriter extends AbstractRecordSetWriter implements Recor
     }
 
     @Override
-    public WriteResult write(final Record record) throws IOException {
+    public Map<String, String> writeRecord(final Record record) throws IOException {
         write(record, out, getColumnNames(record.getSchema()));
-        return WriteResult.of(1, Collections.emptyMap());
+        return Collections.emptyMap();
     }
 
     private void write(final Record record, final OutputStream out, final List<String> columnNames) throws IOException {