diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java index 734d3efd40..87cefc7922 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java @@ -43,6 +43,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; @Tags({"amqp", "rabbit", "get", "message", "receive", "consume"}) @InputRequirement(Requirement.INPUT_FORBIDDEN) @@ -170,7 +171,7 @@ public class ConsumeAMQP extends AbstractAMQPProcessor { final BasicProperties amqpProperties = response.getProps(); final Envelope envelope = response.getEnvelope(); final Map attributes = buildAttributes(amqpProperties, envelope, context.getProperty(REMOVE_CURLY_BRACES).asBoolean(), - context.getProperty(HEADER_SEPARATOR).toString().charAt(0)); + context.getProperty(HEADER_SEPARATOR).toString()); flowFile = session.putAllAttributes(flowFile, attributes); session.getProvenanceReporter().receive(flowFile, connection.toString() + "/" + context.getProperty(QUEUE).getValue()); @@ -184,12 +185,12 @@ public class ConsumeAMQP extends AbstractAMQPProcessor { } } - private Map buildAttributes(final BasicProperties properties, final Envelope envelope, boolean removeCurlyBraces, Character valueSeperatorForHeaders) { + private Map buildAttributes(final BasicProperties properties, final Envelope envelope, boolean removeCurlyBraces, String valueSeperatorForHeaders) { final Map attributes = new HashMap<>(); addAttribute(attributes, ATTRIBUTES_PREFIX + "appId", properties.getAppId()); addAttribute(attributes, ATTRIBUTES_PREFIX + "contentEncoding", properties.getContentEncoding()); addAttribute(attributes, ATTRIBUTES_PREFIX + "contentType", properties.getContentType()); - addAttribute(attributes, ATTRIBUTES_PREFIX + "headers", buildHeaders(properties.getHeaders(), removeCurlyBraces,valueSeperatorForHeaders)); + addAttribute(attributes, ATTRIBUTES_PREFIX + "headers", buildHeaders(properties.getHeaders(), removeCurlyBraces, valueSeperatorForHeaders)); addAttribute(attributes, ATTRIBUTES_PREFIX + "deliveryMode", properties.getDeliveryMode()); addAttribute(attributes, ATTRIBUTES_PREFIX + "priority", properties.getPriority()); addAttribute(attributes, ATTRIBUTES_PREFIX + "correlationId", properties.getCorrelationId()); @@ -213,31 +214,21 @@ public class ConsumeAMQP extends AbstractAMQPProcessor { attributes.put(attributeName, value.toString()); } - private String buildHeaders(Map headers, boolean removeCurlyBraces,Character valueSeparatorForHeaders) { + private String buildHeaders(Map headers, boolean removeCurlyBraces, String valueSeparatorForHeaders) { if (headers == null) { return null; } String headerString = convertMapToString(headers,valueSeparatorForHeaders); if (!removeCurlyBraces) { - StringBuilder stringBuilder = new StringBuilder(); - stringBuilder.append("{").append(headerString).append("}"); - headerString = stringBuilder.toString(); + headerString = "{" + headerString + "}"; } return headerString; } - public static String convertMapToString(Map headers,Character valueSeparatorForHeaders) { - StringBuilder stringBuilder = new StringBuilder(); - boolean notFirst = false; - for (Map.Entry entry : headers.entrySet()) { - if (notFirst) { - stringBuilder.append(valueSeparatorForHeaders); - } - stringBuilder.append(entry.getKey()).append("=").append(entry.getValue().toString()); - notFirst = true; - } - return stringBuilder.toString(); + private static String convertMapToString(Map headers, String valueSeparatorForHeaders) { + return headers.entrySet().stream().map(e -> (e.getValue()!= null) ? e.getKey() + "=" + e.getValue(): e.getKey()) + .collect(Collectors.joining(valueSeparatorForHeaders)); } @Override diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java index 6daf5d979c..c382e730dd 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java @@ -16,11 +16,16 @@ */ package org.apache.nifi.amqp.processors; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.mock; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.MessageProperties; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.Test; import java.io.IOException; import java.util.Arrays; @@ -30,19 +35,12 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeoutException; -import java.util.regex.Pattern; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; - -import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.MessageProperties; -import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; public class ConsumeAMQPTest { @@ -170,8 +168,11 @@ public class ConsumeAMQPTest { final Map> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2")); final Map exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1"); final Map headersMap = new HashMap<>(); - headersMap.put("foo1","bar,bar"); - headersMap.put("foo2","bar,bar"); + headersMap.put("foo1", "bar,bar"); + headersMap.put("foo2", "bar,bar"); + headersMap.put("foo3", "null"); + headersMap.put("foo4", null); + final String EXPECTED_RESULT = "{foo1=bar,bar|foo2=bar,bar|foo3=null|foo4}"; AMQP.BasicProperties.Builder builderBasicProperties = new AMQP.BasicProperties.Builder(); builderBasicProperties.headers(headersMap); @@ -190,8 +191,7 @@ public class ConsumeAMQPTest { successFF.assertAttributeEquals("amqp$routingKey", "key1"); successFF.assertAttributeEquals("amqp$exchange", "myExchange"); String headers = successFF.getAttribute("amqp$headers"); - Map properties = convertStringToMap(headers.substring(1,headers.length()-1),"|"); - assertEquals(headersMap,properties); + assertEquals(EXPECTED_RESULT, headers); } } @Test @@ -239,6 +239,7 @@ public class ConsumeAMQPTest { final Map headersMap = new HashMap<>(); headersMap.put("key1","(bar,bar)"); headersMap.put("key2","(bar,bar)"); + final String EXPECTED_RESULT = "key1=(bar,bar)|key2=(bar,bar)"; AMQP.BasicProperties.Builder builderBasicProperties = new AMQP.BasicProperties.Builder(); builderBasicProperties.headers(headersMap); @@ -259,8 +260,7 @@ public class ConsumeAMQPTest { successFF.assertAttributeEquals("amqp$routingKey", "key1"); successFF.assertAttributeEquals("amqp$exchange", "myExchange"); String headers = successFF.getAttribute("amqp$headers"); - Map properties = convertStringToMap(headers,"|"); - assertEquals(headersMap,properties); + assertEquals(EXPECTED_RESULT, headers); } } @@ -271,10 +271,14 @@ public class ConsumeAMQPTest { final Map headersMap = new HashMap<>(); headersMap.put("key1","bar"); headersMap.put("key2","bar2"); + headersMap.put("key3",""); + headersMap.put("key4", null); + final String EXPECTED_RESULT = "{key1=bar,key2=bar2,key3=,key4}"; AMQP.BasicProperties.Builder builderBasicProperties = new AMQP.BasicProperties.Builder(); builderBasicProperties.headers(headersMap); + final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { @@ -289,22 +293,10 @@ public class ConsumeAMQPTest { successFF.assertAttributeEquals("amqp$routingKey", "key1"); successFF.assertAttributeEquals("amqp$exchange", "myExchange"); String headers = successFF.getAttribute("amqp$headers"); - Map properties = convertStringToMap(headers.substring(1,headers.length()-1),","); - assertEquals(headersMap,properties); + assertEquals(EXPECTED_RESULT, headers); } } - - private Map convertStringToMap(String map,String splitCharacter){ - Map headers = new HashMap<>(); - String[] pairs = map.split(Pattern.quote(String.valueOf(splitCharacter))); - for (String pair : pairs) { - String[] keyValue = pair.split("=", 2); - assertEquals(2,keyValue.length); - headers.put(keyValue[0].trim(), keyValue[1].trim()); - } - return headers; - } private TestRunner initTestRunner(ConsumeAMQP proc) { TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(ConsumeAMQP.BROKERS, "injvm:5672");