NIFI-10785 Allow publishing AMQP message with null header value

NIFI-10785 addressing review comment
NIFI-10785 addressing review comments (remove unnecessary property to ignore null headers)

Signed-off-by: Nathan Gough <thenatog@gmail.com>

This closes #6649.
This commit is contained in:
Nandor Soma Abonyi 2022-11-10 23:45:52 +01:00 committed by Nathan Gough
parent de296b5e65
commit 3a7ec5d542
2 changed files with 104 additions and 87 deletions

View File

@ -16,25 +16,15 @@
*/
package org.apache.nifi.amqp.processors;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Connection;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
@ -45,13 +35,19 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Connection;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.regex.Pattern;
@Tags({ "amqp", "rabbit", "put", "message", "send", "publish" })
@InputRequirement(Requirement.INPUT_REQUIRED)
@ -89,6 +85,7 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(Validator.VALID)
.build();
public static final PropertyDescriptor ROUTING_KEY = new PropertyDescriptor.Builder()
.name("Routing Key")
.description("The name of the Routing Key that will be used by AMQP to route messages from the exchange to a destination queue(s). "
@ -99,6 +96,7 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor HEADER_SEPARATOR = new PropertyDescriptor.Builder()
.name("header.separator")
.displayName("Header Separator")
@ -108,10 +106,12 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
.addValidator(StandardValidators.SINGLE_CHAR_VALIDATOR)
.required(false)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All FlowFiles that are sent to the AMQP destination are routed to this relationship")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("All FlowFiles that cannot be routed to the AMQP destination are routed to this relationship")
@ -144,7 +144,7 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
*
* NOTE: Attributes extracted from {@link FlowFile} are considered
* candidates for AMQP properties if their names are prefixed with
* {@link AMQPUtils#AMQP_PROP_PREFIX} (e.g., amqp$contentType=text/xml)
* {@link PublishAMQP#ATTRIBUTES_PREFIX} (e.g., amqp$contentType=text/xml)
*/
@Override
protected void processResource(final Connection connection, final AMQPPublisher publisher, ProcessContext context, ProcessSession session) throws ProcessException {
@ -153,14 +153,16 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
return;
}
final BasicProperties amqpProperties = extractAmqpPropertiesFromFlowFile(flowFile,
context.getProperty(HEADER_SEPARATOR).toString().charAt(0));
final String routingKey = context.getProperty(ROUTING_KEY).evaluateAttributeExpressions(flowFile).getValue();
if (routingKey == null) {
throw new IllegalArgumentException("Failed to determine 'routing key' with provided value '"
+ context.getProperty(ROUTING_KEY) + "' after evaluating it as expression against incoming FlowFile.");
}
final Character separator = context.getProperty(HEADER_SEPARATOR).toString().charAt(0);
final BasicProperties amqpProperties = extractAmqpPropertiesFromFlowFile(flowFile, separator);
final String exchange = context.getProperty(EXCHANGE).evaluateAttributeExpressions(flowFile).getValue();
final byte[] messageContent = extractMessage(flowFile, session);
@ -199,12 +201,7 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
*/
private byte[] extractMessage(FlowFile flowFile, ProcessSession session){
final byte[] messageContent = new byte[(int) flowFile.getSize()];
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
StreamUtils.fillBuffer(in, messageContent, true);
}
});
session.read(flowFile, in -> StreamUtils.fillBuffer(in, messageContent, true));
return messageContent;
}
@ -226,16 +223,9 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
* Extracts AMQP properties from the {@link FlowFile} attributes. Attributes
* extracted from {@link FlowFile} are considered candidates for AMQP
* properties if their names are prefixed with
* {@link AMQPUtils#AMQP_PROP_PREFIX} (e.g., amqp$contentType=text/xml).
*
* Some fields require a specific format and are validated:
*
* {@link AMQPUtils#validateAMQPHeaderProperty}
* {@link AMQPUtils#validateAMQPDeliveryModeProperty}
* {@link AMQPUtils#validateAMQPPriorityProperty}
* {@link AMQPUtils#validateAMQPTimestampProperty}
* {@link PublishAMQP#ATTRIBUTES_PREFIX} (e.g., amqp$contentType=text/xml).
*/
private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile flowFile,Character headerSeparator) {
private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile flowFile, Character headerSeparator) {
final AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
updateBuilderFromAttribute(flowFile, "contentType", builder::contentType);
@ -251,7 +241,7 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
updateBuilderFromAttribute(flowFile, "userId", builder::userId);
updateBuilderFromAttribute(flowFile, "appId", builder::appId);
updateBuilderFromAttribute(flowFile, "clusterId", builder::clusterId);
updateBuilderFromAttribute(flowFile, "headers", headers -> builder.headers(validateAMQPHeaderProperty(headers,headerSeparator)));
updateBuilderFromAttribute(flowFile, "headers", headers -> builder.headers(validateAMQPHeaderProperty(headers, headerSeparator)));
return builder.build();
}
@ -263,15 +253,17 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
* @param amqpPropValue the value of the property
* @return {@link Map} if valid otherwise null
*/
private Map<String, Object> validateAMQPHeaderProperty(String amqpPropValue,Character splitValue) {
String[] strEntries = amqpPropValue.split(Pattern.quote(String.valueOf(splitValue)));
Map<String, Object> headers = new HashMap<>();
private Map<String, Object> validateAMQPHeaderProperty(String amqpPropValue, Character splitValue) {
final String[] strEntries = amqpPropValue.split(Pattern.quote(String.valueOf(splitValue)));
final Map<String, Object> headers = new HashMap<>();
for (String strEntry : strEntries) {
String[] kv = strEntry.split("=");
final String[] kv = strEntry.split("=", -1); // without using limit, trailing delimiter would be ignored
if (kv.length == 2) {
headers.put(kv[0].trim(), kv[1].trim());
} else if (kv.length == 1) {
headers.put(kv[0].trim(), null);
} else {
getLogger().warn("Malformed key value pair for AMQP header property: " + amqpPropValue);
getLogger().warn(String.format("Malformed key value pair in AMQP header property (%s): %s", amqpPropValue, strEntry));
}
}
return headers;

View File

@ -16,6 +16,15 @@
*/
package org.apache.nifi.amqp.processors;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.GetResponse;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
@ -24,19 +33,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.GetResponse;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class PublishAMQPTest {
@ -45,11 +43,13 @@ public class PublishAMQPTest {
public void validateSuccessfulPublishAndTransferToSuccess() throws Exception {
final PublishAMQP pubProc = new LocalPublishAMQP();
final TestRunner runner = TestRunners.newTestRunner(pubProc);
runner.setProperty(PublishAMQP.BROKERS, "injvm:5672");
runner.setProperty(PublishAMQP.EXCHANGE, "myExchange");
runner.setProperty(PublishAMQP.ROUTING_KEY, "key1");
runner.setProperty(PublishAMQP.USER, "user");
runner.setProperty(PublishAMQP.PASSWORD, "password");
setConnectionProperties(runner);
final Map<String, String> expectedHeaders = new HashMap<String, String>() {{
put("foo", "bar");
put("foo2", "bar2");
put("foo3", null);
}};
final Map<String, String> attributes = new HashMap<>();
attributes.put("foo", "bar");
@ -83,13 +83,7 @@ public class PublishAMQPTest {
final Map<String, Object> headerMap = msg1.getProps().getHeaders();
final Object foo = headerMap.get("foo");
final Object foo2 = headerMap.get("foo2");
final Object foo3 = headerMap.get("foo3");
assertEquals("bar", foo.toString());
assertEquals("bar2", foo2.toString());
assertNull(foo3);
assertEquals(expectedHeaders, headerMap);
assertEquals((Integer) 1, msg1.getProps().getDeliveryMode());
assertEquals((Integer) 2, msg1.getProps().getPriority());
@ -110,15 +104,16 @@ public class PublishAMQPTest {
public void validateSuccessWithHeaderWithCommaPublishToSuccess() throws Exception {
final PublishAMQP pubProc = new LocalPublishAMQP();
final TestRunner runner = TestRunners.newTestRunner(pubProc);
runner.setProperty(PublishAMQP.BROKERS, "injvm:5672");
runner.setProperty(PublishAMQP.EXCHANGE, "myExchange");
runner.setProperty(PublishAMQP.ROUTING_KEY, "key1");
runner.setProperty(PublishAMQP.USER, "user");
runner.setProperty(PublishAMQP.PASSWORD, "password");
setConnectionProperties(runner);
runner.setProperty(PublishAMQP.HEADER_SEPARATOR,"|");
final Map<String, String> attributes = new HashMap<>();
final Map<String, String> expectedHeaders = new HashMap<String, String>() {{
put("foo", "(bar,bar)");
put("foo2", "bar2");
put("foo3", null);
}};
final Map<String, String> attributes = new HashMap<>();
attributes.put("amqp$headers", "foo=(bar,bar)|foo2=bar2|foo3");
runner.enqueue("Hello Joe".getBytes(), attributes);
@ -134,14 +129,7 @@ public class PublishAMQPTest {
final Map<String, Object> headerMap = msg1.getProps().getHeaders();
final Object foo = headerMap.get("foo");
final Object foo2 = headerMap.get("foo2");
final Object foo3 = headerMap.get("foo3");
assertEquals("(bar,bar)", foo.toString());
assertEquals("bar2", foo2.toString());
assertNull(foo3);
assertEquals(expectedHeaders, headerMap);
assertNotNull(channel.basicGet("queue2", true));
}
@ -152,18 +140,48 @@ public class PublishAMQPTest {
final TestRunner runner = TestRunners.newTestRunner(pubProc);
runner.setProperty(PublishAMQP.HEADER_SEPARATOR,"|,");
runner.assertNotValid();
}
@Test
public void validateFailedPublishAndTransferToFailure() throws Exception {
public void validateMalformedHeaderIgnoredAndPublishToSuccess() throws Exception {
final PublishAMQP pubProc = new LocalPublishAMQP();
final TestRunner runner = TestRunners.newTestRunner(pubProc);
setConnectionProperties(runner);
runner.setProperty(PublishAMQP.HEADER_SEPARATOR,"|");
final Map<String, String> expectedHeaders = new HashMap<String, String>() {{
put("foo", "(bar,bar)");
put("foo2", "bar2");
put("foo3", null);
}};
final Map<String, String> attributes = new HashMap<>();
attributes.put("amqp$headers", "foo=(bar,bar)|foo2=bar2|foo3|foo4=malformed=|foo5=mal=formed");
runner.enqueue("Hello Joe".getBytes(), attributes);
runner.run();
final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
assertNotNull(successFF);
final Channel channel = ((LocalPublishAMQP) pubProc).getConnection().createChannel();
final GetResponse msg1 = channel.basicGet("queue1", true);
assertNotNull(msg1);
final Map<String, Object> headerMap = msg1.getProps().getHeaders();
assertEquals(expectedHeaders, headerMap);
assertNotNull(channel.basicGet("queue2", true));
}
@Test
public void validateFailedPublishAndTransferToFailure() {
PublishAMQP pubProc = new LocalPublishAMQP();
TestRunner runner = TestRunners.newTestRunner(pubProc);
runner.setProperty(PublishAMQP.BROKERS, "injvm:5672");
runner.setProperty(PublishAMQP.EXCHANGE, "badToTheBone");
runner.setProperty(PublishAMQP.ROUTING_KEY, "key1");
runner.setProperty(PublishAMQP.USER, "user");
runner.setProperty(PublishAMQP.PASSWORD, "password");
setConnectionProperties(runner);
runner.setProperty(PublishAMQP.EXCHANGE, "nonExistentExchange");
runner.enqueue("Hello Joe".getBytes());
@ -173,6 +191,13 @@ public class PublishAMQPTest {
assertNotNull(runner.getFlowFilesForRelationship(PublishAMQP.REL_FAILURE).get(0));
}
private void setConnectionProperties(TestRunner runner) {
runner.setProperty(PublishAMQP.BROKERS, "injvm:5672");
runner.setProperty(PublishAMQP.USER, "user");
runner.setProperty(PublishAMQP.PASSWORD, "password");
runner.setProperty(PublishAMQP.EXCHANGE, "myExchange");
runner.setProperty(PublishAMQP.ROUTING_KEY, "key1");
}
public static class LocalPublishAMQP extends PublishAMQP {
private TestConnection connection;