This commit is contained in:
Clebert Suconic 2018-06-19 17:58:53 -04:00
commit b7c3e03ca4
1 changed files with 72 additions and 0 deletions

View File

@ -16,15 +16,23 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.SpawnedVMSupport;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.proton.engine.Connection;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -80,4 +88,68 @@ public class AmqpNoHearbeatsTest extends AmqpClientTestSupport {
connection.close();
}
private static final String QUEUE_NAME = "queue://testHeartless";
// This test is validating a scenario where the client will leave with connection reset
// This is done by setting soLinger=0 on the socket, which will make the system to issue a connection.reset instead of sending a
// disconnect.
@Test(timeout = 60000)
public void testCloseConsumerOnConnectionReset() throws Exception {
AmqpClient client = createAmqpClient();
assertNotNull(client);
client.setValidator(new AmqpValidator() {
@Override
public void inspectOpenedResource(Connection connection) {
assertEquals("idle timeout was not disabled", 0, connection.getTransport().getRemoteIdleTimeout());
}
});
AmqpConnection connection = addConnection(client.connect());
assertNotNull(connection);
connection.getStateInspector().assertValid();
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(QUEUE_NAME);
// This test needs a remote process exiting without closing the socket
// with soLinger=0 on the socket so it will issue a connection.reset
Process p = SpawnedVMSupport.spawnVM(AmqpNoHearbeatsTest.class.getName(), "testConnectionReset");
Assert.assertEquals(33, p.waitFor());
AmqpSender sender = session.createSender(QUEUE_NAME);
for (int i = 0; i < 10; i++) {
AmqpMessage msg = new AmqpMessage();
msg.setBytes(new byte[] {0});
sender.send(msg);
}
receiver.flow(20);
for (int i = 0; i < 10; i++) {
AmqpMessage msg = receiver.receive(1, TimeUnit.SECONDS);
Assert.assertNotNull(msg);
msg.accept();
}
}
public static void main(String[] arg) {
if (arg.length > 0 && arg[0].equals("testConnectionReset")) {
try {
AmqpClient client = new AmqpClient(new URI("tcp://127.0.0.1:5672?transport.soLinger=0"), null, null);
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(QUEUE_NAME);
receiver.flow(10);
System.exit(33);
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
}
}
}