ARTEMIS-1785 Producer CLI would throw NPEs if using Text Messages

This commit is contained in:
Clebert Suconic 2018-04-04 15:19:02 -04:00
parent 0bd99dfff7
commit dae1b7de1c
6 changed files with 95 additions and 43 deletions

View File

@ -21,6 +21,7 @@ import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
@ -157,11 +158,23 @@ public class ConsumerThread extends Thread {
System.out.println("Received " + count);
}
}
if (verbose) {
if (bytesAsText && (msg instanceof BytesMessage)) {
long length = ((BytesMessage) msg).getBodyLength();
byte[] bytes = new byte[(int) length];
((BytesMessage) msg).readBytes(bytes);
System.out.println("Message:" + msg);
System.out.println("Received a message with " + bytes.length);
}
if (msg instanceof TextMessage) {
String text = ((TextMessage) msg).getText();
System.out.println("Received text sized at " + text.length());
}
if (msg instanceof ObjectMessage) {
Object obj = ((ObjectMessage) msg).getObject();
System.out.println("Received object " + obj.toString().length());
}
}
received++;
} else {

View File

@ -29,6 +29,8 @@ import org.apache.activemq.artemis.cli.commands.ActionContext;
@Command(name = "producer", description = "It will send messages to an instance")
public class Producer extends DestAbstract {
public static final String DEMO_TEXT = "demo.txt";
@Option(name = "--non-persistent", description = "It will send messages non persistently")
boolean nonpersistent = false;
@ -38,6 +40,9 @@ public class Producer extends DestAbstract {
@Option(name = "--text-size", description = "Size of each textMessage (The producer will use text message on this case)")
int textMessageSize;
@Option(name = "--object-size", description = "Size of each ObjectMessage (The producer will use object mesasge on this case)")
int objectSize;
@Option(name = "--msgttl", description = "TTL for each message")
long msgTTL = 0L;
@ -63,7 +68,7 @@ public class Producer extends DestAbstract {
threadsArray[i] = new ProducerThread(session, dest, i);
threadsArray[i].setVerbose(verbose).setSleep(sleep).setPersistent(!nonpersistent).
setMessageSize(messageSize).setTextMessageSize(textMessageSize).
setMessageSize(messageSize).setTextMessageSize(textMessageSize).setObjectSize(objectSize).
setMsgTTL(msgTTL).setMsgGroupID(msgGroupID).setTransactionBatchSize(txBatchSize).
setMessageCount(messageCount);
}

View File

@ -44,6 +44,7 @@ public class ProducerThread extends Thread {
boolean persistent = true;
int messageSize = 0;
int textMessageSize;
int objectSize;
long msgTTL = 0L;
String msgGroupID = null;
int transactionBatchSize;
@ -150,9 +151,23 @@ public class ProducerThread extends Thread {
answer = session.createBytesMessage();
((BytesMessage) answer).writeBytes(payload);
} else {
if (textMessageSize > 0) {
if (textMessageSize > 0 || objectSize > 0) {
if (objectSize > 0) {
textMessageSize = objectSize;
}
if (messageText == null) {
messageText = readInputStream(getClass().getResourceAsStream("demo.txt"), textMessageSize, i);
String read = readInputStream(getClass().getResourceAsStream(Producer.DEMO_TEXT), textMessageSize, i);
if (read.length() == textMessageSize) {
messageText = read;
} else {
StringBuffer buffer = new StringBuffer(read);
while (buffer.length() < textMessageSize) {
buffer.append(read);
}
messageText = buffer.toString();
}
}
} else if (payloadUrl != null) {
messageText = readInputStream(new URL(payloadUrl).openStream(), -1, i);
@ -161,8 +176,13 @@ public class ProducerThread extends Thread {
} else {
messageText = createDefaultMessage(i);
}
if (objectSize > 0) {
answer = session.createObjectMessage(messageText);
} else {
answer = session.createTextMessage(messageText);
}
}
if ((msgGroupID != null) && (!msgGroupID.isEmpty())) {
answer.setStringProperty("JMSXGroupID", msgGroupID);
}
@ -341,4 +361,13 @@ public class ProducerThread extends Thread {
this.verbose = verbose;
return this;
}
public int getObjectSize() {
return objectSize;
}
public ProducerThread setObjectSize(int objectSize) {
this.objectSize = objectSize;
return this;
}
}

View File

@ -583,6 +583,10 @@ public class ArtemisTest extends CliTestBase {
assertEquals(Integer.valueOf(100), Artemis.internalExecute("producer", "--message-count", "100", "--user", "admin", "--password", "admin"));
assertEquals(Integer.valueOf(100), Artemis.internalExecute("consumer", "--break-on-null", "--receive-timeout", "100", "--user", "admin", "--password", "admin"));
assertEquals(Integer.valueOf(10), Artemis.internalExecute("producer", "--text-size", "500", "--message-count", "10", "--user", "admin", "--password", "admin"));
assertEquals(Integer.valueOf(10), Artemis.internalExecute("consumer", "--break-on-null", "--receive-timeout", "100", "--user", "admin", "--password", "admin"));
assertEquals(Integer.valueOf(10), Artemis.internalExecute("producer", "--message-size", "500", "--message-count", "10", "--user", "admin", "--password", "admin"));
assertEquals(Integer.valueOf(10), Artemis.internalExecute("consumer", "--break-on-null", "--receive-timeout", "100", "--user", "admin", "--password", "admin"));
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = cf.createConnection("admin", "admin");

View File

@ -20,6 +20,7 @@ package org.apache.activemq.cli.test;
import java.io.InputStream;
import org.apache.activemq.artemis.cli.commands.Create;
import org.apache.activemq.artemis.cli.commands.messages.Producer;
import org.junit.Assert;
import org.junit.Test;
@ -30,43 +31,43 @@ public class StreamClassPathTest {
*/
@Test
public void testFindStreams() throws Exception {
openStream(Create.BIN_ARTEMIS_CMD);
openStream(Create.BIN_ARTEMIS_SERVICE_EXE);
openStream(Create.BIN_ARTEMIS_SERVICE_XML);
openStream("etc/" + Create.ETC_ARTEMIS_PROFILE_CMD);
openStream(Create.BIN_ARTEMIS);
openStream(Create.BIN_ARTEMIS_SERVICE);
openStream("etc/" + Create.ETC_ARTEMIS_PROFILE);
openStream("etc/" + Create.ETC_LOGGING_PROPERTIES);
openStream("etc/" + Create.ETC_BOOTSTRAP_XML);
openStream("etc/" + Create.ETC_MANAGEMENT_XML);
openStream("etc/" + Create.ETC_BROKER_XML);
openStream("etc/" + Create.ETC_ARTEMIS_ROLES_PROPERTIES);
openStream("etc/" + Create.ETC_ARTEMIS_USERS_PROPERTIES);
openStream(Create.ETC_REPLICATED_SETTINGS_TXT);
openStream(Create.ETC_REPLICATED_SETTINGS_TXT);
openStream(Create.ETC_SHARED_STORE_SETTINGS_TXT);
openStream(Create.ETC_CLUSTER_SECURITY_SETTINGS_TXT);
openStream(Create.ETC_CLUSTER_SETTINGS_TXT);
openStream(Create.ETC_CONNECTOR_SETTINGS_TXT);
openStream(Create.ETC_BOOTSTRAP_WEB_SETTINGS_TXT);
openStream(Create.ETC_JOURNAL_BUFFER_SETTINGS);
openStream(Create.ETC_AMQP_ACCEPTOR_TXT);
openStream(Create.ETC_MQTT_ACCEPTOR_TXT);
openStream(Create.ETC_HORNETQ_ACCEPTOR_TXT);
openStream(Create.ETC_STOMP_ACCEPTOR_TXT);
openStream(Create.ETC_PING_TXT);
openStream(Create.ETC_COMMENTED_PING_TXT);
openStream(Create.ETC_GLOBAL_MAX_SPECIFIED_TXT);
openStream(Create.ETC_GLOBAL_MAX_DEFAULT_TXT);
openStream("etc/" + Create.ETC_JOLOKIA_ACCESS_XML);
openStream(Create.ETC_DATABASE_STORE_TXT);
testStream(Create.class, Create.BIN_ARTEMIS_CMD);
testStream(Create.class, Create.BIN_ARTEMIS_SERVICE_EXE);
testStream(Create.class, Create.BIN_ARTEMIS_SERVICE_XML);
testStream(Create.class, "etc/" + Create.ETC_ARTEMIS_PROFILE_CMD);
testStream(Create.class, Create.BIN_ARTEMIS);
testStream(Create.class, Create.BIN_ARTEMIS_SERVICE);
testStream(Create.class, "etc/" + Create.ETC_ARTEMIS_PROFILE);
testStream(Create.class, "etc/" + Create.ETC_LOGGING_PROPERTIES);
testStream(Create.class, "etc/" + Create.ETC_BOOTSTRAP_XML);
testStream(Create.class, "etc/" + Create.ETC_MANAGEMENT_XML);
testStream(Create.class, "etc/" + Create.ETC_BROKER_XML);
testStream(Create.class, "etc/" + Create.ETC_ARTEMIS_ROLES_PROPERTIES);
testStream(Create.class, "etc/" + Create.ETC_ARTEMIS_USERS_PROPERTIES);
testStream(Create.class, Create.ETC_REPLICATED_SETTINGS_TXT);
testStream(Create.class, Create.ETC_REPLICATED_SETTINGS_TXT);
testStream(Create.class, Create.ETC_SHARED_STORE_SETTINGS_TXT);
testStream(Create.class, Create.ETC_CLUSTER_SECURITY_SETTINGS_TXT);
testStream(Create.class, Create.ETC_CLUSTER_SETTINGS_TXT);
testStream(Create.class, Create.ETC_CONNECTOR_SETTINGS_TXT);
testStream(Create.class, Create.ETC_BOOTSTRAP_WEB_SETTINGS_TXT);
testStream(Create.class, Create.ETC_JOURNAL_BUFFER_SETTINGS);
testStream(Create.class, Create.ETC_AMQP_ACCEPTOR_TXT);
testStream(Create.class, Create.ETC_MQTT_ACCEPTOR_TXT);
testStream(Create.class, Create.ETC_HORNETQ_ACCEPTOR_TXT);
testStream(Create.class, Create.ETC_STOMP_ACCEPTOR_TXT);
testStream(Create.class, Create.ETC_PING_TXT);
testStream(Create.class, Create.ETC_COMMENTED_PING_TXT);
testStream(Create.class, Create.ETC_GLOBAL_MAX_SPECIFIED_TXT);
testStream(Create.class, Create.ETC_GLOBAL_MAX_DEFAULT_TXT);
testStream(Create.class, "etc/" + Create.ETC_JOLOKIA_ACCESS_XML);
testStream(Create.class, Create.ETC_DATABASE_STORE_TXT);
testStream(Producer.class, Producer.DEMO_TEXT);
}
private void openStream(String source) throws Exception {
Create create = new Create();
InputStream in = create.openStream(source);
private void testStream(Class clazz, String source) throws Exception {
InputStream in = clazz.getResourceAsStream(source);
Assert.assertNotNull(source + " not found", in);
in.close();
}