This closes #3392
This commit is contained in:
commit
898b406430
|
@ -84,6 +84,13 @@ public interface Channel {
|
||||||
*/
|
*/
|
||||||
boolean sendBatched(Packet packet);
|
boolean sendBatched(Packet packet);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Similarly to {@code flushConnection} on {@link #send(Packet, boolean)}, it requests
|
||||||
|
* any un-flushed previous sent packets to be flushed to the underlying connection.<br>
|
||||||
|
* It can be a no-op in case of InVM transports, because they would likely to flush already on each send.
|
||||||
|
*/
|
||||||
|
void flushConnection();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sends a packet on this channel, but request it to be flushed (along with the un-flushed previous ones) only iff
|
* Sends a packet on this channel, but request it to be flushed (along with the un-flushed previous ones) only iff
|
||||||
* {@code flushConnection} is {@code true}.
|
* {@code flushConnection} is {@code true}.
|
||||||
|
|
|
@ -135,10 +135,9 @@ public interface CoreRemotingConnection extends RemotingConnection {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
* @param size size we are trying to write
|
|
||||||
* @param timeout
|
* @param timeout
|
||||||
* @return
|
* @return
|
||||||
* @throws IllegalStateException if the connection is closed
|
* @throws IllegalStateException if the connection is closed
|
||||||
*/
|
*/
|
||||||
boolean blockUntilWritable(int size, long timeout);
|
boolean blockUntilWritable(long timeout);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1039,19 +1039,21 @@ public class ActiveMQSessionContext extends SessionContext {
|
||||||
} else {
|
} else {
|
||||||
chunkPacket = new SessionSendContinuationMessage_V2(msgI, chunk, !lastChunk, requiresResponse || confirmationWindow != -1, messageBodySize, messageHandler);
|
chunkPacket = new SessionSendContinuationMessage_V2(msgI, chunk, !lastChunk, requiresResponse || confirmationWindow != -1, messageBodySize, messageHandler);
|
||||||
}
|
}
|
||||||
final int expectedEncodeSize = chunkPacket.expectedEncodeSize();
|
|
||||||
//perform a weak form of flow control to avoid OOM on tight loops
|
//perform a weak form of flow control to avoid OOM on tight loops
|
||||||
final CoreRemotingConnection connection = channel.getConnection();
|
final CoreRemotingConnection connection = channel.getConnection();
|
||||||
final long blockingCallTimeoutMillis = Math.max(0, connection.getBlockingCallTimeout());
|
final long blockingCallTimeoutMillis = Math.max(0, connection.getBlockingCallTimeout());
|
||||||
final long startFlowControl = System.nanoTime();
|
final long startFlowControl = System.nanoTime();
|
||||||
try {
|
try {
|
||||||
final boolean isWritable = connection.blockUntilWritable(expectedEncodeSize, blockingCallTimeoutMillis);
|
final boolean isWritable = connection.blockUntilWritable(blockingCallTimeoutMillis);
|
||||||
if (!isWritable) {
|
if (!isWritable) {
|
||||||
final long endFlowControl = System.nanoTime();
|
final long endFlowControl = System.nanoTime();
|
||||||
final long elapsedFlowControl = endFlowControl - startFlowControl;
|
final long elapsedFlowControl = endFlowControl - startFlowControl;
|
||||||
final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedFlowControl);
|
final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedFlowControl);
|
||||||
ActiveMQClientLogger.LOGGER.timeoutStreamingLargeMessage();
|
ActiveMQClientLogger.LOGGER.timeoutStreamingLargeMessage();
|
||||||
logger.debug("try to write " + expectedEncodeSize + " bytes after blocked " + elapsedMillis + " ms on a not writable connection: [" + connection.getID() + "]");
|
if (logger.isDebugEnabled()) {
|
||||||
|
logger.debugf("try to write %d bytes after blocked %d ms on a not writable connection: [%s]",
|
||||||
|
chunkPacket.expectedEncodeSize(), elapsedMillis, connection.getID());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (requiresResponse) {
|
if (requiresResponse) {
|
||||||
// When sending it blocking, only the last chunk will be blocking.
|
// When sending it blocking, only the last chunk will be blocking.
|
||||||
|
|
|
@ -231,6 +231,11 @@ public final class ChannelImpl implements Channel {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void flushConnection() {
|
||||||
|
connection.getTransportConnection().flush();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean send(Packet packet, boolean flushConnection) {
|
public boolean send(Packet packet, boolean flushConnection) {
|
||||||
if (invokeInterceptors(packet, interceptors, connection) != null) {
|
if (invokeInterceptors(packet, interceptors, connection) != null) {
|
||||||
|
@ -557,7 +562,7 @@ public final class ChannelImpl implements Channel {
|
||||||
public static String invokeInterceptors(final Packet packet,
|
public static String invokeInterceptors(final Packet packet,
|
||||||
final List<Interceptor> interceptors,
|
final List<Interceptor> interceptors,
|
||||||
final RemotingConnection connection) {
|
final RemotingConnection connection) {
|
||||||
if (interceptors != null) {
|
if (interceptors != null && !interceptors.isEmpty()) {
|
||||||
for (final Interceptor interceptor : interceptors) {
|
for (final Interceptor interceptor : interceptors) {
|
||||||
try {
|
try {
|
||||||
boolean callNext = interceptor.intercept(packet, connection);
|
boolean callNext = interceptor.intercept(packet, connection);
|
||||||
|
|
|
@ -244,8 +244,8 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean blockUntilWritable(int size, long timeout) {
|
public boolean blockUntilWritable(long timeout) {
|
||||||
return transportConnection.blockUntilWritable(size, timeout, TimeUnit.MILLISECONDS);
|
return transportConnection.blockUntilWritable(timeout, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -29,6 +29,7 @@ import io.netty.channel.ChannelFuture;
|
||||||
import io.netty.channel.ChannelFutureListener;
|
import io.netty.channel.ChannelFutureListener;
|
||||||
import io.netty.channel.ChannelPromise;
|
import io.netty.channel.ChannelPromise;
|
||||||
import io.netty.channel.EventLoop;
|
import io.netty.channel.EventLoop;
|
||||||
|
import io.netty.util.concurrent.FastThreadLocal;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
||||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||||
|
@ -47,7 +48,6 @@ public class NettyConnection implements Connection {
|
||||||
|
|
||||||
private static final Logger logger = Logger.getLogger(NettyConnection.class);
|
private static final Logger logger = Logger.getLogger(NettyConnection.class);
|
||||||
|
|
||||||
private static final int DEFAULT_BATCH_BYTES = Integer.getInteger("io.netty.batch.bytes", 8192);
|
|
||||||
private static final int DEFAULT_WAIT_MILLIS = 10_000;
|
private static final int DEFAULT_WAIT_MILLIS = 10_000;
|
||||||
|
|
||||||
protected final Channel channel;
|
protected final Channel channel;
|
||||||
|
@ -59,11 +59,9 @@ public class NettyConnection implements Connection {
|
||||||
* here for when the connection (or Netty Channel) becomes available again.
|
* here for when the connection (or Netty Channel) becomes available again.
|
||||||
*/
|
*/
|
||||||
private final List<ReadyListener> readyListeners = new ArrayList<>();
|
private final List<ReadyListener> readyListeners = new ArrayList<>();
|
||||||
private final ThreadLocal<ArrayList<ReadyListener>> localListenersPool = new ThreadLocal<>();
|
private final FastThreadLocal<ArrayList<ReadyListener>> localListenersPool = new FastThreadLocal<>();
|
||||||
|
|
||||||
private final boolean batchingEnabled;
|
private final boolean batchingEnabled;
|
||||||
private final int writeBufferHighWaterMark;
|
|
||||||
private final int batchLimit;
|
|
||||||
|
|
||||||
private boolean closed;
|
private boolean closed;
|
||||||
private RemotingConnection protocolConnection;
|
private RemotingConnection protocolConnection;
|
||||||
|
@ -84,10 +82,6 @@ public class NettyConnection implements Connection {
|
||||||
this.directDeliver = directDeliver;
|
this.directDeliver = directDeliver;
|
||||||
|
|
||||||
this.batchingEnabled = batchingEnabled;
|
this.batchingEnabled = batchingEnabled;
|
||||||
|
|
||||||
this.writeBufferHighWaterMark = this.channel.config().getWriteBufferHighWaterMark();
|
|
||||||
|
|
||||||
this.batchLimit = batchingEnabled ? Math.min(this.writeBufferHighWaterMark, DEFAULT_BATCH_BYTES) : 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void waitFor(ChannelPromise promise, long millis) {
|
private static void waitFor(ChannelPromise promise, long millis) {
|
||||||
|
@ -103,22 +97,9 @@ public class NettyConnection implements Connection {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns an estimation of the current size of the write buffer in the channel.
|
* Returns an estimation of the current size of the write buffer in the channel.
|
||||||
* To obtain a more precise value is necessary to use the unsafe API of the channel to
|
|
||||||
* call the {@link io.netty.channel.ChannelOutboundBuffer#totalPendingWriteBytes()}.
|
|
||||||
* Anyway, both these values are subject to concurrent modifications.
|
|
||||||
*/
|
*/
|
||||||
private static int batchBufferSize(Channel channel, int writeBufferHighWaterMark) {
|
private static long batchBufferSize(Channel channel) {
|
||||||
//Channel::bytesBeforeUnwritable is performing a volatile load
|
return channel.unsafe().outboundBuffer().totalPendingWriteBytes();
|
||||||
//this is the reason why writeBufferHighWaterMark is passed as an argument
|
|
||||||
final int bytesBeforeUnwritable = (int) channel.bytesBeforeUnwritable();
|
|
||||||
assert bytesBeforeUnwritable >= 0;
|
|
||||||
final int writtenBytes = writeBufferHighWaterMark - bytesBeforeUnwritable;
|
|
||||||
assert writtenBytes >= 0;
|
|
||||||
return writtenBytes;
|
|
||||||
}
|
|
||||||
|
|
||||||
public final int pendingWritesOnChannel() {
|
|
||||||
return batchBufferSize(this.channel, this.writeBufferHighWaterMark);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public final Channel getNettyChannel() {
|
public final Channel getNettyChannel() {
|
||||||
|
@ -252,7 +233,7 @@ public class NettyConnection implements Connection {
|
||||||
try {
|
try {
|
||||||
return new ChannelBufferWrapper(channel.alloc().directBuffer(size), true);
|
return new ChannelBufferWrapper(channel.alloc().directBuffer(size), true);
|
||||||
} catch (OutOfMemoryError oom) {
|
} catch (OutOfMemoryError oom) {
|
||||||
final long totalPendingWriteBytes = batchBufferSize(this.channel, this.writeBufferHighWaterMark);
|
final long totalPendingWriteBytes = batchBufferSize(this.channel);
|
||||||
// I'm not using the ActiveMQLogger framework here, as I wanted the class name to be very specific here
|
// I'm not using the ActiveMQLogger framework here, as I wanted the class name to be very specific here
|
||||||
logger.warn("Trying to allocate " + size + " bytes, System is throwing OutOfMemoryError on NettyConnection " + this + ", there are currently " + "pendingWrites: [NETTY] -> " + totalPendingWriteBytes + " causes: " + oom.getMessage(), oom);
|
logger.warn("Trying to allocate " + size + " bytes, System is throwing OutOfMemoryError on NettyConnection " + this + ", there are currently " + "pendingWrites: [NETTY] -> " + totalPendingWriteBytes + " causes: " + oom.getMessage(), oom);
|
||||||
throw oom;
|
throw oom;
|
||||||
|
@ -268,9 +249,8 @@ public class NettyConnection implements Connection {
|
||||||
@Override
|
@Override
|
||||||
public final void checkFlushBatchBuffer() {
|
public final void checkFlushBatchBuffer() {
|
||||||
if (this.batchingEnabled) {
|
if (this.batchingEnabled) {
|
||||||
//perform the flush only if necessary
|
// perform the flush only if necessary
|
||||||
final int batchBufferSize = batchBufferSize(this.channel, this.writeBufferHighWaterMark);
|
if (batchBufferSize(this.channel) > 0 && !channel.isWritable()) {
|
||||||
if (batchBufferSize > 0) {
|
|
||||||
this.channel.flush();
|
this.channel.flush();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -292,6 +272,12 @@ public class NettyConnection implements Connection {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void flush() {
|
||||||
|
checkConnectionState();
|
||||||
|
this.channel.flush();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final void write(ActiveMQBuffer buffer, final boolean flush, final boolean batched) {
|
public final void write(ActiveMQBuffer buffer, final boolean flush, final boolean batched) {
|
||||||
write(buffer, flush, batched, null);
|
write(buffer, flush, batched, null);
|
||||||
|
@ -304,22 +290,22 @@ public class NettyConnection implements Connection {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final boolean blockUntilWritable(final int requiredCapacity, final long timeout, final TimeUnit timeUnit) {
|
public final boolean blockUntilWritable(final long timeout, final TimeUnit timeUnit) {
|
||||||
checkConnectionState();
|
checkConnectionState();
|
||||||
final boolean isAllowedToBlock = isAllowedToBlock();
|
final boolean isAllowedToBlock = isAllowedToBlock();
|
||||||
if (!isAllowedToBlock) {
|
if (!isAllowedToBlock) {
|
||||||
|
if (timeout > 0) {
|
||||||
|
if (Env.isTestEnv()) {
|
||||||
|
// this will only show when inside the testsuite.
|
||||||
|
// we may great the log for FAILURE
|
||||||
|
logger.warn("FAILURE! The code is using blockUntilWritable inside a Netty worker, which would block. " + "The code will probably need fixing!", new Exception("trace"));
|
||||||
|
}
|
||||||
|
|
||||||
if (Env.isTestEnv()) {
|
if (logger.isDebugEnabled()) {
|
||||||
// this will only show when inside the testsuite.
|
logger.debug("Calling blockUntilWritable using a thread where it's not allowed");
|
||||||
// we may great the log for FAILURE
|
}
|
||||||
logger.warn("FAILURE! The code is using blockUntilWritable inside a Netty worker, which would block. " +
|
|
||||||
"The code will probably need fixing!", new Exception("trace"));
|
|
||||||
}
|
}
|
||||||
|
return channel.isWritable();
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("Calling blockUntilWritable using a thread where it's not allowed");
|
|
||||||
}
|
|
||||||
return canWrite(requiredCapacity);
|
|
||||||
} else {
|
} else {
|
||||||
final long timeoutNanos = timeUnit.toNanos(timeout);
|
final long timeoutNanos = timeUnit.toNanos(timeout);
|
||||||
final long deadline = System.nanoTime() + timeoutNanos;
|
final long deadline = System.nanoTime() + timeoutNanos;
|
||||||
|
@ -333,7 +319,7 @@ public class NettyConnection implements Connection {
|
||||||
parkNanos = 1000L;
|
parkNanos = 1000L;
|
||||||
}
|
}
|
||||||
boolean canWrite;
|
boolean canWrite;
|
||||||
while (!(canWrite = canWrite(requiredCapacity)) && (System.nanoTime() - deadline) < 0) {
|
while (!(canWrite = channel.isWritable()) && (System.nanoTime() - deadline) < 0) {
|
||||||
//periodically check the connection state
|
//periodically check the connection state
|
||||||
checkConnectionState();
|
checkConnectionState();
|
||||||
LockSupport.parkNanos(parkNanos);
|
LockSupport.parkNanos(parkNanos);
|
||||||
|
@ -348,31 +334,12 @@ public class NettyConnection implements Connection {
|
||||||
return !inEventLoop;
|
return !inEventLoop;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean canWrite(final int requiredCapacity) {
|
|
||||||
//evaluate if the write request could be taken:
|
|
||||||
//there is enough space in the write buffer?
|
|
||||||
final long totalPendingWrites = this.pendingWritesOnChannel();
|
|
||||||
final boolean canWrite;
|
|
||||||
if (requiredCapacity > this.writeBufferHighWaterMark) {
|
|
||||||
canWrite = totalPendingWrites == 0;
|
|
||||||
} else {
|
|
||||||
canWrite = (totalPendingWrites + requiredCapacity) <= this.writeBufferHighWaterMark;
|
|
||||||
}
|
|
||||||
return canWrite;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final void write(ActiveMQBuffer buffer,
|
public final void write(ActiveMQBuffer buffer,
|
||||||
final boolean flush,
|
final boolean flush,
|
||||||
final boolean batched,
|
final boolean batched,
|
||||||
final ChannelFutureListener futureListener) {
|
final ChannelFutureListener futureListener) {
|
||||||
final int readableBytes = buffer.readableBytes();
|
final int readableBytes = buffer.readableBytes();
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
final int remainingBytes = this.writeBufferHighWaterMark - readableBytes;
|
|
||||||
if (remainingBytes < 0) {
|
|
||||||
logger.debug("a write request is exceeding by " + (-remainingBytes) + " bytes the writeBufferHighWaterMark size [ " + this.writeBufferHighWaterMark + " ] : consider to set it at least of " + readableBytes + " bytes");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
//no need to lock because the Netty's channel is thread-safe
|
//no need to lock because the Netty's channel is thread-safe
|
||||||
//and the order of write is ensured by the order of the write calls
|
//and the order of write is ensured by the order of the write calls
|
||||||
final Channel channel = this.channel;
|
final Channel channel = this.channel;
|
||||||
|
@ -385,10 +352,9 @@ public class NettyConnection implements Connection {
|
||||||
final ChannelFuture future;
|
final ChannelFuture future;
|
||||||
final ByteBuf bytes = buffer.byteBuf();
|
final ByteBuf bytes = buffer.byteBuf();
|
||||||
assert readableBytes >= 0;
|
assert readableBytes >= 0;
|
||||||
final int writeBatchSize = this.batchLimit;
|
|
||||||
final boolean batchingEnabled = this.batchingEnabled;
|
final boolean batchingEnabled = this.batchingEnabled;
|
||||||
if (batchingEnabled && batched && !flush && readableBytes < writeBatchSize) {
|
if (batchingEnabled && batched && !flush && channel.isWritable()) {
|
||||||
future = writeBatch(bytes, readableBytes, promise);
|
future = channel.write(bytes, promise);
|
||||||
} else {
|
} else {
|
||||||
future = channel.writeAndFlush(bytes, promise);
|
future = channel.writeAndFlush(bytes, promise);
|
||||||
}
|
}
|
||||||
|
@ -411,22 +377,6 @@ public class NettyConnection implements Connection {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private ChannelFuture writeBatch(final ByteBuf bytes, final int readableBytes, final ChannelPromise promise) {
|
|
||||||
final int batchBufferSize = batchBufferSize(channel, this.writeBufferHighWaterMark);
|
|
||||||
final int nextBatchSize = batchBufferSize + readableBytes;
|
|
||||||
if (nextBatchSize > batchLimit) {
|
|
||||||
//request to flush before writing, to create the chance to make the channel writable again
|
|
||||||
channel.flush();
|
|
||||||
//let netty use its write batching ability
|
|
||||||
return channel.write(bytes, promise);
|
|
||||||
} else if (nextBatchSize == batchLimit) {
|
|
||||||
return channel.writeAndFlush(bytes, promise);
|
|
||||||
} else {
|
|
||||||
//let netty use its write batching ability
|
|
||||||
return channel.write(bytes, promise);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final String getRemoteAddress() {
|
public final String getRemoteAddress() {
|
||||||
SocketAddress address = channel.remoteAddress();
|
SocketAddress address = channel.remoteAddress();
|
||||||
|
|
|
@ -46,18 +46,17 @@ public interface Connection {
|
||||||
boolean isOpen();
|
boolean isOpen();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Causes the current thread to wait until the connection can enqueue the required capacity unless the specified waiting time elapses.
|
* Causes the current thread to wait until the connection is writable unless the specified waiting time elapses.
|
||||||
* The available capacity of the connection could change concurrently hence this method is suitable to perform precise flow-control
|
* The available capacity of the connection could change concurrently hence this method is suitable to perform precise flow-control
|
||||||
* only in a single writer case, while its precision decrease inversely proportional with the rate and the number of concurrent writers.
|
* only in a single writer case, while its precision decrease inversely proportional with the rate and the number of concurrent writers.
|
||||||
* If the current thread is not allowed to block the timeout will be ignored dependently on the connection type.
|
* If the current thread is not allowed to block the timeout will be ignored dependently on the connection type.
|
||||||
*
|
*
|
||||||
* @param requiredCapacity the capacity in bytes to be enqueued
|
|
||||||
* @param timeout the maximum time to wait
|
* @param timeout the maximum time to wait
|
||||||
* @param timeUnit the time unit of the timeout argument
|
* @param timeUnit the time unit of the timeout argument
|
||||||
* @return {@code true} if the connection can enqueue {@code requiredCapacity} bytes, {@code false} otherwise
|
* @return {@code true} if the connection is writable, {@code false} otherwise
|
||||||
* @throws IllegalStateException if the connection is closed
|
* @throws IllegalStateException if the connection is closed
|
||||||
*/
|
*/
|
||||||
default boolean blockUntilWritable(final int requiredCapacity, final long timeout, final TimeUnit timeUnit) {
|
default boolean blockUntilWritable(final long timeout, final TimeUnit timeUnit) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -85,6 +84,13 @@ public interface Connection {
|
||||||
*/
|
*/
|
||||||
void write(ActiveMQBuffer buffer, boolean requestFlush);
|
void write(ActiveMQBuffer buffer, boolean requestFlush);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Request to flush any previous written buffers into the wire.
|
||||||
|
*/
|
||||||
|
default void flush() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* writes the buffer to the connection and if flush is true returns only when the buffer has been physically written to the connection.
|
* writes the buffer to the connection and if flush is true returns only when the buffer has been physically written to the connection.
|
||||||
*
|
*
|
||||||
|
|
|
@ -237,7 +237,7 @@ public class ChannelImplTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean blockUntilWritable(int size, long timeout) {
|
public boolean blockUntilWritable(long timeout) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -21,7 +21,7 @@ import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.nio.channels.FileChannel;
|
import java.nio.channels.FileChannel;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayDeque;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -80,7 +80,6 @@ import org.apache.activemq.artemis.core.replication.ReplicationManager.ADD_OPERA
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum;
|
import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum;
|
||||||
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
||||||
import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
|
import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
|
||||||
|
@ -134,7 +133,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
||||||
|
|
||||||
private List<Interceptor> outgoingInterceptors = null;
|
private List<Interceptor> outgoingInterceptors = null;
|
||||||
|
|
||||||
private final ArrayList<Packet> pendingPackets;
|
private final ArrayDeque<Packet> pendingPackets;
|
||||||
|
|
||||||
|
|
||||||
// Constructors --------------------------------------------------
|
// Constructors --------------------------------------------------
|
||||||
|
@ -146,7 +145,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
||||||
this.criticalErrorListener = criticalErrorListener;
|
this.criticalErrorListener = criticalErrorListener;
|
||||||
this.wantedFailBack = wantedFailBack;
|
this.wantedFailBack = wantedFailBack;
|
||||||
this.activation = activation;
|
this.activation = activation;
|
||||||
this.pendingPackets = new ArrayList<>();
|
this.pendingPackets = new ArrayDeque<>();
|
||||||
this.supportResponseBatching = false;
|
this.supportResponseBatching = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -262,18 +261,14 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void endOfBatch() {
|
public void endOfBatch() {
|
||||||
final ArrayList<Packet> pendingPackets = this.pendingPackets;
|
final ArrayDeque<Packet> pendingPackets = this.pendingPackets;
|
||||||
if (pendingPackets.isEmpty()) {
|
if (pendingPackets.isEmpty()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
try {
|
for (int i = 0, size = pendingPackets.size(); i < size; i++) {
|
||||||
for (int i = 0, size = pendingPackets.size(); i < size; i++) {
|
final Packet packet = pendingPackets.poll();
|
||||||
final Packet packet = pendingPackets.get(i);
|
final boolean isLast = i == (size - 1);
|
||||||
final boolean isLast = i == (size - 1);
|
channel.send(packet, isLast);
|
||||||
channel.send(packet, isLast);
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
pendingPackets.clear();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,10 +26,15 @@ import java.util.Queue;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.ScheduledFuture;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.buffer.PooledByteBufAllocator;
|
import io.netty.buffer.PooledByteBufAllocator;
|
||||||
|
import io.netty.channel.EventLoop;
|
||||||
|
import io.netty.channel.SingleThreadEventLoop;
|
||||||
|
import io.netty.util.internal.PlatformDependent;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||||
|
@ -69,6 +74,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Replicatio
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessageV2;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessageV2;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
|
||||||
|
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
@ -76,6 +82,8 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
|
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumManager;
|
import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumManager;
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||||
|
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||||
|
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
|
||||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||||
import org.apache.activemq.artemis.utils.ReusableLatch;
|
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
@ -126,13 +134,11 @@ public final class ReplicationManager implements ActiveMQComponent {
|
||||||
|
|
||||||
private final ExecutorFactory ioExecutorFactory;
|
private final ExecutorFactory ioExecutorFactory;
|
||||||
|
|
||||||
private final Executor replicationStream;
|
|
||||||
|
|
||||||
private SessionFailureListener failureListener;
|
private SessionFailureListener failureListener;
|
||||||
|
|
||||||
private CoreRemotingConnection remotingConnection;
|
private CoreRemotingConnection remotingConnection;
|
||||||
|
|
||||||
private final long timeout;
|
private final long maxAllowedSlownessNanos;
|
||||||
|
|
||||||
private final long initialReplicationSyncTimeout;
|
private final long initialReplicationSyncTimeout;
|
||||||
|
|
||||||
|
@ -140,6 +146,32 @@ public final class ReplicationManager implements ActiveMQComponent {
|
||||||
|
|
||||||
private final ReusableLatch synchronizationIsFinishedAcknowledgement = new ReusableLatch(0);
|
private final ReusableLatch synchronizationIsFinishedAcknowledgement = new ReusableLatch(0);
|
||||||
|
|
||||||
|
private static final class ReplicatePacketRequest {
|
||||||
|
|
||||||
|
final Packet packet;
|
||||||
|
final OperationContext context;
|
||||||
|
// Although this field is needed just during the initial sync,
|
||||||
|
// the JVM field layout would likely left 4 bytes of wasted space without it
|
||||||
|
// so it makes sense to use it instead.
|
||||||
|
final ReusableLatch done;
|
||||||
|
|
||||||
|
ReplicatePacketRequest(Packet packet, OperationContext context, ReusableLatch done) {
|
||||||
|
this.packet = packet;
|
||||||
|
this.context = context;
|
||||||
|
this.done = done;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final Queue<ReplicatePacketRequest> replicatePacketRequests;
|
||||||
|
private final Executor replicationStream;
|
||||||
|
private final ScheduledExecutorService scheduledExecutorService;
|
||||||
|
private ScheduledFuture<?> slowReplicationChecker;
|
||||||
|
private long notWritableFrom;
|
||||||
|
private boolean checkSlowReplication;
|
||||||
|
private final ReadyListener onResume;
|
||||||
|
private boolean isFlushing;
|
||||||
|
private boolean awaitingResume;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param remotingConnection
|
* @param remotingConnection
|
||||||
*/
|
*/
|
||||||
|
@ -153,8 +185,23 @@ public final class ReplicationManager implements ActiveMQComponent {
|
||||||
this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
|
this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
|
||||||
this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
|
this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
|
||||||
this.remotingConnection = remotingConnection;
|
this.remotingConnection = remotingConnection;
|
||||||
this.replicationStream = ioExecutorFactory.getExecutor();
|
final Connection transportConnection = this.remotingConnection.getTransportConnection();
|
||||||
this.timeout = timeout;
|
if (transportConnection instanceof NettyConnection) {
|
||||||
|
final EventLoop eventLoop = ((NettyConnection) transportConnection).getNettyChannel().eventLoop();
|
||||||
|
this.replicationStream = eventLoop;
|
||||||
|
this.scheduledExecutorService = eventLoop;
|
||||||
|
} else {
|
||||||
|
this.replicationStream = ioExecutorFactory.getExecutor();
|
||||||
|
this.scheduledExecutorService = null;
|
||||||
|
}
|
||||||
|
this.maxAllowedSlownessNanos = timeout > 0 ? TimeUnit.MILLISECONDS.toNanos(timeout) : -1;
|
||||||
|
this.replicatePacketRequests = PlatformDependent.newMpscQueue();
|
||||||
|
this.slowReplicationChecker = null;
|
||||||
|
this.notWritableFrom = Long.MAX_VALUE;
|
||||||
|
this.awaitingResume = false;
|
||||||
|
this.onResume = this::resume;
|
||||||
|
this.isFlushing = false;
|
||||||
|
this.checkSlowReplication = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void appendUpdateRecord(final byte journalID,
|
public void appendUpdateRecord(final byte journalID,
|
||||||
|
@ -286,6 +333,25 @@ public final class ReplicationManager implements ActiveMQComponent {
|
||||||
replicatingChannel.setHandler(responseHandler);
|
replicatingChannel.setHandler(responseHandler);
|
||||||
failureListener = new ReplicatedSessionFailureListener();
|
failureListener = new ReplicatedSessionFailureListener();
|
||||||
remotingConnection.addFailureListener(failureListener);
|
remotingConnection.addFailureListener(failureListener);
|
||||||
|
// only Netty connections can enable slow replication checker
|
||||||
|
if (scheduledExecutorService != null && maxAllowedSlownessNanos >= 0) {
|
||||||
|
long periodNanos = maxAllowedSlownessNanos / 10;
|
||||||
|
if (periodNanos > TimeUnit.SECONDS.toNanos(1)) {
|
||||||
|
periodNanos = TimeUnit.SECONDS.toNanos(1);
|
||||||
|
} else if (periodNanos < TimeUnit.MILLISECONDS.toNanos(100)) {
|
||||||
|
logger.warnf("The cluster call timeout is too low ie %d ms: consider raising it to save CPU",
|
||||||
|
TimeUnit.NANOSECONDS.toMillis(maxAllowedSlownessNanos));
|
||||||
|
periodNanos = TimeUnit.MILLISECONDS.toNanos(100);
|
||||||
|
}
|
||||||
|
logger.debugf("Slow replication checker is running with a period of %d ms", TimeUnit.NANOSECONDS.toMillis(periodNanos));
|
||||||
|
// The slow detection has been implemented by using an always-on timer task
|
||||||
|
// instead of triggering one each time we detect an un-writable channel because:
|
||||||
|
// - getting temporarily an un-writable channel is rather common under load and scheduling/cancelling a
|
||||||
|
// timed task is a CPU and GC intensive operation
|
||||||
|
// - choosing a period of 100-1000 ms lead to a reasonable and constant CPU utilization while idle too
|
||||||
|
slowReplicationChecker = scheduledExecutorService.scheduleAtFixedRate(this::checkSlowReplication,
|
||||||
|
periodNanos, periodNanos, TimeUnit.NANOSECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
started = true;
|
started = true;
|
||||||
|
|
||||||
|
@ -317,6 +383,11 @@ public final class ReplicationManager implements ActiveMQComponent {
|
||||||
replicatingChannel.getConnection().getTransportConnection().fireReady(true);
|
replicatingChannel.getConnection().getTransportConnection().fireReady(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (slowReplicationChecker != null) {
|
||||||
|
slowReplicationChecker.cancel(false);
|
||||||
|
slowReplicationChecker = null;
|
||||||
|
}
|
||||||
|
|
||||||
enabled = false;
|
enabled = false;
|
||||||
|
|
||||||
if (clearTokens) {
|
if (clearTokens) {
|
||||||
|
@ -374,6 +445,10 @@ public final class ReplicationManager implements ActiveMQComponent {
|
||||||
}
|
}
|
||||||
|
|
||||||
private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp) {
|
private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp) {
|
||||||
|
return sendReplicatePacket(packet, lineUp, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp, ReusableLatch done) {
|
||||||
if (!enabled) {
|
if (!enabled) {
|
||||||
packet.release();
|
packet.release();
|
||||||
return null;
|
return null;
|
||||||
|
@ -383,29 +458,48 @@ public final class ReplicationManager implements ActiveMQComponent {
|
||||||
if (lineUp) {
|
if (lineUp) {
|
||||||
repliToken.replicationLineUp();
|
repliToken.replicationLineUp();
|
||||||
}
|
}
|
||||||
|
final ReplicatePacketRequest request = new ReplicatePacketRequest(packet, repliToken, done);
|
||||||
|
replicatePacketRequests.add(request);
|
||||||
replicationStream.execute(() -> {
|
replicationStream.execute(() -> {
|
||||||
if (enabled) {
|
if (enabled) {
|
||||||
pendingTokens.add(repliToken);
|
sendReplicatedPackets(false);
|
||||||
flowControl(packet.expectedEncodeSize());
|
|
||||||
replicatingChannel.send(packet);
|
|
||||||
} else {
|
} else {
|
||||||
packet.release();
|
releaseReplicatedPackets(replicatePacketRequests);
|
||||||
repliToken.replicationDone();
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
return repliToken;
|
return repliToken;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
private void releaseReplicatedPackets(Queue<ReplicatePacketRequest> requests) {
|
||||||
* This was written as a refactoring of sendReplicatePacket.
|
assert checkEventLoop();
|
||||||
* In case you refactor this in any way, this method must hold a lock on replication lock. .
|
ReplicatePacketRequest req;
|
||||||
*/
|
while ((req = requests.poll()) != null) {
|
||||||
private boolean flowControl(int size) {
|
req.packet.release();
|
||||||
boolean flowWorked = replicatingChannel.getConnection().blockUntilWritable(size, timeout);
|
req.context.replicationDone();
|
||||||
|
if (req.done != null) {
|
||||||
|
req.done.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (!flowWorked) {
|
private void checkSlowReplication() {
|
||||||
|
if (!enabled) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
assert checkEventLoop();
|
||||||
|
if (!checkSlowReplication) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
final boolean isWritable = replicatingChannel.getConnection().blockUntilWritable(0);
|
||||||
|
if (isWritable) {
|
||||||
|
checkSlowReplication = false;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
final long elapsedNanosNotWritable = System.nanoTime() - notWritableFrom;
|
||||||
|
if (elapsedNanosNotWritable >= maxAllowedSlownessNanos) {
|
||||||
|
checkSlowReplication = false;
|
||||||
|
releaseReplicatedPackets(replicatePacketRequests);
|
||||||
try {
|
try {
|
||||||
ActiveMQServerLogger.LOGGER.slowReplicationResponse();
|
ActiveMQServerLogger.LOGGER.slowReplicationResponse();
|
||||||
stop();
|
stop();
|
||||||
|
@ -413,8 +507,84 @@ public final class ReplicationManager implements ActiveMQComponent {
|
||||||
logger.warn(e.getMessage(), e);
|
logger.warn(e.getMessage(), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return flowWorked;
|
private void resume() {
|
||||||
|
sendReplicatedPackets(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void sendReplicatedPackets(boolean resume) {
|
||||||
|
assert checkEventLoop();
|
||||||
|
if (resume) {
|
||||||
|
awaitingResume = false;
|
||||||
|
}
|
||||||
|
// We try to:
|
||||||
|
// - save recursive calls of resume due to flushConnection
|
||||||
|
// - saving flush pending writes *if* the OS hasn't notified that's writable again
|
||||||
|
if (awaitingResume || isFlushing || !enabled) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (replicatePacketRequests.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
isFlushing = true;
|
||||||
|
final CoreRemotingConnection connection = replicatingChannel.getConnection();
|
||||||
|
try {
|
||||||
|
while (connection.blockUntilWritable(0)) {
|
||||||
|
checkSlowReplication = false;
|
||||||
|
final ReplicatePacketRequest request = replicatePacketRequests.poll();
|
||||||
|
if (request == null) {
|
||||||
|
replicatingChannel.flushConnection();
|
||||||
|
// given that there isn't any more work to do, we're not interested
|
||||||
|
// to check writability state to trigger the slow connection check
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
pendingTokens.add(request.context);
|
||||||
|
final Packet pack = request.packet;
|
||||||
|
final ReusableLatch done = request.done;
|
||||||
|
if (done != null) {
|
||||||
|
done.countDown();
|
||||||
|
}
|
||||||
|
replicatingChannel.send(pack, false);
|
||||||
|
}
|
||||||
|
replicatingChannel.flushConnection();
|
||||||
|
assert !awaitingResume;
|
||||||
|
// we care about writability just if there is some work to do
|
||||||
|
if (!replicatePacketRequests.isEmpty()) {
|
||||||
|
if (!connection.isWritable(onResume)) {
|
||||||
|
checkSlowReplication = true;
|
||||||
|
notWritableFrom = System.nanoTime();
|
||||||
|
awaitingResume = true;
|
||||||
|
} else {
|
||||||
|
// submit itself again to continue draining:
|
||||||
|
// we're not trying it again here to save read starvation
|
||||||
|
// NOTE: maybe it's redundant because there are already others in-flights requests
|
||||||
|
replicationStream.execute(() -> sendReplicatedPackets(false));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Throwable t) {
|
||||||
|
assert !(t instanceof AssertionError) : t.getMessage();
|
||||||
|
if (!connection.getTransportConnection().isOpen()) {
|
||||||
|
// that's an handled state: right after this cleanup is expected to be stopped/closed
|
||||||
|
// or get the failure listener to be called!
|
||||||
|
logger.trace("Transport connection closed: cleaning up replicate tokens", t);
|
||||||
|
releaseReplicatedPackets(replicatePacketRequests);
|
||||||
|
// cleanup ReadyListener without triggering any further write/flush
|
||||||
|
connection.getTransportConnection().fireReady(true);
|
||||||
|
} else {
|
||||||
|
logger.warn("Unexpected error while flushing replicate packets", t);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
isFlushing = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean checkEventLoop() {
|
||||||
|
if (!(replicationStream instanceof SingleThreadEventLoop)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
final SingleThreadEventLoop eventLoop = (SingleThreadEventLoop) replicationStream;
|
||||||
|
return eventLoop.inEventLoop();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -423,6 +593,7 @@ public final class ReplicationManager implements ActiveMQComponent {
|
||||||
* packets were not sent with {@link #sendReplicatePacket(Packet)}.
|
* packets were not sent with {@link #sendReplicatePacket(Packet)}.
|
||||||
*/
|
*/
|
||||||
private void replicated() {
|
private void replicated() {
|
||||||
|
assert checkEventLoop();
|
||||||
OperationContext ctx = pendingTokens.poll();
|
OperationContext ctx = pendingTokens.poll();
|
||||||
|
|
||||||
if (ctx == null) {
|
if (ctx == null) {
|
||||||
|
@ -528,24 +699,6 @@ public final class ReplicationManager implements ActiveMQComponent {
|
||||||
sendLargeFile(null, queueName, id, file, Long.MAX_VALUE);
|
sendLargeFile(null, queueName, id, file, Long.MAX_VALUE);
|
||||||
}
|
}
|
||||||
|
|
||||||
private class FlushAction implements Runnable {
|
|
||||||
|
|
||||||
ReusableLatch latch = new ReusableLatch(1);
|
|
||||||
|
|
||||||
public void reset() {
|
|
||||||
latch.setCount(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean await(long timeout, TimeUnit unit) throws Exception {
|
|
||||||
return latch.await(timeout, unit);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
latch.countDown();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sends large files in reasonably sized chunks to the backup during replication synchronization.
|
* Sends large files in reasonably sized chunks to the backup during replication synchronization.
|
||||||
*
|
*
|
||||||
|
@ -566,12 +719,12 @@ public final class ReplicationManager implements ActiveMQComponent {
|
||||||
if (!file.isOpen()) {
|
if (!file.isOpen()) {
|
||||||
file.open();
|
file.open();
|
||||||
}
|
}
|
||||||
int size = 32 * 1024;
|
final int size = 32 * 1024;
|
||||||
|
|
||||||
int flowControlSize = 10;
|
int flowControlSize = 10;
|
||||||
|
|
||||||
int packetsSent = 0;
|
int packetsSent = 0;
|
||||||
FlushAction action = new FlushAction();
|
final ReusableLatch flushed = new ReusableLatch(1);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) {
|
try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) {
|
||||||
|
@ -593,32 +746,33 @@ public final class ReplicationManager implements ActiveMQComponent {
|
||||||
maxBytesToSend = maxBytesToSend - bytesRead;
|
maxBytesToSend = maxBytesToSend - bytesRead;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
logger.debug("sending " + buffer.writerIndex() + " bytes on file " + file.getFileName());
|
if (logger.isDebugEnabled()) {
|
||||||
|
logger.debugf("sending %d bytes on file %s", buffer.writerIndex(), file.getFileName());
|
||||||
|
}
|
||||||
// sending -1 or 0 bytes will close the file at the backup
|
// sending -1 or 0 bytes will close the file at the backup
|
||||||
// We cannot simply send everything of a file through the executor,
|
final boolean lastPacket = bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0;
|
||||||
// otherwise we would run out of memory.
|
final boolean flowControlCheck = (packetsSent % flowControlSize == 0) || lastPacket;
|
||||||
// so we don't use the executor here
|
if (flowControlCheck) {
|
||||||
sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true);
|
flushed.setCount(1);
|
||||||
|
sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true, flushed);
|
||||||
|
awaitFlushOfReplicationStream(flushed);
|
||||||
|
} else {
|
||||||
|
sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true);
|
||||||
|
}
|
||||||
packetsSent++;
|
packetsSent++;
|
||||||
|
|
||||||
if (packetsSent % flowControlSize == 0) {
|
if (lastPacket)
|
||||||
flushReplicationStream(action);
|
|
||||||
}
|
|
||||||
if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
flushReplicationStream(action);
|
|
||||||
} finally {
|
} finally {
|
||||||
if (file.isOpen())
|
if (file.isOpen())
|
||||||
file.close();
|
file.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void flushReplicationStream(FlushAction action) throws Exception {
|
private void awaitFlushOfReplicationStream(ReusableLatch flushed) throws Exception {
|
||||||
action.reset();
|
if (!flushed.await(this.initialReplicationSyncTimeout, TimeUnit.MILLISECONDS)) {
|
||||||
replicationStream.execute(action);
|
|
||||||
if (!action.await(this.initialReplicationSyncTimeout, TimeUnit.MILLISECONDS)) {
|
|
||||||
throw ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(initialReplicationSyncTimeout);
|
throw ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(initialReplicationSyncTimeout);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -222,6 +222,11 @@ public class BackupSyncDelay implements Interceptor {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void flushConnection() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean sendAndFlush(Packet packet) {
|
public boolean sendAndFlush(Packet packet) {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
|
|
|
@ -84,7 +84,7 @@ public class NettyConnectionTest extends ActiveMQTestBase {
|
||||||
conn.close();
|
conn.close();
|
||||||
//to make sure the channel is closed it needs to run the pending tasks
|
//to make sure the channel is closed it needs to run the pending tasks
|
||||||
channel.runPendingTasks();
|
channel.runPendingTasks();
|
||||||
conn.blockUntilWritable(0, 0, TimeUnit.NANOSECONDS);
|
conn.blockUntilWritable(0, TimeUnit.NANOSECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue