diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/RunnableEx.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/RunnableEx.java new file mode 100644 index 0000000000..426cfa2898 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/RunnableEx.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils; + +public interface RunnableEx { + void run() throws Exception; +} diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 9e54d418ba..08ea959d7d 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.OperationContext; +import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.server.AddressQueryResult; import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.MessageReference; @@ -53,6 +54,7 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.IDGenerator; +import org.apache.activemq.artemis.utils.RunnableEx; import org.apache.activemq.artemis.utils.SelectorTranslator; import org.apache.activemq.artemis.utils.SimpleIDGenerator; import org.apache.activemq.artemis.utils.UUIDGenerator; @@ -78,6 +80,8 @@ public class AMQPSessionCallback implements SessionCallback { private final ProtonProtocolManager manager; + private final StorageManager storageManager; + private final AMQPConnectionContext connection; private final Connection transportConnection; @@ -100,6 +104,7 @@ public class AMQPSessionCallback implements SessionCallback { OperationContext operationContext) { this.protonSPI = protonSPI; this.manager = manager; + this.storageManager = manager.getServer().getStorageManager(); this.connection = connection; this.transportConnection = transportConnection; this.closeExecutor = executor; @@ -134,6 +139,24 @@ public class AMQPSessionCallback implements SessionCallback { } } + public void withinContext(RunnableEx run) throws Exception { + OperationContext context = recoverContext(); + try { + run.run(); + } finally { + resetContext(context); + } + } + + public void afterIO(IOCallback ioCallback) { + OperationContext context = recoverContext(); + try { + manager.getServer().getStorageManager().afterCompleteOperations(ioCallback); + } finally { + resetContext(context); + } + } + @Override public void browserFinished(ServerConsumer consumer) { @@ -315,11 +338,11 @@ public class AMQPSessionCallback implements SessionCallback { public void close() throws Exception { //need to check here as this can be called if init fails if (serverSession != null) { - recoverContext(); + OperationContext context = recoverContext(); try { serverSession.close(false); } finally { - resetContext(); + resetContext(context); } } } @@ -328,30 +351,30 @@ public class AMQPSessionCallback implements SessionCallback { if (transaction == null) { transaction = serverSession.getCurrentTransaction(); } - recoverContext(); + OperationContext oldContext = recoverContext(); try { ((ServerConsumer) brokerConsumer).individualAcknowledge(transaction, message.getMessageID()); } finally { - resetContext(); + resetContext(oldContext); } } public void cancel(Object brokerConsumer, Message message, boolean updateCounts) throws Exception { - recoverContext(); + OperationContext oldContext = recoverContext(); try { ((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts); ((ServerConsumer) brokerConsumer).getQueue().forceDelivery(); } finally { - resetContext(); + resetContext(oldContext); } } public void reject(Object brokerConsumer, Message message) throws Exception { - recoverContext(); + OperationContext oldContext = recoverContext(); try { ((ServerConsumer) brokerConsumer).reject(message.getMessageID()); } finally { - resetContext(); + resetContext(oldContext); } } @@ -380,22 +403,26 @@ public class AMQPSessionCallback implements SessionCallback { } } - recoverContext(); + OperationContext oldcontext = recoverContext(); - PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddressSimpleString()); - if (store.isRejectingMessages()) { - // We drop pre-settled messages (and abort any associated Tx) - if (delivery.remotelySettled()) { - if (transaction != null) { - String amqpAddress = delivery.getLink().getTarget().getAddress(); - ActiveMQException e = new ActiveMQAMQPResourceLimitExceededException("Address is full: " + amqpAddress); - transaction.markAsRollbackOnly(e); + try { + PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddressSimpleString()); + if (store.isRejectingMessages()) { + // We drop pre-settled messages (and abort any associated Tx) + if (delivery.remotelySettled()) { + if (transaction != null) { + String amqpAddress = delivery.getLink().getTarget().getAddress(); + ActiveMQException e = new ActiveMQAMQPResourceLimitExceededException("Address is full: " + amqpAddress); + transaction.markAsRollbackOnly(e); + } + } else { + rejectMessage(delivery, AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address is full: " + address); } } else { - rejectMessage(delivery, AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address is full: " + address); + serverSend(transaction, message, delivery, receiver); } - } else { - serverSend(transaction, message, delivery, receiver); + } finally { + resetContext(oldcontext); } } @@ -406,61 +433,67 @@ public class AMQPSessionCallback implements SessionCallback { Rejected rejected = new Rejected(); rejected.setError(condition); - connection.lock(); - try { - delivery.disposition(rejected); - delivery.settle(); - } finally { - connection.unlock(); - } - connection.flush(); + afterIO(new IOCallback() { + @Override + public void done() { + connection.lock(); + try { + delivery.disposition(rejected); + delivery.settle(); + } finally { + connection.unlock(); + } + connection.flush(); + } + + @Override + public void onError(int errorCode, String errorMessage) { + + } + }); + } private void serverSend(final Transaction transaction, final Message message, final Delivery delivery, final Receiver receiver) throws Exception { - try { + message.setConnectionID(receiver.getSession().getConnection().getRemoteContainer()); - message.setConnectionID(receiver.getSession().getConnection().getRemoteContainer()); + serverSession.send(transaction, message, false, false); - serverSession.send(transaction, message, false, false); + afterIO(new IOCallback() { + @Override + public void done() { + connection.lock(); + try { + if (delivery.getRemoteState() instanceof TransactionalState) { + TransactionalState txAccepted = new TransactionalState(); + txAccepted.setOutcome(Accepted.getInstance()); + txAccepted.setTxnId(((TransactionalState) delivery.getRemoteState()).getTxnId()); - manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback() { - @Override - public void done() { - connection.lock(); - try { - if (delivery.getRemoteState() instanceof TransactionalState) { - TransactionalState txAccepted = new TransactionalState(); - txAccepted.setOutcome(Accepted.getInstance()); - txAccepted.setTxnId(((TransactionalState) delivery.getRemoteState()).getTxnId()); - - delivery.disposition(txAccepted); - } else { - delivery.disposition(Accepted.getInstance()); - } - delivery.settle(); - } finally { - connection.unlock(); + delivery.disposition(txAccepted); + } else { + delivery.disposition(Accepted.getInstance()); } + delivery.settle(); + } finally { + connection.unlock(); + } + connection.flush(); + } + + @Override + public void onError(int errorCode, String errorMessage) { + connection.lock(); + try { + receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage)); connection.flush(); + } finally { + connection.unlock(); } - - @Override - public void onError(int errorCode, String errorMessage) { - connection.lock(); - try { - receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage)); - connection.flush(); - } finally { - connection.unlock(); - } - } - }); - } finally { - resetContext(); - } + } + }); } public void offerProducerCredit(final String address, @@ -502,12 +535,15 @@ public class AMQPSessionCallback implements SessionCallback { manager.getServer().destroyQueue(new SimpleString(queueName)); } - private void resetContext() { - manager.getServer().getStorageManager().setContext(null); + public void resetContext(OperationContext oldContext) { + storageManager.setContext(oldContext); } - private void recoverContext() { + public OperationContext recoverContext() { + + OperationContext oldContext = storageManager.getContext(); manager.getServer().getStorageManager().setContext(serverSession.getSessionContext()); + return oldContext; } @Override diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java index 441f3a68a6..6aa8fda224 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java @@ -52,7 +52,7 @@ public class AMQPClientConnectionFactory { Executor executor = server.getExecutorFactory().getExecutor(); - AMQPConnectionContext amqpConnection = new AMQPConnectionContext(null, connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getScheduledPool()); + AMQPConnectionContext amqpConnection = new AMQPConnectionContext(protocolManager, connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getScheduledPool()); eventHandler.ifPresent(amqpConnection::addEventHandler); ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, executor); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index ccc93b7845..4d8bf53225 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -29,6 +29,8 @@ import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.postoffice.impl.CompositeAddress; import org.apache.activemq.artemis.core.server.AddressQueryResult; import org.apache.activemq.artemis.core.server.Consumer; @@ -486,6 +488,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr return; } + OperationContext oldContext = sessionSPI.recoverContext(); + try { Message message = ((MessageReference) delivery.getContext()).getMessage(); @@ -590,7 +594,19 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr // todo not sure if we need to do anything here } } finally { - connection.flush(); + sessionSPI.afterIO(new IOCallback() { + @Override + public void done() { + connection.flush(); + } + + @Override + public void onError(int errorCode, String errorMessage) { + connection.flush(); + } + }); + + sessionSPI.resetContext(oldContext); } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java index 4579f1c361..bf2e575355 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.protocol.amqp.proton.transaction; import java.nio.ByteBuffer; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; @@ -118,24 +119,29 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { ProtonTransactionImpl tx = (ProtonTransactionImpl) sessionSPI.getTransaction(txID, true); tx.discharge(); + IOCallback ioAction = new IOCallback() { + @Override + public void done() { + connection.lock(); + try { + delivery.disposition(new Accepted()); + } finally { + connection.unlock(); + } + } + + @Override + public void onError(int errorCode, String errorMessage) { + + } + }; + if (discharge.getFail()) { - tx.rollback(); - connection.lock(); - try { - delivery.disposition(new Accepted()); - } finally { - connection.unlock(); - } - connection.flush(); + sessionSPI.withinContext(() -> tx.rollback()); + sessionSPI.afterIO(ioAction); } else { - tx.commit(); - connection.lock(); - try { - delivery.disposition(new Accepted()); - } finally { - connection.unlock(); - } - connection.flush(); + sessionSPI.withinContext(() -> tx.commit()); + sessionSPI.afterIO(ioAction); } } } catch (ActiveMQAMQPException amqpE) { @@ -157,13 +163,23 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { } connection.flush(); } finally { - connection.lock(); - try { - delivery.settle(); - } finally { - connection.unlock(); - } - connection.flush(); + sessionSPI.afterIO(new IOCallback() { + @Override + public void done() { + connection.lock(); + try { + delivery.settle(); + } finally { + connection.unlock(); + } + connection.flush(); + } + + @Override + public void onError(int errorCode, String errorMessage) { + + } + }); } }