diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java index 2a28662ce5..9cd6e17446 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.activemq.broker.region.BaseDestination; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.ProducerId; @@ -59,7 +60,7 @@ import org.slf4j.LoggerFactory; */ public class DefaultJDBCAdapter implements JDBCAdapter { private static final Logger LOG = LoggerFactory.getLogger(DefaultJDBCAdapter.class); - public static final int MAX_ROWS = 10000; + public static final int MAX_ROWS = BaseDestination.MAX_PAGE_SIZE; protected Statements statements; protected boolean batchStatments = true; protected boolean prioritizedMessages; diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java index 2f5fb5f99b..1603ad2515 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java @@ -414,7 +414,9 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport { priority = message.getJMSPriority(); } catch (JMSException ignored) {} if (!messageLists.containsKey(priority)) { - messageLists.put(priority, new MessageIdList()); + MessageIdList perPriorityList = new MessageIdList(); + perPriorityList.setParent(allMessagesList); + messageLists.put(priority, perPriorityList); } messageLists.get(priority).onMessage(message); if (count.incrementAndGet() == 1) { diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java index d6f2a4b896..53c2ffd5a4 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java @@ -31,6 +31,8 @@ import junit.framework.Test; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; @@ -144,9 +146,17 @@ public class DurableSubscriptionSelectorTest extends org.apache.activemq.TestSup } setDefaultPersistenceAdapter(broker); + /* use maxPageSize policy in place of always pulling from the broker in maxRows chunks if (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter) { - ((JDBCPersistenceAdapter)broker.getPersistenceAdapter()).setMaxRows(5000); - } + ((JDBCPersistenceAdapter)broker.getPersistenceAdapter()).setMaxRows(5000); + }*/ + + PolicyMap policyMap = new PolicyMap(); + PolicyEntry defaultEntry = new PolicyEntry(); + defaultEntry.setMaxPageSize(5000); + policyMap.setDefaultEntry(defaultEntry); + broker.setDestinationPolicy(policyMap); + broker.start(); }