From 3a7ec5d5429606eb6e15592aedc97434f080fd13 Mon Sep 17 00:00:00 2001 From: Nandor Soma Abonyi Date: Thu, 10 Nov 2022 23:45:52 +0100 Subject: [PATCH] NIFI-10785 Allow publishing AMQP message with null header value NIFI-10785 addressing review comment NIFI-10785 addressing review comments (remove unnecessary property to ignore null headers) Signed-off-by: Nathan Gough This closes #6649. --- .../nifi/amqp/processors/PublishAMQP.java | 78 ++++++------ .../nifi/amqp/processors/PublishAMQPTest.java | 113 +++++++++++------- 2 files changed, 104 insertions(+), 87 deletions(-) diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java index 4bd94f3a41..6a8c7ac645 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java @@ -16,25 +16,15 @@ */ package org.apache.nifi.amqp.processors; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.function.Consumer; -import java.util.regex.Pattern; - -import org.apache.nifi.annotation.behavior.SystemResourceConsideration; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.Connection; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; -import org.apache.nifi.annotation.behavior.SystemResource; import org.apache.nifi.annotation.behavior.ReadsAttribute; import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.SystemResource; +import org.apache.nifi.annotation.behavior.SystemResourceConsideration; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; @@ -45,13 +35,19 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.stream.io.StreamUtils; -import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.AMQP.BasicProperties; -import com.rabbitmq.client.Connection; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; +import java.util.regex.Pattern; @Tags({ "amqp", "rabbit", "put", "message", "send", "publish" }) @InputRequirement(Requirement.INPUT_REQUIRED) @@ -89,6 +85,7 @@ public class PublishAMQP extends AbstractAMQPProcessor { .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(Validator.VALID) .build(); + public static final PropertyDescriptor ROUTING_KEY = new PropertyDescriptor.Builder() .name("Routing Key") .description("The name of the Routing Key that will be used by AMQP to route messages from the exchange to a destination queue(s). " @@ -99,6 +96,7 @@ public class PublishAMQP extends AbstractAMQPProcessor { .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + public static final PropertyDescriptor HEADER_SEPARATOR = new PropertyDescriptor.Builder() .name("header.separator") .displayName("Header Separator") @@ -108,10 +106,12 @@ public class PublishAMQP extends AbstractAMQPProcessor { .addValidator(StandardValidators.SINGLE_CHAR_VALIDATOR) .required(false) .build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("All FlowFiles that are sent to the AMQP destination are routed to this relationship") .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() .name("failure") .description("All FlowFiles that cannot be routed to the AMQP destination are routed to this relationship") @@ -144,7 +144,7 @@ public class PublishAMQP extends AbstractAMQPProcessor { * * NOTE: Attributes extracted from {@link FlowFile} are considered * candidates for AMQP properties if their names are prefixed with - * {@link AMQPUtils#AMQP_PROP_PREFIX} (e.g., amqp$contentType=text/xml) + * {@link PublishAMQP#ATTRIBUTES_PREFIX} (e.g., amqp$contentType=text/xml) */ @Override protected void processResource(final Connection connection, final AMQPPublisher publisher, ProcessContext context, ProcessSession session) throws ProcessException { @@ -153,14 +153,16 @@ public class PublishAMQP extends AbstractAMQPProcessor { return; } - final BasicProperties amqpProperties = extractAmqpPropertiesFromFlowFile(flowFile, - context.getProperty(HEADER_SEPARATOR).toString().charAt(0)); final String routingKey = context.getProperty(ROUTING_KEY).evaluateAttributeExpressions(flowFile).getValue(); if (routingKey == null) { throw new IllegalArgumentException("Failed to determine 'routing key' with provided value '" + context.getProperty(ROUTING_KEY) + "' after evaluating it as expression against incoming FlowFile."); } + final Character separator = context.getProperty(HEADER_SEPARATOR).toString().charAt(0); + + final BasicProperties amqpProperties = extractAmqpPropertiesFromFlowFile(flowFile, separator); + final String exchange = context.getProperty(EXCHANGE).evaluateAttributeExpressions(flowFile).getValue(); final byte[] messageContent = extractMessage(flowFile, session); @@ -199,12 +201,7 @@ public class PublishAMQP extends AbstractAMQPProcessor { */ private byte[] extractMessage(FlowFile flowFile, ProcessSession session){ final byte[] messageContent = new byte[(int) flowFile.getSize()]; - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - StreamUtils.fillBuffer(in, messageContent, true); - } - }); + session.read(flowFile, in -> StreamUtils.fillBuffer(in, messageContent, true)); return messageContent; } @@ -226,16 +223,9 @@ public class PublishAMQP extends AbstractAMQPProcessor { * Extracts AMQP properties from the {@link FlowFile} attributes. Attributes * extracted from {@link FlowFile} are considered candidates for AMQP * properties if their names are prefixed with - * {@link AMQPUtils#AMQP_PROP_PREFIX} (e.g., amqp$contentType=text/xml). - * - * Some fields require a specific format and are validated: - * - * {@link AMQPUtils#validateAMQPHeaderProperty} - * {@link AMQPUtils#validateAMQPDeliveryModeProperty} - * {@link AMQPUtils#validateAMQPPriorityProperty} - * {@link AMQPUtils#validateAMQPTimestampProperty} + * {@link PublishAMQP#ATTRIBUTES_PREFIX} (e.g., amqp$contentType=text/xml). */ - private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile flowFile,Character headerSeparator) { + private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile flowFile, Character headerSeparator) { final AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); updateBuilderFromAttribute(flowFile, "contentType", builder::contentType); @@ -251,7 +241,7 @@ public class PublishAMQP extends AbstractAMQPProcessor { updateBuilderFromAttribute(flowFile, "userId", builder::userId); updateBuilderFromAttribute(flowFile, "appId", builder::appId); updateBuilderFromAttribute(flowFile, "clusterId", builder::clusterId); - updateBuilderFromAttribute(flowFile, "headers", headers -> builder.headers(validateAMQPHeaderProperty(headers,headerSeparator))); + updateBuilderFromAttribute(flowFile, "headers", headers -> builder.headers(validateAMQPHeaderProperty(headers, headerSeparator))); return builder.build(); } @@ -263,15 +253,17 @@ public class PublishAMQP extends AbstractAMQPProcessor { * @param amqpPropValue the value of the property * @return {@link Map} if valid otherwise null */ - private Map validateAMQPHeaderProperty(String amqpPropValue,Character splitValue) { - String[] strEntries = amqpPropValue.split(Pattern.quote(String.valueOf(splitValue))); - Map headers = new HashMap<>(); + private Map validateAMQPHeaderProperty(String amqpPropValue, Character splitValue) { + final String[] strEntries = amqpPropValue.split(Pattern.quote(String.valueOf(splitValue))); + final Map headers = new HashMap<>(); for (String strEntry : strEntries) { - String[] kv = strEntry.split("="); + final String[] kv = strEntry.split("=", -1); // without using limit, trailing delimiter would be ignored if (kv.length == 2) { headers.put(kv[0].trim(), kv[1].trim()); + } else if (kv.length == 1) { + headers.put(kv[0].trim(), null); } else { - getLogger().warn("Malformed key value pair for AMQP header property: " + amqpPropValue); + getLogger().warn(String.format("Malformed key value pair in AMQP header property (%s): %s", amqpPropValue, strEntry)); } } return headers; diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java index 664862576f..72866ef4f6 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java @@ -16,6 +16,15 @@ */ package org.apache.nifi.amqp.processors; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.GetResponse; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.Test; + import java.util.Arrays; import java.util.Collections; import java.util.Date; @@ -24,19 +33,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; - -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.GetResponse; -import org.junit.jupiter.api.Test; - import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; public class PublishAMQPTest { @@ -45,11 +43,13 @@ public class PublishAMQPTest { public void validateSuccessfulPublishAndTransferToSuccess() throws Exception { final PublishAMQP pubProc = new LocalPublishAMQP(); final TestRunner runner = TestRunners.newTestRunner(pubProc); - runner.setProperty(PublishAMQP.BROKERS, "injvm:5672"); - runner.setProperty(PublishAMQP.EXCHANGE, "myExchange"); - runner.setProperty(PublishAMQP.ROUTING_KEY, "key1"); - runner.setProperty(PublishAMQP.USER, "user"); - runner.setProperty(PublishAMQP.PASSWORD, "password"); + setConnectionProperties(runner); + + final Map expectedHeaders = new HashMap() {{ + put("foo", "bar"); + put("foo2", "bar2"); + put("foo3", null); + }}; final Map attributes = new HashMap<>(); attributes.put("foo", "bar"); @@ -83,13 +83,7 @@ public class PublishAMQPTest { final Map headerMap = msg1.getProps().getHeaders(); - final Object foo = headerMap.get("foo"); - final Object foo2 = headerMap.get("foo2"); - final Object foo3 = headerMap.get("foo3"); - - assertEquals("bar", foo.toString()); - assertEquals("bar2", foo2.toString()); - assertNull(foo3); + assertEquals(expectedHeaders, headerMap); assertEquals((Integer) 1, msg1.getProps().getDeliveryMode()); assertEquals((Integer) 2, msg1.getProps().getPriority()); @@ -110,15 +104,16 @@ public class PublishAMQPTest { public void validateSuccessWithHeaderWithCommaPublishToSuccess() throws Exception { final PublishAMQP pubProc = new LocalPublishAMQP(); final TestRunner runner = TestRunners.newTestRunner(pubProc); - runner.setProperty(PublishAMQP.BROKERS, "injvm:5672"); - runner.setProperty(PublishAMQP.EXCHANGE, "myExchange"); - runner.setProperty(PublishAMQP.ROUTING_KEY, "key1"); - runner.setProperty(PublishAMQP.USER, "user"); - runner.setProperty(PublishAMQP.PASSWORD, "password"); + setConnectionProperties(runner); runner.setProperty(PublishAMQP.HEADER_SEPARATOR,"|"); - final Map attributes = new HashMap<>(); + final Map expectedHeaders = new HashMap() {{ + put("foo", "(bar,bar)"); + put("foo2", "bar2"); + put("foo3", null); + }}; + final Map attributes = new HashMap<>(); attributes.put("amqp$headers", "foo=(bar,bar)|foo2=bar2|foo3"); runner.enqueue("Hello Joe".getBytes(), attributes); @@ -134,14 +129,7 @@ public class PublishAMQPTest { final Map headerMap = msg1.getProps().getHeaders(); - final Object foo = headerMap.get("foo"); - final Object foo2 = headerMap.get("foo2"); - final Object foo3 = headerMap.get("foo3"); - - assertEquals("(bar,bar)", foo.toString()); - assertEquals("bar2", foo2.toString()); - assertNull(foo3); - + assertEquals(expectedHeaders, headerMap); assertNotNull(channel.basicGet("queue2", true)); } @@ -152,18 +140,48 @@ public class PublishAMQPTest { final TestRunner runner = TestRunners.newTestRunner(pubProc); runner.setProperty(PublishAMQP.HEADER_SEPARATOR,"|,"); runner.assertNotValid(); - } @Test - public void validateFailedPublishAndTransferToFailure() throws Exception { + public void validateMalformedHeaderIgnoredAndPublishToSuccess() throws Exception { + final PublishAMQP pubProc = new LocalPublishAMQP(); + final TestRunner runner = TestRunners.newTestRunner(pubProc); + setConnectionProperties(runner); + runner.setProperty(PublishAMQP.HEADER_SEPARATOR,"|"); + + final Map expectedHeaders = new HashMap() {{ + put("foo", "(bar,bar)"); + put("foo2", "bar2"); + put("foo3", null); + }}; + + final Map attributes = new HashMap<>(); + attributes.put("amqp$headers", "foo=(bar,bar)|foo2=bar2|foo3|foo4=malformed=|foo5=mal=formed"); + + runner.enqueue("Hello Joe".getBytes(), attributes); + + runner.run(); + + final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0); + assertNotNull(successFF); + + final Channel channel = ((LocalPublishAMQP) pubProc).getConnection().createChannel(); + final GetResponse msg1 = channel.basicGet("queue1", true); + assertNotNull(msg1); + + final Map headerMap = msg1.getProps().getHeaders(); + + assertEquals(expectedHeaders, headerMap); + + assertNotNull(channel.basicGet("queue2", true)); + } + + @Test + public void validateFailedPublishAndTransferToFailure() { PublishAMQP pubProc = new LocalPublishAMQP(); TestRunner runner = TestRunners.newTestRunner(pubProc); - runner.setProperty(PublishAMQP.BROKERS, "injvm:5672"); - runner.setProperty(PublishAMQP.EXCHANGE, "badToTheBone"); - runner.setProperty(PublishAMQP.ROUTING_KEY, "key1"); - runner.setProperty(PublishAMQP.USER, "user"); - runner.setProperty(PublishAMQP.PASSWORD, "password"); + setConnectionProperties(runner); + runner.setProperty(PublishAMQP.EXCHANGE, "nonExistentExchange"); runner.enqueue("Hello Joe".getBytes()); @@ -173,6 +191,13 @@ public class PublishAMQPTest { assertNotNull(runner.getFlowFilesForRelationship(PublishAMQP.REL_FAILURE).get(0)); } + private void setConnectionProperties(TestRunner runner) { + runner.setProperty(PublishAMQP.BROKERS, "injvm:5672"); + runner.setProperty(PublishAMQP.USER, "user"); + runner.setProperty(PublishAMQP.PASSWORD, "password"); + runner.setProperty(PublishAMQP.EXCHANGE, "myExchange"); + runner.setProperty(PublishAMQP.ROUTING_KEY, "key1"); + } public static class LocalPublishAMQP extends PublishAMQP { private TestConnection connection;