diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 670eca258d..eb51aad727 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -982,7 +982,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se return; } - amqSession.start(); + consumersList.forEach((c) -> c.start()); if (AdvisorySupport.isAdvisoryTopic(info.getDestination())) { //advisory for temp destinations @@ -1122,6 +1122,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se public AMQSession addSession(SessionInfo ss) { AMQSession amqSession = new AMQSession(getState().getInfo(), ss, server, this, protocolManager, coreMessageObjectPools); amqSession.initialize(); + amqSession.start(); sessions.put(ss.getSessionId(), amqSession); sessionIdMap.put(amqSession.getCoreSession().getName(), ss.getSessionId()); diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index 38a249e04c..03c99b74ff 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -137,6 +137,14 @@ public class AMQConsumer { return filterString; } + public void start() { + if (serverConsumer == null) { + throw new IllegalStateException("Cannot start the AMQConsumer until it has been initialized"); + } + + serverConsumer.setStarted(true); + } + public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception { SimpleString selector = info.getSelector() == null ? null : new SimpleString(convertOpenWireToActiveMQFilterString(info.getSelector()));