rework https://issues.apache.org/jira/browse/AMQ-3305 in the context of https://issues.apache.org/jira/browse/AMQ-4952 dlq processing of duplicates, also possible missed dispatch with interleaved xa completion and new messages - recovered transactions are now individually tracked by the store and replayed in the next batch, which negates the need to flush the cursor and avoids duplicates

This commit is contained in:
gtully 2014-03-05 14:51:05 +00:00
parent a7ff5975ee
commit cfe099d1cc
16 changed files with 258 additions and 40 deletions

View File

@ -794,23 +794,6 @@ public abstract class BaseDestination implements Destination {
public void duplicateFromStore(Message message, Subscription durableSub) {
ConnectionContext connectionContext = createConnectionContext();
TransactionId transactionId = message.getTransactionId();
if (transactionId != null && transactionId.isXATransaction()) {
try {
List<TransactionId> preparedTx = Arrays.asList(broker.getRoot().getPreparedTransactions(connectionContext));
getLog().trace("processing duplicate in {}, prepared {} ", transactionId, preparedTx);
if (!preparedTx.contains(transactionId)) {
// duplicates from past transactions expected after org.apache.activemq.broker.region.Destination#clearPendingMessages
// till they are acked
getLog().debug("duplicate message from store {}, from historical transaction {}, ignoring", message.getMessageId(), transactionId);
return;
}
} catch (Exception ignored) {
getLog().debug("failed to determine state of transaction {} on duplicate message {}", transactionId, message.getMessageId(), ignored);
}
}
getLog().warn("duplicate message from store {}, redirecting for dlq processing", message.getMessageId());
Throwable cause = new Throwable("duplicate from store for " + destination);
message.setRegionDestination(this);

View File

@ -1321,11 +1321,13 @@ public class Queue extends BaseDestination implements Task, UsageListener {
public void clearPendingMessages() {
messagesLock.writeLock().lock();
try {
if (store != null) {
store.resetBatching();
}
if (resetNeeded) {
messages.gc();
messages.reset();
resetNeeded = false;
} else {
messages.rebase();
}
asyncWakeup();
} finally {
messagesLock.writeLock().unlock();

View File

@ -335,4 +335,7 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs
public synchronized void setCacheEnabled(boolean val) {
cacheEnabled = val;
}
public void rebase() {
}
}

View File

@ -69,6 +69,11 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
this.storeHasMessages=this.size > 0;
}
@Override
public void rebase() {
resetSize();
}
public final synchronized void stop() throws Exception {
resetBatch();
super.stop();

View File

@ -300,4 +300,6 @@ public interface PendingMessageCursor extends Service {
*/
public boolean isCacheEnabled();
public void rebase();
}

View File

@ -309,4 +309,11 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
}
return cacheEnabled;
}
@Override
public void rebase() {
persistent.rebase();
reset();
}
}

View File

@ -33,7 +33,10 @@ import org.apache.activemq.store.TransactionStore;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
@ -46,7 +49,7 @@ import java.util.concurrent.Future;
public class MemoryTransactionStore implements TransactionStore {
protected ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
protected ConcurrentHashMap<TransactionId, Tx> preparedTransactions = new ConcurrentHashMap<TransactionId, Tx>();
protected Map<TransactionId, Tx> preparedTransactions = Collections.synchronizedMap(new LinkedHashMap<TransactionId, Tx>());
protected final PersistenceAdapter persistenceAdapter;
private boolean doingRecover;
@ -117,6 +120,8 @@ public class MemoryTransactionStore implements TransactionStore {
MessageStore getMessageStore();
void run(ConnectionContext context) throws IOException;
void setMessageStore(MessageStore messageStore);
}
public interface RemoveMessageCommand {
@ -132,7 +137,7 @@ public class MemoryTransactionStore implements TransactionStore {
}
public MessageStore proxy(MessageStore messageStore) {
return new ProxyMessageStore(messageStore) {
ProxyMessageStore proxyMessageStore = new ProxyMessageStore(messageStore) {
@Override
public void addMessage(ConnectionContext context, final Message send) throws IOException {
MemoryTransactionStore.this.addMessage(getDelegate(), send);
@ -165,6 +170,11 @@ public class MemoryTransactionStore implements TransactionStore {
MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
}
};
onProxyQueueStore(proxyMessageStore);
return proxyMessageStore;
}
protected void onProxyQueueStore(ProxyMessageStore proxyMessageStore) {
}
public TopicMessageStore proxy(TopicMessageStore messageStore) {
@ -309,6 +319,7 @@ public class MemoryTransactionStore implements TransactionStore {
if (message.getTransactionId() != null) {
Tx tx = getTx(message.getTransactionId());
tx.add(new AddMessageCommand() {
MessageStore messageStore = destination;
public Message getMessage() {
return message;
}
@ -322,6 +333,11 @@ public class MemoryTransactionStore implements TransactionStore {
destination.addMessage(ctx, message);
}
@Override
public void setMessageStore(MessageStore messageStore) {
this.messageStore = messageStore;
}
});
} else {
destination.addMessage(null, message);

View File

@ -301,7 +301,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("restore consumer: " + infoToSend.getConsumerId());
LOG.debug("consumer: " + infoToSend.getConsumerId());
}
transport.oneway(infoToSend);
}

View File

@ -18,6 +18,9 @@ package org.apache.activemq.store.jdbc;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.ActiveMQMessageAudit;
@ -65,7 +68,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
protected final JDBCPersistenceAdapter persistenceAdapter;
protected AtomicLong lastRecoveredSequenceId = new AtomicLong(-1);
protected AtomicLong lastRecoveredPriority = new AtomicLong(Byte.MAX_VALUE -1);
final Set<Long> recoveredAdditions = new LinkedHashSet<Long>();
protected ActiveMQMessageAudit audit;
public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) throws IOException {
@ -136,6 +139,9 @@ public class JDBCMessageStore extends AbstractMessageStore {
}
protected void onAdd(MessageId messageId, long sequenceId, byte priority) {
if (lastRecoveredSequenceId.get() > 0 && sequenceId < lastRecoveredSequenceId.get()) {
recoveredAdditions.add(sequenceId);
}
}
public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
@ -275,6 +281,18 @@ public class JDBCMessageStore extends AbstractMessageStore {
public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener) throws Exception {
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
if (!recoveredAdditions.isEmpty()) {
for (Iterator<Long> iterator = recoveredAdditions.iterator(); iterator.hasNext(); ) {
Long sequenceId = iterator.next();
iterator.remove();
maxReturned--;
if (sequenceId <= lastRecoveredSequenceId.get()) {
Message msg = (Message)wireFormat.unmarshal(new ByteSequence(adapter.doGetMessageById(c, sequenceId)));
LOG.trace("recovered add {} {}", this, msg.getMessageId());
listener.recoverMessage(msg);
}
}
}
adapter.doRecoverNextMessages(c, destination, lastRecoveredSequenceId.get(), lastRecoveredPriority.get(),
maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() {

View File

@ -28,6 +28,7 @@ import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.ProxyMessageStore;
import org.apache.activemq.store.ProxyTopicMessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
@ -48,6 +49,7 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
private HashMap<ActiveMQDestination, MessageStore> topicStores = new HashMap<ActiveMQDestination, MessageStore>();
private HashMap<ActiveMQDestination, MessageStore> queueStores = new HashMap<ActiveMQDestination, MessageStore>();
public JdbcMemoryTransactionStore(JDBCPersistenceAdapter jdbcPersistenceAdapter) {
super(jdbcPersistenceAdapter);
@ -110,6 +112,11 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
message.getPriority());
}
@Override
public void setMessageStore(MessageStore messageStore) {
throw new RuntimeException("MessageStore already known");
}
});
}
tx.messages = updateFromPreparedStateCommands;
@ -166,6 +173,7 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
message.getMessageId().setEntryLocator(id);
Tx tx = getPreparedTx(message.getTransactionId());
tx.add(new AddMessageCommand() {
MessageStore messageStore;
@Override
public Message getMessage() {
return message;
@ -173,12 +181,18 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
@Override
public MessageStore getMessageStore() {
return null;
return messageStore;
}
@Override
public void run(ConnectionContext context) throws IOException {
((JDBCPersistenceAdapter)persistenceAdapter).commitAdd(null, message.getMessageId());
((JDBCMessageStore)messageStore).onAdd(message.getMessageId(), ((Long)message.getMessageId().getEntryLocator()).longValue(), message.getPriority());
}
@Override
public void setMessageStore(MessageStore messageStore) {
this.messageStore = messageStore;
}
});
@ -289,6 +303,11 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
topicStores.put(proxyTopicMessageStore.getDestination(), proxyTopicMessageStore.getDelegate());
}
@Override
protected void onProxyQueueStore(ProxyMessageStore proxyQueueMessageStore) {
queueStores.put(proxyQueueMessageStore.getDestination(), proxyQueueMessageStore.getDelegate());
}
@Override
protected void onRecovered(Tx tx) {
for (RemoveMessageCommand removeMessageCommand: tx.acks) {
@ -304,6 +323,10 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
((JDBCPersistenceAdapter)persistenceAdapter).getBrokerService().getRegionBroker().getDestinationMap().get(removeMessageCommand.getMessageAck().getDestination()).getDestinationStatistics().getMessages().increment();
}
}
for (AddMessageCommand addMessageCommand : tx.messages) {
ActiveMQDestination destination = addMessageCommand.getMessage().getDestination();
addMessageCommand.setMessageStore(destination.isQueue() ? queueStores.get(destination) : topicStores.get(destination));
}
}
@Override

View File

@ -562,6 +562,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
@Override
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener);
sd.orderIndex.resetCursorPosition();
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator
.hasNext(); ) {
@ -589,7 +590,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
Entry<Long, MessageKeys> entry = null;
int counter = 0;
int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener);
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) {
entry = iterator.next();
if (ackedAndPrepared.contains(entry.getValue().messageId)) {
@ -610,6 +611,31 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
}
}
protected int recoverRolledBackAcks(StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception {
int counter = 0;
String id;
for (Iterator<String> iterator = rolledBackAcks.iterator(); iterator.hasNext(); ) {
id = iterator.next();
iterator.remove();
Long sequence = sd.messageIdIndex.get(tx, id);
if (sequence != null) {
if (sd.orderIndex.alreadyDispatched(sequence)) {
listener.recoverMessage(loadMessage(sd.orderIndex.get(tx, sequence).location));
counter++;
if (counter >= maxReturned) {
break;
}
} else {
LOG.info("rolledback ack message {} with seq {} will be picked up in future batch {}", id, sequence, sd.orderIndex.cursor);
}
} else {
LOG.warn("Failed to locate rolled back ack message {} in {}", id, sd);
}
}
return counter;
}
@Override
public void resetBatching() {
if (pageFile.isLoaded()) {
@ -875,6 +901,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
StoredDestination sd = getStoredDestination(dest, tx);
LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
sd.orderIndex.setBatch(tx, cursorPos);
recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener);
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
.hasNext();) {
Entry<Long, MessageKeys> entry = iterator.next();
@ -918,7 +945,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
}
Entry<Long, MessageKeys> entry = null;
int counter = 0;
int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener);
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
.hasNext();) {
entry = iterator.next();

View File

@ -295,7 +295,7 @@ public class KahaDBTransactionStore implements TransactionStore {
} else {
KahaTransactionInfo info = getTransactionInfo(txid);
theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, preCommit, postCommit);
forgetRecoveredAcks(txid);
forgetRecoveredAcks(txid, false);
}
}else {
LOG.error("Null transaction passed on commit");
@ -310,16 +310,16 @@ public class KahaDBTransactionStore implements TransactionStore {
if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
KahaTransactionInfo info = getTransactionInfo(txid);
theStore.store(new KahaRollbackCommand().setTransactionInfo(info), false, null, null);
forgetRecoveredAcks(txid);
forgetRecoveredAcks(txid, true);
} else {
inflightTransactions.remove(txid);
}
}
protected void forgetRecoveredAcks(TransactionId txid) throws IOException {
protected void forgetRecoveredAcks(TransactionId txid, boolean isRollback) throws IOException {
if (txid.isXATransaction()) {
XATransactionId xaTid = ((XATransactionId) txid);
theStore.forgetRecoveredAcks(xaTid.getPreparedAcks());
theStore.forgetRecoveredAcks(xaTid.getPreparedAcks(), isRollback);
}
}

View File

@ -2302,6 +2302,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
@SuppressWarnings("rawtypes")
protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
protected final Set<String> ackedAndPrepared = new HashSet<String>();
protected final Set<String> rolledBackAcks = new HashSet<String>();
// messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback,
// till then they are skipped by the store.
@ -2317,12 +2318,16 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
public void forgetRecoveredAcks(ArrayList<MessageAck> acks) throws IOException {
public void forgetRecoveredAcks(ArrayList<MessageAck> acks, boolean rollback) throws IOException {
if (acks != null) {
this.indexLock.writeLock().lock();
try {
for (MessageAck ack : acks) {
ackedAndPrepared.remove(ack.getLastMessageId().toProducerKey());
final String id = ack.getLastMessageId().toProducerKey();
ackedAndPrepared.remove(id);
if (rollback) {
rolledBackAcks.add(id);
}
}
} finally {
this.indexLock.writeLock().unlock();
@ -2945,6 +2950,12 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
return lastGetPriority;
}
public boolean alreadyDispatched(Long sequence) {
return (cursor.highPriorityCursorPosition > 0 && cursor.highPriorityCursorPosition >= sequence) ||
(cursor.defaultCursorPosition > 0 && cursor.defaultCursorPosition >= sequence) ||
(cursor.lowPriorityCursorPosition > 0 && cursor.lowPriorityCursorPosition >= sequence);
}
class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
Iterator<Entry<Long, MessageKeys>>currentIterator;
final Iterator<Entry<Long, MessageKeys>>highIterator;

View File

@ -30,6 +30,7 @@ import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.broker.jmx.PersistenceAdapterViewMBean;
import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.*;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.JMXSupport;
@ -233,17 +234,137 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
// Commit the prepared transactions.
for (int i = 0; i < dar.getData().length; i++) {
connection.request(createCommitTransaction2Phase(connectionInfo, (TransactionId) dar.getData()[i]));
TransactionId transactionId = (TransactionId) dar.getData()[i];
LOG.info("commit: " + transactionId);
connection.request(createCommitTransaction2Phase(connectionInfo, transactionId));
}
// We should get the committed transactions.
final int countToReceive = expectedMessageCount(4, destination);
for (int i = 0; i < countToReceive ; i++) {
Message m = receiveMessage(connection, TimeUnit.SECONDS.toMillis(10));
LOG.info("received: " + m);
assertNotNull("Got non null message: " + i, m);
}
assertNoMessagesLeft(connection);
assertEmptyDLQ();
}
private void assertEmptyDLQ() throws Exception {
try {
DestinationViewMBean destinationView = getProxyToDestination(new ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME));
assertEquals("nothing on dlq", 0, destinationView.getQueueSize());
assertEquals("nothing added to dlq", 0, destinationView.getEnqueueCount());
} catch (java.lang.reflect.UndeclaredThrowableException maybeOk) {
if (maybeOk.getUndeclaredThrowable() instanceof javax.management.InstanceNotFoundException) {
// perfect no dlq
} else {
throw maybeOk;
}
}
}
public void testPreparedInterleavedTransactionRecoveredOnRestart() throws Exception {
ActiveMQDestination destination = createDestination();
// Setup the producer and send the message.
StubConnection connection = createConnection();
ConnectionInfo connectionInfo = createConnectionInfo();
SessionInfo sessionInfo = createSessionInfo(connectionInfo);
ProducerInfo producerInfo = createProducerInfo(sessionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
connection.send(producerInfo);
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// Prepare 4 message sends.
for (int i = 0; i < 4; i++) {
// Begin the transaction.
XATransactionId txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));
Message message = createMessage(producerInfo, destination);
message.setPersistent(true);
message.setTransactionId(txid);
connection.send(message);
// Prepare
connection.send(createPrepareTransaction(connectionInfo, txid));
}
// Since prepared but not committed.. they should not get delivered.
assertNull(receiveMessage(connection));
assertNoMessagesLeft(connection);
// send non tx message
Message message = createMessage(producerInfo, destination);
message.setPersistent(true);
connection.request(message);
connection.request(closeConnectionInfo(connectionInfo));
// restart the broker.
restartBroker();
// Setup the consumer and try receive the message.
connection = createConnection();
connectionInfo = createConnectionInfo();
sessionInfo = createSessionInfo(connectionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// consume non transacted message, but don't ack
int countToReceive = expectedMessageCount(1, destination);
for (int i=0; i< countToReceive; i++) {
Message m = receiveMessage(connection, TimeUnit.SECONDS.toMillis(10));
LOG.info("received: " + m);
assertNotNull("got non tx message after prepared", m);
}
// Since prepared but not committed.. they should not get delivered.
assertNull(receiveMessage(connection));
assertNoMessagesLeft(connection);
Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
assertNotNull(response);
DataArrayResponse dar = (DataArrayResponse)response;
assertEquals(4, dar.getData().length);
// ensure we can close a connection with prepared transactions
connection.request(closeConnectionInfo(connectionInfo));
// open again to deliver outcome
connection = createConnection();
connectionInfo = createConnectionInfo();
sessionInfo = createSessionInfo(connectionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
// Commit the prepared transactions.
for (int i = 0; i < dar.getData().length; i++) {
TransactionId transactionId = (TransactionId) dar.getData()[i];
LOG.info("commit: " + transactionId);
connection.request(createCommitTransaction2Phase(connectionInfo, transactionId));
}
consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// We should get the committed transactions and the non tx message
countToReceive = expectedMessageCount(5, destination);
for (int i = 0; i < countToReceive ; i++) {
Message m = receiveMessage(connection, TimeUnit.SECONDS.toMillis(10));
LOG.info("received: " + m);
assertNotNull("Got non null message: " + i, m);
}
assertNoMessagesLeft(connection);
assertEmptyDLQ();
}
public void testTopicPreparedTransactionRecoveredOnRestart() throws Exception {

View File

@ -292,7 +292,7 @@ public class TrapMessageInJDBCStoreTest extends TestCase {
super.executeBatch();
if (throwSQLException){
throw new SQLException("TEST SQL EXCEPTION from executeBatch");
throw new SQLException("TEST SQL EXCEPTION from executeBatch after super. execution");
}

View File

@ -225,7 +225,7 @@ public class FailoverTransactionTest extends TestSupport {
setDefaultPersistenceAdapter(broker);
broker.start();
assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
assertTrue("tx committed through failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
// new transaction
Message msg = consumer.receive(20000);