NIFI-10317 Taking care of NullPointerException if AMQP header value is null

NIFI-10317 refactoring to remove repeated conversions to string from char
NIFI-10317 correctly handle null values: null vs "null"
NIFI-10317 adding test
NIFI-10317 - Updated ConsumeAMQPTest to test for null and empty header values.
NIFI-10317 - Updated ConsumeAMQPTest to use hard coded string values when testing. Made convertMapToString() private.

Signed-off-by: Nathan Gough <thenatog@gmail.com>

This closes #6382.
This commit is contained in:
SaumyaGurtu 2022-09-19 11:57:27 +05:30 committed by Nathan Gough
parent 2a85264dd5
commit 747b5d4d9e
2 changed files with 37 additions and 54 deletions

View File

@ -43,6 +43,7 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors;
@Tags({"amqp", "rabbit", "get", "message", "receive", "consume"}) @Tags({"amqp", "rabbit", "get", "message", "receive", "consume"})
@InputRequirement(Requirement.INPUT_FORBIDDEN) @InputRequirement(Requirement.INPUT_FORBIDDEN)
@ -170,7 +171,7 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
final BasicProperties amqpProperties = response.getProps(); final BasicProperties amqpProperties = response.getProps();
final Envelope envelope = response.getEnvelope(); final Envelope envelope = response.getEnvelope();
final Map<String, String> attributes = buildAttributes(amqpProperties, envelope, context.getProperty(REMOVE_CURLY_BRACES).asBoolean(), final Map<String, String> attributes = buildAttributes(amqpProperties, envelope, context.getProperty(REMOVE_CURLY_BRACES).asBoolean(),
context.getProperty(HEADER_SEPARATOR).toString().charAt(0)); context.getProperty(HEADER_SEPARATOR).toString());
flowFile = session.putAllAttributes(flowFile, attributes); flowFile = session.putAllAttributes(flowFile, attributes);
session.getProvenanceReporter().receive(flowFile, connection.toString() + "/" + context.getProperty(QUEUE).getValue()); session.getProvenanceReporter().receive(flowFile, connection.toString() + "/" + context.getProperty(QUEUE).getValue());
@ -184,12 +185,12 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
} }
} }
private Map<String, String> buildAttributes(final BasicProperties properties, final Envelope envelope, boolean removeCurlyBraces, Character valueSeperatorForHeaders) { private Map<String, String> buildAttributes(final BasicProperties properties, final Envelope envelope, boolean removeCurlyBraces, String valueSeperatorForHeaders) {
final Map<String, String> attributes = new HashMap<>(); final Map<String, String> attributes = new HashMap<>();
addAttribute(attributes, ATTRIBUTES_PREFIX + "appId", properties.getAppId()); addAttribute(attributes, ATTRIBUTES_PREFIX + "appId", properties.getAppId());
addAttribute(attributes, ATTRIBUTES_PREFIX + "contentEncoding", properties.getContentEncoding()); addAttribute(attributes, ATTRIBUTES_PREFIX + "contentEncoding", properties.getContentEncoding());
addAttribute(attributes, ATTRIBUTES_PREFIX + "contentType", properties.getContentType()); addAttribute(attributes, ATTRIBUTES_PREFIX + "contentType", properties.getContentType());
addAttribute(attributes, ATTRIBUTES_PREFIX + "headers", buildHeaders(properties.getHeaders(), removeCurlyBraces,valueSeperatorForHeaders)); addAttribute(attributes, ATTRIBUTES_PREFIX + "headers", buildHeaders(properties.getHeaders(), removeCurlyBraces, valueSeperatorForHeaders));
addAttribute(attributes, ATTRIBUTES_PREFIX + "deliveryMode", properties.getDeliveryMode()); addAttribute(attributes, ATTRIBUTES_PREFIX + "deliveryMode", properties.getDeliveryMode());
addAttribute(attributes, ATTRIBUTES_PREFIX + "priority", properties.getPriority()); addAttribute(attributes, ATTRIBUTES_PREFIX + "priority", properties.getPriority());
addAttribute(attributes, ATTRIBUTES_PREFIX + "correlationId", properties.getCorrelationId()); addAttribute(attributes, ATTRIBUTES_PREFIX + "correlationId", properties.getCorrelationId());
@ -213,31 +214,21 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
attributes.put(attributeName, value.toString()); attributes.put(attributeName, value.toString());
} }
private String buildHeaders(Map<String, Object> headers, boolean removeCurlyBraces,Character valueSeparatorForHeaders) { private String buildHeaders(Map<String, Object> headers, boolean removeCurlyBraces, String valueSeparatorForHeaders) {
if (headers == null) { if (headers == null) {
return null; return null;
} }
String headerString = convertMapToString(headers,valueSeparatorForHeaders); String headerString = convertMapToString(headers,valueSeparatorForHeaders);
if (!removeCurlyBraces) { if (!removeCurlyBraces) {
StringBuilder stringBuilder = new StringBuilder(); headerString = "{" + headerString + "}";
stringBuilder.append("{").append(headerString).append("}");
headerString = stringBuilder.toString();
} }
return headerString; return headerString;
} }
public static String convertMapToString(Map<String, Object> headers,Character valueSeparatorForHeaders) { private static String convertMapToString(Map<String, Object> headers, String valueSeparatorForHeaders) {
StringBuilder stringBuilder = new StringBuilder(); return headers.entrySet().stream().map(e -> (e.getValue()!= null) ? e.getKey() + "=" + e.getValue(): e.getKey())
boolean notFirst = false; .collect(Collectors.joining(valueSeparatorForHeaders));
for (Map.Entry<String, Object> entry : headers.entrySet()) {
if (notFirst) {
stringBuilder.append(valueSeparatorForHeaders);
}
stringBuilder.append(entry.getKey()).append("=").append(entry.getValue().toString());
notFirst = true;
}
return stringBuilder.toString();
} }
@Override @Override

View File

@ -16,11 +16,16 @@
*/ */
package org.apache.nifi.amqp.processors; package org.apache.nifi.amqp.processors;
import static org.junit.jupiter.api.Assertions.assertEquals; import com.rabbitmq.client.AMQP;
import static org.junit.jupiter.api.Assertions.assertFalse; import com.rabbitmq.client.Connection;
import static org.junit.jupiter.api.Assertions.assertNotNull; import com.rabbitmq.client.MessageProperties;
import static org.junit.jupiter.api.Assertions.assertTrue; import org.apache.nifi.logging.ComponentLog;
import static org.mockito.Mockito.mock; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
@ -30,19 +35,12 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import org.apache.nifi.logging.ComponentLog; import static org.junit.jupiter.api.Assertions.assertEquals;
import org.apache.nifi.processor.ProcessContext; import static org.junit.jupiter.api.Assertions.assertFalse;
import org.apache.nifi.processor.exception.ProcessException; import static org.junit.jupiter.api.Assertions.assertNotNull;
import org.apache.nifi.util.MockFlowFile; import static org.junit.jupiter.api.Assertions.assertTrue;
import org.apache.nifi.util.TestRunner; import static org.mockito.Mockito.mock;
import org.apache.nifi.util.TestRunners;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import org.junit.jupiter.api.Test;
public class ConsumeAMQPTest { public class ConsumeAMQPTest {
@ -170,8 +168,11 @@ public class ConsumeAMQPTest {
final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2")); final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1"); final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");
final Map<String, Object> headersMap = new HashMap<>(); final Map<String, Object> headersMap = new HashMap<>();
headersMap.put("foo1","bar,bar"); headersMap.put("foo1", "bar,bar");
headersMap.put("foo2","bar,bar"); headersMap.put("foo2", "bar,bar");
headersMap.put("foo3", "null");
headersMap.put("foo4", null);
final String EXPECTED_RESULT = "{foo1=bar,bar|foo2=bar,bar|foo3=null|foo4}";
AMQP.BasicProperties.Builder builderBasicProperties = new AMQP.BasicProperties.Builder(); AMQP.BasicProperties.Builder builderBasicProperties = new AMQP.BasicProperties.Builder();
builderBasicProperties.headers(headersMap); builderBasicProperties.headers(headersMap);
@ -190,8 +191,7 @@ public class ConsumeAMQPTest {
successFF.assertAttributeEquals("amqp$routingKey", "key1"); successFF.assertAttributeEquals("amqp$routingKey", "key1");
successFF.assertAttributeEquals("amqp$exchange", "myExchange"); successFF.assertAttributeEquals("amqp$exchange", "myExchange");
String headers = successFF.getAttribute("amqp$headers"); String headers = successFF.getAttribute("amqp$headers");
Map<String, String> properties = convertStringToMap(headers.substring(1,headers.length()-1),"|"); assertEquals(EXPECTED_RESULT, headers);
assertEquals(headersMap,properties);
} }
} }
@Test @Test
@ -239,6 +239,7 @@ public class ConsumeAMQPTest {
final Map<String, Object> headersMap = new HashMap<>(); final Map<String, Object> headersMap = new HashMap<>();
headersMap.put("key1","(bar,bar)"); headersMap.put("key1","(bar,bar)");
headersMap.put("key2","(bar,bar)"); headersMap.put("key2","(bar,bar)");
final String EXPECTED_RESULT = "key1=(bar,bar)|key2=(bar,bar)";
AMQP.BasicProperties.Builder builderBasicProperties = new AMQP.BasicProperties.Builder(); AMQP.BasicProperties.Builder builderBasicProperties = new AMQP.BasicProperties.Builder();
builderBasicProperties.headers(headersMap); builderBasicProperties.headers(headersMap);
@ -259,8 +260,7 @@ public class ConsumeAMQPTest {
successFF.assertAttributeEquals("amqp$routingKey", "key1"); successFF.assertAttributeEquals("amqp$routingKey", "key1");
successFF.assertAttributeEquals("amqp$exchange", "myExchange"); successFF.assertAttributeEquals("amqp$exchange", "myExchange");
String headers = successFF.getAttribute("amqp$headers"); String headers = successFF.getAttribute("amqp$headers");
Map<String, String> properties = convertStringToMap(headers,"|"); assertEquals(EXPECTED_RESULT, headers);
assertEquals(headersMap,properties);
} }
} }
@ -271,10 +271,14 @@ public class ConsumeAMQPTest {
final Map<String, Object> headersMap = new HashMap<>(); final Map<String, Object> headersMap = new HashMap<>();
headersMap.put("key1","bar"); headersMap.put("key1","bar");
headersMap.put("key2","bar2"); headersMap.put("key2","bar2");
headersMap.put("key3","");
headersMap.put("key4", null);
final String EXPECTED_RESULT = "{key1=bar,key2=bar2,key3=,key4}";
AMQP.BasicProperties.Builder builderBasicProperties = new AMQP.BasicProperties.Builder(); AMQP.BasicProperties.Builder builderBasicProperties = new AMQP.BasicProperties.Builder();
builderBasicProperties.headers(headersMap); builderBasicProperties.headers(headersMap);
final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) {
@ -289,22 +293,10 @@ public class ConsumeAMQPTest {
successFF.assertAttributeEquals("amqp$routingKey", "key1"); successFF.assertAttributeEquals("amqp$routingKey", "key1");
successFF.assertAttributeEquals("amqp$exchange", "myExchange"); successFF.assertAttributeEquals("amqp$exchange", "myExchange");
String headers = successFF.getAttribute("amqp$headers"); String headers = successFF.getAttribute("amqp$headers");
Map<String, String> properties = convertStringToMap(headers.substring(1,headers.length()-1),","); assertEquals(EXPECTED_RESULT, headers);
assertEquals(headersMap,properties);
} }
} }
private Map<String,String> convertStringToMap(String map,String splitCharacter){
Map<String, String> headers = new HashMap<>();
String[] pairs = map.split(Pattern.quote(String.valueOf(splitCharacter)));
for (String pair : pairs) {
String[] keyValue = pair.split("=", 2);
assertEquals(2,keyValue.length);
headers.put(keyValue[0].trim(), keyValue[1].trim());
}
return headers;
}
private TestRunner initTestRunner(ConsumeAMQP proc) { private TestRunner initTestRunner(ConsumeAMQP proc) {
TestRunner runner = TestRunners.newTestRunner(proc); TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(ConsumeAMQP.BROKERS, "injvm:5672"); runner.setProperty(ConsumeAMQP.BROKERS, "injvm:5672");