mirror of https://github.com/apache/activemq.git
Whem possible restore the orignal MessageId value from the incoming message.
This commit is contained in:
parent
efc9a8d578
commit
af13292916
|
@ -39,6 +39,8 @@ import javax.jms.TemporaryTopic;
|
||||||
import javax.jms.TextMessage;
|
import javax.jms.TextMessage;
|
||||||
import javax.jms.Topic;
|
import javax.jms.Topic;
|
||||||
|
|
||||||
|
import org.apache.activemq.command.ActiveMQMessage;
|
||||||
|
import org.apache.activemq.command.MessageId;
|
||||||
import org.apache.qpid.proton.amqp.Binary;
|
import org.apache.qpid.proton.amqp.Binary;
|
||||||
import org.apache.qpid.proton.amqp.Symbol;
|
import org.apache.qpid.proton.amqp.Symbol;
|
||||||
import org.apache.qpid.proton.amqp.UnsignedByte;
|
import org.apache.qpid.proton.amqp.UnsignedByte;
|
||||||
|
@ -174,7 +176,14 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
|
||||||
props.setSubject(msg.getJMSType());
|
props.setSubject(msg.getJMSType());
|
||||||
}
|
}
|
||||||
if (msg.getJMSMessageID() != null) {
|
if (msg.getJMSMessageID() != null) {
|
||||||
props.setMessageId(msg.getJMSMessageID());
|
ActiveMQMessage amqMsg = (ActiveMQMessage) msg;
|
||||||
|
|
||||||
|
MessageId msgId = amqMsg.getMessageId();
|
||||||
|
if (msgId.getTextView() != null) {
|
||||||
|
props.setMessageId(msgId.getTextView());
|
||||||
|
} else {
|
||||||
|
props.setMessageId(msgId.toString());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (msg.getJMSDestination() != null) {
|
if (msg.getJMSDestination() != null) {
|
||||||
props.setTo(vendor.toAddress(msg.getJMSDestination()));
|
props.setTo(vendor.toAddress(msg.getJMSDestination()));
|
||||||
|
|
|
@ -28,7 +28,6 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
|
||||||
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
|
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
|
||||||
import org.apache.activemq.transport.amqp.client.AmqpSender;
|
import org.apache.activemq.transport.amqp.client.AmqpSender;
|
||||||
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
||||||
import org.junit.Ignore;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -36,7 +35,6 @@ import org.junit.Test;
|
||||||
*/
|
*/
|
||||||
public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
||||||
|
|
||||||
@Ignore("Test fails when JMS transformer is in play")
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
public void testCloseBusyReceiver() throws Exception {
|
public void testCloseBusyReceiver() throws Exception {
|
||||||
final int MSG_COUNT = 20;
|
final int MSG_COUNT = 20;
|
||||||
|
@ -71,7 +69,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
||||||
AmqpReceiver receiver2 = session.createReceiver("queue://" + getTestName());
|
AmqpReceiver receiver2 = session.createReceiver("queue://" + getTestName());
|
||||||
receiver2.flow(200);
|
receiver2.flow(200);
|
||||||
for (int i = 0; i < MSG_COUNT; ++i) {
|
for (int i = 0; i < MSG_COUNT; ++i) {
|
||||||
received = receiver1.receive(5, TimeUnit.SECONDS);
|
received = receiver2.receive(5, TimeUnit.SECONDS);
|
||||||
assertEquals("msg" + i, received.getMessageId());
|
assertEquals("msg" + i, received.getMessageId());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue