ARTEMIS-2336 Use zero copy to replicate journal/page/large message file

This commit is contained in:
yang wei 2019-05-13 12:32:58 +08:00 committed by
parent 6896e84b7e
commit 85b93f0883
13 changed files with 523 additions and 159 deletions

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.core.protocol.core;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.concurrent.locks.Lock;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -66,6 +68,20 @@ public interface Channel {
*/
boolean send(Packet packet);
/**
* Sends a packet and file on this channel.
*
* @param packet the packet to send
* @param raf the file to send
* @param fileChannel the file channel retrieved from raf
* @param offset the position of the raf
* @param dataSize the data size to send
* @param callback callback after send
* @return false if the packet was rejected by an outgoing interceptor; true if the send was
* successful
*/
boolean send(Packet packet, RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize, Callback callback);
/**
* Sends a packet on this channel.
*
@ -247,4 +263,8 @@ public interface Channel {
* @param transferring whether the channel is transferring
*/
void setTransferring(boolean transferring);
interface Callback {
void done(boolean success);
}
}

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -25,6 +27,7 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import io.netty.channel.ChannelFutureListener;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
@ -274,6 +277,60 @@ public final class ChannelImpl implements Channel {
}
}
private ActiveMQBuffer beforeSend(final Packet packet, final int reconnectID) {
packet.setChannelID(id);
if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
packet.setCorrelationID(responseAsyncCache.nextCorrelationID());
}
if (logger.isTraceEnabled()) {
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id);
}
ActiveMQBuffer buffer = packet.encode(connection);
lock.lock();
try {
if (failingOver) {
waitForFailOver("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " timed-out waiting for fail-over condition on non-blocking send");
}
// Sanity check
if (transferring) {
throw ActiveMQClientMessageBundle.BUNDLE.cannotSendPacketDuringFailover();
}
if (resendCache != null && packet.isRequiresConfirmations()) {
addResendPacket(packet);
}
} finally {
lock.unlock();
}
if (logger.isTraceEnabled()) {
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Writing buffer for channelID=" + id);
}
checkReconnectID(reconnectID);
//We do this outside the lock as ResponseCache is threadsafe and allows responses to come in,
//As the send could block if the response cache cannot add, preventing responses to be handled.
if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
while (!responseAsyncCache.add(packet)) {
try {
Thread.sleep(1);
} catch (Exception e) {
// Ignore
}
}
}
return buffer;
}
// This must never called by more than one thread concurrently
private boolean send(final Packet packet, final int reconnectID, final boolean flush, final boolean batch) {
if (invokeInterceptors(packet, interceptors, connection) != null) {
@ -281,55 +338,7 @@ public final class ChannelImpl implements Channel {
}
synchronized (sendLock) {
packet.setChannelID(id);
if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
packet.setCorrelationID(responseAsyncCache.nextCorrelationID());
}
if (logger.isTraceEnabled()) {
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id);
}
ActiveMQBuffer buffer = packet.encode(connection);
lock.lock();
try {
if (failingOver) {
waitForFailOver("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " timed-out waiting for fail-over condition on non-blocking send");
}
// Sanity check
if (transferring) {
throw ActiveMQClientMessageBundle.BUNDLE.cannotSendPacketDuringFailover();
}
if (resendCache != null && packet.isRequiresConfirmations()) {
addResendPacket(packet);
}
} finally {
lock.unlock();
}
if (logger.isTraceEnabled()) {
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Writing buffer for channelID=" + id);
}
checkReconnectID(reconnectID);
//We do this outside the lock as ResponseCache is threadsafe and allows responses to come in,
//As the send could block if the response cache cannot add, preventing responses to be handled.
if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
while (!responseAsyncCache.add(packet)) {
try {
Thread.sleep(1);
} catch (Exception e) {
// Ignore
}
}
}
ActiveMQBuffer buffer = beforeSend(packet, reconnectID);
// The actual send must be outside the lock, or with OIO transport, the write can block if the tcp
// buffer is full, preventing any incoming buffers being handled and blocking failover
@ -347,6 +356,37 @@ public final class ChannelImpl implements Channel {
}
}
@Override
public boolean send(Packet packet,
RandomAccessFile raf,
FileChannel fileChannel,
long offset,
int dataSize,
Callback callback) {
if (invokeInterceptors(packet, interceptors, connection) != null) {
return false;
}
synchronized (sendLock) {
ActiveMQBuffer buffer = beforeSend(packet, -1);
// The actual send must be outside the lock, or with OIO transport, the write can block if the tcp
// buffer is full, preventing any incoming buffers being handled and blocking failover
try {
connection.getTransportConnection().write(buffer);
connection.getTransportConnection().write(raf, fileChannel, offset, dataSize, callback == null ? null : (ChannelFutureListener) future -> callback.done(future == null || future.isSuccess()));
} catch (Throwable t) {
//If runtime exception, we must remove from the cache to avoid filling up the cache causing it to be full.
//The client would get still know about this as the exception bubbles up the call stack instead.
if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
responseAsyncCache.remove(packet.getCorrelationID());
}
throw t;
}
return true;
}
}
private void checkReconnectID(int reconnectID) {
if (reconnectID >= 0 && reconnectID != this.reconnectID.get()) {
throw ActiveMQClientMessageBundle.BUNDLE.packetTransmissionInterrupted();

View File

@ -336,7 +336,11 @@ public class PacketImpl implements Packet {
}
protected void encodeSize(ActiveMQBuffer buffer) {
size = buffer.writerIndex();
encodeSize(buffer, buffer.writerIndex());
}
protected void encodeSize(ActiveMQBuffer buffer, int size) {
this.size = size;
// The length doesn't include the actual length byte
int len = size - DataConstants.SIZE_INT;
@ -345,9 +349,10 @@ public class PacketImpl implements Packet {
}
protected ActiveMQBuffer createPacket(CoreRemotingConnection connection) {
return createPacket(connection, expectedEncodeSize());
}
int size = expectedEncodeSize();
protected ActiveMQBuffer createPacket(CoreRemotingConnection connection, int size) {
if (connection == null) {
return new ChannelBufferWrapper(Unpooled.buffer(size));
} else {

View File

@ -16,7 +16,10 @@
*/
package org.apache.activemq.artemis.core.remoting.impl.netty;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.SocketAddress;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -29,6 +32,8 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedFile;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
@ -350,6 +355,18 @@ public class NettyConnection implements Connection {
return canWrite;
}
private Object getFileObject(RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize) {
if (channel.pipeline().get(SslHandler.class) == null) {
return new NonClosingDefaultFileRegion(fileChannel, offset, dataSize);
} else {
try {
return new ChunkedFile(raf, offset, dataSize, 8192);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
@Override
public final void write(ActiveMQBuffer buffer,
final boolean flush,
@ -390,6 +407,30 @@ public class NettyConnection implements Connection {
}
}
@Override
public void write(RandomAccessFile raf,
FileChannel fileChannel,
long offset,
int dataSize,
final ChannelFutureListener futureListener) {
final int readableBytes = dataSize;
if (logger.isDebugEnabled()) {
final int remainingBytes = this.writeBufferHighWaterMark - readableBytes;
if (remainingBytes < 0) {
logger.debug("a write request is exceeding by " + (-remainingBytes) + " bytes the writeBufferHighWaterMark size [ " + this.writeBufferHighWaterMark + " ] : consider to set it at least of " + readableBytes + " bytes");
}
}
//no need to lock because the Netty's channel is thread-safe
//and the order of write is ensured by the order of the write calls
final Channel channel = this.channel;
assert readableBytes >= 0;
ChannelFuture channelFuture = channel.writeAndFlush(getFileObject(raf, fileChannel, offset, dataSize));
if (futureListener != null) {
channelFuture.addListener(futureListener);
}
}
private static void flushAndWait(final Channel channel, final ChannelPromise promise) {
if (!channel.eventLoop().inEventLoop()) {
waitFor(promise, DEFAULT_WAIT_MILLIS);

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.remoting.impl.netty;
import java.io.File;
import java.nio.channels.FileChannel;
import io.netty.channel.DefaultFileRegion;
public class NonClosingDefaultFileRegion extends DefaultFileRegion {
public NonClosingDefaultFileRegion(FileChannel file, long position, long count) {
super(file, position, count);
}
public NonClosingDefaultFileRegion(File f, long position, long count) {
super(f, position, count);
}
@Override
protected void deallocate() {
// Overridden to avoid closing the file
}
}

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.spi.core.remoting;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.concurrent.TimeUnit;
import io.netty.channel.ChannelFutureListener;
@ -101,6 +103,8 @@ public interface Connection {
*/
void write(ActiveMQBuffer buffer);
void write(RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize, ChannelFutureListener futureListener);
/**
* This should close the internal channel without calling any listeners.
* This is to avoid a situation where the broker is busy writing on an internal thread.

View File

@ -17,6 +17,8 @@
package org.apache.activemq.artemis.core.protocol.core.impl;
import javax.security.auth.Subject;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@ -386,6 +388,15 @@ public class ChannelImplTest {
}
@Override
public void write(RandomAccessFile raf,
FileChannel fileChannel,
long offset,
int dataSize,
ChannelFutureListener channelFutureListener) {
}
@Override
public void forceClose() {

View File

@ -16,22 +16,30 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Objects;
import java.util.Set;
import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.utils.DataConstants;
import org.jboss.logging.Logger;
/**
* Message is used to sync {@link org.apache.activemq.artemis.core.io.SequentialFile}s to a backup server. The {@link FileType} controls
* which extra information is sent.
*/
public final class ReplicationSyncFileMessage extends PacketImpl {
private static final Logger logger = Logger.getLogger(ReplicationSyncFileMessage.class);
/**
* The JournalType or {@code null} if sync'ing large-messages.
@ -43,10 +51,12 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
*/
private long fileId;
private int dataSize;
private ByteBuf byteBuffer;
private byte[] byteArray;
private SimpleString pageStoreName;
private FileType fileType;
private RandomAccessFile raf;
private FileChannel fileChannel;
private long offset;
public enum FileType {
JOURNAL(0), PAGE(1), LARGE_MESSAGE(2);
@ -78,14 +88,18 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
public ReplicationSyncFileMessage(AbstractJournalStorageManager.JournalContent content,
SimpleString storeName,
long id,
int size,
ByteBuf buffer) {
RandomAccessFile raf,
FileChannel fileChannel,
long offset,
int size) {
this();
this.byteBuffer = buffer;
this.pageStoreName = storeName;
this.dataSize = size;
this.fileId = id;
this.raf = raf;
this.fileChannel = fileChannel;
this.journalType = content;
this.offset = offset;
determineType();
}
@ -99,10 +113,30 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
}
}
public long getFileId() {
return fileId;
}
public int getDataSize() {
return dataSize;
}
public RandomAccessFile getRaf() {
return raf;
}
public FileChannel getFileChannel() {
return fileChannel;
}
public long getOffset() {
return offset;
}
@Override
public int expectedEncodeSize() {
int size = PACKET_HEADERS_SIZE +
DataConstants.SIZE_LONG; // buffer.writeLong(fileId);
DataConstants.SIZE_LONG; // buffer.writeLong(fileId);
if (fileId == -1)
return size;
@ -125,7 +159,7 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
size += DataConstants.SIZE_INT; // buffer.writeInt(dataSize);
if (dataSize > 0) {
size += byteBuffer.writerIndex(); // buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex());
size += dataSize;
}
return size;
@ -150,30 +184,55 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
default:
// no-op
}
buffer.writeInt(dataSize);
/*
* sending -1 will close the file in case of a journal, but not in case of a largeMessage
* (which might receive appends)
*/
if (dataSize > 0) {
buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex());
}
}
release();
@Override
public ActiveMQBuffer encode(CoreRemotingConnection connection) {
if (fileId != -1 && dataSize > 0) {
ActiveMQBuffer buffer;
int bufferSize = expectedEncodeSize();
int encodedSize = bufferSize;
boolean isNetty = false;
if (connection != null && connection.getTransportConnection() instanceof NettyConnection) {
bufferSize -= dataSize;
isNetty = true;
}
buffer = createPacket(connection, bufferSize);
encodeHeader(buffer);
encodeRest(buffer, connection);
if (!isNetty) {
ByteBuffer byteBuffer;
if (buffer.byteBuf() != null && buffer.byteBuf().nioBufferCount() == 1) {
byteBuffer = buffer.byteBuf().internalNioBuffer(buffer.writerIndex(), buffer.writableBytes());
} else {
byteBuffer = buffer.toByteBuffer(buffer.writerIndex(), buffer.writableBytes());
}
readFile(byteBuffer);
buffer.writerIndex(buffer.capacity());
}
encodeSize(buffer, encodedSize);
return buffer;
} else {
return super.encode(connection);
}
}
@Override
public void release() {
if (byteBuffer != null) {
byteBuffer.release();
byteBuffer = null;
if (raf != null) {
try {
raf.close();
} catch (IOException e) {
logger.error("Close file " + this + " failed", e);
}
}
}
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
fileId = buffer.readLong();
if (fileId == -1) return;
switch (FileType.getFileType(buffer.readByte())) {
case JOURNAL: {
journalType = AbstractJournalStorageManager.JournalContent.getType(buffer.readByte());
@ -197,6 +256,14 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
}
}
private void readFile(ByteBuffer buffer) {
try {
fileChannel.read(buffer, offset);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public long getId() {
return fileId;
}
@ -218,61 +285,22 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
}
@Override
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + Arrays.hashCode(byteArray);
result = prime * result + ((byteBuffer == null) ? 0 : byteBuffer.hashCode());
result = prime * result + dataSize;
result = prime * result + (int) (fileId ^ (fileId >>> 32));
result = prime * result + ((fileType == null) ? 0 : fileType.hashCode());
result = prime * result + ((journalType == null) ? 0 : journalType.hashCode());
result = prime * result + ((pageStoreName == null) ? 0 : pageStoreName.hashCode());
return result;
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
if (!super.equals(o))
return false;
ReplicationSyncFileMessage that = (ReplicationSyncFileMessage) o;
return fileId == that.fileId && dataSize == that.dataSize && offset == that.offset && journalType == that.journalType && Arrays.equals(byteArray, that.byteArray) && Objects.equals(pageStoreName, that.pageStoreName) && fileType == that.fileType && Objects.equals(raf, that.raf) && Objects.equals(fileChannel, that.fileChannel);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!super.equals(obj)) {
return false;
}
if (!(obj instanceof ReplicationSyncFileMessage)) {
return false;
}
ReplicationSyncFileMessage other = (ReplicationSyncFileMessage) obj;
if (!Arrays.equals(byteArray, other.byteArray)) {
return false;
}
if (byteBuffer == null) {
if (other.byteBuffer != null) {
return false;
}
} else if (!byteBuffer.equals(other.byteBuffer)) {
return false;
}
if (dataSize != other.dataSize) {
return false;
}
if (fileId != other.fileId) {
return false;
}
if (fileType != other.fileType) {
return false;
}
if (journalType != other.journalType) {
return false;
}
if (pageStoreName == null) {
if (other.pageStoreName != null) {
return false;
}
} else if (!pageStoreName.equals(other.pageStoreName)) {
return false;
}
return true;
public int hashCode() {
int result = Objects.hash(super.hashCode(), journalType, fileId, dataSize, pageStoreName, fileType, raf, fileChannel, offset);
result = 31 * result + Arrays.hashCode(byteArray);
return result;
}
@Override

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.core.remoting.impl.invm;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@ -242,6 +244,28 @@ public class InVMConnection implements Connection {
}
@Override
public void write(RandomAccessFile raf,
FileChannel fileChannel,
long offset,
int dataSize,
final ChannelFutureListener futureListener) {
if (futureListener == null) {
return;
}
try {
executor.execute(() -> {
try {
futureListener.operationComplete(null);
} catch (Exception e) {
throw new IllegalStateException(e);
}
});
} catch (RejectedExecutionException e) {
}
}
@Override
public String getRemoteAddress() {
return "invm:" + serverID;

View File

@ -16,8 +16,7 @@
*/
package org.apache.activemq.artemis.core.replication;
import java.io.FileInputStream;
import java.nio.ByteBuffer;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.LinkedHashSet;
@ -28,8 +27,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@ -392,6 +389,39 @@ public final class ReplicationManager implements ActiveMQComponent {
return repliToken;
}
private OperationContext sendSyncFileMessage(final ReplicationSyncFileMessage syncFileMessage, boolean lastChunk) {
if (!enabled) {
syncFileMessage.release();
return null;
}
final OperationContext repliToken = OperationContextImpl.getContext(ioExecutorFactory);
repliToken.replicationLineUp();
replicationStream.execute(() -> {
if (enabled) {
try {
pendingTokens.add(repliToken);
flowControl(syncFileMessage.expectedEncodeSize());
if (syncFileMessage.getFileId() != -1 && syncFileMessage.getDataSize() > 0) {
replicatingChannel.send(syncFileMessage, syncFileMessage.getRaf(), syncFileMessage.getFileChannel(),
syncFileMessage.getOffset(), syncFileMessage.getDataSize(),
lastChunk ? (Channel.Callback) success -> syncFileMessage.release() : null);
} else {
replicatingChannel.send(syncFileMessage);
}
} catch (Exception e) {
syncFileMessage.release();
}
} else {
syncFileMessage.release();
repliToken.replicationDone();
}
});
return repliToken;
}
/**
* This was written as a refactoring of sendReplicatePacket.
* In case you refactor this in any way, this method must hold a lock on replication lock. .
@ -560,49 +590,52 @@ public final class ReplicationManager implements ActiveMQComponent {
if (!file.isOpen()) {
file.open();
}
int size = 32 * 1024;
final int size = 1024 * 1024;
long fileSize = file.size();
int flowControlSize = 10;
int packetsSent = 0;
FlushAction action = new FlushAction();
long offset = 0;
RandomAccessFile raf = null;
FileChannel fileChannel = null;
try {
try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) {
// We can afford having a single buffer here for this entire loop
// because sendReplicatePacket will encode the packet as a NettyBuffer
// through ActiveMQBuffer class leaving this buffer free to be reused on the next copy
while (true) {
final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
buffer.clear();
ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer();
final int bytesRead = channel.read(byteBuffer);
int toSend = bytesRead;
if (bytesRead > 0) {
if (bytesRead >= maxBytesToSend) {
toSend = (int) maxBytesToSend;
maxBytesToSend = 0;
} else {
maxBytesToSend = maxBytesToSend - bytesRead;
}
raf = new RandomAccessFile(file.getJavaFile(), "r");
fileChannel = raf.getChannel();
while (true) {
long chunkSize = Math.min(size, fileSize - offset);
int toSend = (int) chunkSize;
if (chunkSize > 0) {
if (chunkSize >= maxBytesToSend) {
toSend = (int) maxBytesToSend;
maxBytesToSend = 0;
} else {
maxBytesToSend = maxBytesToSend - chunkSize;
}
logger.debug("sending " + buffer.writerIndex() + " bytes on file " + file.getFileName());
// sending -1 or 0 bytes will close the file at the backup
// We cannot simply send everything of a file through the executor,
// otherwise we would run out of memory.
// so we don't use the executor here
sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true);
packetsSent++;
if (packetsSent % flowControlSize == 0) {
flushReplicationStream(action);
}
if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
break;
}
logger.debug("sending " + toSend + " bytes on file " + file.getFileName());
// sending -1 or 0 bytes will close the file at the backup
// We cannot simply send everything of a file through the executor,
// otherwise we would run out of memory.
// so we don't use the executor here
sendSyncFileMessage(new ReplicationSyncFileMessage(content, pageStore, id, raf, fileChannel, offset, toSend), offset + toSend == fileSize);
packetsSent++;
offset += toSend;
if (packetsSent % flowControlSize == 0) {
flushReplicationStream(action);
}
if (toSend == 0 || maxBytesToSend == 0)
break;
}
flushReplicationStream(action);
} catch (Exception e) {
if (raf != null)
raf.close();
throw e;
} finally {
if (file.isOpen())
file.close();

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.HashMap;
import io.netty.channel.embedded.EmbeddedChannel;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnection;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.DataConstants;
import org.junit.Assert;
import org.junit.Test;
import static org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager.JournalContent.MESSAGES;
public class ReplicationSyncFileMessageTest extends ActiveMQTestBase {
@Test
public void testNettyConnectionEncodeMessage() throws Exception {
int dataSize = 10;
NettyConnection conn = new NettyConnection(new HashMap<>(), new EmbeddedChannel(), null, false, false);
SequentialFileFactory factory = new NIOSequentialFileFactory(temporaryFolder.getRoot(), 100);
SequentialFile file = factory.createSequentialFile("file1.bin");
file.open();
RandomAccessFile raf = new RandomAccessFile(file.getJavaFile(), "r");
FileChannel fileChannel = raf.getChannel();
ReplicationSyncFileMessage replicationSyncFileMessage = new ReplicationSyncFileMessage(MESSAGES,
null, 10, raf, fileChannel, 0, dataSize);
RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(null, conn, 10, 10, null, null);
ActiveMQBuffer buffer = replicationSyncFileMessage.encode(remotingConnection);
Assert.assertEquals(buffer.getInt(0), replicationSyncFileMessage.expectedEncodeSize() - DataConstants.SIZE_INT);
Assert.assertEquals(buffer.capacity(), replicationSyncFileMessage.expectedEncodeSize() - dataSize);
file.close();
}
@Test
public void testInVMConnectionEncodeMessage() throws Exception {
int fileId = 10;
InVMConnection conn = new InVMConnection(0, null, null, null);
SequentialFileFactory factory = new NIOSequentialFileFactory(temporaryFolder.getRoot(), 100);
SequentialFile file = factory.createSequentialFile("file1.bin");
file.open();
RandomAccessFile raf = new RandomAccessFile(file.getJavaFile(), "r");
FileChannel fileChannel = raf.getChannel();
ReplicationSyncFileMessage replicationSyncFileMessage = new ReplicationSyncFileMessage(MESSAGES,
null, fileId, raf, fileChannel, 0, 0);
RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(null, conn, 10, 10, null, null);
ActiveMQBuffer buffer = replicationSyncFileMessage.encode(remotingConnection);
Assert.assertEquals(buffer.readInt(), replicationSyncFileMessage.expectedEncodeSize() - DataConstants.SIZE_INT);
Assert.assertEquals(buffer.capacity(), replicationSyncFileMessage.expectedEncodeSize());
Assert.assertEquals(buffer.readByte(), PacketImpl.REPLICATION_SYNC_FILE);
ReplicationSyncFileMessage decodedReplicationSyncFileMessage = new ReplicationSyncFileMessage();
decodedReplicationSyncFileMessage.decode(buffer);
Assert.assertEquals(decodedReplicationSyncFileMessage.getJournalContent(), MESSAGES);
Assert.assertNull(decodedReplicationSyncFileMessage.getData());
file.close();
}
}

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.tests.integration.cluster.util;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.concurrent.locks.Lock;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -211,6 +213,11 @@ public class BackupSyncDelay implements Interceptor {
return true;
}
@Override
public boolean send(Packet packet, RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize, Callback callback) {
return true;
}
@Override
public boolean sendBatched(Packet packet) {
throw new UnsupportedOperationException();

View File

@ -16,7 +16,9 @@
*/
package org.apache.activemq.artemis.tests.unit.core.remoting.impl.netty;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@ -29,6 +31,9 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
@ -77,6 +82,29 @@ public class NettyConnectionTest extends ActiveMQTestBase {
}
@Test
public void testWritePacketAndFile() throws Exception {
EmbeddedChannel channel = createChannel();
NettyConnection conn = new NettyConnection(emptyMap, channel, new MyListener(), false, false);
final int size = 1234;
ActiveMQBuffer buff = conn.createTransportBuffer(size);
buff.writeByte((byte) 0x00); // Netty buffer does lazy initialization.
SequentialFileFactory factory = new NIOSequentialFileFactory(temporaryFolder.getRoot(), 100);
SequentialFile file = factory.createSequentialFile("file1.bin");
file.open();
RandomAccessFile raf = new RandomAccessFile(file.getJavaFile(), "r");
FileChannel fileChannel = raf.getChannel();
conn.write(buff);
conn.write(raf, fileChannel, 0, size, future -> raf.close());
channel.runPendingTasks();
Assert.assertEquals(2, channel.outboundMessages().size());
Assert.assertFalse(fileChannel.isOpen());
file.close();
}
@Test(expected = IllegalStateException.class)
public void throwsExceptionOnBlockUntilWritableIfClosed() {
EmbeddedChannel channel = createChannel();