ARTEMIS-1012 Fix AMQP Transaction Retirement

This commit is contained in:
Martyn Taylor 2017-03-02 21:06:41 +00:00 committed by Clebert Suconic
parent 543dd4c9e3
commit 456e2a65e2
11 changed files with 222 additions and 46 deletions

View File

@ -35,12 +35,12 @@ import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExtCapability;
import org.apache.activemq.artemis.protocol.amqp.sasl.AnonymousServerSASL;
import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASL;
@ -220,7 +220,7 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
public Binary newTransaction() {
XidImpl xid = newXID();
Transaction transaction = new TransactionImpl(xid, server.getStorageManager(), -1);
Transaction transaction = new ProtonTransactionImpl(xid, server.getStorageManager(), -1);
transactions.put(xid, transaction);
return new Binary(xid.getGlobalTransactionId());
}

View File

@ -40,7 +40,6 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.protocol.amqp.converter.CoreAmqpConverter;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
@ -49,6 +48,7 @@ import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl;
import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult;
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
@ -326,7 +326,7 @@ public class AMQPSessionCallback implements SessionCallback {
public void cancel(Object brokerConsumer, Message message, boolean updateCounts) throws Exception {
recoverContext();
try {
((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts);;
((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts);
((ServerConsumer) brokerConsumer).getQueue().forceDelivery();
} finally {
resetContext();
@ -487,7 +487,7 @@ public class AMQPSessionCallback implements SessionCallback {
ProtonServerSenderContext plugSender = (ProtonServerSenderContext) consumer.getProtocolContext();
try {
return plugSender.deliverMessage(CoreAmqpConverter.checkAMQP(message), deliveryCount);
return plugSender.deliverMessage(ref, deliveryCount);
} catch (Exception e) {
synchronized (connection.getLock()) {
plugSender.getSender().setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, e.getMessage()));
@ -563,6 +563,10 @@ public class AMQPSessionCallback implements SessionCallback {
protonSPI.removeTransaction(txid);
}
public void dischargeTx(Binary txid) throws ActiveMQAMQPException {
((ProtonTransactionImpl) protonSPI.getTransaction(txid)).discharge();
}
public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception {
return serverSession.getMatchingQueue(address, routingType);
}

View File

@ -25,6 +25,7 @@ import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientSenderContext;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionHandler;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transaction.Coordinator;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;

View File

@ -16,6 +16,9 @@
*/
package org.apache.activemq.artemis.protocol.amqp.proton;
import java.util.Arrays;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.RoutingType;
@ -35,9 +38,6 @@ import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.jboss.logging.Logger;
import java.util.Arrays;
import java.util.List;
public class ProtonServerReceiverContext extends ProtonInitializable implements ProtonDeliveryHandler {
private static final Logger log = Logger.getLogger(ProtonServerReceiverContext.class);

View File

@ -30,17 +30,19 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.impl.CompositeAddress;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.converter.CoreAmqpConverter;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl;
import org.apache.activemq.artemis.protocol.amqp.util.CreditsSemaphore;
import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.impl.SelectorParser;
@ -474,26 +476,29 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
if (closed) {
return;
}
Message message = (Message)delivery.getContext();
Message message = ((MessageReference) delivery.getContext()).getMessage();
boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
DeliveryState remoteState = delivery.getRemoteState();
boolean settleImmediate = true;
if (remoteState != null) {
// If we are transactional then we need ack if the msg has been accepted
if (remoteState instanceof TransactionalState) {
TransactionalState txState = (TransactionalState) remoteState;
Transaction tx = this.sessionSPI.getTransaction(txState.getTxnId());
ProtonTransactionImpl tx = (ProtonTransactionImpl) this.sessionSPI.getTransaction(txState.getTxnId());
if (txState.getOutcome() != null) {
settleImmediate = false;
Outcome outcome = txState.getOutcome();
if (outcome instanceof Accepted) {
if (!delivery.remotelySettled()) {
TransactionalState txAccepted = new TransactionalState();
txAccepted.setOutcome(Accepted.getInstance());
txAccepted.setTxnId(txState.getTxnId());
delivery.disposition(txAccepted);
}
// we have to individual ack as we can't guarantee we will get the delivery
@ -501,6 +506,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// from dealer, a perf hit but a must
try {
sessionSPI.ack(tx, brokerConsumer, message);
tx.addDelivery(delivery, this);
} catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
}
@ -550,16 +556,19 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
protonSession.replaceTag(delivery.getTag());
}
synchronized (connection.getLock()) {
delivery.settle();
sender.offer(1);
}
if (settleImmediate) settle(delivery);
} else {
// todo not sure if we need to do anything here
}
}
public void settle(Delivery delivery) {
synchronized (connection.getLock()) {
delivery.settle();
}
}
public synchronized void checkState() {
sessionSPI.resumeDelivery(brokerConsumer);
}
@ -567,7 +576,9 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
/**
* handle an out going message from ActiveMQ Artemis, send via the Proton Sender
*/
public int deliverMessage(AMQPMessage message, int deliveryCount) throws Exception {
public int deliverMessage(MessageReference messageReference, int deliveryCount) throws Exception {
AMQPMessage message = CoreAmqpConverter.checkAMQP(messageReference.getMessage());
if (closed) {
return 0;
}
@ -602,14 +613,14 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
final Delivery delivery;
delivery = sender.delivery(tag, 0, tag.length);
delivery.setMessageFormat((int) message.getMessageFormat());
delivery.setContext(message);
delivery.setContext(messageReference);
// this will avoid a copy.. patch provided by Norman using buffer.array()
sender.send(nettyBuffer.array(), nettyBuffer.arrayOffset() + nettyBuffer.readerIndex(), nettyBuffer.readableBytes());
if (preSettle) {
// Presettled means the client implicitly accepts any delivery we send it.
sessionSPI.ack(null, brokerConsumer, message);
sessionSPI.ack(null, brokerConsumer, messageReference.getMessage());
delivery.settle();
} else {
sender.advance();

View File

@ -14,13 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.proton;
package org.apache.activemq.artemis.protocol.amqp.proton.transaction;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler;
import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.qpid.proton.amqp.Binary;
@ -75,11 +76,11 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
Declared declared = new Declared();
declared.setTxnId(txID);
delivery.disposition(declared);
delivery.settle();
} else if (action instanceof Discharge) {
Discharge discharge = (Discharge) action;
Binary txID = discharge.getTxnId();
sessionSPI.dischargeTx(txID);
if (discharge.getFail()) {
try {
sessionSPI.rollbackTX(txID, true);

View File

@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.proton.transaction;
import javax.transaction.xa.Xid;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
import org.apache.qpid.proton.engine.Delivery;
/**
* AMQP Protocol has different TX Rollback behaviour for Acks depending on whether an AMQP delivery has been settled
* or not. This class extends the Core TransactionImpl used for normal TX behaviour. In the case where deliveries
* have been settled, normal Ack rollback is applied. For cases where deliveries are unsettled and rolled back,
* we increment the delivery count and return to the consumer.
*/
public class ProtonTransactionImpl extends TransactionImpl {
/* We need to track the Message reference against the AMQP objects, so we can check whether the corresponding
deliveries have been settled. We also need to ensure we are settling on the correct link. Hence why we keep a ref
to the ProtonServerSenderContext here.
*/
private final Map<MessageReference, Pair<Delivery, ProtonServerSenderContext>> deliveries = new HashMap<>();
private boolean discharged;
public ProtonTransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds) {
super(xid, storageManager, timeoutSeconds);
}
@Override
public RefsOperation createRefsOperation(Queue queue) {
return new ProtonTransactionRefsOperation(queue, storageManager);
}
@Override
public void rollback() throws Exception {
super.rollback();
}
public void addDelivery(Delivery delivery, ProtonServerSenderContext context) {
deliveries.put(((MessageReference) delivery.getContext()), new Pair<>(delivery, context));
}
public Map<MessageReference, Pair<Delivery, ProtonServerSenderContext>> getDeliveries() {
return deliveries;
}
@Override
public void commit() throws Exception {
super.commit();
// Settle all unsettled deliveries if commit is successful
for (Pair<Delivery, ProtonServerSenderContext> p : deliveries.values()) {
if (!p.getA().isSettled()) p.getB().settle(p.getA());
}
}
public boolean isDischarged() {
return discharged;
}
public void discharge() {
discharged = true;
}
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.proton.transaction;
import java.util.LinkedList;
import java.util.Map;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.qpid.proton.engine.Delivery;
/**
* AMQP Protocol has different TX Rollback behaviour for Acks depending on whether an AMQP delivery has been settled
* or not. This class extends the Core RefsOperation used for normal acks. In the case where deliveries have been
* settled, normal Ack rollback is applied. For cases where deliveries are unsettled and rolled back, we increment
* the delivery count and return to the consumer.
*/
public class ProtonTransactionRefsOperation extends RefsOperation {
public ProtonTransactionRefsOperation(final Queue queue, StorageManager storageManager) {
super(queue, storageManager);
}
@Override
public void rollbackRedelivery(Transaction txn, MessageReference ref, long timeBase, Map<QueueImpl, LinkedList<MessageReference>> queueMap) throws Exception {
ProtonTransactionImpl tx = (ProtonTransactionImpl) txn;
if (tx.getDeliveries().containsKey(ref)) {
Delivery del = tx.getDeliveries().get(ref).getA();
ServerConsumer consumer = (ServerConsumer) tx.getDeliveries().get(ref).getB().getBrokerConsumer();
// Rollback normally if the delivery is not settled or a forced TX rollback is done (e.g. connection drop).
if (del.remotelySettled() || !tx.isDischarged()) {
super.rollbackRedelivery(tx, ref, timeBase, queueMap);
} else {
ref.incrementDeliveryCount();
consumer.backToDelivering(ref);
del.disposition(del.getLocalState() == null ? del.getDefaultDeliveryState() : del.getLocalState());
}
} else {
super.rollbackRedelivery(tx, ref, timeBase, queueMap);
}
}
}

View File

@ -89,18 +89,7 @@ public class RefsOperation extends TransactionOperationAbstract {
if (ref.isAlreadyAcked()) {
ackedRefs.add(ref);
}
// if ignore redelivery check, we just perform redelivery straight
if (ref.getQueue().checkRedelivery(ref, timeBase, ignoreRedeliveryCheck)) {
LinkedList<MessageReference> toCancel = queueMap.get(ref.getQueue());
if (toCancel == null) {
toCancel = new LinkedList<>();
queueMap.put((QueueImpl) ref.getQueue(), toCancel);
}
toCancel.addFirst(ref);
}
rollbackRedelivery(tx, ref, timeBase, queueMap);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorCheckingDLQ(e);
}
@ -145,6 +134,21 @@ public class RefsOperation extends TransactionOperationAbstract {
}
}
protected void rollbackRedelivery(Transaction tx, MessageReference ref, long timeBase, Map<QueueImpl, LinkedList<MessageReference>> queueMap) throws Exception {
// if ignore redelivery check, we just perform redelivery straight
if (ref.getQueue().checkRedelivery(ref, timeBase, ignoreRedeliveryCheck)) {
LinkedList<MessageReference> toCancel = queueMap.get(ref.getQueue());
if (toCancel == null) {
toCancel = new LinkedList<>();
queueMap.put((QueueImpl) ref.getQueue(), toCancel);
}
toCancel.addFirst(ref);
}
}
@Override
public void afterCommit(final Transaction tx) {
for (MessageReference ref : refsToAck) {

View File

@ -99,12 +99,23 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
ActiveMQServer server = createServer(true, true);
serverManager = new JMSServerManagerImpl(server);
Configuration serverConfig = server.getConfiguration();
// Address 1
CoreAddressConfiguration address = new CoreAddressConfiguration();
address.setName(getTestName()).getRoutingTypes().add(RoutingType.ANYCAST);
CoreQueueConfiguration queueConfig = new CoreQueueConfiguration();
queueConfig.setName(getTestName()).setAddress(getTestName()).setRoutingType(RoutingType.ANYCAST);
address.getQueueConfigurations().add(queueConfig);
serverConfig.addAddressConfiguration(address);
// Address 2
CoreAddressConfiguration address2 = new CoreAddressConfiguration();
address2.setName(getTestName2()).getRoutingTypes().add(RoutingType.ANYCAST);
CoreQueueConfiguration queueConfig2 = new CoreQueueConfiguration();
queueConfig2.setName(getTestName2()).setAddress(getTestName2()).setRoutingType(RoutingType.ANYCAST);
address2.getQueueConfigurations().add(queueConfig2);
serverConfig.addAddressConfiguration(address2);
serverConfig.getAddressesSettings().put("#", new AddressSettings().setAutoCreateQueues(true).setAutoCreateAddresses(true).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")));
serverConfig.setSecurityEnabled(false);
Set<TransportConfiguration> acceptors = serverConfig.getAcceptorConfigurations();
@ -127,6 +138,10 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
return getName();
}
public String getTestName2() {
return getName() + "2";
}
public AmqpClientTestSupport() {
}

View File

@ -514,7 +514,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
// Rollback the other half the consumed messages
txnSession.begin();
for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
messages.get(i).accept(txnSession);
messages.get(i).accept(txnSession, false);
}
txnSession.rollback();
@ -525,18 +525,6 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
message.release();
}
// Commit the other half the consumed messages
// This is a variation from the .NET client tests which doesn't settle the
// messages in the TX until commit is called but on ActiveMQ they will be
// redispatched regardless and not stay in the acquired state.
txnSession.begin();
for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
message.accept();
}
txnSession.commit();
// The final message should still be pending.
{
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);