ARTEMIS-4648 support typed properties from CLI producer
This commit is contained in:
parent
21368cf741
commit
009687ef7c
|
@ -154,6 +154,11 @@
|
|||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mockito</groupId>
|
||||
<artifactId>mockito-core</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<!-- The johnzon-core and json-api contents are repackaged in -commons,
|
||||
However maven can still need them during tests, which run against
|
||||
|
|
|
@ -61,6 +61,9 @@ public class Producer extends DestAbstract {
|
|||
@Option(names = "--data", description = "Messages will be read from the specified file. Other message options will be ignored.")
|
||||
String file = null;
|
||||
|
||||
@Option(names = "--properties", description = "The properties to set on the message in JSON, e.g.: [{\"type\":\"string\",\"key\":\"myKey1\",\"value\":\"myValue1\"},{\"type\":\"string\",\"key\":\"myKey2\",\"value\":\"myValue2\"}]. Valid types are boolean, byte, short, int, long, float, double, and string.")
|
||||
String properties = null;
|
||||
|
||||
public boolean isNonpersistent() {
|
||||
return nonpersistent;
|
||||
}
|
||||
|
@ -88,6 +91,15 @@ public class Producer extends DestAbstract {
|
|||
return this;
|
||||
}
|
||||
|
||||
public String getProperties() {
|
||||
return properties;
|
||||
}
|
||||
|
||||
public Producer setProperties(String properties) {
|
||||
this.properties = properties;
|
||||
return this;
|
||||
}
|
||||
|
||||
public int getTextMessageSize() {
|
||||
return textMessageSize;
|
||||
}
|
||||
|
@ -205,6 +217,7 @@ public class Producer extends DestAbstract {
|
|||
.setMessageSize(messageSize)
|
||||
.setTextMessageSize(textMessageSize)
|
||||
.setMessage(message)
|
||||
.setProperties(properties)
|
||||
.setObjectSize(objectSize)
|
||||
.setMsgTTL(msgTTL)
|
||||
.setMsgGroupID(msgGroupID)
|
||||
|
|
|
@ -27,10 +27,14 @@ import javax.jms.TextMessage;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.StringReader;
|
||||
import java.net.URL;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.activemq.artemis.cli.commands.ActionContext;
|
||||
import org.apache.activemq.artemis.json.JsonArray;
|
||||
import org.apache.activemq.artemis.json.JsonObject;
|
||||
import org.apache.activemq.artemis.utils.JsonLoader;
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||
|
||||
public class ProducerThread extends Thread {
|
||||
|
@ -54,6 +58,7 @@ public class ProducerThread extends Thread {
|
|||
int transactions = 0;
|
||||
final AtomicLong sentCount = new AtomicLong(0);
|
||||
String message = null;
|
||||
String properties = null;
|
||||
String messageText = null;
|
||||
String payloadUrl = null;
|
||||
byte[] payload = null;
|
||||
|
@ -193,9 +198,52 @@ public class ProducerThread extends Thread {
|
|||
|
||||
answer.setLongProperty("count", i);
|
||||
answer.setStringProperty("ThreadSent", threadName);
|
||||
|
||||
if (properties != null && properties.length() != 0) {
|
||||
applyProperties(answer);
|
||||
}
|
||||
|
||||
return answer;
|
||||
}
|
||||
|
||||
protected void applyProperties(Message message) throws JMSException {
|
||||
JsonArray propertyArray = JsonLoader.readArray(new StringReader(properties));
|
||||
for (int j = 0; j < propertyArray.size(); j++) {
|
||||
JsonObject propertyEntry = propertyArray.getJsonObject(j);
|
||||
String type = propertyEntry.getString("type");
|
||||
String key = propertyEntry.getString("key");
|
||||
String value = propertyEntry.getString("value");
|
||||
switch (type.toLowerCase()) {
|
||||
case "boolean":
|
||||
message.setBooleanProperty(key, Boolean.parseBoolean(value));
|
||||
break;
|
||||
case "int":
|
||||
message.setIntProperty(key, Integer.parseInt(value));
|
||||
break;
|
||||
case "long":
|
||||
message.setLongProperty(key, Long.parseLong(value));
|
||||
break;
|
||||
case "byte":
|
||||
message.setByteProperty(key, Byte.parseByte(value));
|
||||
break;
|
||||
case "short":
|
||||
message.setShortProperty(key, Short.parseShort(value));
|
||||
break;
|
||||
case "float":
|
||||
message.setFloatProperty(key, Float.parseFloat(value));
|
||||
break;
|
||||
case "double":
|
||||
message.setDoubleProperty(key, Double.parseDouble(value));
|
||||
break;
|
||||
case "string":
|
||||
message.setStringProperty(key, value);
|
||||
break;
|
||||
default:
|
||||
context.err.println("Unable to set property: " + key + ". Did not recognize type: " + type + ". Supported types are: boolean, int, long, byte, short, float, double, string.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private String readInputStream(InputStream is, int size, long messageNumber) throws IOException {
|
||||
try (InputStreamReader reader = new InputStreamReader(is)) {
|
||||
char[] buffer;
|
||||
|
@ -333,6 +381,11 @@ public class ProducerThread extends Thread {
|
|||
return this;
|
||||
}
|
||||
|
||||
public ProducerThread setProperties(String properties) {
|
||||
this.properties = properties;
|
||||
return this;
|
||||
}
|
||||
|
||||
public boolean isRunIndefinitely() {
|
||||
return runIndefinitely;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,234 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.cli.commands.messages;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||
import org.apache.activemq.cli.test.TestActionContext;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
public class ProducerThreadTest {
|
||||
|
||||
ProducerThread producer;
|
||||
Message mockMessage;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
producer = new ProducerThread(null, ActiveMQDestination.createQueue(RandomUtil.randomString()), 0, null);
|
||||
mockMessage = Mockito.mock(Message.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBooleanPropertyTrue() throws Exception {
|
||||
doBooleanPropertyTestImpl("myTrueBoolean", true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBooleanPropertyFalse() throws Exception {
|
||||
doBooleanPropertyTestImpl("myFalseBoolean", false);
|
||||
}
|
||||
|
||||
private void doBooleanPropertyTestImpl(String key, boolean value) throws JMSException {
|
||||
producer.setProperties(createJsonProperty(boolean.class.getSimpleName(), key, String.valueOf(value)));
|
||||
producer.applyProperties(mockMessage);
|
||||
|
||||
Mockito.verify(mockMessage).setBooleanProperty(key, value);
|
||||
Mockito.verifyNoMoreInteractions(mockMessage);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIntProperty() throws Exception {
|
||||
String key = "myInt";
|
||||
int value = RandomUtil.randomInt();
|
||||
|
||||
producer.setProperties(createJsonProperty(int.class.getSimpleName(), key, String.valueOf(value)));
|
||||
producer.applyProperties(mockMessage);
|
||||
|
||||
Mockito.verify(mockMessage).setIntProperty(key, value);
|
||||
Mockito.verifyNoMoreInteractions(mockMessage);
|
||||
|
||||
// Now with a bad int value
|
||||
producer.setProperties(createJsonProperty(int.class.getSimpleName(), key, "badInt"));
|
||||
try {
|
||||
producer.applyProperties(mockMessage);
|
||||
fail("should have thrown");
|
||||
} catch (NumberFormatException e) {
|
||||
// expected
|
||||
}
|
||||
|
||||
Mockito.verifyNoMoreInteractions(mockMessage);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLongProperty() throws Exception {
|
||||
String key = "myLong";
|
||||
long value = RandomUtil.randomLong();
|
||||
|
||||
producer.setProperties(createJsonProperty(long.class.getSimpleName(), key, String.valueOf(value)));
|
||||
producer.applyProperties(mockMessage);
|
||||
|
||||
Mockito.verify(mockMessage).setLongProperty(key, value);
|
||||
Mockito.verifyNoMoreInteractions(mockMessage);
|
||||
|
||||
// Now with a bad long value
|
||||
producer.setProperties(createJsonProperty(long.class.getSimpleName(), key, "badLong"));
|
||||
try {
|
||||
producer.applyProperties(mockMessage);
|
||||
fail("should have thrown");
|
||||
} catch (NumberFormatException e) {
|
||||
// expected
|
||||
}
|
||||
|
||||
Mockito.verifyNoMoreInteractions(mockMessage);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testByteProperty() throws Exception {
|
||||
String key = "myByte";
|
||||
byte value = RandomUtil.randomByte();
|
||||
|
||||
producer.setProperties(createJsonProperty(byte.class.getSimpleName(), key, String.valueOf(value)));
|
||||
producer.applyProperties(mockMessage);
|
||||
|
||||
Mockito.verify(mockMessage).setByteProperty(key, value);
|
||||
Mockito.verifyNoMoreInteractions(mockMessage);
|
||||
|
||||
// Now with a bad byte value
|
||||
producer.setProperties(createJsonProperty(byte.class.getSimpleName(), key, "128"));
|
||||
try {
|
||||
producer.applyProperties(mockMessage);
|
||||
fail("should have thrown");
|
||||
} catch (NumberFormatException e) {
|
||||
// expected
|
||||
}
|
||||
|
||||
Mockito.verifyNoMoreInteractions(mockMessage);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testShortProperty() throws Exception {
|
||||
String key = "myShort";
|
||||
short value = RandomUtil.randomShort();
|
||||
|
||||
producer.setProperties(createJsonProperty(short.class.getSimpleName(), key, String.valueOf(value)));
|
||||
producer.applyProperties(mockMessage);
|
||||
|
||||
Mockito.verify(mockMessage).setShortProperty(key, value);
|
||||
Mockito.verifyNoMoreInteractions(mockMessage);
|
||||
|
||||
// Now with a bad short value
|
||||
producer.setProperties(createJsonProperty(short.class.getSimpleName(), key, "badShort"));
|
||||
try {
|
||||
producer.applyProperties(mockMessage);
|
||||
fail("should have thrown");
|
||||
} catch (NumberFormatException e) {
|
||||
// expected
|
||||
}
|
||||
|
||||
Mockito.verifyNoMoreInteractions(mockMessage);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFloatProperty() throws Exception {
|
||||
String key = "myFloat";
|
||||
float value = RandomUtil.randomFloat();
|
||||
|
||||
producer.setProperties(createJsonProperty(float.class.getSimpleName(), key, String.valueOf(value)));
|
||||
producer.applyProperties(mockMessage);
|
||||
|
||||
Mockito.verify(mockMessage).setFloatProperty(key, value);
|
||||
Mockito.verifyNoMoreInteractions(mockMessage);
|
||||
|
||||
// Now with a bad float value
|
||||
producer.setProperties(createJsonProperty(float.class.getSimpleName(), key, "badFloat"));
|
||||
try {
|
||||
producer.applyProperties(mockMessage);
|
||||
fail("should have thrown");
|
||||
} catch (NumberFormatException e) {
|
||||
// expected
|
||||
}
|
||||
|
||||
Mockito.verifyNoMoreInteractions(mockMessage);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDoubleProperty() throws Exception {
|
||||
String key = "myDouble";
|
||||
double value = RandomUtil.randomDouble();
|
||||
|
||||
producer.setProperties(createJsonProperty(double.class.getSimpleName(), key, String.valueOf(value)));
|
||||
producer.applyProperties(mockMessage);
|
||||
|
||||
Mockito.verify(mockMessage).setDoubleProperty(key, value);
|
||||
Mockito.verifyNoMoreInteractions(mockMessage);
|
||||
|
||||
// Now with a bad double value
|
||||
producer.setProperties(createJsonProperty(double.class.getSimpleName(), key, "badDouble"));
|
||||
try {
|
||||
producer.applyProperties(mockMessage);
|
||||
fail("should have thrown");
|
||||
} catch (NumberFormatException e) {
|
||||
// expected
|
||||
}
|
||||
|
||||
Mockito.verifyNoMoreInteractions(mockMessage);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStringProperty() throws Exception {
|
||||
doStringPropertyTestImpl("string");
|
||||
}
|
||||
|
||||
private void doStringPropertyTestImpl(String type) throws JMSException {
|
||||
String key = "myString";
|
||||
String value = RandomUtil.randomString();
|
||||
|
||||
producer.setProperties(createJsonProperty(type, key, value));
|
||||
producer.applyProperties(mockMessage);
|
||||
|
||||
Mockito.verify(mockMessage).setStringProperty(key, value);
|
||||
Mockito.verifyNoMoreInteractions(mockMessage);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPropertyTypeIsCaseInsensitive() throws Exception {
|
||||
doStringPropertyTestImpl("String");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBadMessagePropertyType() throws Exception {
|
||||
TestActionContext context = new TestActionContext();
|
||||
producer = new ProducerThread(null, ActiveMQDestination.createQueue(RandomUtil.randomString()), 0, context);
|
||||
|
||||
producer.setProperties(createJsonProperty("myType", "myKey", "myValue"));
|
||||
producer.applyProperties(mockMessage);
|
||||
|
||||
assertEquals("Unable to set property: myKey. Did not recognize type: myType. Supported types are: boolean, int, long, byte, short, float, double, string.\n", context.getStderr());
|
||||
}
|
||||
|
||||
private static String createJsonProperty(String type, String key, String value) {
|
||||
return ("[{'type':'" + type + "','key':'" + key + "','value':'" + value + "'}]").replaceAll("'", "\"");
|
||||
}
|
||||
}
|
|
@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
|
|||
import org.apache.activemq.artemis.cli.commands.messages.Producer;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.utils.CompositeAddress;
|
||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -31,6 +32,7 @@ import javax.jms.TextMessage;
|
|||
import java.util.List;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class CliProducerTest extends CliTestBase {
|
||||
private Connection connection;
|
||||
|
@ -55,8 +57,13 @@ public class CliProducerTest extends CliTestBase {
|
|||
}
|
||||
|
||||
private void produceMessages(String address, String message, int msgCount) throws Exception {
|
||||
produceMessages(address, message, msgCount, null);
|
||||
}
|
||||
|
||||
private void produceMessages(String address, String message, int msgCount, String properties) throws Exception {
|
||||
new Producer()
|
||||
.setMessage(message)
|
||||
.setProperties(properties)
|
||||
.setMessageCount(msgCount)
|
||||
.setDestination(address)
|
||||
.setUser("admin")
|
||||
|
@ -88,6 +95,28 @@ public class CliProducerTest extends CliTestBase {
|
|||
checkSentMessages(session, address, null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSendMessageWithProperties() throws Exception {
|
||||
String address = "test";
|
||||
Session session = createSession(connection);
|
||||
|
||||
String myBooleanKey = "myBooleanKey";
|
||||
String myStringKey = "myStringKey";
|
||||
String myStringValue = RandomUtil.randomString();
|
||||
String propertiesJson = ("[{'type':'boolean','key':'" + myBooleanKey + "','value':'true'},{'type':'string','key':'" + myStringKey + "','value':'" + myStringValue + "'}]").replaceAll("'", "\"");
|
||||
|
||||
produceMessages(address, null, 1, propertiesJson);
|
||||
|
||||
List<Message> consumedMessages = consumeMessages(session, address, 1, false);
|
||||
assertEquals(1, consumedMessages.size());
|
||||
|
||||
Message msg = consumedMessages.get(0);
|
||||
assertTrue(msg.propertyExists(myBooleanKey));
|
||||
assertTrue(msg.getBooleanProperty(myBooleanKey));
|
||||
assertTrue(msg.propertyExists(myStringKey));
|
||||
assertEquals(myStringValue, msg.getStringProperty(myStringKey));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSendMessageFQQN() throws Exception {
|
||||
String address = "test";
|
||||
|
|
Loading…
Reference in New Issue