ARTEMIS-773 closing connections through finally to avoid thread leakage

This commit is contained in:
Clebert Suconic 2016-10-07 10:57:21 -04:00
parent 330ddf0c6b
commit 54b7dcc48e
1 changed files with 137 additions and 130 deletions

View File

@ -485,73 +485,77 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
// Root TXN session controls all TXN send lifetimes.
AmqpSession txnSession = connection.createSession();
try {
// Normal Session which won't create an TXN itself
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
// Root TXN session controls all TXN send lifetimes.
AmqpSession txnSession = connection.createSession();
for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
message.setApplicationProperty("msgId", i);
sender.send(message, txnSession.getTransactionId());
// Normal Session which won't create an TXN itself
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
message.setApplicationProperty("msgId", i);
sender.send(message, txnSession.getTransactionId());
}
// Read all messages from the Queue, do not accept them yet.
AmqpReceiver receiver = session.createReceiver(getTestName());
ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
receiver.flow((NUM_MESSAGES + 2) * 2);
for (int i = 0; i < NUM_MESSAGES; ++i) {
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
messages.add(message);
}
// Commit half the consumed messages
txnSession.begin();
for (int i = 0; i < NUM_MESSAGES / 2; ++i) {
messages.get(i).accept(txnSession);
}
txnSession.commit();
// Rollback the other half the consumed messages
txnSession.begin();
for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
messages.get(i).accept(txnSession);
}
txnSession.rollback();
{
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
message.release();
}
// Commit the other half the consumed messages
// This is a variation from the .NET client tests which doesn't settle the
// messages in the TX until commit is called but on ActiveMQ they will be
// redispatched regardless and not stay in the acquired state.
txnSession.begin();
for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
message.accept();
}
txnSession.commit();
// The final message should still be pending.
{
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
receiver.flow(1);
assertNotNull(message);
assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
message.release();
}
} finally {
connection.close();
}
// Read all messages from the Queue, do not accept them yet.
AmqpReceiver receiver = session.createReceiver(getTestName());
ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
receiver.flow((NUM_MESSAGES + 2) * 2);
for (int i = 0; i < NUM_MESSAGES; ++i) {
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
messages.add(message);
}
// Commit half the consumed messages
txnSession.begin();
for (int i = 0; i < NUM_MESSAGES / 2; ++i) {
messages.get(i).accept(txnSession);
}
txnSession.commit();
// Rollback the other half the consumed messages
txnSession.begin();
for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
messages.get(i).accept(txnSession);
}
txnSession.rollback();
{
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
message.release();
}
// Commit the other half the consumed messages
// This is a variation from the .NET client tests which doesn't settle the
// messages in the TX until commit is called but on ActiveMQ they will be
// redispatched regardless and not stay in the acquired state.
txnSession.begin();
for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
message.accept();
}
txnSession.commit();
// The final message should still be pending.
{
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
receiver.flow(1);
assertNotNull(message);
assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
message.release();
}
connection.close();
}
@Test(timeout = 60000)
@ -630,83 +634,86 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
// Root TXN session controls all TXN send lifetimes.
AmqpSession txnSession = connection.createSession();
try {
// Normal Session which won't create an TXN itself
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
// Root TXN session controls all TXN send lifetimes.
AmqpSession txnSession = connection.createSession();
for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
message.setApplicationProperty("msgId", i);
sender.send(message, txnSession.getTransactionId());
}
// Normal Session which won't create an TXN itself
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
// Read all messages from the Queue, do not accept them yet.
AmqpReceiver receiver = session.createReceiver(getTestName());
ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
receiver.flow((NUM_MESSAGES + 2) * 2);
for (int i = 0; i < NUM_MESSAGES; ++i) {
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
System.out.println("Read message: " + message.getApplicationProperty("msgId"));
assertNotNull(message);
messages.add(message);
}
for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
message.setApplicationProperty("msgId", i);
sender.send(message, txnSession.getTransactionId());
}
// Commit half the consumed messages [0, 1, 2, 3, 4]
txnSession.begin();
for (int i = 0; i < NUM_MESSAGES / 2; ++i) {
System.out.println("Commit: Accepting message: " + messages.get(i).getApplicationProperty("msgId"));
messages.get(i).accept(txnSession, false);
}
txnSession.commit();
// Read all messages from the Queue, do not accept them yet.
AmqpReceiver receiver = session.createReceiver(getTestName());
ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
receiver.flow((NUM_MESSAGES + 2) * 2);
for (int i = 0; i < NUM_MESSAGES; ++i) {
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
System.out.println("Read message: " + message.getApplicationProperty("msgId"));
assertNotNull(message);
messages.add(message);
}
// Rollback the other half the consumed messages [5, 6, 7, 8, 9]
txnSession.begin();
for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
System.out.println("Rollback: Accepting message: " + messages.get(i).getApplicationProperty("msgId"));
messages.get(i).accept(txnSession, false);
}
txnSession.rollback();
// Commit half the consumed messages [0, 1, 2, 3, 4]
txnSession.begin();
for (int i = 0; i < NUM_MESSAGES / 2; ++i) {
System.out.println("Commit: Accepting message: " + messages.get(i).getApplicationProperty("msgId"));
messages.get(i).accept(txnSession, false);
}
txnSession.commit();
// After rollback messages should still be acquired so we read last sent message [10]
{
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
System.out.println("Read message: " + message.getApplicationProperty("msgId"));
assertNotNull(message);
assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
message.release();
}
// Rollback the other half the consumed messages [5, 6, 7, 8, 9]
txnSession.begin();
for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
System.out.println("Rollback: Accepting message: " + messages.get(i).getApplicationProperty("msgId"));
messages.get(i).accept(txnSession, false);
}
txnSession.rollback();
// Commit the other half the consumed messages [5, 6, 7, 8, 9] which should still be acquired
txnSession.begin();
for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
messages.get(i).accept(txnSession);
}
txnSession.commit();
// After rollback messages should still be acquired so we read last sent message [10]
{
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
System.out.println("Read message: " + message.getApplicationProperty("msgId"));
assertNotNull(message);
assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
message.release();
}
// The final message [10] should still be pending as we released it previously and committed
// the previously accepted but not settled messages [5, 6, 7, 8, 9] in a new TX
{
// Commit the other half the consumed messages [5, 6, 7, 8, 9] which should still be acquired
txnSession.begin();
for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
messages.get(i).accept(txnSession);
}
txnSession.commit();
// The final message [10] should still be pending as we released it previously and committed
// the previously accepted but not settled messages [5, 6, 7, 8, 9] in a new TX
{
receiver.flow(1);
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
System.out.println("Read message: " + message.getApplicationProperty("msgId"));
assertNotNull(message);
assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
message.accept();
}
// We should have now drained the Queue
receiver.flow(1);
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
System.out.println("Read message: " + message.getApplicationProperty("msgId"));
assertNotNull(message);
assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
message.accept();
if (message != null) {
System.out.println("Read message: " + message.getApplicationProperty("msgId"));
}
assertNull(message);
} finally {
connection.close();
}
// We should have now drained the Queue
receiver.flow(1);
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
if (message != null) {
System.out.println("Read message: " + message.getApplicationProperty("msgId"));
}
assertNull(message);
connection.close();
}
@Test(timeout = 60000)