ARTEMIS-2205 Netty is used in a more idiomatic way

This helped decreasing a lot of pressure on GC by not creating
as many runnables for each write.

Besides this helps fixing some of the issues I would have had on refactoring AMQP
over flushing writes and other asynchronous issues.
This commit is contained in:
Francesco Nigro 2018-03-03 18:58:21 +01:00
parent 7b34b5648b
commit a40a459f8c
1 changed files with 35 additions and 90 deletions

View File

@ -21,7 +21,6 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.LockSupport;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
@ -60,19 +59,12 @@ public class NettyConnection implements Connection {
* here for when the connection (or Netty Channel) becomes available again. * here for when the connection (or Netty Channel) becomes available again.
*/ */
private final List<ReadyListener> readyListeners = new ArrayList<>(); private final List<ReadyListener> readyListeners = new ArrayList<>();
private final ThreadLocal<ArrayList<ReadyListener>> localListenersPool = ThreadLocal.withInitial(ArrayList::new); private final ThreadLocal<ArrayList<ReadyListener>> localListenersPool = new ThreadLocal<>();
private final boolean batchingEnabled; private final boolean batchingEnabled;
private final int writeBufferHighWaterMark; private final int writeBufferHighWaterMark;
private final int batchLimit; private final int batchLimit;
/**
* This counter is splitted in 2 variables to write it with less performance
* impact: no volatile get is required to update its value
*/
private final AtomicLong pendingWritesOnEventLoopView = new AtomicLong();
private long pendingWritesOnEventLoop = 0;
private boolean closed; private boolean closed;
private RemotingConnection protocolConnection; private RemotingConnection protocolConnection;
@ -129,18 +121,6 @@ public class NettyConnection implements Connection {
return batchBufferSize(this.channel, this.writeBufferHighWaterMark); return batchBufferSize(this.channel, this.writeBufferHighWaterMark);
} }
public final long pendingWritesOnEventLoop() {
final EventLoop eventLoop = channel.eventLoop();
final boolean inEventLoop = eventLoop.inEventLoop();
final long pendingWritesOnEventLoop;
if (inEventLoop) {
pendingWritesOnEventLoop = this.pendingWritesOnEventLoop;
} else {
pendingWritesOnEventLoop = pendingWritesOnEventLoopView.get();
}
return pendingWritesOnEventLoop;
}
public final Channel getNettyChannel() { public final Channel getNettyChannel() {
return channel; return channel;
} }
@ -163,19 +143,27 @@ public class NettyConnection implements Connection {
@Override @Override
public final void fireReady(final boolean ready) { public final void fireReady(final boolean ready) {
final ArrayList<ReadyListener> readyToCall = localListenersPool.get(); ArrayList<ReadyListener> readyToCall = localListenersPool.get();
if (readyToCall != null) {
localListenersPool.set(null);
}
synchronized (readyListeners) { synchronized (readyListeners) {
this.ready = ready; this.ready = ready;
if (ready) { if (ready) {
final int size = this.readyListeners.size(); final int size = this.readyListeners.size();
if (readyToCall != null) {
readyToCall.ensureCapacity(size); readyToCall.ensureCapacity(size);
}
try { try {
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
final ReadyListener readyListener = readyListeners.get(i); final ReadyListener readyListener = readyListeners.get(i);
if (readyListener == null) { if (readyListener == null) {
break; break;
} }
if (readyToCall == null) {
readyToCall = new ArrayList<>(size);
}
readyToCall.add(readyListener); readyToCall.add(readyListener);
} }
} finally { } finally {
@ -183,18 +171,23 @@ public class NettyConnection implements Connection {
} }
} }
} }
if (readyToCall != null) {
try { try {
final int size = readyToCall.size(); readyToCall.forEach(readyListener -> {
for (int i = 0; i < size; i++) {
try { try {
final ReadyListener readyListener = readyToCall.get(i);
readyListener.readyForWriting(); readyListener.readyForWriting();
} catch (Throwable logOnly) { } catch (Throwable logOnly) {
ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(logOnly); ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(logOnly);
} }
} });
} catch (Throwable t) {
ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(t);
} finally { } finally {
readyToCall.clear(); readyToCall.clear();
if (localListenersPool.get() != null) {
localListenersPool.set(readyToCall);
}
}
} }
} }
@ -256,7 +249,7 @@ public class NettyConnection implements Connection {
} catch (OutOfMemoryError oom) { } catch (OutOfMemoryError oom) {
final long totalPendingWriteBytes = batchBufferSize(this.channel, this.writeBufferHighWaterMark); final long totalPendingWriteBytes = batchBufferSize(this.channel, this.writeBufferHighWaterMark);
// I'm not using the ActiveMQLogger framework here, as I wanted the class name to be very specific here // I'm not using the ActiveMQLogger framework here, as I wanted the class name to be very specific here
logger.warn("Trying to allocate " + size + " bytes, System is throwing OutOfMemoryError on NettyConnection " + this + ", there are currently " + "pendingWrites: [NETTY] -> " + totalPendingWriteBytes + "[EVENT LOOP] -> " + pendingWritesOnEventLoopView.get() + " causes: " + oom.getMessage(), oom); logger.warn("Trying to allocate " + size + " bytes, System is throwing OutOfMemoryError on NettyConnection " + this + ", there are currently " + "pendingWrites: [NETTY] -> " + totalPendingWriteBytes + " causes: " + oom.getMessage(), oom);
throw oom; throw oom;
} }
} }
@ -342,10 +335,7 @@ public class NettyConnection implements Connection {
private boolean canWrite(final int requiredCapacity) { private boolean canWrite(final int requiredCapacity) {
//evaluate if the write request could be taken: //evaluate if the write request could be taken:
//there is enough space in the write buffer? //there is enough space in the write buffer?
//The pending writes on event loop will eventually go into the Netty write buffer, hence consider them final long totalPendingWrites = this.pendingWritesOnChannel();
//as part of the heuristic!
final long pendingWritesOnEventLoop = this.pendingWritesOnEventLoop();
final long totalPendingWrites = pendingWritesOnEventLoop + this.pendingWritesOnChannel();
final boolean canWrite; final boolean canWrite;
if (requiredCapacity > this.writeBufferHighWaterMark) { if (requiredCapacity > this.writeBufferHighWaterMark) {
canWrite = totalPendingWrites == 0; canWrite = totalPendingWrites == 0;
@ -369,34 +359,6 @@ public class NettyConnection implements Connection {
} }
//no need to lock because the Netty's channel is thread-safe //no need to lock because the Netty's channel is thread-safe
//and the order of write is ensured by the order of the write calls //and the order of write is ensured by the order of the write calls
final EventLoop eventLoop = channel.eventLoop();
final boolean inEventLoop = eventLoop.inEventLoop();
if (!inEventLoop) {
writeNotInEventLoop(buffer, flush, batched, futureListener);
} else {
// OLD COMMENT:
// create a task which will be picked up by the eventloop and trigger the write.
// This is mainly needed as this method is triggered by different threads for the same channel.
// if we not do this we may produce out of order writes.
// NOTE:
// the submitted task does not effect in any way the current written size in the batch
// until the loop will process it, leading to a longer life for the ActiveMQBuffer buffer!!!
// To solve it, will be necessary to manually perform the count of the current batch instead of rely on the
// Channel:Config::writeBufferHighWaterMark value.
this.pendingWritesOnEventLoop += readableBytes;
this.pendingWritesOnEventLoopView.lazySet(pendingWritesOnEventLoop);
eventLoop.execute(() -> {
this.pendingWritesOnEventLoop -= readableBytes;
this.pendingWritesOnEventLoopView.lazySet(pendingWritesOnEventLoop);
writeInEventLoop(buffer, flush, batched, futureListener);
});
}
}
private void writeNotInEventLoop(ActiveMQBuffer buffer,
final boolean flush,
final boolean batched,
final ChannelFutureListener futureListener) {
final Channel channel = this.channel; final Channel channel = this.channel;
final ChannelPromise promise; final ChannelPromise promise;
if (flush || (futureListener != null)) { if (flush || (futureListener != null)) {
@ -406,7 +368,6 @@ public class NettyConnection implements Connection {
} }
final ChannelFuture future; final ChannelFuture future;
final ByteBuf bytes = buffer.byteBuf(); final ByteBuf bytes = buffer.byteBuf();
final int readableBytes = bytes.readableBytes();
assert readableBytes >= 0; assert readableBytes >= 0;
final int writeBatchSize = this.batchLimit; final int writeBatchSize = this.batchLimit;
final boolean batchingEnabled = this.batchingEnabled; final boolean batchingEnabled = this.batchingEnabled;
@ -420,33 +381,17 @@ public class NettyConnection implements Connection {
} }
if (flush) { if (flush) {
//NOTE: this code path seems used only on RemotingConnection::disconnect //NOTE: this code path seems used only on RemotingConnection::disconnect
waitFor(promise, DEFAULT_WAIT_MILLIS); flushAndWait(channel, promise);
} }
} }
private void writeInEventLoop(ActiveMQBuffer buffer, private static void flushAndWait(final Channel channel, final ChannelPromise promise) {
final boolean flush, if (!channel.eventLoop().inEventLoop()) {
final boolean batched, waitFor(promise, DEFAULT_WAIT_MILLIS);
final ChannelFutureListener futureListener) {
//no need to lock because the Netty's channel is thread-safe
//and the order of write is ensured by the order of the write calls
final ChannelPromise promise;
if (futureListener != null) {
promise = channel.newPromise();
} else { } else {
promise = channel.voidPromise(); if (logger.isDebugEnabled()) {
logger.debug("Calling write with flush from a thread where it's not allowed");
} }
final ChannelFuture future;
final ByteBuf bytes = buffer.byteBuf();
final int readableBytes = bytes.readableBytes();
final int writeBatchSize = this.batchLimit;
if (this.batchingEnabled && batched && !flush && readableBytes < writeBatchSize) {
future = writeBatch(bytes, readableBytes, promise);
} else {
future = channel.writeAndFlush(bytes, promise);
}
if (futureListener != null) {
future.addListener(futureListener);
} }
} }