This commit is contained in:
Michael McCandless 2015-10-03 05:09:07 -04:00 committed by mikemccand
parent aa4a63354b
commit f27c0adb0b
7 changed files with 166 additions and 175 deletions

View File

@ -361,6 +361,9 @@ public abstract class Engine implements Closeable {
stats.addIndexWriterMaxMemoryInBytes(0);
}
/** How much heap Lucene's IndexWriter is using */
abstract public long indexWriterRAMBytesUsed();
protected Segment[] getSegmentInfo(SegmentInfos lastCommittedSegmentInfos, boolean verbose) {
ensureOpen();
Map<String, Segment> segments = new HashMap<>();
@ -618,6 +621,11 @@ public abstract class Engine implements Closeable {
Type opType();
Origin origin();
/**
* Returns operation start time in nanoseconds.
*/
long startTime();
}
public static abstract class IndexingOperation implements Operation {
@ -711,9 +719,7 @@ public abstract class Engine implements Closeable {
return this.doc.source();
}
/**
* Returns operation start time in nanoseconds.
*/
@Override
public long startTime() {
return this.startTime;
}
@ -853,9 +859,7 @@ public abstract class Engine implements Closeable {
return this.found;
}
/**
* Returns operation start time in nanoseconds.
*/
@Override
public long startTime() {
return this.startTime;
}

View File

@ -902,6 +902,11 @@ public class InternalEngine extends Engine {
stats.addIndexWriterMaxMemoryInBytes((long) (indexWriter.getConfig().getRAMBufferSizeMB() * 1024 * 1024));
}
@Override
public long indexWriterRAMBytesUsed() {
return indexWriter.ramBytesUsed();
}
@Override
public List<Segment> segments(boolean verbose) {
try (ReleasableLock lock = readLock.acquire()) {

View File

@ -244,4 +244,9 @@ public class ShadowEngine extends Engine {
return lastCommittedSegmentInfos;
}
@Override
public long indexWriterRAMBytesUsed() {
// No IndexWriter
return 0L;
}
}

View File

@ -86,8 +86,8 @@ import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.Store.MetadataSnapshot;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.index.suggest.stats.ShardSuggestMetric;
@ -103,6 +103,7 @@ import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.InternalIndicesLifecycle;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.memory.IndexingMemoryController;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.search.suggest.completion.Completion090PostingsFormat;
@ -120,6 +121,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
public class IndexShard extends AbstractIndexShardComponent implements IndexSettingsService.Listener {
private final ThreadPool threadPool;
@ -192,13 +194,18 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
private EnumSet<IndexShardState> readAllowedStates = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY);
private final AtomicBoolean active = new AtomicBoolean();
private volatile long lastWriteNS;
private final IndexingMemoryController indexingMemoryController;
@Inject
public IndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndicesLifecycle indicesLifecycle, Store store,
ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService,
IndicesQueryCache indicesQueryCache, CodecService codecService,
TermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService,
@Nullable IndicesWarmer warmer, SimilarityService similarityService, EngineFactory factory,
ShardPath path, BigArrays bigArrays, IndexSearcherWrappingService wrappingService) {
ShardPath path, BigArrays bigArrays, IndexSearcherWrappingService wrappingService,
IndexingMemoryController indexingMemoryController) {
super(shardId, indexSettings);
this.codecService = codecService;
this.warmer = warmer;
@ -252,6 +259,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
this.flushThresholdSize = indexSettings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(512, ByteSizeUnit.MB));
this.disableFlush = indexSettings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, false);
this.indexShardOperationCounter = new IndexShardOperationCounter(logger, shardId);
this.indexingMemoryController = indexingMemoryController;
// TODO: can we somehow call IMC.forceCheck here? Since we just became active, it can divvy up the RAM
active.set(true);
}
public Store store() {
@ -447,7 +458,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
}
public void create(Engine.Create create) {
writeAllowed(create.origin());
ensureWriteAllowed(create);
create = indexingService.preCreate(create);
try {
if (logger.isTraceEnabled()) {
@ -485,7 +496,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
* updated.
*/
public boolean index(Engine.Index index) {
writeAllowed(index.origin());
ensureWriteAllowed(index);
index = indexingService.preIndex(index);
final boolean created;
try {
@ -509,7 +520,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
}
public void delete(Engine.Delete delete) {
writeAllowed(delete.origin());
ensureWriteAllowed(delete);
delete = indexingService.preDelete(delete);
try {
if (logger.isTraceEnabled()) {
@ -913,7 +924,20 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
}
}
private void writeAllowed(Engine.Operation.Origin origin) throws IllegalIndexShardStateException {
/** Returns timestamp of last indexing operation */
public long getLastWriteNS() {
return lastWriteNS;
}
private void ensureWriteAllowed(Engine.Operation op) throws IllegalIndexShardStateException {
if (active.getAndSet(true) == false) {
// We are currently inactive, but a new write operation just showed up, so we now notify IMC
// to wake up and fix our indexing buffer. We could do this async instead, but cost should
// be low, and it's rare this happens.
indexingMemoryController.forceCheck();
}
lastWriteNS = op.startTime();
Engine.Operation.Origin origin = op.origin();
IndexShardState state = this.state; // one time volatile read
if (origin == Engine.Operation.Origin.PRIMARY) {
@ -975,7 +999,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
this.failedEngineListener.delegates.add(failedEngineListener);
}
public void updateBufferSize(ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) {
/** Returns true if the indexing buffer size did change */
public boolean updateBufferSize(ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) {
final EngineConfig config = engineConfig;
final ByteSizeValue preValue = config.getIndexingBufferSize();
@ -985,26 +1010,28 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
Engine engine = engineUnsafe();
if (engine == null) {
logger.debug("updateBufferSize: engine is closed; skipping");
return;
return false;
}
// update engine if it is already started.
if (preValue.bytes() != shardIndexingBufferSize.bytes()) {
// so we push changes these changes down to IndexWriter:
engine.onSettingsChanged();
logger.debug("updating index_buffer_size from [{}] to [{}]", preValue, shardIndexingBufferSize);
if (shardIndexingBufferSize == EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER) {
// it's inactive: make sure we do a refresh / full IW flush in this case, since the memory
// changes only after a "data" change has happened to the writer
// the index writer lazily allocates memory and a refresh will clean it all up.
logger.debug("updating index_buffer_size from [{}] to (inactive) [{}]", preValue, shardIndexingBufferSize);
long iwBytesUsed = engine.indexWriterRAMBytesUsed();
if (shardIndexingBufferSize.bytes() < iwBytesUsed) {
// our allowed buffer was changed to less than we are currently using; we ask IW to refresh
// so it clears its buffers (otherwise it won't clear until the next indexing/delete op)
logger.debug("refresh because index buffer decreased to [{}] and IndexWriter is now using [{}] bytes",
shardIndexingBufferSize, iwBytesUsed);
// TODO: should IW have an API to move segments to disk, but not refresh?
try {
refresh("update index buffer");
} catch (Throwable e) {
logger.warn("failed to refresh after setting shard to inactive", e);
logger.warn("failed to refresh after decreasing index buffer", e);
}
} else {
logger.debug("updating index_buffer_size from [{}] to [{}]", preValue, shardIndexingBufferSize);
}
}
@ -1012,8 +1039,15 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
}
public void markAsInactive() {
updateBufferSize(EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER, TranslogConfig.INACTIVE_SHARD_TRANSLOG_BUFFER);
indicesLifecycle.onShardInactive(this);
if (active.getAndSet(false)) {
updateBufferSize(EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER, TranslogConfig.INACTIVE_SHARD_TRANSLOG_BUFFER);
logger.debug("shard is now inactive");
indicesLifecycle.onShardInactive(this);
}
}
public boolean getActive() {
return active.get();
}
public final boolean isFlushOnClose() {

View File

@ -18,6 +18,8 @@
*/
package org.elasticsearch.index.shard;
import java.io.IOException;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
@ -27,10 +29,10 @@ import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.aliases.IndexAliasesService;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.engine.IndexSearcherWrappingService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.IndexSearcherWrappingService;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.merge.MergeStats;
@ -43,10 +45,9 @@ import org.elasticsearch.index.termvectors.TermVectorsService;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.memory.IndexingMemoryController;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
/**
* ShadowIndexShard extends {@link IndexShard} to add file synchronization
* from the primary when a flush happens. It also ensures that a replica being
@ -65,13 +66,15 @@ public final class ShadowIndexShard extends IndexShard {
@Nullable IndicesWarmer warmer,
SimilarityService similarityService,
EngineFactory factory,
ShardPath path, BigArrays bigArrays, IndexSearcherWrappingService wrappingService) throws IOException {
ShardPath path, BigArrays bigArrays, IndexSearcherWrappingService wrappingService,
IndexingMemoryController indexingMemoryController) throws IOException {
super(shardId, indexSettings, indicesLifecycle, store,
threadPool, mapperService, queryParserService, indexCache, indexAliasesService,
indicesQueryCache, codecService,
termVectorsService, indexFieldDataService,
warmer, similarityService,
factory, path, bigArrays, wrappingService);
threadPool, mapperService, queryParserService, indexCache, indexAliasesService,
indicesQueryCache, codecService,
termVectorsService, indexFieldDataService,
warmer, similarityService,
factory, path, bigArrays, wrappingService,
indexingMemoryController);
}
/**

View File

@ -42,9 +42,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.util.*;
import java.util.concurrent.ScheduledFuture;
/**
*
*/
public class IndexingMemoryController extends AbstractLifecycleComponent<IndexingMemoryController> {
/** How much heap (% or bytes) we will share across all actively indexing shards on this node (default: 10%). */
@ -175,7 +172,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
@Override
protected void doStart() {
// its fine to run it on the scheduler thread, no busy work
// it's fine to run it on the scheduler thread, no busy work
this.scheduler = threadPool.scheduleWithFixedDelay(statusChecker, interval);
}
@ -234,8 +231,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
protected IndexShard getShard(ShardId shardId) {
IndexService indexService = indicesService.indexService(shardId.index().name());
if (indexService != null) {
IndexShard indexShard = indexService.shard(shardId.id());
return indexShard;
return indexService.shard(shardId.id());
}
return null;
}
@ -255,49 +251,43 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
}
}
protected boolean isShardInactive(ShardId shardId, long inactiveTimeNS) {
final IndexShard shard = getShard(shardId);
if (shard == null) {
return false;
}
return currentTimeInNanos() - shard.getLastWriteNS() >= inactiveTimeNS;
}
/** returns the current translog status (generation id + ops) for the given shard id. Returns null if unavailable. */
protected ShardIndexingStatus getTranslogStatus(ShardId shardId) {
protected Boolean getShardActive(ShardId shardId) {
final IndexShard indexShard = getShard(shardId);
if (indexShard == null) {
return null;
}
final Translog translog;
try {
translog = indexShard.engine().getTranslog();
} catch (EngineClosedException e) {
// not ready yet to be checked for activity
return null;
}
ShardIndexingStatus status = new ShardIndexingStatus();
status.translogId = translog.currentFileGeneration();
status.translogNumberOfOperations = translog.totalOperations();
return status;
return indexShard.getActive();
}
// used for tests
void forceCheck() {
/** Check if any shards active status changed, now. */
public void forceCheck() {
statusChecker.run();
}
class ShardsIndicesStatusChecker implements Runnable {
private final Map<ShardId, ShardIndexingStatus> shardsIndicesStatus = new HashMap<>();
// True if the shard was active last time we checked
private final Map<ShardId,Boolean> shardWasActive = new HashMap<>();
@Override
public void run() {
public synchronized void run() {
EnumSet<ShardStatusChangeType> changes = purgeDeletedAndClosedShards();
final List<ShardId> activeToInactiveIndexingShards = new ArrayList<>();
final int activeShards = updateShardStatuses(changes, activeToInactiveIndexingShards);
for (ShardId indexShard : activeToInactiveIndexingShards) {
markShardAsInactive(indexShard);
}
final int activeShardCount = updateShardStatuses(changes);
if (changes.isEmpty() == false) {
// Something changed: recompute indexing buffers:
calcAndSetShardBuffers(activeShards, "[" + changes + "]");
calcAndSetShardBuffers(activeShardCount, "[" + changes + "]");
}
}
@ -306,43 +296,48 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
*
* @return the current count of active shards
*/
private int updateShardStatuses(EnumSet<ShardStatusChangeType> changes, List<ShardId> activeToInactiveIndexingShards) {
int activeShards = 0;
private int updateShardStatuses(EnumSet<ShardStatusChangeType> changes) {
int activeShardCount = 0;
for (ShardId shardId : availableShards()) {
final ShardIndexingStatus currentStatus = getTranslogStatus(shardId);
// Is the shard active now?
Boolean isActive = getShardActive(shardId);
if (currentStatus == null) {
if (isActive == null) {
// shard was closed..
continue;
} else if (isActive) {
activeShardCount++;
}
ShardIndexingStatus status = shardsIndicesStatus.get(shardId);
if (status == null) {
status = currentStatus;
shardsIndicesStatus.put(shardId, status);
// Was the shard active last time we checked?
Boolean wasActive = shardWasActive.get(shardId);
if (wasActive == null) {
// First time we are seeing this shard
shardWasActive.put(shardId, isActive);
changes.add(ShardStatusChangeType.ADDED);
} else {
final boolean lastActiveIndexing = status.activeIndexing;
status.updateWith(currentTimeInNanos(), currentStatus, inactiveTime.nanos());
if (lastActiveIndexing && (status.activeIndexing == false)) {
activeToInactiveIndexingShards.add(shardId);
changes.add(ShardStatusChangeType.BECAME_INACTIVE);
logger.debug("marking shard {} as inactive (inactive_time[{}]) indexing wise, setting size to [{}]",
shardId,
inactiveTime, EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER);
} else if ((lastActiveIndexing == false) && status.activeIndexing) {
} else if (isActive) {
// Shard is active now
if (wasActive == false) {
// Shard became active itself, since we last checked (due to new indexing op arriving)
changes.add(ShardStatusChangeType.BECAME_ACTIVE);
logger.debug("marking shard {} as active indexing wise", shardId);
}
}
shardWasActive.put(shardId, true);
if (status.activeIndexing) {
activeShards++;
} else if (isShardInactive(shardId, inactiveTime.nanos())) {
// Make shard inactive now
changes.add(ShardStatusChangeType.BECAME_INACTIVE);
logger.debug("marking shard {} as inactive (inactive_time[{}]) indexing wise",
shardId,
inactiveTime);
markShardAsInactive(shardId);
shardWasActive.put(shardId, false);
}
}
}
return activeShards;
return activeShardCount;
}
/**
@ -353,7 +348,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
private EnumSet<ShardStatusChangeType> purgeDeletedAndClosedShards() {
EnumSet<ShardStatusChangeType> changes = EnumSet.noneOf(ShardStatusChangeType.class);
Iterator<ShardId> statusShardIdIterator = shardsIndicesStatus.keySet().iterator();
Iterator<ShardId> statusShardIdIterator = shardWasActive.keySet().iterator();
while (statusShardIdIterator.hasNext()) {
ShardId shardId = statusShardIdIterator.next();
if (shardAvailable(shardId) == false) {
@ -364,12 +359,15 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
return changes;
}
private void calcAndSetShardBuffers(int activeShards, String reason) {
if (activeShards == 0) {
private void calcAndSetShardBuffers(int activeShardCount, String reason) {
// TODO: we could be smarter here by taking into account how RAM the IndexWriter on each shard
// is actually using (using IW.ramBytesUsed), so that small indices (e.g. Marvel) would not
// get the same indexing buffer as large indices. But it quickly gets tricky...
if (activeShardCount == 0) {
logger.debug("no active shards (reason={})", reason);
return;
}
ByteSizeValue shardIndexingBufferSize = new ByteSizeValue(indexingBuffer.bytes() / activeShards);
ByteSizeValue shardIndexingBufferSize = new ByteSizeValue(indexingBuffer.bytes() / activeShardCount);
if (shardIndexingBufferSize.bytes() < minShardIndexBufferSize.bytes()) {
shardIndexingBufferSize = minShardIndexBufferSize;
}
@ -377,7 +375,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
shardIndexingBufferSize = maxShardIndexBufferSize;
}
ByteSizeValue shardTranslogBufferSize = new ByteSizeValue(translogBuffer.bytes() / activeShards);
ByteSizeValue shardTranslogBufferSize = new ByteSizeValue(translogBuffer.bytes() / activeShardCount);
if (shardTranslogBufferSize.bytes() < minShardTranslogBufferSize.bytes()) {
shardTranslogBufferSize = minShardTranslogBufferSize;
}
@ -385,10 +383,10 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
shardTranslogBufferSize = maxShardTranslogBufferSize;
}
logger.debug("recalculating shard indexing buffer (reason={}), total is [{}] with [{}] active shards, each shard set to indexing=[{}], translog=[{}]", reason, indexingBuffer, activeShards, shardIndexingBufferSize, shardTranslogBufferSize);
logger.debug("recalculating shard indexing buffer (reason={}), total is [{}] with [{}] active shards, each shard set to indexing=[{}], translog=[{}]", reason, indexingBuffer, activeShardCount, shardIndexingBufferSize, shardTranslogBufferSize);
for (ShardId shardId : availableShards()) {
ShardIndexingStatus status = shardsIndicesStatus.get(shardId);
if (status == null || status.activeIndexing) {
if (shardWasActive.get(shardId) == Boolean.TRUE) {
updateShardBuffers(shardId, shardIndexingBufferSize, shardTranslogBufferSize);
}
}
@ -424,42 +422,4 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
private static enum ShardStatusChangeType {
ADDED, DELETED, BECAME_ACTIVE, BECAME_INACTIVE
}
static class ShardIndexingStatus {
long translogId = -1;
long translogNumberOfOperations = -1;
boolean activeIndexing = true;
long idleSinceNanoTime = -1; // contains the first time we saw this shard with no operations done on it
/** update status based on a new sample. updates all internal variables */
public void updateWith(long currentNanoTime, ShardIndexingStatus current, long inactiveNanoInterval) {
final boolean idle = (translogId == current.translogId && translogNumberOfOperations == current.translogNumberOfOperations);
if (activeIndexing && idle) {
// no indexing activity detected.
if (idleSinceNanoTime < 0) {
// first time we see this, start the clock.
idleSinceNanoTime = currentNanoTime;
} else if ((currentNanoTime - idleSinceNanoTime) > inactiveNanoInterval) {
// shard is inactive. mark it as such.
activeIndexing = false;
}
} else if (activeIndexing == false // we weren't indexing before
&& idle == false // but we do now
&& current.translogNumberOfOperations > 0 // but only if we're really sure - see note bellow
) {
// since we sync flush once a shard becomes inactive, the translog id can change, however that
// doesn't mean the an indexing operation has happened. Note that if we're really unlucky and a flush happens
// immediately after an indexing operation we may not become active immediately. The following
// indexing operation will mark the shard as active, so it's OK. If that one doesn't come, we might as well stay
// inactive
activeIndexing = true;
idleSinceNanoTime = -1;
}
translogId = current.translogId;
translogNumberOfOperations = current.translogNumberOfOperations;
}
}
}

View File

@ -39,12 +39,11 @@ public class IndexingMemoryControllerTests extends ESTestCase {
final static ByteSizeValue INACTIVE = new ByteSizeValue(-1);
final Map<ShardId, Long> translogIds = new HashMap<>();
final Map<ShardId, Long> translogOps = new HashMap<>();
final Map<ShardId, ByteSizeValue> indexingBuffers = new HashMap<>();
final Map<ShardId, ByteSizeValue> translogBuffers = new HashMap<>();
final Map<ShardId, Long> lastIndexTimeNanos = new HashMap<>();
long currentTimeSec = TimeValue.timeValueNanos(System.nanoTime()).seconds();
public MockController(Settings settings) {
@ -56,18 +55,7 @@ public class IndexingMemoryControllerTests extends ESTestCase {
null, null, 100 * 1024 * 1024); // fix jvm mem size to 100mb
}
public void incTranslog(ShardId shard1, int id, int ops) {
setTranslog(shard1, translogIds.get(shard1) + id, translogOps.get(shard1) + ops);
}
public void setTranslog(ShardId id, long translogId, long ops) {
translogIds.put(id, translogId);
translogOps.put(id, ops);
}
public void deleteShard(ShardId id) {
translogIds.remove(id);
translogOps.remove(id);
indexingBuffers.remove(id);
translogBuffers.remove(id);
}
@ -94,12 +82,12 @@ public class IndexingMemoryControllerTests extends ESTestCase {
@Override
protected List<ShardId> availableShards() {
return new ArrayList<>(translogIds.keySet());
return new ArrayList<>(indexingBuffers.keySet());
}
@Override
protected boolean shardAvailable(ShardId shardId) {
return translogIds.containsKey(shardId);
return indexingBuffers.containsKey(shardId);
}
@Override
@ -109,14 +97,8 @@ public class IndexingMemoryControllerTests extends ESTestCase {
}
@Override
protected ShardIndexingStatus getTranslogStatus(ShardId shardId) {
if (!shardAvailable(shardId)) {
return null;
}
ShardIndexingStatus status = new ShardIndexingStatus();
status.translogId = translogIds.get(shardId);
status.translogNumberOfOperations = translogOps.get(shardId);
return status;
protected Boolean getShardActive(ShardId shardId) {
return INACTIVE.equals(indexingBuffers.get(shardId));
}
@Override
@ -125,12 +107,17 @@ public class IndexingMemoryControllerTests extends ESTestCase {
translogBuffers.put(shardId, shardTranslogBufferSize);
}
@Override
protected boolean isShardInactive(ShardId shardId, long inactiveTimeNS) {
return currentTimeInNanos() - lastIndexTimeNanos.get(shardId) >= inactiveTimeNS;
}
public void incrementTimeSec(int sec) {
currentTimeSec += sec;
}
public void simulateFlush(ShardId shard) {
setTranslog(shard, translogIds.get(shard) + 1, 0);
public void simulateIndexing(ShardId shardId) {
lastIndexTimeNanos.put(shardId, currentTimeInNanos());
}
}
@ -139,13 +126,13 @@ public class IndexingMemoryControllerTests extends ESTestCase {
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb")
.put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "100kb").build());
final ShardId shard1 = new ShardId("test", 1);
controller.setTranslog(shard1, randomInt(10), randomInt(10));
controller.simulateIndexing(shard1);
controller.forceCheck();
controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K
// add another shard
final ShardId shard2 = new ShardId("test", 2);
controller.setTranslog(shard2, randomInt(10), randomInt(10));
controller.simulateIndexing(shard2);
controller.forceCheck();
controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
@ -161,7 +148,7 @@ public class IndexingMemoryControllerTests extends ESTestCase {
// add a new one
final ShardId shard3 = new ShardId("test", 3);
controller.setTranslog(shard3, randomInt(10), randomInt(10));
controller.simulateIndexing(shard3);
controller.forceCheck();
controller.assertBuffers(shard3, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K
}
@ -174,16 +161,16 @@ public class IndexingMemoryControllerTests extends ESTestCase {
.build());
final ShardId shard1 = new ShardId("test", 1);
controller.setTranslog(shard1, 0, 0);
controller.simulateIndexing(shard1);
final ShardId shard2 = new ShardId("test", 2);
controller.setTranslog(shard2, 0, 0);
controller.simulateIndexing(shard2);
controller.forceCheck();
controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
// index into both shards, move the clock and see that they are still active
controller.setTranslog(shard1, randomInt(2), randomInt(2) + 1);
controller.setTranslog(shard2, randomInt(2) + 1, randomInt(2));
controller.simulateIndexing(shard1);
controller.simulateIndexing(shard2);
// the controller doesn't know when the ops happened, so even if this is more
// than the inactive time the shard is still marked as active
controller.incrementTimeSec(10);
@ -192,7 +179,7 @@ public class IndexingMemoryControllerTests extends ESTestCase {
controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
// index into one shard only, see other shard is made inactive correctly
controller.incTranslog(shard1, randomInt(2), randomInt(2) + 1);
controller.simulateIndexing(shard1);
controller.forceCheck(); // register what happened with the controller (shard is still active)
controller.incrementTimeSec(3); // increment but not enough
controller.forceCheck();
@ -204,15 +191,8 @@ public class IndexingMemoryControllerTests extends ESTestCase {
controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB));
controller.assertInActive(shard2);
if (randomBoolean()) {
// once a shard gets inactive it will be synced flushed and a new translog generation will be made
controller.simulateFlush(shard2);
controller.forceCheck();
controller.assertInActive(shard2);
}
// index some and shard becomes immediately active
controller.incTranslog(shard2, randomInt(2), 1 + randomInt(2)); // we must make sure translog ops is never 0
controller.simulateIndexing(shard2);
controller.forceCheck();
controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
@ -273,9 +253,9 @@ public class IndexingMemoryControllerTests extends ESTestCase {
protected void assertTwoActiveShards(MockController controller, ByteSizeValue indexBufferSize, ByteSizeValue translogBufferSize) {
final ShardId shard1 = new ShardId("test", 1);
controller.setTranslog(shard1, 0, 0);
controller.simulateIndexing(shard1);
final ShardId shard2 = new ShardId("test", 2);
controller.setTranslog(shard2, 0, 0);
controller.simulateIndexing(shard2);
controller.forceCheck();
controller.assertBuffers(shard1, indexBufferSize, translogBufferSize);
controller.assertBuffers(shard2, indexBufferSize, translogBufferSize);