diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
index 355e502d30..372cad4db3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
@@ -84,6 +84,13 @@ public interface Channel {
*/
boolean sendBatched(Packet packet);
+ /**
+ * Similarly to {@code flushConnection} on {@link #send(Packet, boolean)}, it requests
+ * any un-flushed previous sent packets to be flushed to the underlying connection.
+ * It can be a no-op in case of InVM transports, because they would likely to flush already on each send.
+ */
+ void flushConnection();
+
/**
* Sends a packet on this channel, but request it to be flushed (along with the un-flushed previous ones) only iff
* {@code flushConnection} is {@code true}.
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
index 377b1b5901..76f87cf106 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
@@ -135,10 +135,9 @@ public interface CoreRemotingConnection extends RemotingConnection {
/**
*
- * @param size size we are trying to write
* @param timeout
* @return
* @throws IllegalStateException if the connection is closed
*/
- boolean blockUntilWritable(int size, long timeout);
+ boolean blockUntilWritable(long timeout);
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index b4c1dcdb9d..899fc2b59a 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -1039,19 +1039,21 @@ public class ActiveMQSessionContext extends SessionContext {
} else {
chunkPacket = new SessionSendContinuationMessage_V2(msgI, chunk, !lastChunk, requiresResponse || confirmationWindow != -1, messageBodySize, messageHandler);
}
- final int expectedEncodeSize = chunkPacket.expectedEncodeSize();
//perform a weak form of flow control to avoid OOM on tight loops
final CoreRemotingConnection connection = channel.getConnection();
final long blockingCallTimeoutMillis = Math.max(0, connection.getBlockingCallTimeout());
final long startFlowControl = System.nanoTime();
try {
- final boolean isWritable = connection.blockUntilWritable(expectedEncodeSize, blockingCallTimeoutMillis);
+ final boolean isWritable = connection.blockUntilWritable(blockingCallTimeoutMillis);
if (!isWritable) {
final long endFlowControl = System.nanoTime();
final long elapsedFlowControl = endFlowControl - startFlowControl;
final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedFlowControl);
ActiveMQClientLogger.LOGGER.timeoutStreamingLargeMessage();
- logger.debug("try to write " + expectedEncodeSize + " bytes after blocked " + elapsedMillis + " ms on a not writable connection: [" + connection.getID() + "]");
+ if (logger.isDebugEnabled()) {
+ logger.debugf("try to write %d bytes after blocked %d ms on a not writable connection: [%s]",
+ chunkPacket.expectedEncodeSize(), elapsedMillis, connection.getID());
+ }
}
if (requiresResponse) {
// When sending it blocking, only the last chunk will be blocking.
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
index a97f3818a6..4ecd2c63de 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
@@ -231,6 +231,11 @@ public final class ChannelImpl implements Channel {
}
}
+ @Override
+ public void flushConnection() {
+ connection.getTransportConnection().flush();
+ }
+
@Override
public boolean send(Packet packet, boolean flushConnection) {
if (invokeInterceptors(packet, interceptors, connection) != null) {
@@ -557,7 +562,7 @@ public final class ChannelImpl implements Channel {
public static String invokeInterceptors(final Packet packet,
final List interceptors,
final RemotingConnection connection) {
- if (interceptors != null) {
+ if (interceptors != null && !interceptors.isEmpty()) {
for (final Interceptor interceptor : interceptors) {
try {
boolean callNext = interceptor.intercept(packet, connection);
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
index dcc8ecbe75..8f4e1b7c6a 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
@@ -244,8 +244,8 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
}
@Override
- public boolean blockUntilWritable(int size, long timeout) {
- return transportConnection.blockUntilWritable(size, timeout, TimeUnit.MILLISECONDS);
+ public boolean blockUntilWritable(long timeout) {
+ return transportConnection.blockUntilWritable(timeout, TimeUnit.MILLISECONDS);
}
@Override
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
index cde37ae7e9..431834ec65 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
@@ -29,6 +29,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
+import io.netty.util.concurrent.FastThreadLocal;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
@@ -47,7 +48,6 @@ public class NettyConnection implements Connection {
private static final Logger logger = Logger.getLogger(NettyConnection.class);
- private static final int DEFAULT_BATCH_BYTES = Integer.getInteger("io.netty.batch.bytes", 8192);
private static final int DEFAULT_WAIT_MILLIS = 10_000;
protected final Channel channel;
@@ -59,11 +59,9 @@ public class NettyConnection implements Connection {
* here for when the connection (or Netty Channel) becomes available again.
*/
private final List readyListeners = new ArrayList<>();
- private final ThreadLocal> localListenersPool = new ThreadLocal<>();
+ private final FastThreadLocal> localListenersPool = new FastThreadLocal<>();
private final boolean batchingEnabled;
- private final int writeBufferHighWaterMark;
- private final int batchLimit;
private boolean closed;
private RemotingConnection protocolConnection;
@@ -84,10 +82,6 @@ public class NettyConnection implements Connection {
this.directDeliver = directDeliver;
this.batchingEnabled = batchingEnabled;
-
- this.writeBufferHighWaterMark = this.channel.config().getWriteBufferHighWaterMark();
-
- this.batchLimit = batchingEnabled ? Math.min(this.writeBufferHighWaterMark, DEFAULT_BATCH_BYTES) : 0;
}
private static void waitFor(ChannelPromise promise, long millis) {
@@ -103,22 +97,9 @@ public class NettyConnection implements Connection {
/**
* Returns an estimation of the current size of the write buffer in the channel.
- * To obtain a more precise value is necessary to use the unsafe API of the channel to
- * call the {@link io.netty.channel.ChannelOutboundBuffer#totalPendingWriteBytes()}.
- * Anyway, both these values are subject to concurrent modifications.
*/
- private static int batchBufferSize(Channel channel, int writeBufferHighWaterMark) {
- //Channel::bytesBeforeUnwritable is performing a volatile load
- //this is the reason why writeBufferHighWaterMark is passed as an argument
- final int bytesBeforeUnwritable = (int) channel.bytesBeforeUnwritable();
- assert bytesBeforeUnwritable >= 0;
- final int writtenBytes = writeBufferHighWaterMark - bytesBeforeUnwritable;
- assert writtenBytes >= 0;
- return writtenBytes;
- }
-
- public final int pendingWritesOnChannel() {
- return batchBufferSize(this.channel, this.writeBufferHighWaterMark);
+ private static long batchBufferSize(Channel channel) {
+ return channel.unsafe().outboundBuffer().totalPendingWriteBytes();
}
public final Channel getNettyChannel() {
@@ -252,7 +233,7 @@ public class NettyConnection implements Connection {
try {
return new ChannelBufferWrapper(channel.alloc().directBuffer(size), true);
} catch (OutOfMemoryError oom) {
- final long totalPendingWriteBytes = batchBufferSize(this.channel, this.writeBufferHighWaterMark);
+ final long totalPendingWriteBytes = batchBufferSize(this.channel);
// I'm not using the ActiveMQLogger framework here, as I wanted the class name to be very specific here
logger.warn("Trying to allocate " + size + " bytes, System is throwing OutOfMemoryError on NettyConnection " + this + ", there are currently " + "pendingWrites: [NETTY] -> " + totalPendingWriteBytes + " causes: " + oom.getMessage(), oom);
throw oom;
@@ -268,9 +249,8 @@ public class NettyConnection implements Connection {
@Override
public final void checkFlushBatchBuffer() {
if (this.batchingEnabled) {
- //perform the flush only if necessary
- final int batchBufferSize = batchBufferSize(this.channel, this.writeBufferHighWaterMark);
- if (batchBufferSize > 0) {
+ // perform the flush only if necessary
+ if (batchBufferSize(this.channel) > 0 && !channel.isWritable()) {
this.channel.flush();
}
}
@@ -292,6 +272,12 @@ public class NettyConnection implements Connection {
}
}
+ @Override
+ public void flush() {
+ checkConnectionState();
+ this.channel.flush();
+ }
+
@Override
public final void write(ActiveMQBuffer buffer, final boolean flush, final boolean batched) {
write(buffer, flush, batched, null);
@@ -304,22 +290,22 @@ public class NettyConnection implements Connection {
}
@Override
- public final boolean blockUntilWritable(final int requiredCapacity, final long timeout, final TimeUnit timeUnit) {
+ public final boolean blockUntilWritable(final long timeout, final TimeUnit timeUnit) {
checkConnectionState();
final boolean isAllowedToBlock = isAllowedToBlock();
if (!isAllowedToBlock) {
+ if (timeout > 0) {
+ if (Env.isTestEnv()) {
+ // this will only show when inside the testsuite.
+ // we may great the log for FAILURE
+ logger.warn("FAILURE! The code is using blockUntilWritable inside a Netty worker, which would block. " + "The code will probably need fixing!", new Exception("trace"));
+ }
- if (Env.isTestEnv()) {
- // this will only show when inside the testsuite.
- // we may great the log for FAILURE
- logger.warn("FAILURE! The code is using blockUntilWritable inside a Netty worker, which would block. " +
- "The code will probably need fixing!", new Exception("trace"));
+ if (logger.isDebugEnabled()) {
+ logger.debug("Calling blockUntilWritable using a thread where it's not allowed");
+ }
}
-
- if (logger.isDebugEnabled()) {
- logger.debug("Calling blockUntilWritable using a thread where it's not allowed");
- }
- return canWrite(requiredCapacity);
+ return channel.isWritable();
} else {
final long timeoutNanos = timeUnit.toNanos(timeout);
final long deadline = System.nanoTime() + timeoutNanos;
@@ -333,7 +319,7 @@ public class NettyConnection implements Connection {
parkNanos = 1000L;
}
boolean canWrite;
- while (!(canWrite = canWrite(requiredCapacity)) && (System.nanoTime() - deadline) < 0) {
+ while (!(canWrite = channel.isWritable()) && (System.nanoTime() - deadline) < 0) {
//periodically check the connection state
checkConnectionState();
LockSupport.parkNanos(parkNanos);
@@ -348,31 +334,12 @@ public class NettyConnection implements Connection {
return !inEventLoop;
}
- private boolean canWrite(final int requiredCapacity) {
- //evaluate if the write request could be taken:
- //there is enough space in the write buffer?
- final long totalPendingWrites = this.pendingWritesOnChannel();
- final boolean canWrite;
- if (requiredCapacity > this.writeBufferHighWaterMark) {
- canWrite = totalPendingWrites == 0;
- } else {
- canWrite = (totalPendingWrites + requiredCapacity) <= this.writeBufferHighWaterMark;
- }
- return canWrite;
- }
-
@Override
public final void write(ActiveMQBuffer buffer,
final boolean flush,
final boolean batched,
final ChannelFutureListener futureListener) {
final int readableBytes = buffer.readableBytes();
- if (logger.isDebugEnabled()) {
- final int remainingBytes = this.writeBufferHighWaterMark - readableBytes;
- if (remainingBytes < 0) {
- logger.debug("a write request is exceeding by " + (-remainingBytes) + " bytes the writeBufferHighWaterMark size [ " + this.writeBufferHighWaterMark + " ] : consider to set it at least of " + readableBytes + " bytes");
- }
- }
//no need to lock because the Netty's channel is thread-safe
//and the order of write is ensured by the order of the write calls
final Channel channel = this.channel;
@@ -385,10 +352,9 @@ public class NettyConnection implements Connection {
final ChannelFuture future;
final ByteBuf bytes = buffer.byteBuf();
assert readableBytes >= 0;
- final int writeBatchSize = this.batchLimit;
final boolean batchingEnabled = this.batchingEnabled;
- if (batchingEnabled && batched && !flush && readableBytes < writeBatchSize) {
- future = writeBatch(bytes, readableBytes, promise);
+ if (batchingEnabled && batched && !flush && channel.isWritable()) {
+ future = channel.write(bytes, promise);
} else {
future = channel.writeAndFlush(bytes, promise);
}
@@ -411,22 +377,6 @@ public class NettyConnection implements Connection {
}
}
- private ChannelFuture writeBatch(final ByteBuf bytes, final int readableBytes, final ChannelPromise promise) {
- final int batchBufferSize = batchBufferSize(channel, this.writeBufferHighWaterMark);
- final int nextBatchSize = batchBufferSize + readableBytes;
- if (nextBatchSize > batchLimit) {
- //request to flush before writing, to create the chance to make the channel writable again
- channel.flush();
- //let netty use its write batching ability
- return channel.write(bytes, promise);
- } else if (nextBatchSize == batchLimit) {
- return channel.writeAndFlush(bytes, promise);
- } else {
- //let netty use its write batching ability
- return channel.write(bytes, promise);
- }
- }
-
@Override
public final String getRemoteAddress() {
SocketAddress address = channel.remoteAddress();
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
index 0f76354729..28584aee76 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
@@ -46,18 +46,17 @@ public interface Connection {
boolean isOpen();
/**
- * Causes the current thread to wait until the connection can enqueue the required capacity unless the specified waiting time elapses.
+ * Causes the current thread to wait until the connection is writable unless the specified waiting time elapses.
* The available capacity of the connection could change concurrently hence this method is suitable to perform precise flow-control
* only in a single writer case, while its precision decrease inversely proportional with the rate and the number of concurrent writers.
* If the current thread is not allowed to block the timeout will be ignored dependently on the connection type.
*
- * @param requiredCapacity the capacity in bytes to be enqueued
* @param timeout the maximum time to wait
* @param timeUnit the time unit of the timeout argument
- * @return {@code true} if the connection can enqueue {@code requiredCapacity} bytes, {@code false} otherwise
+ * @return {@code true} if the connection is writable, {@code false} otherwise
* @throws IllegalStateException if the connection is closed
*/
- default boolean blockUntilWritable(final int requiredCapacity, final long timeout, final TimeUnit timeUnit) {
+ default boolean blockUntilWritable(final long timeout, final TimeUnit timeUnit) {
return true;
}
@@ -85,6 +84,13 @@ public interface Connection {
*/
void write(ActiveMQBuffer buffer, boolean requestFlush);
+ /**
+ * Request to flush any previous written buffers into the wire.
+ */
+ default void flush() {
+
+ }
+
/**
* writes the buffer to the connection and if flush is true returns only when the buffer has been physically written to the connection.
*
diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
index d2eccf3378..73fa5919cf 100644
--- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
+++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
@@ -237,7 +237,7 @@ public class ChannelImplTest {
}
@Override
- public boolean blockUntilWritable(int size, long timeout) {
+ public boolean blockUntilWritable(long timeout) {
return false;
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
index a5613fbd59..da26fa06f8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
@@ -21,7 +21,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
-import java.util.ArrayList;
+import java.util.ArrayDeque;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
@@ -80,7 +80,6 @@ import org.apache.activemq.artemis.core.replication.ReplicationManager.ADD_OPERA
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-
import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
@@ -134,7 +133,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
private List outgoingInterceptors = null;
- private final ArrayList pendingPackets;
+ private final ArrayDeque pendingPackets;
// Constructors --------------------------------------------------
@@ -146,7 +145,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
this.criticalErrorListener = criticalErrorListener;
this.wantedFailBack = wantedFailBack;
this.activation = activation;
- this.pendingPackets = new ArrayList<>();
+ this.pendingPackets = new ArrayDeque<>();
this.supportResponseBatching = false;
}
@@ -262,18 +261,14 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
@Override
public void endOfBatch() {
- final ArrayList pendingPackets = this.pendingPackets;
+ final ArrayDeque pendingPackets = this.pendingPackets;
if (pendingPackets.isEmpty()) {
return;
}
- try {
- for (int i = 0, size = pendingPackets.size(); i < size; i++) {
- final Packet packet = pendingPackets.get(i);
- final boolean isLast = i == (size - 1);
- channel.send(packet, isLast);
- }
- } finally {
- pendingPackets.clear();
+ for (int i = 0, size = pendingPackets.size(); i < size; i++) {
+ final Packet packet = pendingPackets.poll();
+ final boolean isLast = i == (size - 1);
+ channel.send(packet, isLast);
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index 9a2d629465..fa82ff6f63 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -26,10 +26,15 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.EventLoop;
+import io.netty.channel.SingleThreadEventLoop;
+import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@@ -69,6 +74,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Replicatio
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessageV2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -76,6 +82,8 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumManager;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.jboss.logging.Logger;
@@ -126,13 +134,11 @@ public final class ReplicationManager implements ActiveMQComponent {
private final ExecutorFactory ioExecutorFactory;
- private final Executor replicationStream;
-
private SessionFailureListener failureListener;
private CoreRemotingConnection remotingConnection;
- private final long timeout;
+ private final long maxAllowedSlownessNanos;
private final long initialReplicationSyncTimeout;
@@ -140,6 +146,32 @@ public final class ReplicationManager implements ActiveMQComponent {
private final ReusableLatch synchronizationIsFinishedAcknowledgement = new ReusableLatch(0);
+ private static final class ReplicatePacketRequest {
+
+ final Packet packet;
+ final OperationContext context;
+ // Although this field is needed just during the initial sync,
+ // the JVM field layout would likely left 4 bytes of wasted space without it
+ // so it makes sense to use it instead.
+ final ReusableLatch done;
+
+ ReplicatePacketRequest(Packet packet, OperationContext context, ReusableLatch done) {
+ this.packet = packet;
+ this.context = context;
+ this.done = done;
+ }
+ }
+
+ private final Queue replicatePacketRequests;
+ private final Executor replicationStream;
+ private final ScheduledExecutorService scheduledExecutorService;
+ private ScheduledFuture> slowReplicationChecker;
+ private long notWritableFrom;
+ private boolean checkSlowReplication;
+ private final ReadyListener onResume;
+ private boolean isFlushing;
+ private boolean awaitingResume;
+
/**
* @param remotingConnection
*/
@@ -153,8 +185,23 @@ public final class ReplicationManager implements ActiveMQComponent {
this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
this.remotingConnection = remotingConnection;
- this.replicationStream = ioExecutorFactory.getExecutor();
- this.timeout = timeout;
+ final Connection transportConnection = this.remotingConnection.getTransportConnection();
+ if (transportConnection instanceof NettyConnection) {
+ final EventLoop eventLoop = ((NettyConnection) transportConnection).getNettyChannel().eventLoop();
+ this.replicationStream = eventLoop;
+ this.scheduledExecutorService = eventLoop;
+ } else {
+ this.replicationStream = ioExecutorFactory.getExecutor();
+ this.scheduledExecutorService = null;
+ }
+ this.maxAllowedSlownessNanos = timeout > 0 ? TimeUnit.MILLISECONDS.toNanos(timeout) : -1;
+ this.replicatePacketRequests = PlatformDependent.newMpscQueue();
+ this.slowReplicationChecker = null;
+ this.notWritableFrom = Long.MAX_VALUE;
+ this.awaitingResume = false;
+ this.onResume = this::resume;
+ this.isFlushing = false;
+ this.checkSlowReplication = false;
}
public void appendUpdateRecord(final byte journalID,
@@ -286,6 +333,25 @@ public final class ReplicationManager implements ActiveMQComponent {
replicatingChannel.setHandler(responseHandler);
failureListener = new ReplicatedSessionFailureListener();
remotingConnection.addFailureListener(failureListener);
+ // only Netty connections can enable slow replication checker
+ if (scheduledExecutorService != null && maxAllowedSlownessNanos >= 0) {
+ long periodNanos = maxAllowedSlownessNanos / 10;
+ if (periodNanos > TimeUnit.SECONDS.toNanos(1)) {
+ periodNanos = TimeUnit.SECONDS.toNanos(1);
+ } else if (periodNanos < TimeUnit.MILLISECONDS.toNanos(100)) {
+ logger.warnf("The cluster call timeout is too low ie %d ms: consider raising it to save CPU",
+ TimeUnit.NANOSECONDS.toMillis(maxAllowedSlownessNanos));
+ periodNanos = TimeUnit.MILLISECONDS.toNanos(100);
+ }
+ logger.debugf("Slow replication checker is running with a period of %d ms", TimeUnit.NANOSECONDS.toMillis(periodNanos));
+ // The slow detection has been implemented by using an always-on timer task
+ // instead of triggering one each time we detect an un-writable channel because:
+ // - getting temporarily an un-writable channel is rather common under load and scheduling/cancelling a
+ // timed task is a CPU and GC intensive operation
+ // - choosing a period of 100-1000 ms lead to a reasonable and constant CPU utilization while idle too
+ slowReplicationChecker = scheduledExecutorService.scheduleAtFixedRate(this::checkSlowReplication,
+ periodNanos, periodNanos, TimeUnit.NANOSECONDS);
+ }
started = true;
@@ -317,6 +383,11 @@ public final class ReplicationManager implements ActiveMQComponent {
replicatingChannel.getConnection().getTransportConnection().fireReady(true);
}
+ if (slowReplicationChecker != null) {
+ slowReplicationChecker.cancel(false);
+ slowReplicationChecker = null;
+ }
+
enabled = false;
if (clearTokens) {
@@ -374,6 +445,10 @@ public final class ReplicationManager implements ActiveMQComponent {
}
private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp) {
+ return sendReplicatePacket(packet, lineUp, null);
+ }
+
+ private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp, ReusableLatch done) {
if (!enabled) {
packet.release();
return null;
@@ -383,29 +458,48 @@ public final class ReplicationManager implements ActiveMQComponent {
if (lineUp) {
repliToken.replicationLineUp();
}
-
+ final ReplicatePacketRequest request = new ReplicatePacketRequest(packet, repliToken, done);
+ replicatePacketRequests.add(request);
replicationStream.execute(() -> {
if (enabled) {
- pendingTokens.add(repliToken);
- flowControl(packet.expectedEncodeSize());
- replicatingChannel.send(packet);
+ sendReplicatedPackets(false);
} else {
- packet.release();
- repliToken.replicationDone();
+ releaseReplicatedPackets(replicatePacketRequests);
}
});
return repliToken;
}
- /**
- * This was written as a refactoring of sendReplicatePacket.
- * In case you refactor this in any way, this method must hold a lock on replication lock. .
- */
- private boolean flowControl(int size) {
- boolean flowWorked = replicatingChannel.getConnection().blockUntilWritable(size, timeout);
+ private void releaseReplicatedPackets(Queue requests) {
+ assert checkEventLoop();
+ ReplicatePacketRequest req;
+ while ((req = requests.poll()) != null) {
+ req.packet.release();
+ req.context.replicationDone();
+ if (req.done != null) {
+ req.done.countDown();
+ }
+ }
+ }
- if (!flowWorked) {
+ private void checkSlowReplication() {
+ if (!enabled) {
+ return;
+ }
+ assert checkEventLoop();
+ if (!checkSlowReplication) {
+ return;
+ }
+ final boolean isWritable = replicatingChannel.getConnection().blockUntilWritable(0);
+ if (isWritable) {
+ checkSlowReplication = false;
+ return;
+ }
+ final long elapsedNanosNotWritable = System.nanoTime() - notWritableFrom;
+ if (elapsedNanosNotWritable >= maxAllowedSlownessNanos) {
+ checkSlowReplication = false;
+ releaseReplicatedPackets(replicatePacketRequests);
try {
ActiveMQServerLogger.LOGGER.slowReplicationResponse();
stop();
@@ -413,8 +507,84 @@ public final class ReplicationManager implements ActiveMQComponent {
logger.warn(e.getMessage(), e);
}
}
+ }
- return flowWorked;
+ private void resume() {
+ sendReplicatedPackets(true);
+ }
+
+ private void sendReplicatedPackets(boolean resume) {
+ assert checkEventLoop();
+ if (resume) {
+ awaitingResume = false;
+ }
+ // We try to:
+ // - save recursive calls of resume due to flushConnection
+ // - saving flush pending writes *if* the OS hasn't notified that's writable again
+ if (awaitingResume || isFlushing || !enabled) {
+ return;
+ }
+ if (replicatePacketRequests.isEmpty()) {
+ return;
+ }
+ isFlushing = true;
+ final CoreRemotingConnection connection = replicatingChannel.getConnection();
+ try {
+ while (connection.blockUntilWritable(0)) {
+ checkSlowReplication = false;
+ final ReplicatePacketRequest request = replicatePacketRequests.poll();
+ if (request == null) {
+ replicatingChannel.flushConnection();
+ // given that there isn't any more work to do, we're not interested
+ // to check writability state to trigger the slow connection check
+ return;
+ }
+ pendingTokens.add(request.context);
+ final Packet pack = request.packet;
+ final ReusableLatch done = request.done;
+ if (done != null) {
+ done.countDown();
+ }
+ replicatingChannel.send(pack, false);
+ }
+ replicatingChannel.flushConnection();
+ assert !awaitingResume;
+ // we care about writability just if there is some work to do
+ if (!replicatePacketRequests.isEmpty()) {
+ if (!connection.isWritable(onResume)) {
+ checkSlowReplication = true;
+ notWritableFrom = System.nanoTime();
+ awaitingResume = true;
+ } else {
+ // submit itself again to continue draining:
+ // we're not trying it again here to save read starvation
+ // NOTE: maybe it's redundant because there are already others in-flights requests
+ replicationStream.execute(() -> sendReplicatedPackets(false));
+ }
+ }
+ } catch (Throwable t) {
+ assert !(t instanceof AssertionError) : t.getMessage();
+ if (!connection.getTransportConnection().isOpen()) {
+ // that's an handled state: right after this cleanup is expected to be stopped/closed
+ // or get the failure listener to be called!
+ logger.trace("Transport connection closed: cleaning up replicate tokens", t);
+ releaseReplicatedPackets(replicatePacketRequests);
+ // cleanup ReadyListener without triggering any further write/flush
+ connection.getTransportConnection().fireReady(true);
+ } else {
+ logger.warn("Unexpected error while flushing replicate packets", t);
+ }
+ } finally {
+ isFlushing = false;
+ }
+ }
+
+ private boolean checkEventLoop() {
+ if (!(replicationStream instanceof SingleThreadEventLoop)) {
+ return true;
+ }
+ final SingleThreadEventLoop eventLoop = (SingleThreadEventLoop) replicationStream;
+ return eventLoop.inEventLoop();
}
/**
@@ -423,6 +593,7 @@ public final class ReplicationManager implements ActiveMQComponent {
* packets were not sent with {@link #sendReplicatePacket(Packet)}.
*/
private void replicated() {
+ assert checkEventLoop();
OperationContext ctx = pendingTokens.poll();
if (ctx == null) {
@@ -528,24 +699,6 @@ public final class ReplicationManager implements ActiveMQComponent {
sendLargeFile(null, queueName, id, file, Long.MAX_VALUE);
}
- private class FlushAction implements Runnable {
-
- ReusableLatch latch = new ReusableLatch(1);
-
- public void reset() {
- latch.setCount(1);
- }
-
- public boolean await(long timeout, TimeUnit unit) throws Exception {
- return latch.await(timeout, unit);
- }
-
- @Override
- public void run() {
- latch.countDown();
- }
- }
-
/**
* Sends large files in reasonably sized chunks to the backup during replication synchronization.
*
@@ -566,12 +719,12 @@ public final class ReplicationManager implements ActiveMQComponent {
if (!file.isOpen()) {
file.open();
}
- int size = 32 * 1024;
+ final int size = 32 * 1024;
int flowControlSize = 10;
int packetsSent = 0;
- FlushAction action = new FlushAction();
+ final ReusableLatch flushed = new ReusableLatch(1);
try {
try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) {
@@ -593,32 +746,33 @@ public final class ReplicationManager implements ActiveMQComponent {
maxBytesToSend = maxBytesToSend - bytesRead;
}
}
- logger.debug("sending " + buffer.writerIndex() + " bytes on file " + file.getFileName());
+ if (logger.isDebugEnabled()) {
+ logger.debugf("sending %d bytes on file %s", buffer.writerIndex(), file.getFileName());
+ }
// sending -1 or 0 bytes will close the file at the backup
- // We cannot simply send everything of a file through the executor,
- // otherwise we would run out of memory.
- // so we don't use the executor here
- sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true);
+ final boolean lastPacket = bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0;
+ final boolean flowControlCheck = (packetsSent % flowControlSize == 0) || lastPacket;
+ if (flowControlCheck) {
+ flushed.setCount(1);
+ sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true, flushed);
+ awaitFlushOfReplicationStream(flushed);
+ } else {
+ sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true);
+ }
packetsSent++;
- if (packetsSent % flowControlSize == 0) {
- flushReplicationStream(action);
- }
- if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
+ if (lastPacket)
break;
}
}
- flushReplicationStream(action);
} finally {
if (file.isOpen())
file.close();
}
}
- private void flushReplicationStream(FlushAction action) throws Exception {
- action.reset();
- replicationStream.execute(action);
- if (!action.await(this.initialReplicationSyncTimeout, TimeUnit.MILLISECONDS)) {
+ private void awaitFlushOfReplicationStream(ReusableLatch flushed) throws Exception {
+ if (!flushed.await(this.initialReplicationSyncTimeout, TimeUnit.MILLISECONDS)) {
throw ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(initialReplicationSyncTimeout);
}
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
index c88cb16693..9ddffd389a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
@@ -222,6 +222,11 @@ public class BackupSyncDelay implements Interceptor {
}
+ @Override
+ public void flushConnection() {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public boolean sendAndFlush(Packet packet) {
throw new UnsupportedOperationException();
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
index c9c975cf85..5b58cb9e12 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
@@ -84,7 +84,7 @@ public class NettyConnectionTest extends ActiveMQTestBase {
conn.close();
//to make sure the channel is closed it needs to run the pending tasks
channel.runPendingTasks();
- conn.blockUntilWritable(0, 0, TimeUnit.NANOSECONDS);
+ conn.blockUntilWritable(0, TimeUnit.NANOSECONDS);
}
@Test