diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNoHearbeatsTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNoHearbeatsTest.java index b35ef7d34d..6e35c02756 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNoHearbeatsTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNoHearbeatsTest.java @@ -16,15 +16,23 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; +import java.net.URI; import java.util.Arrays; import java.util.Collection; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.tests.util.SpawnedVMSupport; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; import org.apache.activemq.transport.amqp.client.AmqpValidator; import org.apache.qpid.proton.engine.Connection; +import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -80,4 +88,68 @@ public class AmqpNoHearbeatsTest extends AmqpClientTestSupport { connection.close(); } + private static final String QUEUE_NAME = "queue://testHeartless"; + + // This test is validating a scenario where the client will leave with connection reset + // This is done by setting soLinger=0 on the socket, which will make the system to issue a connection.reset instead of sending a + // disconnect. + @Test(timeout = 60000) + public void testCloseConsumerOnConnectionReset() throws Exception { + + AmqpClient client = createAmqpClient(); + assertNotNull(client); + + client.setValidator(new AmqpValidator() { + + @Override + public void inspectOpenedResource(Connection connection) { + assertEquals("idle timeout was not disabled", 0, connection.getTransport().getRemoteIdleTimeout()); + } + }); + + AmqpConnection connection = addConnection(client.connect()); + assertNotNull(connection); + + connection.getStateInspector().assertValid(); + AmqpSession session = connection.createSession(); + AmqpReceiver receiver = session.createReceiver(QUEUE_NAME); + + // This test needs a remote process exiting without closing the socket + // with soLinger=0 on the socket so it will issue a connection.reset + Process p = SpawnedVMSupport.spawnVM(AmqpNoHearbeatsTest.class.getName(), "testConnectionReset"); + Assert.assertEquals(33, p.waitFor()); + + AmqpSender sender = session.createSender(QUEUE_NAME); + + for (int i = 0; i < 10; i++) { + AmqpMessage msg = new AmqpMessage(); + msg.setBytes(new byte[] {0}); + sender.send(msg); + } + + receiver.flow(20); + + for (int i = 0; i < 10; i++) { + AmqpMessage msg = receiver.receive(1, TimeUnit.SECONDS); + Assert.assertNotNull(msg); + msg.accept(); + } + } + + public static void main(String[] arg) { + if (arg.length > 0 && arg[0].equals("testConnectionReset")) { + try { + AmqpClient client = new AmqpClient(new URI("tcp://127.0.0.1:5672?transport.soLinger=0"), null, null); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + AmqpReceiver receiver = session.createReceiver(QUEUE_NAME); + receiver.flow(10); + System.exit(33); + } catch (Throwable e) { + e.printStackTrace(); + System.exit(-1); + } + } + } + }