ARTEMIS-332 avoid shutting down the server after interrupted threads on paging

This commit is contained in:
Clebert Suconic 2016-01-07 16:07:30 -05:00
parent 6b42f26283
commit 7820fd6d60
5 changed files with 189 additions and 10 deletions

View File

@ -20,6 +20,7 @@ import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
@ -93,6 +94,9 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
fileSize = channel.size();
}
catch (ClosedChannelException e) {
throw e;
}
catch (IOException e) {
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
throw e;
@ -117,6 +121,9 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
channel.force(false);
channel.position(0);
}
catch (ClosedChannelException e) {
throw e;
}
catch (IOException e) {
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
throw e;
@ -125,6 +132,12 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
fileSize = channel.size();
}
public synchronized void waitForClose() throws InterruptedException {
while (isOpen()) {
wait();
}
}
@Override
public synchronized void close() throws IOException, InterruptedException, ActiveMQException {
super.close();
@ -145,6 +158,9 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
rfile.close();
}
}
catch (ClosedChannelException e) {
throw e;
}
catch (IOException e) {
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
throw e;
@ -178,6 +194,9 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
return bytesRead;
}
catch (ClosedChannelException e) {
throw e;
}
catch (IOException e) {
if (callback != null) {
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getLocalizedMessage());
@ -195,6 +214,9 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
try {
channel.force(false);
}
catch (ClosedChannelException e) {
throw e;
}
catch (IOException e) {
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
throw e;
@ -211,6 +233,9 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
try {
return channel.size();
}
catch (ClosedChannelException e) {
throw e;
}
catch (IOException e) {
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
throw e;
@ -223,6 +248,9 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
super.position(pos);
channel.position(pos);
}
catch (ClosedChannelException e) {
throw e;
}
catch (IOException e) {
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
throw e;
@ -291,6 +319,9 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
try {
doInternalWrite(bytes, sync, callback);
}
catch (ClosedChannelException e) {
throw e;
}
catch (IOException e) {
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
}
@ -306,6 +337,9 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
try {
doInternalWrite(bytes, sync, callback);
}
catch (ClosedChannelException e) {
ActiveMQJournalLogger.LOGGER.errorSubmittingWrite(e);
}
catch (IOException e) {
ActiveMQJournalLogger.LOGGER.errorSubmittingWrite(e);
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), NIOSequentialFile.this);

View File

@ -38,6 +38,4 @@ public interface PagingStoreFactory {
SequentialFileFactory newFileFactory(SimpleString address) throws Exception;
void criticalException(Throwable e);
}

View File

@ -87,10 +87,6 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
// Public --------------------------------------------------------
public void criticalException(Throwable e) {
critialErrorListener.onIOException(e, e.getMessage(), null);
}
@Override
public void stop() {
}

View File

@ -17,10 +17,18 @@
package org.apache.activemq.artemis.tests.integration.journal;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.tests.unit.core.journal.impl.SequentialFileFactoryTestBase;
import org.junit.Assert;
import org.junit.Test;
public class NIOSequentialFileFactoryTest extends SequentialFileFactoryTestBase {
@ -29,4 +37,151 @@ public class NIOSequentialFileFactoryTest extends SequentialFileFactoryTestBase
return new NIOSequentialFileFactory(new File(folder), true, 1);
}
@Test
public void testInterrupts() throws Throwable {
final EncodingSupport fakeEncoding = new EncodingSupport() {
@Override
public int getEncodeSize() {
return 10;
}
@Override
public void encode(ActiveMQBuffer buffer) {
buffer.writeBytes(new byte[10]);
}
@Override
public void decode(ActiveMQBuffer buffer) {
}
};
final AtomicInteger calls = new AtomicInteger(0);
final NIOSequentialFileFactory factory = new NIOSequentialFileFactory(new File(getTestDir()), new IOCriticalErrorListener() {
@Override
public void onIOException(Throwable code, String message, SequentialFile file) {
new Exception("shutdown").printStackTrace();
calls.incrementAndGet();
}
}, 1);
Thread threadOpen = new Thread() {
public void run() {
try {
Thread.currentThread().interrupt();
SequentialFile file = factory.createSequentialFile("file.txt");
file.open();
}
catch (Exception e) {
e.printStackTrace();
}
}
};
threadOpen.start();
threadOpen.join();
Thread threadClose = new Thread() {
public void run() {
try {
SequentialFile file = factory.createSequentialFile("file.txt");
file.open();
file.write(fakeEncoding, true);
Thread.currentThread().interrupt();
file.close();
}
catch (Exception e) {
e.printStackTrace();
}
}
};
threadClose.start();
threadClose.join();
Thread threadWrite = new Thread() {
public void run() {
try {
SequentialFile file = factory.createSequentialFile("file.txt");
file.open();
Thread.currentThread().interrupt();
file.write(fakeEncoding, true);
file.close();
}
catch (Exception e) {
e.printStackTrace();
}
}
};
threadWrite.start();
threadWrite.join();
Thread threadFill = new Thread() {
public void run() {
try {
SequentialFile file = factory.createSequentialFile("file.txt");
file.open();
Thread.currentThread().interrupt();
file.fill(1024);
file.close();
}
catch (Exception e) {
e.printStackTrace();
}
}
};
threadFill.start();
threadFill.join();
Thread threadWriteDirect = new Thread() {
public void run() {
try {
SequentialFile file = factory.createSequentialFile("file.txt");
file.open();
ByteBuffer buffer = ByteBuffer.allocate(10);
buffer.put(new byte[10]);
Thread.currentThread().interrupt();
file.writeDirect(buffer, true);
file.close();
}
catch (Exception e) {
e.printStackTrace();
}
}
};
threadWriteDirect.start();
threadWriteDirect.join();
Thread threadRead = new Thread() {
public void run() {
try {
SequentialFile file = factory.createSequentialFile("file.txt");
file.open();
file.write(fakeEncoding, true);
file.position(0);
ByteBuffer readBytes = ByteBuffer.allocate(fakeEncoding.getEncodeSize());
Thread.currentThread().interrupt();
file.read(readBytes);
file.close();
}
catch (Exception e) {
e.printStackTrace();
}
}
};
threadRead.start();
threadRead.join();
// An interrupt exception shouldn't issue a shutdown
Assert.assertEquals(0, calls.get());
}
}

View File

@ -782,10 +782,6 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
static final class FakeStoreFactory implements PagingStoreFactory {
@Override
public void criticalException(Throwable e) {
}
final SequentialFileFactory factory;
public FakeStoreFactory() {