fold in feedback

This commit is contained in:
Michael McCandless 2016-01-05 09:53:13 -05:00 committed by mikemccand
parent 485f4171bb
commit 99d6ec53fa
6 changed files with 105 additions and 83 deletions

View File

@ -375,7 +375,7 @@ public abstract class Engine implements Closeable {
}
/** How much heap is used that would be freed by a refresh. Note that this may throw {@link AlreadyClosedException}. */
abstract public long indexBufferRAMBytesUsed();
abstract public long getIndexBufferRAMBytesUsed();
protected Segment[] getSegmentInfo(SegmentInfos lastCommittedSegmentInfos, boolean verbose) {
ensureOpen();

View File

@ -522,11 +522,11 @@ public class InternalEngine extends Engine {
// TODO: it's not great that we secretly tie searcher visibility to "freeing up heap" here... really we should keep two
// searcher managers, one for searching which is only refreshed by the schedule the user requested (refresh_interval, or invoking
// refresh API), and another for version map interactions:
long versionMapBytes = versionMap.ramBytesUsedForRefresh();
long indexingBufferBytes = indexWriter.ramBytesUsed();
// refresh API), and another for version map interactions. See #15768.
final long versionMapBytes = versionMap.ramBytesUsedForRefresh();
final long indexingBufferBytes = indexWriter.ramBytesUsed();
boolean useRefresh = versionMapRefreshPending.get() || (indexingBufferBytes/4 < versionMapBytes);
final boolean useRefresh = versionMapRefreshPending.get() || (indexingBufferBytes/4 < versionMapBytes);
if (useRefresh) {
// The version map is using > 25% of the indexing buffer, so we do a refresh so the version map also clears
logger.debug("use refresh to write indexing buffer (heap size=[{}]), to also clear version map (heap size=[{}])",
@ -823,7 +823,7 @@ public class InternalEngine extends Engine {
}
@Override
public long indexBufferRAMBytesUsed() {
public long getIndexBufferRAMBytesUsed() {
return indexWriter.ramBytesUsed() + versionMap.ramBytesUsedForRefresh();
}
@ -1050,7 +1050,7 @@ public class InternalEngine extends Engine {
@Override
public void activateThrottling() {
int count = throttleRequestCount.incrementAndGet();
assert count >= 1;
assert count >= 1: "invalid post-increment throttleRequestCount=" + count;
if (count == 1) {
throttle.activate();
}
@ -1059,7 +1059,7 @@ public class InternalEngine extends Engine {
@Override
public void deactivateThrottling() {
int count = throttleRequestCount.decrementAndGet();
assert count >= 0;
assert count >= 0: "invalid post-decrement throttleRequestCount=" + count;
if (count == 0) {
throttle.deactivate();
}

View File

@ -227,7 +227,7 @@ public class ShadowEngine extends Engine {
}
@Override
public long indexBufferRAMBytesUsed() {
public long getIndexBufferRAMBytesUsed() {
// No IndexWriter nor version map
throw new UnsupportedOperationException("ShadowEngine has no IndexWriter");
}

View File

@ -19,6 +19,18 @@
package org.elasticsearch.index.shard;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.channels.ClosedByInterruptException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.lucene.index.*;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.UsageTrackingQueryCachingPolicy;
@ -81,8 +93,8 @@ import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.search.stats.ShardSearchStats;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.Store.MetadataSnapshot;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.index.suggest.stats.ShardSuggestMetric;
@ -104,17 +116,6 @@ import org.elasticsearch.search.suggest.completion.CompletionFieldStats;
import org.elasticsearch.search.suggest.completion.CompletionStats;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.channels.ClosedByInterruptException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
public class IndexShard extends AbstractIndexShardComponent {
@ -148,6 +149,9 @@ public class IndexShard extends AbstractIndexShardComponent {
private final IndexSettings idxSettings;
private final NodeServicesProvider provider;
/** How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh */
private final AtomicLong writingBytes = new AtomicLong();
private TimeValue refreshInterval;
private volatile ScheduledFuture<?> refreshScheduledFuture;
@ -542,15 +546,17 @@ public class IndexShard extends AbstractIndexShardComponent {
public void refresh(String source) {
verifyNotClosed();
if (canIndex()) {
long ramBytesUsed = getEngine().indexBufferRAMBytesUsed();
indexingMemoryController.addWritingBytes(this, ramBytesUsed);
long bytes = getEngine().getIndexBufferRAMBytesUsed();
writingBytes.addAndGet(bytes);
try {
logger.debug("refresh with source [{}] indexBufferRAMBytesUsed [{}]", source, new ByteSizeValue(ramBytesUsed));
logger.debug("refresh with source [{}] indexBufferRAMBytesUsed [{}]", source, new ByteSizeValue(bytes));
long time = System.nanoTime();
getEngine().refresh(source);
refreshMetric.inc(System.nanoTime() - time);
} finally {
indexingMemoryController.removeWritingBytes(this, ramBytesUsed);
logger.debug("remove [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId());
// nocommit but we don't promptly stop index throttling anymore?
writingBytes.addAndGet(-bytes);
}
} else {
logger.debug("refresh with source [{}]", source);
@ -560,6 +566,11 @@ public class IndexShard extends AbstractIndexShardComponent {
}
}
/** Returns how many bytes we are currently moving from heap to disk */
public long getWritingBytes() {
return writingBytes.get();
}
public RefreshStats refreshStats() {
return new RefreshStats(refreshMetric.count(), TimeUnit.NANOSECONDS.toMillis(refreshMetric.sum()));
}
@ -1008,7 +1019,7 @@ public class IndexShard extends AbstractIndexShardComponent {
return 0;
}
try {
return engine.indexBufferRAMBytesUsed();
return engine.getIndexBufferRAMBytesUsed();
} catch (AlreadyClosedException ex) {
return 0;
}
@ -1262,15 +1273,18 @@ public class IndexShard extends AbstractIndexShardComponent {
public void run() {
try {
Engine engine = getEngine();
long bytes = engine.indexBufferRAMBytesUsed();
long bytes = engine.getIndexBufferRAMBytesUsed();
// NOTE: this can be an overestimate by up to 20%, if engine uses IW.flush not refresh, because version map
// memory is low enough, but this is fine because after the writes finish, IMC will poll again and see that
// there's still up to the 20% being used and continue writing if necessary:
indexingMemoryController.addWritingBytes(IndexShard.this, bytes);
writingBytes.addAndGet(bytes);
logger.debug("add [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId());
try {
getEngine().writeIndexingBuffer();
} finally {
indexingMemoryController.removeWritingBytes(IndexShard.this, bytes);
logger.debug("remove [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId());
// nocommit but we don't promptly stop index throttling anymore?
writingBytes.addAndGet(-bytes);
}
} catch (Exception e) {
handleRefreshException(e);

View File

@ -23,6 +23,7 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
@ -81,9 +82,6 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
private final ShardsIndicesStatusChecker statusChecker;
/** Maps each shard to how many bytes it is currently, asynchronously, writing to disk */
private final Map<IndexShard,Long> writingBytes = new ConcurrentHashMap<>();
@Inject
public IndexingMemoryController(Settings settings, ThreadPool threadPool, IndicesService indicesService) {
this(settings, threadPool, indicesService, JvmInfo.jvmInfo().getMem().getHeapMax().bytes());
@ -126,21 +124,6 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
SHARD_MEMORY_INTERVAL_TIME_SETTING, this.interval);
}
/** Shard calls this when it starts writing its indexing buffer to disk to notify us */
public void addWritingBytes(IndexShard shard, long numBytes) {
writingBytes.put(shard, numBytes);
logger.debug("add [{}] writing bytes for shard [{}]", new ByteSizeValue(numBytes), shard.shardId());
}
/** Shard calls when it's done writing these bytes to disk */
public void removeWritingBytes(IndexShard shard, long numBytes) {
writingBytes.remove(shard);
logger.debug("clear [{}] writing bytes for shard [{}]", new ByteSizeValue(numBytes), shard.shardId());
// Since some bytes just freed up, now we check again to give throttling a chance to stop:
forceCheck();
}
@Override
protected void doStart() {
// it's fine to run it on the scheduler thread, no busy work
@ -184,12 +167,17 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
return shard.getIndexBufferRAMBytesUsed();
}
/** returns how many bytes this shard is currently writing to disk */
protected long getShardWritingBytes(IndexShard shard) {
return shard.getWritingBytes();
}
/** ask this shard to refresh, in the background, to free up heap */
protected void writeIndexingBufferAsync(IndexShard shard) {
shard.writeIndexingBufferAsync();
}
/** used by tests to check if any shards active status changed, now. */
/** force checker to run now */
public void forceCheck() {
statusChecker.run();
}
@ -225,25 +213,42 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
}
}
class ShardsIndicesStatusChecker implements Runnable {
/** not static because we need access to many fields/methods from our containing class (IMC): */
final class ShardsIndicesStatusChecker implements Runnable {
long bytesWrittenSinceCheck;
final AtomicLong bytesWrittenSinceCheck = new AtomicLong();
final ReentrantLock runLock = new ReentrantLock();
/** Shard calls this on each indexing/delete op */
public synchronized void bytesWritten(int bytes) {
bytesWrittenSinceCheck += bytes;
if (bytesWrittenSinceCheck > indexingBuffer.bytes()/30) {
public void bytesWritten(int bytes) {
long totalBytes = bytesWrittenSinceCheck.addAndGet(bytes);
if (totalBytes > indexingBuffer.bytes()/30) {
if (runLock.tryLock()) {
try {
bytesWrittenSinceCheck.addAndGet(-totalBytes);
// NOTE: this is only an approximate check, because bytes written is to the translog, vs indexing memory buffer which is
// typically smaller but can be larger in extreme cases (many unique terms). This logic is here only as a safety against
// thread starvation or too infrequent checking, to ensure we are still checking periodically, in proportion to bytes
// processed by indexing:
run();
} finally {
runLock.unlock();
}
}
}
}
@Override
public synchronized void run() {
public void run() {
runLock.lock();
try {
runUnlocked();
} finally {
runLock.unlock();
}
}
private void runUnlocked() {
// NOTE: even if we hit an errant exc here, our ThreadPool.scheduledWithFixedDelay will log the exception and re-invoke us
// again, on schedule
@ -257,12 +262,11 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
checkIdle(shard, inactiveTime.nanos());
// How many bytes this shard is currently (async'd) moving from heap to disk:
Long shardWritingBytes = writingBytes.get(shard);
long shardWritingBytes = getShardWritingBytes(shard);
// How many heap bytes this shard is currently using
long shardBytesUsed = getIndexBufferRAMBytesUsed(shard);
if (shardWritingBytes != null) {
shardBytesUsed -= shardWritingBytes;
totalBytesWriting += shardWritingBytes;
@ -271,7 +275,6 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
if (shardBytesUsed < 0) {
continue;
}
}
totalBytesUsed += shardBytesUsed;
}
@ -293,12 +296,11 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
for (IndexShard shard : availableShards()) {
// How many bytes this shard is currently (async'd) moving from heap to disk:
Long shardWritingBytes = writingBytes.get(shard);
long shardWritingBytes = getShardWritingBytes(shard);
// How many heap bytes this shard is currently using
long shardBytesUsed = getIndexBufferRAMBytesUsed(shard);
if (shardWritingBytes != null) {
// Only count up bytes not already being refreshed:
shardBytesUsed -= shardWritingBytes;
@ -307,11 +309,10 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
if (shardBytesUsed < 0) {
continue;
}
}
if (shardBytesUsed > 0) {
if (logger.isTraceEnabled()) {
if (shardWritingBytes != null) {
if (shardWritingBytes != 0) {
logger.trace("shard [{}] is using [{}] heap, writing [{}] heap", shard.shardId(), shardBytesUsed, shardWritingBytes);
} else {
logger.trace("shard [{}] is using [{}] heap, not writing any bytes", shard.shardId(), shardBytesUsed);
@ -341,8 +342,6 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
}
throttled.clear();
}
bytesWrittenSinceCheck = 0;
}
}

View File

@ -73,6 +73,16 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
return indexBufferRAMBytesUsed.get(shard) + writingBytes.get(shard);
}
@Override
protected long getShardWritingBytes(IndexShard shard) {
Long bytes = writingBytes.get(shard);
if (bytes == null) {
return 0;
} else {
return bytes;
}
}
@Override
protected void checkIdle(IndexShard shard, long inactiveTimeNS) {
}
@ -81,7 +91,6 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
public void writeIndexingBufferAsync(IndexShard shard) {
long bytes = indexBufferRAMBytesUsed.put(shard, 0L);
writingBytes.put(shard, writingBytes.get(shard) + bytes);
addWritingBytes(shard, bytes);
indexBufferRAMBytesUsed.put(shard, 0L);
}
@ -96,8 +105,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
}
public void doneWriting(IndexShard shard) {
long bytes = writingBytes.put(shard, 0L);
removeWritingBytes(shard, bytes);
writingBytes.put(shard, 0L);
}
public void assertBuffer(IndexShard shard, int expectedMB) {
@ -281,6 +289,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
// Both shards finally finish writing, and throttling should stop:
controller.doneWriting(shard0);
controller.doneWriting(shard1);
controller.forceCheck();
controller.assertNotThrottled(shard0);
controller.assertNotThrottled(shard1);
}