This commit is contained in:
Justin Bertram 2021-01-25 13:22:53 -06:00
commit df7658f5a8
2 changed files with 47 additions and 0 deletions

View File

@ -812,6 +812,20 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
@Override @Override
public abstract int getMemoryEstimate(); public abstract int getMemoryEstimate();
@Override
public Map<String, Object> toPropertyMap() {
Map map = new HashMap<>();
for (SimpleString name : getPropertyNames()) {
Object value = getObjectProperty(name.toString());
//some property is Binary, which is not available for management console
if (value instanceof Binary) {
value = ((Binary)value).getArray();
}
map.put(name.toString(), value);
}
return map;
}
@Override @Override
public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) { public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
try { try {

View File

@ -20,6 +20,13 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.QueueControl; import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper; import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.proton.amqp.Binary;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import javax.jms.Connection; import javax.jms.Connection;
@ -83,6 +90,32 @@ public class JMXManagementTest extends JMSClientTestSupport {
connection2.close(); connection2.close();
} }
@Test
public void testGetFirstMessage() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getQueueName());
session.begin();
AmqpMessage message = new AmqpMessage();
message.setApplicationProperty("TEST_BINARY", new Binary("TEST".getBytes()));
message.setApplicationProperty("TEST_STRING", "TEST");
message.setText("TEST");
sender.send(message);
session.commit();
SimpleString queue = new SimpleString(getQueueName());
QueueControl queueControl = createManagementControl(queue, queue);
String firstMessageAsJSON = queueControl.getFirstMessageAsJSON();
Assert.assertNotNull(firstMessageAsJSON);
} finally {
connection.close();
}
}
protected QueueControl createManagementControl(final SimpleString address, protected QueueControl createManagementControl(final SimpleString address,
final SimpleString queue) throws Exception { final SimpleString queue) throws Exception {
QueueControl queueControl = ManagementControlHelper.createQueueControl(address, queue, RoutingType.ANYCAST, this.mBeanServer); QueueControl queueControl = ManagementControlHelper.createQueueControl(address, queue, RoutingType.ANYCAST, this.mBeanServer);