https://issues.apache.org/jira/browse/AMQ-1870 - have maxRows implement a limit, policyentry max page size can be used to extend the scan, https://issues.apache.org/jira/browse/AMQ-3557

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1439933 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2013-01-29 15:27:01 +00:00
parent 960481c7e8
commit d6e8a5b988
1 changed files with 6 additions and 4 deletions

View File

@ -61,12 +61,11 @@ import org.slf4j.LoggerFactory;
*/ */
public class DefaultJDBCAdapter implements JDBCAdapter { public class DefaultJDBCAdapter implements JDBCAdapter {
private static final Logger LOG = LoggerFactory.getLogger(DefaultJDBCAdapter.class); private static final Logger LOG = LoggerFactory.getLogger(DefaultJDBCAdapter.class);
public static final int MAX_ROWS = BaseDestination.MAX_PAGE_SIZE; public static final int MAX_ROWS = org.apache.activemq.ActiveMQPrefetchPolicy.MAX_PREFETCH_SIZE;
protected Statements statements; protected Statements statements;
protected boolean batchStatments = true; protected boolean batchStatments = true;
protected boolean prioritizedMessages; protected boolean prioritizedMessages;
protected ReadWriteLock cleanupExclusiveLock = new ReentrantReadWriteLock(); protected ReadWriteLock cleanupExclusiveLock = new ReentrantReadWriteLock();
// needs to be min twice the prefetch for a durable sub and large enough for selector range
protected int maxRows = MAX_ROWS; protected int maxRows = MAX_ROWS;
protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException { protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
@ -573,7 +572,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
cleanupExclusiveLock.readLock().lock(); cleanupExclusiveLock.readLock().lock();
try { try {
s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement()); s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement());
s.setMaxRows(Math.max(maxReturned * 2, maxRows)); s.setMaxRows(Math.min(maxReturned * 2, maxRows));
s.setString(1, destination.getQualifiedName()); s.setString(1, destination.getQualifiedName());
s.setString(2, clientId); s.setString(2, clientId);
s.setString(3, subscriptionName); s.setString(3, subscriptionName);
@ -608,7 +607,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
cleanupExclusiveLock.readLock().lock(); cleanupExclusiveLock.readLock().lock();
try { try {
s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement()); s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement());
s.setMaxRows(Math.max(maxReturned * 2, maxRows)); s.setMaxRows(Math.min(maxReturned * 2, maxRows));
s.setString(1, destination.getQualifiedName()); s.setString(1, destination.getQualifiedName());
s.setString(2, clientId); s.setString(2, clientId);
s.setString(3, subscriptionName); s.setString(3, subscriptionName);
@ -918,6 +917,9 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
return maxRows; return maxRows;
} }
/**
* the max value for statement maxRows, used to limit jdbc queries
*/
public void setMaxRows(int maxRows) { public void setMaxRows(int maxRows) {
this.maxRows = maxRows; this.maxRows = maxRows;
} }