diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java index 3cda78dcd9..08df785c1d 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java @@ -27,6 +27,7 @@ import static org.junit.Assert.assertTrue; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.junit.ActiveMQTestRunner; @@ -46,6 +47,7 @@ import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.messaging.TerminusDurability; import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.engine.Session; import org.apache.qpid.proton.message.Message; import org.junit.Test; import org.junit.runner.RunWith; @@ -61,6 +63,47 @@ public class AmqpReceiverTest extends AmqpClientTestSupport { return true; } + @Test(timeout = 60000) + public void testReceiverCloseSendsRemoteClose() throws Exception { + AmqpClient client = createAmqpClient(); + assertNotNull(client); + + final AtomicBoolean closed = new AtomicBoolean(); + + client.setValidator(new AmqpValidator() { + + @Override + public void inspectClosedResource(Session session) { + LOG.info("Session closed: {}", session.getContext()); + } + + @Override + public void inspectDetachedResource(Receiver receiver) { + markAsInvalid("Broker should not detach receiver linked to closed session."); + } + + @Override + public void inspectClosedResource(Receiver receiver) { + LOG.info("Receiver closed: {}", receiver.getContext()); + closed.set(true); + } + }); + + AmqpConnection connection = trackConnection(client.connect()); + assertNotNull(connection); + AmqpSession session = connection.createSession(); + assertNotNull(session); + AmqpReceiver receiver = session.createReceiver("queue://" + getTestName()); + assertNotNull(receiver); + + receiver.close(); + + assertTrue("Did not process remote close as expected", closed.get()); + connection.getStateInspector().assertValid(); + + connection.close(); + } + @Test(timeout = 60000) public void testCreateQueueReceiver() throws Exception { AmqpClient client = createAmqpClient();