Merge pull request #15251 from jasontedor/stateless-indexing-memory-controller
IndexingMemoryController should not track shard index states
This commit is contained in:
commit
5a391f116b
|
@ -33,7 +33,6 @@ import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
|
|||
import org.elasticsearch.index.shard.IndexEventListener;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.IndexShardState;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.monitor.jvm.JvmInfo;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
@ -200,23 +199,17 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
|
|||
return translogBuffer;
|
||||
}
|
||||
|
||||
|
||||
protected List<ShardId> availableShards() {
|
||||
ArrayList<ShardId> list = new ArrayList<>();
|
||||
protected List<IndexShard> availableShards() {
|
||||
List<IndexShard> activeShards = new ArrayList<>();
|
||||
|
||||
for (IndexService indexService : indicesService) {
|
||||
for (IndexShard indexShard : indexService) {
|
||||
if (shardAvailable(indexShard)) {
|
||||
list.add(indexShard.shardId());
|
||||
for (IndexShard shard : indexService) {
|
||||
if (shardAvailable(shard)) {
|
||||
activeShards.add(shard);
|
||||
}
|
||||
}
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
||||
/** returns true if shard exists and is availabe for updates */
|
||||
protected boolean shardAvailable(ShardId shardId) {
|
||||
return shardAvailable(getShard(shardId));
|
||||
return activeShards;
|
||||
}
|
||||
|
||||
/** returns true if shard exists and is availabe for updates */
|
||||
|
@ -225,19 +218,8 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
|
|||
return shard != null && shard.canIndex() && CAN_UPDATE_INDEX_BUFFER_STATES.contains(shard.state());
|
||||
}
|
||||
|
||||
/** gets an {@link IndexShard} instance for the given shard. returns null if the shard doesn't exist */
|
||||
protected IndexShard getShard(ShardId shardId) {
|
||||
IndexService indexService = indicesService.indexService(shardId.index().name());
|
||||
if (indexService != null) {
|
||||
IndexShard indexShard = indexService.getShardOrNull(shardId.id());
|
||||
return indexShard;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/** set new indexing and translog buffers on this shard. this may cause the shard to refresh to free up heap. */
|
||||
protected void updateShardBuffers(ShardId shardId, ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) {
|
||||
final IndexShard shard = getShard(shardId);
|
||||
protected void updateShardBuffers(IndexShard shard, ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) {
|
||||
if (shard != null) {
|
||||
try {
|
||||
shard.updateBufferSize(shardIndexingBufferSize, shardTranslogBufferSize);
|
||||
|
@ -246,113 +228,33 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
|
|||
} catch (FlushNotAllowedEngineException e) {
|
||||
// ignore
|
||||
} catch (Exception e) {
|
||||
logger.warn("failed to set shard {} index buffer to [{}]", e, shardId, shardIndexingBufferSize);
|
||||
logger.warn("failed to set shard {} index buffer to [{}]", e, shard.shardId(), shardIndexingBufferSize);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** returns {@link IndexShard#getActive} if the shard exists, else null */
|
||||
protected Boolean getShardActive(ShardId shardId) {
|
||||
final IndexShard indexShard = getShard(shardId);
|
||||
if (indexShard == null) {
|
||||
return null;
|
||||
}
|
||||
return indexShard.getActive();
|
||||
}
|
||||
|
||||
/** check if any shards active status changed, now. */
|
||||
public void forceCheck() {
|
||||
statusChecker.run();
|
||||
}
|
||||
|
||||
class ShardsIndicesStatusChecker implements Runnable {
|
||||
|
||||
// True if the shard was active last time we checked
|
||||
private final Map<ShardId,Boolean> shardWasActive = new HashMap<>();
|
||||
|
||||
@Override
|
||||
public synchronized void run() {
|
||||
EnumSet<ShardStatusChangeType> changes = purgeDeletedAndClosedShards();
|
||||
|
||||
updateShardStatuses(changes);
|
||||
|
||||
if (changes.isEmpty() == false) {
|
||||
// Something changed: recompute indexing buffers:
|
||||
calcAndSetShardBuffers("[" + changes + "]");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* goes through all existing shards and check whether there are changes in their active status
|
||||
*/
|
||||
private void updateShardStatuses(EnumSet<ShardStatusChangeType> changes) {
|
||||
for (ShardId shardId : availableShards()) {
|
||||
|
||||
// Is the shard active now?
|
||||
Boolean isActive = getShardActive(shardId);
|
||||
|
||||
if (isActive == null) {
|
||||
// shard was closed..
|
||||
continue;
|
||||
}
|
||||
|
||||
// Was the shard active last time we checked?
|
||||
Boolean wasActive = shardWasActive.get(shardId);
|
||||
if (wasActive == null) {
|
||||
// First time we are seeing this shard
|
||||
shardWasActive.put(shardId, isActive);
|
||||
changes.add(ShardStatusChangeType.ADDED);
|
||||
} else if (isActive) {
|
||||
// Shard is active now
|
||||
if (wasActive == false) {
|
||||
// Shard became active itself, since we last checked (due to new indexing op arriving)
|
||||
changes.add(ShardStatusChangeType.BECAME_ACTIVE);
|
||||
logger.debug("marking shard {} as active indexing wise", shardId);
|
||||
shardWasActive.put(shardId, true);
|
||||
} else if (checkIdle(shardId) == Boolean.TRUE) {
|
||||
// Make shard inactive now
|
||||
changes.add(ShardStatusChangeType.BECAME_INACTIVE);
|
||||
|
||||
shardWasActive.put(shardId, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* purge any existing statuses that are no longer updated
|
||||
*
|
||||
* @return the changes applied
|
||||
*/
|
||||
private EnumSet<ShardStatusChangeType> purgeDeletedAndClosedShards() {
|
||||
EnumSet<ShardStatusChangeType> changes = EnumSet.noneOf(ShardStatusChangeType.class);
|
||||
|
||||
Iterator<ShardId> statusShardIdIterator = shardWasActive.keySet().iterator();
|
||||
while (statusShardIdIterator.hasNext()) {
|
||||
ShardId shardId = statusShardIdIterator.next();
|
||||
if (shardAvailable(shardId) == false) {
|
||||
changes.add(ShardStatusChangeType.DELETED);
|
||||
statusShardIdIterator.remove();
|
||||
}
|
||||
}
|
||||
return changes;
|
||||
}
|
||||
|
||||
private void calcAndSetShardBuffers(String reason) {
|
||||
|
||||
// Count how many shards are now active:
|
||||
int activeShardCount = 0;
|
||||
for (Map.Entry<ShardId,Boolean> ent : shardWasActive.entrySet()) {
|
||||
if (ent.getValue()) {
|
||||
activeShardCount++;
|
||||
List<IndexShard> availableShards = availableShards();
|
||||
List<IndexShard> activeShards = new ArrayList<>();
|
||||
for (IndexShard shard : availableShards) {
|
||||
if (!checkIdle(shard)) {
|
||||
activeShards.add(shard);
|
||||
}
|
||||
}
|
||||
int activeShardCount = activeShards.size();
|
||||
|
||||
// TODO: we could be smarter here by taking into account how RAM the IndexWriter on each shard
|
||||
// is actually using (using IW.ramBytesUsed), so that small indices (e.g. Marvel) would not
|
||||
// get the same indexing buffer as large indices. But it quickly gets tricky...
|
||||
if (activeShardCount == 0) {
|
||||
logger.debug("no active shards (reason={})", reason);
|
||||
logger.debug("no active shards");
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -372,13 +274,10 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
|
|||
shardTranslogBufferSize = maxShardTranslogBufferSize;
|
||||
}
|
||||
|
||||
logger.debug("recalculating shard indexing buffer (reason={}), total is [{}] with [{}] active shards, each shard set to indexing=[{}], translog=[{}]", reason, indexingBuffer, activeShardCount, shardIndexingBufferSize, shardTranslogBufferSize);
|
||||
logger.debug("recalculating shard indexing buffer, total is [{}] with [{}] active shards, each shard set to indexing=[{}], translog=[{}]", indexingBuffer, activeShardCount, shardIndexingBufferSize, shardTranslogBufferSize);
|
||||
|
||||
for (Map.Entry<ShardId,Boolean> ent : shardWasActive.entrySet()) {
|
||||
if (ent.getValue()) {
|
||||
// This shard is active
|
||||
updateShardBuffers(ent.getKey(), shardIndexingBufferSize, shardTranslogBufferSize);
|
||||
}
|
||||
for (IndexShard shard : activeShards) {
|
||||
updateShardBuffers(shard, shardIndexingBufferSize, shardTranslogBufferSize);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -389,14 +288,13 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
|
|||
|
||||
/** ask this shard to check now whether it is inactive, and reduces its indexing and translog buffers if so. returns Boolean.TRUE if
|
||||
* it did deactive, Boolean.FALSE if it did not, and null if the shard is unknown */
|
||||
protected Boolean checkIdle(ShardId shardId) {
|
||||
String ignoreReason; // eclipse compiler does not know it is really final
|
||||
final IndexShard shard = getShard(shardId);
|
||||
protected Boolean checkIdle(IndexShard shard) {
|
||||
String ignoreReason = null; // eclipse compiler does not know it is really final
|
||||
if (shard != null) {
|
||||
try {
|
||||
if (shard.checkIdle()) {
|
||||
logger.debug("marking shard {} as inactive (inactive_time[{}]) indexing wise",
|
||||
shardId,
|
||||
shard.shardId(),
|
||||
shard.getInactiveTime());
|
||||
return Boolean.TRUE;
|
||||
}
|
||||
|
@ -412,15 +310,11 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
|
|||
ignoreReason = "shard not found";
|
||||
}
|
||||
if (ignoreReason != null) {
|
||||
logger.trace("ignore [{}] while marking shard {} as inactive", ignoreReason, shardId);
|
||||
logger.trace("ignore [{}] while marking shard {} as inactive", ignoreReason, shard.shardId());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private static enum ShardStatusChangeType {
|
||||
ADDED, DELETED, BECAME_ACTIVE, BECAME_INACTIVE
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onShardActive(IndexShard indexShard) {
|
||||
// At least one shard used to be inactive ie. a new write operation just showed up.
|
||||
|
|
|
@ -22,54 +22,51 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.*;
|
||||
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
|
||||
public class IndexingMemoryControllerTests extends ESTestCase {
|
||||
public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
|
||||
|
||||
static class MockController extends IndexingMemoryController {
|
||||
|
||||
final static ByteSizeValue INACTIVE = new ByteSizeValue(-1);
|
||||
|
||||
final Map<ShardId, ByteSizeValue> indexingBuffers = new HashMap<>();
|
||||
final Map<ShardId, ByteSizeValue> translogBuffers = new HashMap<>();
|
||||
final Map<IndexShard, ByteSizeValue> indexingBuffers = new HashMap<>();
|
||||
final Map<IndexShard, ByteSizeValue> translogBuffers = new HashMap<>();
|
||||
|
||||
final Map<ShardId, Long> lastIndexTimeNanos = new HashMap<>();
|
||||
final Set<ShardId> activeShards = new HashSet<>();
|
||||
final Map<IndexShard, Long> lastIndexTimeNanos = new HashMap<>();
|
||||
final Set<IndexShard> activeShards = new HashSet<>();
|
||||
|
||||
long currentTimeSec = TimeValue.timeValueNanos(System.nanoTime()).seconds();
|
||||
|
||||
public MockController(Settings settings) {
|
||||
super(Settings.builder()
|
||||
.put(SHARD_INACTIVE_INTERVAL_TIME_SETTING, "200h") // disable it
|
||||
.put(IndexShard.INDEX_SHARD_INACTIVE_TIME_SETTING, "1ms") // nearly immediate
|
||||
.put(settings)
|
||||
.build(),
|
||||
null, null, 100 * 1024 * 1024); // fix jvm mem size to 100mb
|
||||
.put(SHARD_INACTIVE_INTERVAL_TIME_SETTING, "200h") // disable it
|
||||
.put(IndexShard.INDEX_SHARD_INACTIVE_TIME_SETTING, "1ms") // nearly immediate
|
||||
.put(settings)
|
||||
.build(),
|
||||
null, null, 100 * 1024 * 1024); // fix jvm mem size to 100mb
|
||||
}
|
||||
|
||||
public void deleteShard(ShardId id) {
|
||||
public void deleteShard(IndexShard id) {
|
||||
indexingBuffers.remove(id);
|
||||
translogBuffers.remove(id);
|
||||
}
|
||||
|
||||
public void assertBuffers(ShardId id, ByteSizeValue indexing, ByteSizeValue translog) {
|
||||
public void assertBuffers(IndexShard id, ByteSizeValue indexing, ByteSizeValue translog) {
|
||||
assertThat(indexingBuffers.get(id), equalTo(indexing));
|
||||
assertThat(translogBuffers.get(id), equalTo(translog));
|
||||
}
|
||||
|
||||
public void assertInActive(ShardId id) {
|
||||
public void assertInactive(IndexShard id) {
|
||||
assertThat(indexingBuffers.get(id), equalTo(INACTIVE));
|
||||
assertThat(translogBuffers.get(id), equalTo(INACTIVE));
|
||||
}
|
||||
|
@ -80,36 +77,31 @@ public class IndexingMemoryControllerTests extends ESTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected List<ShardId> availableShards() {
|
||||
protected List<IndexShard> availableShards() {
|
||||
return new ArrayList<>(indexingBuffers.keySet());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean shardAvailable(ShardId shardId) {
|
||||
return indexingBuffers.containsKey(shardId);
|
||||
protected boolean shardAvailable(IndexShard shard) {
|
||||
return indexingBuffers.containsKey(shard);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Boolean getShardActive(ShardId shardId) {
|
||||
return activeShards.contains(shardId);
|
||||
protected void updateShardBuffers(IndexShard shard, ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) {
|
||||
indexingBuffers.put(shard, shardIndexingBufferSize);
|
||||
translogBuffers.put(shard, shardTranslogBufferSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void updateShardBuffers(ShardId shardId, ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) {
|
||||
indexingBuffers.put(shardId, shardIndexingBufferSize);
|
||||
translogBuffers.put(shardId, shardTranslogBufferSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Boolean checkIdle(ShardId shardId) {
|
||||
protected Boolean checkIdle(IndexShard shard) {
|
||||
final TimeValue inactiveTime = settings.getAsTime(IndexShard.INDEX_SHARD_INACTIVE_TIME_SETTING, TimeValue.timeValueMinutes(5));
|
||||
Long ns = lastIndexTimeNanos.get(shardId);
|
||||
Long ns = lastIndexTimeNanos.get(shard);
|
||||
if (ns == null) {
|
||||
return null;
|
||||
} else if (currentTimeInNanos() - ns >= inactiveTime.nanos()) {
|
||||
indexingBuffers.put(shardId, INACTIVE);
|
||||
translogBuffers.put(shardId, INACTIVE);
|
||||
activeShards.remove(shardId);
|
||||
indexingBuffers.put(shard, INACTIVE);
|
||||
translogBuffers.put(shard, INACTIVE);
|
||||
activeShards.remove(shard);
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
|
@ -120,118 +112,126 @@ public class IndexingMemoryControllerTests extends ESTestCase {
|
|||
currentTimeSec += sec;
|
||||
}
|
||||
|
||||
public void simulateIndexing(ShardId shardId) {
|
||||
lastIndexTimeNanos.put(shardId, currentTimeInNanos());
|
||||
if (indexingBuffers.containsKey(shardId) == false) {
|
||||
public void simulateIndexing(IndexShard shard) {
|
||||
lastIndexTimeNanos.put(shard, currentTimeInNanos());
|
||||
if (indexingBuffers.containsKey(shard) == false) {
|
||||
// First time we are seeing this shard; start it off with inactive buffers as IndexShard does:
|
||||
indexingBuffers.put(shardId, IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER);
|
||||
translogBuffers.put(shardId, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER);
|
||||
indexingBuffers.put(shard, IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER);
|
||||
translogBuffers.put(shard, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER);
|
||||
}
|
||||
activeShards.add(shardId);
|
||||
activeShards.add(shard);
|
||||
forceCheck();
|
||||
}
|
||||
}
|
||||
|
||||
public void testShardAdditionAndRemoval() {
|
||||
createIndex("test", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 0).build());
|
||||
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
|
||||
IndexService test = indicesService.indexService("test");
|
||||
|
||||
MockController controller = new MockController(Settings.builder()
|
||||
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb")
|
||||
.put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "100kb").build());
|
||||
final ShardId shard1 = new ShardId("test", 1);
|
||||
controller.simulateIndexing(shard1);
|
||||
controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K
|
||||
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb")
|
||||
.put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "100kb").build());
|
||||
IndexShard shard0 = test.getShard(0);
|
||||
controller.simulateIndexing(shard0);
|
||||
controller.assertBuffers(shard0, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K
|
||||
|
||||
// add another shard
|
||||
final ShardId shard2 = new ShardId("test", 2);
|
||||
controller.simulateIndexing(shard2);
|
||||
IndexShard shard1 = test.getShard(1);
|
||||
controller.simulateIndexing(shard1);
|
||||
controller.assertBuffers(shard0, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
|
||||
controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
|
||||
controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
|
||||
|
||||
// remove first shard
|
||||
controller.deleteShard(shard1);
|
||||
controller.deleteShard(shard0);
|
||||
controller.forceCheck();
|
||||
controller.assertBuffers(shard2, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K
|
||||
controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K
|
||||
|
||||
// remove second shard
|
||||
controller.deleteShard(shard2);
|
||||
controller.deleteShard(shard1);
|
||||
controller.forceCheck();
|
||||
|
||||
// add a new one
|
||||
final ShardId shard3 = new ShardId("test", 3);
|
||||
controller.simulateIndexing(shard3);
|
||||
controller.assertBuffers(shard3, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K
|
||||
IndexShard shard2 = test.getShard(2);
|
||||
controller.simulateIndexing(shard2);
|
||||
controller.assertBuffers(shard2, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K
|
||||
}
|
||||
|
||||
public void testActiveInactive() {
|
||||
MockController controller = new MockController(Settings.builder()
|
||||
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb")
|
||||
.put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "100kb")
|
||||
.put(IndexShard.INDEX_SHARD_INACTIVE_TIME_SETTING, "5s")
|
||||
.build());
|
||||
createIndex("test", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 2).put(SETTING_NUMBER_OF_REPLICAS, 0).build());
|
||||
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
|
||||
IndexService test = indicesService.indexService("test");
|
||||
|
||||
final ShardId shard1 = new ShardId("test", 1);
|
||||
MockController controller = new MockController(Settings.builder()
|
||||
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb")
|
||||
.put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "100kb")
|
||||
.put(IndexShard.INDEX_SHARD_INACTIVE_TIME_SETTING, "5s")
|
||||
.build());
|
||||
|
||||
IndexShard shard0 = test.getShard(0);
|
||||
controller.simulateIndexing(shard0);
|
||||
IndexShard shard1 = test.getShard(1);
|
||||
controller.simulateIndexing(shard1);
|
||||
final ShardId shard2 = new ShardId("test", 2);
|
||||
controller.simulateIndexing(shard2);
|
||||
controller.assertBuffers(shard0, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
|
||||
controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
|
||||
controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
|
||||
|
||||
// index into both shards, move the clock and see that they are still active
|
||||
controller.simulateIndexing(shard0);
|
||||
controller.simulateIndexing(shard1);
|
||||
controller.simulateIndexing(shard2);
|
||||
|
||||
controller.incrementTimeSec(10);
|
||||
controller.forceCheck();
|
||||
|
||||
// both shards now inactive
|
||||
controller.assertInActive(shard1);
|
||||
controller.assertInActive(shard2);
|
||||
controller.assertInactive(shard0);
|
||||
controller.assertInactive(shard1);
|
||||
|
||||
// index into one shard only, see it becomes active
|
||||
controller.simulateIndexing(shard1);
|
||||
controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB));
|
||||
controller.assertInActive(shard2);
|
||||
controller.simulateIndexing(shard0);
|
||||
controller.assertBuffers(shard0, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB));
|
||||
controller.assertInactive(shard1);
|
||||
|
||||
controller.incrementTimeSec(3); // increment but not enough to become inactive
|
||||
controller.forceCheck();
|
||||
controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB));
|
||||
controller.assertInActive(shard2);
|
||||
controller.assertBuffers(shard0, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB));
|
||||
controller.assertInactive(shard1);
|
||||
|
||||
controller.incrementTimeSec(3); // increment some more
|
||||
controller.forceCheck();
|
||||
controller.assertInActive(shard1);
|
||||
controller.assertInActive(shard2);
|
||||
controller.assertInactive(shard0);
|
||||
controller.assertInactive(shard1);
|
||||
|
||||
// index some and shard becomes immediately active
|
||||
controller.simulateIndexing(shard2);
|
||||
controller.assertInActive(shard1);
|
||||
controller.assertBuffers(shard2, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB));
|
||||
controller.simulateIndexing(shard1);
|
||||
controller.assertInactive(shard0);
|
||||
controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB));
|
||||
}
|
||||
|
||||
public void testMinShardBufferSizes() {
|
||||
MockController controller = new MockController(Settings.builder()
|
||||
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb")
|
||||
.put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "50kb")
|
||||
.put(IndexingMemoryController.MIN_SHARD_INDEX_BUFFER_SIZE_SETTING, "6mb")
|
||||
.put(IndexingMemoryController.MIN_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, "40kb").build());
|
||||
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb")
|
||||
.put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "50kb")
|
||||
.put(IndexingMemoryController.MIN_SHARD_INDEX_BUFFER_SIZE_SETTING, "6mb")
|
||||
.put(IndexingMemoryController.MIN_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, "40kb").build());
|
||||
|
||||
assertTwoActiveShards(controller, new ByteSizeValue(6, ByteSizeUnit.MB), new ByteSizeValue(40, ByteSizeUnit.KB));
|
||||
}
|
||||
|
||||
public void testMaxShardBufferSizes() {
|
||||
MockController controller = new MockController(Settings.builder()
|
||||
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb")
|
||||
.put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "50kb")
|
||||
.put(IndexingMemoryController.MAX_SHARD_INDEX_BUFFER_SIZE_SETTING, "3mb")
|
||||
.put(IndexingMemoryController.MAX_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, "10kb").build());
|
||||
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb")
|
||||
.put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "50kb")
|
||||
.put(IndexingMemoryController.MAX_SHARD_INDEX_BUFFER_SIZE_SETTING, "3mb")
|
||||
.put(IndexingMemoryController.MAX_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, "10kb").build());
|
||||
|
||||
assertTwoActiveShards(controller, new ByteSizeValue(3, ByteSizeUnit.MB), new ByteSizeValue(10, ByteSizeUnit.KB));
|
||||
}
|
||||
|
||||
public void testRelativeBufferSizes() {
|
||||
MockController controller = new MockController(Settings.builder()
|
||||
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "50%")
|
||||
.put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "0.5%")
|
||||
.build());
|
||||
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "50%")
|
||||
.put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "0.5%")
|
||||
.build());
|
||||
|
||||
assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(50, ByteSizeUnit.MB)));
|
||||
assertThat(controller.translogBufferSize(), equalTo(new ByteSizeValue(512, ByteSizeUnit.KB)));
|
||||
|
@ -240,10 +240,10 @@ public class IndexingMemoryControllerTests extends ESTestCase {
|
|||
|
||||
public void testMinBufferSizes() {
|
||||
MockController controller = new MockController(Settings.builder()
|
||||
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "0.001%")
|
||||
.put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "0.001%")
|
||||
.put(IndexingMemoryController.MIN_INDEX_BUFFER_SIZE_SETTING, "6mb")
|
||||
.put(IndexingMemoryController.MIN_TRANSLOG_BUFFER_SIZE_SETTING, "512kb").build());
|
||||
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "0.001%")
|
||||
.put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "0.001%")
|
||||
.put(IndexingMemoryController.MIN_INDEX_BUFFER_SIZE_SETTING, "6mb")
|
||||
.put(IndexingMemoryController.MIN_TRANSLOG_BUFFER_SIZE_SETTING, "512kb").build());
|
||||
|
||||
assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(6, ByteSizeUnit.MB)));
|
||||
assertThat(controller.translogBufferSize(), equalTo(new ByteSizeValue(512, ByteSizeUnit.KB)));
|
||||
|
@ -251,23 +251,24 @@ public class IndexingMemoryControllerTests extends ESTestCase {
|
|||
|
||||
public void testMaxBufferSizes() {
|
||||
MockController controller = new MockController(Settings.builder()
|
||||
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "90%")
|
||||
.put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "90%")
|
||||
.put(IndexingMemoryController.MAX_INDEX_BUFFER_SIZE_SETTING, "6mb")
|
||||
.put(IndexingMemoryController.MAX_TRANSLOG_BUFFER_SIZE_SETTING, "512kb").build());
|
||||
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "90%")
|
||||
.put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "90%")
|
||||
.put(IndexingMemoryController.MAX_INDEX_BUFFER_SIZE_SETTING, "6mb")
|
||||
.put(IndexingMemoryController.MAX_TRANSLOG_BUFFER_SIZE_SETTING, "512kb").build());
|
||||
|
||||
assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(6, ByteSizeUnit.MB)));
|
||||
assertThat(controller.translogBufferSize(), equalTo(new ByteSizeValue(512, ByteSizeUnit.KB)));
|
||||
}
|
||||
|
||||
protected void assertTwoActiveShards(MockController controller, ByteSizeValue indexBufferSize, ByteSizeValue translogBufferSize) {
|
||||
final ShardId shard1 = new ShardId("test", 1);
|
||||
createIndex("test", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 2).put(SETTING_NUMBER_OF_REPLICAS, 0).build());
|
||||
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
|
||||
IndexService test = indicesService.indexService("test");
|
||||
IndexShard shard0 = test.getShard(0);
|
||||
controller.simulateIndexing(shard0);
|
||||
IndexShard shard1 = test.getShard(1);
|
||||
controller.simulateIndexing(shard1);
|
||||
final ShardId shard2 = new ShardId("test", 2);
|
||||
controller.simulateIndexing(shard2);
|
||||
controller.assertBuffers(shard0, indexBufferSize, translogBufferSize);
|
||||
controller.assertBuffers(shard1, indexBufferSize, translogBufferSize);
|
||||
controller.assertBuffers(shard2, indexBufferSize, translogBufferSize);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue