diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 7bbe360c3e..3f925b4ffa 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -794,23 +794,6 @@ public abstract class BaseDestination implements Destination { public void duplicateFromStore(Message message, Subscription durableSub) { ConnectionContext connectionContext = createConnectionContext(); - - TransactionId transactionId = message.getTransactionId(); - if (transactionId != null && transactionId.isXATransaction()) { - try { - List preparedTx = Arrays.asList(broker.getRoot().getPreparedTransactions(connectionContext)); - getLog().trace("processing duplicate in {}, prepared {} ", transactionId, preparedTx); - if (!preparedTx.contains(transactionId)) { - // duplicates from past transactions expected after org.apache.activemq.broker.region.Destination#clearPendingMessages - // till they are acked - getLog().debug("duplicate message from store {}, from historical transaction {}, ignoring", message.getMessageId(), transactionId); - return; - } - } catch (Exception ignored) { - getLog().debug("failed to determine state of transaction {} on duplicate message {}", transactionId, message.getMessageId(), ignored); - } - } - getLog().warn("duplicate message from store {}, redirecting for dlq processing", message.getMessageId()); Throwable cause = new Throwable("duplicate from store for " + destination); message.setRegionDestination(this); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 0ae4463c73..a8a864e58a 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1321,11 +1321,13 @@ public class Queue extends BaseDestination implements Task, UsageListener { public void clearPendingMessages() { messagesLock.writeLock().lock(); try { - if (store != null) { - store.resetBatching(); + if (resetNeeded) { + messages.gc(); + messages.reset(); + resetNeeded = false; + } else { + messages.rebase(); } - messages.gc(); - messages.reset(); asyncWakeup(); } finally { messagesLock.writeLock().unlock(); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java index b962a1a8d3..e9c3c75efd 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java @@ -335,4 +335,7 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs public synchronized void setCacheEnabled(boolean val) { cacheEnabled = val; } + + public void rebase() { + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java index bb77afb12a..d0b1a39cab 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java @@ -69,6 +69,11 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i this.storeHasMessages=this.size > 0; } + @Override + public void rebase() { + resetSize(); + } + public final synchronized void stop() throws Exception { resetBatch(); super.stop(); @@ -289,7 +294,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i return !batchList.isEmpty(); } - + public final synchronized int size() { if (size < 0) { this.size = getStoreSize(); @@ -307,7 +312,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i protected abstract void doFillBatch() throws Exception; protected abstract void resetBatch(); - + protected abstract int getStoreSize(); protected abstract boolean isStoreEmpty(); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java index 52e5174381..1fecb95102 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java @@ -299,5 +299,7 @@ public interface PendingMessageCursor extends Service { * @return true if cache is being used */ public boolean isCacheEnabled(); - + + public void rebase(); + } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java index 24f31c7095..b3f4261eae 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java @@ -309,4 +309,11 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor { } return cacheEnabled; } + + @Override + public void rebase() { + persistent.rebase(); + reset(); + } + } diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java index a5de7fa20e..7e026940c5 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java @@ -33,7 +33,10 @@ import org.apache.activemq.store.TransactionStore; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; @@ -46,7 +49,7 @@ import java.util.concurrent.Future; public class MemoryTransactionStore implements TransactionStore { protected ConcurrentHashMap inflightTransactions = new ConcurrentHashMap(); - protected ConcurrentHashMap preparedTransactions = new ConcurrentHashMap(); + protected Map preparedTransactions = Collections.synchronizedMap(new LinkedHashMap()); protected final PersistenceAdapter persistenceAdapter; private boolean doingRecover; @@ -117,6 +120,8 @@ public class MemoryTransactionStore implements TransactionStore { MessageStore getMessageStore(); void run(ConnectionContext context) throws IOException; + + void setMessageStore(MessageStore messageStore); } public interface RemoveMessageCommand { @@ -132,7 +137,7 @@ public class MemoryTransactionStore implements TransactionStore { } public MessageStore proxy(MessageStore messageStore) { - return new ProxyMessageStore(messageStore) { + ProxyMessageStore proxyMessageStore = new ProxyMessageStore(messageStore) { @Override public void addMessage(ConnectionContext context, final Message send) throws IOException { MemoryTransactionStore.this.addMessage(getDelegate(), send); @@ -165,6 +170,11 @@ public class MemoryTransactionStore implements TransactionStore { MemoryTransactionStore.this.removeMessage(getDelegate(), ack); } }; + onProxyQueueStore(proxyMessageStore); + return proxyMessageStore; + } + + protected void onProxyQueueStore(ProxyMessageStore proxyMessageStore) { } public TopicMessageStore proxy(TopicMessageStore messageStore) { @@ -309,6 +319,7 @@ public class MemoryTransactionStore implements TransactionStore { if (message.getTransactionId() != null) { Tx tx = getTx(message.getTransactionId()); tx.add(new AddMessageCommand() { + MessageStore messageStore = destination; public Message getMessage() { return message; } @@ -322,6 +333,11 @@ public class MemoryTransactionStore implements TransactionStore { destination.addMessage(ctx, message); } + @Override + public void setMessageStore(MessageStore messageStore) { + this.messageStore = messageStore; + } + }); } else { destination.addMessage(null, message); diff --git a/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java b/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java index 41b45771df..effbc8389d 100755 --- a/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java +++ b/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java @@ -301,7 +301,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { } } if (LOG.isDebugEnabled()) { - LOG.debug("restore consumer: " + infoToSend.getConsumerId()); + LOG.debug("consumer: " + infoToSend.getConsumerId()); } transport.oneway(infoToSend); } diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java index c4c3406330..0ee982358d 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java @@ -18,6 +18,9 @@ package org.apache.activemq.store.jdbc; import java.io.IOException; import java.sql.SQLException; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.ActiveMQMessageAudit; @@ -65,7 +68,7 @@ public class JDBCMessageStore extends AbstractMessageStore { protected final JDBCPersistenceAdapter persistenceAdapter; protected AtomicLong lastRecoveredSequenceId = new AtomicLong(-1); protected AtomicLong lastRecoveredPriority = new AtomicLong(Byte.MAX_VALUE -1); - + final Set recoveredAdditions = new LinkedHashSet(); protected ActiveMQMessageAudit audit; public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) throws IOException { @@ -136,6 +139,9 @@ public class JDBCMessageStore extends AbstractMessageStore { } protected void onAdd(MessageId messageId, long sequenceId, byte priority) { + if (lastRecoveredSequenceId.get() > 0 && sequenceId < lastRecoveredSequenceId.get()) { + recoveredAdditions.add(sequenceId); + } } public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException { @@ -275,6 +281,18 @@ public class JDBCMessageStore extends AbstractMessageStore { public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener) throws Exception { TransactionContext c = persistenceAdapter.getTransactionContext(); try { + if (!recoveredAdditions.isEmpty()) { + for (Iterator iterator = recoveredAdditions.iterator(); iterator.hasNext(); ) { + Long sequenceId = iterator.next(); + iterator.remove(); + maxReturned--; + if (sequenceId <= lastRecoveredSequenceId.get()) { + Message msg = (Message)wireFormat.unmarshal(new ByteSequence(adapter.doGetMessageById(c, sequenceId))); + LOG.trace("recovered add {} {}", this, msg.getMessageId()); + listener.recoverMessage(msg); + } + } + } adapter.doRecoverNextMessages(c, destination, lastRecoveredSequenceId.get(), lastRecoveredPriority.get(), maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() { diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java index 520ff1638e..135528e9ea 100644 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java @@ -28,6 +28,7 @@ import org.apache.activemq.command.MessageId; import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.XATransactionId; import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.ProxyMessageStore; import org.apache.activemq.store.ProxyTopicMessageStore; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TransactionRecoveryListener; @@ -48,6 +49,7 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore { private HashMap topicStores = new HashMap(); + private HashMap queueStores = new HashMap(); public JdbcMemoryTransactionStore(JDBCPersistenceAdapter jdbcPersistenceAdapter) { super(jdbcPersistenceAdapter); @@ -110,6 +112,11 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore { message.getPriority()); } + + @Override + public void setMessageStore(MessageStore messageStore) { + throw new RuntimeException("MessageStore already known"); + } }); } tx.messages = updateFromPreparedStateCommands; @@ -166,6 +173,7 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore { message.getMessageId().setEntryLocator(id); Tx tx = getPreparedTx(message.getTransactionId()); tx.add(new AddMessageCommand() { + MessageStore messageStore; @Override public Message getMessage() { return message; @@ -173,12 +181,18 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore { @Override public MessageStore getMessageStore() { - return null; + return messageStore; } @Override public void run(ConnectionContext context) throws IOException { ((JDBCPersistenceAdapter)persistenceAdapter).commitAdd(null, message.getMessageId()); + ((JDBCMessageStore)messageStore).onAdd(message.getMessageId(), ((Long)message.getMessageId().getEntryLocator()).longValue(), message.getPriority()); + } + + @Override + public void setMessageStore(MessageStore messageStore) { + this.messageStore = messageStore; } }); @@ -289,6 +303,11 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore { topicStores.put(proxyTopicMessageStore.getDestination(), proxyTopicMessageStore.getDelegate()); } + @Override + protected void onProxyQueueStore(ProxyMessageStore proxyQueueMessageStore) { + queueStores.put(proxyQueueMessageStore.getDestination(), proxyQueueMessageStore.getDelegate()); + } + @Override protected void onRecovered(Tx tx) { for (RemoveMessageCommand removeMessageCommand: tx.acks) { @@ -304,6 +323,10 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore { ((JDBCPersistenceAdapter)persistenceAdapter).getBrokerService().getRegionBroker().getDestinationMap().get(removeMessageCommand.getMessageAck().getDestination()).getDestinationStatistics().getMessages().increment(); } } + for (AddMessageCommand addMessageCommand : tx.messages) { + ActiveMQDestination destination = addMessageCommand.getMessage().getDestination(); + addMessageCommand.setMessageStore(destination.isQueue() ? queueStores.get(destination) : topicStores.get(destination)); + } } @Override diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 3095a0dd35..9255c0027d 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -562,6 +562,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { @Override public void execute(Transaction tx) throws Exception { StoredDestination sd = getStoredDestination(dest, tx); + recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener); sd.orderIndex.resetCursorPosition(); for (Iterator> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator .hasNext(); ) { @@ -589,7 +590,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public void execute(Transaction tx) throws Exception { StoredDestination sd = getStoredDestination(dest, tx); Entry entry = null; - int counter = 0; + int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener); for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) { entry = iterator.next(); if (ackedAndPrepared.contains(entry.getValue().messageId)) { @@ -610,6 +611,31 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } } + protected int recoverRolledBackAcks(StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception { + int counter = 0; + String id; + for (Iterator iterator = rolledBackAcks.iterator(); iterator.hasNext(); ) { + id = iterator.next(); + iterator.remove(); + Long sequence = sd.messageIdIndex.get(tx, id); + if (sequence != null) { + if (sd.orderIndex.alreadyDispatched(sequence)) { + listener.recoverMessage(loadMessage(sd.orderIndex.get(tx, sequence).location)); + counter++; + if (counter >= maxReturned) { + break; + } + } else { + LOG.info("rolledback ack message {} with seq {} will be picked up in future batch {}", id, sequence, sd.orderIndex.cursor); + } + } else { + LOG.warn("Failed to locate rolled back ack message {} in {}", id, sd); + } + } + return counter; + } + + @Override public void resetBatching() { if (pageFile.isLoaded()) { @@ -875,6 +901,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { StoredDestination sd = getStoredDestination(dest, tx); LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); sd.orderIndex.setBatch(tx, cursorPos); + recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener); for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator .hasNext();) { Entry entry = iterator.next(); @@ -918,7 +945,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } Entry entry = null; - int counter = 0; + int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener); for (Iterator> iterator = sd.orderIndex.iterator(tx, moc); iterator .hasNext();) { entry = iterator.next(); diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java index 7bcb99a2ca..1ca1def72f 100755 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java @@ -295,7 +295,7 @@ public class KahaDBTransactionStore implements TransactionStore { } else { KahaTransactionInfo info = getTransactionInfo(txid); theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, preCommit, postCommit); - forgetRecoveredAcks(txid); + forgetRecoveredAcks(txid, false); } }else { LOG.error("Null transaction passed on commit"); @@ -310,16 +310,16 @@ public class KahaDBTransactionStore implements TransactionStore { if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) { KahaTransactionInfo info = getTransactionInfo(txid); theStore.store(new KahaRollbackCommand().setTransactionInfo(info), false, null, null); - forgetRecoveredAcks(txid); + forgetRecoveredAcks(txid, true); } else { inflightTransactions.remove(txid); } } - protected void forgetRecoveredAcks(TransactionId txid) throws IOException { + protected void forgetRecoveredAcks(TransactionId txid, boolean isRollback) throws IOException { if (txid.isXATransaction()) { XATransactionId xaTid = ((XATransactionId) txid); - theStore.forgetRecoveredAcks(xaTid.getPreparedAcks()); + theStore.forgetRecoveredAcks(xaTid.getPreparedAcks(), isRollback); } } diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index b185d69742..78e26a956b 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -2302,6 +2302,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe @SuppressWarnings("rawtypes") protected final LinkedHashMap> preparedTransactions = new LinkedHashMap>(); protected final Set ackedAndPrepared = new HashSet(); + protected final Set rolledBackAcks = new HashSet(); // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback, // till then they are skipped by the store. @@ -2317,12 +2318,16 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } } - public void forgetRecoveredAcks(ArrayList acks) throws IOException { + public void forgetRecoveredAcks(ArrayList acks, boolean rollback) throws IOException { if (acks != null) { this.indexLock.writeLock().lock(); try { for (MessageAck ack : acks) { - ackedAndPrepared.remove(ack.getLastMessageId().toProducerKey()); + final String id = ack.getLastMessageId().toProducerKey(); + ackedAndPrepared.remove(id); + if (rollback) { + rolledBackAcks.add(id); + } } } finally { this.indexLock.writeLock().unlock(); @@ -2945,6 +2950,12 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe return lastGetPriority; } + public boolean alreadyDispatched(Long sequence) { + return (cursor.highPriorityCursorPosition > 0 && cursor.highPriorityCursorPosition >= sequence) || + (cursor.defaultCursorPosition > 0 && cursor.defaultCursorPosition >= sequence) || + (cursor.lowPriorityCursorPosition > 0 && cursor.lowPriorityCursorPosition >= sequence); + } + class MessageOrderIterator implements Iterator>{ Iterator>currentIterator; final Iterator>highIterator; diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java index fb570b21d9..49fe817fb7 100755 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java @@ -30,6 +30,7 @@ import org.apache.activemq.broker.jmx.DestinationViewMBean; import org.apache.activemq.broker.jmx.PersistenceAdapterViewMBean; import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean; import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy; import org.apache.activemq.command.*; import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; import org.apache.activemq.util.JMXSupport; @@ -233,17 +234,137 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport { // Commit the prepared transactions. for (int i = 0; i < dar.getData().length; i++) { - connection.request(createCommitTransaction2Phase(connectionInfo, (TransactionId) dar.getData()[i])); + TransactionId transactionId = (TransactionId) dar.getData()[i]; + LOG.info("commit: " + transactionId); + connection.request(createCommitTransaction2Phase(connectionInfo, transactionId)); } // We should get the committed transactions. final int countToReceive = expectedMessageCount(4, destination); for (int i = 0; i < countToReceive ; i++) { Message m = receiveMessage(connection, TimeUnit.SECONDS.toMillis(10)); + LOG.info("received: " + m); assertNotNull("Got non null message: " + i, m); } assertNoMessagesLeft(connection); + assertEmptyDLQ(); + } + + private void assertEmptyDLQ() throws Exception { + try { + DestinationViewMBean destinationView = getProxyToDestination(new ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME)); + assertEquals("nothing on dlq", 0, destinationView.getQueueSize()); + assertEquals("nothing added to dlq", 0, destinationView.getEnqueueCount()); + } catch (java.lang.reflect.UndeclaredThrowableException maybeOk) { + if (maybeOk.getUndeclaredThrowable() instanceof javax.management.InstanceNotFoundException) { + // perfect no dlq + } else { + throw maybeOk; + } + } + } + + public void testPreparedInterleavedTransactionRecoveredOnRestart() throws Exception { + + ActiveMQDestination destination = createDestination(); + + // Setup the producer and send the message. + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); + connection.send(consumerInfo); + + // Prepare 4 message sends. + for (int i = 0; i < 4; i++) { + // Begin the transaction. + XATransactionId txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + message.setTransactionId(txid); + connection.send(message); + + // Prepare + connection.send(createPrepareTransaction(connectionInfo, txid)); + } + + // Since prepared but not committed.. they should not get delivered. + assertNull(receiveMessage(connection)); + assertNoMessagesLeft(connection); + + // send non tx message + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + connection.request(message); + + connection.request(closeConnectionInfo(connectionInfo)); + + // restart the broker. + restartBroker(); + + // Setup the consumer and try receive the message. + connection = createConnection(); + connectionInfo = createConnectionInfo(); + sessionInfo = createSessionInfo(connectionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + consumerInfo = createConsumerInfo(sessionInfo, destination); + connection.send(consumerInfo); + + // consume non transacted message, but don't ack + int countToReceive = expectedMessageCount(1, destination); + for (int i=0; i< countToReceive; i++) { + Message m = receiveMessage(connection, TimeUnit.SECONDS.toMillis(10)); + LOG.info("received: " + m); + assertNotNull("got non tx message after prepared", m); + } + + // Since prepared but not committed.. they should not get delivered. + assertNull(receiveMessage(connection)); + assertNoMessagesLeft(connection); + + Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER)); + assertNotNull(response); + DataArrayResponse dar = (DataArrayResponse)response; + assertEquals(4, dar.getData().length); + + // ensure we can close a connection with prepared transactions + connection.request(closeConnectionInfo(connectionInfo)); + + // open again to deliver outcome + connection = createConnection(); + connectionInfo = createConnectionInfo(); + sessionInfo = createSessionInfo(connectionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + + // Commit the prepared transactions. + for (int i = 0; i < dar.getData().length; i++) { + TransactionId transactionId = (TransactionId) dar.getData()[i]; + LOG.info("commit: " + transactionId); + connection.request(createCommitTransaction2Phase(connectionInfo, transactionId)); + } + + consumerInfo = createConsumerInfo(sessionInfo, destination); + connection.send(consumerInfo); + + // We should get the committed transactions and the non tx message + countToReceive = expectedMessageCount(5, destination); + for (int i = 0; i < countToReceive ; i++) { + Message m = receiveMessage(connection, TimeUnit.SECONDS.toMillis(10)); + LOG.info("received: " + m); + assertNotNull("Got non null message: " + i, m); + } + + assertNoMessagesLeft(connection); + assertEmptyDLQ(); } public void testTopicPreparedTransactionRecoveredOnRestart() throws Exception { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java index 13bd7556ff..29822f177d 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java @@ -292,7 +292,7 @@ public class TrapMessageInJDBCStoreTest extends TestCase { super.executeBatch(); if (throwSQLException){ - throw new SQLException("TEST SQL EXCEPTION from executeBatch"); + throw new SQLException("TEST SQL EXCEPTION from executeBatch after super. execution"); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java index 54a8a01666..c365d374ce 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java @@ -225,7 +225,7 @@ public class FailoverTransactionTest extends TestSupport { setDefaultPersistenceAdapter(broker); broker.start(); - assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS)); + assertTrue("tx committed through failover", commitDoneLatch.await(30, TimeUnit.SECONDS)); // new transaction Message msg = consumer.receive(20000);