diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/PendingTask.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/PendingTask.java new file mode 100644 index 0000000000..fade097661 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/PendingTask.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils; + +/** This is similar to a Runnable, except that we throw exceptions. + * In certain places we need to complete tasks after deliveries, + * and this will take care of those situations. */ +public abstract class PendingTask { + + public abstract void run() throws Exception; + +} diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java index cdaf34fd84..40eb175e6e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java @@ -100,7 +100,7 @@ public class ProtonProtocolManager implements ProtocolManager, Noti } AMQPServerConnectionContext amqpConnection = ProtonServerConnectionContextFactory.getFactory(). - createConnection(connectionCallback, (int) ttl, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX, server.getScheduledPool()); + createConnection(connectionCallback, (int) ttl, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX, server.getExecutorFactory().getExecutor(), server.getScheduledPool()); Executor executor = server.getExecutorFactory().getExecutor(); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java index 3da1629fcd..12aad2212e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java @@ -115,6 +115,11 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se (String) null, this, null, true); } + @Override + public void afterDelivery() throws Exception { + + } + @Override public void start() { @@ -214,38 +219,57 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se @Override public void commitCurrentTX() throws Exception { - serverSession.commit(); + recoverContext(); + try { + serverSession.commit(); + } + finally { + resetContext(); + } } @Override public void rollbackCurrentTX() throws Exception { - serverSession.rollback(false); + recoverContext(); + try { + serverSession.rollback(false); + } + finally { + resetContext(); + } } @Override public void close() throws Exception { - closeExecutor.execute(new Runnable() { - @Override - public void run() { - try { - serverSession.close(false); - } - catch (Exception e) { - // TODO Logger - e.printStackTrace(); - } - } - }); + recoverContext(); + try { + serverSession.close(false); + } + finally { + resetContext(); + } } @Override public void ack(Object brokerConsumer, Object message) throws Exception { - ((ServerConsumer) brokerConsumer).individualAcknowledge(null, ((ServerMessage) message).getMessageID()); + recoverContext(); + try { + ((ServerConsumer) brokerConsumer).individualAcknowledge(null, ((ServerMessage) message).getMessageID()); + } + finally { + resetContext(); + } } @Override public void cancel(Object brokerConsumer, Object message, boolean updateCounts) throws Exception { - ((ServerConsumer) brokerConsumer).individualCancel(((ServerMessage) message).getMessageID(), updateCounts); + recoverContext(); + try { + ((ServerConsumer) brokerConsumer).individualCancel(((ServerMessage) message).getMessageID(), updateCounts); + } + finally { + resetContext(); + } } @Override @@ -267,25 +291,40 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se message.setAddress(new SimpleString(address)); } - serverSession.send(message, false); + recoverContext(); - manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback() { - @Override - public void done() { - synchronized (connection.getLock()) { - delivery.settle(); - connection.flush(); - } - } + try { + serverSession.send(message, false); - @Override - public void onError(int errorCode, String errorMessage) { - synchronized (connection.getLock()) { - receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage)); - connection.flush(); + manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback() { + @Override + public void done() { + synchronized (connection.getLock()) { + delivery.settle(); + connection.flush(); + } } - } - }); + + @Override + public void onError(int errorCode, String errorMessage) { + synchronized (connection.getLock()) { + receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage)); + connection.flush(); + } + } + }); + } + finally { + resetContext(); + } + } + + private void resetContext() { + manager.getServer().getStorageManager().setContext(null); + } + + private void recoverContext() { + manager.getServer().getStorageManager().setContext(serverSession.getSessionContext()); } @Override diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java index cf323d4781..356dc73727 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java @@ -76,6 +76,12 @@ public class MQTTSessionCallback implements SessionCallback { } } + @Override + public void afterDelivery() throws Exception { + + } + + @Override public boolean hasCredits(ServerConsumer consumerID) { return true; diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java index b0ec7ed3c6..3e7afa5b91 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java @@ -22,6 +22,7 @@ import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.HandleStatus; import org.apache.activemq.artemis.core.server.MessageReference; @@ -45,8 +46,9 @@ public class AMQServerConsumer extends ServerConsumerImpl { boolean strictUpdateDeliveryCount, ManagementService managementService, boolean supportLargeMessage, - Integer credits) throws Exception { - super(consumerID, serverSession, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits); + Integer credits, + final ActiveMQServer server) throws Exception { + super(consumerID, serverSession, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server); } public void setBrowserListener(BrowserListener listener) { diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java index c414319347..0a3804ce81 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java @@ -355,7 +355,7 @@ public class AMQServerSession extends ServerSessionImpl { ManagementService managementService2, boolean supportLargeMessage, Integer credits) throws Exception { - return new AMQServerConsumer(consumerID, this, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits); + return new AMQServerConsumer(consumerID, this, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, this.server); } public AMQServerConsumer getConsumer(long nativeId) { diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 3e642c60cd..926aebd8d4 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -146,6 +146,11 @@ public class AMQSession implements SessionCallback { started.set(true); } + @Override + public void afterDelivery() throws Exception { + + } + @Override public boolean isWritable(ReadyListener callback) { return connection.isWritable(callback); diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContextFactory.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContextFactory.java index cafe8f1a21..d72fb016bd 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContextFactory.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContextFactory.java @@ -16,6 +16,7 @@ */ package org.proton.plug; +import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; public abstract class AMQPConnectionContextFactory { @@ -27,10 +28,11 @@ public abstract class AMQPConnectionContextFactory { int idleTimeout, int maxFrameSize, int channelMax, + Executor dispatchExecutor, ScheduledExecutorService scheduledPool); /** * @return */ - public abstract AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback, ScheduledExecutorService scheduledPool); + public abstract AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback, Executor dispatchExecutor, ScheduledExecutorService scheduledPool); } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java index 9486a1b5f2..fd1ae9915a 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java @@ -18,6 +18,7 @@ package org.proton.plug.context; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -45,7 +46,7 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed"); - protected ProtonHandler handler = ProtonHandler.Factory.create(); + protected final ProtonHandler handler; protected AMQPConnectionCallback connectionCallback; private final ScheduledExecutorService scheduledPool; @@ -54,18 +55,20 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl protected LocalListener listener = new LocalListener(); - public AbstractConnectionContext(AMQPConnectionCallback connectionCallback, ScheduledExecutorService scheduledPool) { - this(connectionCallback, DEFAULT_IDLE_TIMEOUT, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX, scheduledPool); + public AbstractConnectionContext(AMQPConnectionCallback connectionCallback, Executor dispatchExecutor, ScheduledExecutorService scheduledPool) { + this(connectionCallback, DEFAULT_IDLE_TIMEOUT, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX, dispatchExecutor, scheduledPool); } public AbstractConnectionContext(AMQPConnectionCallback connectionCallback, int idleTimeout, int maxFrameSize, int channelMax, + Executor dispatchExecutor, ScheduledExecutorService scheduledPool) { this.connectionCallback = connectionCallback; this.scheduledPool = scheduledPool; connectionCallback.setConnection(this); + this.handler = ProtonHandler.Factory.create(dispatchExecutor); Transport transport = handler.getTransport(); if (idleTimeout > 0) { transport.setIdleTimeout(idleTimeout); @@ -182,7 +185,7 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl if (!connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) { long nextKeepAliveTime = handler.tick(true); flushBytes(); - if (nextKeepAliveTime > 0) { + if (nextKeepAliveTime > 0 && scheduledPool != null) { scheduledPool.schedule(new Runnable() { @Override public void run() { diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java index 531b182619..76a7da9cc2 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java @@ -29,20 +29,22 @@ import org.proton.plug.exceptions.ActiveMQAMQPException; import org.proton.plug.context.ProtonInitializable; import org.proton.plug.util.FutureRunnable; +import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; public class ProtonClientConnectionContext extends AbstractConnectionContext implements AMQPClientConnectionContext { - public ProtonClientConnectionContext(AMQPConnectionCallback connectionCallback, ScheduledExecutorService scheduledPool) { - super(connectionCallback, scheduledPool); + public ProtonClientConnectionContext(AMQPConnectionCallback connectionCallback, Executor dispatchExecutor, ScheduledExecutorService scheduledPool) { + super(connectionCallback, dispatchExecutor, scheduledPool); } public ProtonClientConnectionContext(AMQPConnectionCallback connectionCallback, int idleTimeout, int maxFrameSize, int channelMax, + Executor dispatchExecutor, ScheduledExecutorService scheduledPool) { - super(connectionCallback, idleTimeout, maxFrameSize, channelMax, scheduledPool); + super(connectionCallback, idleTimeout, maxFrameSize, channelMax, dispatchExecutor, scheduledPool); } // Maybe a client interface? diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContextFactory.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContextFactory.java index 2beb95c841..88eb991eaf 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContextFactory.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContextFactory.java @@ -20,6 +20,7 @@ import org.proton.plug.AMQPConnectionContext; import org.proton.plug.AMQPConnectionContextFactory; import org.proton.plug.AMQPConnectionCallback; +import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; public class ProtonClientConnectionContextFactory extends AMQPConnectionContextFactory { @@ -31,8 +32,8 @@ public class ProtonClientConnectionContextFactory extends AMQPConnectionContextF } @Override - public AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback, ScheduledExecutorService scheduledPool) { - return new ProtonClientConnectionContext(connectionCallback, scheduledPool); + public AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback, Executor dispatchExecutor, ScheduledExecutorService scheduledPool) { + return new ProtonClientConnectionContext(connectionCallback, dispatchExecutor, scheduledPool); } @@ -41,7 +42,8 @@ public class ProtonClientConnectionContextFactory extends AMQPConnectionContextF int idleTimeout, int maxFrameSize, int channelMax, + Executor dispatchExecutor, ScheduledExecutorService scheduledPool) { - return new ProtonClientConnectionContext(connectionCallback, idleTimeout, maxFrameSize, channelMax, scheduledPool); + return new ProtonClientConnectionContext(connectionCallback, idleTimeout, maxFrameSize, channelMax, dispatchExecutor, scheduledPool); } } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java index 606a3a30df..db04a8a093 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java @@ -28,20 +28,22 @@ import org.proton.plug.context.AbstractConnectionContext; import org.proton.plug.context.AbstractProtonSessionContext; import org.proton.plug.exceptions.ActiveMQAMQPException; +import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; public class ProtonServerConnectionContext extends AbstractConnectionContext implements AMQPServerConnectionContext { - public ProtonServerConnectionContext(AMQPConnectionCallback connectionSP, ScheduledExecutorService scheduledPool) { - super(connectionSP, scheduledPool); + public ProtonServerConnectionContext(AMQPConnectionCallback connectionSP, Executor dispatchExecutor, ScheduledExecutorService scheduledPool) { + super(connectionSP, dispatchExecutor, scheduledPool); } public ProtonServerConnectionContext(AMQPConnectionCallback connectionSP, int idleTimeout, int maxFrameSize, int channelMax, + Executor dispatchExecutor, ScheduledExecutorService scheduledPool) { - super(connectionSP, idleTimeout, maxFrameSize, channelMax, scheduledPool); + super(connectionSP, idleTimeout, maxFrameSize, channelMax, dispatchExecutor, scheduledPool); } @Override diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContextFactory.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContextFactory.java index 893c47981c..81dae32b41 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContextFactory.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContextFactory.java @@ -20,6 +20,7 @@ import org.proton.plug.AMQPConnectionContextFactory; import org.proton.plug.AMQPConnectionCallback; import org.proton.plug.AMQPServerConnectionContext; +import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_IDLE_TIMEOUT; @@ -35,8 +36,8 @@ public class ProtonServerConnectionContextFactory extends AMQPConnectionContextF } @Override - public AMQPServerConnectionContext createConnection(AMQPConnectionCallback connectionCallback, ScheduledExecutorService scheduledPool) { - return createConnection(connectionCallback, DEFAULT_IDLE_TIMEOUT, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX, scheduledPool); + public AMQPServerConnectionContext createConnection(AMQPConnectionCallback connectionCallback, Executor dispatchExecutor, ScheduledExecutorService scheduledPool) { + return createConnection(connectionCallback, DEFAULT_IDLE_TIMEOUT, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX, dispatchExecutor, scheduledPool); } @Override @@ -44,7 +45,8 @@ public class ProtonServerConnectionContextFactory extends AMQPConnectionContextF int idleTimeout, int maxFrameSize, int channelMax, + Executor dispatchExecutor, ScheduledExecutorService scheduledPool) { - return new ProtonServerConnectionContext(connectionCallback, idleTimeout, maxFrameSize, channelMax, scheduledPool); + return new ProtonServerConnectionContext(connectionCallback, idleTimeout, maxFrameSize, channelMax, dispatchExecutor, scheduledPool); } } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/ProtonHandler.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/ProtonHandler.java index 1ae0dffd4c..d02546bbf6 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/ProtonHandler.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/ProtonHandler.java @@ -16,6 +16,8 @@ */ package org.proton.plug.handler; +import java.util.concurrent.Executor; + import io.netty.buffer.ByteBuf; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Transport; @@ -31,10 +33,9 @@ public interface ProtonHandler { long tick(boolean firstTick); - public static final class Factory { - - public static ProtonHandler create() { - return new ProtonHandlerImpl(); + final class Factory { + public static ProtonHandler create(Executor dispatchExecutor) { + return new ProtonHandlerImpl(dispatchExecutor); } } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java index cd5d1572e3..08cdf57ef0 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java @@ -20,6 +20,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import io.netty.buffer.ByteBuf; @@ -54,6 +55,14 @@ public class ProtonHandlerImpl extends ProtonInitializable implements ProtonHand private final Collector collector = Proton.collector(); + private final Executor dispatchExecutor; + + private final Runnable dispatchRunnable = new Runnable() { + public void run() { + dispatch(); + } + }; + private ArrayList handlers = new ArrayList<>(); private Sasl serverSasl; @@ -68,18 +77,14 @@ public class ProtonHandlerImpl extends ProtonInitializable implements ProtonHand private SASLResult saslResult; - /** - * If dispatching a dispatch call is ignored to avoid infinite stack loop - */ - private boolean dispatching = false; - protected volatile boolean dataReceived; protected boolean receivedFirstPacket = false; private int offset = 0; - public ProtonHandlerImpl() { + public ProtonHandlerImpl(Executor dispatchExecutor) { + this.dispatchExecutor = dispatchExecutor; this.creationTime = System.currentTimeMillis(); transport.bind(connection); connection.collect(collector); @@ -271,20 +276,9 @@ public class ProtonHandlerImpl extends ProtonInitializable implements ProtonHand checkServerSASL(); - if (dispatching) { - return; - } - - dispatching = true; - } - try { - dispatch(); - } - finally { - dispatching = false; - } + dispatchExecutor.execute(dispatchRunnable); } @Override diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java index 46d7c6418f..ea84e43229 100644 --- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java +++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java @@ -16,6 +16,8 @@ */ package org.proton.plug.context; +import java.util.concurrent.Executors; + import io.netty.buffer.ByteBuf; import org.apache.qpid.proton.engine.Connection; @@ -48,7 +50,7 @@ public class AbstractConnectionContextTest { private class TestConnectionContext extends AbstractConnectionContext { public TestConnectionContext(AMQPConnectionCallback connectionCallback) { - super(connectionCallback, null); + super(connectionCallback, Executors.newSingleThreadExecutor(), null); } @Override diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/InVMTestConnector.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/InVMTestConnector.java index 6a84a9565c..0843c49396 100644 --- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/InVMTestConnector.java +++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/InVMTestConnector.java @@ -16,6 +16,8 @@ */ package org.proton.plug.test.invm; +import java.util.concurrent.Executors; + import org.proton.plug.AMQPClientConnectionContext; import org.proton.plug.context.client.ProtonClientConnectionContext; import org.proton.plug.test.minimalclient.Connector; @@ -32,6 +34,6 @@ public class InVMTestConnector implements Connector { @Override public AMQPClientConnectionContext connect(String host, int port) throws Exception { - return new ProtonClientConnectionContext(new ProtonINVMSPI(), null); + return new ProtonClientConnectionContext(new ProtonINVMSPI(), Executors.newSingleThreadExecutor(), null); } } diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java index 68a27893cc..8e2177ddb0 100644 --- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java +++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java @@ -35,7 +35,7 @@ public class ProtonINVMSPI implements AMQPConnectionCallback { AMQPConnectionContext returningConnection; - ProtonServerConnectionContext serverConnection = new ProtonServerConnectionContext(new ReturnSPI(), null); + ProtonServerConnectionContext serverConnection = new ProtonServerConnectionContext(new ReturnSPI(), Executors.newSingleThreadExecutor(), null); final ExecutorService mainExecutor = Executors.newSingleThreadExecutor(); diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/SimpleAMQPConnector.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/SimpleAMQPConnector.java index 1e12410340..122477b72a 100644 --- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/SimpleAMQPConnector.java +++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/SimpleAMQPConnector.java @@ -18,6 +18,7 @@ package org.proton.plug.test.minimalclient; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.concurrent.Executors; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; @@ -59,7 +60,7 @@ public class SimpleAMQPConnector implements Connector { AMQPClientSPI clientConnectionSPI = new AMQPClientSPI(future.channel()); - final AMQPClientConnectionContext connection = (AMQPClientConnectionContext) ProtonClientConnectionContextFactory.getFactory().createConnection(clientConnectionSPI, null); + final AMQPClientConnectionContext connection = (AMQPClientConnectionContext) ProtonClientConnectionContextFactory.getFactory().createConnection(clientConnectionSPI, Executors.newSingleThreadExecutor(), null); future.channel().pipeline().addLast(new ChannelDuplexHandler() { @Override diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalServer.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalServer.java index a1b1462f5c..3677035035 100644 --- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalServer.java +++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalServer.java @@ -19,6 +19,7 @@ package org.proton.plug.test.minimalserver; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.List; +import java.util.concurrent.Executors; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; @@ -124,7 +125,7 @@ public class MinimalServer { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); - connection = ProtonServerConnectionContextFactory.getFactory().createConnection(new MinimalConnectionSPI(ctx.channel()), null); + connection = ProtonServerConnectionContextFactory.getFactory().createConnection(new MinimalConnectionSPI(ctx.channel()), Executors.newSingleThreadExecutor(), null); //ctx.read(); } diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java index dd338f6e34..e94e0bc47a 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java @@ -19,7 +19,9 @@ package org.apache.activemq.artemis.core.protocol.stomp; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.BlockingDeque; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingDeque; import java.util.zip.Inflater; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; @@ -43,6 +45,7 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.ConfigurationHelper; +import org.apache.activemq.artemis.utils.PendingTask; import org.apache.activemq.artemis.utils.UUIDGenerator; import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE; @@ -57,6 +60,8 @@ public class StompSession implements SessionCallback { private final OperationContext sessionContext; + private final BlockingDeque afterDeliveryTasks = new LinkedBlockingDeque<>(); + private final Map subscriptions = new ConcurrentHashMap<>(); // key = message ID, value = consumer ID @@ -100,7 +105,15 @@ public class StompSession implements SessionCallback { } @Override - public int sendMessage(ServerMessage serverMessage, ServerConsumer consumer, int deliveryCount) { + public void afterDelivery() throws Exception { + PendingTask task; + while ((task = afterDeliveryTasks.poll()) != null) { + task.run(); + } + } + + @Override + public int sendMessage(ServerMessage serverMessage, final ServerConsumer consumer, int deliveryCount) { LargeServerMessageImpl largeMessage = null; ServerMessage newServerMessage = serverMessage; try { @@ -144,9 +157,20 @@ public class StompSession implements SessionCallback { if (subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO)) { if (manager.send(connection, frame)) { - //we ack and commit only if the send is successful - session.acknowledge(consumer.getID(), newServerMessage.getMessageID()); - session.commit(); + final long messageID = newServerMessage.getMessageID(); + final long consumerID = consumer.getID(); + + // this will be called after the delivery is complete + // we can't call sesison.ack within the delivery + // as it could dead lock. + afterDeliveryTasks.offer(new PendingTask() { + @Override + public void run() throws Exception { + //we ack and commit only if the send is successful + session.acknowledge(consumerID, messageID); + session.commit(); + } + }); } } else { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java index 4fc7879055..0b74fd71d5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java @@ -98,6 +98,12 @@ public final class CoreSessionCallback implements SessionCallback { channel.send(packet); } + + @Override + public void afterDelivery() throws Exception { + + } + @Override public void sendProducerCreditsFailMessage(int credits, SimpleString address) { Packet packet = new SessionProducerCreditsFailMessage(credits, address); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index aca3c3fb15..4f82f2e9c0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -756,9 +756,10 @@ public interface ActiveMQServerLogger extends BasicLogger { format = Message.Format.MESSAGE_FORMAT) void broadcastGroupClosed(@Cause Exception e); + @LogMessage(level = Logger.Level.WARN) - @Message(id = 222109, value = "NodeID={0} is not available on the topology. Retrying the connection to that node now", format = Message.Format.MESSAGE_FORMAT) - void nodeNotAvailable(String targetNodeID); + @Message(id = 222109, value = "Timed out waiting for write lock on consumer. Check the Thread dump", format = Message.Format.MESSAGE_FORMAT) + void timeoutLockingConsumer(); @LogMessage(level = Logger.Level.WARN) @Message(id = 222110, value = "no queue IDs defined!, originalMessage = {0}, copiedMessage = {1}, props={2}", diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 1f2be699ec..422d324809 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -21,6 +21,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; @@ -40,6 +41,7 @@ import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.HandleStatus; import org.apache.activemq.artemis.core.server.LargeServerMessage; @@ -86,6 +88,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { private Object protocolContext; + private final ActiveMQServer server; + /** * We get a readLock when a message is handled, and return the readLock when the message is finally delivered * When stopping the consumer we need to get a writeLock to make sure we had all delivery finished @@ -148,8 +152,9 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { final SessionCallback callback, final boolean preAcknowledge, final boolean strictUpdateDeliveryCount, - final ManagementService managementService) throws Exception { - this(id, session, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, true, null); + final ManagementService managementService, + final ActiveMQServer server) throws Exception { + this(id, session, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, true, null, server); } public ServerConsumerImpl(final long id, @@ -164,7 +169,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { final boolean strictUpdateDeliveryCount, final ManagementService managementService, final boolean supportLargeMessage, - final Integer credits) throws Exception { + final Integer credits, + final ActiveMQServer server) throws Exception { this.id = id; this.filter = filter; @@ -209,6 +215,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { availableCredits.set(credits); } } + + this.server = server; } @Override @@ -378,7 +386,9 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } finally { lockDelivery.readLock().unlock(); + callback.afterDelivery(); } + } @Override @@ -559,12 +569,19 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { @Override public void setStarted(final boolean started) { synchronized (lock) { - lockDelivery.writeLock().lock(); + boolean locked = lockDelivery(); + + // This is to make sure nothing would sneak to the client while started = false + // the client will stop the session and perform a rollback in certain cases. + // in case something sneaks to the client you could get to messaging delivering forever until + // you restart the server try { this.started = browseOnly || started; } finally { - lockDelivery.writeLock().unlock(); + if (locked) { + lockDelivery.writeLock().unlock(); + } } } @@ -574,22 +591,39 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } } + private boolean lockDelivery() { + try { + if (!lockDelivery.writeLock().tryLock(30, TimeUnit.SECONDS)) { + ActiveMQServerLogger.LOGGER.timeoutLockingConsumer(); + if (server != null) { + server.threadDump(); + } + return false; + } + return true; + } + catch (Exception e) { + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + return false; + } + } + @Override public void setTransferring(final boolean transferring) { synchronized (lock) { - this.transferring = transferring; + // This is to make sure that the delivery process has finished any pending delivery + // otherwise a message may sneak in on the client while we are trying to stop the consumer + boolean locked = lockDelivery(); + try { + this.transferring = transferring; + } + finally { + if (locked) { + lockDelivery.writeLock().unlock(); + } + } } - // This is to make sure that the delivery process has finished any pending delivery - // otherwise a message may sneak in on the client while we are trying to stop the consumer - try { - lockDelivery.writeLock().lock(); - } - finally { - lockDelivery.writeLock().unlock(); - } - - // Outside the lock if (transferring) { // And we must wait for any force delivery to be executed - this is executed async so we add a future to the diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index c3a979b7fb..d628bde87b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -479,7 +479,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { ManagementService managementService2, boolean supportLargeMessage, Integer credits) throws Exception { - return new ServerConsumerImpl(consumerID, this, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits); + return new ServerConsumerImpl(consumerID, this, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java index 83c4e93cee..4b27bc434d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java @@ -28,6 +28,10 @@ public interface SessionCallback { */ boolean hasCredits(ServerConsumer consumerID); + /** This can be used to complete certain operations outside of the lock, + * like acks or other operations. */ + void afterDelivery() throws Exception; + void sendProducerCreditsMessage(int credits, SimpleString address); void sendProducerCreditsFailMessage(int credits, SimpleString address); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java index 5c0e2724a6..d8aa4aceb1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java @@ -488,6 +488,11 @@ public class HangConsumerTest extends ActiveMQTestBase { return true; } + @Override + public void afterDelivery() throws Exception { + + } + @Override public void sendProducerCreditsFailMessage(int credits, SimpleString address) { targetCallback.sendProducerCreditsFailMessage(credits, address); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java index 5d628f9d38..b32f1fa730 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java @@ -136,11 +136,6 @@ public class ProtonTest extends ActiveMQTestBase { connection.close(); } - for (long timeout = System.currentTimeMillis() + 1000; timeout > System.currentTimeMillis() && server.getRemotingService().getConnections().size() != 0; ) { - Thread.sleep(1); - } - - Assert.assertEquals("The remoting connection wasn't removed after connection.close()", 0, server.getRemotingService().getConnections().size()); server.stop(); } finally { @@ -633,6 +628,7 @@ public class ProtonTest extends ActiveMQTestBase { for (int i = 0; i < numMessages; i++) { System.out.println("Sending " + i); TextMessage message = session.createTextMessage("text" + i); + message.setStringProperty("text", "text" + i); p.send(message); }