remove priority flag from default jdbc impl as that is shared across dests, leads to conflict between priority and non priority dests, limit rowSize returned from durable fetch so we don't hog memory or return too few matches: https://issues.apache.org/activemq/browse/AMQ-2980

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1034466 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-11-12 16:51:26 +00:00
parent 2e1a014fec
commit 4679c8a788
6 changed files with 102 additions and 43 deletions

View File

@ -31,8 +31,6 @@ public interface JDBCAdapter {
void setStatements(Statements statementProvider);
void setPrioritizedMessages(boolean prioritizedMessages);
void doCreateTables(TransactionContext c) throws SQLException, IOException;
void doDropTables(TransactionContext c) throws SQLException, IOException;
@ -59,7 +57,10 @@ public interface JDBCAdapter {
void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long seq, long priority, int maxReturned,
JDBCMessageRecoveryListener listener) throws Exception;
void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo subscriptionInfo, boolean retroactive) throws SQLException, IOException;
void doRecoverNextMessagesWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long seq, long priority, int maxReturned,
JDBCMessageRecoveryListener listener) throws Exception;
void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo subscriptionInfo, boolean retroactive, boolean isPrioritizeMessages) throws SQLException, IOException;
SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName) throws SQLException, IOException;
@ -69,7 +70,7 @@ public interface JDBCAdapter {
void doDeleteSubscription(TransactionContext c, ActiveMQDestination destinationName, String clientId, String subscriptionName) throws SQLException, IOException;
void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException;
void doDeleteOldMessages(TransactionContext c, boolean isPrioritizedMessages) throws SQLException, IOException;
long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException;
@ -79,11 +80,11 @@ public interface JDBCAdapter {
SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException;
int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName) throws SQLException, IOException;
int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, boolean isPrioritizeMessages) throws SQLException, IOException;
int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException;
void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception;
void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq, long priority, int maxReturned, boolean isPrioritizeMessages, JDBCMessageRecoveryListener listener) throws Exception;
long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriberName) throws SQLException, IOException;

View File

@ -226,7 +226,8 @@ public class JDBCMessageStore extends AbstractMessageStore {
public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener) throws Exception {
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
adapter.doRecoverNextMessages(c, destination, lastRecoveredSequenceId.get(), lastRecoveredPriority.get(), maxReturned, new JDBCMessageRecoveryListener() {
adapter.doRecoverNextMessages(c, destination, lastRecoveredSequenceId.get(), lastRecoveredPriority.get(),
maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() {
public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
if (listener.hasSpace()) {
@ -301,6 +302,5 @@ public class JDBCMessageStore extends AbstractMessageStore {
public void setPrioritizedMessages(boolean prioritizedMessages) {
super.setPrioritizedMessages(prioritizedMessages);
adapter.setPrioritizedMessages(prioritizedMessages);
}
}

View File

@ -333,7 +333,8 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
try {
LOG.debug("Cleaning up old messages.");
c = getTransactionContext();
getAdapter().doDeleteOldMessages(c);
getAdapter().doDeleteOldMessages(c, false);
getAdapter().doDeleteOldMessages(c, true);
} catch (IOException e) {
LOG.warn("Old message cleanup failed due to: " + e, e);
} catch (SQLException e) {

View File

@ -104,8 +104,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
throws Exception {
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName,
0, 0, maxReturned, new JDBCMessageRecoveryListener() {
JDBCMessageRecoveryListener jdbcListener = new JDBCMessageRecoveryListener() {
public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
if (listener.hasSpace()) {
@ -122,7 +121,14 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
return listener.recoverMessageReference(new MessageId(reference));
}
});
};
if (isPrioritizedMessages()) {
adapter.doRecoverNextMessagesWithPriority(c, destination, clientId, subscriptionName,
0, 0, maxReturned, jdbcListener);
} else {
adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName,
0, 0, maxReturned, jdbcListener);
}
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
} finally {
@ -138,7 +144,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
c = persistenceAdapter.getTransactionContext();
adapter.doSetSubscriberEntry(c, subscriptionInfo, retroactive);
adapter.doSetSubscriberEntry(c, subscriptionInfo, retroactive, isPrioritizedMessages());
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to lookup subscription for info: " + subscriptionInfo.getClientId() + ". Reason: " + e, e);
@ -192,8 +198,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
int result = 0;
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId, subscriberName);
result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId, subscriberName, isPrioritizedMessages());
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to get Message Count: " + clientId + ". Reason: " + e, e);

View File

@ -17,11 +17,8 @@
package org.apache.activemq.store.jdbc.adapter;
import java.io.IOException;
import java.io.PrintStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
@ -31,8 +28,6 @@ import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.Message;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
@ -65,6 +60,8 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
protected boolean batchStatments = true;
protected boolean prioritizedMessages;
protected ReadWriteLock cleanupExclusiveLock = new ReentrantReadWriteLock();
// needs to be min twice the prefetch for a durable sub
protected int maxRows = 2000;
protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
s.setBytes(index, data);
@ -509,12 +506,44 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
ResultSet rs = null;
cleanupExclusiveLock.readLock().lock();
try {
if (isPrioritizedMessages()) {
s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement());
s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement());
s.setMaxRows(maxReturned * 2);
s.setString(1, destination.getQualifiedName());
s.setString(2, clientId);
s.setString(3, subscriptionName);
rs = s.executeQuery();
int count = 0;
if (this.statements.isUseExternalMessageReferences()) {
while (rs.next() && count < maxReturned) {
if (listener.recoverMessageReference(rs.getString(1))) {
count++;
}
}
} else {
s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement());
while (rs.next() && count < maxReturned) {
if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
count++;
}
}
}
// no set max rows as selectors may need to scan more than maxReturned messages to get what they need
} finally {
cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
}
public void doRecoverNextMessagesWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId,
String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
PreparedStatement s = null;
ResultSet rs = null;
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement());
// maxRows needs to be twice prefetch as the db will replay all unacked, so inflight messages will
// be returned and suppressed by the cursor audit. It is faster this way.
s.setMaxRows(maxRows);
s.setString(1, destination.getQualifiedName());
s.setString(2, clientId);
s.setString(3, subscriptionName);
@ -541,13 +570,13 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
public int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination,
String clientId, String subscriptionName) throws SQLException, IOException {
String clientId, String subscriptionName, boolean isPrioritizedMessages) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
int result = 0;
cleanupExclusiveLock.readLock().lock();
try {
if (this.isPrioritizedMessages()) {
if (isPrioritizedMessages) {
s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatementWithPriority());
} else {
s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatement());
@ -574,7 +603,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
* @throws SQLException
* @throws IOException
*/
public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo info, boolean retroactive)
public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo info, boolean retroactive, boolean isPrioritizedMessages)
throws SQLException, IOException {
// dumpTables(c, destination.getQualifiedName(), clientId,
// subscriptionName);
@ -597,7 +626,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement());
int maxPriority = 1;
if (this.isPrioritizedMessages()) {
if (isPrioritizedMessages) {
maxPriority = 10;
}
@ -712,11 +741,11 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
}
public void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException {
public void doDeleteOldMessages(TransactionContext c, boolean isPrioritizedMessages) throws SQLException, IOException {
PreparedStatement s = null;
cleanupExclusiveLock.writeLock().lock();
try {
if (this.isPrioritizedMessages()) {
if (isPrioritizedMessages) {
LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatementWithPriority());
s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatementWithPriority());
} else {
@ -815,16 +844,16 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
public void setStatements(Statements statements) {
this.statements = statements;
}
public int getMaxRows() {
return maxRows;
}
public void setMaxRows(int maxRows) {
this.maxRows = maxRows;
}
public boolean isPrioritizedMessages() {
return prioritizedMessages;
}
public void setPrioritizedMessages(boolean prioritizedMessages) {
this.prioritizedMessages = prioritizedMessages;
}
/**
* @param c
* @param destination
@ -878,12 +907,12 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq,
long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
long priority, int maxReturned, boolean isPrioritizedMessages, JDBCMessageRecoveryListener listener) throws Exception {
PreparedStatement s = null;
ResultSet rs = null;
cleanupExclusiveLock.readLock().lock();
try {
if (isPrioritizedMessages()) {
if (isPrioritizedMessages) {
s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesByPriorityStatement());
} else {
s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement());
@ -891,7 +920,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
s.setMaxRows(maxReturned * 2);
s.setString(1, destination.getQualifiedName());
s.setLong(2, nextSeq);
if (isPrioritizedMessages()) {
if (isPrioritizedMessages) {
s.setLong(3, priority);
s.setLong(4, priority);
}

View File

@ -70,7 +70,8 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
policy.setPrioritizedMessages(prioritizeMessages);
policy.setUseCache(useCache);
PolicyMap policyMap = new PolicyMap();
policyMap.setDefaultEntry(policy);
policyMap.put(new ActiveMQQueue("TEST"), policy);
policyMap.put(new ActiveMQTopic("TEST"), policy);
broker.setDestinationPolicy(policyMap);
broker.start();
broker.waitUntilStarted();
@ -198,7 +199,29 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
assertNotNull("Message " + i + " was null", msg);
assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority());
}
// verify that same broker/store can deal with non priority dest also
topic = (ActiveMQTopic)sess.createTopic("HAS_NO_PRIORITY");
sub = sess.createDurableSubscriber(topic, "no_priority");
sub.close();
lowPri = new ProducerThread(topic, MSG_NUM, LOW_PRI);
highPri = new ProducerThread(topic, MSG_NUM, HIGH_PRI);
lowPri.start();
highPri.start();
lowPri.join();
highPri.join();
sub = sess.createDurableSubscriber(topic, "no_priority");
// verify we got them all
for (int i = 0; i < MSG_NUM * 2; i++) {
Message msg = sub.receive(5000);
assertNotNull("Message " + i + " was null", msg);
}
}
public void initCombosForTestDurableSubsReconnect() {