diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java index a86c5c102b..efb9aa6fe3 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java @@ -93,4 +93,11 @@ public interface Packet { * @return true if confirmation is required */ boolean isRequiresConfirmations(); + + + + /** The packe wasn't used because the stream is closed, + * this gives a chance to sub classes to cleanup anything that won't be used. */ + default void release() { + } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java index 99c052bd3a..afbaf53b3f 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java @@ -354,6 +354,7 @@ public class PacketImpl implements Packet { return result; } + @Override public boolean equals(Object obj) { if (this == obj) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java index 4d3c32fa2c..b81782bcd0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java @@ -159,6 +159,16 @@ public final class ReplicationSyncFileMessage extends PacketImpl { if (dataSize > 0) { buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex()); } + + release(); + } + + @Override + public void release() { + if (byteBuffer != null) { + byteBuffer.release(); + byteBuffer = null; + } } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index e1027d48c9..d298a24880 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -26,6 +26,7 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import io.netty.buffer.ByteBuf; @@ -92,8 +93,7 @@ public final class ReplicationManager implements ActiveMQComponent { public boolean toBoolean() { return true; } - }, - ADD { + }, ADD { @Override public boolean toBoolean() { return false; @@ -129,6 +129,8 @@ public final class ReplicationManager implements ActiveMQComponent { private final long timeout; + private final long initialReplicationSyncTimeout; + private volatile boolean inSync = true; private final ReusableLatch synchronizationIsFinishedAcknowledgement = new ReusableLatch(0); @@ -138,8 +140,10 @@ public final class ReplicationManager implements ActiveMQComponent { */ public ReplicationManager(CoreRemotingConnection remotingConnection, final long timeout, + final long initialReplicationSyncTimeout, final ExecutorFactory executorFactory) { this.executorFactory = executorFactory; + this.initialReplicationSyncTimeout = initialReplicationSyncTimeout; this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1); this.remotingConnection = remotingConnection; this.replicationStream = executorFactory.getExecutor(); @@ -178,7 +182,7 @@ public final class ReplicationManager implements ActiveMQComponent { boolean sync, final boolean lineUp) throws Exception { if (enabled) { - sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp, true); + sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp); } } @@ -339,10 +343,10 @@ public final class ReplicationManager implements ActiveMQComponent { } private OperationContext sendReplicatePacket(final Packet packet) { - return sendReplicatePacket(packet, true, true); + return sendReplicatePacket(packet, true); } - private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp, boolean useExecutor) { + private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp) { if (!enabled) return null; boolean runItNow = false; @@ -353,22 +357,17 @@ public final class ReplicationManager implements ActiveMQComponent { } if (enabled) { - if (useExecutor) { - replicationStream.execute(() -> { - if (enabled) { - pendingTokens.add(repliToken); - flowControl(packet.expectedEncodeSize()); - replicatingChannel.send(packet); - } - }); - } else { - pendingTokens.add(repliToken); - flowControl(packet.expectedEncodeSize()); - replicatingChannel.send(packet); - } + replicationStream.execute(() -> { + if (enabled) { + pendingTokens.add(repliToken); + flowControl(packet.expectedEncodeSize()); + replicatingChannel.send(packet); + } + }); } else { // Already replicating channel failed, so just play the action now runItNow = true; + packet.release(); } // Execute outside lock @@ -396,7 +395,6 @@ public final class ReplicationManager implements ActiveMQComponent { } } - return flowWorked; } @@ -511,6 +509,24 @@ public final class ReplicationManager implements ActiveMQComponent { sendLargeFile(null, queueName, id, file, Long.MAX_VALUE); } + private class FlushAction implements Runnable { + + ReusableLatch latch = new ReusableLatch(1); + + public void reset() { + latch.setCount(1); + } + + public boolean await(long timeout, TimeUnit unit) throws Exception { + return latch.await(timeout, unit); + } + + @Override + public void run() { + latch.countDown(); + } + } + /** * Sends large files in reasonably sized chunks to the backup during replication synchronization. * @@ -532,15 +548,19 @@ public final class ReplicationManager implements ActiveMQComponent { file.open(); } int size = 32 * 1024; - final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size); + + int flowControlSize = 10; + + int packetsSent = 0; + FlushAction action = new FlushAction(); try { - try (final FileInputStream fis = new FileInputStream(file.getJavaFile()); - final FileChannel channel = fis.getChannel()) { + try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) { // We can afford having a single buffer here for this entire loop // because sendReplicatePacket will encode the packet as a NettyBuffer // through ActiveMQBuffer class leaving this buffer free to be reused on the next copy while (true) { + final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size); buffer.clear(); ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer(); final int bytesRead = channel.read(byteBuffer); @@ -558,18 +578,31 @@ public final class ReplicationManager implements ActiveMQComponent { // We cannot simply send everything of a file through the executor, // otherwise we would run out of memory. // so we don't use the executor here - sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true, false); + sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true); + packetsSent++; + + if (packetsSent % flowControlSize == 0) { + flushReplicationStream(action); + } if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0) break; } } + flushReplicationStream(action); } finally { - buffer.release(); if (file.isOpen()) file.close(); } } + private void flushReplicationStream(FlushAction action) throws Exception { + action.reset(); + replicationStream.execute(action); + if (!action.await(this.timeout, TimeUnit.MILLISECONDS)) { + throw ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(initialReplicationSyncTimeout); + } + } + /** * Reserve the following fileIDs in the backup server. * diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java index c984ae2d6b..b532e57968 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java @@ -157,7 +157,7 @@ public class SharedNothingLiveActivation extends LiveActivation { ReplicationFailureListener listener = new ReplicationFailureListener(); rc.addCloseListener(listener); rc.addFailureListener(listener); - replicationManager = new ReplicationManager(rc, clusterConnection.getCallTimeout(), activeMQServer.getExecutorFactory()); + replicationManager = new ReplicationManager(rc, clusterConnection.getCallTimeout(), replicatedPolicy.getInitialReplicationSyncTimeout(), activeMQServer.getExecutorFactory()); replicationManager.start(); Thread t = new Thread(new Runnable() { @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java index 398e895c08..46cb085467 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java @@ -189,7 +189,7 @@ public final class ReplicationTest extends ActiveMQTestBase { setupServer(false); try { ClientSessionFactory sf = createSessionFactory(locator); - manager = new ReplicationManager((CoreRemotingConnection) sf.getConnection(), sf.getServerLocator().getCallTimeout(), factory); + manager = new ReplicationManager((CoreRemotingConnection) sf.getConnection(), sf.getServerLocator().getCallTimeout(), sf.getServerLocator().getCallTimeout(), factory); addActiveMQComponent(manager); manager.start(); Assert.fail("Exception was expected");