From b531b97a4da2fc35b16e5bed0d24028865383ee4 Mon Sep 17 00:00:00 2001 From: Stephen Harper Date: Sat, 26 Mar 2016 20:06:50 +0000 Subject: [PATCH] NIFI-1686 - NiFi is unable to populate over 1/4 of AMQP properties from flow properties --- .../nifi/amqp/processors/AMQPUtils.java | 169 ++++++++++++++++-- .../nifi/amqp/processors/PublishAMQP.java | 81 +++++++-- .../nifi/amqp/processors/PublishAMQPTest.java | 41 +++++ 3 files changed, 255 insertions(+), 36 deletions(-) diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPUtils.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPUtils.java index 618965af7d..6cfa2c7176 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPUtils.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPUtils.java @@ -18,9 +18,8 @@ package org.apache.nifi.amqp.processors; import java.lang.reflect.Method; import java.lang.reflect.Modifier; -import java.util.Arrays; +import java.util.Date; import java.util.HashMap; -import java.util.List; import java.util.Map; import org.apache.nifi.flowfile.FlowFile; @@ -43,16 +42,48 @@ abstract class AMQPUtils { private final static Logger logger = LoggerFactory.getLogger(AMQPUtils.class); - private final static List propertyNames = Arrays.asList("amqp$contentType", "amqp$contentEncoding", - "amqp$headers", "amqp$deliveryMode", "amqp$priority", "amqp$correlationId", "amqp$replyTo", - "amqp$expiration", "amqp$messageId", "amqp$timestamp", "amqp$type", "amqp$userId", "amqp$appId", - "amqp$clusterId"); - /** - * Returns a {@link List} of AMQP property names defined in - * {@link BasicProperties} - */ - public static List getAmqpPropertyNames() { - return propertyNames; + public enum PropertyNames { + CONTENT_TYPE (AMQP_PROP_PREFIX + "contentType"), + CONTENT_ENCODING (AMQP_PROP_PREFIX + "contentEncoding"), + HEADERS (AMQP_PROP_PREFIX + "headers"), + DELIVERY_MODE (AMQP_PROP_PREFIX + "deliveryMode"), + PRIORITY (AMQP_PROP_PREFIX + "priority"), + CORRELATION_ID (AMQP_PROP_PREFIX + "correlationId"), + REPLY_TO (AMQP_PROP_PREFIX + "replyTo"), + EXPIRATION (AMQP_PROP_PREFIX + "expiration"), + MESSAGE_ID (AMQP_PROP_PREFIX + "messageId"), + TIMESTAMP (AMQP_PROP_PREFIX + "timestamp"), + TYPE (AMQP_PROP_PREFIX + "type"), + USER_ID (AMQP_PROP_PREFIX + "userId"), + APP_ID (AMQP_PROP_PREFIX + "appId"), + CLUSTER_ID (AMQP_PROP_PREFIX + "clusterId"); + + PropertyNames(String value) { + this.value = value; + } + + private final String value; + + private static final Map lookup = new HashMap<>(); + + public static PropertyNames fromValue(String s) { + return lookup.get(s); + } + + static { + for(PropertyNames propertyNames : PropertyNames.values()) { + lookup.put(propertyNames.getValue(), propertyNames); + } + } + + public String getValue() { + return value; + } + + @Override + public String toString() { + return value; + } } /** @@ -69,14 +100,14 @@ abstract class AMQPUtils { if (amqpProperties != null){ try { Method[] methods = BasicProperties.class.getDeclaredMethods(); - Map attributes = new HashMap(); + Map attributes = new HashMap<>(); for (Method method : methods) { if (Modifier.isPublic(method.getModifiers()) && method.getName().startsWith("get")) { Object amqpPropertyValue = method.invoke(amqpProperties); if (amqpPropertyValue != null) { String propertyName = extractPropertyNameFromMethod(method); if (isValidAmqpPropertyName(propertyName)) { - if (propertyName.equals(AMQP_PROP_PREFIX + "contentType")) { + if (propertyName.equals(PropertyNames.CONTENT_TYPE.getValue())) { attributes.put(CoreAttributes.MIME_TYPE.key(), amqpPropertyValue.toString()); } attributes.put(propertyName, amqpPropertyValue.toString()); @@ -95,14 +126,12 @@ abstract class AMQPUtils { /** * Will validate if provided name corresponds to valid AMQP property. * - * @see AMQPUtils#getAmqpPropertyNames() - * * @param name * the name of the property * @return 'true' if valid otherwise 'false' */ public static boolean isValidAmqpPropertyName(String name) { - return propertyNames.contains(name); + return PropertyNames.fromValue(name) != null; } /** @@ -113,4 +142,110 @@ abstract class AMQPUtils { c[0] = Character.toLowerCase(c[0]); return AMQP_PROP_PREFIX + new String(c); } + + /** + * Will validate if provided amqpPropValue can be converted to a {@link Map}. + * Should be passed in the format: amqp$headers=key=value,key=value etc. + * + * @param amqpPropValue + * the value of the property + * @return {@link Map} if valid otherwise null + */ + public static Map validateAMQPHeaderProperty(String amqpPropValue){ + String[] strEntries = amqpPropValue.split(","); + Map headers = new HashMap<>(); + for (String strEntry : strEntries) { + String[] kv = strEntry.split("="); + if (kv.length == 2) { + headers.put(kv[0].trim(), kv[1].trim()); + } else { + logger.warn("Malformed key value pair for AMQP header property: " + amqpPropValue); + } + } + + return headers; + } + + /** + * Will validate if provided amqpPropValue can be converted to an {@link Integer}, and that its + * value is 1 or 2. + * + * @param amqpPropValue + * the value of the property + * @return {@link Integer} if valid otherwise null + */ + public static Integer validateAMQPDeliveryModeProperty(String amqpPropValue){ + Integer deliveryMode = toInt(amqpPropValue); + + if (deliveryMode == null || !(deliveryMode == 1 || deliveryMode == 2)) { + logger.warn("Invalid value for AMQP deliveryMode property: " + amqpPropValue); + } + return deliveryMode; + } + + /** + * Will validate if provided amqpPropValue can be converted to an {@link Integer} and that its + * value is between 0 and 9 (inclusive). + * + * @param amqpPropValue + * the value of the property + * @return {@link Integer} if valid otherwise null + */ + public static Integer validateAMQPPriorityProperty(String amqpPropValue){ + Integer priority = toInt(amqpPropValue); + + if (priority == null || !(priority >= 0 && priority <= 9)){ + logger.warn("Invalid value for AMQP priority property: " + amqpPropValue); + } + return priority; + } + + /** + * Will validate if provided amqpPropValue can be converted to a {@link Date}. + * + * @param amqpPropValue + * the value of the property + * @return {@link Date} if valid otherwise null + */ + public static Date validateAMQPTimestampProperty(String amqpPropValue){ + Long timestamp = toLong(amqpPropValue); + + if (timestamp == null){ + logger.warn("Invalid value for AMQP timestamp property: " + amqpPropValue); + return null; + } + + //milliseconds are lost when sending to AMQP + return new Date(timestamp); + } + + /** + * Takes a {@link String} and tries to convert to an {@link Integer}. + * + * @param strVal + * the value to be converted + * @return {@link Integer} if valid otherwise null + */ + private static Integer toInt(String strVal){ + try { + return Integer.parseInt(strVal); + } catch (NumberFormatException aE){ + return null; + } + } + + /** + * Takes a {@link String} and tries to convert to a {@link Long}. + * + * @param strVal + * the value to be converted + * @return {@link Long} if valid otherwise null + */ + private static Long toLong(String strVal){ + try { + return Long.parseLong(strVal); + } catch (NumberFormatException aE){ + return null; + } + } } diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java index b1e442e0ed..85116c2540 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java @@ -18,14 +18,12 @@ package org.apache.nifi.amqp.processors; import java.io.IOException; import java.io.InputStream; -import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Properties; import java.util.Set; import org.apache.nifi.annotation.behavior.InputRequirement; @@ -97,8 +95,6 @@ public class PublishAMQP extends AbstractAMQPProcessor { private final static Set relationships; - private final static List amqpPropertyNames = AMQPUtils.getAmqpPropertyNames(); - /* * Will ensure that the list of property descriptors is build only once. * Will also create a Set of relationships @@ -118,11 +114,11 @@ public class PublishAMQP extends AbstractAMQPProcessor { /** * Will construct AMQP message by extracting its body from the incoming - * {@link FlowFile}. AMQP {@link Properties} will be extracted from the + * {@link FlowFile}. AMQP Properties will be extracted from the * {@link FlowFile} and converted to {@link BasicProperties} to be sent * along with the message. Upon success the incoming {@link FlowFile} is - * transfered to 'success' {@link Relationship} and upon failure FlowFile is - * penalized and transfered to the 'failure' {@link Relationship} + * transferred to 'success' {@link Relationship} and upon failure FlowFile is + * penalized and transferred to the 'failure' {@link Relationship} *
* NOTE: Attributes extracted from {@link FlowFile} are considered * candidates for AMQP properties if their names are prefixed with @@ -195,26 +191,73 @@ public class PublishAMQP extends AbstractAMQPProcessor { * Extracts AMQP properties from the {@link FlowFile} attributes. Attributes * extracted from {@link FlowFile} are considered candidates for AMQP * properties if their names are prefixed with - * {@link AMQPUtils#AMQP_PROP_PREFIX} (e.g., amqp$contentType=text/xml) + * {@link AMQPUtils#AMQP_PROP_PREFIX} (e.g., amqp$contentType=text/xml). + * + * Some fields require a specific format and are validated: + * + * {@link AMQPUtils#validateAMQPHeaderProperty} + * {@link AMQPUtils#validateAMQPDeliveryModeProperty} + * {@link AMQPUtils#validateAMQPPriorityProperty} + * {@link AMQPUtils#validateAMQPTimestampProperty} */ private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile flowFile) { Map attributes = flowFile.getAttributes(); AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); for (Entry attributeEntry : attributes.entrySet()) { if (attributeEntry.getKey().startsWith(AMQPUtils.AMQP_PROP_PREFIX)) { - String amqpPropName = attributeEntry.getKey().split("\\" + AMQPUtils.AMQP_PROP_DELIMITER)[1]; + String amqpPropName = attributeEntry.getKey(); String amqpPropValue = attributeEntry.getValue(); - try { - if (amqpPropertyNames.contains(AMQPUtils.AMQP_PROP_PREFIX + amqpPropName)) { - Method m = builder.getClass().getDeclaredMethod(amqpPropName, String.class); - m.invoke(builder, amqpPropValue); - } else { - getLogger().warn("Unrecogninsed AMQP property '" + amqpPropName + "', will ignore."); + + AMQPUtils.PropertyNames propertyNames = AMQPUtils.PropertyNames.fromValue(amqpPropName); + + if (propertyNames != null) { + switch (propertyNames){ + case CONTENT_TYPE: + builder.contentType(amqpPropValue); + break; + case CONTENT_ENCODING: + builder.contentEncoding(amqpPropValue); + break; + case HEADERS: + builder.headers(AMQPUtils.validateAMQPHeaderProperty(amqpPropValue)); + break; + case DELIVERY_MODE: + builder.deliveryMode(AMQPUtils.validateAMQPDeliveryModeProperty(amqpPropValue)); + break; + case PRIORITY: + builder.priority(AMQPUtils.validateAMQPPriorityProperty(amqpPropValue)); + break; + case CORRELATION_ID: + builder.correlationId(amqpPropValue); + break; + case REPLY_TO: + builder.replyTo(amqpPropValue); + break; + case EXPIRATION: + builder.expiration(amqpPropValue); + break; + case MESSAGE_ID: + builder.messageId(amqpPropValue); + break; + case TIMESTAMP: + builder.timestamp(AMQPUtils.validateAMQPTimestampProperty(amqpPropValue)); + break; + case TYPE: + builder.type(amqpPropValue); + break; + case USER_ID: + builder.userId(amqpPropValue); + break; + case APP_ID: + builder.appId(amqpPropValue); + break; + case CLUSTER_ID: + builder.clusterId(amqpPropValue); + break; } - } catch (Exception e) { - // should really never happen since it should be caught by - // the above IF. - getLogger().warn("Failed while trying to build AMQP Properties.", e); + + } else { + getLogger().warn("Unrecognised AMQP property '" + amqpPropName + "', will ignore."); } } } diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java index 3a9b8d16bf..fec3d5015f 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java @@ -18,9 +18,11 @@ package org.apache.nifi.amqp.processors; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.util.Arrays; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -51,6 +53,20 @@ public class PublishAMQPTest { Map attributes = new HashMap<>(); attributes.put("foo", "bar"); attributes.put("amqp$contentType", "foo/bar"); + attributes.put("amqp$contentEncoding", "foobar123"); + attributes.put("amqp$headers", "foo=bar,foo2=bar2,foo3"); + attributes.put("amqp$deliveryMode", "1"); + attributes.put("amqp$priority", "2"); + attributes.put("amqp$correlationId", "correlationId123"); + attributes.put("amqp$replyTo", "replyTo123"); + attributes.put("amqp$expiration", "expiration123"); + attributes.put("amqp$messageId", "messageId123"); + attributes.put("amqp$timestamp", "123456789"); + attributes.put("amqp$type", "type123"); + attributes.put("amqp$userId", "userId123"); + attributes.put("amqp$appId", "appId123"); + attributes.put("amqp$clusterId", "clusterId123"); + runner.enqueue("Hello Joe".getBytes(), attributes); runner.run(); @@ -60,6 +76,31 @@ public class PublishAMQPTest { GetResponse msg1 = channel.basicGet("queue1", true); assertNotNull(msg1); assertEquals("foo/bar", msg1.getProps().getContentType()); + + assertEquals("foobar123", msg1.getProps().getContentEncoding()); + + Map headerMap = msg1.getProps().getHeaders(); + + Object foo = headerMap.get("foo"); + Object foo2 = headerMap.get("foo2"); + Object foo3 = headerMap.get("foo3"); + + assertEquals("bar", foo.toString()); + assertEquals("bar2", foo2.toString()); + assertNull(foo3); + + assertEquals((Integer) 1, msg1.getProps().getDeliveryMode()); + assertEquals((Integer) 2, msg1.getProps().getPriority()); + assertEquals("correlationId123", msg1.getProps().getCorrelationId()); + assertEquals("replyTo123", msg1.getProps().getReplyTo()); + assertEquals("expiration123", msg1.getProps().getExpiration()); + assertEquals("messageId123", msg1.getProps().getMessageId()); + assertEquals(new Date(123456789L), msg1.getProps().getTimestamp()); + assertEquals("type123", msg1.getProps().getType()); + assertEquals("userId123", msg1.getProps().getUserId()); + assertEquals("appId123", msg1.getProps().getAppId()); + assertEquals("clusterId123", msg1.getProps().getClusterId()); + assertNotNull(channel.basicGet("queue2", true)); }