ARTEMIS-1416 Fixing qpid AMQP tests

This will fix tests from https://git-wip-us.apache.org/repos/asf/qpid-interop-test.git

Notice that the previous 2 committs here are needed
This commit is contained in:
Clebert Suconic 2017-11-02 15:55:04 -04:00
parent ec13ed6df0
commit 0e9b39c825
2 changed files with 17 additions and 12 deletions

View File

@ -269,9 +269,11 @@ public class AMQPSessionCallback implements SessionCallback {
queueQueryResult = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName));
}
if (queueQueryResult.getRoutingType() != routingType) {
// if auto-create we will return whatever type was used before
if (!queueQueryResult.isAutoCreated() && queueQueryResult.getRoutingType() != routingType) {
throw new IllegalStateException("Incorrect Routing Type for queue, expecting: " + routingType);
}
return queueQueryResult;
}
@ -407,9 +409,7 @@ public class AMQPSessionCallback implements SessionCallback {
}
//here check queue-autocreation
org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
RoutingType routingType = context.getRoutingType(receiver);
RoutingType routingType = context.getRoutingType(receiver, RoutingType.ANYCAST);
if (!bindingQuery(address, routingType)) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
}

View File

@ -58,8 +58,6 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
protected final AMQPSessionCallback sessionSPI;
private RoutingType defRoutingType;
/*
The maximum number of credits we will allocate to clients.
This number is also used by the broker when refresh client credits.
@ -98,6 +96,8 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
// We don't currently support SECOND so enforce that the answer is anlways FIRST
receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
RoutingType defRoutingType;
if (target != null) {
if (target.getDynamic()) {
defRoutingType = getRoutingType(target.getCapabilities());
@ -181,15 +181,16 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
flow(amqpCredits, minCreditRefresh);
}
public RoutingType getRoutingType(Receiver receiver) {
if (receiver == this.receiver) {
return defRoutingType;
}
public RoutingType getRoutingType(Receiver receiver, RoutingType defaultType) {
org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
return target != null ? getRoutingType(target.getCapabilities()) : getRoutingType((Symbol[])null);
return target != null ? getRoutingType(target.getCapabilities(), defaultType) : getRoutingType((Symbol[])null, defaultType);
}
private RoutingType getRoutingType(Symbol[] symbols) {
return getRoutingType(symbols, null);
}
private RoutingType getRoutingType(Symbol[] symbols, RoutingType defaultType) {
if (symbols != null) {
for (Symbol symbol : symbols) {
if (AmqpSupport.TEMP_TOPIC_CAPABILITY.equals(symbol) || AmqpSupport.TOPIC_CAPABILITY.equals(symbol)) {
@ -200,8 +201,12 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
}
}
if (defaultType != null) {
return defaultType;
} else {
return sessionSPI.getDefaultRoutingType(address);
}
}
/*
* called when Proton receives a message to be delivered via a Delivery.