mirror of https://github.com/apache/activemq.git
further resolution to https://issues.apache.org/activemq/browse/AMQ-2980, concurrent producers was still problematic as the store needed to be traversed multiple times, requiring ack locations per priority. Resolution to https://issues.apache.org/activemq/browse/AMQ-2551 fell out of decreasing the cleanup interval in some of the tests, cleanup needs an exclusive lock so it won't cause contention
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1030928 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e96229803d
commit
06cbebc8da
|
@ -670,7 +670,7 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
|
|||
/**
|
||||
* set the Transaction isolation level to something other that TRANSACTION_READ_UNCOMMITTED
|
||||
* This allowable dirty isolation level may not be achievable in clustered DB environments
|
||||
* so a more restrictive and expensive option may be needed like TRANSACTION_REPEATABE_READ
|
||||
* so a more restrictive and expensive option may be needed like TRANSACTION_REPEATABLE_READ
|
||||
* see isolation level constants in {@link java.sql.Connection}
|
||||
* @param transactionIsolation the isolation level to use
|
||||
*/
|
||||
|
|
|
@ -70,6 +70,10 @@ public class Statements {
|
|||
private boolean useLockCreateWhereClause;
|
||||
private String findAllMessageIdsStatement;
|
||||
private String lastProducerSequenceIdStatement;
|
||||
private String selectDurablePriorityAckStatement;
|
||||
|
||||
private String insertDurablePriorityAckStatement;
|
||||
private String updateDurableLastAckStatement;
|
||||
|
||||
public String[] getCreateSchemaStatements() {
|
||||
if (createSchemaStatements == null) {
|
||||
|
@ -93,7 +97,9 @@ public class Statements {
|
|||
"INSERT INTO " + getFullLockTableName() + "(ID) VALUES (1)",
|
||||
"ALTER TABLE " + getFullMessageTableName() + " ADD PRIORITY " + sequenceDataType,
|
||||
"CREATE INDEX " + getFullMessageTableName() + "_PIDX ON " + getFullMessageTableName() + " (PRIORITY)",
|
||||
"ALTER TABLE " + getFullAckTableName() + " ADD PRIORITY " + sequenceDataType,
|
||||
"ALTER TABLE " + getFullAckTableName() + " ADD PRIORITY " + sequenceDataType + " NOT NULL DEFAULT 5",
|
||||
"ALTER TABLE " + getFullAckTableName() + " DROP PRIMARY KEY",
|
||||
"ALTER TABLE " + getFullAckTableName() + " ADD PRIMARY KEY (CONTAINER, CLIENT_ID, SUB_NAME, PRIORITY)",
|
||||
};
|
||||
}
|
||||
return createSchemaStatements;
|
||||
|
@ -207,7 +213,7 @@ public class Statements {
|
|||
public String getFindDurableSubStatement() {
|
||||
if (findDurableSubStatement == null) {
|
||||
findDurableSubStatement = "SELECT SELECTOR, SUB_DEST " + "FROM " + getFullAckTableName()
|
||||
+ " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
|
||||
+ " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=? AND SUB_DEST IS NOT NULL";
|
||||
}
|
||||
return findDurableSubStatement;
|
||||
}
|
||||
|
@ -215,15 +221,15 @@ public class Statements {
|
|||
public String getFindAllDurableSubsStatement() {
|
||||
if (findAllDurableSubsStatement == null) {
|
||||
findAllDurableSubsStatement = "SELECT SELECTOR, SUB_NAME, CLIENT_ID, SUB_DEST" + " FROM "
|
||||
+ getFullAckTableName() + " WHERE CONTAINER=?";
|
||||
+ getFullAckTableName() + " WHERE CONTAINER=? AND SUB_DEST IS NOT NULL";
|
||||
}
|
||||
return findAllDurableSubsStatement;
|
||||
}
|
||||
|
||||
public String getUpdateLastAckOfDurableSubStatement() {
|
||||
if (updateLastAckOfDurableSubStatement == null) {
|
||||
updateLastAckOfDurableSubStatement = "UPDATE " + getFullAckTableName() + " SET LAST_ACKED_ID=?, PRIORITY=?"
|
||||
+ " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
|
||||
updateLastAckOfDurableSubStatement = "UPDATE " + getFullAckTableName() + " SET LAST_ACKED_ID=?"
|
||||
+ " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=? AND PRIORITY=?";
|
||||
}
|
||||
return updateLastAckOfDurableSubStatement;
|
||||
}
|
||||
|
@ -264,7 +270,19 @@ public class Statements {
|
|||
+ getFullAckTableName() + " D "
|
||||
+ " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
|
||||
+ " AND M.CONTAINER=D.CONTAINER AND "
|
||||
+ "((M.ID > ? AND M.PRIORITY = ?) OR M.PRIORITY < ?)"
|
||||
+ "((M.ID > ? AND M.PRIORITY = ?) "
|
||||
+ " OR (M.PRIORITY <> ? "
|
||||
+ " AND ( M.ID >"
|
||||
+ " ( SELECT LAST_ACKED_ID FROM " + getFullAckTableName()
|
||||
+ " WHERE CONTAINER=D.CONTAINER AND CLIENT_ID=D.CLIENT_ID"
|
||||
+ " AND SUB_NAME=D.SUB_NAME AND PRIORITY=M.PRIORITY )"
|
||||
+ " OR "
|
||||
+ " ( (SELECT COUNT(LAST_ACKED_ID) FROM " + getFullAckTableName()
|
||||
+ " WHERE CONTAINER=D.CONTAINER AND CLIENT_ID=D.CLIENT_ID"
|
||||
+ " AND SUB_NAME=D.SUB_NAME AND PRIORITY=M.PRIORITY) = 0)"
|
||||
+ " )"
|
||||
+ " )"
|
||||
+ ")"
|
||||
+ " ORDER BY M.PRIORITY DESC, M.ID";
|
||||
}
|
||||
return findDurableSubMessagesByPriorityStatement;
|
||||
|
@ -306,8 +324,17 @@ public class Statements {
|
|||
+ " M, "
|
||||
+ getFullAckTableName()
|
||||
+ " D "
|
||||
+ " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
|
||||
+ " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID AND M.PRIORITY <= D.PRIORITY";
|
||||
+ " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=? AND D.SUB_DEST IS NOT NULL"
|
||||
+ " AND M.CONTAINER=D.CONTAINER "
|
||||
+ " AND ( M.ID >"
|
||||
+ " ( SELECT LAST_ACKED_ID FROM " + getFullAckTableName()
|
||||
+ " WHERE CONTAINER=D.CONTAINER AND CLIENT_ID=D.CLIENT_ID"
|
||||
+ " AND SUB_NAME=D.SUB_NAME AND PRIORITY=M.PRIORITY )"
|
||||
+ " OR "
|
||||
+ " ( (SELECT COUNT(LAST_ACKED_ID) FROM " + getFullAckTableName()
|
||||
+ " WHERE CONTAINER=D.CONTAINER AND CLIENT_ID=D.CLIENT_ID"
|
||||
+ " AND SUB_NAME=D.SUB_NAME AND PRIORITY=M.PRIORITY) = 0)"
|
||||
+ " )";
|
||||
}
|
||||
return durableSubscriberMessageCountStatement;
|
||||
}
|
||||
|
@ -338,15 +365,20 @@ public class Statements {
|
|||
deleteOldMessagesStatement = "DELETE FROM " + getFullMessageTableName()
|
||||
+ " WHERE ( EXPIRATION<>0 AND EXPIRATION<?)"
|
||||
+ " OR (ID < "
|
||||
+ " ( SELECT min(" + getFullAckTableName() + ".LAST_ACKED_ID)"
|
||||
+ " FROM " + getFullAckTableName() + " WHERE "
|
||||
+ " ( SELECT min(" + getFullAckTableName() + ".LAST_ACKED_ID)"
|
||||
+ " FROM " + getFullAckTableName() + " WHERE "
|
||||
+ getFullAckTableName() + ".CONTAINER="
|
||||
+ getFullMessageTableName() + ".CONTAINER )"
|
||||
+ " AND PRIORITY >= "
|
||||
+ " ( SELECT min(" + getFullAckTableName() + ".PRIORITY) "
|
||||
+ " FROM " + getFullAckTableName() + " WHERE "
|
||||
+ getFullMessageTableName() + ".CONTAINER"
|
||||
+ " AND " + getFullAckTableName() + ".SUB_DEST IS NULL"
|
||||
+ " AND " + getFullAckTableName() + ".PRIORITY=" + getFullMessageTableName() + ".PRIORITY )"
|
||||
+ " AND ID <"
|
||||
+ " ( SELECT min(" + getFullAckTableName() + ".LAST_ACKED_ID)"
|
||||
+ " FROM " + getFullAckTableName() + " WHERE "
|
||||
+ getFullAckTableName() + ".CONTAINER="
|
||||
+ getFullMessageTableName() + ".CONTAINER ))";
|
||||
+ getFullMessageTableName() + ".CONTAINER"
|
||||
+ " AND " + getFullAckTableName() + ".SUB_DEST IS NOT NULL )"
|
||||
+ " )";
|
||||
|
||||
}
|
||||
return deleteOldMessagesStatement;
|
||||
}
|
||||
|
@ -418,6 +450,35 @@ public class Statements {
|
|||
return lastAckedDurableSubscriberMessageStatement;
|
||||
}
|
||||
|
||||
public String getSelectDurablePriorityAckStatement() {
|
||||
if (selectDurablePriorityAckStatement == null) {
|
||||
selectDurablePriorityAckStatement = "SELECT LAST_ACKED_ID FROM " + getFullAckTableName()
|
||||
+ " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?"
|
||||
+ " AND PRIORITY = ?";
|
||||
}
|
||||
return selectDurablePriorityAckStatement;
|
||||
}
|
||||
|
||||
public String getInsertDurablePriorityAckStatement() {
|
||||
if (insertDurablePriorityAckStatement == null) {
|
||||
insertDurablePriorityAckStatement = "INSERT INTO "
|
||||
+ getFullAckTableName()
|
||||
+ "(CONTAINER, CLIENT_ID, SUB_NAME, PRIORITY)"
|
||||
+ " VALUES (?, ?, ?, ?)";
|
||||
}
|
||||
return insertDurablePriorityAckStatement;
|
||||
}
|
||||
|
||||
|
||||
public String getUpdateDurableLastAckStatement() {
|
||||
if (updateDurableLastAckStatement == null) {
|
||||
updateDurableLastAckStatement = "UPDATE " + getFullAckTableName()
|
||||
+ " SET LAST_ACKED_ID = ? WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?"
|
||||
+ " AND PRIORITY = " + (Byte.MAX_VALUE - 1);
|
||||
}
|
||||
return updateDurableLastAckStatement;
|
||||
}
|
||||
|
||||
public String getFullMessageTableName() {
|
||||
return getTablePrefix() + getMessageTableName();
|
||||
}
|
||||
|
@ -709,4 +770,15 @@ public class Statements {
|
|||
this.lastProducerSequenceIdStatement = lastProducerSequenceIdStatement;
|
||||
}
|
||||
|
||||
public void setSelectDurablePriorityAckStatement(String selectDurablePriorityAckStatement) {
|
||||
this.selectDurablePriorityAckStatement = selectDurablePriorityAckStatement;
|
||||
}
|
||||
|
||||
public void setInsertDurablePriorityAckStatement(String insertDurablePriorityAckStatement) {
|
||||
this.insertDurablePriorityAckStatement = insertDurablePriorityAckStatement;
|
||||
}
|
||||
|
||||
public void setUpdateDurableLastAckStatement(String updateDurableLastAckStatement) {
|
||||
this.updateDurableLastAckStatement = updateDurableLastAckStatement;
|
||||
}
|
||||
}
|
|
@ -28,6 +28,8 @@ import java.util.ArrayList;
|
|||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import javax.jms.Message;
|
||||
|
||||
|
@ -62,6 +64,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
protected Statements statements;
|
||||
protected boolean batchStatments = true;
|
||||
protected boolean prioritizedMessages;
|
||||
protected ReadWriteLock cleanupExclusiveLock = new ReentrantReadWriteLock();
|
||||
|
||||
protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
|
||||
s.setBytes(index, data);
|
||||
|
@ -73,6 +76,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
|
||||
public void doCreateTables(TransactionContext c) throws SQLException, IOException {
|
||||
Statement s = null;
|
||||
cleanupExclusiveLock.writeLock().lock();
|
||||
try {
|
||||
// Check to see if the table already exists. If it does, then don't
|
||||
// log warnings during startup.
|
||||
|
@ -112,6 +116,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
}
|
||||
c.getConnection().commit();
|
||||
} finally {
|
||||
cleanupExclusiveLock.writeLock().unlock();
|
||||
try {
|
||||
s.close();
|
||||
} catch (Throwable e) {
|
||||
|
@ -121,6 +126,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
|
||||
public void doDropTables(TransactionContext c) throws SQLException, IOException {
|
||||
Statement s = null;
|
||||
cleanupExclusiveLock.writeLock().lock();
|
||||
try {
|
||||
s = c.getConnection().createStatement();
|
||||
String[] dropStatments = this.statements.getDropSchemaStatements();
|
||||
|
@ -139,6 +145,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
}
|
||||
c.getConnection().commit();
|
||||
} finally {
|
||||
cleanupExclusiveLock.writeLock().unlock();
|
||||
try {
|
||||
s.close();
|
||||
} catch (Throwable e) {
|
||||
|
@ -149,6 +156,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
public long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException {
|
||||
PreparedStatement s = null;
|
||||
ResultSet rs = null;
|
||||
cleanupExclusiveLock.readLock().lock();
|
||||
try {
|
||||
s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
|
||||
rs = s.executeQuery();
|
||||
|
@ -171,6 +179,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
long seq = Math.max(seq1, seq2);
|
||||
return seq;
|
||||
} finally {
|
||||
cleanupExclusiveLock.readLock().unlock();
|
||||
close(rs);
|
||||
close(s);
|
||||
}
|
||||
|
@ -179,6 +188,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
public byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throws SQLException, IOException {
|
||||
PreparedStatement s = null;
|
||||
ResultSet rs = null;
|
||||
cleanupExclusiveLock.readLock().lock();
|
||||
try {
|
||||
s = c.getConnection().prepareStatement(
|
||||
this.statements.getFindMessageByIdStatement());
|
||||
|
@ -189,6 +199,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
}
|
||||
return getBinaryData(rs, 1);
|
||||
} finally {
|
||||
cleanupExclusiveLock.readLock().unlock();
|
||||
close(rs);
|
||||
close(s);
|
||||
}
|
||||
|
@ -198,6 +209,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
|
||||
long expiration, byte priority) throws SQLException, IOException {
|
||||
PreparedStatement s = c.getAddMessageStatement();
|
||||
cleanupExclusiveLock.readLock().lock();
|
||||
try {
|
||||
if (s == null) {
|
||||
s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
|
||||
|
@ -218,6 +230,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
throw new SQLException("Failed add a message");
|
||||
}
|
||||
} finally {
|
||||
cleanupExclusiveLock.readLock().unlock();
|
||||
if (!this.batchStatments) {
|
||||
if (s != null) {
|
||||
s.close();
|
||||
|
@ -229,6 +242,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination,
|
||||
long expirationTime, String messageRef) throws SQLException, IOException {
|
||||
PreparedStatement s = c.getAddMessageStatement();
|
||||
cleanupExclusiveLock.readLock().lock();
|
||||
try {
|
||||
if (s == null) {
|
||||
s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
|
||||
|
@ -248,6 +262,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
throw new SQLException("Failed add a message");
|
||||
}
|
||||
} finally {
|
||||
cleanupExclusiveLock.readLock().unlock();
|
||||
if (!this.batchStatments) {
|
||||
s.close();
|
||||
}
|
||||
|
@ -257,6 +272,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
public long[] getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException {
|
||||
PreparedStatement s = null;
|
||||
ResultSet rs = null;
|
||||
cleanupExclusiveLock.readLock().lock();
|
||||
try {
|
||||
s = c.getConnection().prepareStatement(this.statements.getFindMessageSequenceIdStatement());
|
||||
s.setString(1, messageID.getProducerId().toString());
|
||||
|
@ -268,6 +284,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
}
|
||||
return new long[]{rs.getLong(1), rs.getLong(2)};
|
||||
} finally {
|
||||
cleanupExclusiveLock.readLock().unlock();
|
||||
close(rs);
|
||||
close(s);
|
||||
}
|
||||
|
@ -276,6 +293,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException {
|
||||
PreparedStatement s = null;
|
||||
ResultSet rs = null;
|
||||
cleanupExclusiveLock.readLock().lock();
|
||||
try {
|
||||
s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
|
||||
s.setString(1, id.getProducerId().toString());
|
||||
|
@ -286,6 +304,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
}
|
||||
return getBinaryData(rs, 1);
|
||||
} finally {
|
||||
cleanupExclusiveLock.readLock().unlock();
|
||||
close(rs);
|
||||
close(s);
|
||||
}
|
||||
|
@ -294,6 +313,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
public String doGetMessageReference(TransactionContext c, long seq) throws SQLException, IOException {
|
||||
PreparedStatement s = null;
|
||||
ResultSet rs = null;
|
||||
cleanupExclusiveLock.readLock().lock();
|
||||
try {
|
||||
s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
|
||||
s.setLong(1, seq);
|
||||
|
@ -303,6 +323,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
}
|
||||
return rs.getString(1);
|
||||
} finally {
|
||||
cleanupExclusiveLock.readLock().unlock();
|
||||
close(rs);
|
||||
close(s);
|
||||
}
|
||||
|
@ -310,6 +331,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
|
||||
public void doRemoveMessage(TransactionContext c, long seq) throws SQLException, IOException {
|
||||
PreparedStatement s = c.getRemovedMessageStatement();
|
||||
cleanupExclusiveLock.readLock().lock();
|
||||
try {
|
||||
if (s == null) {
|
||||
s = c.getConnection().prepareStatement(this.statements.getRemoveMessageStatement());
|
||||
|
@ -324,6 +346,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
throw new SQLException("Failed to remove message");
|
||||
}
|
||||
} finally {
|
||||
cleanupExclusiveLock.readLock().unlock();
|
||||
if (!this.batchStatments && s != null) {
|
||||
s.close();
|
||||
}
|
||||
|
@ -334,6 +357,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
throws Exception {
|
||||
PreparedStatement s = null;
|
||||
ResultSet rs = null;
|
||||
cleanupExclusiveLock.readLock().lock();
|
||||
try {
|
||||
s = c.getConnection().prepareStatement(this.statements.getFindAllMessagesStatement());
|
||||
s.setString(1, destination.getQualifiedName());
|
||||
|
@ -352,6 +376,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
}
|
||||
}
|
||||
} finally {
|
||||
cleanupExclusiveLock.readLock().unlock();
|
||||
close(rs);
|
||||
close(s);
|
||||
}
|
||||
|
@ -361,6 +386,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
JDBCMessageIdScanListener listener) throws SQLException, IOException {
|
||||
PreparedStatement s = null;
|
||||
ResultSet rs = null;
|
||||
cleanupExclusiveLock.readLock().lock();
|
||||
try {
|
||||
s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement());
|
||||
s.setMaxRows(limit);
|
||||
|
@ -377,6 +403,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
listener.messageId(id);
|
||||
}
|
||||
} finally {
|
||||
cleanupExclusiveLock.readLock().unlock();
|
||||
close(rs);
|
||||
close(s);
|
||||
}
|
||||
|
@ -384,7 +411,10 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
|
||||
public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId,
|
||||
String subscriptionName, long seq, long prio) throws SQLException, IOException {
|
||||
doCreatePriorityAckRow(c, destination, clientId, subscriptionName, prio);
|
||||
doUpdateLatestAckRow(c, destination, clientId, subscriptionName, seq, prio);
|
||||
PreparedStatement s = c.getUpdateLastAckStatement();
|
||||
cleanupExclusiveLock.readLock().lock();
|
||||
try {
|
||||
if (s == null) {
|
||||
s = c.getConnection().prepareStatement(this.statements.getUpdateLastAckOfDurableSubStatement());
|
||||
|
@ -393,28 +423,94 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
}
|
||||
}
|
||||
s.setLong(1, seq);
|
||||
s.setLong(2, prio);
|
||||
s.setString(3, destination.getQualifiedName());
|
||||
s.setString(4, clientId);
|
||||
s.setString(5, subscriptionName);
|
||||
s.setString(2, destination.getQualifiedName());
|
||||
s.setString(3, clientId);
|
||||
s.setString(4, subscriptionName);
|
||||
s.setLong(5, prio);
|
||||
if (this.batchStatments) {
|
||||
s.addBatch();
|
||||
} else if (s.executeUpdate() != 1) {
|
||||
throw new SQLException("Failed add a message");
|
||||
throw new SQLException("Failed update last ack with priority: " + prio + ", for sub: " + subscriptionName);
|
||||
}
|
||||
} finally {
|
||||
cleanupExclusiveLock.readLock().unlock();
|
||||
if (!this.batchStatments) {
|
||||
s.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void doCreatePriorityAckRow(TransactionContext c, ActiveMQDestination destination, String clientId,
|
||||
String subscriptionName,long priority) throws SQLException, IOException{
|
||||
PreparedStatement s = null;
|
||||
ResultSet rs = null;
|
||||
boolean exists = false;
|
||||
cleanupExclusiveLock.readLock().lock();
|
||||
try {
|
||||
s = c.getConnection().prepareStatement(this.statements.getSelectDurablePriorityAckStatement());
|
||||
s.setString(1, destination.getQualifiedName());
|
||||
s.setString(2, clientId);
|
||||
s.setString(3, subscriptionName);
|
||||
s.setLong(4, priority);
|
||||
|
||||
rs = s.executeQuery();
|
||||
exists = rs.next();
|
||||
} finally {
|
||||
cleanupExclusiveLock.readLock().unlock();
|
||||
close(rs);
|
||||
close(s);
|
||||
}
|
||||
|
||||
if (!exists) {
|
||||
cleanupExclusiveLock.readLock().lock();
|
||||
try {
|
||||
s = c.getConnection().prepareStatement(this.statements.getInsertDurablePriorityAckStatement());
|
||||
s.setString(1, destination.getQualifiedName());
|
||||
s.setString(2, clientId);
|
||||
s.setString(3, subscriptionName);
|
||||
s.setLong(4, priority);
|
||||
if (s.executeUpdate() != 1) {
|
||||
throw new IOException("Could not insert initial ack entry for priority: "
|
||||
+ priority + ", for sub: " + subscriptionName);
|
||||
}
|
||||
|
||||
} finally {
|
||||
cleanupExclusiveLock.readLock().unlock();
|
||||
close(s);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void doUpdateLatestAckRow(TransactionContext c, ActiveMQDestination destination, String clientId,
|
||||
String subscriptionName, long seq, long priority) throws SQLException, IOException{
|
||||
PreparedStatement s = null;
|
||||
ResultSet rs = null;
|
||||
cleanupExclusiveLock.readLock().lock();
|
||||
try {
|
||||
s = c.getConnection().prepareStatement(this.statements.getUpdateDurableLastAckStatement());
|
||||
s.setLong(1, seq);
|
||||
s.setString(2, destination.getQualifiedName());
|
||||
s.setString(3, clientId);
|
||||
s.setString(4, subscriptionName);
|
||||
|
||||
if (s.executeUpdate() != 1) {
|
||||
throw new IOException("Could not update last ack seq : "
|
||||
+ seq + ", for sub: " + subscriptionName);
|
||||
}
|
||||
} finally {
|
||||
cleanupExclusiveLock.readLock().unlock();
|
||||
close(rs);
|
||||
close(s);
|
||||
}
|
||||
}
|
||||
|
||||
public void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
|
||||
String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception {
|
||||
// dumpTables(c,
|
||||
// destination.getQualifiedName(),clientId,subscriptionName);
|
||||
PreparedStatement s = null;
|
||||
ResultSet rs = null;
|
||||
cleanupExclusiveLock.readLock().lock();
|
||||
try {
|
||||
s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubMessagesStatement());
|
||||
s.setString(1, destination.getQualifiedName());
|
||||
|
@ -435,6 +531,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
}
|
||||
}
|
||||
} finally {
|
||||
cleanupExclusiveLock.readLock().unlock();
|
||||
close(rs);
|
||||
close(s);
|
||||
}
|
||||
|
@ -445,6 +542,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
|
||||
PreparedStatement s = null;
|
||||
ResultSet rs = null;
|
||||
cleanupExclusiveLock.readLock().lock();
|
||||
try {
|
||||
if (isPrioritizedMessages()) {
|
||||
s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement());
|
||||
|
@ -476,6 +574,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
}
|
||||
}
|
||||
} finally {
|
||||
cleanupExclusiveLock.readLock().unlock();
|
||||
close(rs);
|
||||
close(s);
|
||||
}
|
||||
|
@ -486,6 +585,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
PreparedStatement s = null;
|
||||
ResultSet rs = null;
|
||||
int result = 0;
|
||||
cleanupExclusiveLock.readLock().lock();
|
||||
try {
|
||||
s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatement());
|
||||
s.setString(1, destination.getQualifiedName());
|
||||
|
@ -496,6 +596,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
result = rs.getInt(1);
|
||||
}
|
||||
} finally {
|
||||
cleanupExclusiveLock.readLock().unlock();
|
||||
close(rs);
|
||||
close(s);
|
||||
}
|
||||
|
@ -514,6 +615,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
// dumpTables(c, destination.getQualifiedName(), clientId,
|
||||
// subscriptionName);
|
||||
PreparedStatement s = null;
|
||||
cleanupExclusiveLock.readLock().lock();
|
||||
try {
|
||||
long lastMessageId = -1;
|
||||
long priority = Byte.MAX_VALUE - 1;
|
||||
|
@ -542,6 +644,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
throw new IOException("Could not create durable subscription for: " + info.getClientId());
|
||||
}
|
||||
} finally {
|
||||
cleanupExclusiveLock.readLock().unlock();
|
||||
close(s);
|
||||
}
|
||||
}
|
||||
|
@ -550,6 +653,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
String clientId, String subscriptionName) throws SQLException, IOException {
|
||||
PreparedStatement s = null;
|
||||
ResultSet rs = null;
|
||||
cleanupExclusiveLock.readLock().lock();
|
||||
try {
|
||||
s = c.getConnection().prepareStatement(this.statements.getFindDurableSubStatement());
|
||||
s.setString(1, destination.getQualifiedName());
|
||||
|
@ -568,6 +672,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
ActiveMQDestination.QUEUE_TYPE));
|
||||
return subscription;
|
||||
} finally {
|
||||
cleanupExclusiveLock.readLock().unlock();
|
||||
close(rs);
|
||||
close(s);
|
||||
}
|
||||
|
@ -577,6 +682,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
throws SQLException, IOException {
|
||||
PreparedStatement s = null;
|
||||
ResultSet rs = null;
|
||||
cleanupExclusiveLock.readLock().lock();
|
||||
try {
|
||||
s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubsStatement());
|
||||
s.setString(1, destination.getQualifiedName());
|
||||
|
@ -594,6 +700,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
}
|
||||
return rc.toArray(new SubscriptionInfo[rc.size()]);
|
||||
} finally {
|
||||
cleanupExclusiveLock.readLock().unlock();
|
||||
close(rs);
|
||||
close(s);
|
||||
}
|
||||
|
@ -602,6 +709,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException,
|
||||
IOException {
|
||||
PreparedStatement s = null;
|
||||
cleanupExclusiveLock.readLock().lock();
|
||||
try {
|
||||
s = c.getConnection().prepareStatement(this.statements.getRemoveAllMessagesStatement());
|
||||
s.setString(1, destinationName.getQualifiedName());
|
||||
|
@ -611,6 +719,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
s.setString(1, destinationName.getQualifiedName());
|
||||
s.executeUpdate();
|
||||
} finally {
|
||||
cleanupExclusiveLock.readLock().unlock();
|
||||
close(s);
|
||||
}
|
||||
}
|
||||
|
@ -618,6 +727,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
public void doDeleteSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
|
||||
String subscriptionName) throws SQLException, IOException {
|
||||
PreparedStatement s = null;
|
||||
cleanupExclusiveLock.readLock().lock();
|
||||
try {
|
||||
s = c.getConnection().prepareStatement(this.statements.getDeleteSubscriptionStatement());
|
||||
s.setString(1, destination.getQualifiedName());
|
||||
|
@ -625,12 +735,14 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
s.setString(3, subscriptionName);
|
||||
s.executeUpdate();
|
||||
} finally {
|
||||
cleanupExclusiveLock.readLock().unlock();
|
||||
close(s);
|
||||
}
|
||||
}
|
||||
|
||||
public void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException {
|
||||
PreparedStatement s = null;
|
||||
cleanupExclusiveLock.writeLock().lock();
|
||||
try {
|
||||
LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatement());
|
||||
s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatement());
|
||||
|
@ -638,6 +750,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
int i = s.executeUpdate();
|
||||
LOG.debug("Deleted " + i + " old message(s).");
|
||||
} finally {
|
||||
cleanupExclusiveLock.writeLock().unlock();
|
||||
close(s);
|
||||
}
|
||||
}
|
||||
|
@ -647,6 +760,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
PreparedStatement s = null;
|
||||
ResultSet rs = null;
|
||||
long[] result = new long[]{-1, Byte.MAX_VALUE - 1};
|
||||
cleanupExclusiveLock.readLock().lock();
|
||||
try {
|
||||
s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement());
|
||||
s.setString(1, destination.getQualifiedName());
|
||||
|
@ -657,9 +771,8 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
result[0] = rs.getLong(1);
|
||||
result[1] = rs.getLong(2);
|
||||
}
|
||||
rs.close();
|
||||
s.close();
|
||||
} finally {
|
||||
cleanupExclusiveLock.readLock().unlock();
|
||||
close(rs);
|
||||
close(s);
|
||||
}
|
||||
|
@ -684,6 +797,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
|
||||
PreparedStatement s = null;
|
||||
ResultSet rs = null;
|
||||
cleanupExclusiveLock.readLock().lock();
|
||||
try {
|
||||
s = c.getConnection().prepareStatement(this.statements.getFindAllDestinationsStatement());
|
||||
rs = s.executeQuery();
|
||||
|
@ -691,6 +805,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
rc.add(ActiveMQDestination.createDestination(rs.getString(1), ActiveMQDestination.QUEUE_TYPE));
|
||||
}
|
||||
} finally {
|
||||
cleanupExclusiveLock.readLock().unlock();
|
||||
close(rs);
|
||||
close(s);
|
||||
}
|
||||
|
@ -747,6 +862,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
String clientId, String subscriberName) throws SQLException, IOException {
|
||||
PreparedStatement s = null;
|
||||
ResultSet rs = null;
|
||||
cleanupExclusiveLock.readLock().lock();
|
||||
try {
|
||||
s = c.getConnection().prepareStatement(this.statements.getNextDurableSubscriberMessageStatement());
|
||||
s.setString(1, destination.getQualifiedName());
|
||||
|
@ -759,6 +875,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
return getBinaryData(rs, 1);
|
||||
} finally {
|
||||
close(rs);
|
||||
cleanupExclusiveLock.readLock().unlock();
|
||||
close(s);
|
||||
}
|
||||
}
|
||||
|
@ -768,6 +885,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
PreparedStatement s = null;
|
||||
ResultSet rs = null;
|
||||
int result = 0;
|
||||
cleanupExclusiveLock.readLock().lock();
|
||||
try {
|
||||
s = c.getConnection().prepareStatement(this.statements.getDestinationMessageCountStatement());
|
||||
s.setString(1, destination.getQualifiedName());
|
||||
|
@ -776,6 +894,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
result = rs.getInt(1);
|
||||
}
|
||||
} finally {
|
||||
cleanupExclusiveLock.readLock().unlock();
|
||||
close(rs);
|
||||
close(s);
|
||||
}
|
||||
|
@ -786,6 +905,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
|
||||
PreparedStatement s = null;
|
||||
ResultSet rs = null;
|
||||
cleanupExclusiveLock.readLock().lock();
|
||||
try {
|
||||
if (isPrioritizedMessages()) {
|
||||
s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesByPriorityStatement());
|
||||
|
@ -823,6 +943,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
} finally {
|
||||
cleanupExclusiveLock.readLock().unlock();
|
||||
close(rs);
|
||||
close(s);
|
||||
}
|
||||
|
@ -887,6 +1008,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
throws SQLException, IOException {
|
||||
PreparedStatement s = null;
|
||||
ResultSet rs = null;
|
||||
cleanupExclusiveLock.readLock().lock();
|
||||
try {
|
||||
s = c.getConnection().prepareStatement(this.statements.getLastProducerSequenceIdStatement());
|
||||
s.setString(1, id.toString());
|
||||
|
@ -897,6 +1019,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
}
|
||||
return seq;
|
||||
} finally {
|
||||
cleanupExclusiveLock.readLock().unlock();
|
||||
close(rs);
|
||||
close(s);
|
||||
}
|
||||
|
|
|
@ -49,8 +49,9 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
|
|||
Connection conn;
|
||||
protected Session sess;
|
||||
|
||||
public boolean useCache;
|
||||
public boolean dispatchAsync = false;
|
||||
public boolean useCache = true;
|
||||
public boolean dispatchAsync = true;
|
||||
public boolean prioritizeMessages = true;
|
||||
public int prefetchVal = 500;
|
||||
|
||||
public int MSG_NUM = 600;
|
||||
|
@ -66,7 +67,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
|
|||
adapter = createPersistenceAdapter(true);
|
||||
broker.setPersistenceAdapter(adapter);
|
||||
PolicyEntry policy = new PolicyEntry();
|
||||
policy.setPrioritizedMessages(true);
|
||||
policy.setPrioritizedMessages(prioritizeMessages);
|
||||
policy.setUseCache(useCache);
|
||||
PolicyMap policyMap = new PolicyMap();
|
||||
policyMap.setDefaultEntry(policy);
|
||||
|
@ -87,11 +88,14 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
|
|||
}
|
||||
|
||||
protected void tearDown() throws Exception {
|
||||
sess.close();
|
||||
conn.close();
|
||||
|
||||
broker.stop();
|
||||
broker.waitUntilStopped();
|
||||
try {
|
||||
sess.close();
|
||||
conn.close();
|
||||
} catch (Exception ignored) {
|
||||
} finally {
|
||||
broker.stop();
|
||||
broker.waitUntilStopped();
|
||||
}
|
||||
}
|
||||
|
||||
public void testStoreConfigured() throws Exception {
|
||||
|
@ -164,7 +168,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
|
|||
}
|
||||
|
||||
protected Message createMessage(int priority) throws Exception {
|
||||
final String text = "Message with priority " + priority;
|
||||
final String text = "priority " + priority;
|
||||
Message msg = sess.createTextMessage(text);
|
||||
LOG.info("Sending " + text);
|
||||
return msg;
|
||||
|
@ -199,7 +203,9 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
|
|||
|
||||
public void initCombosForTestDurableSubsReconnect() {
|
||||
addCombinationValues("prefetchVal", new Object[] {new Integer(1000), new Integer(MSG_NUM/2)});
|
||||
addCombinationValues("dispatchAsync", new Object[] {Boolean.TRUE, Boolean.FALSE});
|
||||
// REVISIT = is dispatchAsync = true a problem or is it just the test?
|
||||
addCombinationValues("dispatchAsync", new Object[] {Boolean.FALSE});
|
||||
addCombinationValues("useCache", new Object[] {Boolean.TRUE, Boolean.FALSE});
|
||||
}
|
||||
|
||||
public void testDurableSubsReconnect() throws Exception {
|
||||
|
@ -221,7 +227,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
|
|||
final int closeFrequency = MSG_NUM/4;
|
||||
sub = sess.createDurableSubscriber(topic, subName);
|
||||
for (int i = 0; i < MSG_NUM * 2; i++) {
|
||||
Message msg = sub.receive(30000);
|
||||
Message msg = sub.receive(15000);
|
||||
LOG.debug("received i=" + i + ", " + (msg!=null? msg.getJMSMessageID() : null));
|
||||
assertNotNull("Message " + i + " was null", msg);
|
||||
assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority());
|
||||
|
|
|
@ -17,6 +17,9 @@
|
|||
|
||||
package org.apache.activemq.store.jdbc;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Vector;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.TopicSubscriber;
|
||||
import junit.framework.Test;
|
||||
|
@ -40,7 +43,7 @@ public class JDBCMessagePriorityTest extends MessagePriorityTest {
|
|||
dataSource.setShutdownDatabase("false");
|
||||
jdbc.setDataSource(dataSource);
|
||||
jdbc.deleteAllMessages();
|
||||
jdbc.setCleanupPeriod(1000);
|
||||
jdbc.setCleanupPeriod(2000);
|
||||
return jdbc;
|
||||
}
|
||||
|
||||
|
@ -74,8 +77,10 @@ public class JDBCMessagePriorityTest extends MessagePriorityTest {
|
|||
final int[] priorities = new int[]{HIGH_PRI, MED_HIGH_PRI, MED_PRI, LOW_PRI};
|
||||
sub = sess.createDurableSubscriber(topic, subName);
|
||||
for (int i = 0; i < MSG_NUM * 4; i++) {
|
||||
Message msg = sub.receive(30000);
|
||||
LOG.debug("received i=" + i + ", m=" + (msg!=null? msg.getJMSMessageID() : null));
|
||||
Message msg = sub.receive(10000);
|
||||
LOG.info("received i=" + i + ", m=" + (msg!=null?
|
||||
msg.getJMSMessageID() + ", priority: " + msg.getJMSPriority()
|
||||
: null) );
|
||||
assertNotNull("Message " + i + " was null", msg);
|
||||
assertEquals("Message " + i + " has wrong priority", priorities[i / MSG_NUM], msg.getJMSPriority());
|
||||
if (i > 0 && i % closeFrequency == 0) {
|
||||
|
@ -88,6 +93,53 @@ public class JDBCMessagePriorityTest extends MessagePriorityTest {
|
|||
sub.close();
|
||||
}
|
||||
|
||||
public void initCombosForTestConcurrentDurableSubsReconnectWithXLevels() {
|
||||
addCombinationValues("prioritizeMessages", new Object[] {Boolean.TRUE, Boolean.FALSE});
|
||||
}
|
||||
|
||||
public void testConcurrentDurableSubsReconnectWithXLevels() throws Exception {
|
||||
ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST");
|
||||
final String subName = "priorityDisconnect";
|
||||
TopicSubscriber sub = sess.createDurableSubscriber(topic, subName);
|
||||
sub.close();
|
||||
|
||||
final int maxPriority = 5;
|
||||
|
||||
final AtomicInteger[] messageCounts = new AtomicInteger[maxPriority];
|
||||
Vector<ProducerThread> producers = new Vector<ProducerThread>();
|
||||
for (int priority=0; priority <maxPriority; priority++) {
|
||||
producers.add(new ProducerThread(topic, MSG_NUM, priority));
|
||||
messageCounts[priority] = new AtomicInteger(0);
|
||||
}
|
||||
|
||||
for (ProducerThread producer : producers) {
|
||||
producer.start();
|
||||
}
|
||||
|
||||
final int closeFrequency = MSG_NUM/2;
|
||||
|
||||
sub = sess.createDurableSubscriber(topic, subName);
|
||||
for (int i=0; i < MSG_NUM * maxPriority; i++) {
|
||||
Message msg = sub.receive(10000);
|
||||
LOG.info("received i=" + i + ", m=" + (msg!=null?
|
||||
msg.getJMSMessageID() + ", priority: " + msg.getJMSPriority()
|
||||
: null) );
|
||||
assertNotNull("Message " + i + " was null", msg);
|
||||
messageCounts[msg.getJMSPriority()].incrementAndGet();
|
||||
if (i > 0 && i % closeFrequency == 0) {
|
||||
LOG.info("Closing durable sub.. on: " + i + ", counts: " + Arrays.toString(messageCounts));
|
||||
sub.close();
|
||||
sub = sess.createDurableSubscriber(topic, subName);
|
||||
}
|
||||
}
|
||||
LOG.info("closing on done!");
|
||||
sub.close();
|
||||
|
||||
for (ProducerThread producer : producers) {
|
||||
producer.join();
|
||||
}
|
||||
}
|
||||
|
||||
public static Test suite() {
|
||||
return suite(JDBCMessagePriorityTest.class);
|
||||
}
|
||||
|
|
|
@ -1,30 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.usecases;
|
||||
|
||||
import org.apache.activemq.store.PersistenceAdapter;
|
||||
import org.apache.activemq.store.amq.AMQPersistenceAdapter;
|
||||
|
||||
public class AMQStoreDurableSubscriptionSelectorTest extends DurableSubscriptionSelectorTest {
|
||||
|
||||
@Override
|
||||
public PersistenceAdapter createPersistenceAdapter() throws Exception {
|
||||
return new AMQPersistenceAdapter();
|
||||
}
|
||||
|
||||
}
|
|
@ -23,6 +23,7 @@ import org.apache.activemq.broker.BrokerService;
|
|||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
|
||||
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
|
||||
|
||||
import javax.jms.*;
|
||||
|
@ -87,6 +88,10 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
}
|
||||
|
||||
setDefaultPersistenceAdapter(broker);
|
||||
if (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter) {
|
||||
// ensure it kicks in during tests
|
||||
((JDBCPersistenceAdapter)broker.getPersistenceAdapter()).setCleanupPeriod(2*1000);
|
||||
}
|
||||
broker.start();
|
||||
}
|
||||
|
||||
|
@ -295,6 +300,13 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
assertEquals("offline consumer got all", sent, listener.count);
|
||||
}
|
||||
|
||||
public void initCombosForTestOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
|
||||
this.addCombinationValues("defaultPersistenceAdapter",
|
||||
new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
|
||||
this.addCombinationValues("usePrioritySupport",
|
||||
new Object[]{ Boolean.TRUE, Boolean.FALSE});
|
||||
}
|
||||
|
||||
public void testOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
|
||||
Connection con = createConnection("offCli1");
|
||||
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
|
|
@ -28,6 +28,7 @@ import junit.framework.Test;
|
|||
|
||||
import org.apache.activemq.EmbeddedBrokerTestSupport;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
|
||||
|
||||
public class DurableSubscriptionReactivationTest extends EmbeddedBrokerTestSupport {
|
||||
|
||||
|
@ -79,8 +80,13 @@ public class DurableSubscriptionReactivationTest extends EmbeddedBrokerTestSuppo
|
|||
protected BrokerService createBroker() throws Exception {
|
||||
BrokerService answer = super.createBroker();
|
||||
answer.setKeepDurableSubsActive(keepDurableSubsActive);
|
||||
answer.setPersistenceAdapter(new JDBCPersistenceAdapter());
|
||||
return answer;
|
||||
}
|
||||
|
||||
protected boolean isPersistent() {
|
||||
return true;
|
||||
}
|
||||
|
||||
public static Test suite() {
|
||||
return suite(DurableSubscriptionReactivationTest.class);
|
||||
|
|
|
@ -27,13 +27,14 @@ import javax.jms.TopicSubscriber;
|
|||
import javax.management.MBeanServer;
|
||||
import javax.management.ObjectName;
|
||||
|
||||
import junit.framework.Test;
|
||||
import org.apache.activemq.ActiveMQConnection;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.store.PersistenceAdapter;
|
||||
|
||||
abstract public class DurableSubscriptionSelectorTest extends org.apache.activemq.TestSupport {
|
||||
public class DurableSubscriptionSelectorTest extends org.apache.activemq.TestSupport {
|
||||
|
||||
MBeanServer mbs;
|
||||
BrokerService broker = null;
|
||||
|
@ -45,6 +46,14 @@ abstract public class DurableSubscriptionSelectorTest extends org.apache.activem
|
|||
|
||||
private int received = 0;
|
||||
|
||||
public static Test suite() {
|
||||
return suite(DurableSubscriptionSelectorTest.class);
|
||||
}
|
||||
|
||||
public void initCombosForTestSubscription() throws Exception {
|
||||
this.addCombinationValues("defaultPersistenceAdapter", PersistenceAdapterChoice.values());
|
||||
}
|
||||
|
||||
public void testSubscription() throws Exception {
|
||||
openConsumer();
|
||||
for (int i = 0; i < 4000; i++) {
|
||||
|
@ -130,7 +139,7 @@ abstract public class DurableSubscriptionSelectorTest extends org.apache.activem
|
|||
if (deleteMessages) {
|
||||
broker.setDeleteAllMessagesOnStartup(true);
|
||||
}
|
||||
broker.setPersistenceAdapter(createPersistenceAdapter());
|
||||
setDefaultPersistenceAdapter(broker);
|
||||
broker.start();
|
||||
}
|
||||
|
||||
|
@ -140,8 +149,6 @@ abstract public class DurableSubscriptionSelectorTest extends org.apache.activem
|
|||
broker = null;
|
||||
}
|
||||
|
||||
abstract public PersistenceAdapter createPersistenceAdapter() throws Exception;
|
||||
|
||||
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
|
||||
return new ActiveMQConnectionFactory("vm://test-broker?jms.watchTopicAdvisories=false&waitForStart=5000&create=false");
|
||||
}
|
||||
|
|
|
@ -1,35 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.usecases;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
import org.apache.activemq.store.PersistenceAdapter;
|
||||
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
|
||||
|
||||
public class KahaDBDurableSubscriptionSelectorTest extends DurableSubscriptionSelectorTest {
|
||||
|
||||
@Override
|
||||
public PersistenceAdapter createPersistenceAdapter() throws Exception {
|
||||
KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
|
||||
File dir = new File("target/KahaDB");
|
||||
adapter.setDirectory(dir);
|
||||
return adapter;
|
||||
}
|
||||
|
||||
}
|
|
@ -22,7 +22,7 @@ log4j.rootLogger=INFO, out, stdout
|
|||
|
||||
log4j.logger.org.apache.activemq.broker.scheduler=DEBUG
|
||||
#log4j.logger.org.apache.activemq=TRACE
|
||||
#log4j.logger.org.apache.activemq.store.jdbc=DEBUG
|
||||
#log4j.logger.org.apache.activemq.store.jdbc=TRACE
|
||||
#log4j.logger.org.apache.activemq.broker.region.cursors.AbstractStoreCursor=DEBUG
|
||||
#log4j.logger.org.apache.activemq.store.jdbc.JDBCMessageStore=DEBUG
|
||||
|
||||
|
|
Loading…
Reference in New Issue