mirror of https://github.com/apache/activemq.git
Add better cleanup of connections on test failure.
(cherry picked from commit 1ac89543a8
)
This commit is contained in:
parent
37c20ed19f
commit
c42f81514b
|
@ -18,8 +18,11 @@ package org.apache.activemq.transport.amqp.client;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.activemq.transport.amqp.AmqpTestSupport;
|
||||
import org.junit.After;
|
||||
|
||||
/**
|
||||
* Test support class for tests that will be using the AMQP Proton wrapper client.
|
||||
|
@ -29,6 +32,8 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
|
|||
private String connectorScheme = "amqp";
|
||||
private boolean useSSL;
|
||||
|
||||
private List<AmqpConnection> connections = new ArrayList<AmqpConnection>();
|
||||
|
||||
public AmqpClientTestSupport() {
|
||||
}
|
||||
|
||||
|
@ -37,6 +42,18 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
|
|||
this.useSSL = useSSL;
|
||||
}
|
||||
|
||||
@Override
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
for (AmqpConnection connection : connections) {
|
||||
try {
|
||||
connection.close();
|
||||
} catch (Exception ex) {}
|
||||
}
|
||||
|
||||
super.tearDown();
|
||||
}
|
||||
|
||||
public String getConnectorScheme() {
|
||||
return connectorScheme;
|
||||
}
|
||||
|
@ -135,6 +152,11 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
|
|||
}
|
||||
}
|
||||
|
||||
public AmqpConnection trackConnection(AmqpConnection connection) {
|
||||
connections.add(connection);
|
||||
return connection;
|
||||
}
|
||||
|
||||
public AmqpConnection createAmqpConnection() throws Exception {
|
||||
return createAmqpConnection(getBrokerAmqpConnectionURI());
|
||||
}
|
||||
|
@ -148,7 +170,7 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
|
|||
}
|
||||
|
||||
public AmqpConnection createAmqpConnection(URI brokerURI, String username, String password) throws Exception {
|
||||
return createAmqpClient(brokerURI, username, password).connect();
|
||||
return trackConnection(createAmqpClient(brokerURI, username, password).connect());
|
||||
}
|
||||
|
||||
public AmqpClient createAmqpClient() throws Exception {
|
||||
|
|
|
@ -41,7 +41,7 @@ public class AmqpAnonymousSenderTest extends AmqpClientTestSupport {
|
|||
AmqpClient client = createAmqpClient();
|
||||
client.setTraceFrames(false);
|
||||
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender();
|
||||
|
|
|
@ -77,7 +77,7 @@ public class AmqpBrokerReuqestedHearbeatsTest extends AmqpClientTestSupport {
|
|||
}
|
||||
});
|
||||
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
assertNotNull(connection);
|
||||
|
||||
connection.getStateInspector().assertValid();
|
||||
|
@ -98,7 +98,7 @@ public class AmqpBrokerReuqestedHearbeatsTest extends AmqpClientTestSupport {
|
|||
}
|
||||
});
|
||||
|
||||
AmqpConnection connection = client.createConnection();
|
||||
AmqpConnection connection = trackConnection(client.createConnection());
|
||||
connection.setIdleTimeout(TEST_IDLE_TIMEOUT * 4);
|
||||
assertNotNull(connection);
|
||||
|
||||
|
@ -115,7 +115,7 @@ public class AmqpBrokerReuqestedHearbeatsTest extends AmqpClientTestSupport {
|
|||
AmqpClient client = createAmqpClient();
|
||||
assertNotNull(client);
|
||||
|
||||
AmqpConnection connection = client.createConnection();
|
||||
AmqpConnection connection = trackConnection(client.createConnection());
|
||||
assertNotNull(connection);
|
||||
|
||||
connection.setIdleProcessingDisabled(true);
|
||||
|
@ -151,7 +151,7 @@ public class AmqpBrokerReuqestedHearbeatsTest extends AmqpClientTestSupport {
|
|||
AmqpClient client = createAmqpClient();
|
||||
assertNotNull(client);
|
||||
|
||||
AmqpConnection connection = client.createConnection();
|
||||
AmqpConnection connection = trackConnection(client.createConnection());
|
||||
assertNotNull(connection);
|
||||
|
||||
connection.setListener(new AmqpConnectionListener() {
|
||||
|
|
|
@ -76,7 +76,7 @@ public class AmqpClientRequestsHeartbeatsTest extends AmqpClientTestSupport {
|
|||
}
|
||||
});
|
||||
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
assertNotNull(connection);
|
||||
|
||||
connection.getStateInspector().assertValid();
|
||||
|
@ -91,7 +91,7 @@ public class AmqpClientRequestsHeartbeatsTest extends AmqpClientTestSupport {
|
|||
AmqpClient client = createAmqpClient();
|
||||
assertNotNull(client);
|
||||
|
||||
AmqpConnection connection = client.createConnection();
|
||||
AmqpConnection connection = trackConnection(client.createConnection());
|
||||
connection.setIdleTimeout(TEST_IDLE_TIMEOUT);
|
||||
assertNotNull(connection);
|
||||
|
||||
|
|
|
@ -70,7 +70,7 @@ public class AmqpConfiguredMaxConnectionsTest extends AmqpClientTestSupport {
|
|||
assertEquals(MAX_CONNECTIONS, getProxyToBroker().getCurrentConnectionsCount());
|
||||
|
||||
try {
|
||||
AmqpConnection connection = client.createConnection();
|
||||
AmqpConnection connection = trackConnection(client.createConnection());
|
||||
connection.setConnectTimeout(3000);
|
||||
connection.connect();
|
||||
fail("Should not be able to create one more connection");
|
||||
|
|
|
@ -73,7 +73,7 @@ public class AmqpConnectionsTest extends AmqpClientTestSupport {
|
|||
AmqpClient client = createAmqpClient();
|
||||
assertNotNull(client);
|
||||
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
assertNotNull(connection);
|
||||
|
||||
assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
|
||||
|
@ -127,7 +127,7 @@ public class AmqpConnectionsTest extends AmqpClientTestSupport {
|
|||
}
|
||||
});
|
||||
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
assertNotNull(connection);
|
||||
|
||||
assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
|
||||
|
@ -156,7 +156,7 @@ public class AmqpConnectionsTest extends AmqpClientTestSupport {
|
|||
}
|
||||
});
|
||||
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
assertNotNull(connection);
|
||||
|
||||
assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
|
||||
|
@ -172,8 +172,8 @@ public class AmqpConnectionsTest extends AmqpClientTestSupport {
|
|||
AmqpClient client = createAmqpClient();
|
||||
assertNotNull(client);
|
||||
|
||||
AmqpConnection connection1 = client.createConnection();
|
||||
AmqpConnection connection2 = client.createConnection();
|
||||
AmqpConnection connection1 = trackConnection(client.createConnection());
|
||||
AmqpConnection connection2 = trackConnection(client.createConnection());
|
||||
|
||||
connection1.setContainerId(getTestName() + "-Client:1");
|
||||
connection2.setContainerId(getTestName() + "-Client:2");
|
||||
|
@ -196,8 +196,8 @@ public class AmqpConnectionsTest extends AmqpClientTestSupport {
|
|||
AmqpClient client = createAmqpClient();
|
||||
assertNotNull(client);
|
||||
|
||||
AmqpConnection connection1 = client.createConnection();
|
||||
AmqpConnection connection2 = client.createConnection();
|
||||
AmqpConnection connection1 = trackConnection(client.createConnection());
|
||||
AmqpConnection connection2 = trackConnection(client.createConnection());
|
||||
|
||||
connection1.setContainerId(getTestName());
|
||||
connection2.setContainerId(getTestName());
|
||||
|
|
|
@ -132,7 +132,7 @@ public class AmqpCorrelationIdPreservationTest extends AmqpClientTestSupport {
|
|||
|
||||
public void doTestCorrelationIdPreservation(Object messageId) throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender("queue://" + getTestName());
|
||||
|
@ -161,7 +161,7 @@ public class AmqpCorrelationIdPreservationTest extends AmqpClientTestSupport {
|
|||
|
||||
public void doTestCorrelationIdPreservationOnBrokerRestart(Object messageId) throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender("queue://" + getTestName());
|
||||
|
|
|
@ -44,7 +44,7 @@ public class AmqpCorruptedFrameHandlingTest extends AmqpClientTestSupport {
|
|||
random.setSeed(System.nanoTime());
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.createConnection();
|
||||
AmqpConnection connection = trackConnection(client.createConnection());
|
||||
|
||||
connection.setContainerId("ClientID:" + getTestName());
|
||||
connection.connect();
|
||||
|
@ -83,7 +83,7 @@ public class AmqpCorruptedFrameHandlingTest extends AmqpClientTestSupport {
|
|||
random.setSeed(System.nanoTime());
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.createConnection();
|
||||
AmqpConnection connection = trackConnection(client.createConnection());
|
||||
|
||||
connection.setContainerId("ClientID:" + getTestName());
|
||||
connection.connect();
|
||||
|
@ -122,7 +122,7 @@ public class AmqpCorruptedFrameHandlingTest extends AmqpClientTestSupport {
|
|||
random.setSeed(System.nanoTime());
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.createConnection();
|
||||
AmqpConnection connection = trackConnection(client.createConnection());
|
||||
|
||||
connection.setContainerId("ClientID:" + getTestName());
|
||||
connection.connect();
|
||||
|
|
|
@ -69,7 +69,7 @@ public class AmqpDeliveryAnnotationsTest extends AmqpClientTestSupport {
|
|||
@Test(timeout = 60000)
|
||||
public void testDeliveryAnnotationsStrippedFromIncoming() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender("queue://" + getTestName());
|
||||
|
|
|
@ -77,7 +77,7 @@ public class AmqpDescribedTypePayloadTest extends AmqpClientTestSupport {
|
|||
@Test(timeout = 60000)
|
||||
public void testSendMessageWithDescribedTypeInBody() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender("queue://" + getTestName());
|
||||
|
@ -103,7 +103,7 @@ public class AmqpDescribedTypePayloadTest extends AmqpClientTestSupport {
|
|||
public void testSendMessageWithDescribedTypeInBodyReceiveOverOpenWire() throws Exception {
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender("queue://" + getTestName());
|
||||
|
@ -133,7 +133,7 @@ public class AmqpDescribedTypePayloadTest extends AmqpClientTestSupport {
|
|||
public void testDescribedTypeMessageRoundTrips() throws Exception {
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
// Send with AMQP client.
|
||||
|
|
|
@ -52,7 +52,7 @@ public class AmqpDisabledInactivityMonitorTest extends AmqpClientTestSupport {
|
|||
}
|
||||
});
|
||||
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
assertNotNull(connection);
|
||||
|
||||
connection.getStateInspector().assertValid();
|
||||
|
@ -64,7 +64,7 @@ public class AmqpDisabledInactivityMonitorTest extends AmqpClientTestSupport {
|
|||
AmqpClient client = createAmqpClient();
|
||||
assertNotNull(client);
|
||||
|
||||
AmqpConnection connection = client.createConnection();
|
||||
AmqpConnection connection = trackConnection(client.createConnection());
|
||||
connection.setIdleTimeout(TEST_IDLE_TIMEOUT);
|
||||
assertNotNull(connection);
|
||||
|
||||
|
|
|
@ -56,7 +56,7 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
|
|||
public void testCreateDurableReceiver() throws Exception {
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.createConnection();
|
||||
AmqpConnection connection = trackConnection(client.createConnection());
|
||||
connection.setContainerId(getTestName());
|
||||
connection.connect();
|
||||
|
||||
|
@ -74,7 +74,7 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
|
|||
public void testDetachedDurableReceiverRemainsActive() throws Exception {
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.createConnection();
|
||||
AmqpConnection connection = trackConnection(client.createConnection());
|
||||
connection.setContainerId(getTestName());
|
||||
connection.connect();
|
||||
|
||||
|
@ -98,7 +98,7 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
|
|||
public void testCloseDurableReceiverRemovesSubscription() throws Exception {
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.createConnection();
|
||||
AmqpConnection connection = trackConnection(client.createConnection());
|
||||
connection.setContainerId(getTestName());
|
||||
connection.connect();
|
||||
|
||||
|
@ -123,7 +123,7 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
|
|||
final BrokerViewMBean brokerView = getProxyToBroker();
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.createConnection();
|
||||
AmqpConnection connection = trackConnection(client.createConnection());
|
||||
connection.setContainerId(getTestName());
|
||||
connection.connect();
|
||||
|
||||
|
@ -157,7 +157,7 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
|
|||
final BrokerViewMBean brokerView = getProxyToBroker();
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.createConnection();
|
||||
AmqpConnection connection = trackConnection(client.createConnection());
|
||||
connection.setContainerId(getTestName());
|
||||
connection.connect();
|
||||
|
||||
|
@ -200,7 +200,7 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
|
|||
final BrokerViewMBean brokerView = getProxyToBroker();
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.createConnection();
|
||||
AmqpConnection connection = trackConnection(client.createConnection());
|
||||
connection.setContainerId(getTestName());
|
||||
connection.connect();
|
||||
|
||||
|
@ -249,7 +249,7 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
|
|||
final BrokerViewMBean brokerView = getProxyToBroker();
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.createConnection();
|
||||
AmqpConnection connection = trackConnection(client.createConnection());
|
||||
connection.setContainerId(getTestName());
|
||||
connection.connect();
|
||||
|
||||
|
@ -298,7 +298,7 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
|
|||
final BrokerViewMBean brokerView = getProxyToBroker();
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.createConnection();
|
||||
AmqpConnection connection = trackConnection(client.createConnection());
|
||||
connection.setContainerId(getTestName());
|
||||
connection.connect();
|
||||
|
||||
|
@ -354,7 +354,7 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
|
|||
final BrokerViewMBean brokerView = getProxyToBroker();
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.createConnection();
|
||||
AmqpConnection connection = trackConnection(client.createConnection());
|
||||
connection.setContainerId(getTestName());
|
||||
connection.connect();
|
||||
|
||||
|
|
|
@ -80,7 +80,7 @@ public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport {
|
|||
final CountDownLatch failed = new CountDownLatch(1);
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.createConnection();
|
||||
AmqpConnection connection = trackConnection(client.createConnection());
|
||||
connection.setListener(new AmqpConnectionListener() {
|
||||
|
||||
@Override
|
||||
|
|
|
@ -132,7 +132,7 @@ public class AmqpMessageIdPreservationTest extends AmqpClientTestSupport {
|
|||
|
||||
public void doTestMessageIdPreservation(Object messageId) throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender("queue://" + getTestName());
|
||||
|
@ -161,7 +161,7 @@ public class AmqpMessageIdPreservationTest extends AmqpClientTestSupport {
|
|||
|
||||
public void doTestMessageIdPreservationOnBrokerRestart(Object messageId) throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender("queue://" + getTestName());
|
||||
|
|
|
@ -42,7 +42,7 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
|
|||
sendMessages(getTestName(), MSG_COUNT, false);
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
|
||||
|
@ -68,7 +68,7 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
|
|||
public void testPullWithNoMessageGetDrained() throws Exception {
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
|
||||
|
@ -94,7 +94,7 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
|
|||
sendMessages(getTestName(), MSG_COUNT, false);
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
|
||||
|
@ -122,7 +122,7 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
|
|||
@Test(timeout = 60000)
|
||||
public void testMultipleZeroResultPulls() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
|
||||
|
|
|
@ -64,7 +64,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
|
|||
@Test(timeout = 60000)
|
||||
public void testCreateQueueReceiver() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
assertEquals(0, brokerService.getAdminView().getQueues().length);
|
||||
|
@ -104,7 +104,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
|
|||
}
|
||||
});
|
||||
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
assertEquals(0, brokerService.getAdminView().getQueues().length);
|
||||
|
@ -141,7 +141,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
|
|||
}
|
||||
});
|
||||
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
assertEquals(0, brokerService.getAdminView().getQueues().length);
|
||||
|
@ -157,7 +157,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
|
|||
@Test(timeout = 60000)
|
||||
public void testCreateTopicReceiver() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
assertEquals(0, brokerService.getAdminView().getTopics().length);
|
||||
|
@ -178,7 +178,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
|
|||
sendMessages(getTestName(), 1, false);
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
|
||||
|
@ -203,7 +203,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
|
|||
sendMessages(getTestName(), MSG_COUNT, false);
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName(), null, false, true);
|
||||
|
@ -230,7 +230,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
|
|||
sendMessages(getTestName(), MSG_COUNT, false);
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName(), null, false, true);
|
||||
|
@ -271,7 +271,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
|
|||
sendMessages(getTestName(), MSG_COUNT, false);
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpReceiver receiver1 = session.createReceiver("queue://" + getTestName());
|
||||
|
@ -308,7 +308,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
|
|||
sendMessages(getTestName(), MSG_COUNT, false);
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpReceiver receiver1 = session.createReceiver("queue://" + getTestName());
|
||||
|
@ -367,7 +367,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
|
|||
sendMessages(getTestName(), MSG_COUNT, false);
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpReceiver receiver1 = session.createReceiver("queue://" + getTestName());
|
||||
|
@ -447,7 +447,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
|
|||
source.setDurable(TerminusDurability.NONE);
|
||||
source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
|
||||
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
assertEquals(0, brokerService.getAdminView().getQueues().length);
|
||||
|
@ -485,7 +485,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
|
|||
sendMessages(getTestName(), msgCount, false);
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
|
||||
|
|
|
@ -82,7 +82,7 @@ public class AmqpSaslPlainTest extends AmqpClientTestSupport {
|
|||
client.setMechanismRestriction(PlainMechanism.MECH_NAME);
|
||||
|
||||
// Expect connection to succeed
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
|
||||
// Exercise it for verification
|
||||
exerciseConnection(connection);
|
||||
|
|
|
@ -60,7 +60,7 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
|
|||
@Test(timeout = 60000)
|
||||
public void testSendWithDeliveryTimeIsScheduled() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
assertEquals(0, brokerService.getAdminView().getQueues().length);
|
||||
|
@ -88,7 +88,7 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
|
|||
@Test(timeout = 60000)
|
||||
public void testSendRecvWithDeliveryTime() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
assertEquals(0, brokerService.getAdminView().getQueues().length);
|
||||
|
@ -130,7 +130,7 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
|
|||
@Test(timeout = 60000)
|
||||
public void testSendScheduledReceiveOverOpenWire() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
assertEquals(0, brokerService.getAdminView().getQueues().length);
|
||||
|
@ -165,7 +165,7 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
|
|||
@Test
|
||||
public void testScheduleWithDelay() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
assertEquals(0, brokerService.getAdminView().getQueues().length);
|
||||
|
@ -206,7 +206,7 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
|
|||
final int NUMBER = 10;
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
assertEquals(0, brokerService.getAdminView().getQueues().length);
|
||||
|
@ -276,5 +276,4 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
|
|||
|
||||
return scheduler;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -63,7 +63,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
|||
public void testSimpleSendOneReceiveOne() throws Exception {
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender("queue://" + getTestName());
|
||||
|
@ -95,7 +95,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
|||
final int MSG_COUNT = 20;
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender("queue://" + getTestName());
|
||||
|
@ -137,7 +137,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
|||
@Test(timeout = 60000)
|
||||
public void testReceiveWithJMSSelectorFilter() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpMessage message1 = new AmqpMessage();
|
||||
|
@ -173,7 +173,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
|||
final int MSG_COUNT = 20;
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender("queue://" + getTestName());
|
||||
|
@ -235,7 +235,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
|||
final int MSG_COUNT = 20;
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender("queue://" + getTestName());
|
||||
|
@ -319,7 +319,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
|||
|
||||
AmqpClient client = createAmqpClient();
|
||||
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
final String address;
|
||||
|
@ -382,7 +382,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
|||
public void run() {
|
||||
try {
|
||||
LOG.info("Starting consumer connection");
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
AmqpReceiver receiver = session.createReceiver(address);
|
||||
receiver.flow(1);
|
||||
|
@ -412,7 +412,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
|||
public void run() {
|
||||
try {
|
||||
receiverReady.await(20, TimeUnit.SECONDS);
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender(address);
|
||||
|
@ -437,7 +437,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
|||
@Test(timeout = 60000)
|
||||
public void testMessageDurabliltyFollowsSpec() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender("queue://" + getTestName());
|
||||
|
@ -480,7 +480,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
|||
@Test(timeout = 60000)
|
||||
public void testSendMessageToQueueNoPrefixReceiveWithPrefix() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender(getTestName());
|
||||
|
@ -513,7 +513,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
|||
@Test(timeout = 60000)
|
||||
public void testSendMessageToQueueWithPrefixReceiveWithNoPrefix() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender("queue://" + getTestName());
|
||||
|
@ -558,7 +558,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
|||
|
||||
AmqpClient client = createAmqpClient();
|
||||
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
final String address;
|
||||
|
@ -621,7 +621,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
|||
final int MSG_COUNT = 100;
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
final String address = "queue://" + getTestName();
|
||||
|
|
|
@ -39,7 +39,7 @@ public class AmqpSenderTest extends AmqpClientTestSupport {
|
|||
@Test(timeout = 60000)
|
||||
public void testCreateQueueSender() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
assertEquals(0, brokerService.getAdminView().getQueues().length);
|
||||
|
@ -58,7 +58,7 @@ public class AmqpSenderTest extends AmqpClientTestSupport {
|
|||
@Test(timeout = 60000)
|
||||
public void testCreateTopicSender() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
assertEquals(0, brokerService.getAdminView().getTopics().length);
|
||||
|
@ -77,7 +77,7 @@ public class AmqpSenderTest extends AmqpClientTestSupport {
|
|||
@Test(timeout = 60000)
|
||||
public void testSendMessageToQueue() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender("queue://" + getTestName());
|
||||
|
@ -100,7 +100,7 @@ public class AmqpSenderTest extends AmqpClientTestSupport {
|
|||
final int MSG_COUNT = 100;
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender("queue://" + getTestName());
|
||||
|
@ -124,7 +124,7 @@ public class AmqpSenderTest extends AmqpClientTestSupport {
|
|||
final int MSG_COUNT = 1000;
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender("topic://" + getTestName(), false);
|
||||
|
@ -157,7 +157,7 @@ public class AmqpSenderTest extends AmqpClientTestSupport {
|
|||
final int MSG_COUNT = 1000;
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender("topic://" + getTestName(), true);
|
||||
|
|
|
@ -32,7 +32,7 @@ public class AmqpSessionTest extends AmqpClientTestSupport {
|
|||
@Test
|
||||
public void testCreateSession() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
assertNotNull(session);
|
||||
connection.close();
|
||||
|
|
|
@ -58,7 +58,7 @@ public class AmqpSlowReceiverTest extends AmqpClientTestSupport {
|
|||
public void testSlowConsumerIsAborted() throws Exception {
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
final AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
|
||||
receiver.flow(100);
|
||||
|
@ -87,7 +87,7 @@ public class AmqpSlowReceiverTest extends AmqpClientTestSupport {
|
|||
strategy.setMaxSlowDuration(60*1000); // so jmx does the abort
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
final AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
|
||||
receiver.flow(100);
|
||||
|
|
|
@ -81,7 +81,7 @@ public class AmqpSocketProxyIdleTimeoutTests extends AmqpClientTestSupport {
|
|||
AmqpClient client = createAmqpClient();
|
||||
assertNotNull(client);
|
||||
|
||||
AmqpConnection connection = client.createConnection();
|
||||
AmqpConnection connection = trackConnection(client.createConnection());
|
||||
connection.setIdleTimeout(TEST_IDLE_TIMEOUT);
|
||||
assertNotNull(connection);
|
||||
|
||||
|
@ -125,7 +125,7 @@ public class AmqpSocketProxyIdleTimeoutTests extends AmqpClientTestSupport {
|
|||
AmqpClient client = createAmqpClient();
|
||||
assertNotNull(client);
|
||||
|
||||
AmqpConnection connection = client.createConnection();
|
||||
AmqpConnection connection = trackConnection(client.createConnection());
|
||||
connection.setCloseTimeout(1000); // Socket will have silently gone away, don't wait to long.
|
||||
assertNotNull(connection);
|
||||
|
||||
|
|
|
@ -61,7 +61,7 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport {
|
|||
protected void doTestCannotCreateSenderWithNamedTempDestination(boolean topic) throws Exception {
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
String address = null;
|
||||
|
@ -92,7 +92,7 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport {
|
|||
protected void doTestCannotCreateReceiverWithNamedTempDestination(boolean topic) throws Exception {
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
String address = null;
|
||||
|
@ -126,7 +126,7 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport {
|
|||
final BrokerViewMBean brokerView = getProxyToBroker();
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender(target);
|
||||
|
@ -157,7 +157,7 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport {
|
|||
final BrokerViewMBean brokerView = getProxyToBroker();
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender(target);
|
||||
|
@ -196,7 +196,7 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport {
|
|||
final BrokerViewMBean brokerView = getProxyToBroker();
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver(source);
|
||||
|
@ -227,7 +227,7 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport {
|
|||
final BrokerViewMBean brokerView = getProxyToBroker();
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver(source);
|
||||
|
@ -266,7 +266,7 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport {
|
|||
final BrokerViewMBean brokerView = getProxyToBroker();
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender(target);
|
||||
|
@ -319,7 +319,7 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport {
|
|||
final BrokerViewMBean brokerView = getProxyToBroker();
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver(source);
|
||||
|
|
|
@ -47,7 +47,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
|
|||
@Test(timeout = 30000)
|
||||
public void testBeginAndCommitTransaction() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
assertNotNull(session);
|
||||
|
||||
|
@ -61,7 +61,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
|
|||
@Test(timeout = 30000)
|
||||
public void testBeginAndRollbackTransaction() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
assertNotNull(session);
|
||||
|
||||
|
@ -75,7 +75,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
|
|||
@Test(timeout = 60000)
|
||||
public void testSendMessageToQueueWithCommit() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender("queue://" + getTestName());
|
||||
|
@ -99,7 +99,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
|
|||
@Test(timeout = 60000)
|
||||
public void testSendMessageToQueueWithRollback() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender("queue://" + getTestName());
|
||||
|
@ -123,7 +123,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
|
|||
@Test(timeout = 60000)
|
||||
public void testReceiveMessageWithCommit() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender("queue://" + getTestName());
|
||||
|
@ -154,7 +154,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
|
|||
@Test(timeout = 60000)
|
||||
public void testReceiveAfterConnectionClose() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender(getTestName());
|
||||
|
@ -198,7 +198,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
|
|||
@Test(timeout = 60000)
|
||||
public void testReceiveMessageWithRollback() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender("queue://" + getTestName());
|
||||
|
@ -229,7 +229,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
|
|||
@Test(timeout = 60000)
|
||||
public void testMultipleSessionReceiversInSingleTXNWithCommit() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
|
||||
// Load up the Queue with some messages
|
||||
{
|
||||
|
@ -286,7 +286,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
|
|||
@Test(timeout = 60000)
|
||||
public void testMultipleSessionReceiversInSingleTXNWithRollback() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
|
||||
// Load up the Queue with some messages
|
||||
{
|
||||
|
@ -343,7 +343,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
|
|||
@Test(timeout = 60000)
|
||||
public void testMultipleSessionSendersInSingleTXNWithCommit() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
|
||||
// Root TXN session controls all TXN send lifetimes.
|
||||
AmqpSession txnSession = connection.createSession();
|
||||
|
@ -383,7 +383,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
|
|||
@Test(timeout = 60000)
|
||||
public void testMultipleSessionSendersInSingleTXNWithRollback() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
|
||||
// Root TXN session controls all TXN send lifetimes.
|
||||
AmqpSession txnSession = connection.createSession();
|
||||
|
@ -453,7 +453,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
|
|||
private void doTestAcceptedButNotSettledInTXRemainsAquired(Outcome outcome) throws Exception {
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender("queue://" + getTestName());
|
||||
|
@ -521,7 +521,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
|
|||
public void testTransactionallyAcquiredMessageCanBeTransactionallyConsumed() throws Exception {
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender("queue://" + getTestName());
|
||||
|
@ -584,7 +584,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
|
|||
final int NUM_MESSAGES = 5;
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
|
||||
// Root TXN session controls all TXN send lifetimes.
|
||||
AmqpSession txnSession = connection.createSession();
|
||||
|
@ -637,7 +637,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
|
|||
final int NUM_MESSAGES = 10;
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
|
||||
// Root TXN session controls all TXN send lifetimes.
|
||||
AmqpSession txnSession = connection.createSession();
|
||||
|
@ -713,7 +713,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
|
|||
final int NUM_MESSAGES = 10;
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
|
||||
// Root TXN session controls all TXN send lifetimes.
|
||||
AmqpSession txnSession = connection.createSession();
|
||||
|
@ -782,7 +782,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
|
|||
final int NUM_MESSAGES = 10;
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
|
||||
// Root TXN session controls all TXN send lifetimes.
|
||||
AmqpSession txnSession = connection.createSession();
|
||||
|
@ -859,7 +859,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
|
|||
final int NUM_MESSAGES = 10;
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
|
||||
// Root TXN session controls all TXN send lifetimes.
|
||||
AmqpSession txnSession = connection.createSession();
|
||||
|
|
Loading…
Reference in New Issue