improve jdbc durable sub performance for long running subs and resolve regression in selector test, related to https://issues.apache.org/activemq/browse/AMQ-2985, https://issues.apache.org/activemq/browse/AMQ-2980, expose maxRows on jdbc persistence adapter to allow it to be increased for really large selectors

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1035202 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-11-15 10:00:56 +00:00
parent 4679c8a788
commit 4fbf92de82
5 changed files with 37 additions and 10 deletions

View File

@ -258,11 +258,13 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
}
@Override
public void setMaxBatchSize(int maxBatchSize) {
for (PendingMessageCursor storePrefetch : storePrefetches) {
storePrefetch.setMaxBatchSize(maxBatchSize);
public void setMaxBatchSize(int newMaxBatchSize) {
if (newMaxBatchSize > getMaxBatchSize()) {
for (PendingMessageCursor storePrefetch : storePrefetches) {
storePrefetch.setMaxBatchSize(newMaxBatchSize);
}
super.setMaxBatchSize(newMaxBatchSize);
}
super.setMaxBatchSize(maxBatchSize);
}
@Override

View File

@ -93,4 +93,8 @@ public interface JDBCAdapter {
long doGetLastProducerSequenceId(TransactionContext c, ProducerId id) throws SQLException, IOException;
void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long re, long re1) throws SQLException, IOException;
public int getMaxRows();
public void setMaxRows(int maxRows);
}

View File

@ -99,6 +99,7 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
protected ActiveMQMessageAudit audit;
protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
protected int maxRows = DefaultJDBCAdapter.MAX_ROWS;
public JDBCPersistenceAdapter() {
}
@ -464,6 +465,7 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
public void setAdapter(JDBCAdapter adapter) {
this.adapter = adapter;
this.adapter.setStatements(getStatements());
this.adapter.setMaxRows(getMaxRows());
}
public WireFormat getWireFormat() {
@ -715,5 +717,16 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
synchronized(sequenceGenerator) {
return sequenceGenerator.getNextSequenceId();
}
}
}
public int getMaxRows() {
return maxRows;
}
/*
* the max rows return from queries, with sparse selectors this may need to be increased
*/
public void setMaxRows(int maxRows) {
this.maxRows = maxRows;
}
}

View File

@ -56,12 +56,13 @@ import org.apache.commons.logging.LogFactory;
*/
public class DefaultJDBCAdapter implements JDBCAdapter {
private static final Log LOG = LogFactory.getLog(DefaultJDBCAdapter.class);
public static final int MAX_ROWS = 10000;
protected Statements statements;
protected boolean batchStatments = true;
protected boolean prioritizedMessages;
protected ReadWriteLock cleanupExclusiveLock = new ReentrantReadWriteLock();
// needs to be min twice the prefetch for a durable sub
protected int maxRows = 2000;
// needs to be min twice the prefetch for a durable sub and large enough for selector range
protected int maxRows = MAX_ROWS;
protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
s.setBytes(index, data);
@ -507,7 +508,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement());
s.setMaxRows(maxReturned * 2);
s.setMaxRows(Math.max(maxReturned * 2, maxRows));
s.setString(1, destination.getQualifiedName());
s.setString(2, clientId);
s.setString(3, subscriptionName);
@ -917,7 +918,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
} else {
s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement());
}
s.setMaxRows(maxReturned * 2);
s.setMaxRows(Math.max(maxReturned * 2, maxRows));
s.setString(1, destination.getQualifiedName());
s.setLong(2, nextSeq);
if (isPrioritizedMessages) {

View File

@ -33,6 +33,8 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.util.Wait;
public class DurableSubscriptionSelectorTest extends org.apache.activemq.TestSupport {
@ -71,7 +73,8 @@ public class DurableSubscriptionSelectorTest extends org.apache.activemq.TestSup
openConsumer();
sendMessage(true);
Thread.sleep(1000);
Wait.waitFor(new Wait.Condition() { public boolean isSatisified() { return received >= 1;} }, 10000);
assertEquals("Message is not recieved.", 1, received);
@ -140,6 +143,10 @@ public class DurableSubscriptionSelectorTest extends org.apache.activemq.TestSup
broker.setDeleteAllMessagesOnStartup(true);
}
setDefaultPersistenceAdapter(broker);
if (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter) {
((JDBCPersistenceAdapter)broker.getPersistenceAdapter()).setMaxRows(5000);
}
broker.start();
}