https://issues.apache.org/jira/browse/AMQ-3305 and https://issues.apache.org/jira/browse/AMQ-3872 - fix up durable subs with xa recovery for kahadb and tidy up test assumptions re redelivery for jdbc. Shared tests pass with both stores now

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1346594 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2012-06-05 21:13:34 +00:00
parent f0d0ce07ab
commit b07f31ebc6
6 changed files with 49 additions and 43 deletions

View File

@ -29,6 +29,7 @@ import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
@ -756,4 +757,22 @@ public abstract class BaseDestination implements Destination {
answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
return answer;
}
protected MessageAck convertToNonRangedAck(MessageAck ack, MessageReference node) {
// the original ack may be a ranged ack, but we are trying to delete
// a specific
// message store here so we need to convert to a non ranged ack.
if (ack.getMessageCount() > 0) {
// Dup the ack
MessageAck a = new MessageAck();
ack.copy(a);
ack = a;
// Convert to non-ranged.
ack.setFirstMessageId(node.getMessageId());
ack.setLastMessageId(node.getMessageId());
ack.setMessageCount(1);
}
return ack;
}
}

View File

@ -833,21 +833,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
throws IOException {
messageConsumed(context, node);
if (store != null && node.isPersistent()) {
// the original ack may be a ranged ack, but we are trying to delete
// a specific
// message store here so we need to convert to a non ranged ack.
if (ack.getMessageCount() > 0) {
// Dup the ack
MessageAck a = new MessageAck();
ack.copy(a);
ack = a;
// Convert to non-ranged.
ack.setFirstMessageId(node.getMessageId());
ack.setLastMessageId(node.getMessageId());
ack.setMessageCount(1);
}
store.removeAsyncMessage(context, ack);
store.removeAsyncMessage(context, convertToNonRangedAck(ack, node));
}
}

View File

@ -518,7 +518,8 @@ public class Topic extends BaseDestination implements Task {
if (topicStore != null && node.isPersistent()) {
DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
SubscriptionKey key = dsub.getSubscriptionKey();
topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(), ack);
topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(),
convertToNonRangedAck(ack, node));
}
messageConsumed(context, node);
}
@ -763,7 +764,7 @@ public class Topic extends BaseDestination implements Task {
}
public void clearPendingMessages(SubscriptionKey subscriptionKey) {
private void clearPendingMessages(SubscriptionKey subscriptionKey) {
dispatchLock.readLock().lock();
try {
DurableTopicSubscription durableTopicSubscription = durableSubcribers.get(subscriptionKey);

View File

@ -337,12 +337,6 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
ack,
subscriptionName, clientId);
jdbcTopicMessageStore.complete(clientId, subscriptionName);
Map<ActiveMQDestination, Destination> destinations = ((JDBCPersistenceAdapter) persistenceAdapter).getBrokerService().getRegionBroker().getDestinationMap();
Topic topic = (Topic) destinations.get(topicMessageStore.getDestination());
SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
topic.getDurableTopicSubs().get(key).getPending().rollback(ack.getLastMessageId());
topic.clearPendingMessages(key);
}

View File

@ -711,6 +711,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
command.setTransactionInfo(transactionIdTransformer.transform(ack.getTransactionId()));
if (ack != null && ack.isUnmatchedAck()) {
command.setAck(UNMATCHED);
} else {
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
}
store(command, false, null, null);
}
@ -819,6 +822,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
.hasNext();) {
Entry<Long, MessageKeys> entry = iterator.next();
if (ackedAndPrepared.contains(entry.getValue().messageId)) {
continue;
}
listener.recoverMessage(loadMessage(entry.getValue().location));
}
sd.orderIndex.resetCursorPosition();
@ -858,6 +864,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
.hasNext();) {
entry = iterator.next();
if (ackedAndPrepared.contains(entry.getValue().messageId)) {
continue;
}
if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
counter++;
}

View File

@ -559,11 +559,6 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
}
public void testTopicPersistentPreparedAcksNotLostOnRestart() throws Exception {
// REVISIT for kahadb
if (! (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter)) {
LOG.warn("only works on jdbc");
return;
}
ActiveMQDestination destination = new ActiveMQTopic("TryTopic");
// Setup the producer and send the message.
@ -581,7 +576,8 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
consumerInfo.setSubscriptionName("durable");
connection.send(consumerInfo);
for (int i = 0; i < 4; i++) {
final int numMessages = 4;
for (int i = 0; i < numMessages; i++) {
Message message = createMessage(producerInfo, destination);
message.setPersistent(true);
connection.send(message);
@ -591,7 +587,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
XATransactionId txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));
final int messageCount = expectedMessageCount(4, destination);
final int messageCount = expectedMessageCount(numMessages, destination);
Message m = null;
for (int i = 0; i < messageCount; i++) {
m = receiveMessage(connection);
@ -741,12 +737,6 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
public void testTopicPersistentPreparedAcksAvailableAfterRestartAndRollback() throws Exception {
// REVISIT for kahadb
if (! (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter)) {
LOG.warn("only works on jdbc");
return;
}
ActiveMQDestination destination = new ActiveMQTopic("TryTopic");
// Setup the producer and send the message.
@ -843,12 +833,6 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
public void testTopicPersistentPreparedAcksAvailableAfterRollback() throws Exception {
// REVISIT for kahadb
if (! (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter)) {
LOG.warn("only works on jdbc");
return;
}
ActiveMQDestination destination = new ActiveMQTopic("TryTopic");
// Setup the producer and send the message.
@ -893,7 +877,20 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
// rollback so we get redelivery
connection.request(createRollbackTransaction(connectionInfo, txid));
LOG.info("new tx for redelivery");
LOG.info("new consumer/tx for redelivery");
connection.request(closeConnectionInfo(connectionInfo));
connectionInfo = createConnectionInfo();
connectionInfo.setClientId("durable");
sessionInfo = createSessionInfo(connectionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
// setup durable subs
consumerInfo = createConsumerInfo(sessionInfo, destination);
consumerInfo.setSubscriptionName("durable");
connection.send(consumerInfo);
txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));