diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 53b9b4f946..667d57aeee 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -269,9 +269,11 @@ public class AMQPSessionCallback implements SessionCallback { queueQueryResult = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName)); } - if (queueQueryResult.getRoutingType() != routingType) { + // if auto-create we will return whatever type was used before + if (!queueQueryResult.isAutoCreated() && queueQueryResult.getRoutingType() != routingType) { throw new IllegalStateException("Incorrect Routing Type for queue, expecting: " + routingType); } + return queueQueryResult; } @@ -407,9 +409,7 @@ public class AMQPSessionCallback implements SessionCallback { } //here check queue-autocreation - org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget(); - - RoutingType routingType = context.getRoutingType(receiver); + RoutingType routingType = context.getRoutingType(receiver, RoutingType.ANYCAST); if (!bindingQuery(address, routingType)) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist(); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index 014b9f9c21..15318d5b7a 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -58,8 +58,6 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements protected final AMQPSessionCallback sessionSPI; - private RoutingType defRoutingType; - /* The maximum number of credits we will allocate to clients. This number is also used by the broker when refresh client credits. @@ -98,6 +96,8 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements // We don't currently support SECOND so enforce that the answer is anlways FIRST receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST); + RoutingType defRoutingType; + if (target != null) { if (target.getDynamic()) { defRoutingType = getRoutingType(target.getCapabilities()); @@ -181,15 +181,16 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements flow(amqpCredits, minCreditRefresh); } - public RoutingType getRoutingType(Receiver receiver) { - if (receiver == this.receiver) { - return defRoutingType; - } + public RoutingType getRoutingType(Receiver receiver, RoutingType defaultType) { org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget(); - return target != null ? getRoutingType(target.getCapabilities()) : getRoutingType((Symbol[])null); + return target != null ? getRoutingType(target.getCapabilities(), defaultType) : getRoutingType((Symbol[])null, defaultType); } private RoutingType getRoutingType(Symbol[] symbols) { + return getRoutingType(symbols, null); + } + + private RoutingType getRoutingType(Symbol[] symbols, RoutingType defaultType) { if (symbols != null) { for (Symbol symbol : symbols) { if (AmqpSupport.TEMP_TOPIC_CAPABILITY.equals(symbol) || AmqpSupport.TOPIC_CAPABILITY.equals(symbol)) { @@ -200,7 +201,11 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements } } - return sessionSPI.getDefaultRoutingType(address); + if (defaultType != null) { + return defaultType; + } else { + return sessionSPI.getDefaultRoutingType(address); + } } /*