ARTEMIS-1051 Make ServerSession send thread safe

This commit is contained in:
Martyn Taylor 2017-03-20 17:16:10 +00:00 committed by Clebert Suconic
parent bfa679c17f
commit 8394fec104
1 changed files with 96 additions and 89 deletions

View File

@ -182,6 +182,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
private Set<Closeable> closeables;
private final Object sendLock = new Object();
public ServerSessionImpl(final String name,
final String username,
final String password,
@ -1290,54 +1292,56 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
final boolean direct,
boolean noAutoCreateQueue) throws Exception {
// If the protocol doesn't support flow control, we have no choice other than fail the communication
if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) {
ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit();
this.getRemotingConnection().fail(exception);
throw exception;
synchronized (sendLock) {
// If the protocol doesn't support flow control, we have no choice other than fail the communication
if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) {
ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit();
this.getRemotingConnection().fail(exception);
throw exception;
}
RoutingStatus result = RoutingStatus.OK;
//large message may come from StompSession directly, in which
//case the id header already generated.
if (!message.isLargeMessage()) {
long id = storageManager.generateID();
// This will re-encode the message
message.setMessageID(id);
}
if (server.getConfiguration().isPopulateValidatedUser() && validatedUser != null) {
message.putStringProperty(Message.HDR_VALIDATED_USER, SimpleString.toSimpleString(validatedUser));
}
SimpleString address = message.getAddressSimpleString();
if (defaultAddress == null && address != null) {
defaultAddress = address;
}
if (address == null) {
// We don't want to force a re-encode when the message gets sent to the consumer
message.setAddress(defaultAddress);
}
if (logger.isTraceEnabled()) {
logger.trace("send(message=" + message + ", direct=" + direct + ") being called");
}
if (message.getAddress() == null) {
// This could happen with some tests that are ignoring messages
throw ActiveMQMessageBundle.BUNDLE.noAddress();
}
if (message.getAddressSimpleString().equals(managementAddress)) {
// It's a management message
handleManagementMessage(tx, message, direct);
} else {
result = doSend(tx, message, address, direct, noAutoCreateQueue);
}
return result;
}
RoutingStatus result = RoutingStatus.OK;
//large message may come from StompSession directly, in which
//case the id header already generated.
if (!message.isLargeMessage()) {
long id = storageManager.generateID();
// This will re-encode the message
message.setMessageID(id);
}
if (server.getConfiguration().isPopulateValidatedUser() && validatedUser != null) {
message.putStringProperty(Message.HDR_VALIDATED_USER, SimpleString.toSimpleString(validatedUser));
}
SimpleString address = message.getAddressSimpleString();
if (defaultAddress == null && address != null) {
defaultAddress = address;
}
if (address == null) {
// We don't want to force a re-encode when the message gets sent to the consumer
message.setAddress(defaultAddress);
}
if (logger.isTraceEnabled()) {
logger.trace("send(message=" + message + ", direct=" + direct + ") being called");
}
if (message.getAddress() == null) {
// This could happen with some tests that are ignoring messages
throw ActiveMQMessageBundle.BUNDLE.noAddress();
}
if (message.getAddressSimpleString().equals(managementAddress)) {
// It's a management message
handleManagementMessage(tx, message, direct);
} else {
result = doSend(tx, message, address, direct, noAutoCreateQueue);
}
return result;
}
@ -1616,55 +1620,58 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
final SimpleString originalAddress,
final boolean direct,
final boolean noAutoCreateQueue) throws Exception {
RoutingStatus result = RoutingStatus.OK;
RoutingType routingType = msg.getRouteType();
synchronized (sendLock) {
RoutingStatus result = RoutingStatus.OK;
/* TODO-now: How to address here with AMQP?
if (originalAddress != null) {
if (originalAddress.toString().startsWith("anycast:")) {
routingType = RoutingType.ANYCAST;
} else if (originalAddress.toString().startsWith("multicast:")) {
routingType = RoutingType.MULTICAST;
RoutingType routingType = msg.getRouteType();
/* TODO-now: How to address here with AMQP?
if (originalAddress != null) {
if (originalAddress.toString().startsWith("anycast:")) {
routingType = RoutingType.ANYCAST;
} else if (originalAddress.toString().startsWith("multicast:")) {
routingType = RoutingType.MULTICAST;
}
} */
Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(msg.getAddressSimpleString(), routingType);
// Consumer
// check the user has write access to this address.
try {
securityCheck(art.getA(), CheckType.SEND, this);
} catch (ActiveMQException e) {
if (!autoCommitSends && tx != null) {
tx.markAsRollbackOnly(e);
}
throw e;
}
} */
Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(msg.getAddressSimpleString(), routingType);
// Consumer
// check the user has write access to this address.
try {
securityCheck(art.getA(), CheckType.SEND, this);
} catch (ActiveMQException e) {
if (!autoCommitSends && tx != null) {
tx.markAsRollbackOnly(e);
}
throw e;
}
if (tx == null || autoCommitSends) {
} else {
routingContext.setTransaction(tx);
}
try {
routingContext.setAddress(art.getA());
routingContext.setRoutingType(art.getB());
result = postOffice.route(msg, routingContext, direct);
Pair<Object, AtomicLong> value = targetAddressInfos.get(msg.getAddressSimpleString());
if (value == null) {
targetAddressInfos.put(msg.getAddressSimpleString(), new Pair<>(msg.getUserID(), new AtomicLong(1)));
if (tx == null || autoCommitSends) {
} else {
value.setA(msg.getUserID());
value.getB().incrementAndGet();
routingContext.setTransaction(tx);
}
} finally {
routingContext.clear();
try {
routingContext.setAddress(art.getA());
routingContext.setRoutingType(art.getB());
result = postOffice.route(msg, routingContext, direct);
Pair<Object, AtomicLong> value = targetAddressInfos.get(msg.getAddressSimpleString());
if (value == null) {
targetAddressInfos.put(msg.getAddressSimpleString(), new Pair<>(msg.getUserID(), new AtomicLong(1)));
} else {
value.setA(msg.getUserID());
value.getB().incrementAndGet();
}
} finally {
routingContext.clear();
}
return result;
}
return result;
}
@Override