This commit is contained in:
Clebert Suconic 2017-03-06 11:55:29 -05:00
commit d12330f151
13 changed files with 257 additions and 51 deletions

View File

@ -35,12 +35,12 @@ import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExtCapability; import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExtCapability;
import org.apache.activemq.artemis.protocol.amqp.sasl.AnonymousServerSASL; import org.apache.activemq.artemis.protocol.amqp.sasl.AnonymousServerSASL;
import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASL; import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASL;
@ -220,7 +220,7 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
public Binary newTransaction() { public Binary newTransaction() {
XidImpl xid = newXID(); XidImpl xid = newXID();
Transaction transaction = new TransactionImpl(xid, server.getStorageManager(), -1); Transaction transaction = new ProtonTransactionImpl(xid, server.getStorageManager(), -1);
transactions.put(xid, transaction); transactions.put(xid, transaction);
return new Binary(xid.getGlobalTransactionId()); return new Binary(xid.getGlobalTransactionId());
} }

View File

@ -40,7 +40,6 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection; import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.protocol.amqp.converter.CoreAmqpConverter;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
@ -49,6 +48,7 @@ import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl;
import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult; import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult;
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult; import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
@ -327,6 +327,7 @@ public class AMQPSessionCallback implements SessionCallback {
recoverContext(); recoverContext();
try { try {
((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts); ((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts);
((ServerConsumer) brokerConsumer).getQueue().forceDelivery();
} finally { } finally {
resetContext(); resetContext();
} }
@ -486,7 +487,7 @@ public class AMQPSessionCallback implements SessionCallback {
ProtonServerSenderContext plugSender = (ProtonServerSenderContext) consumer.getProtocolContext(); ProtonServerSenderContext plugSender = (ProtonServerSenderContext) consumer.getProtocolContext();
try { try {
return plugSender.deliverMessage(CoreAmqpConverter.checkAMQP(message), deliveryCount); return plugSender.deliverMessage(ref, deliveryCount);
} catch (Exception e) { } catch (Exception e) {
synchronized (connection.getLock()) { synchronized (connection.getLock()) {
plugSender.getSender().setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, e.getMessage())); plugSender.getSender().setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, e.getMessage()));
@ -560,7 +561,10 @@ public class AMQPSessionCallback implements SessionCallback {
Transaction tx = protonSPI.getTransaction(txid); Transaction tx = protonSPI.getTransaction(txid);
tx.rollback(); tx.rollback();
protonSPI.removeTransaction(txid); protonSPI.removeTransaction(txid);
}
public void dischargeTx(Binary txid) throws ActiveMQAMQPException {
((ProtonTransactionImpl) protonSPI.getTransaction(txid)).discharge();
} }
public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception { public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception {

View File

@ -25,6 +25,7 @@ import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientSenderContext; import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientSenderContext;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionHandler;
import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transaction.Coordinator; import org.apache.qpid.proton.amqp.transaction.Coordinator;
import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.amqp.transport.ErrorCondition;

View File

@ -16,6 +16,9 @@
*/ */
package org.apache.activemq.artemis.protocol.amqp.proton; package org.apache.activemq.artemis.protocol.amqp.proton;
import java.util.Arrays;
import java.util.List;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
@ -35,9 +38,6 @@ import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.engine.Receiver;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
import java.util.Arrays;
import java.util.List;
public class ProtonServerReceiverContext extends ProtonInitializable implements ProtonDeliveryHandler { public class ProtonServerReceiverContext extends ProtonInitializable implements ProtonDeliveryHandler {
private static final Logger log = Logger.getLogger(ProtonServerReceiverContext.class); private static final Logger log = Logger.getLogger(ProtonServerReceiverContext.class);

View File

@ -30,17 +30,19 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.impl.CompositeAddress; import org.apache.activemq.artemis.core.postoffice.impl.CompositeAddress;
import org.apache.activemq.artemis.core.server.AddressQueryResult; import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection; import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.converter.CoreAmqpConverter;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl;
import org.apache.activemq.artemis.protocol.amqp.util.CreditsSemaphore; import org.apache.activemq.artemis.protocol.amqp.util.CreditsSemaphore;
import org.apache.activemq.artemis.selector.filter.FilterException; import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.impl.SelectorParser; import org.apache.activemq.artemis.selector.impl.SelectorParser;
@ -474,26 +476,29 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
if (closed) { if (closed) {
return; return;
} }
Message message = (Message)delivery.getContext();
Message message = ((MessageReference) delivery.getContext()).getMessage();
boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED; boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
DeliveryState remoteState = delivery.getRemoteState(); DeliveryState remoteState = delivery.getRemoteState();
boolean settleImmediate = true;
if (remoteState != null) { if (remoteState != null) {
// If we are transactional then we need ack if the msg has been accepted // If we are transactional then we need ack if the msg has been accepted
if (remoteState instanceof TransactionalState) { if (remoteState instanceof TransactionalState) {
TransactionalState txState = (TransactionalState) remoteState; TransactionalState txState = (TransactionalState) remoteState;
Transaction tx = this.sessionSPI.getTransaction(txState.getTxnId()); ProtonTransactionImpl tx = (ProtonTransactionImpl) this.sessionSPI.getTransaction(txState.getTxnId());
if (txState.getOutcome() != null) { if (txState.getOutcome() != null) {
settleImmediate = false;
Outcome outcome = txState.getOutcome(); Outcome outcome = txState.getOutcome();
if (outcome instanceof Accepted) { if (outcome instanceof Accepted) {
if (!delivery.remotelySettled()) { if (!delivery.remotelySettled()) {
TransactionalState txAccepted = new TransactionalState(); TransactionalState txAccepted = new TransactionalState();
txAccepted.setOutcome(Accepted.getInstance()); txAccepted.setOutcome(Accepted.getInstance());
txAccepted.setTxnId(txState.getTxnId()); txAccepted.setTxnId(txState.getTxnId());
delivery.disposition(txAccepted); delivery.disposition(txAccepted);
} }
// we have to individual ack as we can't guarantee we will get the delivery // we have to individual ack as we can't guarantee we will get the delivery
@ -501,6 +506,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// from dealer, a perf hit but a must // from dealer, a perf hit but a must
try { try {
sessionSPI.ack(tx, brokerConsumer, message); sessionSPI.ack(tx, brokerConsumer, message);
tx.addDelivery(delivery, this);
} catch (Exception e) { } catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage()); throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
} }
@ -550,16 +556,19 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
protonSession.replaceTag(delivery.getTag()); protonSession.replaceTag(delivery.getTag());
} }
synchronized (connection.getLock()) { if (settleImmediate) settle(delivery);
delivery.settle();
sender.offer(1);
}
} else { } else {
// todo not sure if we need to do anything here // todo not sure if we need to do anything here
} }
} }
public void settle(Delivery delivery) {
synchronized (connection.getLock()) {
delivery.settle();
}
}
public synchronized void checkState() { public synchronized void checkState() {
sessionSPI.resumeDelivery(brokerConsumer); sessionSPI.resumeDelivery(brokerConsumer);
} }
@ -567,7 +576,9 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
/** /**
* handle an out going message from ActiveMQ Artemis, send via the Proton Sender * handle an out going message from ActiveMQ Artemis, send via the Proton Sender
*/ */
public int deliverMessage(AMQPMessage message, int deliveryCount) throws Exception { public int deliverMessage(MessageReference messageReference, int deliveryCount) throws Exception {
AMQPMessage message = CoreAmqpConverter.checkAMQP(messageReference.getMessage());
if (closed) { if (closed) {
return 0; return 0;
} }
@ -602,14 +613,14 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
final Delivery delivery; final Delivery delivery;
delivery = sender.delivery(tag, 0, tag.length); delivery = sender.delivery(tag, 0, tag.length);
delivery.setMessageFormat((int) message.getMessageFormat()); delivery.setMessageFormat((int) message.getMessageFormat());
delivery.setContext(message); delivery.setContext(messageReference);
// this will avoid a copy.. patch provided by Norman using buffer.array() // this will avoid a copy.. patch provided by Norman using buffer.array()
sender.send(nettyBuffer.array(), nettyBuffer.arrayOffset() + nettyBuffer.readerIndex(), nettyBuffer.readableBytes()); sender.send(nettyBuffer.array(), nettyBuffer.arrayOffset() + nettyBuffer.readerIndex(), nettyBuffer.readableBytes());
if (preSettle) { if (preSettle) {
// Presettled means the client implicitly accepts any delivery we send it. // Presettled means the client implicitly accepts any delivery we send it.
sessionSPI.ack(null, brokerConsumer, message); sessionSPI.ack(null, brokerConsumer, messageReference.getMessage());
delivery.settle(); delivery.settle();
} else { } else {
sender.advance(); sender.advance();

View File

@ -14,13 +14,14 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.artemis.protocol.amqp.proton; package org.apache.activemq.artemis.protocol.amqp.proton.transaction;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler;
import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil; import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Binary;
@ -75,11 +76,11 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
Declared declared = new Declared(); Declared declared = new Declared();
declared.setTxnId(txID); declared.setTxnId(txID);
delivery.disposition(declared); delivery.disposition(declared);
delivery.settle();
} else if (action instanceof Discharge) { } else if (action instanceof Discharge) {
Discharge discharge = (Discharge) action; Discharge discharge = (Discharge) action;
Binary txID = discharge.getTxnId(); Binary txID = discharge.getTxnId();
sessionSPI.dischargeTx(txID);
if (discharge.getFail()) { if (discharge.getFail()) {
try { try {
sessionSPI.rollbackTX(txID, true); sessionSPI.rollbackTX(txID, true);

View File

@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.proton.transaction;
import javax.transaction.xa.Xid;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
import org.apache.qpid.proton.engine.Delivery;
/**
* AMQP Protocol has different TX Rollback behaviour for Acks depending on whether an AMQP delivery has been settled
* or not. This class extends the Core TransactionImpl used for normal TX behaviour. In the case where deliveries
* have been settled, normal Ack rollback is applied. For cases where deliveries are unsettled and rolled back,
* we increment the delivery count and return to the consumer.
*/
public class ProtonTransactionImpl extends TransactionImpl {
/* We need to track the Message reference against the AMQP objects, so we can check whether the corresponding
deliveries have been settled. We also need to ensure we are settling on the correct link. Hence why we keep a ref
to the ProtonServerSenderContext here.
*/
private final Map<MessageReference, Pair<Delivery, ProtonServerSenderContext>> deliveries = new HashMap<>();
private boolean discharged;
public ProtonTransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds) {
super(xid, storageManager, timeoutSeconds);
}
@Override
public RefsOperation createRefsOperation(Queue queue) {
return new ProtonTransactionRefsOperation(queue, storageManager);
}
@Override
public void rollback() throws Exception {
super.rollback();
}
public void addDelivery(Delivery delivery, ProtonServerSenderContext context) {
deliveries.put(((MessageReference) delivery.getContext()), new Pair<>(delivery, context));
}
public Map<MessageReference, Pair<Delivery, ProtonServerSenderContext>> getDeliveries() {
return deliveries;
}
@Override
public void commit() throws Exception {
super.commit();
// Settle all unsettled deliveries if commit is successful
for (Pair<Delivery, ProtonServerSenderContext> p : deliveries.values()) {
if (!p.getA().isSettled()) p.getB().settle(p.getA());
}
}
public boolean isDischarged() {
return discharged;
}
public void discharge() {
discharged = true;
}
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.proton.transaction;
import java.util.LinkedList;
import java.util.Map;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.qpid.proton.engine.Delivery;
/**
* AMQP Protocol has different TX Rollback behaviour for Acks depending on whether an AMQP delivery has been settled
* or not. This class extends the Core RefsOperation used for normal acks. In the case where deliveries have been
* settled, normal Ack rollback is applied. For cases where deliveries are unsettled and rolled back, we increment
* the delivery count and return to the consumer.
*/
public class ProtonTransactionRefsOperation extends RefsOperation {
public ProtonTransactionRefsOperation(final Queue queue, StorageManager storageManager) {
super(queue, storageManager);
}
@Override
public void rollbackRedelivery(Transaction txn, MessageReference ref, long timeBase, Map<QueueImpl, LinkedList<MessageReference>> queueMap) throws Exception {
ProtonTransactionImpl tx = (ProtonTransactionImpl) txn;
if (tx.getDeliveries().containsKey(ref)) {
Delivery del = tx.getDeliveries().get(ref).getA();
ServerConsumer consumer = (ServerConsumer) tx.getDeliveries().get(ref).getB().getBrokerConsumer();
// Rollback normally if the delivery is not settled or a forced TX rollback is done (e.g. connection drop).
if (del.remotelySettled() || !tx.isDischarged()) {
super.rollbackRedelivery(tx, ref, timeBase, queueMap);
} else {
ref.incrementDeliveryCount();
consumer.backToDelivering(ref);
del.disposition(del.getLocalState() == null ? del.getDefaultDeliveryState() : del.getLocalState());
}
} else {
super.rollbackRedelivery(tx, ref, timeBase, queueMap);
}
}
}

View File

@ -89,18 +89,7 @@ public class RefsOperation extends TransactionOperationAbstract {
if (ref.isAlreadyAcked()) { if (ref.isAlreadyAcked()) {
ackedRefs.add(ref); ackedRefs.add(ref);
} }
// if ignore redelivery check, we just perform redelivery straight rollbackRedelivery(tx, ref, timeBase, queueMap);
if (ref.getQueue().checkRedelivery(ref, timeBase, ignoreRedeliveryCheck)) {
LinkedList<MessageReference> toCancel = queueMap.get(ref.getQueue());
if (toCancel == null) {
toCancel = new LinkedList<>();
queueMap.put((QueueImpl) ref.getQueue(), toCancel);
}
toCancel.addFirst(ref);
}
} catch (Exception e) { } catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorCheckingDLQ(e); ActiveMQServerLogger.LOGGER.errorCheckingDLQ(e);
} }
@ -145,6 +134,21 @@ public class RefsOperation extends TransactionOperationAbstract {
} }
} }
protected void rollbackRedelivery(Transaction tx, MessageReference ref, long timeBase, Map<QueueImpl, LinkedList<MessageReference>> queueMap) throws Exception {
// if ignore redelivery check, we just perform redelivery straight
if (ref.getQueue().checkRedelivery(ref, timeBase, ignoreRedeliveryCheck)) {
LinkedList<MessageReference> toCancel = queueMap.get(ref.getQueue());
if (toCancel == null) {
toCancel = new LinkedList<>();
queueMap.put((QueueImpl) ref.getQueue(), toCancel);
}
toCancel.addFirst(ref);
}
}
@Override @Override
public void afterCommit(final Transaction tx) { public void afterCommit(final Transaction tx) {
for (MessageReference ref : refsToAck) { for (MessageReference ref : refsToAck) {

View File

@ -99,12 +99,23 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
ActiveMQServer server = createServer(true, true); ActiveMQServer server = createServer(true, true);
serverManager = new JMSServerManagerImpl(server); serverManager = new JMSServerManagerImpl(server);
Configuration serverConfig = server.getConfiguration(); Configuration serverConfig = server.getConfiguration();
// Address 1
CoreAddressConfiguration address = new CoreAddressConfiguration(); CoreAddressConfiguration address = new CoreAddressConfiguration();
address.setName(getTestName()).getRoutingTypes().add(RoutingType.ANYCAST); address.setName(getTestName()).getRoutingTypes().add(RoutingType.ANYCAST);
CoreQueueConfiguration queueConfig = new CoreQueueConfiguration(); CoreQueueConfiguration queueConfig = new CoreQueueConfiguration();
queueConfig.setName(getTestName()).setAddress(getTestName()).setRoutingType(RoutingType.ANYCAST); queueConfig.setName(getTestName()).setAddress(getTestName()).setRoutingType(RoutingType.ANYCAST);
address.getQueueConfigurations().add(queueConfig); address.getQueueConfigurations().add(queueConfig);
serverConfig.addAddressConfiguration(address); serverConfig.addAddressConfiguration(address);
// Address 2
CoreAddressConfiguration address2 = new CoreAddressConfiguration();
address2.setName(getTestName2()).getRoutingTypes().add(RoutingType.ANYCAST);
CoreQueueConfiguration queueConfig2 = new CoreQueueConfiguration();
queueConfig2.setName(getTestName2()).setAddress(getTestName2()).setRoutingType(RoutingType.ANYCAST);
address2.getQueueConfigurations().add(queueConfig2);
serverConfig.addAddressConfiguration(address2);
serverConfig.getAddressesSettings().put("#", new AddressSettings().setAutoCreateQueues(true).setAutoCreateAddresses(true).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ"))); serverConfig.getAddressesSettings().put("#", new AddressSettings().setAutoCreateQueues(true).setAutoCreateAddresses(true).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")));
serverConfig.setSecurityEnabled(false); serverConfig.setSecurityEnabled(false);
Set<TransportConfiguration> acceptors = serverConfig.getAcceptorConfigurations(); Set<TransportConfiguration> acceptors = serverConfig.getAcceptorConfigurations();
@ -127,6 +138,10 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
return getName(); return getName();
} }
public String getTestName2() {
return getName() + "2";
}
public AmqpClientTestSupport() { public AmqpClientTestSupport() {
} }

View File

@ -44,6 +44,10 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport {
receiver1.flow(1); receiver1.flow(1);
AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS); AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS);
AmqpReceiver receiver2 = session.createReceiver(getTestName());
assertNotNull("did not receive message first time", message); assertNotNull("did not receive message first time", message);
assertEquals("MessageID:0", message.getMessageId()); assertEquals("MessageID:0", message.getMessageId());
@ -51,12 +55,11 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport {
assertNotNull(protonMessage); assertNotNull(protonMessage);
assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount()); assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount());
receiver2.flow(1);
message.release(); message.release();
// Read the message again and validate its state
AmqpReceiver receiver2 = session.createReceiver(getTestName()); // Read the message again and validate its state
receiver2.flow(1);
message = receiver2.receive(10, TimeUnit.SECONDS); message = receiver2.receive(10, TimeUnit.SECONDS);
assertNotNull("did not receive message again", message); assertNotNull("did not receive message again", message);
assertEquals("MessageID:0", message.getMessageId()); assertEquals("MessageID:0", message.getMessageId());

View File

@ -514,7 +514,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
// Rollback the other half the consumed messages // Rollback the other half the consumed messages
txnSession.begin(); txnSession.begin();
for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) { for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
messages.get(i).accept(txnSession); messages.get(i).accept(txnSession, false);
} }
txnSession.rollback(); txnSession.rollback();
@ -525,18 +525,6 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
message.release(); message.release();
} }
// Commit the other half the consumed messages
// This is a variation from the .NET client tests which doesn't settle the
// messages in the TX until commit is called but on ActiveMQ they will be
// redispatched regardless and not stay in the acquired state.
txnSession.begin();
for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
message.accept();
}
txnSession.commit();
// The final message should still be pending. // The final message should still be pending.
{ {
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);

View File

@ -110,7 +110,6 @@ public class ProtonTest extends ProtonTestBase {
private static final String amqpConnectionUri = "amqp://localhost:5672"; private static final String amqpConnectionUri = "amqp://localhost:5672";
private static final String tcpAmqpConnectionUri = "tcp://localhost:5672"; private static final String tcpAmqpConnectionUri = "tcp://localhost:5672";
private static final String brokerName = "my-broker"; private static final String brokerName = "my-broker";
private static final long maxSizeBytes = 1 * 1024 * 1024; private static final long maxSizeBytes = 1 * 1024 * 1024;
@ -472,7 +471,7 @@ public class ProtonTest extends ProtonTestBase {
session.close(); session.close();
Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable(); Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
//because tx commit is executed async on broker, we use a timed wait. //because tx commit is executed async on broker, we use a timed wait.
assertTrue(TimeUtils.waitOnBoolean(true, 10000, ()-> q.getMessageCount() == 10)); assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> q.getMessageCount() == 10));
} }
@Test @Test
@ -548,7 +547,7 @@ public class ProtonTest extends ProtonTestBase {
session.rollback(); session.rollback();
Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable(); Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
//because tx rollback is executed async on broker, we use a timed wait. //because tx rollback is executed async on broker, we use a timed wait.
assertTrue(TimeUtils.waitOnBoolean(true, 10000, ()-> q.getMessageCount() == 10)); assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> q.getMessageCount() == 10));
} }
@ -1855,4 +1854,32 @@ public class ProtonTest extends ProtonTestBase {
return count; return count;
} }
} }
@Test
public void testReleaseDisposition() throws Exception {
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection connection = client.connect();
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(address);
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
sender.send(message);
AmqpReceiver receiver = session.createReceiver(address);
receiver.flow(10);
AmqpMessage m1 = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(m1);
m1.release();
//receiver.flow(10);
AmqpMessage m2 = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(m2);
m2.accept();
} finally {
connection.close();
}
}
} }