ARTEMIS-2970 Adding test validaing Broker Connection with socket disconencts and TTL

This commit is contained in:
Clebert Suconic 2020-11-04 16:15:51 -05:00
parent 9896b994b1
commit a52ddb60ca
2 changed files with 204 additions and 3 deletions

View File

@ -40,9 +40,7 @@ public class ExecuteUtil {
} }
public int pid() throws Exception { public int pid() throws Exception {
Field pidField = process.getClass().getDeclaredField("pid"); return getPID(process);
pidField.setAccessible(true);
return (int)pidField.get(process);
} }
public int waitFor(long timeout, TimeUnit unit) throws InterruptedException { public int waitFor(long timeout, TimeUnit unit) throws InterruptedException {
@ -68,6 +66,12 @@ public class ExecuteUtil {
} }
public static int getPID(Process process) throws Exception {
Field pidField = process.getClass().getDeclaredField("pid");
pidField.setAccessible(true);
return (int)pidField.get(process);
}
private static final Logger logger = Logger.getLogger(ExecuteUtil.class); private static final Logger logger = Logger.getLogger(ExecuteUtil.class);
public static int runCommand(boolean logOutput, String... command) throws Exception { public static int runCommand(boolean logOutput, String... command) throws Exception {

View File

@ -0,0 +1,197 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp.connect;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionElement;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.ExecuteUtil;
import org.apache.activemq.artemis.utils.SpawnedVMSupport;
import org.junit.Assert;
import org.junit.Test;
public class AMQPBridgeDisconnectTest extends AmqpClientTestSupport {
protected static final int AMQP_PORT_2 = 5673;
private static String DESTINATION_NAME = "AMQPBridgeReconnectTest";
public AMQPBridgeDisconnectTest() {
}
public static void main(String[] arg) {
try {
AMQPBridgeDisconnectTest reconnect = new AMQPBridgeDisconnectTest();
if (arg[0].equals("client")) {
reconnect.runExternalClient();
} else {
reconnect.runExternalSever();
}
} catch (Throwable var2) {
var2.printStackTrace();
System.exit(-1);
}
System.exit(0);
}
public void runExternalClient() throws Exception {
ActiveMQServer externalServer = this.createServer(5673, false);
externalServer.getConfiguration().setPersistenceEnabled(false);
AMQPBrokerConnectConfiguration connectConfiguration = new AMQPBrokerConnectConfiguration("bridgeTest", "tcp://localhost:5672");
connectConfiguration.addElement((new AMQPBrokerConnectionElement()).setType(AMQPBrokerConnectionAddressType.RECEIVER).setQueueName(DESTINATION_NAME));
externalServer.getConfiguration().addAMQPConnection(connectConfiguration);
externalServer.start();
while (true) {
System.out.println(AMQPBridgeDisconnectTest.class.getName() + " is running a server until someone kills it");
Thread.sleep(5000L);
}
}
public void runExternalSever() throws Exception {
ActiveMQServer externalServer = this.createServer(5673, false);
externalServer.getConfiguration().setPersistenceEnabled(false);
externalServer.start();
while (true) {
System.out.println(AMQPBridgeDisconnectTest.class.getName() + " is running a server until someone kills it");
Thread.sleep(5000L);
}
}
@Override
protected TransportConfiguration addAcceptorConfiguration(ActiveMQServer server, int port) {
TransportConfiguration configuration = super.addAcceptorConfiguration(server, port);
configuration.getExtraParams().put("amqpIdleTimeout", "1000");
return configuration;
}
@Override
protected ActiveMQServer createServer() throws Exception {
ActiveMQServer server = this.createServer(5672, false);
return server;
}
@Override
protected ActiveMQServer createServer(int port, boolean start) throws Exception {
ActiveMQServer server = super.createServer(port, start);
server.getConfiguration().setPersistenceEnabled(false);
server.getConfiguration().addAddressConfiguration((new CoreAddressConfiguration()).setName(DESTINATION_NAME).addRoutingType(RoutingType.ANYCAST));
server.getConfiguration().addQueueConfiguration((new QueueConfiguration(DESTINATION_NAME)).setAddress(DESTINATION_NAME).setRoutingType(RoutingType.ANYCAST).setDurable(true));
return server;
}
@Test
public void testClientDisconnectAfterKill() throws Exception {
this.testClientDisconnect(false);
}
@Test
public void testClientDisconnectAfterPausedProcess() throws Exception {
this.testClientDisconnect(true);
}
public void testClientDisconnect(boolean pause) throws Exception {
this.server.start();
ActiveMQServer var10000 = this.server;
Wait.assertTrue(var10000::isActive);
Process process = SpawnedVMSupport.spawnVM(AMQPBridgeDisconnectTest.class.getName(), true, "client");
try {
Queue queue = this.server.locateQueue(DESTINATION_NAME);
Assert.assertNotNull(queue);
Wait.assertEquals(1, queue::getConsumerCount);
Wait.assertEquals(1, () -> {
return this.server.getRemotingService().getConnections().size();
});
if (pause) {
int pid = ExecuteUtil.getPID(process);
ExecuteUtil.runCommand(true, new String[]{"kill", "-STOP", Integer.toString(pid)});
} else {
process.destroy();
}
Wait.assertEquals(0, () -> {
return this.server.getRemotingService().getConnections().size();
}, 5000L);
Wait.assertEquals(0, queue::getConsumerCount, 5000L);
} finally {
try {
process.destroyForcibly();
} catch (Exception var10) {
}
}
}
@Test
public void testServerDisconnectAfterKill() throws Exception {
this.testServerDisconnect(false);
}
@Test
public void testServerDisconnectAfterPausedProcess() throws Exception {
this.testServerDisconnect(true);
}
public void testServerDisconnect(boolean pause) throws Exception {
AMQPBrokerConnectConfiguration connectConfiguration = new AMQPBrokerConnectConfiguration("bridgeTest", "tcp://localhost:5673?amqpIdleTimeout=1000");
connectConfiguration.addElement((new AMQPBrokerConnectionElement()).setType(AMQPBrokerConnectionAddressType.SENDER).setQueueName(DESTINATION_NAME)).setRetryInterval(100).setReconnectAttempts(-1);
this.server.getConfiguration().addAMQPConnection(connectConfiguration);
this.server.start();
Process process = SpawnedVMSupport.spawnVM(AMQPBridgeDisconnectTest.class.getName(), true, "server");
try {
ActiveMQServer var10000 = this.server;
Wait.assertTrue(var10000::isActive);
Queue queue = this.server.locateQueue(DESTINATION_NAME);
Assert.assertNotNull(queue);
Wait.assertEquals(1, queue::getConsumerCount);
Wait.assertEquals(1, () -> {
return this.server.getRemotingService().getConnections().size();
});
if (pause) {
int pid = ExecuteUtil.getPID(process);
ExecuteUtil.runCommand(true, new String[]{"kill", "-STOP", Integer.toString(pid)});
} else {
process.destroy();
}
Wait.assertEquals(0, () -> {
return this.server.getRemotingService().getConnections().size();
}, 5000L);
Wait.assertEquals(0, queue::getConsumerCount, 5000L);
} finally {
try {
process.destroyForcibly();
} catch (Exception var11) {
}
}
}
}