ARTEMIS-1118 IO callbacks on AMQP

This commit is contained in:
Clebert Suconic 2017-04-17 23:21:43 -04:00
parent 807e4e5d9c
commit 31d78eddf1
5 changed files with 181 additions and 91 deletions

View File

@ -0,0 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
public interface RunnableEx {
void run() throws Exception;
}

View File

@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.AddressQueryResult; import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.MessageReference;
@ -53,6 +54,7 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.IDGenerator; import org.apache.activemq.artemis.utils.IDGenerator;
import org.apache.activemq.artemis.utils.RunnableEx;
import org.apache.activemq.artemis.utils.SelectorTranslator; import org.apache.activemq.artemis.utils.SelectorTranslator;
import org.apache.activemq.artemis.utils.SimpleIDGenerator; import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.UUIDGenerator;
@ -78,6 +80,8 @@ public class AMQPSessionCallback implements SessionCallback {
private final ProtonProtocolManager manager; private final ProtonProtocolManager manager;
private final StorageManager storageManager;
private final AMQPConnectionContext connection; private final AMQPConnectionContext connection;
private final Connection transportConnection; private final Connection transportConnection;
@ -100,6 +104,7 @@ public class AMQPSessionCallback implements SessionCallback {
OperationContext operationContext) { OperationContext operationContext) {
this.protonSPI = protonSPI; this.protonSPI = protonSPI;
this.manager = manager; this.manager = manager;
this.storageManager = manager.getServer().getStorageManager();
this.connection = connection; this.connection = connection;
this.transportConnection = transportConnection; this.transportConnection = transportConnection;
this.closeExecutor = executor; this.closeExecutor = executor;
@ -134,6 +139,24 @@ public class AMQPSessionCallback implements SessionCallback {
} }
} }
public void withinContext(RunnableEx run) throws Exception {
OperationContext context = recoverContext();
try {
run.run();
} finally {
resetContext(context);
}
}
public void afterIO(IOCallback ioCallback) {
OperationContext context = recoverContext();
try {
manager.getServer().getStorageManager().afterCompleteOperations(ioCallback);
} finally {
resetContext(context);
}
}
@Override @Override
public void browserFinished(ServerConsumer consumer) { public void browserFinished(ServerConsumer consumer) {
@ -315,11 +338,11 @@ public class AMQPSessionCallback implements SessionCallback {
public void close() throws Exception { public void close() throws Exception {
//need to check here as this can be called if init fails //need to check here as this can be called if init fails
if (serverSession != null) { if (serverSession != null) {
recoverContext(); OperationContext context = recoverContext();
try { try {
serverSession.close(false); serverSession.close(false);
} finally { } finally {
resetContext(); resetContext(context);
} }
} }
} }
@ -328,30 +351,30 @@ public class AMQPSessionCallback implements SessionCallback {
if (transaction == null) { if (transaction == null) {
transaction = serverSession.getCurrentTransaction(); transaction = serverSession.getCurrentTransaction();
} }
recoverContext(); OperationContext oldContext = recoverContext();
try { try {
((ServerConsumer) brokerConsumer).individualAcknowledge(transaction, message.getMessageID()); ((ServerConsumer) brokerConsumer).individualAcknowledge(transaction, message.getMessageID());
} finally { } finally {
resetContext(); resetContext(oldContext);
} }
} }
public void cancel(Object brokerConsumer, Message message, boolean updateCounts) throws Exception { public void cancel(Object brokerConsumer, Message message, boolean updateCounts) throws Exception {
recoverContext(); OperationContext oldContext = recoverContext();
try { try {
((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts); ((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts);
((ServerConsumer) brokerConsumer).getQueue().forceDelivery(); ((ServerConsumer) brokerConsumer).getQueue().forceDelivery();
} finally { } finally {
resetContext(); resetContext(oldContext);
} }
} }
public void reject(Object brokerConsumer, Message message) throws Exception { public void reject(Object brokerConsumer, Message message) throws Exception {
recoverContext(); OperationContext oldContext = recoverContext();
try { try {
((ServerConsumer) brokerConsumer).reject(message.getMessageID()); ((ServerConsumer) brokerConsumer).reject(message.getMessageID());
} finally { } finally {
resetContext(); resetContext(oldContext);
} }
} }
@ -380,22 +403,26 @@ public class AMQPSessionCallback implements SessionCallback {
} }
} }
recoverContext(); OperationContext oldcontext = recoverContext();
PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddressSimpleString()); try {
if (store.isRejectingMessages()) { PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddressSimpleString());
// We drop pre-settled messages (and abort any associated Tx) if (store.isRejectingMessages()) {
if (delivery.remotelySettled()) { // We drop pre-settled messages (and abort any associated Tx)
if (transaction != null) { if (delivery.remotelySettled()) {
String amqpAddress = delivery.getLink().getTarget().getAddress(); if (transaction != null) {
ActiveMQException e = new ActiveMQAMQPResourceLimitExceededException("Address is full: " + amqpAddress); String amqpAddress = delivery.getLink().getTarget().getAddress();
transaction.markAsRollbackOnly(e); ActiveMQException e = new ActiveMQAMQPResourceLimitExceededException("Address is full: " + amqpAddress);
transaction.markAsRollbackOnly(e);
}
} else {
rejectMessage(delivery, AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address is full: " + address);
} }
} else { } else {
rejectMessage(delivery, AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address is full: " + address); serverSend(transaction, message, delivery, receiver);
} }
} else { } finally {
serverSend(transaction, message, delivery, receiver); resetContext(oldcontext);
} }
} }
@ -406,61 +433,67 @@ public class AMQPSessionCallback implements SessionCallback {
Rejected rejected = new Rejected(); Rejected rejected = new Rejected();
rejected.setError(condition); rejected.setError(condition);
connection.lock(); afterIO(new IOCallback() {
try { @Override
delivery.disposition(rejected); public void done() {
delivery.settle(); connection.lock();
} finally { try {
connection.unlock(); delivery.disposition(rejected);
} delivery.settle();
connection.flush(); } finally {
connection.unlock();
}
connection.flush();
}
@Override
public void onError(int errorCode, String errorMessage) {
}
});
} }
private void serverSend(final Transaction transaction, private void serverSend(final Transaction transaction,
final Message message, final Message message,
final Delivery delivery, final Delivery delivery,
final Receiver receiver) throws Exception { final Receiver receiver) throws Exception {
try { message.setConnectionID(receiver.getSession().getConnection().getRemoteContainer());
message.setConnectionID(receiver.getSession().getConnection().getRemoteContainer()); serverSession.send(transaction, message, false, false);
serverSession.send(transaction, message, false, false); afterIO(new IOCallback() {
@Override
public void done() {
connection.lock();
try {
if (delivery.getRemoteState() instanceof TransactionalState) {
TransactionalState txAccepted = new TransactionalState();
txAccepted.setOutcome(Accepted.getInstance());
txAccepted.setTxnId(((TransactionalState) delivery.getRemoteState()).getTxnId());
manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback() { delivery.disposition(txAccepted);
@Override } else {
public void done() { delivery.disposition(Accepted.getInstance());
connection.lock();
try {
if (delivery.getRemoteState() instanceof TransactionalState) {
TransactionalState txAccepted = new TransactionalState();
txAccepted.setOutcome(Accepted.getInstance());
txAccepted.setTxnId(((TransactionalState) delivery.getRemoteState()).getTxnId());
delivery.disposition(txAccepted);
} else {
delivery.disposition(Accepted.getInstance());
}
delivery.settle();
} finally {
connection.unlock();
} }
delivery.settle();
} finally {
connection.unlock();
}
connection.flush();
}
@Override
public void onError(int errorCode, String errorMessage) {
connection.lock();
try {
receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage));
connection.flush(); connection.flush();
} finally {
connection.unlock();
} }
}
@Override });
public void onError(int errorCode, String errorMessage) {
connection.lock();
try {
receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage));
connection.flush();
} finally {
connection.unlock();
}
}
});
} finally {
resetContext();
}
} }
public void offerProducerCredit(final String address, public void offerProducerCredit(final String address,
@ -502,12 +535,15 @@ public class AMQPSessionCallback implements SessionCallback {
manager.getServer().destroyQueue(new SimpleString(queueName)); manager.getServer().destroyQueue(new SimpleString(queueName));
} }
private void resetContext() { public void resetContext(OperationContext oldContext) {
manager.getServer().getStorageManager().setContext(null); storageManager.setContext(oldContext);
} }
private void recoverContext() { public OperationContext recoverContext() {
OperationContext oldContext = storageManager.getContext();
manager.getServer().getStorageManager().setContext(serverSession.getSessionContext()); manager.getServer().getStorageManager().setContext(serverSession.getSessionContext());
return oldContext;
} }
@Override @Override

View File

@ -52,7 +52,7 @@ public class AMQPClientConnectionFactory {
Executor executor = server.getExecutorFactory().getExecutor(); Executor executor = server.getExecutorFactory().getExecutor();
AMQPConnectionContext amqpConnection = new AMQPConnectionContext(null, connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getScheduledPool()); AMQPConnectionContext amqpConnection = new AMQPConnectionContext(protocolManager, connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getScheduledPool());
eventHandler.ifPresent(amqpConnection::addEventHandler); eventHandler.ifPresent(amqpConnection::addEventHandler);
ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, executor); ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, executor);

View File

@ -29,6 +29,8 @@ import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.postoffice.impl.CompositeAddress; import org.apache.activemq.artemis.core.postoffice.impl.CompositeAddress;
import org.apache.activemq.artemis.core.server.AddressQueryResult; import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.Consumer;
@ -486,6 +488,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
return; return;
} }
OperationContext oldContext = sessionSPI.recoverContext();
try { try {
Message message = ((MessageReference) delivery.getContext()).getMessage(); Message message = ((MessageReference) delivery.getContext()).getMessage();
@ -590,7 +594,19 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// todo not sure if we need to do anything here // todo not sure if we need to do anything here
} }
} finally { } finally {
connection.flush(); sessionSPI.afterIO(new IOCallback() {
@Override
public void done() {
connection.flush();
}
@Override
public void onError(int errorCode, String errorMessage) {
connection.flush();
}
});
sessionSPI.resetContext(oldContext);
} }
} }

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.protocol.amqp.proton.transaction;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
@ -118,24 +119,29 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
ProtonTransactionImpl tx = (ProtonTransactionImpl) sessionSPI.getTransaction(txID, true); ProtonTransactionImpl tx = (ProtonTransactionImpl) sessionSPI.getTransaction(txID, true);
tx.discharge(); tx.discharge();
IOCallback ioAction = new IOCallback() {
@Override
public void done() {
connection.lock();
try {
delivery.disposition(new Accepted());
} finally {
connection.unlock();
}
}
@Override
public void onError(int errorCode, String errorMessage) {
}
};
if (discharge.getFail()) { if (discharge.getFail()) {
tx.rollback(); sessionSPI.withinContext(() -> tx.rollback());
connection.lock(); sessionSPI.afterIO(ioAction);
try {
delivery.disposition(new Accepted());
} finally {
connection.unlock();
}
connection.flush();
} else { } else {
tx.commit(); sessionSPI.withinContext(() -> tx.commit());
connection.lock(); sessionSPI.afterIO(ioAction);
try {
delivery.disposition(new Accepted());
} finally {
connection.unlock();
}
connection.flush();
} }
} }
} catch (ActiveMQAMQPException amqpE) { } catch (ActiveMQAMQPException amqpE) {
@ -157,13 +163,23 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
} }
connection.flush(); connection.flush();
} finally { } finally {
connection.lock(); sessionSPI.afterIO(new IOCallback() {
try { @Override
delivery.settle(); public void done() {
} finally { connection.lock();
connection.unlock(); try {
} delivery.settle();
connection.flush(); } finally {
connection.unlock();
}
connection.flush();
}
@Override
public void onError(int errorCode, String errorMessage) {
}
});
} }
} }