ARTEMIS-2915 duplicate temp queues using OpenWire

This commit is contained in:
Justin Bertram 2020-09-24 11:27:20 -05:00 committed by Clebert Suconic
parent 4a33332021
commit cdc283fba5
2 changed files with 50 additions and 3 deletions

View File

@ -853,9 +853,13 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
if (dest.isTemporary()) {
//Openwire needs to store the DestinationInfo in order to send
//Advisory messages to clients
this.state.addTempDestination(info);
//Openwire needs to store the DestinationInfo in order to send Advisory messages to clients
if (!tempDestinationExists(info.getDestination().getPhysicalName())) {
this.state.addTempDestination(info);
if (logger.isDebugEnabled()) {
logger.debug(this + " added temp destination to state: " + info.getDestination().getPhysicalName() + "; " + state.getTempDestinations().size());
}
}
}
if (created && !AdvisorySupport.isAdvisoryTopic(dest)) {
@ -963,6 +967,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public void tempQueueDeleted(SimpleString bindingName) {
ActiveMQDestination dest = new ActiveMQTempQueue(bindingName.toString());
state.removeTempDestination(dest);
if (logger.isDebugEnabled()) {
logger.debug(this + " removed temp destination from state: " + bindingName + "; " + state.getTempDestinations().size());
}
if (!AdvisorySupport.isAdvisoryTopic(dest)) {
AMQConnectionContext context = getContext();
@ -1121,6 +1128,19 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
}
private boolean tempDestinationExists(String name) {
boolean result = false;
for (DestinationInfo destinationInfo : state.getTempDestinations()) {
if (destinationInfo.getDestination().getPhysicalName().equals(name)) {
result = true;
break;
}
}
return result;
}
CommandProcessor commandProcessorInstance = new CommandProcessor();
// This will listen for commands through the protocolmanager

View File

@ -72,6 +72,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
@ -114,6 +115,32 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
connection.close();
}
@Test
public void testDuplicateTemporaryDestination() throws Exception {
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination queue = session.createTemporaryQueue();
for (int i = 0; i < 10; i++) {
MessageProducer producer = session.createProducer(queue);
producer.close();
}
int tempDestinationCount = 0;
for (RemotingConnection remotingConnection : server.getRemotingService().getConnections()) {
if (remotingConnection instanceof OpenWireConnection) {
OpenWireConnection openWireConnection = (OpenWireConnection) remotingConnection;
if (openWireConnection.getState() != null && openWireConnection.getState().getTempDestinations() != null) {
tempDestinationCount += openWireConnection.getState().getTempDestinations().size();
}
}
}
assertTrue(tempDestinationCount <= 1);
session.close();
connection.close();
}
@Test
public void testTransactionalSimple() throws Exception {
try (Connection connection = factory.createConnection()) {