NIFI-12411 Update PublishAMQP with configurable Header Source Property

This closes #8105

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Umar Hussain 2023-12-02 15:05:00 +01:00 committed by exceptionfactory
parent 7348740ecc
commit abe41ff649
No known key found for this signature in database
7 changed files with 387 additions and 189 deletions

View File

@ -28,7 +28,7 @@ import com.rabbitmq.client.ReturnListener;
/**
* Generic publisher of messages to AMQP-based messaging system. It is based on
* RabbitMQ client API (https://www.rabbitmq.com/api-guide.html)
* RabbitMQ client API (<a href="https://www.rabbitmq.com/api-guide.html">Java Client API Guide</a>)
*/
final class AMQPPublisher extends AMQPWorker {
@ -63,7 +63,7 @@ final class AMQPPublisher extends AMQPWorker {
exchange = exchange == null ? "" : exchange.trim();
if (processorLog.isDebugEnabled()) {
if (exchange.length() == 0) {
if (exchange.isEmpty()) {
processorLog.debug("The 'exchangeName' is not specified. Messages will be sent to default exchange");
}
processorLog.debug("Successfully connected AMQPPublisher to {} and '{}' exchange with '{}' as a routing key.", this.connectionString, exchange, routingKey);

View File

@ -24,7 +24,6 @@ import com.rabbitmq.client.impl.DefaultExceptionHandler;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
@ -49,7 +48,7 @@ import org.apache.nifi.ssl.SSLContextService;
/**
* Base processor that uses RabbitMQ client API
* (https://www.rabbitmq.com/api-guide.html) to rendezvous with AMQP-based
* (<a href="https://www.rabbitmq.com/api-guide.html">Java Client API Guide</a>) to rendezvous with AMQP-based
* messaging systems version 0.9.1
*
* @param <T> the type of {@link AMQPWorker}. Please see {@link AMQPPublisher}
@ -57,6 +56,20 @@ import org.apache.nifi.ssl.SSLContextService;
*/
abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProcessor {
public static final String AMQP_APPID_ATTRIBUTE = "amqp$appId";
public static final String AMQP_CONTENT_ENCODING_ATTRIBUTE = "amqp$contentEncoding";
public static final String AMQP_CONTENT_TYPE_ATTRIBUTE = "amqp$contentType";
public static final String AMQP_HEADERS_ATTRIBUTE = "amqp$headers";
public static final String AMQP_DELIVERY_MODE_ATTRIBUTE = "amqp$deliveryMode";
public static final String AMQP_PRIORITY_ATTRIBUTE = "amqp$priority";
public static final String AMQP_CORRELATION_ID_ATTRIBUTE = "amqp$correlationId";
public static final String AMQP_REPLY_TO_ATTRIBUTE = "amqp$replyTo";
public static final String AMQP_EXPIRATION_ATTRIBUTE = "amqp$expiration";
public static final String AMQP_MESSAGE_ID_ATTRIBUTE = "amqp$messageId";
public static final String AMQP_TIMESTAMP_ATTRIBUTE = "amqp$timestamp";
public static final String AMQP_TYPE_ATTRIBUTE = "amqp$type";
public static final String AMQP_USER_ID_ATTRIBUTE = "amqp$userId";
public static final String AMQP_CLUSTER_ID_ATTRIBUTE = "amqp$clusterId";
public static final PropertyDescriptor BROKERS = new PropertyDescriptor.Builder()
.name("Brokers")
.description("A comma-separated list of known AMQP Brokers in the format <host>:<port> (e.g., localhost:5672). If this is " +
@ -129,17 +142,15 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
private static final List<PropertyDescriptor> propertyDescriptors;
static {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(BROKERS);
properties.add(HOST);
properties.add(PORT);
properties.add(V_HOST);
properties.add(USER);
properties.add(PASSWORD);
properties.add(AMQP_VERSION);
properties.add(SSL_CONTEXT_SERVICE);
properties.add(USE_CERT_AUTHENTICATION);
propertyDescriptors = Collections.unmodifiableList(properties);
propertyDescriptors = List.of(
BROKERS,
HOST, PORT,
V_HOST,
USER,
PASSWORD,
AMQP_VERSION,
SSL_CONTEXT_SERVICE,
USE_CERT_AUTHENTICATION);
}
protected static List<PropertyDescriptor> getCommonPropertyDescriptors() {

View File

@ -22,13 +22,14 @@ import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.GetResponse;
import java.util.EnumSet;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
@ -52,37 +53,31 @@ import java.util.stream.Collectors;
@CapabilityDescription("Consumes AMQP Messages from an AMQP Broker using the AMQP 0.9.1 protocol. Each message that is received from the AMQP Broker will be "
+ "emitted as its own FlowFile to the 'success' relationship.")
@WritesAttributes({
@WritesAttribute(attribute = "amqp$appId", description = "The App ID field from the AMQP Message"),
@WritesAttribute(attribute = "amqp$contentEncoding", description = "The Content Encoding reported by the AMQP Message"),
@WritesAttribute(attribute = "amqp$contentType", description = "The Content Type reported by the AMQP Message"),
@WritesAttribute(attribute = "amqp$headers", description = "The headers present on the AMQP Message. Added only if processor is configured to output this attribute."),
@WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_APPID_ATTRIBUTE, description = "The App ID field from the AMQP Message"),
@WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_CONTENT_ENCODING_ATTRIBUTE, description = "The Content Encoding reported by the AMQP Message"),
@WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_CONTENT_TYPE_ATTRIBUTE, description = "The Content Type reported by the AMQP Message"),
@WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE,
description = "The headers present on the AMQP Message. Added only if processor is configured to output this attribute."),
@WritesAttribute(attribute = "<Header Key Prefix>.<attribute>",
description = "Each message header will be inserted with this attribute name, if processor is configured to output headers as attribute"),
@WritesAttribute(attribute = "amqp$deliveryMode", description = "The numeric indicator for the Message's Delivery Mode"),
@WritesAttribute(attribute = "amqp$priority", description = "The Message priority"),
@WritesAttribute(attribute = "amqp$correlationId", description = "The Message's Correlation ID"),
@WritesAttribute(attribute = "amqp$replyTo", description = "The value of the Message's Reply-To field"),
@WritesAttribute(attribute = "amqp$expiration", description = "The Message Expiration"),
@WritesAttribute(attribute = "amqp$messageId", description = "The unique ID of the Message"),
@WritesAttribute(attribute = "amqp$timestamp", description = "The timestamp of the Message, as the number of milliseconds since epoch"),
@WritesAttribute(attribute = "amqp$type", description = "The type of message"),
@WritesAttribute(attribute = "amqp$userId", description = "The ID of the user"),
@WritesAttribute(attribute = "amqp$clusterId", description = "The ID of the AMQP Cluster"),
@WritesAttribute(attribute = "amqp$routingKey", description = "The routingKey of the AMQP Message"),
@WritesAttribute(attribute = "amqp$exchange", description = "The exchange from which AMQP Message was received")
@WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_DELIVERY_MODE_ATTRIBUTE, description = "The numeric indicator for the Message's Delivery Mode"),
@WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_PRIORITY_ATTRIBUTE, description = "The Message priority"),
@WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_CORRELATION_ID_ATTRIBUTE, description = "The Message's Correlation ID"),
@WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_REPLY_TO_ATTRIBUTE, description = "The value of the Message's Reply-To field"),
@WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_EXPIRATION_ATTRIBUTE, description = "The Message Expiration"),
@WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_MESSAGE_ID_ATTRIBUTE, description = "The unique ID of the Message"),
@WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_TIMESTAMP_ATTRIBUTE, description = "The timestamp of the Message, as the number of milliseconds since epoch"),
@WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_TYPE_ATTRIBUTE, description = "The type of message"),
@WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_USER_ID_ATTRIBUTE, description = "The ID of the user"),
@WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_CLUSTER_ID_ATTRIBUTE, description = "The ID of the AMQP Cluster"),
@WritesAttribute(attribute = ConsumeAMQP.AMQP_ROUTING_KEY_ATTRIBUTE, description = "The routingKey of the AMQP Message"),
@WritesAttribute(attribute = ConsumeAMQP.AMQP_EXCHANGE_ATTRIBUTE, description = "The exchange from which AMQP Message was received")
})
public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
private static final String ATTRIBUTES_PREFIX = "amqp$";
public static final String DEFAULT_HEADERS_KEY_PREFIX = "consume.amqp";
public static final AllowableValue HEADERS_FORMAT_COMMA_SEPARATED_STRING = new AllowableValue("Comma-Separated String", "Comma-Separated String",
"Put all headers as a string with the specified separator in the attribute 'amqp$headers'.");
public static final AllowableValue HEADERS_FORMAT_JSON_STRING = new AllowableValue("JSON String", "JSON String",
"Format all headers as JSON string and output in the attribute 'amqp$headers'. It will include keys with null value as well.");
public static final AllowableValue HEADERS_FORMAT_ATTRIBUTES = new AllowableValue("FlowFile Attributes", "FlowFile Attributes",
"Put each header as attribute of the flow file with a prefix specified in the properties");
public static final String AMQP_ROUTING_KEY_ATTRIBUTE = "amqp$routingKey";
public static final String AMQP_EXCHANGE_ATTRIBUTE = "amqp$exchange";
public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
.name("Queue")
.description("The name of the existing AMQP Queue from which messages will be consumed. Usually pre-defined by AMQP administrator. ")
@ -128,8 +123,8 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
.name("header.format")
.displayName("Header Output Format")
.description("Defines how to output headers from the received message")
.allowableValues(HEADERS_FORMAT_COMMA_SEPARATED_STRING, HEADERS_FORMAT_JSON_STRING, HEADERS_FORMAT_ATTRIBUTES)
.defaultValue(HEADERS_FORMAT_COMMA_SEPARATED_STRING.getValue())
.allowableValues(OutputHeaderFormat.getAllowedValues())
.defaultValue(OutputHeaderFormat.COMMA_SEPARATED_STRING)
.required(true)
.build();
public static final PropertyDescriptor HEADER_KEY_PREFIX = new PropertyDescriptor.Builder()
@ -138,7 +133,7 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
.description("Text to be prefixed to header keys as the are added to the FlowFile attributes. Processor will append '.' to the value of this property")
.defaultValue(DEFAULT_HEADERS_KEY_PREFIX)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.dependsOn(HEADER_FORMAT, HEADERS_FORMAT_ATTRIBUTES)
.dependsOn(HEADER_FORMAT, OutputHeaderFormat.ATTRIBUTES)
.required(true)
.build();
@ -149,7 +144,7 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
)
.addValidator(StandardValidators.SINGLE_CHAR_VALIDATOR)
.defaultValue(",")
.dependsOn(HEADER_FORMAT, HEADERS_FORMAT_COMMA_SEPARATED_STRING)
.dependsOn(HEADER_FORMAT, OutputHeaderFormat.COMMA_SEPARATED_STRING)
.required(false)
.build();
static final PropertyDescriptor REMOVE_CURLY_BRACES = new PropertyDescriptor.Builder()
@ -159,7 +154,7 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.defaultValue("False")
.allowableValues("True", "False")
.dependsOn(HEADER_FORMAT, HEADERS_FORMAT_COMMA_SEPARATED_STRING)
.dependsOn(HEADER_FORMAT, OutputHeaderFormat.COMMA_SEPARATED_STRING)
.required(false)
.build();
@ -223,7 +218,7 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
final String headerFormat = context.getProperty(HEADER_FORMAT).getValue();
final String headerKeyPrefix = context.getProperty(HEADER_KEY_PREFIX).getValue();
final Map<String, String> attributes = buildAttributes(amqpProperties, envelope, headerFormat, headerKeyPrefix,
context.getProperty(REMOVE_CURLY_BRACES).asBoolean(), context.getProperty(HEADER_SEPARATOR).toString());
context.getProperty(REMOVE_CURLY_BRACES).asBoolean(), context.getProperty(HEADER_SEPARATOR).toString());
flowFile = session.putAllAttributes(flowFile, attributes);
session.getProvenanceReporter().receive(flowFile, connection.toString() + "/" + context.getProperty(QUEUE).getValue());
@ -237,59 +232,62 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
}
}
private Map<String, String> buildAttributes(final BasicProperties properties, final Envelope envelope, String headersStringFormat, String headerAttributePrefix, boolean removeCurlyBraces,
String valueSeparatorForHeaders) {
AllowableValue headerFormat = new AllowableValue(headersStringFormat);
private Map<String, String> buildAttributes(final BasicProperties properties, final Envelope envelope, String headersFormat, String headerAttributePrefix, boolean removeCurlyBraces,
String valueSeparatorForHeaders) {
final Map<String, String> attributes = new HashMap<>();
addAttribute(attributes, ATTRIBUTES_PREFIX + "appId", properties.getAppId());
addAttribute(attributes, ATTRIBUTES_PREFIX + "contentEncoding", properties.getContentEncoding());
addAttribute(attributes, ATTRIBUTES_PREFIX + "contentType", properties.getContentType());
addAttribute(attributes, ATTRIBUTES_PREFIX + "deliveryMode", properties.getDeliveryMode());
addAttribute(attributes, ATTRIBUTES_PREFIX + "priority", properties.getPriority());
addAttribute(attributes, ATTRIBUTES_PREFIX + "correlationId", properties.getCorrelationId());
addAttribute(attributes, ATTRIBUTES_PREFIX + "replyTo", properties.getReplyTo());
addAttribute(attributes, ATTRIBUTES_PREFIX + "expiration", properties.getExpiration());
addAttribute(attributes, ATTRIBUTES_PREFIX + "messageId", properties.getMessageId());
addAttribute(attributes, ATTRIBUTES_PREFIX + "timestamp", properties.getTimestamp() == null ? null : properties.getTimestamp().getTime());
addAttribute(attributes, ATTRIBUTES_PREFIX + "type", properties.getType());
addAttribute(attributes, ATTRIBUTES_PREFIX + "userId", properties.getUserId());
addAttribute(attributes, ATTRIBUTES_PREFIX + "clusterId", properties.getClusterId());
addAttribute(attributes, ATTRIBUTES_PREFIX + "routingKey", envelope.getRoutingKey());
addAttribute(attributes, ATTRIBUTES_PREFIX + "exchange", envelope.getExchange());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_APPID_ATTRIBUTE, properties.getAppId());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_CONTENT_ENCODING_ATTRIBUTE, properties.getContentEncoding());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_CONTENT_TYPE_ATTRIBUTE, properties.getContentType());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_DELIVERY_MODE_ATTRIBUTE, properties.getDeliveryMode());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_PRIORITY_ATTRIBUTE, properties.getPriority());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_CORRELATION_ID_ATTRIBUTE, properties.getCorrelationId());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_REPLY_TO_ATTRIBUTE, properties.getReplyTo());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_EXPIRATION_ATTRIBUTE, properties.getExpiration());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_MESSAGE_ID_ATTRIBUTE, properties.getMessageId());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_TIMESTAMP_ATTRIBUTE, properties.getTimestamp() == null ? null : properties.getTimestamp().getTime());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_CONTENT_TYPE_ATTRIBUTE, properties.getType());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_USER_ID_ATTRIBUTE, properties.getUserId());
addAttribute(attributes, AbstractAMQPProcessor.AMQP_CLUSTER_ID_ATTRIBUTE, properties.getClusterId());
addAttribute(attributes, AMQP_ROUTING_KEY_ATTRIBUTE, envelope.getRoutingKey());
addAttribute(attributes, AMQP_EXCHANGE_ATTRIBUTE, envelope.getExchange());
Map<String, Object> headers = properties.getHeaders();
if (headers != null) {
if (HEADERS_FORMAT_ATTRIBUTES.equals(headerFormat)) {
headers.forEach((key, value) -> addAttribute(attributes,
String.format("%s.%s", headerAttributePrefix, key), value));
if (OutputHeaderFormat.ATTRIBUTES.getValue().equals(headersFormat)) {
headers.forEach((key, value) -> addAttribute(attributes, String.format("%s.%s", headerAttributePrefix, key), value));
} else {
addAttribute(attributes, ATTRIBUTES_PREFIX + "headers",
buildHeaders(properties.getHeaders(), headerFormat, removeCurlyBraces,
valueSeparatorForHeaders));
addAttribute(attributes, AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE,
buildHeaders(properties.getHeaders(), headersFormat, removeCurlyBraces,
valueSeparatorForHeaders));
}
}
return attributes;
}
/**
* Adds the given attribute name and value in to the map of attributes
* @param attributes List of attributes to update
* @param attributeName Name of the attribute
* @param value Value of the attribute
*/
private void addAttribute(final Map<String, String> attributes, final String attributeName, final Object value) {
if (value == null) {
return;
}
attributes.put(attributeName, value.toString());
}
private String buildHeaders(Map<String, Object> headers, AllowableValue headerFormat, boolean removeCurlyBraces, String valueSeparatorForHeaders) {
private String buildHeaders(Map<String, Object> headers, String headerFormat, boolean removeCurlyBraces, String valueSeparatorForHeaders) {
if (headers == null) {
return null;
}
String headerString = null;
if (headerFormat.equals(HEADERS_FORMAT_COMMA_SEPARATED_STRING)) {
if ( OutputHeaderFormat.COMMA_SEPARATED_STRING.getValue().equals(headerFormat)) {
headerString = convertMapToString(headers, valueSeparatorForHeaders);
if (!removeCurlyBraces) {
headerString = "{" + headerString + "}";
}
} else if (headerFormat.equals(HEADERS_FORMAT_JSON_STRING)) {
} else if (OutputHeaderFormat.JSON_STRING.getValue().equals(headerFormat)) {
try {
headerString = convertMapToJSONString(headers);
} catch (JsonProcessingException e) {
@ -314,9 +312,7 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
final String queueName = context.getProperty(QUEUE).getValue();
final boolean autoAcknowledge = context.getProperty(AUTO_ACKNOWLEDGE).asBoolean();
final int prefetchCount = context.getProperty(PREFETCH_COUNT).asInteger();
final AMQPConsumer amqpConsumer = new AMQPConsumer(connection, queueName, autoAcknowledge, prefetchCount, getLogger());
return amqpConsumer;
return new AMQPConsumer(connection, queueName, autoAcknowledge, prefetchCount, getLogger());
} catch (final IOException ioe) {
throw new ProcessException("Failed to connect to AMQP Broker", ioe);
}
@ -331,4 +327,42 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
public Set<Relationship> getRelationships() {
return relationships;
}
public enum OutputHeaderFormat implements DescribedValue {
COMMA_SEPARATED_STRING("Comma-Separated String", "Comma-Separated String",
"Put all headers as a string with the specified separator in the attribute 'amqp$headers'."),
JSON_STRING("JSON String", "JSON String",
"Format all headers as JSON string and output in the attribute 'amqp$headers'. It will include keys with null value as well."),
ATTRIBUTES("FlowFile Attributes", "FlowFile Attributes",
"Put each header as attribute of the flow file with a prefix specified in the properties");
private final String value;
private final String displayName;
private final String description;
OutputHeaderFormat(String value, String displayName, String description) {
this.value = value;
this.displayName = displayName;
this.description = description;
}
public static EnumSet<OutputHeaderFormat> getAllowedValues() {
return EnumSet.of(COMMA_SEPARATED_STRING, JSON_STRING, ATTRIBUTES);
}
@Override
public String getValue() {
return value;
}
@Override
public String getDisplayName() {
return displayName;
}
@Override
public String getDescription() {
return description;
}
}
}

View File

@ -27,6 +27,7 @@ import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
@ -42,7 +43,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -58,23 +58,23 @@ import java.util.regex.Pattern;
+ "that happens you will see a log in both app-log and bulletin stating to that effect, and the FlowFile will be routed to the 'failure' relationship.")
@SystemResourceConsideration(resource = SystemResource.MEMORY)
@ReadsAttributes({
@ReadsAttribute(attribute = "amqp$appId", description = "The App ID field to set on the AMQP Message"),
@ReadsAttribute(attribute = "amqp$contentEncoding", description = "The Content Encoding to set on the AMQP Message"),
@ReadsAttribute(attribute = "amqp$contentType", description = "The Content Type to set on the AMQP Message"),
@ReadsAttribute(attribute = "amqp$headers", description = "The headers to set on the AMQP Message"),
@ReadsAttribute(attribute = "amqp$deliveryMode", description = "The numeric indicator for the Message's Delivery Mode"),
@ReadsAttribute(attribute = "amqp$priority", description = "The Message priority"),
@ReadsAttribute(attribute = "amqp$correlationId", description = "The Message's Correlation ID"),
@ReadsAttribute(attribute = "amqp$replyTo", description = "The value of the Message's Reply-To field"),
@ReadsAttribute(attribute = "amqp$expiration", description = "The Message Expiration"),
@ReadsAttribute(attribute = "amqp$messageId", description = "The unique ID of the Message"),
@ReadsAttribute(attribute = "amqp$timestamp", description = "The timestamp of the Message, as the number of milliseconds since epoch"),
@ReadsAttribute(attribute = "amqp$type", description = "The type of message"),
@ReadsAttribute(attribute = "amqp$userId", description = "The ID of the user"),
@ReadsAttribute(attribute = "amqp$clusterId", description = "The ID of the AMQP Cluster"),
@ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_APPID_ATTRIBUTE, description = "The App ID field to set on the AMQP Message"),
@ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_CONTENT_ENCODING_ATTRIBUTE, description = "The Content Encoding to set on the AMQP Message"),
@ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_CONTENT_TYPE_ATTRIBUTE, description = "The Content Type to set on the AMQP Message"),
@ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE, description = "The headers to set on the AMQP Message, if 'Header Source' is set to use it. "
+ "See additional details of the processor."),
@ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_DELIVERY_MODE_ATTRIBUTE, description = "The numeric indicator for the Message's Delivery Mode"),
@ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_PRIORITY_ATTRIBUTE, description = "The Message priority"),
@ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_CORRELATION_ID_ATTRIBUTE, description = "The Message's Correlation ID"),
@ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_REPLY_TO_ATTRIBUTE, description = "The value of the Message's Reply-To field"),
@ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_EXPIRATION_ATTRIBUTE, description = "The Message Expiration"),
@ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_MESSAGE_ID_ATTRIBUTE, description = "The unique ID of the Message"),
@ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_TIMESTAMP_ATTRIBUTE, description = "The timestamp of the Message, as the number of milliseconds since epoch"),
@ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_TYPE_ATTRIBUTE, description = "The type of message"),
@ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_USER_ID_ATTRIBUTE, description = "The ID of the user"),
@ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_CLUSTER_ID_ATTRIBUTE, description = "The ID of the AMQP Cluster"),
})
public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
private static final String ATTRIBUTES_PREFIX = "amqp$";
public static final PropertyDescriptor EXCHANGE = new PropertyDescriptor.Builder()
.name("Exchange Name")
@ -85,7 +85,6 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(Validator.VALID)
.build();
public static final PropertyDescriptor ROUTING_KEY = new PropertyDescriptor.Builder()
.name("Routing Key")
.description("The name of the Routing Key that will be used by AMQP to route messages from the exchange to a destination queue(s). "
@ -96,13 +95,30 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor HEADERS_SOURCE = new PropertyDescriptor.Builder()
.name("Headers Source")
.description("The source of the headers which will be applied to the published message.")
.required(true)
.allowableValues(InputHeaderSource.class)
.defaultValue(InputHeaderSource.AMQP_HEADERS_ATTRIBUTE)
.build();
public static final PropertyDescriptor HEADERS_PATTERN = new PropertyDescriptor.Builder()
.name("Headers Pattern")
.description("Regular expression that will be evaluated against the FlowFile attributes to select the matching attributes and put as AMQP headers. "
+ "Attribute name will be used as header key.")
.required(true)
.dependsOn(HEADERS_SOURCE, InputHeaderSource.FLOWFILE_ATTRIBUTES)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true))
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor HEADER_SEPARATOR = new PropertyDescriptor.Builder()
.name("header.separator")
.displayName("Header Separator")
.description("The character that is used to split key-value for headers. The value must only one character. "
+ "Otherwise you will get an error message")
.defaultValue(",")
.dependsOn(HEADERS_SOURCE, InputHeaderSource.AMQP_HEADERS_ATTRIBUTE)
.addValidator(StandardValidators.SINGLE_CHAR_VALIDATOR)
.required(false)
.build();
@ -125,26 +141,24 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(EXCHANGE);
properties.add(ROUTING_KEY);
properties.add(HEADERS_SOURCE);
properties.add(HEADERS_PATTERN);
properties.add(HEADER_SEPARATOR);
properties.addAll(getCommonPropertyDescriptors());
propertyDescriptors = Collections.unmodifiableList(properties);
Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
rels.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(rels);
relationships = Set.of(REL_SUCCESS, REL_FAILURE);
}
/**
* Will construct AMQP message by extracting its body from the incoming {@link FlowFile}. AMQP Properties will be extracted from the
* {@link FlowFile} and converted to {@link BasicProperties} to be sent along with the message. Upon success the incoming {@link FlowFile} is
* transferred to 'success' {@link Relationship} and upon failure FlowFile is penalized and transferred to the 'failure' {@link Relationship}
* <br>
*
* NOTE: Attributes extracted from {@link FlowFile} are considered
* candidates for AMQP properties if their names are prefixed with
* {@link PublishAMQP#ATTRIBUTES_PREFIX} (e.g., amqp$contentType=text/xml)
* <p>
* NOTE: Attributes extracted from {@link FlowFile} are considered candidates for AMQP properties if their names are prefixed with
* "amqp$" (e.g., amqp$contentType=text/xml). For "amqp$headers" it depends on the value of
* {@link PublishAMQP#HEADERS_SOURCE}, if the value is {@link InputHeaderSource#FLOWFILE_ATTRIBUTES} then message headers are created from this attribute value,
* otherwise this attribute will be ignored.
*/
@Override
protected void processResource(final Connection connection, final AMQPPublisher publisher, ProcessContext context, ProcessSession session) throws ProcessException {
@ -158,10 +172,17 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
throw new IllegalArgumentException("Failed to determine 'routing key' with provided value '"
+ context.getProperty(ROUTING_KEY) + "' after evaluating it as expression against incoming FlowFile.");
}
InputHeaderSource selectedHeaderSource = context.getProperty(HEADERS_SOURCE).asAllowableValue(InputHeaderSource.class);
Character headerSeparator = null;
Pattern pattern = null;
if (context.getProperty(HEADERS_PATTERN).isSet()) {
pattern = Pattern.compile(context.getProperty(HEADERS_PATTERN).evaluateAttributeExpressions().getValue());
}
if (context.getProperty(HEADER_SEPARATOR).isSet()) {
headerSeparator = context.getProperty(HEADER_SEPARATOR).getValue().charAt(0);
}
final Character separator = context.getProperty(HEADER_SEPARATOR).toString().charAt(0);
final BasicProperties amqpProperties = extractAmqpPropertiesFromFlowFile(flowFile, separator);
final BasicProperties amqpProperties = extractAmqpPropertiesFromFlowFile(flowFile, selectedHeaderSource, headerSeparator, pattern);
final String exchange = context.getProperty(EXCHANGE).evaluateAttributeExpressions(flowFile).getValue();
final byte[] messageContent = extractMessage(flowFile, session);
@ -199,15 +220,22 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
/**
* Extracts contents of the {@link FlowFile} as byte array.
*/
private byte[] extractMessage(FlowFile flowFile, ProcessSession session) {
private byte[] extractMessage(final FlowFile flowFile, ProcessSession session) {
final byte[] messageContent = new byte[(int) flowFile.getSize()];
session.read(flowFile, in -> StreamUtils.fillBuffer(in, messageContent, true));
return messageContent;
}
private void updateBuilderFromAttribute(final FlowFile flowFile, final String attribute, final Consumer<String> updater) {
final String attributeValue = flowFile.getAttribute(ATTRIBUTES_PREFIX + attribute);
/**
* Reads an attribute from flowFile and pass it to the consumer function
*
* @param flowFile FlowFile for reading the attribute
* @param attributeKey Name of the attribute
* @param updater Consumer function which will use the attribute value
*/
private void readAmqpAttribute(final FlowFile flowFile, final String attributeKey, final Consumer<String> updater) {
final String attributeValue = flowFile.getAttribute(attributeKey);
if (attributeValue == null) {
return;
}
@ -215,37 +243,69 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
try {
updater.accept(attributeValue);
} catch (final Exception e) {
getLogger().warn("Failed to update AMQP Message Property {}", attribute, e);
getLogger().warn("Failed to update AMQP Message Property [{}]", attributeKey, e);
}
}
/**
* Extracts AMQP properties from the {@link FlowFile} attributes. Attributes
* extracted from {@link FlowFile} are considered candidates for AMQP
* properties if their names are prefixed with
* {@link PublishAMQP#ATTRIBUTES_PREFIX} (e.g., amqp$contentType=text/xml).
* properties if their names are prefixed with "amqp$" (e.g., amqp$contentType=text/xml).
*/
private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile flowFile, Character headerSeparator) {
private BasicProperties extractAmqpPropertiesFromFlowFile(final FlowFile flowFile, final InputHeaderSource selectedHeaderSource, final Character separator, final Pattern pattern) {
final AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
updateBuilderFromAttribute(flowFile, "contentType", builder::contentType);
updateBuilderFromAttribute(flowFile, "contentEncoding", builder::contentEncoding);
updateBuilderFromAttribute(flowFile, "deliveryMode", mode -> builder.deliveryMode(Integer.parseInt(mode)));
updateBuilderFromAttribute(flowFile, "priority", pri -> builder.priority(Integer.parseInt(pri)));
updateBuilderFromAttribute(flowFile, "correlationId", builder::correlationId);
updateBuilderFromAttribute(flowFile, "replyTo", builder::replyTo);
updateBuilderFromAttribute(flowFile, "expiration", builder::expiration);
updateBuilderFromAttribute(flowFile, "messageId", builder::messageId);
updateBuilderFromAttribute(flowFile, "timestamp", ts -> builder.timestamp(new Date(Long.parseLong(ts))));
updateBuilderFromAttribute(flowFile, "type", builder::type);
updateBuilderFromAttribute(flowFile, "userId", builder::userId);
updateBuilderFromAttribute(flowFile, "appId", builder::appId);
updateBuilderFromAttribute(flowFile, "clusterId", builder::clusterId);
updateBuilderFromAttribute(flowFile, "headers", headers -> builder.headers(validateAMQPHeaderProperty(headers, headerSeparator)));
readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_CONTENT_TYPE_ATTRIBUTE, builder::contentType);
readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_CONTENT_ENCODING_ATTRIBUTE, builder::contentEncoding);
readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_DELIVERY_MODE_ATTRIBUTE, mode -> builder.deliveryMode(Integer.parseInt(mode)));
readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_PRIORITY_ATTRIBUTE, pri -> builder.priority(Integer.parseInt(pri)));
readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_CORRELATION_ID_ATTRIBUTE, builder::correlationId);
readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_REPLY_TO_ATTRIBUTE, builder::replyTo);
readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_EXPIRATION_ATTRIBUTE, builder::expiration);
readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_MESSAGE_ID_ATTRIBUTE, builder::messageId);
readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_TIMESTAMP_ATTRIBUTE, ts -> builder.timestamp(new Date(Long.parseLong(ts))));
readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_TYPE_ATTRIBUTE, builder::type);
readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_USER_ID_ATTRIBUTE, builder::userId);
readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_APPID_ATTRIBUTE, builder::appId);
readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_CLUSTER_ID_ATTRIBUTE, builder::clusterId);
Map<String, Object> headers = prepareAMQPHeaders(flowFile, selectedHeaderSource, separator, pattern);
builder.headers(headers);
return builder.build();
}
/**
* Extract AMQP headers from incoming {@link FlowFile} based on selected headers source value.
*
* @param flowFile used to extract headers
* @return {@link Map}
*/
private Map<String, Object> prepareAMQPHeaders(final FlowFile flowFile, final InputHeaderSource selectedHeaderSource, final Character headerSeparator, final Pattern pattern) {
final Map<String, Object> headers = new HashMap<>();
if (InputHeaderSource.FLOWFILE_ATTRIBUTES.equals(selectedHeaderSource)) {
headers.putAll(getMatchedAttributes(flowFile.getAttributes(), pattern));
} else if (InputHeaderSource.AMQP_HEADERS_ATTRIBUTE.equals(selectedHeaderSource)) {
readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE, value -> headers.putAll(validateAMQPHeaderProperty(value, headerSeparator)));
}
return headers;
}
/**
* Matches the pattern to keys of input attributes and output the amqp headers map
* @param attributes flowFile attributes to scan for match
* @return Map with entries matching the pattern
*/
private Map<String, String> getMatchedAttributes(final Map<String, String> attributes, final Pattern pattern) {
final Map<String, String> headers = new HashMap<>();
for (Map.Entry<String, String> attributeEntry : attributes.entrySet()) {
if (pattern.matcher(attributeEntry.getKey()).matches()) {
headers.put(attributeEntry.getKey(), attributeEntry.getValue());
}
}
return headers;
}
/**
* Will validate if provided amqpPropValue can be converted to a {@link Map}.
* Should be passed in the format: amqp$headers=key=value
@ -253,7 +313,7 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
* @param amqpPropValue the value of the property
* @return {@link Map} if valid otherwise null
*/
private Map<String, Object> validateAMQPHeaderProperty(String amqpPropValue, Character splitValue) {
private Map<String, Object> validateAMQPHeaderProperty(final String amqpPropValue, final Character splitValue) {
final String[] strEntries = amqpPropValue.split(Pattern.quote(String.valueOf(splitValue)));
final Map<String, Object> headers = new HashMap<>();
for (String strEntry : strEntries) {
@ -268,4 +328,31 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
}
return headers;
}
public enum InputHeaderSource implements DescribedValue {
FLOWFILE_ATTRIBUTES("FlowFile Attributes", "Select FlowFile Attributes based on regular expression pattern for event headers. Key of the matching attribute will be used as header key"),
AMQP_HEADERS_ATTRIBUTE("AMQP Headers Attribute", "Prepare headers from '" + AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE + "' attribute string");
private final String name;
private final String description;
InputHeaderSource(String displayName, String description) {
this.name = displayName;
this.description = description;
}
@Override
public String getValue() {
return name();
}
@Override
public String getDisplayName() {
return name;
}
@Override
public String getDescription() {
return description;
}
}
}

View File

@ -34,8 +34,8 @@
This processor does two things. It constructs AMQP Message by extracting FlowFile contents (both body and attributes).
Once message is constructed it is sent to an AMQP Exchange.
AMQP Properties will be extracted from the FlowFile and converted to <i>com.rabbitmq.client.AMQP.BasicProperties</i> to be sent
along with the message. Upon success the incoming FlowFile is transfered to <i>success</i> Relationship and upon failure FlowFile is
penalized and transfered to the <i>failure</i> Relationship.
along with the message. Upon success the incoming FlowFile is transferred to <i>success</i> Relationship and upon failure FlowFile is
penalized and transferred to the <i>failure</i> Relationship.
</p>
<h2>Where did my message go?</h2>
<p>
@ -51,10 +51,22 @@
properties if their names are prefixed with <i>amqp$</i> (e.g., amqp$contentType=text/xml). To enrich message with additional AMQP properties
you may use <b>UpdateAttribute</b> processor between the source processor and PublishAMQP processor.
The following is the list of available standard AMQP properties: <i>("amqp$contentType", "amqp$contentEncoding",
"amqp$headers", "amqp$deliveryMode", "amqp$priority", "amqp$correlationId", "amqp$replyTo",
"amqp$headers" (if 'Headers Source' is set to 'Attribute "amqp$headers" Value') , "amqp$deliveryMode", "amqp$priority", "amqp$correlationId", "amqp$replyTo",
"amqp$expiration", "amqp$messageId", "amqp$timestamp", "amqp$type", "amqp$userId", "amqp$appId",
"amqp$clusterId")</i>
</p>
<h3>AMQP Message Headers Source</h3>
<p>
The headers attached to AMQP message by the processor depends on the "Headers Source" property value.
<ol>
<li><b>Attribute "amqp$headers" Value</b> - The processor will read single attribute "amqp$headers" and split it based on "Header Separator" and then read headers
in <i>key=value</i> format.
</li>
<li><b>Attributes Matching Regex</b> - The processor will pick flow file attributes by matching the regex provided in "Attributes To Headers Regular Expression".
The name of the attribute is used as key of header
</li>
</ol>
</p>
<h2>Configuration Details</h2>
<p>
At the time of writing this document it only defines the essential configuration properties which are suitable for most cases.

View File

@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import org.apache.nifi.amqp.processors.ConsumeAMQP.OutputHeaderFormat;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
@ -160,8 +161,8 @@ public class ConsumeAMQPTest {
runner.run();
final MockFlowFile successFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
assertNotNull(successFF);
successFF.assertAttributeEquals("amqp$routingKey", "key1");
successFF.assertAttributeEquals("amqp$exchange", "myExchange");
successFF.assertAttributeEquals(ConsumeAMQP.AMQP_ROUTING_KEY_ATTRIBUTE, "key1");
successFF.assertAttributeEquals(ConsumeAMQP.AMQP_EXCHANGE_ATTRIBUTE, "myExchange");
}
}
@ -187,13 +188,13 @@ public class ConsumeAMQPTest {
ConsumeAMQP proc = new LocalConsumeAMQP(connection);
TestRunner runner = initTestRunner(proc);
runner.setProperty(ConsumeAMQP.HEADER_FORMAT, ConsumeAMQP.HEADERS_FORMAT_JSON_STRING);
runner.setProperty(ConsumeAMQP.HEADER_FORMAT, OutputHeaderFormat.JSON_STRING);
runner.run();
final MockFlowFile successFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
assertNotNull(successFF);
successFF.assertAttributeEquals("amqp$routingKey", "key1");
successFF.assertAttributeEquals("amqp$exchange", "myExchange");
String headers = successFF.getAttribute("amqp$headers");
successFF.assertAttributeEquals(ConsumeAMQP.AMQP_ROUTING_KEY_ATTRIBUTE, "key1");
successFF.assertAttributeEquals(ConsumeAMQP.AMQP_EXCHANGE_ATTRIBUTE, "myExchange");
String headers = successFF.getAttribute(AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE);
JsonNode jsonNode = objectMapper.readTree(headers);
assertEquals(expectedJson, jsonNode);
}
@ -222,14 +223,14 @@ public class ConsumeAMQPTest {
ConsumeAMQP proc = new LocalConsumeAMQP(connection);
TestRunner runner = initTestRunner(proc);
runner.setProperty(ConsumeAMQP.HEADER_FORMAT, ConsumeAMQP.HEADERS_FORMAT_ATTRIBUTES);
runner.setProperty(ConsumeAMQP.HEADER_FORMAT, OutputHeaderFormat.ATTRIBUTES);
runner.setProperty(ConsumeAMQP.HEADER_KEY_PREFIX, headerPrefix);
runner.run();
final MockFlowFile successFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
assertNotNull(successFF);
successFF.assertAttributeEquals("amqp$routingKey", "key1");
successFF.assertAttributeEquals("amqp$exchange", "myExchange");
successFF.assertAttributeNotExists("amqp$headers");
successFF.assertAttributeEquals(ConsumeAMQP.AMQP_ROUTING_KEY_ATTRIBUTE, "key1");
successFF.assertAttributeEquals(ConsumeAMQP.AMQP_EXCHANGE_ATTRIBUTE, "myExchange");
successFF.assertAttributeNotExists(AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE);
expectedHeadersMap.forEach((key, value) -> {
successFF.assertAttributeEquals(headerPrefix + "." + key, value.toString());
} );
@ -260,9 +261,9 @@ public class ConsumeAMQPTest {
runner.run();
final MockFlowFile successFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
assertNotNull(successFF);
successFF.assertAttributeEquals("amqp$routingKey", "key1");
successFF.assertAttributeEquals("amqp$exchange", "myExchange");
String headers = successFF.getAttribute("amqp$headers");
successFF.assertAttributeEquals(ConsumeAMQP.AMQP_ROUTING_KEY_ATTRIBUTE, "key1");
successFF.assertAttributeEquals(ConsumeAMQP.AMQP_EXCHANGE_ATTRIBUTE, "myExchange");
String headers = successFF.getAttribute(AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE);
assertEquals(EXPECTED_RESULT, headers);
}
}
@ -297,9 +298,9 @@ public class ConsumeAMQPTest {
runner.run();
final MockFlowFile successFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
assertNotNull(successFF);
successFF.assertAttributeEquals("amqp$routingKey", "key1");
successFF.assertAttributeEquals("amqp$exchange", "myExchange");
successFF.assertAttributeEquals("amqp$headers", "key1=(bar,bar)");
successFF.assertAttributeEquals(ConsumeAMQP.AMQP_ROUTING_KEY_ATTRIBUTE, "key1");
successFF.assertAttributeEquals(ConsumeAMQP.AMQP_EXCHANGE_ATTRIBUTE, "myExchange");
successFF.assertAttributeEquals(AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE, "key1=(bar,bar)");
}
}
@ -329,9 +330,9 @@ public class ConsumeAMQPTest {
runner.run();
final MockFlowFile successFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
assertNotNull(successFF);
successFF.assertAttributeEquals("amqp$routingKey", "key1");
successFF.assertAttributeEquals("amqp$exchange", "myExchange");
String headers = successFF.getAttribute("amqp$headers");
successFF.assertAttributeEquals(ConsumeAMQP.AMQP_ROUTING_KEY_ATTRIBUTE, "key1");
successFF.assertAttributeEquals(ConsumeAMQP.AMQP_EXCHANGE_ATTRIBUTE, "myExchange");
String headers = successFF.getAttribute(AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE);
assertEquals(EXPECTED_RESULT, headers);
}
}
@ -362,9 +363,9 @@ public class ConsumeAMQPTest {
runner.run();
final MockFlowFile successFF = runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
assertNotNull(successFF);
successFF.assertAttributeEquals("amqp$routingKey", "key1");
successFF.assertAttributeEquals("amqp$exchange", "myExchange");
String headers = successFF.getAttribute("amqp$headers");
successFF.assertAttributeEquals(ConsumeAMQP.AMQP_ROUTING_KEY_ATTRIBUTE, "key1");
successFF.assertAttributeEquals(ConsumeAMQP.AMQP_EXCHANGE_ATTRIBUTE, "myExchange");
String headers = successFF.getAttribute(AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE);
assertEquals(EXPECTED_RESULT, headers);
}
}

View File

@ -19,6 +19,7 @@ package org.apache.nifi.amqp.processors;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.GetResponse;
import org.apache.nifi.amqp.processors.PublishAMQP.InputHeaderSource;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@ -45,28 +46,27 @@ public class PublishAMQPTest {
final TestRunner runner = TestRunners.newTestRunner(pubProc);
setConnectionProperties(runner);
final Map<String, String> expectedHeaders = new HashMap<String, String>() {{
put("foo", "bar");
put("foo2", "bar2");
put("foo3", null);
}};
final Map<String, String> expectedHeaders = new HashMap<>();
expectedHeaders.put("foo", "bar");
expectedHeaders.put("foo2", "bar2");
expectedHeaders.put("foo3", null);
final Map<String, String> attributes = new HashMap<>();
attributes.put("foo", "bar");
attributes.put("amqp$contentType", "foo/bar");
attributes.put("amqp$contentEncoding", "foobar123");
attributes.put("amqp$headers", "foo=bar,foo2=bar2,foo3");
attributes.put("amqp$deliveryMode", "1");
attributes.put("amqp$priority", "2");
attributes.put("amqp$correlationId", "correlationId123");
attributes.put("amqp$replyTo", "replyTo123");
attributes.put("amqp$expiration", "expiration123");
attributes.put("amqp$messageId", "messageId123");
attributes.put("amqp$timestamp", "123456789");
attributes.put("amqp$type", "type123");
attributes.put("amqp$userId", "userId123");
attributes.put("amqp$appId", "appId123");
attributes.put("amqp$clusterId", "clusterId123");
attributes.put(AbstractAMQPProcessor.AMQP_CONTENT_TYPE_ATTRIBUTE, "foo/bar");
attributes.put(AbstractAMQPProcessor.AMQP_CONTENT_ENCODING_ATTRIBUTE, "foobar123");
attributes.put(AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE, "foo=bar,foo2=bar2,foo3");
attributes.put(AbstractAMQPProcessor.AMQP_DELIVERY_MODE_ATTRIBUTE, "1");
attributes.put(AbstractAMQPProcessor.AMQP_PRIORITY_ATTRIBUTE, "2");
attributes.put(AbstractAMQPProcessor.AMQP_CORRELATION_ID_ATTRIBUTE, "correlationId123");
attributes.put(AbstractAMQPProcessor.AMQP_REPLY_TO_ATTRIBUTE, "replyTo123");
attributes.put(AbstractAMQPProcessor.AMQP_EXPIRATION_ATTRIBUTE, "expiration123");
attributes.put(AbstractAMQPProcessor.AMQP_MESSAGE_ID_ATTRIBUTE, "messageId123");
attributes.put(AbstractAMQPProcessor.AMQP_TIMESTAMP_ATTRIBUTE, "123456789");
attributes.put(AbstractAMQPProcessor.AMQP_TYPE_ATTRIBUTE, "type123");
attributes.put(AbstractAMQPProcessor.AMQP_USER_ID_ATTRIBUTE, "userId123");
attributes.put(AbstractAMQPProcessor.AMQP_APPID_ATTRIBUTE, "appId123");
attributes.put(AbstractAMQPProcessor.AMQP_CLUSTER_ID_ATTRIBUTE, "clusterId123");
runner.enqueue("Hello Joe".getBytes(), attributes);
@ -107,14 +107,13 @@ public class PublishAMQPTest {
setConnectionProperties(runner);
runner.setProperty(PublishAMQP.HEADER_SEPARATOR, "|");
final Map<String, String> expectedHeaders = new HashMap<String, String>() {{
put("foo", "(bar,bar)");
put("foo2", "bar2");
put("foo3", null);
}};
final Map<String, String> expectedHeaders = new HashMap<>();
expectedHeaders.put("foo", "(bar,bar)");
expectedHeaders.put("foo2", "bar2");
expectedHeaders.put("foo3", null);
final Map<String, String> attributes = new HashMap<>();
attributes.put("amqp$headers", "foo=(bar,bar)|foo2=bar2|foo3");
attributes.put(AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE, "foo=(bar,bar)|foo2=bar2|foo3");
runner.enqueue("Hello Joe".getBytes(), attributes);
@ -135,7 +134,7 @@ public class PublishAMQPTest {
}
@Test
public void validateWithNotValidHeaderSeparatorParameter() {
public void validateWithNotValidHeaderSeparatorParameter() {
final PublishAMQP pubProc = new LocalPublishAMQP();
final TestRunner runner = TestRunners.newTestRunner(pubProc);
runner.setProperty(PublishAMQP.HEADER_SEPARATOR, "|,");
@ -149,14 +148,14 @@ public class PublishAMQPTest {
setConnectionProperties(runner);
runner.setProperty(PublishAMQP.HEADER_SEPARATOR, "|");
final Map<String, String> expectedHeaders = new HashMap<String, String>() {{
put("foo", "(bar,bar)");
put("foo2", "bar2");
put("foo3", null);
}};
final Map<String, String> expectedHeaders = new HashMap<>();
expectedHeaders.put("foo", "(bar,bar)");
expectedHeaders.put("foo2", "bar2");
expectedHeaders.put("foo3", null);
final Map<String, String> attributes = new HashMap<>();
attributes.put("amqp$headers", "foo=(bar,bar)|foo2=bar2|foo3|foo4=malformed=|foo5=mal=formed");
attributes.put(AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE, "foo=(bar,bar)|foo2=bar2|foo3|foo4=malformed=|foo5=mal=formed");
runner.enqueue("Hello Joe".getBytes(), attributes);
@ -191,6 +190,60 @@ public class PublishAMQPTest {
assertNotNull(runner.getFlowFilesForRelationship(PublishAMQP.REL_FAILURE).get(0));
}
@Test
public void validateSuccessWithHeaderFromAttributeRegexToSuccess() throws Exception {
final PublishAMQP pubProc = new LocalPublishAMQP();
final TestRunner runner = TestRunners.newTestRunner(pubProc);
setConnectionProperties(runner);
runner.setProperty(PublishAMQP.HEADERS_SOURCE, InputHeaderSource.FLOWFILE_ATTRIBUTES);
runner.setProperty(PublishAMQP.HEADERS_PATTERN, "test.*|tmp\\..*|foo2|foo3");
final Map<String, String> expectedHeaders = new HashMap<>();
expectedHeaders.put("test1", "value1");
expectedHeaders.put("test2", "value2");
expectedHeaders.put("foo2", "");
expectedHeaders.put("foo3", null);
expectedHeaders.put("tmp.test1", "tmp1");
expectedHeaders.put("tmp.test2.key", "tmp2");
final Map<String, String> attributes = new HashMap<>();
attributes.put(AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE, "foo=(bar,bar)|foo2=bar2|foo3");
attributes.put("test1", "value1");
attributes.put("test2", "value2");
attributes.put("foo4", "value4");
attributes.put("foo2", "");
attributes.put("foo3", null);
attributes.put("tmp.test1", "tmp1");
attributes.put("tmp.test2.key", "tmp2");
runner.enqueue("Hello Joe".getBytes(), attributes);
runner.run();
final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
assertNotNull(successFF);
final Channel channel = ((LocalPublishAMQP) pubProc).getConnection().createChannel();
final GetResponse msg1 = channel.basicGet("queue1", true);
assertNotNull(msg1);
final Map<String, Object> headerMap = msg1.getProps().getHeaders();
assertEquals(expectedHeaders, headerMap);
assertNotNull(channel.basicGet("queue2", true));
}
@Test
public void validateWithNotValidRegexForAttributeMatch() {
final PublishAMQP pubProc = new LocalPublishAMQP();
final TestRunner runner = TestRunners.newTestRunner(pubProc);
setConnectionProperties(runner);
runner.setProperty(PublishAMQP.HEADERS_SOURCE, InputHeaderSource.FLOWFILE_ATTRIBUTES);
runner.setProperty(PublishAMQP.HEADERS_PATTERN, "*");
runner.assertNotValid();
}
private void setConnectionProperties(TestRunner runner) {
runner.setProperty(PublishAMQP.BROKERS, "injvm:5672");
runner.setProperty(PublishAMQP.USER, "user");
@ -200,7 +253,7 @@ public class PublishAMQPTest {
}
public static class LocalPublishAMQP extends PublishAMQP {
private TestConnection connection;
private final TestConnection connection;
public LocalPublishAMQP() {
final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));