ARTEMIS-490 Fixing LargeMessage copy through replication

this will fix cases like DLQ and Diverts
This commit is contained in:
Clebert Suconic 2016-04-18 17:55:49 -04:00 committed by Andy Taylor
parent dcf651376f
commit e81fa5c359
8 changed files with 49 additions and 59 deletions
artemis-journal/src/main/java/org/apache/activemq/artemis/core/io
artemis-server/src
main/java/org/apache/activemq/artemis/core
test/java/org/apache/activemq/artemis/core/server/impl

View File

@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
import org.apache.activemq.artemis.core.io.util.FileIOUtil;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
@ -113,14 +114,7 @@ public abstract class AbstractSequentialFile implements SequentialFile {
ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);
for (;;) {
buffer.rewind();
int size = this.read(buffer);
newFileName.writeDirect(buffer, false);
if (size < 10 * 1024) {
break;
}
}
FileIOUtil.copyData(this, newFileName, buffer);
newFileName.close();
this.close();
}

View File

@ -264,51 +264,63 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L
}
@Override
public synchronized ServerMessage copy() {
public ServerMessage copy() {
SequentialFile newfile = storageManager.createFileForLargeMessage(messageID, durable);
ServerMessage newMessage = new LargeServerMessageImpl(this, properties, newfile, messageID);
return newMessage;
}
public void copyFrom(final SequentialFile fileSource) throws Exception {
this.bodySize = -1;
this.pendingCopy = fileSource;
}
@Override
public void finishCopy() throws Exception {
if (pendingCopy != null) {
SequentialFile copyTo = createFile();
try {
this.pendingRecordID = storageManager.storePendingLargeMessage(this.messageID);
copyTo.open();
pendingCopy.open();
pendingCopy.copyTo(copyTo);
}
finally {
copyTo.close();
pendingCopy.close();
pendingCopy = null;
}
closeFile();
bodySize = -1;
file = null;
}
}
/**
* The copy of the file itself will be done later by {@link LargeServerMessageImpl#finishCopy()}
*/
@Override
public synchronized ServerMessage copy(final long newID) {
public ServerMessage copy(final long newID) {
try {
SequentialFile newfile = storageManager.createFileForLargeMessage(newID, durable);
LargeServerMessage newMessage = storageManager.createLargeMessage(newID, this);
byte[] bufferBytes = new byte[100 * 1024];
ByteBuffer buffer = ByteBuffer.wrap(bufferBytes);
long oldPosition = file.position();
boolean originallyOpen = file.isOpen();
file.open();
file.position(0);
for (;;) {
// The buffer is reused...
// We need to make sure we clear the limits and the buffer before reusing it
buffer.clear();
int bytesRead = file.read(buffer);
byte[] bufferToWrite;
if (bytesRead == 0) {
break;
}
else if (bytesRead == bufferBytes.length) {
bufferToWrite = bufferBytes;
}
else {
bufferToWrite = new byte[bytesRead];
System.arraycopy(bufferBytes, 0, bufferToWrite, 0, bytesRead);
}
newMessage.addBytes(bufferToWrite);
if (bytesRead < bufferBytes.length) {
break;
}
}
file.position(oldPosition);
if (!originallyOpen) {
file.close();
}
LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, properties, newfile, newID);
newMessage.copyFrom(createFile());
return newMessage;
}
catch (Exception e) {
ActiveMQServerLogger.LOGGER.lareMessageErrorCopying(e, this);

View File

@ -48,8 +48,6 @@ public interface ServerMessage extends MessageInternal, EncodingSupport {
ServerMessage copy(long newID);
void finishCopy() throws Exception;
ServerMessage copy();
int getMemoryEstimate();

View File

@ -148,7 +148,6 @@ public class Redistributor implements Consumer {
}
if (!reference.getMessage().isLargeMessage()) {
routingInfo.getB().finishCopy();
postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false);
@ -160,7 +159,6 @@ public class Redistributor implements Consumer {
@Override
public void run() {
try {
routingInfo.getB().finishCopy();
postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false);

View File

@ -88,7 +88,6 @@ public class DivertImpl implements Divert {
// Shouldn't copy if it's not routed anywhere else
if (!forwardAddress.equals(message.getAddress())) {
copy = message.copy(id);
copy.finishCopy();
// This will set the original MessageId, and the original address
copy.setOriginalHeaders(message, null, false);

View File

@ -2222,7 +2222,6 @@ public class QueueImpl implements Queue {
}
}
copyMessage.finishCopy();
postOffice.processRoute(copyMessage, routingContext, false);
ref.handled();

View File

@ -185,10 +185,6 @@ public class ServerMessageImpl extends MessageImpl implements ServerMessage {
return m;
}
@Override
public void finishCopy() throws Exception {
}
@Override
public ServerMessage copy() {
// This is a simple copy, used only to avoid changing original properties
@ -216,7 +212,6 @@ public class ServerMessageImpl extends MessageImpl implements ServerMessage {
*/
ServerMessage copy = copy(newID);
copy.finishCopy();
if (copyOriginalHeaders) {
copy.setOriginalHeaders(this, originalReference, expiry);

View File

@ -336,11 +336,6 @@ public class ScheduledDeliveryHandlerTest extends Assert {
return null;
}
@Override
public void finishCopy() throws Exception {
}
@Override
public ServerMessage copy() {
return null;