From c55f6a84d7d57df248a9a9f1492b3ecdc895a0a5 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Fri, 24 Aug 2012 14:21:22 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-3557 - Performance of consumption with JDBC persistance and Microsoft SQL Server. Fix up default value for maxRows to allign with default page size. Use a large maxPageSize via destination policy for durable subs that have sparse selectors. Same as queues with sparse selectors git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1376934 13f79535-47bb-0310-9956-ffa450edef68 --- .../store/jdbc/adapter/DefaultJDBCAdapter.java | 3 ++- .../ConcurrentProducerDurableConsumerTest.java | 4 +++- .../usecases/DurableSubscriptionSelectorTest.java | 14 ++++++++++++-- 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java index 2a28662ce5..9cd6e17446 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.activemq.broker.region.BaseDestination; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.ProducerId; @@ -59,7 +60,7 @@ import org.slf4j.LoggerFactory; */ public class DefaultJDBCAdapter implements JDBCAdapter { private static final Logger LOG = LoggerFactory.getLogger(DefaultJDBCAdapter.class); - public static final int MAX_ROWS = 10000; + public static final int MAX_ROWS = BaseDestination.MAX_PAGE_SIZE; protected Statements statements; protected boolean batchStatments = true; protected boolean prioritizedMessages; diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java index 2f5fb5f99b..1603ad2515 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java @@ -414,7 +414,9 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport { priority = message.getJMSPriority(); } catch (JMSException ignored) {} if (!messageLists.containsKey(priority)) { - messageLists.put(priority, new MessageIdList()); + MessageIdList perPriorityList = new MessageIdList(); + perPriorityList.setParent(allMessagesList); + messageLists.put(priority, perPriorityList); } messageLists.get(priority).onMessage(message); if (count.incrementAndGet() == 1) { diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java index d6f2a4b896..53c2ffd5a4 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java @@ -31,6 +31,8 @@ import junit.framework.Test; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; @@ -144,9 +146,17 @@ public class DurableSubscriptionSelectorTest extends org.apache.activemq.TestSup } setDefaultPersistenceAdapter(broker); + /* use maxPageSize policy in place of always pulling from the broker in maxRows chunks if (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter) { - ((JDBCPersistenceAdapter)broker.getPersistenceAdapter()).setMaxRows(5000); - } + ((JDBCPersistenceAdapter)broker.getPersistenceAdapter()).setMaxRows(5000); + }*/ + + PolicyMap policyMap = new PolicyMap(); + PolicyEntry defaultEntry = new PolicyEntry(); + defaultEntry.setMaxPageSize(5000); + policyMap.setDefaultEntry(defaultEntry); + broker.setDestinationPolicy(policyMap); + broker.start(); }