jdbc variant of https://issues.apache.org/activemq/browse/AMQ-2985 - jdbc store cannot ack out of order, the cleanup task query needed to be priority aware. fix for https://issues.apache.org/activemq/browse/AMQ-2980 JDBC - priority needed to be considered in count and recovery, additional tests included

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1025939 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-10-21 10:58:51 +00:00
parent ea82d5de5b
commit 3432a75110
24 changed files with 211 additions and 86 deletions

View File

@ -78,6 +78,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
*/
public void unmatched(MessageReference node) throws IOException {
MessageAck ack = new MessageAck();
ack.setAckType(MessageAck.UNMATCHED_ACK_TYPE);
ack.setMessageID(node.getMessageId());
node.getRegionDestination().acknowledge(this.getContext(), this, ack, node);
}
@ -111,11 +112,11 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
public void activate(SystemUsage memoryManager, ConnectionContext context,
ConsumerInfo info) throws Exception {
LOG.debug("Activating " + this);
if (!active) {
this.active = true;
this.context = context;
this.info = info;
LOG.debug("Activating " + this);
int prefetch = info.getPrefetchSize();
if (prefetch>0) {
prefetch += prefetch/2;
@ -150,7 +151,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
}
public void deactivate(boolean keepDurableSubsActive) throws Exception {
LOG.debug("Dectivating " + this);
LOG.debug("Deactivating " + this);
active = false;
this.usageManager.getMemoryUsage().removeUsageListener(this);
synchronized (pending) {

View File

@ -502,7 +502,7 @@ public class Topic extends BaseDestination implements Task {
if (topicStore != null && node.isPersistent()) {
DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
SubscriptionKey key = dsub.getSubscriptionKey();
topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId());
topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(), ack);
}
messageConsumed(context, node);
}

View File

@ -248,6 +248,10 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
protected final synchronized void fillBatch() {
if (LOG.isTraceEnabled()) {
LOG.trace("fillBatch - batchResetNeeded=" + batchResetNeeded
+ ", hasMessages=" + this.storeHasMessages + ", size=" + this.size);
}
if (batchResetNeeded) {
resetBatch();
this.batchResetNeeded = false;

View File

@ -59,6 +59,11 @@ public class MessageAck extends BaseCommand {
*/
public static final byte INDIVIDUAL_ACK_TYPE = 4;
/**
* The ack case where a durable topic subscription does not match a selector.
*/
public static final byte UNMATCHED_ACK_TYPE = 5;
protected byte ackType;
protected ConsumerId consumerId;
protected MessageId firstMessageId;
@ -118,6 +123,10 @@ public class MessageAck extends BaseCommand {
return ackType == INDIVIDUAL_ACK_TYPE;
}
public boolean isUnmatchedAck() {
return ackType == UNMATCHED_ACK_TYPE;
}
/**
* @openwire:property version=1 cache=true
*/

View File

@ -74,8 +74,8 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
}
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
MessageId messageId) throws IOException {
delegate.acknowledge(context, clientId, subscriptionName, messageId);
MessageId messageId, MessageAck ack) throws IOException {
delegate.acknowledge(context, clientId, subscriptionName, messageId, ack);
}
public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {

View File

@ -19,6 +19,7 @@ package org.apache.activemq.store;
import java.io.IOException;
import javax.jms.JMSException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
@ -39,7 +40,7 @@ public interface TopicMessageStore extends MessageStore {
* @param subscriptionPersistentId
* @throws IOException
*/
void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException;
void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException;
/**
* @param clientId

View File

@ -79,7 +79,8 @@ public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessag
/**
*/
public void acknowledge(final ConnectionContext context, final String clientId, final String subscriptionName, final MessageId messageId) throws IOException {
public void acknowledge(final ConnectionContext context, final String clientId, final String subscriptionName,
final MessageId messageId, final MessageAck originalAck) throws IOException {
final boolean debug = LOG.isDebugEnabled();
JournalTopicAck ack = new JournalTopicAck();
ack.setDestination(destination);
@ -140,7 +141,7 @@ public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessag
try {
SubscriptionInfo sub = topicReferenceStore.lookupSubscription(clientId, subscritionName);
if (sub != null) {
topicReferenceStore.acknowledge(context, clientId, subscritionName, messageId);
topicReferenceStore.acknowledge(context, clientId, subscritionName, messageId, null);
return true;
}
} catch (Throwable e) {

View File

@ -83,9 +83,9 @@ public interface JDBCAdapter {
int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException;
void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception;
void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception;
long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriberName) throws SQLException, IOException;
long[] doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriberName) throws SQLException, IOException;
void doMessageIdScan(TransactionContext c, int limit, JDBCMessageIdScanListener listener) throws SQLException, IOException;

View File

@ -44,7 +44,8 @@ public class JDBCMessageStore extends AbstractMessageStore {
protected final WireFormat wireFormat;
protected final JDBCAdapter adapter;
protected final JDBCPersistenceAdapter persistenceAdapter;
protected AtomicLong lastStoreSequenceId = new AtomicLong(-1);
protected AtomicLong lastRecoveredSequenceId = new AtomicLong(-1);
protected AtomicLong lastRecoveredPriority = new AtomicLong(Byte.MAX_VALUE -1);
protected ActiveMQMessageAudit audit;
@ -144,7 +145,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
long seq = getStoreSequenceIdForMessageId(ack.getLastMessageId());
long seq = getStoreSequenceIdForMessageId(ack.getLastMessageId())[0];
// Get a connection and remove the message from the DB
TransactionContext c = persistenceAdapter.getTransactionContext(context);
@ -225,14 +226,15 @@ public class JDBCMessageStore extends AbstractMessageStore {
public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener) throws Exception {
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
adapter.doRecoverNextMessages(c, destination, lastStoreSequenceId.get(), maxReturned, new JDBCMessageRecoveryListener() {
adapter.doRecoverNextMessages(c, destination, lastRecoveredSequenceId.get(), lastRecoveredPriority.get(), maxReturned, new JDBCMessageRecoveryListener() {
public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
if (listener.hasSpace()) {
Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
msg.getMessageId().setBrokerSequenceId(sequenceId);
listener.recoverMessage(msg);
lastStoreSequenceId.set(sequenceId);
lastRecoveredSequenceId.set(sequenceId);
lastRecoveredPriority.set(msg.getPriority());
return true;
}
return false;
@ -259,32 +261,35 @@ public class JDBCMessageStore extends AbstractMessageStore {
* @see org.apache.activemq.store.MessageStore#resetBatching()
*/
public void resetBatching() {
if (LOG.isDebugEnabled()) {
LOG.debug(destination.getPhysicalName() + " resetBatch, existing last seqId: " + lastStoreSequenceId.get());
if (LOG.isTraceEnabled()) {
LOG.trace(destination.getPhysicalName() + " resetBatching, existing last recovered seqId: " + lastRecoveredSequenceId.get());
}
lastStoreSequenceId.set(-1);
lastRecoveredSequenceId.set(-1);
lastRecoveredPriority.set(Byte.MAX_VALUE - 1);
}
@Override
public void setBatch(MessageId messageId) {
long storeSequenceId = -1;
try {
storeSequenceId = getStoreSequenceIdForMessageId(messageId);
long[] storedValues = getStoreSequenceIdForMessageId(messageId);
lastRecoveredSequenceId.set(storedValues[0]);
lastRecoveredPriority.set(storedValues[1]);
} catch (IOException ignoredAsAlreadyLogged) {
// reset batch in effect with default -1 value
lastRecoveredSequenceId.set(-1);
lastRecoveredPriority.set(Byte.MAX_VALUE -1);
}
if (LOG.isDebugEnabled()) {
LOG.debug(destination.getPhysicalName() + " setBatch: new sequenceId: " + storeSequenceId + ",existing last seqId: " + lastStoreSequenceId.get());
if (LOG.isTraceEnabled()) {
LOG.trace(destination.getPhysicalName() + " setBatch: new sequenceId: " + lastRecoveredSequenceId.get()
+ ", priority: " + lastRecoveredPriority.get());
}
lastStoreSequenceId.set(storeSequenceId);
}
private long getStoreSequenceIdForMessageId(MessageId messageId) throws IOException {
long result = -1;
private long[] getStoreSequenceIdForMessageId(MessageId messageId) throws IOException {
long[] result = new long[]{-1, Byte.MAX_VALUE -1};
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
result = adapter.getStoreSequenceId(c, destination, messageId)[0];
result = adapter.getStoreSequenceId(c, destination, messageId);
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e);

View File

@ -527,6 +527,7 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
getAdapter().doDropTables(c);
getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
getAdapter().doCreateTables(c);
LOG.info("Persistence store purged.");
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create(e);

View File

@ -26,6 +26,7 @@ import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener;
@ -33,12 +34,15 @@ import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* @version $Revision: 1.6 $
*/
public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {
private static final Log LOG = LogFactory.getLog(JDBCTopicMessageStore.class);
private Map<String, AtomicLong> subscriberLastMessageMap = new ConcurrentHashMap<String, AtomicLong>();
private Map<String, AtomicLong> subscriberLastPriorityMap = new ConcurrentHashMap<String, AtomicLong>();
@ -46,12 +50,21 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
super(persistenceAdapter, adapter, wireFormat, topic, audit);
}
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
if (ack != null && ack.isUnmatchedAck()) {
if (LOG.isTraceEnabled()) {
LOG.trace("ignoring unmatched selector ack for: " + messageId + ", cleanup will get to this message after subsequent acks.");
}
return;
}
// Get a connection and insert the message into the DB.
TransactionContext c = persistenceAdapter.getTransactionContext(context);
try {
long[] res = adapter.getStoreSequenceId(c, destination, messageId);
adapter.doSetLastAck(c, destination, clientId, subscriptionName, res[0], res[1]);
if (LOG.isTraceEnabled()) {
LOG.trace("ack - seq: " + res[0] + ", priority: " + res[1]);
}
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to store acknowledgment for: " + clientId + " on message " + messageId + " in container: " + e, e);
@ -93,12 +106,15 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
AtomicLong last = subscriberLastMessageMap.get(subcriberId);
AtomicLong priority = subscriberLastPriorityMap.get(subcriberId);
if (last == null) {
long lastAcked = adapter.doGetLastAckedDurableSubscriberMessageId(c, destination, clientId, subscriptionName);
last = new AtomicLong(lastAcked);
long[] lastAcked = adapter.doGetLastAckedDurableSubscriberMessageId(c, destination, clientId, subscriptionName);
last = new AtomicLong(lastAcked[0]);
subscriberLastMessageMap.put(subcriberId, last);
priority = new AtomicLong(Byte.MAX_VALUE - 1);
priority = new AtomicLong(lastAcked[1]);
subscriberLastMessageMap.put(subcriberId, priority);
}
if (LOG.isTraceEnabled()) {
LOG.trace("recoverNextMessage - last: " + last.get() + ", priority: " + priority);
}
final AtomicLong finalLast = last;
final AtomicLong finalPriority = priority;
try {
@ -137,10 +153,6 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
subscriberLastPriorityMap.remove(subcriberId);
}
/**
* @see org.apache.activemq.store.TopicMessageStore#storeSubsciption(org.apache.activemq.service.SubscriptionInfo,
* boolean)
*/
public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
@ -207,6 +219,9 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
} finally {
c.close();
}
if (LOG.isTraceEnabled()) {
LOG.trace(clientId + ":" + subscriberName + ", messageCount: " + result);
}
return result;
}

View File

@ -307,7 +307,7 @@ public class Statements {
+ getFullAckTableName()
+ " D "
+ " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID";
+ " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID AND M.PRIORITY <= D.PRIORITY";
}
return durableSubscriberMessageCountStatement;
}
@ -336,11 +336,17 @@ public class Statements {
public String getDeleteOldMessagesStatement() {
if (deleteOldMessagesStatement == null) {
deleteOldMessagesStatement = "DELETE FROM " + getFullMessageTableName()
+ " WHERE ( EXPIRATION<>0 AND EXPIRATION<?) OR ID < "
+ "( SELECT min(" + getFullAckTableName() + ".LAST_ACKED_ID) "
+ "FROM " + getFullAckTableName() + " WHERE "
+ getFullAckTableName() + ".CONTAINER=" + getFullMessageTableName()
+ ".CONTAINER)";
+ " WHERE ( EXPIRATION<>0 AND EXPIRATION<?)"
+ " OR (ID < "
+ " ( SELECT min(" + getFullAckTableName() + ".LAST_ACKED_ID)"
+ " FROM " + getFullAckTableName() + " WHERE "
+ getFullAckTableName() + ".CONTAINER="
+ getFullMessageTableName() + ".CONTAINER )"
+ " AND PRIORITY >= "
+ " ( SELECT min(" + getFullAckTableName() + ".PRIORITY) "
+ " FROM " + getFullAckTableName() + " WHERE "
+ getFullAckTableName() + ".CONTAINER="
+ getFullMessageTableName() + ".CONTAINER ))";
}
return deleteOldMessagesStatement;
}
@ -391,7 +397,9 @@ public class Statements {
public String getFindNextMessagesByPriorityStatement() {
if (findNextMessagesByPriorityStatement == null) {
findNextMessagesByPriorityStatement = "SELECT ID, MSG FROM " + getFullMessageTableName()
+ " WHERE CONTAINER=? ORDER BY PRIORITY DESC, ID";
+ " WHERE CONTAINER=?"
+ " AND ((ID > ? AND PRIORITY = ?) OR PRIORITY < ?)"
+ " ORDER BY PRIORITY DESC, ID";
}
return findNextMessagesByPriorityStatement;
}
@ -401,9 +409,11 @@ public class Statements {
*/
public String getLastAckedDurableSubscriberMessageStatement() {
if (lastAckedDurableSubscriberMessageStatement == null) {
lastAckedDurableSubscriberMessageStatement = "SELECT MAX(LAST_ACKED_ID) FROM "
lastAckedDurableSubscriberMessageStatement = "SELECT MAX(LAST_ACKED_ID), PRIORITY FROM "
+ getFullAckTableName()
+ " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
+ " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?"
+ " GROUP BY PRIORITY"
+ " ORDER BY PRIORITY ASC";
}
return lastAckedDurableSubscriberMessageStatement;
}

View File

@ -508,8 +508,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
* @param retroactive
* @throws SQLException
* @throws IOException
* @see org.apache.activemq.store.jdbc.JDBCAdapter#doSetSubscriberEntry(java.sql.Connection, java.lang.Object,
* org.apache.activemq.service.SubscriptionInfo)
*/
public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo info, boolean retroactive)
throws SQLException, IOException {
@ -644,11 +642,11 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
}
public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination,
public long[] doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination,
String clientId, String subscriberName) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
long result = -1;
long[] result = new long[]{-1, Byte.MAX_VALUE - 1};
try {
s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement());
s.setString(1, destination.getQualifiedName());
@ -656,7 +654,8 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
s.setString(3, subscriberName);
rs = s.executeQuery();
if (rs.next()) {
result = rs.getLong(1);
result[0] = rs.getLong(1);
result[1] = rs.getLong(2);
}
rs.close();
s.close();
@ -784,7 +783,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq,
int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
PreparedStatement s = null;
ResultSet rs = null;
try {
@ -795,8 +794,10 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
s.setMaxRows(maxReturned * 2);
s.setString(1, destination.getQualifiedName());
if (!isPrioritizedMessages()) {
s.setLong(2, nextSeq);
if (isPrioritizedMessages()) {
s.setLong(3, priority);
s.setLong(4, priority);
}
rs = s.executeQuery();
int count = 0;

View File

@ -25,6 +25,7 @@ import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener;
@ -82,7 +83,7 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top
/**
*/
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
final MessageId messageId) throws IOException {
final MessageId messageId, MessageAck originalAck) throws IOException {
final boolean debug = LOG.isDebugEnabled();
JournalTopicAck ack = new JournalTopicAck();
@ -138,7 +139,7 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top
try {
SubscriptionInfo sub = longTermStore.lookupSubscription(clientId, subscritionName);
if (sub != null) {
longTermStore.acknowledge(context, clientId, subscritionName, messageId);
longTermStore.acknowledge(context, clientId, subscritionName, messageId, null);
}
} catch (Throwable e) {
LOG.debug("Could not replay acknowledge for message '" + messageId
@ -177,7 +178,7 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top
SubscriptionKey subscriptionKey = iterator.next();
MessageId identity = cpAckedLastAckLocations.get(subscriptionKey);
longTermStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId,
subscriptionKey.subscriptionName, identity);
subscriptionKey.subscriptionName, identity, null);
}
}

View File

@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.kaha.ListContainer;
@ -79,7 +80,7 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
}
public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
MessageId messageId) throws IOException {
MessageId messageId, MessageAck ack) throws IOException {
String subcriberId = getSubscriptionKey(clientId, subscriptionName);
TopicSubContainer container = subscriberMessages.get(subcriberId);
if (container != null) {

View File

@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.kaha.ListContainer;
@ -224,7 +225,7 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
}
public void acknowledge(ConnectionContext context,
String clientId, String subscriptionName, MessageId messageId) throws IOException {
String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
acknowledgeReference(context, clientId, subscriptionName, messageId);
}

View File

@ -611,7 +611,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
}
}
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId)
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
MessageId messageId, MessageAck ack)
throws IOException {
String subscriptionKey = subscriptionKey(clientId, subscriptionName);
if (isConcurrentStoreAndDispatchTopics()) {

View File

@ -277,7 +277,8 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
super(destination);
}
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
MessageId messageId, MessageAck ack) throws IOException {
KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
command.setDestination(dest);
command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));

View File

@ -25,6 +25,7 @@ import java.util.Map.Entry;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener;
@ -66,7 +67,8 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
}
}
public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
MessageId messageId, MessageAck ack) throws IOException {
SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
MemoryTopicSub sub = topicSubMap.get(key);
if (sub != null) {

View File

@ -74,7 +74,7 @@ public abstract class TestSupport extends CombinationTestSupport {
* Returns the name of the destination used in this test case
*/
protected String getDestinationString() {
return getClass().getName() + "." + getName();
return getClass().getName() + "." + getName(true);
}
/**

View File

@ -47,14 +47,15 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
ActiveMQConnectionFactory factory;
Connection conn;
Session sess;
protected Session sess;
public boolean useCache;
public boolean dispatchAsync = false;
public int prefetchVal = 500;
int MSG_NUM = 1000;
int HIGH_PRI = 7;
int LOW_PRI = 3;
public int MSG_NUM = 600;
public int HIGH_PRI = 7;
public int LOW_PRI = 3;
abstract protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception;
@ -78,6 +79,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
prefetch.setAll(prefetchVal);
factory.setPrefetchPolicy(prefetch);
factory.setWatchTopicAdvisories(false);
factory.setDispatchAsync(dispatchAsync);
conn = factory.createConnection();
conn.setClientID("priority");
conn.start();
@ -110,7 +112,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
}
class ProducerThread extends Thread {
protected class ProducerThread extends Thread {
int priority;
int messageCount;
@ -154,7 +156,8 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
MessageConsumer queueConsumer = sess.createConsumer(queue);
for (int i = 0; i < MSG_NUM * 2; i++) {
Message msg = queueConsumer.receive(1000);
Message msg = queueConsumer.receive(5000);
LOG.debug("received i=" + i + ", " + (msg!=null? msg.getJMSMessageID() : null));
assertNotNull("Message " + i + " was null", msg);
assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority());
}
@ -196,6 +199,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
public void initCombosForTestDurableSubsReconnect() {
addCombinationValues("prefetchVal", new Object[] {new Integer(1000), new Integer(MSG_NUM/2)});
addCombinationValues("dispatchAsync", new Object[] {Boolean.TRUE, Boolean.FALSE});
}
public void testDurableSubsReconnect() throws Exception {
@ -217,7 +221,8 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
final int closeFrequency = MSG_NUM/4;
sub = sess.createDurableSubscriber(topic, subName);
for (int i = 0; i < MSG_NUM * 2; i++) {
Message msg = sub.receive(5000);
Message msg = sub.receive(30000);
LOG.debug("received i=" + i + ", " + (msg!=null? msg.getJMSMessageID() : null));
assertNotNull("Message " + i + " was null", msg);
assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority());
if (i>0 && i%closeFrequency==0) {

View File

@ -17,14 +17,20 @@
package org.apache.activemq.store.jdbc;
import javax.jms.Message;
import javax.jms.TopicSubscriber;
import junit.framework.Test;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.MessagePriorityTest;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.derby.jdbc.EmbeddedDataSource;
public class JDBCMessagePriorityTest extends MessagePriorityTest {
private static final Log LOG = LogFactory.getLog(JDBCMessagePriorityTest.class);
@Override
protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception {
JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
@ -33,16 +39,56 @@ public class JDBCMessagePriorityTest extends MessagePriorityTest {
dataSource.setCreateDatabase("create");
jdbc.setDataSource(dataSource);
jdbc.deleteAllMessages();
jdbc.setCleanupPeriod(1000);
return jdbc;
}
// this cannot be a general test as kahaDB just has support for 3 priority levels
public void testDurableSubsReconnectWithFourLevels() throws Exception {
ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST");
final String subName = "priorityDisconnect";
TopicSubscriber sub = sess.createDurableSubscriber(topic, subName);
sub.close();
final int MED_PRI = LOW_PRI + 1;
final int MED_HIGH_PRI = HIGH_PRI - 1;
ProducerThread lowPri = new ProducerThread(topic, MSG_NUM, LOW_PRI);
ProducerThread medPri = new ProducerThread(topic, MSG_NUM, MED_PRI);
ProducerThread medHighPri = new ProducerThread(topic, MSG_NUM, MED_HIGH_PRI);
ProducerThread highPri = new ProducerThread(topic, MSG_NUM, HIGH_PRI);
lowPri.start();
highPri.start();
medPri.start();
medHighPri.start();
lowPri.join();
highPri.join();
medPri.join();
medHighPri.join();
final int closeFrequency = MSG_NUM;
final int[] priorities = new int[]{HIGH_PRI, MED_HIGH_PRI, MED_PRI, LOW_PRI};
sub = sess.createDurableSubscriber(topic, subName);
for (int i = 0; i < MSG_NUM * 4; i++) {
Message msg = sub.receive(30000);
LOG.debug("received i=" + i + ", m=" + (msg!=null? msg.getJMSMessageID() : null));
assertNotNull("Message " + i + " was null", msg);
assertEquals("Message " + i + " has wrong priority", priorities[i / MSG_NUM], msg.getJMSPriority());
if (i > 0 && i % closeFrequency == 0) {
LOG.info("Closing durable sub.. on: " + i);
sub.close();
sub = sess.createDurableSubscriber(topic, subName);
}
}
LOG.info("closing on done!");
sub.close();
}
public static Test suite() {
return suite(JDBCMessagePriorityTest.class);
}
// pending fix...
@Override
public void testDurableSubsReconnect() throws Exception {
// TODO: fix jdbc durable sub recovery
}
}

View File

@ -16,9 +16,12 @@
*/
package org.apache.activemq.usecases;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
@ -27,11 +30,12 @@ import java.io.File;
public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupport {
public Boolean usePrioritySupport = Boolean.TRUE;
private BrokerService broker;
private ActiveMQTopic topic;
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory("vm://" + getName());
return new ActiveMQConnectionFactory("vm://" + getName(true));
}
@Override
@ -42,6 +46,10 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
return con;
}
public static Test suite() {
return suite(DurableSubscriptionOfflineTest.class);
}
protected void setUp() throws Exception {
topic = (ActiveMQTopic) createDestination();
createBroker();
@ -54,15 +62,19 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
}
private void createBroker() throws Exception {
broker = BrokerFactory.createBroker("broker:(vm://localhost)");
broker.setBrokerName(getName());
broker = BrokerFactory.createBroker("broker:(vm://" + getName(true) +")");
broker.setBrokerName(getName(true));
broker.setDeleteAllMessagesOnStartup(true);
broker.setPersistent(true);
KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
persistenceAdapter.setDirectory(new File("activemq-data-kaha/" + getName()));
broker.setPersistenceAdapter(persistenceAdapter);
if (usePrioritySupport) {
PolicyEntry policy = new PolicyEntry();
policy.setPrioritizedMessages(true);
PolicyMap policyMap = new PolicyMap();
policyMap.setDefaultEntry(policy);
broker.setDestinationPolicy(policyMap);
}
setDefaultPersistenceAdapter(broker);
broker.start();
}
@ -71,6 +83,13 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
broker.stop();
}
public void initCombosForTestOfflineSubscription() throws Exception {
this.addCombinationValues("defaultPersistenceAdapter",
new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
this.addCombinationValues("usePrioritySupport",
new Object[]{ Boolean.TRUE, Boolean.FALSE});
}
public void testOfflineSubscription() throws Exception {
// create durable subscription
Connection con = createConnection();

View File

@ -21,7 +21,7 @@
log4j.rootLogger=INFO, out, stdout
log4j.logger.org.apache.activemq.broker.scheduler=DEBUG
#log4j.logger.org.apache.activemq=DEBUG
#log4j.logger.org.apache.activemq=TRACE
#log4j.logger.org.apache.activemq.store.jdbc=DEBUG
#log4j.logger.org.apache.activemq.broker.region.cursors.AbstractStoreCursor=DEBUG
#log4j.logger.org.apache.activemq.store.jdbc.JDBCMessageStore=DEBUG