https://issues.apache.org/activemq/browse/AMQ-2042 - making amq store resilent to 'no space left'

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@882126 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2009-11-19 12:25:52 +00:00
parent 9b260dc37e
commit 04b3f8e49b
2 changed files with 41 additions and 7 deletions

View File

@ -21,6 +21,7 @@ import java.io.InterruptedIOException;
import java.io.RandomAccessFile;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayOutputStream;
@ -48,7 +49,7 @@ class DataFileAppender {
protected final CountDownLatch shutdownDone = new CountDownLatch(1);
protected int maxWriteBatchSize = DEFAULT_MAX_BATCH_SIZE;
private boolean running;
protected boolean running;
private Thread thread;
public static class WriteKey {
@ -82,6 +83,7 @@ class DataFileAppender {
public final WriteCommand first;
public final CountDownLatch latch = new CountDownLatch(1);
public int size;
public AtomicReference<IOException> exception = new AtomicReference<IOException>();
public WriteBatch(DataFile dataFile, WriteCommand write) throws IOException {
this.dataFile = dataFile;
@ -179,6 +181,10 @@ class DataFileAppender {
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
IOException exception = batch.exception.get();
if (exception != null) {
throw exception;
}
}
return location;
@ -216,10 +222,7 @@ class DataFileAppender {
if (shutdown) {
throw new IOException("Async Writter Thread Shutdown");
}
if (firstAsyncException != null) {
throw firstAsyncException;
}
if (!running) {
running = true;
thread = new Thread() {
@ -231,6 +234,11 @@ class DataFileAppender {
thread.setDaemon(true);
thread.setName("ActiveMQ Data File Writer");
thread.start();
firstAsyncException = null;
}
if (firstAsyncException != null) {
throw firstAsyncException;
}
if (nextWriteBatch == null) {
@ -298,6 +306,7 @@ class DataFileAppender {
protected void processQueue() {
DataFile dataFile = null;
RandomAccessFile file = null;
WriteBatch wb = null;
try {
DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize);
@ -321,7 +330,7 @@ class DataFileAppender {
enqueueMutex.notify();
}
WriteBatch wb = (WriteBatch)o;
wb = (WriteBatch)o;
if (dataFile != wb.dataFile) {
if (file != null) {
dataFile.closeRandomAccessFile(file);
@ -406,6 +415,14 @@ class DataFileAppender {
} catch (IOException e) {
synchronized (enqueueMutex) {
firstAsyncException = e;
if (wb != null) {
wb.latch.countDown();
wb.exception.set(e);
}
if (nextWriteBatch != null) {
nextWriteBatch.latch.countDown();
nextWriteBatch.exception.set(e);
}
}
} catch (InterruptedException e) {
} finally {

View File

@ -47,6 +47,7 @@ class NIODataFileAppender extends DataFileAppender {
DataFile dataFile = null;
RandomAccessFile file = null;
FileChannel channel = null;
WriteBatch wb = null;
try {
@ -81,7 +82,7 @@ class NIODataFileAppender extends DataFileAppender {
enqueueMutex.notify();
}
WriteBatch wb = (WriteBatch)o;
wb = (WriteBatch)o;
if (dataFile != wb.dataFile) {
if (file != null) {
dataFile.closeRandomAccessFile(file);
@ -180,16 +181,32 @@ class NIODataFileAppender extends DataFileAppender {
} catch (IOException e) {
synchronized (enqueueMutex) {
firstAsyncException = e;
if (wb != null) {
wb.latch.countDown();
wb.exception.set(e);
}
if (nextWriteBatch != null) {
nextWriteBatch.latch.countDown();
nextWriteBatch.exception.set(e);
}
}
} catch (InterruptedException e) {
} finally {
try {
if (file != null) {
dataFile.closeRandomAccessFile(file);
dataFile = null;
file.close();
file = null;
}
if (channel != null) {
channel.close();
channel = null;
}
} catch (IOException e) {
}
shutdownDone.countDown();
running = false;
}
}