mirror of
https://github.com/apache/activemq.git
synced 2025-02-28 05:09:07 +00:00
merging 882126 - https://issues.apache.org/activemq/browse/AMQ-2042
git-svn-id: https://svn.apache.org/repos/asf/activemq/branches/activemq-5.3@882579 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
120a7a10b0
commit
7a97edc865
@ -21,6 +21,7 @@ import java.io.InterruptedIOException;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.activemq.util.ByteSequence;
|
||||
import org.apache.activemq.util.DataByteArrayOutputStream;
|
||||
@ -48,7 +49,7 @@ class DataFileAppender {
|
||||
protected final CountDownLatch shutdownDone = new CountDownLatch(1);
|
||||
protected int maxWriteBatchSize = DEFAULT_MAX_BATCH_SIZE;
|
||||
|
||||
private boolean running;
|
||||
protected boolean running;
|
||||
private Thread thread;
|
||||
|
||||
public static class WriteKey {
|
||||
@ -82,6 +83,7 @@ class DataFileAppender {
|
||||
public final WriteCommand first;
|
||||
public final CountDownLatch latch = new CountDownLatch(1);
|
||||
public int size;
|
||||
public AtomicReference<IOException> exception = new AtomicReference<IOException>();
|
||||
|
||||
public WriteBatch(DataFile dataFile, WriteCommand write) throws IOException {
|
||||
this.dataFile = dataFile;
|
||||
@ -179,6 +181,10 @@ class DataFileAppender {
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException();
|
||||
}
|
||||
IOException exception = batch.exception.get();
|
||||
if (exception != null) {
|
||||
throw exception;
|
||||
}
|
||||
}
|
||||
|
||||
return location;
|
||||
@ -216,9 +222,6 @@ class DataFileAppender {
|
||||
if (shutdown) {
|
||||
throw new IOException("Async Writter Thread Shutdown");
|
||||
}
|
||||
if (firstAsyncException != null) {
|
||||
throw firstAsyncException;
|
||||
}
|
||||
|
||||
if (!running) {
|
||||
running = true;
|
||||
@ -231,6 +234,11 @@ class DataFileAppender {
|
||||
thread.setDaemon(true);
|
||||
thread.setName("ActiveMQ Data File Writer");
|
||||
thread.start();
|
||||
firstAsyncException = null;
|
||||
}
|
||||
|
||||
if (firstAsyncException != null) {
|
||||
throw firstAsyncException;
|
||||
}
|
||||
|
||||
if (nextWriteBatch == null) {
|
||||
@ -298,6 +306,7 @@ class DataFileAppender {
|
||||
protected void processQueue() {
|
||||
DataFile dataFile = null;
|
||||
RandomAccessFile file = null;
|
||||
WriteBatch wb = null;
|
||||
try {
|
||||
|
||||
DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize);
|
||||
@ -321,7 +330,7 @@ class DataFileAppender {
|
||||
enqueueMutex.notify();
|
||||
}
|
||||
|
||||
WriteBatch wb = (WriteBatch)o;
|
||||
wb = (WriteBatch)o;
|
||||
if (dataFile != wb.dataFile) {
|
||||
if (file != null) {
|
||||
dataFile.closeRandomAccessFile(file);
|
||||
@ -406,6 +415,14 @@ class DataFileAppender {
|
||||
} catch (IOException e) {
|
||||
synchronized (enqueueMutex) {
|
||||
firstAsyncException = e;
|
||||
if (wb != null) {
|
||||
wb.latch.countDown();
|
||||
wb.exception.set(e);
|
||||
}
|
||||
if (nextWriteBatch != null) {
|
||||
nextWriteBatch.latch.countDown();
|
||||
nextWriteBatch.exception.set(e);
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
} finally {
|
||||
|
@ -47,6 +47,7 @@ class NIODataFileAppender extends DataFileAppender {
|
||||
DataFile dataFile = null;
|
||||
RandomAccessFile file = null;
|
||||
FileChannel channel = null;
|
||||
WriteBatch wb = null;
|
||||
|
||||
try {
|
||||
|
||||
@ -81,7 +82,7 @@ class NIODataFileAppender extends DataFileAppender {
|
||||
enqueueMutex.notify();
|
||||
}
|
||||
|
||||
WriteBatch wb = (WriteBatch)o;
|
||||
wb = (WriteBatch)o;
|
||||
if (dataFile != wb.dataFile) {
|
||||
if (file != null) {
|
||||
dataFile.closeRandomAccessFile(file);
|
||||
@ -180,16 +181,32 @@ class NIODataFileAppender extends DataFileAppender {
|
||||
} catch (IOException e) {
|
||||
synchronized (enqueueMutex) {
|
||||
firstAsyncException = e;
|
||||
if (wb != null) {
|
||||
wb.latch.countDown();
|
||||
wb.exception.set(e);
|
||||
}
|
||||
if (nextWriteBatch != null) {
|
||||
nextWriteBatch.latch.countDown();
|
||||
nextWriteBatch.exception.set(e);
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
} finally {
|
||||
try {
|
||||
if (file != null) {
|
||||
dataFile.closeRandomAccessFile(file);
|
||||
dataFile = null;
|
||||
file.close();
|
||||
file = null;
|
||||
}
|
||||
if (channel != null) {
|
||||
channel.close();
|
||||
channel = null;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
}
|
||||
shutdownDone.countDown();
|
||||
running = false;
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user