diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java index bc306db208..893b3fe1b2 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java @@ -258,11 +258,13 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { } @Override - public void setMaxBatchSize(int maxBatchSize) { - for (PendingMessageCursor storePrefetch : storePrefetches) { - storePrefetch.setMaxBatchSize(maxBatchSize); + public void setMaxBatchSize(int newMaxBatchSize) { + if (newMaxBatchSize > getMaxBatchSize()) { + for (PendingMessageCursor storePrefetch : storePrefetches) { + storePrefetch.setMaxBatchSize(newMaxBatchSize); + } + super.setMaxBatchSize(newMaxBatchSize); } - super.setMaxBatchSize(maxBatchSize); } @Override diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java index 8c49ba2f74..1dc14c338c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java @@ -93,4 +93,8 @@ public interface JDBCAdapter { long doGetLastProducerSequenceId(TransactionContext c, ProducerId id) throws SQLException, IOException; void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long re, long re1) throws SQLException, IOException; + + public int getMaxRows(); + + public void setMaxRows(int maxRows); } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java index c85bc06202..ce46c3657c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java @@ -99,6 +99,7 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist protected ActiveMQMessageAudit audit; protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator(); + protected int maxRows = DefaultJDBCAdapter.MAX_ROWS; public JDBCPersistenceAdapter() { } @@ -464,6 +465,7 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist public void setAdapter(JDBCAdapter adapter) { this.adapter = adapter; this.adapter.setStatements(getStatements()); + this.adapter.setMaxRows(getMaxRows()); } public WireFormat getWireFormat() { @@ -715,5 +717,16 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist synchronized(sequenceGenerator) { return sequenceGenerator.getNextSequenceId(); } - } + } + + public int getMaxRows() { + return maxRows; + } + + /* + * the max rows return from queries, with sparse selectors this may need to be increased + */ + public void setMaxRows(int maxRows) { + this.maxRows = maxRows; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java index f63df51c3e..201c23c61c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java @@ -56,12 +56,13 @@ import org.apache.commons.logging.LogFactory; */ public class DefaultJDBCAdapter implements JDBCAdapter { private static final Log LOG = LogFactory.getLog(DefaultJDBCAdapter.class); + public static final int MAX_ROWS = 10000; protected Statements statements; protected boolean batchStatments = true; protected boolean prioritizedMessages; protected ReadWriteLock cleanupExclusiveLock = new ReentrantReadWriteLock(); - // needs to be min twice the prefetch for a durable sub - protected int maxRows = 2000; + // needs to be min twice the prefetch for a durable sub and large enough for selector range + protected int maxRows = MAX_ROWS; protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException { s.setBytes(index, data); @@ -507,7 +508,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement()); - s.setMaxRows(maxReturned * 2); + s.setMaxRows(Math.max(maxReturned * 2, maxRows)); s.setString(1, destination.getQualifiedName()); s.setString(2, clientId); s.setString(3, subscriptionName); @@ -917,7 +918,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { } else { s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement()); } - s.setMaxRows(maxReturned * 2); + s.setMaxRows(Math.max(maxReturned * 2, maxRows)); s.setString(1, destination.getQualifiedName()); s.setLong(2, nextSeq); if (isPrioritizedMessages) { diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java index b664fbc54e..83f3bb6e0e 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java @@ -33,6 +33,8 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; +import org.apache.activemq.util.Wait; public class DurableSubscriptionSelectorTest extends org.apache.activemq.TestSupport { @@ -71,7 +73,8 @@ public class DurableSubscriptionSelectorTest extends org.apache.activemq.TestSup openConsumer(); sendMessage(true); - Thread.sleep(1000); + + Wait.waitFor(new Wait.Condition() { public boolean isSatisified() { return received >= 1;} }, 10000); assertEquals("Message is not recieved.", 1, received); @@ -140,6 +143,10 @@ public class DurableSubscriptionSelectorTest extends org.apache.activemq.TestSup broker.setDeleteAllMessagesOnStartup(true); } setDefaultPersistenceAdapter(broker); + + if (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter) { + ((JDBCPersistenceAdapter)broker.getPersistenceAdapter()).setMaxRows(5000); + } broker.start(); }