ARTEMIS-1068 routingType + AMQP fixes

This commit is contained in:
Justin Bertram 2017-03-24 15:05:18 -05:00
parent 4203ae89ca
commit 427039ef38
16 changed files with 130 additions and 54 deletions

View File

@ -168,7 +168,13 @@ public interface Message {
// only on core
}
RoutingType getRouteType();
default RoutingType getRoutingType() {
return null;
}
default Message setRoutingType(RoutingType routingType) {
return this;
}
default SimpleString getLastValueProperty() {
return null;

View File

@ -154,13 +154,23 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
}
@Override
public RoutingType getRouteType() {
public RoutingType getRoutingType() {
if (containsProperty(Message.HDR_ROUTING_TYPE)) {
return RoutingType.getType(getByteProperty(Message.HDR_ROUTING_TYPE));
}
return null;
}
@Override
public Message setRoutingType(RoutingType routingType) {
if (routingType == null) {
removeProperty(Message.HDR_ROUTING_TYPE);
} else {
putByteProperty(Message.HDR_ROUTING_TYPE, routingType.getType());
}
return this;
}
@Override
public CoreMessage setReplyTo(SimpleString address) {

View File

@ -399,13 +399,10 @@ public class ActiveMQMessage implements javax.jms.Message {
if (dest == null) {
SimpleString address = message.getAddressSimpleString();
String prefix = "";
if (message.containsProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE)) {
RoutingType routingType = RoutingType.getType(message.getByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE));
if (routingType.equals(RoutingType.ANYCAST)) {
prefix = QUEUE_QUALIFIED_PREFIX;
} else if (routingType.equals(RoutingType.MULTICAST)) {
prefix = TOPIC_QUALIFIED_PREFIX;
}
if (RoutingType.ANYCAST.equals(message.getRoutingType())) {
prefix = QUEUE_QUALIFIED_PREFIX;
} else if (RoutingType.MULTICAST.equals(message.getRoutingType())) {
prefix = TOPIC_QUALIFIED_PREFIX;
}
dest = address == null ? null : ActiveMQDestination.fromPrefixedName(prefix + address.toString());

View File

@ -494,8 +494,7 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
ClientMessage coreMessage = activeMQJmsMessage.getCoreMessage();
coreMessage.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME, connID);
byte routingType = destination.isQueue() ? RoutingType.ANYCAST.getType() : RoutingType.MULTICAST.getType();
coreMessage.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, routingType);
coreMessage.setRoutingType(destination.isQueue() ? RoutingType.ANYCAST : RoutingType.MULTICAST);
try {
/**

View File

@ -250,22 +250,24 @@ public class AMQPMessage extends RefCountMessage {
}
@Override
public RoutingType getRouteType() {
public RoutingType getRoutingType() {
Object routingType = getSymbol(AMQPMessageSupport.ROUTING_TYPE);
/* TODO-now How to use this properly
switch (((Byte)type).byteValue()) {
case AMQPMessageSupport.QUEUE_TYPE:
case AMQPMessageSupport.TEMP_QUEUE_TYPE:
return RoutingType.ANYCAST;
if (routingType != null) {
return RoutingType.getType((byte) routingType);
} else {
return null;
}
}
case AMQPMessageSupport.TOPIC_TYPE:
case AMQPMessageSupport.TEMP_TOPIC_TYPE:
return RoutingType.MULTICAST;
default:
return null;
} */
return null;
@Override
public org.apache.activemq.artemis.api.core.Message setRoutingType(RoutingType routingType) {
parseHeaders();
if (routingType == null) {
removeSymbol(AMQPMessageSupport.ROUTING_TYPE);
}
setSymbol(AMQPMessageSupport.ROUTING_TYPE, routingType.getType());
return this;
}
@Override

View File

@ -64,6 +64,11 @@ public final class AMQPMessageSupport {
*/
public static final Symbol JMS_DELIVERY_TIME = Symbol.getSymbol("x-opt-delivery-time");
/**
* Attribute used to mark the Application defined delivery time assigned to the message
*/
public static final Symbol ROUTING_TYPE = Symbol.getSymbol("x-opt-routing-type");
/**
* Value mapping for JMS_MSG_TYPE which indicates the message is a generic JMS Message
* which has no body.

View File

@ -25,7 +25,6 @@ import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RefCountMessageListener;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.Persister;
@ -42,11 +41,6 @@ public class OpenwireMessage implements Message {
}
@Override
public RoutingType getRouteType() {
return null;
}
@Override
public SimpleString getReplyTo() {
return null;

View File

@ -329,9 +329,9 @@ public class AMQSession implements SessionCallback {
if (actualDestinations[i].isQueue()) {
checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary());
coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType());
coreMsg.setRoutingType(RoutingType.ANYCAST);
} else {
coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
coreMsg.setRoutingType(RoutingType.MULTICAST);
}
PagingStore store = server.getPagingManager().getPageStore(address);

View File

@ -180,7 +180,7 @@ public abstract class VersionedStompFrameHandler {
CoreMessage message = connection.createServerMessage();
if (routingType != null) {
message.putByteProperty(Message.HDR_ROUTING_TYPE, routingType.getType());
message.setRoutingType(routingType);
}
message.setTimestamp(timestamp);
message.setAddress(SimpleString.toSimpleString(destination));

View File

@ -108,13 +108,13 @@ public class DivertImpl implements Divert {
switch (routingType) {
case ANYCAST:
copy.putByteProperty(Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType());
copy.setRoutingType(RoutingType.ANYCAST);
break;
case MULTICAST:
copy.putByteProperty(Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
copy.setRoutingType(RoutingType.MULTICAST);
break;
case STRIP:
copy.removeProperty(Message.HDR_ROUTING_TYPE);
copy.setRoutingType(null);
break;
case PASS:
break;

View File

@ -1619,7 +1619,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
RoutingStatus result = RoutingStatus.OK;
RoutingType routingType = msg.getRouteType();
RoutingType routingType = msg.getRoutingType();
/* TODO-now: How to address here with AMQP?
if (originalAddress != null) {

View File

@ -283,11 +283,6 @@ public class ScheduledDeliveryHandlerTest extends Assert {
class FakeMessage extends RefCountMessage {
@Override
public RoutingType getRouteType() {
return null;
}
@Override
public SimpleString getReplyTo() {
return null;

View File

@ -35,7 +35,9 @@ import javax.jms.JMSException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
@ -208,7 +210,33 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
}
@Test(timeout = 60000)
public void testAnycastMessageRoutingExclusivity() throws Exception {
public void testQueueReceiverReadMessageWithDivert() throws Exception {
final String forwardingAddress = getTestName() + "Divert";
final SimpleString simpleForwardingAddress = SimpleString.toSimpleString(forwardingAddress);
server.createQueue(simpleForwardingAddress, RoutingType.ANYCAST, simpleForwardingAddress, null, true, false);
server.getActiveMQServerControl().createDivert("name", "routingName", getTestName(), forwardingAddress, true, null, null, DivertConfigurationRoutingType.ANYCAST.toString());
sendMessages(getTestName(), 1);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(forwardingAddress);
Queue queueView = getProxyToQueue(forwardingAddress);
assertEquals(1, queueView.getMessageCount());
receiver.flow(1);
assertNotNull(receiver.receive(5, TimeUnit.SECONDS));
receiver.close();
assertEquals(1, queueView.getMessageCount());
connection.close();
}
@Test(timeout = 60000)
public void testAnycastMessageRoutingExclusivityUsingPrefix() throws Exception {
final String addressA = "addressA";
final String queueA = "queueA";
final String queueB = "queueB";
@ -226,8 +254,27 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount());
}
@Test(timeout = 60000)
public void testAnycastMessageRoutingExclusivityUsingProperty() throws Exception {
final String addressA = "addressA";
final String queueA = "queueA";
final String queueB = "queueB";
final String queueC = "queueC";
ActiveMQServerControl serverControl = server.getActiveMQServerControl();
serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString());
serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
sendMessages(addressA, 1, RoutingType.ANYCAST);
assertEquals(1, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount());
}
@Test
public void testMulticastMessageRoutingExclusivity() throws Exception {
public void testMulticastMessageRoutingExclusivityUsingPrefix() throws Exception {
final String addressA = "addressA";
final String queueA = "queueA";
final String queueB = "queueB";
@ -245,6 +292,25 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
}
@Test
public void testMulticastMessageRoutingExclusivityUsingProperty() throws Exception {
final String addressA = "addressA";
final String queueA = "queueA";
final String queueB = "queueB";
final String queueC = "queueC";
ActiveMQServerControl serverControl = server.getActiveMQServerControl();
serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
serverControl.createQueue(addressA, queueB, RoutingType.MULTICAST.toString());
serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
sendMessages(addressA, 1, RoutingType.MULTICAST);
assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount());
assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
}
@Test(timeout = 60000)
public void testMessageDurableFalse() throws Exception {
sendMessages(getTestName(), 1, false);
@ -1107,6 +1173,10 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
}
public void sendMessages(String destinationName, int count) throws Exception {
sendMessages(destinationName, count, null);
}
public void sendMessages(String destinationName, int count, RoutingType routingType) throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
try {
@ -1116,6 +1186,9 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
for (int i = 0; i < count; ++i) {
AmqpMessage message = new AmqpMessage();
message.setMessageId("MessageID:" + i);
if (routingType != null) {
message.setMessageAnnotation(AMQPMessageSupport.ROUTING_TYPE.toString(), routingType.getType());
}
sender.send(message);
}
} finally {

View File

@ -341,11 +341,6 @@ public class AcknowledgeTest extends ActiveMQTestBase {
final long id;
@Override
public RoutingType getRouteType() {
return null;
}
@Override
public SimpleString getReplyTo() {
return null;

View File

@ -236,7 +236,7 @@ public class RoutingTest extends ActiveMQTestBase {
sendSession.createQueue(addressA, RoutingType.MULTICAST, queueC);
ClientProducer p = sendSession.createProducer(addressA);
ClientMessage message = sendSession.createMessage(false);
message.putByteProperty(Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType());
message.setRoutingType(RoutingType.ANYCAST);
p.send(message);
sendSession.close();
assertEquals(1, server.locateQueue(queueA).getMessageCount() + server.locateQueue(queueB).getMessageCount());
@ -255,7 +255,7 @@ public class RoutingTest extends ActiveMQTestBase {
sendSession.createQueue(addressA, RoutingType.MULTICAST, queueC);
ClientProducer p = sendSession.createProducer(addressA);
ClientMessage message = sendSession.createMessage(false);
message.putByteProperty(Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
message.setRoutingType(RoutingType.MULTICAST);
p.send(message);
sendSession.close();
assertEquals(0, server.locateQueue(queueA).getMessageCount());

View File

@ -165,7 +165,7 @@ public class DivertTest extends ActiveMQTestBase {
for (int i = 0; i < numMessages; i++) {
ClientMessage message = session.createMessage(false);
message.putByteProperty(Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
message.setRoutingType(RoutingType.MULTICAST);
message.putIntProperty(propKey, i);
@ -238,7 +238,7 @@ public class DivertTest extends ActiveMQTestBase {
for (int i = 0; i < numMessages; i++) {
ClientMessage message = session.createMessage(false);
message.putByteProperty(Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
message.setRoutingType(RoutingType.MULTICAST);
message.putIntProperty(propKey, i);