ARTEMIS-3791 Openwire failover leaving sessions leaked

This commit is contained in:
Clebert Suconic 2022-04-20 17:39:13 -04:00 committed by clebertsuconic
parent 857f9f1db8
commit d1e1faacc4
4 changed files with 49 additions and 14 deletions

View File

@ -846,7 +846,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
context.setClientMaster(info.isClientMaster());
context.setFaultTolerant(info.isFaultTolerant());
context.setReconnect(true);
context.incRefCount();
}
/**

View File

@ -229,12 +229,11 @@ public class OpenWireProtocolManager extends AbstractProtocolManager<Command, O
synchronized (clientIdSet) {
String clientId = info.getClientId();
if (clientId != null) {
AMQConnectionContext context = this.clientIdSet.get(clientId);
if (context != null && context.decRefCount() == 0) {
AMQConnectionContext context = this.clientIdSet.remove(clientId);
if (context != null) {
//connection is still there and need to close
context.getConnection().disconnect(error != null);
this.connections.remove(context.getConnection());
this.clientIdSet.remove(clientId);
}
} else {
throw new InvalidClientIDException("No clientID specified for connection disconnect request");

View File

@ -17,7 +17,6 @@
package org.apache.activemq.artemis.core.protocol.openwire.amq;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
@ -49,7 +48,6 @@ public class AMQConnectionContext {
private boolean clientMaster = true;
private ConnectionState connectionState;
private XATransactionId xid;
private AtomicInteger refCount = new AtomicInteger(1);
private Command lastCommand;
public AMQConnectionContext() {
@ -252,14 +250,6 @@ public class AMQConnectionContext {
return false;
}
public void incRefCount() {
refCount.incrementAndGet();
}
public int decRefCount() {
return refCount.decrementAndGet();
}
public void setLastCommand(Command lastCommand) {
this.lastCommand = lastCommand;
}

View File

@ -24,8 +24,13 @@ import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import io.netty.channel.ChannelFuture;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Test;
@ -54,6 +59,48 @@ public class ReconnectFailoverTest extends BasicOpenWireTest {
} finally {
connection.close();
}
Wait.assertEquals(0, () -> server.getSessions().size());
}
// I was trying to reproduce ARTEMIS-3791 where sessions leaked after openwire reconnects.
// even though I was not able to reproduce the issue after many tries
// I am still keeping the test to make sure I am not breaking anything
@Test
public void testReconnectPacket() throws Exception {
ConnectionFactory failoverFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616)");
ActiveMQConnection connection = (ActiveMQConnection)failoverFactory.createConnection();
try {
ActiveMQSession session = (ActiveMQSession)connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue tempQueue = session.createTemporaryQueue();
MessageProducer producer = session.createProducer(tempQueue);
server.getRemotingService().getConnections().forEach(r -> {
NettyConnection nettyConnection = (NettyConnection) r.getTransportConnection();
ChannelFuture future = nettyConnection.getChannel().close();
try {
while (!future.isDone()) {
Thread.sleep(10);
}
} catch (Exception e) {
e.printStackTrace();
}
});
for (int i = 0; i < 10; i++) {
producer.send(session.createTextMessage("hello"));
}
connection.start();
MessageConsumer consumer = session.createConsumer(tempQueue);
for (int i = 0; i < 10; i++) {
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals("hello", message.getText());
}
} finally {
connection.close();
}
Wait.assertEquals(0, () -> server.getSessions().size());
}
}