Use IndexWriter.getFlushingBytes() rather than tracking it ourselves (#33582)

Currently we keep track of how many bytes are currently being written to disk
in an AtomicLong within InternalEngine, updating it on refresh. The IndexWriter
has its own accounting for this, and exposes it via a getFlushingBytes method
in the latest lucene 8 snapshot. This commit removes the InternalEngine tracking
in favour of just using the IndexWriter method.
This commit is contained in:
Alan Woodward 2018-09-11 13:38:44 +01:00 committed by GitHub
parent ad4b5e4270
commit 36bdad4895
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 23 additions and 12 deletions

View File

@ -152,12 +152,6 @@ public class InternalEngine extends Engine {
private final SoftDeletesPolicy softDeletesPolicy; private final SoftDeletesPolicy softDeletesPolicy;
private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener; private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener;
/**
* How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this
* across all shards to decide if throttling is necessary because moving bytes to disk is falling behind vs incoming documents
* being indexed/deleted.
*/
private final AtomicLong writingBytes = new AtomicLong();
private final AtomicBoolean trackTranslogLocation = new AtomicBoolean(false); private final AtomicBoolean trackTranslogLocation = new AtomicBoolean(false);
@Nullable @Nullable
@ -530,7 +524,7 @@ public class InternalEngine extends Engine {
/** Returns how many bytes we are currently moving from indexing buffer to segments on disk */ /** Returns how many bytes we are currently moving from indexing buffer to segments on disk */
@Override @Override
public long getWritingBytes() { public long getWritingBytes() {
return writingBytes.get(); return indexWriter.getFlushingBytes() + versionMap.getRefreshingBytes();
} }
/** /**
@ -1437,9 +1431,6 @@ public class InternalEngine extends Engine {
// pass the new reader reference to the external reader manager. // pass the new reader reference to the external reader manager.
final long localCheckpointBeforeRefresh = getLocalCheckpoint(); final long localCheckpointBeforeRefresh = getLocalCheckpoint();
// this will also cause version map ram to be freed hence we always account for it.
final long bytes = indexWriter.ramBytesUsed() + versionMap.ramBytesUsedForRefresh();
writingBytes.addAndGet(bytes);
try (ReleasableLock lock = readLock.acquire()) { try (ReleasableLock lock = readLock.acquire()) {
ensureOpen(); ensureOpen();
if (store.tryIncRef()) { if (store.tryIncRef()) {
@ -1465,8 +1456,6 @@ public class InternalEngine extends Engine {
e.addSuppressed(inner); e.addSuppressed(inner);
} }
throw new RefreshFailedEngineException(shardId, e); throw new RefreshFailedEngineException(shardId, e);
} finally {
writingBytes.addAndGet(-bytes);
} }
assert lastRefreshedCheckpoint() >= localCheckpointBeforeRefresh : "refresh checkpoint was not advanced; " + assert lastRefreshedCheckpoint() >= localCheckpointBeforeRefresh : "refresh checkpoint was not advanced; " +
"local_checkpoint=" + localCheckpointBeforeRefresh + " refresh_checkpoint=" + lastRefreshedCheckpoint(); "local_checkpoint=" + localCheckpointBeforeRefresh + " refresh_checkpoint=" + lastRefreshedCheckpoint();

View File

@ -434,6 +434,14 @@ final class LiveVersionMap implements ReferenceManager.RefreshListener, Accounta
return maps.current.ramBytesUsed.get(); return maps.current.ramBytesUsed.get();
} }
/**
* Returns how much RAM is current being freed up by refreshing. This is {@link #ramBytesUsed()}
* except does not include tombstones because they don't clear on refresh.
*/
long getRefreshingBytes() {
return maps.old.ramBytesUsed.get();
}
@Override @Override
public Collection<Accountable> getChildResources() { public Collection<Accountable> getChildResources() {
// TODO: useful to break down RAM usage here? // TODO: useful to break down RAM usage here?

View File

@ -41,6 +41,7 @@ import java.util.concurrent.atomic.AtomicLong;
import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
public class LiveVersionMapTests extends ESTestCase { public class LiveVersionMapTests extends ESTestCase {
@ -91,6 +92,19 @@ public class LiveVersionMapTests extends ESTestCase {
assertEquals(actualRamBytesUsed, estimatedRamBytesUsed, tolerance); assertEquals(actualRamBytesUsed, estimatedRamBytesUsed, tolerance);
} }
public void testRefreshingBytes() throws IOException {
LiveVersionMap map = new LiveVersionMap();
BytesRefBuilder uid = new BytesRefBuilder();
uid.copyChars(TestUtil.randomSimpleString(random(), 10, 20));
try (Releasable r = map.acquireLock(uid.toBytesRef())) {
map.putIndexUnderLock(uid.toBytesRef(), randomIndexVersionValue());
}
map.beforeRefresh();
assertThat(map.getRefreshingBytes(), greaterThan(0L));
map.afterRefresh(true);
assertThat(map.getRefreshingBytes(), equalTo(0L));
}
private BytesRef uid(String string) { private BytesRef uid(String string) {
BytesRefBuilder builder = new BytesRefBuilder(); BytesRefBuilder builder = new BytesRefBuilder();
builder.copyChars(string); builder.copyChars(string);