From ce6942a9aa9375efaa449424fe89de2db3f22e36 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Fri, 18 Aug 2017 15:01:33 -0400 Subject: [PATCH 1/2] ARTEMIS-1353 Initial replication of large messages out of executor This is based on the work @jbertram made at the github pr #1466 and the discussions we had there --- .../artemis/core/protocol/core/Packet.java | 7 ++ .../core/protocol/core/impl/PacketImpl.java | 1 + .../ReplicationSyncFileMessage.java | 10 +++ .../core/replication/ReplicationManager.java | 82 +++++++++++++------ .../impl/SharedNothingLiveActivation.java | 2 +- .../replication/ReplicationTest.java | 2 +- 6 files changed, 78 insertions(+), 26 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java index a86c5c102b..efb9aa6fe3 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java @@ -93,4 +93,11 @@ public interface Packet { * @return true if confirmation is required */ boolean isRequiresConfirmations(); + + + + /** The packe wasn't used because the stream is closed, + * this gives a chance to sub classes to cleanup anything that won't be used. */ + default void release() { + } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java index 97a7973f7a..186a703368 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java @@ -402,6 +402,7 @@ public class PacketImpl implements Packet { return result; } + @Override public boolean equals(Object obj) { if (this == obj) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java index 4d3c32fa2c..b81782bcd0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java @@ -159,6 +159,16 @@ public final class ReplicationSyncFileMessage extends PacketImpl { if (dataSize > 0) { buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex()); } + + release(); + } + + @Override + public void release() { + if (byteBuffer != null) { + byteBuffer.release(); + byteBuffer = null; + } } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index 3b6f9d6253..73ad201e98 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -26,6 +26,7 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import io.netty.buffer.ByteBuf; @@ -93,8 +94,7 @@ public final class ReplicationManager implements ActiveMQComponent { public boolean toBoolean() { return true; } - }, - ADD { + }, ADD { @Override public boolean toBoolean() { return false; @@ -130,6 +130,8 @@ public final class ReplicationManager implements ActiveMQComponent { private final long timeout; + private final long initialReplicationSyncTimeout; + private volatile boolean inSync = true; private final ReusableLatch synchronizationIsFinishedAcknowledgement = new ReusableLatch(0); @@ -139,8 +141,10 @@ public final class ReplicationManager implements ActiveMQComponent { */ public ReplicationManager(CoreRemotingConnection remotingConnection, final long timeout, + final long initialReplicationSyncTimeout, final ExecutorFactory executorFactory) { this.executorFactory = executorFactory; + this.initialReplicationSyncTimeout = initialReplicationSyncTimeout; this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1); this.remotingConnection = remotingConnection; this.replicationStream = executorFactory.getExecutor(); @@ -181,7 +185,7 @@ public final class ReplicationManager implements ActiveMQComponent { boolean sync, final boolean lineUp) throws Exception { if (enabled) { - sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp, true); + sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp); } } @@ -342,10 +346,10 @@ public final class ReplicationManager implements ActiveMQComponent { } private OperationContext sendReplicatePacket(final Packet packet) { - return sendReplicatePacket(packet, true, true); + return sendReplicatePacket(packet, true); } - private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp, boolean useExecutor) { + private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp) { if (!enabled) return null; boolean runItNow = false; @@ -356,22 +360,17 @@ public final class ReplicationManager implements ActiveMQComponent { } if (enabled) { - if (useExecutor) { - replicationStream.execute(() -> { - if (enabled) { - pendingTokens.add(repliToken); - flowControl(packet.expectedEncodeSize()); - replicatingChannel.send(packet); - } - }); - } else { - pendingTokens.add(repliToken); - flowControl(packet.expectedEncodeSize()); - replicatingChannel.send(packet); - } + replicationStream.execute(() -> { + if (enabled) { + pendingTokens.add(repliToken); + flowControl(packet.expectedEncodeSize()); + replicatingChannel.send(packet); + } + }); } else { // Already replicating channel failed, so just play the action now runItNow = true; + packet.release(); } // Execute outside lock @@ -399,7 +398,6 @@ public final class ReplicationManager implements ActiveMQComponent { } } - return flowWorked; } @@ -514,6 +512,24 @@ public final class ReplicationManager implements ActiveMQComponent { sendLargeFile(null, queueName, id, file, Long.MAX_VALUE); } + private class FlushAction implements Runnable { + + ReusableLatch latch = new ReusableLatch(1); + + public void reset() { + latch.setCount(1); + } + + public boolean await(long timeout, TimeUnit unit) throws Exception { + return latch.await(timeout, unit); + } + + @Override + public void run() { + latch.countDown(); + } + } + /** * Sends large files in reasonably sized chunks to the backup during replication synchronization. * @@ -535,15 +551,20 @@ public final class ReplicationManager implements ActiveMQComponent { file.open(); } int size = 32 * 1024; - final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size); + + int flowControlSize = 10; + + int packetsSent = 0; + FlushAction action = new FlushAction(); try { - try (FileInputStream fis = new FileInputStream(file.getJavaFile()); - FileChannel channel = fis.getChannel()) { + try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) { + // We can afford having a single buffer here for this entire loop // because sendReplicatePacket will encode the packet as a NettyBuffer // through ActiveMQBuffer class leaving this buffer free to be reused on the next copy while (true) { + final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size); buffer.clear(); ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer(); final int bytesRead = channel.read(byteBuffer); @@ -561,18 +582,31 @@ public final class ReplicationManager implements ActiveMQComponent { // We cannot simply send everything of a file through the executor, // otherwise we would run out of memory. // so we don't use the executor here - sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true, false); + sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true); + packetsSent++; + + if (packetsSent % flowControlSize == 0) { + flushReplicationStream(action); + } if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0) break; } } + flushReplicationStream(action); } finally { - buffer.release(); if (file.isOpen()) file.close(); } } + private void flushReplicationStream(FlushAction action) throws Exception { + action.reset(); + replicationStream.execute(action); + if (!action.await(this.timeout, TimeUnit.MILLISECONDS)) { + throw ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(initialReplicationSyncTimeout); + } + } + /** * Reserve the following fileIDs in the backup server. * diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java index 355cefb73b..920366aa03 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java @@ -169,7 +169,7 @@ public class SharedNothingLiveActivation extends LiveActivation { ReplicationFailureListener listener = new ReplicationFailureListener(); rc.addCloseListener(listener); rc.addFailureListener(listener); - replicationManager = new ReplicationManager(rc, clusterConnection.getCallTimeout(), activeMQServer.getExecutorFactory()); + replicationManager = new ReplicationManager(rc, clusterConnection.getCallTimeout(), replicatedPolicy.getInitialReplicationSyncTimeout(), activeMQServer.getExecutorFactory()); replicationManager.start(); Thread t = new Thread(new Runnable() { @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java index 823670297a..05f3730e3d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java @@ -190,7 +190,7 @@ public final class ReplicationTest extends ActiveMQTestBase { setupServer(false); try { ClientSessionFactory sf = createSessionFactory(locator); - manager = new ReplicationManager((CoreRemotingConnection) sf.getConnection(), sf.getServerLocator().getCallTimeout(), factory); + manager = new ReplicationManager((CoreRemotingConnection) sf.getConnection(), sf.getServerLocator().getCallTimeout(), sf.getServerLocator().getCallTimeout(), factory); addActiveMQComponent(manager); manager.start(); Assert.fail("Exception was expected"); From 2033ee8c43279358e13530f0d168c7b415465f25 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Fri, 18 Aug 2017 16:36:03 -0400 Subject: [PATCH 2/2] NO-JIRA fixing MQTT Test --- .../imported/MQTTInterceptorPropertiesTest.java | 14 ++++++++++++-- .../integration/mqtt/imported/MQTTTestSupport.java | 9 +++++++-- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java index 375e2f2986..2600952944 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.mqtt.imported; import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttPublishMessage; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor; @@ -96,7 +97,12 @@ public class MQTTInterceptorPropertiesTest extends MQTTTestSupport { @Override public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException { System.out.println("incoming"); - return checkMessageProperties(packet, expectedProperties); + if (packet.getClass() == MqttPublishMessage.class) { + return checkMessageProperties(packet, expectedProperties); + } else { + return true; + } + } }; @@ -104,7 +110,11 @@ public class MQTTInterceptorPropertiesTest extends MQTTTestSupport { @Override public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException { System.out.println("outgoing"); - return checkMessageProperties(packet, expectedProperties); + if (packet.getClass() == MqttPublishMessage.class) { + return checkMessageProperties(packet, expectedProperties); + } else { + return true; + } } }; server.getRemotingService().addIncomingInterceptor(incomingInterceptor); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java index a45f06d88b..bac2e37b49 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java @@ -34,6 +34,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttPublishMessage; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; @@ -369,7 +370,9 @@ public class MQTTTestSupport extends ActiveMQTestBase { @Override public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException { - messageCount++; + if (packet.getClass() == MqttPublishMessage.class) { + messageCount++; + } return true; } @@ -388,7 +391,9 @@ public class MQTTTestSupport extends ActiveMQTestBase { @Override public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException { - messageCount++; + if (packet.getClass() == MqttPublishMessage.class) { + messageCount++; + } return true; }