diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java index f8195fbdce..1032a352bb 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java @@ -21,7 +21,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.LockSupport; import io.netty.buffer.ByteBuf; @@ -60,19 +59,12 @@ public class NettyConnection implements Connection { * here for when the connection (or Netty Channel) becomes available again. */ private final List readyListeners = new ArrayList<>(); - private final ThreadLocal> localListenersPool = ThreadLocal.withInitial(ArrayList::new); + private final ThreadLocal> localListenersPool = new ThreadLocal<>(); private final boolean batchingEnabled; private final int writeBufferHighWaterMark; private final int batchLimit; - /** - * This counter is splitted in 2 variables to write it with less performance - * impact: no volatile get is required to update its value - */ - private final AtomicLong pendingWritesOnEventLoopView = new AtomicLong(); - private long pendingWritesOnEventLoop = 0; - private boolean closed; private RemotingConnection protocolConnection; @@ -129,18 +121,6 @@ public class NettyConnection implements Connection { return batchBufferSize(this.channel, this.writeBufferHighWaterMark); } - public final long pendingWritesOnEventLoop() { - final EventLoop eventLoop = channel.eventLoop(); - final boolean inEventLoop = eventLoop.inEventLoop(); - final long pendingWritesOnEventLoop; - if (inEventLoop) { - pendingWritesOnEventLoop = this.pendingWritesOnEventLoop; - } else { - pendingWritesOnEventLoop = pendingWritesOnEventLoopView.get(); - } - return pendingWritesOnEventLoop; - } - public final Channel getNettyChannel() { return channel; } @@ -163,19 +143,27 @@ public class NettyConnection implements Connection { @Override public final void fireReady(final boolean ready) { - final ArrayList readyToCall = localListenersPool.get(); + ArrayList readyToCall = localListenersPool.get(); + if (readyToCall != null) { + localListenersPool.set(null); + } synchronized (readyListeners) { this.ready = ready; if (ready) { final int size = this.readyListeners.size(); - readyToCall.ensureCapacity(size); + if (readyToCall != null) { + readyToCall.ensureCapacity(size); + } try { for (int i = 0; i < size; i++) { final ReadyListener readyListener = readyListeners.get(i); if (readyListener == null) { break; } + if (readyToCall == null) { + readyToCall = new ArrayList<>(size); + } readyToCall.add(readyListener); } } finally { @@ -183,18 +171,23 @@ public class NettyConnection implements Connection { } } } - try { - final int size = readyToCall.size(); - for (int i = 0; i < size; i++) { - try { - final ReadyListener readyListener = readyToCall.get(i); - readyListener.readyForWriting(); - } catch (Throwable logOnly) { - ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(logOnly); + if (readyToCall != null) { + try { + readyToCall.forEach(readyListener -> { + try { + readyListener.readyForWriting(); + } catch (Throwable logOnly) { + ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(logOnly); + } + }); + } catch (Throwable t) { + ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(t); + } finally { + readyToCall.clear(); + if (localListenersPool.get() != null) { + localListenersPool.set(readyToCall); } } - } finally { - readyToCall.clear(); } } @@ -256,7 +249,7 @@ public class NettyConnection implements Connection { } catch (OutOfMemoryError oom) { final long totalPendingWriteBytes = batchBufferSize(this.channel, this.writeBufferHighWaterMark); // I'm not using the ActiveMQLogger framework here, as I wanted the class name to be very specific here - logger.warn("Trying to allocate " + size + " bytes, System is throwing OutOfMemoryError on NettyConnection " + this + ", there are currently " + "pendingWrites: [NETTY] -> " + totalPendingWriteBytes + "[EVENT LOOP] -> " + pendingWritesOnEventLoopView.get() + " causes: " + oom.getMessage(), oom); + logger.warn("Trying to allocate " + size + " bytes, System is throwing OutOfMemoryError on NettyConnection " + this + ", there are currently " + "pendingWrites: [NETTY] -> " + totalPendingWriteBytes + " causes: " + oom.getMessage(), oom); throw oom; } } @@ -342,10 +335,7 @@ public class NettyConnection implements Connection { private boolean canWrite(final int requiredCapacity) { //evaluate if the write request could be taken: //there is enough space in the write buffer? - //The pending writes on event loop will eventually go into the Netty write buffer, hence consider them - //as part of the heuristic! - final long pendingWritesOnEventLoop = this.pendingWritesOnEventLoop(); - final long totalPendingWrites = pendingWritesOnEventLoop + this.pendingWritesOnChannel(); + final long totalPendingWrites = this.pendingWritesOnChannel(); final boolean canWrite; if (requiredCapacity > this.writeBufferHighWaterMark) { canWrite = totalPendingWrites == 0; @@ -369,34 +359,6 @@ public class NettyConnection implements Connection { } //no need to lock because the Netty's channel is thread-safe //and the order of write is ensured by the order of the write calls - final EventLoop eventLoop = channel.eventLoop(); - final boolean inEventLoop = eventLoop.inEventLoop(); - if (!inEventLoop) { - writeNotInEventLoop(buffer, flush, batched, futureListener); - } else { - // OLD COMMENT: - // create a task which will be picked up by the eventloop and trigger the write. - // This is mainly needed as this method is triggered by different threads for the same channel. - // if we not do this we may produce out of order writes. - // NOTE: - // the submitted task does not effect in any way the current written size in the batch - // until the loop will process it, leading to a longer life for the ActiveMQBuffer buffer!!! - // To solve it, will be necessary to manually perform the count of the current batch instead of rely on the - // Channel:Config::writeBufferHighWaterMark value. - this.pendingWritesOnEventLoop += readableBytes; - this.pendingWritesOnEventLoopView.lazySet(pendingWritesOnEventLoop); - eventLoop.execute(() -> { - this.pendingWritesOnEventLoop -= readableBytes; - this.pendingWritesOnEventLoopView.lazySet(pendingWritesOnEventLoop); - writeInEventLoop(buffer, flush, batched, futureListener); - }); - } - } - - private void writeNotInEventLoop(ActiveMQBuffer buffer, - final boolean flush, - final boolean batched, - final ChannelFutureListener futureListener) { final Channel channel = this.channel; final ChannelPromise promise; if (flush || (futureListener != null)) { @@ -406,7 +368,6 @@ public class NettyConnection implements Connection { } final ChannelFuture future; final ByteBuf bytes = buffer.byteBuf(); - final int readableBytes = bytes.readableBytes(); assert readableBytes >= 0; final int writeBatchSize = this.batchLimit; final boolean batchingEnabled = this.batchingEnabled; @@ -420,33 +381,17 @@ public class NettyConnection implements Connection { } if (flush) { //NOTE: this code path seems used only on RemotingConnection::disconnect - waitFor(promise, DEFAULT_WAIT_MILLIS); + flushAndWait(channel, promise); } } - private void writeInEventLoop(ActiveMQBuffer buffer, - final boolean flush, - final boolean batched, - final ChannelFutureListener futureListener) { - //no need to lock because the Netty's channel is thread-safe - //and the order of write is ensured by the order of the write calls - final ChannelPromise promise; - if (futureListener != null) { - promise = channel.newPromise(); + private static void flushAndWait(final Channel channel, final ChannelPromise promise) { + if (!channel.eventLoop().inEventLoop()) { + waitFor(promise, DEFAULT_WAIT_MILLIS); } else { - promise = channel.voidPromise(); - } - final ChannelFuture future; - final ByteBuf bytes = buffer.byteBuf(); - final int readableBytes = bytes.readableBytes(); - final int writeBatchSize = this.batchLimit; - if (this.batchingEnabled && batched && !flush && readableBytes < writeBatchSize) { - future = writeBatch(bytes, readableBytes, promise); - } else { - future = channel.writeAndFlush(bytes, promise); - } - if (futureListener != null) { - future.addListener(futureListener); + if (logger.isDebugEnabled()) { + logger.debug("Calling write with flush from a thread where it's not allowed"); + } } }