ARTEMIS-1051 using ServerSession's own lock

This commit is contained in:
Clebert Suconic 2017-03-20 21:56:07 -04:00
parent 8394fec104
commit e2b2e247d9

View File

@ -182,8 +182,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
private Set<Closeable> closeables;
private final Object sendLock = new Object();
public ServerSessionImpl(final String name,
final String username,
final String password,
@ -1287,61 +1285,59 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
@Override
public RoutingStatus send(Transaction tx,
final Message message,
final boolean direct,
boolean noAutoCreateQueue) throws Exception {
public synchronized RoutingStatus send(Transaction tx,
final Message message,
final boolean direct,
boolean noAutoCreateQueue) throws Exception {
synchronized (sendLock) {
// If the protocol doesn't support flow control, we have no choice other than fail the communication
if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) {
ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit();
this.getRemotingConnection().fail(exception);
throw exception;
}
RoutingStatus result = RoutingStatus.OK;
//large message may come from StompSession directly, in which
//case the id header already generated.
if (!message.isLargeMessage()) {
long id = storageManager.generateID();
// This will re-encode the message
message.setMessageID(id);
}
if (server.getConfiguration().isPopulateValidatedUser() && validatedUser != null) {
message.putStringProperty(Message.HDR_VALIDATED_USER, SimpleString.toSimpleString(validatedUser));
}
SimpleString address = message.getAddressSimpleString();
if (defaultAddress == null && address != null) {
defaultAddress = address;
}
if (address == null) {
// We don't want to force a re-encode when the message gets sent to the consumer
message.setAddress(defaultAddress);
}
if (logger.isTraceEnabled()) {
logger.trace("send(message=" + message + ", direct=" + direct + ") being called");
}
if (message.getAddress() == null) {
// This could happen with some tests that are ignoring messages
throw ActiveMQMessageBundle.BUNDLE.noAddress();
}
if (message.getAddressSimpleString().equals(managementAddress)) {
// It's a management message
handleManagementMessage(tx, message, direct);
} else {
result = doSend(tx, message, address, direct, noAutoCreateQueue);
}
return result;
// If the protocol doesn't support flow control, we have no choice other than fail the communication
if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) {
ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit();
this.getRemotingConnection().fail(exception);
throw exception;
}
RoutingStatus result = RoutingStatus.OK;
//large message may come from StompSession directly, in which
//case the id header already generated.
if (!message.isLargeMessage()) {
long id = storageManager.generateID();
// This will re-encode the message
message.setMessageID(id);
}
if (server.getConfiguration().isPopulateValidatedUser() && validatedUser != null) {
message.putStringProperty(Message.HDR_VALIDATED_USER, SimpleString.toSimpleString(validatedUser));
}
SimpleString address = message.getAddressSimpleString();
if (defaultAddress == null && address != null) {
defaultAddress = address;
}
if (address == null) {
// We don't want to force a re-encode when the message gets sent to the consumer
message.setAddress(defaultAddress);
}
if (logger.isTraceEnabled()) {
logger.trace("send(message=" + message + ", direct=" + direct + ") being called");
}
if (message.getAddress() == null) {
// This could happen with some tests that are ignoring messages
throw ActiveMQMessageBundle.BUNDLE.noAddress();
}
if (message.getAddressSimpleString().equals(managementAddress)) {
// It's a management message
handleManagementMessage(tx, message, direct);
} else {
result = doSend(tx, message, address, direct, noAutoCreateQueue);
}
return result;
}
@ -1615,16 +1611,15 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
@Override
public RoutingStatus doSend(final Transaction tx,
final Message msg,
final SimpleString originalAddress,
final boolean direct,
final boolean noAutoCreateQueue) throws Exception {
public synchronized RoutingStatus doSend(final Transaction tx,
final Message msg,
final SimpleString originalAddress,
final boolean direct,
final boolean noAutoCreateQueue) throws Exception {
synchronized (sendLock) {
RoutingStatus result = RoutingStatus.OK;
RoutingStatus result = RoutingStatus.OK;
RoutingType routingType = msg.getRouteType();
RoutingType routingType = msg.getRouteType();
/* TODO-now: How to address here with AMQP?
if (originalAddress != null) {
@ -1635,43 +1630,42 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
} */
Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(msg.getAddressSimpleString(), routingType);
Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(msg.getAddressSimpleString(), routingType);
// Consumer
// check the user has write access to this address.
try {
securityCheck(art.getA(), CheckType.SEND, this);
} catch (ActiveMQException e) {
if (!autoCommitSends && tx != null) {
tx.markAsRollbackOnly(e);
}
throw e;
// Consumer
// check the user has write access to this address.
try {
securityCheck(art.getA(), CheckType.SEND, this);
} catch (ActiveMQException e) {
if (!autoCommitSends && tx != null) {
tx.markAsRollbackOnly(e);
}
if (tx == null || autoCommitSends) {
} else {
routingContext.setTransaction(tx);
}
try {
routingContext.setAddress(art.getA());
routingContext.setRoutingType(art.getB());
result = postOffice.route(msg, routingContext, direct);
Pair<Object, AtomicLong> value = targetAddressInfos.get(msg.getAddressSimpleString());
if (value == null) {
targetAddressInfos.put(msg.getAddressSimpleString(), new Pair<>(msg.getUserID(), new AtomicLong(1)));
} else {
value.setA(msg.getUserID());
value.getB().incrementAndGet();
}
} finally {
routingContext.clear();
}
return result;
throw e;
}
if (tx == null || autoCommitSends) {
} else {
routingContext.setTransaction(tx);
}
try {
routingContext.setAddress(art.getA());
routingContext.setRoutingType(art.getB());
result = postOffice.route(msg, routingContext, direct);
Pair<Object, AtomicLong> value = targetAddressInfos.get(msg.getAddressSimpleString());
if (value == null) {
targetAddressInfos.put(msg.getAddressSimpleString(), new Pair<>(msg.getUserID(), new AtomicLong(1)));
} else {
value.setA(msg.getUserID());
value.getB().incrementAndGet();
}
} finally {
routingContext.clear();
}
return result;
}
@Override
@ -1699,7 +1693,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public Pair<SimpleString, RoutingType> getAddressAndRoutingType(SimpleString address,
RoutingType defaultRoutingType) {
RoutingType defaultRoutingType) {
if (prefixEnabled) {
return PrefixUtil.getAddressAndRoutingType(address, defaultRoutingType, prefixes);
}
@ -1708,7 +1702,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public Pair<SimpleString, Set<RoutingType>> getAddressAndRoutingTypes(SimpleString address,
Set<RoutingType> defaultRoutingTypes) {
Set<RoutingType> defaultRoutingTypes) {
if (prefixEnabled) {
return PrefixUtil.getAddressAndRoutingTypes(address, defaultRoutingTypes, prefixes);
}