This commit is contained in:
Clebert Suconic 2018-06-20 11:39:18 -04:00
commit 1e5971b70e
1 changed files with 16 additions and 11 deletions

View File

@ -40,6 +40,11 @@ import org.junit.runners.Parameterized;
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public class AmqpNoHearbeatsTest extends AmqpClientTestSupport { public class AmqpNoHearbeatsTest extends AmqpClientTestSupport {
private static final int OK = 0x33;
@Parameterized.Parameter(0)
public boolean useOverride;
@Parameterized.Parameters(name = "useOverride={0}") @Parameterized.Parameters(name = "useOverride={0}")
public static Collection<Object[]> parameters() { public static Collection<Object[]> parameters() {
return Arrays.asList(new Object[][] { return Arrays.asList(new Object[][] {
@ -47,8 +52,6 @@ public class AmqpNoHearbeatsTest extends AmqpClientTestSupport {
}); });
} }
@Parameterized.Parameter(0)
public boolean useOverride;
@Override @Override
protected void addConfiguration(ActiveMQServer server) { protected void addConfiguration(ActiveMQServer server) {
@ -88,8 +91,6 @@ public class AmqpNoHearbeatsTest extends AmqpClientTestSupport {
connection.close(); connection.close();
} }
private static final String QUEUE_NAME = "queue://testHeartless";
// This test is validating a scenario where the client will leave with connection reset // This test is validating a scenario where the client will leave with connection reset
// This is done by setting soLinger=0 on the socket, which will make the system to issue a connection.reset instead of sending a // This is done by setting soLinger=0 on the socket, which will make the system to issue a connection.reset instead of sending a
// disconnect. // disconnect.
@ -112,14 +113,14 @@ public class AmqpNoHearbeatsTest extends AmqpClientTestSupport {
connection.getStateInspector().assertValid(); connection.getStateInspector().assertValid();
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(QUEUE_NAME); AmqpReceiver receiver = session.createReceiver(getQueueName());
// This test needs a remote process exiting without closing the socket // This test needs a remote process exiting without closing the socket
// with soLinger=0 on the socket so it will issue a connection.reset // with soLinger=0 on the socket so it will issue a connection.reset
Process p = SpawnedVMSupport.spawnVM(AmqpNoHearbeatsTest.class.getName(), "testConnectionReset"); Process p = SpawnedVMSupport.spawnVM(AmqpNoHearbeatsTest.class.getName(), getTestName(), getQueueName());
Assert.assertEquals(33, p.waitFor()); Assert.assertEquals(OK, p.waitFor());
AmqpSender sender = session.createSender(QUEUE_NAME); AmqpSender sender = session.createSender(getQueueName());
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
AmqpMessage msg = new AmqpMessage(); AmqpMessage msg = new AmqpMessage();
@ -137,18 +138,22 @@ public class AmqpNoHearbeatsTest extends AmqpClientTestSupport {
} }
public static void main(String[] arg) { public static void main(String[] arg) {
if (arg.length > 0 && arg[0].equals("testConnectionReset")) { if (arg.length == 2 && arg[0].startsWith("testCloseConsumerOnConnectionReset")) {
try { try {
String queueName = arg[1];
AmqpClient client = new AmqpClient(new URI("tcp://127.0.0.1:5672?transport.soLinger=0"), null, null); AmqpClient client = new AmqpClient(new URI("tcp://127.0.0.1:5672?transport.soLinger=0"), null, null);
AmqpConnection connection = client.connect(); AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(QUEUE_NAME); AmqpReceiver receiver = session.createReceiver(queueName);
receiver.flow(10); receiver.flow(10);
System.exit(33); System.exit(OK);
} catch (Throwable e) { } catch (Throwable e) {
e.printStackTrace(); e.printStackTrace();
System.exit(-1); System.exit(-1);
} }
} else {
System.err.println("Test " + arg[0] + " unkown");
System.exit(-2);
} }
} }