This closes #2662
This commit is contained in:
commit
f7d5c001ad
|
@ -133,7 +133,7 @@ public abstract class AbstractControl extends StandardMBean {
|
|||
CoreMessage message = new CoreMessage(storageManager.generateID(), 50);
|
||||
if (headers != null) {
|
||||
for (Map.Entry<String, String> header : headers.entrySet()) {
|
||||
message.putStringProperty(new SimpleString(header.getKey()), new SimpleString(headers.get(header.getValue())));
|
||||
message.putStringProperty(header.getKey(), header.getValue());
|
||||
}
|
||||
}
|
||||
message.setType((byte) type);
|
||||
|
|
|
@ -20,7 +20,9 @@ import javax.json.JsonArray;
|
|||
import javax.json.JsonString;
|
||||
import java.util.Arrays;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||
|
@ -392,6 +394,33 @@ public class AddressControlTest extends ManagementTestBase {
|
|||
assertEquals("test", new String(buffer));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSendMessageWithProperties() throws Exception {
|
||||
SimpleString address = RandomUtil.randomSimpleString();
|
||||
session.createAddress(address, RoutingType.ANYCAST, false);
|
||||
|
||||
AddressControl addressControl = createManagementControl(address);
|
||||
Assert.assertEquals(0, addressControl.getQueueNames().length);
|
||||
session.createQueue(address, RoutingType.ANYCAST, address);
|
||||
Assert.assertEquals(1, addressControl.getQueueNames().length);
|
||||
Map<String, String> headers = new HashMap<>();
|
||||
headers.put("myProp1", "myValue1");
|
||||
headers.put("myProp2", "myValue2");
|
||||
addressControl.sendMessage(headers, Message.BYTES_TYPE, Base64.encodeBytes("test".getBytes()), false, null, null);
|
||||
|
||||
Wait.waitFor(() -> addressControl.getMessageCount() == 1);
|
||||
Assert.assertEquals(1, addressControl.getMessageCount());
|
||||
|
||||
ClientConsumer consumer = session.createConsumer(address);
|
||||
ClientMessage message = consumer.receive(500);
|
||||
assertNotNull(message);
|
||||
byte[] buffer = new byte[message.getBodyBuffer().readableBytes()];
|
||||
message.getBodyBuffer().readBytes(buffer);
|
||||
assertEquals("test", new String(buffer));
|
||||
assertEquals("myValue1", message.getStringProperty("myProp1"));
|
||||
assertEquals("myValue2", message.getStringProperty("myProp2"));
|
||||
}
|
||||
|
||||
// Package protected ---------------------------------------------
|
||||
|
||||
// Protected -----------------------------------------------------
|
||||
|
|
|
@ -16,8 +16,13 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.management;
|
||||
|
||||
import static org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.BODY;
|
||||
|
||||
import javax.json.JsonArray;
|
||||
import javax.json.JsonObject;
|
||||
import javax.management.Notification;
|
||||
import javax.management.openmbean.CompositeData;
|
||||
import javax.management.openmbean.CompositeDataSupport;
|
||||
import javax.management.openmbean.TabularDataSupport;
|
||||
import javax.transaction.xa.XAResource;
|
||||
import java.lang.reflect.Field;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
|
@ -32,12 +37,6 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import javax.json.JsonArray;
|
||||
import javax.json.JsonObject;
|
||||
import javax.management.Notification;
|
||||
import javax.management.openmbean.CompositeData;
|
||||
import javax.transaction.xa.XAResource;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.JsonUtil;
|
||||
|
@ -78,6 +77,9 @@ import org.junit.Test;
|
|||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import static org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.BODY;
|
||||
import static org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.STRING_PROPERTIES;
|
||||
|
||||
@RunWith(value = Parameterized.class)
|
||||
public class QueueControlTest extends ManagementTestBase {
|
||||
|
||||
|
@ -2902,6 +2904,47 @@ public class QueueControlTest extends ManagementTestBase {
|
|||
Assert.assertEquals(new String(body), "theBody");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSendMessageWithProperties() throws Exception {
|
||||
SimpleString address = RandomUtil.randomSimpleString();
|
||||
SimpleString queue = RandomUtil.randomSimpleString();
|
||||
|
||||
session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
|
||||
|
||||
QueueControl queueControl = createManagementControl(address, queue);
|
||||
|
||||
Map<String, String> headers = new HashMap<>();
|
||||
headers.put("myProp1", "myValue1");
|
||||
headers.put("myProp2", "myValue2");
|
||||
queueControl.sendMessage(headers, Message.BYTES_TYPE, Base64.encodeBytes("theBody".getBytes()), true, "myUser", "myPassword");
|
||||
queueControl.sendMessage(null, Message.BYTES_TYPE, Base64.encodeBytes("theBody".getBytes()), true, "myUser", "myPassword");
|
||||
|
||||
Wait.assertEquals(2, () -> getMessageCount(queueControl));
|
||||
|
||||
// the message IDs are set on the server
|
||||
CompositeData[] browse = queueControl.browse(null);
|
||||
|
||||
Assert.assertEquals(2, browse.length);
|
||||
|
||||
byte[] body = (byte[]) browse[0].get(BODY);
|
||||
|
||||
for (Object prop : ((TabularDataSupport)browse[0].get(STRING_PROPERTIES)).values()) {
|
||||
CompositeDataSupport cds = (CompositeDataSupport) prop;
|
||||
Assert.assertTrue(headers.containsKey(cds.get("key")));
|
||||
Assert.assertTrue(headers.containsValue(cds.get("value")));
|
||||
}
|
||||
|
||||
Assert.assertNotNull(body);
|
||||
|
||||
Assert.assertEquals(new String(body), "theBody");
|
||||
|
||||
body = (byte[]) browse[1].get(BODY);
|
||||
|
||||
Assert.assertNotNull(body);
|
||||
|
||||
Assert.assertEquals(new String(body), "theBody");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testResetGroups() throws Exception {
|
||||
SimpleString address = RandomUtil.randomSimpleString();
|
||||
|
|
Loading…
Reference in New Issue