https://issues.apache.org/jira/browse/AMQ-3238 - Topic-Messages not redelivered to durable subscription after rollback and reconnect. resolve by making durable sub ack transaction aware, issue existed across all stores

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1084550 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-03-23 11:50:50 +00:00
parent 68bcac109a
commit 88c10842df
9 changed files with 229 additions and 12 deletions

View File

@ -20,10 +20,13 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.BaseCommand;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -42,6 +45,16 @@ class KahaTransaction {
list.add(tx);
}
public void add(KahaMessageStore destination, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) {
TxCommand tx = new TxCommand();
tx.setCommand(ack);
tx.setMessageStoreKey(destination.getId());
tx.setClientId(clientId);
tx.setSubName(subscriptionName);
tx.setMessageId(messageId);
list.add(tx);
}
Message[] getMessages() {
List<BaseCommand> result = new ArrayList<BaseCommand>();
for (int i = 0; i < list.size(); i++) {
@ -89,6 +102,9 @@ class KahaTransaction {
MessageStore ms = transactionStore.getStoreById(command.getMessageStoreKey());
if (command.isRemove()) {
ms.removeMessage(null, (MessageAck)command.getCommand());
} else if (command.isAck()) {
((TopicMessageStore)ms).acknowledge(null, command.getClientId(), command.getSubscriptionName(),
command.getMessageId(), (MessageAck)command.getCommand());
}
}
}

View File

@ -27,6 +27,7 @@ import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.kaha.RuntimeStoreException;
@ -84,6 +85,12 @@ public class KahaTransactionStore implements TransactionStore, BrokerServiceAwar
public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
KahaTransactionStore.this.removeMessage(getDelegate(), ack);
}
@Override
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
MessageId messageId, MessageAck ack) throws IOException {
KahaTransactionStore.this.acknowledge((TopicMessageStore)getDelegate(), clientId, subscriptionName, messageId, ack);
}
};
}
@ -98,10 +105,6 @@ public class KahaTransactionStore implements TransactionStore, BrokerServiceAwar
}
}
/**
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
public void commit(TransactionId txid, boolean wasPrepared, Runnable before,Runnable after) throws IOException {
if(before != null) {
before.run();
@ -182,6 +185,23 @@ public class KahaTransactionStore implements TransactionStore, BrokerServiceAwar
}
}
final void acknowledge(final TopicMessageStore destination, String clientId,
String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
try {
if (ack.isInTransaction()) {
KahaTransaction tx = getOrCreateTx(ack.getTransactionId());
tx.add((KahaMessageStore)destination, clientId, subscriptionName, messageId, ack);
} else {
destination.acknowledge(null, clientId, subscriptionName, messageId, ack);
}
} catch (RuntimeStoreException rse) {
if (rse.getCause() instanceof IOException) {
brokerService.handleIOException((IOException)rse.getCause());
}
throw rse;
}
}
protected synchronized KahaTransaction getTx(TransactionId key) {
KahaTransaction result = (KahaTransaction)transactions.get(key);
if (result == null) {

View File

@ -18,6 +18,7 @@ package org.apache.activemq.store.kahadaptor;
import org.apache.activemq.command.BaseCommand;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.MessageId;
/**
* Base class for messages/acknowledgements for a transaction
@ -27,6 +28,9 @@ import org.apache.activemq.command.CommandTypes;
class TxCommand {
protected Object messageStoreKey;
protected BaseCommand command;
private String clientId;
private String subscriptionName;
private MessageId messageId;
/**
* @return Returns the messageStoreKey.
@ -67,7 +71,34 @@ class TxCommand {
* @return true if a MessageAck command
*/
public boolean isRemove() {
return command != null && command.getDataStructureType() == CommandTypes.MESSAGE_ACK;
return command != null && command.getDataStructureType() == CommandTypes.MESSAGE_ACK && subscriptionName == null;
}
public boolean isAck() {
return command != null && command.getDataStructureType() == CommandTypes.MESSAGE_ACK && subscriptionName != null;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public void setSubName(String subscriptionName) {
this.subscriptionName = subscriptionName;
}
public void setMessageId(MessageId messageId) {
this.messageId = messageId;
}
public String getClientId() {
return clientId;
}
public String getSubscriptionName() {
return subscriptionName;
}
public MessageId getMessageId() {
return messageId;
}
}

View File

@ -642,6 +642,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
command.setDestination(dest);
command.setSubscriptionKey(subscriptionKey);
command.setMessageId(messageId.toString());
command.setTransactionInfo(createTransactionInfo(ack.getTransactionId()));
if (ack != null && ack.isUnmatchedAck()) {
command.setAck(UNMATCHED);
}

View File

@ -30,6 +30,7 @@ import javax.transaction.xa.XAException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.openwire.OpenWireFormat;
@ -194,6 +195,14 @@ public class KahaDBTransactionStore implements TransactionStore {
public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
KahaDBTransactionStore.this.removeAsyncMessage(context, getDelegate(), ack);
}
@Override
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
MessageId messageId, MessageAck ack) throws IOException {
KahaDBTransactionStore.this.acknowledge(context, (TopicMessageStore)getDelegate(), clientId,
subscriptionName, messageId, ack);
}
};
}
@ -216,9 +225,6 @@ public class KahaDBTransactionStore implements TransactionStore {
return tx;
}
/**
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit)
throws IOException {
if (txid != null) {
@ -458,6 +464,31 @@ public class KahaDBTransactionStore implements TransactionStore {
}
}
final void acknowledge(ConnectionContext context, final TopicMessageStore destination, final String clientId, final String subscriptionName,
final MessageId messageId, final MessageAck ack) throws IOException {
if (ack.isInTransaction()) {
if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
} else {
Tx tx = getTx(ack.getTransactionId());
tx.add(new RemoveMessageCommand(context) {
public MessageAck getMessageAck() {
return ack;
}
public Future<Object> run(ConnectionContext ctx) throws IOException {
destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
return AbstractMessageStore.FUTURE;
}
});
}
} else {
destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
}
}
private KahaTransactionInfo getTransactionInfo(TransactionId txid) {
return theStore.createTransactionInfo(txid);
}

View File

@ -25,6 +25,7 @@ import javax.transaction.xa.XAException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.AbstractMessageStore;
@ -173,6 +174,13 @@ public class MemoryTransactionStore implements TransactionStore {
public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
}
@Override
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
MessageId messageId, MessageAck ack) throws IOException {
MemoryTransactionStore.this.acknowledge((TopicMessageStore)getDelegate(), clientId,
subscriptionName, messageId, ack);
}
};
}
@ -196,10 +204,6 @@ public class MemoryTransactionStore implements TransactionStore {
return tx;
}
/**
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
if (preCommit != null) {
preCommit.run();
@ -307,6 +311,29 @@ public class MemoryTransactionStore implements TransactionStore {
}
}
final void acknowledge(final TopicMessageStore destination, final String clientId, final String subscriptionName,
final MessageId messageId, final MessageAck ack) throws IOException {
if (doingRecover) {
return;
}
if (ack.isInTransaction()) {
Tx tx = getTx(ack.getTransactionId());
tx.add(new RemoveMessageCommand() {
public MessageAck getMessageAck() {
return ack;
}
public void run(ConnectionContext ctx) throws IOException {
destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
}
});
} else {
destination.acknowledge(null, clientId, subscriptionName, messageId, ack);
}
}
public void delete() {
inflightTransactions.clear();
preparedTransactions.clear();

View File

@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.io.File;
import java.io.IOException;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.amq.AMQPersistenceAdapter;
public class AMQStoreDurableSubscriptionTest extends DurableSubscriptionTestSupport {
protected PersistenceAdapter createPersistenceAdapter() throws IOException {
File dataDir = new File("target/test-data/durableAmq");
AMQPersistenceAdapter adapter = new AMQPersistenceAdapter();
adapter.setDirectory(dataDir);
return adapter;
}
}

View File

@ -280,6 +280,38 @@ public abstract class DurableSubscriptionTestSupport extends TestSupport {
assertNull(consumer.receive(5000));
}
public void testDurableSubscriptionRollbackRedeliver() throws Exception {
// Create the durable sub.
connection.start();
session = connection.createSession(true, javax.jms.Session.SESSION_TRANSACTED);
Topic topic = session.createTopic("TestTopic");
consumer = session.createDurableSubscriber(topic, "sub1");
Session producerSession = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
producer = producerSession.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
producer.send(session.createTextMessage("Msg:1"));
// receive and rollback
assertTextMessageEquals("Msg:1", consumer.receive(5000));
session.rollback();
consumer.close();
session.close();
session = connection.createSession(true, javax.jms.Session.SESSION_TRANSACTED);
// Ensure that consumer will receive messages sent and rolled back
consumer = session.createDurableSubscriber(topic, "sub1");
assertTextMessageEquals("Msg:1", consumer.receive(5000));
session.commit();
assertNull(consumer.receive(5000));
}
public void xtestInactiveDurableSubscriptionOneConnection() throws Exception {
session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("TestTopic");

View File

@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.io.IOException;
import org.apache.activemq.store.PersistenceAdapter;
public class KahaDBDurableSubscriptionTest extends DurableSubscriptionTestSupport {
protected PersistenceAdapter createPersistenceAdapter() throws IOException {
return null; // use default
}
}