This closes #742

This commit is contained in:
Clebert Suconic 2016-08-30 17:12:19 -04:00
commit b51142ae0c
2 changed files with 44 additions and 0 deletions

View File

@ -139,6 +139,13 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
body = new AmqpValue(((ObjectMessage) msg).getObject()); body = new AmqpValue(((ObjectMessage) msg).getObject());
} }
if (body == null && msg instanceof org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSMessage) {
Object s = ((org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSMessage) msg).getInnerMessage().getBodyBuffer().readNullableSimpleString();
if (s != null) {
body = new AmqpValue(s.toString());
}
}
header.setDurable(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? true : false); header.setDurable(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? true : false);
header.setPriority(new UnsignedByte((byte) msg.getJMSPriority())); header.setPriority(new UnsignedByte((byte) msg.getJMSPriority()));
if (msg.getJMSType() != null) { if (msg.getJMSType() != null) {

View File

@ -78,6 +78,7 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import static org.junit.Assume.assumeTrue;
import org.proton.plug.AMQPClientConnectionContext; import org.proton.plug.AMQPClientConnectionContext;
import org.proton.plug.AMQPClientReceiverContext; import org.proton.plug.AMQPClientReceiverContext;
import org.proton.plug.AMQPClientSenderContext; import org.proton.plug.AMQPClientSenderContext;
@ -639,6 +640,42 @@ public class ProtonTest extends ActiveMQTestBase {
assertNotNull(receivedMessage); assertNotNull(receivedMessage);
} }
@Test
public void testManagementQueryOverAMQP() throws Throwable {
assumeTrue(protocol == 0 || protocol == 3); // Only run this test for AMQP protocol
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection amqpConnection = client.connect();
try {
String destinationAddress = address + 1;
AmqpSession session = amqpConnection.createSession();
AmqpSender sender = session.createSender("jms.queue.activemq.management");
AmqpReceiver receiver = session.createReceiver(destinationAddress);
receiver.flow(10);
//create request message for getQueueNames query
AmqpMessage request = new AmqpMessage();
request.setApplicationProperty("_AMQ_ResourceName", "core.server");
request.setApplicationProperty("_AMQ_OperationName", "getQueueNames");
request.setApplicationProperty("JMSReplyTo", destinationAddress);
request.setText("[]");
sender.send(request);
AmqpMessage response = receiver.receive();
assertNotNull(response);
Object section = response.getWrappedMessage().getBody();
assertTrue(section instanceof AmqpValue);
Object value = ((AmqpValue) section).getValue();
assertTrue(value instanceof String);
assertTrue(((String) value).length() > 0);
assertTrue(((String) value).contains(destinationAddress));
response.accept();
}
finally {
amqpConnection.close();
}
}
@Test @Test
public void testReplyTo() throws Throwable { public void testReplyTo() throws Throwable {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);