From eabf2d52fc48731cd5a78b602d401d441657b3a6 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 10 Feb 2015 10:27:17 -0500 Subject: [PATCH] NIFI-84: Allow PutJMS to create MapMessage's so that we can test GetJMS* Processors --- .../apache/nifi/processors/standard/JmsConsumer.java | 5 +---- .../org/apache/nifi/processors/standard/PutJMS.java | 11 ++++++++--- .../nifi/processors/standard/util/JmsProperties.java | 3 ++- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java index c48d52009f..e4bbaec590 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java @@ -48,7 +48,6 @@ import javax.jms.MessageConsumer; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.stream.io.BufferedOutputStream; import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; @@ -59,10 +58,8 @@ import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processors.standard.util.JmsFactory; import org.apache.nifi.processors.standard.util.JmsProcessingSummary; import org.apache.nifi.processors.standard.util.WrappedMessageConsumer; -import org.apache.nifi.util.BooleanHolder; +import org.apache.nifi.stream.io.BufferedOutputStream; import org.apache.nifi.util.IntegerHolder; -import org.apache.nifi.util.LongHolder; -import org.apache.nifi.util.ObjectHolder; import org.apache.nifi.util.StopWatch; public abstract class JmsConsumer extends AbstractProcessor { diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java index ce5bea5698..99c7bb7c93 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java @@ -40,6 +40,7 @@ import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_BY import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_EMPTY; import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_STREAM; import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_TEXT; +import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_MAP; import static org.apache.nifi.processors.standard.util.JmsProperties.PASSWORD; import static org.apache.nifi.processors.standard.util.JmsProperties.REPLY_TO_QUEUE; import static org.apache.nifi.processors.standard.util.JmsProperties.TIMEOUT; @@ -257,18 +258,22 @@ public class PutJMS extends AbstractProcessor { switch (context.getProperty(MESSAGE_TYPE).getValue()) { case MSG_TYPE_EMPTY: { message = jmsSession.createTextMessage(""); + break; } - break; case MSG_TYPE_STREAM: { final StreamMessage streamMessage = jmsSession.createStreamMessage(); streamMessage.writeBytes(messageContent); message = streamMessage; + break; } - break; case MSG_TYPE_TEXT: { message = jmsSession.createTextMessage(new String(messageContent, UTF8)); + break; + } + case MSG_TYPE_MAP: { + message = jmsSession.createMapMessage(); + break; } - break; case MSG_TYPE_BYTE: default: { final BytesMessage bytesMessage = jmsSession.createBytesMessage(); diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java index 67d0bbfad6..3a5695e732 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java @@ -33,6 +33,7 @@ public class JmsProperties { public static final String MSG_TYPE_BYTE = "byte"; public static final String MSG_TYPE_TEXT = "text"; public static final String MSG_TYPE_STREAM = "stream"; + public static final String MSG_TYPE_MAP = "map"; public static final String MSG_TYPE_EMPTY = "empty"; // Standard JMS Properties @@ -142,7 +143,7 @@ public class JmsProperties { .name("Message Type") .description("The Type of JMS Message to Construct") .required(true) - .allowableValues(MSG_TYPE_BYTE, MSG_TYPE_STREAM, MSG_TYPE_TEXT, MSG_TYPE_EMPTY) + .allowableValues(MSG_TYPE_BYTE, MSG_TYPE_STREAM, MSG_TYPE_TEXT, MSG_TYPE_MAP, MSG_TYPE_EMPTY) .defaultValue(MSG_TYPE_BYTE) .build(); public static final PropertyDescriptor MESSAGE_PRIORITY = new PropertyDescriptor.Builder()