https://issues.apache.org/jira/browse/AMQ-5080 - fix up rar transacted delivery and redelivery processing, xarecovery and transaction completion afer failover - using failover maxReconnectAttempts=0 to avoid blocking periodic recovery and negate replay of aborted transaction state

This commit is contained in:
gtully 2014-03-03 13:22:30 +00:00
parent 2360fb8596
commit e8818fafea
14 changed files with 306 additions and 63 deletions

View File

@ -143,15 +143,22 @@ public class ProducerBrokerExchange {
long lastStoredForMessageProducer = getStoredSequenceIdForMessage(messageSend.getMessageId());
if (producerSequenceId <= lastStoredForMessageProducer) {
canDispatch = false;
LOG.debug("suppressing duplicate message send [{}] from network producer with producerSequence [{}] less than last stored: {}", new Object[]{
LOG.warn("suppressing duplicate message send [{}] from network producer with producerSequence [{}] less than last stored: {}", new Object[]{
(LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()), producerSequenceId, lastStoredForMessageProducer
});
}
} else if (producerSequenceId <= lastSendSequenceNumber.get()) {
canDispatch = false;
LOG.debug("suppressing duplicated message send [{}] with producerSequenceId [{}] less than last stored: {}", new Object[]{
if (messageSend.isInTransaction()) {
LOG.warn("suppressing duplicated message send [{}] with producerSequenceId [{}] <= last stored: {}", new Object[]{
(LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()), producerSequenceId, lastSendSequenceNumber
});
} else {
LOG.debug("suppressing duplicated message send [{}] with producerSequenceId [{}] <= last stored: {}", new Object[]{
(LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()), producerSequenceId, lastSendSequenceNumber
});
}
} else {
// track current so we can suppress duplicates later in the stream
lastSendSequenceNumber.set(producerSequenceId);

View File

@ -503,6 +503,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId());
if (consumerExchange != null) {
broker.acknowledge(consumerExchange, ack);
} else if (ack.isInTransaction()) {
LOG.warn("no matching consumer, ignoring ack {}", consumerExchange, ack);
}
return null;
}

View File

@ -1849,6 +1849,10 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
dispatchers.remove(consumerId);
}
public boolean hasDispatcher(ConsumerId consumerId) {
return dispatchers.containsKey(consumerId);
}
/**
* @param o - the command to consume
*/
@ -1878,6 +1882,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
md.setMessage(msg);
}
dispatcher.dispatch(md);
} else {
LOG.debug("{} no dispatcher for {} in {}", this, md, dispatchers);
}
return null;
}

View File

@ -649,7 +649,9 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
}
}
final AtomicInteger clearRequestsCounter = new AtomicInteger(0);
void clearMessagesInProgress(AtomicInteger transportInterruptionProcessingComplete) {
clearRequestsCounter.incrementAndGet();
executor.clearMessagesInProgress();
// we are called from inside the transport reconnection logic which involves us
// clearing all the connections' consumers dispatch and delivered lists. So rather
@ -860,10 +862,26 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
while ((messageDispatch = executor.dequeueNoWait()) != null) {
final MessageDispatch md = messageDispatch;
ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
if (message.isExpired() || connection.isDuplicate(ActiveMQSession.this, message)) {
// TODO: Ack it without delivery to client
MessageAck earlyAck = null;
if (message.isExpired()) {
earlyAck = new MessageAck(md, MessageAck.DELIVERED_ACK_TYPE, 1);
} else if (connection.isDuplicate(ActiveMQSession.this, message)) {
LOG.debug("{} got duplicate: {}", this, message.getMessageId());
earlyAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
earlyAck.setFirstMessageId(md.getMessage().getMessageId());
earlyAck.setPoisonCause(new Throwable("Duplicate delivery to " + this));
}
if (earlyAck != null) {
try {
asyncSendPacket(earlyAck);
} catch (Throwable t) {
LOG.error("error dispatching ack: {} ", earlyAck, t);
connection.onClientInternalException(t);
} finally {
continue;
}
}
if (isClientAcknowledge()||isIndividualAcknowledge()) {
message.setAcknowledgeCallback(new Callback() {
@ -886,16 +904,36 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
if (ack.getTransactionId() != null) {
getTransactionContext().addSynchronization(new Synchronization() {
@Override
final int clearRequestCount = (clearRequestsCounter.get() == Integer.MAX_VALUE ? clearRequestsCounter.incrementAndGet() : clearRequestsCounter.get());
public void beforeEnd() throws Exception {
asyncSendPacket(ack);
// validate our consumer so we don't push stale acks that get ignored
if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
LOG.debug("forcing rollback - {} consumer no longer active on {}", ack, connection);
throw new TransactionRolledBackException("consumer " + ack.getConsumerId() + " no longer active on " + connection);
}
LOG.trace("beforeEnd ack {}", ack);
sendAck(ack);
}
@Override
public void afterRollback() throws Exception {
LOG.trace("rollback {}", ack, new Throwable("here"));
md.getMessage().onMessageRolledBack();
// ensure we don't filter this as a duplicate
connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
// don't redeliver if we have been interrupted b/c the broker will redeliver on reconnect
if (clearRequestsCounter.get() > clearRequestCount) {
LOG.debug("No redelivery of {} on rollback of {} due to failover of {}", md, ack.getTransactionId(), connection.getTransport());
return;
}
// validate our consumer so we don't push stale acks that get ignored or redeliver what will be redispatched
if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
LOG.debug("No local redelivery of {} on rollback of {} because consumer is no longer active on {}", md, ack.getTransactionId(), connection.getTransport());
return;
}
RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
int redeliveryCounter = md.getMessage().getRedeliveryCounter();
if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
@ -932,6 +970,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
});
}
LOG.trace("{} onMessage({})", this, message.getMessageId());
messageListener.onMessage(message);
} catch (Throwable e) {

View File

@ -40,6 +40,7 @@ import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.transport.failover.FailoverTransport;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.LongSequenceGenerator;
import org.slf4j.Logger;
@ -71,9 +72,8 @@ public class TransactionContext implements XAResource {
private final static HashMap<TransactionId, List<TransactionContext>> ENDED_XA_TRANSACTION_CONTEXTS =
new HashMap<TransactionId, List<TransactionContext>>();
private final ActiveMQConnection connection;
private ActiveMQConnection connection;
private final LongSequenceGenerator localTransactionIdGenerator;
private final ConnectionId connectionId;
private List<Synchronization> synchronizations;
// To track XA transactions.
@ -82,10 +82,14 @@ public class TransactionContext implements XAResource {
private LocalTransactionEventListener localTransactionEventListener;
private int beforeEndIndex;
// for RAR recovery
public TransactionContext() {
localTransactionIdGenerator = null;
}
public TransactionContext(ActiveMQConnection connection) {
this.connection = connection;
this.localTransactionIdGenerator = connection.getLocalTransactionIdGenerator();
this.connectionId = connection.getConnectionInfo().getConnectionId();
}
public boolean isInXATransaction() {
@ -231,7 +235,7 @@ public class TransactionContext implements XAResource {
if (transactionId == null) {
synchronizations = null;
beforeEndIndex = 0;
this.transactionId = new LocalTransactionId(connectionId, localTransactionIdGenerator.getNextSequenceId());
this.transactionId = new LocalTransactionId(getConnectionId(), localTransactionIdGenerator.getNextSequenceId());
TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN);
this.connection.ensureConnectionInfoSent();
this.connection.asyncSendPacket(info);
@ -646,6 +650,13 @@ public class TransactionContext implements XAResource {
TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER);
try {
this.connection.checkClosedOrFailed();
final FailoverTransport failoverTransport = this.connection.getTransport().narrow(FailoverTransport.class);
if (failoverTransport != null && !failoverTransport.isConnected()) {
// otherwise call will block on reconnect forfeting any app level periodic check
XAException xaException = new XAException("Failover transport not connected: " + this.getConnection().getTransport());
xaException.errorCode = XAException.XAER_RMERR;
throw xaException;
}
this.connection.ensureConnectionInfoSent();
DataArrayResponse receipt = (DataArrayResponse)this.connection.syncSendPacket(info);
@ -657,6 +668,7 @@ public class TransactionContext implements XAResource {
answer = new XATransactionId[data.length];
System.arraycopy(data, 0, answer, 0, data.length);
}
LOG.trace("recover({})={}", flag, answer);
return answer;
} catch (JMSException e) {
throw toXAException(e);
@ -676,7 +688,7 @@ public class TransactionContext implements XAResource {
// Helper methods.
//
// ///////////////////////////////////////////////////////////
private String getResourceManagerId() throws JMSException {
protected String getResourceManagerId() throws JMSException {
return this.connection.getResourceManagerId();
}
@ -695,11 +707,11 @@ public class TransactionContext implements XAResource {
associatedXid = xid;
transactionId = new XATransactionId(xid);
TransactionInfo info = new TransactionInfo(connectionId, transactionId, TransactionInfo.BEGIN);
TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN);
try {
this.connection.asyncSendPacket(info);
if (LOG.isDebugEnabled()) {
LOG.debug("Started XA transaction: " + transactionId);
LOG.debug("{} started XA transaction {} ", this, transactionId);
}
} catch (JMSException e) {
disassociate();
@ -709,11 +721,11 @@ public class TransactionContext implements XAResource {
} else {
if (transactionId != null) {
TransactionInfo info = new TransactionInfo(connectionId, transactionId, TransactionInfo.END);
TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.END);
try {
syncSendPacketWithInterruptionHandling(info);
if (LOG.isDebugEnabled()) {
LOG.debug("Ended XA transaction: " + transactionId);
LOG.debug("{} ended XA transaction {}", this, transactionId);
}
} catch (JMSException e) {
disassociate();
@ -800,6 +812,14 @@ public class TransactionContext implements XAResource {
return connection;
}
// for RAR xa recovery where xaresource connection is per request
public ActiveMQConnection setConnection(ActiveMQConnection connection) {
ActiveMQConnection existing = this.connection;
this.connection = connection;
return existing;
}
public void cleanup() {
associatedXid = null;
transactionId = null;

View File

@ -102,19 +102,52 @@ public class XATransactionId extends TransactionId implements Xid, Comparable {
if (transactionKey == null) {
StringBuffer s = new StringBuffer();
s.append("XID:[" + formatId + ",globalId=");
for (int i = 0; i < globalTransactionId.length; i++) {
s.append(Integer.toHexString(globalTransactionId[i]));
}
s.append(stringForm(formatId, globalTransactionId));
s.append(",branchId=");
for (int i = 0; i < branchQualifier.length; i++) {
s.append(Integer.toHexString(branchQualifier[i]));
}
s.append(stringForm(formatId, branchQualifier));
s.append("]");
transactionKey = s.toString();
}
return transactionKey;
}
private String stringForm(int format, byte[] uid) {
StringBuffer s = new StringBuffer();
switch (format) {
case 131077: // arjuna
stringFormArj(s, uid);
break;
default: // aries
stringFormDefault(s, uid);
}
return s.toString();
}
private void stringFormDefault(StringBuffer s, byte[] uid) {
for (int i = 0; i < uid.length; i++) {
s.append(Integer.toHexString(uid[i]));
}
}
private void stringFormArj(StringBuffer s, byte[] uid) {
try {
DataByteArrayInputStream byteArrayInputStream = new DataByteArrayInputStream(uid);
s.append(Long.toString(byteArrayInputStream.readLong(), 16));
s.append(':');
s.append(Long.toString(byteArrayInputStream.readLong(), 16));
s.append(':');
s.append(Integer.toString(byteArrayInputStream.readInt(), 16));
s.append(':');
s.append(Integer.toString(byteArrayInputStream.readInt(), 16));
s.append(':');
s.append(Integer.toString(byteArrayInputStream.readInt(), 16));
} catch (Exception ignored) {
stringFormDefault(s, uid);
}
}
public String toString() {
return getTransactionKey();
}

View File

@ -98,7 +98,9 @@ public class ActiveMQConnectionFactory implements ConnectionFactory, QueueConnec
throw (JMSException)e.getCause();
}
LOG.debug("Connection could not be created:", e);
throw new JMSException(e.getMessage());
JMSException jmsException = new JMSException(e.getMessage());
jmsException.setLinkedException(e);
throw jmsException;
}
}

View File

@ -235,7 +235,7 @@ public class ActiveMQEndpointWorker {
c.close();
}
} catch (JMSException e) {
//
LOG.trace("failed to close c {}", c, e);
}
}
@ -249,7 +249,7 @@ public class ActiveMQEndpointWorker {
cc.close();
}
} catch (JMSException e) {
//
LOG.trace("failed to close cc {}", cc, e);
}
}

View File

@ -201,13 +201,15 @@ public class ActiveMQManagedConnection implements ManagedConnection, ExceptionLi
return;
}
try {
cleanup();
} finally {
try {
physicalConnection.close();
destroyed = true;
} catch (JMSException e) {
LOG.info("Error occurred during close of a JMS connection.", e);
LOG.trace("Error occurred during close of a JMS connection.", e);
}
}
}
@ -233,10 +235,10 @@ public class ActiveMQManagedConnection implements ManagedConnection, ExceptionLi
physicalConnection.cleanup();
} catch (JMSException e) {
throw new ResourceException("Could cleanup the ActiveMQ connection: " + e, e);
}
} finally {
// defer transaction cleanup till after close so that close is aware of the current tx
localAndXATransaction.cleanup();
}
}
/**

View File

@ -16,22 +16,20 @@
*/
package org.apache.activemq.ra;
import java.lang.reflect.Method;
import java.net.URI;
import java.util.HashMap;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.XAConnection;
import javax.jms.XASession;
import javax.resource.NotSupportedException;
import javax.resource.ResourceException;
import javax.resource.spi.ActivationSpec;
import javax.resource.spi.BootstrapContext;
import javax.resource.spi.ResourceAdapterInternalException;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
@ -39,6 +37,8 @@ import org.apache.activemq.TransactionContext;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.ServiceSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Knows how to connect to one ActiveMQ server. It can then activate endpoints
@ -50,7 +50,7 @@ import org.apache.activemq.util.ServiceSupport;
*
*/
public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implements MessageResourceAdapter {
private static final Logger LOG = LoggerFactory.getLogger(ActiveMQResourceAdapter.class);
private final HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker> endpointWorkers = new HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker>();
private BootstrapContext bootstrapContext;
@ -233,21 +233,129 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
*/
public XAResource[] getXAResources(ActivationSpec[] activationSpecs) throws ResourceException {
try {
return new XAResource[]{(XAResource)
java.lang.reflect.Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]{XAResource.class},
new java.lang.reflect.InvocationHandler () {
return new XAResource[]{
new TransactionContext() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
ActiveMQConnection connection = makeConnection();
public boolean isSameRM(XAResource xaresource) throws XAException {
ActiveMQConnection original = null;
try {
return method.invoke(new TransactionContext(connection), args);
original = setConnection(newConnection());
boolean result = super.isSameRM(xaresource);
LOG.trace("{}.recover({})={}", getConnection(), xaresource, result);
return result;
} catch (JMSException e) {
LOG.trace("isSameRM({}) failed", xaresource, e);
XAException xaException = new XAException(e.getMessage());
throw xaException;
} finally {
closeConnection(original);
}
}
@Override
protected String getResourceManagerId() throws JMSException {
ActiveMQConnection original = null;
try {
original = setConnection(newConnection());
return super.getResourceManagerId();
} finally {
closeConnection(original);
}
}
@Override
public void commit(Xid xid, boolean onePhase) throws XAException {
ActiveMQConnection original = null;
try {
setConnection(newConnection());
super.commit(xid, onePhase);
LOG.trace("{}.commit({},{})", getConnection(), xid);
} catch (JMSException e) {
LOG.trace("{}.commit({},{}) failed", getConnection(), xid, onePhase, e);
throwXAException(e);
} finally {
closeConnection(original);
}
}
@Override
public void rollback(Xid xid) throws XAException {
ActiveMQConnection original = null;
try {
original = setConnection(newConnection());
super.rollback(xid);
LOG.trace("{}.rollback({})", getConnection(), xid);
} catch (JMSException e) {
LOG.trace("{}.rollback({}) failed", getConnection(), xid, e);
throwXAException(e);
} finally {
closeConnection(original);
}
}
@Override
public Xid[] recover(int flags) throws XAException {
Xid[] result = new Xid[]{};
ActiveMQConnection original = null;
try {
original = setConnection(newConnection());
result = super.recover(flags);
LOG.trace("{}.recover({})={}", getConnection(), flags, result);
} catch (JMSException e) {
LOG.trace("{}.recover({}) failed", getConnection(), flags, e);
throwXAException(e);
} finally {
closeConnection(original);
}
return result;
}
@Override
public void forget(Xid xid) throws XAException {
ActiveMQConnection original = null;
try {
original = setConnection(newConnection());
super.forget(xid);
LOG.trace("{}.forget({})", getConnection(), xid);
} catch (JMSException e) {
LOG.trace("{}.forget({}) failed", getConnection(), xid, e);
throwXAException(e);
} finally {
closeConnection(original);
}
}
private void throwXAException(JMSException e) throws XAException {
XAException xaException = new XAException(e.getMessage());
xaException.errorCode = XAException.XAER_RMFAIL;
throw xaException;
}
private ActiveMQConnection newConnection() throws JMSException {
ActiveMQConnection connection = makeConnection();
connection.start();
return connection;
}
private void closeConnection(ActiveMQConnection original) {
ActiveMQConnection connection = getConnection();
if (connection != null) {
try {
connection.close();
} catch (Throwable ignore) {}
} catch (JMSException ignored) {
} finally {
setConnection(original);
}
}
})};
}
}};
} catch (Exception e) {
throw new ResourceException(e);

View File

@ -33,13 +33,17 @@ import org.slf4j.LoggerFactory;
public class LocalAndXATransaction implements XAResource, LocalTransaction {
private static final Logger LOG = LoggerFactory.getLogger(LocalAndXATransaction.class);
private final TransactionContext transactionContext;
private TransactionContext transactionContext;
private boolean inManagedTx;
public LocalAndXATransaction(TransactionContext transactionContext) {
this.transactionContext = transactionContext;
}
public void setTransactionContext(TransactionContext transactionContext) {
this.transactionContext = transactionContext;
}
public void setInManagedTx(boolean inManagedTx) throws JMSException {
this.inManagedTx = inManagedTx;
if (!inManagedTx) {

View File

@ -168,15 +168,15 @@ public class ServerSessionImpl implements ServerSession, InboundContext, Work, D
if ( session.isRunning() ) {
session.run();
} else {
log.debug("JMS Session is no longer running (maybe due to loss of connection?), marking ServerSesison as stale");
log.debug("JMS Session {} with unconsumed {} is no longer running (maybe due to loss of connection?), marking ServerSession as stale", session, session.getUnconsumedMessages().size());
stale = true;
}
} catch (Throwable e) {
stale = true;
if ( log.isDebugEnabled() ) {
log.debug("Endpoint failed to process message.", e);
log.debug("Endpoint {} failed to process message.", session, e);
} else if ( log.isInfoEnabled() ) {
log.info("Endpoint failed to process message. Reason: " + e.getMessage());
log.info("Endpoint {} failed to process message. Reason: " + e.getMessage(), session);
}
} finally {
InboundContextSupport.unregister(this);
@ -190,6 +190,7 @@ public class ServerSessionImpl implements ServerSession, InboundContext, Work, D
}
if (!session.hasUncomsumedMessages()) {
runningFlag = false;
log.debug("Session has no unconsumed message, returning to pool");
pool.returnToPool(this);
break;
}
@ -255,7 +256,7 @@ public class ServerSessionImpl implements ServerSession, InboundContext, Work, D
*/
@Override
public String toString() {
return "ServerSessionImpl:" + serverSessionId;
return "ServerSessionImpl:" + serverSessionId + "{" + session +"}";
}
public void close() {

View File

@ -227,11 +227,24 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
try {
ActiveMQSession session = (ActiveMQSession)ss.getSession();
List l = session.getUnconsumedMessages();
if (!l.isEmpty()) {
ActiveMQConnection connection = activeMQAsfEndpointWorker.getConnection();
if (connection != null) {
for (Iterator i = l.iterator(); i.hasNext();) {
dispatchToSession((MessageDispatch)i.next());
MessageDispatch md = (MessageDispatch)i.next();
if (connection.hasDispatcher(md.getConsumerId())) {
dispatchToSession(md);
LOG.trace("on remove of {} redispatch of {}", session, md);
} else {
LOG.trace("on remove not redispatching {}, dispatcher no longer present on {}", md, session.getConnection());
}
}
} else {
LOG.trace("on remove of {} not redispatching while disconnected", session);
}
}
} catch (Throwable t) {
LOG.error("Error redispatching unconsumed messages from stale session", t);
LOG.error("Error redispatching unconsumed messages from stale server session {}", ss, t);
}
ss.close();
synchronized (closing) {

View File

@ -117,5 +117,11 @@ public class ActiveMQConnectionFactoryTest extends TestCase {
assertEquals("one resource", 1, resources.length);
assertEquals("no pending transactions", 0, resources[0].recover(100).length);
// validate equality
XAResource[] resource2 = ra.getXAResources(null);
assertEquals("one resource", 1, resource2.length);
assertTrue("isSameRM true", resources[0].isSameRM(resource2[0]));
assertFalse("no tthe same instance", resources[0].equals(resource2[0]));
}
}