ARTEMIS-2969 / ARTEMIS-2937 Controlling connecting state on AMQP Broker Connection

- Fixed an issue where I needed to set connection to null after closing it
- Added more tests on QpidDispatchPeerTest (tests i would have done manually, and reproduced a few issues along the way)
This commit is contained in:
Clebert Suconic 2020-11-02 07:34:35 -05:00
parent 8bf8bbaf77
commit c0b12b14c8
3 changed files with 61 additions and 14 deletions

View File

@ -22,6 +22,7 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.util.concurrent.TimeUnit;
public class ExecuteUtil {
@ -38,6 +39,12 @@ public class ExecuteUtil {
inputStreamReader.join();
}
public int pid() throws Exception {
Field pidField = process.getClass().getDeclaredField("pid");
pidField.setAccessible(true);
return (int)pidField.get(process);
}
public int waitFor(long timeout, TimeUnit unit) throws InterruptedException {
if (!process.waitFor(timeout, unit)) {
logger.warn("could not complete execution in time");

View File

@ -92,6 +92,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
private volatile boolean started = false;
private final AMQPBrokerConnectionManager bridgeManager;
private int retryCounter = 0;
private boolean connecting = false;
private volatile ScheduledFuture reconnectFuture;
private Set<Queue> senders = new HashSet<>();
private Set<Queue> receivers = new HashSet<>();
@ -209,6 +210,8 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
private void doConnect() {
try {
connecting = true;
List<TransportConfiguration> configurationList = brokerConnectConfiguration.getTransportConfigurations();
TransportConfiguration tpConfig = configurationList.get(0);
@ -283,6 +286,8 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
protonRemotingConnection.getAmqpConnection().flush();
bridgeManager.connected(connection, this);
connecting = false;
} catch (Throwable e) {
error(e);
}
@ -464,6 +469,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
}
protected void error(Throwable e) {
connecting = false;
logger.warn(e.getMessage(), e);
redoConnection();
}
@ -511,16 +517,29 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
}
private void redoConnection() {
try {
if (connection != null) {
connection.close();
// we need to use the connectExecutor to initiate a redoConnection
// otherwise we would need to add synchronized blocks along this class
// to control when connecting becomes true and when it becomes false
// keeping a single executor thread to this purpose would simplify things
connectExecutor.execute(() -> {
if (connecting) {
logger.debug("Broker connection " + this.getName() + " was already in retry mode, exception or retry no captured");
return;
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
connecting = true;
retryConnection();
try {
if (connection != null) {
connection.close();
connection = null;
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
retryConnection();
});
}
@Override

View File

@ -83,32 +83,49 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport {
qpidProcess.kill();
}
public void pauseThenKill(int timeToWait) throws Exception {
int pid = qpidProcess.pid();
int result = ExecuteUtil.runCommand(true, "kill", "-STOP", Long.toString(pid));
Assert.assertEquals(0, result);
logger.info("\n*******************************************************************************************************************************\n" +
"Paused" +
"\n*******************************************************************************************************************************");
Thread.sleep(timeToWait);
result = ExecuteUtil.runCommand(true, "kill", "-9", Long.toString(pid));
Assert.assertEquals(0, result);
}
@Test(timeout = 60_000)
public void testWithMatchingDifferentNamesOnQueueKill() throws Exception {
internalMultipleQueues(true, true, true);
internalMultipleQueues(true, true, true, false);
}
@Test
public void testWithMatchingDifferentNamesOnQueuePause() throws Exception {
internalMultipleQueues(true, true, false, true);
}
@Test(timeout = 60_000)
public void testWithMatchingDifferentNamesOnQueue() throws Exception {
internalMultipleQueues(true, true, false);
internalMultipleQueues(true, true, false, false);
}
@Test(timeout = 60_000)
public void testWithMatching() throws Exception {
internalMultipleQueues(true, false, false);
internalMultipleQueues(true, false, false, false);
}
@Test(timeout = 60_000)
public void testwithQueueName() throws Exception {
internalMultipleQueues(false, false, false);
internalMultipleQueues(false, false, false, false);
}
@Test(timeout = 60_000)
public void testwithQueueNameDistinctName() throws Exception {
internalMultipleQueues(false, true, false);
internalMultipleQueues(false, true, false, false);
}
private void internalMultipleQueues(boolean useMatching, boolean distinctNaming, boolean kill) throws Exception {
private void internalMultipleQueues(boolean useMatching, boolean distinctNaming, boolean kill, boolean pause) throws Exception {
final int numberOfMessages = 100;
final int numberOfQueues = 10;
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:24622?amqpIdleTimeout=1000").setRetryInterval(10).setReconnectAttempts(-1);
@ -148,10 +165,14 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport {
}
if (kill) {
stopQpidRouter();
qpidProcess.kill();
startQpidRouter();
} else if (pause) {
pauseThenKill(3_000);
startQpidRouter();
}
for (int dest = 0; dest < numberOfQueues; dest++) {
ConnectionFactory factoryConsumer = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:24622");
Connection connectionConsumer = createConnectionDumbRetry(factoryConsumer);