ARTEMIS-1046 Fixing TX eventually stalling with AMQP

I have also reviewed the model in which we used transactions
This commit is contained in:
Clebert Suconic 2017-03-17 15:59:34 -04:00
parent 291a4719b6
commit 1ef4dcf7d9
8 changed files with 170 additions and 89 deletions

View File

@ -62,7 +62,7 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
private static final Logger logger = Logger.getLogger(AMQPConnectionCallback.class); private static final Logger logger = Logger.getLogger(AMQPConnectionCallback.class);
private ConcurrentMap<XidImpl, Transaction> transactions = new ConcurrentHashMap<>(); private ConcurrentMap<Binary, Transaction> transactions = new ConcurrentHashMap<>();
private final ProtonProtocolManager manager; private final ProtonProtocolManager manager;
@ -224,25 +224,32 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
public Binary newTransaction() { public Binary newTransaction() {
XidImpl xid = newXID(); XidImpl xid = newXID();
Binary binary = new Binary(xid.getGlobalTransactionId());
Transaction transaction = new ProtonTransactionImpl(xid, server.getStorageManager(), -1); Transaction transaction = new ProtonTransactionImpl(xid, server.getStorageManager(), -1);
transactions.put(xid, transaction); transactions.put(binary, transaction);
return new Binary(xid.getGlobalTransactionId()); return binary;
} }
public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException { public Transaction getTransaction(Binary txid, boolean remove) throws ActiveMQAMQPException {
XidImpl xid = newXID(txid.getArray()); Transaction tx;
Transaction tx = transactions.get(xid);
if (remove) {
tx = transactions.remove(txid);
} else {
tx = transactions.get(txid);
}
if (tx == null) { if (tx == null) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.txNotFound(xid.toString()); logger.warn("Couldn't find txid = " + txid);
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.txNotFound(txid.toString());
} }
return tx; return tx;
} }
public void removeTransaction(Binary txid) { public Transaction removeTransaction(Binary txid) {
XidImpl xid = newXID(txid.getArray()); XidImpl xid = newXID(txid.getArray());
transactions.remove(xid); return transactions.remove(xid);
} }
protected XidImpl newXID() { protected XidImpl newXID() {

View File

@ -47,7 +47,6 @@ import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl;
import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult; import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult;
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult; import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
@ -92,6 +91,10 @@ public class AMQPSessionCallback implements SessionCallback {
private final AtomicBoolean draining = new AtomicBoolean(false); private final AtomicBoolean draining = new AtomicBoolean(false);
public Object getProtonLock() {
return connection.getLock();
}
public AMQPSessionCallback(AMQPConnectionCallback protonSPI, public AMQPSessionCallback(AMQPConnectionCallback protonSPI,
ProtonProtocolManager manager, ProtonProtocolManager manager,
AMQPConnectionContext connection, AMQPConnectionContext connection,
@ -382,8 +385,10 @@ public class AMQPSessionCallback implements SessionCallback {
condition.setDescription(errorMessage); condition.setDescription(errorMessage);
Rejected rejected = new Rejected(); Rejected rejected = new Rejected();
rejected.setError(condition); rejected.setError(condition);
delivery.disposition(rejected); synchronized (connection.getLock()) {
delivery.settle(); delivery.disposition(rejected);
delivery.settle();
}
connection.flush(); connection.flush();
} }
@ -536,29 +541,14 @@ public class AMQPSessionCallback implements SessionCallback {
} }
} }
public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException { public Transaction getTransaction(Binary txid, boolean remove) throws ActiveMQAMQPException {
return protonSPI.getTransaction(txid); return protonSPI.getTransaction(txid, remove);
} }
public Binary newTransaction() { public Binary newTransaction() {
return protonSPI.newTransaction(); return protonSPI.newTransaction();
} }
public void commitTX(Binary txid) throws Exception {
Transaction tx = protonSPI.getTransaction(txid);
tx.commit(true);
protonSPI.removeTransaction(txid);
}
public void rollbackTX(Binary txid, boolean lastMessageReceived) throws Exception {
Transaction tx = protonSPI.getTransaction(txid);
tx.rollback();
protonSPI.removeTransaction(txid);
}
public void dischargeTx(Binary txid) throws ActiveMQAMQPException {
((ProtonTransactionImpl) protonSPI.getTransaction(txid)).discharge();
}
public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception { public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception {
return serverSession.getMatchingQueue(address, routingType); return serverSession.getMatchingQueue(address, routingType);

View File

@ -142,7 +142,7 @@ public class AMQPSessionContext extends ProtonInitializable {
} }
public void addTransactionHandler(Coordinator coordinator, Receiver receiver) { public void addTransactionHandler(Coordinator coordinator, Receiver receiver) {
ProtonTransactionHandler transactionHandler = new ProtonTransactionHandler(sessionSPI); ProtonTransactionHandler transactionHandler = new ProtonTransactionHandler(sessionSPI, connection);
coordinator.setCapabilities(Symbol.getSymbol("amqp:local-transactions"), Symbol.getSymbol("amqp:multi-txns-per-ssn"), Symbol.getSymbol("amqp:multi-ssns-per-txn")); coordinator.setCapabilities(Symbol.getSymbol("amqp:local-transactions"), Symbol.getSymbol("amqp:multi-txns-per-ssn"), Symbol.getSymbol("amqp:multi-ssns-per-txn"));

View File

@ -155,7 +155,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
if (delivery.getRemoteState() instanceof TransactionalState) { if (delivery.getRemoteState() instanceof TransactionalState) {
TransactionalState txState = (TransactionalState) delivery.getRemoteState(); TransactionalState txState = (TransactionalState) delivery.getRemoteState();
tx = this.sessionSPI.getTransaction(txState.getTxnId()); tx = this.sessionSPI.getTransaction(txState.getTxnId(), false);
} }
sessionSPI.serverSend(tx, receiver, delivery, address, delivery.getMessageFormat(), data); sessionSPI.serverSend(tx, receiver, delivery, address, delivery.getMessageFormat(), data);
@ -201,8 +201,8 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
} else { } else {
synchronized (connection.getLock()) { synchronized (connection.getLock()) {
receiver.flow(credits); receiver.flow(credits);
connection.flush();
} }
connection.flush();
} }
} }

View File

@ -493,7 +493,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
if (remoteState instanceof TransactionalState) { if (remoteState instanceof TransactionalState) {
TransactionalState txState = (TransactionalState) remoteState; TransactionalState txState = (TransactionalState) remoteState;
ProtonTransactionImpl tx = (ProtonTransactionImpl) this.sessionSPI.getTransaction(txState.getTxnId()); ProtonTransactionImpl tx = (ProtonTransactionImpl) this.sessionSPI.getTransaction(txState.getTxnId(), false);
if (txState.getOutcome() != null) { if (txState.getOutcome() != null) {
settleImmediate = false; settleImmediate = false;

View File

@ -18,10 +18,9 @@ package org.apache.activemq.artemis.protocol.amqp.proton.transaction;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler;
import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil; import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted; import org.apache.qpid.proton.amqp.messaging.Accepted;
@ -36,9 +35,6 @@ import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.impl.MessageImpl; import org.apache.qpid.proton.message.impl.MessageImpl;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
/** /**
* handles an amqp Coordinator to deal with transaction boundaries etc * handles an amqp Coordinator to deal with transaction boundaries etc
*/ */
@ -47,17 +43,18 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
private static final Logger log = Logger.getLogger(ProtonTransactionHandler.class); private static final Logger log = Logger.getLogger(ProtonTransactionHandler.class);
public static final int DEFAULT_COORDINATOR_CREDIT = 100; public static final int DEFAULT_COORDINATOR_CREDIT = 100;
public static final int CREDIT_LOW_WATERMARK = 30;
final AMQPSessionCallback sessionSPI; final AMQPSessionCallback sessionSPI;
final AMQPConnectionContext connection;
public ProtonTransactionHandler(AMQPSessionCallback sessionSPI) { public ProtonTransactionHandler(AMQPSessionCallback sessionSPI, AMQPConnectionContext connection) {
this.sessionSPI = sessionSPI; this.sessionSPI = sessionSPI;
this.connection = connection;
} }
@Override @Override
public void onMessage(Delivery delivery) throws ActiveMQAMQPException { public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
final Receiver receiver; final Receiver receiver;
try { try {
receiver = ((Receiver) delivery.getLink()); receiver = ((Receiver) delivery.getLink());
@ -66,9 +63,21 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
return; return;
} }
receiver.recv(new NettyWritable(buffer)); byte[] buffer;
synchronized (connection.getLock()) {
// Replenish coordinator receiver credit on exhaustion so sender can continue
// transaction declare and discahrge operations.
if (receiver.getCredit() < CREDIT_LOW_WATERMARK) {
receiver.flow(DEFAULT_COORDINATOR_CREDIT);
}
buffer = new byte[delivery.available()];
receiver.recv(buffer, 0, buffer.length);
receiver.advance();
}
receiver.advance();
MessageImpl msg = DeliveryUtil.decodeMessageImpl(buffer); MessageImpl msg = DeliveryUtil.decodeMessageImpl(buffer);
@ -78,44 +87,47 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
Binary txID = sessionSPI.newTransaction(); Binary txID = sessionSPI.newTransaction();
Declared declared = new Declared(); Declared declared = new Declared();
declared.setTxnId(txID); declared.setTxnId(txID);
delivery.disposition(declared); synchronized (connection.getLock()) {
delivery.disposition(declared);
}
} else if (action instanceof Discharge) { } else if (action instanceof Discharge) {
Discharge discharge = (Discharge) action; Discharge discharge = (Discharge) action;
Binary txID = discharge.getTxnId(); Binary txID = discharge.getTxnId();
sessionSPI.dischargeTx(txID); ProtonTransactionImpl tx = (ProtonTransactionImpl)sessionSPI.getTransaction(txID, true);
if (discharge.getFail()) { tx.discharge();
try {
sessionSPI.rollbackTX(txID, true);
delivery.disposition(new Accepted());
} catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorRollingbackCoordinator(e.getMessage());
}
} else {
try {
sessionSPI.commitTX(txID);
delivery.disposition(new Accepted());
} catch (ActiveMQAMQPException amqpE) {
throw amqpE;
} catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCommittingCoordinator(e.getMessage());
}
}
// Replenish coordinator receiver credit on exhaustion so sender can continue if (discharge.getFail()) {
// transaction declare and discahrge operations. tx.rollback();
if (receiver.getCredit() == 0) { synchronized (connection.getLock()) {
receiver.flow(DEFAULT_COORDINATOR_CREDIT); delivery.disposition(new Accepted());
}
connection.flush();
} else {
tx.commit();
synchronized (connection.getLock()) {
delivery.disposition(new Accepted());
}
connection.flush();
} }
} }
} catch (ActiveMQAMQPException amqpE) { } catch (ActiveMQAMQPException amqpE) {
delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage())); log.warn(amqpE.getMessage(), amqpE);
} catch (Exception e) { synchronized (connection.getLock()) {
delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage()));
}
connection.flush();
} catch (Throwable e) {
log.warn(e.getMessage(), e); log.warn(e.getMessage(), e);
delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage())); synchronized (connection.getLock()) {
delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage()));
}
connection.flush();
} finally { } finally {
delivery.settle(); synchronized (connection.getLock()) {
buffer.release(); delivery.settle();
}
connection.flush();
} }
} }

View File

@ -16,28 +16,14 @@
*/ */
package org.apache.activemq.artemis.protocol.amqp.util; package org.apache.activemq.artemis.protocol.amqp.util;
import io.netty.buffer.ByteBuf;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl; import org.apache.qpid.proton.message.impl.MessageImpl;
public class DeliveryUtil { public class DeliveryUtil {
public static int readDelivery(Receiver receiver, ByteBuf buffer) { public static MessageImpl decodeMessageImpl(byte[] data) {
int initial = buffer.writerIndex();
// optimization by norman
int count;
while ((count = receiver.recv(buffer.array(), buffer.arrayOffset() + buffer.writerIndex(), buffer.writableBytes())) > 0) {
// Increment the writer index by the number of bytes written into it while calling recv.
buffer.writerIndex(buffer.writerIndex() + count);
buffer.ensureWritable(count);
}
return buffer.writerIndex() - initial;
}
public static MessageImpl decodeMessageImpl(ByteBuf buffer) {
MessageImpl message = (MessageImpl) Message.Factory.create(); MessageImpl message = (MessageImpl) Message.Factory.create();
message.decode(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), buffer.readableBytes()); message.decode(data, 0, data.length);
return message; return message;
} }

View File

@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0 * The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with * (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at * the License. You may obtain a copy of the License at
* * <p>
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* * <p>
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -17,9 +17,20 @@
package org.apache.activemq.artemis.tests.integration.amqp; package org.apache.activemq.artemis.tests.integration.amqp;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpConnection;
@ -27,6 +38,8 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession; import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
/** /**
@ -788,4 +801,77 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
connection.close(); connection.close();
} }
@Test(timeout = 120000)
public void testSendPersistentTX() throws Exception {
int MESSAGE_COUNT = 100000;
AtomicInteger errors = new AtomicInteger(0);
server.createQueue(SimpleString.toSimpleString("q1"), RoutingType.ANYCAST, SimpleString.toSimpleString("q1"), null, true, false, 1, false, true);
ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616");
Connection sendConnection = factory.createConnection();
Connection consumerConnection = factory.createConnection();
try {
Thread receiverThread = new Thread() {
@Override
public void run() {
try {
consumerConnection.start();
Session consumerSession = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
javax.jms.Queue q1 = consumerSession.createQueue("q1");
MessageConsumer consumer = consumerSession.createConsumer(q1);
for (int i = 1; i <= MESSAGE_COUNT; i++) {
Message message = consumer.receive(5000);
if (message == null) {
throw new IOException("No message read in time.");
}
if (i % 100 == 0) {
if (i % 1000 == 0) System.out.println("Read message " + i);
consumerSession.commit();
}
}
// Assure that all messages are consumed
consumerSession.commit();
} catch (Exception e) {
e.printStackTrace();
errors.incrementAndGet();
}
}
};
receiverThread.start();
Session sendingSession = sendConnection.createSession(true, Session.SESSION_TRANSACTED);
javax.jms.Queue q1 = sendingSession.createQueue("q1");
MessageProducer producer = sendingSession.createProducer(q1);
producer.setDeliveryDelay(DeliveryMode.NON_PERSISTENT);
for (int i = 0; i < MESSAGE_COUNT; i++) {
producer.send(sendingSession.createTextMessage("message " + i), DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
if (i % 100 == 0) {
if (i % 1000 == 0) System.out.println("Sending " + i);
sendingSession.commit();
}
}
sendingSession.commit();
receiverThread.join(50000);
Assert.assertFalse(receiverThread.isAlive());
Assert.assertEquals(0, errors.get());
} catch (Exception e) {
e.printStackTrace();
} finally {
sendConnection.close();
consumerConnection.close();
}
}
} }