From 54f5dae458ab7bc1ce554e8c5b74c8fa30401833 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Fri, 19 Jan 2024 15:37:15 -0500 Subject: [PATCH] ARTEMIS-4575 Only start the consumers that were added Change from forcing a session start cycle on each consumer add event and start only those consumers that were added which will trigger a prompt delivery action on each. The session should be marked started on create to account for the remove of the start on each consumer add event. --- .../core/protocol/openwire/OpenWireConnection.java | 3 ++- .../artemis/core/protocol/openwire/amq/AMQConsumer.java | 8 ++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 670eca258d..eb51aad727 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -982,7 +982,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se return; } - amqSession.start(); + consumersList.forEach((c) -> c.start()); if (AdvisorySupport.isAdvisoryTopic(info.getDestination())) { //advisory for temp destinations @@ -1122,6 +1122,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se public AMQSession addSession(SessionInfo ss) { AMQSession amqSession = new AMQSession(getState().getInfo(), ss, server, this, protocolManager, coreMessageObjectPools); amqSession.initialize(); + amqSession.start(); sessions.put(ss.getSessionId(), amqSession); sessionIdMap.put(amqSession.getCoreSession().getName(), ss.getSessionId()); diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index 38a249e04c..03c99b74ff 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -137,6 +137,14 @@ public class AMQConsumer { return filterString; } + public void start() { + if (serverConsumer == null) { + throw new IllegalStateException("Cannot start the AMQConsumer until it has been initialized"); + } + + serverConsumer.setStarted(true); + } + public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception { SimpleString selector = info.getSelector() == null ? null : new SimpleString(convertOpenWireToActiveMQFilterString(info.getSelector()));