AMQ-6707 - JDBC XA recovery and completion.

ensure pending transactions are visible for recovery without restart
sync store and cursor size during and after completion
ensure pending messages are not visible to browsers
retain transaction state on jdbc error
a bunch of new tests around xa completion
This commit is contained in:
gtully 2018-04-26 14:53:51 +01:00
parent 1d2226e6cf
commit ea70e827c0
22 changed files with 1106 additions and 130 deletions

View File

@ -27,7 +27,6 @@ import java.util.concurrent.ConcurrentHashMap;
import javax.jms.JMSException;
import javax.transaction.xa.XAException;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.jmx.ManagedRegionBroker;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQDestination;
@ -64,6 +63,7 @@ public class TransactionBroker extends BrokerFilter {
// The prepared XA transactions.
private TransactionStore transactionStore;
private Map<TransactionId, XATransaction> xaTransactions = new LinkedHashMap<TransactionId, XATransaction>();
final ConnectionContext context = new ConnectionContext();
public TransactionBroker(Broker next, TransactionStore transactionStore) {
super(next);
@ -82,7 +82,6 @@ public class TransactionBroker extends BrokerFilter {
public void start() throws Exception {
transactionStore.start();
try {
final ConnectionContext context = new ConnectionContext();
context.setBroker(this);
context.setInRecoveryMode(true);
context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
@ -128,12 +127,11 @@ public class TransactionBroker extends BrokerFilter {
private void forceDestinationWakeupOnCompletion(ConnectionContext context, Transaction transaction,
ActiveMQDestination amqDestination, BaseCommand ack) throws Exception {
Destination destination = addDestination(context, amqDestination, false);
registerSync(destination, transaction, ack);
registerSync(amqDestination, transaction, ack);
}
private void registerSync(Destination destination, Transaction transaction, BaseCommand command) {
Synchronization sync = new PreparedDestinationCompletion(destination, command.isMessage());
private void registerSync(ActiveMQDestination destination, Transaction transaction, BaseCommand command) {
Synchronization sync = new PreparedDestinationCompletion(this, destination, command.isMessage());
// ensure one per destination in the list
Synchronization existing = transaction.findMatching(sync);
if (existing != null) {
@ -144,10 +142,12 @@ public class TransactionBroker extends BrokerFilter {
}
static class PreparedDestinationCompletion extends Synchronization {
final Destination destination;
private final TransactionBroker transactionBroker;
final ActiveMQDestination destination;
final boolean messageSend;
int opCount = 1;
public PreparedDestinationCompletion(final Destination destination, boolean messageSend) {
public PreparedDestinationCompletion(final TransactionBroker transactionBroker, ActiveMQDestination destination, boolean messageSend) {
this.transactionBroker = transactionBroker;
this.destination = destination;
// rollback relevant to acks, commit to sends
this.messageSend = messageSend;
@ -173,21 +173,23 @@ public class TransactionBroker extends BrokerFilter {
@Override
public void afterRollback() throws Exception {
if (!messageSend) {
destination.clearPendingMessages();
Destination dest = transactionBroker.addDestination(transactionBroker.context, destination, false);
dest.clearPendingMessages(opCount);
dest.getDestinationStatistics().getMessages().add(opCount);
LOG.debug("cleared pending from afterRollback: {}", destination);
}
}
@Override
public void afterCommit() throws Exception {
Destination dest = transactionBroker.addDestination(transactionBroker.context, destination, false);
if (messageSend) {
destination.clearPendingMessages();
destination.getDestinationStatistics().getEnqueues().add(opCount);
destination.getDestinationStatistics().getMessages().add(opCount);
dest.clearPendingMessages(opCount);
dest.getDestinationStatistics().getEnqueues().add(opCount);
dest.getDestinationStatistics().getMessages().add(opCount);
LOG.debug("cleared pending from afterCommit: {}", destination);
} else {
destination.getDestinationStatistics().getDequeues().add(opCount);
destination.getDestinationStatistics().getMessages().subtract(opCount);
dest.getDestinationStatistics().getDequeues().add(opCount);
}
}
}

View File

@ -239,7 +239,7 @@ public interface Destination extends Service, Task, Message.MessageDestination {
boolean isDoOptimzeMessageStorage();
void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage);
public void clearPendingMessages();
public void clearPendingMessages(int pendingAdditionsCount);
void duplicateFromStore(Message message, Subscription subscription);
}

View File

@ -379,8 +379,8 @@ public class DestinationFilter implements Destination {
}
@Override
public void clearPendingMessages() {
next.clearPendingMessages();
public void clearPendingMessages(int pendingAdditionsCount) {
next.clearPendingMessages(0);
}
@Override

View File

@ -413,9 +413,12 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
Destination nodeDest = (Destination) node.getRegionDestination();
synchronized (dispatchLock) {
getSubscriptionStatistics().getDequeues().increment();
dispatched.remove(node);
getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
nodeDest.getDestinationStatistics().getInflight().decrement();
if (dispatched.remove(node)) {
// if consumer is removed, dispatched will be empty and inflight will
// already have been adjusted
getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
nodeDest.getDestinationStatistics().getInflight().decrement();
}
}
contractPrefetchExtension(1);
nodeDest.wakeup();

View File

@ -68,6 +68,7 @@ import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
import org.apache.activemq.broker.util.InsertionCountList;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
@ -1248,7 +1249,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
QueueMessageReference ref = (QueueMessageReference) i.next();
if (ref.isExpired() && (ref.getLockOwner() == null)) {
toExpire.add(ref);
} else if (l.contains(ref.getMessage()) == false) {
} else if (!ref.isAcked() && l.contains(ref.getMessage()) == false) {
l.add(ref.getMessage());
}
}
@ -1326,9 +1327,19 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
}
@Override
public void clearPendingMessages() {
public void clearPendingMessages(int pendingAdditionsCount) {
messagesLock.writeLock().lock();
try {
final ActiveMQMessage dummyPersistent = new ActiveMQMessage();
dummyPersistent.setPersistent(true);
for (int i=0; i<pendingAdditionsCount; i++) {
try {
// track the increase in the cursor size w/o reverting to the store
messages.addMessageFirst(dummyPersistent);
} catch (Exception ignored) {
LOG.debug("Unexpected exception on tracking pending message additions", ignored);
}
}
if (resetNeeded) {
messages.gc();
messages.reset();

View File

@ -850,9 +850,10 @@ public class Topic extends BaseDestination implements Task {
/**
* force a reread of the store - after transaction recovery completion
* @param pendingAdditionsCount
*/
@Override
public void clearPendingMessages() {
public void clearPendingMessages(int pendingAdditionsCount) {
dispatchLock.readLock().lock();
try {
for (DurableTopicSubscription durableTopicSubscription : durableSubscribers.values()) {

View File

@ -79,7 +79,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
@Override
public void rebase() {
resetSize();
MessageId lastAdded = lastCachedIds[SYNC_ADD];
if (lastAdded != null) {
try {
@ -397,7 +396,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
@Override
public synchronized void addMessageFirst(MessageReference node) throws Exception {
setCacheEnabled(false);
size++;
}

View File

@ -29,12 +29,10 @@ import org.apache.activemq.usage.MemoryUsage;
/**
* A simple proxy that delegates to another MessageStore.
*/
public class ProxyTopicMessageStore implements TopicMessageStore {
final TopicMessageStore delegate;
public class ProxyTopicMessageStore extends ProxyMessageStore implements TopicMessageStore {
public ProxyTopicMessageStore(TopicMessageStore delegate) {
this.delegate = delegate;
super(delegate);
}
public MessageStore getDelegate() {
@ -83,40 +81,40 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
@Override
public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
return delegate.lookupSubscription(clientId, subscriptionName);
return ((TopicMessageStore)delegate).lookupSubscription(clientId, subscriptionName);
}
@Override
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
MessageId messageId, MessageAck ack) throws IOException {
delegate.acknowledge(context, clientId, subscriptionName, messageId, ack);
((TopicMessageStore)delegate).acknowledge(context, clientId, subscriptionName, messageId, ack);
}
@Override
public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
delegate.addSubscription(subscriptionInfo, retroactive);
((TopicMessageStore)delegate).addSubscription(subscriptionInfo, retroactive);
}
@Override
public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
delegate.deleteSubscription(clientId, subscriptionName);
((TopicMessageStore)delegate).deleteSubscription(clientId, subscriptionName);
}
@Override
public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
throws Exception {
delegate.recoverSubscription(clientId, subscriptionName, listener);
((TopicMessageStore)delegate).recoverSubscription(clientId, subscriptionName, listener);
}
@Override
public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
MessageRecoveryListener listener) throws Exception {
delegate.recoverNextMessages(clientId, subscriptionName, maxReturned, listener);
((TopicMessageStore)delegate).recoverNextMessages(clientId, subscriptionName, maxReturned, listener);
}
@Override
public void resetBatching(String clientId, String subscriptionName) {
delegate.resetBatching(clientId, subscriptionName);
((TopicMessageStore)delegate).resetBatching(clientId, subscriptionName);
}
@Override
@ -126,7 +124,7 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
@Override
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
return delegate.getAllSubscriptions();
return ((TopicMessageStore)delegate).getAllSubscriptions();
}
@Override
@ -136,7 +134,7 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
@Override
public int getMessageCount(String clientId, String subscriberName) throws IOException {
return delegate.getMessageCount(clientId, subscriberName);
return ((TopicMessageStore)delegate).getMessageCount(clientId, subscriberName);
}
@Override
@ -230,11 +228,11 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
@Override
public long getMessageSize(String clientId, String subscriberName)
throws IOException {
return delegate.getMessageSize(clientId, subscriberName);
return ((TopicMessageStore)delegate).getMessageSize(clientId, subscriberName);
}
@Override
public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() {
return delegate.getMessageStoreSubStatistics();
return ((TopicMessageStore)delegate).getMessageStoreSubStatistics();
}
}

View File

@ -107,11 +107,12 @@ public class MemoryTransactionStore implements TransactionStore {
cmd.run(ctx);
}
persistenceAdapter.commitTransaction(ctx);
} catch (IOException e) {
persistenceAdapter.rollbackTransaction(ctx);
throw e;
}
persistenceAdapter.commitTransaction(ctx);
}
}
@ -267,7 +268,7 @@ public class MemoryTransactionStore implements TransactionStore {
}
Tx tx;
if (wasPrepared) {
tx = preparedTransactions.remove(txid);
tx = preparedTransactions.get(txid);
} else {
tx = inflightTransactions.remove(txid);
}
@ -275,6 +276,9 @@ public class MemoryTransactionStore implements TransactionStore {
if (tx != null) {
tx.commit();
}
if (wasPrepared) {
preparedTransactions.remove(txid);
}
if (postCommit != null) {
postCommit.run();
}

View File

@ -72,8 +72,8 @@ public class XATransaction extends Transaction {
case PREPARED_STATE:
// 2 phase commit, work done.
// We would record commit here.
setStateFinished();
storeCommit(getTransactionId(), true, preCommitTask, postCommitTask);
setStateFinished();
break;
default:
illegalStateTransition("commit");
@ -88,9 +88,21 @@ public class XATransaction extends Transaction {
} catch (XAException xae) {
throw xae;
} catch (Throwable t) {
LOG.warn("Store COMMIT FAILED: ", t);
rollback();
XAException xae = newXAException("STORE COMMIT FAILED: Transaction rolled back", XAException.XA_RBOTHER);
LOG.warn("Store COMMIT FAILED: " + txid, t);
XAException xae = null;
if (wasPrepared) {
// report and await outcome
xae = newXAException("STORE COMMIT FAILED: " + t.getMessage(), XAException.XA_RETRY);
// fire rollback syncs to revert
doPostRollback();
} else {
try {
rollback();
xae = newXAException("STORE COMMIT FAILED: Transaction rolled back", XAException.XA_RBCOMMFAIL);
} catch (Throwable e) {
xae = newXAException("STORE COMMIT FAILED: " + t.getMessage() +". Rolled failed:" + e.getMessage(), XAException.XA_RBINTEGRITY);
}
}
xae.initCause(t);
throw xae;
}

View File

@ -120,7 +120,7 @@ import org.slf4j.LoggerFactory;
if (destination instanceof Queue) {
Queue queue = (Queue)destination;
if (queue.isResetNeeded()) {
queue.clearPendingMessages();
queue.clearPendingMessages(0);
}
}
}

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
@ -84,7 +85,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
private WireFormat wireFormat = new OpenWireFormat();
private Statements statements;
private JDBCAdapter adapter;
private MemoryTransactionStore transactionStore;
private final JdbcMemoryTransactionStore transactionStore = new JdbcMemoryTransactionStore(this);
private ScheduledFuture<?> cleanupTicket;
private int cleanupPeriod = 1000 * 60 * 5;
private boolean useExternalMessageReferences;
@ -102,6 +103,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
protected int maxRows = DefaultJDBCAdapter.MAX_ROWS;
protected final HashMap<ActiveMQDestination, MessageStore> storeCache = new HashMap<>();
{
setLockKeepAlivePeriod(DEFAULT_LOCK_KEEP_ALIVE_PERIOD);
@ -191,18 +193,26 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
@Override
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, audit);
if (transactionStore != null) {
rc = transactionStore.proxy(rc);
MessageStore rc = storeCache.get(destination);
if (rc == null) {
MessageStore store = transactionStore.proxy(new JDBCMessageStore(this, getAdapter(), wireFormat, destination, audit));
rc = storeCache.putIfAbsent(destination, store);
if (rc == null) {
rc = store;
}
}
return rc;
}
@Override
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, audit);
if (transactionStore != null) {
rc = transactionStore.proxy(rc);
TopicMessageStore rc = (TopicMessageStore) storeCache.get(destination);
if (rc == null) {
TopicMessageStore store = transactionStore.proxy(new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, audit));
rc = (TopicMessageStore) storeCache.putIfAbsent(destination, store);
if (rc == null) {
rc = store;
}
}
return rc;
}
@ -220,6 +230,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
LOG.error("Failed to remove consumer destination: " + destination, ioe);
}
}
storeCache.remove(destination);
}
private void removeConsumerDestination(ActiveMQQueue destination) throws IOException {
@ -243,13 +254,11 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
*/
@Override
public void removeTopicMessageStore(ActiveMQTopic destination) {
storeCache.remove(destination);
}
@Override
public TransactionStore createTransactionStore() throws IOException {
if (transactionStore == null) {
transactionStore = new JdbcMemoryTransactionStore(this);
}
return this.transactionStore;
}

View File

@ -152,7 +152,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
}
public LastRecoveredEntry defaultPriority() {
return perPriority[javax.jms.Message.DEFAULT_PRIORITY];
return perPriority[0];
}
@Override
@ -321,7 +321,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
public void pendingCompletion(String clientId, String subscriptionName, long sequenceId, byte priority) {
final String key = getSubscriptionKey(clientId, subscriptionName);
LastRecovered recovered = new LastRecovered();
recovered.perPriority[isPrioritizedMessages() ? priority : javax.jms.Message.DEFAULT_PRIORITY].recovered = sequenceId;
recovered.perPriority[priority].recovered = sequenceId;
subscriberLastRecoveredMap.put(key, recovered);
pendingCompletion.add(key);
LOG.trace(this + ", pending completion: " + key + ", last: " + recovered);

View File

@ -18,16 +18,12 @@ package org.apache.activemq.store.jdbc;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.TransactionBroker;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
@ -36,12 +32,9 @@ import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.IndexListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.ProxyMessageStore;
import org.apache.activemq.store.ProxyTopicMessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.memory.MemoryTransactionStore;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.transaction.Transaction;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayInputStream;
@ -57,9 +50,6 @@ import org.apache.activemq.util.DataByteArrayInputStream;
public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
private HashMap<ActiveMQDestination, MessageStore> topicStores = new HashMap<ActiveMQDestination, MessageStore>();
private HashMap<ActiveMQDestination, MessageStore> queueStores = new HashMap<ActiveMQDestination, MessageStore>();
public JdbcMemoryTransactionStore(JDBCPersistenceAdapter jdbcPersistenceAdapter) {
super(jdbcPersistenceAdapter);
}
@ -332,38 +322,37 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
}
@Override
protected void onProxyTopicStore(ProxyTopicMessageStore proxyTopicMessageStore) {
topicStores.put(proxyTopicMessageStore.getDestination(), proxyTopicMessageStore.getDelegate());
}
@Override
protected void onProxyQueueStore(ProxyMessageStore proxyQueueMessageStore) {
queueStores.put(proxyQueueMessageStore.getDestination(), proxyQueueMessageStore.getDelegate());
}
@Override
protected void onRecovered(Tx tx) {
for (RemoveMessageCommand removeMessageCommand: tx.acks) {
if (removeMessageCommand instanceof LastAckCommand) {
LastAckCommand lastAckCommand = (LastAckCommand) removeMessageCommand;
JDBCTopicMessageStore jdbcTopicMessageStore = (JDBCTopicMessageStore) topicStores.get(lastAckCommand.getMessageAck().getDestination());
JDBCTopicMessageStore jdbcTopicMessageStore = (JDBCTopicMessageStore) findMessageStore(lastAckCommand.getMessageAck().getDestination());
jdbcTopicMessageStore.pendingCompletion(lastAckCommand.getClientId(), lastAckCommand.getSubName(), lastAckCommand.getSequence(), lastAckCommand.getPriority());
lastAckCommand.setMessageStore(jdbcTopicMessageStore);
} else {
// when reading the store we ignore messages with non null XIDs but should include those with XIDS starting in - (pending acks in an xa transaction),
// but the sql is non portable to match BLOB with LIKE etc
// so we make up for it when we recover the ack
((JDBCPersistenceAdapter)persistenceAdapter).getBrokerService().getRegionBroker().getDestinationMap().get(removeMessageCommand.getMessageAck().getDestination()).getDestinationStatistics().getMessages().increment();
((RecoveredRemoveMessageCommand)removeMessageCommand).setMessageStore(queueStores.get(removeMessageCommand.getMessageAck().getDestination()));
((RecoveredRemoveMessageCommand)removeMessageCommand).setMessageStore(findMessageStore(removeMessageCommand.getMessageAck().getDestination()));
}
}
for (AddMessageCommand addMessageCommand : tx.messages) {
ActiveMQDestination destination = addMessageCommand.getMessage().getDestination();
addMessageCommand.setMessageStore(destination.isQueue() ? queueStores.get(destination) : topicStores.get(destination));
addMessageCommand.setMessageStore(findMessageStore(addMessageCommand.getMessage().getDestination()));
}
}
private MessageStore findMessageStore(ActiveMQDestination destination) {
ProxyMessageStore proxyMessageStore = null;
try {
if (destination.isQueue()) {
proxyMessageStore = (ProxyMessageStore) persistenceAdapter.createQueueMessageStore((ActiveMQQueue) destination);
} else {
proxyMessageStore = (ProxyMessageStore) persistenceAdapter.createTopicMessageStore((ActiveMQTopic) destination);
}
} catch (IOException error) {
throw new RuntimeException("Failed to find/create message store for destination: " + destination, error);
}
return proxyMessageStore.getDelegate();
}
@Override
public void acknowledge(final TopicMessageStore topicMessageStore, final String clientId, final String subscriptionName,
final MessageId messageId, final MessageAck ack) throws IOException {

View File

@ -243,6 +243,7 @@ public class TransactionContext {
updateLastAckStatement.close();
updateLastAckStatement = null;
}
completions.clear();
connection.rollback();
}

View File

@ -66,6 +66,7 @@ import org.apache.activemq.store.MessageStoreStatistics;
import org.apache.activemq.store.MessageStoreSubscriptionStatistics;
import org.apache.activemq.store.NoLocalSubscriptionAware;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.ProxyMessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionIdTransformer;
import org.apache.activemq.store.TransactionStore;
@ -363,11 +364,61 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
this.forceRecoverIndex = forceRecoverIndex;
}
public void forgetRecoveredAcks(ArrayList<MessageAck> preparedAcks, boolean isRollback) throws IOException {
if (preparedAcks != null) {
Map<ActiveMQDestination, KahaDBMessageStore> stores = new HashMap<>();
for (MessageAck ack : preparedAcks) {
stores.put(ack.getDestination(), findMatchingStore(ack.getDestination()));
}
ArrayList<MessageAck> perStoreAcks = new ArrayList<>();
for (Entry<ActiveMQDestination, KahaDBMessageStore> entry : stores.entrySet()) {
for (MessageAck ack : preparedAcks) {
if (entry.getKey().equals(ack.getDestination())) {
perStoreAcks.add(ack);
}
}
entry.getValue().forgetRecoveredAcks(perStoreAcks, isRollback);
perStoreAcks.clear();
}
}
}
public void trackRecoveredAcks(ArrayList<MessageAck> preparedAcks) throws IOException {
Map<ActiveMQDestination, KahaDBMessageStore> stores = new HashMap<>();
for (MessageAck ack : preparedAcks) {
stores.put(ack.getDestination(), findMatchingStore(ack.getDestination()));
}
ArrayList<MessageAck> perStoreAcks = new ArrayList<>();
for (Entry<ActiveMQDestination, KahaDBMessageStore> entry : stores.entrySet()) {
for (MessageAck ack : preparedAcks) {
if (entry.getKey().equals(ack.getDestination())) {
perStoreAcks.add(ack);
}
}
entry.getValue().trackRecoveredAcks(perStoreAcks);
perStoreAcks.clear();
}
}
private KahaDBMessageStore findMatchingStore(ActiveMQDestination activeMQDestination) throws IOException {
ProxyMessageStore store = (ProxyMessageStore) storeCache.get(convert(activeMQDestination));
if (store == null) {
if (activeMQDestination.isQueue()) {
store = (ProxyMessageStore) createQueueMessageStore((ActiveMQQueue) activeMQDestination);
} else {
store = (ProxyMessageStore) createTopicMessageStore((ActiveMQTopic) activeMQDestination);
}
}
return (KahaDBMessageStore) store.getDelegate();
}
public class KahaDBMessageStore extends AbstractMessageStore {
protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>();
protected KahaDestination dest;
private final int maxAsyncJobs;
private final Semaphore localDestinationSemaphore;
protected final Set<String> ackedAndPrepared = new HashSet<>();
protected final Set<String> rolledBackAcks = new HashSet<>();
double doneTasks, canceledTasks = 0;
@ -383,6 +434,39 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
return destination;
}
// messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback,
// till then they are skipped by the store.
// 'at most once' XA guarantee
public void trackRecoveredAcks(ArrayList<MessageAck> acks) {
indexLock.writeLock().lock();
try {
for (MessageAck ack : acks) {
ackedAndPrepared.add(ack.getLastMessageId().toProducerKey());
}
} finally {
indexLock.writeLock().unlock();
}
}
public void forgetRecoveredAcks(ArrayList<MessageAck> acks, boolean rollback) throws IOException {
if (acks != null) {
indexLock.writeLock().lock();
try {
for (MessageAck ack : acks) {
final String id = ack.getLastMessageId().toProducerKey();
ackedAndPrepared.remove(id);
if (rollback) {
rolledBackAcks.add(id);
incrementAndAddSizeToStoreStat(dest, 0);
}
}
} finally {
indexLock.writeLock().unlock();
}
}
}
@Override
public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
throws IOException {
@ -739,6 +823,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
return statistics;
}
});
recoveredStatistics.getMessageCount().subtract(ackedAndPrepared.size());
getMessageStoreStatistics().getMessageCount().setCount(recoveredStatistics.getMessageCount().getCount());
getMessageStoreStatistics().getMessageSize().setTotalSize(recoveredStatistics.getMessageSize().getTotalSize());
} finally {

View File

@ -3016,39 +3016,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<>();
@SuppressWarnings("rawtypes")
protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<>();
protected final Set<String> ackedAndPrepared = new HashSet<>();
protected final Set<String> rolledBackAcks = new HashSet<>();
// messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback,
// till then they are skipped by the store.
// 'at most once' XA guarantee
public void trackRecoveredAcks(ArrayList<MessageAck> acks) {
this.indexLock.writeLock().lock();
try {
for (MessageAck ack : acks) {
ackedAndPrepared.add(ack.getLastMessageId().toProducerKey());
}
} finally {
this.indexLock.writeLock().unlock();
}
}
public void forgetRecoveredAcks(ArrayList<MessageAck> acks, boolean rollback) throws IOException {
if (acks != null) {
this.indexLock.writeLock().lock();
try {
for (MessageAck ack : acks) {
final String id = ack.getLastMessageId().toProducerKey();
ackedAndPrepared.remove(id);
if (rollback) {
rolledBackAcks.add(id);
}
}
} finally {
this.indexLock.writeLock().unlock();
}
}
}
@SuppressWarnings("rawtypes")
private List<Operation> getInflightTx(KahaTransactionInfo info) {

View File

@ -48,6 +48,7 @@ public class JdbcXARecoveryBrokerTest extends XARecoveryBrokerTest {
@Override
protected void configureBroker(BrokerService broker) throws Exception {
super.configureBroker(broker);
broker.setAdvisorySupport(false);
JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
jdbc.setDataSource(dataSource);

View File

@ -698,8 +698,8 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
// validate destination depth via jmx
DestinationViewMBean destinationView = getProxyToDestination(destinationList(destination)[0]);
assertEquals("enqueue count does not see prepared acks", 4, destinationView.getQueueSize());
assertEquals("enqueue count does not see prepared acks", 0, destinationView.getDequeueCount());
assertEquals("enqueue count does not see prepared acks", 0, destinationView.getQueueSize());
assertEquals("dequeue count does not see prepared acks", 0, destinationView.getDequeueCount());
connection.request(createCommitTransaction2Phase(connectionInfo, txid));
@ -708,7 +708,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length);
assertEquals("enqueue count does not see commited acks", 0, destinationView.getQueueSize());
assertEquals("enqueue count does not see commited acks", 4, destinationView.getDequeueCount());
assertEquals("dequeue count does not see commited acks", 4, destinationView.getDequeueCount());
}

View File

@ -151,6 +151,7 @@ public class JDBCCommitExceptionTest extends TestCase {
protected BrokerService createBroker() throws Exception {
BrokerService broker = new BrokerService();
broker.setAdvisorySupport(false);
jdbc = new BrokenPersistenceAdapter();
jdbc.setUseLock(false);

View File

@ -17,13 +17,10 @@
package org.apache.activemq.store.jdbc;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.XAConnection;
import javax.jms.XASession;
import javax.transaction.xa.XAException;
@ -31,6 +28,9 @@ import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.XATransactionId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -47,7 +47,7 @@ public class JDBCXACommitExceptionTest extends JDBCCommitExceptionTest {
@Override
public void setUp() throws Exception {
super.setUp();
onePhase = true;
factory = new ActiveMQXAConnectionFactory(
connectionUri + "?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries="+messagesExpected);
}
@ -129,4 +129,41 @@ public class JDBCXACommitExceptionTest extends JDBCCommitExceptionTest {
}
public void testCommitSendErrorRecovery() throws Exception {
XAConnection connection = factory.createXAConnection();
connection.start();
XASession session = connection.createXASession();
Destination destination = session.createQueue("TEST");
MessageProducer producer = session.createProducer(destination);
XAResource resource = session.getXAResource();
Xid tid = createXid();
resource.start(tid, XAResource.TMNOFLAGS);
ActiveMQMessage message = (ActiveMQMessage) session.createMessage();
message.setTransactionId(new XATransactionId(tid));
producer.send(message);
resource.end(tid, XAResource.TMSUCCESS);
resource.prepare(tid);
jdbc.setShouldBreak(true);
try {
resource.commit(tid, true);
} catch (Exception expected) {
expected.printStackTrace();
}
// recover
Xid[] recovered = resource.recover(XAResource.TMSTARTRSCAN);
resource.recover(XAResource.TMNOFLAGS);
jdbc.setShouldBreak(false);
resource.commit(recovered[0], false);
assertEquals("one enque", 1, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
}
}

View File

@ -0,0 +1,857 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.jdbc;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQXAConnection;
import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.Wait;
import org.apache.activemq.wireformat.WireFormat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.XASession;
import javax.management.ObjectName;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Arrays;
import java.util.Enumeration;
import static org.apache.activemq.util.TestUtils.createXid;
@RunWith(value = Parameterized.class)
public class XACompletionTest extends TestSupport {
private static final Logger LOG = LoggerFactory.getLogger(XACompletionTest.class);
protected ActiveMQXAConnectionFactory factory;
protected static final int messagesExpected = 1;
protected BrokerService broker;
protected String connectionUri;
@Parameterized.Parameter
public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice;
@Parameterized.Parameters(name="store={0}")
public static Iterable<Object[]> getTestParameters() {
return Arrays.asList(new Object[][]{ {TestSupport.PersistenceAdapterChoice.KahaDB},{PersistenceAdapterChoice.JDBC} });
}
@Before
public void setUp() throws Exception {
broker = createBroker();
}
@After
public void stopAll() throws Exception {
if (broker != null) {
broker.stop();
broker.waitUntilStopped();
broker = null;
}
}
@Test
public void testStatsAndRedispatchAfterAckPreparedClosed() throws Exception {
factory = new ActiveMQXAConnectionFactory(
connectionUri + "?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries=" + 0);
factory.setWatchTopicAdvisories(false);
sendMessages(1);
ActiveMQXAConnection activeMQXAConnection = (ActiveMQXAConnection) factory.createXAConnection();
activeMQXAConnection.start();
XASession xaSession = activeMQXAConnection.createXASession();
Destination destination = xaSession.createQueue("TEST");
MessageConsumer consumer = xaSession.createConsumer(destination);
XAResource resource = xaSession.getXAResource();
resource.recover(XAResource.TMSTARTRSCAN);
resource.recover(XAResource.TMNOFLAGS);
Xid tid = createXid();
resource.start(tid, XAResource.TMNOFLAGS);
Message message = consumer.receive(2000);
LOG.info("Received : " + message);
resource.end(tid, XAResource.TMSUCCESS);
activeMQXAConnection.close();
dumpMessages();
dumpMessages();
LOG.info("Try jmx browse... after commit");
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext()
.newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
assertEquals("size", 1, proxy.getQueueSize());
LOG.info("Try receive... after rollback");
message = regularReceive("TEST");
assertNotNull("message gone", message);
}
@Test
public void testStatsAndBrowseAfterAckPreparedCommitted() throws Exception {
factory = new ActiveMQXAConnectionFactory(
connectionUri + "?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries=" + messagesExpected);
factory.setWatchTopicAdvisories(false);
sendMessages(messagesExpected);
ActiveMQXAConnection activeMQXAConnection = (ActiveMQXAConnection) factory.createXAConnection();
activeMQXAConnection.start();
XASession xaSession = activeMQXAConnection.createXASession();
Destination destination = xaSession.createQueue("TEST");
MessageConsumer consumer = xaSession.createConsumer(destination);
XAResource resource = xaSession.getXAResource();
resource.recover(XAResource.TMSTARTRSCAN);
resource.recover(XAResource.TMNOFLAGS);
Xid tid = createXid();
resource.start(tid, XAResource.TMNOFLAGS);
int messagesReceived = 0;
for (int i = 0; i < messagesExpected; i++) {
Message message = null;
try {
LOG.debug("Receiving message " + (messagesReceived + 1) + " of " + messagesExpected);
message = consumer.receive(2000);
LOG.info("Received : " + message);
messagesReceived++;
} catch (Exception e) {
LOG.debug("Caught exception:", e);
}
}
resource.end(tid, XAResource.TMSUCCESS);
resource.prepare(tid);
consumer.close();
dumpMessages();
resource.commit(tid, false);
dumpMessages();
LOG.info("Try jmx browse... after commit");
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext()
.newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
assertTrue(proxy.browseMessages().isEmpty());
assertEquals("prefetch 0", 0, proxy.getInFlightCount());
assertEquals("size 0", 0, proxy.getQueueSize());
LOG.info("Try browse... after commit");
Message browsed = regularBrowseFirst();
assertNull("message gone", browsed);
LOG.info("Try receive... after commit");
Message message = regularReceive("TEST");
assertNull("message gone", message);
}
@Test
public void testStatsAndBrowseAfterAckPreparedRolledback() throws Exception {
factory = new ActiveMQXAConnectionFactory(
connectionUri + "?jms.prefetchPolicy.all=0");
factory.setWatchTopicAdvisories(false);
sendMessages(10);
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext()
.newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
ActiveMQXAConnection activeMQXAConnection = (ActiveMQXAConnection) factory.createXAConnection();
activeMQXAConnection.start();
XASession xaSession = activeMQXAConnection.createXASession();
Destination destination = xaSession.createQueue("TEST");
MessageConsumer consumer = xaSession.createConsumer(destination);
XAResource resource = xaSession.getXAResource();
resource.recover(XAResource.TMSTARTRSCAN);
resource.recover(XAResource.TMNOFLAGS);
assertEquals("prefetch 0", 0, proxy.getInFlightCount());
assertEquals("size 0", 10, proxy.getQueueSize());
assertEquals("size 0", 0, proxy.cursorSize());
Xid tid = createXid();
resource.start(tid, XAResource.TMNOFLAGS);
for (int i = 0; i < 5; i++) {
Message message = null;
try {
message = consumer.receive(2000);
LOG.info("Received : " + message);
} catch (Exception e) {
LOG.debug("Caught exception:", e);
}
}
resource.end(tid, XAResource.TMSUCCESS);
resource.prepare(tid);
consumer.close();
dumpMessages();
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return proxy.getInFlightCount() == 0l;
}
});
assertEquals("prefetch", 0, proxy.getInFlightCount());
assertEquals("size", 10, proxy.getQueueSize());
assertEquals("cursor size", 0, proxy.cursorSize());
resource.rollback(tid);
dumpMessages();
LOG.info("Try jmx browse... after rollback");
assertEquals(10, proxy.browseMessages().size());
assertEquals("prefetch", 0, proxy.getInFlightCount());
assertEquals("size", 10, proxy.getQueueSize());
assertEquals("cursor size", 0, proxy.cursorSize());
LOG.info("Try browse... after");
Message browsed = regularBrowseFirst();
assertNotNull("message gone", browsed);
LOG.info("Try receive... after");
for (int i=0; i<10; i++) {
Message message = regularReceive("TEST");
assertNotNull("message gone", message);
}
}
@Test
public void testStatsAndConsumeAfterAckPreparedRolledback() throws Exception {
factory = new ActiveMQXAConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=0");
factory.setWatchTopicAdvisories(false);
sendMessages(10);
ActiveMQXAConnection activeMQXAConnection = (ActiveMQXAConnection) factory.createXAConnection();
activeMQXAConnection.start();
XASession xaSession = activeMQXAConnection.createXASession();
Destination destination = xaSession.createQueue("TEST");
MessageConsumer consumer = xaSession.createConsumer(destination);
XAResource resource = xaSession.getXAResource();
resource.recover(XAResource.TMSTARTRSCAN);
resource.recover(XAResource.TMNOFLAGS);
Xid tid = createXid();
resource.start(tid, XAResource.TMNOFLAGS);
int messagesReceived = 0;
for (int i = 0; i < 5; i++) {
Message message = null;
try {
LOG.debug("Receiving message " + (messagesReceived + 1) + " of " + messagesExpected);
message = consumer.receive(2000);
LOG.info("Received : " + message);
messagesReceived++;
} catch (Exception e) {
LOG.debug("Caught exception:", e);
}
}
resource.end(tid, XAResource.TMSUCCESS);
resource.prepare(tid);
consumer.close();
assertEquals("drain", 5, drainUnack(5, "TEST"));
dumpMessages();
broker = restartBroker();
assertEquals("redrain", 5, drainUnack(5, "TEST"));
LOG.info("Try consume... after restart");
dumpMessages();
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext()
.newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
assertEquals("prefetch", 0, proxy.getInFlightCount());
assertEquals("size", 5, proxy.getQueueSize());
assertEquals("cursor size 0", 0, proxy.cursorSize());
factory = new ActiveMQXAConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=0");
factory.setWatchTopicAdvisories(false);
activeMQXAConnection = (ActiveMQXAConnection) factory.createXAConnection();
activeMQXAConnection.start();
xaSession = activeMQXAConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Xid[] xids = xaResource.recover(XAResource.TMSTARTRSCAN);
xaResource.recover(XAResource.TMNOFLAGS);
LOG.info("Rollback outcome for ack");
xaResource.rollback(xids[0]);
LOG.info("Try receive... after rollback");
for (int i=0;i<10; i++) {
Message message = regularReceive("TEST");
assertNotNull("message gone: " + i, message);
}
dumpMessages();
assertNull("none left", regularReceive("TEST"));
assertEquals("prefetch", 0, proxy.getInFlightCount());
assertEquals("size", 0, proxy.getQueueSize());
assertEquals("cursor size", 0, proxy.cursorSize());
assertEquals("dq", 10, proxy.getDequeueCount());
}
@Test
public void testStatsAndConsumeAfterAckPreparedRolledbackOutOfOrderRecovery() throws Exception {
factory = new ActiveMQXAConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=0");
factory.setWatchTopicAdvisories(false);
sendMessages(20);
for (int i = 0; i < 10; i++) {
ActiveMQXAConnection activeMQXAConnection = (ActiveMQXAConnection) factory.createXAConnection();
activeMQXAConnection.start();
XASession xaSession = activeMQXAConnection.createXASession();
Destination destination = xaSession.createQueue("TEST");
MessageConsumer consumer = xaSession.createConsumer(destination);
XAResource resource = xaSession.getXAResource();
Xid tid = createXid();
resource.start(tid, XAResource.TMNOFLAGS);
Message message = null;
try {
message = consumer.receive(2000);
LOG.info("Received (" + i + ") : ," + message);
} catch (Exception e) {
LOG.debug("Caught exception:", e);
}
resource.end(tid, XAResource.TMSUCCESS);
resource.prepare(tid);
// no close - b/c messages end up in pagedInPendingDispatch!
// activeMQXAConnection.close();
}
ActiveMQXAConnection activeMQXAConnection = (ActiveMQXAConnection) factory.createXAConnection();
activeMQXAConnection.start();
XASession xaSession = activeMQXAConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Xid[] xids = xaResource.recover(XAResource.TMSTARTRSCAN);
xaResource.recover(XAResource.TMNOFLAGS);
xaResource.rollback(xids[0]);
xaResource.rollback(xids[1]);
activeMQXAConnection.close();
LOG.info("RESTART");
broker = restartBroker();
dumpMessages();
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext()
.newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
// set maxBatchSize=1
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=" + 1);
javax.jms.Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Destination destination = session.createQueue("TEST");
MessageConsumer consumer = session.createConsumer(destination);
consumer.close();
ActiveMQConnectionFactory receiveFactory = new ActiveMQConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=0");
// recover/rollback the second tx
ActiveMQXAConnectionFactory activeMQXAConnectionFactory = new ActiveMQXAConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=0");
activeMQXAConnectionFactory.setWatchTopicAdvisories(false);
activeMQXAConnection = (ActiveMQXAConnection) activeMQXAConnectionFactory.createXAConnection();
activeMQXAConnection.start();
xaSession = activeMQXAConnection.createXASession();
xaResource = xaSession.getXAResource();
xids = xaResource.recover(XAResource.TMSTARTRSCAN);
xaResource.recover(XAResource.TMNOFLAGS);
for (int i=0; i< xids.length; i++) {
xaResource.rollback(xids[i]);
}
// another prefetch demand of 1
MessageConsumer consumer2 = session.createConsumer(new ActiveMQQueue("TEST?consumer.prefetchSize=2"));
LOG.info("Try receive... after rollback");
Message message = regularReceiveWith(receiveFactory, "TEST");
assertNotNull("message 1: ", message);
LOG.info("Received : " + message);
dumpMessages();
message = regularReceiveWith(receiveFactory, "TEST");
assertNotNull("last message", message);
LOG.info("Received : " + message);
}
@Test
public void testMoveInTwoBranches() throws Exception {
factory = new ActiveMQXAConnectionFactory(
connectionUri + "?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries=" + messagesExpected);
factory.setWatchTopicAdvisories(false);
sendMessages(messagesExpected);
ActiveMQXAConnection activeMQXAConnection = (ActiveMQXAConnection) factory.createXAConnection();
activeMQXAConnection.start();
XASession xaSession = activeMQXAConnection.createXASession();
Destination destination = xaSession.createQueue("TEST");
MessageConsumer consumer = xaSession.createConsumer(destination);
XAResource resource = xaSession.getXAResource();
final Xid tid = createXid();
byte[] branch = tid.getBranchQualifier();
final byte[] branch2 = Arrays.copyOf(branch, branch.length);
branch2[0] = '!';
Xid branchTid = new Xid() {
@Override
public int getFormatId() {
return tid.getFormatId();
}
@Override
public byte[] getGlobalTransactionId() {
return tid.getGlobalTransactionId();
}
@Override
public byte[] getBranchQualifier() {
return branch2;
}
};
resource.start(tid, XAResource.TMNOFLAGS);
int messagesReceived = 0;
Message message = null;
for (int i = 0; i < messagesExpected; i++) {
try {
LOG.debug("Receiving message " + (messagesReceived + 1) + " of " + messagesExpected);
message = consumer.receive(2000);
LOG.info("Received : " + message);
messagesReceived++;
} catch (Exception e) {
LOG.debug("Caught exception:", e);
}
}
resource.end(tid, XAResource.TMSUCCESS);
ActiveMQXAConnection activeMQXAConnectionSend = (ActiveMQXAConnection) factory.createXAConnection();
activeMQXAConnectionSend.start();
XASession xaSessionSend = activeMQXAConnection.createXASession();
Destination destinationSend = xaSessionSend.createQueue("TEST_MOVE");
MessageProducer producer = xaSessionSend.createProducer(destinationSend);
XAResource resourceSend = xaSessionSend.getXAResource();
resourceSend.start(branchTid, XAResource.TMNOFLAGS);
ActiveMQMessage toSend = (ActiveMQMessage) xaSessionSend.createTextMessage();
toSend.setTransactionId(new XATransactionId(branchTid));
producer.send(toSend);
resourceSend.end(branchTid, XAResource.TMSUCCESS);
resourceSend.prepare(branchTid);
resource.prepare(tid);
consumer.close();
LOG.info("Prepared");
dumpMessages();
LOG.info("Commit Ack");
resource.commit(tid, false);
dumpMessages();
LOG.info("Commit Send");
resourceSend.commit(branchTid, false);
dumpMessages();
LOG.info("Try jmx browse... after commit");
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext()
.newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
assertTrue(proxy.browseMessages().isEmpty());
assertEquals("dq ", 1, proxy.getDequeueCount());
assertEquals("size 0", 0, proxy.getQueueSize());
ObjectName queueMoveViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST_MOVE");
QueueViewMBean moveProxy = (QueueViewMBean) broker.getManagementContext()
.newProxyInstance(queueMoveViewMBeanName, QueueViewMBean.class, true);
assertEquals("enq", 1, moveProxy.getEnqueueCount());
assertEquals("size 1", 1, moveProxy.getQueueSize());
assertNotNull(regularReceive("TEST_MOVE"));
assertEquals("size 0", 0, moveProxy.getQueueSize());
}
@Test
public void testMoveInTwoBranchesTwoBrokers() throws Exception {
factory = new ActiveMQXAConnectionFactory(
connectionUri + "?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries=" + messagesExpected);
factory.setWatchTopicAdvisories(false);
sendMessages(messagesExpected);
ActiveMQXAConnection activeMQXAConnection = (ActiveMQXAConnection) factory.createXAConnection();
activeMQXAConnection.start();
XASession xaSession = activeMQXAConnection.createXASession();
Destination destination = xaSession.createQueue("TEST");
MessageConsumer consumer = xaSession.createConsumer(destination);
XAResource resource = xaSession.getXAResource();
final Xid tid = createXid();
byte[] branch = tid.getBranchQualifier();
final byte[] branch2 = Arrays.copyOf(branch, branch.length);
branch2[0] = '!';
Xid branchTid = new Xid() {
@Override
public int getFormatId() {
return tid.getFormatId();
}
@Override
public byte[] getGlobalTransactionId() {
return tid.getGlobalTransactionId();
}
@Override
public byte[] getBranchQualifier() {
return branch2;
}
};
resource.start(tid, XAResource.TMNOFLAGS);
int messagesReceived = 0;
Message message = null;
for (int i = 0; i < messagesExpected; i++) {
try {
LOG.debug("Receiving message " + (messagesReceived + 1) + " of " + messagesExpected);
message = consumer.receive(2000);
LOG.info("Received : " + message);
messagesReceived++;
} catch (Exception e) {
LOG.debug("Caught exception:", e);
}
}
resource.end(tid, XAResource.TMSUCCESS);
ActiveMQXAConnection activeMQXAConnectionSend = (ActiveMQXAConnection) factory.createXAConnection();
activeMQXAConnectionSend.start();
XASession xaSessionSend = activeMQXAConnection.createXASession();
Destination destinationSend = xaSessionSend.createQueue("TEST_MOVE");
MessageProducer producer = xaSessionSend.createProducer(destinationSend);
XAResource resourceSend = xaSessionSend.getXAResource();
resourceSend.start(branchTid, XAResource.TMNOFLAGS);
ActiveMQMessage toSend = (ActiveMQMessage) xaSessionSend.createTextMessage();
toSend.setTransactionId(new XATransactionId(branchTid));
producer.send(toSend);
resourceSend.end(branchTid, XAResource.TMSUCCESS);
resourceSend.prepare(branchTid);
resource.prepare(tid);
consumer.close();
LOG.info("Prepared");
dumpMessages();
LOG.info("Commit Ack");
resource.commit(tid, false);
dumpMessages();
LOG.info("Commit Send");
resourceSend.commit(branchTid, false);
dumpMessages();
LOG.info("Try jmx browse... after commit");
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext()
.newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
assertTrue(proxy.browseMessages().isEmpty());
assertEquals("dq ", 1, proxy.getDequeueCount());
assertEquals("size 0", 0, proxy.getQueueSize());
ObjectName queueMoveViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST_MOVE");
QueueViewMBean moveProxy = (QueueViewMBean) broker.getManagementContext()
.newProxyInstance(queueMoveViewMBeanName, QueueViewMBean.class, true);
assertEquals("enq", 1, moveProxy.getEnqueueCount());
assertEquals("size 1", 1, moveProxy.getQueueSize());
assertNotNull(regularReceive("TEST_MOVE"));
assertEquals("size 0", 0, moveProxy.getQueueSize());
}
private Message regularReceive(String qName) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
return regularReceiveWith(factory, qName);
}
private Message regularReceiveWith(ActiveMQConnectionFactory factory, String qName) throws Exception {
javax.jms.Connection connection = factory.createConnection();
try {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(qName);
MessageConsumer consumer = session.createConsumer(destination);
return consumer.receive(2000);
} finally {
connection.close();
}
}
private int drainUnack(int limit, String qName) throws Exception {
int drained = 0;
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=" + limit);
javax.jms.Connection connection = factory.createConnection();
try {
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Destination destination = session.createQueue(qName);
MessageConsumer consumer = session.createConsumer(destination);
while (drained < limit && consumer.receive(2000) != null) {
drained++;
};
consumer.close();
} finally {
connection.close();
}
return drained;
}
private Message regularBrowseFirst() throws Exception {
javax.jms.Connection connection = factory.createConnection();
try {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue("TEST");
QueueBrowser browser = session.createBrowser(destination);
Enumeration e = browser.getEnumeration();
if (e.hasMoreElements()) {
return (Message) e.nextElement();
}
return null;
} finally {
connection.close();
}
}
protected void sendMessages(int messagesExpected) throws Exception {
sendMessagesWith(factory, messagesExpected);
}
protected void sendMessagesWith(ConnectionFactory factory, int messagesExpected) throws Exception {
javax.jms.Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("TEST");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i=0; i<messagesExpected; i++) {
LOG.debug("Sending message " + (i+1) + " of " + messagesExpected);
producer.send(session.createTextMessage("test message " + (i+1)));
}
connection.close();
}
protected void dumpMessages() throws Exception {
if (persistenceAdapterChoice.compareTo(PersistenceAdapterChoice.JDBC) != 0) {
return;
}
WireFormat wireFormat = new OpenWireFormat();
java.sql.Connection conn = ((JDBCPersistenceAdapter) broker.getPersistenceAdapter()).getDataSource().getConnection();
PreparedStatement statement = conn.prepareStatement("SELECT ID, MSG, XID FROM ACTIVEMQ_MSGS");
ResultSet result = statement.executeQuery();
LOG.info("Messages in broker db...");
while(result.next()) {
long id = result.getLong(1);
org.apache.activemq.command.Message message = (org.apache.activemq.command.Message)wireFormat.unmarshal(new ByteSequence(result.getBytes(2)));
String xid = result.getString(3);
LOG.info("id: " + id + ", message SeqId: " + message.getMessageId().getBrokerSequenceId() + ", XID:" + xid + ", MSG: " + message);
}
statement.close();
conn.close();
}
protected BrokerService createBroker() throws Exception {
return createBroker(true);
}
protected BrokerService restartBroker() throws Exception {
broker.stop();
broker.waitUntilStopped();
return createBroker(false);
}
protected BrokerService createBroker(boolean del) throws Exception {
BrokerService broker = new BrokerService();
broker.setAdvisorySupport(false);
PolicyMap policyMap = new PolicyMap();
PolicyEntry policyEntry = new PolicyEntry();
policyEntry.setExpireMessagesPeriod(0);
policyMap.setDefaultEntry(policyEntry);
broker.setDestinationPolicy(policyMap);
broker.setDeleteAllMessagesOnStartup(del);
setPersistenceAdapter(broker, persistenceAdapterChoice);
broker.setPersistent(true);
connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();
broker.start();
return broker;
}
}