ARTEMIS-2513 Large message's copy may be interfered by other threads

In LargeMessageImpl.copy(long) it need to open the underlying
file in order to read and copy bytes into the new copied message.
However there is a chance that another thread can come in and close
the file in the middle, making the copy failed
with "channel is null" error.

This is happening in cases where a large message is sent to a jms
topic (multicast address). During delivery it to multiple
subscribers, some consumer is doing delivery and closed the
underlying file after. Some other consumer is rolling back
the messages and eventually move it to DLQ (which will call
the above copy method). So there is a chance this bug being hit on.
This commit is contained in:
Howard Gao 2019-10-09 23:57:57 +08:00 committed by Clebert Suconic
parent f19f75e2fe
commit 6177d32774
2 changed files with 158 additions and 42 deletions

View File

@ -368,52 +368,51 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
try {
LargeServerMessage newMessage = storageManager.createLargeMessage(newID, this);
boolean originallyOpen = file != null && file.isOpen();
//clone a SequentialFile to avoid concurrent access
ensureFileExists(false);
SequentialFile cloneFile = file.cloneFile();
validateFile();
try {
byte[] bufferBytes = new byte[100 * 1024];
byte[] bufferBytes = new byte[100 * 1024];
ByteBuffer buffer = ByteBuffer.wrap(bufferBytes);
ByteBuffer buffer = ByteBuffer.wrap(bufferBytes);
long oldPosition = file.position();
if (!file.isOpen()) {
file.open();
}
file.position(0);
for (;;) {
// The buffer is reused...
// We need to make sure we clear the limits and the buffer before reusing it
buffer.clear();
int bytesRead = file.read(buffer);
byte[] bufferToWrite;
if (bytesRead <= 0) {
break;
} else if (bytesRead == bufferBytes.length && !this.storageManager.isReplicated()) {
// ARTEMIS-1220: We cannot reuse the same buffer if it's replicated
// otherwise there could be another thread still using the buffer on a
// replication.
bufferToWrite = bufferBytes;
} else {
bufferToWrite = new byte[bytesRead];
System.arraycopy(bufferBytes, 0, bufferToWrite, 0, bytesRead);
if (!cloneFile.isOpen()) {
cloneFile.open();
}
newMessage.addBytes(bufferToWrite);
cloneFile.position(0);
if (bytesRead < bufferBytes.length) {
break;
for (;;) {
// The buffer is reused...
// We need to make sure we clear the limits and the buffer before reusing it
buffer.clear();
int bytesRead = cloneFile.read(buffer);
byte[] bufferToWrite;
if (bytesRead <= 0) {
break;
} else if (bytesRead == bufferBytes.length && !this.storageManager.isReplicated()) {
// ARTEMIS-1220: We cannot reuse the same buffer if it's replicated
// otherwise there could be another thread still using the buffer on a
// replication.
bufferToWrite = bufferBytes;
} else {
bufferToWrite = new byte[bytesRead];
System.arraycopy(bufferBytes, 0, bufferToWrite, 0, bytesRead);
}
newMessage.addBytes(bufferToWrite);
if (bytesRead < bufferBytes.length) {
break;
}
}
}
file.position(oldPosition);
if (!originallyOpen) {
file.close(false);
newMessage.getFile().close();
} finally {
if (!file.isOpen()) {
newMessage.getFile().close();
}
cloneFile.close();
}
return newMessage;
@ -469,9 +468,11 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
}
}
// Private -------------------------------------------------------
public synchronized void validateFile() throws ActiveMQException {
this.ensureFileExists(true);
}
public synchronized void ensureFileExists(boolean toOpen) throws ActiveMQException {
try {
if (file == null) {
if (messageID <= 0) {
@ -480,7 +481,9 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
file = createFile();
openFile();
if (toOpen) {
openFile();
}
bodySize = file.size();
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.largemessage;
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Field;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.HashSet;
@ -36,6 +37,7 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.io.AbstractSequentialFile;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
@ -47,6 +49,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.tests.integration.security.SecurityTest;
import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
import org.junit.After;
@ -335,6 +338,40 @@ public class ServerLargeMessageTest extends ActiveMQTestBase {
assertTrue(sync.get());
}
@Test
public void testLargeServerMessageCopyIsolation() throws Exception {
ActiveMQServer server = createServer(true);
server.start();
try {
LargeServerMessageImpl largeMessage = new LargeServerMessageImpl((JournalStorageManager)server.getStorageManager());
largeMessage.setMessageID(23456);
for (int i = 0; i < 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; i++) {
largeMessage.addBytes(new byte[]{ActiveMQTestBase.getSamplebyte(i)});
}
//now replace the underlying file with a fake
replaceFile(largeMessage);
Message copied = largeMessage.copy(99999);
assertEquals(99999, copied.getMessageID());
} finally {
server.stop();
}
}
private void replaceFile(LargeServerMessageImpl largeMessage) throws Exception {
SequentialFile originalFile = largeMessage.getFile();
MockSequentialFile mockFile = new MockSequentialFile(originalFile);
Field fileField = LargeServerMessageImpl.class.getDeclaredField("file");
fileField.setAccessible(true);
fileField.set(largeMessage, mockFile);
mockFile.close();
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@ -342,5 +379,81 @@ public class ServerLargeMessageTest extends ActiveMQTestBase {
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
private class MockSequentialFile extends AbstractSequentialFile {
private SequentialFile originalFile;
MockSequentialFile(SequentialFile originalFile) throws Exception {
super(originalFile.getJavaFile().getParentFile(), originalFile.getFileName(), new FakeSequentialFileFactory(), null);
this.originalFile = originalFile;
this.originalFile.close();
}
@Override
public void open() throws Exception {
//open and close it right away to simulate failure condition
originalFile.open();
originalFile.close();
}
@Override
public void open(int maxIO, boolean useExecutor) throws Exception {
}
@Override
public boolean isOpen() {
return originalFile.isOpen();
}
@Override
public int calculateBlockStart(int position) throws Exception {
return originalFile.calculateBlockStart(position);
}
@Override
public void fill(int size) throws Exception {
originalFile.fill(size);
}
@Override
public void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback) {
originalFile.writeDirect(bytes, sync, callback);
}
@Override
public void writeDirect(ByteBuffer bytes, boolean sync) throws Exception {
originalFile.writeDirect(bytes, sync);
}
@Override
public void blockingWriteDirect(ByteBuffer bytes, boolean sync, boolean releaseBuffer) throws Exception {
originalFile.blockingWriteDirect(bytes, sync, releaseBuffer);
}
@Override
public int read(ByteBuffer bytes, IOCallback callback) throws Exception {
return originalFile.read(bytes, callback);
}
@Override
public int read(ByteBuffer bytes) throws Exception {
return originalFile.read(bytes);
}
@Override
public void sync() throws IOException {
originalFile.sync();
}
@Override
public long size() throws Exception {
return originalFile.size();
}
@Override
public SequentialFile cloneFile() {
return originalFile.cloneFile();
}
}
}