From 70c2200c54066eb6d9f1d0235aae5c3bbf5b7412 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 18 Sep 2019 11:55:23 -0400 Subject: [PATCH] ARTEMIS-2496 Revert catch up with zero-copy, as it's causing issues into some integration usage Revert "ARTEMIS-2336 Use zero copy to replicate journal/page/large message file" This reverts commit 85b93f0883bc06a2dfe2de9d560805a59d626d38. --- .../artemis/core/protocol/core/Channel.java | 20 --- .../core/protocol/core/impl/ChannelImpl.java | 138 +++++--------- .../core/protocol/core/impl/PacketImpl.java | 11 +- .../remoting/impl/netty/NettyConnection.java | 41 ----- .../netty/NonClosingDefaultFileRegion.java | 38 ---- .../artemis/spi/core/remoting/Connection.java | 4 - .../protocol/core/impl/ChannelImplTest.java | 11 -- .../ReplicationSyncFileMessage.java | 170 ++++++++---------- .../remoting/impl/invm/InVMConnection.java | 24 --- .../core/replication/ReplicationManager.java | 105 ++++------- .../ReplicationSyncFileMessageTest.java | 85 --------- .../cluster/util/BackupSyncDelay.java | 7 - .../impl/netty/NettyConnectionTest.java | 28 --- 13 files changed, 159 insertions(+), 523 deletions(-) delete mode 100644 artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NonClosingDefaultFileRegion.java delete mode 100644 artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java index e541dad8a4..56f825959f 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java @@ -16,8 +16,6 @@ */ package org.apache.activemq.artemis.core.protocol.core; -import java.io.RandomAccessFile; -import java.nio.channels.FileChannel; import java.util.concurrent.locks.Lock; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -68,20 +66,6 @@ public interface Channel { */ boolean send(Packet packet); - /** - * Sends a packet and file on this channel. - * - * @param packet the packet to send - * @param raf the file to send - * @param fileChannel the file channel retrieved from raf - * @param offset the position of the raf - * @param dataSize the data size to send - * @param callback callback after send - * @return false if the packet was rejected by an outgoing interceptor; true if the send was - * successful - */ - boolean send(Packet packet, RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize, Callback callback); - /** * Sends a packet on this channel. * @@ -263,8 +247,4 @@ public interface Channel { * @param transferring whether the channel is transferring */ void setTransferring(boolean transferring); - - interface Callback { - void done(boolean success); - } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java index d69b1e1835..154ab8aa80 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java @@ -16,8 +16,6 @@ */ package org.apache.activemq.artemis.core.protocol.core.impl; -import java.io.RandomAccessFile; -import java.nio.channels.FileChannel; import java.util.EnumSet; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; @@ -27,7 +25,6 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import io.netty.channel.ChannelFutureListener; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; @@ -277,60 +274,6 @@ public final class ChannelImpl implements Channel { } } - private ActiveMQBuffer beforeSend(final Packet packet, final int reconnectID) { - packet.setChannelID(id); - - if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) { - packet.setCorrelationID(responseAsyncCache.nextCorrelationID()); - } - - if (logger.isTraceEnabled()) { - logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id); - } - - ActiveMQBuffer buffer = packet.encode(connection); - - lock.lock(); - - try { - if (failingOver) { - waitForFailOver("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " timed-out waiting for fail-over condition on non-blocking send"); - } - - // Sanity check - if (transferring) { - throw ActiveMQClientMessageBundle.BUNDLE.cannotSendPacketDuringFailover(); - } - - if (resendCache != null && packet.isRequiresConfirmations()) { - addResendPacket(packet); - } - - } finally { - lock.unlock(); - } - - if (logger.isTraceEnabled()) { - logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Writing buffer for channelID=" + id); - } - - checkReconnectID(reconnectID); - - //We do this outside the lock as ResponseCache is threadsafe and allows responses to come in, - //As the send could block if the response cache cannot add, preventing responses to be handled. - if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) { - while (!responseAsyncCache.add(packet)) { - try { - Thread.sleep(1); - } catch (Exception e) { - // Ignore - } - } - } - - return buffer; - } - // This must never called by more than one thread concurrently private boolean send(final Packet packet, final int reconnectID, final boolean flush, final boolean batch) { if (invokeInterceptors(packet, interceptors, connection) != null) { @@ -338,7 +281,55 @@ public final class ChannelImpl implements Channel { } synchronized (sendLock) { - ActiveMQBuffer buffer = beforeSend(packet, reconnectID); + packet.setChannelID(id); + + if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) { + packet.setCorrelationID(responseAsyncCache.nextCorrelationID()); + } + + if (logger.isTraceEnabled()) { + logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id); + } + + ActiveMQBuffer buffer = packet.encode(connection); + + lock.lock(); + + try { + if (failingOver) { + waitForFailOver("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " timed-out waiting for fail-over condition on non-blocking send"); + } + + // Sanity check + if (transferring) { + throw ActiveMQClientMessageBundle.BUNDLE.cannotSendPacketDuringFailover(); + } + + if (resendCache != null && packet.isRequiresConfirmations()) { + addResendPacket(packet); + } + + } finally { + lock.unlock(); + } + + if (logger.isTraceEnabled()) { + logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Writing buffer for channelID=" + id); + } + + checkReconnectID(reconnectID); + + //We do this outside the lock as ResponseCache is threadsafe and allows responses to come in, + //As the send could block if the response cache cannot add, preventing responses to be handled. + if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) { + while (!responseAsyncCache.add(packet)) { + try { + Thread.sleep(1); + } catch (Exception e) { + // Ignore + } + } + } // The actual send must be outside the lock, or with OIO transport, the write can block if the tcp // buffer is full, preventing any incoming buffers being handled and blocking failover @@ -356,37 +347,6 @@ public final class ChannelImpl implements Channel { } } - @Override - public boolean send(Packet packet, - RandomAccessFile raf, - FileChannel fileChannel, - long offset, - int dataSize, - Callback callback) { - if (invokeInterceptors(packet, interceptors, connection) != null) { - return false; - } - - synchronized (sendLock) { - ActiveMQBuffer buffer = beforeSend(packet, -1); - - // The actual send must be outside the lock, or with OIO transport, the write can block if the tcp - // buffer is full, preventing any incoming buffers being handled and blocking failover - try { - connection.getTransportConnection().write(buffer); - connection.getTransportConnection().write(raf, fileChannel, offset, dataSize, callback == null ? null : (ChannelFutureListener) future -> callback.done(future == null || future.isSuccess())); - } catch (Throwable t) { - //If runtime exception, we must remove from the cache to avoid filling up the cache causing it to be full. - //The client would get still know about this as the exception bubbles up the call stack instead. - if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) { - responseAsyncCache.remove(packet.getCorrelationID()); - } - throw t; - } - return true; - } - } - private void checkReconnectID(int reconnectID) { if (reconnectID >= 0 && reconnectID != this.reconnectID.get()) { throw ActiveMQClientMessageBundle.BUNDLE.packetTransmissionInterrupted(); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java index a7a32539ff..f8f85e8a42 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java @@ -336,11 +336,7 @@ public class PacketImpl implements Packet { } protected void encodeSize(ActiveMQBuffer buffer) { - encodeSize(buffer, buffer.writerIndex()); - } - - protected void encodeSize(ActiveMQBuffer buffer, int size) { - this.size = size; + size = buffer.writerIndex(); // The length doesn't include the actual length byte int len = size - DataConstants.SIZE_INT; @@ -349,10 +345,9 @@ public class PacketImpl implements Packet { } protected ActiveMQBuffer createPacket(CoreRemotingConnection connection) { - return createPacket(connection, expectedEncodeSize()); - } - protected ActiveMQBuffer createPacket(CoreRemotingConnection connection, int size) { + int size = expectedEncodeSize(); + if (connection == null) { return new ChannelBufferWrapper(Unpooled.buffer(size)); } else { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java index 497448e547..51330c727b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java @@ -16,10 +16,7 @@ */ package org.apache.activemq.artemis.core.remoting.impl.netty; -import java.io.IOException; -import java.io.RandomAccessFile; import java.net.SocketAddress; -import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -32,8 +29,6 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoop; -import io.netty.handler.ssl.SslHandler; -import io.netty.handler.stream.ChunkedFile; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.api.core.TransportConfiguration; @@ -355,18 +350,6 @@ public class NettyConnection implements Connection { return canWrite; } - private Object getFileObject(RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize) { - if (channel.pipeline().get(SslHandler.class) == null) { - return new NonClosingDefaultFileRegion(fileChannel, offset, dataSize); - } else { - try { - return new ChunkedFile(raf, offset, dataSize, 8192); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } - @Override public final void write(ActiveMQBuffer buffer, final boolean flush, @@ -407,30 +390,6 @@ public class NettyConnection implements Connection { } } - @Override - public void write(RandomAccessFile raf, - FileChannel fileChannel, - long offset, - int dataSize, - final ChannelFutureListener futureListener) { - final int readableBytes = dataSize; - if (logger.isDebugEnabled()) { - final int remainingBytes = this.writeBufferHighWaterMark - readableBytes; - if (remainingBytes < 0) { - logger.debug("a write request is exceeding by " + (-remainingBytes) + " bytes the writeBufferHighWaterMark size [ " + this.writeBufferHighWaterMark + " ] : consider to set it at least of " + readableBytes + " bytes"); - } - } - - //no need to lock because the Netty's channel is thread-safe - //and the order of write is ensured by the order of the write calls - final Channel channel = this.channel; - assert readableBytes >= 0; - ChannelFuture channelFuture = channel.writeAndFlush(getFileObject(raf, fileChannel, offset, dataSize)); - if (futureListener != null) { - channelFuture.addListener(futureListener); - } - } - private static void flushAndWait(final Channel channel, final ChannelPromise promise) { if (!channel.eventLoop().inEventLoop()) { waitFor(promise, DEFAULT_WAIT_MILLIS); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NonClosingDefaultFileRegion.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NonClosingDefaultFileRegion.java deleted file mode 100644 index 4fc367fa2b..0000000000 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NonClosingDefaultFileRegion.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.remoting.impl.netty; - -import java.io.File; -import java.nio.channels.FileChannel; - -import io.netty.channel.DefaultFileRegion; - -public class NonClosingDefaultFileRegion extends DefaultFileRegion { - - public NonClosingDefaultFileRegion(FileChannel file, long position, long count) { - super(file, position, count); - } - - public NonClosingDefaultFileRegion(File f, long position, long count) { - super(f, position, count); - } - - @Override - protected void deallocate() { - // Overridden to avoid closing the file - } -} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java index fe5d3950f1..ebde456034 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java @@ -16,8 +16,6 @@ */ package org.apache.activemq.artemis.spi.core.remoting; -import java.io.RandomAccessFile; -import java.nio.channels.FileChannel; import java.util.concurrent.TimeUnit; import io.netty.channel.ChannelFutureListener; @@ -103,8 +101,6 @@ public interface Connection { */ void write(ActiveMQBuffer buffer); - void write(RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize, ChannelFutureListener futureListener); - /** * This should close the internal channel without calling any listeners. * This is to avoid a situation where the broker is busy writing on an internal thread. diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java index 4a4ca39512..7d3fb23939 100644 --- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java +++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java @@ -17,8 +17,6 @@ package org.apache.activemq.artemis.core.protocol.core.impl; import javax.security.auth.Subject; -import java.io.RandomAccessFile; -import java.nio.channels.FileChannel; import java.util.List; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; @@ -394,15 +392,6 @@ public class ChannelImplTest { } - @Override - public void write(RandomAccessFile raf, - FileChannel fileChannel, - long offset, - int dataSize, - ChannelFutureListener channelFutureListener) { - - } - @Override public void forceClose() { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java index 5a30c64bbe..b81782bcd0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java @@ -16,30 +16,22 @@ */ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; import java.util.Arrays; import java.util.EnumSet; -import java.util.Objects; import java.util.Set; +import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager; -import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; -import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection; import org.apache.activemq.artemis.utils.DataConstants; -import org.jboss.logging.Logger; /** * Message is used to sync {@link org.apache.activemq.artemis.core.io.SequentialFile}s to a backup server. The {@link FileType} controls * which extra information is sent. */ public final class ReplicationSyncFileMessage extends PacketImpl { - private static final Logger logger = Logger.getLogger(ReplicationSyncFileMessage.class); /** * The JournalType or {@code null} if sync'ing large-messages. @@ -51,12 +43,10 @@ public final class ReplicationSyncFileMessage extends PacketImpl { */ private long fileId; private int dataSize; + private ByteBuf byteBuffer; private byte[] byteArray; private SimpleString pageStoreName; private FileType fileType; - private RandomAccessFile raf; - private FileChannel fileChannel; - private long offset; public enum FileType { JOURNAL(0), PAGE(1), LARGE_MESSAGE(2); @@ -88,18 +78,14 @@ public final class ReplicationSyncFileMessage extends PacketImpl { public ReplicationSyncFileMessage(AbstractJournalStorageManager.JournalContent content, SimpleString storeName, long id, - RandomAccessFile raf, - FileChannel fileChannel, - long offset, - int size) { + int size, + ByteBuf buffer) { this(); + this.byteBuffer = buffer; this.pageStoreName = storeName; this.dataSize = size; this.fileId = id; - this.raf = raf; - this.fileChannel = fileChannel; this.journalType = content; - this.offset = offset; determineType(); } @@ -113,30 +99,10 @@ public final class ReplicationSyncFileMessage extends PacketImpl { } } - public long getFileId() { - return fileId; - } - - public int getDataSize() { - return dataSize; - } - - public RandomAccessFile getRaf() { - return raf; - } - - public FileChannel getFileChannel() { - return fileChannel; - } - - public long getOffset() { - return offset; - } - @Override public int expectedEncodeSize() { int size = PACKET_HEADERS_SIZE + - DataConstants.SIZE_LONG; // buffer.writeLong(fileId); + DataConstants.SIZE_LONG; // buffer.writeLong(fileId); if (fileId == -1) return size; @@ -159,7 +125,7 @@ public final class ReplicationSyncFileMessage extends PacketImpl { size += DataConstants.SIZE_INT; // buffer.writeInt(dataSize); if (dataSize > 0) { - size += dataSize; + size += byteBuffer.writerIndex(); // buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex()); } return size; @@ -184,55 +150,30 @@ public final class ReplicationSyncFileMessage extends PacketImpl { default: // no-op } - buffer.writeInt(dataSize); - } - @Override - public ActiveMQBuffer encode(CoreRemotingConnection connection) { - if (fileId != -1 && dataSize > 0) { - ActiveMQBuffer buffer; - int bufferSize = expectedEncodeSize(); - int encodedSize = bufferSize; - boolean isNetty = false; - if (connection != null && connection.getTransportConnection() instanceof NettyConnection) { - bufferSize -= dataSize; - isNetty = true; - } - buffer = createPacket(connection, bufferSize); - encodeHeader(buffer); - encodeRest(buffer, connection); - if (!isNetty) { - ByteBuffer byteBuffer; - if (buffer.byteBuf() != null && buffer.byteBuf().nioBufferCount() == 1) { - byteBuffer = buffer.byteBuf().internalNioBuffer(buffer.writerIndex(), buffer.writableBytes()); - } else { - byteBuffer = buffer.toByteBuffer(buffer.writerIndex(), buffer.writableBytes()); - } - readFile(byteBuffer); - buffer.writerIndex(buffer.capacity()); - } - encodeSize(buffer, encodedSize); - return buffer; - } else { - return super.encode(connection); + buffer.writeInt(dataSize); + /* + * sending -1 will close the file in case of a journal, but not in case of a largeMessage + * (which might receive appends) + */ + if (dataSize > 0) { + buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex()); } + + release(); } @Override public void release() { - if (raf != null) { - try { - raf.close(); - } catch (IOException e) { - logger.error("Close file " + this + " failed", e); - } + if (byteBuffer != null) { + byteBuffer.release(); + byteBuffer = null; } } @Override public void decodeRest(final ActiveMQBuffer buffer) { fileId = buffer.readLong(); - if (fileId == -1) return; switch (FileType.getFileType(buffer.readByte())) { case JOURNAL: { journalType = AbstractJournalStorageManager.JournalContent.getType(buffer.readByte()); @@ -256,14 +197,6 @@ public final class ReplicationSyncFileMessage extends PacketImpl { } } - private void readFile(ByteBuffer buffer) { - try { - fileChannel.read(buffer, offset); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - public long getId() { return fileId; } @@ -285,22 +218,61 @@ public final class ReplicationSyncFileMessage extends PacketImpl { } @Override - public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - if (!super.equals(o)) - return false; - ReplicationSyncFileMessage that = (ReplicationSyncFileMessage) o; - return fileId == that.fileId && dataSize == that.dataSize && offset == that.offset && journalType == that.journalType && Arrays.equals(byteArray, that.byteArray) && Objects.equals(pageStoreName, that.pageStoreName) && fileType == that.fileType && Objects.equals(raf, that.raf) && Objects.equals(fileChannel, that.fileChannel); + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + Arrays.hashCode(byteArray); + result = prime * result + ((byteBuffer == null) ? 0 : byteBuffer.hashCode()); + result = prime * result + dataSize; + result = prime * result + (int) (fileId ^ (fileId >>> 32)); + result = prime * result + ((fileType == null) ? 0 : fileType.hashCode()); + result = prime * result + ((journalType == null) ? 0 : journalType.hashCode()); + result = prime * result + ((pageStoreName == null) ? 0 : pageStoreName.hashCode()); + return result; } @Override - public int hashCode() { - int result = Objects.hash(super.hashCode(), journalType, fileId, dataSize, pageStoreName, fileType, raf, fileChannel, offset); - result = 31 * result + Arrays.hashCode(byteArray); - return result; + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!super.equals(obj)) { + return false; + } + if (!(obj instanceof ReplicationSyncFileMessage)) { + return false; + } + ReplicationSyncFileMessage other = (ReplicationSyncFileMessage) obj; + if (!Arrays.equals(byteArray, other.byteArray)) { + return false; + } + if (byteBuffer == null) { + if (other.byteBuffer != null) { + return false; + } + } else if (!byteBuffer.equals(other.byteBuffer)) { + return false; + } + if (dataSize != other.dataSize) { + return false; + } + if (fileId != other.fileId) { + return false; + } + if (fileType != other.fileType) { + return false; + } + if (journalType != other.journalType) { + return false; + } + if (pageStoreName == null) { + if (other.pageStoreName != null) { + return false; + } + } else if (!pageStoreName.equals(other.pageStoreName)) { + return false; + } + return true; } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java index 02f1c84acf..b2fc576db3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java @@ -16,8 +16,6 @@ */ package org.apache.activemq.artemis.core.remoting.impl.invm; -import java.io.RandomAccessFile; -import java.nio.channels.FileChannel; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -244,28 +242,6 @@ public class InVMConnection implements Connection { } - @Override - public void write(RandomAccessFile raf, - FileChannel fileChannel, - long offset, - int dataSize, - final ChannelFutureListener futureListener) { - if (futureListener == null) { - return; - } - try { - executor.execute(() -> { - try { - futureListener.operationComplete(null); - } catch (Exception e) { - throw new IllegalStateException(e); - } - }); - } catch (RejectedExecutionException e) { - - } - } - @Override public String getRemoteAddress() { return "invm:" + serverID; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index d48a5a01c8..1d1217d616 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -16,7 +16,8 @@ */ package org.apache.activemq.artemis.core.replication; -import java.io.RandomAccessFile; +import java.io.FileInputStream; +import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.LinkedHashSet; @@ -27,6 +28,8 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; @@ -389,39 +392,6 @@ public final class ReplicationManager implements ActiveMQComponent { return repliToken; } - private OperationContext sendSyncFileMessage(final ReplicationSyncFileMessage syncFileMessage, boolean lastChunk) { - if (!enabled) { - syncFileMessage.release(); - return null; - } - - final OperationContext repliToken = OperationContextImpl.getContext(ioExecutorFactory); - repliToken.replicationLineUp(); - - replicationStream.execute(() -> { - if (enabled) { - try { - pendingTokens.add(repliToken); - flowControl(syncFileMessage.expectedEncodeSize()); - if (syncFileMessage.getFileId() != -1 && syncFileMessage.getDataSize() > 0) { - replicatingChannel.send(syncFileMessage, syncFileMessage.getRaf(), syncFileMessage.getFileChannel(), - syncFileMessage.getOffset(), syncFileMessage.getDataSize(), - lastChunk ? (Channel.Callback) success -> syncFileMessage.release() : null); - } else { - replicatingChannel.send(syncFileMessage); - } - } catch (Exception e) { - syncFileMessage.release(); - } - } else { - syncFileMessage.release(); - repliToken.replicationDone(); - } - }); - - return repliToken; - } - /** * This was written as a refactoring of sendReplicatePacket. * In case you refactor this in any way, this method must hold a lock on replication lock. . @@ -590,52 +560,49 @@ public final class ReplicationManager implements ActiveMQComponent { if (!file.isOpen()) { file.open(); } - final int size = 1024 * 1024; - long fileSize = file.size(); + int size = 32 * 1024; int flowControlSize = 10; int packetsSent = 0; FlushAction action = new FlushAction(); - long offset = 0; - RandomAccessFile raf = null; - FileChannel fileChannel = null; try { - raf = new RandomAccessFile(file.getJavaFile(), "r"); - fileChannel = raf.getChannel(); - while (true) { - long chunkSize = Math.min(size, fileSize - offset); - int toSend = (int) chunkSize; - if (chunkSize > 0) { - if (chunkSize >= maxBytesToSend) { - toSend = (int) maxBytesToSend; - maxBytesToSend = 0; - } else { - maxBytesToSend = maxBytesToSend - chunkSize; - } - } - logger.debug("sending " + toSend + " bytes on file " + file.getFileName()); - // sending -1 or 0 bytes will close the file at the backup - // We cannot simply send everything of a file through the executor, - // otherwise we would run out of memory. - // so we don't use the executor here - sendSyncFileMessage(new ReplicationSyncFileMessage(content, pageStore, id, raf, fileChannel, offset, toSend), offset + toSend == fileSize); - packetsSent++; - offset += toSend; + try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) { - if (packetsSent % flowControlSize == 0) { - flushReplicationStream(action); + // We can afford having a single buffer here for this entire loop + // because sendReplicatePacket will encode the packet as a NettyBuffer + // through ActiveMQBuffer class leaving this buffer free to be reused on the next copy + while (true) { + final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size); + buffer.clear(); + ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer(); + final int bytesRead = channel.read(byteBuffer); + int toSend = bytesRead; + if (bytesRead > 0) { + if (bytesRead >= maxBytesToSend) { + toSend = (int) maxBytesToSend; + maxBytesToSend = 0; + } else { + maxBytesToSend = maxBytesToSend - bytesRead; + } + } + logger.debug("sending " + buffer.writerIndex() + " bytes on file " + file.getFileName()); + // sending -1 or 0 bytes will close the file at the backup + // We cannot simply send everything of a file through the executor, + // otherwise we would run out of memory. + // so we don't use the executor here + sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true); + packetsSent++; + + if (packetsSent % flowControlSize == 0) { + flushReplicationStream(action); + } + if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0) + break; } - if (toSend == 0 || maxBytesToSend == 0) - break; } flushReplicationStream(action); - - } catch (Exception e) { - if (raf != null) - raf.close(); - throw e; } finally { if (file.isOpen()) file.close(); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java deleted file mode 100644 index f01e5e6adb..0000000000 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; - -import java.io.RandomAccessFile; -import java.nio.channels.FileChannel; -import java.util.HashMap; - -import io.netty.channel.embedded.EmbeddedChannel; -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.core.io.SequentialFile; -import org.apache.activemq.artemis.core.io.SequentialFileFactory; -import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; -import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; -import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl; -import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnection; -import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection; -import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; -import org.apache.activemq.artemis.utils.DataConstants; -import org.junit.Assert; -import org.junit.Test; - -import static org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager.JournalContent.MESSAGES; - -public class ReplicationSyncFileMessageTest extends ActiveMQTestBase { - @Test - public void testNettyConnectionEncodeMessage() throws Exception { - int dataSize = 10; - NettyConnection conn = new NettyConnection(new HashMap<>(), new EmbeddedChannel(), null, false, false); - - SequentialFileFactory factory = new NIOSequentialFileFactory(temporaryFolder.getRoot(), 100); - SequentialFile file = factory.createSequentialFile("file1.bin"); - file.open(); - RandomAccessFile raf = new RandomAccessFile(file.getJavaFile(), "r"); - FileChannel fileChannel = raf.getChannel(); - ReplicationSyncFileMessage replicationSyncFileMessage = new ReplicationSyncFileMessage(MESSAGES, - null, 10, raf, fileChannel, 0, dataSize); - RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(null, conn, 10, 10, null, null, null); - ActiveMQBuffer buffer = replicationSyncFileMessage.encode(remotingConnection); - Assert.assertEquals(buffer.getInt(0), replicationSyncFileMessage.expectedEncodeSize() - DataConstants.SIZE_INT); - Assert.assertEquals(buffer.capacity(), replicationSyncFileMessage.expectedEncodeSize() - dataSize); - file.close(); - } - - - @Test - public void testInVMConnectionEncodeMessage() throws Exception { - int fileId = 10; - InVMConnection conn = new InVMConnection(0, null, null, null); - - SequentialFileFactory factory = new NIOSequentialFileFactory(temporaryFolder.getRoot(), 100); - SequentialFile file = factory.createSequentialFile("file1.bin"); - file.open(); - RandomAccessFile raf = new RandomAccessFile(file.getJavaFile(), "r"); - FileChannel fileChannel = raf.getChannel(); - ReplicationSyncFileMessage replicationSyncFileMessage = new ReplicationSyncFileMessage(MESSAGES, - null, fileId, raf, fileChannel, 0, 0); - RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(null, conn, 10, 10, null, null, null); - ActiveMQBuffer buffer = replicationSyncFileMessage.encode(remotingConnection); - Assert.assertEquals(buffer.readInt(), replicationSyncFileMessage.expectedEncodeSize() - DataConstants.SIZE_INT); - Assert.assertEquals(buffer.capacity(), replicationSyncFileMessage.expectedEncodeSize()); - - Assert.assertEquals(buffer.readByte(), PacketImpl.REPLICATION_SYNC_FILE); - - ReplicationSyncFileMessage decodedReplicationSyncFileMessage = new ReplicationSyncFileMessage(); - decodedReplicationSyncFileMessage.decode(buffer); - Assert.assertEquals(decodedReplicationSyncFileMessage.getJournalContent(), MESSAGES); - Assert.assertNull(decodedReplicationSyncFileMessage.getData()); - file.close(); - } -} \ No newline at end of file diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java index c55764a096..c7ed8699c1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java @@ -16,8 +16,6 @@ */ package org.apache.activemq.artemis.tests.integration.cluster.util; -import java.io.RandomAccessFile; -import java.nio.channels.FileChannel; import java.util.concurrent.locks.Lock; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -213,11 +211,6 @@ public class BackupSyncDelay implements Interceptor { return true; } - @Override - public boolean send(Packet packet, RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize, Callback callback) { - return true; - } - @Override public boolean sendBatched(Packet packet) { throw new UnsupportedOperationException(); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java index 23ae5f9a7c..c9c975cf85 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java @@ -16,9 +16,7 @@ */ package org.apache.activemq.artemis.tests.unit.core.remoting.impl.netty; -import java.io.RandomAccessFile; import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -31,9 +29,6 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.TransportConfiguration; -import org.apache.activemq.artemis.core.io.SequentialFile; -import org.apache.activemq.artemis.core.io.SequentialFileFactory; -import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory; import org.apache.activemq.artemis.core.server.ActiveMQComponent; @@ -82,29 +77,6 @@ public class NettyConnectionTest extends ActiveMQTestBase { } - @Test - public void testWritePacketAndFile() throws Exception { - EmbeddedChannel channel = createChannel(); - NettyConnection conn = new NettyConnection(emptyMap, channel, new MyListener(), false, false); - - final int size = 1234; - - ActiveMQBuffer buff = conn.createTransportBuffer(size); - buff.writeByte((byte) 0x00); // Netty buffer does lazy initialization. - SequentialFileFactory factory = new NIOSequentialFileFactory(temporaryFolder.getRoot(), 100); - SequentialFile file = factory.createSequentialFile("file1.bin"); - file.open(); - RandomAccessFile raf = new RandomAccessFile(file.getJavaFile(), "r"); - FileChannel fileChannel = raf.getChannel(); - - conn.write(buff); - conn.write(raf, fileChannel, 0, size, future -> raf.close()); - channel.runPendingTasks(); - Assert.assertEquals(2, channel.outboundMessages().size()); - Assert.assertFalse(fileChannel.isOpen()); - file.close(); - } - @Test(expected = IllegalStateException.class) public void throwsExceptionOnBlockUntilWritableIfClosed() { EmbeddedChannel channel = createChannel();