diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java index 45d92299e4..1756153ca1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java @@ -117,6 +117,7 @@ public interface CoreRemotingConnection extends RemotingConnection { * @param size size we are trying to write * @param timeout * @return + * @throws IllegalStateException if the connection is closed */ boolean blockUntilWritable(int size, long timeout); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index 71baaeb649..b8eb22c9fe 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -808,21 +808,25 @@ public class ActiveMQSessionContext extends SessionContext { final CoreRemotingConnection connection = channel.getConnection(); final long blockingCallTimeoutMillis = Math.max(0, connection.getBlockingCallTimeout()); final long startFlowControl = System.nanoTime(); - final boolean isWritable = connection.blockUntilWritable(expectedEncodeSize, blockingCallTimeoutMillis); - if (!isWritable) { - final long endFlowControl = System.nanoTime(); - final long elapsedFlowControl = endFlowControl - startFlowControl; - final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedFlowControl); - ActiveMQClientLogger.LOGGER.timeoutStreamingLargeMessage(); - logger.debug("try to write " + expectedEncodeSize + " bytes after blocked " + elapsedMillis + " ms on a not writable connection: [" + connection.getID() + "]"); + try { + final boolean isWritable = connection.blockUntilWritable(expectedEncodeSize, blockingCallTimeoutMillis); + if (!isWritable) { + final long endFlowControl = System.nanoTime(); + final long elapsedFlowControl = endFlowControl - startFlowControl; + final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedFlowControl); + ActiveMQClientLogger.LOGGER.timeoutStreamingLargeMessage(); + logger.debug("try to write " + expectedEncodeSize + " bytes after blocked " + elapsedMillis + " ms on a not writable connection: [" + connection.getID() + "]"); + } + if (requiresResponse) { + // When sending it blocking, only the last chunk will be blocking. + channel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE); + } else { + channel.send(chunkPacket); + } + return chunkPacket.getPacketSize(); + } catch (Throwable e) { + throw new ActiveMQException(e.getMessage()); } - if (requiresResponse) { - // When sending it blocking, only the last chunk will be blocking. - channel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE); - } else { - channel.send(chunkPacket); - } - return chunkPacket.getPacketSize(); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java index 3ce40c97b5..384ca5e863 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java @@ -294,8 +294,15 @@ public class NettyConnection implements Connection { write(buffer, flush, batched, null); } + private void checkConnectionState() { + if (this.closed || !this.channel.isActive()) { + throw new IllegalStateException("Connection " + getID() + " closed or disconnected"); + } + } + @Override public final boolean blockUntilWritable(final int requiredCapacity, final long timeout, final TimeUnit timeUnit) { + checkConnectionState(); final boolean isAllowedToBlock = isAllowedToBlock(); if (!isAllowedToBlock) { @@ -324,6 +331,8 @@ public class NettyConnection implements Connection { } boolean canWrite; while (!(canWrite = canWrite(requiredCapacity)) && System.nanoTime() < deadline) { + //periodically check the connection state + checkConnectionState(); LockSupport.parkNanos(parkNanos); } return canWrite; @@ -361,9 +370,7 @@ public class NettyConnection implements Connection { if (logger.isDebugEnabled()) { final int remainingBytes = this.writeBufferHighWaterMark - readableBytes; if (remainingBytes < 0) { - logger.debug("a write request is exceeding by " + (-remainingBytes) + - " bytes the writeBufferHighWaterMark size [ " + this.writeBufferHighWaterMark + - " ] : consider to set it at least of " + readableBytes + " bytes"); + logger.debug("a write request is exceeding by " + (-remainingBytes) + " bytes the writeBufferHighWaterMark size [ " + this.writeBufferHighWaterMark + " ] : consider to set it at least of " + readableBytes + " bytes"); } } //no need to lock because the Netty's channel is thread-safe diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java index 56d1bc3e16..63dbcfb3bc 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java @@ -53,6 +53,7 @@ public interface Connection { * @param timeout the maximum time to wait * @param timeUnit the time unit of the timeout argument * @return {@code true} if the connection can enqueue {@code requiredCapacity} bytes, {@code false} otherwise + * @throws IllegalStateException if the connection is closed */ default boolean blockUntilWritable(final int requiredCapacity, final long timeout, final TimeUnit timeUnit) { return true; diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java index 8d8e4829fd..05ae1f6a9c 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.unit.core.remoting.impl.netty; import java.nio.ByteBuffer; import java.util.Collections; import java.util.Map; +import java.util.concurrent.TimeUnit; import io.netty.channel.Channel; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -73,6 +74,16 @@ public class NettyConnectionTest extends ActiveMQTestBase { } + @Test(expected = IllegalStateException.class) + public void throwsExceptionOnBlockUntilWritableIfClosed() { + EmbeddedChannel channel = createChannel(); + NettyConnection conn = new NettyConnection(emptyMap, channel, new MyListener(), false, false); + conn.close(); + //to make sure the channel is closed it needs to run the pending tasks + channel.runPendingTasks(); + conn.blockUntilWritable(0, 0, TimeUnit.NANOSECONDS); + } + private static EmbeddedChannel createChannel() { return new EmbeddedChannel(new ChannelInboundHandlerAdapter()); }