[AMQ-6891] test and fix non tx variant of this leak

This commit is contained in:
gtully 2018-01-30 10:51:18 +00:00
parent 80ef6d3129
commit dd2572bcb1
2 changed files with 43 additions and 6 deletions

View File

@ -774,11 +774,11 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
}
}
public void rollbackPendingCursorAdditions(MessageContext messageContext) {
public void rollbackPendingCursorAdditions(MessageId messageId) {
synchronized (indexOrderedCursorUpdates) {
for (int i = indexOrderedCursorUpdates.size() - 1; i >= 0; i--) {
MessageContext mc = indexOrderedCursorUpdates.get(i);
if (mc.message.getMessageId().equals(messageContext.message.getMessageId())) {
if (mc.message.getMessageId().equals(messageId)) {
indexOrderedCursorUpdates.remove(mc);
if (mc.onCompletion != null) {
mc.onCompletion.run();
@ -854,7 +854,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
@Override
public void afterRollback() throws Exception {
if (store != null && messageContext.message.isPersistent()) {
rollbackPendingCursorAdditions(messageContext);
rollbackPendingCursorAdditions(messageContext.message.getMessageId());
}
messageContext.message.decrementReferenceCount();
}
@ -888,6 +888,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
// before restarting normal broker operations
resetNeeded = true;
pendingSends.decrementAndGet();
rollbackPendingCursorAdditions(message.getMessageId());
throw e;
}
}

View File

@ -182,7 +182,7 @@ public class JmsTransactionCommitFailureTest {
// Set failure flag on persistence adapter
persistenceAdapter.setCommitFailureEnabled(true);
try {
for (int i = 0; i < 1000; i++) {
for (int i = 0; i < 10; i++) {
try {
sendMessage(queueName, 2);
} catch (JMSException jmse) {
@ -202,10 +202,44 @@ public class JmsTransactionCommitFailureTest {
}
}
@Test
public void testQueueMemoryLeakNoTx() throws Exception {
String queueName = "testMemoryLeak";
sendMessage(queueName, 1);
// Set failure flag on persistence adapter
persistenceAdapter.setCommitFailureEnabled(true);
try {
for (int i = 0; i < 10; i++) {
try {
sendMessage(queueName, 2, false);
} catch (JMSException jmse) {
// Expected
}
}
} finally {
persistenceAdapter.setCommitFailureEnabled(false);
}
Destination destination = broker.getDestination(new ActiveMQQueue(queueName));
if (destination instanceof org.apache.activemq.broker.region.Queue) {
org.apache.activemq.broker.region.Queue queue = (org.apache.activemq.broker.region.Queue) destination;
Field listField = org.apache.activemq.broker.region.Queue.class.getDeclaredField("indexOrderedCursorUpdates");
listField.setAccessible(true);
List<?> list = (List<?>) listField.get(queue);
Assert.assertEquals(0, list.size());
}
}
private void sendMessage(String queueName, int count) throws JMSException {
sendMessage(queueName, count, true);
}
private void sendMessage(String queueName, int count, boolean transacted) throws JMSException {
Connection con = connectionFactory.createConnection();
try {
Session session = con.createSession(true, Session.SESSION_TRANSACTED);
Session session = con.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
try {
Queue destination = session.createQueue(queueName);
MessageProducer producer = session.createProducer(destination);
@ -216,7 +250,9 @@ public class JmsTransactionCommitFailureTest {
message.setText("Message-" + messageCounter++);
producer.send(message);
}
session.commit();
if (transacted) {
session.commit();
}
} finally {
producer.close();
}