mirror of https://github.com/apache/nifi.git
NIFI-84: Allow PutJMS to create MapMessage's so that we can test GetJMS* Processors
This commit is contained in:
parent
602fa7a860
commit
eabf2d52fc
|
@ -48,7 +48,6 @@ import javax.jms.MessageConsumer;
|
||||||
|
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
import org.apache.nifi.stream.io.BufferedOutputStream;
|
|
||||||
import org.apache.nifi.logging.ProcessorLog;
|
import org.apache.nifi.logging.ProcessorLog;
|
||||||
import org.apache.nifi.processor.AbstractProcessor;
|
import org.apache.nifi.processor.AbstractProcessor;
|
||||||
import org.apache.nifi.processor.ProcessContext;
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
|
@ -59,10 +58,8 @@ import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||||
import org.apache.nifi.processors.standard.util.JmsFactory;
|
import org.apache.nifi.processors.standard.util.JmsFactory;
|
||||||
import org.apache.nifi.processors.standard.util.JmsProcessingSummary;
|
import org.apache.nifi.processors.standard.util.JmsProcessingSummary;
|
||||||
import org.apache.nifi.processors.standard.util.WrappedMessageConsumer;
|
import org.apache.nifi.processors.standard.util.WrappedMessageConsumer;
|
||||||
import org.apache.nifi.util.BooleanHolder;
|
import org.apache.nifi.stream.io.BufferedOutputStream;
|
||||||
import org.apache.nifi.util.IntegerHolder;
|
import org.apache.nifi.util.IntegerHolder;
|
||||||
import org.apache.nifi.util.LongHolder;
|
|
||||||
import org.apache.nifi.util.ObjectHolder;
|
|
||||||
import org.apache.nifi.util.StopWatch;
|
import org.apache.nifi.util.StopWatch;
|
||||||
|
|
||||||
public abstract class JmsConsumer extends AbstractProcessor {
|
public abstract class JmsConsumer extends AbstractProcessor {
|
||||||
|
|
|
@ -40,6 +40,7 @@ import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_BY
|
||||||
import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_EMPTY;
|
import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_EMPTY;
|
||||||
import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_STREAM;
|
import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_STREAM;
|
||||||
import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_TEXT;
|
import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_TEXT;
|
||||||
|
import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_MAP;
|
||||||
import static org.apache.nifi.processors.standard.util.JmsProperties.PASSWORD;
|
import static org.apache.nifi.processors.standard.util.JmsProperties.PASSWORD;
|
||||||
import static org.apache.nifi.processors.standard.util.JmsProperties.REPLY_TO_QUEUE;
|
import static org.apache.nifi.processors.standard.util.JmsProperties.REPLY_TO_QUEUE;
|
||||||
import static org.apache.nifi.processors.standard.util.JmsProperties.TIMEOUT;
|
import static org.apache.nifi.processors.standard.util.JmsProperties.TIMEOUT;
|
||||||
|
@ -257,18 +258,22 @@ public class PutJMS extends AbstractProcessor {
|
||||||
switch (context.getProperty(MESSAGE_TYPE).getValue()) {
|
switch (context.getProperty(MESSAGE_TYPE).getValue()) {
|
||||||
case MSG_TYPE_EMPTY: {
|
case MSG_TYPE_EMPTY: {
|
||||||
message = jmsSession.createTextMessage("");
|
message = jmsSession.createTextMessage("");
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
case MSG_TYPE_STREAM: {
|
case MSG_TYPE_STREAM: {
|
||||||
final StreamMessage streamMessage = jmsSession.createStreamMessage();
|
final StreamMessage streamMessage = jmsSession.createStreamMessage();
|
||||||
streamMessage.writeBytes(messageContent);
|
streamMessage.writeBytes(messageContent);
|
||||||
message = streamMessage;
|
message = streamMessage;
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
case MSG_TYPE_TEXT: {
|
case MSG_TYPE_TEXT: {
|
||||||
message = jmsSession.createTextMessage(new String(messageContent, UTF8));
|
message = jmsSession.createTextMessage(new String(messageContent, UTF8));
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
|
case MSG_TYPE_MAP: {
|
||||||
|
message = jmsSession.createMapMessage();
|
||||||
|
break;
|
||||||
|
}
|
||||||
case MSG_TYPE_BYTE:
|
case MSG_TYPE_BYTE:
|
||||||
default: {
|
default: {
|
||||||
final BytesMessage bytesMessage = jmsSession.createBytesMessage();
|
final BytesMessage bytesMessage = jmsSession.createBytesMessage();
|
||||||
|
|
|
@ -33,6 +33,7 @@ public class JmsProperties {
|
||||||
public static final String MSG_TYPE_BYTE = "byte";
|
public static final String MSG_TYPE_BYTE = "byte";
|
||||||
public static final String MSG_TYPE_TEXT = "text";
|
public static final String MSG_TYPE_TEXT = "text";
|
||||||
public static final String MSG_TYPE_STREAM = "stream";
|
public static final String MSG_TYPE_STREAM = "stream";
|
||||||
|
public static final String MSG_TYPE_MAP = "map";
|
||||||
public static final String MSG_TYPE_EMPTY = "empty";
|
public static final String MSG_TYPE_EMPTY = "empty";
|
||||||
|
|
||||||
// Standard JMS Properties
|
// Standard JMS Properties
|
||||||
|
@ -142,7 +143,7 @@ public class JmsProperties {
|
||||||
.name("Message Type")
|
.name("Message Type")
|
||||||
.description("The Type of JMS Message to Construct")
|
.description("The Type of JMS Message to Construct")
|
||||||
.required(true)
|
.required(true)
|
||||||
.allowableValues(MSG_TYPE_BYTE, MSG_TYPE_STREAM, MSG_TYPE_TEXT, MSG_TYPE_EMPTY)
|
.allowableValues(MSG_TYPE_BYTE, MSG_TYPE_STREAM, MSG_TYPE_TEXT, MSG_TYPE_MAP, MSG_TYPE_EMPTY)
|
||||||
.defaultValue(MSG_TYPE_BYTE)
|
.defaultValue(MSG_TYPE_BYTE)
|
||||||
.build();
|
.build();
|
||||||
public static final PropertyDescriptor MESSAGE_PRIORITY = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor MESSAGE_PRIORITY = new PropertyDescriptor.Builder()
|
||||||
|
|
Loading…
Reference in New Issue