fix org.apache.kahadb.journal.JournalTest.testBatchWriteCompleteAfterClose() after changes for AMQ-2143, the file and offset in a location are set after an enqueue to a write batch, so doing the inflight add just overwrote the same entry, -1:-1. Which fixed the leak but broke inflights, proper fix is to do the inflight addition in enqueue, just after the location details are known and with a lock on appender enqueue

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@748475 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2009-02-27 10:22:50 +00:00
parent 3266019825
commit d458a5d17f
1 changed files with 8 additions and 10 deletions

View File

@ -175,9 +175,6 @@ class DataFileAppender {
// assigned // assigned
// by the data manager (which is basically just appending) // by the data manager (which is basically just appending)
if (!sync) {
inflightWrites.put(new WriteKey(location), write);
}
synchronized (this) { synchronized (this) {
batch = enqueue(write); batch = enqueue(write);
} }
@ -190,7 +187,6 @@ class DataFileAppender {
} }
} }
return location; return location;
} }
@ -208,14 +204,13 @@ class DataFileAppender {
synchronized (this) { synchronized (this) {
batch = enqueue(write); batch = enqueue(write);
} }
inflightWrites.put(new WriteKey(location), write);
location.setLatch(batch.latch); location.setLatch(batch.latch);
return location; return location;
} }
private WriteBatch enqueue(WriteCommand write) throws IOException { private WriteBatch enqueue(WriteCommand write) throws IOException {
synchronized (enqueueMutex) { synchronized (enqueueMutex) {
WriteBatch rc = null;
if (shutdown) { if (shutdown) {
throw new IOException("Async Writter Thread Shutdown"); throw new IOException("Async Writter Thread Shutdown");
} }
@ -244,14 +239,13 @@ class DataFileAppender {
} }
nextWriteBatch = new WriteBatch(file, file.getLength(), write); nextWriteBatch = new WriteBatch(file, file.getLength(), write);
rc = nextWriteBatch;
enqueueMutex.notify(); enqueueMutex.notify();
return rc; break;
} else { } else {
// Append to current batch if possible.. // Append to current batch if possible..
if (nextWriteBatch.canAppend(write)) { if (nextWriteBatch.canAppend(write)) {
nextWriteBatch.append(write); nextWriteBatch.append(write);
return nextWriteBatch; break;
} else { } else {
// Otherwise wait for the queuedCommand to be null // Otherwise wait for the queuedCommand to be null
try { try {
@ -267,6 +261,10 @@ class DataFileAppender {
} }
} }
} }
if (!write.sync) {
inflightWrites.put(new WriteKey(write.location), write);
}
return nextWriteBatch;
} }
} }