ARTEMIS-2205 Refactor AMQP Processing into Netty Thread

These improvements were also part of this task:
- Routing is now cached as much as possible.
- A new Runnable is avoided for each individual message,
  since we use the Netty executor to perform delivery

https://issues.apache.org/jira/browse/ARTEMIS-2205
This commit is contained in:
Clebert Suconic 2018-12-17 09:11:54 -05:00 committed by Francesco Nigro
parent a40a459f8c
commit d79762fa04
49 changed files with 1512 additions and 725 deletions

View File

@ -43,3 +43,6 @@ JAVA_ARGS="${java-opts} -XX:+PrintClassHistogram -XX:+UseG1GC -Xms512M -Xmx2G -D
# Debug args: Uncomment to enable debug
#DEBUG_ARGS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005"
# Debug args: Uncomment for async profiler
#DEBUG_ARGS="-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints"

View File

@ -183,6 +183,17 @@ public class TransportConfiguration implements Serializable {
return extraProps;
}
public Map<String, Object> getCombinedParams() {
Map<String, Object> combined = new HashMap<>();
if (params != null) {
combined.putAll(params);
}
if (extraProps != null) {
combined.putAll(extraProps);
}
return combined;
}
@Override
public int hashCode() {
int result = name != null ? name.hashCode() : 0;

View File

@ -94,7 +94,12 @@ public class Wait {
public static void assertTrue(String failureMessage, Condition condition) throws Exception {
boolean result = waitFor(condition);
assertTrue(failureMessage, condition, MAX_WAIT_MILLIS);
}
public static void assertTrue(String failureMessage, Condition condition, final long duration) throws Exception {
boolean result = waitFor(condition, duration);
if (!result) {
Assert.fail(failureMessage);

View File

@ -73,7 +73,7 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
protected AMQPConnectionContext amqpConnection;
private final Executor closeExecutor;
private final Executor sessionExecutor;
private String remoteContainerId;
@ -85,15 +85,19 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
public AMQPConnectionCallback(ProtonProtocolManager manager,
Connection connection,
Executor closeExecutor,
Executor sessionExecutor,
ActiveMQServer server) {
this.manager = manager;
this.connection = connection;
this.closeExecutor = closeExecutor;
this.sessionExecutor = sessionExecutor;
this.server = server;
saslMechanisms = manager.getSaslMechanisms();
}
public Connection getTransportConnection() {
return connection;
}
public String[] getSaslMechanisms() {
return saslMechanisms;
}
@ -213,7 +217,7 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) {
return new AMQPSessionCallback(this, manager, connection, this.connection, closeExecutor, server.newOperationContext());
return new AMQPSessionCallback(this, manager, connection, this.connection, sessionExecutor, server.newOperationContext());
}
public void sendSASLSupported() {
@ -256,7 +260,7 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
public Binary newTransaction() {
XidImpl xid = newXID();
Binary binary = new Binary(xid.getGlobalTransactionId());
Transaction transaction = new ProtonTransactionImpl(xid, server.getStorageManager(), -1);
Transaction transaction = new ProtonTransactionImpl(xid, server.getStorageManager(), -1, amqpConnection);
transactions.put(binary, transaction);
return binary;
}

View File

@ -16,10 +16,7 @@
*/
package org.apache.activemq.artemis.protocol.amqp.broker;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -40,15 +37,14 @@ import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerProducer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
@ -104,7 +100,8 @@ public class AMQPSessionCallback implements SessionCallback {
private final Executor sessionExecutor;
private final AtomicBoolean draining = new AtomicBoolean(false);
private final boolean directDeliver;
private CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
@ -125,6 +122,7 @@ public class AMQPSessionCallback implements SessionCallback {
this.transportConnection = transportConnection;
this.sessionExecutor = executor;
this.operationContext = operationContext;
this.directDeliver = manager.isDirectDeliver();
}
@Override
@ -133,28 +131,6 @@ public class AMQPSessionCallback implements SessionCallback {
return transportConnection.isWritable(callback) && senderContext.getSender().getLocalState() != EndpointState.CLOSED;
}
public void onFlowConsumer(Object consumer, int credits, final boolean drain) {
ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer;
if (drain) {
// If the draining is already running, then don't do anything
if (draining.compareAndSet(false, true)) {
final ProtonServerSenderContext plugSender = (ProtonServerSenderContext) serverConsumer.getProtocolContext();
serverConsumer.forceDelivery(1, new Runnable() {
@Override
public void run() {
try {
plugSender.reportDrained();
} finally {
draining.set(false);
}
}
});
}
} else {
serverConsumer.receiveCredits(-1);
}
}
public void withinContext(RunnableEx run) throws Exception {
OperationContext context = recoverContext();
try {
@ -180,7 +156,7 @@ public class AMQPSessionCallback implements SessionCallback {
@Override
public boolean supportsDirectDelivery() {
return false;
return manager.isDirectDeliver();
}
public void init(AMQPSessionContext protonSession, SASLResult saslResult) throws Exception {
@ -347,7 +323,6 @@ public class AMQPSessionCallback implements SessionCallback {
return result;
}
public AddressQueryResult addressQuery(SimpleString addressName,
RoutingType routingType,
boolean autoCreate) throws Exception {
@ -373,41 +348,8 @@ public class AMQPSessionCallback implements SessionCallback {
}
public void closeSender(final Object brokerConsumer) throws Exception {
final ServerConsumer consumer = ((ServerConsumer) brokerConsumer);
final CountDownLatch latch = new CountDownLatch(1);
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
consumer.close(false);
latch.countDown();
} catch (Exception e) {
}
}
};
// Due to the nature of proton this could be happening within flushes from the queue-delivery (depending on how it happened on the protocol)
// to avoid deadlocks the close has to be done outside of the main thread on an executor
// otherwise you could get a deadlock
Executor executor = protonSPI.getExeuctor();
if (executor != null) {
executor.execute(runnable);
} else {
runnable.run();
}
try {
// a short timeout will do.. 1 second is already long enough
if (!latch.await(1, TimeUnit.SECONDS)) {
logger.debug("Could not close consumer on time");
}
} catch (InterruptedException e) {
throw new ActiveMQAMQPInternalErrorException("Unable to close consumers for queue: " + consumer.getQueue());
}
consumer.close(false);
consumer.getQueue().recheckRefCount(serverSession.getSessionContext());
}
@ -418,12 +360,19 @@ public class AMQPSessionCallback implements SessionCallback {
public void close() throws Exception {
//need to check here as this can be called if init fails
if (serverSession != null) {
OperationContext context = recoverContext();
try {
serverSession.close(false);
} finally {
resetContext(context);
}
// we cannot hold the nettyExecutor on this rollback here, otherwise other connections will be waiting
sessionExecutor.execute(() -> {
OperationContext context = recoverContext();
try {
try {
serverSession.close(false);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
} finally {
resetContext(context);
}
});
}
}
@ -468,7 +417,8 @@ public class AMQPSessionCallback implements SessionCallback {
final Delivery delivery,
SimpleString address,
int messageFormat,
ReadableBuffer data) throws Exception {
ReadableBuffer data,
RoutingContext routingContext) throws Exception {
AMQPMessage message = new AMQPMessage(messageFormat, data, null, coreMessageObjectPools);
if (address != null) {
message.setAddress(address);
@ -503,7 +453,7 @@ public class AMQPSessionCallback implements SessionCallback {
rejectMessage(delivery, AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address is full: " + address);
}
} else {
serverSend(transaction, message, delivery, receiver);
serverSend(context, transaction, message, delivery, receiver, routingContext);
}
} finally {
resetContext(oldcontext);
@ -520,14 +470,11 @@ public class AMQPSessionCallback implements SessionCallback {
afterIO(new IOCallback() {
@Override
public void done() {
connection.lock();
try {
connection.runLater(() -> {
delivery.disposition(rejected);
delivery.settle();
} finally {
connection.unlock();
}
connection.flush();
connection.flush();
});
}
@Override
@ -538,19 +485,20 @@ public class AMQPSessionCallback implements SessionCallback {
}
private void serverSend(final Transaction transaction,
private void serverSend(final ProtonServerReceiverContext context,
final Transaction transaction,
final Message message,
final Delivery delivery,
final Receiver receiver) throws Exception {
final Receiver receiver,
final RoutingContext routingContext) throws Exception {
message.setConnectionID(receiver.getSession().getConnection().getRemoteContainer());
invokeIncoming((AMQPMessage) message, (ActiveMQProtonRemotingConnection) transportConnection.getProtocolConnection());
serverSession.send(transaction, message, false, false);
serverSession.send(transaction, message, directDeliver, false, routingContext);
afterIO(new IOCallback() {
@Override
public void done() {
connection.lock();
try {
connection.runLater(() -> {
if (delivery.getRemoteState() instanceof TransactionalState) {
TransactionalState txAccepted = new TransactionalState();
txAccepted.setOutcome(Accepted.getInstance());
@ -561,21 +509,17 @@ public class AMQPSessionCallback implements SessionCallback {
delivery.disposition(Accepted.getInstance());
}
delivery.settle();
} finally {
connection.unlock();
}
connection.flush();
context.flow();
connection.flush();
});
}
@Override
public void onError(int errorCode, String errorMessage) {
connection.lock();
try {
connection.runNow(() -> {
receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage));
connection.flush();
} finally {
connection.unlock();
}
});
}
});
}
@ -635,15 +579,12 @@ public class AMQPSessionCallback implements SessionCallback {
ProtonServerSenderContext plugSender = (ProtonServerSenderContext) consumer.getProtocolContext();
try {
return plugSender.deliverMessage(ref, deliveryCount, transportConnection);
return plugSender.deliverMessage(ref, consumer);
} catch (Exception e) {
connection.lock();
try {
connection.runNow(() -> {
plugSender.getSender().setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, e.getMessage()));
connection.flush();
} finally {
connection.unlock();
}
});
throw new IllegalStateException("Can't deliver message " + e, e);
}
@ -673,23 +614,22 @@ public class AMQPSessionCallback implements SessionCallback {
@Override
public void disconnect(ServerConsumer consumer, SimpleString queueName) {
ErrorCondition ec = new ErrorCondition(AmqpSupport.RESOURCE_DELETED, "Queue was deleted: " + queueName);
connection.lock();
try {
((ProtonServerSenderContext) consumer.getProtocolContext()).close(ec);
connection.flush();
} catch (ActiveMQAMQPException e) {
logger.error("Error closing link for " + consumer.getQueue().getAddress());
} finally {
connection.unlock();
}
connection.runNow(() -> {
try {
((ProtonServerSenderContext) consumer.getProtocolContext()).close(ec);
connection.flush();
} catch (ActiveMQAMQPException e) {
logger.error("Error closing link for " + consumer.getQueue().getAddress());
}
});
}
@Override
public boolean hasCredits(ServerConsumer consumer) {
ProtonServerSenderContext plugSender = (ProtonServerSenderContext) consumer.getProtocolContext();
if (plugSender != null && plugSender.getSender().getCredit() > 0) {
return true;
if (plugSender != null) {
return plugSender.hasCredits();
} else {
return false;
}
@ -757,6 +697,10 @@ public class AMQPSessionCallback implements SessionCallback {
this.transactionHandler = transactionHandler;
}
public Connection getTransportConnection() {
return transportConnection;
}
public ProtonTransactionHandler getTransactionHandler() {
return this.transactionHandler;
}
@ -782,4 +726,7 @@ public class AMQPSessionCallback implements SessionCallback {
}
}
interface CreditRunnable extends Runnable {
boolean isRun();
}
}

View File

@ -122,7 +122,8 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
ErrorCondition errorCondition = new ErrorCondition();
errorCondition.setCondition(AmqpSupport.CONNECTION_FORCED);
amqpConnection.close(errorCondition);
getTransportConnection().close();
// There's no need to flush, amqpConnection.close() is calling flush
// as long this semantic is kept no need to flush here
}
/**

View File

@ -77,6 +77,8 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
private Long amqpIdleTimeout;
private boolean directDeliver = true;
/*
* used when you want to treat senders as a subscription on an address rather than consuming from the actual queue for
@ -131,6 +133,14 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
return this;
}
public boolean isDirectDeliver() {
return directDeliver;
}
public ProtonProtocolManager setDirectDeliver(boolean directDeliver) {
this.directDeliver = directDeliver;
return this;
}
@Override
public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection remotingConnection) {

View File

@ -16,12 +16,6 @@
*/
package org.apache.activemq.artemis.protocol.amqp.proton;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.FAILOVER_SERVER_LIST;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.HOSTNAME;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NETWORK_HOST;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PORT;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SCHEME;
import java.net.URI;
import java.util.Arrays;
import java.util.HashMap;
@ -31,12 +25,16 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoop;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExecutorNettyAdapter;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExtCapability;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.ProtonHandler;
import org.apache.activemq.artemis.protocol.amqp.sasl.AnonymousServerSASL;
@ -59,7 +57,11 @@ import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.Transport;
import org.jboss.logging.Logger;
import io.netty.buffer.ByteBuf;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.FAILOVER_SERVER_LIST;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.HOSTNAME;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NETWORK_HOST;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PORT;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SCHEME;
public class AMQPConnectionContext extends ProtonInitializable implements EventHandler {
@ -111,7 +113,13 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
this.scheduledPool = scheduledPool;
connectionCallback.setConnection(this);
this.handler = new ProtonHandler(protocolManager.getServer().getExecutorFactory().getExecutor(), isIncomingConnection);
EventLoop nettyExecutor;
if (connectionCallback.getTransportConnection() instanceof NettyConnection) {
nettyExecutor = ((NettyConnection) connectionCallback.getTransportConnection()).getNettyChannel().eventLoop();
} else {
nettyExecutor = new ExecutorNettyAdapter(protocolManager.getServer().getExecutorFactory().getExecutor());
}
this.handler = new ProtonHandler(nettyExecutor, protocolManager.getServer().getExecutorFactory().getExecutor(), isIncomingConnection);
handler.addEventHandler(this);
Transport transport = handler.getTransport();
transport.setEmitFlowEventOnSend(false);
@ -127,6 +135,10 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
}
}
public void requireInHandler() {
handler.requireHandler();
}
public void scheduledFlush() {
handler.scheduledFlush();
}
@ -159,35 +171,19 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
}
public void destroy() {
connectionCallback.close();
handler.runLater(() -> connectionCallback.close());
}
public boolean isSyncOnFlush() {
return false;
}
public boolean tryLock(long time, TimeUnit timeUnit) {
return handler.tryLock(time, timeUnit);
}
public void lock() {
handler.lock();
}
public void unlock() {
handler.unlock();
}
public int capacity() {
return handler.capacity();
}
public void flush() {
handler.flush();
}
public void close(ErrorCondition errorCondition) {
handler.close(errorCondition);
handler.close(errorCondition, this);
}
protected AMQPSessionContext getSessionExtension(Session realSession) throws ActiveMQAMQPException {
@ -201,6 +197,18 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
return sessionExtension;
}
public void runOnPool(Runnable run) {
handler.runOnPool(run);
}
public void runNow(Runnable run) {
handler.runNow(run);
}
public void runLater(Runnable run) {
handler.runLater(run);
}
protected boolean validateConnection(Connection connection) {
return connectionCallback.validateConnection(connection, handler.getSASLResult());
}
@ -224,6 +232,10 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
protected void initInternal() throws Exception {
}
public AMQPConnectionCallback getConnectionCallback() {
return connectionCallback;
}
protected void remoteLinkOpened(Link link) throws Exception {
AMQPSessionContext protonSession = getSessionExtension(link.getSession());
@ -314,7 +326,7 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
if (!connectionCallback.isSupportsAnonymous()) {
connectionCallback.sendSASLSupported();
connectionCallback.close();
handler.close(null);
handler.close(null, this);
}
}
}
@ -334,7 +346,7 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
@Override
public void onAuthFailed(final ProtonHandler protonHandler, final Connection connection) {
connectionCallback.close();
handler.close(null);
handler.close(null, this);
}
@Override
@ -359,59 +371,73 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
@Override
public void onRemoteOpen(Connection connection) throws Exception {
lock();
handler.requireHandler();
try {
try {
initInternal();
} catch (Exception e) {
log.error("Error init connection", e);
}
if (!validateConnection(connection)) {
connection.close();
} else {
connection.setContext(AMQPConnectionContext.this);
connection.setContainer(containerId);
connection.setProperties(connectionProperties);
connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
connection.open();
}
} finally {
unlock();
initInternal();
} catch (Exception e) {
log.error("Error init connection", e);
}
if (!validateConnection(connection)) {
connection.close();
} else {
connection.setContext(AMQPConnectionContext.this);
connection.setContainer(containerId);
connection.setProperties(connectionProperties);
connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
connection.open();
}
initialise();
/*
* This can be null which is in effect an empty map, also we really don't need to check this for in bound connections
* but its here in case we add support for outbound connections.
* */
/*
* This can be null which is in effect an empty map, also we really don't need to check this for in bound connections
* but its here in case we add support for outbound connections.
* */
if (connection.getRemoteProperties() == null || !connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) {
long nextKeepAliveTime = handler.tick(true);
if (nextKeepAliveTime != 0 && scheduledPool != null) {
scheduledPool.schedule(new Runnable() {
@Override
public void run() {
Long rescheduleAt = handler.tick(false);
if (rescheduleAt == null) {
// this mean tick could not acquire a lock, we will just retry in 10 milliseconds.
scheduledPool.schedule(this, 10, TimeUnit.MILLISECONDS);
} else if (rescheduleAt != 0) {
scheduledPool.schedule(this, rescheduleAt - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()), TimeUnit.MILLISECONDS);
}
}
}, (nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS);
scheduledPool.schedule(new ScheduleRunnable(), (nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS);
}
}
}
class TickerRunnable implements Runnable {
final ScheduleRunnable scheduleRunnable;
TickerRunnable(ScheduleRunnable scheduleRunnable) {
this.scheduleRunnable = scheduleRunnable;
}
@Override
public void run() {
Long rescheduleAt = handler.tick(false);
if (rescheduleAt == null) {
// this mean tick could not acquire a lock, we will just retry in 10 milliseconds.
scheduledPool.schedule(scheduleRunnable, 10, TimeUnit.MILLISECONDS);
} else if (rescheduleAt != 0) {
scheduledPool.schedule(scheduleRunnable, rescheduleAt - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()), TimeUnit.MILLISECONDS);
}
}
}
class ScheduleRunnable implements Runnable {
TickerRunnable tickerRunnable = new TickerRunnable(this);
@Override
public void run() {
// The actual tick has to happen within a Netty Worker, to avoid requiring a lock
// this will also be used to flush the data directly into netty connection's executor
handler.runLater(tickerRunnable);
}
}
@Override
public void onRemoteClose(Connection connection) {
lock();
try {
connection.close();
connection.free();
} finally {
unlock();
}
handler.requireHandler();
connection.close();
connection.free();
for (AMQPSessionContext protonSession : sessions.values()) {
protonSession.close();
@ -430,31 +456,24 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
@Override
public void onRemoteOpen(Session session) throws Exception {
handler.requireHandler();
getSessionExtension(session).initialise();
lock();
try {
session.open();
} finally {
unlock();
}
session.open();
}
@Override
public void onRemoteClose(Session session) throws Exception {
lock();
try {
handler.runLater(() -> {
session.close();
session.free();
} finally {
unlock();
}
AMQPSessionContext sessionContext = (AMQPSessionContext) session.getContext();
if (sessionContext != null) {
sessionContext.close();
sessions.remove(session);
session.setContext(null);
}
AMQPSessionContext sessionContext = (AMQPSessionContext) session.getContext();
if (sessionContext != null) {
sessionContext.close();
sessions.remove(session);
session.setContext(null);
}
});
}
@Override
@ -471,40 +490,42 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
@Override
public void onRemoteClose(Link link) throws Exception {
lock();
try {
handler.requireHandler();
// We scheduled it for later, as that will work through anything that's pending on the current deliveries.
runNow(() -> {
link.close();
link.free();
} finally {
unlock();
}
ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext();
if (linkContext != null) {
linkContext.close(true);
}
ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext();
if (linkContext != null) {
try {
linkContext.close(true);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
flush();
});
}
@Override
public void onRemoteDetach(Link link) throws Exception {
boolean handleAsClose = link.getSource() != null
&& ((Source) link.getSource()).getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH;
handler.requireHandler();
boolean handleAsClose = link.getSource() != null && ((Source) link.getSource()).getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH;
if (handleAsClose) {
onRemoteClose(link);
} else {
lock();
try {
link.detach();
link.free();
} finally {
unlock();
}
link.detach();
link.free();
}
}
@Override
public void onLocalDetach(Link link) throws Exception {
handler.requireHandler();
Object context = link.getContext();
if (context instanceof ProtonServerSenderContext) {
ProtonServerSenderContext senderContext = (ProtonServerSenderContext) context;
@ -514,6 +535,7 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
@Override
public void onDelivery(Delivery delivery) throws Exception {
handler.requireHandler();
ProtonDeliveryHandler handler = (ProtonDeliveryHandler) delivery.getLink().getContext();
if (handler != null) {
handler.onMessage(delivery);

View File

@ -150,13 +150,11 @@ public class AMQPSessionContext extends ProtonInitializable {
coordinator.setCapabilities(Symbol.getSymbol("amqp:local-transactions"), Symbol.getSymbol("amqp:multi-txns-per-ssn"), Symbol.getSymbol("amqp:multi-ssns-per-txn"));
receiver.setContext(transactionHandler);
connection.lock();
try {
connection.runNow(() -> {
receiver.open();
receiver.flow(connection.getAmqpCredits());
} finally {
connection.unlock();
}
connection.flush();
});
}
public void addSender(Sender sender) throws Exception {
@ -169,24 +167,20 @@ public class AMQPSessionContext extends ProtonInitializable {
senders.put(sender, protonSender);
serverSenders.put(protonSender.getBrokerConsumer(), protonSender);
sender.setContext(protonSender);
connection.lock();
try {
connection.runNow(() -> {
sender.open();
} finally {
connection.unlock();
}
connection.flush();
});
protonSender.start();
} catch (ActiveMQAMQPException e) {
senders.remove(sender);
sender.setSource(null);
sender.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
connection.lock();
try {
connection.runNow(() -> {
sender.close();
} finally {
connection.unlock();
}
connection.flush();
});
}
}
@ -206,22 +200,18 @@ public class AMQPSessionContext extends ProtonInitializable {
ServerProducer serverProducer = new ServerProducerImpl(receiver.getName(), "AMQP", receiver.getTarget().getAddress());
sessionSPI.addProducer(serverProducer);
receiver.setContext(protonReceiver);
connection.lock();
try {
connection.runNow(() -> {
receiver.open();
} finally {
connection.unlock();
}
connection.flush();
});
} catch (ActiveMQAMQPException e) {
receivers.remove(receiver);
receiver.setTarget(null);
receiver.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
connection.lock();
try {
connection.runNow(() -> {
receiver.close();
} finally {
connection.unlock();
}
connection.flush();
});
}
}
}

View File

@ -25,7 +25,9 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
@ -49,6 +51,9 @@ import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.jboss.logging.Logger;
/**
* This is the equivalent for the ServerProducer
*/
public class ProtonServerReceiverContext extends ProtonInitializable implements ProtonDeliveryHandler {
private static final Logger log = Logger.getLogger(ProtonServerReceiverContext.class);
@ -63,35 +68,43 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
protected final AMQPSessionCallback sessionSPI;
/** We create this AtomicRunnable with setRan.
* This is because we always reuse the same instance.
* In case the creditRunnable was run, we reset and send it over.
* We set it as ran as the first one should always go through */
RoutingContext routingContext = new RoutingContextImpl(null);
/**
* We create this AtomicRunnable with setRan.
* This is because we always reuse the same instance.
* In case the creditRunnable was run, we reset and send it over.
* We set it as ran as the first one should always go through
*/
protected final AtomicRunnable creditRunnable;
/**
* This Credit Runnable may be used in Mock tests to simulate the credit semantic here
*/
public static AtomicRunnable createCreditRunnable(int refill,
int threshold,
Receiver receiver,
AMQPConnectionContext connection) {
Runnable creditRunnable = () -> {
/** This Credit Runnable may be used in Mock tests to simulate the credit semantic here */
public static AtomicRunnable createCreditRunnable(int refill, int threshold, Receiver receiver, AMQPConnectionContext connection) {
connection.requireInHandler();
if (receiver.getCredit() <= threshold) {
int topUp = refill - receiver.getCredit();
if (topUp > 0) {
// System.out.println("Sending " + topUp + " towards client");
receiver.flow(topUp);
connection.flush();
}
}
};
return new AtomicRunnable() {
@Override
public void atomicRun() {
connection.lock();
try {
if (receiver.getCredit() <= threshold) {
int topUp = refill - receiver.getCredit();
if (topUp > 0) {
receiver.flow(topUp);
}
}
} finally {
connection.unlock();
}
connection.flush();
connection.runNow(creditRunnable);
}
};
}
/*
The maximum number of credits we will allocate to clients.
This number is also used by the broker when refresh client credits.
@ -249,41 +262,46 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
*/
@Override
public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
try {
Receiver receiver = ((Receiver) delivery.getLink());
connection.requireInHandler();
Receiver receiver = ((Receiver) delivery.getLink());
if (receiver.current() != delivery) {
return;
}
if (receiver.current() != delivery) {
return;
}
if (delivery.isAborted()) {
// Aborting implicitly remotely settles, so advance
// receiver to the next delivery and settle locally.
receiver.advance();
delivery.settle();
// Replenish the credit if not doing a drain
if (!receiver.getDrain()) {
receiver.flow(1);
}
return;
} else if (delivery.isPartial()) {
return;
}
Transaction tx = null;
ReadableBuffer data = receiver.recv();
if (delivery.isAborted()) {
// Aborting implicitly remotely settles, so advance
// receiver to the next delivery and settle locally.
receiver.advance();
delivery.settle();
if (delivery.getRemoteState() instanceof TransactionalState) {
TransactionalState txState = (TransactionalState) delivery.getRemoteState();
tx = this.sessionSPI.getTransaction(txState.getTxnId(), false);
// Replenish the credit if not doing a drain
if (!receiver.getDrain()) {
receiver.flow(1);
}
sessionSPI.serverSend(this, tx, receiver, delivery, address, delivery.getMessageFormat(), data);
return;
} else if (delivery.isPartial()) {
return;
}
flow();
ReadableBuffer data = receiver.recv();
receiver.advance();
Transaction tx = null;
if (delivery.getRemoteState() instanceof TransactionalState) {
TransactionalState txState = (TransactionalState) delivery.getRemoteState();
tx = this.sessionSPI.getTransaction(txState.getTxnId(), false);
}
final Transaction txUsed = tx;
actualDelivery(delivery, receiver, data, txUsed);
}
private void actualDelivery(Delivery delivery, Receiver receiver, ReadableBuffer data, Transaction tx) {
try {
sessionSPI.serverSend(this, tx, receiver, delivery, address, delivery.getMessageFormat(), data, routingContext);
} catch (Exception e) {
log.warn(e.getMessage(), e);
Rejected rejected = new Rejected();
@ -294,13 +312,17 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
} else {
condition.setCondition(Symbol.valueOf("failed"));
}
connection.runLater(() -> {
condition.setDescription(e.getMessage());
rejected.setError(condition);
condition.setDescription(e.getMessage());
rejected.setError(condition);
delivery.disposition(rejected);
delivery.settle();
flow();
connection.flush();
});
delivery.disposition(rejected);
delivery.settle();
flow();
}
}
@ -324,6 +346,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
}
public void flow() {
connection.requireInHandler();
if (!creditRunnable.isRun()) {
return; // nothing to be done as the previous one did not run yet
}
@ -339,13 +362,10 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
}
public void drain(int credits) {
connection.lock();
try {
connection.runNow(() -> {
receiver.drain(credits);
} finally {
connection.unlock();
}
connection.flush();
connection.flush();
});
}
public int drained() {

View File

@ -20,7 +20,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
@ -32,7 +33,10 @@ import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.MessageReferenceCallback;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
@ -49,7 +53,6 @@ import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.impl.SelectorParser;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
@ -74,9 +77,9 @@ import org.apache.qpid.proton.engine.Sender;
import org.jboss.logging.Logger;
/**
* TODO: Merge {@link ProtonServerSenderContext} and {@link org.apache.activemq.artemis.protocol.amqp.client.ProtonClientSenderContext} once we support 'global' link names. The split is a workaround for outgoing links
* This is the Equivalent for the ServerConsumer
*/
public class ProtonServerSenderContext extends ProtonInitializable implements ProtonDeliveryHandler {
public class ProtonServerSenderContext extends ProtonInitializable implements ProtonDeliveryHandler, MessageReferenceCallback {
private static final Logger log = Logger.getLogger(ProtonServerSenderContext.class);
@ -104,6 +107,16 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
private boolean isVolatile = false;
private boolean preSettle;
private SimpleString tempQueueName;
private final AtomicBoolean draining = new AtomicBoolean(false);
private int credits = 0;
private AtomicInteger pending = new AtomicInteger(0);
/**
* The model proton uses requires us to hold a lock in certain times
* to sync the credits we have versus the credits that are being held in proton
* */
private final Object creditsLock = new Object();
public ProtonServerSenderContext(AMQPConnectionContext connection,
Sender sender,
@ -122,7 +135,51 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
@Override
public void onFlow(int currentCredits, boolean drain) {
sessionSPI.onFlowConsumer(brokerConsumer, currentCredits, drain);
connection.requireInHandler();
setupCredit();
ServerConsumerImpl serverConsumer = (ServerConsumerImpl) brokerConsumer;
if (drain) {
// If the draining is already running, then don't do anything
if (draining.compareAndSet(false, true)) {
final ProtonServerSenderContext plugSender = (ProtonServerSenderContext) serverConsumer.getProtocolContext();
serverConsumer.forceDelivery(1, new Runnable() {
@Override
public void run() {
try {
connection.runNow(() -> {
plugSender.reportDrained();
setupCredit();
});
} finally {
draining.set(false);
}
}
});
}
} else {
serverConsumer.receiveCredits(-1);
}
}
public boolean hasCredits() {
if (!connection.flowControl(brokerConsumer::promptDelivery)) {
return false;
}
synchronized (creditsLock) {
return credits > 0 && sender.getLocalState() != EndpointState.CLOSED;
}
}
private void setupCredit() {
synchronized (creditsLock) {
this.credits = sender.getCredit() - pending.get();
if (credits < 0) {
credits = 0;
}
}
}
public Sender getSender() {
@ -469,20 +526,17 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
sender.setCondition(condition);
}
protonSession.removeSender(sender);
connection.lock();
try {
sender.close();
} finally {
connection.unlock();
}
connection.flush();
try {
sessionSPI.closeSender(brokerConsumer);
} catch (Exception e) {
log.warn(e.getMessage(), e);
throw new ActiveMQAMQPInternalErrorException(e.getMessage());
}
connection.runLater(() -> {
sender.close();
try {
sessionSPI.closeSender(brokerConsumer);
} catch (Exception e) {
log.warn(e.getMessage(), e);
}
sender.close();
connection.flush();
});
}
/*
@ -666,12 +720,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
public void settle(Delivery delivery) {
connection.lock();
try {
delivery.settle();
} finally {
connection.unlock();
}
connection.requireInHandler();
delivery.settle();
}
public synchronized void checkState() {
@ -681,42 +731,59 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
/**
* handle an out going message from ActiveMQ Artemis, send via the Proton Sender
*/
public int deliverMessage(MessageReference messageReference, int deliveryCount, Connection transportConnection) throws Exception {
public int deliverMessage(final MessageReference messageReference, final ServerConsumer consumer) throws Exception {
if (closed) {
return 0;
}
AMQPMessage message = CoreAmqpConverter.checkAMQP(messageReference.getMessage());
sessionSPI.invokeOutgoing(message, (ActiveMQProtonRemotingConnection) transportConnection.getProtocolConnection());
// we only need a tag if we are going to settle later
byte[] tag = preSettle ? new byte[0] : protonSession.getTag();
// Let the Message decide how to present the message bytes
ReadableBuffer sendBuffer = message.getSendBuffer(deliveryCount);
boolean releaseRequired = sendBuffer instanceof NettyReadable;
try {
int size = sendBuffer.remaining();
while (!connection.tryLock(1, TimeUnit.SECONDS)) {
if (closed || sender.getLocalState() == EndpointState.CLOSED) {
// If we're waiting on the connection lock, the link might be in the process of closing. If this happens
// we return.
synchronized (creditsLock) {
if (sender.getLocalState() == EndpointState.CLOSED) {
return 0;
} else {
if (log.isDebugEnabled()) {
log.debug("Couldn't get lock on deliverMessage " + this);
}
}
pending.incrementAndGet();
credits--;
}
if (messageReference instanceof Runnable && consumer.allowReferenceCallback()) {
messageReference.setCallback(this);
connection.runNow((Runnable)messageReference);
} else {
connection.runNow(() -> executeDelivery(messageReference));
}
// This is because on AMQP we only send messages based in credits, not bytes
return 1;
} finally {
}
}
@Override
public void executeDelivery(MessageReference messageReference) {
try {
if (sender.getLocalState() == EndpointState.CLOSED) {
log.debug("Not delivering message " + messageReference + " as the sender is closed and credits were available, if you see too many of these it means clients are issuing credits and closing the connection with pending credits a lot of times");
return;
}
AMQPMessage message = CoreAmqpConverter.checkAMQP(messageReference.getMessage());
sessionSPI.invokeOutgoing(message, (ActiveMQProtonRemotingConnection) sessionSPI.getTransportConnection().getProtocolConnection());
// Let the Message decide how to present the message bytes
ReadableBuffer sendBuffer = message.getSendBuffer(messageReference.getDeliveryCount());
// we only need a tag if we are going to settle later
byte[] tag = preSettle ? new byte[0] : protonSession.getTag();
boolean releaseRequired = sendBuffer instanceof NettyReadable;
final Delivery delivery;
delivery = sender.delivery(tag, 0, tag.length);
delivery.setMessageFormat((int) message.getMessageFormat());
delivery.setContext(messageReference);
try {
final Delivery delivery;
delivery = sender.delivery(tag, 0, tag.length);
delivery.setMessageFormat((int) message.getMessageFormat());
delivery.setContext(messageReference);
if (releaseRequired) {
sender.send(sendBuffer);
@ -730,7 +797,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
if (preSettle) {
// Presettled means the client implicitly accepts any delivery we send it.
sessionSPI.ack(null, brokerConsumer, messageReference.getMessage());
try {
sessionSPI.ack(null, brokerConsumer, messageReference.getMessage());
} catch (Exception e) {
log.debug(e.getMessage(), e);
}
delivery.settle();
} else {
sender.advance();
@ -738,14 +809,16 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
connection.flush();
} finally {
connection.unlock();
}
return size;
} finally {
if (releaseRequired) {
((NettyReadable) sendBuffer).getByteBuf().release();
synchronized (creditsLock) {
pending.decrementAndGet();
}
if (releaseRequired) {
((NettyReadable) sendBuffer).getByteBuf().release();
}
}
} catch (Exception e) {
log.warn(e.getMessage(), e);
brokerConsumer.errorProcessing(e, messageReference);
}
}
@ -806,13 +879,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
* Update link state to reflect that the previous drain attempt has completed.
*/
public void reportDrained() {
connection.lock();
try {
sender.drained();
} finally {
connection.unlock();
}
connection.requireInHandler();
sender.drained();
connection.flush();
}
}

View File

@ -0,0 +1,221 @@
/*
* Copyright 2005-2014 Red Hat, Inc.
* Red Hat licenses this file to you under the Apache License, version
* 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.proton.handler;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ProgressivePromise;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.ScheduledFuture;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
/** Test cases may supply a simple executor instead of the real Netty Executor
* On that case this is a simple adapter for what's needed from these tests.
* Not intended to be used in production.
*
* TODO: This could be refactored out of the main codebase but at a high cost.
* We may do it some day if we find an easy way that won't clutter the code too much.
* */
public class ExecutorNettyAdapter implements EventLoop {
final ArtemisExecutor executor;
public ExecutorNettyAdapter(ArtemisExecutor executor) {
this.executor = executor;
}
@Override
public EventLoopGroup parent() {
return null;
}
@Override
public EventLoop next() {
return null;
}
@Override
public ChannelFuture register(Channel channel) {
return null;
}
@Override
public ChannelFuture register(ChannelPromise promise) {
return null;
}
@Override
public ChannelFuture register(Channel channel, ChannelPromise promise) {
return null;
}
@Override
public boolean inEventLoop() {
return inEventLoop(Thread.currentThread());
}
@Override
public boolean inEventLoop(Thread thread) {
return false;
}
@Override
public <V> Promise<V> newPromise() {
return null;
}
@Override
public <V> ProgressivePromise<V> newProgressivePromise() {
return null;
}
@Override
public <V> Future<V> newSucceededFuture(V result) {
return null;
}
@Override
public <V> Future<V> newFailedFuture(Throwable cause) {
return null;
}
@Override
public boolean isShuttingDown() {
return false;
}
@Override
public Future<?> shutdownGracefully() {
return null;
}
@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
return null;
}
@Override
public Future<?> terminationFuture() {
return null;
}
@Override
public void shutdown() {
}
@Override
public List<Runnable> shutdownNow() {
return null;
}
@Override
public Iterator<EventExecutor> iterator() {
return null;
}
@Override
public Future<?> submit(Runnable task) {
execute(task);
return null;
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
execute(task);
return null;
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return null;
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return null;
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return null;
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return null;
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return null;
}
@Override
public boolean isShutdown() {
return false;
}
@Override
public boolean isTerminated() {
return false;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return null;
}
@Override
public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout,
TimeUnit unit) throws InterruptedException {
return null;
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return null;
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout,
TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return null;
}
@Override
public void execute(Runnable command) {
executor.execute(command);
}
}

View File

@ -16,22 +16,24 @@
*/
package org.apache.activemq.artemis.protocol.amqp.proton.handler;
import javax.security.auth.Subject;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import javax.security.auth.Subject;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.EventLoop;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable;
import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL;
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.AmqpError;
@ -46,9 +48,6 @@ import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.impl.TransportInternal;
import org.jboss.logging.Logger;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
public class ProtonHandler extends ProtonInitializable implements SaslListener {
private static final Logger log = Logger.getLogger(ProtonHandler.class);
@ -68,8 +67,6 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
private ServerSASL chosenMechanism;
private ClientSASL clientSASLMechanism;
private final ReentrantLock lock = new ReentrantLock();
private final long creationTime;
private final boolean isServer;
@ -80,17 +77,20 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
protected boolean receivedFirstPacket = false;
private final Executor flushExecutor;
private final EventLoop workerExecutor;
private final ArtemisExecutor poolExecutor;
protected final ReadyListener readyListener;
boolean inDispatch = false;
public ProtonHandler(Executor flushExecutor, boolean isServer) {
this.flushExecutor = flushExecutor;
this.readyListener = () -> this.flushExecutor.execute(() -> {
flush();
});
boolean scheduledFlush = false;
public ProtonHandler(EventLoop workerExecutor, ArtemisExecutor poolExecutor, boolean isServer) {
this.workerExecutor = workerExecutor;
this.poolExecutor = poolExecutor;
this.readyListener = () -> runLater(this::flush);
this.creationTime = System.currentTimeMillis();
this.isServer = isServer;
@ -106,45 +106,33 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
}
public Long tick(boolean firstTick) {
if (firstTick) {
// the first tick needs to guarantee a lock here
lock.lock();
} else {
if (!lock.tryLock()) {
log.debug("Cannot hold a lock on ProtonHandler for Tick, it will retry shortly");
// if we can't lock the scheduler will retry in a very short period of time instead of holding the lock here
return null;
}
}
try {
if (!firstTick) {
try {
if (connection.getLocalState() != EndpointState.CLOSED) {
long rescheduleAt = transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
if (transport.isClosed()) {
throw new IllegalStateException("Channel was inactive for to long");
}
return rescheduleAt;
requireHandler();
if (!firstTick) {
try {
if (connection.getLocalState() != EndpointState.CLOSED) {
long rescheduleAt = transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
if (transport.isClosed()) {
throw new IllegalStateException("Channel was inactive for to long");
}
} catch (Exception e) {
log.warn(e.getMessage(), e);
transport.close();
connection.setCondition(new ErrorCondition());
return rescheduleAt;
}
return 0L;
} catch (Exception e) {
log.warn(e.getMessage(), e);
transport.close();
connection.setCondition(new ErrorCondition());
} finally {
flush();
}
return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
} finally {
lock.unlock();
flushBytes();
return 0L;
}
return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
}
/**
* We cannot flush until the initial handshake was finished.
* If this happens before the handshake, the connection response will happen without SASL
* and the client will respond and fail with an invalid code.
* */
*/
public void scheduledFlush() {
if (receivedFirstPacket) {
flush();
@ -152,29 +140,17 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
}
public int capacity() {
lock.lock();
try {
return transport.capacity();
} finally {
lock.unlock();
}
requireHandler();
return transport.capacity();
}
public void lock() {
lock.lock();
}
public void unlock() {
lock.unlock();
}
public boolean tryLock(long time, TimeUnit timeUnit) {
try {
return lock.tryLock(time, timeUnit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
public void requireHandler() {
if (!workerExecutor.inEventLoop()) {
new Exception("saco!!!").printStackTrace();
// this should not happen unless there is an obvious programming error
log.warn("Using inHandler is required", new Exception("trace"));
System.exit(-1);
throw new IllegalStateException("this method requires to be called within the handler, use the executor");
}
}
@ -192,21 +168,34 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
}
public void createServerSASL(String[] mechanisms) {
requireHandler();
Sasl sasl = transport.sasl();
sasl.server();
sasl.setMechanisms(mechanisms);
sasl.setListener(this);
}
public void flushBytes() {
requireHandler();
if (!scheduledFlush) {
scheduledFlush = true;
workerExecutor.execute(this::actualFlush);
}
}
private void actualFlush() {
requireHandler();
for (EventHandler handler : handlers) {
if (!handler.flowControl(readyListener)) {
scheduledFlush = false;
return;
}
}
lock.lock();
try {
while (true) {
ByteBuffer head = transport.head();
@ -227,7 +216,7 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
transport.pop(pending);
}
} finally {
lock.unlock();
scheduledFlush = false;
}
}
@ -236,36 +225,32 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
}
public void inputBuffer(ByteBuf buffer) {
requireHandler();
dataReceived = true;
lock.lock();
try {
while (buffer.readableBytes() > 0) {
int capacity = transport.capacity();
while (buffer.readableBytes() > 0) {
int capacity = transport.capacity();
if (!receivedFirstPacket) {
handleFirstPacket(buffer);
// there is a chance that if SASL Handshake has been carried out that the capacity may change.
capacity = transport.capacity();
}
if (capacity > 0) {
ByteBuffer tail = transport.tail();
int min = Math.min(capacity, buffer.readableBytes());
tail.limit(min);
buffer.readBytes(tail);
flush();
} else {
if (capacity == 0) {
log.debugf("abandoning: readableBytes=%d", buffer.readableBytes());
} else {
log.debugf("transport closed, discarding: readableBytes=%d, capacity=%d", buffer.readableBytes(), transport.capacity());
}
break;
}
if (!receivedFirstPacket) {
handleFirstPacket(buffer);
// there is a chance that if SASL Handshake has been carried out that the capacity may change.
capacity = transport.capacity();
}
if (capacity > 0) {
ByteBuffer tail = transport.tail();
int min = Math.min(capacity, buffer.readableBytes());
tail.limit(min);
buffer.readBytes(tail);
flush();
} else {
if (capacity == 0) {
log.debugf("abandoning: readableBytes=%d", buffer.readableBytes());
} else {
log.debugf("transport closed, discarding: readableBytes=%d, capacity=%d", buffer.readableBytes(), transport.capacity());
}
break;
}
} finally {
lock.unlock();
}
}
@ -281,29 +266,55 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
return creationTime;
}
public void flush() {
lock.lock();
try {
transport.process();
} finally {
lock.unlock();
}
dispatch();
public void runOnPool(Runnable runnable) {
poolExecutor.execute(runnable);
}
public void close(ErrorCondition errorCondition) {
lock.lock();
try {
public void runNow(Runnable runnable) {
if (workerExecutor.inEventLoop()) {
runnable.run();
} else {
workerExecutor.execute(runnable);
}
}
public void runLater(Runnable runnable) {
workerExecutor.execute(runnable);
}
public void flush() {
if (workerExecutor.inEventLoop()) {
transport.process();
dispatch();
} else {
runLater(() -> {
transport.process();
dispatch();
});
}
}
public void close(ErrorCondition errorCondition, AMQPConnectionContext connectionContext) {
runNow(() -> {
if (errorCondition != null) {
connection.setCondition(errorCondition);
}
connection.close();
} finally {
lock.unlock();
}
flush();
});
flush();
/*try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
} */
// this needs to be done in two steps
// we first flush what we have to the client
// after flushed, we close the local connection
// otherwise this could close the netty connection before the Writable is complete
runLater(() -> {
connectionContext.getConnectionCallback().getTransportConnection().close();
});
}
// server side SASL Listener
@ -462,45 +473,59 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
private void dispatch() {
Event ev;
lock.lock();
if (inDispatch) {
// Avoid recursion from events
return;
}
try {
if (inDispatch) {
// Avoid recursion from events
return;
}
try {
inDispatch = true;
while ((ev = collector.peek()) != null) {
for (EventHandler h : handlers) {
if (log.isTraceEnabled()) {
log.trace("Handling " + ev + " towards " + h);
}
try {
Events.dispatch(ev, h);
} catch (Exception e) {
log.warn(e.getMessage(), e);
ErrorCondition error = new ErrorCondition();
error.setCondition(AmqpError.INTERNAL_ERROR);
error.setDescription("Unrecoverable error: " +
(e.getMessage() == null ? e.getClass().getSimpleName() : e.getMessage()));
connection.setCondition(error);
connection.close();
}
inDispatch = true;
while ((ev = collector.peek()) != null) {
for (EventHandler h : handlers) {
if (log.isTraceEnabled()) {
log.trace("Handling " + ev + " towards " + h);
}
try {
Events.dispatch(ev, h);
} catch (Exception e) {
log.warn(e.getMessage(), e);
ErrorCondition error = new ErrorCondition();
error.setCondition(AmqpError.INTERNAL_ERROR);
error.setDescription("Unrecoverable error: " + (e.getMessage() == null ? e.getClass().getSimpleName() : e.getMessage()));
connection.setCondition(error);
connection.close();
}
collector.pop();
}
} finally {
inDispatch = false;
collector.pop();
}
} finally {
lock.unlock();
inDispatch = false;
}
flushBytes();
}
public void handleError(Exception e) {
if (workerExecutor.inEventLoop()) {
internalHandlerError(e);
} else {
runLater(() -> internalHandlerError(e));
}
}
private void internalHandlerError(Exception e) {
log.warn(e.getMessage(), e);
ErrorCondition error = new ErrorCondition();
error.setCondition(AmqpError.INTERNAL_ERROR);
error.setDescription("Unrecoverable error: " + (e.getMessage() == null ? e.getClass().getSimpleName() : e.getMessage()));
connection.setCondition(error);
connection.close();
flush();
}
public void open(String containerId, Map<Symbol, Object> connectionProperties) {
this.transport.open();
this.connection.setContainer(containerId);

View File

@ -107,14 +107,11 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
IOCallback ioAction = new IOCallback() {
@Override
public void done() {
connection.lock();
try {
connection.runLater(() -> {
delivery.settle();
delivery.disposition(declared);
} finally {
connection.unlock();
connection.flush();
}
});
}
@Override
@ -133,15 +130,12 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
IOCallback ioAction = new IOCallback() {
@Override
public void done() {
connection.lock();
try {
connection.runLater(() -> {
delivery.settle();
delivery.disposition(new Accepted());
currentTx = null;
} finally {
connection.unlock();
connection.flush();
}
});
}
@Override

View File

@ -25,11 +25,13 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
import org.apache.qpid.proton.engine.Delivery;
/**
* AMQP Protocol has different TX Rollback behaviour for Acks depending on whether an AMQP delivery has been settled
* or not. This class extends the Core TransactionImpl used for normal TX behaviour. In the case where deliveries
@ -46,8 +48,22 @@ public class ProtonTransactionImpl extends TransactionImpl {
private boolean discharged;
public ProtonTransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds) {
public ProtonTransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds, final AMQPConnectionContext connection) {
super(xid, storageManager, timeoutSeconds);
addOperation(new TransactionOperationAbstract() {
@Override
public void afterCommit(Transaction tx) {
super.afterCommit(tx);
connection.runNow(() -> {
// Settle all unsettled deliveries if commit is successful
for (Pair<Delivery, ProtonServerSenderContext> p : deliveries.values()) {
if (!p.getA().isSettled())
p.getB().settle(p.getA());
}
connection.flush();
});
}
});
}
@Override
@ -71,11 +87,6 @@ public class ProtonTransactionImpl extends TransactionImpl {
@Override
public void commit() throws Exception {
super.commit();
// Settle all unsettled deliveries if commit is successful
for (Pair<Delivery, ProtonServerSenderContext> p : deliveries.values()) {
if (!p.getA().isSettled()) p.getB().settle(p.getA());
}
}
public boolean isDischarged() {

View File

@ -16,13 +16,6 @@
*/
package org.apache.activemq.artemis.protocol.amqp.broker;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.AMQP_CREDITS_DEFAULT;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.AMQP_LOW_CREDITS_DEFAULT;
import static org.junit.Assert.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.never;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -34,29 +27,65 @@ import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.qpid.proton.engine.Receiver;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mockito.quality.Strictness;
import org.mockito.stubbing.Answer;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.AMQP_CREDITS_DEFAULT;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.AMQP_LOW_CREDITS_DEFAULT;
import static org.junit.Assert.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.never;
public class AMQPSessionCallbackTest {
@Rule public MockitoRule rule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
@Rule
public MockitoRule rule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
@Mock private AMQPConnectionCallback protonSPI;
@Mock private ProtonProtocolManager manager;
@Mock private AMQPConnectionContext connection;
@Mock private Connection transportConnection;
@Mock private Executor executor;
@Mock private OperationContext operationContext;
@Mock private Receiver receiver;
@Mock private ActiveMQServer server;
@Mock private PagingManager pagingManager;
@Mock private PagingStore pagingStore;
@Mock
private AMQPConnectionCallback protonSPI;
@Mock
private ProtonProtocolManager manager;
@Mock
private AMQPConnectionContext connection;
@Mock
private Connection transportConnection;
@Mock
private Executor executor;
@Mock
private OperationContext operationContext;
@Mock
private Receiver receiver;
@Mock
private ActiveMQServer server;
@Mock
private PagingManager pagingManager;
@Mock
private PagingStore pagingStore;
@Before
public void setRule() {
// The connection will call the runnable now on this mock, as these would happen on a different thread.
Mockito.doAnswer(new Answer() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
((Runnable) invocation.getArguments()[0]).run();
return null;
}
}).when(connection).runNow(Mockito.isA(Runnable.class));
}
/**
* Test that the AMQPSessionCallback grants no credit when not at threshold
@ -69,8 +98,7 @@ public class AMQPSessionCallbackTest {
// Capture credit runnable and invoke to trigger credit top off
ArgumentCaptor<Runnable> argument = ArgumentCaptor.forClass(Runnable.class);
AMQPSessionCallback session = new AMQPSessionCallback(
protonSPI, manager, connection, transportConnection, executor, operationContext);
AMQPSessionCallback session = new AMQPSessionCallback(protonSPI, manager, connection, transportConnection, executor, operationContext);
// Credit is above threshold
Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT + 1);
@ -100,8 +128,7 @@ public class AMQPSessionCallbackTest {
// Capture credit runnable and invoke to trigger credit top off
ArgumentCaptor<Runnable> argument = ArgumentCaptor.forClass(Runnable.class);
AMQPSessionCallback session = new AMQPSessionCallback(
protonSPI, manager, connection, transportConnection, executor, operationContext);
AMQPSessionCallback session = new AMQPSessionCallback(protonSPI, manager, connection, transportConnection, executor, operationContext);
// Credit is at threshold
Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT);
@ -132,8 +159,7 @@ public class AMQPSessionCallbackTest {
// Capture credit runnable and invoke to trigger credit top off
ArgumentCaptor<Runnable> argument = ArgumentCaptor.forClass(Runnable.class);
AMQPSessionCallback session = new AMQPSessionCallback(
protonSPI, manager, connection, transportConnection, executor, operationContext);
AMQPSessionCallback session = new AMQPSessionCallback(protonSPI, manager, connection, transportConnection, executor, operationContext);
// Credit is above threshold
Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT + 1);
@ -164,8 +190,7 @@ public class AMQPSessionCallbackTest {
// Capture credit runnable and invoke to trigger credit top off
ArgumentCaptor<Runnable> argument = ArgumentCaptor.forClass(Runnable.class);
AMQPSessionCallback session = new AMQPSessionCallback(
protonSPI, manager, connection, transportConnection, executor, operationContext);
AMQPSessionCallback session = new AMQPSessionCallback(protonSPI, manager, connection, transportConnection, executor, operationContext);
// Credit is at threshold
Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT);
@ -195,8 +220,7 @@ public class AMQPSessionCallbackTest {
// Capture credit runnable and invoke to trigger credit top off
ArgumentCaptor<Runnable> argument = ArgumentCaptor.forClass(Runnable.class);
AMQPSessionCallback session = new AMQPSessionCallback(
protonSPI, manager, connection, transportConnection, executor, operationContext);
AMQPSessionCallback session = new AMQPSessionCallback(protonSPI, manager, connection, transportConnection, executor, operationContext);
// Credit is at threshold
Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT);
@ -227,8 +251,7 @@ public class AMQPSessionCallbackTest {
// Capture credit runnable and invoke to trigger credit top off
ArgumentCaptor<Runnable> argument = ArgumentCaptor.forClass(Runnable.class);
AMQPSessionCallback session = new AMQPSessionCallback(
protonSPI, manager, connection, transportConnection, executor, operationContext);
AMQPSessionCallback session = new AMQPSessionCallback(protonSPI, manager, connection, transportConnection, executor, operationContext);
// Credit is at threshold
Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT);

View File

@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.MessageReferenceCallback;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.impl.AckReason;
@ -31,7 +32,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
import org.jboss.logging.Logger;
public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl> implements PagedReference {
public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl> implements PagedReference, Runnable {
private static final Logger logger = Logger.getLogger(PagedReferenceImpl.class);
@ -74,6 +75,8 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
private long messageSize = -1;
private MessageReferenceCallback callback;
@Override
public Object getProtocolData() {
return protocolData;
@ -89,6 +92,23 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
return getPagedMessage().getMessage();
}
@Override
public void setCallback(MessageReferenceCallback callback) {
this.callback = callback;
}
@Override
public void run() {
MessageReferenceCallback callback = this.callback;
try {
if (callback != null) {
callback.executeDelivery(this);
}
} finally {
this.callback = null;
}
}
@Override
public synchronized PagedMessage getPagedMessage() {
PagedMessage returnMessage = message != null ? message.get() : null;

View File

@ -26,6 +26,10 @@ import org.apache.activemq.artemis.core.server.group.UnproposalListener;
public interface Binding extends UnproposalListener {
default boolean isLocal() {
return false;
}
SimpleString getAddress();
Bindable getBindable();

View File

@ -26,6 +26,9 @@ import org.apache.activemq.artemis.core.server.group.UnproposalListener;
public interface Bindings extends UnproposalListener {
// this is to inform the parent there was an udpate on the bindings
void updated(QueueBinding binding);
Collection<Binding> getBindings();
void addBinding(Binding binding);

View File

@ -26,12 +26,14 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
@ -63,6 +65,13 @@ public final class BindingsImpl implements Bindings {
private final SimpleString name;
private static final AtomicInteger sequenceVersion = new AtomicInteger(Integer.MIN_VALUE);
/**
* This has a version about adds and removes
*/
private final AtomicInteger version = new AtomicInteger(sequenceVersion.incrementAndGet());
public BindingsImpl(final SimpleString name, final GroupingHandler groupingHandler) {
this.groupingHandler = groupingHandler;
this.name = name;
@ -92,61 +101,78 @@ public final class BindingsImpl implements Bindings {
@Override
public void addBinding(final Binding binding) {
if (logger.isTraceEnabled()) {
logger.trace("addBinding(" + binding + ") being called");
}
if (binding.isExclusive()) {
exclusiveBindings.add(binding);
} else {
SimpleString routingName = binding.getRoutingName();
try {
if (logger.isTraceEnabled()) {
logger.trace("addBinding(" + binding + ") being called");
}
if (binding.isExclusive()) {
exclusiveBindings.add(binding);
} else {
SimpleString routingName = binding.getRoutingName();
List<Binding> bindings = routingNameBindingMap.get(routingName);
List<Binding> bindings = routingNameBindingMap.get(routingName);
if (bindings == null) {
bindings = new CopyOnWriteArrayList<>();
if (bindings == null) {
bindings = new CopyOnWriteArrayList<>();
List<Binding> oldBindings = routingNameBindingMap.putIfAbsent(routingName, bindings);
List<Binding> oldBindings = routingNameBindingMap.putIfAbsent(routingName, bindings);
if (oldBindings != null) {
bindings = oldBindings;
if (oldBindings != null) {
bindings = oldBindings;
}
}
if (!bindings.contains(binding)) {
bindings.add(binding);
}
}
if (!bindings.contains(binding)) {
bindings.add(binding);
bindingsMap.put(binding.getID(), binding);
if (logger.isTraceEnabled()) {
logger.trace("Adding binding " + binding + " into " + this + " bindingTable: " + debugBindings());
}
}
bindingsMap.put(binding.getID(), binding);
if (logger.isTraceEnabled()) {
logger.trace("Adding binding " + binding + " into " + this + " bindingTable: " + debugBindings());
} finally {
updated();
}
}
@Override
public void updated(QueueBinding binding) {
updated();
}
private void updated() {
version.set(sequenceVersion.incrementAndGet());
}
@Override
public void removeBinding(final Binding binding) {
if (binding.isExclusive()) {
exclusiveBindings.remove(binding);
} else {
SimpleString routingName = binding.getRoutingName();
try {
if (binding.isExclusive()) {
exclusiveBindings.remove(binding);
} else {
SimpleString routingName = binding.getRoutingName();
List<Binding> bindings = routingNameBindingMap.get(routingName);
List<Binding> bindings = routingNameBindingMap.get(routingName);
if (bindings != null) {
bindings.remove(binding);
if (bindings != null) {
bindings.remove(binding);
if (bindings.isEmpty()) {
routingNameBindingMap.remove(routingName);
if (bindings.isEmpty()) {
routingNameBindingMap.remove(routingName);
}
}
}
}
bindingsMap.remove(binding.getID());
bindingsMap.remove(binding.getID());
if (logger.isTraceEnabled()) {
logger.trace("Removing binding " + binding + " from " + this + " bindingTable: " + debugBindings());
if (logger.isTraceEnabled()) {
logger.trace("Removing binding " + binding + " from " + this + " bindingTable: " + debugBindings());
}
} finally {
updated();
}
}
@ -267,11 +293,9 @@ public final class BindingsImpl implements Bindings {
if (binding.getFilter() == null || binding.getFilter().match(message)) {
binding.getBindable().route(message, context);
routed = true;
}
}
if (!routed) {
// Remove the ids now, in order to avoid double check
ids = message.removeExtraBytesProperty(Message.HDR_ROUTE_TO_IDS);
@ -280,30 +304,53 @@ public final class BindingsImpl implements Bindings {
SimpleString groupId = message.getGroupID();
if (ids != null) {
context.clear();
routeFromCluster(message, context, ids);
} else if (groupingHandler != null && groupRouting && groupId != null) {
context.clear();
routeUsingStrictOrdering(message, context, groupingHandler, groupId, 0);
} else {
if (logger.isTraceEnabled()) {
logger.trace("Routing message " + message + " on binding=" + this);
// in a optimization, we are reusing the previous context if everything is right for it
// so the simpleRouting will only happen if neededk
if (!context.isReusable(message, version.get())) {
context.clear();
simpleRouting(message, context);
}
for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet()) {
SimpleString routingName = entry.getKey();
}
}
}
List<Binding> bindings = entry.getValue();
private void simpleRouting(Message message, RoutingContext context) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("Routing message " + message + " on binding=" + this);
}
if (bindings == null) {
// The value can become null if it's concurrently removed while we're iterating - this is expected
// ConcurrentHashMap behaviour!
continue;
}
// We check at the version before we started routing,
// this is because if something changed in between we want to check the correct version
int currentVersion = version.get();
Binding theBinding = getNextBinding(message, routingName, bindings);
for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet()) {
SimpleString routingName = entry.getKey();
if (theBinding != null) {
theBinding.route(message, context);
}
}
List<Binding> bindings = entry.getValue();
if (bindings == null) {
// The value can become null if it's concurrently removed while we're iterating - this is expected
// ConcurrentHashMap behaviour!
continue;
}
Binding theBinding = getNextBinding(message, routingName, bindings);
if (theBinding != null && theBinding.getFilter() == null && bindings.size() == 1 && theBinding.isLocal()) {
context.setReusable(true, currentVersion);
} else {
// notice that once this is set to false, any calls to setReusable(true) will be moot as the context will ignore it
context.setReusable(false, currentVersion);
}
if (theBinding != null) {
theBinding.route(message, context);
}
}
}

View File

@ -46,6 +46,11 @@ public class LocalQueueBinding implements QueueBinding {
clusterName = queue.getName().concat(nodeID);
}
@Override
public boolean isLocal() {
return true;
}
@Override
public long getID() {
return queue.getID();

View File

@ -485,82 +485,91 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
return null;
}
final Queue queue = queueBinding.getQueue();
Bindings bindingsOnQueue = addressManager.getBindingsForRoutingAddress(queueBinding.getAddress());
boolean changed = false;
try {
//validate update
if (maxConsumers != null && maxConsumers.intValue() != Queue.MAX_CONSUMERS_UNLIMITED) {
final int consumerCount = queue.getConsumerCount();
if (consumerCount > maxConsumers) {
throw ActiveMQMessageBundle.BUNDLE.invalidMaxConsumersUpdate(name.toString(), maxConsumers, consumerCount);
final Queue queue = queueBinding.getQueue();
boolean changed = false;
//validate update
if (maxConsumers != null && maxConsumers.intValue() != Queue.MAX_CONSUMERS_UNLIMITED) {
final int consumerCount = queue.getConsumerCount();
if (consumerCount > maxConsumers) {
throw ActiveMQMessageBundle.BUNDLE.invalidMaxConsumersUpdate(name.toString(), maxConsumers, consumerCount);
}
}
}
if (routingType != null) {
final SimpleString address = queue.getAddress();
final AddressInfo addressInfo = addressManager.getAddressInfo(address);
final EnumSet<RoutingType> addressRoutingTypes = addressInfo.getRoutingTypes();
if (!addressRoutingTypes.contains(routingType)) {
throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeUpdate(name.toString(), routingType, address.toString(), addressRoutingTypes);
if (routingType != null) {
final SimpleString address = queue.getAddress();
final AddressInfo addressInfo = addressManager.getAddressInfo(address);
final EnumSet<RoutingType> addressRoutingTypes = addressInfo.getRoutingTypes();
if (!addressRoutingTypes.contains(routingType)) {
throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeUpdate(name.toString(), routingType, address.toString(), addressRoutingTypes);
}
}
}
//atomic update
if (maxConsumers != null && queue.getMaxConsumers() != maxConsumers.intValue()) {
changed = true;
queue.setMaxConsumer(maxConsumers);
}
if (routingType != null && queue.getRoutingType() != routingType) {
changed = true;
queue.setRoutingType(routingType);
}
if (purgeOnNoConsumers != null && queue.isPurgeOnNoConsumers() != purgeOnNoConsumers.booleanValue()) {
changed = true;
queue.setPurgeOnNoConsumers(purgeOnNoConsumers);
}
if (exclusive != null && queue.isExclusive() != exclusive.booleanValue()) {
changed = true;
queue.setExclusive(exclusive);
}
if (nonDestructive != null && queue.isNonDestructive() != nonDestructive.booleanValue()) {
changed = true;
queue.setNonDestructive(nonDestructive);
}
if (consumersBeforeDispatch != null && !consumersBeforeDispatch.equals(queue.getConsumersBeforeDispatch())) {
changed = true;
queue.setConsumersBeforeDispatch(consumersBeforeDispatch.intValue());
}
if (delayBeforeDispatch != null && !delayBeforeDispatch.equals(queue.getDelayBeforeDispatch())) {
changed = true;
queue.setDelayBeforeDispatch(delayBeforeDispatch.longValue());
}
if (filter != null && !filter.equals(queue.getFilter())) {
changed = true;
queue.setFilter(filter);
}
if (configurationManaged != null && !configurationManaged.equals(queue.isConfigurationManaged())) {
changed = true;
queue.setConfigurationManaged(configurationManaged);
}
if (logger.isDebugEnabled()) {
if (user == null && queue.getUser() != null) {
logger.debug("Ignoring updating Queue to a NULL user");
//atomic update
if (maxConsumers != null && queue.getMaxConsumers() != maxConsumers.intValue()) {
changed = true;
queue.setMaxConsumer(maxConsumers);
}
if (routingType != null && queue.getRoutingType() != routingType) {
changed = true;
queue.setRoutingType(routingType);
}
if (purgeOnNoConsumers != null && queue.isPurgeOnNoConsumers() != purgeOnNoConsumers.booleanValue()) {
changed = true;
queue.setPurgeOnNoConsumers(purgeOnNoConsumers);
}
if (exclusive != null && queue.isExclusive() != exclusive.booleanValue()) {
changed = true;
queue.setExclusive(exclusive);
}
if (nonDestructive != null && queue.isNonDestructive() != nonDestructive.booleanValue()) {
changed = true;
queue.setNonDestructive(nonDestructive);
}
if (consumersBeforeDispatch != null && !consumersBeforeDispatch.equals(queue.getConsumersBeforeDispatch())) {
changed = true;
queue.setConsumersBeforeDispatch(consumersBeforeDispatch.intValue());
}
if (delayBeforeDispatch != null && !delayBeforeDispatch.equals(queue.getDelayBeforeDispatch())) {
changed = true;
queue.setDelayBeforeDispatch(delayBeforeDispatch.longValue());
}
if (filter != null && !filter.equals(queue.getFilter())) {
changed = true;
queue.setFilter(filter);
}
if (configurationManaged != null && !configurationManaged.equals(queue.isConfigurationManaged())) {
changed = true;
queue.setConfigurationManaged(configurationManaged);
}
if (logger.isDebugEnabled()) {
if (user == null && queue.getUser() != null) {
logger.debug("Ignoring updating Queue to a NULL user");
}
}
if (user != null && !user.equals(queue.getUser())) {
changed = true;
queue.setUser(user);
}
}
if (user != null && !user.equals(queue.getUser())) {
changed = true;
queue.setUser(user);
}
if (changed) {
final long txID = storageManager.generateID();
try {
storageManager.updateQueueBinding(txID, queueBinding);
storageManager.commitBindings(txID);
} catch (Throwable throwable) {
storageManager.rollback(txID);
logger.warn(throwable.getMessage(), throwable);
throw throwable;
if (changed) {
final long txID = storageManager.generateID();
try {
storageManager.updateQueueBinding(txID, queueBinding);
storageManager.commitBindings(txID);
} catch (Throwable throwable) {
storageManager.rollback(txID);
logger.warn(throwable.getMessage(), throwable);
throw throwable;
}
}
} finally {
if (bindingsOnQueue != null) {
bindingsOnQueue.updated(queueBinding);
}
}
@ -876,6 +885,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
AddressInfo addressInfo = addressManager.getAddressInfo(address);
if (bindingMove != null) {
context.clear();
bindingMove.route(message, context);
if (addressInfo != null) {
addressInfo.incrementRoutedMessageCount();
@ -1341,7 +1351,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
@Override
public void done() {
addReferences(refs, direct);
context.processReferences(refs, direct);
}
});
}
@ -1476,16 +1486,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
return true;
}
/**
* @param refs
*/
private void addReferences(final List<MessageReference> refs, final boolean direct) {
for (MessageReference ref : refs) {
ref.getQueue().addTail(ref, direct);
}
}
/**
/**
* The expiry scanner can't be started until the whole server has been started other wise you may get races
*/
@Override

View File

@ -263,7 +263,7 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
Map<String, ProtocolManager> selectedProtocols = new ConcurrentHashMap<>();
for (Entry<String, ProtocolManagerFactory> entry : selectedProtocolFactories.entrySet()) {
selectedProtocols.put(entry.getKey(), entry.getValue().createProtocolManager(server, info.getExtraParams(), incomingInterceptors, outgoingInterceptors));
selectedProtocols.put(entry.getKey(), entry.getValue().createProtocolManager(server, info.getCombinedParams(), incomingInterceptors, outgoingInterceptors));
}
acceptor = factory.createAcceptor(info.getName(), clusterConnection, info.getParams(), new DelegatingBufferHandler(), this, threadPool, scheduledThreadPool, selectedProtocols);

View File

@ -1048,7 +1048,7 @@ public interface ActiveMQServerLogger extends BasicLogger {
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222151, value = "removing consumer which did not handle a message, consumer={0}, message={1}",
format = Message.Format.MESSAGE_FORMAT)
void removingBadConsumer(@Cause Throwable e, Consumer consumer, MessageReference reference);
void removingBadConsumer(@Cause Throwable e, Consumer consumer, Object reference);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222152, value = "Unable to decrement reference counting on queue",

View File

@ -46,6 +46,10 @@ public interface Consumer {
*/
HandleStatus handle(MessageReference reference) throws Exception;
/** wakes up internal threads to deliver more messages */
default void promptDelivery() {
}
/**
* This will proceed with the actual delivery.
* Notice that handle should hold a readLock and proceedDelivery should release the readLock
@ -80,4 +84,8 @@ public interface Consumer {
/** an unique sequential ID for this consumer */
long sequentialID();
default void errorProcessing(Throwable e, MessageReference reference) {
}
}

View File

@ -44,6 +44,8 @@ public interface MessageReference {
SimpleString getLastValueProperty();
void setCallback(MessageReferenceCallback callback);
/**
* We define this method aggregation here because on paging we need to hold the original estimate,
* so we need to perform some extra steps on paging.

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server;
/** This is to be used in cases where a message delivery happens on an executor.
* Most MessageReference implementations will allow execution, and if it does,
* and the protocol requires an execution per message, this callback may be used.
*
* At the time of this implementation only AMQP was used. */
public interface MessageReferenceCallback {
void executeDelivery(MessageReference reference);
}

View File

@ -53,6 +53,10 @@ public interface Queue extends Bindable,CriticalComponent {
void setRoutingType(RoutingType routingType);
/** the current queue and consumer settings will allow use of the Reference Execution and callback.
* This is because */
boolean allowsReferenceCallback();
boolean isDurable();
/**
@ -392,4 +396,8 @@ public interface Queue extends Bindable,CriticalComponent {
/** This is to perform a check on the counter again */
void recheckRefCount(OperationContext context);
default void errorProcessing(Consumer consumer, Throwable t, MessageReference messageReference) {
}
}

View File

@ -26,6 +26,24 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
public interface RoutingContext {
/*
This will return true if the RoutingContext can be reused
false if it cannot
null, if we don't know.
Once false, it can't be set to true
*/
boolean isReusable();
int getPreviousBindingsVersion();
SimpleString getPreviousAddress();
void setReusable(boolean reusable);
RoutingContext setReusable(boolean reusable, int version);
Transaction getTransaction();
void setTransaction(Transaction transaction);
@ -54,5 +72,16 @@ public interface RoutingContext {
SimpleString getAddress(Message message);
SimpleString getAddress();
RoutingType getRoutingType();
RoutingType getPreviousRoutingType();
void processReferences(List<MessageReference> refs, boolean direct);
boolean isReusable(Message message, int version);
}

View File

@ -31,6 +31,10 @@ public interface ServerConsumer extends Consumer, ConsumerInfo {
void fireSlowConsumer();
/** the current queue settings will allow use of the Reference Execution and callback.
* This is because */
boolean allowReferenceCallback();
/**
* this is to be used with anything specific on a protocol head.
*/
@ -105,6 +109,4 @@ public interface ServerConsumer extends Consumer, ConsumerInfo {
long getCreationTime();
String getSessionID();
void promptDelivery();
}

View File

@ -22,6 +22,7 @@ import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.Closeable;
import org.apache.activemq.artemis.api.core.Message;
@ -43,6 +44,8 @@ public interface ServerSession extends SecurityAuth {
Object getConnectionID();
Executor getSessionExecutor();
/**
* Certain protocols may create an internal session that shouldn't go through security checks.
* make sure you don't expose this property through any protocol layer as that would be a security breach
@ -241,12 +244,26 @@ public interface ServerSession extends SecurityAuth {
boolean direct,
boolean noAutoCreateQueue) throws Exception;
RoutingStatus send(Transaction tx,
Message message,
boolean direct,
boolean noAutoCreateQueue,
RoutingContext routingContext) throws Exception;
RoutingStatus doSend(Transaction tx,
Message msg,
SimpleString originalAddress,
boolean direct,
boolean noAutoCreateQueue) throws Exception;
RoutingStatus doSend(Transaction tx,
Message msg,
SimpleString originalAddress,
boolean direct,
boolean noAutoCreateQueue,
RoutingContext routingContext) throws Exception;
RoutingStatus send(Message message, boolean direct, boolean noAutoCreateQueue) throws Exception;
RoutingStatus send(Message message, boolean direct) throws Exception;

View File

@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.MessageReferenceCallback;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.ServerConsumer;
@ -146,6 +147,11 @@ public class LastValueQueue extends QueueImpl {
}
}
@Override
public boolean allowsReferenceCallback() {
return false;
}
private void replaceLVQMessage(MessageReference ref, HolderReference hr) {
MessageReference oldRef = hr.getReference();
@ -231,6 +237,11 @@ public class LastValueQueue extends QueueImpl {
this.ref = ref;
}
@Override
public void setCallback(MessageReferenceCallback callback) {
// HolderReference may be reused among different consumers, so we don't set a callback and won't support Runnables
}
MessageReference getReference() {
return ref;
}

View File

@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.MessageReferenceCallback;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.transaction.Transaction;
@ -30,7 +31,7 @@ import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
/**
* Implementation of a MessageReference
*/
public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceImpl> implements MessageReference {
public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceImpl> implements MessageReference, Runnable {
private static final AtomicIntegerFieldUpdater<MessageReferenceImpl> DELIVERY_COUNT_UPDATER = AtomicIntegerFieldUpdater
.newUpdater(MessageReferenceImpl.class, "deliveryCount");
@ -54,6 +55,8 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
private Object protocolData;
private MessageReferenceCallback callback;
// Static --------------------------------------------------------
private static final int memoryOffset = 64;
@ -84,6 +87,24 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
// MessageReference implementation -------------------------------
@Override
public void setCallback(MessageReferenceCallback callback) {
this.callback = callback;
}
@Override
public void run() {
MessageReferenceCallback callback = this.callback;
try {
if (callback != null) {
callback.executeDelivery(this);
}
} finally {
this.callback = null;
}
}
@Override
public Object getProtocolData() {
return protocolData;

View File

@ -535,6 +535,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
// Bindable implementation -------------------------------------------------------------------------------------
@Override
public boolean allowsReferenceCallback() {
// non descructive queues will reuse the same reference between multiple consumers
// so you cannot really use the callback from the MessageReference
return !nonDestructive;
}
public SimpleString getRoutingName() {
return name;
}
@ -627,8 +634,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public void route(final Message message, final RoutingContext context) throws Exception {
if (purgeOnNoConsumers && getConsumerCount() == 0) {
return;
if (purgeOnNoConsumers) {
context.setReusable(false);
if (getConsumerCount() == 0) {
return;
}
}
context.addQueue(address, this);
}
@ -849,11 +859,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
// Go into direct delivery mode
directDeliver = supportsDirectDeliver;
if (logger.isTraceEnabled()) {
logger.trace("Setting direct deliverer to " + supportsDirectDeliver);
logger.trace("Setting direct deliverer to " + supportsDirectDeliver + " on queue " + this.getName());
}
} else {
if (logger.isTraceEnabled()) {
logger.trace("Couldn't set direct deliver back");
logger.trace("Couldn't set direct deliver back on queue " + this.getName());
}
}
}
@ -1414,6 +1424,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public void acknowledge(final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws Exception {
if (nonDestructive && reason == AckReason.NORMAL) {
decDelivering(ref);
if (logger.isDebugEnabled()) {
logger.debug("acknowledge ignored nonDestructive=true and reason=NORMAL");
}
@ -3141,6 +3152,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
break;
}
}
if (logger.isTraceEnabled()) {
logger.tracef("Queue " + getName() + " is out of direct delivery as no consumers handled a delivery");
}
return false;
}
}
@ -3160,24 +3175,29 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
try {
consumer.proceedDeliver(reference);
} catch (Throwable t) {
ActiveMQServerLogger.LOGGER.removingBadConsumer(t, consumer, reference);
synchronized (this) {
// If the consumer throws an exception we remove the consumer
try {
removeConsumer(consumer);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorRemovingConsumer(e);
}
// The message failed to be delivered, hence we try again
addHead(reference, false);
}
errorProcessing(consumer, t, reference);
} finally {
deliveriesInTransit.countDown();
}
}
/** This will print errors and decide what to do with the errored consumer from the protocol layer. */
@Override
public void errorProcessing(Consumer consumer, Throwable t, MessageReference reference) {
synchronized (this) {
ActiveMQServerLogger.LOGGER.removingBadConsumer(t, consumer, reference);
// If the consumer throws an exception we remove the consumer
try {
removeConsumer(consumer);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorRemovingConsumer(e);
}
// The message failed to be delivered, hence we try again
addHead(reference, false);
}
}
private boolean checkExpired(final MessageReference reference) {
try {
if (reference.getMessage().isExpired()) {

View File

@ -20,9 +20,11 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.RoutingContext;
@ -40,19 +42,68 @@ public final class RoutingContextImpl implements RoutingContext {
private SimpleString address;
private SimpleString previousAddress;
private RoutingType previousRoutingType;
private RoutingType routingType;
Boolean reusable = null;
volatile int version;
private final Executor executor;
public RoutingContextImpl(final Transaction transaction) {
this(transaction, null);
}
public RoutingContextImpl(final Transaction transaction, Executor executor) {
this.transaction = transaction;
this.executor = executor;
}
@Override
public boolean isReusable() {
return reusable != null && reusable;
}
@Override
public int getPreviousBindingsVersion() {
return version;
}
@Override
public SimpleString getPreviousAddress() {
return previousAddress;
}
@Override
public void setReusable(boolean reusable) {
this.reusable = reusable;
}
@Override
public RoutingContext setReusable(boolean reusable, int previousBindings) {
this.version = previousBindings;
this.previousAddress = address;
this.previousRoutingType = routingType;
if (this.reusable != null && !this.reusable.booleanValue()) {
// cannot set to Reusable once it was set to false
return this;
}
this.reusable = reusable;
return this;
}
@Override
public void clear() {
transaction = null;
map.clear();
queueCount = 0;
this.version = 0;
this.reusable = null;
}
@Override
@ -69,6 +120,18 @@ public final class RoutingContextImpl implements RoutingContext {
queueCount++;
}
@Override
public void processReferences(final List<MessageReference> refs, final boolean direct) {
internalprocessReferences(refs, direct);
}
private void internalprocessReferences(final List<MessageReference> refs, final boolean direct) {
for (MessageReference ref : refs) {
ref.getQueue().addTail(ref, direct);
}
}
@Override
public void addQueueWithAck(SimpleString address, Queue queue) {
addQueue(address, queue);
@ -82,6 +145,11 @@ public final class RoutingContextImpl implements RoutingContext {
return listing == null ? false : listing.isAlreadyAcked(queue);
}
@Override
public boolean isReusable(Message message, int version) {
return isReusable() && queueCount > 0 && address.equals(previousAddress) && previousRoutingType == routingType && getPreviousBindingsVersion() == version;
}
@Override
public void setAddress(SimpleString address) {
this.address = address;
@ -100,11 +168,21 @@ public final class RoutingContextImpl implements RoutingContext {
return address;
}
@Override
public SimpleString getAddress() {
return address;
}
@Override
public RoutingType getRoutingType() {
return routingType;
}
@Override
public RoutingType getPreviousRoutingType() {
return previousRoutingType;
}
@Override
public RouteContextList getContextListing(SimpleString address) {
RouteContextList listing = map.get(address);

View File

@ -256,6 +256,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
// ----------------------------------------------------------------------
@Override
public boolean allowReferenceCallback() {
if (browseOnly) {
return false;
} else {
return messageQueue.allowsReferenceCallback();
}
}
@Override
public long sequentialID() {
return sequentialID;
@ -346,6 +355,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
return callback.supportsDirectDelivery();
}
@Override
public void errorProcessing(Throwable e, MessageReference deliveryObject) {
messageQueue.errorProcessing(this, e, deliveryObject);
}
@Override
public HandleStatus handle(final MessageReference ref) throws Exception {
@ -582,13 +595,14 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
public void forceDelivery(final long sequence) {
forceDelivery(sequence, () -> {
Message forcedDeliveryMessage = new CoreMessage(storageManager.generateID(), 50);
MessageReference reference = MessageReference.Factory.createReference(forcedDeliveryMessage, messageQueue);
reference.setDeliveryCount(0);
forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
forcedDeliveryMessage.setAddress(messageQueue.getName());
applyPrefixForLegacyConsumer(forcedDeliveryMessage);
callback.sendMessage(null, forcedDeliveryMessage, ServerConsumerImpl.this, 0);
callback.sendMessage(reference, forcedDeliveryMessage, ServerConsumerImpl.this, 0);
});
}
@ -949,7 +963,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
} catch (ActiveMQException e) {
if (startedTransaction) {
tx.rollback();
} else {
} else if (tx != null) {
tx.markAsRollbackOnly(e);
}
throw e;
@ -958,7 +972,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
ActiveMQIllegalStateException hqex = new ActiveMQIllegalStateException(e.getMessage());
if (startedTransaction) {
tx.rollback();
} else {
} else if (tx != null) {
tx.markAsRollbackOnly(hqex);
}
throw hqex;

View File

@ -31,6 +31,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.Closeable;
@ -190,6 +191,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
private Set<Closeable> closeables;
private final Executor sessionExecutor;
public ServerSessionImpl(final String name,
final String username,
final String password,
@ -264,6 +267,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
remotingConnection.addFailureListener(this);
this.context = context;
this.sessionExecutor = server.getExecutorFactory().getExecutor();
if (!xa) {
tx = newTransaction();
}
@ -283,6 +288,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
this.closeables.add(closeable);
}
@Override
public Executor getSessionExecutor() {
return sessionExecutor;
}
@Override
public void disableSecurity() {
this.securityEnabled = false;
@ -1467,12 +1477,20 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
return lsm;
}
@Override
public synchronized RoutingStatus send(Transaction tx,
Message msg,
final boolean direct,
boolean noAutoCreateQueue) throws Exception {
return send(tx, msg, direct, noAutoCreateQueue, routingContext);
}
@Override
public synchronized RoutingStatus send(Transaction tx,
Message msg,
final boolean direct,
boolean noAutoCreateQueue,
RoutingContext routingContext) throws Exception {
final Message message;
if ((msg.getEncodeSize() > storageManager.getMaxRecordSize()) && !msg.isLargeMessage()) {
@ -1527,7 +1545,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
result = handleManagementMessage(tx, message, direct);
} else {
result = doSend(tx, message, address, direct, noAutoCreateQueue);
result = doSend(tx, message, address, direct, noAutoCreateQueue, routingContext);
}
} catch (Exception e) {
@ -1766,7 +1784,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
reply.setAddress(replyTo);
doSend(tx, reply, null, direct, false);
doSend(tx, reply, null, direct, false, routingContext);
}
return RoutingStatus.OK;
@ -1823,12 +1841,24 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
theTx.rollback();
}
@Override
public synchronized RoutingStatus doSend(final Transaction tx,
final Message msg,
final SimpleString originalAddress,
final boolean direct,
final boolean noAutoCreateQueue) throws Exception {
return doSend(tx, msg, originalAddress, direct, noAutoCreateQueue, routingContext);
}
@Override
public synchronized RoutingStatus doSend(final Transaction tx,
final Message msg,
final SimpleString originalAddress,
final boolean direct,
final boolean noAutoCreateQueue,
final RoutingContext routingContext) throws Exception {
RoutingStatus result = RoutingStatus.OK;
@ -1861,6 +1891,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
if (tx == null || autoCommitSends) {
routingContext.setTransaction(null);
} else {
routingContext.setTransaction(tx);
}
@ -1880,7 +1911,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
value.getB().incrementAndGet();
}
} finally {
routingContext.clear();
if (!routingContext.isReusable()) {
routingContext.clear();
}
}
return result;
}

View File

@ -793,6 +793,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
public boolean allowsReferenceCallback() {
return false;
}
@Override
public int getConsumersBeforeDispatch() {
return 0;

View File

@ -250,7 +250,7 @@ public class AddressingTest extends ActiveMQTestBase {
// there are no consumers so no messages should be routed to the queue
producer.send(session.createMessage(true));
assertEquals(0, queue.getMessageCount());
Wait.assertEquals(0, queue::getMessageCount);
}
@Test

View File

@ -138,7 +138,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
sender.send(message);
sender.close();
assertEquals(1, queueView.getMessageCount());
Wait.assertEquals(1, queueView::getMessageCount);
// Now try and get the message
AmqpReceiver receiver = session.createReceiver(getQueueName());

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.amqp;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
@ -74,7 +75,8 @@ public class AmqpFlowControlFailTest extends JMSClientTestSupport {
}
receiver.close();
session2.close();
assertEquals(1000, sender.getSender().getCredit());
Wait.assertEquals(1000, sender.getSender()::getCredit);
for (int i = 0; i < 1000; i++) {
final AmqpMessage message = new AmqpMessage();
byte[] payload = new byte[100];

View File

@ -47,6 +47,7 @@ import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.engine.Sender;
import org.jgroups.util.UUID;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -1154,4 +1155,52 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
receiver.close();
connection.close();
}
@Test(timeout = 60000)
public void testReceiveRejecting() throws Exception {
final int MSG_COUNT = 1000;
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
final String address = getQueueName();
AmqpSender sender = session.createSender(address);
for (int i = 0; i < MSG_COUNT; i++) {
AmqpMessage message = new AmqpMessage();
message.setMessageId("msg" + i);
sender.send(message);
}
Queue queueView = getProxyToQueue(address);
for (int i = 0; i < MSG_COUNT; i++) {
final AmqpReceiver receiver = session.createReceiver(address);
receiver.flow(MSG_COUNT);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(received);
Assert.assertEquals("msg" + i, received.getMessageId());
received.accept();
receiver.close();
}
final AmqpReceiver receiver = session.createReceiver(address);
receiver.flow(MSG_COUNT);
Assert.assertNull(receiver.receive(1, TimeUnit.MILLISECONDS));
Wait.assertEquals(0, queueView::getDeliveringCount);
connection.close();
}
}

View File

@ -766,7 +766,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
// We should have now drained the Queue
receiver.flow(1);
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
AmqpMessage message = receiver.receive(1, TimeUnit.SECONDS);
if (message != null) {
System.out.println("Read message: " + message.getApplicationProperty("msgId"));
}

View File

@ -324,7 +324,7 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue consumerQueue = consumerSession.createQueue(queueName);
MessageConsumer consumer = consumerSession.createConsumer(consumerQueue);
TextMessage msg = (TextMessage) consumer.receive(200);
TextMessage msg = (TextMessage) consumer.receive(2000);
assertNotNull(msg);
consumer.close();
}
@ -336,7 +336,7 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue consumerQueue = consumerSession.createQueue(queueName);
MessageConsumer consumer = consumerSession.createConsumer(consumerQueue);
TextMessage msg = (TextMessage) consumer.receive(200);
TextMessage msg = (TextMessage) consumer.receive(2000);
assertNull(msg);
consumer.close();
}
@ -349,8 +349,8 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
MessageConsumer consumer = createConsumer(consumerConnection, queueName);
MessageConsumer consumer2 = createConsumer(consumerConnection2, queueName);
TextMessage msg = (TextMessage) consumer.receive(200);
TextMessage msg2 = (TextMessage) consumer2.receive(200);
TextMessage msg = (TextMessage) consumer.receive(2000);
TextMessage msg2 = (TextMessage) consumer2.receive(2000);
assertNotNull(msg);
assertNotNull(msg2);

View File

@ -45,6 +45,11 @@ public class DummyServerConsumer implements ServerConsumer {
}
@Override
public boolean allowReferenceCallback() {
return false;
}
@Override
public Object getProtocolData() {
return null;

View File

@ -500,10 +500,16 @@ public class ConsumerTest extends ActiveMQTestBase {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = session.createQueue(QUEUE.toString());
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
if (durable) {
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
} else {
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
long time = System.currentTimeMillis();
int NUMBER_OF_MESSAGES = 100;
int NUMBER_OF_MESSAGES = durable ? 500 : 5000;
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
TextMessage msg = session.createTextMessage("hello " + i);
msg.setIntProperty("mycount", i);

View File

@ -241,6 +241,11 @@ public class HangConsumerTest extends ActiveMQTestBase {
addressSettingsRepository, executor, server, null);
}
@Override
public boolean allowsReferenceCallback() {
return false;
}
@Override
public synchronized int deleteMatchingReferences(final int flushLimit, final Filter filter) throws Exception {
latchDelete.countDown();

View File

@ -83,6 +83,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
public boolean allowsReferenceCallback() {
return false;
}
@Override
public boolean isExclusive() {
// no-op

View File

@ -31,6 +31,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.BindingsFactory;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.WildcardAddressManager;
import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.Queue;
@ -334,6 +335,10 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
public void unproposed(SimpleString groupID) {
}
@Override
public void updated(QueueBinding binding) {
}
@Override
public boolean redistribute(Message message,
Queue originatingQueue,