IndexingMemoryController should only update buffer settings of recovered shards

At the moment the IndexingMemoryController can try to update the index buffer memory of shards at any give moment. This update involves a flush, which may cause a FlushNotAllowedEngineException to be thrown in a concurrently finalizing recovery.

Closes #6642, closes #6667
This commit is contained in:
Boaz Leskes 2014-07-01 17:34:28 +02:00
parent b0c21d751d
commit 7119ffa7bc
3 changed files with 132 additions and 33 deletions

View File

@ -19,19 +19,7 @@
package org.elasticsearch.index.engine.internal; package org.elasticsearch.index.engine.internal;
import java.io.IOException; import com.google.common.collect.Lists;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.lucene.index.*; import org.apache.lucene.index.*;
import org.apache.lucene.index.IndexWriter.IndexReaderWarmer; import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.IndexSearcher;
@ -40,7 +28,6 @@ import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherManager; import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.NoLockFactory;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
@ -53,7 +40,6 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.LoggerInfoStream; import org.elasticsearch.common.lucene.LoggerInfoStream;
import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.SegmentReaderUtils; import org.elasticsearch.common.lucene.SegmentReaderUtils;
@ -64,7 +50,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.analysis.AnalysisService; import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.codec.CodecService;
@ -89,7 +74,18 @@ import org.elasticsearch.index.translog.TranslogStreams;
import org.elasticsearch.indices.warmer.IndicesWarmer; import org.elasticsearch.indices.warmer.IndicesWarmer;
import org.elasticsearch.indices.warmer.InternalIndicesWarmer; import org.elasticsearch.indices.warmer.InternalIndicesWarmer;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/** /**
* *
@ -314,6 +310,11 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
return new TimeValue(1, TimeUnit.SECONDS); return new TimeValue(1, TimeUnit.SECONDS);
} }
/** return the current indexing buffer size setting * */
public ByteSizeValue indexingBufferSize() {
return indexingBufferSize;
}
@Override @Override
public void enableGcDeletes(boolean enableGcDeletes) { public void enableGcDeletes(boolean enableGcDeletes) {
this.enableGcDeletes = enableGcDeletes; this.enableGcDeletes = enableGcDeletes;
@ -1566,11 +1567,11 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
@Override @Override
public void beforeMerge(OnGoingMerge merge) { public void beforeMerge(OnGoingMerge merge) {
if (numMergesInFlight.incrementAndGet() > maxNumMerges) { if (numMergesInFlight.incrementAndGet() > maxNumMerges) {
if (isThrottling.getAndSet(true) == false) { if (isThrottling.getAndSet(true) == false) {
logger.info("now throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges); logger.info("now throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
} }
lock = lockReference; lock = lockReference;
} }
} }
@ -1588,7 +1589,8 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
private static final class NoOpLock implements Lock { private static final class NoOpLock implements Lock {
@Override @Override
public void lock() {} public void lock() {
}
@Override @Override
public void lockInterruptibly() throws InterruptedException { public void lockInterruptibly() throws InterruptedException {

View File

@ -32,6 +32,7 @@ import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineClosedException; import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.FlushNotAllowedEngineException; import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard; import org.elasticsearch.index.shard.service.InternalIndexShard;
@ -41,6 +42,7 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.util.EnumSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
@ -64,7 +66,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
private final TimeValue inactiveTime; private final TimeValue inactiveTime;
private final TimeValue interval; private final TimeValue interval;
private final AtomicBoolean shardsCreatedOrDeleted = new AtomicBoolean(); private final AtomicBoolean shardsRecoveredOrDeleted = new AtomicBoolean();
private final Listener listener = new Listener(); private final Listener listener = new Listener();
@ -74,6 +76,8 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
private final Object mutex = new Object(); private final Object mutex = new Object();
private static final EnumSet<IndexShardState> CAN_UPDATE_INDEX_BUFFER_STATES = EnumSet.of(IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.RELOCATED);
@Inject @Inject
public IndexingMemoryController(Settings settings, ThreadPool threadPool, IndicesService indicesService) { public IndexingMemoryController(Settings settings, ThreadPool threadPool, IndicesService indicesService) {
super(settings); super(settings);
@ -151,6 +155,14 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
protected void doClose() throws ElasticsearchException { protected void doClose() throws ElasticsearchException {
} }
/**
* returns the current budget for the total amount of indexing buffers of
* active shards on this node
*/
public ByteSizeValue indexingBufferSize() {
return indexingBuffer;
}
class ShardsIndicesStatusChecker implements Runnable { class ShardsIndicesStatusChecker implements Runnable {
@Override @Override
public void run() { public void run() {
@ -206,9 +218,9 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
// ignore // ignore
} }
} }
boolean shardsCreatedOrDeleted = IndexingMemoryController.this.shardsCreatedOrDeleted.compareAndSet(true, false); boolean shardsRecoveredOrDeleted = IndexingMemoryController.this.shardsRecoveredOrDeleted.compareAndSet(true, false);
if (shardsCreatedOrDeleted || activeInactiveStatusChanges) { if (shardsRecoveredOrDeleted || activeInactiveStatusChanges) {
calcAndSetShardBuffers("active/inactive[" + activeInactiveStatusChanges + "] created/deleted[" + shardsCreatedOrDeleted + "]"); calcAndSetShardBuffers("active/inactive[" + activeInactiveStatusChanges + "] recovered/deleted[" + shardsRecoveredOrDeleted + "]");
} }
} }
} }
@ -217,10 +229,10 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
class Listener extends IndicesLifecycle.Listener { class Listener extends IndicesLifecycle.Listener {
@Override @Override
public void afterIndexShardCreated(IndexShard indexShard) { public void afterIndexShardPostRecovery(IndexShard indexShard) {
synchronized (mutex) { synchronized (mutex) {
shardsIndicesStatus.put(indexShard.shardId(), new ShardIndexingStatus()); shardsIndicesStatus.put(indexShard.shardId(), new ShardIndexingStatus());
shardsCreatedOrDeleted.set(true); shardsRecoveredOrDeleted.set(true);
} }
} }
@ -228,14 +240,14 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
public void afterIndexShardClosed(ShardId shardId) { public void afterIndexShardClosed(ShardId shardId) {
synchronized (mutex) { synchronized (mutex) {
shardsIndicesStatus.remove(shardId); shardsIndicesStatus.remove(shardId);
shardsCreatedOrDeleted.set(true); shardsRecoveredOrDeleted.set(true);
} }
} }
} }
private void calcAndSetShardBuffers(String reason) { private void calcAndSetShardBuffers(String reason) {
int shardsCount = countShards(); int shardsCount = countActiveShards();
if (shardsCount == 0) { if (shardsCount == 0) {
return; return;
} }
@ -258,6 +270,11 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
logger.debug("recalculating shard indexing buffer (reason={}), total is [{}] with [{}] active shards, each shard set to indexing=[{}], translog=[{}]", reason, indexingBuffer, shardsCount, shardIndexingBufferSize, shardTranslogBufferSize); logger.debug("recalculating shard indexing buffer (reason={}), total is [{}] with [{}] active shards, each shard set to indexing=[{}], translog=[{}]", reason, indexingBuffer, shardsCount, shardIndexingBufferSize, shardTranslogBufferSize);
for (IndexService indexService : indicesService) { for (IndexService indexService : indicesService) {
for (IndexShard indexShard : indexService) { for (IndexShard indexShard : indexService) {
IndexShardState state = indexShard.state();
if (!CAN_UPDATE_INDEX_BUFFER_STATES.contains(state)) {
logger.trace("shard [{}] is not yet ready for index buffer update. index shard state: [{}]", indexShard.shardId(), state);
continue;
}
ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId()); ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId());
if (status == null || !status.inactiveIndexing) { if (status == null || !status.inactiveIndexing) {
try { try {
@ -270,14 +287,14 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
// ignore // ignore
continue; continue;
} catch (Exception e) { } catch (Exception e) {
logger.warn("failed to set shard [{}][{}] index buffer to [{}]", indexShard.shardId().index().name(), indexShard.shardId().id(), shardIndexingBufferSize); logger.warn("failed to set shard {} index buffer to [{}]", indexShard.shardId(), shardIndexingBufferSize);
} }
} }
} }
} }
} }
private int countShards() { private int countActiveShards() {
int shardsCount = 0; int shardsCount = 0;
for (IndexService indexService : indicesService) { for (IndexService indexService : indicesService) {
for (IndexShard indexShard : indexService) { for (IndexShard indexShard : indexService) {

View File

@ -0,0 +1,80 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.memory;
import com.google.common.base.Predicate;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.index.engine.internal.InternalEngine;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numDataNodes = 0)
public class IndexMemoryControllerTests extends ElasticsearchIntegrationTest {
@Test
public void testIndexBufferSizeUpdateAfterShardCreation() throws InterruptedException {
internalCluster().startNode(ImmutableSettings.builder()
.put("http.enabled", "false")
.put("discovery.type", "local")
.put("indices.memory.interval", "1s")
);
client().admin().indices().prepareCreate("test1")
.setSettings(ImmutableSettings.builder()
.put("number_of_shards", 1)
.put("number_of_replicas", 0)
).get();
ensureGreen();
final InternalIndexShard shard1 = (InternalIndexShard) internalCluster().getInstance(IndicesService.class).indexService("test1").shard(0);
client().admin().indices().prepareCreate("test2")
.setSettings(ImmutableSettings.builder()
.put("number_of_shards", 1)
.put("number_of_replicas", 0)
).get();
ensureGreen();
final InternalIndexShard shard2 = (InternalIndexShard) internalCluster().getInstance(IndicesService.class).indexService("test2").shard(0);
final long expectedShardSize = internalCluster().getInstance(IndexingMemoryController.class).indexingBufferSize().bytes() / 2;
boolean success = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
return ((InternalEngine) shard1.engine()).indexingBufferSize().bytes() <= expectedShardSize &&
((InternalEngine) shard2.engine()).indexingBufferSize().bytes() <= expectedShardSize;
}
});
if (!success) {
fail("failed to update shard indexing buffer size. expected [" + expectedShardSize + "] shard1 [" +
((InternalEngine) shard1.engine()).indexingBufferSize().bytes() + "] shard2 [" +
((InternalEngine) shard1.engine()).indexingBufferSize().bytes() + "]"
);
}
}
}