ARTEMIS-2332 fix sendMessage w/headers from console

This commit is contained in:
Justin Bertram 2019-05-07 12:46:28 -05:00 committed by Clebert Suconic
parent f7b3dd46d7
commit 8257bac49b
3 changed files with 81 additions and 9 deletions

View File

@ -133,7 +133,7 @@ public abstract class AbstractControl extends StandardMBean {
CoreMessage message = new CoreMessage(storageManager.generateID(), 50);
if (headers != null) {
for (Map.Entry<String, String> header : headers.entrySet()) {
message.putStringProperty(new SimpleString(header.getKey()), new SimpleString(headers.get(header.getValue())));
message.putStringProperty(header.getKey(), header.getValue());
}
}
message.setType((byte) type);

View File

@ -20,7 +20,9 @@ import javax.json.JsonArray;
import javax.json.JsonString;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
@ -392,6 +394,33 @@ public class AddressControlTest extends ManagementTestBase {
assertEquals("test", new String(buffer));
}
@Test
public void testSendMessageWithProperties() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
session.createAddress(address, RoutingType.ANYCAST, false);
AddressControl addressControl = createManagementControl(address);
Assert.assertEquals(0, addressControl.getQueueNames().length);
session.createQueue(address, RoutingType.ANYCAST, address);
Assert.assertEquals(1, addressControl.getQueueNames().length);
Map<String, String> headers = new HashMap<>();
headers.put("myProp1", "myValue1");
headers.put("myProp2", "myValue2");
addressControl.sendMessage(headers, Message.BYTES_TYPE, Base64.encodeBytes("test".getBytes()), false, null, null);
Wait.waitFor(() -> addressControl.getMessageCount() == 1);
Assert.assertEquals(1, addressControl.getMessageCount());
ClientConsumer consumer = session.createConsumer(address);
ClientMessage message = consumer.receive(500);
assertNotNull(message);
byte[] buffer = new byte[message.getBodyBuffer().readableBytes()];
message.getBodyBuffer().readBytes(buffer);
assertEquals("test", new String(buffer));
assertEquals("myValue1", message.getStringProperty("myProp1"));
assertEquals("myValue2", message.getStringProperty("myProp2"));
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------

View File

@ -16,8 +16,13 @@
*/
package org.apache.activemq.artemis.tests.integration.management;
import static org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.BODY;
import javax.json.JsonArray;
import javax.json.JsonObject;
import javax.management.Notification;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.TabularDataSupport;
import javax.transaction.xa.XAResource;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.Arrays;
@ -32,12 +37,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import javax.json.JsonArray;
import javax.json.JsonObject;
import javax.management.Notification;
import javax.management.openmbean.CompositeData;
import javax.transaction.xa.XAResource;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.JsonUtil;
@ -78,6 +77,9 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.BODY;
import static org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.STRING_PROPERTIES;
@RunWith(value = Parameterized.class)
public class QueueControlTest extends ManagementTestBase {
@ -2902,6 +2904,47 @@ public class QueueControlTest extends ManagementTestBase {
Assert.assertEquals(new String(body), "theBody");
}
@Test
public void testSendMessageWithProperties() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
QueueControl queueControl = createManagementControl(address, queue);
Map<String, String> headers = new HashMap<>();
headers.put("myProp1", "myValue1");
headers.put("myProp2", "myValue2");
queueControl.sendMessage(headers, Message.BYTES_TYPE, Base64.encodeBytes("theBody".getBytes()), true, "myUser", "myPassword");
queueControl.sendMessage(null, Message.BYTES_TYPE, Base64.encodeBytes("theBody".getBytes()), true, "myUser", "myPassword");
Wait.assertEquals(2, () -> getMessageCount(queueControl));
// the message IDs are set on the server
CompositeData[] browse = queueControl.browse(null);
Assert.assertEquals(2, browse.length);
byte[] body = (byte[]) browse[0].get(BODY);
for (Object prop : ((TabularDataSupport)browse[0].get(STRING_PROPERTIES)).values()) {
CompositeDataSupport cds = (CompositeDataSupport) prop;
Assert.assertTrue(headers.containsKey(cds.get("key")));
Assert.assertTrue(headers.containsValue(cds.get("value")));
}
Assert.assertNotNull(body);
Assert.assertEquals(new String(body), "theBody");
body = (byte[]) browse[1].get(BODY);
Assert.assertNotNull(body);
Assert.assertEquals(new String(body), "theBody");
}
@Test
public void testResetGroups() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();