From 9c31e45d3f94bf1e73c87379f315f6559e6a23f4 Mon Sep 17 00:00:00 2001 From: Joe Trite Date: Mon, 13 Mar 2017 07:55:19 -0400 Subject: [PATCH] NIFI-1705 Adding AttributesToCSV processor --- .../processors/standard/AttributesToCSV.java | 280 +++++++++++ .../org.apache.nifi.processor.Processor | 1 + .../standard/TestAttributesToCSV.java | 448 ++++++++++++++++++ 3 files changed, 729 insertions(+) create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToCSV.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToCSV.java diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToCSV.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToCSV.java new file mode 100644 index 0000000000..2186109bb7 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToCSV.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; + +import org.apache.commons.lang3.StringEscapeUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + + +import java.io.BufferedOutputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Set; +import java.util.HashSet; +import java.util.Map; +import java.util.Collections; +import java.util.stream.Collectors; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"csv", "attributes", "flowfile"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Generates a CSV representation of the input FlowFile Attributes. The resulting CSV " + + "can be written to either a newly generated attribute named 'CSVAttributes' or written to the FlowFile as content. " + + "If the attribute value contains a comma, newline or double quote, then the attribute value will be " + + "escaped with double quotes. Any double quote characters in the attribute value are escaped with " + + "another double quote. If the attribute value does not contain a comma, newline or double quote, then the " + + "attribute value is returned unchanged.") +@WritesAttribute(attribute = "CSVAttributes", description = "CSV representation of Attributes") +public class AttributesToCSV extends AbstractProcessor { + + private static final String OUTPUT_NEW_ATTRIBUTE = "flowfile-attribute"; + private static final String OUTPUT_OVERWRITE_CONTENT = "flowfile-content"; + private static final String OUTPUT_ATTRIBUTE_NAME = "CSVAttributes"; + private static final String OUTPUT_SEPARATOR = ","; + private static final String OUTPUT_MIME_TYPE = "text/csv"; + + + public static final PropertyDescriptor ATTRIBUTES_LIST = new PropertyDescriptor.Builder() + .name("attribute-list") + .displayName("Attribute List") + .description("Comma separated list of attributes to be included in the resulting CSV. If this value " + + "is left empty then all existing Attributes will be included. This list of attributes is " + + "case sensitive. If an attribute specified in the list is not found it will be emitted " + + "to the resulting CSV with an empty string or null depending on the 'Null Value' property. " + + "If a core attribute is specified in this list " + + "and the 'Include Core Attributes' property is false, the core attribute will be included. The attribute list " + + "ALWAYS wins.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder() + .name("destination") + .displayName("Destination") + .description("Control if CSV value is written as a new flowfile attribute 'CSVAttributes' " + + "or written in the flowfile content. Writing to flowfile content will overwrite any " + + "existing flowfile content.") + .required(true) + .allowableValues(OUTPUT_NEW_ATTRIBUTE, OUTPUT_OVERWRITE_CONTENT) + .defaultValue(OUTPUT_NEW_ATTRIBUTE) + .build(); + + public static final PropertyDescriptor INCLUDE_CORE_ATTRIBUTES = new PropertyDescriptor.Builder() + .name("include-core-userSpecifiedAttributes") + .displayName("Include Core Attributes") + .description("Determines if the FlowFile org.apache.nifi.flowfile.attributes.CoreAttributes, which are " + + "contained in every FlowFile, should be included in the final CSV value generated. The Attribute List property " + + "overrides this setting.") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") + .build(); + + public static final PropertyDescriptor NULL_VALUE_FOR_EMPTY_STRING = new PropertyDescriptor.Builder() + .name(("null-value")) + .displayName("Null Value") + .description("If true a non existing or empty attribute will be 'null' in the resulting CSV. If false an empty " + + "string will be placed in the CSV") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") + .description("Successfully converted attributes to CSV").build(); + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") + .description("Failed to convert attributes to CSV").build(); + + private List properties; + private Set relationships; + private volatile Set userSpecifiedAttributes; + private volatile Boolean includeCoreAttributes; + private volatile Set coreAttributes; + private volatile boolean destinationContent; + private volatile boolean nullValForEmptyString; + + @Override + protected void init(final ProcessorInitializationContext context) { + final List properties = new ArrayList<>(); + properties.add(ATTRIBUTES_LIST); + properties.add(DESTINATION); + properties.add(INCLUDE_CORE_ATTRIBUTES); + properties.add(NULL_VALUE_FOR_EMPTY_STRING); + this.properties = Collections.unmodifiableList(properties); + + final Set relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public Set getRelationships() { + return relationships; + } + + + private Map buildAttributesMapForFlowFile(FlowFile ff, Set userSpecifiedAttributes) { + Map result; + if (!userSpecifiedAttributes.isEmpty()) { + //the user gave a list of attributes + result = new HashMap<>(userSpecifiedAttributes.size()); + for (String attribute : userSpecifiedAttributes) { + String val = ff.getAttribute(attribute); + if (val != null && !val.isEmpty()) { + result.put(attribute, val); + } else { + if (nullValForEmptyString) { + result.put(attribute, "null"); + } else { + result.put(attribute, ""); + } + } + } + } else { + //the user did not give a list of attributes, take all the attributes from the flowfile + Map ffAttributes = ff.getAttributes(); + result = new HashMap<>(ffAttributes.size()); + for (Map.Entry e : ffAttributes.entrySet()) { + result.put(e.getKey(), e.getValue()); + } + } + + //now glue on the core attributes if the user wants them. + if(includeCoreAttributes) { + for (String coreAttribute : coreAttributes) { + String val = ff.getAttribute(coreAttribute); + //make sure this coreAttribute is applicable to this flowfile. + if(ff.getAttributes().containsKey(coreAttribute)) { + if (val != null && !val.isEmpty()) { + result.put(coreAttribute, val); + } else { + if (nullValForEmptyString) { + result.put(coreAttribute, "null"); + } else { + result.put(coreAttribute, ""); + } + } + } + } + } else { + //remove core attributes since the user does not want them, unless they are in the attribute list. Attribute List always wins + for (String coreAttribute : coreAttributes) { + //never override user specified attributes, even if the user has selected to exclude core attributes + if(!userSpecifiedAttributes.contains(coreAttribute)) { + result.remove(coreAttribute); + } + } + } + return result; + } + + private Set attributeListStringToSet(String attributeList) { + //take the user specified attribute list string and convert to list of strings. + Set result = new HashSet<>(); + if (StringUtils.isNotBlank(attributeList)) { + String[] ats = StringUtils.split(attributeList, OUTPUT_SEPARATOR); + if (ats != null) { + for (String str : ats) { + String trim = str.trim(); + result.add(trim); + } + } + } + return result; + } + + @OnScheduled + public void onScheduled(ProcessContext context) { + includeCoreAttributes = context.getProperty(INCLUDE_CORE_ATTRIBUTES).asBoolean(); + coreAttributes = Arrays.stream(CoreAttributes.values()).map(CoreAttributes::key).collect(Collectors.toSet()); + userSpecifiedAttributes = attributeListStringToSet(context.getProperty(ATTRIBUTES_LIST).getValue()); + destinationContent = OUTPUT_OVERWRITE_CONTENT.equals(context.getProperty(DESTINATION).getValue()); + nullValForEmptyString = context.getProperty(NULL_VALUE_FOR_EMPTY_STRING).asBoolean(); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + final FlowFile original = session.get(); + if (original == null) { + return; + } + + final Map atrList = buildAttributesMapForFlowFile(original, userSpecifiedAttributes); + + //escape attribute values + final StringBuilder sb = new StringBuilder(); + for (final String val : atrList.values()) { + sb.append(StringEscapeUtils.escapeCsv(val)); + sb.append(OUTPUT_SEPARATOR); + } + + //check if the output separator is at the end of the string, if so then remove it + if(sb.length() > 0 && sb.lastIndexOf(OUTPUT_SEPARATOR) == sb.length() -1) { + //remove last separator + sb.deleteCharAt(sb.length() - 1); + } + + try { + if (destinationContent) { + FlowFile conFlowfile = session.write(original, (in, out) -> { + try (OutputStream outputStream = new BufferedOutputStream(out)) { + outputStream.write(sb.toString().getBytes()); + } + }); + conFlowfile = session.putAttribute(conFlowfile, CoreAttributes.MIME_TYPE.key(), OUTPUT_MIME_TYPE); + session.transfer(conFlowfile, REL_SUCCESS); + } else { + FlowFile atFlowfile = session.putAttribute(original, OUTPUT_ATTRIBUTE_NAME , sb.toString()); + session.transfer(atFlowfile, REL_SUCCESS); + } + } catch (Exception e) { + getLogger().error(e.getMessage()); + session.transfer(original, REL_FAILURE); + } + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index cfcc85afb3..3220899998 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. org.apache.nifi.processors.standard.AttributesToJSON +org.apache.nifi.processors.standard.AttributesToCSV org.apache.nifi.processors.standard.Base64EncodeContent org.apache.nifi.processors.standard.CompressContent org.apache.nifi.processors.standard.ControlRate diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToCSV.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToCSV.java new file mode 100644 index 0000000000..a74576aa26 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToCSV.java @@ -0,0 +1,448 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; + +import com.google.common.base.Splitter; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; + +import java.io.IOException; +import java.util.*; +import java.util.regex.Pattern; + +import static org.junit.Assert.*; + + +public class TestAttributesToCSV { + + private static final String OUTPUT_NEW_ATTRIBUTE = "flowfile-attribute"; + private static final String OUTPUT_OVERWRITE_CONTENT = "flowfile-content"; + private static final String OUTPUT_ATTRIBUTE_NAME = "CSVAttributes"; + private static final String OUTPUT_SEPARATOR = ","; + private static final String OUTPUT_MIME_TYPE = "text/csv"; + private static final String SPLIT_REGEX = OUTPUT_SEPARATOR + "(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"; + + @Test + public void testAttrListNoCoreNullOffNewAttrToAttribute() throws Exception { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV()); + testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE); + testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false"); + testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false"); + + final String NON_PRESENT_ATTRIBUTE_KEY = "beach-type"; + testRunner.setProperty(AttributesToCSV.ATTRIBUTES_LIST, NON_PRESENT_ATTRIBUTE_KEY); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + + testRunner.enqueue(ff); + testRunner.run(); + + testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0) + .assertAttributeExists("CSVAttributes"); + testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1); + testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0); + + testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS) + .get(0).assertAttributeEquals("CSVAttributes",""); + } + + @Test + public void testAttrListNoCoreNullOffNewAttrToContent() throws Exception { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV()); + //set the destination of the csv string to be an attribute + testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE); + testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false"); + testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false"); + + //use only one attribute, which does not exists, as the list of attributes to convert to csv + final String NON_PRESENT_ATTRIBUTE_KEY = "beach-type"; + testRunner.setProperty(AttributesToCSV.ATTRIBUTES_LIST, NON_PRESENT_ATTRIBUTE_KEY); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + + testRunner.enqueue(ff); + testRunner.run(); + + testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0) + .assertAttributeExists("CSVAttributes"); + testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1); + testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0); + + testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS) + .get(0).assertAttributeEquals("CSVAttributes",""); + } + + @Test + public void testAttrListNoCoreNullOffTwoNewAttrToAttribute() throws Exception { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV()); + testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE); + testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false"); + testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false"); + + final String NON_PRESENT_ATTRIBUTE_KEY = "beach-type,beach-length"; + testRunner.setProperty(AttributesToCSV.ATTRIBUTES_LIST, NON_PRESENT_ATTRIBUTE_KEY); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + + testRunner.enqueue(ff); + testRunner.run(); + + testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0) + .assertAttributeExists("CSVAttributes"); + testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1); + testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0); + + testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS) + .get(0).assertAttributeEquals("CSVAttributes",","); + } + + @Test + public void testAttrListNoCoreNullTwoNewAttrToAttribute() throws Exception { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV()); + testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE); + testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false"); + testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "true"); + + final String NON_PRESENT_ATTRIBUTE_KEY = "beach-type,beach-length"; + testRunner.setProperty(AttributesToCSV.ATTRIBUTES_LIST, NON_PRESENT_ATTRIBUTE_KEY); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + + testRunner.enqueue(ff); + testRunner.run(); + + testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0) + .assertAttributeExists("CSVAttributes"); + testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1); + testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0); + + testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS) + .get(0).assertAttributeEquals("CSVAttributes","null,null"); + } + + @Test + public void testNoAttrListNoCoreNullOffToAttribute() throws Exception { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV()); + //set the destination of the csv string to be an attribute + testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE); + testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false"); + testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false"); + + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + + testRunner.enqueue(ff); + testRunner.run(); + + testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0) + .assertAttributeExists("CSVAttributes"); + testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1); + testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0); + + testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS) + .get(0).assertAttributeEquals("CSVAttributes",""); + } + + @Test + public void testNoAttrListNoCoreNullToAttribute() throws Exception { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV()); + testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE); + testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false"); + testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "true"); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + + testRunner.enqueue(ff); + testRunner.run(); + + testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0) + .assertAttributeExists("CSVAttributes"); + testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1); + testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0); + + testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS) + .get(0).assertAttributeEquals("CSVAttributes",""); + } + + + @Test + public void testNoAttrListCoreNullOffToContent() throws IOException { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV()); + testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_OVERWRITE_CONTENT); + testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "true"); + testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false"); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + + ff = session.putAttribute(ff, "beach-name", "Malibu Beach"); + ff = session.putAttribute(ff, "beach-location", "California, US"); + ff = session.putAttribute(ff, "beach-endorsement", "This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim"); + + testRunner.enqueue(ff); + testRunner.run(); + + List flowFilesForRelationship = testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS); + + testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0); + testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1); + + MockFlowFile flowFile = flowFilesForRelationship.get(0); + + assertEquals(OUTPUT_MIME_TYPE, flowFile.getAttribute(CoreAttributes.MIME_TYPE.key())); + + final byte[] contentData = testRunner.getContentAsByteArray(flowFile); + + final String contentDataString = new String(contentData, "UTF-8"); + + Set contentValues = new HashSet<>(getStrings(contentDataString)); + + assertEquals(6, contentValues.size()); + + assertTrue(contentValues.contains("Malibu Beach")); + assertTrue(contentValues.contains("\"California, US\"")); + assertTrue(contentValues.contains("\"This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim\"")); + assertTrue(contentValues.contains(flowFile.getAttribute("filename"))); + assertTrue(contentValues.contains(flowFile.getAttribute("path"))); + assertTrue(contentValues.contains(flowFile.getAttribute("uuid"))); + } + + @Test + public void testNoAttrListCoreNullOffToAttribute() throws IOException { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV()); + testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "true"); + testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false"); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + + ff = session.putAttribute(ff, "beach-name", "Malibu Beach"); + ff = session.putAttribute(ff, "beach-location", "California, US"); + ff = session.putAttribute(ff, "beach-endorsement", "This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim"); + + testRunner.enqueue(ff); + testRunner.run(); + + List flowFilesForRelationship = testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS); + + testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0); + testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1); + + MockFlowFile flowFile = flowFilesForRelationship.get(0); + + assertNull(flowFile.getAttribute(CoreAttributes.MIME_TYPE.key())); + + final String attributeData = flowFile.getAttribute(OUTPUT_ATTRIBUTE_NAME); + + Set csvAttributeValues = new HashSet<>(getStrings(attributeData)); + + assertEquals(6, csvAttributeValues.size()); + + assertTrue(csvAttributeValues.contains("Malibu Beach")); + assertTrue(csvAttributeValues.contains("\"California, US\"")); + assertTrue(csvAttributeValues.contains("\"This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim\"")); + + assertTrue(csvAttributeValues.contains(flowFile.getAttribute("filename"))); + assertTrue(csvAttributeValues.contains(flowFile.getAttribute("path"))); + assertTrue(csvAttributeValues.contains(flowFile.getAttribute("uuid"))); + } + + @Test + public void testNoAttrListNoCoreNullOffToContent() throws IOException { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV()); + testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_OVERWRITE_CONTENT); + testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false"); + testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false"); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + + ff = session.putAttribute(ff, "beach-name", "Malibu Beach"); + ff = session.putAttribute(ff, "beach-location", "California, US"); + ff = session.putAttribute(ff, "beach-endorsement", "This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim"); + + testRunner.enqueue(ff); + testRunner.run(); + + List flowFilesForRelationship = testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS); + + testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0); + testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1); + + MockFlowFile flowFile = flowFilesForRelationship.get(0); + + assertEquals(OUTPUT_MIME_TYPE, flowFile.getAttribute(CoreAttributes.MIME_TYPE.key())); + + final byte[] contentData = testRunner.getContentAsByteArray(flowFile); + + final String contentDataString = new String(contentData, "UTF-8"); + Set contentValues = new HashSet<>(getStrings(contentDataString)); + + assertEquals(3, contentValues.size()); + + assertTrue(contentValues.contains("Malibu Beach")); + assertTrue(contentValues.contains("\"California, US\"")); + assertTrue(contentValues.contains("\"This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim\"")); + + } + + + @Test + public void testAttrListNoCoreNullOffToAttribute() throws IOException { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV()); + testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE); + testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false"); + testRunner.setProperty(AttributesToCSV.ATTRIBUTES_LIST, "beach-name,beach-location,beach-endorsement"); + testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false"); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + + ff = session.putAttribute(ff, "beach-name", "Malibu Beach"); + ff = session.putAttribute(ff, "beach-location", "California, US"); + ff = session.putAttribute(ff, "beach-endorsement", "This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim"); + ff = session.putAttribute(ff, "attribute-should-be-eliminated", "This should not be in CSVAttribute!"); + + testRunner.enqueue(ff); + testRunner.run(); + + List flowFilesForRelationship = testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS); + + testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0); + testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1); + + MockFlowFile flowFile = flowFilesForRelationship.get(0); + + assertNull(flowFile.getAttribute(CoreAttributes.MIME_TYPE.key())); + + final String attributeData = flowFile.getAttribute(OUTPUT_ATTRIBUTE_NAME); + + Set csvAttributesValues = new HashSet<>(getStrings(attributeData)); + + assertEquals(3, csvAttributesValues.size()); + + assertTrue(csvAttributesValues.contains("Malibu Beach")); + assertTrue(csvAttributesValues.contains("\"California, US\"")); + assertTrue(csvAttributesValues.contains("\"This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim\"")); + + } + + @Test + public void testAttrListCoreNullOffToAttribute() throws IOException { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV()); + testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE); + testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "true"); + testRunner.setProperty(AttributesToCSV.ATTRIBUTES_LIST, "beach-name,beach-location,beach-endorsement"); + testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false"); + + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + + ff = session.putAttribute(ff, "beach-name", "Malibu Beach"); + ff = session.putAttribute(ff, "beach-location", "California, US"); + ff = session.putAttribute(ff, "beach-endorsement", "This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim"); + ff = session.putAttribute(ff, "attribute-should-be-eliminated", "This should not be in CSVAttribute!"); + + testRunner.enqueue(ff); + testRunner.run(); + + List flowFilesForRelationship = testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS); + + testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0); + testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1); + + MockFlowFile flowFile = flowFilesForRelationship.get(0); + + assertNull(flowFile.getAttribute(CoreAttributes.MIME_TYPE.key())); + + final String attributeData = flowFile.getAttribute(OUTPUT_ATTRIBUTE_NAME); + + Set csvAttributesValues = new HashSet<>(getStrings(attributeData)); + + assertEquals(6, csvAttributesValues.size()); + + assertTrue(csvAttributesValues.contains("Malibu Beach")); + assertTrue(csvAttributesValues.contains("\"California, US\"")); + assertTrue(csvAttributesValues.contains("\"This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim\"")); + + assertTrue(csvAttributesValues.contains(flowFile.getAttribute("filename"))); + assertTrue(csvAttributesValues.contains(flowFile.getAttribute("path"))); + assertTrue(csvAttributesValues.contains(flowFile.getAttribute("uuid"))); + } + + @Test + public void testAttrListNoCoreNullOffOverrideCoreByAttrListToAttribute() throws IOException { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV()); + testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE); + testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false"); + testRunner.setProperty(AttributesToCSV.ATTRIBUTES_LIST, "beach-name,beach-location,beach-endorsement,uuid"); + testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "false"); + + ProcessSession session = testRunner.getProcessSessionFactory().createSession(); + FlowFile ff = session.create(); + + ff = session.putAttribute(ff, "beach-name", "Malibu Beach"); + ff = session.putAttribute(ff, "beach-location", "California, US"); + ff = session.putAttribute(ff, "beach-endorsement", "This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim"); + ff = session.putAttribute(ff, "attribute-should-be-eliminated", "This should not be in CSVAttribute!"); + + testRunner.enqueue(ff); + testRunner.run(); + + List flowFilesForRelationship = testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS); + + testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0); + testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1); + + MockFlowFile flowFile = flowFilesForRelationship.get(0); + + assertNull(flowFile.getAttribute(CoreAttributes.MIME_TYPE.key())); + + final String attributeData = flowFile.getAttribute(OUTPUT_ATTRIBUTE_NAME); + + Set csvAttributesValues = new HashSet<>(getStrings(attributeData)); + + assertEquals(4, csvAttributesValues.size()); + + assertTrue(csvAttributesValues.contains("Malibu Beach")); + assertTrue(csvAttributesValues.contains("\"California, US\"")); + assertTrue(csvAttributesValues.contains("\"This is our family's favorite beach. We highly recommend it. \n\nThanks, Jim\"")); + + + assertTrue(!csvAttributesValues.contains(flowFile.getAttribute("filename"))); + assertTrue(!csvAttributesValues.contains(flowFile.getAttribute("path"))); + assertTrue(csvAttributesValues.contains(flowFile.getAttribute("uuid"))); + } + + private List getStrings(String sdata) { + return Splitter.on(Pattern.compile(SPLIT_REGEX)).splitToList(sdata); + } + +}