From 7929fff893e1510390ae8e6a7924aa0e4f3864c0 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Sun, 2 Apr 2017 19:20:42 -0400 Subject: [PATCH] ARTEMIS-1089 Fixing Replication catchup slow --- .../artemis/api/core/ActiveMQBuffer.java | 13 +++ .../buffers/impl/ChannelBufferWrapper.java | 5 + .../impl/ResetLimitWrappedActiveMQBuffer.java | 8 ++ .../CompressedLargeMessageControllerImpl.java | 6 ++ .../impl/LargeMessageControllerImpl.java | 15 +++ .../amqp/converter/TestConversions.java | 5 + .../cursor/impl/PageCursorProviderImpl.java | 1 + .../core/paging/impl/PagingStoreImpl.java | 33 ++++--- .../impl/journal/JournalStorageManager.java | 2 +- .../ReplicationSyncFileMessage.java | 13 ++- .../core/replication/ReplicationEndpoint.java | 2 +- .../core/replication/ReplicationManager.java | 97 +++++++++++-------- .../core/server/ActiveMQServerLogger.java | 5 +- 13 files changed, 140 insertions(+), 65 deletions(-) diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java index f30ef3509e..d753b8fb7a 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java @@ -1065,6 +1065,19 @@ public interface ActiveMQBuffer extends DataInput { */ void writeBytes(ByteBuffer src); + + /** + * Transfers the specified source buffer's data to this buffer starting at + * the current {@code writerIndex} until the source buffer's position + * reaches its limit, and increases the {@code writerIndex} by the + * number of the transferred bytes. + * + * @param src The source buffer + * @throws IndexOutOfBoundsException if {@code src.remaining()} is greater than + * {@code this.writableBytes} + */ + void writeBytes(ByteBuf src, int srcIndex, int length); + /** * Returns a copy of this buffer's readable bytes. Modifying the content * of the returned buffer or this buffer does not affect each other at all. diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java index c75be21c63..496c146b1e 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java @@ -575,6 +575,11 @@ public class ChannelBufferWrapper implements ActiveMQBuffer { buffer.writeBytes(src); } + @Override + public void writeBytes(ByteBuf src, int srcIndex, int length) { + buffer.writeBytes(src, srcIndex, length); + } + @Override public void writeBytes(final ActiveMQBuffer src, final int srcIndex, final int length) { buffer.writeBytes(src.byteBuf(), srcIndex, length); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java index ec6cf092e1..d6cba00309 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java @@ -263,6 +263,14 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper super.writeBytes(src); } + + @Override + public void writeBytes(final ByteBuf src, final int srcIndex, final int length) { + changed(); + + super.writeBytes(src, srcIndex, length); + } + @Override public void writeBytes(final ActiveMQBuffer src, final int srcIndex, final int length) { changed(); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java index 55f912903d..ce652d20f0 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java @@ -512,6 +512,12 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); } + @Override + public void writeBytes(ByteBuf src, int srcIndex, int length) { + throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); + } + + @Override public ByteBuffer toByteBuffer() { throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java index 951aea2f62..0bb5690f80 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java @@ -863,6 +863,21 @@ public class LargeMessageControllerImpl implements LargeMessageController { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } + /** + * Transfers the specified source buffer's data to this buffer starting at + * the current {@code writerIndex} until the source buffer's position + * reaches its limit, and increases the {@code writerIndex} by the + * number of the transferred bytes. + * + * @param src The source buffer + * @throws IndexOutOfBoundsException if {@code src.remaining()} is greater than + * {@code this.writableBytes} + */ + @Override + public void writeBytes(ByteBuf src, int srcIndex, int length) { + throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); + } + public int writeBytes(final InputStream in, final int length) throws IOException { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java index cbe2699bf4..e154cd254d 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java @@ -718,6 +718,11 @@ public class TestConversions extends Assert { } + @Override + public void writeBytes(ByteBuf src, int srcIndex, int length) { + + } + @Override public void readFully(byte[] b) throws IOException { } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java index 76ad26b255..701f86cd57 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java @@ -332,6 +332,7 @@ public class PageCursorProviderImpl implements PageCursorProvider { @Override public void resumeCleanup() { this.cleanupEnabled = true; + scheduleCleanup(); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 4e57c8520b..8cba9fe208 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -1093,30 +1093,31 @@ public class PagingStoreImpl implements PagingStore { @Override public Collection getCurrentIds() throws Exception { - List ids = new ArrayList<>(); - if (fileFactory != null) { - for (String fileName : fileFactory.listFiles("page")) { - ids.add(getPageIdFromFileName(fileName)); - } - } - return ids; - } - - @Override - public void sendPages(ReplicationManager replicator, Collection pageIds) throws Exception { lock.writeLock().lock(); try { - for (Integer id : pageIds) { - SequentialFile sFile = fileFactory.createSequentialFile(createFileName(id)); - if (!sFile.exists()) { - continue; + List ids = new ArrayList<>(); + if (fileFactory != null) { + for (String fileName : fileFactory.listFiles("page")) { + ids.add(getPageIdFromFileName(fileName)); } - replicator.syncPages(sFile, id, getAddress()); } + return ids; } finally { lock.writeLock().unlock(); } } + @Override + public void sendPages(ReplicationManager replicator, Collection pageIds) throws Exception { + for (Integer id : pageIds) { + SequentialFile sFile = fileFactory.createSequentialFile(createFileName(id)); + if (!sFile.exists()) { + continue; + } + ActiveMQServerLogger.LOGGER.replicaSyncFile(sFile, sFile.size()); + replicator.syncPages(sFile, id, getAddress()); + } + } + // Inner classes ------------------------------------------------- } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index 7c0a6510db..9c122b323e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -587,10 +587,10 @@ public class JournalStorageManager extends AbstractJournalStorageManager { stopReplication(); throw e; } finally { - pagingManager.resumeCleanup(); // Re-enable compact and reclaim of journal files originalBindingsJournal.replicationSyncFinished(); originalMessageJournal.replicationSyncFinished(); + pagingManager.resumeCleanup(); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java index de7f73e0c7..90d2ca0b4a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java @@ -16,11 +16,11 @@ */ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.EnumSet; import java.util.Set; +import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager; @@ -42,7 +42,7 @@ public final class ReplicationSyncFileMessage extends PacketImpl { */ private long fileId; private int dataSize; - private ByteBuffer byteBuffer; + private ByteBuf byteBuffer; private byte[] byteArray; private SimpleString pageStoreName; private FileType fileType; @@ -78,7 +78,7 @@ public final class ReplicationSyncFileMessage extends PacketImpl { SimpleString storeName, long id, int size, - ByteBuffer buffer) { + ByteBuf buffer) { this(); this.byteBuffer = buffer; this.pageStoreName = storeName; @@ -124,7 +124,12 @@ public final class ReplicationSyncFileMessage extends PacketImpl { * (which might receive appends) */ if (dataSize > 0) { - buffer.writeBytes(byteBuffer); + buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex()); + } + + if (byteBuffer != null) { + byteBuffer.release(); + byteBuffer = null; } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java index 1a07adc40f..e1879daaa3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java @@ -410,7 +410,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon if (!channel1.isOpen()) { channel1.open(); } - channel1.writeDirect(ByteBuffer.wrap(data), true); + channel1.writeDirect(ByteBuffer.wrap(data), false); } /** diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index d0468d19d2..7e0881cb26 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -25,8 +25,11 @@ import java.util.Map; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; @@ -121,6 +124,8 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene private final ExecutorFactory executorFactory; + private final Executor replicationStream; + private SessionFailureListener failureListener; private CoreRemotingConnection remotingConnection; @@ -140,6 +145,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene this.executorFactory = executorFactory; this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1); this.remotingConnection = remotingConnection; + this.replicationStream = executorFactory.getExecutor(); this.timeout = timeout; } @@ -175,7 +181,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene boolean sync, final boolean lineUp) throws Exception { if (enabled) { - sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp); + sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp, true); } } @@ -340,15 +346,15 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene } private OperationContext sendReplicatePacket(final Packet packet) { - return sendReplicatePacket(packet, true); + return sendReplicatePacket(packet, true, true); } - private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp) { + private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp, boolean useExecutor) { if (!enabled) return null; boolean runItNow = false; - OperationContext repliToken = OperationContextImpl.getContext(executorFactory); + final OperationContext repliToken = OperationContextImpl.getContext(executorFactory); if (lineUp) { repliToken.replicationLineUp(); } @@ -356,10 +362,17 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene synchronized (replicationLock) { if (enabled) { pendingTokens.add(repliToken); - if (!flowControl()) { - return repliToken; + if (useExecutor) { + replicationStream.execute(() -> { + if (enabled) { + flowControl(); + replicatingChannel.send(packet); + } + }); + } else { + flowControl(); + replicatingChannel.send(packet); } - replicatingChannel.send(packet); } else { // Already replicating channel failed, so just play the action now runItNow = true; @@ -380,33 +393,35 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene * In case you refactor this in any way, this method must hold a lock on replication lock. . */ private boolean flowControl() { - // synchronized (replicationLock) { -- I'm not adding this because the caller already has it - // future maintainers of this code please be aware that the intention here is hold the lock on replication lock - if (!replicatingChannel.getConnection().isWritable(this)) { - try { - logger.trace("flowControl waiting on writable"); - writable.set(false); - //don't wait for ever as this may hang tests etc, we've probably been closed anyway - long now = System.currentTimeMillis(); - long deadline = now + timeout; - while (!writable.get() && now < deadline) { - replicationLock.wait(deadline - now); - now = System.currentTimeMillis(); - } - logger.trace("flow control done"); - - if (!writable.get()) { - ActiveMQServerLogger.LOGGER.slowReplicationResponse(); - logger.tracef("There was no response from replication backup after %s seconds, server being stopped now", System.currentTimeMillis() - now); - try { - stop(); - } catch (Exception e) { - logger.warn(e.getMessage(), e); + synchronized (replicationLock) { + // synchronized (replicationLock) { -- I'm not adding this because the caller already has it + // future maintainers of this code please be aware that the intention here is hold the lock on replication lock + if (!replicatingChannel.getConnection().isWritable(this)) { + try { + logger.trace("flowControl waiting on writable replication"); + writable.set(false); + //don't wait for ever as this may hang tests etc, we've probably been closed anyway + long now = System.currentTimeMillis(); + long deadline = now + timeout; + while (!writable.get() && now < deadline) { + replicationLock.wait(deadline - now); + now = System.currentTimeMillis(); } - return false; + logger.trace("flow control done on replication"); + + if (!writable.get()) { + ActiveMQServerLogger.LOGGER.slowReplicationResponse(); + logger.tracef("There was no response from replication backup after %s seconds, server being stopped now", System.currentTimeMillis() - now); + try { + stop(); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } + return false; + } + } catch (InterruptedException e) { + throw new ActiveMQInterruptedException(e); } - } catch (InterruptedException e) { - throw new ActiveMQInterruptedException(e); } } return true; @@ -512,7 +527,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene } SequentialFile file = jf.getFile().cloneFile(); try { - ActiveMQServerLogger.LOGGER.journalSynch(jf, file.size(), file); + ActiveMQServerLogger.LOGGER.replicaSyncFile(file, file.size()); sendLargeFile(content, null, jf.getFileID(), file, Long.MAX_VALUE); } finally { if (file.isOpen()) @@ -557,10 +572,11 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene // We can afford having a single buffer here for this entire loop // because sendReplicatePacket will encode the packet as a NettyBuffer // through ActiveMQBuffer class leaving this buffer free to be reused on the next copy - final ByteBuffer buffer = ByteBuffer.allocate(1 << 17); // 1 << 17 == 131072 == 128 * 1024 + int size = 1 << 17; while (true) { - buffer.clear(); - final int bytesRead = channel.read(buffer); + final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size); + ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer(); + final int bytesRead = channel.read(byteBuffer); int toSend = bytesRead; if (bytesRead > 0) { if (bytesRead >= maxBytesToSend) { @@ -569,12 +585,13 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene } else { maxBytesToSend = maxBytesToSend - bytesRead; } - buffer.limit(toSend); } - buffer.rewind(); - + logger.debug("sending " + buffer.writerIndex() + " bytes on file " + file.getFileName()); // sending -1 or 0 bytes will close the file at the backup - sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer)); + // We cannot simply send everything of a file through the executor, + // otherwise we would run out of memory. + // so we don't use the executor here + sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true, false); if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0) break; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index cf904e1917..a25a7f6d4f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -47,7 +47,6 @@ import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.SequentialFile; -import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.paging.cursor.PagePosition; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.persistence.OperationContext; @@ -189,8 +188,8 @@ public interface ActiveMQServerLogger extends BasicLogger { void backupServerSynched(ActiveMQServerImpl server); @LogMessage(level = Logger.Level.INFO) - @Message(id = 221025, value = "Replication: sending {0} (size={1}) to backup. {2}", format = Message.Format.MESSAGE_FORMAT) - void journalSynch(JournalFile jf, Long size, SequentialFile file); + @Message(id = 221025, value = "Replication: sending {0} (size={1}) to replica.", format = Message.Format.MESSAGE_FORMAT) + void replicaSyncFile(SequentialFile jf, Long size); @LogMessage(level = Logger.Level.INFO) @Message(