This commit is contained in:
Clebert Suconic 2017-08-19 12:40:30 -04:00
commit 8966b599c4
8 changed files with 97 additions and 30 deletions

View File

@ -93,4 +93,11 @@ public interface Packet {
* @return true if confirmation is required * @return true if confirmation is required
*/ */
boolean isRequiresConfirmations(); boolean isRequiresConfirmations();
/** The packe wasn't used because the stream is closed,
* this gives a chance to sub classes to cleanup anything that won't be used. */
default void release() {
}
} }

View File

@ -402,6 +402,7 @@ public class PacketImpl implements Packet {
return result; return result;
} }
@Override @Override
public boolean equals(Object obj) { public boolean equals(Object obj) {
if (this == obj) { if (this == obj) {

View File

@ -159,6 +159,16 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
if (dataSize > 0) { if (dataSize > 0) {
buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex()); buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex());
} }
release();
}
@Override
public void release() {
if (byteBuffer != null) {
byteBuffer.release();
byteBuffer = null;
}
} }
@Override @Override

View File

@ -26,6 +26,7 @@ import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
@ -93,8 +94,7 @@ public final class ReplicationManager implements ActiveMQComponent {
public boolean toBoolean() { public boolean toBoolean() {
return true; return true;
} }
}, }, ADD {
ADD {
@Override @Override
public boolean toBoolean() { public boolean toBoolean() {
return false; return false;
@ -130,6 +130,8 @@ public final class ReplicationManager implements ActiveMQComponent {
private final long timeout; private final long timeout;
private final long initialReplicationSyncTimeout;
private volatile boolean inSync = true; private volatile boolean inSync = true;
private final ReusableLatch synchronizationIsFinishedAcknowledgement = new ReusableLatch(0); private final ReusableLatch synchronizationIsFinishedAcknowledgement = new ReusableLatch(0);
@ -139,8 +141,10 @@ public final class ReplicationManager implements ActiveMQComponent {
*/ */
public ReplicationManager(CoreRemotingConnection remotingConnection, public ReplicationManager(CoreRemotingConnection remotingConnection,
final long timeout, final long timeout,
final long initialReplicationSyncTimeout,
final ExecutorFactory executorFactory) { final ExecutorFactory executorFactory) {
this.executorFactory = executorFactory; this.executorFactory = executorFactory;
this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1); this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
this.remotingConnection = remotingConnection; this.remotingConnection = remotingConnection;
this.replicationStream = executorFactory.getExecutor(); this.replicationStream = executorFactory.getExecutor();
@ -181,7 +185,7 @@ public final class ReplicationManager implements ActiveMQComponent {
boolean sync, boolean sync,
final boolean lineUp) throws Exception { final boolean lineUp) throws Exception {
if (enabled) { if (enabled) {
sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp, true); sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp);
} }
} }
@ -342,10 +346,10 @@ public final class ReplicationManager implements ActiveMQComponent {
} }
private OperationContext sendReplicatePacket(final Packet packet) { private OperationContext sendReplicatePacket(final Packet packet) {
return sendReplicatePacket(packet, true, true); return sendReplicatePacket(packet, true);
} }
private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp, boolean useExecutor) { private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp) {
if (!enabled) if (!enabled)
return null; return null;
boolean runItNow = false; boolean runItNow = false;
@ -356,7 +360,6 @@ public final class ReplicationManager implements ActiveMQComponent {
} }
if (enabled) { if (enabled) {
if (useExecutor) {
replicationStream.execute(() -> { replicationStream.execute(() -> {
if (enabled) { if (enabled) {
pendingTokens.add(repliToken); pendingTokens.add(repliToken);
@ -364,14 +367,10 @@ public final class ReplicationManager implements ActiveMQComponent {
replicatingChannel.send(packet); replicatingChannel.send(packet);
} }
}); });
} else {
pendingTokens.add(repliToken);
flowControl(packet.expectedEncodeSize());
replicatingChannel.send(packet);
}
} else { } else {
// Already replicating channel failed, so just play the action now // Already replicating channel failed, so just play the action now
runItNow = true; runItNow = true;
packet.release();
} }
// Execute outside lock // Execute outside lock
@ -399,7 +398,6 @@ public final class ReplicationManager implements ActiveMQComponent {
} }
} }
return flowWorked; return flowWorked;
} }
@ -514,6 +512,24 @@ public final class ReplicationManager implements ActiveMQComponent {
sendLargeFile(null, queueName, id, file, Long.MAX_VALUE); sendLargeFile(null, queueName, id, file, Long.MAX_VALUE);
} }
private class FlushAction implements Runnable {
ReusableLatch latch = new ReusableLatch(1);
public void reset() {
latch.setCount(1);
}
public boolean await(long timeout, TimeUnit unit) throws Exception {
return latch.await(timeout, unit);
}
@Override
public void run() {
latch.countDown();
}
}
/** /**
* Sends large files in reasonably sized chunks to the backup during replication synchronization. * Sends large files in reasonably sized chunks to the backup during replication synchronization.
* *
@ -535,15 +551,20 @@ public final class ReplicationManager implements ActiveMQComponent {
file.open(); file.open();
} }
int size = 32 * 1024; int size = 32 * 1024;
final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
int flowControlSize = 10;
int packetsSent = 0;
FlushAction action = new FlushAction();
try { try {
try (FileInputStream fis = new FileInputStream(file.getJavaFile()); try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) {
FileChannel channel = fis.getChannel()) {
// We can afford having a single buffer here for this entire loop // We can afford having a single buffer here for this entire loop
// because sendReplicatePacket will encode the packet as a NettyBuffer // because sendReplicatePacket will encode the packet as a NettyBuffer
// through ActiveMQBuffer class leaving this buffer free to be reused on the next copy // through ActiveMQBuffer class leaving this buffer free to be reused on the next copy
while (true) { while (true) {
final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
buffer.clear(); buffer.clear();
ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer(); ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer();
final int bytesRead = channel.read(byteBuffer); final int bytesRead = channel.read(byteBuffer);
@ -561,18 +582,31 @@ public final class ReplicationManager implements ActiveMQComponent {
// We cannot simply send everything of a file through the executor, // We cannot simply send everything of a file through the executor,
// otherwise we would run out of memory. // otherwise we would run out of memory.
// so we don't use the executor here // so we don't use the executor here
sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true, false); sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true);
packetsSent++;
if (packetsSent % flowControlSize == 0) {
flushReplicationStream(action);
}
if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0) if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
break; break;
} }
} }
flushReplicationStream(action);
} finally { } finally {
buffer.release();
if (file.isOpen()) if (file.isOpen())
file.close(); file.close();
} }
} }
private void flushReplicationStream(FlushAction action) throws Exception {
action.reset();
replicationStream.execute(action);
if (!action.await(this.timeout, TimeUnit.MILLISECONDS)) {
throw ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(initialReplicationSyncTimeout);
}
}
/** /**
* Reserve the following fileIDs in the backup server. * Reserve the following fileIDs in the backup server.
* *

View File

@ -169,7 +169,7 @@ public class SharedNothingLiveActivation extends LiveActivation {
ReplicationFailureListener listener = new ReplicationFailureListener(); ReplicationFailureListener listener = new ReplicationFailureListener();
rc.addCloseListener(listener); rc.addCloseListener(listener);
rc.addFailureListener(listener); rc.addFailureListener(listener);
replicationManager = new ReplicationManager(rc, clusterConnection.getCallTimeout(), activeMQServer.getExecutorFactory()); replicationManager = new ReplicationManager(rc, clusterConnection.getCallTimeout(), replicatedPolicy.getInitialReplicationSyncTimeout(), activeMQServer.getExecutorFactory());
replicationManager.start(); replicationManager.start();
Thread t = new Thread(new Runnable() { Thread t = new Thread(new Runnable() {
@Override @Override

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.mqtt.imported;
import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
@ -96,7 +97,12 @@ public class MQTTInterceptorPropertiesTest extends MQTTTestSupport {
@Override @Override
public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException { public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException {
System.out.println("incoming"); System.out.println("incoming");
if (packet.getClass() == MqttPublishMessage.class) {
return checkMessageProperties(packet, expectedProperties); return checkMessageProperties(packet, expectedProperties);
} else {
return true;
}
} }
}; };
@ -104,7 +110,11 @@ public class MQTTInterceptorPropertiesTest extends MQTTTestSupport {
@Override @Override
public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException { public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException {
System.out.println("outgoing"); System.out.println("outgoing");
if (packet.getClass() == MqttPublishMessage.class) {
return checkMessageProperties(packet, expectedProperties); return checkMessageProperties(packet, expectedProperties);
} else {
return true;
}
} }
}; };
server.getRemotingService().addIncomingInterceptor(incomingInterceptor); server.getRemotingService().addIncomingInterceptor(incomingInterceptor);

View File

@ -34,6 +34,7 @@ import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration;
@ -369,7 +370,9 @@ public class MQTTTestSupport extends ActiveMQTestBase {
@Override @Override
public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException { public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException {
if (packet.getClass() == MqttPublishMessage.class) {
messageCount++; messageCount++;
}
return true; return true;
} }
@ -388,7 +391,9 @@ public class MQTTTestSupport extends ActiveMQTestBase {
@Override @Override
public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException { public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException {
if (packet.getClass() == MqttPublishMessage.class) {
messageCount++; messageCount++;
}
return true; return true;
} }

View File

@ -190,7 +190,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
setupServer(false); setupServer(false);
try { try {
ClientSessionFactory sf = createSessionFactory(locator); ClientSessionFactory sf = createSessionFactory(locator);
manager = new ReplicationManager((CoreRemotingConnection) sf.getConnection(), sf.getServerLocator().getCallTimeout(), factory); manager = new ReplicationManager((CoreRemotingConnection) sf.getConnection(), sf.getServerLocator().getCallTimeout(), sf.getServerLocator().getCallTimeout(), factory);
addActiveMQComponent(manager); addActiveMQComponent(manager);
manager.start(); manager.start();
Assert.fail("Exception was expected"); Assert.fail("Exception was expected");