This commit is contained in:
Clebert Suconic 2020-01-15 11:18:13 -05:00
commit 1c1359f9f9
3 changed files with 63 additions and 1 deletions

View File

@ -86,6 +86,8 @@ public class AMQPSessionContext extends ProtonInitializable {
public void disconnect(Object consumer, String queueName) {
ProtonServerSenderContext protonConsumer = senders.remove(consumer);
if (protonConsumer != null) {
serverSenders.remove(protonConsumer.getBrokerConsumer());
try {
protonConsumer.close(false);
} catch (ActiveMQAMQPException e) {
@ -132,6 +134,7 @@ public class AMQPSessionContext extends ProtonInitializable {
}
}
senders.clear();
serverSenders.clear();
try {
if (sessionSPI != null) {
sessionSPI.close();
@ -178,6 +181,9 @@ public class AMQPSessionContext extends ProtonInitializable {
protonSender.start();
} catch (ActiveMQAMQPException e) {
senders.remove(sender);
if (protonSender.getBrokerConsumer() != null) {
serverSenders.remove(protonSender.getBrokerConsumer());
}
sender.setSource(null);
sender.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
connection.runNow(() -> {
@ -188,7 +194,6 @@ public class AMQPSessionContext extends ProtonInitializable {
}
public void removeSender(Sender sender) throws ActiveMQAMQPException {
senders.remove(sender);
ProtonServerSenderContext senderRemoved = senders.remove(sender);
if (senderRemoved != null) {
serverSenders.remove(senderRemoved.getBrokerConsumer());
@ -217,4 +222,12 @@ public class AMQPSessionContext extends ProtonInitializable {
});
}
}
public int getReceiverCount() {
return receivers.size();
}
public int getSenderCount() {
return senders.size();
}
}

View File

@ -558,6 +558,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
try {
closed = true;
protonSession.removeSender(sender);
sessionSPI.closeSender(brokerConsumer);
// if this is a link close rather than a connection close or detach, we need to delete
// any durable resources for say pub subs
@ -899,4 +900,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
sender.drained();
connection.flush();
}
public AMQPSessionContext getSessionContext() {
return protonSession;
}
}

View File

@ -20,6 +20,8 @@ import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSConsumer;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageConsumer;
@ -59,7 +61,11 @@ import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
@ -315,6 +321,44 @@ public class ConsumerTest extends ActiveMQTestBase {
}
}
@Test
public void testContextOnConsumerAMQP() throws Throwable {
if (!isNetty()) {
// no need to run the test, there's no AMQP support
return;
}
assertNull(server.getAddressInfo(SimpleString.toSimpleString("queue")));
ConnectionFactory factory = createFactory(2);
JMSContext context = factory.createContext("admin", "admin", Session.AUTO_ACKNOWLEDGE);
try {
javax.jms.Queue queue = context.createQueue("queue");
JMSConsumer consumer = context.createConsumer(queue);
ServerConsumer serverConsumer = null;
for (ServerSession session : server.getSessions()) {
for (ServerConsumer sessionConsumer : session.getServerConsumers()) {
serverConsumer = sessionConsumer;
}
}
consumer.close();
Assert.assertTrue(serverConsumer.getProtocolContext() instanceof ProtonServerSenderContext);
final AMQPSessionContext sessionContext = ((ProtonServerSenderContext)
serverConsumer.getProtocolContext()).getSessionContext();
Wait.assertEquals(0, () -> sessionContext.getSenderCount(), 1000, 10);
} finally {
context.stop();
context.close();
}
}
@Test
public void testAutoDeleteAutoCreatedAddressAndQueue() throws Throwable {
if (!isNetty()) {