https://issues.apache.org/jira/browse/AMQ-4656 - fix regression for FilePendingDurableSubscriberMessageStoragePolicy

This commit is contained in:
gtully 2014-03-11 15:18:08 +00:00
parent c8a5fb769e
commit 6aaf859d22
2 changed files with 24 additions and 1 deletions

View File

@ -29,6 +29,7 @@ import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
import org.apache.activemq.broker.region.policy.PolicyEntry;
@ -160,7 +161,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
}
synchronized (pendingLock) {
if (!((StoreDurableSubscriberCursor) pending).isStarted() || !keepDurableSubsActive) {
if (!((AbstractPendingMessageCursor) pending).isStarted() || !keepDurableSubsActive) {
pending.setSystemUsage(memoryManager);
pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
pending.setMaxAuditDepth(getMaxAuditDepth());

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.bugs;
import java.util.Arrays;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@ -31,12 +32,20 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
import org.apache.activemq.broker.region.policy.FilePendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(value = Parameterized.class)
public class AMQ4656Test {
private static final transient Logger LOG = LoggerFactory.getLogger(AMQ4656Test.class);
@ -45,9 +54,22 @@ public class AMQ4656Test {
private String connectionUri;
@Parameterized.Parameter
public PendingDurableSubscriberMessageStoragePolicy pendingDurableSubPolicy;
@Parameterized.Parameters(name="{0}")
public static Iterable<Object[]> getTestParameters() {
return Arrays.asList(new Object[][]{{new FilePendingDurableSubscriberMessageStoragePolicy()},{new StorePendingDurableSubscriberMessageStoragePolicy()}});
}
@Before
public void setUp() throws Exception {
brokerService = new BrokerService();
PolicyMap policyMap = new PolicyMap();
PolicyEntry defaultEntry = new PolicyEntry();
defaultEntry.setPendingDurableSubscriberPolicy(pendingDurableSubPolicy);
policyMap.setDefaultEntry(defaultEntry);
brokerService.setDestinationPolicy(policyMap);
brokerService.setPersistent(false);
brokerService.setUseJmx(true);
brokerService.setDeleteAllMessagesOnStartup(true);