Fix concurrency bug in IMC that could lead to negative total indexing bytes

This commit is contained in:
Mike McCandless 2016-05-14 18:47:26 -04:00
parent c183e4b6eb
commit ded8b400b0
1 changed files with 16 additions and 8 deletions

View File

@ -251,21 +251,29 @@ public class IndexingMemoryController extends AbstractComponent implements Index
/** Shard calls this on each indexing/delete op */ /** Shard calls this on each indexing/delete op */
public void bytesWritten(int bytes) { public void bytesWritten(int bytes) {
long totalBytes = bytesWrittenSinceCheck.addAndGet(bytes); long totalBytes = bytesWrittenSinceCheck.addAndGet(bytes);
assert totalBytes >= 0 ;
while (totalBytes > indexingBuffer.bytes()/30) { while (totalBytes > indexingBuffer.bytes()/30) {
if (runLock.tryLock()) { if (runLock.tryLock()) {
try { try {
// Must pull this again because it may have changed since we first checked:
totalBytes = bytesWrittenSinceCheck.get();
if (totalBytes > indexingBuffer.bytes()/30) {
bytesWrittenSinceCheck.addAndGet(-totalBytes); bytesWrittenSinceCheck.addAndGet(-totalBytes);
// NOTE: this is only an approximate check, because bytes written is to the translog, vs indexing memory buffer which is // NOTE: this is only an approximate check, because bytes written is to the translog, vs indexing memory buffer which is
// typically smaller but can be larger in extreme cases (many unique terms). This logic is here only as a safety against // typically smaller but can be larger in extreme cases (many unique terms). This logic is here only as a safety against
// thread starvation or too infrequent checking, to ensure we are still checking periodically, in proportion to bytes // thread starvation or too infrequent checking, to ensure we are still checking periodically, in proportion to bytes
// processed by indexing: // processed by indexing:
runUnlocked(); runUnlocked();
}
} finally { } finally {
runLock.unlock(); runLock.unlock();
} }
// Could be while we were checking, more bytes arrived:
totalBytes = bytesWrittenSinceCheck.addAndGet(bytes); // Must get it again since other threads could have increased it while we were in runUnlocked
totalBytes = bytesWrittenSinceCheck.get();
} else { } else {
// Another thread beat us to it: let them do all the work, yay!
break; break;
} }
} }