This commit is contained in:
Martyn Taylor 2017-07-20 10:33:04 +01:00
commit e305cc9875
5 changed files with 41 additions and 17 deletions

View File

@ -117,6 +117,7 @@ public interface CoreRemotingConnection extends RemotingConnection {
* @param size size we are trying to write
* @param timeout
* @return
* @throws IllegalStateException if the connection is closed
*/
boolean blockUntilWritable(int size, long timeout);
}

View File

@ -808,21 +808,25 @@ public class ActiveMQSessionContext extends SessionContext {
final CoreRemotingConnection connection = channel.getConnection();
final long blockingCallTimeoutMillis = Math.max(0, connection.getBlockingCallTimeout());
final long startFlowControl = System.nanoTime();
final boolean isWritable = connection.blockUntilWritable(expectedEncodeSize, blockingCallTimeoutMillis);
if (!isWritable) {
final long endFlowControl = System.nanoTime();
final long elapsedFlowControl = endFlowControl - startFlowControl;
final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedFlowControl);
ActiveMQClientLogger.LOGGER.timeoutStreamingLargeMessage();
logger.debug("try to write " + expectedEncodeSize + " bytes after blocked " + elapsedMillis + " ms on a not writable connection: [" + connection.getID() + "]");
try {
final boolean isWritable = connection.blockUntilWritable(expectedEncodeSize, blockingCallTimeoutMillis);
if (!isWritable) {
final long endFlowControl = System.nanoTime();
final long elapsedFlowControl = endFlowControl - startFlowControl;
final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedFlowControl);
ActiveMQClientLogger.LOGGER.timeoutStreamingLargeMessage();
logger.debug("try to write " + expectedEncodeSize + " bytes after blocked " + elapsedMillis + " ms on a not writable connection: [" + connection.getID() + "]");
}
if (requiresResponse) {
// When sending it blocking, only the last chunk will be blocking.
channel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
} else {
channel.send(chunkPacket);
}
return chunkPacket.getPacketSize();
} catch (Throwable e) {
throw new ActiveMQException(e.getMessage());
}
if (requiresResponse) {
// When sending it blocking, only the last chunk will be blocking.
channel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
} else {
channel.send(chunkPacket);
}
return chunkPacket.getPacketSize();
}

View File

@ -294,8 +294,15 @@ public class NettyConnection implements Connection {
write(buffer, flush, batched, null);
}
private void checkConnectionState() {
if (this.closed || !this.channel.isActive()) {
throw new IllegalStateException("Connection " + getID() + " closed or disconnected");
}
}
@Override
public final boolean blockUntilWritable(final int requiredCapacity, final long timeout, final TimeUnit timeUnit) {
checkConnectionState();
final boolean isAllowedToBlock = isAllowedToBlock();
if (!isAllowedToBlock) {
@ -324,6 +331,8 @@ public class NettyConnection implements Connection {
}
boolean canWrite;
while (!(canWrite = canWrite(requiredCapacity)) && System.nanoTime() < deadline) {
//periodically check the connection state
checkConnectionState();
LockSupport.parkNanos(parkNanos);
}
return canWrite;
@ -361,9 +370,7 @@ public class NettyConnection implements Connection {
if (logger.isDebugEnabled()) {
final int remainingBytes = this.writeBufferHighWaterMark - readableBytes;
if (remainingBytes < 0) {
logger.debug("a write request is exceeding by " + (-remainingBytes) +
" bytes the writeBufferHighWaterMark size [ " + this.writeBufferHighWaterMark +
" ] : consider to set it at least of " + readableBytes + " bytes");
logger.debug("a write request is exceeding by " + (-remainingBytes) + " bytes the writeBufferHighWaterMark size [ " + this.writeBufferHighWaterMark + " ] : consider to set it at least of " + readableBytes + " bytes");
}
}
//no need to lock because the Netty's channel is thread-safe

View File

@ -53,6 +53,7 @@ public interface Connection {
* @param timeout the maximum time to wait
* @param timeUnit the time unit of the timeout argument
* @return {@code true} if the connection can enqueue {@code requiredCapacity} bytes, {@code false} otherwise
* @throws IllegalStateException if the connection is closed
*/
default boolean blockUntilWritable(final int requiredCapacity, final long timeout, final TimeUnit timeUnit) {
return true;

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.unit.core.remoting.impl.netty;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInboundHandlerAdapter;
@ -73,6 +74,16 @@ public class NettyConnectionTest extends ActiveMQTestBase {
}
@Test(expected = IllegalStateException.class)
public void throwsExceptionOnBlockUntilWritableIfClosed() {
EmbeddedChannel channel = createChannel();
NettyConnection conn = new NettyConnection(emptyMap, channel, new MyListener(), false, false);
conn.close();
//to make sure the channel is closed it needs to run the pending tasks
channel.runPendingTasks();
conn.blockUntilWritable(0, 0, TimeUnit.NANOSECONDS);
}
private static EmbeddedChannel createChannel() {
return new EmbeddedChannel(new ChannelInboundHandlerAdapter());
}