From c792b8e2741d24aef24f07b78c733ebf5f225ed7 Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Sat, 25 Mar 2017 08:13:25 -0500 Subject: [PATCH] ARTEMIS-1068 JMS + AMQP routing --- .../artemis/protocol/amqp/broker/AMQPMessage.java | 11 +++++++++++ .../protocol/amqp/broker/AMQPSessionCallback.java | 4 ++++ .../artemis/protocol/amqp/proton/AmqpSupport.java | 2 ++ .../amqp/proton/ProtonServerReceiverContext.java | 14 +++++++++++++- 4 files changed, 30 insertions(+), 1 deletion(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 522ae1663a..d241958212 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -256,6 +256,17 @@ public class AMQPMessage extends RefCountMessage { if (routingType != null) { return RoutingType.getType((byte) routingType); } else { + routingType = getSymbol(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION); + if (routingType != null) { + if (AMQPMessageSupport.QUEUE_TYPE == (byte) routingType || AMQPMessageSupport.TEMP_QUEUE_TYPE == (byte) routingType) { + return RoutingType.ANYCAST; + } else if (AMQPMessageSupport.TOPIC_TYPE == (byte) routingType || AMQPMessageSupport.TEMP_TOPIC_TYPE == (byte) routingType) { + return RoutingType.MULTICAST; + } + } else { + return null; + } + return null; } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 034cb720e2..18294e0f4b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -575,4 +575,8 @@ public class AMQPSessionCallback implements SessionCallback { public void removeTemporaryQueue(String address) throws Exception { serverSession.deleteQueue(SimpleString.toSimpleString(address)); } + + public RoutingType getDefaultRoutingType(String address) { + return manager.getServer().getAddressSettingsRepository().getMatch(address).getDefaultQueueRoutingType(); + } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java index 227ee5ded4..3a36f161e6 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java @@ -39,6 +39,8 @@ public class AmqpSupport { // Capabilities used to identify destination type in some requests. public static final Symbol TEMP_QUEUE_CAPABILITY = Symbol.valueOf("temporary-queue"); public static final Symbol TEMP_TOPIC_CAPABILITY = Symbol.valueOf("temporary-topic"); + public static final Symbol QUEUE_CAPABILITY = Symbol.valueOf("queue"); + public static final Symbol TOPIC_CAPABILITY = Symbol.valueOf("topic"); // Symbols used to announce connection information to remote peer. public static final Symbol INVALID_FIELD = Symbol.valueOf("invalid-field"); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index 34a522f27c..596e93ac01 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -86,7 +86,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements address = sessionSPI.tempQueueName(); try { - sessionSPI.createTemporaryQueue(address, RoutingType.ANYCAST); + sessionSPI.createTemporaryQueue(address, getRoutingType(target.getCapabilities())); } catch (Exception e) { throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e); } @@ -122,6 +122,18 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements flow(maxCreditAllocation, minCreditRefresh); } + private RoutingType getRoutingType(Symbol[] symbols) { + for (Symbol symbol : symbols) { + if (AmqpSupport.TEMP_TOPIC_CAPABILITY.equals(symbol) || AmqpSupport.TOPIC_CAPABILITY.equals(symbol)) { + return RoutingType.MULTICAST; + } else if (AmqpSupport.TEMP_QUEUE_CAPABILITY.equals(symbol) || AmqpSupport.QUEUE_CAPABILITY.equals(symbol)) { + return RoutingType.ANYCAST; + } + } + + return sessionSPI.getDefaultRoutingType(address); + } + /* * called when Proton receives a message to be delivered via a Delivery. *