diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java index 43142674d7..487d8a52bf 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java @@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException; +import org.apache.activemq.artemis.core.io.util.FileIOUtil; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback; import org.apache.activemq.artemis.core.io.buffer.TimedBuffer; @@ -113,14 +114,7 @@ public abstract class AbstractSequentialFile implements SequentialFile { ByteBuffer buffer = ByteBuffer.allocate(10 * 1024); - for (;;) { - buffer.rewind(); - int size = this.read(buffer); - newFileName.writeDirect(buffer, false); - if (size < 10 * 1024) { - break; - } - } + FileIOUtil.copyData(this, newFileName, buffer); newFileName.close(); this.close(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java index c24924abaa..3f6f4d7393 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java @@ -264,51 +264,63 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L } @Override - public synchronized ServerMessage copy() { + public ServerMessage copy() { SequentialFile newfile = storageManager.createFileForLargeMessage(messageID, durable); ServerMessage newMessage = new LargeServerMessageImpl(this, properties, newfile, messageID); return newMessage; } - public void copyFrom(final SequentialFile fileSource) throws Exception { - this.bodySize = -1; - this.pendingCopy = fileSource; - } - @Override - public void finishCopy() throws Exception { - if (pendingCopy != null) { - SequentialFile copyTo = createFile(); - try { - this.pendingRecordID = storageManager.storePendingLargeMessage(this.messageID); - copyTo.open(); - pendingCopy.open(); - pendingCopy.copyTo(copyTo); - } - finally { - copyTo.close(); - pendingCopy.close(); - pendingCopy = null; - } - - closeFile(); - bodySize = -1; - file = null; - } - } - - /** - * The copy of the file itself will be done later by {@link LargeServerMessageImpl#finishCopy()} - */ - @Override - public synchronized ServerMessage copy(final long newID) { + public ServerMessage copy(final long newID) { try { - SequentialFile newfile = storageManager.createFileForLargeMessage(newID, durable); + LargeServerMessage newMessage = storageManager.createLargeMessage(newID, this); + + + byte[] bufferBytes = new byte[100 * 1024]; + + ByteBuffer buffer = ByteBuffer.wrap(bufferBytes); + + long oldPosition = file.position(); + + boolean originallyOpen = file.isOpen(); + file.open(); + file.position(0); + + for (;;) { + // The buffer is reused... + // We need to make sure we clear the limits and the buffer before reusing it + buffer.clear(); + int bytesRead = file.read(buffer); + + byte[] bufferToWrite; + if (bytesRead == 0) { + break; + } + else if (bytesRead == bufferBytes.length) { + bufferToWrite = bufferBytes; + } + else { + bufferToWrite = new byte[bytesRead]; + System.arraycopy(bufferBytes, 0, bufferToWrite, 0, bytesRead); + } + + newMessage.addBytes(bufferToWrite); + + if (bytesRead < bufferBytes.length) { + break; + } + } + + file.position(oldPosition); + + if (!originallyOpen) { + file.close(); + } - LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, properties, newfile, newID); - newMessage.copyFrom(createFile()); return newMessage; + + } catch (Exception e) { ActiveMQServerLogger.LOGGER.lareMessageErrorCopying(e, this); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerMessage.java index 73a4df5f43..40dc50f1a4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerMessage.java @@ -48,8 +48,6 @@ public interface ServerMessage extends MessageInternal, EncodingSupport { ServerMessage copy(long newID); - void finishCopy() throws Exception; - ServerMessage copy(); int getMemoryEstimate(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java index 7d24dc6876..339293b114 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java @@ -148,7 +148,6 @@ public class Redistributor implements Consumer { } if (!reference.getMessage().isLargeMessage()) { - routingInfo.getB().finishCopy(); postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false); @@ -160,7 +159,6 @@ public class Redistributor implements Consumer { @Override public void run() { try { - routingInfo.getB().finishCopy(); postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java index c2f8b90570..b90db75f2a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java @@ -88,7 +88,6 @@ public class DivertImpl implements Divert { // Shouldn't copy if it's not routed anywhere else if (!forwardAddress.equals(message.getAddress())) { copy = message.copy(id); - copy.finishCopy(); // This will set the original MessageId, and the original address copy.setOriginalHeaders(message, null, false); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 449704b6f9..6f91a0a936 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -2222,7 +2222,6 @@ public class QueueImpl implements Queue { } } - copyMessage.finishCopy(); postOffice.processRoute(copyMessage, routingContext, false); ref.handled(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java index e7a7e67d85..5e5aebea7c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java @@ -185,10 +185,6 @@ public class ServerMessageImpl extends MessageImpl implements ServerMessage { return m; } - @Override - public void finishCopy() throws Exception { - } - @Override public ServerMessage copy() { // This is a simple copy, used only to avoid changing original properties @@ -216,7 +212,6 @@ public class ServerMessageImpl extends MessageImpl implements ServerMessage { */ ServerMessage copy = copy(newID); - copy.finishCopy(); if (copyOriginalHeaders) { copy.setOriginalHeaders(this, originalReference, expiry); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 53edb7971c..04a587dfb0 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -336,11 +336,6 @@ public class ScheduledDeliveryHandlerTest extends Assert { return null; } - @Override - public void finishCopy() throws Exception { - - } - @Override public ServerMessage copy() { return null;