From 85b93f0883bc06a2dfe2de9d560805a59d626d38 Mon Sep 17 00:00:00 2001 From: yang wei Date: Mon, 13 May 2019 12:32:58 +0800 Subject: [PATCH] ARTEMIS-2336 Use zero copy to replicate journal/page/large message file --- .../artemis/core/protocol/core/Channel.java | 20 +++ .../core/protocol/core/impl/ChannelImpl.java | 138 +++++++++----- .../core/protocol/core/impl/PacketImpl.java | 11 +- .../remoting/impl/netty/NettyConnection.java | 41 +++++ .../netty/NonClosingDefaultFileRegion.java | 38 ++++ .../artemis/spi/core/remoting/Connection.java | 4 + .../protocol/core/impl/ChannelImplTest.java | 11 ++ .../ReplicationSyncFileMessage.java | 170 ++++++++++-------- .../remoting/impl/invm/InVMConnection.java | 24 +++ .../core/replication/ReplicationManager.java | 105 +++++++---- .../ReplicationSyncFileMessageTest.java | 85 +++++++++ .../cluster/util/BackupSyncDelay.java | 7 + .../impl/netty/NettyConnectionTest.java | 28 +++ 13 files changed, 523 insertions(+), 159 deletions(-) create mode 100644 artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NonClosingDefaultFileRegion.java create mode 100644 artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java index 56f825959f..e541dad8a4 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.core.protocol.core; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; import java.util.concurrent.locks.Lock; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -66,6 +68,20 @@ public interface Channel { */ boolean send(Packet packet); + /** + * Sends a packet and file on this channel. + * + * @param packet the packet to send + * @param raf the file to send + * @param fileChannel the file channel retrieved from raf + * @param offset the position of the raf + * @param dataSize the data size to send + * @param callback callback after send + * @return false if the packet was rejected by an outgoing interceptor; true if the send was + * successful + */ + boolean send(Packet packet, RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize, Callback callback); + /** * Sends a packet on this channel. * @@ -247,4 +263,8 @@ public interface Channel { * @param transferring whether the channel is transferring */ void setTransferring(boolean transferring); + + interface Callback { + void done(boolean success); + } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java index fe876edd7e..9f36d81935 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.core.protocol.core.impl; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; import java.util.EnumSet; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; @@ -25,6 +27,7 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import io.netty.channel.ChannelFutureListener; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; @@ -274,6 +277,60 @@ public final class ChannelImpl implements Channel { } } + private ActiveMQBuffer beforeSend(final Packet packet, final int reconnectID) { + packet.setChannelID(id); + + if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) { + packet.setCorrelationID(responseAsyncCache.nextCorrelationID()); + } + + if (logger.isTraceEnabled()) { + logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id); + } + + ActiveMQBuffer buffer = packet.encode(connection); + + lock.lock(); + + try { + if (failingOver) { + waitForFailOver("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " timed-out waiting for fail-over condition on non-blocking send"); + } + + // Sanity check + if (transferring) { + throw ActiveMQClientMessageBundle.BUNDLE.cannotSendPacketDuringFailover(); + } + + if (resendCache != null && packet.isRequiresConfirmations()) { + addResendPacket(packet); + } + + } finally { + lock.unlock(); + } + + if (logger.isTraceEnabled()) { + logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Writing buffer for channelID=" + id); + } + + checkReconnectID(reconnectID); + + //We do this outside the lock as ResponseCache is threadsafe and allows responses to come in, + //As the send could block if the response cache cannot add, preventing responses to be handled. + if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) { + while (!responseAsyncCache.add(packet)) { + try { + Thread.sleep(1); + } catch (Exception e) { + // Ignore + } + } + } + + return buffer; + } + // This must never called by more than one thread concurrently private boolean send(final Packet packet, final int reconnectID, final boolean flush, final boolean batch) { if (invokeInterceptors(packet, interceptors, connection) != null) { @@ -281,55 +338,7 @@ public final class ChannelImpl implements Channel { } synchronized (sendLock) { - packet.setChannelID(id); - - if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) { - packet.setCorrelationID(responseAsyncCache.nextCorrelationID()); - } - - if (logger.isTraceEnabled()) { - logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id); - } - - ActiveMQBuffer buffer = packet.encode(connection); - - lock.lock(); - - try { - if (failingOver) { - waitForFailOver("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " timed-out waiting for fail-over condition on non-blocking send"); - } - - // Sanity check - if (transferring) { - throw ActiveMQClientMessageBundle.BUNDLE.cannotSendPacketDuringFailover(); - } - - if (resendCache != null && packet.isRequiresConfirmations()) { - addResendPacket(packet); - } - - } finally { - lock.unlock(); - } - - if (logger.isTraceEnabled()) { - logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Writing buffer for channelID=" + id); - } - - checkReconnectID(reconnectID); - - //We do this outside the lock as ResponseCache is threadsafe and allows responses to come in, - //As the send could block if the response cache cannot add, preventing responses to be handled. - if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) { - while (!responseAsyncCache.add(packet)) { - try { - Thread.sleep(1); - } catch (Exception e) { - // Ignore - } - } - } + ActiveMQBuffer buffer = beforeSend(packet, reconnectID); // The actual send must be outside the lock, or with OIO transport, the write can block if the tcp // buffer is full, preventing any incoming buffers being handled and blocking failover @@ -347,6 +356,37 @@ public final class ChannelImpl implements Channel { } } + @Override + public boolean send(Packet packet, + RandomAccessFile raf, + FileChannel fileChannel, + long offset, + int dataSize, + Callback callback) { + if (invokeInterceptors(packet, interceptors, connection) != null) { + return false; + } + + synchronized (sendLock) { + ActiveMQBuffer buffer = beforeSend(packet, -1); + + // The actual send must be outside the lock, or with OIO transport, the write can block if the tcp + // buffer is full, preventing any incoming buffers being handled and blocking failover + try { + connection.getTransportConnection().write(buffer); + connection.getTransportConnection().write(raf, fileChannel, offset, dataSize, callback == null ? null : (ChannelFutureListener) future -> callback.done(future == null || future.isSuccess())); + } catch (Throwable t) { + //If runtime exception, we must remove from the cache to avoid filling up the cache causing it to be full. + //The client would get still know about this as the exception bubbles up the call stack instead. + if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) { + responseAsyncCache.remove(packet.getCorrelationID()); + } + throw t; + } + return true; + } + } + private void checkReconnectID(int reconnectID) { if (reconnectID >= 0 && reconnectID != this.reconnectID.get()) { throw ActiveMQClientMessageBundle.BUNDLE.packetTransmissionInterrupted(); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java index f8f85e8a42..a7a32539ff 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java @@ -336,7 +336,11 @@ public class PacketImpl implements Packet { } protected void encodeSize(ActiveMQBuffer buffer) { - size = buffer.writerIndex(); + encodeSize(buffer, buffer.writerIndex()); + } + + protected void encodeSize(ActiveMQBuffer buffer, int size) { + this.size = size; // The length doesn't include the actual length byte int len = size - DataConstants.SIZE_INT; @@ -345,9 +349,10 @@ public class PacketImpl implements Packet { } protected ActiveMQBuffer createPacket(CoreRemotingConnection connection) { + return createPacket(connection, expectedEncodeSize()); + } - int size = expectedEncodeSize(); - + protected ActiveMQBuffer createPacket(CoreRemotingConnection connection, int size) { if (connection == null) { return new ChannelBufferWrapper(Unpooled.buffer(size)); } else { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java index 51330c727b..497448e547 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java @@ -16,7 +16,10 @@ */ package org.apache.activemq.artemis.core.remoting.impl.netty; +import java.io.IOException; +import java.io.RandomAccessFile; import java.net.SocketAddress; +import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -29,6 +32,8 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoop; +import io.netty.handler.ssl.SslHandler; +import io.netty.handler.stream.ChunkedFile; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.api.core.TransportConfiguration; @@ -350,6 +355,18 @@ public class NettyConnection implements Connection { return canWrite; } + private Object getFileObject(RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize) { + if (channel.pipeline().get(SslHandler.class) == null) { + return new NonClosingDefaultFileRegion(fileChannel, offset, dataSize); + } else { + try { + return new ChunkedFile(raf, offset, dataSize, 8192); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + @Override public final void write(ActiveMQBuffer buffer, final boolean flush, @@ -390,6 +407,30 @@ public class NettyConnection implements Connection { } } + @Override + public void write(RandomAccessFile raf, + FileChannel fileChannel, + long offset, + int dataSize, + final ChannelFutureListener futureListener) { + final int readableBytes = dataSize; + if (logger.isDebugEnabled()) { + final int remainingBytes = this.writeBufferHighWaterMark - readableBytes; + if (remainingBytes < 0) { + logger.debug("a write request is exceeding by " + (-remainingBytes) + " bytes the writeBufferHighWaterMark size [ " + this.writeBufferHighWaterMark + " ] : consider to set it at least of " + readableBytes + " bytes"); + } + } + + //no need to lock because the Netty's channel is thread-safe + //and the order of write is ensured by the order of the write calls + final Channel channel = this.channel; + assert readableBytes >= 0; + ChannelFuture channelFuture = channel.writeAndFlush(getFileObject(raf, fileChannel, offset, dataSize)); + if (futureListener != null) { + channelFuture.addListener(futureListener); + } + } + private static void flushAndWait(final Channel channel, final ChannelPromise promise) { if (!channel.eventLoop().inEventLoop()) { waitFor(promise, DEFAULT_WAIT_MILLIS); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NonClosingDefaultFileRegion.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NonClosingDefaultFileRegion.java new file mode 100644 index 0000000000..4fc367fa2b --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NonClosingDefaultFileRegion.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.remoting.impl.netty; + +import java.io.File; +import java.nio.channels.FileChannel; + +import io.netty.channel.DefaultFileRegion; + +public class NonClosingDefaultFileRegion extends DefaultFileRegion { + + public NonClosingDefaultFileRegion(FileChannel file, long position, long count) { + super(file, position, count); + } + + public NonClosingDefaultFileRegion(File f, long position, long count) { + super(f, position, count); + } + + @Override + protected void deallocate() { + // Overridden to avoid closing the file + } +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java index ebde456034..fe5d3950f1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.spi.core.remoting; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; import java.util.concurrent.TimeUnit; import io.netty.channel.ChannelFutureListener; @@ -101,6 +103,8 @@ public interface Connection { */ void write(ActiveMQBuffer buffer); + void write(RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize, ChannelFutureListener futureListener); + /** * This should close the internal channel without calling any listeners. * This is to avoid a situation where the broker is busy writing on an internal thread. diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java index 9908d974d2..e9181f8025 100644 --- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java +++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java @@ -17,6 +17,8 @@ package org.apache.activemq.artemis.core.protocol.core.impl; import javax.security.auth.Subject; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -386,6 +388,15 @@ public class ChannelImplTest { } + @Override + public void write(RandomAccessFile raf, + FileChannel fileChannel, + long offset, + int dataSize, + ChannelFutureListener channelFutureListener) { + + } + @Override public void forceClose() { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java index b81782bcd0..5a30c64bbe 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java @@ -16,22 +16,30 @@ */ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.util.Arrays; import java.util.EnumSet; +import java.util.Objects; import java.util.Set; -import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager; +import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection; import org.apache.activemq.artemis.utils.DataConstants; +import org.jboss.logging.Logger; /** * Message is used to sync {@link org.apache.activemq.artemis.core.io.SequentialFile}s to a backup server. The {@link FileType} controls * which extra information is sent. */ public final class ReplicationSyncFileMessage extends PacketImpl { + private static final Logger logger = Logger.getLogger(ReplicationSyncFileMessage.class); /** * The JournalType or {@code null} if sync'ing large-messages. @@ -43,10 +51,12 @@ public final class ReplicationSyncFileMessage extends PacketImpl { */ private long fileId; private int dataSize; - private ByteBuf byteBuffer; private byte[] byteArray; private SimpleString pageStoreName; private FileType fileType; + private RandomAccessFile raf; + private FileChannel fileChannel; + private long offset; public enum FileType { JOURNAL(0), PAGE(1), LARGE_MESSAGE(2); @@ -78,14 +88,18 @@ public final class ReplicationSyncFileMessage extends PacketImpl { public ReplicationSyncFileMessage(AbstractJournalStorageManager.JournalContent content, SimpleString storeName, long id, - int size, - ByteBuf buffer) { + RandomAccessFile raf, + FileChannel fileChannel, + long offset, + int size) { this(); - this.byteBuffer = buffer; this.pageStoreName = storeName; this.dataSize = size; this.fileId = id; + this.raf = raf; + this.fileChannel = fileChannel; this.journalType = content; + this.offset = offset; determineType(); } @@ -99,10 +113,30 @@ public final class ReplicationSyncFileMessage extends PacketImpl { } } + public long getFileId() { + return fileId; + } + + public int getDataSize() { + return dataSize; + } + + public RandomAccessFile getRaf() { + return raf; + } + + public FileChannel getFileChannel() { + return fileChannel; + } + + public long getOffset() { + return offset; + } + @Override public int expectedEncodeSize() { int size = PACKET_HEADERS_SIZE + - DataConstants.SIZE_LONG; // buffer.writeLong(fileId); + DataConstants.SIZE_LONG; // buffer.writeLong(fileId); if (fileId == -1) return size; @@ -125,7 +159,7 @@ public final class ReplicationSyncFileMessage extends PacketImpl { size += DataConstants.SIZE_INT; // buffer.writeInt(dataSize); if (dataSize > 0) { - size += byteBuffer.writerIndex(); // buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex()); + size += dataSize; } return size; @@ -150,30 +184,55 @@ public final class ReplicationSyncFileMessage extends PacketImpl { default: // no-op } - buffer.writeInt(dataSize); - /* - * sending -1 will close the file in case of a journal, but not in case of a largeMessage - * (which might receive appends) - */ - if (dataSize > 0) { - buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex()); - } + } - release(); + @Override + public ActiveMQBuffer encode(CoreRemotingConnection connection) { + if (fileId != -1 && dataSize > 0) { + ActiveMQBuffer buffer; + int bufferSize = expectedEncodeSize(); + int encodedSize = bufferSize; + boolean isNetty = false; + if (connection != null && connection.getTransportConnection() instanceof NettyConnection) { + bufferSize -= dataSize; + isNetty = true; + } + buffer = createPacket(connection, bufferSize); + encodeHeader(buffer); + encodeRest(buffer, connection); + if (!isNetty) { + ByteBuffer byteBuffer; + if (buffer.byteBuf() != null && buffer.byteBuf().nioBufferCount() == 1) { + byteBuffer = buffer.byteBuf().internalNioBuffer(buffer.writerIndex(), buffer.writableBytes()); + } else { + byteBuffer = buffer.toByteBuffer(buffer.writerIndex(), buffer.writableBytes()); + } + readFile(byteBuffer); + buffer.writerIndex(buffer.capacity()); + } + encodeSize(buffer, encodedSize); + return buffer; + } else { + return super.encode(connection); + } } @Override public void release() { - if (byteBuffer != null) { - byteBuffer.release(); - byteBuffer = null; + if (raf != null) { + try { + raf.close(); + } catch (IOException e) { + logger.error("Close file " + this + " failed", e); + } } } @Override public void decodeRest(final ActiveMQBuffer buffer) { fileId = buffer.readLong(); + if (fileId == -1) return; switch (FileType.getFileType(buffer.readByte())) { case JOURNAL: { journalType = AbstractJournalStorageManager.JournalContent.getType(buffer.readByte()); @@ -197,6 +256,14 @@ public final class ReplicationSyncFileMessage extends PacketImpl { } } + private void readFile(ByteBuffer buffer) { + try { + fileChannel.read(buffer, offset); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + public long getId() { return fileId; } @@ -218,61 +285,22 @@ public final class ReplicationSyncFileMessage extends PacketImpl { } @Override - public int hashCode() { - final int prime = 31; - int result = super.hashCode(); - result = prime * result + Arrays.hashCode(byteArray); - result = prime * result + ((byteBuffer == null) ? 0 : byteBuffer.hashCode()); - result = prime * result + dataSize; - result = prime * result + (int) (fileId ^ (fileId >>> 32)); - result = prime * result + ((fileType == null) ? 0 : fileType.hashCode()); - result = prime * result + ((journalType == null) ? 0 : journalType.hashCode()); - result = prime * result + ((pageStoreName == null) ? 0 : pageStoreName.hashCode()); - return result; + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + if (!super.equals(o)) + return false; + ReplicationSyncFileMessage that = (ReplicationSyncFileMessage) o; + return fileId == that.fileId && dataSize == that.dataSize && offset == that.offset && journalType == that.journalType && Arrays.equals(byteArray, that.byteArray) && Objects.equals(pageStoreName, that.pageStoreName) && fileType == that.fileType && Objects.equals(raf, that.raf) && Objects.equals(fileChannel, that.fileChannel); } @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (!super.equals(obj)) { - return false; - } - if (!(obj instanceof ReplicationSyncFileMessage)) { - return false; - } - ReplicationSyncFileMessage other = (ReplicationSyncFileMessage) obj; - if (!Arrays.equals(byteArray, other.byteArray)) { - return false; - } - if (byteBuffer == null) { - if (other.byteBuffer != null) { - return false; - } - } else if (!byteBuffer.equals(other.byteBuffer)) { - return false; - } - if (dataSize != other.dataSize) { - return false; - } - if (fileId != other.fileId) { - return false; - } - if (fileType != other.fileType) { - return false; - } - if (journalType != other.journalType) { - return false; - } - if (pageStoreName == null) { - if (other.pageStoreName != null) { - return false; - } - } else if (!pageStoreName.equals(other.pageStoreName)) { - return false; - } - return true; + public int hashCode() { + int result = Objects.hash(super.hashCode(), journalType, fileId, dataSize, pageStoreName, fileType, raf, fileChannel, offset); + result = 31 * result + Arrays.hashCode(byteArray); + return result; } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java index b2fc576db3..02f1c84acf 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.core.remoting.impl.invm; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -242,6 +244,28 @@ public class InVMConnection implements Connection { } + @Override + public void write(RandomAccessFile raf, + FileChannel fileChannel, + long offset, + int dataSize, + final ChannelFutureListener futureListener) { + if (futureListener == null) { + return; + } + try { + executor.execute(() -> { + try { + futureListener.operationComplete(null); + } catch (Exception e) { + throw new IllegalStateException(e); + } + }); + } catch (RejectedExecutionException e) { + + } + } + @Override public String getRemoteAddress() { return "invm:" + serverID; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index 1d1217d616..d48a5a01c8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -16,8 +16,7 @@ */ package org.apache.activemq.artemis.core.replication; -import java.io.FileInputStream; -import java.nio.ByteBuffer; +import java.io.RandomAccessFile; import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.LinkedHashSet; @@ -28,8 +27,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; @@ -392,6 +389,39 @@ public final class ReplicationManager implements ActiveMQComponent { return repliToken; } + private OperationContext sendSyncFileMessage(final ReplicationSyncFileMessage syncFileMessage, boolean lastChunk) { + if (!enabled) { + syncFileMessage.release(); + return null; + } + + final OperationContext repliToken = OperationContextImpl.getContext(ioExecutorFactory); + repliToken.replicationLineUp(); + + replicationStream.execute(() -> { + if (enabled) { + try { + pendingTokens.add(repliToken); + flowControl(syncFileMessage.expectedEncodeSize()); + if (syncFileMessage.getFileId() != -1 && syncFileMessage.getDataSize() > 0) { + replicatingChannel.send(syncFileMessage, syncFileMessage.getRaf(), syncFileMessage.getFileChannel(), + syncFileMessage.getOffset(), syncFileMessage.getDataSize(), + lastChunk ? (Channel.Callback) success -> syncFileMessage.release() : null); + } else { + replicatingChannel.send(syncFileMessage); + } + } catch (Exception e) { + syncFileMessage.release(); + } + } else { + syncFileMessage.release(); + repliToken.replicationDone(); + } + }); + + return repliToken; + } + /** * This was written as a refactoring of sendReplicatePacket. * In case you refactor this in any way, this method must hold a lock on replication lock. . @@ -560,49 +590,52 @@ public final class ReplicationManager implements ActiveMQComponent { if (!file.isOpen()) { file.open(); } - int size = 32 * 1024; + final int size = 1024 * 1024; + long fileSize = file.size(); int flowControlSize = 10; int packetsSent = 0; FlushAction action = new FlushAction(); + long offset = 0; + RandomAccessFile raf = null; + FileChannel fileChannel = null; try { - try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) { - - // We can afford having a single buffer here for this entire loop - // because sendReplicatePacket will encode the packet as a NettyBuffer - // through ActiveMQBuffer class leaving this buffer free to be reused on the next copy - while (true) { - final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size); - buffer.clear(); - ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer(); - final int bytesRead = channel.read(byteBuffer); - int toSend = bytesRead; - if (bytesRead > 0) { - if (bytesRead >= maxBytesToSend) { - toSend = (int) maxBytesToSend; - maxBytesToSend = 0; - } else { - maxBytesToSend = maxBytesToSend - bytesRead; - } + raf = new RandomAccessFile(file.getJavaFile(), "r"); + fileChannel = raf.getChannel(); + while (true) { + long chunkSize = Math.min(size, fileSize - offset); + int toSend = (int) chunkSize; + if (chunkSize > 0) { + if (chunkSize >= maxBytesToSend) { + toSend = (int) maxBytesToSend; + maxBytesToSend = 0; + } else { + maxBytesToSend = maxBytesToSend - chunkSize; } - logger.debug("sending " + buffer.writerIndex() + " bytes on file " + file.getFileName()); - // sending -1 or 0 bytes will close the file at the backup - // We cannot simply send everything of a file through the executor, - // otherwise we would run out of memory. - // so we don't use the executor here - sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true); - packetsSent++; - - if (packetsSent % flowControlSize == 0) { - flushReplicationStream(action); - } - if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0) - break; } + logger.debug("sending " + toSend + " bytes on file " + file.getFileName()); + // sending -1 or 0 bytes will close the file at the backup + // We cannot simply send everything of a file through the executor, + // otherwise we would run out of memory. + // so we don't use the executor here + sendSyncFileMessage(new ReplicationSyncFileMessage(content, pageStore, id, raf, fileChannel, offset, toSend), offset + toSend == fileSize); + packetsSent++; + offset += toSend; + + if (packetsSent % flowControlSize == 0) { + flushReplicationStream(action); + } + if (toSend == 0 || maxBytesToSend == 0) + break; } flushReplicationStream(action); + + } catch (Exception e) { + if (raf != null) + raf.close(); + throw e; } finally { if (file.isOpen()) file.close(); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java new file mode 100644 index 0000000000..812de2c716 --- /dev/null +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; + +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.util.HashMap; + +import io.netty.channel.embedded.EmbeddedChannel; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl; +import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnection; +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.utils.DataConstants; +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager.JournalContent.MESSAGES; + +public class ReplicationSyncFileMessageTest extends ActiveMQTestBase { + @Test + public void testNettyConnectionEncodeMessage() throws Exception { + int dataSize = 10; + NettyConnection conn = new NettyConnection(new HashMap<>(), new EmbeddedChannel(), null, false, false); + + SequentialFileFactory factory = new NIOSequentialFileFactory(temporaryFolder.getRoot(), 100); + SequentialFile file = factory.createSequentialFile("file1.bin"); + file.open(); + RandomAccessFile raf = new RandomAccessFile(file.getJavaFile(), "r"); + FileChannel fileChannel = raf.getChannel(); + ReplicationSyncFileMessage replicationSyncFileMessage = new ReplicationSyncFileMessage(MESSAGES, + null, 10, raf, fileChannel, 0, dataSize); + RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(null, conn, 10, 10, null, null); + ActiveMQBuffer buffer = replicationSyncFileMessage.encode(remotingConnection); + Assert.assertEquals(buffer.getInt(0), replicationSyncFileMessage.expectedEncodeSize() - DataConstants.SIZE_INT); + Assert.assertEquals(buffer.capacity(), replicationSyncFileMessage.expectedEncodeSize() - dataSize); + file.close(); + } + + + @Test + public void testInVMConnectionEncodeMessage() throws Exception { + int fileId = 10; + InVMConnection conn = new InVMConnection(0, null, null, null); + + SequentialFileFactory factory = new NIOSequentialFileFactory(temporaryFolder.getRoot(), 100); + SequentialFile file = factory.createSequentialFile("file1.bin"); + file.open(); + RandomAccessFile raf = new RandomAccessFile(file.getJavaFile(), "r"); + FileChannel fileChannel = raf.getChannel(); + ReplicationSyncFileMessage replicationSyncFileMessage = new ReplicationSyncFileMessage(MESSAGES, + null, fileId, raf, fileChannel, 0, 0); + RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(null, conn, 10, 10, null, null); + ActiveMQBuffer buffer = replicationSyncFileMessage.encode(remotingConnection); + Assert.assertEquals(buffer.readInt(), replicationSyncFileMessage.expectedEncodeSize() - DataConstants.SIZE_INT); + Assert.assertEquals(buffer.capacity(), replicationSyncFileMessage.expectedEncodeSize()); + + Assert.assertEquals(buffer.readByte(), PacketImpl.REPLICATION_SYNC_FILE); + + ReplicationSyncFileMessage decodedReplicationSyncFileMessage = new ReplicationSyncFileMessage(); + decodedReplicationSyncFileMessage.decode(buffer); + Assert.assertEquals(decodedReplicationSyncFileMessage.getJournalContent(), MESSAGES); + Assert.assertNull(decodedReplicationSyncFileMessage.getData()); + file.close(); + } +} \ No newline at end of file diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java index c7ed8699c1..c55764a096 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.tests.integration.cluster.util; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; import java.util.concurrent.locks.Lock; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -211,6 +213,11 @@ public class BackupSyncDelay implements Interceptor { return true; } + @Override + public boolean send(Packet packet, RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize, Callback callback) { + return true; + } + @Override public boolean sendBatched(Packet packet) { throw new UnsupportedOperationException(); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java index c9c975cf85..23ae5f9a7c 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java @@ -16,7 +16,9 @@ */ package org.apache.activemq.artemis.tests.unit.core.remoting.impl.netty; +import java.io.RandomAccessFile; import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -29,6 +31,9 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory; import org.apache.activemq.artemis.core.server.ActiveMQComponent; @@ -77,6 +82,29 @@ public class NettyConnectionTest extends ActiveMQTestBase { } + @Test + public void testWritePacketAndFile() throws Exception { + EmbeddedChannel channel = createChannel(); + NettyConnection conn = new NettyConnection(emptyMap, channel, new MyListener(), false, false); + + final int size = 1234; + + ActiveMQBuffer buff = conn.createTransportBuffer(size); + buff.writeByte((byte) 0x00); // Netty buffer does lazy initialization. + SequentialFileFactory factory = new NIOSequentialFileFactory(temporaryFolder.getRoot(), 100); + SequentialFile file = factory.createSequentialFile("file1.bin"); + file.open(); + RandomAccessFile raf = new RandomAccessFile(file.getJavaFile(), "r"); + FileChannel fileChannel = raf.getChannel(); + + conn.write(buff); + conn.write(raf, fileChannel, 0, size, future -> raf.close()); + channel.runPendingTasks(); + Assert.assertEquals(2, channel.outboundMessages().size()); + Assert.assertFalse(fileChannel.isOpen()); + file.close(); + } + @Test(expected = IllegalStateException.class) public void throwsExceptionOnBlockUntilWritableIfClosed() { EmbeddedChannel channel = createChannel();