diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java index 9021272532..850671a642 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java @@ -35,12 +35,12 @@ import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.transaction.Transaction; -import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; +import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl; import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExtCapability; import org.apache.activemq.artemis.protocol.amqp.sasl.AnonymousServerSASL; import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASL; @@ -220,7 +220,7 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener { public Binary newTransaction() { XidImpl xid = newXID(); - Transaction transaction = new TransactionImpl(xid, server.getStorageManager(), -1); + Transaction transaction = new ProtonTransactionImpl(xid, server.getStorageManager(), -1); transactions.put(xid, transaction); return new Binary(xid.getGlobalTransactionId()); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 5931afea99..a079190ae5 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -40,7 +40,6 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.jms.client.ActiveMQConnection; -import org.apache.activemq.artemis.protocol.amqp.converter.CoreAmqpConverter; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException; @@ -49,6 +48,7 @@ import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext; +import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl; import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult; import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; @@ -327,6 +327,7 @@ public class AMQPSessionCallback implements SessionCallback { recoverContext(); try { ((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts); + ((ServerConsumer) brokerConsumer).getQueue().forceDelivery(); } finally { resetContext(); } @@ -486,7 +487,7 @@ public class AMQPSessionCallback implements SessionCallback { ProtonServerSenderContext plugSender = (ProtonServerSenderContext) consumer.getProtocolContext(); try { - return plugSender.deliverMessage(CoreAmqpConverter.checkAMQP(message), deliveryCount); + return plugSender.deliverMessage(ref, deliveryCount); } catch (Exception e) { synchronized (connection.getLock()) { plugSender.getSender().setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, e.getMessage())); @@ -560,7 +561,10 @@ public class AMQPSessionCallback implements SessionCallback { Transaction tx = protonSPI.getTransaction(txid); tx.rollback(); protonSPI.removeTransaction(txid); + } + public void dischargeTx(Binary txid) throws ActiveMQAMQPException { + ((ProtonTransactionImpl) protonSPI.getTransaction(txid)).discharge(); } public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java index bd7b979151..89b6ed3306 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java @@ -25,6 +25,7 @@ import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientSenderContext; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException; +import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionHandler; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.transaction.Coordinator; import org.apache.qpid.proton.amqp.transport.ErrorCondition; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index ea2635ef13..d5fc1966a3 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -16,6 +16,9 @@ */ package org.apache.activemq.artemis.protocol.amqp.proton; +import java.util.Arrays; +import java.util.List; + import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.api.core.RoutingType; @@ -35,9 +38,6 @@ import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Receiver; import org.jboss.logging.Logger; -import java.util.Arrays; -import java.util.List; - public class ProtonServerReceiverContext extends ProtonInitializable implements ProtonDeliveryHandler { private static final Logger log = Logger.getLogger(ProtonServerReceiverContext.class); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 4e33c9b5f9..fcbd47e2bb 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -30,17 +30,19 @@ import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.postoffice.impl.CompositeAddress; import org.apache.activemq.artemis.core.server.AddressQueryResult; +import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.QueueQueryResult; -import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.jms.client.ActiveMQConnection; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; +import org.apache.activemq.artemis.protocol.amqp.converter.CoreAmqpConverter; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; +import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl; import org.apache.activemq.artemis.protocol.amqp.util.CreditsSemaphore; import org.apache.activemq.artemis.selector.filter.FilterException; import org.apache.activemq.artemis.selector.impl.SelectorParser; @@ -474,26 +476,29 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr if (closed) { return; } - Message message = (Message)delivery.getContext(); + + Message message = ((MessageReference) delivery.getContext()).getMessage(); boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED; DeliveryState remoteState = delivery.getRemoteState(); + boolean settleImmediate = true; if (remoteState != null) { // If we are transactional then we need ack if the msg has been accepted if (remoteState instanceof TransactionalState) { TransactionalState txState = (TransactionalState) remoteState; - Transaction tx = this.sessionSPI.getTransaction(txState.getTxnId()); + ProtonTransactionImpl tx = (ProtonTransactionImpl) this.sessionSPI.getTransaction(txState.getTxnId()); + if (txState.getOutcome() != null) { + settleImmediate = false; Outcome outcome = txState.getOutcome(); if (outcome instanceof Accepted) { if (!delivery.remotelySettled()) { TransactionalState txAccepted = new TransactionalState(); txAccepted.setOutcome(Accepted.getInstance()); txAccepted.setTxnId(txState.getTxnId()); - delivery.disposition(txAccepted); } // we have to individual ack as we can't guarantee we will get the delivery @@ -501,6 +506,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr // from dealer, a perf hit but a must try { sessionSPI.ack(tx, brokerConsumer, message); + tx.addDelivery(delivery, this); } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage()); } @@ -550,16 +556,19 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr protonSession.replaceTag(delivery.getTag()); } - synchronized (connection.getLock()) { - delivery.settle(); - sender.offer(1); - } + if (settleImmediate) settle(delivery); } else { // todo not sure if we need to do anything here } } + public void settle(Delivery delivery) { + synchronized (connection.getLock()) { + delivery.settle(); + } + } + public synchronized void checkState() { sessionSPI.resumeDelivery(brokerConsumer); } @@ -567,7 +576,9 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr /** * handle an out going message from ActiveMQ Artemis, send via the Proton Sender */ - public int deliverMessage(AMQPMessage message, int deliveryCount) throws Exception { + public int deliverMessage(MessageReference messageReference, int deliveryCount) throws Exception { + AMQPMessage message = CoreAmqpConverter.checkAMQP(messageReference.getMessage()); + if (closed) { return 0; } @@ -602,14 +613,14 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr final Delivery delivery; delivery = sender.delivery(tag, 0, tag.length); delivery.setMessageFormat((int) message.getMessageFormat()); - delivery.setContext(message); + delivery.setContext(messageReference); // this will avoid a copy.. patch provided by Norman using buffer.array() sender.send(nettyBuffer.array(), nettyBuffer.arrayOffset() + nettyBuffer.readerIndex(), nettyBuffer.readableBytes()); if (preSettle) { // Presettled means the client implicitly accepts any delivery we send it. - sessionSPI.ack(null, brokerConsumer, message); + sessionSPI.ack(null, brokerConsumer, messageReference.getMessage()); delivery.settle(); } else { sender.advance(); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java similarity index 96% rename from artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java rename to artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java index 1afeba873a..2cdb0729ab 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java @@ -14,13 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.artemis.protocol.amqp.proton; +package org.apache.activemq.artemis.protocol.amqp.proton.transaction; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; +import org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler; import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil; import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; import org.apache.qpid.proton.amqp.Binary; @@ -75,11 +76,11 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { Declared declared = new Declared(); declared.setTxnId(txID); delivery.disposition(declared); - delivery.settle(); } else if (action instanceof Discharge) { Discharge discharge = (Discharge) action; Binary txID = discharge.getTxnId(); + sessionSPI.dischargeTx(txID); if (discharge.getFail()) { try { sessionSPI.rollbackTX(txID, true); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java new file mode 100644 index 0000000000..ab4ff42981 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.protocol.amqp.proton.transaction; + +import javax.transaction.xa.Xid; +import java.util.HashMap; +import java.util.Map; + +import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.impl.RefsOperation; +import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; +import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext; +import org.apache.qpid.proton.engine.Delivery; + + +/** + * AMQP Protocol has different TX Rollback behaviour for Acks depending on whether an AMQP delivery has been settled + * or not. This class extends the Core TransactionImpl used for normal TX behaviour. In the case where deliveries + * have been settled, normal Ack rollback is applied. For cases where deliveries are unsettled and rolled back, + * we increment the delivery count and return to the consumer. + */ +public class ProtonTransactionImpl extends TransactionImpl { + + /* We need to track the Message reference against the AMQP objects, so we can check whether the corresponding + deliveries have been settled. We also need to ensure we are settling on the correct link. Hence why we keep a ref + to the ProtonServerSenderContext here. + */ + private final Map> deliveries = new HashMap<>(); + + private boolean discharged; + + public ProtonTransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds) { + super(xid, storageManager, timeoutSeconds); + } + + @Override + public RefsOperation createRefsOperation(Queue queue) { + return new ProtonTransactionRefsOperation(queue, storageManager); + } + + @Override + public void rollback() throws Exception { + super.rollback(); + } + + public void addDelivery(Delivery delivery, ProtonServerSenderContext context) { + deliveries.put(((MessageReference) delivery.getContext()), new Pair<>(delivery, context)); + } + + public Map> getDeliveries() { + return deliveries; + } + + @Override + public void commit() throws Exception { + super.commit(); + + // Settle all unsettled deliveries if commit is successful + for (Pair p : deliveries.values()) { + if (!p.getA().isSettled()) p.getB().settle(p.getA()); + } + } + + public boolean isDischarged() { + return discharged; + } + + public void discharge() { + discharged = true; + } +} + + diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionRefsOperation.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionRefsOperation.java new file mode 100644 index 0000000000..7b48ac0d0f --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionRefsOperation.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.protocol.amqp.proton.transaction; + +import java.util.LinkedList; +import java.util.Map; + +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.ServerConsumer; +import org.apache.activemq.artemis.core.server.impl.QueueImpl; +import org.apache.activemq.artemis.core.server.impl.RefsOperation; +import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.qpid.proton.engine.Delivery; + +/** + * AMQP Protocol has different TX Rollback behaviour for Acks depending on whether an AMQP delivery has been settled + * or not. This class extends the Core RefsOperation used for normal acks. In the case where deliveries have been + * settled, normal Ack rollback is applied. For cases where deliveries are unsettled and rolled back, we increment + * the delivery count and return to the consumer. + */ +public class ProtonTransactionRefsOperation extends RefsOperation { + + public ProtonTransactionRefsOperation(final Queue queue, StorageManager storageManager) { + super(queue, storageManager); + } + + @Override + public void rollbackRedelivery(Transaction txn, MessageReference ref, long timeBase, Map> queueMap) throws Exception { + ProtonTransactionImpl tx = (ProtonTransactionImpl) txn; + + if (tx.getDeliveries().containsKey(ref)) { + Delivery del = tx.getDeliveries().get(ref).getA(); + ServerConsumer consumer = (ServerConsumer) tx.getDeliveries().get(ref).getB().getBrokerConsumer(); + // Rollback normally if the delivery is not settled or a forced TX rollback is done (e.g. connection drop). + if (del.remotelySettled() || !tx.isDischarged()) { + super.rollbackRedelivery(tx, ref, timeBase, queueMap); + } else { + ref.incrementDeliveryCount(); + consumer.backToDelivering(ref); + del.disposition(del.getLocalState() == null ? del.getDefaultDeliveryState() : del.getLocalState()); + } + } else { + super.rollbackRedelivery(tx, ref, timeBase, queueMap); + } + } +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java index 0f3da07254..c5935d7eb8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java @@ -89,18 +89,7 @@ public class RefsOperation extends TransactionOperationAbstract { if (ref.isAlreadyAcked()) { ackedRefs.add(ref); } - // if ignore redelivery check, we just perform redelivery straight - if (ref.getQueue().checkRedelivery(ref, timeBase, ignoreRedeliveryCheck)) { - LinkedList toCancel = queueMap.get(ref.getQueue()); - - if (toCancel == null) { - toCancel = new LinkedList<>(); - - queueMap.put((QueueImpl) ref.getQueue(), toCancel); - } - - toCancel.addFirst(ref); - } + rollbackRedelivery(tx, ref, timeBase, queueMap); } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorCheckingDLQ(e); } @@ -145,6 +134,21 @@ public class RefsOperation extends TransactionOperationAbstract { } } + protected void rollbackRedelivery(Transaction tx, MessageReference ref, long timeBase, Map> queueMap) throws Exception { + // if ignore redelivery check, we just perform redelivery straight + if (ref.getQueue().checkRedelivery(ref, timeBase, ignoreRedeliveryCheck)) { + LinkedList toCancel = queueMap.get(ref.getQueue()); + + if (toCancel == null) { + toCancel = new LinkedList<>(); + + queueMap.put((QueueImpl) ref.getQueue(), toCancel); + } + + toCancel.addFirst(ref); + } + } + @Override public void afterCommit(final Transaction tx) { for (MessageReference ref : refsToAck) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java index db2e8310a3..d9b45d3fea 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java @@ -99,12 +99,23 @@ public class AmqpClientTestSupport extends ActiveMQTestBase { ActiveMQServer server = createServer(true, true); serverManager = new JMSServerManagerImpl(server); Configuration serverConfig = server.getConfiguration(); + + // Address 1 CoreAddressConfiguration address = new CoreAddressConfiguration(); address.setName(getTestName()).getRoutingTypes().add(RoutingType.ANYCAST); CoreQueueConfiguration queueConfig = new CoreQueueConfiguration(); queueConfig.setName(getTestName()).setAddress(getTestName()).setRoutingType(RoutingType.ANYCAST); address.getQueueConfigurations().add(queueConfig); serverConfig.addAddressConfiguration(address); + + // Address 2 + CoreAddressConfiguration address2 = new CoreAddressConfiguration(); + address2.setName(getTestName2()).getRoutingTypes().add(RoutingType.ANYCAST); + CoreQueueConfiguration queueConfig2 = new CoreQueueConfiguration(); + queueConfig2.setName(getTestName2()).setAddress(getTestName2()).setRoutingType(RoutingType.ANYCAST); + address2.getQueueConfigurations().add(queueConfig2); + serverConfig.addAddressConfiguration(address2); + serverConfig.getAddressesSettings().put("#", new AddressSettings().setAutoCreateQueues(true).setAutoCreateAddresses(true).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ"))); serverConfig.setSecurityEnabled(false); Set acceptors = serverConfig.getAcceptorConfigurations(); @@ -127,6 +138,10 @@ public class AmqpClientTestSupport extends ActiveMQTestBase { return getName(); } + public String getTestName2() { + return getName() + "2"; + } + public AmqpClientTestSupport() { } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java index d92fa0feec..f206654654 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java @@ -44,6 +44,10 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport { receiver1.flow(1); AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS); + + AmqpReceiver receiver2 = session.createReceiver(getTestName()); + + assertNotNull("did not receive message first time", message); assertEquals("MessageID:0", message.getMessageId()); @@ -51,12 +55,11 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport { assertNotNull(protonMessage); assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount()); + receiver2.flow(1); message.release(); - // Read the message again and validate its state - AmqpReceiver receiver2 = session.createReceiver(getTestName()); - receiver2.flow(1); + // Read the message again and validate its state message = receiver2.receive(10, TimeUnit.SECONDS); assertNotNull("did not receive message again", message); assertEquals("MessageID:0", message.getMessageId()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java index 1708720334..1b2a1b0df5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java @@ -514,7 +514,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { // Rollback the other half the consumed messages txnSession.begin(); for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) { - messages.get(i).accept(txnSession); + messages.get(i).accept(txnSession, false); } txnSession.rollback(); @@ -525,18 +525,6 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { message.release(); } - // Commit the other half the consumed messages - // This is a variation from the .NET client tests which doesn't settle the - // messages in the TX until commit is called but on ActiveMQ they will be - // redispatched regardless and not stay in the acquired state. - txnSession.begin(); - for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) { - AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); - assertNotNull(message); - message.accept(); - } - txnSession.commit(); - // The final message should still be pending. { AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java index 39daee4727..1308c37b3b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java @@ -110,7 +110,6 @@ public class ProtonTest extends ProtonTestBase { private static final String amqpConnectionUri = "amqp://localhost:5672"; private static final String tcpAmqpConnectionUri = "tcp://localhost:5672"; - private static final String brokerName = "my-broker"; private static final long maxSizeBytes = 1 * 1024 * 1024; @@ -472,7 +471,7 @@ public class ProtonTest extends ProtonTestBase { session.close(); Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable(); //because tx commit is executed async on broker, we use a timed wait. - assertTrue(TimeUtils.waitOnBoolean(true, 10000, ()-> q.getMessageCount() == 10)); + assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> q.getMessageCount() == 10)); } @Test @@ -548,7 +547,7 @@ public class ProtonTest extends ProtonTestBase { session.rollback(); Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable(); //because tx rollback is executed async on broker, we use a timed wait. - assertTrue(TimeUtils.waitOnBoolean(true, 10000, ()-> q.getMessageCount() == 10)); + assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> q.getMessageCount() == 10)); } @@ -1855,4 +1854,32 @@ public class ProtonTest extends ProtonTestBase { return count; } } + + @Test + public void testReleaseDisposition() throws Exception { + AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); + AmqpConnection connection = client.connect(); + try { + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(address); + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + sender.send(message); + + AmqpReceiver receiver = session.createReceiver(address); + receiver.flow(10); + + AmqpMessage m1 = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(m1); + m1.release(); + + //receiver.flow(10); + AmqpMessage m2 = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(m2); + m2.accept(); + } finally { + connection.close(); + } + } }