Add better cleanup of connections on test failure.
This commit is contained in:
Timothy Bish 2016-10-07 18:48:44 -04:00
parent 538ed74510
commit 1ac89543a8
25 changed files with 133 additions and 112 deletions

View File

@ -18,8 +18,11 @@ package org.apache.activemq.transport.amqp.client;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.apache.activemq.transport.amqp.AmqpTestSupport;
import org.junit.After;
/**
* Test support class for tests that will be using the AMQP Proton wrapper client.
@ -29,6 +32,8 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
private String connectorScheme = "amqp";
private boolean useSSL;
private List<AmqpConnection> connections = new ArrayList<AmqpConnection>();
public AmqpClientTestSupport() {
}
@ -37,6 +42,18 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
this.useSSL = useSSL;
}
@Override
@After
public void tearDown() throws Exception {
for (AmqpConnection connection : connections) {
try {
connection.close();
} catch (Exception ex) {}
}
super.tearDown();
}
public String getConnectorScheme() {
return connectorScheme;
}
@ -135,6 +152,11 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
}
}
public AmqpConnection trackConnection(AmqpConnection connection) {
connections.add(connection);
return connection;
}
public AmqpConnection createAmqpConnection() throws Exception {
return createAmqpConnection(getBrokerAmqpConnectionURI());
}
@ -148,7 +170,7 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
}
public AmqpConnection createAmqpConnection(URI brokerURI, String username, String password) throws Exception {
return createAmqpClient(brokerURI, username, password).connect();
return trackConnection(createAmqpClient(brokerURI, username, password).connect());
}
public AmqpClient createAmqpClient() throws Exception {

View File

@ -41,7 +41,7 @@ public class AmqpAnonymousSenderTest extends AmqpClientTestSupport {
AmqpClient client = createAmqpClient();
client.setTraceFrames(false);
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender();

View File

@ -77,7 +77,7 @@ public class AmqpBrokerReuqestedHearbeatsTest extends AmqpClientTestSupport {
}
});
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
assertNotNull(connection);
connection.getStateInspector().assertValid();
@ -98,7 +98,7 @@ public class AmqpBrokerReuqestedHearbeatsTest extends AmqpClientTestSupport {
}
});
AmqpConnection connection = client.createConnection();
AmqpConnection connection = trackConnection(client.createConnection());
connection.setIdleTimeout(TEST_IDLE_TIMEOUT * 4);
assertNotNull(connection);
@ -115,7 +115,7 @@ public class AmqpBrokerReuqestedHearbeatsTest extends AmqpClientTestSupport {
AmqpClient client = createAmqpClient();
assertNotNull(client);
AmqpConnection connection = client.createConnection();
AmqpConnection connection = trackConnection(client.createConnection());
assertNotNull(connection);
connection.setIdleProcessingDisabled(true);
@ -151,7 +151,7 @@ public class AmqpBrokerReuqestedHearbeatsTest extends AmqpClientTestSupport {
AmqpClient client = createAmqpClient();
assertNotNull(client);
AmqpConnection connection = client.createConnection();
AmqpConnection connection = trackConnection(client.createConnection());
assertNotNull(connection);
connection.setListener(new AmqpConnectionListener() {

View File

@ -76,7 +76,7 @@ public class AmqpClientRequestsHeartbeatsTest extends AmqpClientTestSupport {
}
});
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
assertNotNull(connection);
connection.getStateInspector().assertValid();
@ -91,7 +91,7 @@ public class AmqpClientRequestsHeartbeatsTest extends AmqpClientTestSupport {
AmqpClient client = createAmqpClient();
assertNotNull(client);
AmqpConnection connection = client.createConnection();
AmqpConnection connection = trackConnection(client.createConnection());
connection.setIdleTimeout(TEST_IDLE_TIMEOUT);
assertNotNull(connection);

View File

@ -70,7 +70,7 @@ public class AmqpConfiguredMaxConnectionsTest extends AmqpClientTestSupport {
assertEquals(MAX_CONNECTIONS, getProxyToBroker().getCurrentConnectionsCount());
try {
AmqpConnection connection = client.createConnection();
AmqpConnection connection = trackConnection(client.createConnection());
connection.setConnectTimeout(3000);
connection.connect();
fail("Should not be able to create one more connection");

View File

@ -73,7 +73,7 @@ public class AmqpConnectionsTest extends AmqpClientTestSupport {
AmqpClient client = createAmqpClient();
assertNotNull(client);
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
assertNotNull(connection);
assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
@ -127,7 +127,7 @@ public class AmqpConnectionsTest extends AmqpClientTestSupport {
}
});
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
assertNotNull(connection);
assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
@ -156,7 +156,7 @@ public class AmqpConnectionsTest extends AmqpClientTestSupport {
}
});
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
assertNotNull(connection);
assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
@ -172,8 +172,8 @@ public class AmqpConnectionsTest extends AmqpClientTestSupport {
AmqpClient client = createAmqpClient();
assertNotNull(client);
AmqpConnection connection1 = client.createConnection();
AmqpConnection connection2 = client.createConnection();
AmqpConnection connection1 = trackConnection(client.createConnection());
AmqpConnection connection2 = trackConnection(client.createConnection());
connection1.setContainerId(getTestName() + "-Client:1");
connection2.setContainerId(getTestName() + "-Client:2");
@ -196,8 +196,8 @@ public class AmqpConnectionsTest extends AmqpClientTestSupport {
AmqpClient client = createAmqpClient();
assertNotNull(client);
AmqpConnection connection1 = client.createConnection();
AmqpConnection connection2 = client.createConnection();
AmqpConnection connection1 = trackConnection(client.createConnection());
AmqpConnection connection2 = trackConnection(client.createConnection());
connection1.setContainerId(getTestName());
connection2.setContainerId(getTestName());

View File

@ -132,7 +132,7 @@ public class AmqpCorrelationIdPreservationTest extends AmqpClientTestSupport {
public void doTestCorrelationIdPreservation(Object messageId) throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());
@ -161,7 +161,7 @@ public class AmqpCorrelationIdPreservationTest extends AmqpClientTestSupport {
public void doTestCorrelationIdPreservationOnBrokerRestart(Object messageId) throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());

View File

@ -44,7 +44,7 @@ public class AmqpCorruptedFrameHandlingTest extends AmqpClientTestSupport {
random.setSeed(System.nanoTime());
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.createConnection();
AmqpConnection connection = trackConnection(client.createConnection());
connection.setContainerId("ClientID:" + getTestName());
connection.connect();
@ -83,7 +83,7 @@ public class AmqpCorruptedFrameHandlingTest extends AmqpClientTestSupport {
random.setSeed(System.nanoTime());
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.createConnection();
AmqpConnection connection = trackConnection(client.createConnection());
connection.setContainerId("ClientID:" + getTestName());
connection.connect();
@ -122,7 +122,7 @@ public class AmqpCorruptedFrameHandlingTest extends AmqpClientTestSupport {
random.setSeed(System.nanoTime());
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.createConnection();
AmqpConnection connection = trackConnection(client.createConnection());
connection.setContainerId("ClientID:" + getTestName());
connection.connect();

View File

@ -69,7 +69,7 @@ public class AmqpDeliveryAnnotationsTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testDeliveryAnnotationsStrippedFromIncoming() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());

View File

@ -77,7 +77,7 @@ public class AmqpDescribedTypePayloadTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testSendMessageWithDescribedTypeInBody() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());
@ -103,7 +103,7 @@ public class AmqpDescribedTypePayloadTest extends AmqpClientTestSupport {
public void testSendMessageWithDescribedTypeInBodyReceiveOverOpenWire() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());
@ -133,7 +133,7 @@ public class AmqpDescribedTypePayloadTest extends AmqpClientTestSupport {
public void testDescribedTypeMessageRoundTrips() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
// Send with AMQP client.

View File

@ -52,7 +52,7 @@ public class AmqpDisabledInactivityMonitorTest extends AmqpClientTestSupport {
}
});
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
assertNotNull(connection);
connection.getStateInspector().assertValid();
@ -64,7 +64,7 @@ public class AmqpDisabledInactivityMonitorTest extends AmqpClientTestSupport {
AmqpClient client = createAmqpClient();
assertNotNull(client);
AmqpConnection connection = client.createConnection();
AmqpConnection connection = trackConnection(client.createConnection());
connection.setIdleTimeout(TEST_IDLE_TIMEOUT);
assertNotNull(connection);

View File

@ -56,7 +56,7 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
public void testCreateDurableReceiver() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.createConnection();
AmqpConnection connection = trackConnection(client.createConnection());
connection.setContainerId(getTestName());
connection.connect();
@ -74,7 +74,7 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
public void testDetachedDurableReceiverRemainsActive() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.createConnection();
AmqpConnection connection = trackConnection(client.createConnection());
connection.setContainerId(getTestName());
connection.connect();
@ -98,7 +98,7 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
public void testCloseDurableReceiverRemovesSubscription() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.createConnection();
AmqpConnection connection = trackConnection(client.createConnection());
connection.setContainerId(getTestName());
connection.connect();
@ -123,7 +123,7 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
final BrokerViewMBean brokerView = getProxyToBroker();
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.createConnection();
AmqpConnection connection = trackConnection(client.createConnection());
connection.setContainerId(getTestName());
connection.connect();
@ -157,7 +157,7 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
final BrokerViewMBean brokerView = getProxyToBroker();
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.createConnection();
AmqpConnection connection = trackConnection(client.createConnection());
connection.setContainerId(getTestName());
connection.connect();
@ -200,7 +200,7 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
final BrokerViewMBean brokerView = getProxyToBroker();
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.createConnection();
AmqpConnection connection = trackConnection(client.createConnection());
connection.setContainerId(getTestName());
connection.connect();
@ -249,7 +249,7 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
final BrokerViewMBean brokerView = getProxyToBroker();
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.createConnection();
AmqpConnection connection = trackConnection(client.createConnection());
connection.setContainerId(getTestName());
connection.connect();
@ -298,7 +298,7 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
final BrokerViewMBean brokerView = getProxyToBroker();
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.createConnection();
AmqpConnection connection = trackConnection(client.createConnection());
connection.setContainerId(getTestName());
connection.connect();
@ -354,7 +354,7 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
final BrokerViewMBean brokerView = getProxyToBroker();
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.createConnection();
AmqpConnection connection = trackConnection(client.createConnection());
connection.setContainerId(getTestName());
connection.connect();

View File

@ -80,7 +80,7 @@ public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport {
final CountDownLatch failed = new CountDownLatch(1);
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.createConnection();
AmqpConnection connection = trackConnection(client.createConnection());
connection.setListener(new AmqpConnectionListener() {
@Override

View File

@ -132,7 +132,7 @@ public class AmqpMessageIdPreservationTest extends AmqpClientTestSupport {
public void doTestMessageIdPreservation(Object messageId) throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());
@ -161,7 +161,7 @@ public class AmqpMessageIdPreservationTest extends AmqpClientTestSupport {
public void doTestMessageIdPreservationOnBrokerRestart(Object messageId) throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());

View File

@ -42,7 +42,7 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
sendMessages(getTestName(), MSG_COUNT, false);
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
@ -68,7 +68,7 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
public void testPullWithNoMessageGetDrained() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
@ -94,7 +94,7 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
sendMessages(getTestName(), MSG_COUNT, false);
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
@ -122,7 +122,7 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testMultipleZeroResultPulls() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());

View File

@ -64,7 +64,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testCreateQueueReceiver() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
assertEquals(0, brokerService.getAdminView().getQueues().length);
@ -104,7 +104,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
}
});
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
assertEquals(0, brokerService.getAdminView().getQueues().length);
@ -141,7 +141,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
}
});
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
assertEquals(0, brokerService.getAdminView().getQueues().length);
@ -157,7 +157,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testCreateTopicReceiver() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
assertEquals(0, brokerService.getAdminView().getTopics().length);
@ -178,7 +178,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
sendMessages(getTestName(), 1, false);
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
@ -203,7 +203,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
sendMessages(getTestName(), MSG_COUNT, false);
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName(), null, false, true);
@ -230,7 +230,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
sendMessages(getTestName(), MSG_COUNT, false);
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName(), null, false, true);
@ -271,7 +271,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
sendMessages(getTestName(), MSG_COUNT, false);
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver1 = session.createReceiver("queue://" + getTestName());
@ -308,7 +308,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
sendMessages(getTestName(), MSG_COUNT, false);
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver1 = session.createReceiver("queue://" + getTestName());
@ -367,7 +367,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
sendMessages(getTestName(), MSG_COUNT, false);
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver1 = session.createReceiver("queue://" + getTestName());
@ -447,7 +447,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
source.setDurable(TerminusDurability.NONE);
source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
assertEquals(0, brokerService.getAdminView().getQueues().length);
@ -485,7 +485,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
sendMessages(getTestName(), msgCount, false);
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());

View File

@ -82,7 +82,7 @@ public class AmqpSaslPlainTest extends AmqpClientTestSupport {
client.setMechanismRestriction(PlainMechanism.MECH_NAME);
// Expect connection to succeed
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
// Exercise it for verification
exerciseConnection(connection);

View File

@ -60,7 +60,7 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testSendWithDeliveryTimeIsScheduled() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
assertEquals(0, brokerService.getAdminView().getQueues().length);
@ -88,7 +88,7 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testSendRecvWithDeliveryTime() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
assertEquals(0, brokerService.getAdminView().getQueues().length);
@ -130,7 +130,7 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testSendScheduledReceiveOverOpenWire() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
assertEquals(0, brokerService.getAdminView().getQueues().length);
@ -165,7 +165,7 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
@Test
public void testScheduleWithDelay() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
assertEquals(0, brokerService.getAdminView().getQueues().length);
@ -206,7 +206,7 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
final int NUMBER = 10;
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
assertEquals(0, brokerService.getAdminView().getQueues().length);
@ -276,5 +276,4 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
return scheduler;
}
}

View File

@ -63,7 +63,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
public void testSimpleSendOneReceiveOne() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());
@ -95,7 +95,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
final int MSG_COUNT = 20;
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());
@ -137,7 +137,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testReceiveWithJMSSelectorFilter() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpMessage message1 = new AmqpMessage();
@ -173,7 +173,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
final int MSG_COUNT = 20;
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());
@ -235,7 +235,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
final int MSG_COUNT = 20;
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());
@ -319,7 +319,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
final String address;
@ -382,7 +382,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
public void run() {
try {
LOG.info("Starting consumer connection");
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(address);
receiver.flow(1);
@ -412,7 +412,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
public void run() {
try {
receiverReady.await(20, TimeUnit.SECONDS);
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(address);
@ -437,7 +437,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testMessageDurabliltyFollowsSpec() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());
@ -480,7 +480,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testSendMessageToQueueNoPrefixReceiveWithPrefix() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
@ -513,7 +513,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testSendMessageToQueueWithPrefixReceiveWithNoPrefix() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());
@ -558,7 +558,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
final String address;
@ -621,7 +621,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
final int MSG_COUNT = 100;
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
final String address = "queue://" + getTestName();

View File

@ -39,7 +39,7 @@ public class AmqpSenderTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testCreateQueueSender() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
assertEquals(0, brokerService.getAdminView().getQueues().length);
@ -58,7 +58,7 @@ public class AmqpSenderTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testCreateTopicSender() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
assertEquals(0, brokerService.getAdminView().getTopics().length);
@ -77,7 +77,7 @@ public class AmqpSenderTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testSendMessageToQueue() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());
@ -100,7 +100,7 @@ public class AmqpSenderTest extends AmqpClientTestSupport {
final int MSG_COUNT = 100;
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());
@ -124,7 +124,7 @@ public class AmqpSenderTest extends AmqpClientTestSupport {
final int MSG_COUNT = 1000;
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("topic://" + getTestName(), false);
@ -157,7 +157,7 @@ public class AmqpSenderTest extends AmqpClientTestSupport {
final int MSG_COUNT = 1000;
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("topic://" + getTestName(), true);

View File

@ -32,7 +32,7 @@ public class AmqpSessionTest extends AmqpClientTestSupport {
@Test
public void testCreateSession() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
assertNotNull(session);
connection.close();

View File

@ -58,7 +58,7 @@ public class AmqpSlowReceiverTest extends AmqpClientTestSupport {
public void testSlowConsumerIsAborted() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
final AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
receiver.flow(100);
@ -87,7 +87,7 @@ public class AmqpSlowReceiverTest extends AmqpClientTestSupport {
strategy.setMaxSlowDuration(60*1000); // so jmx does the abort
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
final AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
receiver.flow(100);

View File

@ -81,7 +81,7 @@ public class AmqpSocketProxyIdleTimeoutTests extends AmqpClientTestSupport {
AmqpClient client = createAmqpClient();
assertNotNull(client);
AmqpConnection connection = client.createConnection();
AmqpConnection connection = trackConnection(client.createConnection());
connection.setIdleTimeout(TEST_IDLE_TIMEOUT);
assertNotNull(connection);
@ -125,7 +125,7 @@ public class AmqpSocketProxyIdleTimeoutTests extends AmqpClientTestSupport {
AmqpClient client = createAmqpClient();
assertNotNull(client);
AmqpConnection connection = client.createConnection();
AmqpConnection connection = trackConnection(client.createConnection());
connection.setCloseTimeout(1000); // Socket will have silently gone away, don't wait to long.
assertNotNull(connection);

View File

@ -61,7 +61,7 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport {
protected void doTestCannotCreateSenderWithNamedTempDestination(boolean topic) throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
String address = null;
@ -92,7 +92,7 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport {
protected void doTestCannotCreateReceiverWithNamedTempDestination(boolean topic) throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
String address = null;
@ -126,7 +126,7 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport {
final BrokerViewMBean brokerView = getProxyToBroker();
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(target);
@ -157,7 +157,7 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport {
final BrokerViewMBean brokerView = getProxyToBroker();
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(target);
@ -196,7 +196,7 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport {
final BrokerViewMBean brokerView = getProxyToBroker();
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(source);
@ -227,7 +227,7 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport {
final BrokerViewMBean brokerView = getProxyToBroker();
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(source);
@ -266,7 +266,7 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport {
final BrokerViewMBean brokerView = getProxyToBroker();
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(target);
@ -319,7 +319,7 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport {
final BrokerViewMBean brokerView = getProxyToBroker();
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(source);

View File

@ -47,7 +47,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
@Test(timeout = 30000)
public void testBeginAndCommitTransaction() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
assertNotNull(session);
@ -61,7 +61,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
@Test(timeout = 30000)
public void testBeginAndRollbackTransaction() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
assertNotNull(session);
@ -75,7 +75,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testSendMessageToQueueWithCommit() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());
@ -99,7 +99,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testSendMessageToQueueWithRollback() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());
@ -123,7 +123,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testReceiveMessageWithCommit() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());
@ -154,7 +154,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testReceiveAfterConnectionClose() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
@ -198,7 +198,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testReceiveMessageWithRollback() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());
@ -229,7 +229,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testMultipleSessionReceiversInSingleTXNWithCommit() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
// Load up the Queue with some messages
{
@ -286,7 +286,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testMultipleSessionReceiversInSingleTXNWithRollback() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
// Load up the Queue with some messages
{
@ -343,7 +343,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testMultipleSessionSendersInSingleTXNWithCommit() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
// Root TXN session controls all TXN send lifetimes.
AmqpSession txnSession = connection.createSession();
@ -383,7 +383,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testMultipleSessionSendersInSingleTXNWithRollback() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
// Root TXN session controls all TXN send lifetimes.
AmqpSession txnSession = connection.createSession();
@ -453,7 +453,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
private void doTestAcceptedButNotSettledInTXRemainsAquired(Outcome outcome) throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());
@ -521,7 +521,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
public void testTransactionallyAcquiredMessageCanBeTransactionallyConsumed() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());
@ -584,7 +584,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
final int NUM_MESSAGES = 5;
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
// Root TXN session controls all TXN send lifetimes.
AmqpSession txnSession = connection.createSession();
@ -637,7 +637,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
final int NUM_MESSAGES = 10;
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
// Root TXN session controls all TXN send lifetimes.
AmqpSession txnSession = connection.createSession();
@ -713,7 +713,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
final int NUM_MESSAGES = 10;
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
// Root TXN session controls all TXN send lifetimes.
AmqpSession txnSession = connection.createSession();
@ -782,7 +782,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
final int NUM_MESSAGES = 10;
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
// Root TXN session controls all TXN send lifetimes.
AmqpSession txnSession = connection.createSession();
@ -859,7 +859,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
final int NUM_MESSAGES = 10;
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpConnection connection = trackConnection(client.connect());
// Root TXN session controls all TXN send lifetimes.
AmqpSession txnSession = connection.createSession();