This commit is contained in:
Clebert Suconic 2017-03-21 09:15:26 -04:00
commit ace43c8ffd
1 changed files with 20 additions and 19 deletions

View File

@ -1285,10 +1285,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
@Override
public RoutingStatus send(Transaction tx,
final Message message,
final boolean direct,
boolean noAutoCreateQueue) throws Exception {
public synchronized RoutingStatus send(Transaction tx,
final Message message,
final boolean direct,
boolean noAutoCreateQueue) throws Exception {
// If the protocol doesn't support flow control, we have no choice other than fail the communication
if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) {
@ -1611,23 +1611,24 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
@Override
public RoutingStatus doSend(final Transaction tx,
final Message msg,
final SimpleString originalAddress,
final boolean direct,
final boolean noAutoCreateQueue) throws Exception {
public synchronized RoutingStatus doSend(final Transaction tx,
final Message msg,
final SimpleString originalAddress,
final boolean direct,
final boolean noAutoCreateQueue) throws Exception {
RoutingStatus result = RoutingStatus.OK;
RoutingType routingType = msg.getRouteType();
/* TODO-now: How to address here with AMQP?
if (originalAddress != null) {
if (originalAddress.toString().startsWith("anycast:")) {
routingType = RoutingType.ANYCAST;
} else if (originalAddress.toString().startsWith("multicast:")) {
routingType = RoutingType.MULTICAST;
}
} */
/* TODO-now: How to address here with AMQP?
if (originalAddress != null) {
if (originalAddress.toString().startsWith("anycast:")) {
routingType = RoutingType.ANYCAST;
} else if (originalAddress.toString().startsWith("multicast:")) {
routingType = RoutingType.MULTICAST;
}
} */
Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(msg.getAddressSimpleString(), routingType);
@ -1692,7 +1693,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public Pair<SimpleString, RoutingType> getAddressAndRoutingType(SimpleString address,
RoutingType defaultRoutingType) {
RoutingType defaultRoutingType) {
if (prefixEnabled) {
return PrefixUtil.getAddressAndRoutingType(address, defaultRoutingType, prefixes);
}
@ -1701,7 +1702,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public Pair<SimpleString, Set<RoutingType>> getAddressAndRoutingTypes(SimpleString address,
Set<RoutingType> defaultRoutingTypes) {
Set<RoutingType> defaultRoutingTypes) {
if (prefixEnabled) {
return PrefixUtil.getAddressAndRoutingTypes(address, defaultRoutingTypes, prefixes);
}