diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index e091d798b8..3ad7ba0efc 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -853,9 +853,13 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } if (dest.isTemporary()) { - //Openwire needs to store the DestinationInfo in order to send - //Advisory messages to clients - this.state.addTempDestination(info); + //Openwire needs to store the DestinationInfo in order to send Advisory messages to clients + if (!tempDestinationExists(info.getDestination().getPhysicalName())) { + this.state.addTempDestination(info); + if (logger.isDebugEnabled()) { + logger.debug(this + " added temp destination to state: " + info.getDestination().getPhysicalName() + "; " + state.getTempDestinations().size()); + } + } } if (created && !AdvisorySupport.isAdvisoryTopic(dest)) { @@ -963,6 +967,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se public void tempQueueDeleted(SimpleString bindingName) { ActiveMQDestination dest = new ActiveMQTempQueue(bindingName.toString()); state.removeTempDestination(dest); + if (logger.isDebugEnabled()) { + logger.debug(this + " removed temp destination from state: " + bindingName + "; " + state.getTempDestinations().size()); + } if (!AdvisorySupport.isAdvisoryTopic(dest)) { AMQConnectionContext context = getContext(); @@ -1121,6 +1128,19 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } } + private boolean tempDestinationExists(String name) { + boolean result = false; + + for (DestinationInfo destinationInfo : state.getTempDestinations()) { + if (destinationInfo.getDestination().getPhysicalName().equals(name)) { + result = true; + break; + } + } + + return result; + } + CommandProcessor commandProcessorInstance = new CommandProcessor(); // This will listen for commands through the protocolmanager diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java index dac47943a6..979d2765cf 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java @@ -72,6 +72,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; @@ -114,6 +115,32 @@ public class SimpleOpenWireTest extends BasicOpenWireTest { connection.close(); } + @Test + public void testDuplicateTemporaryDestination() throws Exception { + Connection connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination queue = session.createTemporaryQueue(); + for (int i = 0; i < 10; i++) { + MessageProducer producer = session.createProducer(queue); + producer.close(); + } + + int tempDestinationCount = 0; + for (RemotingConnection remotingConnection : server.getRemotingService().getConnections()) { + if (remotingConnection instanceof OpenWireConnection) { + OpenWireConnection openWireConnection = (OpenWireConnection) remotingConnection; + if (openWireConnection.getState() != null && openWireConnection.getState().getTempDestinations() != null) { + tempDestinationCount += openWireConnection.getState().getTempDestinations().size(); + } + } + } + + assertTrue(tempDestinationCount <= 1); + + session.close(); + connection.close(); + } + @Test public void testTransactionalSimple() throws Exception { try (Connection connection = factory.createConnection()) {