diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java index 8ac7c9ee50..a05a8af123 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java @@ -587,7 +587,10 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery @Override public ClientProtocolManager newProtocolManager() { - return getProtocolManagerFactory().newProtocolManager(); + if (threadPool == null) { + throw new NullPointerException("No Thread Pool"); + } + return getProtocolManagerFactory().newProtocolManager().setExecutor(new OrderedExecutor(threadPool)); } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java index 9eb0ee548b..b1d6cc8d59 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; @@ -82,6 +83,8 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager { private ClientSessionFactoryInternal factoryInternal; + private Executor executor; + /** * Guards assignments to {@link #inCreateSession} and {@link #inCreateSessionLatch} */ @@ -157,6 +160,12 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager { } } + @Override + public ActiveMQClientProtocolManager setExecutor(Executor executor) { + this.executor = executor; + return this; + } + @Override public Lock lockSessionCreation() { try { @@ -412,7 +421,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager { List incomingInterceptors, List outgoingInterceptors, TopologyResponseHandler topologyResponseHandler) { - this.connection = new RemotingConnectionImpl(createPacketDecoder(), transportConnection, callTimeout, callFailoverTimeout, incomingInterceptors, outgoingInterceptors); + this.connection = new RemotingConnectionImpl(createPacketDecoder(), transportConnection, callTimeout, callFailoverTimeout, incomingInterceptors, outgoingInterceptors, executor); this.topologyResponseHandler = topologyResponseHandler; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java index 065277ac27..418e3f150b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java @@ -92,8 +92,9 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement final long blockingCallTimeout, final long blockingCallFailoverTimeout, final List incomingInterceptors, - final List outgoingInterceptors) { - this(packetDecoder, transportConnection, blockingCallTimeout, blockingCallFailoverTimeout, incomingInterceptors, outgoingInterceptors, true, null, null); + final List outgoingInterceptors, + final Executor connectionExecutor) { + this(packetDecoder, transportConnection, blockingCallTimeout, blockingCallFailoverTimeout, incomingInterceptors, outgoingInterceptors, true, null, connectionExecutor); } /* @@ -103,9 +104,9 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement final Connection transportConnection, final List incomingInterceptors, final List outgoingInterceptors, - final Executor executor, - final SimpleString nodeID) { - this(packetDecoder, transportConnection, -1, -1, incomingInterceptors, outgoingInterceptors, false, executor, nodeID); + final SimpleString nodeID, + final Executor connectionExecutor) { + this(packetDecoder, transportConnection, -1, -1, incomingInterceptors, outgoingInterceptors, false, nodeID, connectionExecutor); } private RemotingConnectionImpl(final PacketDecoder packetDecoder, @@ -115,9 +116,9 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement final List incomingInterceptors, final List outgoingInterceptors, final boolean client, - final Executor executor, - final SimpleString nodeID) { - super(transportConnection, executor); + final SimpleString nodeID, + final Executor connectionExecutor) { + super(transportConnection, connectionExecutor); this.packetDecoder = packetDecoder; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManager.java index e2c9fc1ee2..37e699ea2f 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManager.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManager.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.spi.core.remoting; import java.util.List; +import java.util.concurrent.Executor; import java.util.concurrent.locks.Lock; import io.netty.channel.ChannelPipeline; @@ -27,6 +28,8 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; public interface ClientProtocolManager { + ClientProtocolManager setExecutor(Executor executor); + /// Life Cycle Methods: RemotingConnection connect(Connection transportConnection, diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java index d34ce80fca..1667945e30 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java @@ -182,14 +182,6 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener { } } - public Executor getExeuctor() { - if (protonConnectionDelegate != null) { - return protonConnectionDelegate.getExecutor(); - } else { - return null; - } - } - public void setConnection(AMQPConnectionContext connection) { this.amqpConnection = connection; } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java index 56920796dd..971702e6c9 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java @@ -46,17 +46,13 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection public ActiveMQProtonRemotingConnection(ProtonProtocolManager manager, AMQPConnectionContext amqpConnection, Connection transportConnection, - Executor executor) { - super(transportConnection, executor); + Executor connectionExecutor) { + super(transportConnection, connectionExecutor); this.manager = manager; this.amqpConnection = amqpConnection; transportConnection.setProtocolConnection(this); } - public Executor getExecutor() { - return this.executor; - } - public ProtonProtocolManager getManager() { return manager; } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientProtocolManager.java index 54b8c670f2..43ae226b65 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientProtocolManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientProtocolManager.java @@ -31,6 +31,7 @@ import org.apache.activemq.artemis.spi.core.remoting.TopologyResponseHandler; import java.util.Collections; import java.util.List; +import java.util.concurrent.Executor; import java.util.concurrent.locks.Lock; /** @@ -44,6 +45,11 @@ public class ProtonClientProtocolManager extends ProtonProtocolManager implement super(factory, server, Collections.emptyList(), Collections.emptyList()); } + @Override + public ClientProtocolManager setExecutor(Executor executor) { + return null; + } + @Override public void stop() { throw new UnsupportedOperationException(); diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 0b94ab2130..6c846ba571 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -198,9 +198,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se public OpenWireConnection(Connection connection, ActiveMQServer server, - Executor executor, OpenWireProtocolManager openWireProtocolManager, - OpenWireFormat wf) { + OpenWireFormat wf, + Executor executor) { super(connection, executor); this.server = server; this.operationContext = server.newOperationContext(); diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index 505564d0ed..44cc8da241 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -232,7 +232,7 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl @Override public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection) { OpenWireFormat wf = (OpenWireFormat) wireFactory.createWireFormat(); - OpenWireConnection owConn = new OpenWireConnection(connection, server, server.getExecutorFactory().getExecutor(), this, wf); + OpenWireConnection owConn = new OpenWireConnection(connection, server, this, wf, server.getExecutorFactory().getExecutor()); owConn.sendHandshake(); //first we setup ttl to -1 diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java index 40fb4f3e85..c3fb9853d6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java @@ -328,15 +328,18 @@ public interface Configuration { Configuration setAmqpUseCoreSubscriptionNaming(boolean amqpUseCoreSubscriptionNaming); /** + * deprecated: we decide based on the semantic context when to make things async or not * Returns whether code coming from connection is executed asynchronously or not.
* Default value is * {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_ASYNC_CONNECTION_EXECUTION_ENABLED}. */ + @Deprecated boolean isAsyncConnectionExecutionEnabled(); /** * Sets whether code coming from connection is executed asynchronously or not. */ + @Deprecated Configuration setEnabledAsyncConnectionExecution(boolean enabled); /** diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java index 7a416d9a05..5862003672 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java @@ -116,7 +116,7 @@ public class CoreProtocolManager implements ProtocolManager { Executor connectionExecutor = server.getExecutorFactory().getExecutor(); - final CoreRemotingConnection rc = new RemotingConnectionImpl(new ServerPacketDecoder(), connection, incomingInterceptors, outgoingInterceptors, config.isAsyncConnectionExecutionEnabled() ? connectionExecutor : null, server.getNodeID()); + final CoreRemotingConnection rc = new RemotingConnectionImpl(new ServerPacketDecoder(), connection, incomingInterceptors, outgoingInterceptors, server.getNodeID(), connectionExecutor); Channel channel1 = rc.getChannel(CHANNEL_ID.SESSION.id, -1); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java index 812de2c716..f01e5e6adb 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java @@ -49,7 +49,7 @@ public class ReplicationSyncFileMessageTest extends ActiveMQTestBase { FileChannel fileChannel = raf.getChannel(); ReplicationSyncFileMessage replicationSyncFileMessage = new ReplicationSyncFileMessage(MESSAGES, null, 10, raf, fileChannel, 0, dataSize); - RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(null, conn, 10, 10, null, null); + RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(null, conn, 10, 10, null, null, null); ActiveMQBuffer buffer = replicationSyncFileMessage.encode(remotingConnection); Assert.assertEquals(buffer.getInt(0), replicationSyncFileMessage.expectedEncodeSize() - DataConstants.SIZE_INT); Assert.assertEquals(buffer.capacity(), replicationSyncFileMessage.expectedEncodeSize() - dataSize); @@ -69,7 +69,7 @@ public class ReplicationSyncFileMessageTest extends ActiveMQTestBase { FileChannel fileChannel = raf.getChannel(); ReplicationSyncFileMessage replicationSyncFileMessage = new ReplicationSyncFileMessage(MESSAGES, null, fileId, raf, fileChannel, 0, 0); - RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(null, conn, 10, 10, null, null); + RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(null, conn, 10, 10, null, null, null); ActiveMQBuffer buffer = replicationSyncFileMessage.encode(remotingConnection); Assert.assertEquals(buffer.readInt(), replicationSyncFileMessage.expectedEncodeSize() - DataConstants.SIZE_INT); Assert.assertEquals(buffer.capacity(), replicationSyncFileMessage.expectedEncodeSize()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/ExceptionListenerForConnectionTimedOutExceptionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/ExceptionListenerForConnectionTimedOutExceptionTest.java index 6579e0b8f2..3206c2cfe8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/ExceptionListenerForConnectionTimedOutExceptionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/ExceptionListenerForConnectionTimedOutExceptionTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.tests.integration.jms.connection; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import javax.jms.Connection; import javax.jms.JMSException; @@ -50,6 +51,18 @@ public class ExceptionListenerForConnectionTimedOutExceptionTest extends JMSTest @Test(timeout = 60000) public void testOnAcknowledge() throws Exception { + testOnAcknowledge(false); + } + + @Test(timeout = 60000) + public void testOnAcknowledgeBlockOnFailover() throws Exception { + // this is validating a case where failover would block + // and yet the exception should already happen asynchronously + testOnAcknowledge(true); + } + + public void testOnAcknowledge(boolean blockOnFailover) throws Exception { + mayBlock.set(blockOnFailover); Connection sendConnection = null; Connection connection = null; AtomicReference exceptionOnConnection = new AtomicReference<>(); @@ -86,6 +99,10 @@ public class ExceptionListenerForConnectionTimedOutExceptionTest extends JMSTest fail("JMSException expected"); } catch (JMSException e) { + if (blockOnFailover) { + Wait.assertTrue(blocked::get); + unblock(); + } assertTrue(e.getCause() instanceof ActiveMQConnectionTimedOutException); //Ensure JMS Connection ExceptionListener was also invoked assertTrue(Wait.waitFor(() -> exceptionOnConnection.get() != null, 2000, 100)); @@ -102,6 +119,16 @@ public class ExceptionListenerForConnectionTimedOutExceptionTest extends JMSTest @Test(timeout = 60000) public void testOnSend() throws Exception { + testOnSend(false); + } + + @Test(timeout = 60000) + public void testOnSendBlockOnFailover() throws Exception { + testOnSend(true); + } + + public void testOnSend(boolean blockOnFailover) throws Exception { + mayBlock.set(blockOnFailover); Connection sendConnection = null; Connection connection = null; AtomicReference exceptionOnConnection = new AtomicReference<>(); @@ -125,6 +152,10 @@ public class ExceptionListenerForConnectionTimedOutExceptionTest extends JMSTest fail("JMSException expected"); } catch (JMSException e) { + if (blockOnFailover) { + Wait.assertTrue(blocked::get); + unblock(); + } assertTrue(e.getCause() instanceof ActiveMQConnectionTimedOutException); //Ensure JMS Connection ExceptionListener was also invoked assertTrue(Wait.waitFor(() -> exceptionOnConnection.get() != null, 2000, 100)); @@ -140,6 +171,30 @@ public class ExceptionListenerForConnectionTimedOutExceptionTest extends JMSTest } } + static AtomicBoolean mayBlock = new AtomicBoolean(true); + static AtomicBoolean blocked = new AtomicBoolean(false); + + private static void block() { + if (!mayBlock.get()) { + return; + } + + blocked.set(true); + + try { + long timeOut = System.currentTimeMillis() + 5000; + while (mayBlock.get() && System.currentTimeMillis() < timeOut) { + Thread.yield(); + } + } finally { + blocked.set(false); + } + } + + private static void unblock() { + mayBlock.set(false); + } + static Packet lastPacketSent; @@ -156,6 +211,12 @@ public class ExceptionListenerForConnectionTimedOutExceptionTest extends JMSTest @Override public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException { + // CheckForFailoverReply is ignored here, as this is simulating an issue where the server is completely not responding, the blocked call should throw an exception asynchrnously to the retry + if (packet.getType() == PacketImpl.CHECK_FOR_FAILOVER_REPLY) { + block(); + return true; + } + if (lastPacketSent.getType() == PacketImpl.SESS_ACKNOWLEDGE && packet.getType() == PacketImpl.NULL_RESPONSE) { return false; } @@ -167,9 +228,16 @@ public class ExceptionListenerForConnectionTimedOutExceptionTest extends JMSTest @Override public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException { + // CheckForFailoverReply is ignored here, as this is simulating an issue where the server is completely not responding, the blocked call should throw an exception asynchrnously to the retry + if (packet.getType() == PacketImpl.CHECK_FOR_FAILOVER_REPLY) { + block(); + return true; + } + if (lastPacketSent.getType() == PacketImpl.SESS_SEND && packet.getType() == PacketImpl.NULL_RESPONSE) { return false; } + return true; } }