ARTEMIS-1089 Fixing Replication catchup slow

This commit is contained in:
Clebert Suconic 2017-04-02 19:20:42 -04:00 committed by Martyn Taylor
parent b819026dfc
commit 7929fff893
13 changed files with 140 additions and 65 deletions

View File

@ -1065,6 +1065,19 @@ public interface ActiveMQBuffer extends DataInput {
*/ */
void writeBytes(ByteBuffer src); void writeBytes(ByteBuffer src);
/**
* Transfers the specified source buffer's data to this buffer starting at
* the current {@code writerIndex} until the source buffer's position
* reaches its limit, and increases the {@code writerIndex} by the
* number of the transferred bytes.
*
* @param src The source buffer
* @throws IndexOutOfBoundsException if {@code src.remaining()} is greater than
* {@code this.writableBytes}
*/
void writeBytes(ByteBuf src, int srcIndex, int length);
/** /**
* Returns a copy of this buffer's readable bytes. Modifying the content * Returns a copy of this buffer's readable bytes. Modifying the content
* of the returned buffer or this buffer does not affect each other at all. * of the returned buffer or this buffer does not affect each other at all.

View File

@ -575,6 +575,11 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
buffer.writeBytes(src); buffer.writeBytes(src);
} }
@Override
public void writeBytes(ByteBuf src, int srcIndex, int length) {
buffer.writeBytes(src, srcIndex, length);
}
@Override @Override
public void writeBytes(final ActiveMQBuffer src, final int srcIndex, final int length) { public void writeBytes(final ActiveMQBuffer src, final int srcIndex, final int length) {
buffer.writeBytes(src.byteBuf(), srcIndex, length); buffer.writeBytes(src.byteBuf(), srcIndex, length);

View File

@ -263,6 +263,14 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper
super.writeBytes(src); super.writeBytes(src);
} }
@Override
public void writeBytes(final ByteBuf src, final int srcIndex, final int length) {
changed();
super.writeBytes(src, srcIndex, length);
}
@Override @Override
public void writeBytes(final ActiveMQBuffer src, final int srcIndex, final int length) { public void writeBytes(final ActiveMQBuffer src, final int srcIndex, final int length) {
changed(); changed();

View File

@ -512,6 +512,12 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll
throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
} }
@Override
public void writeBytes(ByteBuf src, int srcIndex, int length) {
throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
}
@Override @Override
public ByteBuffer toByteBuffer() { public ByteBuffer toByteBuffer() {
throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);

View File

@ -863,6 +863,21 @@ public class LargeMessageControllerImpl implements LargeMessageController {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
} }
/**
* Transfers the specified source buffer's data to this buffer starting at
* the current {@code writerIndex} until the source buffer's position
* reaches its limit, and increases the {@code writerIndex} by the
* number of the transferred bytes.
*
* @param src The source buffer
* @throws IndexOutOfBoundsException if {@code src.remaining()} is greater than
* {@code this.writableBytes}
*/
@Override
public void writeBytes(ByteBuf src, int srcIndex, int length) {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}
public int writeBytes(final InputStream in, final int length) throws IOException { public int writeBytes(final InputStream in, final int length) throws IOException {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
} }

View File

@ -718,6 +718,11 @@ public class TestConversions extends Assert {
} }
@Override
public void writeBytes(ByteBuf src, int srcIndex, int length) {
}
@Override @Override
public void readFully(byte[] b) throws IOException { public void readFully(byte[] b) throws IOException {
} }

View File

@ -332,6 +332,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
@Override @Override
public void resumeCleanup() { public void resumeCleanup() {
this.cleanupEnabled = true; this.cleanupEnabled = true;
scheduleCleanup();
} }
@Override @Override

View File

@ -1093,30 +1093,31 @@ public class PagingStoreImpl implements PagingStore {
@Override @Override
public Collection<Integer> getCurrentIds() throws Exception { public Collection<Integer> getCurrentIds() throws Exception {
List<Integer> ids = new ArrayList<>();
if (fileFactory != null) {
for (String fileName : fileFactory.listFiles("page")) {
ids.add(getPageIdFromFileName(fileName));
}
}
return ids;
}
@Override
public void sendPages(ReplicationManager replicator, Collection<Integer> pageIds) throws Exception {
lock.writeLock().lock(); lock.writeLock().lock();
try { try {
for (Integer id : pageIds) { List<Integer> ids = new ArrayList<>();
SequentialFile sFile = fileFactory.createSequentialFile(createFileName(id)); if (fileFactory != null) {
if (!sFile.exists()) { for (String fileName : fileFactory.listFiles("page")) {
continue; ids.add(getPageIdFromFileName(fileName));
} }
replicator.syncPages(sFile, id, getAddress());
} }
return ids;
} finally { } finally {
lock.writeLock().unlock(); lock.writeLock().unlock();
} }
} }
@Override
public void sendPages(ReplicationManager replicator, Collection<Integer> pageIds) throws Exception {
for (Integer id : pageIds) {
SequentialFile sFile = fileFactory.createSequentialFile(createFileName(id));
if (!sFile.exists()) {
continue;
}
ActiveMQServerLogger.LOGGER.replicaSyncFile(sFile, sFile.size());
replicator.syncPages(sFile, id, getAddress());
}
}
// Inner classes ------------------------------------------------- // Inner classes -------------------------------------------------
} }

View File

@ -587,10 +587,10 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
stopReplication(); stopReplication();
throw e; throw e;
} finally { } finally {
pagingManager.resumeCleanup();
// Re-enable compact and reclaim of journal files // Re-enable compact and reclaim of journal files
originalBindingsJournal.replicationSyncFinished(); originalBindingsJournal.replicationSyncFinished();
originalMessageJournal.replicationSyncFinished(); originalMessageJournal.replicationSyncFinished();
pagingManager.resumeCleanup();
} }
} }

View File

@ -16,11 +16,11 @@
*/ */
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import java.nio.ByteBuffer;
import java.util.Arrays; import java.util.Arrays;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.Set; import java.util.Set;
import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager; import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
@ -42,7 +42,7 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
*/ */
private long fileId; private long fileId;
private int dataSize; private int dataSize;
private ByteBuffer byteBuffer; private ByteBuf byteBuffer;
private byte[] byteArray; private byte[] byteArray;
private SimpleString pageStoreName; private SimpleString pageStoreName;
private FileType fileType; private FileType fileType;
@ -78,7 +78,7 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
SimpleString storeName, SimpleString storeName,
long id, long id,
int size, int size,
ByteBuffer buffer) { ByteBuf buffer) {
this(); this();
this.byteBuffer = buffer; this.byteBuffer = buffer;
this.pageStoreName = storeName; this.pageStoreName = storeName;
@ -124,7 +124,12 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
* (which might receive appends) * (which might receive appends)
*/ */
if (dataSize > 0) { if (dataSize > 0) {
buffer.writeBytes(byteBuffer); buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex());
}
if (byteBuffer != null) {
byteBuffer.release();
byteBuffer = null;
} }
} }

View File

@ -410,7 +410,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
if (!channel1.isOpen()) { if (!channel1.isOpen()) {
channel1.open(); channel1.open();
} }
channel1.writeDirect(ByteBuffer.wrap(data), true); channel1.writeDirect(ByteBuffer.wrap(data), false);
} }
/** /**

View File

@ -25,8 +25,11 @@ import java.util.Map;
import java.util.Queue; import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@ -121,6 +124,8 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
private final ExecutorFactory executorFactory; private final ExecutorFactory executorFactory;
private final Executor replicationStream;
private SessionFailureListener failureListener; private SessionFailureListener failureListener;
private CoreRemotingConnection remotingConnection; private CoreRemotingConnection remotingConnection;
@ -140,6 +145,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
this.executorFactory = executorFactory; this.executorFactory = executorFactory;
this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1); this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
this.remotingConnection = remotingConnection; this.remotingConnection = remotingConnection;
this.replicationStream = executorFactory.getExecutor();
this.timeout = timeout; this.timeout = timeout;
} }
@ -175,7 +181,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
boolean sync, boolean sync,
final boolean lineUp) throws Exception { final boolean lineUp) throws Exception {
if (enabled) { if (enabled) {
sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp); sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp, true);
} }
} }
@ -340,15 +346,15 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
} }
private OperationContext sendReplicatePacket(final Packet packet) { private OperationContext sendReplicatePacket(final Packet packet) {
return sendReplicatePacket(packet, true); return sendReplicatePacket(packet, true, true);
} }
private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp) { private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp, boolean useExecutor) {
if (!enabled) if (!enabled)
return null; return null;
boolean runItNow = false; boolean runItNow = false;
OperationContext repliToken = OperationContextImpl.getContext(executorFactory); final OperationContext repliToken = OperationContextImpl.getContext(executorFactory);
if (lineUp) { if (lineUp) {
repliToken.replicationLineUp(); repliToken.replicationLineUp();
} }
@ -356,10 +362,17 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
synchronized (replicationLock) { synchronized (replicationLock) {
if (enabled) { if (enabled) {
pendingTokens.add(repliToken); pendingTokens.add(repliToken);
if (!flowControl()) { if (useExecutor) {
return repliToken; replicationStream.execute(() -> {
if (enabled) {
flowControl();
replicatingChannel.send(packet);
}
});
} else {
flowControl();
replicatingChannel.send(packet);
} }
replicatingChannel.send(packet);
} else { } else {
// Already replicating channel failed, so just play the action now // Already replicating channel failed, so just play the action now
runItNow = true; runItNow = true;
@ -380,33 +393,35 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
* In case you refactor this in any way, this method must hold a lock on replication lock. . * In case you refactor this in any way, this method must hold a lock on replication lock. .
*/ */
private boolean flowControl() { private boolean flowControl() {
// synchronized (replicationLock) { -- I'm not adding this because the caller already has it synchronized (replicationLock) {
// future maintainers of this code please be aware that the intention here is hold the lock on replication lock // synchronized (replicationLock) { -- I'm not adding this because the caller already has it
if (!replicatingChannel.getConnection().isWritable(this)) { // future maintainers of this code please be aware that the intention here is hold the lock on replication lock
try { if (!replicatingChannel.getConnection().isWritable(this)) {
logger.trace("flowControl waiting on writable"); try {
writable.set(false); logger.trace("flowControl waiting on writable replication");
//don't wait for ever as this may hang tests etc, we've probably been closed anyway writable.set(false);
long now = System.currentTimeMillis(); //don't wait for ever as this may hang tests etc, we've probably been closed anyway
long deadline = now + timeout; long now = System.currentTimeMillis();
while (!writable.get() && now < deadline) { long deadline = now + timeout;
replicationLock.wait(deadline - now); while (!writable.get() && now < deadline) {
now = System.currentTimeMillis(); replicationLock.wait(deadline - now);
} now = System.currentTimeMillis();
logger.trace("flow control done");
if (!writable.get()) {
ActiveMQServerLogger.LOGGER.slowReplicationResponse();
logger.tracef("There was no response from replication backup after %s seconds, server being stopped now", System.currentTimeMillis() - now);
try {
stop();
} catch (Exception e) {
logger.warn(e.getMessage(), e);
} }
return false; logger.trace("flow control done on replication");
if (!writable.get()) {
ActiveMQServerLogger.LOGGER.slowReplicationResponse();
logger.tracef("There was no response from replication backup after %s seconds, server being stopped now", System.currentTimeMillis() - now);
try {
stop();
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
return false;
}
} catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
} }
} catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
} }
} }
return true; return true;
@ -512,7 +527,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
} }
SequentialFile file = jf.getFile().cloneFile(); SequentialFile file = jf.getFile().cloneFile();
try { try {
ActiveMQServerLogger.LOGGER.journalSynch(jf, file.size(), file); ActiveMQServerLogger.LOGGER.replicaSyncFile(file, file.size());
sendLargeFile(content, null, jf.getFileID(), file, Long.MAX_VALUE); sendLargeFile(content, null, jf.getFileID(), file, Long.MAX_VALUE);
} finally { } finally {
if (file.isOpen()) if (file.isOpen())
@ -557,10 +572,11 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
// We can afford having a single buffer here for this entire loop // We can afford having a single buffer here for this entire loop
// because sendReplicatePacket will encode the packet as a NettyBuffer // because sendReplicatePacket will encode the packet as a NettyBuffer
// through ActiveMQBuffer class leaving this buffer free to be reused on the next copy // through ActiveMQBuffer class leaving this buffer free to be reused on the next copy
final ByteBuffer buffer = ByteBuffer.allocate(1 << 17); // 1 << 17 == 131072 == 128 * 1024 int size = 1 << 17;
while (true) { while (true) {
buffer.clear(); final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size);
final int bytesRead = channel.read(buffer); ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer();
final int bytesRead = channel.read(byteBuffer);
int toSend = bytesRead; int toSend = bytesRead;
if (bytesRead > 0) { if (bytesRead > 0) {
if (bytesRead >= maxBytesToSend) { if (bytesRead >= maxBytesToSend) {
@ -569,12 +585,13 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
} else { } else {
maxBytesToSend = maxBytesToSend - bytesRead; maxBytesToSend = maxBytesToSend - bytesRead;
} }
buffer.limit(toSend);
} }
buffer.rewind(); logger.debug("sending " + buffer.writerIndex() + " bytes on file " + file.getFileName());
// sending -1 or 0 bytes will close the file at the backup // sending -1 or 0 bytes will close the file at the backup
sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer)); // We cannot simply send everything of a file through the executor,
// otherwise we would run out of memory.
// so we don't use the executor here
sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true, false);
if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0) if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
break; break;
} }

View File

@ -47,7 +47,6 @@ import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.paging.cursor.PagePosition; import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.OperationContext;
@ -189,8 +188,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
void backupServerSynched(ActiveMQServerImpl server); void backupServerSynched(ActiveMQServerImpl server);
@LogMessage(level = Logger.Level.INFO) @LogMessage(level = Logger.Level.INFO)
@Message(id = 221025, value = "Replication: sending {0} (size={1}) to backup. {2}", format = Message.Format.MESSAGE_FORMAT) @Message(id = 221025, value = "Replication: sending {0} (size={1}) to replica.", format = Message.Format.MESSAGE_FORMAT)
void journalSynch(JournalFile jf, Long size, SequentialFile file); void replicaSyncFile(SequentialFile jf, Long size);
@LogMessage(level = Logger.Level.INFO) @LogMessage(level = Logger.Level.INFO)
@Message( @Message(