mirror of https://github.com/apache/nifi.git
NIFI-3240 - AttributesToJson performance improvements
This closes #1352. Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
parent
be6bcf20ad
commit
1b4729e448
|
@ -27,6 +27,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
import org.apache.nifi.annotation.documentation.Tags;
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
|
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||||
|
@ -36,20 +37,19 @@ import org.apache.nifi.processor.ProcessContext;
|
||||||
import org.apache.nifi.processor.ProcessSession;
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
import org.apache.nifi.processor.ProcessorInitializationContext;
|
import org.apache.nifi.processor.ProcessorInitializationContext;
|
||||||
import org.apache.nifi.processor.exception.ProcessException;
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
import org.apache.nifi.processor.io.StreamCallback;
|
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
import org.apache.nifi.stream.io.BufferedOutputStream;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.BufferedOutputStream;
|
||||||
import java.io.InputStream;
|
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@EventDriven
|
@EventDriven
|
||||||
@SideEffectFree
|
@SideEffectFree
|
||||||
|
@ -66,7 +66,7 @@ public class AttributesToJSON extends AbstractProcessor {
|
||||||
|
|
||||||
public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute";
|
public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute";
|
||||||
public static final String DESTINATION_CONTENT = "flowfile-content";
|
public static final String DESTINATION_CONTENT = "flowfile-content";
|
||||||
private static final String APPLICATION_JSON = "application/json";
|
public static final String APPLICATION_JSON = "application/json";
|
||||||
|
|
||||||
|
|
||||||
public static final PropertyDescriptor ATTRIBUTES_LIST = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor ATTRIBUTES_LIST = new PropertyDescriptor.Builder()
|
||||||
|
@ -116,6 +116,10 @@ public class AttributesToJSON extends AbstractProcessor {
|
||||||
private List<PropertyDescriptor> properties;
|
private List<PropertyDescriptor> properties;
|
||||||
private Set<Relationship> relationships;
|
private Set<Relationship> relationships;
|
||||||
private static final ObjectMapper objectMapper = new ObjectMapper();
|
private static final ObjectMapper objectMapper = new ObjectMapper();
|
||||||
|
private volatile Set<String> attributesToRemove;
|
||||||
|
private volatile Set<String> attributes;
|
||||||
|
private volatile Boolean nullValueForEmptyString;
|
||||||
|
private volatile boolean destinationContent;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void init(final ProcessorInitializationContext context) {
|
protected void init(final ProcessorInitializationContext context) {
|
||||||
|
@ -149,55 +153,57 @@ public class AttributesToJSON extends AbstractProcessor {
|
||||||
* @return
|
* @return
|
||||||
* Map of values that are feed to a Jackson ObjectMapper
|
* Map of values that are feed to a Jackson ObjectMapper
|
||||||
*/
|
*/
|
||||||
protected Map<String, String> buildAttributesMapForFlowFile(FlowFile ff, String atrList,
|
protected Map<String, String> buildAttributesMapForFlowFile(FlowFile ff, Set<String> attributes, Set<String> attributesToRemove, boolean nullValForEmptyString) {
|
||||||
boolean includeCoreAttributes,
|
Map<String, String> result;
|
||||||
boolean nullValForEmptyString) {
|
//If list of attributes specified get only those attributes. Otherwise write them all
|
||||||
|
if (attributes != null) {
|
||||||
Map<String, String> atsToWrite = new HashMap<>();
|
result = new HashMap<>(attributes.size());
|
||||||
|
for (String attribute : attributes) {
|
||||||
|
String val = ff.getAttribute(attribute);
|
||||||
|
if (val != null || nullValForEmptyString) {
|
||||||
|
result.put(attribute, val);
|
||||||
|
} else {
|
||||||
|
result.put(attribute, "");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Map<String, String> ffAttributes = ff.getAttributes();
|
||||||
|
result = new HashMap<>(ffAttributes.size());
|
||||||
|
for (Map.Entry<String, String> e : ffAttributes.entrySet()) {
|
||||||
|
if (!attributesToRemove.contains(e.getKey())) {
|
||||||
|
result.put(e.getKey(), e.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Set<String> buildAtrs(String atrList, Set<String> atrsToExclude) {
|
||||||
//If list of attributes specified get only those attributes. Otherwise write them all
|
//If list of attributes specified get only those attributes. Otherwise write them all
|
||||||
if (StringUtils.isNotBlank(atrList)) {
|
if (StringUtils.isNotBlank(atrList)) {
|
||||||
String[] ats = StringUtils.split(atrList, AT_LIST_SEPARATOR);
|
String[] ats = StringUtils.split(atrList, AT_LIST_SEPARATOR);
|
||||||
if (ats != null) {
|
if (ats != null) {
|
||||||
|
Set<String> result = new HashSet<>(ats.length);
|
||||||
for (String str : ats) {
|
for (String str : ats) {
|
||||||
String cleanStr = str.trim();
|
String trim = str.trim();
|
||||||
String val = ff.getAttribute(cleanStr);
|
if (!atrsToExclude.contains(trim)) {
|
||||||
if (val != null) {
|
result.add(trim);
|
||||||
atsToWrite.put(cleanStr, val);
|
|
||||||
} else {
|
|
||||||
if (nullValForEmptyString) {
|
|
||||||
atsToWrite.put(cleanStr, null);
|
|
||||||
} else {
|
|
||||||
atsToWrite.put(cleanStr, "");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
atsToWrite.putAll(ff.getAttributes());
|
|
||||||
}
|
}
|
||||||
|
return null;
|
||||||
if (!includeCoreAttributes) {
|
|
||||||
atsToWrite = removeCoreAttributes(atsToWrite);
|
|
||||||
}
|
|
||||||
|
|
||||||
return atsToWrite;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
@OnScheduled
|
||||||
* Remove all of the CoreAttributes from the Attributes that will be written to the Flowfile.
|
public void onScheduled(ProcessContext context) {
|
||||||
*
|
attributesToRemove = context.getProperty(INCLUDE_CORE_ATTRIBUTES).asBoolean() ? Collections.EMPTY_SET : Arrays.stream(CoreAttributes.values())
|
||||||
* @param atsToWrite
|
.map(CoreAttributes::key)
|
||||||
* List of Attributes that have already been generated including the CoreAttributes
|
.collect(Collectors.toSet());
|
||||||
*
|
attributes = buildAtrs(context.getProperty(ATTRIBUTES_LIST).getValue(), attributesToRemove);
|
||||||
* @return
|
nullValueForEmptyString = context.getProperty(NULL_VALUE_FOR_EMPTY_STRING).asBoolean();
|
||||||
* Difference of all attributes minus the CoreAttributes
|
destinationContent = DESTINATION_CONTENT.equals(context.getProperty(DESTINATION).getValue());
|
||||||
*/
|
|
||||||
protected Map<String, String> removeCoreAttributes(Map<String, String> atsToWrite) {
|
|
||||||
for (CoreAttributes c : CoreAttributes.values()) {
|
|
||||||
atsToWrite.remove(c.key());
|
|
||||||
}
|
|
||||||
return atsToWrite;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -207,33 +213,21 @@ public class AttributesToJSON extends AbstractProcessor {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
final Map<String, String> atrList = buildAttributesMapForFlowFile(original,
|
final Map<String, String> atrList = buildAttributesMapForFlowFile(original, attributes, attributesToRemove, nullValueForEmptyString);
|
||||||
context.getProperty(ATTRIBUTES_LIST).getValue(),
|
|
||||||
context.getProperty(INCLUDE_CORE_ATTRIBUTES).asBoolean(),
|
|
||||||
context.getProperty(NULL_VALUE_FOR_EMPTY_STRING).asBoolean());
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
if (destinationContent) {
|
||||||
switch (context.getProperty(DESTINATION).getValue()) {
|
FlowFile conFlowfile = session.write(original, (in, out) -> {
|
||||||
case DESTINATION_ATTRIBUTE:
|
try (OutputStream outputStream = new BufferedOutputStream(out)) {
|
||||||
FlowFile atFlowfile = session.putAttribute(original, JSON_ATTRIBUTE_NAME,
|
outputStream.write(objectMapper.writeValueAsBytes(atrList));
|
||||||
objectMapper.writeValueAsString(atrList));
|
}
|
||||||
session.transfer(atFlowfile, REL_SUCCESS);
|
});
|
||||||
break;
|
conFlowfile = session.putAttribute(conFlowfile, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
|
||||||
case DESTINATION_CONTENT:
|
session.transfer(conFlowfile, REL_SUCCESS);
|
||||||
FlowFile conFlowfile = session.write(original, new StreamCallback() {
|
} else {
|
||||||
@Override
|
FlowFile atFlowfile = session.putAttribute(original, JSON_ATTRIBUTE_NAME, objectMapper.writeValueAsString(atrList));
|
||||||
public void process(InputStream in, OutputStream out) throws IOException {
|
session.transfer(atFlowfile, REL_SUCCESS);
|
||||||
try (OutputStream outputStream = new BufferedOutputStream(out)) {
|
|
||||||
outputStream.write(objectMapper.writeValueAsBytes(atrList));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
conFlowfile = session.putAttribute(conFlowfile, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
|
|
||||||
session.transfer(conFlowfile, REL_SUCCESS);
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch (JsonProcessingException e) {
|
} catch (JsonProcessingException e) {
|
||||||
getLogger().error(e.getMessage());
|
getLogger().error(e.getMessage());
|
||||||
session.transfer(original, REL_FAILURE);
|
session.transfer(original, REL_FAILURE);
|
||||||
|
|
|
@ -19,13 +19,20 @@ package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
|
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||||
import org.apache.nifi.processor.ProcessSession;
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
import org.apache.nifi.util.TestRunner;
|
import org.apache.nifi.util.TestRunner;
|
||||||
import org.apache.nifi.util.TestRunners;
|
import org.apache.nifi.util.TestRunners;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
|
@ -193,7 +200,6 @@ public class TestAttributesToJSON {
|
||||||
testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).assertContentEquals("{}");
|
testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).assertContentEquals("{}");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAttribute_singleUserDefinedAttribute() throws Exception {
|
public void testAttribute_singleUserDefinedAttribute() throws Exception {
|
||||||
final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
|
final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
|
||||||
|
@ -279,4 +285,107 @@ public class TestAttributesToJSON {
|
||||||
assertTrue(val.size() == 1);
|
assertTrue(val.size() == 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAttribute_noIncludeCoreAttributesUserDefined() throws IOException {
|
||||||
|
final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
|
||||||
|
testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, " " + TEST_ATTRIBUTE_KEY + " , " + CoreAttributes.PATH.key() + " ");
|
||||||
|
testRunner.setProperty(AttributesToJSON.INCLUDE_CORE_ATTRIBUTES, "false");
|
||||||
|
|
||||||
|
ProcessSession session = testRunner.getProcessSessionFactory().createSession();
|
||||||
|
FlowFile ff = session.create();
|
||||||
|
ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, TEST_ATTRIBUTE_VALUE);
|
||||||
|
ff = session.putAttribute(ff, CoreAttributes.PATH.key(), TEST_ATTRIBUTE_VALUE);
|
||||||
|
|
||||||
|
testRunner.enqueue(ff);
|
||||||
|
testRunner.run();
|
||||||
|
|
||||||
|
testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0)
|
||||||
|
.assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
|
||||||
|
testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
|
||||||
|
testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
|
||||||
|
|
||||||
|
String json = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS)
|
||||||
|
.get(0).getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME);
|
||||||
|
|
||||||
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
|
Map<String, String> val = mapper.readValue(json, HashMap.class);
|
||||||
|
assertEquals(TEST_ATTRIBUTE_VALUE, val.get(TEST_ATTRIBUTE_KEY));
|
||||||
|
assertEquals(1, val.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAttribute_noIncludeCoreAttributesContent() throws IOException {
|
||||||
|
final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
|
||||||
|
testRunner.setProperty(AttributesToJSON.INCLUDE_CORE_ATTRIBUTES, "false");
|
||||||
|
testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_CONTENT);
|
||||||
|
|
||||||
|
ProcessSession session = testRunner.getProcessSessionFactory().createSession();
|
||||||
|
FlowFile ff = session.create();
|
||||||
|
ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, TEST_ATTRIBUTE_VALUE);
|
||||||
|
ff = session.putAttribute(ff, CoreAttributes.PATH.key(), TEST_ATTRIBUTE_VALUE);
|
||||||
|
|
||||||
|
testRunner.enqueue(ff);
|
||||||
|
testRunner.run();
|
||||||
|
|
||||||
|
testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
|
||||||
|
testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
|
||||||
|
|
||||||
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
|
Map<String, String> val = mapper.readValue(testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0).toByteArray(), HashMap.class);
|
||||||
|
assertEquals(TEST_ATTRIBUTE_VALUE, val.get(TEST_ATTRIBUTE_KEY));
|
||||||
|
assertEquals(1, val.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAttribute_includeCoreAttributesContent() throws IOException {
|
||||||
|
final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
|
||||||
|
testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_CONTENT);
|
||||||
|
testRunner.setProperty(AttributesToJSON.INCLUDE_CORE_ATTRIBUTES, "true");
|
||||||
|
|
||||||
|
ProcessSession session = testRunner.getProcessSessionFactory().createSession();
|
||||||
|
FlowFile ff = session.create();
|
||||||
|
|
||||||
|
testRunner.enqueue(ff);
|
||||||
|
testRunner.run();
|
||||||
|
|
||||||
|
List<MockFlowFile> flowFilesForRelationship = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS);
|
||||||
|
|
||||||
|
testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
|
||||||
|
testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
|
||||||
|
|
||||||
|
MockFlowFile flowFile = flowFilesForRelationship.get(0);
|
||||||
|
|
||||||
|
assertEquals(AttributesToJSON.APPLICATION_JSON, flowFile.getAttribute(CoreAttributes.MIME_TYPE.key()));
|
||||||
|
|
||||||
|
Map<String, String> val = new ObjectMapper().readValue(flowFile.toByteArray(), HashMap.class);
|
||||||
|
assertEquals(3, val.size());
|
||||||
|
Set<String> coreAttributes = Arrays.stream(CoreAttributes.values()).map(CoreAttributes::key).collect(Collectors.toSet());
|
||||||
|
val.keySet().forEach(k -> assertTrue(coreAttributes.contains(k)));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAttribute_includeCoreAttributesAttribute() throws IOException {
|
||||||
|
final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
|
||||||
|
testRunner.setProperty(AttributesToJSON.INCLUDE_CORE_ATTRIBUTES, "true");
|
||||||
|
|
||||||
|
ProcessSession session = testRunner.getProcessSessionFactory().createSession();
|
||||||
|
FlowFile ff = session.create();
|
||||||
|
|
||||||
|
testRunner.enqueue(ff);
|
||||||
|
testRunner.run();
|
||||||
|
|
||||||
|
List<MockFlowFile> flowFilesForRelationship = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS);
|
||||||
|
|
||||||
|
testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
|
||||||
|
testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
|
||||||
|
|
||||||
|
MockFlowFile flowFile = flowFilesForRelationship.get(0);
|
||||||
|
|
||||||
|
assertNull(flowFile.getAttribute(CoreAttributes.MIME_TYPE.key()));
|
||||||
|
|
||||||
|
Map<String, String> val = new ObjectMapper().readValue(flowFile.getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME), HashMap.class);
|
||||||
|
assertEquals(3, val.size());
|
||||||
|
Set<String> coreAttributes = Arrays.stream(CoreAttributes.values()).map(CoreAttributes::key).collect(Collectors.toSet());
|
||||||
|
val.keySet().forEach(k -> assertTrue(coreAttributes.contains(k)));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue