diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java index 9f36d81935..d69b1e1835 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java @@ -492,7 +492,7 @@ public final class ChannelImpl implements Channel { if (response == null) { ActiveMQException e = ActiveMQClientMessageBundle.BUNDLE.timedOutSendingPacket(connection.getBlockingCallTimeout(), packet.getType()); - connection.fail(e); + connection.asyncFail(e); throw e; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java index 960cadde65..5cbcbbfdae 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java @@ -20,6 +20,8 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -218,6 +220,23 @@ public abstract class AbstractRemotingConnection implements RemotingConnection { fail(me, null); } + @Override + public Future asyncFail(final ActiveMQException me) { + + FutureTask task = new FutureTask(() -> { + fail(me); + return null; + }); + + if (executor == null) { + // only tests cases can do this + task.run(); + } else { + executor.execute(task); + } + return task; + } + @Override public void bufferReceived(final Object connectionID, final ActiveMQBuffer buffer) { dataReceived = true; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java index f7ed73a2a0..41cd0507a7 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.spi.core.protocol; import java.util.List; +import java.util.concurrent.Future; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -132,6 +133,12 @@ public interface RemotingConnection extends BufferHandler { */ void fail(ActiveMQException me); + /** Same thing as fail, but using an executor. + * semantic of send here, is asynchrounous. + * @param me + */ + Future asyncFail(ActiveMQException me); + /** * called when the underlying connection fails. * diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java index e9181f8025..4a4ca39512 100644 --- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java +++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java @@ -20,6 +20,7 @@ import javax.security.auth.Subject; import java.io.RandomAccessFile; import java.nio.channels.FileChannel; import java.util.List; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import io.netty.buffer.Unpooled; @@ -317,6 +318,11 @@ public class ChannelImplTest { } + @Override + public Future asyncFail(ActiveMQException me) { + return null; + } + @Override public void fail(ActiveMQException me, String scaleDownTargetNodeID) { diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java index 1f31692892..9a607f8f79 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java @@ -20,6 +20,8 @@ package org.apache.activemq.artemis.core.protocol.mqtt; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; @@ -166,6 +168,22 @@ public class MQTTConnection implements RemotingConnection { } } + @Override + public Future asyncFail(ActiveMQException me) { + FutureTask task = new FutureTask(() -> { + fail(me); + return null; + }); + + + // I don't expect asyncFail happening on MQTT, in case of happens this is semantically correct + Thread t = new Thread(task); + + t.start(); + + return task; + } + @Override public void destroy() { //TODO(mtaylor) ensure this properly destroys this connection. diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java index 8c32281041..60c2d56257 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java @@ -25,6 +25,8 @@ import java.util.List; import java.util.Set; import java.util.StringTokenizer; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import java.util.concurrent.ScheduledExecutorService; import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException; @@ -110,7 +112,7 @@ public final class StompConnection implements RemotingConnection { private final ScheduledExecutorService scheduledExecutorService; - private final ExecutorFactory factory; + private final ExecutorFactory executorFactory; @Override public boolean isSupportReconnect() { @@ -136,7 +138,7 @@ public final class StompConnection implements RemotingConnection { case ActiveMQStompException.INVALID_EOL_V10: if (version != null) throw e; - frameHandler = new StompFrameHandlerV12(this, scheduledExecutorService, factory); + frameHandler = new StompFrameHandlerV12(this, scheduledExecutorService, executorFactory); buffer.resetReaderIndex(); frame = decode(buffer); break; @@ -164,16 +166,16 @@ public final class StompConnection implements RemotingConnection { final Connection transportConnection, final StompProtocolManager manager, final ScheduledExecutorService scheduledExecutorService, - final ExecutorFactory factory) { + final ExecutorFactory executorFactory) { this.scheduledExecutorService = scheduledExecutorService; - this.factory = factory; + this.executorFactory = executorFactory; this.transportConnection = transportConnection; this.manager = manager; - this.frameHandler = new StompFrameHandlerV10(this, scheduledExecutorService, factory); + this.frameHandler = new StompFrameHandlerV10(this, scheduledExecutorService, executorFactory); this.creationTime = System.currentTimeMillis(); @@ -379,6 +381,24 @@ public final class StompConnection implements RemotingConnection { internalClose(); } + @Override + public Future asyncFail(ActiveMQException me) { + + FutureTask task = new FutureTask(() -> { + fail(me); + return null; + }); + + if (this.executorFactory == null) { + // only tests cases can do this + task.run(); + } else { + executorFactory.getExecutor().execute(task); + } + + return task; + } + @Override public void fail(final ActiveMQException me, String scaleDownTargetNodeID) { fail(me); @@ -528,7 +548,7 @@ public final class StompConnection implements RemotingConnection { } if (this.version != (StompVersions.V1_0)) { - VersionedStompFrameHandler newHandler = VersionedStompFrameHandler.getHandler(this, this.version, scheduledExecutorService, factory); + VersionedStompFrameHandler newHandler = VersionedStompFrameHandler.getHandler(this, this.version, scheduledExecutorService, executorFactory); newHandler.initDecoder(this.frameHandler); this.frameHandler = newHandler; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java index 7e760c139d..72c52d404b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.management.impl; import javax.security.auth.Subject; import java.util.List; +import java.util.concurrent.Future; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -110,6 +111,11 @@ public class ManagementRemotingConnection implements RemotingConnection { } + @Override + public Future asyncFail(ActiveMQException me) { + return null; + } + @Override public void fail(ActiveMQException me, String scaleDownTargetNodeID) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/ExceptionListenerForConnectionTimedOutExceptionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/ExceptionListenerForConnectionTimedOutExceptionTest.java index ccfc42c89b..6579e0b8f2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/ExceptionListenerForConnectionTimedOutExceptionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/ExceptionListenerForConnectionTimedOutExceptionTest.java @@ -58,6 +58,7 @@ public class ExceptionListenerForConnectionTimedOutExceptionTest extends JMSTest ((ActiveMQConnectionFactory) cf).setOutgoingInterceptorList(OutBoundPacketCapture.class.getName()); ((ActiveMQConnectionFactory) cf).setIncomingInterceptorList(SessAcknowledgeCauseResponseTimeout.class.getName()); ((ActiveMQConnectionFactory) cf).setBlockOnAcknowledge(true); + ((ActiveMQConnectionFactory) cf).setCallTimeout(500); sendConnection = cf.createConnection(); @@ -108,6 +109,7 @@ public class ExceptionListenerForConnectionTimedOutExceptionTest extends JMSTest try { ((ActiveMQConnectionFactory) cf).setOutgoingInterceptorList(OutBoundPacketCapture.class.getName()); ((ActiveMQConnectionFactory) cf).setIncomingInterceptorList(SessSendCauseResponseTimeout.class.getName()); + ((ActiveMQConnectionFactory) cf).setCallTimeout(500); sendConnection = cf.createConnection(); sendConnection.setExceptionListener(exceptionOnConnection::set);