From b08352047d5c3dfccec85b37e3de060aea08ed0d Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 12 Sep 2016 18:20:25 +0200 Subject: [PATCH] Introduce IndexShardTestCase (#20411) Introduce a base class for unit tests that are based on real `IndexShard`s. The base class takes care of all the little details needed to create and recover shards. This commit also moves `IndexShardTests` and `ESIndexLevelReplicationTestCase` to use the new base class. All tests in `IndexShardTests` that required a full node environment were moved to a new `IndexShardIT` suite. --- .../index/IndexServiceTests.java | 10 +- .../ESIndexLevelReplicationTestCase.java | 227 +-- .../index/shard/IndexShardIT.java | 476 ++++++ .../index/shard/IndexShardTests.java | 1336 ++++++----------- .../index/shard/ShardUtilsTests.java | 4 + .../IndexingMemoryControllerTests.java | 5 +- .../cluster/routing/ShardRoutingHelper.java | 0 .../index/shard/IndexShardTestCase.java | 477 ++++++ 8 files changed, 1419 insertions(+), 1116 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java rename {core/src/test => test/framework/src/main}/java/org/elasticsearch/cluster/routing/ShardRoutingHelper.java (100%) create mode 100644 test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java diff --git a/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java b/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java index 22324e1ff2b..afde263d73d 100644 --- a/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java +++ b/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java @@ -43,6 +43,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -52,13 +54,13 @@ import static org.hamcrest.Matchers.nullValue; public class IndexServiceTests extends ESSingleNodeTestCase { public void testDetermineShadowEngineShouldBeUsed() { Settings regularSettings = Settings.builder() - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 2) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(SETTING_NUMBER_OF_SHARDS, 2) + .put(SETTING_NUMBER_OF_REPLICAS, 1) .build(); Settings shadowSettings = Settings.builder() - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 2) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(SETTING_NUMBER_OF_SHARDS, 2) + .put(SETTING_NUMBER_OF_REPLICAS, 1) .put(IndexMetaData.SETTING_SHADOW_REPLICAS, true) .build(); diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index ec794091a42..2d6ef7f2069 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -18,15 +18,7 @@ */ package org.elasticsearch.index.replication; -import org.apache.lucene.document.Document; -import org.apache.lucene.index.IndexNotFoundException; -import org.apache.lucene.index.LeafReader; -import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.store.AlreadyClosedException; -import org.apache.lucene.store.Directory; -import org.apache.lucene.util.Bits; -import org.apache.lucene.util.IOUtils; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteResponse; @@ -41,52 +33,21 @@ import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.RecoverySource.PeerRecoverySource; -import org.elasticsearch.cluster.routing.RecoverySource.StoreRecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingHelper; -import org.elasticsearch.cluster.routing.ShardRoutingState; -import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.common.collect.Iterators; -import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.LocalTransportAddress; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; -import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.MapperTestUtils; -import org.elasticsearch.index.cache.IndexCache; -import org.elasticsearch.index.cache.query.DisabledQueryCache; -import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Uid; -import org.elasticsearch.index.mapper.UidFieldMapper; -import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.IndexShardState; +import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.shard.ShardPath; -import org.elasticsearch.index.similarity.SimilarityService; -import org.elasticsearch.index.store.DirectoryService; -import org.elasticsearch.index.store.Store; -import org.elasticsearch.indices.recovery.RecoveryFailedException; -import org.elasticsearch.indices.recovery.RecoverySourceHandler; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryTarget; -import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; -import org.elasticsearch.indices.recovery.StartRecoveryRequest; -import org.elasticsearch.test.DummyShardLock; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.junit.annotations.TestLogging; -import org.elasticsearch.threadpool.TestThreadPool; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportResponse; import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -94,10 +55,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -107,98 +66,24 @@ import java.util.stream.StreamSupport; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; -public abstract class ESIndexLevelReplicationTestCase extends ESTestCase { +public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase { - protected ThreadPool threadPool; protected final Index index = new Index("test", "uuid"); private final ShardId shardId = new ShardId(index, 0); private final Map indexMapping = Collections.singletonMap("type", "{ \"type\": {} }"); - protected static final PeerRecoveryTargetService.RecoveryListener recoveryListener = new PeerRecoveryTargetService.RecoveryListener() { - @Override - public void onRecoveryDone(RecoveryState state) { - - } - - @Override - public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) { - fail(ExceptionsHelper.detailedMessage(e)); - } - }; - - - @TestLogging("index.shard:TRACE,index.replication:TRACE,indices.recovery:TRACE") - public void testIndexingDuringFileRecovery() throws Exception { - try (ReplicationGroup shards = createGroup(randomInt(1))) { - shards.startAll(); - int docs = shards.indexDocs(randomInt(50)); - shards.flush(); - IndexShard replica = shards.addReplica(); - final CountDownLatch recoveryBlocked = new CountDownLatch(1); - final CountDownLatch releaseRecovery = new CountDownLatch(1); - final Future recoveryFuture = shards.asyncRecoverReplica(replica, - new BiFunction() { - @Override - public RecoveryTarget apply(IndexShard indexShard, DiscoveryNode node) { - return new RecoveryTarget(indexShard, node, recoveryListener, version -> {}) { - @Override - public void renameAllTempFiles() throws IOException { - super.renameAllTempFiles(); - recoveryBlocked.countDown(); - try { - releaseRecovery.await(); - } catch (InterruptedException e) { - throw new IOException("terminated by interrupt", e); - } - } - }; - } - }); - - recoveryBlocked.await(); - docs += shards.indexDocs(randomInt(20)); - releaseRecovery.countDown(); - recoveryFuture.get(); - - shards.assertAllEqual(docs); - } - } - - @Override - public void setUp() throws Exception { - super.setUp(); - threadPool = new TestThreadPool(getClass().getName()); - } - - @Override - public void tearDown() throws Exception { - super.tearDown(); - ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); - } - - private Store createStore(IndexSettings indexSettings, ShardPath shardPath) throws IOException { - final ShardId shardId = shardPath.getShardId(); - final DirectoryService directoryService = new DirectoryService(shardId, indexSettings) { - @Override - public Directory newDirectory() throws IOException { - return newFSDirectory(shardPath.resolveIndex()); - } - - @Override - public long throttleTimeInNanos() { - return 0; - } - }; - return new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId)); - } protected ReplicationGroup createGroup(int replicas) throws IOException { - final Path homePath = createTempDir(); - Settings build = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, replicas) .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .build(); - IndexMetaData metaData = IndexMetaData.builder(index.getName()).settings(build).primaryTerm(0, 1).build(); - return new ReplicationGroup(metaData, homePath); + IndexMetaData.Builder metaData = IndexMetaData.builder(index.getName()) + .settings(settings) + .primaryTerm(0, 1); + for (Map.Entry typeMapping: indexMapping.entrySet()) { + metaData.putMapping(typeMapping.getKey(), typeMapping.getValue()); + } + return new ReplicationGroup(metaData.build()); } protected DiscoveryNode getDiscoveryNode(String id) { @@ -206,50 +91,22 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase { Collections.singleton(DiscoveryNode.Role.DATA), Version.CURRENT); } - private IndexShard newShard(boolean primary, DiscoveryNode node, IndexMetaData indexMetaData, Path homePath) throws IOException { - // add node name to settings for propper logging - final Settings nodeSettings = Settings.builder().put("node.name", node.getName()).build(); - final IndexSettings indexSettings = new IndexSettings(indexMetaData, nodeSettings); - ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, node.getId(), primary, ShardRoutingState.INITIALIZING, - primary ? StoreRecoverySource.EMPTY_STORE_INSTANCE : PeerRecoverySource.INSTANCE); - final Path path = Files.createDirectories(homePath.resolve(node.getId())); - final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(path); - ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId); - Store store = createStore(indexSettings, shardPath); - IndexCache indexCache = new IndexCache(indexSettings, new DisabledQueryCache(indexSettings), null); - MapperService mapperService = MapperTestUtils.newMapperService(homePath, indexSettings.getSettings()); - for (Map.Entry type : indexMapping.entrySet()) { - mapperService.merge(type.getKey(), new CompressedXContent(type.getValue()), MapperService.MergeReason.MAPPING_RECOVERY, true); - } - SimilarityService similarityService = new SimilarityService(indexSettings, Collections.emptyMap()); - final IndexEventListener indexEventListener = new IndexEventListener() { - }; - final Engine.Warmer warmer = searcher -> { - }; - return new IndexShard(shardRouting, indexSettings, shardPath, store, indexCache, mapperService, similarityService, null, null, - indexEventListener, null, threadPool, BigArrays.NON_RECYCLING_INSTANCE, warmer, Collections.emptyList(), - Collections.emptyList()); - } - protected class ReplicationGroup implements AutoCloseable, Iterable { private final IndexShard primary; private final List replicas; private final IndexMetaData indexMetaData; - private final Path homePath; private final AtomicInteger replicaId = new AtomicInteger(); private final AtomicInteger docId = new AtomicInteger(); boolean closed = false; - ReplicationGroup(final IndexMetaData indexMetaData, Path homePath) throws IOException { - primary = newShard(true, getDiscoveryNode("s0"), indexMetaData, homePath); + ReplicationGroup(final IndexMetaData indexMetaData) throws IOException { + primary = newShard(shardId, true, "s0", indexMetaData, null); replicas = new ArrayList<>(); this.indexMetaData = indexMetaData; - this.homePath = homePath; for (int i = 0; i < indexMetaData.getNumberOfReplicas(); i++) { addReplica(); } - } public int indexDocs(final int numOfDoc) throws Exception { @@ -289,7 +146,7 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase { } public synchronized IndexShard addReplica() throws IOException { - final IndexShard replica = newShard(false, getDiscoveryNode("s" + replicaId.incrementAndGet()), indexMetaData, homePath); + final IndexShard replica = newShard(shardId, false,"s" + replicaId.incrementAndGet(), indexMetaData, null); replicas.add(replica); return replica; } @@ -304,39 +161,8 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase { } public void recoverReplica(IndexShard replica, BiFunction targetSupplier, - boolean markAsRecovering) - throws IOException { - final DiscoveryNode pNode = getPrimaryNode(); - final DiscoveryNode rNode = getDiscoveryNode(replica.routingEntry().currentNodeId()); - if (markAsRecovering) { - replica.markAsRecovering("remote", - new RecoveryState(replica.routingEntry(), pNode, rNode)); - } else { - assertEquals(replica.state(), IndexShardState.RECOVERING); - } - replica.prepareForIndexRecovery(); - RecoveryTarget recoveryTarget = targetSupplier.apply(replica, pNode); - StartRecoveryRequest request = new StartRecoveryRequest(replica.shardId(), pNode, rNode, - getMetadataSnapshotOrEmpty(replica), false, 0); - RecoverySourceHandler recovery = new RecoverySourceHandler(primary, recoveryTarget, request, () -> 0L, e -> () -> {}, - (int) ByteSizeUnit.MB.toKB(1), logger); - recovery.recoverToTarget(); - recoveryTarget.markAsDone(); - replica.updateRoutingEntry(ShardRoutingHelper.moveToStarted(replica.routingEntry())); - } - - private Store.MetadataSnapshot getMetadataSnapshotOrEmpty(IndexShard replica) throws IOException { - Store.MetadataSnapshot result; - try { - result = replica.snapshotStoreMetadata(); - } catch (IndexNotFoundException e) { - // OK! - result = Store.MetadataSnapshot.EMPTY; - } catch (IOException e) { - logger.warn("failed read store, treating as empty", e); - result = Store.MetadataSnapshot.EMPTY; - } - return result; + boolean markAsRecovering) throws IOException { + ESIndexLevelReplicationTestCase.this.recoverReplica(replica, primary, targetSupplier, markAsRecovering); } public synchronized DiscoveryNode getPrimaryNode() { @@ -367,24 +193,6 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase { } } - private Set getShardDocUIDs(final IndexShard shard) throws IOException { - shard.refresh("get_uids"); - try (Engine.Searcher searcher = shard.acquireSearcher("test")) { - Set ids = new HashSet<>(); - for (LeafReaderContext leafContext : searcher.reader().leaves()) { - LeafReader reader = leafContext.reader(); - Bits liveDocs = reader.getLiveDocs(); - for (int i = 0; i < reader.maxDoc(); i++) { - if (liveDocs == null || liveDocs.get(i)) { - Document uuid = reader.document(i, Collections.singleton(UidFieldMapper.NAME)); - ids.add(Uid.createUid(uuid.get(UidFieldMapper.NAME))); - } - } - } - return ids; - } - } - public synchronized void refresh(String source) { for (IndexShard shard : this) { shard.refresh(source); @@ -406,10 +214,7 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase { public synchronized void close() throws Exception { if (closed == false) { closed = true; - for (IndexShard shard : this) { - shard.close("eol", false); - IOUtils.close(shard.store()); - } + closeShards(this); } else { throw new AlreadyClosedException("too bad"); } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java new file mode 100644 index 00000000000..fc943bcebe9 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -0,0 +1,476 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.shard; + +import org.apache.lucene.document.Field; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.index.Term; +import org.apache.lucene.store.LockObtainFailedException; +import org.apache.lucene.util.IOUtils; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; +import org.elasticsearch.action.admin.indices.stats.IndexStats; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.cluster.ClusterInfoService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.InternalClusterInfoService; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.LocalTransportAddress; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.env.Environment; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.env.ShardLock; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.flush.FlushStats; +import org.elasticsearch.index.mapper.Mapping; +import org.elasticsearch.index.mapper.ParseContext; +import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.index.mapper.UidFieldMapper; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.DummyShardLock; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.test.IndexSettingsModule; +import org.elasticsearch.test.InternalSettingsPlugin; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; +import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.NONE; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; +import static org.hamcrest.Matchers.equalTo; + +public class IndexShardIT extends ESSingleNodeTestCase { + + @Override + protected Collection> getPlugins() { + return pluginList(InternalSettingsPlugin.class); + } + + private ParsedDocument testParsedDocument(String uid, String id, String type, String routing, long timestamp, long ttl, + ParseContext.Document document, BytesReference source, Mapping mappingUpdate) { + Field uidField = new Field("_uid", uid, UidFieldMapper.Defaults.FIELD_TYPE); + Field versionField = new NumericDocValuesField("_version", 0); + document.add(uidField); + document.add(versionField); + return new ParsedDocument(versionField, id, type, routing, timestamp, ttl, Collections.singletonList(document), source, + mappingUpdate); + } + + public void testLockTryingToDelete() throws Exception { + createIndex("test"); + ensureGreen(); + NodeEnvironment env = getInstanceFromNode(NodeEnvironment.class); + + ClusterService cs = getInstanceFromNode(ClusterService.class); + final Index index = cs.state().metaData().index("test").getIndex(); + Path[] shardPaths = env.availableShardPaths(new ShardId(index, 0)); + logger.info("--> paths: [{}]", (Object)shardPaths); + // Should not be able to acquire the lock because it's already open + try { + NodeEnvironment.acquireFSLockForPaths(IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), shardPaths); + fail("should not have been able to acquire the lock"); + } catch (LockObtainFailedException e) { + assertTrue("msg: " + e.getMessage(), e.getMessage().contains("unable to acquire write.lock")); + } + // Test without the regular shard lock to assume we can acquire it + // (worst case, meaning that the shard lock could be acquired and + // we're green to delete the shard's directory) + ShardLock sLock = new DummyShardLock(new ShardId(index, 0)); + try { + env.deleteShardDirectoryUnderLock(sLock, IndexSettingsModule.newIndexSettings("test", Settings.EMPTY)); + fail("should not have been able to delete the directory"); + } catch (LockObtainFailedException e) { + assertTrue("msg: " + e.getMessage(), e.getMessage().contains("unable to acquire write.lock")); + } + } + + public void testMarkAsInactiveTriggersSyncedFlush() throws Exception { + assertAcked(client().admin().indices().prepareCreate("test") + .setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0)); + client().prepareIndex("test", "test").setSource("{}").get(); + ensureGreen("test"); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + indicesService.indexService(resolveIndex("test")).getShardOrNull(0).checkIdle(0); + assertBusy(() -> { + IndexStats indexStats = client().admin().indices().prepareStats("test").clear().get().getIndex("test"); + assertNotNull(indexStats.getShards()[0].getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID)); + indicesService.indexService(resolveIndex("test")).getShardOrNull(0).checkIdle(0); + } + ); + IndexStats indexStats = client().admin().indices().prepareStats("test").get().getIndex("test"); + assertNotNull(indexStats.getShards()[0].getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID)); + } + + public void testDurableFlagHasEffect() { + createIndex("test"); + ensureGreen(); + client().prepareIndex("test", "bar", "1").setSource("{}").get(); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService test = indicesService.indexService(resolveIndex("test")); + IndexShard shard = test.getShardOrNull(0); + setDurability(shard, Translog.Durability.REQUEST); + assertFalse(ShardUtilsTests.getShardEngine(shard).getTranslog().syncNeeded()); + setDurability(shard, Translog.Durability.ASYNC); + client().prepareIndex("test", "bar", "2").setSource("{}").get(); + assertTrue(ShardUtilsTests.getShardEngine(shard).getTranslog().syncNeeded()); + setDurability(shard, Translog.Durability.REQUEST); + client().prepareDelete("test", "bar", "1").get(); + assertFalse(ShardUtilsTests.getShardEngine(shard).getTranslog().syncNeeded()); + + setDurability(shard, Translog.Durability.ASYNC); + client().prepareDelete("test", "bar", "2").get(); + assertTrue(ShardUtilsTests.getShardEngine(shard).getTranslog().syncNeeded()); + setDurability(shard, Translog.Durability.REQUEST); + assertNoFailures(client().prepareBulk() + .add(client().prepareIndex("test", "bar", "3").setSource("{}")) + .add(client().prepareDelete("test", "bar", "1")).get()); + assertFalse(ShardUtilsTests.getShardEngine(shard).getTranslog().syncNeeded()); + + setDurability(shard, Translog.Durability.ASYNC); + assertNoFailures(client().prepareBulk() + .add(client().prepareIndex("test", "bar", "4").setSource("{}")) + .add(client().prepareDelete("test", "bar", "3")).get()); + setDurability(shard, Translog.Durability.REQUEST); + assertTrue(ShardUtilsTests.getShardEngine(shard).getTranslog().syncNeeded()); + } + + private void setDurability(IndexShard shard, Translog.Durability durability) { + client().admin().indices().prepareUpdateSettings(shard.shardId().getIndexName()).setSettings( + Settings.builder().put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), durability.name()).build()).get(); + assertEquals(durability, shard.getTranslogDurability()); + } + + public void testUpdatePriority() { + assertAcked(client().admin().indices().prepareCreate("test") + .setSettings(IndexMetaData.SETTING_PRIORITY, 200)); + IndexService indexService = getInstanceFromNode(IndicesService.class).indexService(resolveIndex("test")); + assertEquals(200, indexService.getIndexSettings().getSettings().getAsInt(IndexMetaData.SETTING_PRIORITY, 0).intValue()); + client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(IndexMetaData.SETTING_PRIORITY, 400) + .build()).get(); + assertEquals(400, indexService.getIndexSettings().getSettings().getAsInt(IndexMetaData.SETTING_PRIORITY, 0).intValue()); + } + + public void testIndexDirIsDeletedWhenShardRemoved() throws Exception { + Environment env = getInstanceFromNode(Environment.class); + Path idxPath = env.sharedDataFile().resolve(randomAsciiOfLength(10)); + logger.info("--> idxPath: [{}]", idxPath); + Settings idxSettings = Settings.builder() + .put(IndexMetaData.SETTING_DATA_PATH, idxPath) + .build(); + createIndex("test", idxSettings); + ensureGreen("test"); + client().prepareIndex("test", "bar", "1").setSource("{}").setRefreshPolicy(IMMEDIATE).get(); + SearchResponse response = client().prepareSearch("test").get(); + assertHitCount(response, 1L); + client().admin().indices().prepareDelete("test").get(); + assertAllIndicesRemovedAndDeletionCompleted(Collections.singleton(getInstanceFromNode(IndicesService.class))); + assertPathHasBeenCleared(idxPath); + } + + public void testExpectedShardSizeIsPresent() throws InterruptedException { + assertAcked(client().admin().indices().prepareCreate("test") + .setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0)); + for (int i = 0; i < 50; i++) { + client().prepareIndex("test", "test").setSource("{}").get(); + } + ensureGreen("test"); + InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class); + clusterInfoService.refresh(); + ClusterState state = getInstanceFromNode(ClusterService.class).state(); + Long test = clusterInfoService.getClusterInfo().getShardSize(state.getRoutingTable().index("test") + .getShards().get(0).primaryShard()); + assertNotNull(test); + assertTrue(test > 0); + } + + public void testIndexCanChangeCustomDataPath() throws Exception { + Environment env = getInstanceFromNode(Environment.class); + Path idxPath = env.sharedDataFile().resolve(randomAsciiOfLength(10)); + final String INDEX = "idx"; + Path startDir = idxPath.resolve("start-" + randomAsciiOfLength(10)); + Path endDir = idxPath.resolve("end-" + randomAsciiOfLength(10)); + logger.info("--> start dir: [{}]", startDir.toAbsolutePath().toString()); + logger.info("--> end dir: [{}]", endDir.toAbsolutePath().toString()); + // temp dirs are automatically created, but the end dir is what + // startDir is going to be renamed as, so it needs to be deleted + // otherwise we get all sorts of errors about the directory + // already existing + IOUtils.rm(endDir); + + Settings sb = Settings.builder() + .put(IndexMetaData.SETTING_DATA_PATH, startDir.toAbsolutePath().toString()) + .build(); + Settings sb2 = Settings.builder() + .put(IndexMetaData.SETTING_DATA_PATH, endDir.toAbsolutePath().toString()) + .build(); + + logger.info("--> creating an index with data_path [{}]", startDir.toAbsolutePath().toString()); + createIndex(INDEX, sb); + ensureGreen(INDEX); + client().prepareIndex(INDEX, "bar", "1").setSource("{}").setRefreshPolicy(IMMEDIATE).get(); + + SearchResponse resp = client().prepareSearch(INDEX).setQuery(matchAllQuery()).get(); + assertThat("found the hit", resp.getHits().getTotalHits(), equalTo(1L)); + + logger.info("--> closing the index [{}]", INDEX); + client().admin().indices().prepareClose(INDEX).get(); + logger.info("--> index closed, re-opening..."); + client().admin().indices().prepareOpen(INDEX).get(); + logger.info("--> index re-opened"); + ensureGreen(INDEX); + + resp = client().prepareSearch(INDEX).setQuery(matchAllQuery()).get(); + assertThat("found the hit", resp.getHits().getTotalHits(), equalTo(1L)); + + // Now, try closing and changing the settings + + logger.info("--> closing the index [{}]", INDEX); + client().admin().indices().prepareClose(INDEX).get(); + + logger.info("--> moving data on disk [{}] to [{}]", startDir.getFileName(), endDir.getFileName()); + assert Files.exists(endDir) == false : "end directory should not exist!"; + Files.move(startDir, endDir, StandardCopyOption.REPLACE_EXISTING); + + logger.info("--> updating settings..."); + client().admin().indices().prepareUpdateSettings(INDEX) + .setSettings(sb2) + .setIndicesOptions(IndicesOptions.fromOptions(true, false, true, true)) + .get(); + + assert Files.exists(startDir) == false : "start dir shouldn't exist"; + + logger.info("--> settings updated and files moved, re-opening index"); + client().admin().indices().prepareOpen(INDEX).get(); + logger.info("--> index re-opened"); + ensureGreen(INDEX); + + resp = client().prepareSearch(INDEX).setQuery(matchAllQuery()).get(); + assertThat("found the hit", resp.getHits().getTotalHits(), equalTo(1L)); + + assertAcked(client().admin().indices().prepareDelete(INDEX)); + assertAllIndicesRemovedAndDeletionCompleted(Collections.singleton(getInstanceFromNode(IndicesService.class))); + assertPathHasBeenCleared(startDir.toAbsolutePath()); + assertPathHasBeenCleared(endDir.toAbsolutePath()); + } + + public void testMaybeFlush() throws Exception { + createIndex("test", Settings.builder().put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.REQUEST) + .build()); + ensureGreen(); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService test = indicesService.indexService(resolveIndex("test")); + IndexShard shard = test.getShardOrNull(0); + assertFalse(shard.shouldFlush()); + client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder() + .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), + new ByteSizeValue(133 /* size of the operation + header&footer*/, ByteSizeUnit.BYTES)).build()).get(); + client().prepareIndex("test", "test", "0").setSource("{}").setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get(); + assertFalse(shard.shouldFlush()); + ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, new ParseContext.Document(), + new BytesArray(new byte[]{1}), null); + Engine.Index index = new Engine.Index(new Term("_uid", "1"), doc); + shard.index(index); + assertTrue(shard.shouldFlush()); + assertEquals(2, shard.getEngine().getTranslog().totalOperations()); + client().prepareIndex("test", "test", "2").setSource("{}").setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get(); + assertBusy(() -> { // this is async + assertFalse(shard.shouldFlush()); + }); + assertEquals(0, shard.getEngine().getTranslog().totalOperations()); + shard.getEngine().getTranslog().sync(); + long size = shard.getEngine().getTranslog().sizeInBytes(); + logger.info("--> current translog size: [{}] num_ops [{}] generation [{}]", shard.getEngine().getTranslog().sizeInBytes(), + shard.getEngine().getTranslog().totalOperations(), shard.getEngine().getTranslog().getGeneration()); + client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put( + IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(size, ByteSizeUnit.BYTES)) + .build()).get(); + client().prepareDelete("test", "test", "2").get(); + logger.info("--> translog size after delete: [{}] num_ops [{}] generation [{}]", shard.getEngine().getTranslog().sizeInBytes(), + shard.getEngine().getTranslog().totalOperations(), shard.getEngine().getTranslog().getGeneration()); + assertBusy(() -> { // this is async + logger.info("--> translog size on iter : [{}] num_ops [{}] generation [{}]", shard.getEngine().getTranslog().sizeInBytes(), + shard.getEngine().getTranslog().totalOperations(), shard.getEngine().getTranslog().getGeneration()); + assertFalse(shard.shouldFlush()); + }); + assertEquals(0, shard.getEngine().getTranslog().totalOperations()); + } + + public void testStressMaybeFlush() throws Exception { + createIndex("test"); + ensureGreen(); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService test = indicesService.indexService(resolveIndex("test")); + final IndexShard shard = test.getShardOrNull(0); + assertFalse(shard.shouldFlush()); + client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put( + IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), + new ByteSizeValue(133/* size of the operation + header&footer*/, ByteSizeUnit.BYTES)).build()).get(); + client().prepareIndex("test", "test", "0").setSource("{}").setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get(); + assertFalse(shard.shouldFlush()); + final AtomicBoolean running = new AtomicBoolean(true); + final int numThreads = randomIntBetween(2, 4); + Thread[] threads = new Thread[numThreads]; + CyclicBarrier barrier = new CyclicBarrier(numThreads + 1); + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread() { + @Override + public void run() { + try { + barrier.await(); + } catch (InterruptedException | BrokenBarrierException e) { + throw new RuntimeException(e); + } + while (running.get()) { + shard.maybeFlush(); + } + } + }; + threads[i].start(); + } + barrier.await(); + FlushStats flushStats = shard.flushStats(); + long total = flushStats.getTotal(); + client().prepareIndex("test", "test", "1").setSource("{}").get(); + assertBusy(() -> assertEquals(total + 1, shard.flushStats().getTotal())); + running.set(false); + for (int i = 0; i < threads.length; i++) { + threads[i].join(); + } + assertEquals(total + 1, shard.flushStats().getTotal()); + } + + public void testShardHasMemoryBufferOnTranslogRecover() throws Throwable { + createIndex("test"); + ensureGreen(); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService indexService = indicesService.indexService(resolveIndex("test")); + IndexShard shard = indexService.getShardOrNull(0); + client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}").get(); + client().prepareDelete("test", "test", "0").get(); + client().prepareIndex("test", "test", "1").setSource("{\"foo\" : \"bar\"}").setRefreshPolicy(IMMEDIATE).get(); + + IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {}; + shard.close("simon says", false); + AtomicReference shardRef = new AtomicReference<>(); + List failures = new ArrayList<>(); + IndexingOperationListener listener = new IndexingOperationListener() { + + @Override + public void postIndex(Engine.Index index, boolean created) { + try { + assertNotNull(shardRef.get()); + // this is all IMC needs to do - check current memory and refresh + assertTrue(shardRef.get().getIndexBufferRAMBytesUsed() > 0); + shardRef.get().refresh("test"); + } catch (Exception e) { + failures.add(e); + throw e; + } + } + + + @Override + public void postDelete(Engine.Delete delete) { + try { + assertNotNull(shardRef.get()); + // this is all IMC needs to do - check current memory and refresh + assertTrue(shardRef.get().getIndexBufferRAMBytesUsed() > 0); + shardRef.get().refresh("test"); + } catch (Exception e) { + failures.add(e); + throw e; + } + } + }; + final IndexShard newShard = newIndexShard(indexService, shard, wrapper, listener); + shardRef.set(newShard); + recoverShard(newShard); + + try { + ExceptionsHelper.rethrowAndSuppress(failures); + } finally { + newShard.close("just do it", randomBoolean()); + } + } + + + public static final IndexShard recoverShard(IndexShard newShard) throws IOException { + DiscoveryNode localNode = new DiscoveryNode("foo", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT); + newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); + assertTrue(newShard.recoverFromStore()); + newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted()); + return newShard; + } + + public static final IndexShard newIndexShard(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper, + IndexingOperationListener... listeners) throws IOException { + ShardRouting initializingShardRouting = getInitializingShardRouting(shard.routingEntry()); + IndexShard newShard = new IndexShard(initializingShardRouting, indexService.getIndexSettings(), shard.shardPath(), + shard.store(), indexService.cache(), indexService.mapperService(), indexService.similarityService(), + indexService.fieldData(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper, + indexService.getThreadPool(), indexService.getBigArrays(), null, Collections.emptyList(), Arrays.asList(listeners)); + return newShard; + } + + private static ShardRouting getInitializingShardRouting(ShardRouting existingShardRouting) { + ShardRouting shardRouting = TestShardRouting.newShardRouting(existingShardRouting.shardId(), + existingShardRouting.currentNodeId(), null, existingShardRouting.primary(), ShardRoutingState.INITIALIZING, + existingShardRouting.allocationId()); + shardRouting = shardRouting.updateUnassigned(new UnassignedInfo(UnassignedInfo.Reason.INDEX_REOPENED, "fake recovery"), + RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE); + return shardRouting; + } +} diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index b4725a8506d..c0375b2f98b 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -30,45 +30,28 @@ import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TopDocs; import org.apache.lucene.store.IOContext; -import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.util.Constants; -import org.apache.lucene.util.IOUtils; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.flush.FlushRequest; -import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; -import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.ShardStats; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.TransportIndexAction; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.PlainActionFuture; -import org.elasticsearch.cluster.ClusterInfoService; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.InternalClusterInfoService; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.AllocationId; -import org.elasticsearch.cluster.routing.RecoverySource.LocalShardsRecoverySource; -import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; -import org.elasticsearch.cluster.routing.RecoverySource.StoreRecoverySource; +import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingHelper; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; @@ -76,21 +59,13 @@ import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.LocalTransportAddress; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; -import org.elasticsearch.env.ShardLock; -import org.elasticsearch.index.Index; -import org.elasticsearch.index.IndexService; -import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.fielddata.FieldDataStats; import org.elasticsearch.index.fielddata.IndexFieldData; -import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.ParseContext; @@ -101,9 +76,9 @@ import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogTests; -import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.IndicesQueryCache; import org.elasticsearch.indices.recovery.RecoveryState; -import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.indices.recovery.RecoveryTarget; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; @@ -112,20 +87,14 @@ import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotShardFailure; import org.elasticsearch.test.DummyShardLock; -import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.FieldMaskingReader; -import org.elasticsearch.test.IndexSettingsModule; -import org.elasticsearch.test.InternalSettingsPlugin; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.StandardCopyOption; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -140,38 +109,23 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; -import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; -import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.NONE; -import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; -import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; -import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED; import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex; import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasSize; /** * Simple unit-test IndexShard related operations. */ -public class IndexShardTests extends ESSingleNodeTestCase { - - @Override - protected Collection> getPlugins() { - return pluginList(InternalSettingsPlugin.class); - } +public class IndexShardTests extends IndexShardTestCase { public void testWriteShardState() throws Exception { try (NodeEnvironment env = newNodeEnvironment()) { @@ -197,73 +151,41 @@ public class IndexShardTests extends ESSingleNodeTestCase { } } - public void testLockTryingToDelete() throws Exception { - createIndex("test"); - ensureGreen(); - NodeEnvironment env = getInstanceFromNode(NodeEnvironment.class); - - ClusterService cs = getInstanceFromNode(ClusterService.class); - final Index index = cs.state().metaData().index("test").getIndex(); - Path[] shardPaths = env.availableShardPaths(new ShardId(index, 0)); - logger.info("--> paths: [{}]", (Object)shardPaths); - // Should not be able to acquire the lock because it's already open - try { - NodeEnvironment.acquireFSLockForPaths(IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), shardPaths); - fail("should not have been able to acquire the lock"); - } catch (LockObtainFailedException e) { - assertTrue("msg: " + e.getMessage(), e.getMessage().contains("unable to acquire write.lock")); - } - // Test without the regular shard lock to assume we can acquire it - // (worst case, meaning that the shard lock could be acquired and - // we're green to delete the shard's directory) - ShardLock sLock = new DummyShardLock(new ShardId(index, 0)); - try { - env.deleteShardDirectoryUnderLock(sLock, IndexSettingsModule.newIndexSettings("test", Settings.EMPTY)); - fail("should not have been able to delete the directory"); - } catch (LockObtainFailedException e) { - assertTrue("msg: " + e.getMessage(), e.getMessage().contains("unable to acquire write.lock")); - } - } - public void testPersistenceStateMetadataPersistence() throws Exception { - createIndex("test"); - ensureGreen(); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - NodeEnvironment env = getInstanceFromNode(NodeEnvironment.class); - IndexService test = indicesService.indexService(resolveIndex("test")); - IndexShard shard = test.getShardOrNull(0); - ShardStateMetaData shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId)); + IndexShard shard = newStartedShard(); + final Path shardStatePath = shard.shardPath().getShardStatePath(); + ShardStateMetaData shardStateMetaData = load(logger, shardStatePath); assertEquals(getShardStateMetadata(shard), shardStateMetaData); ShardRouting routing = shard.shardRouting; shard.updateRoutingEntry(routing); - shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId)); + shardStateMetaData = load(logger, shardStatePath); assertEquals(shardStateMetaData, getShardStateMetadata(shard)); - assertEquals(shardStateMetaData, new ShardStateMetaData(routing.primary(), shard.indexSettings().getUUID(), routing.allocationId())); + assertEquals(shardStateMetaData, + new ShardStateMetaData(routing.primary(), shard.indexSettings().getUUID(), routing.allocationId())); routing = TestShardRouting.relocate(shard.shardRouting, "some node", 42L); shard.updateRoutingEntry(routing); - shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId)); + shardStateMetaData = load(logger, shardStatePath); assertEquals(shardStateMetaData, getShardStateMetadata(shard)); - assertEquals(shardStateMetaData, new ShardStateMetaData(routing.primary(), shard.indexSettings().getUUID(), routing.allocationId())); + assertEquals(shardStateMetaData, + new ShardStateMetaData(routing.primary(), shard.indexSettings().getUUID(), routing.allocationId())); + closeShards(shard); } public void testFailShard() throws Exception { - createIndex("test"); - ensureGreen(); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - NodeEnvironment env = getInstanceFromNode(NodeEnvironment.class); - IndexService test = indicesService.indexService(resolveIndex("test")); - IndexShard shard = test.getShardOrNull(0); + IndexShard shard = newStartedShard(); + final ShardPath shardPath = shard.shardPath(); + assertNotNull(shardPath); // fail shard shard.failShard("test shard fail", new CorruptIndexException("", "")); + closeShards(shard); // check state file still exists - ShardStateMetaData shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId)); + ShardStateMetaData shardStateMetaData = load(logger, shardPath.getShardStatePath()); assertEquals(shardStateMetaData, getShardStateMetadata(shard)); - ShardPath shardPath = ShardPath.loadShardPath(logger, env, shard.shardId(), test.getIndexSettings()); - assertNotNull(shardPath); // but index can't be opened for a failed shard - assertThat("store index should be corrupted", Store.canOpenIndex(logger, shardPath.resolveIndex(), shard.shardId(), env::shardLock), + assertThat("store index should be corrupted", Store.canOpenIndex(logger, shardPath.resolveIndex(), shard.shardId(), + (shardId, lockTimeoutMS) -> new DummyShardLock(shardId)), equalTo(false)); } @@ -286,10 +208,12 @@ public class IndexShardTests extends ESSingleNodeTestCase { public void testShardStateMetaHashCodeEquals() { AllocationId allocationId = randomBoolean() ? null : randomAllocationId(); - ShardStateMetaData meta = new ShardStateMetaData(randomLong(), randomBoolean(), randomRealisticUnicodeOfCodepointLengthBetween(1, 10), allocationId); + ShardStateMetaData meta = new ShardStateMetaData(randomLong(), randomBoolean(), + randomRealisticUnicodeOfCodepointLengthBetween(1, 10), allocationId); assertEquals(meta, new ShardStateMetaData(meta.legacyVersion, meta.primary, meta.indexUUID, meta.allocationId)); - assertEquals(meta.hashCode(), new ShardStateMetaData(meta.legacyVersion, meta.primary, meta.indexUUID, meta.allocationId).hashCode()); + assertEquals(meta.hashCode(), + new ShardStateMetaData(meta.legacyVersion, meta.primary, meta.indexUUID, meta.allocationId).hashCode()); assertFalse(meta.equals(new ShardStateMetaData(meta.legacyVersion, !meta.primary, meta.indexUUID, meta.allocationId))); assertFalse(meta.equals(new ShardStateMetaData(meta.legacyVersion + 1, meta.primary, meta.indexUUID, meta.allocationId))); @@ -298,20 +222,17 @@ public class IndexShardTests extends ESSingleNodeTestCase { Set hashCodes = new HashSet<>(); for (int i = 0; i < 30; i++) { // just a sanity check that we impl hashcode allocationId = randomBoolean() ? null : randomAllocationId(); - meta = new ShardStateMetaData(randomLong(), randomBoolean(), randomRealisticUnicodeOfCodepointLengthBetween(1, 10), allocationId); + meta = new ShardStateMetaData(randomLong(), randomBoolean(), + randomRealisticUnicodeOfCodepointLengthBetween(1, 10), allocationId); hashCodes.add(meta.hashCode()); } assertTrue("more than one unique hashcode expected but got: " + hashCodes.size(), hashCodes.size() > 1); } - public void testDeleteIndexPreventsNewOperations() throws InterruptedException, ExecutionException, IOException { - assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)).get()); - ensureGreen("test"); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService indexService = indicesService.indexServiceSafe(resolveIndex("test")); - IndexShard indexShard = indexService.getShardOrNull(0); - client().admin().indices().prepareDelete("test").get(); + public void testClosesPreventsNewOperations() throws InterruptedException, ExecutionException, IOException { + IndexShard indexShard = newStartedShard(); + closeShards(indexShard); assertThat(indexShard.getActiveOperationsCount(), equalTo(0)); try { indexShard.acquirePrimaryOperationLock(null, ThreadPool.Names.INDEX); @@ -328,35 +249,27 @@ public class IndexShardTests extends ESSingleNodeTestCase { } public void testOperationLocksOnPrimaryShards() throws InterruptedException, ExecutionException, IOException { - assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)).get()); - ensureGreen("test"); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService indexService = indicesService.indexServiceSafe(resolveIndex("test")); - IndexShard indexShard = indexService.getShardOrNull(0); - long primaryTerm = indexShard.getPrimaryTerm(); + final ShardId shardId = new ShardId("test", "_na_", 0); + final IndexShard indexShard; - ShardRouting temp = indexShard.routingEntry(); - final ShardRouting newPrimaryShardRouting; if (randomBoolean()) { // relocation target - newPrimaryShardRouting = TestShardRouting.newShardRouting(temp.shardId(), temp.currentNodeId(), "other node", - true, ShardRoutingState.INITIALIZING, AllocationId.newRelocation(temp.allocationId())); + indexShard = newShard(TestShardRouting.newShardRouting(shardId, "local_node", "other node", + true, ShardRoutingState.INITIALIZING, AllocationId.newRelocation(AllocationId.newInitializing()))); } else if (randomBoolean()) { // simulate promotion - ShardRouting newReplicaShardRouting = TestShardRouting.newShardRouting(temp.shardId(), temp.currentNodeId(), null, - false, ShardRoutingState.STARTED, temp.allocationId()); - indexShard.updateRoutingEntry(newReplicaShardRouting); - primaryTerm = primaryTerm + 1; - indexShard.updatePrimaryTerm(primaryTerm); - newPrimaryShardRouting = TestShardRouting.newShardRouting(temp.shardId(), temp.currentNodeId(), null, - true, ShardRoutingState.STARTED, temp.allocationId()); + indexShard = newShard(shardId, false); + ShardRouting replicaRouting = indexShard.routingEntry(); + indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1); + ShardRouting primaryRouting = TestShardRouting.newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null, + true, ShardRoutingState.STARTED, replicaRouting.allocationId()); + indexShard.updateRoutingEntry(primaryRouting); } else { - newPrimaryShardRouting = temp; + indexShard = newStartedShard(true); } - indexShard.updateRoutingEntry(newPrimaryShardRouting); - + final long primaryTerm = indexShard.getPrimaryTerm(); assertEquals(0, indexShard.getActiveOperationsCount()); - if (newPrimaryShardRouting.isRelocationTarget() == false) { + if (indexShard.routingEntry().isRelocationTarget() == false) { try { indexShard.acquireReplicaOperationLock(primaryTerm, null, ThreadPool.Names.INDEX); fail("shard shouldn't accept operations as replica"); @@ -371,6 +284,8 @@ public class IndexShardTests extends ESSingleNodeTestCase { Releasables.close(operation1, operation2); assertEquals(0, indexShard.getActiveOperationsCount()); + + closeShards(indexShard); } private Releasable acquirePrimaryOperationLockBlockingly(IndexShard indexShard) throws ExecutionException, InterruptedException { @@ -379,56 +294,52 @@ public class IndexShardTests extends ESSingleNodeTestCase { return fut.get(); } - private Releasable acquireReplicaOperationLockBlockingly(IndexShard indexShard, long opPrimaryTerm) throws ExecutionException, InterruptedException { + private Releasable acquireReplicaOperationLockBlockingly(IndexShard indexShard, long opPrimaryTerm) + throws ExecutionException, InterruptedException { PlainActionFuture fut = new PlainActionFuture<>(); indexShard.acquireReplicaOperationLock(opPrimaryTerm, fut, ThreadPool.Names.INDEX); return fut.get(); } public void testOperationLocksOnReplicaShards() throws InterruptedException, ExecutionException, IOException { - assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)).get()); - ensureGreen("test"); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService indexService = indicesService.indexServiceSafe(resolveIndex("test")); - IndexShard indexShard = indexService.getShardOrNull(0); - long primaryTerm = indexShard.getPrimaryTerm(); + final ShardId shardId = new ShardId("test", "_na_", 0); + final IndexShard indexShard; - // ugly hack to allow the shard to operated as a replica - final ShardRouting temp = indexShard.routingEntry(); - final ShardRouting newShardRouting; switch (randomInt(2)) { case 0: // started replica - newShardRouting = TestShardRouting.newShardRouting(temp.shardId(), temp.currentNodeId(), null, - false, ShardRoutingState.STARTED, AllocationId.newRelocation(temp.allocationId())); - - indexShard.updateRoutingEntry(newShardRouting); + indexShard = newStartedShard(false); break; - case 1: + case 1: { // initializing replica / primary final boolean relocating = randomBoolean(); - newShardRouting = TestShardRouting.newShardRouting(temp.shardId(), temp.currentNodeId(), + ShardRouting routing = TestShardRouting.newShardRouting(shardId, "local_node", relocating ? "sourceNode" : null, relocating ? randomBoolean() : false, ShardRoutingState.INITIALIZING, - relocating ? AllocationId.newRelocation(temp.allocationId()) : temp.allocationId()); - indexShard.updateRoutingEntry(newShardRouting); + relocating ? AllocationId.newRelocation(AllocationId.newInitializing()) : AllocationId.newInitializing()); + indexShard = newShard(routing); break; - case 2: + } + case 2: { // relocation source - newShardRouting = TestShardRouting.newShardRouting(temp.shardId(), temp.currentNodeId(), "otherNode", - false, ShardRoutingState.RELOCATING, AllocationId.newRelocation(temp.allocationId())); - indexShard.updateRoutingEntry(newShardRouting); + indexShard = newStartedShard(false); + ShardRouting routing = indexShard.routingEntry(); + routing = TestShardRouting.newShardRouting(routing.shardId(), routing.currentNodeId(), "otherNode", + false, ShardRoutingState.RELOCATING, AllocationId.newRelocation(routing.allocationId())); + indexShard.updateRoutingEntry(routing); indexShard.relocated("test"); break; + } default: throw new UnsupportedOperationException("get your numbers straight"); } - logger.info("updated shard routing to {}", newShardRouting); + final ShardRouting shardRouting = indexShard.routingEntry(); + logger.info("shard routing to {}", shardRouting); assertEquals(0, indexShard.getActiveOperationsCount()); - if (newShardRouting.primary() == false) { + if (shardRouting.primary() == false) { try { indexShard.acquirePrimaryOperationLock(null, ThreadPool.Names.INDEX); fail("shard shouldn't accept primary ops"); @@ -437,6 +348,8 @@ public class IndexShardTests extends ESSingleNodeTestCase { } } + final long primaryTerm = indexShard.getPrimaryTerm(); + Releasable operation1 = acquireReplicaOperationLockBlockingly(indexShard, primaryTerm); assertEquals(1, indexShard.getActiveOperationsCount()); Releasable operation2 = acquireReplicaOperationLockBlockingly(indexShard, primaryTerm); @@ -454,23 +367,8 @@ public class IndexShardTests extends ESSingleNodeTestCase { acquireReplicaOperationLockBlockingly(indexShard, primaryTerm + 1 + randomInt(20)).close(); Releasables.close(operation1, operation2); assertEquals(0, indexShard.getActiveOperationsCount()); - } - public void testMarkAsInactiveTriggersSyncedFlush() throws Exception { - assertAcked(client().admin().indices().prepareCreate("test") - .setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0)); - client().prepareIndex("test", "test").setSource("{}").get(); - ensureGreen("test"); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - indicesService.indexService(resolveIndex("test")).getShardOrNull(0).checkIdle(0); - assertBusy(() -> { - IndexStats indexStats = client().admin().indices().prepareStats("test").clear().get().getIndex("test"); - assertNotNull(indexStats.getShards()[0].getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID)); - indicesService.indexService(resolveIndex("test")).getShardOrNull(0).checkIdle(0); - } - ); - IndexStats indexStats = client().admin().indices().prepareStats("test").get().getIndex("test"); - assertNotNull(indexStats.getShards()[0].getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID)); + closeShards(indexShard); } public static ShardStateMetaData load(Logger logger, Path... shardPaths) throws IOException { @@ -483,47 +381,40 @@ public class IndexShardTests extends ESSingleNodeTestCase { } public void testAcquireIndexCommit() throws IOException { - createIndex("test"); - ensureGreen(); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService test = indicesService.indexService(resolveIndex("test")); - final IndexShard shard = test.getShardOrNull(0); + final IndexShard shard = newStartedShard(); int numDocs = randomInt(20); for (int i = 0; i < numDocs; i++) { - client().prepareIndex("test", "type", "id_" + i).setSource("{}").get(); + indexDoc(shard, "type", "id_" + i); } final boolean flushFirst = randomBoolean(); IndexCommit commit = shard.acquireIndexCommit(flushFirst); int moreDocs = randomInt(20); for (int i = 0; i < moreDocs; i++) { - client().prepareIndex("test", "type", "id_" + numDocs + i).setSource("{}").get(); + indexDoc(shard, "type", "id_" + numDocs + i); } - shard.flush(new FlushRequest("index")); + flushShard(shard); // check that we can still read the commit that we captured try (IndexReader reader = DirectoryReader.open(commit)) { assertThat(reader.numDocs(), equalTo(flushFirst ? numDocs : 0)); } shard.releaseIndexCommit(commit); - shard.flush(new FlushRequest("index").force(true)); + flushShard(shard, true); + // check it's clean up assertThat(DirectoryReader.listCommits(shard.store().directory()), hasSize(1)); + + closeShards(shard); } /*** * test one can snapshot the store at various lifecycle stages */ public void testSnapshotStore() throws IOException { - createIndex("test"); - ensureGreen(); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService test = indicesService.indexService(resolveIndex("test")); - final IndexShard shard = test.getShardOrNull(0); - client().prepareIndex("test", "test", "0").setSource("{}").setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get(); - client().admin().indices().prepareFlush().get(); - ShardRouting routing = shard.routingEntry(); - test.removeShard(0, "b/c simon says so"); - routing = routing.reinitializePrimaryShard(); - IndexShard newShard = test.createShard(routing); + final IndexShard shard = newStartedShard(true); + indexDoc(shard, "test", "0"); + flushShard(shard); + + final IndexShard newShard = reinitShard(shard); DiscoveryNode localNode = new DiscoveryNode("foo", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT); Store.MetadataSnapshot snapshot = newShard.snapshotStoreMetadata(); @@ -539,7 +430,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { snapshot = newShard.snapshotStoreMetadata(); assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2")); - newShard.updateRoutingEntry(getInitializingShardRouting(routing).moveToStarted()); + newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted()); snapshot = newShard.snapshotStoreMetadata(); assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2")); @@ -548,48 +439,13 @@ public class IndexShardTests extends ESSingleNodeTestCase { snapshot = newShard.snapshotStoreMetadata(); assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2")); + + closeShards(newShard); } - public void testDurableFlagHasEffect() { - createIndex("test"); - ensureGreen(); - client().prepareIndex("test", "bar", "1").setSource("{}").get(); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService test = indicesService.indexService(resolveIndex("test")); - IndexShard shard = test.getShardOrNull(0); - setDurability(shard, Translog.Durability.REQUEST); - assertFalse(shard.getEngine().getTranslog().syncNeeded()); - setDurability(shard, Translog.Durability.ASYNC); - client().prepareIndex("test", "bar", "2").setSource("{}").get(); - assertTrue(shard.getEngine().getTranslog().syncNeeded()); - setDurability(shard, Translog.Durability.REQUEST); - client().prepareDelete("test", "bar", "1").get(); - assertFalse(shard.getEngine().getTranslog().syncNeeded()); - setDurability(shard, Translog.Durability.ASYNC); - client().prepareDelete("test", "bar", "2").get(); - assertTrue(shard.getEngine().getTranslog().syncNeeded()); - setDurability(shard, Translog.Durability.REQUEST); - assertNoFailures(client().prepareBulk() - .add(client().prepareIndex("test", "bar", "3").setSource("{}")) - .add(client().prepareDelete("test", "bar", "1")).get()); - assertFalse(shard.getEngine().getTranslog().syncNeeded()); - - setDurability(shard, Translog.Durability.ASYNC); - assertNoFailures(client().prepareBulk() - .add(client().prepareIndex("test", "bar", "4").setSource("{}")) - .add(client().prepareDelete("test", "bar", "3")).get()); - setDurability(shard, Translog.Durability.REQUEST); - assertTrue(shard.getEngine().getTranslog().syncNeeded()); - } - - public void testAsyncFsync() throws InterruptedException { - createIndex("test"); - ensureGreen(); - client().prepareIndex("test", "bar", "1").setSource("{}").get(); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService test = indicesService.indexService(resolveIndex("test")); - IndexShard shard = test.getShardOrNull(0); + public void testAsyncFsync() throws InterruptedException, IOException { + IndexShard shard = newStartedShard(); Semaphore semaphore = new Semaphore(Integer.MAX_VALUE); Thread[] thread = new Thread[randomIntBetween(3, 5)]; CountDownLatch latch = new CountDownLatch(thread.length); @@ -607,7 +463,9 @@ public class IndexShardTests extends ESSingleNodeTestCase { } catch (Exception ex) { throw new RuntimeException(ex); } - }; + } + + ; }; thread[i].start(); } @@ -616,170 +474,37 @@ public class IndexShardTests extends ESSingleNodeTestCase { thread[i].join(); } assertTrue(semaphore.tryAcquire(Integer.MAX_VALUE, 10, TimeUnit.SECONDS)); + + closeShards(shard); } - private void setDurability(IndexShard shard, Translog.Durability durability) { - client().admin().indices().prepareUpdateSettings(shard.shardId.getIndexName()).setSettings(Settings.builder().put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), durability.name()).build()).get(); - assertEquals(durability, shard.getTranslogDurability()); - } - - public void testMinimumCompatVersion() { + public void testMinimumCompatVersion() throws IOException { Version versionCreated = VersionUtils.randomVersion(random()); - assertAcked(client().admin().indices().prepareCreate("test") - .setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0, SETTING_VERSION_CREATED, versionCreated.id)); - client().prepareIndex("test", "test").setSource("{}").get(); - ensureGreen("test"); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexShard test = indicesService.indexService(resolveIndex("test")).getShardOrNull(0); + Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, versionCreated.id) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + IndexMetaData metaData = IndexMetaData.builder("test") + .settings(settings) + .primaryTerm(0, 1).build(); + IndexShard test = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); + recoveryShardFromStore(test); + + indexDoc(test, "test", "test"); assertEquals(versionCreated.luceneVersion, test.minimumCompatibleVersion()); - client().prepareIndex("test", "test").setSource("{}").get(); + indexDoc(test, "test", "test"); assertEquals(versionCreated.luceneVersion, test.minimumCompatibleVersion()); test.getEngine().flush(); assertEquals(Version.CURRENT.luceneVersion, test.minimumCompatibleVersion()); + + closeShards(test); } - public void testUpdatePriority() { - assertAcked(client().admin().indices().prepareCreate("test") - .setSettings(IndexMetaData.SETTING_PRIORITY, 200)); - IndexService indexService = getInstanceFromNode(IndicesService.class).indexService(resolveIndex("test")); - assertEquals(200, indexService.getIndexSettings().getSettings().getAsInt(IndexMetaData.SETTING_PRIORITY, 0).intValue()); - client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(IndexMetaData.SETTING_PRIORITY, 400).build()).get(); - assertEquals(400, indexService.getIndexSettings().getSettings().getAsInt(IndexMetaData.SETTING_PRIORITY, 0).intValue()); - } - - public void testRecoverIntoLeftover() throws IOException { - createIndex("test"); - ensureGreen("test"); - client().prepareIndex("test", "bar", "1").setSource("{}").setRefreshPolicy(IMMEDIATE).get(); - client().admin().indices().prepareFlush("test").get(); - SearchResponse response = client().prepareSearch("test").get(); - assertHitCount(response, 1L); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService test = indicesService.indexService(resolveIndex("test")); - IndexShard shard = test.getShardOrNull(0); - ShardPath shardPath = shard.shardPath(); - Path dataPath = shardPath.getDataPath(); - client().admin().indices().prepareClose("test").get(); - Path tempDir = createTempDir(); - Files.move(dataPath, tempDir.resolve("test")); - client().admin().indices().prepareDelete("test").get(); - Files.createDirectories(dataPath.getParent()); - Files.move(tempDir.resolve("test"), dataPath); - createIndex("test"); - ensureGreen("test"); - response = client().prepareSearch("test").get(); - assertHitCount(response, 0L); - } - - public void testIndexDirIsDeletedWhenShardRemoved() throws Exception { - Environment env = getInstanceFromNode(Environment.class); - Path idxPath = env.sharedDataFile().resolve(randomAsciiOfLength(10)); - logger.info("--> idxPath: [{}]", idxPath); - Settings idxSettings = Settings.builder() - .put(IndexMetaData.SETTING_DATA_PATH, idxPath) - .build(); - createIndex("test", idxSettings); - ensureGreen("test"); - client().prepareIndex("test", "bar", "1").setSource("{}").setRefreshPolicy(IMMEDIATE).get(); - SearchResponse response = client().prepareSearch("test").get(); - assertHitCount(response, 1L); - client().admin().indices().prepareDelete("test").get(); - assertAllIndicesRemovedAndDeletionCompleted(Collections.singleton(getInstanceFromNode(IndicesService.class))); - assertPathHasBeenCleared(idxPath); - } - - public void testExpectedShardSizeIsPresent() throws InterruptedException { - assertAcked(client().admin().indices().prepareCreate("test") - .setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0)); - for (int i = 0; i < 50; i++) { - client().prepareIndex("test", "test").setSource("{}").get(); - } - ensureGreen("test"); - InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class); - clusterInfoService.refresh(); - ClusterState state = getInstanceFromNode(ClusterService.class).state(); - Long test = clusterInfoService.getClusterInfo().getShardSize(state.getRoutingTable().index("test").getShards().get(0).primaryShard()); - assertNotNull(test); - assertTrue(test > 0); - } - - public void testIndexCanChangeCustomDataPath() throws Exception { - Environment env = getInstanceFromNode(Environment.class); - Path idxPath = env.sharedDataFile().resolve(randomAsciiOfLength(10)); - final String INDEX = "idx"; - Path startDir = idxPath.resolve("start-" + randomAsciiOfLength(10)); - Path endDir = idxPath.resolve("end-" + randomAsciiOfLength(10)); - logger.info("--> start dir: [{}]", startDir.toAbsolutePath().toString()); - logger.info("--> end dir: [{}]", endDir.toAbsolutePath().toString()); - // temp dirs are automatically created, but the end dir is what - // startDir is going to be renamed as, so it needs to be deleted - // otherwise we get all sorts of errors about the directory - // already existing - IOUtils.rm(endDir); - - Settings sb = Settings.builder() - .put(IndexMetaData.SETTING_DATA_PATH, startDir.toAbsolutePath().toString()) - .build(); - Settings sb2 = Settings.builder() - .put(IndexMetaData.SETTING_DATA_PATH, endDir.toAbsolutePath().toString()) - .build(); - - logger.info("--> creating an index with data_path [{}]", startDir.toAbsolutePath().toString()); - createIndex(INDEX, sb); - ensureGreen(INDEX); - client().prepareIndex(INDEX, "bar", "1").setSource("{}").setRefreshPolicy(IMMEDIATE).get(); - - SearchResponse resp = client().prepareSearch(INDEX).setQuery(matchAllQuery()).get(); - assertThat("found the hit", resp.getHits().getTotalHits(), equalTo(1L)); - - logger.info("--> closing the index [{}]", INDEX); - client().admin().indices().prepareClose(INDEX).get(); - logger.info("--> index closed, re-opening..."); - client().admin().indices().prepareOpen(INDEX).get(); - logger.info("--> index re-opened"); - ensureGreen(INDEX); - - resp = client().prepareSearch(INDEX).setQuery(matchAllQuery()).get(); - assertThat("found the hit", resp.getHits().getTotalHits(), equalTo(1L)); - - // Now, try closing and changing the settings - - logger.info("--> closing the index [{}]", INDEX); - client().admin().indices().prepareClose(INDEX).get(); - - logger.info("--> moving data on disk [{}] to [{}]", startDir.getFileName(), endDir.getFileName()); - assert Files.exists(endDir) == false : "end directory should not exist!"; - Files.move(startDir, endDir, StandardCopyOption.REPLACE_EXISTING); - - logger.info("--> updating settings..."); - client().admin().indices().prepareUpdateSettings(INDEX) - .setSettings(sb2) - .setIndicesOptions(IndicesOptions.fromOptions(true, false, true, true)) - .get(); - - assert Files.exists(startDir) == false : "start dir shouldn't exist"; - - logger.info("--> settings updated and files moved, re-opening index"); - client().admin().indices().prepareOpen(INDEX).get(); - logger.info("--> index re-opened"); - ensureGreen(INDEX); - - resp = client().prepareSearch(INDEX).setQuery(matchAllQuery()).get(); - assertThat("found the hit", resp.getHits().getTotalHits(), equalTo(1L)); - - assertAcked(client().admin().indices().prepareDelete(INDEX)); - assertAllIndicesRemovedAndDeletionCompleted(Collections.singleton(getInstanceFromNode(IndicesService.class))); - assertPathHasBeenCleared(startDir.toAbsolutePath()); - assertPathHasBeenCleared(endDir.toAbsolutePath()); - } public void testShardStats() throws IOException { - createIndex("test"); - ensureGreen(); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService test = indicesService.indexService(resolveIndex("test")); - IndexShard shard = test.getShardOrNull(0); - ShardStats stats = new ShardStats(shard.routingEntry(), shard.shardPath(), new CommonStats(indicesService.getIndicesQueryCache(), shard, new CommonStatsFlags()), shard.commitStats()); + IndexShard shard = newStartedShard(); + ShardStats stats = new ShardStats(shard.routingEntry(), shard.shardPath(), + new CommonStats(new IndicesQueryCache(Settings.EMPTY), shard, new CommonStatsFlags()), shard.commitStats()); assertEquals(shard.shardPath().getRootDataPath().toString(), stats.getDataPath()); assertEquals(shard.shardPath().getRootStatePath().toString(), stats.getStatePath()); assertEquals(shard.shardPath().isCustomDataPath(), stats.isCustomDataPath()); @@ -790,7 +515,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { StreamInput in = out.bytes().streamInput(); stats = ShardStats.readShardStats(in); } - XContentBuilder builder = XContentFactory.jsonBuilder(); + XContentBuilder builder = jsonBuilder(); builder.startObject(); stats.toXContent(builder, EMPTY_PARAMS); builder.endObject(); @@ -802,9 +527,12 @@ public class IndexShardTests extends ESSingleNodeTestCase { expectedSubSequence.append("\",\"is_custom_data_path\":").append(shard.shardPath().isCustomDataPath()).append("}"); assumeFalse("Some path weirdness on windows", Constants.WINDOWS); assertTrue(xContent.contains(expectedSubSequence)); + + closeShards(shard); } - private ParsedDocument testParsedDocument(String uid, String id, String type, String routing, long timestamp, long ttl, ParseContext.Document document, BytesReference source, Mapping mappingUpdate) { + private ParsedDocument testParsedDocument(String uid, String id, String type, String routing, long timestamp, long ttl, + ParseContext.Document document, BytesReference source, Mapping mappingUpdate) { Field uidField = new Field("_uid", uid, UidFieldMapper.Defaults.FIELD_TYPE); Field versionField = new NumericDocValuesField("_version", 0); document.add(uidField); @@ -813,12 +541,8 @@ public class IndexShardTests extends ESSingleNodeTestCase { } public void testIndexingOperationsListeners() throws IOException { - createIndex("test_iol"); - ensureGreen(); - client().prepareIndex("test_iol", "test", "0").setSource("{\"foo\" : \"bar\"}").setRefreshPolicy(IMMEDIATE).get(); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService test = indicesService.indexService(resolveIndex("test_iol")); - IndexShard shard = test.getShardOrNull(0); + IndexShard shard = newStartedShard(true); + indexDoc(shard, "test", "0", "{\"foo\" : \"bar\"}"); AtomicInteger preIndex = new AtomicInteger(); AtomicInteger postIndexCreate = new AtomicInteger(); AtomicInteger postIndexUpdate = new AtomicInteger(); @@ -827,7 +551,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { AtomicInteger postDelete = new AtomicInteger(); AtomicInteger postDeleteException = new AtomicInteger(); shard.close("simon says", true); - shard = reinitWithWrapper(test, shard, null, new IndexingOperationListener() { + shard = reinitShard(shard, new IndexingOperationListener() { @Override public Engine.Index preIndex(Engine.Index operation) { preIndex.incrementAndGet(); @@ -836,7 +560,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { @Override public void postIndex(Engine.Index index, boolean created) { - if(created) { + if (created) { postIndexCreate.incrementAndGet(); } else { postIndexUpdate.incrementAndGet(); @@ -865,8 +589,10 @@ public class IndexShardTests extends ESSingleNodeTestCase { } }); + recoveryShardFromStore(shard); - ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, new ParseContext.Document(), new BytesArray(new byte[]{1}), null); + ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, new ParseContext.Document(), + new BytesArray(new byte[]{1}), null); Engine.Index index = new Engine.Index(new Term("_uid", "1"), doc); shard.index(index); assertEquals(1, preIndex.get()); @@ -928,95 +654,12 @@ public class IndexShardTests extends ESSingleNodeTestCase { assertEquals(1, preDelete.get()); assertEquals(1, postDelete.get()); assertEquals(0, postDeleteException.get()); - } - public void testMaybeFlush() throws Exception { - createIndex("test", Settings.builder().put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.REQUEST).build()); - ensureGreen(); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService test = indicesService.indexService(resolveIndex("test")); - IndexShard shard = test.getShardOrNull(0); - assertFalse(shard.shouldFlush()); - client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(133 /* size of the operation + header&footer*/, ByteSizeUnit.BYTES)).build()).get(); - client().prepareIndex("test", "test", "0").setSource("{}").setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get(); - assertFalse(shard.shouldFlush()); - ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, new ParseContext.Document(), new BytesArray(new byte[]{1}), null); - Engine.Index index = new Engine.Index(new Term("_uid", "1"), doc); - shard.index(index); - assertTrue(shard.shouldFlush()); - assertEquals(2, shard.getEngine().getTranslog().totalOperations()); - client().prepareIndex("test", "test", "2").setSource("{}").setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get(); - assertBusy(() -> { // this is async - assertFalse(shard.shouldFlush()); - }); - assertEquals(0, shard.getEngine().getTranslog().totalOperations()); - shard.getEngine().getTranslog().sync(); - long size = shard.getEngine().getTranslog().sizeInBytes(); - logger.info("--> current translog size: [{}] num_ops [{}] generation [{}]", shard.getEngine().getTranslog().sizeInBytes(), shard.getEngine().getTranslog().totalOperations(), shard.getEngine().getTranslog().getGeneration()); - client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(size, ByteSizeUnit.BYTES)) - .build()).get(); - client().prepareDelete("test", "test", "2").get(); - logger.info("--> translog size after delete: [{}] num_ops [{}] generation [{}]", shard.getEngine().getTranslog().sizeInBytes(), shard.getEngine().getTranslog().totalOperations(), shard.getEngine().getTranslog().getGeneration()); - assertBusy(() -> { // this is async - logger.info("--> translog size on iter : [{}] num_ops [{}] generation [{}]", shard.getEngine().getTranslog().sizeInBytes(), shard.getEngine().getTranslog().totalOperations(), shard.getEngine().getTranslog().getGeneration()); - assertFalse(shard.shouldFlush()); - }); - assertEquals(0, shard.getEngine().getTranslog().totalOperations()); - } - - public void testStressMaybeFlush() throws Exception { - createIndex("test"); - ensureGreen(); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService test = indicesService.indexService(resolveIndex("test")); - final IndexShard shard = test.getShardOrNull(0); - assertFalse(shard.shouldFlush()); - client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(133/* size of the operation + header&footer*/, ByteSizeUnit.BYTES)).build()).get(); - client().prepareIndex("test", "test", "0").setSource("{}").setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get(); - assertFalse(shard.shouldFlush()); - final AtomicBoolean running = new AtomicBoolean(true); - final int numThreads = randomIntBetween(2, 4); - Thread[] threads = new Thread[numThreads]; - CyclicBarrier barrier = new CyclicBarrier(numThreads + 1); - for (int i = 0; i < threads.length; i++) { - threads[i] = new Thread() { - @Override - public void run() { - try { - barrier.await(); - } catch (InterruptedException | BrokenBarrierException e) { - throw new RuntimeException(e); - } - while (running.get()) { - shard.maybeFlush(); - } - } - }; - threads[i].start(); - } - barrier.await(); - FlushStats flushStats = shard.flushStats(); - long total = flushStats.getTotal(); - client().prepareIndex("test", "test", "1").setSource("{}").get(); - assertBusy(() -> { - assertEquals(total + 1, shard.flushStats().getTotal()); - }); - running.set(false); - for (int i = 0; i < threads.length; i++) { - threads[i].join(); - } - assertEquals(total + 1, shard.flushStats().getTotal()); + closeShards(shard); } public void testLockingBeforeAndAfterRelocated() throws Exception { - assertAcked(client().admin().indices().prepareCreate("test").setSettings( - Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0) - ).get()); - ensureGreen(); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService test = indicesService.indexService(resolveIndex("test")); - final IndexShard shard = test.getShardOrNull(0); - assertBusy(() -> assertThat(shard.state(), equalTo(IndexShardState.STARTED))); + final IndexShard shard = newStartedShard(true); CountDownLatch latch = new CountDownLatch(1); Thread recoveryThread = new Thread(() -> { latch.countDown(); @@ -1041,17 +684,12 @@ public class IndexShardTests extends ESSingleNodeTestCase { // lock can again be acquired assertThat(shard.state(), equalTo(IndexShardState.RELOCATED)); } + + closeShards(shard); } public void testDelayedOperationsBeforeAndAfterRelocated() throws Exception { - assertAcked(client().admin().indices().prepareCreate("test").setSettings( - Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0) - ).get()); - ensureGreen(); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService test = indicesService.indexService(resolveIndex("test")); - final IndexShard shard = test.getShardOrNull(0); - assertBusy(() -> assertThat(shard.state(), equalTo(IndexShardState.STARTED))); + final IndexShard shard = newStartedShard(true); Thread recoveryThread = new Thread(() -> { try { shard.relocated("simulated recovery"); @@ -1079,16 +717,12 @@ public class IndexShardTests extends ESSingleNodeTestCase { } recoveryThread.join(); + + closeShards(shard); } public void testStressRelocated() throws Exception { - assertAcked(client().admin().indices().prepareCreate("test").setSettings( - Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0) - ).get()); - ensureGreen(); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService test = indicesService.indexService(resolveIndex("test")); - final IndexShard shard = test.getShardOrNull(0); + final IndexShard shard = newStartedShard(true); final int numThreads = randomIntBetween(2, 4); Thread[] indexThreads = new Thread[numThreads]; CountDownLatch allPrimaryOperationLocksAcquired = new CountDownLatch(numThreads); @@ -1136,85 +770,69 @@ public class IndexShardTests extends ESSingleNodeTestCase { for (Thread indexThread : indexThreads) { indexThread.join(); } + + closeShards(shard); } public void testRecoverFromStore() throws IOException { - createIndex("test"); - ensureGreen(); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService test = indicesService.indexService(resolveIndex("test")); - final IndexShard shard = test.getShardOrNull(0); + final IndexShard shard = newStartedShard(true); int translogOps = 1; - client().prepareIndex("test", "test", "0").setSource("{}").setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get(); + indexDoc(shard, "test", "0"); if (randomBoolean()) { - client().admin().indices().prepareFlush().get(); + flushShard(shard); translogOps = 0; } - ShardRouting routing = shard.routingEntry(); - test.removeShard(0, "b/c simon says so"); - routing = ShardRoutingHelper.reinitPrimary(routing); - IndexShard newShard = test.createShard(routing); - newShard.updateRoutingEntry(routing); + IndexShard newShard = reinitShard(shard); DiscoveryNode localNode = new DiscoveryNode("foo", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT); - newShard.markAsRecovering("store", new RecoveryState(routing, localNode, null)); + newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); assertTrue(newShard.recoverFromStore()); assertEquals(translogOps, newShard.recoveryState().getTranslog().recoveredOperations()); assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperations()); assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart()); assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f); - newShard.updateRoutingEntry(routing.moveToStarted()); - SearchResponse response = client().prepareSearch().get(); - assertHitCount(response, 1); + newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted()); + assertDocCount(newShard, 1); + closeShards(newShard); } public void testRecoverFromCleanStore() throws IOException { - createIndex("test"); - ensureGreen(); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService test = indicesService.indexService(resolveIndex("test")); - final IndexShard shard = test.getShardOrNull(0); - client().prepareIndex("test", "test", "0").setSource("{}").setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get(); + final IndexShard shard = newStartedShard(true); + indexDoc(shard, "test", "0"); if (randomBoolean()) { - client().admin().indices().prepareFlush().get(); + flushShard(shard); } - ShardRouting routing = shard.routingEntry(); - test.removeShard(0, "b/c simon says so"); - routing = ShardRoutingHelper.reinitPrimary(routing, UnassignedInfo.Reason.INDEX_CREATED, StoreRecoverySource.EMPTY_STORE_INSTANCE); - IndexShard newShard = test.createShard(routing); - newShard.updateRoutingEntry(routing); + final ShardRouting shardRouting = shard.routingEntry(); + IndexShard newShard = reinitShard(shard, + ShardRoutingHelper.initWithSameId(shardRouting, RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE) + ); + DiscoveryNode localNode = new DiscoveryNode("foo", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT); - newShard.markAsRecovering("store", new RecoveryState(routing, localNode, null)); + newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); assertTrue(newShard.recoverFromStore()); assertEquals(0, newShard.recoveryState().getTranslog().recoveredOperations()); assertEquals(0, newShard.recoveryState().getTranslog().totalOperations()); assertEquals(0, newShard.recoveryState().getTranslog().totalOperationsOnStart()); assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f); - newShard.updateRoutingEntry(getInitializingShardRouting(routing).moveToStarted()); - SearchResponse response = client().prepareSearch().get(); - assertHitCount(response, 0); + newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted()); + assertDocCount(newShard, 0); + closeShards(newShard); } public void testFailIfIndexNotPresentInRecoverFromStore() throws Exception { - createIndex("test"); - ensureGreen(); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - DiscoveryNode localNode = new DiscoveryNode("foo", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT); - IndexService test = indicesService.indexService(resolveIndex("test")); - final IndexShard shard = test.getShardOrNull(0); - - client().prepareIndex("test", "test", "0").setSource("{}").setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get(); + final IndexShard shard = newStartedShard(true); + indexDoc(shard, "test", "0"); if (randomBoolean()) { - client().admin().indices().prepareFlush().get(); + flushShard(shard); } - final ShardRouting origRouting = shard.routingEntry(); - ShardRouting routing = origRouting; + Store store = shard.store(); store.incRef(); - test.removeShard(0, "b/c simon says so"); + closeShards(shard); cleanLuceneIndex(store.directory()); store.decRef(); - routing = ShardRoutingHelper.reinitPrimary(routing); - IndexShard newShard = test.createShard(routing); + IndexShard newShard = reinitShard(shard); + DiscoveryNode localNode = new DiscoveryNode("foo", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT); + ShardRouting routing = newShard.routingEntry(); newShard.markAsRecovering("store", new RecoveryState(routing, localNode, null)); try { newShard.recoverFromStore(); @@ -1224,7 +842,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { } routing = ShardRoutingHelper.moveToUnassigned(routing, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "because I say so")); - routing = ShardRoutingHelper.initialize(routing, origRouting.currentNodeId()); + routing = ShardRoutingHelper.initialize(routing, newShard.routingEntry().currentNodeId()); assertTrue("it's already recovering, we should ignore new ones", newShard.ignoreRecoveryAttempt()); try { newShard.markAsRecovering("store", new RecoveryState(routing, localNode, null)); @@ -1232,30 +850,25 @@ public class IndexShardTests extends ESSingleNodeTestCase { } catch (IllegalIndexShardStateException e) { // OK! } - test.removeShard(0, "I broken it"); - routing = routing.updateUnassigned(routing.unassignedInfo(), StoreRecoverySource.EMPTY_STORE_INSTANCE); - newShard = test.createShard(routing); - newShard.markAsRecovering("store", new RecoveryState(routing, localNode, null)); + + newShard = reinitShard(newShard, + ShardRoutingHelper.initWithSameId(routing, RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE)); + newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); assertTrue("recover even if there is nothing to recover", newShard.recoverFromStore()); - newShard.updateRoutingEntry(getInitializingShardRouting(routing).moveToStarted()); - SearchResponse response = client().prepareSearch().get(); - assertHitCount(response, 0); + newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted()); + assertDocCount(newShard, 0); // we can't issue this request through a client because of the inconsistencies we created with the cluster state // doing it directly instead - IndexRequest request = client().prepareIndex("test", "test", "0").setSource("{}").request(); - request.process(null, false, "test"); - TransportIndexAction.executeIndexRequestOnPrimary(request, newShard, null); + indexDoc(newShard, "test", "0"); newShard.refresh("test"); - assertHitCount(client().prepareSearch().get(), 1); + assertDocCount(newShard, 1); + + closeShards(newShard); } public void testRecoveryFailsAfterMovingToRelocatedState() throws InterruptedException, IOException { - createIndex("test"); - ensureGreen(); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService test = indicesService.indexService(resolveIndex("test")); - final IndexShard shard = test.getShardOrNull(0); + final IndexShard shard = newStartedShard(true); ShardRouting origRouting = shard.routingEntry(); assertThat(shard.state(), equalTo(IndexShardState.STARTED)); ShardRouting inRecoveryRouting = ShardRoutingHelper.relocate(origRouting, "some_node"); @@ -1267,37 +880,37 @@ public class IndexShardTests extends ESSingleNodeTestCase { fail("Expected IndexShardRelocatedException"); } catch (IndexShardRelocatedException expected) { } + + closeShards(shard); } public void testRestoreShard() throws IOException { - createIndex("test"); - createIndex("test_target"); - ensureGreen(); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService test = indicesService.indexService(resolveIndex("test")); - IndexService test_target = indicesService.indexService(resolveIndex("test_target")); - final IndexShard test_shard = test.getShardOrNull(0); + final IndexShard source = newStartedShard(true); + IndexShard target = newStartedShard(true); - client().prepareIndex("test", "test", "0").setSource("{}").setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get(); - client().prepareIndex("test_target", "test", "1").setSource("{}").setRefreshPolicy(IMMEDIATE).get(); - assertHitCount(client().prepareSearch("test_target").get(), 1); - assertSearchHits(client().prepareSearch("test_target").get(), "1"); - client().admin().indices().prepareFlush("test").get(); // only flush test - final ShardRouting origRouting = test_target.getShardOrNull(0).routingEntry(); + indexDoc(source, "test", "0"); + if (randomBoolean()) { + source.refresh("test"); + } + indexDoc(target, "test", "1"); + target.refresh("test"); + assertDocs(target, new Uid("test", "1")); + flushShard(source); // only flush source + final ShardRouting origRouting = target.routingEntry(); ShardRouting routing = ShardRoutingHelper.reinitPrimary(origRouting); final Snapshot snapshot = new Snapshot("foo", new SnapshotId("bar", UUIDs.randomBase64UUID())); - routing = ShardRoutingHelper.newWithRestoreSource(routing, new SnapshotRecoverySource(snapshot, Version.CURRENT, "test")); - test_target.removeShard(0, "just do it man!"); - final IndexShard test_target_shard = test_target.createShard(routing); - Store sourceStore = test_shard.store(); - Store targetStore = test_target_shard.store(); + routing = ShardRoutingHelper.newWithRestoreSource(routing, + new RecoverySource.SnapshotRecoverySource(snapshot, Version.CURRENT, "test")); + target = reinitShard(target, routing); + Store sourceStore = source.store(); + Store targetStore = target.store(); - test_target_shard.updateRoutingEntry(routing); DiscoveryNode localNode = new DiscoveryNode("foo", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT); - test_target_shard.markAsRecovering("store", new RecoveryState(routing, localNode, null)); - assertTrue(test_target_shard.restoreFromRepository(new RestoreOnlyRepository("test") { + target.markAsRecovering("store", new RecoveryState(routing, localNode, null)); + assertTrue(target.restoreFromRepository(new RestoreOnlyRepository("test") { @Override - public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) { + public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, + RecoveryState recoveryState) { try { cleanLuceneIndex(targetStore.directory()); for (String file : sourceStore.directory().listAll()) { @@ -1312,19 +925,17 @@ public class IndexShardTests extends ESSingleNodeTestCase { } })); - test_target_shard.updateRoutingEntry(routing.moveToStarted()); - assertHitCount(client().prepareSearch("test_target").get(), 1); - assertSearchHits(client().prepareSearch("test_target").get(), "0"); + target.updateRoutingEntry(routing.moveToStarted()); + assertDocs(target, new Uid("test", "0")); + + closeShards(source, target); } public void testSearcherWrapperIsUsed() throws IOException { - createIndex("test"); - ensureGreen(); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService indexService = indicesService.indexService(resolveIndex("test")); - IndexShard shard = indexService.getShardOrNull(0); - client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}").setRefreshPolicy(IMMEDIATE).get(); - client().prepareIndex("test", "test", "1").setSource("{\"foobar\" : \"bar\"}").setRefreshPolicy(IMMEDIATE).get(); + IndexShard shard = newStartedShard(true); + indexDoc(shard, "test", "0", "{\"foo\" : \"bar\"}"); + indexDoc(shard, "test", "1", "{\"foobar\" : \"bar\"}"); + shard.refresh("test"); Engine.GetResult getResult = shard.get(new Engine.Get(false, new Term(UidFieldMapper.NAME, Uid.createUid("test", "1")))); assertTrue(getResult.exists()); @@ -1347,35 +958,28 @@ public class IndexShardTests extends ESSingleNodeTestCase { return searcher; } }; - shard.close("simon says", true); - IndexShard newShard = reinitWithWrapper(indexService, shard, wrapper); - try { - try (Engine.Searcher searcher = newShard.acquireSearcher("test")) { - TopDocs search = searcher.searcher().search(new TermQuery(new Term("foo", "bar")), 10); - assertEquals(search.totalHits, 0); - search = searcher.searcher().search(new TermQuery(new Term("foobar", "bar")), 10); - assertEquals(search.totalHits, 1); - } - getResult = newShard.get(new Engine.Get(false, new Term(UidFieldMapper.NAME, Uid.createUid("test", "1")))); - assertTrue(getResult.exists()); - assertNotNull(getResult.searcher()); // make sure get uses the wrapped reader - assertTrue(getResult.searcher().reader() instanceof FieldMaskingReader); - getResult.release(); - } finally { - newShard.close("just do it", randomBoolean()); + closeShards(shard); + IndexShard newShard = newShard(ShardRoutingHelper.reinitPrimary(shard.routingEntry()), + shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper); + + recoveryShardFromStore(newShard); + + try (Engine.Searcher searcher = newShard.acquireSearcher("test")) { + TopDocs search = searcher.searcher().search(new TermQuery(new Term("foo", "bar")), 10); + assertEquals(search.totalHits, 0); + search = searcher.searcher().search(new TermQuery(new Term("foobar", "bar")), 10); + assertEquals(search.totalHits, 1); } + getResult = newShard.get(new Engine.Get(false, new Term(UidFieldMapper.NAME, Uid.createUid("test", "1")))); + assertTrue(getResult.exists()); + assertNotNull(getResult.searcher()); // make sure get uses the wrapped reader + assertTrue(getResult.searcher().reader() instanceof FieldMaskingReader); + getResult.release(); + + closeShards(newShard); } - public void testSearcherWrapperWorksWithGlobaOrdinals() throws IOException { - createIndex("test"); - ensureGreen(); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService indexService = indicesService.indexService(resolveIndex("test")); - IndexShard shard = indexService.getShardOrNull(0); - client().admin().indices().preparePutMapping("test").setType("test").setSource("foo", "type=text,fielddata=true").get(); - client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}").setRefreshPolicy(IMMEDIATE).get(); - client().prepareIndex("test", "test", "1").setSource("{\"foobar\" : \"bar\"}").setRefreshPolicy(IMMEDIATE).get(); - + public void testSearcherWrapperWorksWithGlobalOrdinals() throws IOException { IndexSearcherWrapper wrapper = new IndexSearcherWrapper() { @Override public DirectoryReader wrap(DirectoryReader reader) throws IOException { @@ -1388,46 +992,53 @@ public class IndexShardTests extends ESSingleNodeTestCase { } }; - shard.close("simon says", true); - IndexShard newShard = reinitWithWrapper(indexService, shard, wrapper); - try { - // test global ordinals are evicted - MappedFieldType foo = newShard.mapperService().fullName("foo"); - IndexFieldData.Global ifd = shard.indexFieldDataService().getForField(foo); - FieldDataStats before = shard.fieldData().stats("foo"); - assertThat(before.getMemorySizeInBytes(), equalTo(0L)); - FieldDataStats after = null; - try (Engine.Searcher searcher = newShard.acquireSearcher("test")) { - assumeTrue("we have to have more than one segment", searcher.getDirectoryReader().leaves().size() > 1); - ifd.loadGlobal(searcher.getDirectoryReader()); - after = shard.fieldData().stats("foo"); - assertEquals(after.getEvictions(), before.getEvictions()); - // If a field doesn't exist an empty IndexFieldData is returned and that isn't cached: - assertThat(after.getMemorySizeInBytes(), equalTo(0L)); - } - assertEquals(shard.fieldData().stats("foo").getEvictions(), before.getEvictions()); - assertEquals(shard.fieldData().stats("foo").getMemorySizeInBytes(), after.getMemorySizeInBytes()); - newShard.flush(new FlushRequest().force(true).waitIfOngoing(true)); - newShard.refresh("test"); - assertEquals(shard.fieldData().stats("foo").getMemorySizeInBytes(), before.getMemorySizeInBytes()); - assertEquals(shard.fieldData().stats("foo").getEvictions(), before.getEvictions()); - } finally { - newShard.close("just do it", randomBoolean()); + Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + IndexMetaData metaData = IndexMetaData.builder("test") + .putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\", \"fielddata\": true }}}") + .settings(settings) + .primaryTerm(0, 1).build(); + IndexShard shard = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, wrapper); + recoveryShardFromStore(shard); + indexDoc(shard, "test", "0", "{\"foo\" : \"bar\"}"); + shard.refresh("created segment 1"); + indexDoc(shard, "test", "1", "{\"foobar\" : \"bar\"}"); + shard.refresh("created segment 2"); + + // test global ordinals are evicted + MappedFieldType foo = shard.mapperService().fullName("foo"); + IndexFieldData.Global ifd = shard.indexFieldDataService().getForField(foo); + FieldDataStats before = shard.fieldData().stats("foo"); + assertThat(before.getMemorySizeInBytes(), equalTo(0L)); + FieldDataStats after = null; + try (Engine.Searcher searcher = shard.acquireSearcher("test")) { + assumeTrue("we have to have more than one segment", searcher.getDirectoryReader().leaves().size() > 1); + ifd.loadGlobal(searcher.getDirectoryReader()); + after = shard.fieldData().stats("foo"); + assertEquals(after.getEvictions(), before.getEvictions()); + // If a field doesn't exist an empty IndexFieldData is returned and that isn't cached: + assertThat(after.getMemorySizeInBytes(), equalTo(0L)); } + assertEquals(shard.fieldData().stats("foo").getEvictions(), before.getEvictions()); + assertEquals(shard.fieldData().stats("foo").getMemorySizeInBytes(), after.getMemorySizeInBytes()); + shard.flush(new FlushRequest().force(true).waitIfOngoing(true)); + shard.refresh("test"); + assertEquals(shard.fieldData().stats("foo").getMemorySizeInBytes(), before.getMemorySizeInBytes()); + assertEquals(shard.fieldData().stats("foo").getEvictions(), before.getEvictions()); + + closeShards(shard); } - public void testIndexingOperationListnenersIsInvokedOnRecovery() throws IOException { - createIndex("test"); - ensureGreen(); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService indexService = indicesService.indexService(resolveIndex("test")); - IndexShard shard = indexService.getShardOrNull(0); - client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}").get(); - client().prepareDelete("test", "test", "0").get(); - client().prepareIndex("test", "test", "1").setSource("{\"foo\" : \"bar\"}").setRefreshPolicy(IMMEDIATE).get(); - IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {}; - shard.close("simon says", false); + public void testIndexingOperationListenersIsInvokedOnRecovery() throws IOException { + IndexShard shard = newStartedShard(true); + indexDoc(shard, "test", "0", "{\"foo\" : \"bar\"}"); + deleteDoc(shard, "test", "0"); + indexDoc(shard, "test", "1", "{\"foo\" : \"bar\"}"); + shard.refresh("test"); + final AtomicInteger preIndex = new AtomicInteger(); final AtomicInteger postIndex = new AtomicInteger(); final AtomicInteger preDelete = new AtomicInteger(); @@ -1456,85 +1067,28 @@ public class IndexShardTests extends ESSingleNodeTestCase { } }; - final IndexShard newShard = reinitWithWrapper(indexService, shard, wrapper, listener); - try { - IndexingStats indexingStats = newShard.indexingStats(); - // ensure we are not influencing the indexing stats - assertEquals(0, indexingStats.getTotal().getDeleteCount()); - assertEquals(0, indexingStats.getTotal().getDeleteCurrent()); - assertEquals(0, indexingStats.getTotal().getIndexCount()); - assertEquals(0, indexingStats.getTotal().getIndexCurrent()); - assertEquals(0, indexingStats.getTotal().getIndexFailedCount()); - assertEquals(2, preIndex.get()); - assertEquals(2, postIndex.get()); - assertEquals(1, preDelete.get()); - assertEquals(1, postDelete.get()); - } finally { - newShard.close("just do it", randomBoolean()); - } + final IndexShard newShard = reinitShard(shard, listener); + recoveryShardFromStore(newShard); + IndexingStats indexingStats = newShard.indexingStats(); + // ensure we are not influencing the indexing stats + assertEquals(0, indexingStats.getTotal().getDeleteCount()); + assertEquals(0, indexingStats.getTotal().getDeleteCurrent()); + assertEquals(0, indexingStats.getTotal().getIndexCount()); + assertEquals(0, indexingStats.getTotal().getIndexCurrent()); + assertEquals(0, indexingStats.getTotal().getIndexFailedCount()); + assertEquals(2, preIndex.get()); + assertEquals(2, postIndex.get()); + assertEquals(1, preDelete.get()); + assertEquals(1, postDelete.get()); + + closeShards(newShard); } - public void testShardHasMemoryBufferOnTranslogRecover() throws Throwable { - createIndex("test"); - ensureGreen(); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService indexService = indicesService.indexService(resolveIndex("test")); - IndexShard shard = indexService.getShardOrNull(0); - client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}").get(); - client().prepareDelete("test", "test", "0").get(); - client().prepareIndex("test", "test", "1").setSource("{\"foo\" : \"bar\"}").setRefreshPolicy(IMMEDIATE).get(); - - IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {}; - shard.close("simon says", false); - AtomicReference shardRef = new AtomicReference<>(); - List failures = new ArrayList<>(); - IndexingOperationListener listener = new IndexingOperationListener() { - - @Override - public void postIndex(Engine.Index index, boolean created) { - try { - assertNotNull(shardRef.get()); - // this is all IMC needs to do - check current memory and refresh - assertTrue(shardRef.get().getIndexBufferRAMBytesUsed() > 0); - shardRef.get().refresh("test"); - } catch (Exception e) { - failures.add(e); - throw e; - } - } - - - @Override - public void postDelete(Engine.Delete delete) { - try { - assertNotNull(shardRef.get()); - // this is all IMC needs to do - check current memory and refresh - assertTrue(shardRef.get().getIndexBufferRAMBytesUsed() > 0); - shardRef.get().refresh("test"); - } catch (Exception e) { - failures.add(e); - throw e; - } - } - }; - final IndexShard newShard = newIndexShard(indexService, shard, wrapper, listener); - shardRef.set(newShard); - recoverShard(newShard); - - try { - ExceptionsHelper.rethrowAndSuppress(failures); - } finally { - newShard.close("just do it", randomBoolean()); - } - } public void testSearchIsReleaseIfWrapperFails() throws IOException { - createIndex("test"); - ensureGreen(); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService indexService = indicesService.indexService(resolveIndex("test")); - IndexShard shard = indexService.getShardOrNull(0); - client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}").setRefreshPolicy(IMMEDIATE).get(); + IndexShard shard = newStartedShard(true); + indexDoc(shard, "test", "0", "{\"foo\" : \"bar\"}"); + shard.refresh("test"); IndexSearcherWrapper wrapper = new IndexSearcherWrapper() { @Override public DirectoryReader wrap(DirectoryReader reader) throws IOException { @@ -1546,180 +1100,147 @@ public class IndexShardTests extends ESSingleNodeTestCase { } }; - shard.close("simon says", true); - IndexShard newShard = reinitWithWrapper(indexService, shard, wrapper); + closeShards(shard); + IndexShard newShard = newShard(ShardRoutingHelper.reinitPrimary(shard.routingEntry()), + shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper); + + recoveryShardFromStore(newShard); + try { newShard.acquireSearcher("test"); fail("exception expected"); } catch (RuntimeException ex) { // - } finally { - newShard.close("just do it", randomBoolean()); } + closeShards(newShard); } - public static final IndexShard reinitWithWrapper(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper, IndexingOperationListener... listeners) throws IOException { - IndexShard newShard = newIndexShard(indexService, shard, wrapper, listeners); - return recoverShard(newShard); - } - - public static final IndexShard recoverShard(IndexShard newShard) throws IOException { - DiscoveryNode localNode = new DiscoveryNode("foo", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT); - newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); - assertTrue(newShard.recoverFromStore()); - newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted()); - return newShard; - } - - public static final IndexShard newIndexShard(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper, IndexingOperationListener... listeners) throws IOException { - ShardRouting initializingShardRouting = getInitializingShardRouting(shard.routingEntry()); - IndexShard newShard = new IndexShard(initializingShardRouting, indexService.getIndexSettings(), shard.shardPath(), - shard.store(), indexService.cache(), indexService.mapperService(), indexService.similarityService(), - indexService.fieldData(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper, - indexService.getThreadPool(), indexService.getBigArrays(), null, Collections.emptyList(), Arrays.asList(listeners)); - return newShard; - } - - private static ShardRouting getInitializingShardRouting(ShardRouting existingShardRouting) { - ShardRouting shardRouting = TestShardRouting.newShardRouting(existingShardRouting.shardId(), - existingShardRouting.currentNodeId(), null, existingShardRouting.primary(), ShardRoutingState.INITIALIZING, - existingShardRouting.allocationId()); - shardRouting = shardRouting.updateUnassigned(new UnassignedInfo(UnassignedInfo.Reason.INDEX_REOPENED, "fake recovery"), - StoreRecoverySource.EXISTING_STORE_INSTANCE); - return shardRouting; - } public void testTranslogRecoverySyncsTranslog() throws IOException { - createIndex("testindexfortranslogsync"); - client().admin().indices().preparePutMapping("testindexfortranslogsync").setType("testtype").setSource(jsonBuilder().startObject() - .startObject("testtype") - .startObject("properties") - .startObject("foo") - .field("type", "text") - .endObject() - .endObject().endObject().endObject()).get(); - ensureGreen(); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService test = indicesService.indexService(resolveIndex("testindexfortranslogsync")); - IndexShard shard = test.getShardOrNull(0); - ShardRouting routing = getInitializingShardRouting(shard.routingEntry()); - test.removeShard(0, "b/c britta says so"); - IndexShard newShard = test.createShard(routing); - DiscoveryNode localNode = new DiscoveryNode("foo", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT); - newShard.markAsRecovering("for testing", new RecoveryState(routing, localNode, null)); - List operations = new ArrayList<>(); - operations.add(new Translog.Index("testtype", "1", BytesReference.toBytes(jsonBuilder().startObject().field("foo", "bar").endObject().bytes()))); - newShard.prepareForIndexRecovery(); - newShard.recoveryState().getTranslog().totalOperations(operations.size()); - newShard.skipTranslogRecovery(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP); - newShard.performBatchRecovery(operations); - assertFalse(newShard.getTranslog().syncNeeded()); + Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + IndexMetaData metaData = IndexMetaData.builder("test") + .putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}") + .settings(settings) + .primaryTerm(0, 1).build(); + IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); + recoveryShardFromStore(primary); + + indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}"); + IndexShard replica = newShard(primary.shardId(), false, "n2", metaData, null); + recoverReplica(replica, primary, (shard, discoveryNode) -> + new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> { + }) { + @Override + public void indexTranslogOperations(List operations, int totalTranslogOps) { + super.indexTranslogOperations(operations, totalTranslogOps); + assertFalse(replica.getTranslog().syncNeeded()); + } + }, true); + + closeShards(primary, replica); } - public void testIndexingBufferDuringInternalRecovery() throws IOException { - createIndex("index"); - client().admin().indices().preparePutMapping("index").setType("testtype").setSource(jsonBuilder().startObject() - .startObject("testtype") - .startObject("properties") - .startObject("foo") - .field("type", "text") - .endObject() - .endObject().endObject().endObject()).get(); - ensureGreen(); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService test = indicesService.indexService(resolveIndex("index")); - IndexShard shard = test.getShardOrNull(0); - ShardRouting routing = getInitializingShardRouting(shard.routingEntry()); - test.removeShard(0, "b/c britta says so"); - IndexShard newShard = test.createShard(routing); - newShard.shardRouting = routing; + public void testShardActiveDuringInternalRecovery() throws IOException { + IndexShard shard = newStartedShard(true); + indexDoc(shard, "type", "0"); + shard = reinitShard(shard); DiscoveryNode localNode = new DiscoveryNode("foo", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT); - newShard.markAsRecovering("for testing", new RecoveryState(routing, localNode, null)); + shard.markAsRecovering("for testing", new RecoveryState(shard.routingEntry(), localNode, null)); // Shard is still inactive since we haven't started recovering yet - assertFalse(newShard.isActive()); - newShard.prepareForIndexRecovery(); + assertFalse(shard.isActive()); + shard.prepareForIndexRecovery(); // Shard is still inactive since we haven't started recovering yet - assertFalse(newShard.isActive()); - newShard.performTranslogRecovery(true); + assertFalse(shard.isActive()); + shard.performTranslogRecovery(true); // Shard should now be active since we did recover: - assertTrue(newShard.isActive()); + assertTrue(shard.isActive()); + closeShards(shard); } - public void testIndexingBufferDuringPeerRecovery() throws IOException { - createIndex("index"); - client().admin().indices().preparePutMapping("index").setType("testtype").setSource(jsonBuilder().startObject() - .startObject("testtype") - .startObject("properties") - .startObject("foo") - .field("type", "text") - .endObject() - .endObject().endObject().endObject()).get(); - ensureGreen(); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService test = indicesService.indexService(resolveIndex("index")); - IndexShard shard = test.getShardOrNull(0); - ShardRouting routing = getInitializingShardRouting(shard.routingEntry()); - test.removeShard(0, "b/c britta says so"); - IndexShard newShard = test.createShard(routing); + public void testShardActiveDuringPeerRecovery() throws IOException { + Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + IndexMetaData metaData = IndexMetaData.builder("test") + .putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}") + .settings(settings) + .primaryTerm(0, 1).build(); + IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); + recoveryShardFromStore(primary); + + indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}"); + IndexShard replica = newShard(primary.shardId(), false, "n2", metaData, null); DiscoveryNode localNode = new DiscoveryNode("foo", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT); - newShard.markAsRecovering("for testing", new RecoveryState(routing, localNode, null)); + replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode)); // Shard is still inactive since we haven't started recovering yet - assertFalse(newShard.isActive()); - List operations = new ArrayList<>(); - operations.add(new Translog.Index("testtype", "1", BytesReference.toBytes(jsonBuilder().startObject().field("foo", "bar").endObject().bytes()))); - newShard.prepareForIndexRecovery(); - newShard.skipTranslogRecovery(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP); - // Shard is still inactive since we haven't started recovering yet - assertFalse(newShard.isActive()); - newShard.performBatchRecovery(operations); - // Shard should now be active since we did recover: - assertTrue(newShard.isActive()); + assertFalse(replica.isActive()); + recoverReplica(replica, primary, (shard, discoveryNode) -> + new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> { + }) { + @Override + public void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAutoIdTimestamp) throws IOException { + super.prepareForTranslogOperations(totalTranslogOps, maxUnsafeAutoIdTimestamp); + // Shard is still inactive since we haven't started recovering yet + assertFalse(replica.isActive()); + + } + + @Override + public void indexTranslogOperations(List operations, int totalTranslogOps) { + super.indexTranslogOperations(operations, totalTranslogOps); + // Shard should now be active since we did recover: + assertTrue(replica.isActive()); + } + }, false); + + closeShards(primary, replica); } public void testRecoverFromLocalShard() throws IOException { - createIndex("index"); - createIndex("index_1"); - createIndex("index_2"); - client().admin().indices().preparePutMapping("index").setType("test").setSource(jsonBuilder().startObject() - .startObject("test") - .startObject("properties") - .startObject("foo") - .field("type", "text") - .endObject() - .endObject().endObject().endObject()).get(); - client().prepareIndex("index", "test", "0").setSource("{\"foo\" : \"bar\"}").setRefreshPolicy(IMMEDIATE).get(); - client().prepareIndex("index", "test", "1").setSource("{\"foo\" : \"bar\"}").setRefreshPolicy(IMMEDIATE).get(); + Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + IndexMetaData metaData = IndexMetaData.builder("source") + .putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}") + .settings(settings) + .primaryTerm(0, 1).build(); + + IndexShard sourceShard = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); + recoveryShardFromStore(sourceShard); + + indexDoc(sourceShard, "test", "0", "{\"foo\" : \"bar\"}"); + indexDoc(sourceShard, "test", "1", "{\"foo\" : \"bar\"}"); + sourceShard.refresh("test"); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService test = indicesService.indexService(resolveIndex("index_1")); - IndexShard shard = test.getShardOrNull(0); - ShardRouting routing = ShardRoutingHelper.initWithSameId(shard.routingEntry(), LocalShardsRecoverySource.INSTANCE); - test.removeShard(0, "b/c simon says so"); + ShardRouting targetRouting = TestShardRouting.newShardRouting(new ShardId("index_1", "index_1", 0), "n1", true, + ShardRoutingState.INITIALIZING, RecoverySource.LocalShardsRecoverySource.INSTANCE); + + final IndexShard targetShard; DiscoveryNode localNode = new DiscoveryNode("foo", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT); + Map requestedMappingUpdates = ConcurrentCollections.newConcurrentMap(); { - final IndexShard newShard = test.createShard(routing); - newShard.markAsRecovering("store", new RecoveryState(routing, localNode, null)); + targetShard = newShard(targetRouting); + targetShard.markAsRecovering("store", new RecoveryState(targetShard.routingEntry(), localNode, null)); BiConsumer mappingConsumer = (type, mapping) -> { - try { - client().admin().indices().preparePutMapping().setConcreteIndex(newShard.indexSettings().getIndex()) - .setType(type) - .setSource(mapping.source().string()) - .get(); - } catch (IOException ex) { - throw new ElasticsearchException("failed to stringify mapping source", ex); - } + assertNull(requestedMappingUpdates.put(type, mapping)); }; - expectThrows(IllegalArgumentException.class, () -> { - IndexService index = indicesService.indexService(resolveIndex("index")); - IndexService index_2 = indicesService.indexService(resolveIndex("index_2")); - newShard.recoverFromLocalShards(mappingConsumer, Arrays.asList(index.getShard(0), index_2.getShard(0))); - }); - IndexService indexService = indicesService.indexService(resolveIndex("index")); - assertTrue(newShard.recoverFromLocalShards(mappingConsumer, Arrays.asList(indexService.getShard(0)))); - RecoveryState recoveryState = newShard.recoveryState(); + final IndexShard differentIndex = newShard(new ShardId("index_2", "index_2", 0), true); + recoveryShardFromStore(differentIndex); + expectThrows(IllegalArgumentException.class, () -> { + targetShard.recoverFromLocalShards(mappingConsumer, Arrays.asList(sourceShard, differentIndex)); + }); + closeShards(differentIndex); + + assertTrue(targetShard.recoverFromLocalShards(mappingConsumer, Arrays.asList(sourceShard))); + RecoveryState recoveryState = targetShard.recoveryState(); assertEquals(RecoveryState.Stage.DONE, recoveryState.getStage()); assertTrue(recoveryState.getIndex().fileDetails().size() > 0); for (RecoveryState.File file : recoveryState.getIndex().fileDetails()) { @@ -1729,96 +1250,115 @@ public class IndexShardTests extends ESSingleNodeTestCase { assertEquals(file.recovered(), file.length()); } } - routing = ShardRoutingHelper.moveToStarted(routing); - newShard.updateRoutingEntry(routing); - assertHitCount(client().prepareSearch("index_1").get(), 2); + targetShard.updateRoutingEntry(ShardRoutingHelper.moveToStarted(targetShard.routingEntry())); + assertDocCount(targetShard, 2); } // now check that it's persistent ie. that the added shards are committed { - routing = shard.routingEntry(); - test.removeShard(0, "b/c simon says so"); - routing = ShardRoutingHelper.reinitPrimary(routing); - final IndexShard newShard = test.createShard(routing); - newShard.markAsRecovering("store", new RecoveryState(routing, localNode, null)); - assertTrue(newShard.recoverFromStore()); - routing = ShardRoutingHelper.moveToStarted(routing); - newShard.updateRoutingEntry(routing); - assertHitCount(client().prepareSearch("index_1").get(), 2); + final IndexShard newShard = reinitShard(targetShard); + recoveryShardFromStore(newShard); + assertDocCount(newShard, 2); + closeShards(newShard); } - GetMappingsResponse mappingsResponse = client().admin().indices().prepareGetMappings("index_1").get(); - ImmutableOpenMap> mappings = mappingsResponse.getMappings(); - assertNotNull(mappings.get("index_1")); - assertNotNull(mappings.get("index_1").get("test")); - assertEquals(mappings.get("index_1").get("test").get().source().string(), "{\"test\":{\"properties\":{\"foo\":{\"type\":\"text\"}}}}"); + assertThat(requestedMappingUpdates, hasKey("test")); + assertThat(requestedMappingUpdates.get("test").get().source().string(), equalTo("{\"properties\":{\"foo\":{\"type\":\"text\"}}}")); + closeShards(sourceShard, targetShard); } /** A dummy repository for testing which just needs restore overridden */ private abstract static class RestoreOnlyRepository extends AbstractLifecycleComponent implements Repository { private final String indexName; + public RestoreOnlyRepository(String indexName) { super(Settings.EMPTY); this.indexName = indexName; } + @Override - protected void doStart() {} + protected void doStart() { + } + @Override - protected void doStop() {} + protected void doStop() { + } + @Override - protected void doClose() {} + protected void doClose() { + } + @Override public RepositoryMetaData getMetadata() { return null; } + @Override public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) { return null; } + @Override public MetaData getSnapshotMetaData(SnapshotInfo snapshot, List indices) throws IOException { return null; } + @Override public RepositoryData getRepositoryData() { Map> map = new HashMap<>(); - map.put(new IndexId(indexName, "blah"), Collections.emptySet()); + map.put(new IndexId(indexName, "blah"), emptySet()); return new RepositoryData(Collections.emptyList(), map); } + @Override - public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData metaData) {} + public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData metaData) { + } + @Override public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, List shardFailures) { return null; } + @Override - public void deleteSnapshot(SnapshotId snapshotId) {} + public void deleteSnapshot(SnapshotId snapshotId) { + } + @Override public long getSnapshotThrottleTimeInNanos() { return 0; } + @Override public long getRestoreThrottleTimeInNanos() { return 0; } + @Override public String startVerification() { return null; } + @Override - public void endVerification(String verificationToken) {} + public void endVerification(String verificationToken) { + } + @Override public boolean isReadOnly() { return false; } + @Override - public void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {} + public void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) { + } + @Override public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) { return null; } + @Override - public void verify(String verificationToken, DiscoveryNode localNode) {} + public void verify(String verificationToken, DiscoveryNode localNode) { + } } public static Engine getEngineFromShard(IndexShard shard) { diff --git a/core/src/test/java/org/elasticsearch/index/shard/ShardUtilsTests.java b/core/src/test/java/org/elasticsearch/index/shard/ShardUtilsTests.java index e960622d1c1..34c1789824e 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/ShardUtilsTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/ShardUtilsTests.java @@ -28,6 +28,7 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.store.BaseDirectoryWrapper; import org.apache.lucene.util.IOUtils; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.test.ESTestCase; import java.io.IOException; @@ -64,4 +65,7 @@ public class ShardUtilsTests extends ESTestCase { IOUtils.close(writer, dir); } + public static Engine getShardEngine(IndexShard shard) { + return shard.getEngine(); + } } diff --git a/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java b/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java index 858e046d67c..2d0e4a3aeb9 100644 --- a/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java @@ -21,7 +21,6 @@ package org.elasticsearch.indices; import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.LocalTransportAddress; @@ -31,7 +30,7 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexSearcherWrapper; import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.IndexShardTests; +import org.elasticsearch.index.shard.IndexShardIT; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.threadpool.ThreadPool; @@ -443,7 +442,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase { shard.writeIndexingBuffer(); } }; - final IndexShard newShard = IndexShardTests.newIndexShard(indexService, shard, wrapper, imc); + final IndexShard newShard = IndexShardIT.newIndexShard(indexService, shard, wrapper, imc); shardRef.set(newShard); try { assertEquals(0, imc.availableShards().size()); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingHelper.java b/test/framework/src/main/java/org/elasticsearch/cluster/routing/ShardRoutingHelper.java similarity index 100% rename from core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingHelper.java rename to test/framework/src/main/java/org/elasticsearch/cluster/routing/ShardRoutingHelper.java diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java new file mode 100644 index 00000000000..23aed676af8 --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -0,0 +1,477 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.shard; + +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import org.apache.lucene.document.Document; +import org.apache.lucene.index.IndexNotFoundException; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.IOUtils; +import org.elasticsearch.Version; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingHelper; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.lucene.uid.Versions; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.LocalTransportAddress; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.MapperTestUtils; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.cache.IndexCache; +import org.elasticsearch.index.cache.query.DisabledQueryCache; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.fielddata.IndexFieldDataCache; +import org.elasticsearch.index.fielddata.IndexFieldDataService; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.mapper.SourceToParse; +import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.mapper.UidFieldMapper; +import org.elasticsearch.index.similarity.SimilarityService; +import org.elasticsearch.index.store.DirectoryService; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; +import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; +import org.elasticsearch.indices.recovery.RecoveryFailedException; +import org.elasticsearch.indices.recovery.RecoverySourceHandler; +import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.indices.recovery.RecoveryTarget; +import org.elasticsearch.indices.recovery.StartRecoveryRequest; +import org.elasticsearch.test.DummyShardLock; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.hasSize; + +/** + * A base class for unit tests that need to create and shutdown {@link IndexShard} instances easily, + * containing utilities for shard creation and recoveries. See {{@link #newShard(boolean)}} and + * {@link #newStartedShard()} for a good starting points + */ +public abstract class IndexShardTestCase extends ESTestCase { + + protected static final PeerRecoveryTargetService.RecoveryListener recoveryListener = new PeerRecoveryTargetService.RecoveryListener() { + @Override + public void onRecoveryDone(RecoveryState state) { + + } + + @Override + public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) { + throw new AssertionError(e); + } + }; + + protected ThreadPool threadPool; + + @Override + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool(getClass().getName()); + } + + @Override + public void tearDown() throws Exception { + try { + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + } finally { + super.tearDown(); + } + } + + private Store createStore(IndexSettings indexSettings, ShardPath shardPath) throws IOException { + final ShardId shardId = shardPath.getShardId(); + final DirectoryService directoryService = new DirectoryService(shardId, indexSettings) { + @Override + public Directory newDirectory() throws IOException { + return newFSDirectory(shardPath.resolveIndex()); + } + + @Override + public long throttleTimeInNanos() { + return 0; + } + }; + return new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId)); + } + + /** + * creates a new initializing shard. The shard will have its own unique data path. + * + * @param primary indicates whether to a primary shard (ready to recover from an empty store) or a replica + * (ready to recover from another shard) + */ + protected IndexShard newShard(boolean primary) throws IOException { + ShardRouting shardRouting = TestShardRouting.newShardRouting(new ShardId("index", "_na_", 0), "n1", primary, + ShardRoutingState.INITIALIZING, + primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE); + return newShard(shardRouting); + } + + /** + * creates a new initializing shard. The shard will have its own unique data path. + * + * @param shardRouting the {@link ShardRouting} to use for this shard + * @param listeners an optional set of listeners to add to the shard + */ + protected IndexShard newShard(ShardRouting shardRouting, IndexingOperationListener... listeners) throws IOException { + assert shardRouting.initializing() : shardRouting; + Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + IndexMetaData.Builder metaData = IndexMetaData.builder(shardRouting.getIndexName()) + .settings(settings) + .primaryTerm(0, 1); + return newShard(shardRouting, metaData.build(), listeners); + } + + /** + * creates a new initializing shard. The shard will have its own unique data path. + * + * @param shardId the shard id to use + * @param primary indicates whether to a primary shard (ready to recover from an empty store) or a replica + * (ready to recover from another shard) + * @param listeners an optional set of listeners to add to the shard + */ + protected IndexShard newShard(ShardId shardId, boolean primary, IndexingOperationListener... listeners) throws IOException { + ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, randomAsciiOfLength(5), primary, + ShardRoutingState.INITIALIZING, + primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE); + return newShard(shardRouting, listeners); + } + + /** + * creates a new initializing shard. The shard will will be put in its proper path under the + * supplied node id. + * + * @param shardId the shard id to use + * @param primary indicates whether to a primary shard (ready to recover from an empty store) or a replica + * (ready to recover from another shard) + */ + protected IndexShard newShard(ShardId shardId, boolean primary, String nodeId, IndexMetaData indexMetaData, + @Nullable IndexSearcherWrapper searcherWrapper) throws IOException { + ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, nodeId, primary, ShardRoutingState.INITIALIZING, + primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE); + return newShard(shardRouting, indexMetaData, searcherWrapper); + } + + /** + * creates a new initializing shard. The shard will will be put in its proper path under the + * current node id the shard is assigned to. + * + * @param routing shard routing to use + * @param indexMetaData indexMetaData for the shard, including any mapping + * @param listeners an optional set of listeners to add to the shard + */ + protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData, IndexingOperationListener... listeners) + throws IOException { + return newShard(routing, indexMetaData, null, listeners); + } + + /** + * creates a new initializing shard. The shard will will be put in its proper path under the + * current node id the shard is assigned to. + * + * @param routing shard routing to use + * @param indexMetaData indexMetaData for the shard, including any mapping + * @param indexSearcherWrapper an optional wrapper to be used during searchers + * @param listeners an optional set of listeners to add to the shard + */ + protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData, + @Nullable IndexSearcherWrapper indexSearcherWrapper, IndexingOperationListener... listeners) + throws IOException { + // add node id as name to settings for popper logging + final ShardId shardId = routing.shardId(); + final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir()); + ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId); + return newShard(routing, shardPath, indexMetaData, indexSearcherWrapper, listeners); + } + + /** + * creates a new initializing shard. + * + * @param routing shard routing to use + * @param shardPath path to use for shard data + * @param indexMetaData indexMetaData for the shard, including any mapping + * @param indexSearcherWrapper an optional wrapper to be used during searchers + * @param listeners an optional set of listeners to add to the shard + */ + protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMetaData indexMetaData, + @Nullable IndexSearcherWrapper indexSearcherWrapper, + IndexingOperationListener... listeners) throws IOException { + final Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build(); + final IndexSettings indexSettings = new IndexSettings(indexMetaData, nodeSettings); + final IndexShard indexShard; + final Store store = createStore(indexSettings, shardPath); + boolean success = false; + try { + IndexCache indexCache = new IndexCache(indexSettings, new DisabledQueryCache(indexSettings), null); + MapperService mapperService = MapperTestUtils.newMapperService(createTempDir(), indexSettings.getSettings()); + for (ObjectObjectCursor typeMapping : indexMetaData.getMappings()) { + mapperService.merge(typeMapping.key, typeMapping.value.source(), MapperService.MergeReason.MAPPING_RECOVERY, true); + } + SimilarityService similarityService = new SimilarityService(indexSettings, Collections.emptyMap()); + final IndexEventListener indexEventListener = new IndexEventListener() { + }; + final Engine.Warmer warmer = searcher -> { + }; + IndicesFieldDataCache indicesFieldDataCache = new IndicesFieldDataCache(nodeSettings, new IndexFieldDataCache.Listener() { + }); + IndexFieldDataService indexFieldDataService = new IndexFieldDataService(indexSettings, indicesFieldDataCache, + new NoneCircuitBreakerService(), mapperService); + indexShard = new IndexShard(routing, indexSettings, shardPath, store, indexCache, mapperService, similarityService, + indexFieldDataService, null, indexEventListener, indexSearcherWrapper, threadPool, BigArrays.NON_RECYCLING_INSTANCE, warmer, + Collections.emptyList(), Arrays.asList(listeners)); + success = true; + } finally { + if (success == false) { + IOUtils.close(store); + } + } + return indexShard; + } + + /** + * Takes an existing shard, closes it and and starts a new initialing shard at the same location + * + * @param listeners new listerns to use for the newly created shard + */ + protected IndexShard reinitShard(IndexShard current, IndexingOperationListener... listeners) throws IOException { + final ShardRouting shardRouting = current.routingEntry(); + return reinitShard(current, ShardRoutingHelper.initWithSameId(shardRouting, + shardRouting.primary() ? RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE + ), listeners); + } + + /** + * Takes an existing shard, closes it and and starts a new initialing shard at the same location + * + * @param routing the shard routing to use for the newly created shard. + * @param listeners new listerns to use for the newly created shard + */ + protected IndexShard reinitShard(IndexShard current, ShardRouting routing, IndexingOperationListener... listeners) throws IOException { + closeShards(current); + return newShard(routing, current.shardPath(), current.indexSettings().getIndexMetaData(), null, listeners); + } + + /** + * creates a new empyu shard and starts it. The shard will be either a replica or a primary. + */ + protected IndexShard newStartedShard() throws IOException { + return newStartedShard(randomBoolean()); + } + + /** + * creates a new empty shard and starts it. + * + * @param primary controls whether the shard will be a primary or a replica. + */ + protected IndexShard newStartedShard(boolean primary) throws IOException { + IndexShard shard = newShard(primary); + if (primary) { + recoveryShardFromStore(shard); + } else { + recoveryEmptyReplica(shard); + } + return shard; + } + + protected void closeShards(IndexShard... shards) throws IOException { + closeShards(Arrays.asList(shards)); + } + + protected void closeShards(Iterable shards) throws IOException { + for (IndexShard shard : shards) { + if (shard != null) { + try { + shard.close("test", false); + } finally { + IOUtils.close(shard.store()); + } + } + } + } + + protected void recoveryShardFromStore(IndexShard primary) throws IOException { + primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(), + getFakeDiscoNode(primary.routingEntry().currentNodeId()), + null)); + primary.recoverFromStore(); + primary.updateRoutingEntry(ShardRoutingHelper.moveToStarted(primary.routingEntry())); + } + + protected void recoveryEmptyReplica(IndexShard replica) throws IOException { + IndexShard primary = null; + try { + primary = newStartedShard(true); + recoverReplica(replica, primary); + } finally { + closeShards(primary); + } + } + + private DiscoveryNode getFakeDiscoNode(String id) { + return new DiscoveryNode(id, new LocalTransportAddress("_fake_" + id), Version.CURRENT); + } + + /** recovers a replica from the given primary **/ + protected void recoverReplica(IndexShard replica, IndexShard primary) throws IOException { + recoverReplica(replica, primary, + (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener, version -> { + }), + true); + } + + /** + * Recovers a replica from the give primary, allow the user to supply a custom recovery target. + * A typical usage of a custome recovery target is to assert things in the various stages of recovery + * + * @param markAsRecovering set to false if you have already marked the replica as recovering + */ + protected void recoverReplica(IndexShard replica, IndexShard primary, + BiFunction targetSupplier, + boolean markAsRecovering) + throws IOException { + final DiscoveryNode pNode = getFakeDiscoNode(primary.routingEntry().currentNodeId()); + final DiscoveryNode rNode = getFakeDiscoNode(replica.routingEntry().currentNodeId()); + if (markAsRecovering) { + replica.markAsRecovering("remote", + new RecoveryState(replica.routingEntry(), pNode, rNode)); + } else { + assertEquals(replica.state(), IndexShardState.RECOVERING); + } + replica.prepareForIndexRecovery(); + RecoveryTarget recoveryTarget = targetSupplier.apply(replica, pNode); + StartRecoveryRequest request = new StartRecoveryRequest(replica.shardId(), pNode, rNode, + getMetadataSnapshotOrEmpty(replica), false, 0); + RecoverySourceHandler recovery = new RecoverySourceHandler(primary, recoveryTarget, request, () -> 0L, e -> () -> { + }, + (int) ByteSizeUnit.MB.toKB(1), logger); + recovery.recoverToTarget(); + recoveryTarget.markAsDone(); + replica.updateRoutingEntry(ShardRoutingHelper.moveToStarted(replica.routingEntry())); + } + + private Store.MetadataSnapshot getMetadataSnapshotOrEmpty(IndexShard replica) throws IOException { + Store.MetadataSnapshot result; + try { + result = replica.snapshotStoreMetadata(); + } catch (IndexNotFoundException e) { + // OK! + result = Store.MetadataSnapshot.EMPTY; + } catch (IOException e) { + logger.warn("failed read store, treating as empty", e); + result = Store.MetadataSnapshot.EMPTY; + } + return result; + } + + protected Set getShardDocUIDs(final IndexShard shard) throws IOException { + shard.refresh("get_uids"); + try (Engine.Searcher searcher = shard.acquireSearcher("test")) { + Set ids = new HashSet<>(); + for (LeafReaderContext leafContext : searcher.reader().leaves()) { + LeafReader reader = leafContext.reader(); + Bits liveDocs = reader.getLiveDocs(); + for (int i = 0; i < reader.maxDoc(); i++) { + if (liveDocs == null || liveDocs.get(i)) { + Document uuid = reader.document(i, Collections.singleton(UidFieldMapper.NAME)); + ids.add(Uid.createUid(uuid.get(UidFieldMapper.NAME))); + } + } + } + return ids; + } + } + + protected void assertDocCount(IndexShard shard, int docDount) throws IOException { + assertThat(getShardDocUIDs(shard), hasSize(docDount)); + } + + protected void assertDocs(IndexShard shard, Uid... uids) throws IOException { + final Set shardDocUIDs = getShardDocUIDs(shard); + assertThat(shardDocUIDs, contains(uids)); + assertThat(shardDocUIDs, hasSize(uids.length)); + } + + + protected Engine.Index indexDoc(IndexShard shard, String type, String id) { + return indexDoc(shard, type, id, "{}"); + } + + protected Engine.Index indexDoc(IndexShard shard, String type, String id, String source) { + final Engine.Index index; + if (shard.routingEntry().primary()) { + index = shard.prepareIndexOnPrimary( + SourceToParse.source(SourceToParse.Origin.PRIMARY, shard.shardId().getIndexName(), type, id, new BytesArray(source)), + Versions.MATCH_ANY, VersionType.INTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); + } else { + index = shard.prepareIndexOnReplica( + SourceToParse.source(SourceToParse.Origin.PRIMARY, shard.shardId().getIndexName(), type, id, new BytesArray(source)), + 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); + } + shard.index(index); + return index; + } + + protected Engine.Delete deleteDoc(IndexShard shard, String type, String id) { + final Engine.Delete delete; + if (shard.routingEntry().primary()) { + delete = shard.prepareDeleteOnPrimary(type, id, Versions.MATCH_ANY, VersionType.INTERNAL); + } else { + delete = shard.prepareDeleteOnPrimary(type, id, 1, VersionType.EXTERNAL); + } + shard.delete(delete); + return delete; + } + + protected void flushShard(IndexShard shard) { + flushShard(shard, false); + } + + protected void flushShard(IndexShard shard, boolean force) { + shard.flush(new FlushRequest(shard.shardId().getIndexName()).force(force)); + } +}