This commit is contained in:
Clebert Suconic 2019-09-17 14:04:37 -04:00
commit 3cbd5a3c05
3 changed files with 51 additions and 10 deletions

View File

@ -1178,16 +1178,15 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
// Don't let new consumers or producers get added while we are closing
// this down.
session.shutdown();
// Cascade the connection stop producers.
// we don't stop consumer because in core
// closing the session will do the job
for (ProducerId producerId : session.getProducerIds()) {
try {
processRemoveProducer(producerId);
} catch (Throwable e) {
// LOG.warn("Failed to remove producer: {}", producerId, e);
}
processRemoveProducer(producerId);
}
for (ConsumerId consumerId : session.getConsumerIds()) {
processRemoveConsumer(consumerId, lastDeliveredSequenceId);
}
state.removeSession(id);
propagateLastSequenceId(session, lastDeliveredSequenceId);
removeSession(context, session.getInfo());

View File

@ -682,4 +682,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
}
return mappedDestination;
}
public List<OpenWireConnection> getConnections() {
return connections;
}
}

View File

@ -47,6 +47,7 @@ import javax.jms.XASession;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
@ -64,12 +65,17 @@ import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.state.SessionState;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
@ -244,8 +250,6 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
Connection connection = factory.createConnection();
Collection<Session> sessions = new LinkedList<>();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
System.out.println("Queue:" + queue);
@ -270,6 +274,40 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
}
}
@Test
public void testSessionCloseWithOpenConnection() throws Exception {
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
session.createConsumer(queue);
session.createConsumer(queue);
connection.start();
Field infoField = ActiveMQSession.class.getDeclaredField("info");
infoField.setAccessible(true);
SessionInfo info = (SessionInfo) infoField.get(session);
NettyAcceptor acceptor = (NettyAcceptor) server.getRemotingService().getAcceptor("netty");
OpenWireProtocolManager protocolManager = (OpenWireProtocolManager) acceptor.getProtocolMap().get("OPENWIRE");
List<OpenWireConnection> connections = protocolManager.getConnections();
assertEquals(1, connections.size());
OpenWireConnection conn = connections.get(0);
SessionState sessionState = conn.getState().getSessionState(info.getSessionId());
Wait.assertEquals(2, sessionState.getConsumerIds()::size, 5000);
session.close();
Wait.assertEquals(0, sessionState.getConsumerIds()::size, 5000);
}
}
@Test
public void testRollback() throws Exception {
try (Connection connection = factory.createConnection()) {