From a52ddb60ca94dff33f7b921ec8aaa8bb86aa14f8 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 4 Nov 2020 16:15:51 -0500 Subject: [PATCH] ARTEMIS-2970 Adding test validaing Broker Connection with socket disconencts and TTL --- .../activemq/artemis/utils/ExecuteUtil.java | 10 +- .../connect/AMQPBridgeDisconnectTest.java | 197 ++++++++++++++++++ 2 files changed, 204 insertions(+), 3 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeDisconnectTest.java diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ExecuteUtil.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ExecuteUtil.java index 19a046a7ec..921b5ab82d 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ExecuteUtil.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ExecuteUtil.java @@ -40,9 +40,7 @@ public class ExecuteUtil { } public int pid() throws Exception { - Field pidField = process.getClass().getDeclaredField("pid"); - pidField.setAccessible(true); - return (int)pidField.get(process); + return getPID(process); } public int waitFor(long timeout, TimeUnit unit) throws InterruptedException { @@ -68,6 +66,12 @@ public class ExecuteUtil { } + public static int getPID(Process process) throws Exception { + Field pidField = process.getClass().getDeclaredField("pid"); + pidField.setAccessible(true); + return (int)pidField.get(process); + } + private static final Logger logger = Logger.getLogger(ExecuteUtil.class); public static int runCommand(boolean logOutput, String... command) throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeDisconnectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeDisconnectTest.java new file mode 100644 index 0000000000..dd9168fc5d --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeDisconnectTest.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.amqp.connect; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.config.CoreAddressConfiguration; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionElement; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; +import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.artemis.utils.ExecuteUtil; +import org.apache.activemq.artemis.utils.SpawnedVMSupport; +import org.junit.Assert; +import org.junit.Test; + +public class AMQPBridgeDisconnectTest extends AmqpClientTestSupport { + + protected static final int AMQP_PORT_2 = 5673; + private static String DESTINATION_NAME = "AMQPBridgeReconnectTest"; + + public AMQPBridgeDisconnectTest() { + } + + public static void main(String[] arg) { + try { + AMQPBridgeDisconnectTest reconnect = new AMQPBridgeDisconnectTest(); + if (arg[0].equals("client")) { + reconnect.runExternalClient(); + } else { + reconnect.runExternalSever(); + } + } catch (Throwable var2) { + var2.printStackTrace(); + System.exit(-1); + } + + System.exit(0); + } + + public void runExternalClient() throws Exception { + ActiveMQServer externalServer = this.createServer(5673, false); + externalServer.getConfiguration().setPersistenceEnabled(false); + AMQPBrokerConnectConfiguration connectConfiguration = new AMQPBrokerConnectConfiguration("bridgeTest", "tcp://localhost:5672"); + connectConfiguration.addElement((new AMQPBrokerConnectionElement()).setType(AMQPBrokerConnectionAddressType.RECEIVER).setQueueName(DESTINATION_NAME)); + externalServer.getConfiguration().addAMQPConnection(connectConfiguration); + externalServer.start(); + + while (true) { + System.out.println(AMQPBridgeDisconnectTest.class.getName() + " is running a server until someone kills it"); + Thread.sleep(5000L); + } + } + + public void runExternalSever() throws Exception { + ActiveMQServer externalServer = this.createServer(5673, false); + externalServer.getConfiguration().setPersistenceEnabled(false); + externalServer.start(); + + while (true) { + System.out.println(AMQPBridgeDisconnectTest.class.getName() + " is running a server until someone kills it"); + Thread.sleep(5000L); + } + } + + @Override + protected TransportConfiguration addAcceptorConfiguration(ActiveMQServer server, int port) { + TransportConfiguration configuration = super.addAcceptorConfiguration(server, port); + configuration.getExtraParams().put("amqpIdleTimeout", "1000"); + return configuration; + } + + @Override + protected ActiveMQServer createServer() throws Exception { + ActiveMQServer server = this.createServer(5672, false); + return server; + } + + @Override + protected ActiveMQServer createServer(int port, boolean start) throws Exception { + ActiveMQServer server = super.createServer(port, start); + server.getConfiguration().setPersistenceEnabled(false); + server.getConfiguration().addAddressConfiguration((new CoreAddressConfiguration()).setName(DESTINATION_NAME).addRoutingType(RoutingType.ANYCAST)); + server.getConfiguration().addQueueConfiguration((new QueueConfiguration(DESTINATION_NAME)).setAddress(DESTINATION_NAME).setRoutingType(RoutingType.ANYCAST).setDurable(true)); + return server; + } + + @Test + public void testClientDisconnectAfterKill() throws Exception { + this.testClientDisconnect(false); + } + + @Test + public void testClientDisconnectAfterPausedProcess() throws Exception { + this.testClientDisconnect(true); + } + + public void testClientDisconnect(boolean pause) throws Exception { + this.server.start(); + ActiveMQServer var10000 = this.server; + Wait.assertTrue(var10000::isActive); + Process process = SpawnedVMSupport.spawnVM(AMQPBridgeDisconnectTest.class.getName(), true, "client"); + + try { + Queue queue = this.server.locateQueue(DESTINATION_NAME); + Assert.assertNotNull(queue); + Wait.assertEquals(1, queue::getConsumerCount); + Wait.assertEquals(1, () -> { + return this.server.getRemotingService().getConnections().size(); + }); + if (pause) { + int pid = ExecuteUtil.getPID(process); + ExecuteUtil.runCommand(true, new String[]{"kill", "-STOP", Integer.toString(pid)}); + } else { + process.destroy(); + } + + Wait.assertEquals(0, () -> { + return this.server.getRemotingService().getConnections().size(); + }, 5000L); + Wait.assertEquals(0, queue::getConsumerCount, 5000L); + } finally { + try { + process.destroyForcibly(); + } catch (Exception var10) { + } + + } + + } + + @Test + public void testServerDisconnectAfterKill() throws Exception { + this.testServerDisconnect(false); + } + + @Test + public void testServerDisconnectAfterPausedProcess() throws Exception { + this.testServerDisconnect(true); + } + + public void testServerDisconnect(boolean pause) throws Exception { + AMQPBrokerConnectConfiguration connectConfiguration = new AMQPBrokerConnectConfiguration("bridgeTest", "tcp://localhost:5673?amqpIdleTimeout=1000"); + connectConfiguration.addElement((new AMQPBrokerConnectionElement()).setType(AMQPBrokerConnectionAddressType.SENDER).setQueueName(DESTINATION_NAME)).setRetryInterval(100).setReconnectAttempts(-1); + this.server.getConfiguration().addAMQPConnection(connectConfiguration); + this.server.start(); + Process process = SpawnedVMSupport.spawnVM(AMQPBridgeDisconnectTest.class.getName(), true, "server"); + + try { + ActiveMQServer var10000 = this.server; + Wait.assertTrue(var10000::isActive); + Queue queue = this.server.locateQueue(DESTINATION_NAME); + Assert.assertNotNull(queue); + Wait.assertEquals(1, queue::getConsumerCount); + Wait.assertEquals(1, () -> { + return this.server.getRemotingService().getConnections().size(); + }); + if (pause) { + int pid = ExecuteUtil.getPID(process); + ExecuteUtil.runCommand(true, new String[]{"kill", "-STOP", Integer.toString(pid)}); + } else { + process.destroy(); + } + + Wait.assertEquals(0, () -> { + return this.server.getRemotingService().getConnections().size(); + }, 5000L); + Wait.assertEquals(0, queue::getConsumerCount, 5000L); + } finally { + try { + process.destroyForcibly(); + } catch (Exception var11) { + } + + } + + } +}