[Infra] remove indicesLifecycle.Listener from IndexingMemoryController
The IndexingMemoryController determines the amount of indexing buffer size and translog buffer size each shard should have. It takes memory from inactive shards (indexing wise) and assigns it to other shards. To do so it needs to know about the addition and closing of shards. The current implementation hooks into the indicesService.indicesLifecycle() mechanism to receive call backs, such shard entered the POST_RECOVERY state. Those call backs are typically run on the thread that actually made the change. A mutex was used to synchronize those callbacks with IndexingMemoryController's background thread, which updates the internal engines memory usage on a regular interval. This introduced a dependency between those threads and the locks of the internal engines hosted on the node. In a *very* rare situation (two tests runs locally) this can cause recovery time outs where two nodes are recovering replicas from each other. This commit introduces a a lock free approach that updates the internal data structures during iterations in the background thread. Closes #6892
This commit is contained in:
parent
9714dd55c2
commit
38d8e3ccc2
|
@ -20,7 +20,6 @@
|
|||
package org.elasticsearch.indices.memory;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
|
@ -37,16 +36,12 @@ import org.elasticsearch.index.shard.ShardId;
|
|||
import org.elasticsearch.index.shard.service.IndexShard;
|
||||
import org.elasticsearch.index.shard.service.InternalIndexShard;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.indices.IndicesLifecycle;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.monitor.jvm.JvmInfo;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -66,16 +61,9 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
|
|||
|
||||
private final TimeValue inactiveTime;
|
||||
private final TimeValue interval;
|
||||
private final AtomicBoolean shardsRecoveredOrDeleted = new AtomicBoolean();
|
||||
|
||||
private final Listener listener = new Listener();
|
||||
|
||||
private final Map<ShardId, ShardIndexingStatus> shardsIndicesStatus = Maps.newHashMap();
|
||||
|
||||
private volatile ScheduledFuture scheduler;
|
||||
|
||||
private final Object mutex = new Object();
|
||||
|
||||
private static final EnumSet<IndexShardState> CAN_UPDATE_INDEX_BUFFER_STATES = EnumSet.of(IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.RELOCATED);
|
||||
|
||||
@Inject
|
||||
|
@ -137,14 +125,12 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
|
|||
|
||||
@Override
|
||||
protected void doStart() throws ElasticsearchException {
|
||||
indicesService.indicesLifecycle().addListener(listener);
|
||||
// its fine to run it on the scheduler thread, no busy work
|
||||
this.scheduler = threadPool.scheduleWithFixedDelay(new ShardsIndicesStatusChecker(), interval);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() throws ElasticsearchException {
|
||||
indicesService.indicesLifecycle().removeListener(listener);
|
||||
if (scheduler != null) {
|
||||
scheduler.cancel(false);
|
||||
scheduler = null;
|
||||
|
@ -164,153 +150,185 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
|
|||
}
|
||||
|
||||
class ShardsIndicesStatusChecker implements Runnable {
|
||||
|
||||
private final Map<ShardId, ShardIndexingStatus> shardsIndicesStatus = new HashMap<>();
|
||||
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (mutex) {
|
||||
boolean activeInactiveStatusChanges = false;
|
||||
List<IndexShard> activeToInactiveIndexingShards = Lists.newArrayList();
|
||||
List<IndexShard> inactiveToActiveIndexingShards = Lists.newArrayList();
|
||||
for (IndexService indexService : indicesService) {
|
||||
for (IndexShard indexShard : indexService) {
|
||||
long time = threadPool.estimatedTimeInMillis();
|
||||
Translog translog = ((InternalIndexShard) indexShard).translog();
|
||||
ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId());
|
||||
if (status == null) { // not added yet
|
||||
EnumSet<ShardStatusChangeType> changes = EnumSet.noneOf(ShardStatusChangeType.class);
|
||||
|
||||
changes.addAll(purgeDeletedAndClosedShards());
|
||||
|
||||
final List<IndexShard> activeToInactiveIndexingShards = Lists.newArrayList();
|
||||
final int activeShards = updateShardStatuses(changes, activeToInactiveIndexingShards);
|
||||
for (IndexShard indexShard : activeToInactiveIndexingShards) {
|
||||
// update inactive indexing buffer size
|
||||
try {
|
||||
((InternalIndexShard) indexShard).engine().updateIndexingBufferSize(Engine.INACTIVE_SHARD_INDEXING_BUFFER);
|
||||
((InternalIndexShard) indexShard).translog().updateBuffer(Translog.INACTIVE_SHARD_TRANSLOG_BUFFER);
|
||||
} catch (EngineClosedException e) {
|
||||
// ignore
|
||||
} catch (FlushNotAllowedEngineException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
if (!changes.isEmpty()) {
|
||||
calcAndSetShardBuffers(activeShards, "[" + changes + "]");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* goes through all existing shards and check whether the changes their active status
|
||||
*
|
||||
* @return the current count of active shards
|
||||
*/
|
||||
private int updateShardStatuses(EnumSet<ShardStatusChangeType> changes, List<IndexShard> activeToInactiveIndexingShards) {
|
||||
int activeShards = 0;
|
||||
for (IndexService indexService : indicesService) {
|
||||
for (IndexShard indexShard : indexService) {
|
||||
|
||||
if (!CAN_UPDATE_INDEX_BUFFER_STATES.contains(indexShard.state())) {
|
||||
// not ready to be updated yet.
|
||||
continue;
|
||||
}
|
||||
|
||||
final long time = threadPool.estimatedTimeInMillis();
|
||||
|
||||
Translog translog = ((InternalIndexShard) indexShard).translog();
|
||||
ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId());
|
||||
if (status == null) {
|
||||
status = new ShardIndexingStatus();
|
||||
shardsIndicesStatus.put(indexShard.shardId(), status);
|
||||
changes.add(ShardStatusChangeType.ADDED);
|
||||
}
|
||||
// check if it is deemed to be inactive (sam translogId and numberOfOperations over a long period of time)
|
||||
if (status.translogId == translog.currentId() && translog.estimatedNumberOfOperations() == 0) {
|
||||
if (status.time == -1) { // first time
|
||||
status.time = time;
|
||||
}
|
||||
// inactive?
|
||||
if (status.activeIndexing) {
|
||||
// mark it as inactive only if enough time has passed and there are no ongoing merges going on...
|
||||
if ((time - status.time) > inactiveTime.millis() && indexShard.mergeStats().getCurrent() == 0) {
|
||||
// inactive for this amount of time, mark it
|
||||
activeToInactiveIndexingShards.add(indexShard);
|
||||
status.activeIndexing = false;
|
||||
changes.add(ShardStatusChangeType.BECAME_INACTIVE);
|
||||
logger.debug("marking shard [{}][{}] as inactive (inactive_time[{}]) indexing wise, setting size to [{}]", indexShard.shardId().index().name(), indexShard.shardId().id(), inactiveTime, Engine.INACTIVE_SHARD_INDEXING_BUFFER);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (!status.activeIndexing) {
|
||||
status.activeIndexing = true;
|
||||
changes.add(ShardStatusChangeType.BECAME_ACTIVE);
|
||||
logger.debug("marking shard [{}][{}] as active indexing wise", indexShard.shardId().index().name(), indexShard.shardId().id());
|
||||
}
|
||||
status.time = -1;
|
||||
}
|
||||
status.translogId = translog.currentId();
|
||||
status.translogNumberOfOperations = translog.estimatedNumberOfOperations();
|
||||
|
||||
if (status.activeIndexing) {
|
||||
activeShards++;
|
||||
}
|
||||
}
|
||||
}
|
||||
return activeShards;
|
||||
}
|
||||
|
||||
/**
|
||||
* purge any existing statuses that are no longer updated
|
||||
*
|
||||
* @return true if any change
|
||||
*/
|
||||
private EnumSet<ShardStatusChangeType> purgeDeletedAndClosedShards() {
|
||||
EnumSet<ShardStatusChangeType> changes = EnumSet.noneOf(ShardStatusChangeType.class);
|
||||
|
||||
Iterator<ShardId> statusShardIdIterator = shardsIndicesStatus.keySet().iterator();
|
||||
while (statusShardIdIterator.hasNext()) {
|
||||
ShardId statusShardId = statusShardIdIterator.next();
|
||||
IndexService indexService = indicesService.indexService(statusShardId.getIndex());
|
||||
boolean remove = false;
|
||||
try {
|
||||
if (indexService == null) {
|
||||
remove = true;
|
||||
continue;
|
||||
}
|
||||
IndexShard indexShard = indexService.shard(statusShardId.id());
|
||||
if (indexShard == null) {
|
||||
remove = true;
|
||||
continue;
|
||||
}
|
||||
remove = !CAN_UPDATE_INDEX_BUFFER_STATES.contains(indexShard.state());
|
||||
|
||||
} finally {
|
||||
if (remove) {
|
||||
changes.add(ShardStatusChangeType.DELETED);
|
||||
statusShardIdIterator.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
return changes;
|
||||
}
|
||||
|
||||
private void calcAndSetShardBuffers(int activeShards, String reason) {
|
||||
if (activeShards == 0) {
|
||||
return;
|
||||
}
|
||||
ByteSizeValue shardIndexingBufferSize = new ByteSizeValue(indexingBuffer.bytes() / activeShards);
|
||||
if (shardIndexingBufferSize.bytes() < minShardIndexBufferSize.bytes()) {
|
||||
shardIndexingBufferSize = minShardIndexBufferSize;
|
||||
}
|
||||
if (shardIndexingBufferSize.bytes() > maxShardIndexBufferSize.bytes()) {
|
||||
shardIndexingBufferSize = maxShardIndexBufferSize;
|
||||
}
|
||||
|
||||
ByteSizeValue shardTranslogBufferSize = new ByteSizeValue(translogBuffer.bytes() / activeShards);
|
||||
if (shardTranslogBufferSize.bytes() < minShardTranslogBufferSize.bytes()) {
|
||||
shardTranslogBufferSize = minShardTranslogBufferSize;
|
||||
}
|
||||
if (shardTranslogBufferSize.bytes() > maxShardTranslogBufferSize.bytes()) {
|
||||
shardTranslogBufferSize = maxShardTranslogBufferSize;
|
||||
}
|
||||
|
||||
logger.debug("recalculating shard indexing buffer (reason={}), total is [{}] with [{}] active shards, each shard set to indexing=[{}], translog=[{}]", reason, indexingBuffer, activeShards, shardIndexingBufferSize, shardTranslogBufferSize);
|
||||
for (IndexService indexService : indicesService) {
|
||||
for (IndexShard indexShard : indexService) {
|
||||
IndexShardState state = indexShard.state();
|
||||
if (!CAN_UPDATE_INDEX_BUFFER_STATES.contains(state)) {
|
||||
logger.trace("shard [{}] is not yet ready for index buffer update. index shard state: [{}]", indexShard.shardId(), state);
|
||||
continue;
|
||||
}
|
||||
ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId());
|
||||
if (status == null || status.activeIndexing) {
|
||||
try {
|
||||
((InternalIndexShard) indexShard).engine().updateIndexingBufferSize(shardIndexingBufferSize);
|
||||
((InternalIndexShard) indexShard).translog().updateBuffer(shardTranslogBufferSize);
|
||||
} catch (EngineClosedException e) {
|
||||
// ignore
|
||||
continue;
|
||||
} catch (FlushNotAllowedEngineException e) {
|
||||
// ignore
|
||||
continue;
|
||||
} catch (Exception e) {
|
||||
logger.warn("failed to set shard {} index buffer to [{}]", indexShard.shardId(), shardIndexingBufferSize);
|
||||
}
|
||||
// check if it is deemed to be inactive (sam translogId and numberOfOperations over a long period of time)
|
||||
if (status.translogId == translog.currentId() && translog.estimatedNumberOfOperations() == 0) {
|
||||
if (status.time == -1) { // first time
|
||||
status.time = time;
|
||||
}
|
||||
// inactive?
|
||||
if (!status.inactiveIndexing) {
|
||||
// mark it as inactive only if enough time has passed and there are no ongoing merges going on...
|
||||
if ((time - status.time) > inactiveTime.millis() && indexShard.mergeStats().getCurrent() == 0) {
|
||||
// inactive for this amount of time, mark it
|
||||
activeToInactiveIndexingShards.add(indexShard);
|
||||
status.inactiveIndexing = true;
|
||||
activeInactiveStatusChanges = true;
|
||||
logger.debug("marking shard [{}][{}] as inactive (inactive_time[{}]) indexing wise, setting size to [{}]", indexShard.shardId().index().name(), indexShard.shardId().id(), inactiveTime, Engine.INACTIVE_SHARD_INDEXING_BUFFER);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (status.inactiveIndexing) {
|
||||
inactiveToActiveIndexingShards.add(indexShard);
|
||||
status.inactiveIndexing = false;
|
||||
activeInactiveStatusChanges = true;
|
||||
logger.debug("marking shard [{}][{}] as active indexing wise", indexShard.shardId().index().name(), indexShard.shardId().id());
|
||||
}
|
||||
status.time = -1;
|
||||
}
|
||||
status.translogId = translog.currentId();
|
||||
status.translogNumberOfOperations = translog.estimatedNumberOfOperations();
|
||||
}
|
||||
}
|
||||
for (IndexShard indexShard : activeToInactiveIndexingShards) {
|
||||
// update inactive indexing buffer size
|
||||
try {
|
||||
((InternalIndexShard) indexShard).engine().updateIndexingBufferSize(Engine.INACTIVE_SHARD_INDEXING_BUFFER);
|
||||
((InternalIndexShard) indexShard).translog().updateBuffer(Translog.INACTIVE_SHARD_TRANSLOG_BUFFER);
|
||||
} catch (EngineClosedException e) {
|
||||
// ignore
|
||||
} catch (FlushNotAllowedEngineException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
boolean shardsRecoveredOrDeleted = IndexingMemoryController.this.shardsRecoveredOrDeleted.compareAndSet(true, false);
|
||||
if (shardsRecoveredOrDeleted || activeInactiveStatusChanges) {
|
||||
calcAndSetShardBuffers("active/inactive[" + activeInactiveStatusChanges + "] recovered/deleted[" + shardsRecoveredOrDeleted + "]");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class Listener extends IndicesLifecycle.Listener {
|
||||
|
||||
@Override
|
||||
public void afterIndexShardPostRecovery(IndexShard indexShard) {
|
||||
synchronized (mutex) {
|
||||
shardsIndicesStatus.put(indexShard.shardId(), new ShardIndexingStatus());
|
||||
shardsRecoveredOrDeleted.set(true);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterIndexShardClosed(ShardId shardId) {
|
||||
synchronized (mutex) {
|
||||
shardsIndicesStatus.remove(shardId);
|
||||
shardsRecoveredOrDeleted.set(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void calcAndSetShardBuffers(String reason) {
|
||||
int shardsCount = countActiveShards();
|
||||
if (shardsCount == 0) {
|
||||
return;
|
||||
}
|
||||
ByteSizeValue shardIndexingBufferSize = new ByteSizeValue(indexingBuffer.bytes() / shardsCount);
|
||||
if (shardIndexingBufferSize.bytes() < minShardIndexBufferSize.bytes()) {
|
||||
shardIndexingBufferSize = minShardIndexBufferSize;
|
||||
}
|
||||
if (shardIndexingBufferSize.bytes() > maxShardIndexBufferSize.bytes()) {
|
||||
shardIndexingBufferSize = maxShardIndexBufferSize;
|
||||
}
|
||||
|
||||
ByteSizeValue shardTranslogBufferSize = new ByteSizeValue(translogBuffer.bytes() / shardsCount);
|
||||
if (shardTranslogBufferSize.bytes() < minShardTranslogBufferSize.bytes()) {
|
||||
shardTranslogBufferSize = minShardTranslogBufferSize;
|
||||
}
|
||||
if (shardTranslogBufferSize.bytes() > maxShardTranslogBufferSize.bytes()) {
|
||||
shardTranslogBufferSize = maxShardTranslogBufferSize;
|
||||
}
|
||||
|
||||
logger.debug("recalculating shard indexing buffer (reason={}), total is [{}] with [{}] active shards, each shard set to indexing=[{}], translog=[{}]", reason, indexingBuffer, shardsCount, shardIndexingBufferSize, shardTranslogBufferSize);
|
||||
for (IndexService indexService : indicesService) {
|
||||
for (IndexShard indexShard : indexService) {
|
||||
IndexShardState state = indexShard.state();
|
||||
if (!CAN_UPDATE_INDEX_BUFFER_STATES.contains(state)) {
|
||||
logger.trace("shard [{}] is not yet ready for index buffer update. index shard state: [{}]", indexShard.shardId(), state);
|
||||
continue;
|
||||
}
|
||||
ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId());
|
||||
if (status == null || !status.inactiveIndexing) {
|
||||
try {
|
||||
((InternalIndexShard) indexShard).engine().updateIndexingBufferSize(shardIndexingBufferSize);
|
||||
((InternalIndexShard) indexShard).translog().updateBuffer(shardTranslogBufferSize);
|
||||
} catch (EngineClosedException e) {
|
||||
// ignore
|
||||
continue;
|
||||
} catch (FlushNotAllowedEngineException e) {
|
||||
// ignore
|
||||
continue;
|
||||
} catch (Exception e) {
|
||||
logger.warn("failed to set shard {} index buffer to [{}]", indexShard.shardId(), shardIndexingBufferSize);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private int countActiveShards() {
|
||||
int shardsCount = 0;
|
||||
for (IndexService indexService : indicesService) {
|
||||
for (IndexShard indexShard : indexService) {
|
||||
ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId());
|
||||
if (status == null || !status.inactiveIndexing) {
|
||||
shardsCount++;
|
||||
}
|
||||
}
|
||||
}
|
||||
return shardsCount;
|
||||
private static enum ShardStatusChangeType {
|
||||
ADDED, DELETED, BECAME_ACTIVE, BECAME_INACTIVE
|
||||
}
|
||||
|
||||
|
||||
static class ShardIndexingStatus {
|
||||
long translogId = -1;
|
||||
int translogNumberOfOperations = -1;
|
||||
boolean inactiveIndexing = false;
|
||||
boolean activeIndexing = true;
|
||||
long time = -1; // contains the first time we saw this shard with no operations done on it
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,80 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.indices.memory;
|
||||
|
||||
import com.google.common.base.Predicate;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.index.engine.internal.InternalEngine;
|
||||
import org.elasticsearch.index.shard.service.InternalIndexShard;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||
import org.junit.Test;
|
||||
|
||||
|
||||
@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numDataNodes = 0)
|
||||
public class IndexMemoryControllerTests extends ElasticsearchIntegrationTest {
|
||||
|
||||
@Test
|
||||
public void testIndexBufferSizeUpdateAfterShardCreation() throws InterruptedException {
|
||||
|
||||
internalCluster().startNode(ImmutableSettings.builder()
|
||||
.put("http.enabled", "false")
|
||||
.put("discovery.type", "local")
|
||||
.put("indices.memory.interval", "1s")
|
||||
);
|
||||
|
||||
client().admin().indices().prepareCreate("test1")
|
||||
.setSettings(ImmutableSettings.builder()
|
||||
.put("number_of_shards", 1)
|
||||
.put("number_of_replicas", 0)
|
||||
).get();
|
||||
|
||||
ensureGreen();
|
||||
|
||||
final InternalIndexShard shard1 = (InternalIndexShard) internalCluster().getInstance(IndicesService.class).indexService("test1").shard(0);
|
||||
|
||||
client().admin().indices().prepareCreate("test2")
|
||||
.setSettings(ImmutableSettings.builder()
|
||||
.put("number_of_shards", 1)
|
||||
.put("number_of_replicas", 0)
|
||||
).get();
|
||||
|
||||
ensureGreen();
|
||||
|
||||
final InternalIndexShard shard2 = (InternalIndexShard) internalCluster().getInstance(IndicesService.class).indexService("test2").shard(0);
|
||||
final long expectedShardSize = internalCluster().getInstance(IndexingMemoryController.class).indexingBufferSize().bytes() / 2;
|
||||
|
||||
boolean success = awaitBusy(new Predicate<Object>() {
|
||||
@Override
|
||||
public boolean apply(Object input) {
|
||||
return ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() <= expectedShardSize &&
|
||||
((InternalEngine) shard2.engine()).indexingBufferSize().bytes() <= expectedShardSize;
|
||||
}
|
||||
});
|
||||
|
||||
if (!success) {
|
||||
fail("failed to update shard indexing buffer size. expected [" + expectedShardSize + "] shard1 [" +
|
||||
((InternalEngine) shard1.engine()).indexingBufferSize().bytes() + "] shard2 [" +
|
||||
((InternalEngine) shard1.engine()).indexingBufferSize().bytes() + "]"
|
||||
);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,155 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.indices.memory;
|
||||
|
||||
import com.google.common.base.Predicate;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.engine.internal.InternalEngine;
|
||||
import org.elasticsearch.index.shard.service.InternalIndexShard;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||
import org.junit.Test;
|
||||
|
||||
|
||||
@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numDataNodes = 0)
|
||||
public class IndexingMemoryControllerTests extends ElasticsearchIntegrationTest {
|
||||
|
||||
@Test
|
||||
public void testIndexBufferSizeUpdateAfterCreationRemoval() throws InterruptedException {
|
||||
|
||||
createNode(ImmutableSettings.EMPTY);
|
||||
|
||||
prepareCreate("test1").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).get();
|
||||
|
||||
ensureGreen();
|
||||
|
||||
final InternalIndexShard shard1 = (InternalIndexShard) internalCluster().getInstance(IndicesService.class).indexService("test1").shard(0);
|
||||
|
||||
prepareCreate("test2").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).get();
|
||||
|
||||
ensureGreen();
|
||||
|
||||
final InternalIndexShard shard2 = (InternalIndexShard) internalCluster().getInstance(IndicesService.class).indexService("test2").shard(0);
|
||||
final long expected1ShardSize = internalCluster().getInstance(IndexingMemoryController.class).indexingBufferSize().bytes();
|
||||
final long expected2ShardsSize = expected1ShardSize / 2;
|
||||
|
||||
boolean success = awaitBusy(new Predicate<Object>() {
|
||||
@Override
|
||||
public boolean apply(Object input) {
|
||||
return ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() <= expected2ShardsSize &&
|
||||
((InternalEngine) shard2.engine()).indexingBufferSize().bytes() <= expected2ShardsSize;
|
||||
}
|
||||
});
|
||||
|
||||
if (!success) {
|
||||
fail("failed to update shard indexing buffer size. expected [" + expected2ShardsSize + "] shard1 [" +
|
||||
((InternalEngine) shard1.engine()).indexingBufferSize().bytes() + "] shard2 [" +
|
||||
((InternalEngine) shard2.engine()).indexingBufferSize().bytes() + "]"
|
||||
);
|
||||
}
|
||||
|
||||
client().admin().indices().prepareDelete("test2").get();
|
||||
success = awaitBusy(new Predicate<Object>() {
|
||||
@Override
|
||||
public boolean apply(Object input) {
|
||||
return ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() >= expected1ShardSize;
|
||||
}
|
||||
});
|
||||
|
||||
if (!success) {
|
||||
fail("failed to update shard indexing buffer size after deleting shards. expected [" + expected1ShardSize + "] got [" +
|
||||
((InternalEngine) shard1.engine()).indexingBufferSize().bytes() + "]"
|
||||
);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIndexBufferSizeUpdateInactiveShard() throws InterruptedException {
|
||||
|
||||
createNode(ImmutableSettings.builder().put("indices.memory.shard_inactive_time", "100ms").build());
|
||||
|
||||
prepareCreate("test1").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).get();
|
||||
|
||||
ensureGreen();
|
||||
|
||||
final InternalIndexShard shard1 = (InternalIndexShard) internalCluster().getInstance(IndicesService.class).indexService("test1").shard(0);
|
||||
boolean success = awaitBusy(new Predicate<Object>() {
|
||||
@Override
|
||||
public boolean apply(Object input) {
|
||||
return ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() == Engine.INACTIVE_SHARD_INDEXING_BUFFER.bytes();
|
||||
}
|
||||
});
|
||||
if (!success) {
|
||||
fail("failed to update shard indexing buffer size due to inactive state. expected [" + Engine.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" +
|
||||
((InternalEngine) shard1.engine()).indexingBufferSize().bytes() + "]"
|
||||
);
|
||||
}
|
||||
|
||||
index("test1", "type", "1", "f", 1);
|
||||
|
||||
success = awaitBusy(new Predicate<Object>() {
|
||||
@Override
|
||||
public boolean apply(Object input) {
|
||||
return ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() > Engine.INACTIVE_SHARD_INDEXING_BUFFER.bytes();
|
||||
}
|
||||
});
|
||||
if (!success) {
|
||||
fail("failed to update shard indexing buffer size due to inactive state. expected something larger then [" + Engine.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" +
|
||||
((InternalEngine) shard1.engine()).indexingBufferSize().bytes() + "]"
|
||||
);
|
||||
}
|
||||
|
||||
flush(); // clean translogs
|
||||
|
||||
success = awaitBusy(new Predicate<Object>() {
|
||||
@Override
|
||||
public boolean apply(Object input) {
|
||||
return ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() == Engine.INACTIVE_SHARD_INDEXING_BUFFER.bytes();
|
||||
}
|
||||
});
|
||||
if (!success) {
|
||||
fail("failed to update shard indexing buffer size due to inactive state. expected [" + Engine.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" +
|
||||
((InternalEngine) shard1.engine()).indexingBufferSize().bytes() + "]"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private void createNode(Settings settings) {
|
||||
internalCluster().startNode(ImmutableSettings.builder()
|
||||
.put(ClusterName.SETTING, "IndexingMemoryControllerTests")
|
||||
.put("node.name", "IndexingMemoryControllerTests")
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.put(EsExecutors.PROCESSORS, 1) // limit the number of threads created
|
||||
.put("http.enabled", false)
|
||||
.put("index.store.type", "ram")
|
||||
.put("config.ignore_system_properties", true) // make sure we get what we set :)
|
||||
.put("gateway.type", "none")
|
||||
.put("indices.memory.interval", "100ms")
|
||||
.put(settings)
|
||||
);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue