ARTEMIS-2856 consumer can be created on closed session

Due to a lack of concurrency protections it's possible to create a
consumer on a closed session. I've not been able to reproduce this with
a test, but I've seen it in the wild. Static code analysis points to a
need for better concurrency controls around closing the session and
creating consumers.
This commit is contained in:
Justin Bertram 2020-07-28 16:15:22 -05:00 committed by Clebert Suconic
parent 0d9b581bf5
commit cd7f52d4ba
2 changed files with 12 additions and 3 deletions

View File

@ -492,4 +492,7 @@ public interface ActiveMQMessageBundle {
@Message(id = 229231, value = "Divert Does Not Exist: {0}", format = Message.Format.MESSAGE_FORMAT) @Message(id = 229231, value = "Divert Does Not Exist: {0}", format = Message.Format.MESSAGE_FORMAT)
ActiveMQDivertDoesNotExistException divertDoesNotExist(String divert); ActiveMQDivertDoesNotExistException divertDoesNotExist(String divert);
@Message(id = 229232, value = "Cannot create consumer on {0}. Session is closed.", format = Message.Format.MESSAGE_FORMAT)
ActiveMQIllegalStateException cannotCreateConsumerOnClosedSession(SimpleString queueName);
} }

View File

@ -426,6 +426,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
} }
} }
} }
closed = true;
} }
//putting closing of consumers outside the sync block //putting closing of consumers outside the sync block
@ -463,7 +464,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
callback.closed(); callback.closed();
} }
closed = true;
//When the ServerSessionImpl is closed, need to create and send a SESSION_CLOSED notification. //When the ServerSessionImpl is closed, need to create and send a SESSION_CLOSED notification.
sendSessionNotification(CoreNotificationType.SESSION_CLOSED); sendSessionNotification(CoreNotificationType.SESSION_CLOSED);
@ -559,8 +559,14 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
filterString, browseOnly, supportLargeMessage)); filterString, browseOnly, supportLargeMessage));
} }
ServerConsumer consumer = new ServerConsumerImpl(consumerID, this, (QueueBinding) binding, filter, priority, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server); ServerConsumer consumer;
consumers.put(consumer.getID(), consumer); synchronized (this) {
if (closed) {
throw ActiveMQMessageBundle.BUNDLE.cannotCreateConsumerOnClosedSession(queueName);
}
consumer = new ServerConsumerImpl(consumerID, this, (QueueBinding) binding, filter, priority, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server);
consumers.put(consumer.getID(), consumer);
}
if (server.hasBrokerConsumerPlugins()) { if (server.hasBrokerConsumerPlugins()) {
server.callBrokerConsumerPlugins(plugin -> plugin.afterCreateConsumer(consumer)); server.callBrokerConsumerPlugins(plugin -> plugin.afterCreateConsumer(consumer));