ARTEMIS-193 - OpenWire protocol only works with messages received over openwire

Fix the address conversion between protocols so its consistent

https://issues.apache.org/jira/browse/ARTEMIS-193
This commit is contained in:
Andy Taylor 2015-08-27 10:23:22 +01:00
parent b1f1c5a961
commit 2f18b4cbfc
4 changed files with 50 additions and 25 deletions

View File

@ -71,7 +71,6 @@ public class OpenWireMessageConverter implements MessageConverter {
private static final String AMQ_MSG_CLUSTER = AMQ_PREFIX + "CLUSTER"; private static final String AMQ_MSG_CLUSTER = AMQ_PREFIX + "CLUSTER";
private static final String AMQ_MSG_COMMAND_ID = AMQ_PREFIX + "COMMAND_ID"; private static final String AMQ_MSG_COMMAND_ID = AMQ_PREFIX + "COMMAND_ID";
private static final String AMQ_MSG_DATASTRUCTURE = AMQ_PREFIX + "DATASTRUCTURE"; private static final String AMQ_MSG_DATASTRUCTURE = AMQ_PREFIX + "DATASTRUCTURE";
private static final String AMQ_MSG_DESTINATION = AMQ_PREFIX + "DESTINATION";
private static final String AMQ_MSG_GROUP_ID = AMQ_PREFIX + "GROUP_ID"; private static final String AMQ_MSG_GROUP_ID = AMQ_PREFIX + "GROUP_ID";
private static final String AMQ_MSG_GROUP_SEQUENCE = AMQ_PREFIX + "GROUP_SEQUENCE"; private static final String AMQ_MSG_GROUP_SEQUENCE = AMQ_PREFIX + "GROUP_SEQUENCE";
private static final String AMQ_MSG_MESSAGE_ID = AMQ_PREFIX + "MESSAGE_ID"; private static final String AMQ_MSG_MESSAGE_ID = AMQ_PREFIX + "MESSAGE_ID";
@ -253,10 +252,6 @@ public class OpenWireMessageConverter implements MessageConverter {
dsBytes.compact(); dsBytes.compact();
coreMessage.putBytesProperty(AMQ_MSG_DATASTRUCTURE, dsBytes.data); coreMessage.putBytesProperty(AMQ_MSG_DATASTRUCTURE, dsBytes.data);
} }
ActiveMQDestination dest = messageSend.getDestination();
ByteSequence destBytes = marshaller.marshal(dest);
destBytes.compact();
coreMessage.putBytesProperty(AMQ_MSG_DESTINATION, destBytes.data);
String groupId = messageSend.getGroupID(); String groupId = messageSend.getGroupID();
if (groupId != null) { if (groupId != null) {
coreMessage.putStringProperty(AMQ_MSG_GROUP_ID, groupId); coreMessage.putStringProperty(AMQ_MSG_GROUP_ID, groupId);
@ -269,18 +264,6 @@ public class OpenWireMessageConverter implements MessageConverter {
midBytes.compact(); midBytes.compact();
coreMessage.putBytesProperty(AMQ_MSG_MESSAGE_ID, midBytes.data); coreMessage.putBytesProperty(AMQ_MSG_MESSAGE_ID, midBytes.data);
ActiveMQDestination origDest = messageSend.getOriginalDestination();
if (origDest != null) {
ByteSequence origDestBytes = marshaller.marshal(origDest);
origDestBytes.compact();
coreMessage.putBytesProperty(AMQ_MSG_ORIG_DESTINATION, origDestBytes.data);
}
TransactionId origTxId = messageSend.getOriginalTransactionId();
if (origTxId != null) {
ByteSequence origTxBytes = marshaller.marshal(origTxId);
origTxBytes.compact();
coreMessage.putBytesProperty(AMQ_MSG_ORIG_TXID, origTxBytes.data);
}
ProducerId producerId = messageSend.getProducerId(); ProducerId producerId = messageSend.getProducerId();
if (producerId != null) { if (producerId != null) {
ByteSequence producerIdBytes = marshaller.marshal(producerId); ByteSequence producerIdBytes = marshaller.marshal(producerId);
@ -375,7 +358,7 @@ public class OpenWireMessageConverter implements MessageConverter {
public static MessageDispatch createMessageDispatch(ServerMessage message, public static MessageDispatch createMessageDispatch(ServerMessage message,
int deliveryCount, int deliveryCount,
AMQConsumer consumer) throws IOException { AMQConsumer consumer) throws IOException {
ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller()); ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller(), consumer.getActualDestination());
MessageDispatch md = new MessageDispatch(); MessageDispatch md = new MessageDispatch();
md.setConsumerId(consumer.getId()); md.setConsumerId(consumer.getId());
@ -387,7 +370,7 @@ public class OpenWireMessageConverter implements MessageConverter {
return md; return md;
} }
private static ActiveMQMessage toAMQMessage(ServerMessage coreMessage, WireFormat marshaller) throws IOException { private static ActiveMQMessage toAMQMessage(ServerMessage coreMessage, WireFormat marshaller, ActiveMQDestination actualDestination) throws IOException {
ActiveMQMessage amqMsg = null; ActiveMQMessage amqMsg = null;
byte coreType = coreMessage.getType(); byte coreType = coreMessage.getType();
switch (coreType) { switch (coreType) {
@ -582,12 +565,7 @@ public class OpenWireMessageConverter implements MessageConverter {
amqMsg.setDataStructure(ds); amqMsg.setDataStructure(ds);
} }
byte[] destBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_DESTINATION); amqMsg.setDestination(OpenWireUtil.toAMQAddress(coreMessage, actualDestination));
if (destBytes != null) {
ByteSequence seq = new ByteSequence(destBytes);
ActiveMQDestination dest = (ActiveMQDestination) marshaller.unmarshal(seq);
amqMsg.setDestination(dest);
}
Object value = coreMessage.getObjectProperty(AMQ_MSG_GROUP_ID); Object value = coreMessage.getObjectProperty(AMQ_MSG_GROUP_ID);
if (value != null) { if (value != null) {

View File

@ -18,11 +18,14 @@ package org.apache.activemq.artemis.core.protocol.openwire;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerSession; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerSession;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
@ -44,6 +47,23 @@ public class OpenWireUtil {
} }
} }
/**
* We convert the core address to an ActiveMQ Destination. We use the actual address on the message rather than the
* destination set on the consumer because it maybe different and the JMS spec says that it should be what ever was
* set on publish/send so a divert or wildcard may mean thats its different to the destination subscribed to by the
* consumer
*/
public static ActiveMQDestination toAMQAddress(ServerMessage message, ActiveMQDestination actualDestination) {
String address = message.getAddress().toString();
String strippedAddress = address.replace("jms.queue.", "").replace("jms.topic.", "");
if (actualDestination.isQueue()) {
return new ActiveMQQueue(strippedAddress);
}
else {
return new ActiveMQTopic(strippedAddress);
}
}
/** /**
* Checks to see if this destination exists. If it does not throw an invalid destination exception. * Checks to see if this destination exists. If it does not throw an invalid destination exception.
* *

View File

@ -370,6 +370,10 @@ public class AMQConsumer implements BrowserListener {
session.removeConsumer(nativeId); session.removeConsumer(nativeId);
} }
public org.apache.activemq.command.ActiveMQDestination getActualDestination() {
return actualDest;
}
private class MessagePullHandler { private class MessagePullHandler {
private long next = -1; private long next = -1;

View File

@ -69,11 +69,15 @@ public class GeneralInteropTest extends BasicOpenWireTest {
assertEquals(text, textMessage.getText()); assertEquals(text, textMessage.getText());
assertEquals(destination, textMessage.getJMSDestination());
//map messages //map messages
sendMapMessageUsingCoreJms(queueName); sendMapMessageUsingCoreJms(queueName);
MapMessage mapMessage = (MapMessage) consumer.receive(5000); MapMessage mapMessage = (MapMessage) consumer.receive(5000);
assertEquals(destination, mapMessage.getJMSDestination());
assertTrue(mapMessage.getBoolean("aboolean")); assertTrue(mapMessage.getBoolean("aboolean"));
assertEquals((byte) 4, mapMessage.getByte("abyte")); assertEquals((byte) 4, mapMessage.getByte("abyte"));
byte[] bytes = mapMessage.getBytes("abytes"); byte[] bytes = mapMessage.getBytes("abytes");
@ -105,6 +109,9 @@ public class GeneralInteropTest extends BasicOpenWireTest {
sendStreamMessageUsingCoreJms(queueName); sendStreamMessageUsingCoreJms(queueName);
StreamMessage streamMessage = (StreamMessage) consumer.receive(5000); StreamMessage streamMessage = (StreamMessage) consumer.receive(5000);
assertEquals(destination, streamMessage.getJMSDestination());
assertTrue(streamMessage.readBoolean()); assertTrue(streamMessage.readBoolean());
assertEquals((byte) 2, streamMessage.readByte()); assertEquals((byte) 2, streamMessage.readByte());
@ -146,6 +153,8 @@ public class GeneralInteropTest extends BasicOpenWireTest {
sendMessageUsingCoreJms(queueName); sendMessageUsingCoreJms(queueName);
javax.jms.Message genericMessage = consumer.receive(5000); javax.jms.Message genericMessage = consumer.receive(5000);
assertEquals(destination, genericMessage.getJMSDestination());
String value = genericMessage.getStringProperty("stringProperty"); String value = genericMessage.getStringProperty("stringProperty");
assertEquals("HelloMessage", value); assertEquals("HelloMessage", value);
assertFalse(genericMessage.getBooleanProperty("booleanProperty")); assertFalse(genericMessage.getBooleanProperty("booleanProperty"));
@ -171,6 +180,8 @@ public class GeneralInteropTest extends BasicOpenWireTest {
for (int i = 0; i < num; i++) { for (int i = 0; i < num; i++) {
TextMessage textMessage = (TextMessage) consumer.receive(5000); TextMessage textMessage = (TextMessage) consumer.receive(5000);
assertEquals(text + i, textMessage.getText()); assertEquals(text + i, textMessage.getText());
assertEquals(destination, textMessage.getJMSDestination());
} }
} }
@ -365,12 +376,15 @@ public class GeneralInteropTest extends BasicOpenWireTest {
TextMessage txtMessage = (TextMessage) coreConsumer.receive(5000); TextMessage txtMessage = (TextMessage) coreConsumer.receive(5000);
assertEquals(text, txtMessage.getText()); assertEquals(text, txtMessage.getText());
assertEquals(txtMessage.getJMSDestination(), queue);
// map messages // map messages
sendMapMessageUsingOpenWire(); sendMapMessageUsingOpenWire();
MapMessage mapMessage = (MapMessage) coreConsumer.receive(5000); MapMessage mapMessage = (MapMessage) coreConsumer.receive(5000);
assertEquals(mapMessage.getJMSDestination(), queue);
assertTrue(mapMessage.getBoolean("aboolean")); assertTrue(mapMessage.getBoolean("aboolean"));
assertEquals((byte) 4, mapMessage.getByte("abyte")); assertEquals((byte) 4, mapMessage.getByte("abyte"));
byte[] bytes = mapMessage.getBytes("abytes"); byte[] bytes = mapMessage.getBytes("abytes");
@ -392,6 +406,9 @@ public class GeneralInteropTest extends BasicOpenWireTest {
sendObjectMessageUsingOpenWire(obj); sendObjectMessageUsingOpenWire(obj);
ObjectMessage objectMessage = (ObjectMessage) coreConsumer.receive(5000); ObjectMessage objectMessage = (ObjectMessage) coreConsumer.receive(5000);
assertEquals(objectMessage.getJMSDestination(), queue);
SimpleSerializable data = (SimpleSerializable) objectMessage.getObject(); SimpleSerializable data = (SimpleSerializable) objectMessage.getObject();
assertEquals(obj.objName, data.objName); assertEquals(obj.objName, data.objName);
@ -402,6 +419,8 @@ public class GeneralInteropTest extends BasicOpenWireTest {
sendStreamMessageUsingOpenWire(queueName); sendStreamMessageUsingOpenWire(queueName);
StreamMessage streamMessage = (StreamMessage) coreConsumer.receive(5000); StreamMessage streamMessage = (StreamMessage) coreConsumer.receive(5000);
assertEquals(streamMessage.getJMSDestination(), queue);
assertTrue(streamMessage.readBoolean()); assertTrue(streamMessage.readBoolean());
assertEquals((byte) 2, streamMessage.readByte()); assertEquals((byte) 2, streamMessage.readByte());
@ -426,6 +445,8 @@ public class GeneralInteropTest extends BasicOpenWireTest {
sendBytesMessageUsingOpenWire(bytesData); sendBytesMessageUsingOpenWire(bytesData);
BytesMessage bytesMessage = (BytesMessage) coreConsumer.receive(5000); BytesMessage bytesMessage = (BytesMessage) coreConsumer.receive(5000);
assertEquals(bytesMessage.getJMSDestination(), queue);
byte[] rawBytes = new byte[bytesData.length]; byte[] rawBytes = new byte[bytesData.length];
bytesMessage.readBytes(rawBytes); bytesMessage.readBytes(rawBytes);
@ -437,6 +458,7 @@ public class GeneralInteropTest extends BasicOpenWireTest {
sendMessageUsingOpenWire(queueName); sendMessageUsingOpenWire(queueName);
javax.jms.Message genericMessage = coreConsumer.receive(5000); javax.jms.Message genericMessage = coreConsumer.receive(5000);
assertEquals(genericMessage.getJMSDestination(), queue);
String value = genericMessage.getStringProperty("stringProperty"); String value = genericMessage.getStringProperty("stringProperty");
assertEquals("HelloMessage", value); assertEquals("HelloMessage", value);
assertFalse(genericMessage.getBooleanProperty("booleanProperty")); assertFalse(genericMessage.getBooleanProperty("booleanProperty"));
@ -471,6 +493,7 @@ public class GeneralInteropTest extends BasicOpenWireTest {
for (int i = 0; i < num; i++) { for (int i = 0; i < num; i++) {
TextMessage txtMessage = (TextMessage) coreConsumer.receive(5000); TextMessage txtMessage = (TextMessage) coreConsumer.receive(5000);
assertEquals(txtMessage.getJMSDestination(), queue);
assertEquals(text + i, txtMessage.getText()); assertEquals(text + i, txtMessage.getText());
} }
} }