diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java index 679844a519..d8b2315f7f 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java @@ -17,22 +17,21 @@ package org.apache.activemq.artemis.core.remoting.impl.netty; import java.net.SocketAddress; -import java.util.Deque; -import java.util.LinkedList; +import java.util.ArrayList; +import java.util.List; import java.util.Map; -import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.LockSupport; import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoop; import io.netty.handler.ssl.SslHandler; -import io.netty.util.concurrent.GenericFutureListener; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; @@ -46,40 +45,35 @@ import org.apache.activemq.artemis.utils.IPV6Util; public class NettyConnection implements Connection { - // Constants ----------------------------------------------------- - private static final int BATCHING_BUFFER_SIZE = 8192; - - // Attributes ---------------------------------------------------- + private static final int DEFAULT_BATCH_BYTES = Integer.getInteger("io.netty.batch.bytes", 8192); + private static final int DEFAULT_WAIT_MILLIS = 10_000; protected final Channel channel; - - private boolean closed; - private final BaseConnectionLifeCycleListener listener; - - private final boolean batchingEnabled; - private final boolean directDeliver; - - private volatile ActiveMQBuffer batchBuffer; - private final Map configuration; - - private final Semaphore writeLock = new Semaphore(1); - - private RemotingConnection protocolConnection; - - private boolean ready = true; - /** * if {@link #isWritable(ReadyListener)} returns false, we add a callback * here for when the connection (or Netty Channel) becomes available again. */ - private final Deque readyListeners = new LinkedList<>(); + private final List readyListeners = new ArrayList<>(); + private final ThreadLocal> localListenersPool = ThreadLocal.withInitial(ArrayList::new); - // Static -------------------------------------------------------- + private final boolean batchingEnabled; + private final int writeBufferHighWaterMark; + private final int batchLimit; - // Constructors -------------------------------------------------- + /** + * This counter is splitted in 2 variables to write it with less performance + * impact: no volatile get is required to update its value + */ + private final AtomicLong pendingWritesOnEventLoopView = new AtomicLong(); + private long pendingWritesOnEventLoop = 0; + + private boolean closed; + private RemotingConnection protocolConnection; + + private boolean ready = true; public NettyConnection(final Map configuration, final Channel channel, @@ -92,28 +86,72 @@ public class NettyConnection implements Connection { this.listener = listener; + this.directDeliver = directDeliver; + this.batchingEnabled = batchingEnabled; - this.directDeliver = directDeliver; + this.writeBufferHighWaterMark = this.channel.config().getWriteBufferHighWaterMark(); + + this.batchLimit = batchingEnabled ? Math.min(this.writeBufferHighWaterMark, DEFAULT_BATCH_BYTES) : 0; } - // Public -------------------------------------------------------- + private static void waitFor(ChannelPromise promise, long millis) { + try { + final boolean completed = promise.await(millis); + if (!completed) { + ActiveMQClientLogger.LOGGER.timeoutFlushingPacket(); + } + } catch (InterruptedException e) { + throw new ActiveMQInterruptedException(e); + } + } - public Channel getNettyChannel() { + /** + * Returns an estimation of the current size of the write buffer in the channel. + * To obtain a more precise value is necessary to use the unsafe API of the channel to + * call the {@link io.netty.channel.ChannelOutboundBuffer#totalPendingWriteBytes()}. + * Anyway, both these values are subject to concurrent modifications. + */ + private static int batchBufferSize(Channel channel, int writeBufferHighWaterMark) { + //Channel::bytesBeforeUnwritable is performing a volatile load + //this is the reason why writeBufferHighWaterMark is passed as an argument + final int bytesBeforeUnwritable = (int) channel.bytesBeforeUnwritable(); + assert bytesBeforeUnwritable >= 0; + final int writtenBytes = writeBufferHighWaterMark - bytesBeforeUnwritable; + assert writtenBytes >= 0; + return writtenBytes; + } + + public final int pendingWritesOnChannel() { + return batchBufferSize(this.channel, this.writeBufferHighWaterMark); + } + + public final long pendingWritesOnEventLoop() { + final EventLoop eventLoop = channel.eventLoop(); + final boolean inEventLoop = eventLoop.inEventLoop(); + final long pendingWritesOnEventLoop; + if (inEventLoop) { + pendingWritesOnEventLoop = this.pendingWritesOnEventLoop; + } else { + pendingWritesOnEventLoop = pendingWritesOnEventLoopView.get(); + } + return pendingWritesOnEventLoop; + } + + public final Channel getNettyChannel() { return channel; } - // Connection implementation ---------------------------- @Override - public void setAutoRead(boolean autoRead) { + public final void setAutoRead(boolean autoRead) { channel.config().setAutoRead(autoRead); } @Override - public boolean isWritable(ReadyListener callback) { + public final boolean isWritable(ReadyListener callback) { synchronized (readyListeners) { if (!ready) { - readyListeners.push(callback); + readyListeners.add(callback); } return ready; @@ -121,40 +159,44 @@ public class NettyConnection implements Connection { } @Override - public void fireReady(final boolean ready) { - LinkedList readyToCall = null; + public final void fireReady(final boolean ready) { + final ArrayList readyToCall = localListenersPool.get(); synchronized (readyListeners) { this.ready = ready; if (ready) { - for (;;) { - ReadyListener readyListener = readyListeners.poll(); - if (readyListener == null) { - break; + final int size = this.readyListeners.size(); + readyToCall.ensureCapacity(size); + try { + for (int i = 0; i < size; i++) { + final ReadyListener readyListener = readyListeners.get(i); + if (readyListener == null) { + break; + } + readyToCall.add(readyListener); } - - if (readyToCall == null) { - readyToCall = new LinkedList<>(); - } - - readyToCall.add(readyListener); + } finally { + readyListeners.clear(); } } } - - if (readyToCall != null) { - for (ReadyListener readyListener : readyToCall) { + try { + final int size = readyToCall.size(); + for (int i = 0; i < size; i++) { try { + final ReadyListener readyListener = readyToCall.get(i); readyListener.readyForWriting(); } catch (Throwable logOnly) { ActiveMQClientLogger.LOGGER.warn(logOnly.getMessage(), logOnly); } } + } finally { + readyToCall.clear(); } } @Override - public void forceClose() { + public final void forceClose() { if (channel != null) { try { channel.close(); @@ -169,38 +211,35 @@ public class NettyConnection implements Connection { * * @return */ - public Channel getChannel() { + public final Channel getChannel() { return channel; } @Override - public RemotingConnection getProtocolConnection() { + public final RemotingConnection getProtocolConnection() { return protocolConnection; } @Override - public void setProtocolConnection(RemotingConnection protocolConnection) { + public final void setProtocolConnection(RemotingConnection protocolConnection) { this.protocolConnection = protocolConnection; } @Override - public void close() { + public final void close() { if (closed) { return; } - - final SslHandler sslHandler = (SslHandler) channel.pipeline().get("ssl"); EventLoop eventLoop = channel.eventLoop(); boolean inEventLoop = eventLoop.inEventLoop(); //if we are in an event loop we need to close the channel after the writes have finished if (!inEventLoop) { + final SslHandler sslHandler = (SslHandler) channel.pipeline().get("ssl"); closeSSLAndChannel(sslHandler, channel, false); } else { - eventLoop.execute(new Runnable() { - @Override - public void run() { - closeSSLAndChannel(sslHandler, channel, true); - } + eventLoop.execute(() -> { + final SslHandler sslHandler = (SslHandler) channel.pipeline().get("ssl"); + closeSSLAndChannel(sslHandler, channel, true); }); } @@ -211,143 +250,206 @@ public class NettyConnection implements Connection { @Override public ActiveMQBuffer createTransportBuffer(final int size) { - return new ChannelBufferWrapper(PooledByteBufAllocator.DEFAULT.directBuffer(size), true); + try { + return new ChannelBufferWrapper(channel.alloc().directBuffer(size), true); + } catch (OutOfMemoryError oom) { + final long totalPendingWriteBytes = batchBufferSize(this.channel, this.writeBufferHighWaterMark); + ActiveMQClientLogger.LOGGER.warn("Trying to allocate " + size + " bytes, System is throwing OutOfMemoryError on NettyConnection " + this + ", there are currently " + "pendingWrites: [NETTY] -> " + totalPendingWriteBytes + "[EVENT LOOP] -> " + pendingWritesOnEventLoopView.get() + " causes: " + oom.getMessage(), oom); + throw oom; + } } @Override - public Object getID() { + public final Object getID() { // TODO: Think of it return channel.hashCode(); } // This is called periodically to flush the batch buffer @Override - public void checkFlushBatchBuffer() { - if (!batchingEnabled) { - return; - } - - if (writeLock.tryAcquire()) { - try { - if (batchBuffer != null && batchBuffer.readable()) { - channel.writeAndFlush(batchBuffer.byteBuf()); - - batchBuffer = createTransportBuffer(BATCHING_BUFFER_SIZE); - } - } finally { - writeLock.release(); + public final void checkFlushBatchBuffer() { + if (this.batchingEnabled) { + //perform the flush only if necessary + final int batchBufferSize = batchBufferSize(this.channel, this.writeBufferHighWaterMark); + if (batchBufferSize > 0) { + this.channel.flush(); } } } @Override - public void write(final ActiveMQBuffer buffer) { + public final void write(final ActiveMQBuffer buffer) { write(buffer, false, false); } @Override - public void write(ActiveMQBuffer buffer, final boolean flush, final boolean batched) { + public final void write(ActiveMQBuffer buffer, final boolean flush, final boolean batched) { write(buffer, flush, batched, null); } @Override - public void write(ActiveMQBuffer buffer, - final boolean flush, - final boolean batched, - final ChannelFutureListener futureListener) { - - try { - writeLock.acquire(); - - try { - if (batchBuffer == null && batchingEnabled && batched && !flush) { - // Lazily create batch buffer - - batchBuffer = ActiveMQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE); - } - - if (batchBuffer != null) { - batchBuffer.writeBytes(buffer, 0, buffer.writerIndex()); - - if (batchBuffer.writerIndex() >= BATCHING_BUFFER_SIZE || !batched || flush) { - // If the batch buffer is full or it's flush param or not batched then flush the buffer - - buffer = batchBuffer; - } else { - return; - } - - if (!batched || flush) { - batchBuffer = null; - } else { - // Create a new buffer - - batchBuffer = ActiveMQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE); - } - } - - // depending on if we need to flush or not we can use a voidPromise or - // use a normal promise - final ByteBuf buf = buffer.byteBuf(); - final ChannelPromise promise; - if (flush || futureListener != null) { - promise = channel.newPromise(); - } else { - promise = channel.voidPromise(); - } - - EventLoop eventLoop = channel.eventLoop(); - boolean inEventLoop = eventLoop.inEventLoop(); - if (!inEventLoop) { - if (futureListener != null) { - channel.writeAndFlush(buf, promise).addListener(futureListener); - } else { - channel.writeAndFlush(buf, promise); - } - } else { - // create a task which will be picked up by the eventloop and trigger the write. - // This is mainly needed as this method is triggered by different threads for the same channel. - // if we not do this we may produce out of order writes. - final Runnable task = new Runnable() { - @Override - public void run() { - if (futureListener != null) { - channel.writeAndFlush(buf, promise).addListener(futureListener); - } else { - channel.writeAndFlush(buf, promise); - } - } - }; - // execute the task on the eventloop - eventLoop.execute(task); - } - - // only try to wait if not in the eventloop otherwise we will produce a deadlock - if (flush && !inEventLoop) { - while (true) { - try { - boolean ok = promise.await(10000); - - if (!ok) { - ActiveMQClientLogger.LOGGER.timeoutFlushingPacket(); - } - - break; - } catch (InterruptedException e) { - throw new ActiveMQInterruptedException(e); - } - } - } - } finally { - writeLock.release(); + public final boolean blockUntilWritable(final int requiredCapacity, final long timeout, final TimeUnit timeUnit) { + final boolean isAllowedToBlock = isAllowedToBlock(); + if (!isAllowedToBlock) { + if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) { + ActiveMQClientLogger.LOGGER.debug("Calling blockUntilWritable using a thread where it's not allowed"); } - } catch (InterruptedException e) { - throw new ActiveMQInterruptedException(e); + return canWrite(requiredCapacity); + } else { + final long timeoutNanos = timeUnit.toNanos(timeout); + final long deadline = System.nanoTime() + timeoutNanos; + //choose wait time unit size + final long parkNanos; + //if is requested to wait more than a millisecond than we could use + if (timeoutNanos >= 1_000_000L) { + parkNanos = 100_000L; + } else { + //reduce it doesn't make sense, only a spin loop could be enough precise with the most OS + parkNanos = 1000L; + } + boolean canWrite; + while (!(canWrite = canWrite(requiredCapacity)) && System.nanoTime() < deadline) { + LockSupport.parkNanos(parkNanos); + } + return canWrite; + } + } + + private boolean isAllowedToBlock() { + final EventLoop eventLoop = channel.eventLoop(); + final boolean inEventLoop = eventLoop.inEventLoop(); + return !inEventLoop; + } + + private boolean canWrite(final int requiredCapacity) { + //evaluate if the write request could be taken: + //there is enough space in the write buffer? + //The pending writes on event loop will eventually go into the Netty write buffer, hence consider them + //as part of the heuristic! + final long pendingWritesOnEventLoop = this.pendingWritesOnEventLoop(); + final long totalPendingWrites = pendingWritesOnEventLoop + this.pendingWritesOnChannel(); + final boolean canWrite; + if (requiredCapacity > this.writeBufferHighWaterMark) { + canWrite = totalPendingWrites == 0; + } else { + canWrite = (totalPendingWrites + requiredCapacity) <= this.writeBufferHighWaterMark; + } + return canWrite; + } + + @Override + public final void write(ActiveMQBuffer buffer, + final boolean flush, + final boolean batched, + final ChannelFutureListener futureListener) { + final int readableBytes = buffer.readableBytes(); + if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) { + final int remainingBytes = this.writeBufferHighWaterMark - readableBytes; + if (remainingBytes < 0) { + ActiveMQClientLogger.LOGGER.debug("a write request is exceeding by " + (-remainingBytes) + " bytes the writeBufferHighWaterMark size [ " + this.writeBufferHighWaterMark + " ] : consider to set it at least of " + readableBytes + " bytes"); + } + } + //no need to lock because the Netty's channel is thread-safe + //and the order of write is ensured by the order of the write calls + final EventLoop eventLoop = channel.eventLoop(); + final boolean inEventLoop = eventLoop.inEventLoop(); + if (!inEventLoop) { + writeNotInEventLoop(buffer, flush, batched, futureListener); + } else { + // OLD COMMENT: + // create a task which will be picked up by the eventloop and trigger the write. + // This is mainly needed as this method is triggered by different threads for the same channel. + // if we not do this we may produce out of order writes. + // NOTE: + // the submitted task does not effect in any way the current written size in the batch + // until the loop will process it, leading to a longer life for the ActiveMQBuffer buffer!!! + // To solve it, will be necessary to manually perform the count of the current batch instead of rely on the + // Channel:Config::writeBufferHighWaterMark value. + this.pendingWritesOnEventLoop += readableBytes; + this.pendingWritesOnEventLoopView.lazySet(pendingWritesOnEventLoop); + eventLoop.execute(() -> { + this.pendingWritesOnEventLoop -= readableBytes; + this.pendingWritesOnEventLoopView.lazySet(pendingWritesOnEventLoop); + writeInEventLoop(buffer, flush, batched, futureListener); + }); + } + } + + private void writeNotInEventLoop(ActiveMQBuffer buffer, + final boolean flush, + final boolean batched, + final ChannelFutureListener futureListener) { + final Channel channel = this.channel; + final ChannelPromise promise; + if (flush || (futureListener != null)) { + promise = channel.newPromise(); + } else { + promise = channel.voidPromise(); + } + final ChannelFuture future; + final ByteBuf bytes = buffer.byteBuf(); + final int readableBytes = bytes.readableBytes(); + assert readableBytes >= 0; + final int writeBatchSize = this.batchLimit; + final boolean batchingEnabled = this.batchingEnabled; + if (batchingEnabled && batched && !flush && readableBytes < writeBatchSize) { + future = writeBatch(bytes, readableBytes, promise); + } else { + future = channel.writeAndFlush(bytes, promise); + } + if (futureListener != null) { + future.addListener(futureListener); + } + if (flush) { + //NOTE: this code path seems used only on RemotingConnection::disconnect + waitFor(promise, DEFAULT_WAIT_MILLIS); + } + } + + private void writeInEventLoop(ActiveMQBuffer buffer, + final boolean flush, + final boolean batched, + final ChannelFutureListener futureListener) { + //no need to lock because the Netty's channel is thread-safe + //and the order of write is ensured by the order of the write calls + final ChannelPromise promise; + if (futureListener != null) { + promise = channel.newPromise(); + } else { + promise = channel.voidPromise(); + } + final ChannelFuture future; + final ByteBuf bytes = buffer.byteBuf(); + final int readableBytes = bytes.readableBytes(); + final int writeBatchSize = this.batchLimit; + if (this.batchingEnabled && batched && !flush && readableBytes < writeBatchSize) { + future = writeBatch(bytes, readableBytes, promise); + } else { + future = channel.writeAndFlush(bytes, promise); + } + if (futureListener != null) { + future.addListener(futureListener); + } + } + + private ChannelFuture writeBatch(final ByteBuf bytes, final int readableBytes, final ChannelPromise promise) { + final int batchBufferSize = batchBufferSize(channel, this.writeBufferHighWaterMark); + final int nextBatchSize = batchBufferSize + readableBytes; + if (nextBatchSize > batchLimit) { + //request to flush before writing, to create the chance to make the channel writable again + channel.flush(); + //let netty use its write batching ability + return channel.write(bytes, promise); + } else if (nextBatchSize == batchLimit) { + return channel.writeAndFlush(bytes, promise); + } else { + //let netty use its write batching ability + return channel.write(bytes, promise); } } @Override - public String getRemoteAddress() { + public final String getRemoteAddress() { SocketAddress address = channel.remoteAddress(); if (address == null) { return null; @@ -356,7 +458,7 @@ public class NettyConnection implements Connection { } @Override - public String getLocalAddress() { + public final String getLocalAddress() { SocketAddress address = channel.localAddress(); if (address == null) { return null; @@ -364,18 +466,18 @@ public class NettyConnection implements Connection { return "tcp://" + IPV6Util.encloseHost(address.toString()); } - public boolean isDirectDeliver() { + public final boolean isDirectDeliver() { return directDeliver; } //never allow this @Override - public ActiveMQPrincipal getDefaultActiveMQPrincipal() { + public final ActiveMQPrincipal getDefaultActiveMQPrincipal() { return null; } @Override - public TransportConfiguration getConnectorConfig() { + public final TransportConfiguration getConnectorConfig() { if (configuration != null) { return new TransportConfiguration(NettyConnectorFactory.class.getName(), this.configuration); } else { @@ -384,46 +486,36 @@ public class NettyConnection implements Connection { } @Override - public boolean isUsingProtocolHandling() { + public final boolean isUsingProtocolHandling() { return true; } - // Public -------------------------------------------------------- - @Override - public String toString() { + public final String toString() { return super.toString() + "[local= " + channel.localAddress() + ", remote=" + channel.remoteAddress() + "]"; } - // Package protected --------------------------------------------- - - // Protected ----------------------------------------------------- - - // Private ------------------------------------------------------- - private void closeSSLAndChannel(SslHandler sslHandler, final Channel channel, boolean inEventLoop) { + checkFlushBatchBuffer(); if (sslHandler != null) { try { ChannelFuture sslCloseFuture = sslHandler.close(); - sslCloseFuture.addListener(new GenericFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - channel.close(); - } - }); - if (!inEventLoop && !sslCloseFuture.awaitUninterruptibly(10000)) { + sslCloseFuture.addListener(future -> channel.close()); + if (!inEventLoop && !sslCloseFuture.awaitUninterruptibly(DEFAULT_WAIT_MILLIS)) { ActiveMQClientLogger.LOGGER.timeoutClosingSSL(); } } catch (Throwable t) { // ignore + if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) { + ActiveMQClientLogger.LOGGER.trace(t.getMessage(), t); + } } } else { ChannelFuture closeFuture = channel.close(); - if (!inEventLoop && !closeFuture.awaitUninterruptibly(10000)) { + if (!inEventLoop && !closeFuture.awaitUninterruptibly(DEFAULT_WAIT_MILLIS)) { ActiveMQClientLogger.LOGGER.timeoutClosingNettyChannel(); } } } - // Inner classes ------------------------------------------------- } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java index 7ab0c40986..56d1bc3e16 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.spi.core.remoting; +import java.util.concurrent.TimeUnit; + import io.netty.channel.ChannelFutureListener; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.TransportConfiguration; @@ -41,6 +43,21 @@ public interface Connection { boolean isWritable(ReadyListener listener); + /** + * Causes the current thread to wait until the connection can enqueue the required capacity unless the specified waiting time elapses. + * The available capacity of the connection could change concurrently hence this method is suitable to perform precise flow-control + * only in a single writer case, while its precision decrease inversely proportional with the rate and the number of concurrent writers. + * If the current thread is not allowed to block the timeout will be ignored dependently on the connection type. + * + * @param requiredCapacity the capacity in bytes to be enqueued + * @param timeout the maximum time to wait + * @param timeUnit the time unit of the timeout argument + * @return {@code true} if the connection can enqueue {@code requiredCapacity} bytes, {@code false} otherwise + */ + default boolean blockUntilWritable(final int requiredCapacity, final long timeout, final TimeUnit timeUnit) { + return true; + } + void fireReady(boolean ready); /**