ARTEMIS-2970 Improving test on Broker Connection
This commit is contained in:
parent
9e77b1a8a4
commit
030baaa2d3
|
@ -45,9 +45,9 @@ public class AMQPBridgeDisconnectTest extends AmqpClientTestSupport {
|
|||
try {
|
||||
AMQPBridgeDisconnectTest reconnect = new AMQPBridgeDisconnectTest();
|
||||
if (arg[0].equals("client")) {
|
||||
reconnect.runExternalClient();
|
||||
reconnect.runExternal(true);
|
||||
} else {
|
||||
reconnect.runExternalSever();
|
||||
reconnect.runExternal(false);
|
||||
}
|
||||
} catch (Throwable var2) {
|
||||
var2.printStackTrace();
|
||||
|
@ -57,23 +57,14 @@ public class AMQPBridgeDisconnectTest extends AmqpClientTestSupport {
|
|||
System.exit(0);
|
||||
}
|
||||
|
||||
public void runExternalClient() throws Exception {
|
||||
ActiveMQServer externalServer = this.createServer(5673, false);
|
||||
public void runExternal(boolean startClient) throws Exception {
|
||||
ActiveMQServer externalServer = this.createServer(AMQP_PORT_2, false);
|
||||
externalServer.getConfiguration().setPersistenceEnabled(false);
|
||||
AMQPBrokerConnectConfiguration connectConfiguration = new AMQPBrokerConnectConfiguration("bridgeTest", "tcp://localhost:5672");
|
||||
connectConfiguration.addElement((new AMQPBrokerConnectionElement()).setType(AMQPBrokerConnectionAddressType.RECEIVER).setQueueName(DESTINATION_NAME));
|
||||
externalServer.getConfiguration().addAMQPConnection(connectConfiguration);
|
||||
externalServer.start();
|
||||
|
||||
while (true) {
|
||||
System.out.println(AMQPBridgeDisconnectTest.class.getName() + " is running a server until someone kills it");
|
||||
Thread.sleep(5000L);
|
||||
if (startClient) {
|
||||
AMQPBrokerConnectConfiguration connectConfiguration = new AMQPBrokerConnectConfiguration("bridgeTest", "tcp://localhost:" + AMQP_PORT).setRetryInterval(100).setReconnectAttempts(-1);
|
||||
connectConfiguration.addElement((new AMQPBrokerConnectionElement()).setType(AMQPBrokerConnectionAddressType.RECEIVER).setQueueName(DESTINATION_NAME));
|
||||
externalServer.getConfiguration().addAMQPConnection(connectConfiguration);
|
||||
}
|
||||
}
|
||||
|
||||
public void runExternalSever() throws Exception {
|
||||
ActiveMQServer externalServer = this.createServer(5673, false);
|
||||
externalServer.getConfiguration().setPersistenceEnabled(false);
|
||||
externalServer.start();
|
||||
|
||||
while (true) {
|
||||
|
@ -91,7 +82,7 @@ public class AMQPBridgeDisconnectTest extends AmqpClientTestSupport {
|
|||
|
||||
@Override
|
||||
protected ActiveMQServer createServer() throws Exception {
|
||||
ActiveMQServer server = this.createServer(5672, false);
|
||||
ActiveMQServer server = this.createServer(AMQP_PORT, false);
|
||||
return server;
|
||||
}
|
||||
|
||||
|
@ -106,64 +97,32 @@ public class AMQPBridgeDisconnectTest extends AmqpClientTestSupport {
|
|||
|
||||
@Test
|
||||
public void testClientDisconnectAfterKill() throws Exception {
|
||||
this.testClientDisconnect(false);
|
||||
this.testDisconnect(false, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientDisconnectAfterPausedProcess() throws Exception {
|
||||
this.testClientDisconnect(true);
|
||||
}
|
||||
|
||||
public void testClientDisconnect(boolean pause) throws Exception {
|
||||
this.server.start();
|
||||
ActiveMQServer var10000 = this.server;
|
||||
Wait.assertTrue(var10000::isActive);
|
||||
Process process = SpawnedVMSupport.spawnVM(AMQPBridgeDisconnectTest.class.getName(), true, "client");
|
||||
|
||||
try {
|
||||
Queue queue = this.server.locateQueue(DESTINATION_NAME);
|
||||
Assert.assertNotNull(queue);
|
||||
Wait.assertEquals(1, queue::getConsumerCount);
|
||||
Wait.assertEquals(1, () -> {
|
||||
return this.server.getRemotingService().getConnections().size();
|
||||
});
|
||||
if (pause) {
|
||||
int pid = ExecuteUtil.getPID(process);
|
||||
ExecuteUtil.runCommand(true, new String[]{"kill", "-STOP", Integer.toString(pid)});
|
||||
} else {
|
||||
process.destroy();
|
||||
}
|
||||
|
||||
Wait.assertEquals(0, () -> {
|
||||
return this.server.getRemotingService().getConnections().size();
|
||||
}, 5000L);
|
||||
Wait.assertEquals(0, queue::getConsumerCount, 5000L);
|
||||
} finally {
|
||||
try {
|
||||
process.destroyForcibly();
|
||||
} catch (Exception var10) {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
this.testDisconnect(true, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testServerDisconnectAfterKill() throws Exception {
|
||||
this.testServerDisconnect(false);
|
||||
this.testDisconnect(false, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testServerDisconnectAfterPausedProcess() throws Exception {
|
||||
this.testServerDisconnect(true);
|
||||
this.testDisconnect(true, true);
|
||||
}
|
||||
|
||||
public void testServerDisconnect(boolean pause) throws Exception {
|
||||
AMQPBrokerConnectConfiguration connectConfiguration = new AMQPBrokerConnectConfiguration("bridgeTest", "tcp://localhost:5673?amqpIdleTimeout=1000");
|
||||
connectConfiguration.addElement((new AMQPBrokerConnectionElement()).setType(AMQPBrokerConnectionAddressType.SENDER).setQueueName(DESTINATION_NAME)).setRetryInterval(100).setReconnectAttempts(-1);
|
||||
this.server.getConfiguration().addAMQPConnection(connectConfiguration);
|
||||
public void testDisconnect(boolean pause, boolean startClient) throws Exception {
|
||||
if (startClient) {
|
||||
AMQPBrokerConnectConfiguration connectConfiguration = new AMQPBrokerConnectConfiguration("bridgeTest", "tcp://localhost:" + AMQP_PORT_2 + "?amqpIdleTimeout=1000");
|
||||
connectConfiguration.addElement((new AMQPBrokerConnectionElement()).setType(AMQPBrokerConnectionAddressType.SENDER).setQueueName(DESTINATION_NAME)).setRetryInterval(100).setReconnectAttempts(-1);
|
||||
this.server.getConfiguration().addAMQPConnection(connectConfiguration);
|
||||
}
|
||||
this.server.start();
|
||||
Process process = SpawnedVMSupport.spawnVM(AMQPBridgeDisconnectTest.class.getName(), true, "server");
|
||||
Process process = SpawnedVMSupport.spawnVM(AMQPBridgeDisconnectTest.class.getName(), true, startClient ? "server" : "client");
|
||||
|
||||
try {
|
||||
ActiveMQServer var10000 = this.server;
|
||||
|
|
Loading…
Reference in New Issue