https://issues.apache.org/jira/browse/AMQ-3872 - Implement "exactly once" delivery with JDBC and XA in the event of a failure post prepare. jdbc xa is not a runner due to single table and update locks. implemention adds xid column to messages and acks table. a non null value indicated a prepared add/ack, so the insert is done on prepare. the result of the transaction outcome requires a row level update

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1345202 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2012-06-01 14:32:50 +00:00
parent 15f85cd768
commit 64f3492c85
20 changed files with 1532 additions and 150 deletions

View File

@ -25,6 +25,7 @@ import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.security.MessageAuthorizationPolicy;
import org.apache.activemq.security.SecurityContext;
@ -60,6 +61,7 @@ public class ConnectionContext {
private boolean dontSendReponse;
private boolean clientMaster = true;
private ConnectionState connectionState;
private XATransactionId xid;
public ConnectionContext() {
this.messageEvaluationContext = new MessageEvaluationContext();
@ -329,4 +331,12 @@ public class ConnectionContext {
public ConnectionState getConnectionState() {
return this.connectionState;
}
public void setXid(XATransactionId id) {
this.xid = id;
}
public XATransactionId getXid() {
return xid;
}
}

View File

@ -137,42 +137,40 @@ public class TransactionBroker extends BrokerFilter {
}
private void registerSync(Destination destination, Transaction transaction, BaseCommand command) {
if (destination instanceof Queue) {
Synchronization sync = new PreparedDestinationCompletion((Queue) destination, command.isMessage());
// ensure one per destination in the list
transaction.removeSynchronization(sync);
transaction.addSynchronization(sync);
}
Synchronization sync = new PreparedDestinationCompletion(destination, command.isMessage());
// ensure one per destination in the list
transaction.removeSynchronization(sync);
transaction.addSynchronization(sync);
}
static class PreparedDestinationCompletion extends Synchronization {
final Queue queue;
final Destination destination;
final boolean messageSend;
public PreparedDestinationCompletion(final Queue queue, boolean messageSend) {
this.queue = queue;
public PreparedDestinationCompletion(final Destination destination, boolean messageSend) {
this.destination = destination;
// rollback relevant to acks, commit to sends
this.messageSend = messageSend;
}
@Override
public int hashCode() {
return System.identityHashCode(queue) +
return System.identityHashCode(destination) +
System.identityHashCode(Boolean.valueOf(messageSend));
}
@Override
public boolean equals(Object other) {
return other instanceof PreparedDestinationCompletion &&
queue.equals(((PreparedDestinationCompletion) other).queue) &&
destination.equals(((PreparedDestinationCompletion) other).destination) &&
messageSend == ((PreparedDestinationCompletion) other).messageSend;
}
@Override
public void afterRollback() throws Exception {
if (!messageSend) {
queue.clearPendingMessages();
destination.clearPendingMessages();
if (LOG.isDebugEnabled()) {
LOG.debug("cleared pending from afterRollback : " + queue);
LOG.debug("cleared pending from afterRollback : " + destination);
}
}
}
@ -180,9 +178,9 @@ public class TransactionBroker extends BrokerFilter {
@Override
public void afterCommit() throws Exception {
if (messageSend) {
queue.clearPendingMessages();
destination.clearPendingMessages();
if (LOG.isDebugEnabled()) {
LOG.debug("cleared pending from afterCommit : " + queue);
LOG.debug("cleared pending from afterCommit : " + destination);
}
}
}

View File

@ -233,4 +233,6 @@ public interface Destination extends Service, Task {
boolean isDoOptimzeMessageStorage();
void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage);
public void clearPendingMessages();
}

View File

@ -310,4 +310,9 @@ public class DestinationFilter implements Destination {
next.setDoOptimzeMessageStorage(doOptimzeMessageStorage);
}
@Override
public void clearPendingMessages() {
next.clearPendingMessages();
}
}

View File

@ -18,8 +18,10 @@ package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
@ -745,4 +747,47 @@ public class Topic extends BaseDestination implements Task {
}
return result;
}
/**
* force a reread of the store - after transaction recovery completion
*/
public void clearPendingMessages() {
dispatchLock.readLock().lock();
try {
for (DurableTopicSubscription durableTopicSubscription : durableSubcribers.values()) {
clearPendingAndDispatch(durableTopicSubscription);
}
} finally {
dispatchLock.readLock().unlock();
}
}
public void clearPendingMessages(SubscriptionKey subscriptionKey) {
dispatchLock.readLock().lock();
try {
DurableTopicSubscription durableTopicSubscription = durableSubcribers.get(subscriptionKey);
clearPendingAndDispatch(durableTopicSubscription);
} finally {
dispatchLock.readLock().unlock();
}
}
private void clearPendingAndDispatch(DurableTopicSubscription durableTopicSubscription) {
synchronized (durableTopicSubscription.pendingLock) {
durableTopicSubscription.pending.clear();
try {
durableTopicSubscription.dispatchPending();
} catch (IOException exception) {
LOG.warn("After clear of pending, failed to dispatch to: " +
durableTopicSubscription + ", for :" + destination + ", pending: " +
durableTopicSubscription.pending, exception);
}
}
}
public Map<SubscriptionKey, DurableTopicSubscription> getDurableTopicSubs() {
return durableSubcribers;
}
}

View File

@ -152,7 +152,7 @@ public class MessageId implements DataStructure, Comparable<MessageId> {
MessageId copy = new MessageId(producerId, producerSequenceId);
copy.key = key;
copy.brokerSequenceId = brokerSequenceId;
copy.dataLocator = dataLocator;
copy.dataLocator = new AtomicReference<Object>(dataLocator.get());
return copy;
}

View File

@ -16,10 +16,12 @@
*/
package org.apache.activemq.command;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import javax.transaction.xa.Xid;
import org.apache.activemq.util.HexSupport;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
/**
* @openwire:marshaller code="112"
@ -32,6 +34,8 @@ public class XATransactionId extends TransactionId implements Xid, Comparable {
private int formatId;
private byte[] branchQualifier;
private byte[] globalTransactionId;
private transient DataByteArrayOutputStream outputStream;
private transient byte[] encodedXidBytes;
private transient int hash;
private transient String transactionKey;
@ -46,14 +50,58 @@ public class XATransactionId extends TransactionId implements Xid, Comparable {
this.branchQualifier = xid.getBranchQualifier();
}
public XATransactionId(byte[] encodedBytes) {
encodedXidBytes = encodedBytes;
initFromEncodedBytes();
}
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
}
final int XID_PREFIX_SIZE = 16;
//+|-,(long)lastAck,(byte)priority,(int)formatid,(short)globalLength....
private void initFromEncodedBytes() {
DataByteArrayInputStream inputStream = new DataByteArrayInputStream(encodedXidBytes);
inputStream.skipBytes(10);
formatId = inputStream.readInt();
int globalLength = inputStream.readShort();
globalTransactionId = new byte[globalLength];
try {
inputStream.read(globalTransactionId);
branchQualifier = new byte[inputStream.available()];
inputStream.read(branchQualifier);
} catch (IOException fatal) {
throw new RuntimeException(this + ", failed to decode:", fatal);
}
}
public synchronized byte[] getEncodedXidBytes() {
if (encodedXidBytes == null) {
outputStream = new DataByteArrayOutputStream(XID_PREFIX_SIZE + globalTransactionId.length + branchQualifier.length);
outputStream.position(10);
outputStream.writeInt(formatId);
// global length
outputStream.writeShort(globalTransactionId.length);
try {
outputStream.write(globalTransactionId);
outputStream.write(branchQualifier);
} catch (IOException fatal) {
throw new RuntimeException(this + ", failed to encode:", fatal);
}
encodedXidBytes = outputStream.getData();
}
return encodedXidBytes;
}
public DataByteArrayOutputStream getOutputStream() {
return outputStream;
}
public synchronized String getTransactionKey() {
if (transactionKey == null) {
StringBuffer s = new StringBuffer();
s.append("XID:[globalId=");
s.append("XID:[" + formatId + ",globalId=");
for (int i = 0; i < globalTransactionId.length; i++) {
s.append(Integer.toHexString(globalTransactionId[i]));
}

View File

@ -79,7 +79,7 @@ public interface TopicMessageStore extends MessageStore {
void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception;
/**
* A hint to the Store to reset any batching state for a durable subsriber
* A hint to the Store to reset any batching state for a durable subscriber
*
* @param clientId
* @param subscriptionName

View File

@ -20,9 +20,11 @@ import java.io.IOException;
import java.sql.SQLException;
import java.util.Set;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.XATransactionId;
/**
*
@ -35,7 +37,7 @@ public interface JDBCAdapter {
void doDropTables(TransactionContext c) throws SQLException, IOException;
void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data, long expiration, byte priority) throws SQLException, IOException;
void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data, long expiration, byte priority, XATransactionId xid) throws SQLException, IOException;
void doAddMessageReference(TransactionContext c, long sequence, MessageId messageId, ActiveMQDestination destination, long expirationTime, String messageRef) throws SQLException, IOException;
@ -45,11 +47,11 @@ public interface JDBCAdapter {
String doGetMessageReference(TransactionContext c, long id) throws SQLException, IOException;
void doRemoveMessage(TransactionContext c, long seq) throws SQLException, IOException;
void doRemoveMessage(TransactionContext c, long seq, XATransactionId xid) throws SQLException, IOException;
void doRecover(TransactionContext c, ActiveMQDestination destination, JDBCMessageRecoveryListener listener) throws Exception;
void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long seq, long prio) throws SQLException, IOException;
void doSetLastAck(TransactionContext c, ActiveMQDestination destination, XATransactionId xid, String clientId, String subscriptionName, long seq, long prio) throws SQLException, IOException;
void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, JDBCMessageRecoveryListener listener)
throws Exception;
@ -92,11 +94,17 @@ public interface JDBCAdapter {
long doGetLastProducerSequenceId(TransactionContext c, ProducerId id) throws SQLException, IOException;
void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long re, long re1) throws SQLException, IOException;
void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination, XATransactionId xid, String clientId, String subscriptionName, long re, long re1) throws SQLException, IOException;
public int getMaxRows();
public void setMaxRows(int maxRows);
void doRecordDestination(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException;
void doRecoverPreparedOps(TransactionContext c, JdbcMemoryTransactionStore jdbcMemoryTransactionStore) throws SQLException, IOException;
void doCommitAddOp(TransactionContext c, long storeSequenceIdForMessageId) throws SQLException, IOException;
void doClearLastAck(TransactionContext c, ActiveMQDestination destination, byte priority, String subId, String subName) throws SQLException, IOException;
}

View File

@ -120,15 +120,19 @@ public class JDBCMessageStore extends AbstractMessageStore {
// Get a connection and insert the message into the DB.
TransactionContext c = persistenceAdapter.getTransactionContext(context);
try {
adapter.doAddMessage(c,sequenceId, messageId, destination, data, message.getExpiration(),
this.isPrioritizedMessages() ? message.getPriority() : 0);
adapter.doAddMessage(c, sequenceId, messageId, destination, data, message.getExpiration(),
this.isPrioritizedMessages() ? message.getPriority() : 0, context != null ? context.getXid() : null);
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
} finally {
c.close();
}
onAdd(messageId, sequenceId, message.getPriority());
if (context != null && context.getXid() != null) {
message.getMessageId().setDataLocator(sequenceId);
} else {
onAdd(messageId, sequenceId, message.getPriority());
}
}
protected void onAdd(MessageId messageId, long sequenceId, byte priority) {
@ -186,19 +190,22 @@ public class JDBCMessageStore extends AbstractMessageStore {
}
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
long seq = getStoreSequenceIdForMessageId(ack.getLastMessageId())[0];
long seq = persistenceAdapter.getStoreSequenceIdForMessageId(ack.getLastMessageId(), destination)[0];
// Get a connection and remove the message from the DB
TransactionContext c = persistenceAdapter.getTransactionContext(context);
try {
adapter.doRemoveMessage(c, seq);
adapter.doRemoveMessage(c, seq, context != null ? context.getXid() : null);
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to broker message: " + ack.getLastMessageId() + " in container: " + e, e);
} finally {
c.close();
}
if (context != null && context.getXid() != null) {
ack.getLastMessageId().setDataLocator(seq);
}
}
public void recover(final MessageRecoveryListener listener) throws Exception {
@ -315,7 +322,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
@Override
public void setBatch(MessageId messageId) {
try {
long[] storedValues = getStoreSequenceIdForMessageId(messageId);
long[] storedValues = persistenceAdapter.getStoreSequenceIdForMessageId(messageId, destination);
lastRecoveredSequenceId.set(storedValues[0]);
lastRecoveredPriority.set(storedValues[1]);
} catch (IOException ignoredAsAlreadyLogged) {
@ -328,20 +335,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
}
}
private long[] getStoreSequenceIdForMessageId(MessageId messageId) throws IOException {
long[] result = new long[]{-1, Byte.MAX_VALUE -1};
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
result = adapter.getStoreSequenceId(c, destination, messageId);
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e);
} finally {
c.close();
}
return result;
}
public void setPrioritizedMessages(boolean prioritizedMessages) {
super.setPrioritizedMessages(prioritizedMessages);
}

View File

@ -37,6 +37,7 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.openwire.OpenWireFormat;
@ -237,7 +238,7 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
public TransactionStore createTransactionStore() throws IOException {
if (transactionStore == null) {
transactionStore = new MemoryTransactionStore(this);
transactionStore = new JdbcMemoryTransactionStore(this);
}
return this.transactionStore;
}
@ -768,4 +769,95 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
public void setMaxRows(int maxRows) {
this.maxRows = maxRows;
}
public void recover(JdbcMemoryTransactionStore jdbcMemoryTransactionStore) throws IOException {
TransactionContext c = getTransactionContext();
try {
getAdapter().doRecoverPreparedOps(c, jdbcMemoryTransactionStore);
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to recover from: " + jdbcMemoryTransactionStore + ". Reason: " + e,e);
} finally {
c.close();
}
}
public void commitAdd(ConnectionContext context, MessageId messageId) throws IOException {
TransactionContext c = getTransactionContext(context);
try {
long sequence = (Long)messageId.getDataLocator();
getAdapter().doCommitAddOp(c, sequence);
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to commit add: " + messageId + ". Reason: " + e, e);
} finally {
c.close();
}
}
public void commitRemove(ConnectionContext context, MessageAck ack) throws IOException {
TransactionContext c = getTransactionContext(context);
try {
getAdapter().doRemoveMessage(c, (Long)ack.getLastMessageId().getDataLocator(), null);
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to commit last ack: " + ack + ". Reason: " + e,e);
} finally {
c.close();
}
}
public void commitLastAck(ConnectionContext context, long xidLastAck, long priority, ActiveMQDestination destination, String subName, String clientId) throws IOException {
TransactionContext c = getTransactionContext(context);
try {
getAdapter().doSetLastAck(c, destination, null, clientId, subName, xidLastAck, priority);
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to commit last ack with priority: " + priority + " on " + destination + " for " + subName + ":" + clientId + ". Reason: " + e,e);
} finally {
c.close();
}
}
public void rollbackLastAck(ConnectionContext context, JDBCTopicMessageStore store, MessageAck ack, String subName, String clientId) throws IOException {
TransactionContext c = getTransactionContext(context);
try {
byte priority = (byte) store.getCachedStoreSequenceId(c, store.getDestination(), ack.getLastMessageId())[1];
getAdapter().doClearLastAck(c, store.getDestination(), priority, clientId, subName);
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to rollback last ack: " + ack + " on " + store.getDestination() + " for " + subName + ":" + clientId + ". Reason: " + e,e);
} finally {
c.close();
}
}
// after recovery there is no record of the original messageId for the ack
public void rollbackLastAck(ConnectionContext context, byte priority, ActiveMQDestination destination, String subName, String clientId) throws IOException {
TransactionContext c = getTransactionContext(context);
try {
getAdapter().doClearLastAck(c, destination, priority, clientId, subName);
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to rollback last ack with priority: " + priority + " on " + destination + " for " + subName + ":" + clientId + ". Reason: " + e, e);
} finally {
c.close();
}
}
long[] getStoreSequenceIdForMessageId(MessageId messageId, ActiveMQDestination destination) throws IOException {
long[] result = new long[]{-1, Byte.MAX_VALUE -1};
TransactionContext c = getTransactionContext();
try {
result = adapter.getStoreSequenceId(c, destination, messageId);
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e);
} finally {
c.close();
}
return result;
}
}

View File

@ -19,9 +19,11 @@ package org.apache.activemq.store.jdbc;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -48,6 +50,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
private static final Logger LOG = LoggerFactory.getLogger(JDBCTopicMessageStore.class);
private Map<String, LastRecovered> subscriberLastRecoveredMap = new ConcurrentHashMap<String, LastRecovered>();
private Set<String> pendingCompletion = new HashSet<String>();
public static final String PROPERTY_SEQUENCE_ID_CACHE_SIZE = "org.apache.activemq.store.jdbc.SEQUENCE_ID_CACHE_SIZE";
private static final int SEQUENCE_ID_CACHE_SIZE = Integer.parseInt(System.getProperty(
@ -75,9 +78,9 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
try {
long[] res = getCachedStoreSequenceId(c, destination, messageId);
if (this.isPrioritizedMessages()) {
adapter.doSetLastAckWithPriority(c, destination, clientId, subscriptionName, res[0], res[1]);
adapter.doSetLastAckWithPriority(c, destination, context != null ? context.getXid() : null, clientId, subscriptionName, res[0], res[1]);
} else {
adapter.doSetLastAck(c, destination, clientId, subscriptionName, res[0], res[1]);
adapter.doSetLastAck(c, destination, context != null ? context.getXid() : null, clientId, subscriptionName, res[0], res[1]);
}
if (LOG.isTraceEnabled()) {
LOG.trace(clientId + ":" + subscriptionName + " ack, seq: " + res[0] + ", priority: " + res[1] + " mid:" + messageId);
@ -90,7 +93,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
}
}
private long[] getCachedStoreSequenceId(TransactionContext transactionContext, ActiveMQDestination destination, MessageId messageId) throws SQLException, IOException {
public long[] getCachedStoreSequenceId(TransactionContext transactionContext, ActiveMQDestination destination, MessageId messageId) throws SQLException, IOException {
long[] val = null;
sequenceIdCacheSizeLock.readLock().lock();
try {
@ -254,7 +257,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
LastRecoveredAwareListener recoveredAwareListener = new LastRecoveredAwareListener(listener, maxReturned);
try {
if (LOG.isTraceEnabled()) {
LOG.trace(key + " existing last recovered: " + lastRecovered);
LOG.trace(this + ", " + key + " existing last recovered: " + lastRecovered);
}
if (isPrioritizedMessages()) {
Iterator<LastRecoveredEntry> it = lastRecovered.iterator();
@ -291,7 +294,26 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
}
public void resetBatching(String clientId, String subscriptionName) {
subscriberLastRecoveredMap.remove(getSubscriptionKey(clientId, subscriptionName));
String key = getSubscriptionKey(clientId, subscriptionName);
if (!pendingCompletion.contains(key)) {
subscriberLastRecoveredMap.remove(key);
} else {
LOG.trace(this + ", skip resetBatch during pending completion for: " + key);
}
}
public void pendingCompletion(String clientId, String subscriptionName, long sequenceId, byte priority) {
final String key = getSubscriptionKey(clientId, subscriptionName);
LastRecovered recovered = new LastRecovered();
recovered.perPriority[isPrioritizedMessages() ? priority : javax.jms.Message.DEFAULT_PRIORITY].recovered = sequenceId;
subscriberLastRecoveredMap.put(key, recovered);
pendingCompletion.add(key);
LOG.trace(this + ", pending completion: " + key + ", last: " + recovered);
}
public void complete(String clientId, String subscriptionName) {
pendingCompletion.remove(getSubscriptionKey(clientId, subscriptionName));
LOG.trace(this + ", completion for: " + getSubscriptionKey(clientId, subscriptionName));
}
protected void onAdd(MessageId messageId, long sequenceId, byte priority) {

View File

@ -0,0 +1,379 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.jdbc;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.ProxyTopicMessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
import org.apache.activemq.store.memory.MemoryTransactionStore;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.SubscriptionKey;
/**
* respect 2pc prepare
* uses local transactions to maintain prepared state
* xid column provides transaction flag for additions and removals
* a commit clears that context and completes the work
* a rollback clears the flag and removes the additions
* Essentially a prepare is an insert &| update transaction
* commit|rollback is an update &| remove
*/
public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
private HashMap<ActiveMQDestination, MessageStore> topicStores = new HashMap<ActiveMQDestination, MessageStore>();
public JdbcMemoryTransactionStore(JDBCPersistenceAdapter jdbcPersistenceAdapter) {
super(jdbcPersistenceAdapter);
}
@Override
public void prepare(TransactionId txid) throws IOException {
Tx tx = inflightTransactions.remove(txid);
if (tx == null) {
return;
}
ConnectionContext ctx = new ConnectionContext();
// setting the xid modifies the add/remove to be pending transaction outcome
ctx.setXid((XATransactionId) txid);
persistenceAdapter.beginTransaction(ctx);
try {
// Do all the message adds.
for (Iterator<AddMessageCommand> iter = tx.messages.iterator(); iter.hasNext();) {
AddMessageCommand cmd = iter.next();
cmd.run(ctx);
}
// And removes..
for (Iterator<RemoveMessageCommand> iter = tx.acks.iterator(); iter.hasNext();) {
RemoveMessageCommand cmd = iter.next();
cmd.run(ctx);
}
} catch ( IOException e ) {
persistenceAdapter.rollbackTransaction(ctx);
throw e;
}
persistenceAdapter.commitTransaction(ctx);
ctx.setXid(null);
// setup for commit outcome
ArrayList<AddMessageCommand> updateFromPreparedStateCommands = new ArrayList<AddMessageCommand>();
for (Iterator<AddMessageCommand> iter = tx.messages.iterator(); iter.hasNext();) {
final AddMessageCommand addMessageCommand = iter.next();
updateFromPreparedStateCommands.add(new AddMessageCommand() {
@Override
public Message getMessage() {
return addMessageCommand.getMessage();
}
@Override
public MessageStore getMessageStore() {
return addMessageCommand.getMessageStore();
}
@Override
public void run(ConnectionContext context) throws IOException {
JDBCPersistenceAdapter jdbcPersistenceAdapter = (JDBCPersistenceAdapter) persistenceAdapter;
Message message = addMessageCommand.getMessage();
jdbcPersistenceAdapter.commitAdd(context, message.getMessageId());
((JDBCMessageStore)addMessageCommand.getMessageStore()).onAdd(
message.getMessageId(),
(Long)message.getMessageId().getDataLocator(),
message.getPriority());
}
});
}
tx.messages = updateFromPreparedStateCommands;
preparedTransactions.put(txid, tx);
}
@Override
public void rollback(TransactionId txid) throws IOException {
Tx tx = inflightTransactions.remove(txid);
if (tx == null) {
tx = preparedTransactions.remove(txid);
if (tx != null) {
// undo prepare work
ConnectionContext ctx = new ConnectionContext();
persistenceAdapter.beginTransaction(ctx);
try {
for (Iterator<AddMessageCommand> iter = tx.messages.iterator(); iter.hasNext(); ) {
final Message message = iter.next().getMessage();
// need to delete the row
((JDBCPersistenceAdapter) persistenceAdapter).commitRemove(ctx, new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, 1));
}
for (Iterator<RemoveMessageCommand> iter = tx.acks.iterator(); iter.hasNext(); ) {
RemoveMessageCommand removeMessageCommand = iter.next();
if (removeMessageCommand instanceof LastAckCommand ) {
((LastAckCommand)removeMessageCommand).rollback(ctx);
} else {
// need to unset the txid flag on the existing row
((JDBCPersistenceAdapter) persistenceAdapter).commitAdd(ctx,
removeMessageCommand.getMessageAck().getLastMessageId());
}
}
} catch (IOException e) {
persistenceAdapter.rollbackTransaction(ctx);
throw e;
}
persistenceAdapter.commitTransaction(ctx);
}
}
}
@Override
public void recover(TransactionRecoveryListener listener) throws IOException {
((JDBCPersistenceAdapter)persistenceAdapter).recover(this);
super.recover(listener);
}
public void recoverAdd(long id, byte[] messageBytes) throws IOException {
final Message message = (Message) ((JDBCPersistenceAdapter)persistenceAdapter).getWireFormat().unmarshal(new ByteSequence(messageBytes));
message.getMessageId().setDataLocator(id);
Tx tx = getPreparedTx(message.getTransactionId());
tx.add(new AddMessageCommand() {
@Override
public Message getMessage() {
return message;
}
@Override
public MessageStore getMessageStore() {
return null;
}
@Override
public void run(ConnectionContext context) throws IOException {
((JDBCPersistenceAdapter)persistenceAdapter).commitAdd(null, message.getMessageId());
}
});
}
public void recoverAck(long id, byte[] xid, byte[] message) throws IOException {
Message msg = (Message) ((JDBCPersistenceAdapter)persistenceAdapter).getWireFormat().unmarshal(new ByteSequence(message));
msg.getMessageId().setDataLocator(id);
Tx tx = getPreparedTx(new XATransactionId(xid));
final MessageAck ack = new MessageAck(msg, MessageAck.STANDARD_ACK_TYPE, 1);
tx.add(new RemoveMessageCommand() {
@Override
public MessageAck getMessageAck() {
return ack;
}
@Override
public void run(ConnectionContext context) throws IOException {
((JDBCPersistenceAdapter)persistenceAdapter).commitRemove(context, ack);
}
@Override
public MessageStore getMessageStore() {
return null;
}
});
}
interface LastAckCommand extends RemoveMessageCommand {
void rollback(ConnectionContext context) throws IOException;
String getClientId();
String getSubName();
long getSequence();
byte getPriority();
void setMessageStore(JDBCTopicMessageStore jdbcTopicMessageStore);
}
public void recoverLastAck(byte[] encodedXid, final ActiveMQDestination destination, final String subName, final String clientId) throws IOException {
Tx tx = getPreparedTx(new XATransactionId(encodedXid));
DataByteArrayInputStream inputStream = new DataByteArrayInputStream(encodedXid);
inputStream.skipBytes(1); // +|-
final long lastAck = inputStream.readLong();
final byte priority = inputStream.readByte();
final MessageAck ack = new MessageAck();
ack.setDestination(destination);
tx.add(new LastAckCommand() {
JDBCTopicMessageStore jdbcTopicMessageStore;
@Override
public MessageAck getMessageAck() {
return ack;
}
@Override
public MessageStore getMessageStore() {
return jdbcTopicMessageStore;
}
@Override
public void run(ConnectionContext context) throws IOException {
((JDBCPersistenceAdapter)persistenceAdapter).commitLastAck(context, lastAck, priority, destination, subName, clientId);
jdbcTopicMessageStore.complete(clientId, subName);
}
@Override
public void rollback(ConnectionContext context) throws IOException {
((JDBCPersistenceAdapter)persistenceAdapter).rollbackLastAck(context, priority, jdbcTopicMessageStore.getDestination(), subName, clientId);
jdbcTopicMessageStore.complete(clientId, subName);
}
@Override
public String getClientId() {
return clientId;
}
@Override
public String getSubName() {
return subName;
}
@Override
public long getSequence() {
return lastAck;
}
@Override
public byte getPriority() {
return priority;
}
@Override
public void setMessageStore(JDBCTopicMessageStore jdbcTopicMessageStore) {
this.jdbcTopicMessageStore = jdbcTopicMessageStore;
}
});
}
@Override
protected void onProxyTopicStore(ProxyTopicMessageStore proxyTopicMessageStore) {
topicStores.put(proxyTopicMessageStore.getDestination(), proxyTopicMessageStore.getDelegate());
}
@Override
protected void onRecovered(Tx tx) {
for (RemoveMessageCommand removeMessageCommand: tx.acks) {
if (removeMessageCommand instanceof LastAckCommand) {
LastAckCommand lastAckCommand = (LastAckCommand) removeMessageCommand;
JDBCTopicMessageStore jdbcTopicMessageStore = (JDBCTopicMessageStore) topicStores.get(lastAckCommand.getMessageAck().getDestination());
jdbcTopicMessageStore.pendingCompletion(lastAckCommand.getClientId(), lastAckCommand.getSubName(), lastAckCommand.getSequence(), lastAckCommand.getPriority());
lastAckCommand.setMessageStore(jdbcTopicMessageStore);
}
}
}
@Override
public void acknowledge(final TopicMessageStore topicMessageStore, final String clientId, final String subscriptionName,
final MessageId messageId, final MessageAck ack) throws IOException {
if (ack.isInTransaction()) {
Tx tx = getTx(ack.getTransactionId());
tx.add(new LastAckCommand() {
public MessageAck getMessageAck() {
return ack;
}
public void run(ConnectionContext ctx) throws IOException {
topicMessageStore.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
}
@Override
public MessageStore getMessageStore() {
return topicMessageStore;
}
@Override
public void rollback(ConnectionContext context) throws IOException {
JDBCTopicMessageStore jdbcTopicMessageStore = (JDBCTopicMessageStore)topicMessageStore;
((JDBCPersistenceAdapter)persistenceAdapter).rollbackLastAck(context,
jdbcTopicMessageStore,
ack,
subscriptionName, clientId);
jdbcTopicMessageStore.complete(clientId, subscriptionName);
Map<ActiveMQDestination, Destination> destinations = ((JDBCPersistenceAdapter) persistenceAdapter).getBrokerService().getRegionBroker().getDestinationMap();
Topic topic = (Topic) destinations.get(topicMessageStore.getDestination());
SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
topic.getDurableTopicSubs().get(key).getPending().rollback(ack.getLastMessageId());
topic.clearPendingMessages(key);
}
@Override
public String getClientId() {
return clientId;
}
@Override
public String getSubName() {
return subscriptionName;
}
@Override
public long getSequence() {
throw new IllegalStateException("Sequence id must be inferred from ack");
}
@Override
public byte getPriority() {
throw new IllegalStateException("Priority must be inferred from ack or row");
}
@Override
public void setMessageStore(JDBCTopicMessageStore jdbcTopicMessageStore) {
throw new IllegalStateException("message store already known!");
}
});
} else {
topicMessageStore.acknowledge(null, clientId, subscriptionName, messageId, ack);
}
}
}

View File

@ -76,6 +76,15 @@ public class Statements {
private String deleteOldMessagesStatementWithPriority;
private String durableSubscriberMessageCountStatementWithPriority;
private String dropAckPKAlterStatementEnd;
private String updateXidFlagStatement;
private String findOpsPendingOutcomeStatement;
private String clearXidFlagStatement;
private String updateDurableLastAckInTxStatement;
private String findAcksPendingOutcomeStatement;
private String clearDurableLastAckInTxStatement;
private String updateDurableLastAckWithPriorityStatement;
private String updateDurableLastAckWithPriorityInTxStatement;
private String findXidByIdStatement;
public String[] getCreateSchemaStatements() {
if (createSchemaStatements == null) {
@ -99,7 +108,9 @@ public class Statements {
"INSERT INTO " + getFullLockTableName() + "(ID) VALUES (1)",
"ALTER TABLE " + getFullMessageTableName() + " ADD PRIORITY " + sequenceDataType,
"CREATE INDEX " + getFullMessageTableName() + "_PIDX ON " + getFullMessageTableName() + " (PRIORITY)",
"ALTER TABLE " + getFullMessageTableName() + " ADD XID " + binaryDataType,
"ALTER TABLE " + getFullAckTableName() + " ADD PRIORITY " + sequenceDataType + " DEFAULT 5 NOT NULL",
"ALTER TABLE " + getFullAckTableName() + " ADD XID " + binaryDataType,
"ALTER TABLE " + getFullAckTableName() + " " + getDropAckPKAlterStatementEnd(),
"ALTER TABLE " + getFullAckTableName() + " ADD PRIMARY KEY (CONTAINER, CLIENT_ID, SUB_NAME, PRIORITY)",
};
@ -131,7 +142,7 @@ public class Statements {
if (addMessageStatement == null) {
addMessageStatement = "INSERT INTO "
+ getFullMessageTableName()
+ "(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, PRIORITY, MSG) VALUES (?, ?, ?, ?, ?, ?, ?)";
+ "(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, PRIORITY, MSG, XID) VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
}
return addMessageStatement;
}
@ -171,7 +182,14 @@ public class Statements {
}
return findMessageByIdStatement;
}
public String getFindXidByIdStatement() {
if (findXidByIdStatement == null) {
findXidByIdStatement = "SELECT XID FROM " + getFullMessageTableName() + " WHERE ID=?";
}
return findXidByIdStatement;
}
public String getFindAllMessagesStatement() {
if (findAllMessagesStatement == null) {
findAllMessagesStatement = "SELECT ID, MSG FROM " + getFullMessageTableName()
@ -271,6 +289,7 @@ public class Statements {
findDurableSubMessagesStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, "
+ getFullAckTableName() + " D "
+ " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ " AND M.XID IS NULL"
+ " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"
+ " AND M.ID > ?"
+ " ORDER BY M.ID";
@ -283,6 +302,7 @@ public class Statements {
findDurableSubMessagesByPriorityStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M,"
+ " " + getFullAckTableName() + " D"
+ " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ " AND M.XID IS NULL"
+ " AND M.CONTAINER=D.CONTAINER"
+ " AND M.PRIORITY=D.PRIORITY AND M.ID > D.LAST_ACKED_ID"
+ " AND M.ID > ? AND M.PRIORITY = ?"
@ -414,7 +434,7 @@ public class Statements {
public String getDestinationMessageCountStatement() {
if (destinationMessageCountStatement == null) {
destinationMessageCountStatement = "SELECT COUNT(*) FROM " + getFullMessageTableName()
+ " WHERE CONTAINER=?";
+ " WHERE CONTAINER=? AND XID IS NULL";
}
return destinationMessageCountStatement;
}
@ -425,7 +445,7 @@ public class Statements {
public String getFindNextMessagesStatement() {
if (findNextMessagesStatement == null) {
findNextMessagesStatement = "SELECT ID, MSG FROM " + getFullMessageTableName()
+ " WHERE CONTAINER=? AND ID > ? ORDER BY ID";
+ " WHERE CONTAINER=? AND ID > ? AND XID IS NULL ORDER BY ID";
}
return findNextMessagesStatement;
}
@ -437,6 +457,7 @@ public class Statements {
if (findNextMessagesByPriorityStatement == null) {
findNextMessagesByPriorityStatement = "SELECT ID, MSG FROM " + getFullMessageTableName()
+ " WHERE CONTAINER=?"
+ " AND XID IS NULL"
+ " AND ((ID > ? AND PRIORITY = ?) OR PRIORITY < ?)"
+ " ORDER BY PRIORITY DESC, ID";
}
@ -478,11 +499,76 @@ public class Statements {
public String getUpdateDurableLastAckStatement() {
if (updateDurableLastAckStatement == null) {
updateDurableLastAckStatement = "UPDATE " + getFullAckTableName()
+ " SET LAST_ACKED_ID = ? WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
+ " SET LAST_ACKED_ID=?, XID = NULL WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
}
return updateDurableLastAckStatement;
}
public String getUpdateDurableLastAckInTxStatement() {
if (updateDurableLastAckInTxStatement == null) {
updateDurableLastAckInTxStatement = "UPDATE " + getFullAckTableName()
+ " SET XID=? WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
}
return updateDurableLastAckInTxStatement;
}
public String getUpdateDurableLastAckWithPriorityStatement() {
if (updateDurableLastAckWithPriorityStatement == null) {
updateDurableLastAckWithPriorityStatement = "UPDATE " + getFullAckTableName()
+ " SET LAST_ACKED_ID=?, XID = NULL WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=? AND PRIORITY=?";
}
return updateDurableLastAckWithPriorityStatement;
}
public String getUpdateDurableLastAckWithPriorityInTxStatement() {
if (updateDurableLastAckWithPriorityInTxStatement == null) {
updateDurableLastAckWithPriorityInTxStatement = "UPDATE " + getFullAckTableName()
+ " SET XID=? WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=? AND PRIORITY=?";
}
return updateDurableLastAckWithPriorityInTxStatement;
}
public String getClearDurableLastAckInTxStatement() {
if (clearDurableLastAckInTxStatement == null) {
clearDurableLastAckInTxStatement = "UPDATE " + getFullAckTableName()
+ " SET XID = NULL WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=? AND PRIORITY=?";
}
return clearDurableLastAckInTxStatement;
}
public String getFindOpsPendingOutcomeStatement() {
if (findOpsPendingOutcomeStatement == null) {
findOpsPendingOutcomeStatement = "SELECT ID, XID, MSG FROM " + getFullMessageTableName()
+ " WHERE XID IS NOT NULL ORDER BY ID";
}
return findOpsPendingOutcomeStatement;
}
public String getFindAcksPendingOutcomeStatement() {
if (findAcksPendingOutcomeStatement == null) {
findAcksPendingOutcomeStatement = "SELECT XID," +
" CONTAINER, CLIENT_ID, SUB_NAME FROM " + getFullAckTableName()
+ " WHERE XID IS NOT NULL";
}
return findAcksPendingOutcomeStatement;
}
public String getUpdateXidFlagStatement() {
if (updateXidFlagStatement == null) {
updateXidFlagStatement = "UPDATE " + getFullMessageTableName()
+ " SET XID = ? WHERE ID = ?";
}
return updateXidFlagStatement;
}
public String getClearXidFlagStatement() {
if (clearXidFlagStatement == null) {
clearXidFlagStatement = "UPDATE " + getFullMessageTableName()
+ " SET XID = NULL WHERE ID = ?";
}
return clearXidFlagStatement;
}
public String getFullMessageTableName() {
return getTablePrefix() + getMessageTableName();
}
@ -788,5 +874,41 @@ public class Statements {
public void setUpdateDurableLastAckStatement(String updateDurableLastAckStatement) {
this.updateDurableLastAckStatement = updateDurableLastAckStatement;
}
}
public void setUpdateXidFlagStatement(String updateXidFlagStatement) {
this.updateXidFlagStatement = updateXidFlagStatement;
}
public void setFindOpsPendingOutcomeStatement(String findOpsPendingOutcomeStatement) {
this.findOpsPendingOutcomeStatement = findOpsPendingOutcomeStatement;
}
public void setClearXidFlagStatement(String clearXidFlagStatement) {
this.clearXidFlagStatement = clearXidFlagStatement;
}
public void setUpdateDurableLastAckInTxStatement(String updateDurableLastAckInTxStatement) {
this.updateDurableLastAckInTxStatement = updateDurableLastAckInTxStatement;
}
public void setFindAcksPendingOutcomeStatement(String findAcksPendingOutcomeStatement) {
this.findAcksPendingOutcomeStatement = findAcksPendingOutcomeStatement;
}
public void setClearDurableLastAckInTxStatement(String clearDurableLastAckInTxStatement) {
this.clearDurableLastAckInTxStatement = clearDurableLastAckInTxStatement;
}
public void setUpdateDurableLastAckWithPriorityStatement(String updateDurableLastAckWithPriorityStatement) {
this.updateDurableLastAckWithPriorityStatement = updateDurableLastAckWithPriorityStatement;
}
public void setUpdateDurableLastAckWithPriorityInTxStatement(String updateDurableLastAckWithPriorityInTxStatement) {
this.updateDurableLastAckWithPriorityInTxStatement = updateDurableLastAckWithPriorityInTxStatement;
}
public void setFindXidByIdStatement(String findXidByIdStatement) {
this.findXidByIdStatement = findXidByIdStatement;
}
}

View File

@ -18,18 +18,16 @@ package org.apache.activemq.store.jdbc.adapter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.sql.Blob;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import javax.jms.JMSException;
import javax.sql.rowset.serial.SerialBlob;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.jdbc.Statements;
import org.apache.activemq.store.jdbc.TransactionContext;
import org.apache.activemq.util.ByteArrayOutputStream;
@ -52,11 +50,25 @@ import org.apache.activemq.util.ByteArrayOutputStream;
*/
public class BlobJDBCAdapter extends DefaultJDBCAdapter {
@Override
public void setStatements(Statements statements) {
String addMessageStatement = "INSERT INTO "
+ statements.getFullMessageTableName()
+ "(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, PRIORITY, MSG) VALUES (?, ?, ?, ?, ?, ?, empty_blob(), empty_blob())";
statements.setAddMessageStatement(addMessageStatement);
String findMessageByIdStatement = "SELECT MSG FROM " +
statements.getFullMessageTableName() + " WHERE ID=? FOR UPDATE";
statements.setFindMessageByIdStatement(findMessageByIdStatement);
super.setStatements(statements);
}
@Override
public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
long expiration, byte priority) throws SQLException, IOException {
long expiration, byte priority, XATransactionId xid) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
cleanupExclusiveLock.readLock().lock();
try {
// Add the Blob record.
@ -74,12 +86,29 @@ public class BlobJDBCAdapter extends DefaultJDBCAdapter {
s.close();
// Select the blob record so that we can update it.
s = c.getConnection().prepareStatement(statements.getFindMessageByIdStatement(),
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE);
updateBlob(c.getConnection(), statements.getFindMessageByIdStatement(), sequence, data);
if (xid != null) {
byte[] xidVal = xid.getEncodedXidBytes();
xidVal[0] = '+';
updateBlob(c.getConnection(), statements.getFindXidByIdStatement(), sequence, xidVal);
}
} finally {
cleanupExclusiveLock.readLock().unlock();
close(s);
}
}
private void updateBlob(Connection connection, String findMessageByIdStatement, long sequence, byte[] data) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
try {
s = connection.prepareStatement(statements.getFindMessageByIdStatement(),
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE);
s.setLong(1, sequence);
rs = s.executeQuery();
if (!rs.next()) {
throw new IOException("Failed select blob for message: " + messageID + " in container.");
throw new IOException("Failed select blob for message: " + sequence + " in container.");
}
// Update the blob
@ -88,9 +117,7 @@ public class BlobJDBCAdapter extends DefaultJDBCAdapter {
blob.setBytes(1, data);
rs.updateBlob(1, blob);
rs.updateRow(); // Update the row with the updated blob
} finally {
cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}

View File

@ -17,11 +17,8 @@
package org.apache.activemq.store.jdbc.adapter;
import java.io.IOException;
import java.io.PrintStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
@ -35,12 +32,15 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.jdbc.JDBCAdapter;
import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener;
import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.JdbcMemoryTransactionStore;
import org.apache.activemq.store.jdbc.Statements;
import org.apache.activemq.store.jdbc.TransactionContext;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -201,10 +201,13 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
close(s);
}
}
/**
* A non null xid indicated the op is part of 2pc prepare, so ops are flagged pending outcome
*/
public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
long expiration, byte priority) throws SQLException, IOException {
long expiration, byte priority, XATransactionId xid) throws SQLException, IOException {
PreparedStatement s = c.getAddMessageStatement();
cleanupExclusiveLock.readLock().lock();
try {
@ -221,6 +224,13 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
s.setLong(5, expiration);
s.setLong(6, priority);
setBinaryData(s, 7, data);
if (xid != null) {
byte[] xidVal = xid.getEncodedXidBytes();
xidVal[0] = '+';
setBinaryData(s, 8, xidVal);
} else {
setBinaryData(s, 8, null);
}
if (this.batchStatments) {
s.addBatch();
} else if (s.executeUpdate() != 1) {
@ -326,17 +336,27 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
}
public void doRemoveMessage(TransactionContext c, long seq) throws SQLException, IOException {
/**
* A non null xid indicated the op is part of 2pc prepare, so ops are flagged pending outcome
*/
public void doRemoveMessage(TransactionContext c, long seq, XATransactionId xid) throws SQLException, IOException {
PreparedStatement s = c.getRemovedMessageStatement();
cleanupExclusiveLock.readLock().lock();
try {
if (s == null) {
s = c.getConnection().prepareStatement(this.statements.getRemoveMessageStatement());
s = c.getConnection().prepareStatement(xid == null ?
this.statements.getRemoveMessageStatement() : this.statements.getUpdateXidFlagStatement());
if (this.batchStatments) {
c.setRemovedMessageStatement(s);
}
}
s.setLong(1, seq);
if (xid == null) {
s.setLong(1, seq);
} else {
byte[] xidVal = xid.getEncodedXidBytes();
setBinaryData(s, 1, xidVal);
s.setLong(2, seq);
}
if (this.batchStatments) {
s.addBatch();
} else if (s.executeUpdate() != 1) {
@ -406,26 +426,33 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
}
public void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId,
String subscriptionName, long seq, long prio) throws SQLException, IOException {
public void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination, XATransactionId xid, String clientId,
String subscriptionName, long seq, long priority) throws SQLException, IOException {
PreparedStatement s = c.getUpdateLastAckStatement();
cleanupExclusiveLock.readLock().lock();
try {
if (s == null) {
s = c.getConnection().prepareStatement(this.statements.getUpdateLastPriorityAckRowOfDurableSubStatement());
s = c.getConnection().prepareStatement(xid == null ?
this.statements.getUpdateDurableLastAckWithPriorityStatement() :
this.statements.getUpdateDurableLastAckWithPriorityInTxStatement());
if (this.batchStatments) {
c.setUpdateLastAckStatement(s);
}
}
s.setLong(1, seq);
if (xid != null) {
byte[] xidVal = encodeXid(xid, seq, priority);
setBinaryData(s, 1, xidVal);
} else {
s.setLong(1, seq);
}
s.setString(2, destination.getQualifiedName());
s.setString(3, clientId);
s.setString(4, subscriptionName);
s.setLong(5, prio);
s.setLong(5, priority);
if (this.batchStatments) {
s.addBatch();
} else if (s.executeUpdate() != 1) {
throw new SQLException("Failed update last ack with priority: " + prio + ", for sub: " + subscriptionName);
throw new SQLException("Failed update last ack with priority: " + priority + ", for sub: " + subscriptionName);
}
} finally {
cleanupExclusiveLock.readLock().unlock();
@ -436,18 +463,25 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId,
String subscriptionName, long seq, long priority) throws SQLException, IOException {
public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, XATransactionId xid, String clientId,
String subscriptionName, long seq, long priority) throws SQLException, IOException {
PreparedStatement s = c.getUpdateLastAckStatement();
cleanupExclusiveLock.readLock().lock();
try {
if (s == null) {
s = c.getConnection().prepareStatement(this.statements.getUpdateDurableLastAckStatement());
s = c.getConnection().prepareStatement(xid == null ?
this.statements.getUpdateDurableLastAckStatement() :
this.statements.getUpdateDurableLastAckInTxStatement());
if (this.batchStatments) {
c.setUpdateLastAckStatement(s);
}
}
s.setLong(1, seq);
if (xid != null) {
byte[] xidVal = encodeXid(xid, seq, priority);
setBinaryData(s, 1, xidVal);
} else {
s.setLong(1, seq);
}
s.setString(2, destination.getQualifiedName());
s.setString(3, clientId);
s.setString(4, subscriptionName);
@ -466,6 +500,35 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
}
private byte[] encodeXid(XATransactionId xid, long seq, long priority) {
byte[] xidVal = xid.getEncodedXidBytes();
// encode the update
DataByteArrayOutputStream outputStream = xid.getOutputStream();
outputStream.position(1);
outputStream.writeLong(seq);
outputStream.writeByte(Long.valueOf(priority).byteValue());
return xidVal;
}
@Override
public void doClearLastAck(TransactionContext c, ActiveMQDestination destination, byte priority, String clientId, String subName) throws SQLException, IOException {
PreparedStatement s = null;
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getClearDurableLastAckInTxStatement());
s.setString(1, destination.getQualifiedName());
s.setString(2, clientId);
s.setString(3, subName);
s.setLong(4, priority);
if (s.executeUpdate() != 1) {
throw new IOException("Could not remove prepared transaction state from message ack for: " + clientId + ":" + subName);
}
} finally {
cleanupExclusiveLock.readLock().unlock();
close(s);
}
}
public void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception {
// dumpTables(c,
@ -879,30 +942,38 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
}
/**
* @param c
* @param destination
* @param clientId
* @param subscriberName
* @return
* @throws SQLException
* @throws IOException
*/
public byte[] doGetNextDurableSubscriberMessageStatement(TransactionContext c, ActiveMQDestination destination,
String clientId, String subscriberName) throws SQLException, IOException {
@Override
public void doRecoverPreparedOps(TransactionContext c, JdbcMemoryTransactionStore jdbcMemoryTransactionStore) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getNextDurableSubscriberMessageStatement());
s.setString(1, destination.getQualifiedName());
s.setString(2, clientId);
s.setString(3, subscriberName);
s = c.getConnection().prepareStatement(this.statements.getFindOpsPendingOutcomeStatement());
rs = s.executeQuery();
if (!rs.next()) {
return null;
while (rs.next()) {
long id = rs.getLong(1);
byte[] encodedXid = getBinaryData(rs, 2);
if (encodedXid[0] == '+') {
jdbcMemoryTransactionStore.recoverAdd(id, getBinaryData(rs, 3));
} else {
jdbcMemoryTransactionStore.recoverAck(id, encodedXid, getBinaryData(rs, 3));
}
}
close(rs);
close(s);
s = c.getConnection().prepareStatement(this.statements.getFindAcksPendingOutcomeStatement());
rs = s.executeQuery();
while (rs.next()) {
byte[] encodedXid = getBinaryData(rs, 1);
String destination = rs.getString(2);
String subName = rs.getString(3);
String subId = rs.getString(4);
jdbcMemoryTransactionStore.recoverLastAck(encodedXid,
ActiveMQDestination.createDestination(destination, ActiveMQDestination.TOPIC_TYPE),
subName, subId);
}
return getBinaryData(rs, 1);
} finally {
close(rs);
cleanupExclusiveLock.readLock().unlock();
@ -910,6 +981,23 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
}
@Override
public void doCommitAddOp(TransactionContext c, long sequence) throws SQLException, IOException {
PreparedStatement s = null;
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getClearXidFlagStatement());
s.setLong(1, sequence);
if (s.executeUpdate() != 1) {
throw new IOException("Could not remove prepared transaction state from message add for sequenceId: " + sequence);
}
} finally {
cleanupExclusiveLock.readLock().unlock();
close(s);
}
}
public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException,
IOException {
PreparedStatement s = null;
@ -978,7 +1066,28 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
close(s);
}
}
public long doGetLastProducerSequenceId(TransactionContext c, ProducerId id)
throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getLastProducerSequenceIdStatement());
s.setString(1, id.toString());
rs = s.executeQuery();
long seq = -1;
if (rs.next()) {
seq = rs.getLong(1);
}
return seq;
} finally {
cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
}
/* public void dumpTables(Connection c, String destinationName, String clientId, String
subscriptionName) throws SQLException {
printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out);
@ -1034,25 +1143,4 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
} */
public long doGetLastProducerSequenceId(TransactionContext c, ProducerId id)
throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getLastProducerSequenceIdStatement());
s.setString(1, id.toString());
rs = s.executeQuery();
long seq = -1;
if (rs.next()) {
seq = rs.getLong(1);
}
return seq;
} finally {
cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
}
}

View File

@ -48,15 +48,6 @@ public class OracleBlobJDBCAdapter extends BlobJDBCAdapter {
statements.setLongDataType("NUMBER");
statements.setSequenceDataType("NUMBER");
String addMessageStatement = "INSERT INTO "
+ statements.getFullMessageTableName()
+ "(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, PRIORITY, MSG) VALUES (?, ?, ?, ?, ?, ?, empty_blob())";
statements.setAddMessageStatement(addMessageStatement);
String findMessageByIdStatement = "SELECT MSG FROM " +
statements.getFullMessageTableName() + " WHERE ID=? FOR UPDATE";
statements.setFindMessageByIdStatement(findMessageByIdStatement);
super.setStatements(statements);
}

View File

@ -30,6 +30,7 @@ import org.apache.activemq.store.ProxyTopicMessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.jdbc.JDBCMessageStore;
import java.io.IOException;
import java.util.ArrayList;
@ -45,16 +46,16 @@ import java.util.concurrent.Future;
*/
public class MemoryTransactionStore implements TransactionStore {
ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
ConcurrentHashMap<TransactionId, Tx> preparedTransactions = new ConcurrentHashMap<TransactionId, Tx>();
final PersistenceAdapter persistenceAdapter;
protected ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
protected ConcurrentHashMap<TransactionId, Tx> preparedTransactions = new ConcurrentHashMap<TransactionId, Tx>();
protected final PersistenceAdapter persistenceAdapter;
private boolean doingRecover;
public class Tx {
private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
public ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
private final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
public final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
public void add(AddMessageCommand msg) {
messages.add(msg);
@ -114,6 +115,8 @@ public class MemoryTransactionStore implements TransactionStore {
public interface AddMessageCommand {
Message getMessage();
MessageStore getMessageStore();
void run(ConnectionContext context) throws IOException;
}
@ -121,6 +124,8 @@ public class MemoryTransactionStore implements TransactionStore {
MessageAck getMessageAck();
void run(ConnectionContext context) throws IOException;
MessageStore getMessageStore();
}
public MemoryTransactionStore(PersistenceAdapter persistenceAdapter) {
@ -164,7 +169,7 @@ public class MemoryTransactionStore implements TransactionStore {
}
public TopicMessageStore proxy(TopicMessageStore messageStore) {
return new ProxyTopicMessageStore(messageStore) {
ProxyTopicMessageStore proxyTopicMessageStore = new ProxyTopicMessageStore(messageStore) {
@Override
public void addMessage(ConnectionContext context, final Message send) throws IOException {
MemoryTransactionStore.this.addMessage(getDelegate(), send);
@ -204,12 +209,17 @@ public class MemoryTransactionStore implements TransactionStore {
subscriptionName, messageId, ack);
}
};
onProxyTopicStore(proxyTopicMessageStore);
return proxyTopicMessageStore;
}
protected void onProxyTopicStore(ProxyTopicMessageStore proxyTopicMessageStore) {
}
/**
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
*/
public void prepare(TransactionId txid) {
public void prepare(TransactionId txid) throws IOException {
Tx tx = inflightTransactions.remove(txid);
if (tx == null) {
return;
@ -226,6 +236,15 @@ public class MemoryTransactionStore implements TransactionStore {
return tx;
}
public Tx getPreparedTx(TransactionId txid) {
Tx tx = preparedTransactions.get(txid);
if (tx == null) {
tx = new Tx();
preparedTransactions.put(txid, tx);
}
return tx;
}
public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
if (preCommit != null) {
preCommit.run();
@ -248,7 +267,7 @@ public class MemoryTransactionStore implements TransactionStore {
/**
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
*/
public void rollback(TransactionId txid) {
public void rollback(TransactionId txid) throws IOException {
preparedTransactions.remove(txid);
inflightTransactions.remove(txid);
}
@ -268,12 +287,16 @@ public class MemoryTransactionStore implements TransactionStore {
Object txid = iter.next();
Tx tx = preparedTransactions.get(txid);
listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks());
onRecovered(tx);
}
} finally {
this.doingRecover = false;
}
}
protected void onRecovered(Tx tx) {
}
/**
* @param message
* @throws IOException
@ -291,6 +314,11 @@ public class MemoryTransactionStore implements TransactionStore {
return message;
}
@Override
public MessageStore getMessageStore() {
return destination;
}
public void run(ConnectionContext ctx) throws IOException {
destination.addMessage(ctx, message);
}
@ -320,13 +348,18 @@ public class MemoryTransactionStore implements TransactionStore {
public void run(ConnectionContext ctx) throws IOException {
destination.removeMessage(ctx, ack);
}
@Override
public MessageStore getMessageStore() {
return destination;
}
});
} else {
destination.removeMessage(null, ack);
}
}
final void acknowledge(final TopicMessageStore destination, final String clientId, final String subscriptionName,
public void acknowledge(final TopicMessageStore destination, final String clientId, final String subscriptionName,
final MessageId messageId, final MessageAck ack) throws IOException {
if (doingRecover) {
return;
@ -342,6 +375,11 @@ public class MemoryTransactionStore implements TransactionStore {
public void run(ConnectionContext ctx) throws IOException {
destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
}
@Override
public MessageStore getMessageStore() {
return destination;
}
});
} else {
destination.acknowledge(null, clientId, subscriptionName, messageId, ack);

View File

@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker;
import junit.framework.Test;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.apache.derby.jdbc.EmbeddedXADataSource;
public class JdbcXARecoveryBrokerTest extends XARecoveryBrokerTest {
EmbeddedXADataSource dataSource;
@Override
protected void setUp() throws Exception {
dataSource = new EmbeddedXADataSource();
dataSource.setDatabaseName("derbyDb");
dataSource.setCreateDatabase("create");
super.setUp();
}
@Override
protected void tearDown() throws Exception {
super.tearDown();
stopDerby();
}
@Override
protected void configureBroker(BrokerService broker) throws Exception {
super.configureBroker(broker);
JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
jdbc.setDataSource(dataSource);
broker.setPersistenceAdapter(jdbc);
}
@Override
protected void restartBroker() throws Exception {
broker.stop();
stopDerby();
dataSource = new EmbeddedXADataSource();
dataSource.setDatabaseName("derbyDb");
dataSource.setCreateDatabase("create");
broker = createRestartedBroker();
broker.start();
}
private void stopDerby() {
LOG.info("STOPPING DB!@!!!!");
final EmbeddedDataSource ds = dataSource;
try {
ds.setShutdownDatabase("shutdown");
ds.getConnection();
} catch (Exception ignored) {
}
}
public static Test suite() {
return suite(JdbcXARecoveryBrokerTest.class);
}
public static void main(String[] args) {
junit.textui.TestRunner.run(suite());
}
@Override
protected ActiveMQDestination createDestination() {
return new ActiveMQQueue("test,special");
}
}

View File

@ -23,9 +23,13 @@ import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import junit.framework.Test;
import org.apache.activemq.broker.jmx.DestinationView;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DataArrayResponse;
@ -37,6 +41,7 @@ import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.util.JMXSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -48,6 +53,8 @@ import org.slf4j.LoggerFactory;
*/
public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(XARecoveryBrokerTest.class);
public boolean prioritySupport = false;
public void testPreparedJmxView() throws Exception {
ActiveMQDestination destination = createDestination();
@ -96,6 +103,10 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
dar = (DataArrayResponse)response;
assertEquals(4, dar.getData().length);
// validate destination depth via jmx
DestinationViewMBean destinationView = getProxyToDestination(destinationList(destination)[0]);
assertEquals("enqueue count does not see prepared", 0, destinationView.getQueueSize());
TransactionId first = (TransactionId)dar.getData()[0];
// via jmx, force outcome
for (int i = 0; i < 4; i++) {
@ -131,6 +142,16 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
return proxy;
}
private DestinationViewMBean getProxyToDestination(ActiveMQDestination destination) throws MalformedObjectNameException, JMSException {
ObjectName objectName = new ObjectName("org.apache.activemq:Type=" + (destination.isQueue() ? "Queue" : "Topic") + ",Destination=" +
JMXSupport.encodeObjectNamePart(destination.getPhysicalName()) + ",BrokerName=localhost");
DestinationViewMBean proxy = (DestinationViewMBean) broker.getManagementContext().newProxyInstance(objectName,
DestinationViewMBean.class, true);
return proxy;
}
public void testPreparedTransactionRecoveredOnRestart() throws Exception {
ActiveMQDestination destination = createDestination();
@ -213,6 +234,94 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
assertNoMessagesLeft(connection);
}
public void testTopicPreparedTransactionRecoveredOnRestart() throws Exception {
ActiveMQDestination destination = new ActiveMQTopic("TryTopic");
StubConnection connection = createConnection();
ConnectionInfo connectionInfo = createConnectionInfo();
connectionInfo.setClientId("durable");
SessionInfo sessionInfo = createSessionInfo(connectionInfo);
ProducerInfo producerInfo = createProducerInfo(sessionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
connection.send(producerInfo);
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
consumerInfo.setSubscriptionName("durable");
connection.send(consumerInfo);
// Prepare 4 message sends.
for (int i = 0; i < 4; i++) {
// Begin the transaction.
XATransactionId txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));
Message message = createMessage(producerInfo, destination);
message.setPersistent(true);
message.setTransactionId(txid);
connection.send(message);
// Prepare
connection.send(createPrepareTransaction(connectionInfo, txid));
}
// Since prepared but not committed.. they should not get delivered.
assertNull(receiveMessage(connection));
assertNoMessagesLeft(connection);
connection.request(closeConnectionInfo(connectionInfo));
// restart the broker.
restartBroker();
// Setup the consumer and try receive the message.
connection = createConnection();
connectionInfo = createConnectionInfo();
connectionInfo.setClientId("durable");
sessionInfo = createSessionInfo(connectionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
consumerInfo = createConsumerInfo(sessionInfo, destination);
consumerInfo.setSubscriptionName("durable");
connection.send(consumerInfo);
// Since prepared but not committed.. they should not get delivered.
assertNull(receiveMessage(connection));
assertNoMessagesLeft(connection);
Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
assertNotNull(response);
DataArrayResponse dar = (DataArrayResponse) response;
assertEquals(4, dar.getData().length);
// ensure we can close a connection with prepared transactions
connection.request(closeConnectionInfo(connectionInfo));
// open again to deliver outcome
connection = createConnection();
connectionInfo = createConnectionInfo();
connectionInfo.setClientId("durable");
sessionInfo = createSessionInfo(connectionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
consumerInfo = createConsumerInfo(sessionInfo, destination);
consumerInfo.setSubscriptionName("durable");
connection.send(consumerInfo);
// Commit the prepared transactions.
for (int i = 0; i < dar.getData().length; i++) {
connection.send(createCommitTransaction2Phase(connectionInfo, (TransactionId) dar.getData()[i]));
}
// We should get the committed transactions.
for (int i = 0; i < expectedMessageCount(4, destination); i++) {
Message m = receiveMessage(connection, TimeUnit.SECONDS.toMillis(10));
assertNotNull(m);
}
assertNoMessagesLeft(connection);
}
public void testQueuePersistentCommitedMessagesNotLostOnRestart() throws Exception {
ActiveMQDestination destination = createDestination();
@ -260,6 +369,55 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
assertNoMessagesLeft(connection);
}
public void testQueuePersistentCommited2PhaseMessagesNotLostOnRestart() throws Exception {
ActiveMQDestination destination = createDestination();
// Setup the producer and send the message.
StubConnection connection = createConnection();
ConnectionInfo connectionInfo = createConnectionInfo();
SessionInfo sessionInfo = createSessionInfo(connectionInfo);
ProducerInfo producerInfo = createProducerInfo(sessionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
connection.send(producerInfo);
// Begin the transaction.
XATransactionId txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));
for (int i = 0; i < 4; i++) {
Message message = createMessage(producerInfo, destination);
message.setPersistent(true);
message.setTransactionId(txid);
connection.send(message);
}
// Commit 2 phase
connection.request(createPrepareTransaction(connectionInfo, txid));
connection.send(createCommitTransaction2Phase(connectionInfo, txid));
connection.request(closeConnectionInfo(connectionInfo));
// restart the broker.
restartBroker();
// Setup the consumer and receive the message.
connection = createConnection();
connectionInfo = createConnectionInfo();
sessionInfo = createSessionInfo(connectionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
for (int i = 0; i < expectedMessageCount(4, destination); i++) {
Message m = receiveMessage(connection);
assertNotNull(m);
}
assertNoMessagesLeft(connection);
}
public void testQueuePersistentCommitedAcksNotLostOnRestart() throws Exception {
ActiveMQDestination destination = createDestination();
@ -396,6 +554,90 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length);
}
public void initCombosForTestTopicPersistentPreparedAcksNotLostOnRestart() {
addCombinationValues("prioritySupport", new Boolean[]{Boolean.FALSE, Boolean.TRUE});
}
public void testTopicPersistentPreparedAcksNotLostOnRestart() throws Exception {
// REVISIT for kahadb
if (! (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter)) {
LOG.warn("only works on jdbc");
return;
}
ActiveMQDestination destination = new ActiveMQTopic("TryTopic");
// Setup the producer and send the message.
StubConnection connection = createConnection();
ConnectionInfo connectionInfo = createConnectionInfo();
connectionInfo.setClientId("durable");
SessionInfo sessionInfo = createSessionInfo(connectionInfo);
ProducerInfo producerInfo = createProducerInfo(sessionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
connection.send(producerInfo);
// setup durable subs
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
consumerInfo.setSubscriptionName("durable");
connection.send(consumerInfo);
for (int i = 0; i < 4; i++) {
Message message = createMessage(producerInfo, destination);
message.setPersistent(true);
connection.send(message);
}
// Begin the transaction.
XATransactionId txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));
final int messageCount = expectedMessageCount(4, destination);
Message m = null;
for (int i = 0; i < messageCount; i++) {
m = receiveMessage(connection);
assertNotNull("unexpected null on: " + i, m);
}
// one ack with last received, mimic a beforeEnd synchronization
MessageAck ack = createAck(consumerInfo, m, messageCount, MessageAck.STANDARD_ACK_TYPE);
ack.setTransactionId(txid);
connection.send(ack);
connection.request(createPrepareTransaction(connectionInfo, txid));
// restart the broker.
restartBroker();
connection = createConnection();
connectionInfo = createConnectionInfo();
connectionInfo.setClientId("durable");
connection.send(connectionInfo);
// validate recovery
TransactionInfo recoverInfo = new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER);
DataArrayResponse dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo);
assertEquals("there is a prepared tx", 1, dataArrayResponse.getData().length);
assertEquals("it matches", txid, dataArrayResponse.getData()[0]);
sessionInfo = createSessionInfo(connectionInfo);
connection.send(sessionInfo);
consumerInfo = createConsumerInfo(sessionInfo, destination);
consumerInfo.setSubscriptionName("durable");
connection.send(consumerInfo);
// no redelivery, exactly once semantics unless there is rollback
m = receiveMessage(connection);
assertNull(m);
assertNoMessagesLeft(connection);
connection.request(createCommitTransaction2Phase(connectionInfo, txid));
// validate recovery complete
dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo);
assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length);
}
public void testQueuePersistentPreparedAcksAvailableAfterRestartAndRollback() throws Exception {
ActiveMQDestination destination = createDestination();
@ -409,7 +651,8 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
connection.send(sessionInfo);
connection.send(producerInfo);
for (int i = 0; i < 4; i++) {
int numMessages = 4;
for (int i = 0; i < numMessages; i++) {
Message message = createMessage(producerInfo, destination);
message.setPersistent(true);
connection.send(message);
@ -426,13 +669,13 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
consumerInfo = createConsumerInfo(sessionInfo, dest);
connection.send(consumerInfo);
for (int i = 0; i < 4; i++) {
for (int i = 0; i < numMessages; i++) {
message = receiveMessage(connection);
assertNotNull(message);
}
// one ack with last received, mimic a beforeEnd synchronization
MessageAck ack = createAck(consumerInfo, message, 4, MessageAck.STANDARD_ACK_TYPE);
MessageAck ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE);
ack.setTransactionId(txid);
connection.send(ack);
}
@ -466,7 +709,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
// rollback so we get redelivery
connection.request(createRollbackTransaction(connectionInfo, txid));
// Begin new transaction for redelivery
LOG.info("new tx for redelivery");
txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));
@ -475,11 +718,11 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
consumerInfo = createConsumerInfo(sessionInfo, dest);
connection.send(consumerInfo);
for (int i = 0; i < 4; i++) {
for (int i = 0; i < numMessages; i++) {
message = receiveMessage(connection);
assertNotNull(message);
assertNotNull("unexpected null on:" + i, message);
}
MessageAck ack = createAck(consumerInfo, message, 4, MessageAck.STANDARD_ACK_TYPE);
MessageAck ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE);
ack.setTransactionId(txid);
connection.send(ack);
}
@ -492,6 +735,180 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length);
}
public void initCombosForTestTopicPersistentPreparedAcksAvailableAfterRestartAndRollback() {
addCombinationValues("prioritySupport", new Boolean[]{Boolean.FALSE, Boolean.TRUE});
}
public void testTopicPersistentPreparedAcksAvailableAfterRestartAndRollback() throws Exception {
// REVISIT for kahadb
if (! (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter)) {
LOG.warn("only works on jdbc");
return;
}
ActiveMQDestination destination = new ActiveMQTopic("TryTopic");
// Setup the producer and send the message.
StubConnection connection = createConnection();
ConnectionInfo connectionInfo = createConnectionInfo();
connectionInfo.setClientId("durable");
SessionInfo sessionInfo = createSessionInfo(connectionInfo);
ProducerInfo producerInfo = createProducerInfo(sessionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
connection.send(producerInfo);
// setup durable subs
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
consumerInfo.setSubscriptionName("durable");
connection.send(consumerInfo);
int numMessages = 4;
for (int i = 0; i < numMessages; i++) {
Message message = createMessage(producerInfo, destination);
message.setPersistent(true);
connection.send(message);
}
// Begin the transaction.
XATransactionId txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));
Message message = null;
for (int i = 0; i < numMessages; i++) {
message = receiveMessage(connection);
assertNotNull(message);
}
// one ack with last received, mimic a beforeEnd synchronization
MessageAck ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE);
ack.setTransactionId(txid);
connection.send(ack);
connection.request(createPrepareTransaction(connectionInfo, txid));
// restart the broker.
restartBroker();
connection = createConnection();
connectionInfo = createConnectionInfo();
connectionInfo.setClientId("durable");
connection.send(connectionInfo);
// validate recovery
TransactionInfo recoverInfo = new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER);
DataArrayResponse dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo);
assertEquals("there is a prepared tx", 1, dataArrayResponse.getData().length);
assertEquals("it matches", txid, dataArrayResponse.getData()[0]);
sessionInfo = createSessionInfo(connectionInfo);
connection.send(sessionInfo);
consumerInfo = createConsumerInfo(sessionInfo, destination);
consumerInfo.setSubscriptionName("durable");
connection.send(consumerInfo);
// no redelivery, exactly once semantics while prepared
message = receiveMessage(connection);
assertNull(message);
assertNoMessagesLeft(connection);
// rollback so we get redelivery
connection.request(createRollbackTransaction(connectionInfo, txid));
LOG.info("new tx for redelivery");
txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));
for (int i = 0; i < numMessages; i++) {
message = receiveMessage(connection);
assertNotNull("unexpected null on:" + i, message);
}
ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE);
ack.setTransactionId(txid);
connection.send(ack);
// Commit
connection.request(createCommitTransaction1Phase(connectionInfo, txid));
// validate recovery complete
dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo);
assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length);
}
public void initCombosForTestTopicPersistentPreparedAcksAvailableAfterRollback() {
addCombinationValues("prioritySupport", new Boolean[]{Boolean.FALSE, Boolean.TRUE});
}
public void testTopicPersistentPreparedAcksAvailableAfterRollback() throws Exception {
// REVISIT for kahadb
if (! (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter)) {
LOG.warn("only works on jdbc");
return;
}
ActiveMQDestination destination = new ActiveMQTopic("TryTopic");
// Setup the producer and send the message.
StubConnection connection = createConnection();
ConnectionInfo connectionInfo = createConnectionInfo();
connectionInfo.setClientId("durable");
SessionInfo sessionInfo = createSessionInfo(connectionInfo);
ProducerInfo producerInfo = createProducerInfo(sessionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
connection.send(producerInfo);
// setup durable subs
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
consumerInfo.setSubscriptionName("durable");
connection.send(consumerInfo);
int numMessages = 4;
for (int i = 0; i < numMessages; i++) {
Message message = createMessage(producerInfo, destination);
message.setPersistent(true);
connection.send(message);
}
// Begin the transaction.
XATransactionId txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));
Message message = null;
for (int i = 0; i < numMessages; i++) {
message = receiveMessage(connection);
assertNotNull(message);
}
// one ack with last received, mimic a beforeEnd synchronization
MessageAck ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE);
ack.setTransactionId(txid);
connection.send(ack);
connection.request(createPrepareTransaction(connectionInfo, txid));
// rollback so we get redelivery
connection.request(createRollbackTransaction(connectionInfo, txid));
LOG.info("new tx for redelivery");
txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));
for (int i = 0; i < numMessages; i++) {
message = receiveMessage(connection);
assertNotNull("unexpected null on:" + i, message);
}
ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE);
ack.setTransactionId(txid);
connection.send(ack);
// Commit
connection.request(createCommitTransaction1Phase(connectionInfo, txid));
}
private ActiveMQDestination[] destinationList(ActiveMQDestination dest) {
return dest.isComposite() ? dest.getCompositeDestinations() : new ActiveMQDestination[]{dest};
}
@ -564,6 +981,13 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
assertNoMessagesLeft(connection);
}
@Override
protected PolicyEntry getDefaultPolicy() {
PolicyEntry policyEntry = super.getDefaultPolicy();
policyEntry.setPrioritizedMessages(prioritySupport);
return policyEntry;
}
public static Test suite() {
return suite(XARecoveryBrokerTest.class);
}