ARTEMIS-2186 Large message incomplete when server is crashed

This commit is contained in:
yang wei 2018-11-28 20:37:09 +08:00 committed by Michael Andre Pearce
parent de90d8cfe1
commit 3954f0183f
2 changed files with 160 additions and 1 deletions

View File

@ -327,6 +327,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
public synchronized void releaseResources() {
if (file != null && file.isOpen()) {
try {
file.sync();
file.close();
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.largeMessageErrorReleasingResources(e);

View File

@ -16,11 +16,17 @@
*/
package org.apache.activemq.artemis.tests.integration.largemessage;
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
@ -30,6 +36,10 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
import org.apache.activemq.artemis.core.security.Role;
@ -38,6 +48,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.tests.integration.security.SecurityTest;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -177,7 +188,154 @@ public class ServerLargeMessageTest extends ActiveMQTestBase {
}
}
// Package protected ---------------------------------------------
@Test
public void testLargeServerMessageSync() throws Exception {
final AtomicBoolean open = new AtomicBoolean(false);
final AtomicBoolean sync = new AtomicBoolean(false);
JournalStorageManager storageManager = new JournalStorageManager(createDefaultInVMConfig(), EmptyCriticalAnalyzer.getInstance(), getOrderedExecutor(), getOrderedExecutor()) {
@Override
public SequentialFile createFileForLargeMessage(long messageID, LargeMessageExtension extension) {
return new SequentialFile() {
@Override
public boolean isOpen() {
return open.get();
}
@Override
public boolean exists() {
return true;
}
@Override
public void open() throws Exception {
open.set(true);
}
@Override
public void open(int maxIO, boolean useExecutor) throws Exception {
open.set(true);
}
@Override
public boolean fits(int size) {
return false;
}
@Override
public int calculateBlockStart(int position) throws Exception {
return 0;
}
@Override
public String getFileName() {
return null;
}
@Override
public void fill(int size) throws Exception {
}
@Override
public void delete() throws IOException, InterruptedException, ActiveMQException {
}
@Override
public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback) throws Exception {
}
@Override
public void write(ActiveMQBuffer bytes, boolean sync) throws Exception {
}
@Override
public void write(EncodingSupport bytes, boolean sync, IOCallback callback) throws Exception {
}
@Override
public void write(EncodingSupport bytes, boolean sync) throws Exception {
}
@Override
public void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback) {
}
@Override
public void writeDirect(ByteBuffer bytes, boolean sync) throws Exception {
}
@Override
public void blockingWriteDirect(ByteBuffer bytes, boolean sync, boolean releaseBuffer) throws Exception {
}
@Override
public int read(ByteBuffer bytes, IOCallback callback) throws Exception {
return 0;
}
@Override
public int read(ByteBuffer bytes) throws Exception {
return 0;
}
@Override
public void position(long pos) throws IOException {
}
@Override
public long position() {
return 0;
}
@Override
public void close() throws Exception {
open.set(false);
}
@Override
public void sync() throws IOException {
sync.set(true);
}
@Override
public long size() throws Exception {
return 0;
}
@Override
public void renameTo(String newFileName) throws Exception {
}
@Override
public SequentialFile cloneFile() {
return null;
}
@Override
public void copyTo(SequentialFile newFileName) throws Exception {
}
@Override
public void setTimedBuffer(TimedBuffer buffer) {
}
@Override
public File getJavaFile() {
return null;
}
};
}
};
LargeServerMessageImpl largeServerMessage = new LargeServerMessageImpl(storageManager);
largeServerMessage.setMessageID(1234);
largeServerMessage.addBytes(new byte[0]);
assertTrue(open.get());
largeServerMessage.releaseResources();
assertTrue(sync.get());
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------