This commit is contained in:
Clebert Suconic 2017-03-27 15:14:44 -04:00
commit 2ef0d26015
4 changed files with 30 additions and 1 deletions

View File

@ -256,6 +256,17 @@ public class AMQPMessage extends RefCountMessage {
if (routingType != null) {
return RoutingType.getType((byte) routingType);
} else {
routingType = getSymbol(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION);
if (routingType != null) {
if (AMQPMessageSupport.QUEUE_TYPE == (byte) routingType || AMQPMessageSupport.TEMP_QUEUE_TYPE == (byte) routingType) {
return RoutingType.ANYCAST;
} else if (AMQPMessageSupport.TOPIC_TYPE == (byte) routingType || AMQPMessageSupport.TEMP_TOPIC_TYPE == (byte) routingType) {
return RoutingType.MULTICAST;
}
} else {
return null;
}
return null;
}
}

View File

@ -575,4 +575,8 @@ public class AMQPSessionCallback implements SessionCallback {
public void removeTemporaryQueue(String address) throws Exception {
serverSession.deleteQueue(SimpleString.toSimpleString(address));
}
public RoutingType getDefaultRoutingType(String address) {
return manager.getServer().getAddressSettingsRepository().getMatch(address).getDefaultQueueRoutingType();
}
}

View File

@ -39,6 +39,8 @@ public class AmqpSupport {
// Capabilities used to identify destination type in some requests.
public static final Symbol TEMP_QUEUE_CAPABILITY = Symbol.valueOf("temporary-queue");
public static final Symbol TEMP_TOPIC_CAPABILITY = Symbol.valueOf("temporary-topic");
public static final Symbol QUEUE_CAPABILITY = Symbol.valueOf("queue");
public static final Symbol TOPIC_CAPABILITY = Symbol.valueOf("topic");
// Symbols used to announce connection information to remote peer.
public static final Symbol INVALID_FIELD = Symbol.valueOf("invalid-field");

View File

@ -86,7 +86,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
address = sessionSPI.tempQueueName();
try {
sessionSPI.createTemporaryQueue(address, RoutingType.ANYCAST);
sessionSPI.createTemporaryQueue(address, getRoutingType(target.getCapabilities()));
} catch (Exception e) {
throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
}
@ -122,6 +122,18 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
flow(maxCreditAllocation, minCreditRefresh);
}
private RoutingType getRoutingType(Symbol[] symbols) {
for (Symbol symbol : symbols) {
if (AmqpSupport.TEMP_TOPIC_CAPABILITY.equals(symbol) || AmqpSupport.TOPIC_CAPABILITY.equals(symbol)) {
return RoutingType.MULTICAST;
} else if (AmqpSupport.TEMP_QUEUE_CAPABILITY.equals(symbol) || AmqpSupport.QUEUE_CAPABILITY.equals(symbol)) {
return RoutingType.ANYCAST;
}
}
return sessionSPI.getDefaultRoutingType(address);
}
/*
* called when Proton receives a message to be delivered via a Delivery.
*