diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java index 7e7dc60c26..4265f28c5b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java @@ -62,7 +62,7 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener { private static final Logger logger = Logger.getLogger(AMQPConnectionCallback.class); - private ConcurrentMap transactions = new ConcurrentHashMap<>(); + private ConcurrentMap transactions = new ConcurrentHashMap<>(); private final ProtonProtocolManager manager; @@ -224,25 +224,32 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener { public Binary newTransaction() { XidImpl xid = newXID(); + Binary binary = new Binary(xid.getGlobalTransactionId()); Transaction transaction = new ProtonTransactionImpl(xid, server.getStorageManager(), -1); - transactions.put(xid, transaction); - return new Binary(xid.getGlobalTransactionId()); + transactions.put(binary, transaction); + return binary; } - public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException { - XidImpl xid = newXID(txid.getArray()); - Transaction tx = transactions.get(xid); + public Transaction getTransaction(Binary txid, boolean remove) throws ActiveMQAMQPException { + Transaction tx; + + if (remove) { + tx = transactions.remove(txid); + } else { + tx = transactions.get(txid); + } if (tx == null) { - throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.txNotFound(xid.toString()); + logger.warn("Couldn't find txid = " + txid); + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.txNotFound(txid.toString()); } return tx; } - public void removeTransaction(Binary txid) { + public Transaction removeTransaction(Binary txid) { XidImpl xid = newXID(txid.getArray()); - transactions.remove(xid); + return transactions.remove(xid); } protected XidImpl newXID() { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 7f7e22b730..3592dbc77c 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -47,7 +47,6 @@ import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext; -import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl; import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult; import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; @@ -92,6 +91,10 @@ public class AMQPSessionCallback implements SessionCallback { private final AtomicBoolean draining = new AtomicBoolean(false); + public Object getProtonLock() { + return connection.getLock(); + } + public AMQPSessionCallback(AMQPConnectionCallback protonSPI, ProtonProtocolManager manager, AMQPConnectionContext connection, @@ -382,8 +385,10 @@ public class AMQPSessionCallback implements SessionCallback { condition.setDescription(errorMessage); Rejected rejected = new Rejected(); rejected.setError(condition); - delivery.disposition(rejected); - delivery.settle(); + synchronized (connection.getLock()) { + delivery.disposition(rejected); + delivery.settle(); + } connection.flush(); } @@ -536,29 +541,14 @@ public class AMQPSessionCallback implements SessionCallback { } } - public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException { - return protonSPI.getTransaction(txid); + public Transaction getTransaction(Binary txid, boolean remove) throws ActiveMQAMQPException { + return protonSPI.getTransaction(txid, remove); } public Binary newTransaction() { return protonSPI.newTransaction(); } - public void commitTX(Binary txid) throws Exception { - Transaction tx = protonSPI.getTransaction(txid); - tx.commit(true); - protonSPI.removeTransaction(txid); - } - - public void rollbackTX(Binary txid, boolean lastMessageReceived) throws Exception { - Transaction tx = protonSPI.getTransaction(txid); - tx.rollback(); - protonSPI.removeTransaction(txid); - } - - public void dischargeTx(Binary txid) throws ActiveMQAMQPException { - ((ProtonTransactionImpl) protonSPI.getTransaction(txid)).discharge(); - } public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception { return serverSession.getMatchingQueue(address, routingType); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java index d1fc0e1524..ccc4a6ce27 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java @@ -142,7 +142,7 @@ public class AMQPSessionContext extends ProtonInitializable { } public void addTransactionHandler(Coordinator coordinator, Receiver receiver) { - ProtonTransactionHandler transactionHandler = new ProtonTransactionHandler(sessionSPI); + ProtonTransactionHandler transactionHandler = new ProtonTransactionHandler(sessionSPI, connection); coordinator.setCapabilities(Symbol.getSymbol("amqp:local-transactions"), Symbol.getSymbol("amqp:multi-txns-per-ssn"), Symbol.getSymbol("amqp:multi-ssns-per-txn")); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index f08c1fc4e2..54467cfd6c 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -155,7 +155,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements if (delivery.getRemoteState() instanceof TransactionalState) { TransactionalState txState = (TransactionalState) delivery.getRemoteState(); - tx = this.sessionSPI.getTransaction(txState.getTxnId()); + tx = this.sessionSPI.getTransaction(txState.getTxnId(), false); } sessionSPI.serverSend(tx, receiver, delivery, address, delivery.getMessageFormat(), data); @@ -201,8 +201,8 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements } else { synchronized (connection.getLock()) { receiver.flow(credits); - connection.flush(); } + connection.flush(); } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 0e0447fbe2..5a97c02da3 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -493,7 +493,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr if (remoteState instanceof TransactionalState) { TransactionalState txState = (TransactionalState) remoteState; - ProtonTransactionImpl tx = (ProtonTransactionImpl) this.sessionSPI.getTransaction(txState.getTxnId()); + ProtonTransactionImpl tx = (ProtonTransactionImpl) this.sessionSPI.getTransaction(txState.getTxnId(), false); if (txState.getOutcome() != null) { settleImmediate = false; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java index 721bd33dfd..12498b0890 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java @@ -18,10 +18,9 @@ package org.apache.activemq.artemis.protocol.amqp.proton.transaction; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; -import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; +import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler; import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil; -import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Accepted; @@ -36,9 +35,6 @@ import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.message.impl.MessageImpl; import org.jboss.logging.Logger; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; - /** * handles an amqp Coordinator to deal with transaction boundaries etc */ @@ -47,17 +43,18 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { private static final Logger log = Logger.getLogger(ProtonTransactionHandler.class); public static final int DEFAULT_COORDINATOR_CREDIT = 100; + public static final int CREDIT_LOW_WATERMARK = 30; final AMQPSessionCallback sessionSPI; + final AMQPConnectionContext connection; - public ProtonTransactionHandler(AMQPSessionCallback sessionSPI) { + public ProtonTransactionHandler(AMQPSessionCallback sessionSPI, AMQPConnectionContext connection) { this.sessionSPI = sessionSPI; + this.connection = connection; } @Override public void onMessage(Delivery delivery) throws ActiveMQAMQPException { - ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024); - final Receiver receiver; try { receiver = ((Receiver) delivery.getLink()); @@ -66,9 +63,21 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { return; } - receiver.recv(new NettyWritable(buffer)); + byte[] buffer; + + synchronized (connection.getLock()) { + // Replenish coordinator receiver credit on exhaustion so sender can continue + // transaction declare and discahrge operations. + if (receiver.getCredit() < CREDIT_LOW_WATERMARK) { + receiver.flow(DEFAULT_COORDINATOR_CREDIT); + } + + buffer = new byte[delivery.available()]; + receiver.recv(buffer, 0, buffer.length); + receiver.advance(); + } + - receiver.advance(); MessageImpl msg = DeliveryUtil.decodeMessageImpl(buffer); @@ -78,44 +87,47 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { Binary txID = sessionSPI.newTransaction(); Declared declared = new Declared(); declared.setTxnId(txID); - delivery.disposition(declared); + synchronized (connection.getLock()) { + delivery.disposition(declared); + } } else if (action instanceof Discharge) { Discharge discharge = (Discharge) action; Binary txID = discharge.getTxnId(); - sessionSPI.dischargeTx(txID); - if (discharge.getFail()) { - try { - sessionSPI.rollbackTX(txID, true); - delivery.disposition(new Accepted()); - } catch (Exception e) { - throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorRollingbackCoordinator(e.getMessage()); - } - } else { - try { - sessionSPI.commitTX(txID); - delivery.disposition(new Accepted()); - } catch (ActiveMQAMQPException amqpE) { - throw amqpE; - } catch (Exception e) { - throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCommittingCoordinator(e.getMessage()); - } - } + ProtonTransactionImpl tx = (ProtonTransactionImpl)sessionSPI.getTransaction(txID, true); + tx.discharge(); - // Replenish coordinator receiver credit on exhaustion so sender can continue - // transaction declare and discahrge operations. - if (receiver.getCredit() == 0) { - receiver.flow(DEFAULT_COORDINATOR_CREDIT); + if (discharge.getFail()) { + tx.rollback(); + synchronized (connection.getLock()) { + delivery.disposition(new Accepted()); + } + connection.flush(); + } else { + tx.commit(); + synchronized (connection.getLock()) { + delivery.disposition(new Accepted()); + } + connection.flush(); } } } catch (ActiveMQAMQPException amqpE) { - delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage())); - } catch (Exception e) { + log.warn(amqpE.getMessage(), amqpE); + synchronized (connection.getLock()) { + delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage())); + } + connection.flush(); + } catch (Throwable e) { log.warn(e.getMessage(), e); - delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage())); + synchronized (connection.getLock()) { + delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage())); + } + connection.flush(); } finally { - delivery.settle(); - buffer.release(); + synchronized (connection.getLock()) { + delivery.settle(); + } + connection.flush(); } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java index 9257c6bfaa..4267b85b28 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java @@ -16,28 +16,14 @@ */ package org.apache.activemq.artemis.protocol.amqp.util; -import io.netty.buffer.ByteBuf; -import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.message.impl.MessageImpl; public class DeliveryUtil { - public static int readDelivery(Receiver receiver, ByteBuf buffer) { - int initial = buffer.writerIndex(); - // optimization by norman - int count; - while ((count = receiver.recv(buffer.array(), buffer.arrayOffset() + buffer.writerIndex(), buffer.writableBytes())) > 0) { - // Increment the writer index by the number of bytes written into it while calling recv. - buffer.writerIndex(buffer.writerIndex() + count); - buffer.ensureWritable(count); - } - return buffer.writerIndex() - initial; - } - - public static MessageImpl decodeMessageImpl(ByteBuf buffer) { + public static MessageImpl decodeMessageImpl(byte[] data) { MessageImpl message = (MessageImpl) Message.Factory.create(); - message.decode(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), buffer.readableBytes()); + message.decode(data, 0, data.length); return message; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java index c00cc1c244..41bc5e782c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,9 +17,20 @@ package org.apache.activemq.artemis.tests.integration.amqp; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.io.IOException; import java.util.ArrayList; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; @@ -27,6 +38,8 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.junit.Assert; import org.junit.Test; /** @@ -788,4 +801,77 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { connection.close(); } + + @Test(timeout = 120000) + public void testSendPersistentTX() throws Exception { + int MESSAGE_COUNT = 100000; + AtomicInteger errors = new AtomicInteger(0); + server.createQueue(SimpleString.toSimpleString("q1"), RoutingType.ANYCAST, SimpleString.toSimpleString("q1"), null, true, false, 1, false, true); + ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616"); + Connection sendConnection = factory.createConnection(); + Connection consumerConnection = factory.createConnection(); + try { + + Thread receiverThread = new Thread() { + @Override + public void run() { + try { + consumerConnection.start(); + Session consumerSession = consumerConnection.createSession(true, Session.SESSION_TRANSACTED); + javax.jms.Queue q1 = consumerSession.createQueue("q1"); + + MessageConsumer consumer = consumerSession.createConsumer(q1); + + for (int i = 1; i <= MESSAGE_COUNT; i++) { + Message message = consumer.receive(5000); + if (message == null) { + throw new IOException("No message read in time."); + } + + if (i % 100 == 0) { + if (i % 1000 == 0) System.out.println("Read message " + i); + consumerSession.commit(); + } + } + + // Assure that all messages are consumed + consumerSession.commit(); + } catch (Exception e) { + e.printStackTrace(); + errors.incrementAndGet(); + } + + } + }; + + receiverThread.start(); + + Session sendingSession = sendConnection.createSession(true, Session.SESSION_TRANSACTED); + + javax.jms.Queue q1 = sendingSession.createQueue("q1"); + MessageProducer producer = sendingSession.createProducer(q1); + producer.setDeliveryDelay(DeliveryMode.NON_PERSISTENT); + for (int i = 0; i < MESSAGE_COUNT; i++) { + producer.send(sendingSession.createTextMessage("message " + i), DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + if (i % 100 == 0) { + if (i % 1000 == 0) System.out.println("Sending " + i); + sendingSession.commit(); + } + } + + sendingSession.commit(); + + receiverThread.join(50000); + Assert.assertFalse(receiverThread.isAlive()); + + Assert.assertEquals(0, errors.get()); + + } catch (Exception e) { + e.printStackTrace(); + } finally { + sendConnection.close(); + consumerConnection.close(); + } + + } }