From a40a459f8c536a10a0dccae6e522ec38f09dd544 Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Sat, 3 Mar 2018 18:58:21 +0100 Subject: [PATCH 1/3] ARTEMIS-2205 Netty is used in a more idiomatic way This helped decreasing a lot of pressure on GC by not creating as many runnables for each write. Besides this helps fixing some of the issues I would have had on refactoring AMQP over flushing writes and other asynchronous issues. --- .../remoting/impl/netty/NettyConnection.java | 125 +++++------------- 1 file changed, 35 insertions(+), 90 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java index f8195fbdce..1032a352bb 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java @@ -21,7 +21,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.LockSupport; import io.netty.buffer.ByteBuf; @@ -60,19 +59,12 @@ public class NettyConnection implements Connection { * here for when the connection (or Netty Channel) becomes available again. */ private final List readyListeners = new ArrayList<>(); - private final ThreadLocal> localListenersPool = ThreadLocal.withInitial(ArrayList::new); + private final ThreadLocal> localListenersPool = new ThreadLocal<>(); private final boolean batchingEnabled; private final int writeBufferHighWaterMark; private final int batchLimit; - /** - * This counter is splitted in 2 variables to write it with less performance - * impact: no volatile get is required to update its value - */ - private final AtomicLong pendingWritesOnEventLoopView = new AtomicLong(); - private long pendingWritesOnEventLoop = 0; - private boolean closed; private RemotingConnection protocolConnection; @@ -129,18 +121,6 @@ public class NettyConnection implements Connection { return batchBufferSize(this.channel, this.writeBufferHighWaterMark); } - public final long pendingWritesOnEventLoop() { - final EventLoop eventLoop = channel.eventLoop(); - final boolean inEventLoop = eventLoop.inEventLoop(); - final long pendingWritesOnEventLoop; - if (inEventLoop) { - pendingWritesOnEventLoop = this.pendingWritesOnEventLoop; - } else { - pendingWritesOnEventLoop = pendingWritesOnEventLoopView.get(); - } - return pendingWritesOnEventLoop; - } - public final Channel getNettyChannel() { return channel; } @@ -163,19 +143,27 @@ public class NettyConnection implements Connection { @Override public final void fireReady(final boolean ready) { - final ArrayList readyToCall = localListenersPool.get(); + ArrayList readyToCall = localListenersPool.get(); + if (readyToCall != null) { + localListenersPool.set(null); + } synchronized (readyListeners) { this.ready = ready; if (ready) { final int size = this.readyListeners.size(); - readyToCall.ensureCapacity(size); + if (readyToCall != null) { + readyToCall.ensureCapacity(size); + } try { for (int i = 0; i < size; i++) { final ReadyListener readyListener = readyListeners.get(i); if (readyListener == null) { break; } + if (readyToCall == null) { + readyToCall = new ArrayList<>(size); + } readyToCall.add(readyListener); } } finally { @@ -183,18 +171,23 @@ public class NettyConnection implements Connection { } } } - try { - final int size = readyToCall.size(); - for (int i = 0; i < size; i++) { - try { - final ReadyListener readyListener = readyToCall.get(i); - readyListener.readyForWriting(); - } catch (Throwable logOnly) { - ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(logOnly); + if (readyToCall != null) { + try { + readyToCall.forEach(readyListener -> { + try { + readyListener.readyForWriting(); + } catch (Throwable logOnly) { + ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(logOnly); + } + }); + } catch (Throwable t) { + ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(t); + } finally { + readyToCall.clear(); + if (localListenersPool.get() != null) { + localListenersPool.set(readyToCall); } } - } finally { - readyToCall.clear(); } } @@ -256,7 +249,7 @@ public class NettyConnection implements Connection { } catch (OutOfMemoryError oom) { final long totalPendingWriteBytes = batchBufferSize(this.channel, this.writeBufferHighWaterMark); // I'm not using the ActiveMQLogger framework here, as I wanted the class name to be very specific here - logger.warn("Trying to allocate " + size + " bytes, System is throwing OutOfMemoryError on NettyConnection " + this + ", there are currently " + "pendingWrites: [NETTY] -> " + totalPendingWriteBytes + "[EVENT LOOP] -> " + pendingWritesOnEventLoopView.get() + " causes: " + oom.getMessage(), oom); + logger.warn("Trying to allocate " + size + " bytes, System is throwing OutOfMemoryError on NettyConnection " + this + ", there are currently " + "pendingWrites: [NETTY] -> " + totalPendingWriteBytes + " causes: " + oom.getMessage(), oom); throw oom; } } @@ -342,10 +335,7 @@ public class NettyConnection implements Connection { private boolean canWrite(final int requiredCapacity) { //evaluate if the write request could be taken: //there is enough space in the write buffer? - //The pending writes on event loop will eventually go into the Netty write buffer, hence consider them - //as part of the heuristic! - final long pendingWritesOnEventLoop = this.pendingWritesOnEventLoop(); - final long totalPendingWrites = pendingWritesOnEventLoop + this.pendingWritesOnChannel(); + final long totalPendingWrites = this.pendingWritesOnChannel(); final boolean canWrite; if (requiredCapacity > this.writeBufferHighWaterMark) { canWrite = totalPendingWrites == 0; @@ -369,34 +359,6 @@ public class NettyConnection implements Connection { } //no need to lock because the Netty's channel is thread-safe //and the order of write is ensured by the order of the write calls - final EventLoop eventLoop = channel.eventLoop(); - final boolean inEventLoop = eventLoop.inEventLoop(); - if (!inEventLoop) { - writeNotInEventLoop(buffer, flush, batched, futureListener); - } else { - // OLD COMMENT: - // create a task which will be picked up by the eventloop and trigger the write. - // This is mainly needed as this method is triggered by different threads for the same channel. - // if we not do this we may produce out of order writes. - // NOTE: - // the submitted task does not effect in any way the current written size in the batch - // until the loop will process it, leading to a longer life for the ActiveMQBuffer buffer!!! - // To solve it, will be necessary to manually perform the count of the current batch instead of rely on the - // Channel:Config::writeBufferHighWaterMark value. - this.pendingWritesOnEventLoop += readableBytes; - this.pendingWritesOnEventLoopView.lazySet(pendingWritesOnEventLoop); - eventLoop.execute(() -> { - this.pendingWritesOnEventLoop -= readableBytes; - this.pendingWritesOnEventLoopView.lazySet(pendingWritesOnEventLoop); - writeInEventLoop(buffer, flush, batched, futureListener); - }); - } - } - - private void writeNotInEventLoop(ActiveMQBuffer buffer, - final boolean flush, - final boolean batched, - final ChannelFutureListener futureListener) { final Channel channel = this.channel; final ChannelPromise promise; if (flush || (futureListener != null)) { @@ -406,7 +368,6 @@ public class NettyConnection implements Connection { } final ChannelFuture future; final ByteBuf bytes = buffer.byteBuf(); - final int readableBytes = bytes.readableBytes(); assert readableBytes >= 0; final int writeBatchSize = this.batchLimit; final boolean batchingEnabled = this.batchingEnabled; @@ -420,33 +381,17 @@ public class NettyConnection implements Connection { } if (flush) { //NOTE: this code path seems used only on RemotingConnection::disconnect - waitFor(promise, DEFAULT_WAIT_MILLIS); + flushAndWait(channel, promise); } } - private void writeInEventLoop(ActiveMQBuffer buffer, - final boolean flush, - final boolean batched, - final ChannelFutureListener futureListener) { - //no need to lock because the Netty's channel is thread-safe - //and the order of write is ensured by the order of the write calls - final ChannelPromise promise; - if (futureListener != null) { - promise = channel.newPromise(); + private static void flushAndWait(final Channel channel, final ChannelPromise promise) { + if (!channel.eventLoop().inEventLoop()) { + waitFor(promise, DEFAULT_WAIT_MILLIS); } else { - promise = channel.voidPromise(); - } - final ChannelFuture future; - final ByteBuf bytes = buffer.byteBuf(); - final int readableBytes = bytes.readableBytes(); - final int writeBatchSize = this.batchLimit; - if (this.batchingEnabled && batched && !flush && readableBytes < writeBatchSize) { - future = writeBatch(bytes, readableBytes, promise); - } else { - future = channel.writeAndFlush(bytes, promise); - } - if (futureListener != null) { - future.addListener(futureListener); + if (logger.isDebugEnabled()) { + logger.debug("Calling write with flush from a thread where it's not allowed"); + } } } From d79762fa0489954d91e417f6110c213bc3d6db06 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Mon, 17 Dec 2018 09:11:54 -0500 Subject: [PATCH 2/3] ARTEMIS-2205 Refactor AMQP Processing into Netty Thread These improvements were also part of this task: - Routing is now cached as much as possible. - A new Runnable is avoided for each individual message, since we use the Netty executor to perform delivery https://issues.apache.org/jira/browse/ARTEMIS-2205 --- .../artemis/cli/commands/etc/artemis.profile | 3 + .../api/core/TransportConfiguration.java | 11 + .../apache/activemq/artemis/junit/Wait.java | 7 +- .../amqp/broker/AMQPConnectionCallback.java | 14 +- .../amqp/broker/AMQPSessionCallback.java | 165 ++++------ .../ActiveMQProtonRemotingConnection.java | 3 +- .../amqp/broker/ProtonProtocolManager.java | 10 + .../amqp/proton/AMQPConnectionContext.java | 228 +++++++------ .../amqp/proton/AMQPSessionContext.java | 40 +-- .../proton/ProtonServerReceiverContext.java | 136 ++++---- .../proton/ProtonServerSenderContext.java | 196 ++++++++---- .../proton/handler/ExecutorNettyAdapter.java | 221 +++++++++++++ .../amqp/proton/handler/ProtonHandler.java | 301 ++++++++++-------- .../transaction/ProtonTransactionHandler.java | 14 +- .../transaction/ProtonTransactionImpl.java | 25 +- .../amqp/broker/AMQPSessionCallbackTest.java | 83 +++-- .../paging/cursor/PagedReferenceImpl.java | 22 +- .../artemis/core/postoffice/Binding.java | 4 + .../artemis/core/postoffice/Bindings.java | 3 + .../core/postoffice/impl/BindingsImpl.java | 149 ++++++--- .../postoffice/impl/LocalQueueBinding.java | 5 + .../core/postoffice/impl/PostOfficeImpl.java | 161 +++++----- .../server/impl/RemotingServiceImpl.java | 2 +- .../core/server/ActiveMQServerLogger.java | 2 +- .../artemis/core/server/Consumer.java | 8 + .../artemis/core/server/MessageReference.java | 2 + .../core/server/MessageReferenceCallback.java | 27 ++ .../activemq/artemis/core/server/Queue.java | 8 + .../artemis/core/server/RoutingContext.java | 29 ++ .../artemis/core/server/ServerConsumer.java | 6 +- .../artemis/core/server/ServerSession.java | 17 + .../core/server/impl/LastValueQueue.java | 11 + .../server/impl/MessageReferenceImpl.java | 23 +- .../artemis/core/server/impl/QueueImpl.java | 54 +++- .../core/server/impl/RoutingContextImpl.java | 82 ++++- .../core/server/impl/ServerConsumerImpl.java | 22 +- .../core/server/impl/ServerSessionImpl.java | 41 ++- .../impl/ScheduledDeliveryHandlerTest.java | 5 + .../addressing/AddressingTest.java | 2 +- .../amqp/AmqpExpiredMessageTest.java | 2 +- .../amqp/AmqpFlowControlFailTest.java | 4 +- .../integration/amqp/AmqpSendReceiveTest.java | 49 +++ .../integration/amqp/AmqpTransactionTest.java | 2 +- .../amqp/JMSNonDestructiveTest.java | 8 +- .../integration/cli/DummyServerConsumer.java | 5 + .../integration/client/ConsumerTest.java | 10 +- .../integration/client/HangConsumerTest.java | 5 + .../unit/core/postoffice/impl/FakeQueue.java | 5 + .../impl/WildcardAddressManagerUnitTest.java | 5 + 49 files changed, 1512 insertions(+), 725 deletions(-) create mode 100644 artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExecutorNettyAdapter.java create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReferenceCallback.java diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile index 876b4cba96..af8aa86daf 100644 --- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile +++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile @@ -43,3 +43,6 @@ JAVA_ARGS="${java-opts} -XX:+PrintClassHistogram -XX:+UseG1GC -Xms512M -Xmx2G -D # Debug args: Uncomment to enable debug #DEBUG_ARGS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005" + +# Debug args: Uncomment for async profiler +#DEBUG_ARGS="-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints" diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java index 7fcfcd5e1b..ee285a314e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java @@ -183,6 +183,17 @@ public class TransportConfiguration implements Serializable { return extraProps; } + public Map getCombinedParams() { + Map combined = new HashMap<>(); + if (params != null) { + combined.putAll(params); + } + if (extraProps != null) { + combined.putAll(extraProps); + } + return combined; + } + @Override public int hashCode() { int result = name != null ? name.hashCode() : 0; diff --git a/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/Wait.java b/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/Wait.java index c0aa55d40c..5c817fb1cc 100644 --- a/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/Wait.java +++ b/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/Wait.java @@ -94,7 +94,12 @@ public class Wait { public static void assertTrue(String failureMessage, Condition condition) throws Exception { - boolean result = waitFor(condition); + assertTrue(failureMessage, condition, MAX_WAIT_MILLIS); + } + + public static void assertTrue(String failureMessage, Condition condition, final long duration) throws Exception { + + boolean result = waitFor(condition, duration); if (!result) { Assert.fail(failureMessage); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java index 84fdd24aa6..d34ce80fca 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java @@ -73,7 +73,7 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener { protected AMQPConnectionContext amqpConnection; - private final Executor closeExecutor; + private final Executor sessionExecutor; private String remoteContainerId; @@ -85,15 +85,19 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener { public AMQPConnectionCallback(ProtonProtocolManager manager, Connection connection, - Executor closeExecutor, + Executor sessionExecutor, ActiveMQServer server) { this.manager = manager; this.connection = connection; - this.closeExecutor = closeExecutor; + this.sessionExecutor = sessionExecutor; this.server = server; saslMechanisms = manager.getSaslMechanisms(); } + public Connection getTransportConnection() { + return connection; + } + public String[] getSaslMechanisms() { return saslMechanisms; } @@ -213,7 +217,7 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener { public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) { - return new AMQPSessionCallback(this, manager, connection, this.connection, closeExecutor, server.newOperationContext()); + return new AMQPSessionCallback(this, manager, connection, this.connection, sessionExecutor, server.newOperationContext()); } public void sendSASLSupported() { @@ -256,7 +260,7 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener { public Binary newTransaction() { XidImpl xid = newXID(); Binary binary = new Binary(xid.getGlobalTransactionId()); - Transaction transaction = new ProtonTransactionImpl(xid, server.getStorageManager(), -1); + Transaction transaction = new ProtonTransactionImpl(xid, server.getStorageManager(), -1, amqpConnection); transactions.put(binary, transaction); return binary; } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 1ca4410a4d..0e2cf6dfcb 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -16,10 +16,7 @@ */ package org.apache.activemq.artemis.protocol.amqp.broker; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -40,15 +37,14 @@ import org.apache.activemq.artemis.core.security.SecurityAuth; import org.apache.activemq.artemis.core.server.AddressQueryResult; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.QueueQueryResult; +import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerProducer; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.impl.AddressInfo; -import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; -import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; @@ -104,7 +100,8 @@ public class AMQPSessionCallback implements SessionCallback { private final Executor sessionExecutor; - private final AtomicBoolean draining = new AtomicBoolean(false); + private final boolean directDeliver; + private CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools(); @@ -125,6 +122,7 @@ public class AMQPSessionCallback implements SessionCallback { this.transportConnection = transportConnection; this.sessionExecutor = executor; this.operationContext = operationContext; + this.directDeliver = manager.isDirectDeliver(); } @Override @@ -133,28 +131,6 @@ public class AMQPSessionCallback implements SessionCallback { return transportConnection.isWritable(callback) && senderContext.getSender().getLocalState() != EndpointState.CLOSED; } - public void onFlowConsumer(Object consumer, int credits, final boolean drain) { - ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer; - if (drain) { - // If the draining is already running, then don't do anything - if (draining.compareAndSet(false, true)) { - final ProtonServerSenderContext plugSender = (ProtonServerSenderContext) serverConsumer.getProtocolContext(); - serverConsumer.forceDelivery(1, new Runnable() { - @Override - public void run() { - try { - plugSender.reportDrained(); - } finally { - draining.set(false); - } - } - }); - } - } else { - serverConsumer.receiveCredits(-1); - } - } - public void withinContext(RunnableEx run) throws Exception { OperationContext context = recoverContext(); try { @@ -180,7 +156,7 @@ public class AMQPSessionCallback implements SessionCallback { @Override public boolean supportsDirectDelivery() { - return false; + return manager.isDirectDeliver(); } public void init(AMQPSessionContext protonSession, SASLResult saslResult) throws Exception { @@ -347,7 +323,6 @@ public class AMQPSessionCallback implements SessionCallback { return result; } - public AddressQueryResult addressQuery(SimpleString addressName, RoutingType routingType, boolean autoCreate) throws Exception { @@ -373,41 +348,8 @@ public class AMQPSessionCallback implements SessionCallback { } public void closeSender(final Object brokerConsumer) throws Exception { - final ServerConsumer consumer = ((ServerConsumer) brokerConsumer); - final CountDownLatch latch = new CountDownLatch(1); - - Runnable runnable = new Runnable() { - @Override - public void run() { - try { - consumer.close(false); - latch.countDown(); - } catch (Exception e) { - } - } - }; - - // Due to the nature of proton this could be happening within flushes from the queue-delivery (depending on how it happened on the protocol) - // to avoid deadlocks the close has to be done outside of the main thread on an executor - // otherwise you could get a deadlock - Executor executor = protonSPI.getExeuctor(); - - if (executor != null) { - executor.execute(runnable); - } else { - runnable.run(); - } - - try { - // a short timeout will do.. 1 second is already long enough - if (!latch.await(1, TimeUnit.SECONDS)) { - logger.debug("Could not close consumer on time"); - } - } catch (InterruptedException e) { - throw new ActiveMQAMQPInternalErrorException("Unable to close consumers for queue: " + consumer.getQueue()); - } - + consumer.close(false); consumer.getQueue().recheckRefCount(serverSession.getSessionContext()); } @@ -418,12 +360,19 @@ public class AMQPSessionCallback implements SessionCallback { public void close() throws Exception { //need to check here as this can be called if init fails if (serverSession != null) { - OperationContext context = recoverContext(); - try { - serverSession.close(false); - } finally { - resetContext(context); - } + // we cannot hold the nettyExecutor on this rollback here, otherwise other connections will be waiting + sessionExecutor.execute(() -> { + OperationContext context = recoverContext(); + try { + try { + serverSession.close(false); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } + } finally { + resetContext(context); + } + }); } } @@ -468,7 +417,8 @@ public class AMQPSessionCallback implements SessionCallback { final Delivery delivery, SimpleString address, int messageFormat, - ReadableBuffer data) throws Exception { + ReadableBuffer data, + RoutingContext routingContext) throws Exception { AMQPMessage message = new AMQPMessage(messageFormat, data, null, coreMessageObjectPools); if (address != null) { message.setAddress(address); @@ -503,7 +453,7 @@ public class AMQPSessionCallback implements SessionCallback { rejectMessage(delivery, AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address is full: " + address); } } else { - serverSend(transaction, message, delivery, receiver); + serverSend(context, transaction, message, delivery, receiver, routingContext); } } finally { resetContext(oldcontext); @@ -520,14 +470,11 @@ public class AMQPSessionCallback implements SessionCallback { afterIO(new IOCallback() { @Override public void done() { - connection.lock(); - try { + connection.runLater(() -> { delivery.disposition(rejected); delivery.settle(); - } finally { - connection.unlock(); - } - connection.flush(); + connection.flush(); + }); } @Override @@ -538,19 +485,20 @@ public class AMQPSessionCallback implements SessionCallback { } - private void serverSend(final Transaction transaction, + private void serverSend(final ProtonServerReceiverContext context, + final Transaction transaction, final Message message, final Delivery delivery, - final Receiver receiver) throws Exception { + final Receiver receiver, + final RoutingContext routingContext) throws Exception { message.setConnectionID(receiver.getSession().getConnection().getRemoteContainer()); invokeIncoming((AMQPMessage) message, (ActiveMQProtonRemotingConnection) transportConnection.getProtocolConnection()); - serverSession.send(transaction, message, false, false); + serverSession.send(transaction, message, directDeliver, false, routingContext); afterIO(new IOCallback() { @Override public void done() { - connection.lock(); - try { + connection.runLater(() -> { if (delivery.getRemoteState() instanceof TransactionalState) { TransactionalState txAccepted = new TransactionalState(); txAccepted.setOutcome(Accepted.getInstance()); @@ -561,21 +509,17 @@ public class AMQPSessionCallback implements SessionCallback { delivery.disposition(Accepted.getInstance()); } delivery.settle(); - } finally { - connection.unlock(); - } - connection.flush(); + context.flow(); + connection.flush(); + }); } @Override public void onError(int errorCode, String errorMessage) { - connection.lock(); - try { + connection.runNow(() -> { receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage)); connection.flush(); - } finally { - connection.unlock(); - } + }); } }); } @@ -635,15 +579,12 @@ public class AMQPSessionCallback implements SessionCallback { ProtonServerSenderContext plugSender = (ProtonServerSenderContext) consumer.getProtocolContext(); try { - return plugSender.deliverMessage(ref, deliveryCount, transportConnection); + return plugSender.deliverMessage(ref, consumer); } catch (Exception e) { - connection.lock(); - try { + connection.runNow(() -> { plugSender.getSender().setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, e.getMessage())); connection.flush(); - } finally { - connection.unlock(); - } + }); throw new IllegalStateException("Can't deliver message " + e, e); } @@ -673,23 +614,22 @@ public class AMQPSessionCallback implements SessionCallback { @Override public void disconnect(ServerConsumer consumer, SimpleString queueName) { ErrorCondition ec = new ErrorCondition(AmqpSupport.RESOURCE_DELETED, "Queue was deleted: " + queueName); - connection.lock(); - try { - ((ProtonServerSenderContext) consumer.getProtocolContext()).close(ec); - connection.flush(); - } catch (ActiveMQAMQPException e) { - logger.error("Error closing link for " + consumer.getQueue().getAddress()); - } finally { - connection.unlock(); - } + connection.runNow(() -> { + try { + ((ProtonServerSenderContext) consumer.getProtocolContext()).close(ec); + connection.flush(); + } catch (ActiveMQAMQPException e) { + logger.error("Error closing link for " + consumer.getQueue().getAddress()); + } + }); } @Override public boolean hasCredits(ServerConsumer consumer) { ProtonServerSenderContext plugSender = (ProtonServerSenderContext) consumer.getProtocolContext(); - if (plugSender != null && plugSender.getSender().getCredit() > 0) { - return true; + if (plugSender != null) { + return plugSender.hasCredits(); } else { return false; } @@ -757,6 +697,10 @@ public class AMQPSessionCallback implements SessionCallback { this.transactionHandler = transactionHandler; } + public Connection getTransportConnection() { + return transportConnection; + } + public ProtonTransactionHandler getTransactionHandler() { return this.transactionHandler; } @@ -782,4 +726,7 @@ public class AMQPSessionCallback implements SessionCallback { } } + interface CreditRunnable extends Runnable { + boolean isRun(); + } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java index 41f6e788c8..a06765d91e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java @@ -122,7 +122,8 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection ErrorCondition errorCondition = new ErrorCondition(); errorCondition.setCondition(AmqpSupport.CONNECTION_FORCED); amqpConnection.close(errorCondition); - getTransportConnection().close(); + // There's no need to flush, amqpConnection.close() is calling flush + // as long this semantic is kept no need to flush here } /** diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java index d86dc81826..5b9aa38b49 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java @@ -77,6 +77,8 @@ public class ProtonProtocolManager extends AbstractProtocolManager connectionCallback.close()); } public boolean isSyncOnFlush() { return false; } - public boolean tryLock(long time, TimeUnit timeUnit) { - return handler.tryLock(time, timeUnit); - } - - public void lock() { - handler.lock(); - } - - public void unlock() { - handler.unlock(); - } - - public int capacity() { - return handler.capacity(); - } - public void flush() { handler.flush(); } public void close(ErrorCondition errorCondition) { - handler.close(errorCondition); + handler.close(errorCondition, this); } protected AMQPSessionContext getSessionExtension(Session realSession) throws ActiveMQAMQPException { @@ -201,6 +197,18 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH return sessionExtension; } + public void runOnPool(Runnable run) { + handler.runOnPool(run); + } + + public void runNow(Runnable run) { + handler.runNow(run); + } + + public void runLater(Runnable run) { + handler.runLater(run); + } + protected boolean validateConnection(Connection connection) { return connectionCallback.validateConnection(connection, handler.getSASLResult()); } @@ -224,6 +232,10 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH protected void initInternal() throws Exception { } + public AMQPConnectionCallback getConnectionCallback() { + return connectionCallback; + } + protected void remoteLinkOpened(Link link) throws Exception { AMQPSessionContext protonSession = getSessionExtension(link.getSession()); @@ -314,7 +326,7 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH if (!connectionCallback.isSupportsAnonymous()) { connectionCallback.sendSASLSupported(); connectionCallback.close(); - handler.close(null); + handler.close(null, this); } } } @@ -334,7 +346,7 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH @Override public void onAuthFailed(final ProtonHandler protonHandler, final Connection connection) { connectionCallback.close(); - handler.close(null); + handler.close(null, this); } @Override @@ -359,59 +371,73 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH @Override public void onRemoteOpen(Connection connection) throws Exception { - lock(); + handler.requireHandler(); try { - try { - initInternal(); - } catch (Exception e) { - log.error("Error init connection", e); - } - if (!validateConnection(connection)) { - connection.close(); - } else { - connection.setContext(AMQPConnectionContext.this); - connection.setContainer(containerId); - connection.setProperties(connectionProperties); - connection.setOfferedCapabilities(getConnectionCapabilitiesOffered()); - connection.open(); - } - } finally { - unlock(); + initInternal(); + } catch (Exception e) { + log.error("Error init connection", e); + } + if (!validateConnection(connection)) { + connection.close(); + } else { + connection.setContext(AMQPConnectionContext.this); + connection.setContainer(containerId); + connection.setProperties(connectionProperties); + connection.setOfferedCapabilities(getConnectionCapabilitiesOffered()); + connection.open(); } initialise(); - /* - * This can be null which is in effect an empty map, also we really don't need to check this for in bound connections - * but its here in case we add support for outbound connections. - * */ + /* + * This can be null which is in effect an empty map, also we really don't need to check this for in bound connections + * but its here in case we add support for outbound connections. + * */ if (connection.getRemoteProperties() == null || !connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) { long nextKeepAliveTime = handler.tick(true); if (nextKeepAliveTime != 0 && scheduledPool != null) { - scheduledPool.schedule(new Runnable() { - @Override - public void run() { - Long rescheduleAt = handler.tick(false); - if (rescheduleAt == null) { - // this mean tick could not acquire a lock, we will just retry in 10 milliseconds. - scheduledPool.schedule(this, 10, TimeUnit.MILLISECONDS); - } else if (rescheduleAt != 0) { - scheduledPool.schedule(this, rescheduleAt - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()), TimeUnit.MILLISECONDS); - } - } - }, (nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS); + scheduledPool.schedule(new ScheduleRunnable(), (nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS); } } } + class TickerRunnable implements Runnable { + + final ScheduleRunnable scheduleRunnable; + + TickerRunnable(ScheduleRunnable scheduleRunnable) { + this.scheduleRunnable = scheduleRunnable; + } + + @Override + public void run() { + Long rescheduleAt = handler.tick(false); + if (rescheduleAt == null) { + // this mean tick could not acquire a lock, we will just retry in 10 milliseconds. + scheduledPool.schedule(scheduleRunnable, 10, TimeUnit.MILLISECONDS); + } else if (rescheduleAt != 0) { + scheduledPool.schedule(scheduleRunnable, rescheduleAt - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()), TimeUnit.MILLISECONDS); + } + } + } + + class ScheduleRunnable implements Runnable { + + TickerRunnable tickerRunnable = new TickerRunnable(this); + + @Override + public void run() { + + // The actual tick has to happen within a Netty Worker, to avoid requiring a lock + // this will also be used to flush the data directly into netty connection's executor + handler.runLater(tickerRunnable); + } + } + @Override public void onRemoteClose(Connection connection) { - lock(); - try { - connection.close(); - connection.free(); - } finally { - unlock(); - } + handler.requireHandler(); + connection.close(); + connection.free(); for (AMQPSessionContext protonSession : sessions.values()) { protonSession.close(); @@ -430,31 +456,24 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH @Override public void onRemoteOpen(Session session) throws Exception { + handler.requireHandler(); getSessionExtension(session).initialise(); - lock(); - try { - session.open(); - } finally { - unlock(); - } + session.open(); } @Override public void onRemoteClose(Session session) throws Exception { - lock(); - try { + handler.runLater(() -> { session.close(); session.free(); - } finally { - unlock(); - } - AMQPSessionContext sessionContext = (AMQPSessionContext) session.getContext(); - if (sessionContext != null) { - sessionContext.close(); - sessions.remove(session); - session.setContext(null); - } + AMQPSessionContext sessionContext = (AMQPSessionContext) session.getContext(); + if (sessionContext != null) { + sessionContext.close(); + sessions.remove(session); + session.setContext(null); + } + }); } @Override @@ -471,40 +490,42 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH @Override public void onRemoteClose(Link link) throws Exception { - lock(); - try { + handler.requireHandler(); + + // We scheduled it for later, as that will work through anything that's pending on the current deliveries. + runNow(() -> { link.close(); link.free(); - } finally { - unlock(); - } - ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext(); - if (linkContext != null) { - linkContext.close(true); - } + ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext(); + if (linkContext != null) { + try { + linkContext.close(true); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + } + flush(); + + }); } @Override public void onRemoteDetach(Link link) throws Exception { - boolean handleAsClose = link.getSource() != null - && ((Source) link.getSource()).getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH; + handler.requireHandler(); + boolean handleAsClose = link.getSource() != null && ((Source) link.getSource()).getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH; if (handleAsClose) { onRemoteClose(link); } else { - lock(); - try { - link.detach(); - link.free(); - } finally { - unlock(); - } + link.detach(); + link.free(); } } @Override public void onLocalDetach(Link link) throws Exception { + handler.requireHandler(); Object context = link.getContext(); if (context instanceof ProtonServerSenderContext) { ProtonServerSenderContext senderContext = (ProtonServerSenderContext) context; @@ -514,6 +535,7 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH @Override public void onDelivery(Delivery delivery) throws Exception { + handler.requireHandler(); ProtonDeliveryHandler handler = (ProtonDeliveryHandler) delivery.getLink().getContext(); if (handler != null) { handler.onMessage(delivery); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java index 5cd3515203..c8bb13e0c4 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java @@ -150,13 +150,11 @@ public class AMQPSessionContext extends ProtonInitializable { coordinator.setCapabilities(Symbol.getSymbol("amqp:local-transactions"), Symbol.getSymbol("amqp:multi-txns-per-ssn"), Symbol.getSymbol("amqp:multi-ssns-per-txn")); receiver.setContext(transactionHandler); - connection.lock(); - try { + connection.runNow(() -> { receiver.open(); receiver.flow(connection.getAmqpCredits()); - } finally { - connection.unlock(); - } + connection.flush(); + }); } public void addSender(Sender sender) throws Exception { @@ -169,24 +167,20 @@ public class AMQPSessionContext extends ProtonInitializable { senders.put(sender, protonSender); serverSenders.put(protonSender.getBrokerConsumer(), protonSender); sender.setContext(protonSender); - connection.lock(); - try { + connection.runNow(() -> { sender.open(); - } finally { - connection.unlock(); - } + connection.flush(); + }); protonSender.start(); } catch (ActiveMQAMQPException e) { senders.remove(sender); sender.setSource(null); sender.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage())); - connection.lock(); - try { + connection.runNow(() -> { sender.close(); - } finally { - connection.unlock(); - } + connection.flush(); + }); } } @@ -206,22 +200,18 @@ public class AMQPSessionContext extends ProtonInitializable { ServerProducer serverProducer = new ServerProducerImpl(receiver.getName(), "AMQP", receiver.getTarget().getAddress()); sessionSPI.addProducer(serverProducer); receiver.setContext(protonReceiver); - connection.lock(); - try { + connection.runNow(() -> { receiver.open(); - } finally { - connection.unlock(); - } + connection.flush(); + }); } catch (ActiveMQAMQPException e) { receivers.remove(receiver); receiver.setTarget(null); receiver.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage())); - connection.lock(); - try { + connection.runNow(() -> { receiver.close(); - } finally { - connection.unlock(); - } + connection.flush(); + }); } } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index b0cfba0a93..14463730a6 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -25,7 +25,9 @@ import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.security.CheckType; import org.apache.activemq.artemis.core.security.SecurityAuth; +import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; @@ -49,6 +51,9 @@ import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Receiver; import org.jboss.logging.Logger; +/** + * This is the equivalent for the ServerProducer + */ public class ProtonServerReceiverContext extends ProtonInitializable implements ProtonDeliveryHandler { private static final Logger log = Logger.getLogger(ProtonServerReceiverContext.class); @@ -63,35 +68,43 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements protected final AMQPSessionCallback sessionSPI; - /** We create this AtomicRunnable with setRan. - * This is because we always reuse the same instance. - * In case the creditRunnable was run, we reset and send it over. - * We set it as ran as the first one should always go through */ + RoutingContext routingContext = new RoutingContextImpl(null); + + /** + * We create this AtomicRunnable with setRan. + * This is because we always reuse the same instance. + * In case the creditRunnable was run, we reset and send it over. + * We set it as ran as the first one should always go through + */ protected final AtomicRunnable creditRunnable; + /** + * This Credit Runnable may be used in Mock tests to simulate the credit semantic here + */ + public static AtomicRunnable createCreditRunnable(int refill, + int threshold, + Receiver receiver, + AMQPConnectionContext connection) { + Runnable creditRunnable = () -> { - /** This Credit Runnable may be used in Mock tests to simulate the credit semantic here */ - public static AtomicRunnable createCreditRunnable(int refill, int threshold, Receiver receiver, AMQPConnectionContext connection) { + connection.requireInHandler(); + if (receiver.getCredit() <= threshold) { + int topUp = refill - receiver.getCredit(); + if (topUp > 0) { + // System.out.println("Sending " + topUp + " towards client"); + receiver.flow(topUp); + connection.flush(); + } + } + }; return new AtomicRunnable() { @Override public void atomicRun() { - connection.lock(); - try { - if (receiver.getCredit() <= threshold) { - int topUp = refill - receiver.getCredit(); - if (topUp > 0) { - receiver.flow(topUp); - } - } - } finally { - connection.unlock(); - } - connection.flush(); + connection.runNow(creditRunnable); } }; } - /* The maximum number of credits we will allocate to clients. This number is also used by the broker when refresh client credits. @@ -249,41 +262,46 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements */ @Override public void onMessage(Delivery delivery) throws ActiveMQAMQPException { - try { - Receiver receiver = ((Receiver) delivery.getLink()); + connection.requireInHandler(); + Receiver receiver = ((Receiver) delivery.getLink()); - if (receiver.current() != delivery) { - return; - } + if (receiver.current() != delivery) { + return; + } - if (delivery.isAborted()) { - // Aborting implicitly remotely settles, so advance - // receiver to the next delivery and settle locally. - receiver.advance(); - delivery.settle(); - - // Replenish the credit if not doing a drain - if (!receiver.getDrain()) { - receiver.flow(1); - } - - return; - } else if (delivery.isPartial()) { - return; - } - - Transaction tx = null; - ReadableBuffer data = receiver.recv(); + if (delivery.isAborted()) { + // Aborting implicitly remotely settles, so advance + // receiver to the next delivery and settle locally. receiver.advance(); + delivery.settle(); - if (delivery.getRemoteState() instanceof TransactionalState) { - TransactionalState txState = (TransactionalState) delivery.getRemoteState(); - tx = this.sessionSPI.getTransaction(txState.getTxnId(), false); + // Replenish the credit if not doing a drain + if (!receiver.getDrain()) { + receiver.flow(1); } - sessionSPI.serverSend(this, tx, receiver, delivery, address, delivery.getMessageFormat(), data); + return; + } else if (delivery.isPartial()) { + return; + } - flow(); + ReadableBuffer data = receiver.recv(); + receiver.advance(); + Transaction tx = null; + + if (delivery.getRemoteState() instanceof TransactionalState) { + TransactionalState txState = (TransactionalState) delivery.getRemoteState(); + tx = this.sessionSPI.getTransaction(txState.getTxnId(), false); + } + + final Transaction txUsed = tx; + + actualDelivery(delivery, receiver, data, txUsed); + } + + private void actualDelivery(Delivery delivery, Receiver receiver, ReadableBuffer data, Transaction tx) { + try { + sessionSPI.serverSend(this, tx, receiver, delivery, address, delivery.getMessageFormat(), data, routingContext); } catch (Exception e) { log.warn(e.getMessage(), e); Rejected rejected = new Rejected(); @@ -294,13 +312,17 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements } else { condition.setCondition(Symbol.valueOf("failed")); } + connection.runLater(() -> { - condition.setDescription(e.getMessage()); - rejected.setError(condition); + condition.setDescription(e.getMessage()); + rejected.setError(condition); + + delivery.disposition(rejected); + delivery.settle(); + flow(); + connection.flush(); + }); - delivery.disposition(rejected); - delivery.settle(); - flow(); } } @@ -324,6 +346,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements } public void flow() { + connection.requireInHandler(); if (!creditRunnable.isRun()) { return; // nothing to be done as the previous one did not run yet } @@ -339,13 +362,10 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements } public void drain(int credits) { - connection.lock(); - try { + connection.runNow(() -> { receiver.drain(credits); - } finally { - connection.unlock(); - } - connection.flush(); + connection.flush(); + }); } public int drained() { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index c4aca48e64..843d1fedcc 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -20,7 +20,8 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; @@ -32,7 +33,10 @@ import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.server.AddressQueryResult; import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.MessageReferenceCallback; import org.apache.activemq.artemis.core.server.QueueQueryResult; +import org.apache.activemq.artemis.core.server.ServerConsumer; +import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; @@ -49,7 +53,6 @@ import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable; import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.selector.filter.FilterException; import org.apache.activemq.artemis.selector.impl.SelectorParser; -import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.utils.CompositeAddress; import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.Symbol; @@ -74,9 +77,9 @@ import org.apache.qpid.proton.engine.Sender; import org.jboss.logging.Logger; /** - * TODO: Merge {@link ProtonServerSenderContext} and {@link org.apache.activemq.artemis.protocol.amqp.client.ProtonClientSenderContext} once we support 'global' link names. The split is a workaround for outgoing links + * This is the Equivalent for the ServerConsumer */ -public class ProtonServerSenderContext extends ProtonInitializable implements ProtonDeliveryHandler { +public class ProtonServerSenderContext extends ProtonInitializable implements ProtonDeliveryHandler, MessageReferenceCallback { private static final Logger log = Logger.getLogger(ProtonServerSenderContext.class); @@ -104,6 +107,16 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr private boolean isVolatile = false; private boolean preSettle; private SimpleString tempQueueName; + private final AtomicBoolean draining = new AtomicBoolean(false); + + private int credits = 0; + + private AtomicInteger pending = new AtomicInteger(0); + /** + * The model proton uses requires us to hold a lock in certain times + * to sync the credits we have versus the credits that are being held in proton + * */ + private final Object creditsLock = new Object(); public ProtonServerSenderContext(AMQPConnectionContext connection, Sender sender, @@ -122,7 +135,51 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr @Override public void onFlow(int currentCredits, boolean drain) { - sessionSPI.onFlowConsumer(brokerConsumer, currentCredits, drain); + connection.requireInHandler(); + + setupCredit(); + + ServerConsumerImpl serverConsumer = (ServerConsumerImpl) brokerConsumer; + if (drain) { + // If the draining is already running, then don't do anything + if (draining.compareAndSet(false, true)) { + final ProtonServerSenderContext plugSender = (ProtonServerSenderContext) serverConsumer.getProtocolContext(); + serverConsumer.forceDelivery(1, new Runnable() { + @Override + public void run() { + try { + connection.runNow(() -> { + plugSender.reportDrained(); + setupCredit(); + }); + } finally { + draining.set(false); + } + } + }); + } + } else { + serverConsumer.receiveCredits(-1); + } + } + + public boolean hasCredits() { + if (!connection.flowControl(brokerConsumer::promptDelivery)) { + return false; + } + + synchronized (creditsLock) { + return credits > 0 && sender.getLocalState() != EndpointState.CLOSED; + } + } + + private void setupCredit() { + synchronized (creditsLock) { + this.credits = sender.getCredit() - pending.get(); + if (credits < 0) { + credits = 0; + } + } } public Sender getSender() { @@ -469,20 +526,17 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr sender.setCondition(condition); } protonSession.removeSender(sender); - connection.lock(); - try { - sender.close(); - } finally { - connection.unlock(); - } - connection.flush(); - try { - sessionSPI.closeSender(brokerConsumer); - } catch (Exception e) { - log.warn(e.getMessage(), e); - throw new ActiveMQAMQPInternalErrorException(e.getMessage()); - } + connection.runLater(() -> { + sender.close(); + try { + sessionSPI.closeSender(brokerConsumer); + } catch (Exception e) { + log.warn(e.getMessage(), e); + } + sender.close(); + connection.flush(); + }); } /* @@ -666,12 +720,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } public void settle(Delivery delivery) { - connection.lock(); - try { - delivery.settle(); - } finally { - connection.unlock(); - } + connection.requireInHandler(); + delivery.settle(); } public synchronized void checkState() { @@ -681,42 +731,59 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr /** * handle an out going message from ActiveMQ Artemis, send via the Proton Sender */ - public int deliverMessage(MessageReference messageReference, int deliveryCount, Connection transportConnection) throws Exception { + public int deliverMessage(final MessageReference messageReference, final ServerConsumer consumer) throws Exception { if (closed) { return 0; } - AMQPMessage message = CoreAmqpConverter.checkAMQP(messageReference.getMessage()); - sessionSPI.invokeOutgoing(message, (ActiveMQProtonRemotingConnection) transportConnection.getProtocolConnection()); - - // we only need a tag if we are going to settle later - byte[] tag = preSettle ? new byte[0] : protonSession.getTag(); - - // Let the Message decide how to present the message bytes - ReadableBuffer sendBuffer = message.getSendBuffer(deliveryCount); - boolean releaseRequired = sendBuffer instanceof NettyReadable; - try { - int size = sendBuffer.remaining(); - - while (!connection.tryLock(1, TimeUnit.SECONDS)) { - if (closed || sender.getLocalState() == EndpointState.CLOSED) { - // If we're waiting on the connection lock, the link might be in the process of closing. If this happens - // we return. + synchronized (creditsLock) { + if (sender.getLocalState() == EndpointState.CLOSED) { return 0; - } else { - if (log.isDebugEnabled()) { - log.debug("Couldn't get lock on deliverMessage " + this); - } } + pending.incrementAndGet(); + credits--; } + if (messageReference instanceof Runnable && consumer.allowReferenceCallback()) { + messageReference.setCallback(this); + connection.runNow((Runnable)messageReference); + } else { + connection.runNow(() -> executeDelivery(messageReference)); + } + + // This is because on AMQP we only send messages based in credits, not bytes + return 1; + } finally { + + } + } + + @Override + public void executeDelivery(MessageReference messageReference) { + + try { + if (sender.getLocalState() == EndpointState.CLOSED) { + log.debug("Not delivering message " + messageReference + " as the sender is closed and credits were available, if you see too many of these it means clients are issuing credits and closing the connection with pending credits a lot of times"); + return; + } + AMQPMessage message = CoreAmqpConverter.checkAMQP(messageReference.getMessage()); + + sessionSPI.invokeOutgoing(message, (ActiveMQProtonRemotingConnection) sessionSPI.getTransportConnection().getProtocolConnection()); + + // Let the Message decide how to present the message bytes + ReadableBuffer sendBuffer = message.getSendBuffer(messageReference.getDeliveryCount()); + // we only need a tag if we are going to settle later + byte[] tag = preSettle ? new byte[0] : protonSession.getTag(); + + boolean releaseRequired = sendBuffer instanceof NettyReadable; + final Delivery delivery; + delivery = sender.delivery(tag, 0, tag.length); + delivery.setMessageFormat((int) message.getMessageFormat()); + delivery.setContext(messageReference); + try { - final Delivery delivery; - delivery = sender.delivery(tag, 0, tag.length); - delivery.setMessageFormat((int) message.getMessageFormat()); - delivery.setContext(messageReference); if (releaseRequired) { sender.send(sendBuffer); @@ -730,7 +797,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr if (preSettle) { // Presettled means the client implicitly accepts any delivery we send it. - sessionSPI.ack(null, brokerConsumer, messageReference.getMessage()); + try { + sessionSPI.ack(null, brokerConsumer, messageReference.getMessage()); + } catch (Exception e) { + log.debug(e.getMessage(), e); + } delivery.settle(); } else { sender.advance(); @@ -738,14 +809,16 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr connection.flush(); } finally { - connection.unlock(); - } - - return size; - } finally { - if (releaseRequired) { - ((NettyReadable) sendBuffer).getByteBuf().release(); + synchronized (creditsLock) { + pending.decrementAndGet(); + } + if (releaseRequired) { + ((NettyReadable) sendBuffer).getByteBuf().release(); + } } + } catch (Exception e) { + log.warn(e.getMessage(), e); + brokerConsumer.errorProcessing(e, messageReference); } } @@ -806,13 +879,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr * Update link state to reflect that the previous drain attempt has completed. */ public void reportDrained() { - connection.lock(); - try { - sender.drained(); - } finally { - connection.unlock(); - } - + connection.requireInHandler(); + sender.drained(); connection.flush(); } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExecutorNettyAdapter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExecutorNettyAdapter.java new file mode 100644 index 0000000000..9d0f09eb8a --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExecutorNettyAdapter.java @@ -0,0 +1,221 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq.artemis.protocol.amqp.proton.handler; + +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopGroup; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.ProgressivePromise; +import io.netty.util.concurrent.Promise; +import io.netty.util.concurrent.ScheduledFuture; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; + +/** Test cases may supply a simple executor instead of the real Netty Executor + * On that case this is a simple adapter for what's needed from these tests. + * Not intended to be used in production. + * + * TODO: This could be refactored out of the main codebase but at a high cost. + * We may do it some day if we find an easy way that won't clutter the code too much. + * */ +public class ExecutorNettyAdapter implements EventLoop { + + final ArtemisExecutor executor; + + public ExecutorNettyAdapter(ArtemisExecutor executor) { + this.executor = executor; + } + + @Override + public EventLoopGroup parent() { + return null; + } + + @Override + public EventLoop next() { + return null; + } + + @Override + public ChannelFuture register(Channel channel) { + return null; + } + + @Override + public ChannelFuture register(ChannelPromise promise) { + return null; + } + + @Override + public ChannelFuture register(Channel channel, ChannelPromise promise) { + return null; + } + + @Override + public boolean inEventLoop() { + return inEventLoop(Thread.currentThread()); + } + + @Override + public boolean inEventLoop(Thread thread) { + return false; + } + + @Override + public Promise newPromise() { + return null; + } + + @Override + public ProgressivePromise newProgressivePromise() { + return null; + } + + @Override + public Future newSucceededFuture(V result) { + return null; + } + + @Override + public Future newFailedFuture(Throwable cause) { + return null; + } + + @Override + public boolean isShuttingDown() { + return false; + } + + @Override + public Future shutdownGracefully() { + return null; + } + + @Override + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + return null; + } + + @Override + public Future terminationFuture() { + return null; + } + + @Override + public void shutdown() { + + } + + @Override + public List shutdownNow() { + return null; + } + + @Override + public Iterator iterator() { + return null; + } + + @Override + public Future submit(Runnable task) { + execute(task); + return null; + } + + @Override + public Future submit(Runnable task, T result) { + execute(task); + return null; + } + + @Override + public Future submit(Callable task) { + return null; + } + + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + return null; + } + + @Override + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + return null; + } + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + return null; + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + return null; + } + + @Override + public boolean isShutdown() { + return false; + } + + @Override + public boolean isTerminated() { + return false; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return false; + } + + @Override + public List> invokeAll(Collection> tasks) throws InterruptedException { + return null; + } + + @Override + public List> invokeAll(Collection> tasks, + long timeout, + TimeUnit unit) throws InterruptedException { + return null; + } + + @Override + public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { + return null; + } + + @Override + public T invokeAny(Collection> tasks, + long timeout, + TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return null; + } + + @Override + public void execute(Runnable command) { + executor.execute(command); + } +} diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java index 694c1d36e7..2f730fb304 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java @@ -16,22 +16,24 @@ */ package org.apache.activemq.artemis.protocol.amqp.proton.handler; +import javax.security.auth.Subject; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; - -import javax.security.auth.Subject; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.EventLoop; +import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable; import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL; import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult; import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.transport.AmqpError; @@ -46,9 +48,6 @@ import org.apache.qpid.proton.engine.Transport; import org.apache.qpid.proton.engine.impl.TransportInternal; import org.jboss.logging.Logger; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; - public class ProtonHandler extends ProtonInitializable implements SaslListener { private static final Logger log = Logger.getLogger(ProtonHandler.class); @@ -68,8 +67,6 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener { private ServerSASL chosenMechanism; private ClientSASL clientSASLMechanism; - private final ReentrantLock lock = new ReentrantLock(); - private final long creationTime; private final boolean isServer; @@ -80,17 +77,20 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener { protected boolean receivedFirstPacket = false; - private final Executor flushExecutor; + private final EventLoop workerExecutor; + + private final ArtemisExecutor poolExecutor; protected final ReadyListener readyListener; boolean inDispatch = false; - public ProtonHandler(Executor flushExecutor, boolean isServer) { - this.flushExecutor = flushExecutor; - this.readyListener = () -> this.flushExecutor.execute(() -> { - flush(); - }); + boolean scheduledFlush = false; + + public ProtonHandler(EventLoop workerExecutor, ArtemisExecutor poolExecutor, boolean isServer) { + this.workerExecutor = workerExecutor; + this.poolExecutor = poolExecutor; + this.readyListener = () -> runLater(this::flush); this.creationTime = System.currentTimeMillis(); this.isServer = isServer; @@ -106,45 +106,33 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener { } public Long tick(boolean firstTick) { - if (firstTick) { - // the first tick needs to guarantee a lock here - lock.lock(); - } else { - if (!lock.tryLock()) { - log.debug("Cannot hold a lock on ProtonHandler for Tick, it will retry shortly"); - // if we can't lock the scheduler will retry in a very short period of time instead of holding the lock here - return null; - } - } - try { - if (!firstTick) { - try { - if (connection.getLocalState() != EndpointState.CLOSED) { - long rescheduleAt = transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime())); - if (transport.isClosed()) { - throw new IllegalStateException("Channel was inactive for to long"); - } - return rescheduleAt; + requireHandler(); + if (!firstTick) { + try { + if (connection.getLocalState() != EndpointState.CLOSED) { + long rescheduleAt = transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime())); + if (transport.isClosed()) { + throw new IllegalStateException("Channel was inactive for to long"); } - } catch (Exception e) { - log.warn(e.getMessage(), e); - transport.close(); - connection.setCondition(new ErrorCondition()); + return rescheduleAt; } - return 0L; + } catch (Exception e) { + log.warn(e.getMessage(), e); + transport.close(); + connection.setCondition(new ErrorCondition()); + } finally { + flush(); } - return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime())); - } finally { - lock.unlock(); - flushBytes(); + return 0L; } + return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime())); } /** * We cannot flush until the initial handshake was finished. * If this happens before the handshake, the connection response will happen without SASL * and the client will respond and fail with an invalid code. - * */ + */ public void scheduledFlush() { if (receivedFirstPacket) { flush(); @@ -152,29 +140,17 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener { } public int capacity() { - lock.lock(); - try { - return transport.capacity(); - } finally { - lock.unlock(); - } + requireHandler(); + return transport.capacity(); } - public void lock() { - lock.lock(); - } - - public void unlock() { - lock.unlock(); - } - - public boolean tryLock(long time, TimeUnit timeUnit) { - try { - return lock.tryLock(time, timeUnit); - } catch (InterruptedException e) { - - Thread.currentThread().interrupt(); - return false; + public void requireHandler() { + if (!workerExecutor.inEventLoop()) { + new Exception("saco!!!").printStackTrace(); + // this should not happen unless there is an obvious programming error + log.warn("Using inHandler is required", new Exception("trace")); + System.exit(-1); + throw new IllegalStateException("this method requires to be called within the handler, use the executor"); } } @@ -192,21 +168,34 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener { } public void createServerSASL(String[] mechanisms) { + requireHandler(); Sasl sasl = transport.sasl(); sasl.server(); sasl.setMechanisms(mechanisms); sasl.setListener(this); } + + public void flushBytes() { + requireHandler(); + + if (!scheduledFlush) { + scheduledFlush = true; + workerExecutor.execute(this::actualFlush); + } + } + + private void actualFlush() { + requireHandler(); for (EventHandler handler : handlers) { if (!handler.flowControl(readyListener)) { + scheduledFlush = false; return; } } - lock.lock(); try { while (true) { ByteBuffer head = transport.head(); @@ -227,7 +216,7 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener { transport.pop(pending); } } finally { - lock.unlock(); + scheduledFlush = false; } } @@ -236,36 +225,32 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener { } public void inputBuffer(ByteBuf buffer) { + requireHandler(); dataReceived = true; - lock.lock(); - try { - while (buffer.readableBytes() > 0) { - int capacity = transport.capacity(); + while (buffer.readableBytes() > 0) { + int capacity = transport.capacity(); - if (!receivedFirstPacket) { - handleFirstPacket(buffer); - // there is a chance that if SASL Handshake has been carried out that the capacity may change. - capacity = transport.capacity(); - } - - if (capacity > 0) { - ByteBuffer tail = transport.tail(); - int min = Math.min(capacity, buffer.readableBytes()); - tail.limit(min); - buffer.readBytes(tail); - - flush(); - } else { - if (capacity == 0) { - log.debugf("abandoning: readableBytes=%d", buffer.readableBytes()); - } else { - log.debugf("transport closed, discarding: readableBytes=%d, capacity=%d", buffer.readableBytes(), transport.capacity()); - } - break; - } + if (!receivedFirstPacket) { + handleFirstPacket(buffer); + // there is a chance that if SASL Handshake has been carried out that the capacity may change. + capacity = transport.capacity(); + } + + if (capacity > 0) { + ByteBuffer tail = transport.tail(); + int min = Math.min(capacity, buffer.readableBytes()); + tail.limit(min); + buffer.readBytes(tail); + + flush(); + } else { + if (capacity == 0) { + log.debugf("abandoning: readableBytes=%d", buffer.readableBytes()); + } else { + log.debugf("transport closed, discarding: readableBytes=%d, capacity=%d", buffer.readableBytes(), transport.capacity()); + } + break; } - } finally { - lock.unlock(); } } @@ -281,29 +266,55 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener { return creationTime; } - public void flush() { - lock.lock(); - try { - transport.process(); - } finally { - lock.unlock(); - } - - dispatch(); + public void runOnPool(Runnable runnable) { + poolExecutor.execute(runnable); } - public void close(ErrorCondition errorCondition) { - lock.lock(); - try { + public void runNow(Runnable runnable) { + if (workerExecutor.inEventLoop()) { + runnable.run(); + } else { + workerExecutor.execute(runnable); + } + } + + public void runLater(Runnable runnable) { + workerExecutor.execute(runnable); + } + + public void flush() { + if (workerExecutor.inEventLoop()) { + transport.process(); + dispatch(); + } else { + runLater(() -> { + transport.process(); + dispatch(); + }); + } + } + + public void close(ErrorCondition errorCondition, AMQPConnectionContext connectionContext) { + runNow(() -> { if (errorCondition != null) { connection.setCondition(errorCondition); } connection.close(); - } finally { - lock.unlock(); - } + flush(); + }); - flush(); + /*try { + Thread.sleep(1000); + } catch (Exception e) { + e.printStackTrace(); + } */ + // this needs to be done in two steps + // we first flush what we have to the client + // after flushed, we close the local connection + // otherwise this could close the netty connection before the Writable is complete + runLater(() -> { + connectionContext.getConnectionCallback().getTransportConnection().close(); + }); } // server side SASL Listener @@ -462,45 +473,59 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener { private void dispatch() { Event ev; - lock.lock(); + if (inDispatch) { + // Avoid recursion from events + return; + } try { - if (inDispatch) { - // Avoid recursion from events - return; - } - try { - inDispatch = true; - while ((ev = collector.peek()) != null) { - for (EventHandler h : handlers) { - if (log.isTraceEnabled()) { - log.trace("Handling " + ev + " towards " + h); - } - try { - Events.dispatch(ev, h); - } catch (Exception e) { - log.warn(e.getMessage(), e); - ErrorCondition error = new ErrorCondition(); - error.setCondition(AmqpError.INTERNAL_ERROR); - error.setDescription("Unrecoverable error: " + - (e.getMessage() == null ? e.getClass().getSimpleName() : e.getMessage())); - connection.setCondition(error); - connection.close(); - } + inDispatch = true; + while ((ev = collector.peek()) != null) { + for (EventHandler h : handlers) { + if (log.isTraceEnabled()) { + log.trace("Handling " + ev + " towards " + h); + } + try { + Events.dispatch(ev, h); + } catch (Exception e) { + log.warn(e.getMessage(), e); + ErrorCondition error = new ErrorCondition(); + error.setCondition(AmqpError.INTERNAL_ERROR); + error.setDescription("Unrecoverable error: " + (e.getMessage() == null ? e.getClass().getSimpleName() : e.getMessage())); + connection.setCondition(error); + connection.close(); } - - collector.pop(); } - } finally { - inDispatch = false; + collector.pop(); } + } finally { - lock.unlock(); + inDispatch = false; } flushBytes(); } + + public void handleError(Exception e) { + if (workerExecutor.inEventLoop()) { + internalHandlerError(e); + } else { + runLater(() -> internalHandlerError(e)); + } + } + + private void internalHandlerError(Exception e) { + log.warn(e.getMessage(), e); + ErrorCondition error = new ErrorCondition(); + error.setCondition(AmqpError.INTERNAL_ERROR); + error.setDescription("Unrecoverable error: " + (e.getMessage() == null ? e.getClass().getSimpleName() : e.getMessage())); + connection.setCondition(error); + connection.close(); + flush(); + } + + public void open(String containerId, Map connectionProperties) { this.transport.open(); this.connection.setContainer(containerId); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java index 78a5b33637..15803f497c 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java @@ -107,14 +107,11 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { IOCallback ioAction = new IOCallback() { @Override public void done() { - connection.lock(); - try { + connection.runLater(() -> { delivery.settle(); delivery.disposition(declared); - } finally { - connection.unlock(); connection.flush(); - } + }); } @Override @@ -133,15 +130,12 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { IOCallback ioAction = new IOCallback() { @Override public void done() { - connection.lock(); - try { + connection.runLater(() -> { delivery.settle(); delivery.disposition(new Accepted()); currentTx = null; - } finally { - connection.unlock(); connection.flush(); - } + }); } @Override diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java index ab4ff42981..4c5a887cf5 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java @@ -25,11 +25,13 @@ import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.impl.RefsOperation; +import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; +import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext; import org.apache.qpid.proton.engine.Delivery; - /** * AMQP Protocol has different TX Rollback behaviour for Acks depending on whether an AMQP delivery has been settled * or not. This class extends the Core TransactionImpl used for normal TX behaviour. In the case where deliveries @@ -46,8 +48,22 @@ public class ProtonTransactionImpl extends TransactionImpl { private boolean discharged; - public ProtonTransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds) { + public ProtonTransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds, final AMQPConnectionContext connection) { super(xid, storageManager, timeoutSeconds); + addOperation(new TransactionOperationAbstract() { + @Override + public void afterCommit(Transaction tx) { + super.afterCommit(tx); + connection.runNow(() -> { + // Settle all unsettled deliveries if commit is successful + for (Pair p : deliveries.values()) { + if (!p.getA().isSettled()) + p.getB().settle(p.getA()); + } + connection.flush(); + }); + } + }); } @Override @@ -71,11 +87,6 @@ public class ProtonTransactionImpl extends TransactionImpl { @Override public void commit() throws Exception { super.commit(); - - // Settle all unsettled deliveries if commit is successful - for (Pair p : deliveries.values()) { - if (!p.getA().isSettled()) p.getB().settle(p.getA()); - } } public boolean isDischarged() { diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java index 30814a92c4..349c32dd03 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java @@ -16,13 +16,6 @@ */ package org.apache.activemq.artemis.protocol.amqp.broker; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.AMQP_CREDITS_DEFAULT; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.AMQP_LOW_CREDITS_DEFAULT; -import static org.junit.Assert.assertNotNull; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.Mockito.never; - import java.util.concurrent.Executor; import org.apache.activemq.artemis.api.core.SimpleString; @@ -34,29 +27,65 @@ import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.qpid.proton.engine.Receiver; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; import org.mockito.quality.Strictness; +import org.mockito.stubbing.Answer; + +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.AMQP_CREDITS_DEFAULT; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.AMQP_LOW_CREDITS_DEFAULT; +import static org.junit.Assert.assertNotNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.never; public class AMQPSessionCallbackTest { - @Rule public MockitoRule rule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS); + @Rule + public MockitoRule rule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS); - @Mock private AMQPConnectionCallback protonSPI; - @Mock private ProtonProtocolManager manager; - @Mock private AMQPConnectionContext connection; - @Mock private Connection transportConnection; - @Mock private Executor executor; - @Mock private OperationContext operationContext; - @Mock private Receiver receiver; - @Mock private ActiveMQServer server; - @Mock private PagingManager pagingManager; - @Mock private PagingStore pagingStore; + @Mock + private AMQPConnectionCallback protonSPI; + @Mock + private ProtonProtocolManager manager; + @Mock + private AMQPConnectionContext connection; + @Mock + private Connection transportConnection; + @Mock + private Executor executor; + @Mock + private OperationContext operationContext; + @Mock + private Receiver receiver; + @Mock + private ActiveMQServer server; + @Mock + private PagingManager pagingManager; + @Mock + private PagingStore pagingStore; + + + @Before + public void setRule() { + + // The connection will call the runnable now on this mock, as these would happen on a different thread. + Mockito.doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + ((Runnable) invocation.getArguments()[0]).run(); + return null; + } + }).when(connection).runNow(Mockito.isA(Runnable.class)); + + } /** * Test that the AMQPSessionCallback grants no credit when not at threshold @@ -69,8 +98,7 @@ public class AMQPSessionCallbackTest { // Capture credit runnable and invoke to trigger credit top off ArgumentCaptor argument = ArgumentCaptor.forClass(Runnable.class); - AMQPSessionCallback session = new AMQPSessionCallback( - protonSPI, manager, connection, transportConnection, executor, operationContext); + AMQPSessionCallback session = new AMQPSessionCallback(protonSPI, manager, connection, transportConnection, executor, operationContext); // Credit is above threshold Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT + 1); @@ -100,8 +128,7 @@ public class AMQPSessionCallbackTest { // Capture credit runnable and invoke to trigger credit top off ArgumentCaptor argument = ArgumentCaptor.forClass(Runnable.class); - AMQPSessionCallback session = new AMQPSessionCallback( - protonSPI, manager, connection, transportConnection, executor, operationContext); + AMQPSessionCallback session = new AMQPSessionCallback(protonSPI, manager, connection, transportConnection, executor, operationContext); // Credit is at threshold Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT); @@ -132,8 +159,7 @@ public class AMQPSessionCallbackTest { // Capture credit runnable and invoke to trigger credit top off ArgumentCaptor argument = ArgumentCaptor.forClass(Runnable.class); - AMQPSessionCallback session = new AMQPSessionCallback( - protonSPI, manager, connection, transportConnection, executor, operationContext); + AMQPSessionCallback session = new AMQPSessionCallback(protonSPI, manager, connection, transportConnection, executor, operationContext); // Credit is above threshold Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT + 1); @@ -164,8 +190,7 @@ public class AMQPSessionCallbackTest { // Capture credit runnable and invoke to trigger credit top off ArgumentCaptor argument = ArgumentCaptor.forClass(Runnable.class); - AMQPSessionCallback session = new AMQPSessionCallback( - protonSPI, manager, connection, transportConnection, executor, operationContext); + AMQPSessionCallback session = new AMQPSessionCallback(protonSPI, manager, connection, transportConnection, executor, operationContext); // Credit is at threshold Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT); @@ -195,8 +220,7 @@ public class AMQPSessionCallbackTest { // Capture credit runnable and invoke to trigger credit top off ArgumentCaptor argument = ArgumentCaptor.forClass(Runnable.class); - AMQPSessionCallback session = new AMQPSessionCallback( - protonSPI, manager, connection, transportConnection, executor, operationContext); + AMQPSessionCallback session = new AMQPSessionCallback(protonSPI, manager, connection, transportConnection, executor, operationContext); // Credit is at threshold Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT); @@ -227,8 +251,7 @@ public class AMQPSessionCallbackTest { // Capture credit runnable and invoke to trigger credit top off ArgumentCaptor argument = ArgumentCaptor.forClass(Runnable.class); - AMQPSessionCallback session = new AMQPSessionCallback( - protonSPI, manager, connection, transportConnection, executor, operationContext); + AMQPSessionCallback session = new AMQPSessionCallback(protonSPI, manager, connection, transportConnection, executor, operationContext); // Credit is at threshold Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java index 2402d09b15..e05a9af550 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java @@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.MessageReferenceCallback; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.impl.AckReason; @@ -31,7 +32,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.utils.collections.LinkedListImpl; import org.jboss.logging.Logger; -public class PagedReferenceImpl extends LinkedListImpl.Node implements PagedReference { +public class PagedReferenceImpl extends LinkedListImpl.Node implements PagedReference, Runnable { private static final Logger logger = Logger.getLogger(PagedReferenceImpl.class); @@ -74,6 +75,8 @@ public class PagedReferenceImpl extends LinkedListImpl.Node private long messageSize = -1; + private MessageReferenceCallback callback; + @Override public Object getProtocolData() { return protocolData; @@ -89,6 +92,23 @@ public class PagedReferenceImpl extends LinkedListImpl.Node return getPagedMessage().getMessage(); } + @Override + public void setCallback(MessageReferenceCallback callback) { + this.callback = callback; + } + + @Override + public void run() { + MessageReferenceCallback callback = this.callback; + + try { + if (callback != null) { + callback.executeDelivery(this); + } + } finally { + this.callback = null; + } + } @Override public synchronized PagedMessage getPagedMessage() { PagedMessage returnMessage = message != null ? message.get() : null; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Binding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Binding.java index f1e83d2e2d..bd6b705e7c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Binding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Binding.java @@ -26,6 +26,10 @@ import org.apache.activemq.artemis.core.server.group.UnproposalListener; public interface Binding extends UnproposalListener { + default boolean isLocal() { + return false; + } + SimpleString getAddress(); Bindable getBindable(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java index 30a268056d..053acfae7e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java @@ -26,6 +26,9 @@ import org.apache.activemq.artemis.core.server.group.UnproposalListener; public interface Bindings extends UnproposalListener { + // this is to inform the parent there was an udpate on the bindings + void updated(QueueBinding binding); + Collection getBindings(); void addBinding(Binding binding); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java index 56abddbc09..9cd6f73870 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java @@ -26,12 +26,14 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Bindings; +import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; @@ -63,6 +65,13 @@ public final class BindingsImpl implements Bindings { private final SimpleString name; + private static final AtomicInteger sequenceVersion = new AtomicInteger(Integer.MIN_VALUE); + + /** + * This has a version about adds and removes + */ + private final AtomicInteger version = new AtomicInteger(sequenceVersion.incrementAndGet()); + public BindingsImpl(final SimpleString name, final GroupingHandler groupingHandler) { this.groupingHandler = groupingHandler; this.name = name; @@ -92,61 +101,78 @@ public final class BindingsImpl implements Bindings { @Override public void addBinding(final Binding binding) { - if (logger.isTraceEnabled()) { - logger.trace("addBinding(" + binding + ") being called"); - } - if (binding.isExclusive()) { - exclusiveBindings.add(binding); - } else { - SimpleString routingName = binding.getRoutingName(); + try { + if (logger.isTraceEnabled()) { + logger.trace("addBinding(" + binding + ") being called"); + } + if (binding.isExclusive()) { + exclusiveBindings.add(binding); + } else { + SimpleString routingName = binding.getRoutingName(); - List bindings = routingNameBindingMap.get(routingName); + List bindings = routingNameBindingMap.get(routingName); - if (bindings == null) { - bindings = new CopyOnWriteArrayList<>(); + if (bindings == null) { + bindings = new CopyOnWriteArrayList<>(); - List oldBindings = routingNameBindingMap.putIfAbsent(routingName, bindings); + List oldBindings = routingNameBindingMap.putIfAbsent(routingName, bindings); - if (oldBindings != null) { - bindings = oldBindings; + if (oldBindings != null) { + bindings = oldBindings; + } + } + + if (!bindings.contains(binding)) { + bindings.add(binding); } } - if (!bindings.contains(binding)) { - bindings.add(binding); + bindingsMap.put(binding.getID(), binding); + + if (logger.isTraceEnabled()) { + logger.trace("Adding binding " + binding + " into " + this + " bindingTable: " + debugBindings()); } - } - - bindingsMap.put(binding.getID(), binding); - - if (logger.isTraceEnabled()) { - logger.trace("Adding binding " + binding + " into " + this + " bindingTable: " + debugBindings()); + } finally { + updated(); } } + @Override + public void updated(QueueBinding binding) { + updated(); + } + + private void updated() { + version.set(sequenceVersion.incrementAndGet()); + } + @Override public void removeBinding(final Binding binding) { - if (binding.isExclusive()) { - exclusiveBindings.remove(binding); - } else { - SimpleString routingName = binding.getRoutingName(); + try { + if (binding.isExclusive()) { + exclusiveBindings.remove(binding); + } else { + SimpleString routingName = binding.getRoutingName(); - List bindings = routingNameBindingMap.get(routingName); + List bindings = routingNameBindingMap.get(routingName); - if (bindings != null) { - bindings.remove(binding); + if (bindings != null) { + bindings.remove(binding); - if (bindings.isEmpty()) { - routingNameBindingMap.remove(routingName); + if (bindings.isEmpty()) { + routingNameBindingMap.remove(routingName); + } } } - } - bindingsMap.remove(binding.getID()); + bindingsMap.remove(binding.getID()); - if (logger.isTraceEnabled()) { - logger.trace("Removing binding " + binding + " from " + this + " bindingTable: " + debugBindings()); + if (logger.isTraceEnabled()) { + logger.trace("Removing binding " + binding + " from " + this + " bindingTable: " + debugBindings()); + } + } finally { + updated(); } } @@ -267,11 +293,9 @@ public final class BindingsImpl implements Bindings { if (binding.getFilter() == null || binding.getFilter().match(message)) { binding.getBindable().route(message, context); - routed = true; } } - if (!routed) { // Remove the ids now, in order to avoid double check ids = message.removeExtraBytesProperty(Message.HDR_ROUTE_TO_IDS); @@ -280,30 +304,53 @@ public final class BindingsImpl implements Bindings { SimpleString groupId = message.getGroupID(); if (ids != null) { + context.clear(); routeFromCluster(message, context, ids); } else if (groupingHandler != null && groupRouting && groupId != null) { + context.clear(); routeUsingStrictOrdering(message, context, groupingHandler, groupId, 0); } else { - if (logger.isTraceEnabled()) { - logger.trace("Routing message " + message + " on binding=" + this); + // in a optimization, we are reusing the previous context if everything is right for it + // so the simpleRouting will only happen if neededk + if (!context.isReusable(message, version.get())) { + context.clear(); + simpleRouting(message, context); } - for (Map.Entry> entry : routingNameBindingMap.entrySet()) { - SimpleString routingName = entry.getKey(); + } + } + } - List bindings = entry.getValue(); + private void simpleRouting(Message message, RoutingContext context) throws Exception { + if (logger.isTraceEnabled()) { + logger.trace("Routing message " + message + " on binding=" + this); + } - if (bindings == null) { - // The value can become null if it's concurrently removed while we're iterating - this is expected - // ConcurrentHashMap behaviour! - continue; - } + // We check at the version before we started routing, + // this is because if something changed in between we want to check the correct version + int currentVersion = version.get(); - Binding theBinding = getNextBinding(message, routingName, bindings); + for (Map.Entry> entry : routingNameBindingMap.entrySet()) { + SimpleString routingName = entry.getKey(); - if (theBinding != null) { - theBinding.route(message, context); - } - } + List bindings = entry.getValue(); + + if (bindings == null) { + // The value can become null if it's concurrently removed while we're iterating - this is expected + // ConcurrentHashMap behaviour! + continue; + } + + Binding theBinding = getNextBinding(message, routingName, bindings); + + if (theBinding != null && theBinding.getFilter() == null && bindings.size() == 1 && theBinding.isLocal()) { + context.setReusable(true, currentVersion); + } else { + // notice that once this is set to false, any calls to setReusable(true) will be moot as the context will ignore it + context.setReusable(false, currentVersion); + } + + if (theBinding != null) { + theBinding.route(message, context); } } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java index 79af5d075d..79ab4d3480 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java @@ -46,6 +46,11 @@ public class LocalQueueBinding implements QueueBinding { clusterName = queue.getName().concat(nodeID); } + @Override + public boolean isLocal() { + return true; + } + @Override public long getID() { return queue.getID(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 5346d6c736..bf12baf32f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -485,82 +485,91 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding return null; } - final Queue queue = queueBinding.getQueue(); + Bindings bindingsOnQueue = addressManager.getBindingsForRoutingAddress(queueBinding.getAddress()); - boolean changed = false; + try { - //validate update - if (maxConsumers != null && maxConsumers.intValue() != Queue.MAX_CONSUMERS_UNLIMITED) { - final int consumerCount = queue.getConsumerCount(); - if (consumerCount > maxConsumers) { - throw ActiveMQMessageBundle.BUNDLE.invalidMaxConsumersUpdate(name.toString(), maxConsumers, consumerCount); + final Queue queue = queueBinding.getQueue(); + + boolean changed = false; + + //validate update + if (maxConsumers != null && maxConsumers.intValue() != Queue.MAX_CONSUMERS_UNLIMITED) { + final int consumerCount = queue.getConsumerCount(); + if (consumerCount > maxConsumers) { + throw ActiveMQMessageBundle.BUNDLE.invalidMaxConsumersUpdate(name.toString(), maxConsumers, consumerCount); + } } - } - if (routingType != null) { - final SimpleString address = queue.getAddress(); - final AddressInfo addressInfo = addressManager.getAddressInfo(address); - final EnumSet addressRoutingTypes = addressInfo.getRoutingTypes(); - if (!addressRoutingTypes.contains(routingType)) { - throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeUpdate(name.toString(), routingType, address.toString(), addressRoutingTypes); + if (routingType != null) { + final SimpleString address = queue.getAddress(); + final AddressInfo addressInfo = addressManager.getAddressInfo(address); + final EnumSet addressRoutingTypes = addressInfo.getRoutingTypes(); + if (!addressRoutingTypes.contains(routingType)) { + throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeUpdate(name.toString(), routingType, address.toString(), addressRoutingTypes); + } } - } - //atomic update - if (maxConsumers != null && queue.getMaxConsumers() != maxConsumers.intValue()) { - changed = true; - queue.setMaxConsumer(maxConsumers); - } - if (routingType != null && queue.getRoutingType() != routingType) { - changed = true; - queue.setRoutingType(routingType); - } - if (purgeOnNoConsumers != null && queue.isPurgeOnNoConsumers() != purgeOnNoConsumers.booleanValue()) { - changed = true; - queue.setPurgeOnNoConsumers(purgeOnNoConsumers); - } - if (exclusive != null && queue.isExclusive() != exclusive.booleanValue()) { - changed = true; - queue.setExclusive(exclusive); - } - if (nonDestructive != null && queue.isNonDestructive() != nonDestructive.booleanValue()) { - changed = true; - queue.setNonDestructive(nonDestructive); - } - if (consumersBeforeDispatch != null && !consumersBeforeDispatch.equals(queue.getConsumersBeforeDispatch())) { - changed = true; - queue.setConsumersBeforeDispatch(consumersBeforeDispatch.intValue()); - } - if (delayBeforeDispatch != null && !delayBeforeDispatch.equals(queue.getDelayBeforeDispatch())) { - changed = true; - queue.setDelayBeforeDispatch(delayBeforeDispatch.longValue()); - } - if (filter != null && !filter.equals(queue.getFilter())) { - changed = true; - queue.setFilter(filter); - } - if (configurationManaged != null && !configurationManaged.equals(queue.isConfigurationManaged())) { - changed = true; - queue.setConfigurationManaged(configurationManaged); - } - if (logger.isDebugEnabled()) { - if (user == null && queue.getUser() != null) { - logger.debug("Ignoring updating Queue to a NULL user"); + //atomic update + if (maxConsumers != null && queue.getMaxConsumers() != maxConsumers.intValue()) { + changed = true; + queue.setMaxConsumer(maxConsumers); + } + if (routingType != null && queue.getRoutingType() != routingType) { + changed = true; + queue.setRoutingType(routingType); + } + if (purgeOnNoConsumers != null && queue.isPurgeOnNoConsumers() != purgeOnNoConsumers.booleanValue()) { + changed = true; + queue.setPurgeOnNoConsumers(purgeOnNoConsumers); + } + if (exclusive != null && queue.isExclusive() != exclusive.booleanValue()) { + changed = true; + queue.setExclusive(exclusive); + } + if (nonDestructive != null && queue.isNonDestructive() != nonDestructive.booleanValue()) { + changed = true; + queue.setNonDestructive(nonDestructive); + } + if (consumersBeforeDispatch != null && !consumersBeforeDispatch.equals(queue.getConsumersBeforeDispatch())) { + changed = true; + queue.setConsumersBeforeDispatch(consumersBeforeDispatch.intValue()); + } + if (delayBeforeDispatch != null && !delayBeforeDispatch.equals(queue.getDelayBeforeDispatch())) { + changed = true; + queue.setDelayBeforeDispatch(delayBeforeDispatch.longValue()); + } + if (filter != null && !filter.equals(queue.getFilter())) { + changed = true; + queue.setFilter(filter); + } + if (configurationManaged != null && !configurationManaged.equals(queue.isConfigurationManaged())) { + changed = true; + queue.setConfigurationManaged(configurationManaged); + } + if (logger.isDebugEnabled()) { + if (user == null && queue.getUser() != null) { + logger.debug("Ignoring updating Queue to a NULL user"); + } + } + if (user != null && !user.equals(queue.getUser())) { + changed = true; + queue.setUser(user); } - } - if (user != null && !user.equals(queue.getUser())) { - changed = true; - queue.setUser(user); - } - if (changed) { - final long txID = storageManager.generateID(); - try { - storageManager.updateQueueBinding(txID, queueBinding); - storageManager.commitBindings(txID); - } catch (Throwable throwable) { - storageManager.rollback(txID); - logger.warn(throwable.getMessage(), throwable); - throw throwable; + if (changed) { + final long txID = storageManager.generateID(); + try { + storageManager.updateQueueBinding(txID, queueBinding); + storageManager.commitBindings(txID); + } catch (Throwable throwable) { + storageManager.rollback(txID); + logger.warn(throwable.getMessage(), throwable); + throw throwable; + } + } + } finally { + if (bindingsOnQueue != null) { + bindingsOnQueue.updated(queueBinding); } } @@ -876,6 +885,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding AddressInfo addressInfo = addressManager.getAddressInfo(address); if (bindingMove != null) { + context.clear(); bindingMove.route(message, context); if (addressInfo != null) { addressInfo.incrementRoutedMessageCount(); @@ -1341,7 +1351,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding @Override public void done() { - addReferences(refs, direct); + context.processReferences(refs, direct); } }); } @@ -1476,16 +1486,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding return true; } - /** - * @param refs - */ - private void addReferences(final List refs, final boolean direct) { - for (MessageReference ref : refs) { - ref.getQueue().addTail(ref, direct); - } - } - - /** + /** * The expiry scanner can't be started until the whole server has been started other wise you may get races */ @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java index 09d67ad075..87a3c30340 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java @@ -263,7 +263,7 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif Map selectedProtocols = new ConcurrentHashMap<>(); for (Entry entry : selectedProtocolFactories.entrySet()) { - selectedProtocols.put(entry.getKey(), entry.getValue().createProtocolManager(server, info.getExtraParams(), incomingInterceptors, outgoingInterceptors)); + selectedProtocols.put(entry.getKey(), entry.getValue().createProtocolManager(server, info.getCombinedParams(), incomingInterceptors, outgoingInterceptors)); } acceptor = factory.createAcceptor(info.getName(), clusterConnection, info.getParams(), new DelegatingBufferHandler(), this, threadPool, scheduledThreadPool, selectedProtocols); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 43bed889ac..4b5509e087 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1048,7 +1048,7 @@ public interface ActiveMQServerLogger extends BasicLogger { @LogMessage(level = Logger.Level.WARN) @Message(id = 222151, value = "removing consumer which did not handle a message, consumer={0}, message={1}", format = Message.Format.MESSAGE_FORMAT) - void removingBadConsumer(@Cause Throwable e, Consumer consumer, MessageReference reference); + void removingBadConsumer(@Cause Throwable e, Consumer consumer, Object reference); @LogMessage(level = Logger.Level.WARN) @Message(id = 222152, value = "Unable to decrement reference counting on queue", diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java index 6df48890da..1dfff29e1b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java @@ -46,6 +46,10 @@ public interface Consumer { */ HandleStatus handle(MessageReference reference) throws Exception; + /** wakes up internal threads to deliver more messages */ + default void promptDelivery() { + } + /** * This will proceed with the actual delivery. * Notice that handle should hold a readLock and proceedDelivery should release the readLock @@ -80,4 +84,8 @@ public interface Consumer { /** an unique sequential ID for this consumer */ long sequentialID(); + + default void errorProcessing(Throwable e, MessageReference reference) { + + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java index 2e2fb8d85a..886af36285 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java @@ -44,6 +44,8 @@ public interface MessageReference { SimpleString getLastValueProperty(); + void setCallback(MessageReferenceCallback callback); + /** * We define this method aggregation here because on paging we need to hold the original estimate, * so we need to perform some extra steps on paging. diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReferenceCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReferenceCallback.java new file mode 100644 index 0000000000..4804ddee13 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReferenceCallback.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server; + +/** This is to be used in cases where a message delivery happens on an executor. + * Most MessageReference implementations will allow execution, and if it does, + * and the protocol requires an execution per message, this callback may be used. + * + * At the time of this implementation only AMQP was used. */ +public interface MessageReferenceCallback { + void executeDelivery(MessageReference reference); +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index 8a120ea012..031d01a58a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -53,6 +53,10 @@ public interface Queue extends Bindable,CriticalComponent { void setRoutingType(RoutingType routingType); + /** the current queue and consumer settings will allow use of the Reference Execution and callback. + * This is because */ + boolean allowsReferenceCallback(); + boolean isDurable(); /** @@ -392,4 +396,8 @@ public interface Queue extends Bindable,CriticalComponent { /** This is to perform a check on the counter again */ void recheckRefCount(OperationContext context); + default void errorProcessing(Consumer consumer, Throwable t, MessageReference messageReference) { + + } + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java index 9b09256103..151aa41d94 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java @@ -26,6 +26,24 @@ import org.apache.activemq.artemis.core.transaction.Transaction; public interface RoutingContext { + /* + This will return true if the RoutingContext can be reused + false if it cannot + null, if we don't know. + + + Once false, it can't be set to true + */ + boolean isReusable(); + + int getPreviousBindingsVersion(); + + SimpleString getPreviousAddress(); + + void setReusable(boolean reusable); + + RoutingContext setReusable(boolean reusable, int version); + Transaction getTransaction(); void setTransaction(Transaction transaction); @@ -54,5 +72,16 @@ public interface RoutingContext { SimpleString getAddress(Message message); + SimpleString getAddress(); + RoutingType getRoutingType(); + + RoutingType getPreviousRoutingType(); + + void processReferences(List refs, boolean direct); + + boolean isReusable(Message message, int version); + + + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java index eb055631e4..4d3591954b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java @@ -31,6 +31,10 @@ public interface ServerConsumer extends Consumer, ConsumerInfo { void fireSlowConsumer(); + /** the current queue settings will allow use of the Reference Execution and callback. + * This is because */ + boolean allowReferenceCallback(); + /** * this is to be used with anything specific on a protocol head. */ @@ -105,6 +109,4 @@ public interface ServerConsumer extends Consumer, ConsumerInfo { long getCreationTime(); String getSessionID(); - - void promptDelivery(); } \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java index 37442b2c42..cfc3e014d2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java @@ -22,6 +22,7 @@ import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Executor; import org.apache.activemq.artemis.Closeable; import org.apache.activemq.artemis.api.core.Message; @@ -43,6 +44,8 @@ public interface ServerSession extends SecurityAuth { Object getConnectionID(); + Executor getSessionExecutor(); + /** * Certain protocols may create an internal session that shouldn't go through security checks. * make sure you don't expose this property through any protocol layer as that would be a security breach @@ -241,12 +244,26 @@ public interface ServerSession extends SecurityAuth { boolean direct, boolean noAutoCreateQueue) throws Exception; + RoutingStatus send(Transaction tx, + Message message, + boolean direct, + boolean noAutoCreateQueue, + RoutingContext routingContext) throws Exception; + + RoutingStatus doSend(Transaction tx, Message msg, SimpleString originalAddress, boolean direct, boolean noAutoCreateQueue) throws Exception; + RoutingStatus doSend(Transaction tx, + Message msg, + SimpleString originalAddress, + boolean direct, + boolean noAutoCreateQueue, + RoutingContext routingContext) throws Exception; + RoutingStatus send(Message message, boolean direct, boolean noAutoCreateQueue) throws Exception; RoutingStatus send(Message message, boolean direct) throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java index 315b926684..2fd70b605f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java @@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.MessageReferenceCallback; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.server.ServerConsumer; @@ -146,6 +147,11 @@ public class LastValueQueue extends QueueImpl { } } + @Override + public boolean allowsReferenceCallback() { + return false; + } + private void replaceLVQMessage(MessageReference ref, HolderReference hr) { MessageReference oldRef = hr.getReference(); @@ -231,6 +237,11 @@ public class LastValueQueue extends QueueImpl { this.ref = ref; } + @Override + public void setCallback(MessageReferenceCallback callback) { + // HolderReference may be reused among different consumers, so we don't set a callback and won't support Runnables + } + MessageReference getReference() { return ref; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java index 2401c4a405..6d32c46800 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java @@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.MessageReferenceCallback; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.transaction.Transaction; @@ -30,7 +31,7 @@ import org.apache.activemq.artemis.utils.collections.LinkedListImpl; /** * Implementation of a MessageReference */ -public class MessageReferenceImpl extends LinkedListImpl.Node implements MessageReference { +public class MessageReferenceImpl extends LinkedListImpl.Node implements MessageReference, Runnable { private static final AtomicIntegerFieldUpdater DELIVERY_COUNT_UPDATER = AtomicIntegerFieldUpdater .newUpdater(MessageReferenceImpl.class, "deliveryCount"); @@ -54,6 +55,8 @@ public class MessageReferenceImpl extends LinkedListImpl.Node refs, final boolean direct) { + internalprocessReferences(refs, direct); + } + + private void internalprocessReferences(final List refs, final boolean direct) { + for (MessageReference ref : refs) { + ref.getQueue().addTail(ref, direct); + } + } + + @Override public void addQueueWithAck(SimpleString address, Queue queue) { addQueue(address, queue); @@ -82,6 +145,11 @@ public final class RoutingContextImpl implements RoutingContext { return listing == null ? false : listing.isAlreadyAcked(queue); } + @Override + public boolean isReusable(Message message, int version) { + return isReusable() && queueCount > 0 && address.equals(previousAddress) && previousRoutingType == routingType && getPreviousBindingsVersion() == version; + } + @Override public void setAddress(SimpleString address) { this.address = address; @@ -100,11 +168,21 @@ public final class RoutingContextImpl implements RoutingContext { return address; } + @Override + public SimpleString getAddress() { + return address; + } + @Override public RoutingType getRoutingType() { return routingType; } + @Override + public RoutingType getPreviousRoutingType() { + return previousRoutingType; + } + @Override public RouteContextList getContextListing(SimpleString address) { RouteContextList listing = map.get(address); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index f7a89d7a66..19e395646b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -256,6 +256,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { // ---------------------------------------------------------------------- + @Override + public boolean allowReferenceCallback() { + if (browseOnly) { + return false; + } else { + return messageQueue.allowsReferenceCallback(); + } + } + @Override public long sequentialID() { return sequentialID; @@ -346,6 +355,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { return callback.supportsDirectDelivery(); } + @Override + public void errorProcessing(Throwable e, MessageReference deliveryObject) { + messageQueue.errorProcessing(this, e, deliveryObject); + } @Override public HandleStatus handle(final MessageReference ref) throws Exception { @@ -582,13 +595,14 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { public void forceDelivery(final long sequence) { forceDelivery(sequence, () -> { Message forcedDeliveryMessage = new CoreMessage(storageManager.generateID(), 50); + MessageReference reference = MessageReference.Factory.createReference(forcedDeliveryMessage, messageQueue); + reference.setDeliveryCount(0); forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence); forcedDeliveryMessage.setAddress(messageQueue.getName()); applyPrefixForLegacyConsumer(forcedDeliveryMessage); - callback.sendMessage(null, forcedDeliveryMessage, ServerConsumerImpl.this, 0); - + callback.sendMessage(reference, forcedDeliveryMessage, ServerConsumerImpl.this, 0); }); } @@ -949,7 +963,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } catch (ActiveMQException e) { if (startedTransaction) { tx.rollback(); - } else { + } else if (tx != null) { tx.markAsRollbackOnly(e); } throw e; @@ -958,7 +972,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { ActiveMQIllegalStateException hqex = new ActiveMQIllegalStateException(e.getMessage()); if (startedTransaction) { tx.rollback(); - } else { + } else if (tx != null) { tx.markAsRollbackOnly(hqex); } throw hqex; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 3bc60f2084..11b096baf6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.artemis.Closeable; @@ -190,6 +191,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener { private Set closeables; + private final Executor sessionExecutor; + public ServerSessionImpl(final String name, final String username, final String password, @@ -264,6 +267,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener { remotingConnection.addFailureListener(this); this.context = context; + this.sessionExecutor = server.getExecutorFactory().getExecutor(); + if (!xa) { tx = newTransaction(); } @@ -283,6 +288,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener { this.closeables.add(closeable); } + @Override + public Executor getSessionExecutor() { + return sessionExecutor; + } + @Override public void disableSecurity() { this.securityEnabled = false; @@ -1467,12 +1477,20 @@ public class ServerSessionImpl implements ServerSession, FailureListener { return lsm; } - @Override public synchronized RoutingStatus send(Transaction tx, Message msg, final boolean direct, boolean noAutoCreateQueue) throws Exception { + return send(tx, msg, direct, noAutoCreateQueue, routingContext); + } + + @Override + public synchronized RoutingStatus send(Transaction tx, + Message msg, + final boolean direct, + boolean noAutoCreateQueue, + RoutingContext routingContext) throws Exception { final Message message; if ((msg.getEncodeSize() > storageManager.getMaxRecordSize()) && !msg.isLargeMessage()) { @@ -1527,7 +1545,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { result = handleManagementMessage(tx, message, direct); } else { - result = doSend(tx, message, address, direct, noAutoCreateQueue); + result = doSend(tx, message, address, direct, noAutoCreateQueue, routingContext); } } catch (Exception e) { @@ -1766,7 +1784,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } reply.setAddress(replyTo); - doSend(tx, reply, null, direct, false); + doSend(tx, reply, null, direct, false, routingContext); } return RoutingStatus.OK; @@ -1823,12 +1841,24 @@ public class ServerSessionImpl implements ServerSession, FailureListener { theTx.rollback(); } + @Override public synchronized RoutingStatus doSend(final Transaction tx, final Message msg, final SimpleString originalAddress, final boolean direct, final boolean noAutoCreateQueue) throws Exception { + return doSend(tx, msg, originalAddress, direct, noAutoCreateQueue, routingContext); + } + + + @Override + public synchronized RoutingStatus doSend(final Transaction tx, + final Message msg, + final SimpleString originalAddress, + final boolean direct, + final boolean noAutoCreateQueue, + final RoutingContext routingContext) throws Exception { RoutingStatus result = RoutingStatus.OK; @@ -1861,6 +1891,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } if (tx == null || autoCommitSends) { + routingContext.setTransaction(null); } else { routingContext.setTransaction(tx); } @@ -1880,7 +1911,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener { value.getB().incrementAndGet(); } } finally { - routingContext.clear(); + if (!routingContext.isReusable()) { + routingContext.clear(); + } } return result; } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 0ef7804290..2e3b691dca 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -793,6 +793,11 @@ public class ScheduledDeliveryHandlerTest extends Assert { } + @Override + public boolean allowsReferenceCallback() { + return false; + } + @Override public int getConsumersBeforeDispatch() { return 0; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java index 05e763f2b9..550adbe6ed 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java @@ -250,7 +250,7 @@ public class AddressingTest extends ActiveMQTestBase { // there are no consumers so no messages should be routed to the queue producer.send(session.createMessage(true)); - assertEquals(0, queue.getMessageCount()); + Wait.assertEquals(0, queue::getMessageCount); } @Test diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java index 4529efbd4d..cfcbc209cb 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java @@ -138,7 +138,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport { sender.send(message); sender.close(); - assertEquals(1, queueView.getMessageCount()); + Wait.assertEquals(1, queueView::getMessageCount); // Now try and get the message AmqpReceiver receiver = session.createReceiver(getQueueName()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java index 2f65dfb20d..c6119a188b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.amqp; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; @@ -74,7 +75,8 @@ public class AmqpFlowControlFailTest extends JMSClientTestSupport { } receiver.close(); session2.close(); - assertEquals(1000, sender.getSender().getCredit()); + + Wait.assertEquals(1000, sender.getSender()::getCredit); for (int i = 0; i < 1000; i++) { final AmqpMessage message = new AmqpMessage(); byte[] payload = new byte[100]; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java index ea62df8daf..9dc1138798 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java @@ -47,6 +47,7 @@ import org.apache.activemq.transport.amqp.client.AmqpValidator; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.engine.Sender; import org.jgroups.util.UUID; +import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1154,4 +1155,52 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { receiver.close(); connection.close(); } + + + + @Test(timeout = 60000) + public void testReceiveRejecting() throws Exception { + final int MSG_COUNT = 1000; + + AmqpClient client = createAmqpClient(); + + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + final String address = getQueueName(); + + + AmqpSender sender = session.createSender(address); + for (int i = 0; i < MSG_COUNT; i++) { + AmqpMessage message = new AmqpMessage(); + message.setMessageId("msg" + i); + sender.send(message); + } + + + + Queue queueView = getProxyToQueue(address); + + for (int i = 0; i < MSG_COUNT; i++) { + final AmqpReceiver receiver = session.createReceiver(address); + + receiver.flow(MSG_COUNT); + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + Assert.assertNotNull(received); + Assert.assertEquals("msg" + i, received.getMessageId()); + received.accept(); + receiver.close(); + } + final AmqpReceiver receiver = session.createReceiver(address); + receiver.flow(MSG_COUNT); + + Assert.assertNull(receiver.receive(1, TimeUnit.MILLISECONDS)); + + + Wait.assertEquals(0, queueView::getDeliveringCount); + + connection.close(); + } + + } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java index 96d5f1c9d5..e35635d0d7 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java @@ -766,7 +766,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { // We should have now drained the Queue receiver.flow(1); - AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + AmqpMessage message = receiver.receive(1, TimeUnit.SECONDS); if (message != null) { System.out.println("Read message: " + message.getApplicationProperty("msgId")); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java index 50ab38966d..7b1f155f70 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java @@ -324,7 +324,7 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport { Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue consumerQueue = consumerSession.createQueue(queueName); MessageConsumer consumer = consumerSession.createConsumer(consumerQueue); - TextMessage msg = (TextMessage) consumer.receive(200); + TextMessage msg = (TextMessage) consumer.receive(2000); assertNotNull(msg); consumer.close(); } @@ -336,7 +336,7 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport { Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue consumerQueue = consumerSession.createQueue(queueName); MessageConsumer consumer = consumerSession.createConsumer(consumerQueue); - TextMessage msg = (TextMessage) consumer.receive(200); + TextMessage msg = (TextMessage) consumer.receive(2000); assertNull(msg); consumer.close(); } @@ -349,8 +349,8 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport { MessageConsumer consumer = createConsumer(consumerConnection, queueName); MessageConsumer consumer2 = createConsumer(consumerConnection2, queueName); - TextMessage msg = (TextMessage) consumer.receive(200); - TextMessage msg2 = (TextMessage) consumer2.receive(200); + TextMessage msg = (TextMessage) consumer.receive(2000); + TextMessage msg2 = (TextMessage) consumer2.receive(2000); assertNotNull(msg); assertNotNull(msg2); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java index 1b790a072d..58bf2d3d5a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java @@ -45,6 +45,11 @@ public class DummyServerConsumer implements ServerConsumer { } + @Override + public boolean allowReferenceCallback() { + return false; + } + @Override public Object getProtocolData() { return null; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java index f103d4182f..39200e68e3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java @@ -500,10 +500,16 @@ public class ConsumerTest extends ActiveMQTestBase { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); javax.jms.Queue queue = session.createQueue(QUEUE.toString()); MessageProducer producer = session.createProducer(queue); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + if (durable) { + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + } else { + + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + } long time = System.currentTimeMillis(); - int NUMBER_OF_MESSAGES = 100; + int NUMBER_OF_MESSAGES = durable ? 500 : 5000; for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { TextMessage msg = session.createTextMessage("hello " + i); msg.setIntProperty("mycount", i); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java index dc57a12aa2..3e64ac5fc7 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java @@ -241,6 +241,11 @@ public class HangConsumerTest extends ActiveMQTestBase { addressSettingsRepository, executor, server, null); } + @Override + public boolean allowsReferenceCallback() { + return false; + } + @Override public synchronized int deleteMatchingReferences(final int flushLimit, final Filter filter) throws Exception { latchDelete.countDown(); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index bea116770f..15ac691f6c 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -83,6 +83,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { } + @Override + public boolean allowsReferenceCallback() { + return false; + } + @Override public boolean isExclusive() { // no-op diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java index 40fadf9d9f..01402635dd 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java @@ -31,6 +31,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.BindingType; import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.BindingsFactory; +import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.postoffice.impl.WildcardAddressManager; import org.apache.activemq.artemis.core.server.Bindable; import org.apache.activemq.artemis.core.server.Queue; @@ -334,6 +335,10 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase { public void unproposed(SimpleString groupID) { } + @Override + public void updated(QueueBinding binding) { + } + @Override public boolean redistribute(Message message, Queue originatingQueue, From 8281e3b58f3f64abe2c7321aeb2c9de07d5160ab Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Mon, 17 Dec 2018 09:12:19 -0500 Subject: [PATCH 3/3] ARTEMIS-2205 Optimizing some Lambda usages https://issues.apache.org/jira/browse/ARTEMIS-2205 --- .../proton/ProtonServerSenderContext.java | 16 ++++++----- .../paging/cursor/PagedReferenceImpl.java | 26 ++++++++++-------- .../artemis/core/server/MessageReference.java | 11 +++++++- .../core/server/MessageReferenceCallback.java | 27 ------------------- .../core/server/impl/LastValueQueue.java | 5 ++-- .../server/impl/MessageReferenceImpl.java | 25 +++++++++-------- 6 files changed, 51 insertions(+), 59 deletions(-) delete mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReferenceCallback.java diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 843d1fedcc..4caf2d0047 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -33,7 +33,6 @@ import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.server.AddressQueryResult; import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.MessageReference; -import org.apache.activemq.artemis.core.server.MessageReferenceCallback; import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; @@ -53,6 +52,7 @@ import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable; import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.selector.filter.FilterException; import org.apache.activemq.artemis.selector.impl.SelectorParser; +import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.CompositeAddress; import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.Symbol; @@ -79,7 +79,7 @@ import org.jboss.logging.Logger; /** * This is the Equivalent for the ServerConsumer */ -public class ProtonServerSenderContext extends ProtonInitializable implements ProtonDeliveryHandler, MessageReferenceCallback { +public class ProtonServerSenderContext extends ProtonInitializable implements ProtonDeliveryHandler { private static final Logger log = Logger.getLogger(ProtonServerSenderContext.class); @@ -92,7 +92,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr private final ConnectionFlushIOCallback connectionFlusher = new ConnectionFlushIOCallback(); private Consumer brokerConsumer; - + private ReadyListener onflowControlReady; protected final AMQPSessionContext protonSession; protected final Sender sender; protected final AMQPConnectionContext connection; @@ -117,6 +117,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr * to sync the credits we have versus the credits that are being held in proton * */ private final Object creditsLock = new Object(); + private final java.util.function.Consumer executeDelivery; public ProtonServerSenderContext(AMQPConnectionContext connection, Sender sender, @@ -127,6 +128,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr this.sender = sender; this.protonSession = protonSession; this.sessionSPI = server; + this.executeDelivery = this::executeDelivery; } public Object getBrokerConsumer() { @@ -164,7 +166,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } public boolean hasCredits() { - if (!connection.flowControl(brokerConsumer::promptDelivery)) { + if (!connection.flowControl(onflowControlReady)) { return false; } @@ -488,6 +490,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr boolean browseOnly = !multicast && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY); try { brokerConsumer = (Consumer) sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly); + onflowControlReady = brokerConsumer::promptDelivery; } catch (ActiveMQAMQPResourceLimitExceededException e1) { throw new ActiveMQAMQPResourceLimitExceededException(e1.getMessage()); } catch (ActiveMQSecurityException e) { @@ -747,7 +750,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } if (messageReference instanceof Runnable && consumer.allowReferenceCallback()) { - messageReference.setCallback(this); + messageReference.onDelivery(executeDelivery); connection.runNow((Runnable)messageReference); } else { connection.runNow(() -> executeDelivery(messageReference)); @@ -760,8 +763,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } } - @Override - public void executeDelivery(MessageReference messageReference) { + private void executeDelivery(MessageReference messageReference) { try { if (sender.getLocalState() == EndpointState.CLOSED) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java index e05a9af550..893e3a746e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java @@ -18,13 +18,13 @@ package org.apache.activemq.artemis.core.paging.cursor; import java.lang.ref.WeakReference; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.function.Consumer; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.MessageReference; -import org.apache.activemq.artemis.core.server.MessageReferenceCallback; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.impl.AckReason; @@ -75,7 +75,7 @@ public class PagedReferenceImpl extends LinkedListImpl.Node private long messageSize = -1; - private MessageReferenceCallback callback; + private Consumer onDelivery; @Override public Object getProtocolData() { @@ -93,22 +93,26 @@ public class PagedReferenceImpl extends LinkedListImpl.Node } @Override - public void setCallback(MessageReferenceCallback callback) { - this.callback = callback; + public void onDelivery(Consumer onDelivery) { + assert this.onDelivery == null; + this.onDelivery = onDelivery; } + /** + * It will call {@link Consumer#accept(Object)} on {@code this} of the {@link Consumer} registered in {@link #onDelivery(Consumer)}, if any. + */ @Override public void run() { - MessageReferenceCallback callback = this.callback; - - try { - if (callback != null) { - callback.executeDelivery(this); + final Consumer onDelivery = this.onDelivery; + if (onDelivery != null) { + try { + onDelivery.accept(this); + } finally { + this.onDelivery = null; } - } finally { - this.callback = null; } } + @Override public synchronized PagedMessage getPagedMessage() { PagedMessage returnMessage = message != null ? message.get() : null; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java index 886af36285..905f93d7c1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java @@ -17,6 +17,8 @@ package org.apache.activemq.artemis.core.server; +import java.util.function.Consumer; + import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; @@ -44,7 +46,14 @@ public interface MessageReference { SimpleString getLastValueProperty(); - void setCallback(MessageReferenceCallback callback); + /** + * This is to be used in cases where a message delivery happens on an executor. + * Most MessageReference implementations will allow execution, and if it does, + * and the protocol requires an execution per message, this callback may be used. + * + * At the time of this implementation only AMQP was used. + */ + void onDelivery(Consumer callback); /** * We define this method aggregation here because on paging we need to hold the original estimate, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReferenceCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReferenceCallback.java deleted file mode 100644 index 4804ddee13..0000000000 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReferenceCallback.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.artemis.core.server; - -/** This is to be used in cases where a message delivery happens on an executor. - * Most MessageReference implementations will allow execution, and if it does, - * and the protocol requires an execution per message, this callback may be used. - * - * At the time of this implementation only AMQP was used. */ -public interface MessageReferenceCallback { - void executeDelivery(MessageReference reference); -} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java index 2fd70b605f..0ebd7a80d4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Consumer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; @@ -33,7 +34,6 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.MessageReference; -import org.apache.activemq.artemis.core.server.MessageReferenceCallback; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.server.ServerConsumer; @@ -50,6 +50,7 @@ import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; * This is useful for example, for stock prices, where you're only interested in the latest value * for a particular stock */ +@SuppressWarnings("ALL") public class LastValueQueue extends QueueImpl { private final Map map = new ConcurrentHashMap<>(); @@ -238,7 +239,7 @@ public class LastValueQueue extends QueueImpl { } @Override - public void setCallback(MessageReferenceCallback callback) { + public void onDelivery(Consumer callback) { // HolderReference may be reused among different consumers, so we don't set a callback and won't support Runnables } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java index 6d32c46800..12acffd579 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java @@ -17,12 +17,12 @@ package org.apache.activemq.artemis.core.server.impl; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.function.Consumer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.MessageReference; -import org.apache.activemq.artemis.core.server.MessageReferenceCallback; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.transaction.Transaction; @@ -55,7 +55,7 @@ public class MessageReferenceImpl extends LinkedListImpl.Node onDelivery; // Static -------------------------------------------------------- @@ -88,20 +88,23 @@ public class MessageReferenceImpl extends LinkedListImpl.Node onDelivery) { + assert this.onDelivery == null; + this.onDelivery = onDelivery; } + /** + * It will call {@link Consumer#accept(Object)} on {@code this} of the {@link Consumer} registered in {@link #onDelivery(Consumer)}, if any. + */ @Override public void run() { - MessageReferenceCallback callback = this.callback; - - try { - if (callback != null) { - callback.executeDelivery(this); + final Consumer onDelivery = this.onDelivery; + if (onDelivery != null) { + try { + onDelivery.accept(this); + } finally { + this.onDelivery = null; } - } finally { - this.callback = null; } }