From cd7f52d4ba12a03cbac9f4f2d7a34c5cfe023eba Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Tue, 28 Jul 2020 16:15:22 -0500 Subject: [PATCH] ARTEMIS-2856 consumer can be created on closed session Due to a lack of concurrency protections it's possible to create a consumer on a closed session. I've not been able to reproduce this with a test, but I've seen it in the wild. Static code analysis points to a need for better concurrency controls around closing the session and creating consumers. --- .../artemis/core/server/ActiveMQMessageBundle.java | 3 +++ .../artemis/core/server/impl/ServerSessionImpl.java | 12 +++++++++--- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java index d96eb98f0b..5da0521cb9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java @@ -492,4 +492,7 @@ public interface ActiveMQMessageBundle { @Message(id = 229231, value = "Divert Does Not Exist: {0}", format = Message.Format.MESSAGE_FORMAT) ActiveMQDivertDoesNotExistException divertDoesNotExist(String divert); + + @Message(id = 229232, value = "Cannot create consumer on {0}. Session is closed.", format = Message.Format.MESSAGE_FORMAT) + ActiveMQIllegalStateException cannotCreateConsumerOnClosedSession(SimpleString queueName); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index d2ea24ad99..ff9082db41 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -426,6 +426,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } } } + closed = true; } //putting closing of consumers outside the sync block @@ -463,7 +464,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener { callback.closed(); } - closed = true; //When the ServerSessionImpl is closed, need to create and send a SESSION_CLOSED notification. sendSessionNotification(CoreNotificationType.SESSION_CLOSED); @@ -559,8 +559,14 @@ public class ServerSessionImpl implements ServerSession, FailureListener { filterString, browseOnly, supportLargeMessage)); } - ServerConsumer consumer = new ServerConsumerImpl(consumerID, this, (QueueBinding) binding, filter, priority, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server); - consumers.put(consumer.getID(), consumer); + ServerConsumer consumer; + synchronized (this) { + if (closed) { + throw ActiveMQMessageBundle.BUNDLE.cannotCreateConsumerOnClosedSession(queueName); + } + consumer = new ServerConsumerImpl(consumerID, this, (QueueBinding) binding, filter, priority, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server); + consumers.put(consumer.getID(), consumer); + } if (server.hasBrokerConsumerPlugins()) { server.callBrokerConsumerPlugins(plugin -> plugin.afterCreateConsumer(consumer));