This closes #3328
This commit is contained in:
commit
9e77b1a8a4
|
@ -40,9 +40,7 @@ public class ExecuteUtil {
|
|||
}
|
||||
|
||||
public int pid() throws Exception {
|
||||
Field pidField = process.getClass().getDeclaredField("pid");
|
||||
pidField.setAccessible(true);
|
||||
return (int)pidField.get(process);
|
||||
return getPID(process);
|
||||
}
|
||||
|
||||
public int waitFor(long timeout, TimeUnit unit) throws InterruptedException {
|
||||
|
@ -68,6 +66,12 @@ public class ExecuteUtil {
|
|||
|
||||
}
|
||||
|
||||
public static int getPID(Process process) throws Exception {
|
||||
Field pidField = process.getClass().getDeclaredField("pid");
|
||||
pidField.setAccessible(true);
|
||||
return (int)pidField.get(process);
|
||||
}
|
||||
|
||||
private static final Logger logger = Logger.getLogger(ExecuteUtil.class);
|
||||
|
||||
public static int runCommand(boolean logOutput, String... command) throws Exception {
|
||||
|
|
|
@ -0,0 +1,197 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.tests.integration.amqp.connect;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
|
||||
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionElement;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
|
||||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
import org.apache.activemq.artemis.utils.ExecuteUtil;
|
||||
import org.apache.activemq.artemis.utils.SpawnedVMSupport;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class AMQPBridgeDisconnectTest extends AmqpClientTestSupport {
|
||||
|
||||
protected static final int AMQP_PORT_2 = 5673;
|
||||
private static String DESTINATION_NAME = "AMQPBridgeReconnectTest";
|
||||
|
||||
public AMQPBridgeDisconnectTest() {
|
||||
}
|
||||
|
||||
public static void main(String[] arg) {
|
||||
try {
|
||||
AMQPBridgeDisconnectTest reconnect = new AMQPBridgeDisconnectTest();
|
||||
if (arg[0].equals("client")) {
|
||||
reconnect.runExternalClient();
|
||||
} else {
|
||||
reconnect.runExternalSever();
|
||||
}
|
||||
} catch (Throwable var2) {
|
||||
var2.printStackTrace();
|
||||
System.exit(-1);
|
||||
}
|
||||
|
||||
System.exit(0);
|
||||
}
|
||||
|
||||
public void runExternalClient() throws Exception {
|
||||
ActiveMQServer externalServer = this.createServer(5673, false);
|
||||
externalServer.getConfiguration().setPersistenceEnabled(false);
|
||||
AMQPBrokerConnectConfiguration connectConfiguration = new AMQPBrokerConnectConfiguration("bridgeTest", "tcp://localhost:5672");
|
||||
connectConfiguration.addElement((new AMQPBrokerConnectionElement()).setType(AMQPBrokerConnectionAddressType.RECEIVER).setQueueName(DESTINATION_NAME));
|
||||
externalServer.getConfiguration().addAMQPConnection(connectConfiguration);
|
||||
externalServer.start();
|
||||
|
||||
while (true) {
|
||||
System.out.println(AMQPBridgeDisconnectTest.class.getName() + " is running a server until someone kills it");
|
||||
Thread.sleep(5000L);
|
||||
}
|
||||
}
|
||||
|
||||
public void runExternalSever() throws Exception {
|
||||
ActiveMQServer externalServer = this.createServer(5673, false);
|
||||
externalServer.getConfiguration().setPersistenceEnabled(false);
|
||||
externalServer.start();
|
||||
|
||||
while (true) {
|
||||
System.out.println(AMQPBridgeDisconnectTest.class.getName() + " is running a server until someone kills it");
|
||||
Thread.sleep(5000L);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TransportConfiguration addAcceptorConfiguration(ActiveMQServer server, int port) {
|
||||
TransportConfiguration configuration = super.addAcceptorConfiguration(server, port);
|
||||
configuration.getExtraParams().put("amqpIdleTimeout", "1000");
|
||||
return configuration;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ActiveMQServer createServer() throws Exception {
|
||||
ActiveMQServer server = this.createServer(5672, false);
|
||||
return server;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ActiveMQServer createServer(int port, boolean start) throws Exception {
|
||||
ActiveMQServer server = super.createServer(port, start);
|
||||
server.getConfiguration().setPersistenceEnabled(false);
|
||||
server.getConfiguration().addAddressConfiguration((new CoreAddressConfiguration()).setName(DESTINATION_NAME).addRoutingType(RoutingType.ANYCAST));
|
||||
server.getConfiguration().addQueueConfiguration((new QueueConfiguration(DESTINATION_NAME)).setAddress(DESTINATION_NAME).setRoutingType(RoutingType.ANYCAST).setDurable(true));
|
||||
return server;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientDisconnectAfterKill() throws Exception {
|
||||
this.testClientDisconnect(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientDisconnectAfterPausedProcess() throws Exception {
|
||||
this.testClientDisconnect(true);
|
||||
}
|
||||
|
||||
public void testClientDisconnect(boolean pause) throws Exception {
|
||||
this.server.start();
|
||||
ActiveMQServer var10000 = this.server;
|
||||
Wait.assertTrue(var10000::isActive);
|
||||
Process process = SpawnedVMSupport.spawnVM(AMQPBridgeDisconnectTest.class.getName(), true, "client");
|
||||
|
||||
try {
|
||||
Queue queue = this.server.locateQueue(DESTINATION_NAME);
|
||||
Assert.assertNotNull(queue);
|
||||
Wait.assertEquals(1, queue::getConsumerCount);
|
||||
Wait.assertEquals(1, () -> {
|
||||
return this.server.getRemotingService().getConnections().size();
|
||||
});
|
||||
if (pause) {
|
||||
int pid = ExecuteUtil.getPID(process);
|
||||
ExecuteUtil.runCommand(true, new String[]{"kill", "-STOP", Integer.toString(pid)});
|
||||
} else {
|
||||
process.destroy();
|
||||
}
|
||||
|
||||
Wait.assertEquals(0, () -> {
|
||||
return this.server.getRemotingService().getConnections().size();
|
||||
}, 5000L);
|
||||
Wait.assertEquals(0, queue::getConsumerCount, 5000L);
|
||||
} finally {
|
||||
try {
|
||||
process.destroyForcibly();
|
||||
} catch (Exception var10) {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testServerDisconnectAfterKill() throws Exception {
|
||||
this.testServerDisconnect(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testServerDisconnectAfterPausedProcess() throws Exception {
|
||||
this.testServerDisconnect(true);
|
||||
}
|
||||
|
||||
public void testServerDisconnect(boolean pause) throws Exception {
|
||||
AMQPBrokerConnectConfiguration connectConfiguration = new AMQPBrokerConnectConfiguration("bridgeTest", "tcp://localhost:5673?amqpIdleTimeout=1000");
|
||||
connectConfiguration.addElement((new AMQPBrokerConnectionElement()).setType(AMQPBrokerConnectionAddressType.SENDER).setQueueName(DESTINATION_NAME)).setRetryInterval(100).setReconnectAttempts(-1);
|
||||
this.server.getConfiguration().addAMQPConnection(connectConfiguration);
|
||||
this.server.start();
|
||||
Process process = SpawnedVMSupport.spawnVM(AMQPBridgeDisconnectTest.class.getName(), true, "server");
|
||||
|
||||
try {
|
||||
ActiveMQServer var10000 = this.server;
|
||||
Wait.assertTrue(var10000::isActive);
|
||||
Queue queue = this.server.locateQueue(DESTINATION_NAME);
|
||||
Assert.assertNotNull(queue);
|
||||
Wait.assertEquals(1, queue::getConsumerCount);
|
||||
Wait.assertEquals(1, () -> {
|
||||
return this.server.getRemotingService().getConnections().size();
|
||||
});
|
||||
if (pause) {
|
||||
int pid = ExecuteUtil.getPID(process);
|
||||
ExecuteUtil.runCommand(true, new String[]{"kill", "-STOP", Integer.toString(pid)});
|
||||
} else {
|
||||
process.destroy();
|
||||
}
|
||||
|
||||
Wait.assertEquals(0, () -> {
|
||||
return this.server.getRemotingService().getConnections().size();
|
||||
}, 5000L);
|
||||
Wait.assertEquals(0, queue::getConsumerCount, 5000L);
|
||||
} finally {
|
||||
try {
|
||||
process.destroyForcibly();
|
||||
} catch (Exception var11) {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue