NIFI-11995 Added Header Format configuration to ConsumeAMQP

This closes #7652

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Umar Hussain 2023-08-27 20:19:08 +02:00 committed by exceptionfactory
parent eb7d49cdff
commit 4f4e99085d
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
3 changed files with 184 additions and 39 deletions

View File

@ -45,6 +45,14 @@ language governing permissions and limitations under the License. -->
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>

View File

@ -16,6 +16,8 @@
*/
package org.apache.nifi.amqp.processors;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Envelope;
@ -26,6 +28,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
@ -39,7 +42,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -53,7 +55,9 @@ import java.util.stream.Collectors;
@WritesAttribute(attribute = "amqp$appId", description = "The App ID field from the AMQP Message"),
@WritesAttribute(attribute = "amqp$contentEncoding", description = "The Content Encoding reported by the AMQP Message"),
@WritesAttribute(attribute = "amqp$contentType", description = "The Content Type reported by the AMQP Message"),
@WritesAttribute(attribute = "amqp$headers", description = "The headers present on the AMQP Message"),
@WritesAttribute(attribute = "amqp$headers", description = "The headers present on the AMQP Message. Added only if processor is configured to output this attribute."),
@WritesAttribute(attribute = "<Header Key Prefix>.<attribute>",
description = "Each message header will be inserted with this attribute name, if processor is configured to output headers as attribute"),
@WritesAttribute(attribute = "amqp$deliveryMode", description = "The numeric indicator for the Message's Delivery Mode"),
@WritesAttribute(attribute = "amqp$priority", description = "The Message priority"),
@WritesAttribute(attribute = "amqp$correlationId", description = "The Message's Correlation ID"),
@ -68,7 +72,16 @@ import java.util.stream.Collectors;
@WritesAttribute(attribute = "amqp$exchange", description = "The exchange from which AMQP Message was received")
})
public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
private static final String ATTRIBUTES_PREFIX = "amqp$";
public static final String DEFAULT_HEADERS_KEY_PREFIX = "consume.amqp";
public static final AllowableValue HEADERS_FORMAT_COMMA_SEPARATED_STRING = new AllowableValue("Comma-Separated String", "Comma-Separated String",
"Put all headers as a string with the specified separator in the attribute 'amqp$headers'.");
public static final AllowableValue HEADERS_FORMAT_JSON_STRING = new AllowableValue("JSON String", "JSON String",
"Format all headers as JSON string and output in the attribute 'amqp$headers'. It will include keys with null value as well.");
public static final AllowableValue HEADERS_FORMAT_ATTRIBUTES = new AllowableValue("FlowFile Attributes", "FlowFile Attributes",
"Put each header as attribute of the flow file with a prefix specified in the properties");
public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
.name("Queue")
@ -76,7 +89,7 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor AUTO_ACKNOWLEDGE = new PropertyDescriptor.Builder()
public static final PropertyDescriptor AUTO_ACKNOWLEDGE = new PropertyDescriptor.Builder()
.name("auto.acknowledge")
.displayName("Auto-Acknowledge Messages")
.description(" If false (Non-Auto-Acknowledge), the messages will be acknowledged by the processor after transferring the FlowFiles to success and committing "
@ -99,15 +112,35 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
.defaultValue("10")
.required(true)
.build();
public static final PropertyDescriptor HEADER_FORMAT = new PropertyDescriptor.Builder()
.name("header.format")
.displayName("Header Output Format")
.description("Defines how to output headers from the received message")
.allowableValues(HEADERS_FORMAT_COMMA_SEPARATED_STRING, HEADERS_FORMAT_JSON_STRING, HEADERS_FORMAT_ATTRIBUTES)
.defaultValue(HEADERS_FORMAT_COMMA_SEPARATED_STRING.getValue())
.required(true)
.build();
public static final PropertyDescriptor HEADER_KEY_PREFIX = new PropertyDescriptor.Builder()
.name("header.key.prefix")
.displayName("Header Key Prefix")
.description("Text to be prefixed to header keys as the are added to the FlowFile attributes. Processor will append '.' to the value of this property")
.defaultValue(DEFAULT_HEADERS_KEY_PREFIX)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.dependsOn(HEADER_FORMAT, HEADERS_FORMAT_ATTRIBUTES)
.required(true)
.build();
public static final PropertyDescriptor HEADER_SEPARATOR = new PropertyDescriptor.Builder()
.name("header.separator")
.displayName("Header Separator")
.description("The character that is used to separate key-value for header in String. The value must only one character."
+ "Otherwise you will get an error message")
.addValidator(StandardValidators.SINGLE_CHAR_VALIDATOR)
.defaultValue(",")
.required(false)
.build();
.name("header.separator")
.displayName("Header Separator")
.description("The character that is used to separate key-value for header in String. The value must be only one character."
)
.addValidator(StandardValidators.SINGLE_CHAR_VALIDATOR)
.defaultValue(",")
.dependsOn(HEADER_FORMAT, HEADERS_FORMAT_COMMA_SEPARATED_STRING)
.required(false)
.build();
static final PropertyDescriptor REMOVE_CURLY_BRACES = new PropertyDescriptor.Builder()
.name("remove.curly.braces")
.displayName("Remove Curly Braces")
@ -115,6 +148,7 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.defaultValue("False")
.allowableValues("True", "False")
.dependsOn(HEADER_FORMAT, HEADERS_FORMAT_COMMA_SEPARATED_STRING)
.required(false)
.build();
@ -126,19 +160,23 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
private static final List<PropertyDescriptor> propertyDescriptors;
private static final Set<Relationship> relationships;
private static final ObjectMapper objectMapper;
static {
List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(QUEUE);
properties.add(AUTO_ACKNOWLEDGE);
properties.add(BATCH_SIZE);
properties.add(REMOVE_CURLY_BRACES);
properties.add(HEADER_FORMAT);
properties.add(HEADER_KEY_PREFIX);
properties.add(HEADER_SEPARATOR);
properties.add(REMOVE_CURLY_BRACES);
properties.addAll(getCommonPropertyDescriptors());
propertyDescriptors = Collections.unmodifiableList(properties);
Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
relationships = Collections.unmodifiableSet(rels);
relationships = Set.of(REL_SUCCESS);
objectMapper = new ObjectMapper();
}
/**
@ -170,8 +208,10 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
final BasicProperties amqpProperties = response.getProps();
final Envelope envelope = response.getEnvelope();
final Map<String, String> attributes = buildAttributes(amqpProperties, envelope, context.getProperty(REMOVE_CURLY_BRACES).asBoolean(),
context.getProperty(HEADER_SEPARATOR).toString());
final String headerFormat = context.getProperty(HEADER_FORMAT).getValue();
final String headerKeyPrefix = context.getProperty(HEADER_KEY_PREFIX).getValue();
final Map<String, String> attributes = buildAttributes(amqpProperties, envelope, headerFormat, headerKeyPrefix,
context.getProperty(REMOVE_CURLY_BRACES).asBoolean(), context.getProperty(HEADER_SEPARATOR).toString());
flowFile = session.putAllAttributes(flowFile, attributes);
session.getProvenanceReporter().receive(flowFile, connection.toString() + "/" + context.getProperty(QUEUE).getValue());
@ -185,12 +225,13 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
}
}
private Map<String, String> buildAttributes(final BasicProperties properties, final Envelope envelope, boolean removeCurlyBraces, String valueSeperatorForHeaders) {
private Map<String, String> buildAttributes(final BasicProperties properties, final Envelope envelope, String headersStringFormat, String headerAttributePrefix, boolean removeCurlyBraces,
String valueSeparatorForHeaders) {
AllowableValue headerFormat = new AllowableValue(headersStringFormat);
final Map<String, String> attributes = new HashMap<>();
addAttribute(attributes, ATTRIBUTES_PREFIX + "appId", properties.getAppId());
addAttribute(attributes, ATTRIBUTES_PREFIX + "contentEncoding", properties.getContentEncoding());
addAttribute(attributes, ATTRIBUTES_PREFIX + "contentType", properties.getContentType());
addAttribute(attributes, ATTRIBUTES_PREFIX + "headers", buildHeaders(properties.getHeaders(), removeCurlyBraces, valueSeperatorForHeaders));
addAttribute(attributes, ATTRIBUTES_PREFIX + "deliveryMode", properties.getDeliveryMode());
addAttribute(attributes, ATTRIBUTES_PREFIX + "priority", properties.getPriority());
addAttribute(attributes, ATTRIBUTES_PREFIX + "correlationId", properties.getCorrelationId());
@ -201,8 +242,19 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
addAttribute(attributes, ATTRIBUTES_PREFIX + "type", properties.getType());
addAttribute(attributes, ATTRIBUTES_PREFIX + "userId", properties.getUserId());
addAttribute(attributes, ATTRIBUTES_PREFIX + "clusterId", properties.getClusterId());
addAttribute(attributes, ATTRIBUTES_PREFIX + "routingKey", envelope.getRoutingKey());
addAttribute(attributes, ATTRIBUTES_PREFIX + "exchange", envelope.getExchange());
addAttribute(attributes, ATTRIBUTES_PREFIX + "routingKey", envelope.getRoutingKey());
addAttribute(attributes, ATTRIBUTES_PREFIX + "exchange", envelope.getExchange());
Map<String, Object> headers = properties.getHeaders();
if (headers != null) {
if (HEADERS_FORMAT_ATTRIBUTES.equals(headerFormat)) {
headers.forEach((key, value) -> addAttribute(attributes,
String.format("%s.%s", headerAttributePrefix, key), value));
} else {
addAttribute(attributes, ATTRIBUTES_PREFIX + "headers",
buildHeaders(properties.getHeaders(), headerFormat, removeCurlyBraces,
valueSeparatorForHeaders));
}
}
return attributes;
}
@ -214,14 +266,23 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
attributes.put(attributeName, value.toString());
}
private String buildHeaders(Map<String, Object> headers, boolean removeCurlyBraces, String valueSeparatorForHeaders) {
private String buildHeaders(Map<String, Object> headers, AllowableValue headerFormat, boolean removeCurlyBraces, String valueSeparatorForHeaders) {
if (headers == null) {
return null;
}
String headerString = convertMapToString(headers,valueSeparatorForHeaders);
String headerString = null;
if (headerFormat.equals(HEADERS_FORMAT_COMMA_SEPARATED_STRING)) {
headerString = convertMapToString(headers, valueSeparatorForHeaders);
if (!removeCurlyBraces) {
headerString = "{" + headerString + "}";
if (!removeCurlyBraces) {
headerString = "{" + headerString + "}";
}
} else if (headerFormat.equals(HEADERS_FORMAT_JSON_STRING)) {
try {
headerString = convertMapToJSONString(headers);
} catch (JsonProcessingException e) {
getLogger().warn("Header formatting as JSON failed", e);
}
}
return headerString;
}
@ -231,6 +292,10 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
.collect(Collectors.joining(valueSeparatorForHeaders));
}
private static String convertMapToJSONString(Map<String, Object> headers) throws JsonProcessingException {
return objectMapper.writeValueAsString(headers);
}
@Override
protected synchronized AMQPConsumer createAMQPWorker(final ProcessContext context, final Connection connection) {
try {

View File

@ -16,6 +16,8 @@
*/
package org.apache.nifi.amqp.processors;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
@ -61,12 +63,12 @@ public class ConsumeAMQPTest {
runner.run();
runner.assertTransferCount(PublishAMQP.REL_SUCCESS, 2);
runner.assertTransferCount(ConsumeAMQP.REL_SUCCESS, 2);
final MockFlowFile helloFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
final MockFlowFile helloFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
helloFF.assertContentEquals("hello");
final MockFlowFile worldFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(1);
final MockFlowFile worldFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(1);
worldFF.assertContentEquals("world");
// A single cumulative ack should be used
@ -92,12 +94,12 @@ public class ConsumeAMQPTest {
runner.run(2);
runner.assertTransferCount(PublishAMQP.REL_SUCCESS, 2);
runner.assertTransferCount(ConsumeAMQP.REL_SUCCESS, 2);
final MockFlowFile helloFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
final MockFlowFile helloFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
helloFF.assertContentEquals("hello");
final MockFlowFile worldFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(1);
final MockFlowFile worldFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(1);
worldFF.assertContentEquals("world");
// A single cumulative ack should be used
@ -125,9 +127,9 @@ public class ConsumeAMQPTest {
runner.run();
proc.close();
runner.assertTransferCount(PublishAMQP.REL_SUCCESS, 1);
runner.assertTransferCount(ConsumeAMQP.REL_SUCCESS, 1);
final MockFlowFile helloFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
final MockFlowFile helloFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
helloFF.assertContentEquals("hello");
@ -156,13 +158,83 @@ public class ConsumeAMQPTest {
TestRunner runner = initTestRunner(proc);
runner.run();
final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
final MockFlowFile successFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
assertNotNull(successFF);
successFF.assertAttributeEquals("amqp$routingKey", "key1");
successFF.assertAttributeEquals("amqp$exchange", "myExchange");
}
}
@Test
public void validateHeaderWithJsonStringForHeaderFormatParameterConsumeAndTransferToSuccess() throws Exception {
final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");
final Map<String, Object> headersMap = new HashMap<>();
headersMap.put("foo1", "bar,bar");
headersMap.put("foo2", "bar,bar");
headersMap.put("foo3", "null");
headersMap.put("foo4", null);
ObjectMapper objectMapper = new ObjectMapper();
JsonNode expectedJson = objectMapper.valueToTree(headersMap);
AMQP.BasicProperties.Builder builderBasicProperties = new AMQP.BasicProperties.Builder();
builderBasicProperties.headers(headersMap);
final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) {
sender.publish("hello".getBytes(), builderBasicProperties.build(), "key1", "myExchange");
ConsumeAMQP proc = new LocalConsumeAMQP(connection);
TestRunner runner = initTestRunner(proc);
runner.setProperty(ConsumeAMQP.HEADER_FORMAT, ConsumeAMQP.HEADERS_FORMAT_JSON_STRING);
runner.run();
final MockFlowFile successFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
assertNotNull(successFF);
successFF.assertAttributeEquals("amqp$routingKey", "key1");
successFF.assertAttributeEquals("amqp$exchange", "myExchange");
String headers = successFF.getAttribute("amqp$headers");
JsonNode jsonNode = objectMapper.readTree(headers);
assertEquals(expectedJson, jsonNode);
}
}
@Test
public void validateHeaderWithFlowFileAttributeForHeaderFormatParameterConsumeAndTransferToSuccess() throws Exception {
final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");
final Map<String,Object> expectedHeadersMap = new HashMap<>();
expectedHeadersMap.put("foo1", "bar,bar");
expectedHeadersMap.put("foo2", "bar,bar");
expectedHeadersMap.put("foo3", "null");
final Map<String, Object> headersMap = new HashMap<>(expectedHeadersMap);
headersMap.put("foo4", null);
final String headerPrefix = "test.header";
AMQP.BasicProperties.Builder builderBasicProperties = new AMQP.BasicProperties.Builder();
builderBasicProperties.headers(headersMap);
final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) {
sender.publish("hello".getBytes(), builderBasicProperties.build(), "key1", "myExchange");
ConsumeAMQP proc = new LocalConsumeAMQP(connection);
TestRunner runner = initTestRunner(proc);
runner.setProperty(ConsumeAMQP.HEADER_FORMAT, ConsumeAMQP.HEADERS_FORMAT_ATTRIBUTES);
runner.setProperty(ConsumeAMQP.HEADER_KEY_PREFIX,headerPrefix);
runner.run();
final MockFlowFile successFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
assertNotNull(successFF);
successFF.assertAttributeEquals("amqp$routingKey", "key1");
successFF.assertAttributeEquals("amqp$exchange", "myExchange");
successFF.assertAttributeNotExists("amqp$headers");
expectedHeadersMap.forEach((key, value) ->{
successFF.assertAttributeEquals(headerPrefix + "." + key, value.toString());
} );
}
}
@Test
public void validateHeaderWithValueSeparatorForHeaderParameterConsumeAndTransferToSuccess() throws Exception {
final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
@ -186,7 +258,7 @@ public class ConsumeAMQPTest {
TestRunner runner = initTestRunner(proc);
runner.setProperty(ConsumeAMQP.HEADER_SEPARATOR, "|");
runner.run();
final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
final MockFlowFile successFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
assertNotNull(successFF);
successFF.assertAttributeEquals("amqp$routingKey", "key1");
successFF.assertAttributeEquals("amqp$exchange", "myExchange");
@ -223,7 +295,7 @@ public class ConsumeAMQPTest {
TestRunner runner = initTestRunner(proc);
runner.setProperty(ConsumeAMQP.REMOVE_CURLY_BRACES,"True");
runner.run();
final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
final MockFlowFile successFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
assertNotNull(successFF);
successFF.assertAttributeEquals("amqp$routingKey", "key1");
successFF.assertAttributeEquals("amqp$exchange", "myExchange");
@ -255,7 +327,7 @@ public class ConsumeAMQPTest {
runner.setProperty(ConsumeAMQP.HEADER_SEPARATOR,"|");
runner.run();
final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
final MockFlowFile successFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
assertNotNull(successFF);
successFF.assertAttributeEquals("amqp$routingKey", "key1");
successFF.assertAttributeEquals("amqp$exchange", "myExchange");
@ -288,7 +360,7 @@ public class ConsumeAMQPTest {
TestRunner runner = initTestRunner(proc);
runner.run();
final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
final MockFlowFile successFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
assertNotNull(successFF);
successFF.assertAttributeEquals("amqp$routingKey", "key1");
successFF.assertAttributeEquals("amqp$exchange", "myExchange");
@ -321,7 +393,7 @@ public class ConsumeAMQPTest {
throw new IllegalStateException("Consumer already created");
}
consumer = new AMQPConsumer(connection, context.getProperty(QUEUE).getValue(), context.getProperty(AUTO_ACKNOWLEDGE).asBoolean(), getLogger());
consumer = new AMQPConsumer(connection, context.getProperty(ConsumeAMQP.QUEUE).getValue(), context.getProperty(ConsumeAMQP.AUTO_ACKNOWLEDGE).asBoolean(), getLogger());
return consumer;
} catch (IOException e) {
throw new ProcessException(e);