mirror of https://github.com/apache/activemq.git
Fix threading issue which was causing a memory leak. Basically the writes were getting added to the in flight map
after they were enqueued to get written and in that time span they could be been processed and the remove() would occur before the add witch caused the leak. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@652562 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
935ab765ac
commit
4512f27fdd
|
@ -167,6 +167,9 @@ class DataFileAppender {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
// Find the position where this item will land at.
|
// Find the position where this item will land at.
|
||||||
DataFile dataFile = dataManager.allocateLocation(location);
|
DataFile dataFile = dataManager.allocateLocation(location);
|
||||||
|
if( !sync ) {
|
||||||
|
inflightWrites.put(new WriteKey(location), write);
|
||||||
|
}
|
||||||
batch = enqueue(dataFile, write);
|
batch = enqueue(dataFile, write);
|
||||||
}
|
}
|
||||||
location.setLatch(batch.latch);
|
location.setLatch(batch.latch);
|
||||||
|
@ -176,8 +179,6 @@ class DataFileAppender {
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
throw new InterruptedIOException();
|
throw new InterruptedIOException();
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
inflightWrites.put(new WriteKey(location), write);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return location;
|
return location;
|
||||||
|
@ -201,10 +202,10 @@ class DataFileAppender {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
// Find the position where this item will land at.
|
// Find the position where this item will land at.
|
||||||
DataFile dataFile = dataManager.allocateLocation(location);
|
DataFile dataFile = dataManager.allocateLocation(location);
|
||||||
|
inflightWrites.put(new WriteKey(location), write);
|
||||||
batch = enqueue(dataFile, write);
|
batch = enqueue(dataFile, write);
|
||||||
}
|
}
|
||||||
location.setLatch(batch.latch);
|
location.setLatch(batch.latch);
|
||||||
inflightWrites.put(new WriteKey(location), write);
|
|
||||||
|
|
||||||
return location;
|
return location;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue