This commit is contained in:
Clebert Suconic 2019-10-14 15:59:31 -04:00
commit 3687ae400c
2 changed files with 158 additions and 42 deletions

View File

@ -368,52 +368,51 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
try {
LargeServerMessage newMessage = storageManager.createLargeMessage(newID, this);
boolean originallyOpen = file != null && file.isOpen();
//clone a SequentialFile to avoid concurrent access
ensureFileExists(false);
SequentialFile cloneFile = file.cloneFile();
validateFile();
try {
byte[] bufferBytes = new byte[100 * 1024];
byte[] bufferBytes = new byte[100 * 1024];
ByteBuffer buffer = ByteBuffer.wrap(bufferBytes);
ByteBuffer buffer = ByteBuffer.wrap(bufferBytes);
long oldPosition = file.position();
if (!file.isOpen()) {
file.open();
}
file.position(0);
for (;;) {
// The buffer is reused...
// We need to make sure we clear the limits and the buffer before reusing it
buffer.clear();
int bytesRead = file.read(buffer);
byte[] bufferToWrite;
if (bytesRead <= 0) {
break;
} else if (bytesRead == bufferBytes.length && !this.storageManager.isReplicated()) {
// ARTEMIS-1220: We cannot reuse the same buffer if it's replicated
// otherwise there could be another thread still using the buffer on a
// replication.
bufferToWrite = bufferBytes;
} else {
bufferToWrite = new byte[bytesRead];
System.arraycopy(bufferBytes, 0, bufferToWrite, 0, bytesRead);
if (!cloneFile.isOpen()) {
cloneFile.open();
}
newMessage.addBytes(bufferToWrite);
cloneFile.position(0);
if (bytesRead < bufferBytes.length) {
break;
for (;;) {
// The buffer is reused...
// We need to make sure we clear the limits and the buffer before reusing it
buffer.clear();
int bytesRead = cloneFile.read(buffer);
byte[] bufferToWrite;
if (bytesRead <= 0) {
break;
} else if (bytesRead == bufferBytes.length && !this.storageManager.isReplicated()) {
// ARTEMIS-1220: We cannot reuse the same buffer if it's replicated
// otherwise there could be another thread still using the buffer on a
// replication.
bufferToWrite = bufferBytes;
} else {
bufferToWrite = new byte[bytesRead];
System.arraycopy(bufferBytes, 0, bufferToWrite, 0, bytesRead);
}
newMessage.addBytes(bufferToWrite);
if (bytesRead < bufferBytes.length) {
break;
}
}
}
file.position(oldPosition);
if (!originallyOpen) {
file.close(false);
newMessage.getFile().close();
} finally {
if (!file.isOpen()) {
newMessage.getFile().close();
}
cloneFile.close();
}
return newMessage;
@ -469,9 +468,11 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
}
}
// Private -------------------------------------------------------
public synchronized void validateFile() throws ActiveMQException {
this.ensureFileExists(true);
}
public synchronized void ensureFileExists(boolean toOpen) throws ActiveMQException {
try {
if (file == null) {
if (messageID <= 0) {
@ -480,7 +481,9 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
file = createFile();
openFile();
if (toOpen) {
openFile();
}
bodySize = file.size();
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.largemessage;
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Field;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.HashSet;
@ -36,6 +37,7 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.io.AbstractSequentialFile;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
@ -47,6 +49,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.tests.integration.security.SecurityTest;
import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
import org.junit.After;
@ -335,6 +338,40 @@ public class ServerLargeMessageTest extends ActiveMQTestBase {
assertTrue(sync.get());
}
@Test
public void testLargeServerMessageCopyIsolation() throws Exception {
ActiveMQServer server = createServer(true);
server.start();
try {
LargeServerMessageImpl largeMessage = new LargeServerMessageImpl((JournalStorageManager)server.getStorageManager());
largeMessage.setMessageID(23456);
for (int i = 0; i < 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; i++) {
largeMessage.addBytes(new byte[]{ActiveMQTestBase.getSamplebyte(i)});
}
//now replace the underlying file with a fake
replaceFile(largeMessage);
Message copied = largeMessage.copy(99999);
assertEquals(99999, copied.getMessageID());
} finally {
server.stop();
}
}
private void replaceFile(LargeServerMessageImpl largeMessage) throws Exception {
SequentialFile originalFile = largeMessage.getFile();
MockSequentialFile mockFile = new MockSequentialFile(originalFile);
Field fileField = LargeServerMessageImpl.class.getDeclaredField("file");
fileField.setAccessible(true);
fileField.set(largeMessage, mockFile);
mockFile.close();
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@ -342,5 +379,81 @@ public class ServerLargeMessageTest extends ActiveMQTestBase {
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
private class MockSequentialFile extends AbstractSequentialFile {
private SequentialFile originalFile;
MockSequentialFile(SequentialFile originalFile) throws Exception {
super(originalFile.getJavaFile().getParentFile(), originalFile.getFileName(), new FakeSequentialFileFactory(), null);
this.originalFile = originalFile;
this.originalFile.close();
}
@Override
public void open() throws Exception {
//open and close it right away to simulate failure condition
originalFile.open();
originalFile.close();
}
@Override
public void open(int maxIO, boolean useExecutor) throws Exception {
}
@Override
public boolean isOpen() {
return originalFile.isOpen();
}
@Override
public int calculateBlockStart(int position) throws Exception {
return originalFile.calculateBlockStart(position);
}
@Override
public void fill(int size) throws Exception {
originalFile.fill(size);
}
@Override
public void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback) {
originalFile.writeDirect(bytes, sync, callback);
}
@Override
public void writeDirect(ByteBuffer bytes, boolean sync) throws Exception {
originalFile.writeDirect(bytes, sync);
}
@Override
public void blockingWriteDirect(ByteBuffer bytes, boolean sync, boolean releaseBuffer) throws Exception {
originalFile.blockingWriteDirect(bytes, sync, releaseBuffer);
}
@Override
public int read(ByteBuffer bytes, IOCallback callback) throws Exception {
return originalFile.read(bytes, callback);
}
@Override
public int read(ByteBuffer bytes) throws Exception {
return originalFile.read(bytes);
}
@Override
public void sync() throws IOException {
originalFile.sync();
}
@Override
public long size() throws Exception {
return originalFile.size();
}
@Override
public SequentialFile cloneFile() {
return originalFile.cloneFile();
}
}
}