Add test for IMC to ensure we also move buffers to disk during translog recovery
This commit is contained in:
parent
d006200c77
commit
a356f74d4d
|
@ -26,7 +26,6 @@ import org.elasticsearch.common.unit.ByteSizeValue;
|
|||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.FutureUtils;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.engine.EngineClosedException;
|
||||
import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
|
||||
|
@ -43,8 +42,6 @@ import java.util.HashSet;
|
|||
import java.util.List;
|
||||
import java.util.PriorityQueue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
@ -71,7 +68,7 @@ public class IndexingMemoryController extends AbstractComponent implements Index
|
|||
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
private final IndicesService indicesService;
|
||||
private final Iterable<IndexShard> indexShards;
|
||||
|
||||
private final ByteSizeValue indexingBuffer;
|
||||
|
||||
|
@ -88,14 +85,14 @@ public class IndexingMemoryController extends AbstractComponent implements Index
|
|||
|
||||
private final ShardsIndicesStatusChecker statusChecker;
|
||||
|
||||
IndexingMemoryController(Settings settings, ThreadPool threadPool, IndicesService indicesService) {
|
||||
this(settings, threadPool, indicesService, JvmInfo.jvmInfo().getMem().getHeapMax().bytes());
|
||||
IndexingMemoryController(Settings settings, ThreadPool threadPool, Iterable<IndexShard>indexServices) {
|
||||
this(settings, threadPool, indexServices, JvmInfo.jvmInfo().getMem().getHeapMax().bytes());
|
||||
}
|
||||
|
||||
// for testing
|
||||
IndexingMemoryController(Settings settings, ThreadPool threadPool, IndicesService indicesService, long jvmMemoryInBytes) {
|
||||
IndexingMemoryController(Settings settings, ThreadPool threadPool, Iterable<IndexShard> indexServices, long jvmMemoryInBytes) {
|
||||
super(settings);
|
||||
this.indicesService = indicesService;
|
||||
this.indexShards = indexServices;
|
||||
|
||||
ByteSizeValue indexingBuffer;
|
||||
String indexingBufferSetting = this.settings.get(INDEX_BUFFER_SIZE_SETTING, "10%");
|
||||
|
@ -152,13 +149,10 @@ public class IndexingMemoryController extends AbstractComponent implements Index
|
|||
|
||||
protected List<IndexShard> availableShards() {
|
||||
List<IndexShard> availableShards = new ArrayList<>();
|
||||
|
||||
for (IndexService indexService : indicesService) {
|
||||
for (IndexShard shard : indexService) {
|
||||
// shadow replica doesn't have an indexing buffer
|
||||
if (shard.canIndex() && CAN_WRITE_INDEX_BUFFER_STATES.contains(shard.state())) {
|
||||
availableShards.add(shard);
|
||||
}
|
||||
for (IndexShard shard : indexShards) {
|
||||
// shadow replica doesn't have an indexing buffer
|
||||
if (shard.canIndex() && CAN_WRITE_INDEX_BUFFER_STATES.contains(shard.state())) {
|
||||
availableShards.add(shard);
|
||||
}
|
||||
}
|
||||
return availableShards;
|
||||
|
|
|
@ -53,6 +53,7 @@ import org.elasticsearch.common.settings.Setting.Property;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.common.util.iterable.Iterables;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.env.ShardLock;
|
||||
import org.elasticsearch.gateway.MetaDataStateFormat;
|
||||
|
@ -171,7 +172,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
|||
this.mapperRegistry = mapperRegistry;
|
||||
clusterSettings.addSettingsUpdateConsumer(IndexStoreConfig.INDICES_STORE_THROTTLE_TYPE_SETTING, indexStoreConfig::setRateLimitingType);
|
||||
clusterSettings.addSettingsUpdateConsumer(IndexStoreConfig.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING, indexStoreConfig::setRateLimitingThrottle);
|
||||
indexingMemoryController = new IndexingMemoryController(settings, threadPool, this);
|
||||
indexingMemoryController = new IndexingMemoryController(settings, threadPool, Iterables.flatten(this));
|
||||
this.indexScopeSetting = indexScopedSettings;
|
||||
this.circuitBreakerService = circuitBreakerService;
|
||||
this.indicesFieldDataCache = new IndicesFieldDataCache(settings, new IndexFieldDataCache.Listener() {
|
||||
|
|
|
@ -1404,12 +1404,12 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
private final IndexShard reinitWithWrapper(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper, IndexingOperationListener... listeners) throws IOException {
|
||||
public static final IndexShard reinitWithWrapper(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper, IndexingOperationListener... listeners) throws IOException {
|
||||
IndexShard newShard = newIndexShard(indexService, shard, wrapper, listeners);
|
||||
return recoverShard(newShard, shard.routingEntry());
|
||||
}
|
||||
|
||||
private final IndexShard recoverShard(IndexShard newShard, ShardRouting oldRouting) throws IOException {
|
||||
public static final IndexShard recoverShard(IndexShard newShard, ShardRouting oldRouting) throws IOException {
|
||||
ShardRouting routing = new ShardRouting(oldRouting);
|
||||
ShardRoutingHelper.reinit(routing);
|
||||
newShard.updateRoutingEntry(routing, false);
|
||||
|
@ -1422,7 +1422,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
|||
return newShard;
|
||||
}
|
||||
|
||||
private final IndexShard newIndexShard(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper, IndexingOperationListener... listeners) throws IOException {
|
||||
public static final IndexShard newIndexShard(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper, IndexingOperationListener... listeners) throws IOException {
|
||||
IndexShard newShard = new IndexShard(shard.shardId(), indexService.getIndexSettings(), shard.shardPath(),
|
||||
shard.store(), indexService.cache(), indexService.mapperService(), indexService.similarityService(),
|
||||
indexService.fieldData(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper,
|
||||
|
|
|
@ -19,26 +19,43 @@
|
|||
package org.elasticsearch.indices;
|
||||
|
||||
import org.apache.lucene.index.DirectoryReader;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.DummyTransportAddress;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.shard.IndexSearcherWrapper;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.IndexShardTests;
|
||||
import org.elasticsearch.index.shard.IndexingOperationListener;
|
||||
import org.elasticsearch.indices.recovery.RecoveryState;
|
||||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.emptySet;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
|
||||
|
@ -377,4 +394,51 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
|
|||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void testTranslogRecoveryWorksWithIMC() throws IOException {
|
||||
createIndex("test");
|
||||
ensureGreen();
|
||||
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
|
||||
IndexService indexService = indicesService.indexService(resolveIndex("test"));
|
||||
IndexShard shard = indexService.getShardOrNull(0);
|
||||
for (int i = 0; i < 100; i++) {
|
||||
client().prepareIndex("test", "test", Integer.toString(i)).setSource("{\"foo\" : \"bar\"}").get();
|
||||
}
|
||||
|
||||
IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {};
|
||||
shard.close("simon says", false);
|
||||
AtomicReference<IndexShard> shardRef = new AtomicReference<>();
|
||||
Settings settings = Settings.builder().put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "50kb").build();
|
||||
Iterable<IndexShard> iterable = () -> (shardRef.get() == null) ? Collections.<IndexShard>emptyList().iterator()
|
||||
: Collections.singleton(shardRef.get()).iterator();
|
||||
AtomicInteger flushes = new AtomicInteger();
|
||||
IndexingMemoryController imc = new IndexingMemoryController(settings, client().threadPool(), iterable) {
|
||||
@Override
|
||||
protected void writeIndexingBufferAsync(IndexShard shard) {
|
||||
assertEquals(shard, shardRef.get());
|
||||
flushes.incrementAndGet();
|
||||
shard.writeIndexingBuffer();
|
||||
}
|
||||
};
|
||||
final IndexShard newShard = IndexShardTests.newIndexShard(indexService, shard, wrapper, imc);
|
||||
shardRef.set(newShard);
|
||||
try {
|
||||
assertEquals(0, imc.availableShards().size());
|
||||
ShardRouting routing = new ShardRouting(shard.routingEntry());
|
||||
ShardRoutingHelper.reinit(routing);
|
||||
newShard.updateRoutingEntry(routing, false);
|
||||
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT);
|
||||
newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode));
|
||||
|
||||
assertEquals(1, imc.availableShards().size());
|
||||
assertTrue(newShard.recoverFromStore(localNode));
|
||||
assertTrue("we should have flushed in IMC at least once but did: " + flushes.get(), flushes.get() >= 1);
|
||||
routing = new ShardRouting(routing);
|
||||
ShardRoutingHelper.moveToStarted(routing);
|
||||
newShard.updateRoutingEntry(routing, true);
|
||||
} finally {
|
||||
newShard.close("simon says", false);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue