NIFI-1079 Create a new Attribute from the existing FlowFile attributes by taking

either all of the existing attributes or a user defined list. The
existing Attributes are converted to JSON and placed in a new Attribute
on the existing FlowFile as Attribute “JSONAttributes”

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Jeremy Dyer 2015-10-28 16:20:48 -04:00 committed by Bryan Bende
parent 5cc2b04b91
commit 19b7a4cc7d
3 changed files with 295 additions and 0 deletions

View File

@ -0,0 +1,117 @@
package org.apache.nifi.processors.standard;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.*;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.util.*;
@EventDriven
@SideEffectFree
@SupportsBatching
@Tags({"JSON", "attributes"})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription("Evaluates the attributes from a FlowFile and generates a JSON string with the attribute key/value pair. " +
"The resulting JSON string is placed in the FlowFile as a new Attribute with the name 'JSONAttributes'. By default all" +
"Attributes in the FlowFile are placed in the resulting JSON string. If only certain Attributes are desired you may" +
"specify a list of FlowFile Attributes that you want in the resulting JSON string")
@WritesAttribute(attribute = "JSONAttributes", description = "JSON string of all the pre-existing attributes in the flowfile")
public class AttributesToJSON extends AbstractProcessor {
public static final String JSON_ATTRIBUTE_NAME = "JSONAttribute";
private static final String AT_LIST_SEPARATOR = ",";
private static final String DEFAULT_VALUE_IF_ATT_NOT_PRESENT = "";
public static final PropertyDescriptor ATTRIBUTES_LIST = new PropertyDescriptor.Builder()
.name("Attributes List")
.description("Comma separated list of attributes to be included in the '" + JSON_ATTRIBUTE_NAME +"' attribute. This list of attributes is case sensitive. If a specified attribute is not found" +
"in the flowfile an empty string will be output for that attritbute in the resulting JSON")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("'" + JSON_ATTRIBUTE_NAME + "' attribute has been successfully added to the flowfile").build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("Failed to add '" + JSON_ATTRIBUTE_NAME + "' attribute to the flowfile").build();
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
private static final ObjectMapper objectMapper = new ObjectMapper();
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(ATTRIBUTES_LIST);
this.properties = Collections.unmodifiableList(properties);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final FlowFile original = session.get();
if (original == null) {
return;
}
final String atList = context.getProperty(ATTRIBUTES_LIST).getValue();
Map<String, String> atsToWrite = null;
//If list of attributes specified get only those attributes. Otherwise write them all
if (atList != null && !StringUtils.isEmpty(atList)) {
atsToWrite = new HashMap<>();
String[] ats = StringUtils.split(atList, AT_LIST_SEPARATOR);
if (ats != null) {
for (String str : ats) {
String cleanStr = str.trim();
String val = original.getAttribute(cleanStr);
if (val != null) {
atsToWrite.put(cleanStr, val);
} else {
atsToWrite.put(cleanStr, DEFAULT_VALUE_IF_ATT_NOT_PRESENT);
}
}
}
} else {
atsToWrite = original.getAttributes();
}
if (atsToWrite != null) {
if (atsToWrite.size() == 0) {
getLogger().debug("Flowfile contains no attributes to convert to JSON");
} else {
try {
FlowFile updated = session.putAttribute(original, JSON_ATTRIBUTE_NAME, objectMapper.writeValueAsString(atsToWrite));
session.transfer(updated, REL_SUCCESS);
} catch (JsonProcessingException e) {
getLogger().error(e.getMessage());
session.transfer(original, REL_FAILURE);
}
}
}
}
}

View File

@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.standard.AttributesToJSON
org.apache.nifi.processors.standard.Base64EncodeContent
org.apache.nifi.processors.standard.CompressContent
org.apache.nifi.processors.standard.ControlRate

View File

@ -0,0 +1,177 @@
package org.apache.nifi.processors.standard;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertTrue;
public class TestAttributesToJSON {
private static Logger LOGGER;
static {
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.AttributesToJSON", "debug");
System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestAttributesToJSON", "debug");
LOGGER = LoggerFactory.getLogger(TestAttributesToJSON.class);
}
private static final String TEST_ATTRIBUTE_KEY = "TestAttribute";
private static final String TEST_ATTRIBUTE_VALUE = "TestValue";
@Test(expected = AssertionError.class)
public void testInvalidUserSuppliedAttributeList() throws Exception {
final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
//Attribute list CANNOT be empty
testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, "");
ProcessSession session = testRunner.getProcessSessionFactory().createSession();
FlowFile ff = session.create();
testRunner.enqueue(ff);
testRunner.run();
}
@Test
public void testInvalidJSONValueInAttribute() throws Exception {
final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
ProcessSession session = testRunner.getProcessSessionFactory().createSession();
FlowFile ff = session.create();
//Create attribute that contains an invalid JSON Character
ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, "'badjson'");
testRunner.enqueue(ff);
testRunner.run();
//Expecting success transition because Jackson is taking care of escaping the bad JSON characters
testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).
assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
}
@Test
public void testAttribuets_emptyListUserSpecifiedAttributes() throws Exception {
final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
ProcessSession session = testRunner.getProcessSessionFactory().createSession();
FlowFile ff = session.create();
ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, TEST_ATTRIBUTE_VALUE);
testRunner.enqueue(ff);
testRunner.run();
testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).
assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
String json = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS)
.get(0).getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME);
ObjectMapper mapper = new ObjectMapper();
Map<String, String> val = mapper.readValue(json, HashMap.class);
assertTrue(val.get(TEST_ATTRIBUTE_KEY).equals(TEST_ATTRIBUTE_VALUE));
}
@Test
public void testAttribute_singleUserDefinedAttribute() throws Exception {
final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, TEST_ATTRIBUTE_KEY);
ProcessSession session = testRunner.getProcessSessionFactory().createSession();
FlowFile ff = session.create();
ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, TEST_ATTRIBUTE_VALUE);
testRunner.enqueue(ff);
testRunner.run();
testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).
assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
String json = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS)
.get(0).getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME);
ObjectMapper mapper = new ObjectMapper();
Map<String, String> val = mapper.readValue(json, HashMap.class);
assertTrue(val.get(TEST_ATTRIBUTE_KEY).equals(TEST_ATTRIBUTE_VALUE));
assertTrue(val.size() == 1);
}
@Test
public void testAttribute_singleUserDefinedAttributeWithWhiteSpace() throws Exception {
final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, " " + TEST_ATTRIBUTE_KEY + " ");
ProcessSession session = testRunner.getProcessSessionFactory().createSession();
FlowFile ff = session.create();
ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, TEST_ATTRIBUTE_VALUE);
testRunner.enqueue(ff);
testRunner.run();
testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).
assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
String json = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS)
.get(0).getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME);
ObjectMapper mapper = new ObjectMapper();
Map<String, String> val = mapper.readValue(json, HashMap.class);
assertTrue(val.get(TEST_ATTRIBUTE_KEY).equals(TEST_ATTRIBUTE_VALUE));
assertTrue(val.size() == 1);
}
@Test
public void testAttribute_singleNonExistingUserDefinedAttribute() throws Exception {
final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, "NonExistingAttribute");
ProcessSession session = testRunner.getProcessSessionFactory().createSession();
FlowFile ff = session.create();
ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, TEST_ATTRIBUTE_VALUE);
testRunner.enqueue(ff);
testRunner.run();
testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).
assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
String json = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS)
.get(0).getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME);
ObjectMapper mapper = new ObjectMapper();
Map<String, String> val = mapper.readValue(json, HashMap.class);
//If a Attribute is requested but does not exist then it is placed in the JSON with an empty string
assertTrue(val.get("NonExistingAttribute").equals(""));
assertTrue(val.size() == 1);
}
}