diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToCSV.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToCSV.java new file mode 100644 index 0000000000..4859d41c9d --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToCSV.java @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; + +import org.apache.commons.text.StringEscapeUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.Map; +import java.util.Set; +import java.util.HashSet; +import java.util.List; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.Collections; +import java.util.Arrays; +import java.util.ArrayList; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"csv", "attributes", "flowfile"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Generates a CSV representation of the input FlowFile Attributes. The resulting CSV " + + "can be written to either a newly generated attribute named 'CSVAttributes' or written to the FlowFile as content. " + + "If the attribute value contains a comma, newline or double quote, then the attribute value will be " + + "escaped with double quotes. Any double quote characters in the attribute value are escaped with " + + "another double quote.") +@WritesAttributes({ + @WritesAttribute(attribute = "CSVSchema", description = "CSV representation of the Schema"), + @WritesAttribute(attribute = "CSVData", description = "CSV representation of Attributes") +}) + +public class AttributesToCSV extends AbstractProcessor { + private static final String DATA_ATTRIBUTE_NAME = "CSVData"; + private static final String SCHEMA_ATTRIBUTE_NAME = "CSVSchema"; + private static final String OUTPUT_SEPARATOR = ","; + private static final String OUTPUT_MIME_TYPE = "text/csv"; + private static final String SPLIT_REGEX = OUTPUT_SEPARATOR + "(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"; + + static final AllowableValue OUTPUT_OVERWRITE_CONTENT = new AllowableValue("flowfile-content", "flowfile-content", "The resulting CSV string will be placed into the content of the flowfile." + + "Existing flowfile context will be overwritten. 'CSVData' will not be written to at all (neither null nor empty string)."); + static final AllowableValue OUTPUT_NEW_ATTRIBUTE= new AllowableValue("flowfile-attribute", "flowfile-attribute", "The resulting CSV string will be placed into a new flowfile" + + " attribute named 'CSVData'. The content of the flowfile will not be changed."); + + public static final PropertyDescriptor ATTRIBUTES_LIST = new PropertyDescriptor.Builder() + .name("attribute-list") + .displayName("Attribute List") + .description("Comma separated list of attributes to be included in the resulting CSV. If this value " + + "is left empty then all existing Attributes will be included. This list of attributes is " + + "case sensitive and supports attribute names that contain commas. If an attribute specified in the list is not found it will be emitted " + + "to the resulting CSV with an empty string or null depending on the 'Null Value' property. " + + "If a core attribute is specified in this list " + + "and the 'Include Core Attributes' property is false, the core attribute will be included. The attribute list " + + "ALWAYS wins.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor ATTRIBUTES_REGEX = new PropertyDescriptor.Builder() + .name("attributes-regex") + .displayName("Attributes Regular Expression") + .description("Regular expression that will be evaluated against the flow file attributes to select " + + "the matching attributes. This property can be used in combination with the attributes " + + "list property. The final output will contain a combination of matches found in the ATTRIBUTE_LIST and ATTRIBUTE_REGEX.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true)) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .build(); + + public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder() + .name("destination") + .displayName("Destination") + .description("Control if CSV value is written as a new flowfile attribute 'CSVData' " + + "or written in the flowfile content.") + .required(true) + .allowableValues(OUTPUT_NEW_ATTRIBUTE, OUTPUT_OVERWRITE_CONTENT) + .defaultValue(OUTPUT_NEW_ATTRIBUTE.getDisplayName()) + .build(); + + public static final PropertyDescriptor INCLUDE_CORE_ATTRIBUTES = new PropertyDescriptor.Builder() + .name("include-core-attributes") + .displayName("Include Core Attributes") + .description("Determines if the FlowFile org.apache.nifi.flowfile.attributes.CoreAttributes, which are " + + "contained in every FlowFile, should be included in the final CSV value generated. Core attributes " + + "will be added to the end of the CSVData and CSVSchema strings. The Attribute List property " + + "overrides this setting.") + .required(true) + .allowableValues("true", "false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue("true") + .build(); + + public static final PropertyDescriptor NULL_VALUE_FOR_EMPTY_STRING = new PropertyDescriptor.Builder() + .name("null-value") + .displayName("Null Value") + .description("If true a non existing or empty attribute will be 'null' in the resulting CSV. If false an empty " + + "string will be placed in the CSV") + .required(true) + .allowableValues("true", "false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue("false") + .build(); + public static final PropertyDescriptor INCLUDE_SCHEMA = new PropertyDescriptor.Builder() + .name("include-schema") + .displayName("Include Schema") + .description("If true the schema (attribute names) will also be converted to a CSV string which will either be " + + "applied to a new attribute named 'CSVSchema' or applied at the first row in the " + + "content depending on the DESTINATION property setting.") + .required(true) + .allowableValues("true", "false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue("false") + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") + .description("Successfully converted attributes to CSV").build(); + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") + .description("Failed to convert attributes to CSV").build(); + + private List properties; + private Set relationships; + private volatile Boolean includeCoreAttributes; + private volatile Set coreAttributes; + private volatile boolean destinationContent; + private volatile boolean nullValForEmptyString; + private volatile Pattern pattern; + private volatile Boolean includeSchema; + + @Override + protected void init(final ProcessorInitializationContext context) { + final List properties = new ArrayList<>(); + properties.add(ATTRIBUTES_LIST); + properties.add(ATTRIBUTES_REGEX); + properties.add(DESTINATION); + properties.add(INCLUDE_CORE_ATTRIBUTES); + properties.add(NULL_VALUE_FOR_EMPTY_STRING); + properties.add(INCLUDE_SCHEMA); + this.properties = Collections.unmodifiableList(properties); + + final Set relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public Set getRelationships() { + return relationships; + } + + + private Map buildAttributesMapForFlowFile(FlowFile ff, Set attributes, Pattern attPattern) { + Map result; + Map ffAttributes = ff.getAttributes(); + result = new LinkedHashMap<>(ffAttributes.size()); + + if (!attributes.isEmpty() || attPattern != null) { + if (!attributes.isEmpty()) { + //the user gave a list of attributes + for (String attribute : attributes) { + String val = ff.getAttribute(attribute); + if (val != null && !val.isEmpty()) { + result.put(attribute, val); + } else { + if (nullValForEmptyString) { + result.put(attribute, "null"); + } else { + result.put(attribute, ""); + } + } + } + } + if(attPattern != null) { + for (Map.Entry e : ff.getAttributes().entrySet()) { + if(attPattern.matcher(e.getKey()).matches()) { + result.put(e.getKey(), e.getValue()); + } + } + } + } else { + //the user did not give a list of attributes, take all the attributes from the flowfile + result.putAll(ffAttributes); + } + + //now glue on the core attributes if the user wants them. + if(includeCoreAttributes) { + for (String coreAttribute : coreAttributes) { + //make sure this coreAttribute is applicable to this flowfile. + String val = ff.getAttribute(coreAttribute); + if(ffAttributes.containsKey(coreAttribute)) { + if (!StringUtils.isEmpty(val)){ + result.put(coreAttribute, val); + } else { + if (nullValForEmptyString) { + result.put(coreAttribute, "null"); + } else { + result.put(coreAttribute, ""); + } + } + } + } + } else { + //remove core attributes since the user does not want them, unless they are in the attribute list. Attribute List always wins + for (String coreAttribute : coreAttributes) { + //never override user specified attributes, even if the user has selected to exclude core attributes + if(!attributes.contains(coreAttribute)) { + result.remove(coreAttribute); + } + } + } + return result; + } + + private LinkedHashSet attributeListStringToSet(String attributeList) { + //take the user specified attribute list string and convert to list of strings. + LinkedHashSet result = new LinkedHashSet<>(); + if (StringUtils.isNotBlank(attributeList)) { + String[] ats = attributeList.split(SPLIT_REGEX); + for (String str : ats) { + result.add(StringEscapeUtils.unescapeCsv(str.trim())); + } + } + return result; + } + + @OnScheduled + public void onScheduled(ProcessContext context) { + includeCoreAttributes = context.getProperty(INCLUDE_CORE_ATTRIBUTES).asBoolean(); + coreAttributes = Arrays.stream(CoreAttributes.values()).map(CoreAttributes::key).collect(Collectors.toSet()); + destinationContent = OUTPUT_OVERWRITE_CONTENT.getValue().equals(context.getProperty(DESTINATION).getValue()); + nullValForEmptyString = context.getProperty(NULL_VALUE_FOR_EMPTY_STRING).asBoolean(); + includeSchema = context.getProperty(INCLUDE_SCHEMA).asBoolean(); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + final FlowFile original = session.get(); + if (original == null) { + return; + } + if(context.getProperty(ATTRIBUTES_REGEX).isSet()) { + pattern = Pattern.compile(context.getProperty(ATTRIBUTES_REGEX).evaluateAttributeExpressions(original).getValue()); + } + + final Set attributeList = attributeListStringToSet(context.getProperty(ATTRIBUTES_LIST).evaluateAttributeExpressions(original).getValue()); + final Map atrList = buildAttributesMapForFlowFile(original, attributeList, pattern); + + //escape attribute values + int index = 0; + final int atrListSize = atrList.values().size() -1; + final StringBuilder sbValues = new StringBuilder(); + for (final Map.Entry attr : atrList.entrySet()) { + sbValues.append(StringEscapeUtils.escapeCsv(attr.getValue())); + sbValues.append(index++ < atrListSize ? OUTPUT_SEPARATOR : ""); + } + + //build the csv header if needed + final StringBuilder sbNames = new StringBuilder(); + if(includeSchema){ + index = 0; + for (final Map.Entry attr : atrList.entrySet()) { + sbNames.append(StringEscapeUtils.escapeCsv(attr.getKey())); + sbNames.append(index++ < atrListSize ? OUTPUT_SEPARATOR : ""); + } + } + + try { + if (destinationContent) { + FlowFile conFlowfile = session.write(original, (in, out) -> { + if(includeSchema){ + sbNames.append(System.getProperty("line.separator")); + out.write(sbNames.toString().getBytes()); + } + out.write(sbValues.toString().getBytes()); + }); + conFlowfile = session.putAttribute(conFlowfile, CoreAttributes.MIME_TYPE.key(), OUTPUT_MIME_TYPE); + session.transfer(conFlowfile, REL_SUCCESS); + } else { + FlowFile atFlowfile = session.putAttribute(original, DATA_ATTRIBUTE_NAME , sbValues.toString()); + if(includeSchema){ + session.putAttribute(original, SCHEMA_ATTRIBUTE_NAME , sbNames.toString()); + } + session.transfer(atFlowfile, REL_SUCCESS); + } + } catch (Exception e) { + getLogger().error(e.getMessage()); + session.transfer(original, REL_FAILURE); + } + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index a15ccf5436..8195963001 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. org.apache.nifi.processors.standard.AttributesToJSON +org.apache.nifi.processors.standard.AttributesToCSV org.apache.nifi.processors.standard.Base64EncodeContent org.apache.nifi.processors.standard.CompressContent org.apache.nifi.processors.standard.ControlRate diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToCSV.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToCSV.java new file mode 100644 index 0000000000..457cbc790b --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToCSV.java @@ -0,0 +1,821 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; + +import com.google.common.base.Splitter; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; + +import java.io.IOException; +import java.util.Map; +import java.util.HashMap; +import java.util.Set; +import java.util.HashSet; +import java.util.List; +import java.util.regex.Pattern; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class TestAttributesToCSV { + + private static final String OUTPUT_NEW_ATTRIBUTE = "flowfile-attribute"; + private static final String OUTPUT_OVERWRITE_CONTENT = "flowfile-content"; + private static final String OUTPUT_ATTRIBUTE_NAME = "CSVData"; + private static final String OUTPUT_SEPARATOR = ","; + private static final String OUTPUT_MIME_TYPE = "text/csv"; + private static final String SPLIT_REGEX = OUTPUT_SEPARATOR + "(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"; + private static final String newline = System.getProperty("line.separator"); + + @Test + public void testAttrListNoCoreNullOffNewAttrToAttribute() throws Exception { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV()); + testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE); + testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false"); + testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false"); + + final String NON_PRESENT_ATTRIBUTE_KEY = "beach-type"; + testRunner.setProperty(AttributesToCSV.ATTRIBUTES_LIST, NON_PRESENT_ATTRIBUTE_KEY); + testRunner.enqueue(new byte[0]); + testRunner.run(); + + testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0) + .assertAttributeExists("CSVData"); + testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1); + testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0); + + testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS) + .get(0).assertAttributeEquals("CSVData",""); + } + + @Test + public void testAttrListNoCoreNullOffNewAttrToContent() throws Exception { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV()); + //set the destination of the csv string to be an attribute + testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE); + testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false"); + testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false"); + + //use only one attribute, which does not exists, as the list of attributes to convert to csv + final String NON_PRESENT_ATTRIBUTE_KEY = "beach-type"; + testRunner.setProperty(AttributesToCSV.ATTRIBUTES_LIST, NON_PRESENT_ATTRIBUTE_KEY); + testRunner.enqueue(new byte[0]); + testRunner.run(); + + testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0) + .assertAttributeExists("CSVData"); + testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1); + testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0); + + testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS) + .get(0).assertAttributeEquals("CSVData",""); + } + + @Test + public void testAttrListNoCoreNullOffTwoNewAttrToAttribute() throws Exception { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV()); + testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE); + testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false"); + testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false"); + + final String NON_PRESENT_ATTRIBUTE_KEY = "beach-type,beach-length"; + testRunner.setProperty(AttributesToCSV.ATTRIBUTES_LIST, NON_PRESENT_ATTRIBUTE_KEY); + testRunner.enqueue(new byte[0]); + testRunner.run(); + + testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0) + .assertAttributeExists("CSVData"); + testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1); + testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0); + + testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS) + .get(0).assertAttributeEquals("CSVData",","); + } + + @Test + public void testAttrListNoCoreNullTwoNewAttrToAttribute() throws Exception { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV()); + testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE); + testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false"); + testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "true"); + + final String NON_PRESENT_ATTRIBUTE_KEY = "beach-type,beach-length"; + testRunner.setProperty(AttributesToCSV.ATTRIBUTES_LIST, NON_PRESENT_ATTRIBUTE_KEY); + testRunner.enqueue(new byte[0]); + testRunner.run(); + + testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0) + .assertAttributeExists("CSVData"); + testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1); + testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0); + + testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS) + .get(0).assertAttributeEquals("CSVData","null,null"); + } + + @Test + public void testNoAttrListNoCoreNullOffToAttribute() throws Exception { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV()); + //set the destination of the csv string to be an attribute + testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE); + testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false"); + testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false"); + testRunner.enqueue(new byte[0]); + testRunner.run(); + + testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0) + .assertAttributeExists("CSVData"); + testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1); + testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0); + + testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS) + .get(0).assertAttributeEquals("CSVData",""); + } + + @Test + public void testNoAttrListNoCoreNullToAttribute() throws Exception { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV()); + testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE); + testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false"); + testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "true"); + testRunner.enqueue(new byte[0]); + testRunner.run(); + + testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0) + .assertAttributeExists("CSVData"); + testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1); + testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0); + + testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS) + .get(0).assertAttributeEquals("CSVData",""); + } + + + @Test + public void testNoAttrListCoreNullOffToContent() throws IOException { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV()); + testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_OVERWRITE_CONTENT); + testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "true"); + testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false"); + + final Map attrs = new HashMap(){{ + put("beach-name", "Malibu Beach"); + put("beach-location", "California, US"); + put("beach-endorsement", "This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim"); + }}; + + testRunner.enqueue(new byte[0], attrs); + testRunner.run(); + + List flowFilesForRelationship = testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS); + + testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0); + testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1); + + MockFlowFile flowFile = flowFilesForRelationship.get(0); + + assertEquals(OUTPUT_MIME_TYPE, flowFile.getAttribute(CoreAttributes.MIME_TYPE.key())); + + final byte[] contentData = testRunner.getContentAsByteArray(flowFile); + + final String contentDataString = new String(contentData, "UTF-8"); + + Set contentValues = new HashSet<>(getStrings(contentDataString)); + + assertEquals(6, contentValues.size()); + + assertTrue(contentValues.contains("Malibu Beach")); + assertTrue(contentValues.contains("\"California, US\"")); + assertTrue(contentValues.contains("\"This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim\"")); + assertTrue(contentValues.contains(flowFile.getAttribute("filename"))); + assertTrue(contentValues.contains(flowFile.getAttribute("path"))); + assertTrue(contentValues.contains(flowFile.getAttribute("uuid"))); + } + + @Test + public void testNoAttrListCoreNullOffToAttribute() throws IOException { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV()); + testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "true"); + testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false"); + + Map attrs = new HashMap(){{ + put("beach-name", "Malibu Beach"); + put("beach-location", "California, US"); + put("beach-endorsement", "This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim"); + }}; + + testRunner.enqueue(new byte[0], attrs); + testRunner.run(); + + List flowFilesForRelationship = testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS); + + testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0); + testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1); + + MockFlowFile flowFile = flowFilesForRelationship.get(0); + + assertNull(flowFile.getAttribute(CoreAttributes.MIME_TYPE.key())); + + final String attributeData = flowFile.getAttribute(OUTPUT_ATTRIBUTE_NAME); + + Set csvAttributeValues = new HashSet<>(getStrings(attributeData)); + + assertEquals(6, csvAttributeValues.size()); + + assertTrue(csvAttributeValues.contains("Malibu Beach")); + assertTrue(csvAttributeValues.contains("\"California, US\"")); + assertTrue(csvAttributeValues.contains("\"This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim\"")); + + assertTrue(csvAttributeValues.contains(flowFile.getAttribute("filename"))); + assertTrue(csvAttributeValues.contains(flowFile.getAttribute("path"))); + assertTrue(csvAttributeValues.contains(flowFile.getAttribute("uuid"))); + } + + @Test + public void testNoAttrListNoCoreNullOffToContent() throws IOException { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV()); + testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_OVERWRITE_CONTENT); + testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false"); + testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false"); + + Map attrs = new HashMap(){{ + put("beach-name", "Malibu Beach"); + put("beach-location", "California, US"); + put("beach-endorsement", "This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim"); + }}; + + testRunner.enqueue(new byte[0], attrs); + testRunner.run(); + + List flowFilesForRelationship = testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS); + + testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0); + testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1); + + MockFlowFile flowFile = flowFilesForRelationship.get(0); + + assertEquals(OUTPUT_MIME_TYPE, flowFile.getAttribute(CoreAttributes.MIME_TYPE.key())); + + final byte[] contentData = testRunner.getContentAsByteArray(flowFile); + + final String contentDataString = new String(contentData, "UTF-8"); + Set contentValues = new HashSet<>(getStrings(contentDataString)); + + assertEquals(3, contentValues.size()); + + assertTrue(contentValues.contains("Malibu Beach")); + assertTrue(contentValues.contains("\"California, US\"")); + assertTrue(contentValues.contains("\"This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim\"")); + + } + + + @Test + public void testAttrListNoCoreNullOffToAttribute() throws IOException { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV()); + testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE); + testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false"); + testRunner.setProperty(AttributesToCSV.ATTRIBUTES_LIST, "beach-name,beach-location,beach-endorsement"); + testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false"); + + Map attrs = new HashMap(){{ + put("beach-name", "Malibu Beach"); + put("beach-location", "California, US"); + put("beach-endorsement", "This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim"); + put("attribute-should-be-eliminated", "This should not be in CSVAttribute!"); + }}; + + testRunner.enqueue(new byte[0], attrs); + testRunner.run(); + + List flowFilesForRelationship = testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS); + + testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0); + testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1); + + MockFlowFile flowFile = flowFilesForRelationship.get(0); + + assertNull(flowFile.getAttribute(CoreAttributes.MIME_TYPE.key())); + + final String attributeData = flowFile.getAttribute(OUTPUT_ATTRIBUTE_NAME); + + Set CSVDataValues = new HashSet<>(getStrings(attributeData)); + + assertEquals(3, CSVDataValues.size()); + + assertTrue(CSVDataValues.contains("Malibu Beach")); + assertTrue(CSVDataValues.contains("\"California, US\"")); + assertTrue(CSVDataValues.contains("\"This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim\"")); + + } + + @Test + public void testAttrListCoreNullOffToAttribute() throws IOException { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV()); + testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE); + testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "true"); + testRunner.setProperty(AttributesToCSV.ATTRIBUTES_LIST, "beach-name,beach-location,beach-endorsement"); + testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false"); + + Map attrs = new HashMap(){{ + put("beach-name", "Malibu Beach"); + put("beach-location", "California, US"); + put("beach-endorsement", "This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim"); + put("attribute-should-be-eliminated", "This should not be in CSVData!"); + }}; + + testRunner.enqueue(new byte[0], attrs); + testRunner.run(); + + List flowFilesForRelationship = testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS); + + testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0); + testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1); + + MockFlowFile flowFile = flowFilesForRelationship.get(0); + + assertNull(flowFile.getAttribute(CoreAttributes.MIME_TYPE.key())); + + final String attributeData = flowFile.getAttribute(OUTPUT_ATTRIBUTE_NAME); + + Set CSVDataValues = new HashSet<>(getStrings(attributeData)); + + assertEquals(6, CSVDataValues.size()); + + assertTrue(CSVDataValues.contains("Malibu Beach")); + assertTrue(CSVDataValues.contains("\"California, US\"")); + assertTrue(CSVDataValues.contains("\"This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim\"")); + + assertTrue(CSVDataValues.contains(flowFile.getAttribute("filename"))); + assertTrue(CSVDataValues.contains(flowFile.getAttribute("path"))); + assertTrue(CSVDataValues.contains(flowFile.getAttribute("uuid"))); + } + + @Test + public void testAttrListNoCoreNullOffOverrideCoreByAttrListToAttribute() throws IOException { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV()); + testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE); + testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false"); + testRunner.setProperty(AttributesToCSV.ATTRIBUTES_LIST, "beach-name,beach-location,beach-endorsement,uuid"); + testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false"); + + Map attrs = new HashMap(){{ + put("beach-name", "Malibu Beach"); + put("beach-location", "California, US"); + put("beach-endorsement", "This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim"); + put("attribute-should-be-eliminated", "This should not be in CSVData!"); + }}; + + testRunner.enqueue(new byte[0], attrs); + testRunner.run(); + + List flowFilesForRelationship = testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS); + + testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0); + testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1); + + MockFlowFile flowFile = flowFilesForRelationship.get(0); + + assertNull(flowFile.getAttribute(CoreAttributes.MIME_TYPE.key())); + + final String attributeData = flowFile.getAttribute(OUTPUT_ATTRIBUTE_NAME); + + Set CSVDataValues = new HashSet<>(getStrings(attributeData)); + + assertEquals(4, CSVDataValues.size()); + + assertTrue(CSVDataValues.contains("Malibu Beach")); + assertTrue(CSVDataValues.contains("\"California, US\"")); + assertTrue(CSVDataValues.contains("\"This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim\"")); + + + assertTrue(!CSVDataValues.contains(flowFile.getAttribute("filename"))); + assertTrue(!CSVDataValues.contains(flowFile.getAttribute("path"))); + assertTrue(CSVDataValues.contains(flowFile.getAttribute("uuid"))); + } + + @Test + public void testAttrListFromExpCoreNullOffToAttribute() throws IOException { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV()); + testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE); + testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "true"); + testRunner.setProperty(AttributesToCSV.ATTRIBUTES_LIST, "${myAttribs}"); + testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false"); + + Map attrs = new HashMap(){{ + put("beach-name", "Malibu Beach"); + put("beach-location", "California, US"); + put("beach-endorsement", "This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim"); + put("attribute-should-be-eliminated", "This should not be in CSVData!"); + put("myAttribs", "beach-name,beach-location,beach-endorsement"); + }}; + + testRunner.enqueue(new byte[0], attrs); + testRunner.run(); + + List flowFilesForRelationship = testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS); + + testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0); + testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1); + + //Test flow file 0 with ATTRIBUTE_LIST populated from expression language + MockFlowFile flowFile = flowFilesForRelationship.get(0); + + assertNull(flowFile.getAttribute(CoreAttributes.MIME_TYPE.key())); + + String attributeData = flowFile.getAttribute(OUTPUT_ATTRIBUTE_NAME); + + Set CSVDataValues = new HashSet<>(getStrings(attributeData)); + + assertEquals(6, CSVDataValues.size()); + + assertTrue(CSVDataValues.contains("Malibu Beach")); + assertTrue(CSVDataValues.contains("\"California, US\"")); + assertTrue(CSVDataValues.contains("\"This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim\"")); + + assertTrue(CSVDataValues.contains(flowFile.getAttribute("filename"))); + assertTrue(CSVDataValues.contains(flowFile.getAttribute("path"))); + assertTrue(CSVDataValues.contains(flowFile.getAttribute("uuid"))); + + //Test flow file 1 with ATTRIBUTE_LIST populated from expression language containing commas (output should be he same) + flowFile = flowFilesForRelationship.get(0); + + assertNull(flowFile.getAttribute(CoreAttributes.MIME_TYPE.key())); + + attributeData = flowFile.getAttribute(OUTPUT_ATTRIBUTE_NAME); + + CSVDataValues = new HashSet<>(getStrings(attributeData)); + + assertEquals(6, CSVDataValues.size()); + + assertTrue(CSVDataValues.contains("Malibu Beach")); + assertTrue(CSVDataValues.contains("\"California, US\"")); + assertTrue(CSVDataValues.contains("\"This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim\"")); + + assertTrue(CSVDataValues.contains(flowFile.getAttribute("filename"))); + assertTrue(CSVDataValues.contains(flowFile.getAttribute("path"))); + assertTrue(CSVDataValues.contains(flowFile.getAttribute("uuid"))); + + } + + @Test + public void testAttrListWithCommasInNameFromExpCoreNullOffToAttribute() throws IOException { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV()); + testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE); + testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "true"); + testRunner.setProperty(AttributesToCSV.ATTRIBUTES_LIST, "${myAttribs}"); + testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false"); + + + Map attrsCommaInName = new HashMap(){{ + put("beach,name", "Malibu Beach"); + put("beach,location", "California, US"); + put("beach,endorsement", "This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim"); + put("attribute-should-be-eliminated", "This should not be in CSVData!"); + put("myAttribs", "\"beach,name\",\"beach,location\",\"beach,endorsement\""); + }}; + + testRunner.enqueue(new byte[0], attrsCommaInName); + testRunner.run(); + + List flowFilesForRelationship = testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS); + + testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0); + testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1); + + //Test flow file 0 with ATTRIBUTE_LIST populated from expression language + MockFlowFile flowFile = flowFilesForRelationship.get(0); + + assertNull(flowFile.getAttribute(CoreAttributes.MIME_TYPE.key())); + + String attributeData = flowFile.getAttribute(OUTPUT_ATTRIBUTE_NAME); + + Set CSVDataValues = new HashSet<>(getStrings(attributeData)); + + assertEquals(6, CSVDataValues.size()); + + assertTrue(CSVDataValues.contains("Malibu Beach")); + assertTrue(CSVDataValues.contains("\"California, US\"")); + assertTrue(CSVDataValues.contains("\"This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim\"")); + + assertTrue(CSVDataValues.contains(flowFile.getAttribute("filename"))); + assertTrue(CSVDataValues.contains(flowFile.getAttribute("path"))); + assertTrue(CSVDataValues.contains(flowFile.getAttribute("uuid"))); + + //Test flow file 1 with ATTRIBUTE_LIST populated from expression language containing commas (output should be he same) + flowFile = flowFilesForRelationship.get(0); + + assertNull(flowFile.getAttribute(CoreAttributes.MIME_TYPE.key())); + + attributeData = flowFile.getAttribute(OUTPUT_ATTRIBUTE_NAME); + + CSVDataValues = new HashSet<>(getStrings(attributeData)); + + assertEquals(6, CSVDataValues.size()); + + assertTrue(CSVDataValues.contains("Malibu Beach")); + assertTrue(CSVDataValues.contains("\"California, US\"")); + assertTrue(CSVDataValues.contains("\"This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim\"")); + + assertTrue(CSVDataValues.contains(flowFile.getAttribute("filename"))); + assertTrue(CSVDataValues.contains(flowFile.getAttribute("path"))); + assertTrue(CSVDataValues.contains(flowFile.getAttribute("uuid"))); + + } + + + @Test + public void testAttrListFromExpNoCoreNullOffOverrideCoreByAttrListToAttribute() throws IOException { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV()); + testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE); + testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false"); + testRunner.setProperty(AttributesToCSV.ATTRIBUTES_LIST, "${myAttribs}"); + testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false"); + + Map attrs = new HashMap(){{ + put("beach-name", "Malibu Beach"); + put("beach-location", "California, US"); + put("beach-endorsement", "This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim"); + put("attribute-should-be-eliminated", "This should not be in CSVData!"); + put("myAttribs", "beach-name,beach-location,beach-endorsement"); + }}; + + testRunner.enqueue(new byte[0], attrs); + testRunner.run(); + + List flowFilesForRelationship = testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS); + + testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0); + testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1); + + MockFlowFile flowFile = flowFilesForRelationship.get(0); + + assertNull(flowFile.getAttribute(CoreAttributes.MIME_TYPE.key())); + + final String attributeData = flowFile.getAttribute(OUTPUT_ATTRIBUTE_NAME); + + Set CSVDataValues = new HashSet<>(getStrings(attributeData)); + + assertEquals(3, CSVDataValues.size()); + + assertTrue(CSVDataValues.contains("Malibu Beach")); + assertTrue(CSVDataValues.contains("\"California, US\"")); + assertTrue(CSVDataValues.contains("\"This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim\"")); + + + assertTrue(!CSVDataValues.contains(flowFile.getAttribute("filename"))); + assertTrue(!CSVDataValues.contains(flowFile.getAttribute("path"))); + assertTrue(!CSVDataValues.contains(flowFile.getAttribute("uuid"))); + } + + @Test + public void testAttributesRegex() throws IOException { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV()); + testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE); + testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false"); + testRunner.setProperty(AttributesToCSV.ATTRIBUTES_REGEX, "${myRegEx}"); + testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false"); + + Map attrs = new HashMap(){{ + put("beach-name", "Malibu Beach"); + put("beach-location", "California, US"); + put("beach-endorsement", "This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim"); + put("attribute-should-be-eliminated", "This should not be in CSVData!"); + put("myRegEx", "beach-.*"); + }}; + + testRunner.enqueue(new byte[0], attrs); + testRunner.run(); + + List flowFilesForRelationship = testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS); + + testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0); + testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1); + + MockFlowFile flowFile = flowFilesForRelationship.get(0); + + assertNull(flowFile.getAttribute(CoreAttributes.MIME_TYPE.key())); + + final String attributeData = flowFile.getAttribute(OUTPUT_ATTRIBUTE_NAME); + + Set CSVDataValues = new HashSet<>(getStrings(attributeData)); + + assertEquals(3, CSVDataValues.size()); + + assertTrue(CSVDataValues.contains("Malibu Beach")); + assertTrue(CSVDataValues.contains("\"California, US\"")); + assertTrue(CSVDataValues.contains("\"This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim\"")); + + + assertTrue(!CSVDataValues.contains(flowFile.getAttribute("filename"))); + assertTrue(!CSVDataValues.contains(flowFile.getAttribute("path"))); + assertTrue(!CSVDataValues.contains(flowFile.getAttribute("uuid"))); + } + + @Test + public void testAttributesRegexAndList() throws IOException { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV()); + testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE); + testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false"); + testRunner.setProperty(AttributesToCSV.ATTRIBUTES_REGEX, "${myRegEx}"); + testRunner.setProperty(AttributesToCSV.ATTRIBUTES_LIST, "moreInfo1,moreInfo2"); + testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false"); + + Map attrs = new HashMap(){{ + put("beach-name", "Malibu Beach"); + put("beach-location", "California, US"); + put("beach-endorsement", "This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim"); + put("attribute-should-be-eliminated", "This should not be in CSVData!"); + put("myRegEx", "beach-.*"); + put("moreInfo1", "A+ Rating"); + put("moreInfo2", "Avg Temp: 61f"); + }}; + + testRunner.enqueue(new byte[0], attrs); + testRunner.run(); + + List flowFilesForRelationship = testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS); + + testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0); + testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1); + + MockFlowFile flowFile = flowFilesForRelationship.get(0); + + assertNull(flowFile.getAttribute(CoreAttributes.MIME_TYPE.key())); + + final String attributeData = flowFile.getAttribute(OUTPUT_ATTRIBUTE_NAME); + + Set CSVDataValues = new HashSet<>(getStrings(attributeData)); + + assertEquals(5, CSVDataValues.size()); + + assertTrue(CSVDataValues.contains("Malibu Beach")); + assertTrue(CSVDataValues.contains("\"California, US\"")); + assertTrue(CSVDataValues.contains("\"This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim\"")); + assertTrue(CSVDataValues.contains("A+ Rating")); + assertTrue(CSVDataValues.contains("Avg Temp: 61f")); + + assertTrue(!CSVDataValues.contains(flowFile.getAttribute("filename"))); + assertTrue(!CSVDataValues.contains(flowFile.getAttribute("path"))); + assertTrue(!CSVDataValues.contains(flowFile.getAttribute("uuid"))); + } + + + @Test + public void testSchemaToAttribute() throws Exception { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV()); + testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE); + testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false"); + testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false"); + testRunner.setProperty(AttributesToCSV.INCLUDE_SCHEMA, "true"); + testRunner.setProperty(AttributesToCSV.ATTRIBUTES_REGEX, "beach-.*"); + + Map attrs = new HashMap(){{ + put("beach-name", "Malibu Beach"); + put("beach-location", "California, US"); + put("attribute-should-be-eliminated", "This should not be in CSVData!"); + }}; + + testRunner.enqueue(new byte[0], attrs); + testRunner.run(); + + testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0) + .assertAttributeExists("CSVData"); + testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0) + .assertAttributeExists("CSVSchema"); + testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1); + testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0); + + testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS) + .get(0).assertAttributeEquals("CSVData","Malibu Beach,\"California, US\""); + testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS) + .get(0).assertAttributeEquals("CSVSchema","beach-name,beach-location"); + } + + @Test + public void testSchemaToContent() throws Exception { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV()); + //set the destination of the csv string to be an attribute + testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_OVERWRITE_CONTENT); + testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false"); + testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false"); + testRunner.setProperty(AttributesToCSV.INCLUDE_SCHEMA, "true"); + testRunner.setProperty(AttributesToCSV.ATTRIBUTES_REGEX, "beach-.*"); + + Map attrs = new HashMap(){{ + put("beach-name", "Malibu Beach"); + put("beach-location", "California, US"); + put("attribute-should-be-eliminated", "This should not be in CSVData!"); + }}; + + testRunner.enqueue(new byte[0], attrs); + testRunner.run(); + + testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1); + testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0); + + MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0); + flowFile.assertAttributeNotExists("CSVData"); + flowFile.assertAttributeNotExists("CSVSchema"); + + final byte[] contentData = testRunner.getContentAsByteArray(flowFile); + + final String contentDataString = new String(contentData, "UTF-8"); + assertEquals(contentDataString.split(newline)[0], "beach-name,beach-location"); + assertEquals(contentDataString.split(newline)[1], "Malibu Beach,\"California, US\""); + } + + + @Test + public void testSchemaWithCoreAttribuesToAttribute() throws Exception { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV()); + testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE); + testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "true"); + testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false"); + testRunner.setProperty(AttributesToCSV.INCLUDE_SCHEMA, "true"); + testRunner.setProperty(AttributesToCSV.ATTRIBUTES_REGEX, "beach-.*"); + + Map attrs = new HashMap(){{ + put("beach-name", "Malibu Beach"); + put("beach-location", "California, US"); + put("attribute-should-be-eliminated", "This should not be in CSVData!"); + }}; + + testRunner.enqueue(new byte[0], attrs); + testRunner.run(); + + testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1); + testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0); + + MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0); + flowFile.assertAttributeExists("CSVData"); + flowFile.assertAttributeExists("CSVSchema"); + + final String path = flowFile.getAttribute("path"); + final String filename = flowFile.getAttribute("filename"); + final String uuid = flowFile.getAttribute("uuid"); + + flowFile.assertAttributeEquals("CSVData", "Malibu Beach,\"California, US\"," + path + "," + filename + "," + uuid); + flowFile.assertAttributeEquals("CSVSchema","beach-name,beach-location,path,filename,uuid"); + } + + @Test + public void testSchemaWithCoreAttribuesToContent() throws Exception { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV()); + //set the destination of the csv string to be an attribute + testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_OVERWRITE_CONTENT); + testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "true"); + testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false"); + testRunner.setProperty(AttributesToCSV.INCLUDE_SCHEMA, "true"); + testRunner.setProperty(AttributesToCSV.ATTRIBUTES_REGEX, "beach-.*"); + + Map attrs = new HashMap(){{ + put("beach-name", "Malibu Beach"); + put("beach-location", "California, US"); + put("attribute-should-be-eliminated", "This should not be in CSVData!"); + }}; + + testRunner.enqueue(new byte[0], attrs); + testRunner.run(); + + testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1); + testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0); + + MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0); + flowFile.assertAttributeNotExists("CSVData"); + flowFile.assertAttributeNotExists("CSVSchema"); + + final String path = flowFile.getAttribute("path"); + final String filename = flowFile.getAttribute("filename"); + final String uuid = flowFile.getAttribute("uuid"); + + final byte[] contentData = testRunner.getContentAsByteArray(flowFile); + + final String contentDataString = new String(contentData, "UTF-8"); + assertEquals(contentDataString.split(newline)[0], "beach-name,beach-location,path,filename,uuid"); + assertEquals(contentDataString.split(newline)[1], "Malibu Beach,\"California, US\"," + path + "," + filename + "," + uuid); + } + private List getStrings(String sdata) { + return Splitter.on(Pattern.compile(SPLIT_REGEX)).splitToList(sdata); + } + +}