ARTEMIS-1089 Improving flow control on replication

(Cherry picked from 911888e8d1)
This commit is contained in:
Clebert Suconic 2017-04-06 11:47:31 -04:00
parent f8be6a460d
commit 8a4a307653
26 changed files with 267 additions and 104 deletions

View File

@ -111,4 +111,12 @@ public interface CoreRemotingConnection extends RemotingConnection {
* @return the principal
*/
ActiveMQPrincipal getDefaultActiveMQPrincipal();
/**
*
* @param size size we are trying to write
* @param timeout
* @return
*/
boolean blockUntilWritable(int size, long timeout);
}

View File

@ -24,6 +24,8 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
*/
public interface Packet {
int INITIAL_PACKET_SIZE = 1500;
/**
* Sets the channel id that should be used once the packet has been successfully decoded it is
* sent to the correct channel.
@ -32,6 +34,14 @@ public interface Packet {
*/
void setChannelID(long channelID);
/**
* This will return the expected packet size for the encoding
* @return
*/
default int expectedEncodeSize() {
return INITIAL_PACKET_SIZE;
}
/**
* Returns the channel id of the channel that should handle this packet.
*

View File

@ -16,7 +16,9 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl;
import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.DataConstants;
@ -29,8 +31,6 @@ public class PacketImpl implements Packet {
public static final int PACKET_HEADERS_SIZE = DataConstants.SIZE_INT + DataConstants.SIZE_BYTE +
DataConstants.SIZE_LONG;
private static final int INITIAL_PACKET_SIZE = 1500;
protected long channelID;
private final byte type;
@ -294,6 +294,17 @@ public class PacketImpl implements Packet {
return buffer;
}
protected ActiveMQBuffer createPacket(RemotingConnection connection) {
int size = expectedEncodeSize();
if (connection == null) {
return new ChannelBufferWrapper(Unpooled.buffer(size));
} else {
return connection.createTransportBuffer(size);
}
}
@Override
public void decode(final ActiveMQBuffer buffer) {
channelID = buffer.readLong();

View File

@ -22,6 +22,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -230,6 +231,11 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
callClosingListeners();
}
@Override
public boolean blockUntilWritable(int size, long timeout) {
return transportConnection.blockUntilWritable(size, timeout, TimeUnit.MILLISECONDS);
}
@Override
public void disconnect(final boolean criticalError) {
disconnect(null, criticalError);

View File

@ -100,15 +100,6 @@ public abstract class SessionContinuationMessage extends PacketImpl {
return buffer;
}
protected final ActiveMQBuffer createPacket(RemotingConnection connection) {
final int expectedEncodedSize = expectedEncodedSize();
if (connection == null) {
return new ChannelBufferWrapper(Unpooled.buffer(expectedEncodedSize));
} else {
return connection.createTransportBuffer(expectedEncodedSize);
}
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeInt(body.length);

View File

@ -70,8 +70,8 @@ public class SessionReceiveContinuationMessage extends SessionContinuationMessag
// Protected -----------------------------------------------------
@Override
protected final int expectedEncodedSize() {
return super.expectedEncodedSize() + DataConstants.SIZE_LONG;
public int expectedEncodeSize() {
return super.expectedEncodeSize() + DataConstants.SIZE_LONG;
}
// Public --------------------------------------------------------
@ -128,4 +128,4 @@ public class SessionReceiveContinuationMessage extends SessionContinuationMessag
return true;
}
}
}

View File

@ -52,6 +52,7 @@ public class SessionReceiveMessage extends MessagePacket {
return deliveryCount;
}
@Override
public ActiveMQBuffer encode(final RemotingConnection connection) {
ActiveMQBuffer buffer = message.getEncodedBuffer();

View File

@ -93,8 +93,8 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
}
@Override
protected final int expectedEncodedSize() {
return super.expectedEncodedSize() + (!continues ? DataConstants.SIZE_LONG : 0) + DataConstants.SIZE_BOOLEAN;
public int expectedEncodeSize() {
return super.expectedEncodeSize() + (!continues ? DataConstants.SIZE_LONG : 0) + DataConstants.SIZE_BOOLEAN;
}
@Override

View File

@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.replication.ReplicationManager.ADD_OPERATION_TYPE;
import org.apache.activemq.artemis.utils.DataConstants;
public final class ReplicationAddMessage extends PacketImpl {
@ -59,10 +60,20 @@ public final class ReplicationAddMessage extends PacketImpl {
// Public --------------------------------------------------------
@Override
public int expectedEncodeSize() {
return PACKET_HEADERS_SIZE +
DataConstants.SIZE_BYTE + // buffer.writeByte(journalID);
DataConstants.SIZE_BOOLEAN + // buffer.writeBoolean(operation.toBoolean());
DataConstants.SIZE_LONG + // buffer.writeLong(id);
DataConstants.SIZE_BYTE + // buffer.writeByte(journalRecordType);
DataConstants.SIZE_INT + // buffer.writeInt(persister.getEncodeSize(encodingData));
persister.getEncodeSize(encodingData);// persister.encode(buffer, encodingData);
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeByte(journalID);
buffer.writeBoolean(operation.toBoolean());
buffer.writeLong(id);
buffer.writeByte(journalRecordType);

View File

@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.replication.ReplicationManager.ADD_OPERATION_TYPE;
import org.apache.activemq.artemis.utils.DataConstants;
public class ReplicationAddTXMessage extends PacketImpl {
@ -63,6 +64,18 @@ public class ReplicationAddTXMessage extends PacketImpl {
// Public --------------------------------------------------------
@Override
public int expectedEncodeSize() {
return PACKET_HEADERS_SIZE +
DataConstants.SIZE_BYTE + // buffer.writeByte(journalID);
DataConstants.SIZE_BOOLEAN + // buffer.writeBoolean(operation.toBoolean());
DataConstants.SIZE_LONG + // buffer.writeLong(txId);
DataConstants.SIZE_LONG + // buffer.writeLong(id);
DataConstants.SIZE_BYTE + // buffer.writeByte(recordType);
DataConstants.SIZE_INT + // buffer.writeInt(persister.getEncodeSize(encodingData));
persister.getEncodeSize(encodingData); // persister.encode(buffer, encodingData);
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeByte(journalID);

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.utils.DataConstants;
public final class ReplicationCommitMessage extends PacketImpl {
@ -41,6 +42,14 @@ public final class ReplicationCommitMessage extends PacketImpl {
this.txId = txId;
}
@Override
public int expectedEncodeSize() {
return PACKET_HEADERS_SIZE +
DataConstants.SIZE_BYTE + // buffer.writeByte(journalID);
DataConstants.SIZE_BOOLEAN + // buffer.writeBoolean(rollback);
DataConstants.SIZE_LONG; // buffer.writeLong(txId);
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeByte(journalID);

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.utils.DataConstants;
public final class ReplicationDeleteMessage extends PacketImpl {
@ -38,6 +39,14 @@ public final class ReplicationDeleteMessage extends PacketImpl {
this.id = id;
}
@Override
public int expectedEncodeSize() {
return PACKET_HEADERS_SIZE +
DataConstants.SIZE_BYTE + // buffer.writeByte(journalID);
DataConstants.SIZE_LONG; // buffer.writeLong(id);
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeByte(journalID);

View File

@ -21,6 +21,7 @@ import java.util.Arrays;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.utils.DataConstants;
public class ReplicationDeleteTXMessage extends PacketImpl {
@ -52,6 +53,16 @@ public class ReplicationDeleteTXMessage extends PacketImpl {
this.encodingData = encodingData;
}
@Override
public int expectedEncodeSize() {
return PACKET_HEADERS_SIZE +
DataConstants.SIZE_BYTE + // buffer.writeByte(journalID);
DataConstants.SIZE_LONG + // buffer.writeLong(txId);
DataConstants.SIZE_LONG + // buffer.writeLong(id);
DataConstants.SIZE_INT + // buffer.writeInt(encodingData.getEncodeSize());
encodingData.getEncodeSize(); // encodingData.encode(buffer);
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeByte(journalID);

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.utils.DataConstants;
public class ReplicationLargeMessageBeginMessage extends PacketImpl {
@ -32,6 +33,14 @@ public class ReplicationLargeMessageBeginMessage extends PacketImpl {
super(PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN);
}
@Override
public int expectedEncodeSize() {
return PACKET_HEADERS_SIZE +
DataConstants.SIZE_LONG; // buffer.writeLong(messageId);
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeLong(messageId);

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.utils.DataConstants;
public class ReplicationLargeMessageEndMessage extends PacketImpl {
@ -32,6 +33,13 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl {
this.messageId = messageId;
}
@Override
public int expectedEncodeSize() {
return PACKET_HEADERS_SIZE +
DataConstants.SIZE_LONG; // buffer.writeLong(messageId);
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeLong(messageId);

View File

@ -20,6 +20,7 @@ import java.util.Arrays;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.utils.DataConstants;
public final class ReplicationLargeMessageWriteMessage extends PacketImpl {
@ -42,6 +43,15 @@ public final class ReplicationLargeMessageWriteMessage extends PacketImpl {
this.body = body;
}
@Override
public int expectedEncodeSize() {
return PACKET_HEADERS_SIZE +
DataConstants.SIZE_LONG + // buffer.writeLong(messageId);
DataConstants.SIZE_LONG + // buffer.writeLong(messageId);
DataConstants.SIZE_INT + // buffer.writeInt(body.length);
body.length; // buffer.writeBytes(body);
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeLong(messageId);

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.utils.DataConstants;
/**
* Message indicating that the live is stopping (a scheduled stop).
@ -59,6 +60,12 @@ public final class ReplicationLiveIsStoppingMessage extends PacketImpl {
this.liveStopping = b;
}
@Override
public int expectedEncodeSize() {
return PACKET_HEADERS_SIZE +
DataConstants.SIZE_INT; // buffer.writeInt(liveStopping.code);
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeInt(liveStopping.code);

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.utils.DataConstants;
public class ReplicationPageEventMessage extends PacketImpl {
@ -42,6 +43,14 @@ public class ReplicationPageEventMessage extends PacketImpl {
this.storeName = storeName;
}
@Override
public int expectedEncodeSize() {
return PACKET_HEADERS_SIZE +
SimpleString.sizeofString(storeName) + // buffer.writeSimpleString(storeName);
DataConstants.SIZE_INT + // buffer.writeInt(pageNumber);
DataConstants.SIZE_BOOLEAN; // buffer.writeBoolean(isDelete);
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeSimpleString(storeName);

View File

@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.impl.PagedMessageImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.utils.DataConstants;
public class ReplicationPageWriteMessage extends PacketImpl {
@ -39,6 +40,13 @@ public class ReplicationPageWriteMessage extends PacketImpl {
// Public --------------------------------------------------------
@Override
public int expectedEncodeSize() {
return PACKET_HEADERS_SIZE +
DataConstants.SIZE_INT + // buffer.writeInt(pageNumber);
pagedMessage.getEncodeSize(); // pagedMessage.encode(buffer);
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeInt(pageNumber);

View File

@ -21,6 +21,7 @@ import java.util.Arrays;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.utils.DataConstants;
public final class ReplicationPrepareMessage extends PacketImpl {
@ -48,6 +49,15 @@ public final class ReplicationPrepareMessage extends PacketImpl {
// Public --------------------------------------------------------
@Override
public int expectedEncodeSize() {
return PACKET_HEADERS_SIZE +
DataConstants.SIZE_BYTE + // buffer.writeByte(journalID);
DataConstants.SIZE_LONG + // buffer.writeLong(txId);
DataConstants.SIZE_INT + // buffer.writeInt(encodingData.getEncodeSize());
encodingData.getEncodeSize(); // encodingData.encode(buffer);
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeByte(journalID);

View File

@ -27,4 +27,11 @@ public class ReplicationResponseMessage extends PacketImpl {
public ReplicationResponseMessage(byte replicationResponseV2) {
super(replicationResponseV2);
}
@Override
public int expectedEncodeSize() {
return PACKET_HEADERS_SIZE;
}
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.utils.DataConstants;
public final class ReplicationResponseMessageV2 extends ReplicationResponseMessage {
@ -41,6 +42,12 @@ public final class ReplicationResponseMessageV2 extends ReplicationResponseMessa
this.synchronizationIsFinishedAcknowledgement = synchronizationIsFinishedAcknowledgement;
}
@Override
public int expectedEncodeSize() {
return PACKET_HEADERS_SIZE +
DataConstants.SIZE_BOOLEAN; // buffer.writeBoolean(synchronizationIsFinishedAcknowledgement);
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
super.encodeRest(buffer);

View File

@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.utils.DataConstants;
/**
* This message may signal start or end of the replication synchronization.
@ -109,6 +110,26 @@ public class ReplicationStartSyncMessage extends PacketImpl {
}
}
@Override
public int expectedEncodeSize() {
int size = PACKET_HEADERS_SIZE +
DataConstants.SIZE_BOOLEAN + // buffer.writeBoolean(synchronizationIsFinished);
DataConstants.SIZE_BOOLEAN + // buffer.writeBoolean(allowsAutoFailBack);
nodeID.length() * 3; // buffer.writeString(nodeID); -- an estimate
if (synchronizationIsFinished) {
return size;
}
size += DataConstants.SIZE_BYTE + // buffer.writeByte(dataType.code);
DataConstants.SIZE_INT + // buffer.writeInt(ids.length);
DataConstants.SIZE_LONG * ids.length; // the write loop
return size;
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeBoolean(synchronizationIsFinished);

View File

@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.utils.DataConstants;
/**
* Message is used to sync {@link org.apache.activemq.artemis.core.io.SequentialFile}s to a backup server. The {@link FileType} controls
@ -98,6 +99,38 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
}
}
@Override
public int expectedEncodeSize() {
int size = PACKET_HEADERS_SIZE +
DataConstants.SIZE_LONG; // buffer.writeLong(fileId);
if (fileId == -1)
return size;
size += DataConstants.SIZE_BYTE; // buffer.writeByte(fileType.code);
switch (fileType) {
case JOURNAL: {
size += DataConstants.SIZE_BYTE; // buffer.writeByte(journalType.typeByte);
break;
}
case PAGE: {
size += SimpleString.sizeofString(pageStoreName);
break;
}
case LARGE_MESSAGE:
default:
// no-op
}
size += DataConstants.SIZE_INT; // buffer.writeInt(dataSize);
if (dataSize > 0) {
size += byteBuffer.writerIndex(); // buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex());
}
return size;
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeLong(fileId);
@ -126,11 +159,6 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
if (dataSize > 0) {
buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex());
}
if (byteBuffer != null) {
byteBuffer.release();
byteBuffer = null;
}
}
@Override

View File

@ -19,8 +19,6 @@ package org.apache.activemq.artemis.core.remoting.impl.netty;
import java.util.Map;
import io.netty.channel.Channel;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener;
public class NettyServerConnection extends NettyConnection {
@ -33,8 +31,4 @@ public class NettyServerConnection extends NettyConnection {
super(configuration, channel, listener, batchingEnabled, directDeliver);
}
@Override
public ActiveMQBuffer createTransportBuffer(int size) {
return new ChannelBufferWrapper(channel.alloc().directBuffer(size), true);
}
}

View File

@ -33,7 +33,6 @@ import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
@ -42,6 +41,7 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.protocol.core.Channel;
@ -70,7 +70,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.jboss.logging.Logger;
@ -83,7 +82,7 @@ import org.jboss.logging.Logger;
*
* @see ReplicationEndpoint
*/
public final class ReplicationManager implements ActiveMQComponent, ReadyListener {
public final class ReplicationManager implements ActiveMQComponent {
private static final Logger logger = Logger.getLogger(ReplicationManager.class);
@ -118,8 +117,6 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
private final AtomicBoolean writable = new AtomicBoolean(true);
private final Object replicationLock = new Object();
private final Queue<OperationContext> pendingTokens = new ConcurrentLinkedQueue<>();
private final ExecutorFactory executorFactory;
@ -289,12 +286,9 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
replicatingChannel.getConnection().getTransportConnection().fireReady(true);
}
synchronized (replicationLock) {
enabled = false;
writable.set(true);
replicationLock.notifyAll();
clearReplicationTokens();
}
enabled = false;
writable.set(true);
clearReplicationTokens();
RemotingConnection toStop = remotingConnection;
if (toStop != null) {
@ -312,16 +306,13 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
*/
public void clearReplicationTokens() {
logger.trace("clearReplicationTokens initiating");
synchronized (replicationLock) {
logger.trace("clearReplicationTokens entered the lock");
while (!pendingTokens.isEmpty()) {
OperationContext ctx = pendingTokens.poll();
logger.trace("Calling ctx.replicationDone()");
try {
ctx.replicationDone();
} catch (Throwable e) {
ActiveMQServerLogger.LOGGER.errorCompletingCallbackOnReplicationManager(e);
}
while (!pendingTokens.isEmpty()) {
OperationContext ctx = pendingTokens.poll();
logger.trace("Calling ctx.replicationDone()");
try {
ctx.replicationDone();
} catch (Throwable e) {
ActiveMQServerLogger.LOGGER.errorCompletingCallbackOnReplicationManager(e);
}
}
logger.trace("clearReplicationTokens finished");
@ -359,24 +350,22 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
repliToken.replicationLineUp();
}
synchronized (replicationLock) {
if (enabled) {
pendingTokens.add(repliToken);
if (useExecutor) {
replicationStream.execute(() -> {
if (enabled) {
flowControl();
replicatingChannel.send(packet);
}
});
} else {
flowControl();
replicatingChannel.send(packet);
}
if (enabled) {
pendingTokens.add(repliToken);
if (useExecutor) {
replicationStream.execute(() -> {
if (enabled) {
flowControl(packet.expectedEncodeSize());
replicatingChannel.send(packet);
}
});
} else {
// Already replicating channel failed, so just play the action now
runItNow = true;
flowControl(packet.expectedEncodeSize());
replicatingChannel.send(packet);
}
} else {
// Already replicating channel failed, so just play the action now
runItNow = true;
}
// Execute outside lock
@ -392,47 +381,20 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
* This was written as a refactoring of sendReplicatePacket.
* In case you refactor this in any way, this method must hold a lock on replication lock. .
*/
private boolean flowControl() {
synchronized (replicationLock) {
// synchronized (replicationLock) { -- I'm not adding this because the caller already has it
// future maintainers of this code please be aware that the intention here is hold the lock on replication lock
if (!replicatingChannel.getConnection().isWritable(this)) {
try {
logger.trace("flowControl waiting on writable replication");
writable.set(false);
//don't wait for ever as this may hang tests etc, we've probably been closed anyway
long now = System.currentTimeMillis();
long deadline = now + timeout;
while (!writable.get() && now < deadline) {
replicationLock.wait(deadline - now);
now = System.currentTimeMillis();
}
logger.trace("flow control done on replication");
private boolean flowControl(int size) {
boolean flowWorked = replicatingChannel.getConnection().blockUntilWritable(size, timeout);
if (!writable.get()) {
ActiveMQServerLogger.LOGGER.slowReplicationResponse();
logger.tracef("There was no response from replication backup after %s seconds, server being stopped now", System.currentTimeMillis() - now);
try {
stop();
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
return false;
}
} catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
if (!flowWorked) {
try {
ActiveMQServerLogger.LOGGER.slowReplicationResponse();
stop();
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
return true;
}
@Override
public void readyForWriting() {
synchronized (replicationLock) {
writable.set(true);
replicationLock.notifyAll();
}
return flowWorked;
}
/**
@ -566,15 +528,17 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
if (!file.isOpen()) {
file.open();
}
int size = 32 * 1024;
final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
try {
try (final FileInputStream fis = new FileInputStream(file.getJavaFile());
final FileChannel channel = fis.getChannel()) {
// We can afford having a single buffer here for this entire loop
// because sendReplicatePacket will encode the packet as a NettyBuffer
// through ActiveMQBuffer class leaving this buffer free to be reused on the next copy
int size = 1 << 17;
while (true) {
final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size);
buffer.clear();
ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer();
final int bytesRead = channel.read(byteBuffer);
int toSend = bytesRead;
@ -597,6 +561,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
}
}
} finally {
buffer.release();
if (file.isOpen())
file.close();
}