Properly decrementing the message reference count in
DemandForwardingBridgeSupport when messages supression is checked for
durable subscriptions
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-10-14 12:15:06 -04:00
parent a9f9d4a4d2
commit 406a34294b
2 changed files with 92 additions and 0 deletions

View File

@ -1158,6 +1158,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
messageEvalContext.setMessageReference(md.getMessage()); messageEvalContext.setMessageReference(md.getMessage());
messageEvalContext.setDestination(md.getDestination()); messageEvalContext.setDestination(md.getDestination());
suppress = !sub.getNetworkBridgeFilter().matches(messageEvalContext); suppress = !sub.getNetworkBridgeFilter().matches(messageEvalContext);
//AMQ-6465 - Need to decrement the reference count after checking matches() as
//the call above will increment the reference count by 1
messageEvalContext.getMessageReference().decrementReferenceCount();
} }
return suppress; return suppress;
} }

View File

@ -43,9 +43,11 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.apache.activemq.util.Wait.Condition;
import org.apache.activemq.xbean.BrokerFactoryBean; import org.apache.activemq.xbean.BrokerFactoryBean;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -198,6 +200,93 @@ public class SimpleNetworkTest {
})); }));
} }
//Added for AMQ-6465 to make sure memory usage decreased back to 0 after messages are forwarded
//to the other broker
@Test(timeout = 60 * 1000)
public void testDurableTopicSubForwardMemoryUsage() throws Exception {
// create a remote durable consumer to create demand
MessageConsumer remoteConsumer = remoteSession.createDurableSubscriber(included, consumerName);
Thread.sleep(1000);
MessageProducer producer = localSession.createProducer(included);
for (int i = 0; i < MESSAGE_COUNT; i++) {
Message test = localSession.createTextMessage("test-" + i);
producer.send(test);
}
Thread.sleep(1000);
//Make sure stats are set
assertEquals(MESSAGE_COUNT,
localBroker.getDestination(included).getDestinationStatistics().getForwards().getCount());
assertTrue(Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return localBroker.getSystemUsage().getMemoryUsage().getUsage() == 0;
}
}, 10000, 500));
remoteConsumer.close();
}
//Added for AMQ-6465 to make sure memory usage decreased back to 0 after messages are forwarded
//to the other broker
@Test(timeout = 60 * 1000)
public void testTopicSubForwardMemoryUsage() throws Exception {
// create a remote durable consumer to create demand
MessageConsumer remoteConsumer = remoteSession.createConsumer(included);
Thread.sleep(1000);
MessageProducer producer = localSession.createProducer(included);
for (int i = 0; i < MESSAGE_COUNT; i++) {
Message test = localSession.createTextMessage("test-" + i);
producer.send(test);
}
Thread.sleep(1000);
//Make sure stats are set
assertEquals(MESSAGE_COUNT,
localBroker.getDestination(included).getDestinationStatistics().getForwards().getCount());
assertTrue(Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return localBroker.getSystemUsage().getMemoryUsage().getUsage() == 0;
}
}, 10000, 500));
remoteConsumer.close();
}
//Added for AMQ-6465 to make sure memory usage decreased back to 0 after messages are forwarded
//to the other broker
@Test(timeout = 60 * 1000)
public void testQueueSubForwardMemoryUsage() throws Exception {
ActiveMQQueue queue = new ActiveMQQueue("include.test.foo");
MessageConsumer remoteConsumer = remoteSession.createConsumer(queue);
Thread.sleep(1000);
MessageProducer producer = localSession.createProducer(queue);
for (int i = 0; i < MESSAGE_COUNT; i++) {
Message test = localSession.createTextMessage("test-" + i);
producer.send(test);
}
Thread.sleep(1000);
//Make sure stats are set
assertEquals(MESSAGE_COUNT,
localBroker.getDestination(queue).getDestinationStatistics().getForwards().getCount());
assertTrue(Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return localBroker.getSystemUsage().getMemoryUsage().getUsage() == 0;
}
}, 10000, 500));
remoteConsumer.close();
}
@Test(timeout = 60 * 1000) @Test(timeout = 60 * 1000)
public void testDurableStoreAndForward() throws Exception { public void testDurableStoreAndForward() throws Exception {
// create a remote durable consumer // create a remote durable consumer