Introduce IndexShardTestCase (#20411)

Introduce a base class for unit tests that are based on real `IndexShard`s. The base class takes care of all the little details needed to create and recover shards. 

This commit also moves `IndexShardTests` and `ESIndexLevelReplicationTestCase` to use the new base class. All tests in `IndexShardTests` that required a full node environment were moved to a new `IndexShardIT` suite.
This commit is contained in:
Boaz Leskes 2016-09-12 18:20:25 +02:00 committed by GitHub
parent f39f9b9760
commit b08352047d
8 changed files with 1419 additions and 1116 deletions

View File

@ -43,6 +43,8 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
@ -52,13 +54,13 @@ import static org.hamcrest.Matchers.nullValue;
public class IndexServiceTests extends ESSingleNodeTestCase { public class IndexServiceTests extends ESSingleNodeTestCase {
public void testDetermineShadowEngineShouldBeUsed() { public void testDetermineShadowEngineShouldBeUsed() {
Settings regularSettings = Settings.builder() Settings regularSettings = Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 2) .put(SETTING_NUMBER_OF_SHARDS, 2)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) .put(SETTING_NUMBER_OF_REPLICAS, 1)
.build(); .build();
Settings shadowSettings = Settings.builder() Settings shadowSettings = Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 2) .put(SETTING_NUMBER_OF_SHARDS, 2)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) .put(SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true) .put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
.build(); .build();

View File

@ -18,15 +18,7 @@
*/ */
package org.elasticsearch.index.replication; package org.elasticsearch.index.replication;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexNotFoundException;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.DocWriteResponse;
@ -41,52 +33,21 @@ import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource.PeerRecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.StoreRecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingHelper; import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.MapperTestUtils;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.cache.query.DisabledQueryCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoverySourceHandler;
import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTarget; import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.StartRecoveryRequest;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponse;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
@ -94,10 +55,8 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.FutureTask; import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -107,98 +66,24 @@ import java.util.stream.StreamSupport;
import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
public abstract class ESIndexLevelReplicationTestCase extends ESTestCase { public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase {
protected ThreadPool threadPool;
protected final Index index = new Index("test", "uuid"); protected final Index index = new Index("test", "uuid");
private final ShardId shardId = new ShardId(index, 0); private final ShardId shardId = new ShardId(index, 0);
private final Map<String, String> indexMapping = Collections.singletonMap("type", "{ \"type\": {} }"); private final Map<String, String> indexMapping = Collections.singletonMap("type", "{ \"type\": {} }");
protected static final PeerRecoveryTargetService.RecoveryListener recoveryListener = new PeerRecoveryTargetService.RecoveryListener() {
@Override
public void onRecoveryDone(RecoveryState state) {
}
@Override
public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) {
fail(ExceptionsHelper.detailedMessage(e));
}
};
@TestLogging("index.shard:TRACE,index.replication:TRACE,indices.recovery:TRACE")
public void testIndexingDuringFileRecovery() throws Exception {
try (ReplicationGroup shards = createGroup(randomInt(1))) {
shards.startAll();
int docs = shards.indexDocs(randomInt(50));
shards.flush();
IndexShard replica = shards.addReplica();
final CountDownLatch recoveryBlocked = new CountDownLatch(1);
final CountDownLatch releaseRecovery = new CountDownLatch(1);
final Future<Void> recoveryFuture = shards.asyncRecoverReplica(replica,
new BiFunction<IndexShard, DiscoveryNode, RecoveryTarget>() {
@Override
public RecoveryTarget apply(IndexShard indexShard, DiscoveryNode node) {
return new RecoveryTarget(indexShard, node, recoveryListener, version -> {}) {
@Override
public void renameAllTempFiles() throws IOException {
super.renameAllTempFiles();
recoveryBlocked.countDown();
try {
releaseRecovery.await();
} catch (InterruptedException e) {
throw new IOException("terminated by interrupt", e);
}
}
};
}
});
recoveryBlocked.await();
docs += shards.indexDocs(randomInt(20));
releaseRecovery.countDown();
recoveryFuture.get();
shards.assertAllEqual(docs);
}
}
@Override
public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool(getClass().getName());
}
@Override
public void tearDown() throws Exception {
super.tearDown();
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
}
private Store createStore(IndexSettings indexSettings, ShardPath shardPath) throws IOException {
final ShardId shardId = shardPath.getShardId();
final DirectoryService directoryService = new DirectoryService(shardId, indexSettings) {
@Override
public Directory newDirectory() throws IOException {
return newFSDirectory(shardPath.resolveIndex());
}
@Override
public long throttleTimeInNanos() {
return 0;
}
};
return new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId));
}
protected ReplicationGroup createGroup(int replicas) throws IOException { protected ReplicationGroup createGroup(int replicas) throws IOException {
final Path homePath = createTempDir(); Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
Settings build = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, replicas) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, replicas)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.build(); .build();
IndexMetaData metaData = IndexMetaData.builder(index.getName()).settings(build).primaryTerm(0, 1).build(); IndexMetaData.Builder metaData = IndexMetaData.builder(index.getName())
return new ReplicationGroup(metaData, homePath); .settings(settings)
.primaryTerm(0, 1);
for (Map.Entry<String, String> typeMapping: indexMapping.entrySet()) {
metaData.putMapping(typeMapping.getKey(), typeMapping.getValue());
}
return new ReplicationGroup(metaData.build());
} }
protected DiscoveryNode getDiscoveryNode(String id) { protected DiscoveryNode getDiscoveryNode(String id) {
@ -206,50 +91,22 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase {
Collections.singleton(DiscoveryNode.Role.DATA), Version.CURRENT); Collections.singleton(DiscoveryNode.Role.DATA), Version.CURRENT);
} }
private IndexShard newShard(boolean primary, DiscoveryNode node, IndexMetaData indexMetaData, Path homePath) throws IOException {
// add node name to settings for propper logging
final Settings nodeSettings = Settings.builder().put("node.name", node.getName()).build();
final IndexSettings indexSettings = new IndexSettings(indexMetaData, nodeSettings);
ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, node.getId(), primary, ShardRoutingState.INITIALIZING,
primary ? StoreRecoverySource.EMPTY_STORE_INSTANCE : PeerRecoverySource.INSTANCE);
final Path path = Files.createDirectories(homePath.resolve(node.getId()));
final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(path);
ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId);
Store store = createStore(indexSettings, shardPath);
IndexCache indexCache = new IndexCache(indexSettings, new DisabledQueryCache(indexSettings), null);
MapperService mapperService = MapperTestUtils.newMapperService(homePath, indexSettings.getSettings());
for (Map.Entry<String, String> type : indexMapping.entrySet()) {
mapperService.merge(type.getKey(), new CompressedXContent(type.getValue()), MapperService.MergeReason.MAPPING_RECOVERY, true);
}
SimilarityService similarityService = new SimilarityService(indexSettings, Collections.emptyMap());
final IndexEventListener indexEventListener = new IndexEventListener() {
};
final Engine.Warmer warmer = searcher -> {
};
return new IndexShard(shardRouting, indexSettings, shardPath, store, indexCache, mapperService, similarityService, null, null,
indexEventListener, null, threadPool, BigArrays.NON_RECYCLING_INSTANCE, warmer, Collections.emptyList(),
Collections.emptyList());
}
protected class ReplicationGroup implements AutoCloseable, Iterable<IndexShard> { protected class ReplicationGroup implements AutoCloseable, Iterable<IndexShard> {
private final IndexShard primary; private final IndexShard primary;
private final List<IndexShard> replicas; private final List<IndexShard> replicas;
private final IndexMetaData indexMetaData; private final IndexMetaData indexMetaData;
private final Path homePath;
private final AtomicInteger replicaId = new AtomicInteger(); private final AtomicInteger replicaId = new AtomicInteger();
private final AtomicInteger docId = new AtomicInteger(); private final AtomicInteger docId = new AtomicInteger();
boolean closed = false; boolean closed = false;
ReplicationGroup(final IndexMetaData indexMetaData, Path homePath) throws IOException { ReplicationGroup(final IndexMetaData indexMetaData) throws IOException {
primary = newShard(true, getDiscoveryNode("s0"), indexMetaData, homePath); primary = newShard(shardId, true, "s0", indexMetaData, null);
replicas = new ArrayList<>(); replicas = new ArrayList<>();
this.indexMetaData = indexMetaData; this.indexMetaData = indexMetaData;
this.homePath = homePath;
for (int i = 0; i < indexMetaData.getNumberOfReplicas(); i++) { for (int i = 0; i < indexMetaData.getNumberOfReplicas(); i++) {
addReplica(); addReplica();
} }
} }
public int indexDocs(final int numOfDoc) throws Exception { public int indexDocs(final int numOfDoc) throws Exception {
@ -289,7 +146,7 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase {
} }
public synchronized IndexShard addReplica() throws IOException { public synchronized IndexShard addReplica() throws IOException {
final IndexShard replica = newShard(false, getDiscoveryNode("s" + replicaId.incrementAndGet()), indexMetaData, homePath); final IndexShard replica = newShard(shardId, false,"s" + replicaId.incrementAndGet(), indexMetaData, null);
replicas.add(replica); replicas.add(replica);
return replica; return replica;
} }
@ -304,39 +161,8 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase {
} }
public void recoverReplica(IndexShard replica, BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier, public void recoverReplica(IndexShard replica, BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier,
boolean markAsRecovering) boolean markAsRecovering) throws IOException {
throws IOException { ESIndexLevelReplicationTestCase.this.recoverReplica(replica, primary, targetSupplier, markAsRecovering);
final DiscoveryNode pNode = getPrimaryNode();
final DiscoveryNode rNode = getDiscoveryNode(replica.routingEntry().currentNodeId());
if (markAsRecovering) {
replica.markAsRecovering("remote",
new RecoveryState(replica.routingEntry(), pNode, rNode));
} else {
assertEquals(replica.state(), IndexShardState.RECOVERING);
}
replica.prepareForIndexRecovery();
RecoveryTarget recoveryTarget = targetSupplier.apply(replica, pNode);
StartRecoveryRequest request = new StartRecoveryRequest(replica.shardId(), pNode, rNode,
getMetadataSnapshotOrEmpty(replica), false, 0);
RecoverySourceHandler recovery = new RecoverySourceHandler(primary, recoveryTarget, request, () -> 0L, e -> () -> {},
(int) ByteSizeUnit.MB.toKB(1), logger);
recovery.recoverToTarget();
recoveryTarget.markAsDone();
replica.updateRoutingEntry(ShardRoutingHelper.moveToStarted(replica.routingEntry()));
}
private Store.MetadataSnapshot getMetadataSnapshotOrEmpty(IndexShard replica) throws IOException {
Store.MetadataSnapshot result;
try {
result = replica.snapshotStoreMetadata();
} catch (IndexNotFoundException e) {
// OK!
result = Store.MetadataSnapshot.EMPTY;
} catch (IOException e) {
logger.warn("failed read store, treating as empty", e);
result = Store.MetadataSnapshot.EMPTY;
}
return result;
} }
public synchronized DiscoveryNode getPrimaryNode() { public synchronized DiscoveryNode getPrimaryNode() {
@ -367,24 +193,6 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase {
} }
} }
private Set<Uid> getShardDocUIDs(final IndexShard shard) throws IOException {
shard.refresh("get_uids");
try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
Set<Uid> ids = new HashSet<>();
for (LeafReaderContext leafContext : searcher.reader().leaves()) {
LeafReader reader = leafContext.reader();
Bits liveDocs = reader.getLiveDocs();
for (int i = 0; i < reader.maxDoc(); i++) {
if (liveDocs == null || liveDocs.get(i)) {
Document uuid = reader.document(i, Collections.singleton(UidFieldMapper.NAME));
ids.add(Uid.createUid(uuid.get(UidFieldMapper.NAME)));
}
}
}
return ids;
}
}
public synchronized void refresh(String source) { public synchronized void refresh(String source) {
for (IndexShard shard : this) { for (IndexShard shard : this) {
shard.refresh(source); shard.refresh(source);
@ -406,10 +214,7 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase {
public synchronized void close() throws Exception { public synchronized void close() throws Exception {
if (closed == false) { if (closed == false) {
closed = true; closed = true;
for (IndexShard shard : this) { closeShards(this);
shard.close("eol", false);
IOUtils.close(shard.store());
}
} else { } else {
throw new AlreadyClosedException("too bad"); throw new AlreadyClosedException("too bad");
} }

View File

@ -0,0 +1,476 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.test.InternalSettingsPlugin;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.NONE;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.equalTo;
public class IndexShardIT extends ESSingleNodeTestCase {
@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return pluginList(InternalSettingsPlugin.class);
}
private ParsedDocument testParsedDocument(String uid, String id, String type, String routing, long timestamp, long ttl,
ParseContext.Document document, BytesReference source, Mapping mappingUpdate) {
Field uidField = new Field("_uid", uid, UidFieldMapper.Defaults.FIELD_TYPE);
Field versionField = new NumericDocValuesField("_version", 0);
document.add(uidField);
document.add(versionField);
return new ParsedDocument(versionField, id, type, routing, timestamp, ttl, Collections.singletonList(document), source,
mappingUpdate);
}
public void testLockTryingToDelete() throws Exception {
createIndex("test");
ensureGreen();
NodeEnvironment env = getInstanceFromNode(NodeEnvironment.class);
ClusterService cs = getInstanceFromNode(ClusterService.class);
final Index index = cs.state().metaData().index("test").getIndex();
Path[] shardPaths = env.availableShardPaths(new ShardId(index, 0));
logger.info("--> paths: [{}]", (Object)shardPaths);
// Should not be able to acquire the lock because it's already open
try {
NodeEnvironment.acquireFSLockForPaths(IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), shardPaths);
fail("should not have been able to acquire the lock");
} catch (LockObtainFailedException e) {
assertTrue("msg: " + e.getMessage(), e.getMessage().contains("unable to acquire write.lock"));
}
// Test without the regular shard lock to assume we can acquire it
// (worst case, meaning that the shard lock could be acquired and
// we're green to delete the shard's directory)
ShardLock sLock = new DummyShardLock(new ShardId(index, 0));
try {
env.deleteShardDirectoryUnderLock(sLock, IndexSettingsModule.newIndexSettings("test", Settings.EMPTY));
fail("should not have been able to delete the directory");
} catch (LockObtainFailedException e) {
assertTrue("msg: " + e.getMessage(), e.getMessage().contains("unable to acquire write.lock"));
}
}
public void testMarkAsInactiveTriggersSyncedFlush() throws Exception {
assertAcked(client().admin().indices().prepareCreate("test")
.setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0));
client().prepareIndex("test", "test").setSource("{}").get();
ensureGreen("test");
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
indicesService.indexService(resolveIndex("test")).getShardOrNull(0).checkIdle(0);
assertBusy(() -> {
IndexStats indexStats = client().admin().indices().prepareStats("test").clear().get().getIndex("test");
assertNotNull(indexStats.getShards()[0].getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
indicesService.indexService(resolveIndex("test")).getShardOrNull(0).checkIdle(0);
}
);
IndexStats indexStats = client().admin().indices().prepareStats("test").get().getIndex("test");
assertNotNull(indexStats.getShards()[0].getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
}
public void testDurableFlagHasEffect() {
createIndex("test");
ensureGreen();
client().prepareIndex("test", "bar", "1").setSource("{}").get();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService test = indicesService.indexService(resolveIndex("test"));
IndexShard shard = test.getShardOrNull(0);
setDurability(shard, Translog.Durability.REQUEST);
assertFalse(ShardUtilsTests.getShardEngine(shard).getTranslog().syncNeeded());
setDurability(shard, Translog.Durability.ASYNC);
client().prepareIndex("test", "bar", "2").setSource("{}").get();
assertTrue(ShardUtilsTests.getShardEngine(shard).getTranslog().syncNeeded());
setDurability(shard, Translog.Durability.REQUEST);
client().prepareDelete("test", "bar", "1").get();
assertFalse(ShardUtilsTests.getShardEngine(shard).getTranslog().syncNeeded());
setDurability(shard, Translog.Durability.ASYNC);
client().prepareDelete("test", "bar", "2").get();
assertTrue(ShardUtilsTests.getShardEngine(shard).getTranslog().syncNeeded());
setDurability(shard, Translog.Durability.REQUEST);
assertNoFailures(client().prepareBulk()
.add(client().prepareIndex("test", "bar", "3").setSource("{}"))
.add(client().prepareDelete("test", "bar", "1")).get());
assertFalse(ShardUtilsTests.getShardEngine(shard).getTranslog().syncNeeded());
setDurability(shard, Translog.Durability.ASYNC);
assertNoFailures(client().prepareBulk()
.add(client().prepareIndex("test", "bar", "4").setSource("{}"))
.add(client().prepareDelete("test", "bar", "3")).get());
setDurability(shard, Translog.Durability.REQUEST);
assertTrue(ShardUtilsTests.getShardEngine(shard).getTranslog().syncNeeded());
}
private void setDurability(IndexShard shard, Translog.Durability durability) {
client().admin().indices().prepareUpdateSettings(shard.shardId().getIndexName()).setSettings(
Settings.builder().put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), durability.name()).build()).get();
assertEquals(durability, shard.getTranslogDurability());
}
public void testUpdatePriority() {
assertAcked(client().admin().indices().prepareCreate("test")
.setSettings(IndexMetaData.SETTING_PRIORITY, 200));
IndexService indexService = getInstanceFromNode(IndicesService.class).indexService(resolveIndex("test"));
assertEquals(200, indexService.getIndexSettings().getSettings().getAsInt(IndexMetaData.SETTING_PRIORITY, 0).intValue());
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(IndexMetaData.SETTING_PRIORITY, 400)
.build()).get();
assertEquals(400, indexService.getIndexSettings().getSettings().getAsInt(IndexMetaData.SETTING_PRIORITY, 0).intValue());
}
public void testIndexDirIsDeletedWhenShardRemoved() throws Exception {
Environment env = getInstanceFromNode(Environment.class);
Path idxPath = env.sharedDataFile().resolve(randomAsciiOfLength(10));
logger.info("--> idxPath: [{}]", idxPath);
Settings idxSettings = Settings.builder()
.put(IndexMetaData.SETTING_DATA_PATH, idxPath)
.build();
createIndex("test", idxSettings);
ensureGreen("test");
client().prepareIndex("test", "bar", "1").setSource("{}").setRefreshPolicy(IMMEDIATE).get();
SearchResponse response = client().prepareSearch("test").get();
assertHitCount(response, 1L);
client().admin().indices().prepareDelete("test").get();
assertAllIndicesRemovedAndDeletionCompleted(Collections.singleton(getInstanceFromNode(IndicesService.class)));
assertPathHasBeenCleared(idxPath);
}
public void testExpectedShardSizeIsPresent() throws InterruptedException {
assertAcked(client().admin().indices().prepareCreate("test")
.setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0));
for (int i = 0; i < 50; i++) {
client().prepareIndex("test", "test").setSource("{}").get();
}
ensureGreen("test");
InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class);
clusterInfoService.refresh();
ClusterState state = getInstanceFromNode(ClusterService.class).state();
Long test = clusterInfoService.getClusterInfo().getShardSize(state.getRoutingTable().index("test")
.getShards().get(0).primaryShard());
assertNotNull(test);
assertTrue(test > 0);
}
public void testIndexCanChangeCustomDataPath() throws Exception {
Environment env = getInstanceFromNode(Environment.class);
Path idxPath = env.sharedDataFile().resolve(randomAsciiOfLength(10));
final String INDEX = "idx";
Path startDir = idxPath.resolve("start-" + randomAsciiOfLength(10));
Path endDir = idxPath.resolve("end-" + randomAsciiOfLength(10));
logger.info("--> start dir: [{}]", startDir.toAbsolutePath().toString());
logger.info("--> end dir: [{}]", endDir.toAbsolutePath().toString());
// temp dirs are automatically created, but the end dir is what
// startDir is going to be renamed as, so it needs to be deleted
// otherwise we get all sorts of errors about the directory
// already existing
IOUtils.rm(endDir);
Settings sb = Settings.builder()
.put(IndexMetaData.SETTING_DATA_PATH, startDir.toAbsolutePath().toString())
.build();
Settings sb2 = Settings.builder()
.put(IndexMetaData.SETTING_DATA_PATH, endDir.toAbsolutePath().toString())
.build();
logger.info("--> creating an index with data_path [{}]", startDir.toAbsolutePath().toString());
createIndex(INDEX, sb);
ensureGreen(INDEX);
client().prepareIndex(INDEX, "bar", "1").setSource("{}").setRefreshPolicy(IMMEDIATE).get();
SearchResponse resp = client().prepareSearch(INDEX).setQuery(matchAllQuery()).get();
assertThat("found the hit", resp.getHits().getTotalHits(), equalTo(1L));
logger.info("--> closing the index [{}]", INDEX);
client().admin().indices().prepareClose(INDEX).get();
logger.info("--> index closed, re-opening...");
client().admin().indices().prepareOpen(INDEX).get();
logger.info("--> index re-opened");
ensureGreen(INDEX);
resp = client().prepareSearch(INDEX).setQuery(matchAllQuery()).get();
assertThat("found the hit", resp.getHits().getTotalHits(), equalTo(1L));
// Now, try closing and changing the settings
logger.info("--> closing the index [{}]", INDEX);
client().admin().indices().prepareClose(INDEX).get();
logger.info("--> moving data on disk [{}] to [{}]", startDir.getFileName(), endDir.getFileName());
assert Files.exists(endDir) == false : "end directory should not exist!";
Files.move(startDir, endDir, StandardCopyOption.REPLACE_EXISTING);
logger.info("--> updating settings...");
client().admin().indices().prepareUpdateSettings(INDEX)
.setSettings(sb2)
.setIndicesOptions(IndicesOptions.fromOptions(true, false, true, true))
.get();
assert Files.exists(startDir) == false : "start dir shouldn't exist";
logger.info("--> settings updated and files moved, re-opening index");
client().admin().indices().prepareOpen(INDEX).get();
logger.info("--> index re-opened");
ensureGreen(INDEX);
resp = client().prepareSearch(INDEX).setQuery(matchAllQuery()).get();
assertThat("found the hit", resp.getHits().getTotalHits(), equalTo(1L));
assertAcked(client().admin().indices().prepareDelete(INDEX));
assertAllIndicesRemovedAndDeletionCompleted(Collections.singleton(getInstanceFromNode(IndicesService.class)));
assertPathHasBeenCleared(startDir.toAbsolutePath());
assertPathHasBeenCleared(endDir.toAbsolutePath());
}
public void testMaybeFlush() throws Exception {
createIndex("test", Settings.builder().put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.REQUEST)
.build());
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService test = indicesService.indexService(resolveIndex("test"));
IndexShard shard = test.getShardOrNull(0);
assertFalse(shard.shouldFlush());
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder()
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(),
new ByteSizeValue(133 /* size of the operation + header&footer*/, ByteSizeUnit.BYTES)).build()).get();
client().prepareIndex("test", "test", "0").setSource("{}").setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get();
assertFalse(shard.shouldFlush());
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, new ParseContext.Document(),
new BytesArray(new byte[]{1}), null);
Engine.Index index = new Engine.Index(new Term("_uid", "1"), doc);
shard.index(index);
assertTrue(shard.shouldFlush());
assertEquals(2, shard.getEngine().getTranslog().totalOperations());
client().prepareIndex("test", "test", "2").setSource("{}").setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get();
assertBusy(() -> { // this is async
assertFalse(shard.shouldFlush());
});
assertEquals(0, shard.getEngine().getTranslog().totalOperations());
shard.getEngine().getTranslog().sync();
long size = shard.getEngine().getTranslog().sizeInBytes();
logger.info("--> current translog size: [{}] num_ops [{}] generation [{}]", shard.getEngine().getTranslog().sizeInBytes(),
shard.getEngine().getTranslog().totalOperations(), shard.getEngine().getTranslog().getGeneration());
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(
IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(size, ByteSizeUnit.BYTES))
.build()).get();
client().prepareDelete("test", "test", "2").get();
logger.info("--> translog size after delete: [{}] num_ops [{}] generation [{}]", shard.getEngine().getTranslog().sizeInBytes(),
shard.getEngine().getTranslog().totalOperations(), shard.getEngine().getTranslog().getGeneration());
assertBusy(() -> { // this is async
logger.info("--> translog size on iter : [{}] num_ops [{}] generation [{}]", shard.getEngine().getTranslog().sizeInBytes(),
shard.getEngine().getTranslog().totalOperations(), shard.getEngine().getTranslog().getGeneration());
assertFalse(shard.shouldFlush());
});
assertEquals(0, shard.getEngine().getTranslog().totalOperations());
}
public void testStressMaybeFlush() throws Exception {
createIndex("test");
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService test = indicesService.indexService(resolveIndex("test"));
final IndexShard shard = test.getShardOrNull(0);
assertFalse(shard.shouldFlush());
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(
IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(),
new ByteSizeValue(133/* size of the operation + header&footer*/, ByteSizeUnit.BYTES)).build()).get();
client().prepareIndex("test", "test", "0").setSource("{}").setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get();
assertFalse(shard.shouldFlush());
final AtomicBoolean running = new AtomicBoolean(true);
final int numThreads = randomIntBetween(2, 4);
Thread[] threads = new Thread[numThreads];
CyclicBarrier barrier = new CyclicBarrier(numThreads + 1);
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread() {
@Override
public void run() {
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
while (running.get()) {
shard.maybeFlush();
}
}
};
threads[i].start();
}
barrier.await();
FlushStats flushStats = shard.flushStats();
long total = flushStats.getTotal();
client().prepareIndex("test", "test", "1").setSource("{}").get();
assertBusy(() -> assertEquals(total + 1, shard.flushStats().getTotal()));
running.set(false);
for (int i = 0; i < threads.length; i++) {
threads[i].join();
}
assertEquals(total + 1, shard.flushStats().getTotal());
}
public void testShardHasMemoryBufferOnTranslogRecover() throws Throwable {
createIndex("test");
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService indexService = indicesService.indexService(resolveIndex("test"));
IndexShard shard = indexService.getShardOrNull(0);
client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}").get();
client().prepareDelete("test", "test", "0").get();
client().prepareIndex("test", "test", "1").setSource("{\"foo\" : \"bar\"}").setRefreshPolicy(IMMEDIATE).get();
IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {};
shard.close("simon says", false);
AtomicReference<IndexShard> shardRef = new AtomicReference<>();
List<Exception> failures = new ArrayList<>();
IndexingOperationListener listener = new IndexingOperationListener() {
@Override
public void postIndex(Engine.Index index, boolean created) {
try {
assertNotNull(shardRef.get());
// this is all IMC needs to do - check current memory and refresh
assertTrue(shardRef.get().getIndexBufferRAMBytesUsed() > 0);
shardRef.get().refresh("test");
} catch (Exception e) {
failures.add(e);
throw e;
}
}
@Override
public void postDelete(Engine.Delete delete) {
try {
assertNotNull(shardRef.get());
// this is all IMC needs to do - check current memory and refresh
assertTrue(shardRef.get().getIndexBufferRAMBytesUsed() > 0);
shardRef.get().refresh("test");
} catch (Exception e) {
failures.add(e);
throw e;
}
}
};
final IndexShard newShard = newIndexShard(indexService, shard, wrapper, listener);
shardRef.set(newShard);
recoverShard(newShard);
try {
ExceptionsHelper.rethrowAndSuppress(failures);
} finally {
newShard.close("just do it", randomBoolean());
}
}
public static final IndexShard recoverShard(IndexShard newShard) throws IOException {
DiscoveryNode localNode = new DiscoveryNode("foo", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT);
newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
assertTrue(newShard.recoverFromStore());
newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted());
return newShard;
}
public static final IndexShard newIndexShard(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper,
IndexingOperationListener... listeners) throws IOException {
ShardRouting initializingShardRouting = getInitializingShardRouting(shard.routingEntry());
IndexShard newShard = new IndexShard(initializingShardRouting, indexService.getIndexSettings(), shard.shardPath(),
shard.store(), indexService.cache(), indexService.mapperService(), indexService.similarityService(),
indexService.fieldData(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper,
indexService.getThreadPool(), indexService.getBigArrays(), null, Collections.emptyList(), Arrays.asList(listeners));
return newShard;
}
private static ShardRouting getInitializingShardRouting(ShardRouting existingShardRouting) {
ShardRouting shardRouting = TestShardRouting.newShardRouting(existingShardRouting.shardId(),
existingShardRouting.currentNodeId(), null, existingShardRouting.primary(), ShardRoutingState.INITIALIZING,
existingShardRouting.allocationId());
shardRouting = shardRouting.updateUnassigned(new UnassignedInfo(UnassignedInfo.Reason.INDEX_REOPENED, "fake recovery"),
RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE);
return shardRouting;
}
}

View File

@ -28,6 +28,7 @@ import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.store.BaseDirectoryWrapper; import org.apache.lucene.store.BaseDirectoryWrapper;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import java.io.IOException; import java.io.IOException;
@ -64,4 +65,7 @@ public class ShardUtilsTests extends ESTestCase {
IOUtils.close(writer, dir); IOUtils.close(writer, dir);
} }
public static Engine getShardEngine(IndexShard shard) {
return shard.getEngine();
}
} }

View File

@ -21,7 +21,6 @@ package org.elasticsearch.indices;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.common.transport.LocalTransportAddress;
@ -31,7 +30,7 @@ import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexSearcherWrapper; import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTests; import org.elasticsearch.index.shard.IndexShardIT;
import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -443,7 +442,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
shard.writeIndexingBuffer(); shard.writeIndexingBuffer();
} }
}; };
final IndexShard newShard = IndexShardTests.newIndexShard(indexService, shard, wrapper, imc); final IndexShard newShard = IndexShardIT.newIndexShard(indexService, shard, wrapper, imc);
shardRef.set(newShard); shardRef.set(newShard);
try { try {
assertEquals(0, imc.availableShards().size()); assertEquals(0, imc.availableShards().size());

View File

@ -0,0 +1,477 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexNotFoundException;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.MapperTestUtils;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.cache.query.DisabledQueryCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoverySourceHandler;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.indices.recovery.StartRecoveryRequest;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.hasSize;
/**
* A base class for unit tests that need to create and shutdown {@link IndexShard} instances easily,
* containing utilities for shard creation and recoveries. See {{@link #newShard(boolean)}} and
* {@link #newStartedShard()} for a good starting points
*/
public abstract class IndexShardTestCase extends ESTestCase {
protected static final PeerRecoveryTargetService.RecoveryListener recoveryListener = new PeerRecoveryTargetService.RecoveryListener() {
@Override
public void onRecoveryDone(RecoveryState state) {
}
@Override
public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) {
throw new AssertionError(e);
}
};
protected ThreadPool threadPool;
@Override
public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool(getClass().getName());
}
@Override
public void tearDown() throws Exception {
try {
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
} finally {
super.tearDown();
}
}
private Store createStore(IndexSettings indexSettings, ShardPath shardPath) throws IOException {
final ShardId shardId = shardPath.getShardId();
final DirectoryService directoryService = new DirectoryService(shardId, indexSettings) {
@Override
public Directory newDirectory() throws IOException {
return newFSDirectory(shardPath.resolveIndex());
}
@Override
public long throttleTimeInNanos() {
return 0;
}
};
return new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId));
}
/**
* creates a new initializing shard. The shard will have its own unique data path.
*
* @param primary indicates whether to a primary shard (ready to recover from an empty store) or a replica
* (ready to recover from another shard)
*/
protected IndexShard newShard(boolean primary) throws IOException {
ShardRouting shardRouting = TestShardRouting.newShardRouting(new ShardId("index", "_na_", 0), "n1", primary,
ShardRoutingState.INITIALIZING,
primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE);
return newShard(shardRouting);
}
/**
* creates a new initializing shard. The shard will have its own unique data path.
*
* @param shardRouting the {@link ShardRouting} to use for this shard
* @param listeners an optional set of listeners to add to the shard
*/
protected IndexShard newShard(ShardRouting shardRouting, IndexingOperationListener... listeners) throws IOException {
assert shardRouting.initializing() : shardRouting;
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.build();
IndexMetaData.Builder metaData = IndexMetaData.builder(shardRouting.getIndexName())
.settings(settings)
.primaryTerm(0, 1);
return newShard(shardRouting, metaData.build(), listeners);
}
/**
* creates a new initializing shard. The shard will have its own unique data path.
*
* @param shardId the shard id to use
* @param primary indicates whether to a primary shard (ready to recover from an empty store) or a replica
* (ready to recover from another shard)
* @param listeners an optional set of listeners to add to the shard
*/
protected IndexShard newShard(ShardId shardId, boolean primary, IndexingOperationListener... listeners) throws IOException {
ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, randomAsciiOfLength(5), primary,
ShardRoutingState.INITIALIZING,
primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE);
return newShard(shardRouting, listeners);
}
/**
* creates a new initializing shard. The shard will will be put in its proper path under the
* supplied node id.
*
* @param shardId the shard id to use
* @param primary indicates whether to a primary shard (ready to recover from an empty store) or a replica
* (ready to recover from another shard)
*/
protected IndexShard newShard(ShardId shardId, boolean primary, String nodeId, IndexMetaData indexMetaData,
@Nullable IndexSearcherWrapper searcherWrapper) throws IOException {
ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, nodeId, primary, ShardRoutingState.INITIALIZING,
primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE);
return newShard(shardRouting, indexMetaData, searcherWrapper);
}
/**
* creates a new initializing shard. The shard will will be put in its proper path under the
* current node id the shard is assigned to.
*
* @param routing shard routing to use
* @param indexMetaData indexMetaData for the shard, including any mapping
* @param listeners an optional set of listeners to add to the shard
*/
protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData, IndexingOperationListener... listeners)
throws IOException {
return newShard(routing, indexMetaData, null, listeners);
}
/**
* creates a new initializing shard. The shard will will be put in its proper path under the
* current node id the shard is assigned to.
*
* @param routing shard routing to use
* @param indexMetaData indexMetaData for the shard, including any mapping
* @param indexSearcherWrapper an optional wrapper to be used during searchers
* @param listeners an optional set of listeners to add to the shard
*/
protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData,
@Nullable IndexSearcherWrapper indexSearcherWrapper, IndexingOperationListener... listeners)
throws IOException {
// add node id as name to settings for popper logging
final ShardId shardId = routing.shardId();
final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir());
ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId);
return newShard(routing, shardPath, indexMetaData, indexSearcherWrapper, listeners);
}
/**
* creates a new initializing shard.
*
* @param routing shard routing to use
* @param shardPath path to use for shard data
* @param indexMetaData indexMetaData for the shard, including any mapping
* @param indexSearcherWrapper an optional wrapper to be used during searchers
* @param listeners an optional set of listeners to add to the shard
*/
protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMetaData indexMetaData,
@Nullable IndexSearcherWrapper indexSearcherWrapper,
IndexingOperationListener... listeners) throws IOException {
final Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build();
final IndexSettings indexSettings = new IndexSettings(indexMetaData, nodeSettings);
final IndexShard indexShard;
final Store store = createStore(indexSettings, shardPath);
boolean success = false;
try {
IndexCache indexCache = new IndexCache(indexSettings, new DisabledQueryCache(indexSettings), null);
MapperService mapperService = MapperTestUtils.newMapperService(createTempDir(), indexSettings.getSettings());
for (ObjectObjectCursor<String, MappingMetaData> typeMapping : indexMetaData.getMappings()) {
mapperService.merge(typeMapping.key, typeMapping.value.source(), MapperService.MergeReason.MAPPING_RECOVERY, true);
}
SimilarityService similarityService = new SimilarityService(indexSettings, Collections.emptyMap());
final IndexEventListener indexEventListener = new IndexEventListener() {
};
final Engine.Warmer warmer = searcher -> {
};
IndicesFieldDataCache indicesFieldDataCache = new IndicesFieldDataCache(nodeSettings, new IndexFieldDataCache.Listener() {
});
IndexFieldDataService indexFieldDataService = new IndexFieldDataService(indexSettings, indicesFieldDataCache,
new NoneCircuitBreakerService(), mapperService);
indexShard = new IndexShard(routing, indexSettings, shardPath, store, indexCache, mapperService, similarityService,
indexFieldDataService, null, indexEventListener, indexSearcherWrapper, threadPool, BigArrays.NON_RECYCLING_INSTANCE, warmer,
Collections.emptyList(), Arrays.asList(listeners));
success = true;
} finally {
if (success == false) {
IOUtils.close(store);
}
}
return indexShard;
}
/**
* Takes an existing shard, closes it and and starts a new initialing shard at the same location
*
* @param listeners new listerns to use for the newly created shard
*/
protected IndexShard reinitShard(IndexShard current, IndexingOperationListener... listeners) throws IOException {
final ShardRouting shardRouting = current.routingEntry();
return reinitShard(current, ShardRoutingHelper.initWithSameId(shardRouting,
shardRouting.primary() ? RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE
), listeners);
}
/**
* Takes an existing shard, closes it and and starts a new initialing shard at the same location
*
* @param routing the shard routing to use for the newly created shard.
* @param listeners new listerns to use for the newly created shard
*/
protected IndexShard reinitShard(IndexShard current, ShardRouting routing, IndexingOperationListener... listeners) throws IOException {
closeShards(current);
return newShard(routing, current.shardPath(), current.indexSettings().getIndexMetaData(), null, listeners);
}
/**
* creates a new empyu shard and starts it. The shard will be either a replica or a primary.
*/
protected IndexShard newStartedShard() throws IOException {
return newStartedShard(randomBoolean());
}
/**
* creates a new empty shard and starts it.
*
* @param primary controls whether the shard will be a primary or a replica.
*/
protected IndexShard newStartedShard(boolean primary) throws IOException {
IndexShard shard = newShard(primary);
if (primary) {
recoveryShardFromStore(shard);
} else {
recoveryEmptyReplica(shard);
}
return shard;
}
protected void closeShards(IndexShard... shards) throws IOException {
closeShards(Arrays.asList(shards));
}
protected void closeShards(Iterable<IndexShard> shards) throws IOException {
for (IndexShard shard : shards) {
if (shard != null) {
try {
shard.close("test", false);
} finally {
IOUtils.close(shard.store());
}
}
}
}
protected void recoveryShardFromStore(IndexShard primary) throws IOException {
primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(),
getFakeDiscoNode(primary.routingEntry().currentNodeId()),
null));
primary.recoverFromStore();
primary.updateRoutingEntry(ShardRoutingHelper.moveToStarted(primary.routingEntry()));
}
protected void recoveryEmptyReplica(IndexShard replica) throws IOException {
IndexShard primary = null;
try {
primary = newStartedShard(true);
recoverReplica(replica, primary);
} finally {
closeShards(primary);
}
}
private DiscoveryNode getFakeDiscoNode(String id) {
return new DiscoveryNode(id, new LocalTransportAddress("_fake_" + id), Version.CURRENT);
}
/** recovers a replica from the given primary **/
protected void recoverReplica(IndexShard replica, IndexShard primary) throws IOException {
recoverReplica(replica, primary,
(r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener, version -> {
}),
true);
}
/**
* Recovers a replica from the give primary, allow the user to supply a custom recovery target.
* A typical usage of a custome recovery target is to assert things in the various stages of recovery
*
* @param markAsRecovering set to false if you have already marked the replica as recovering
*/
protected void recoverReplica(IndexShard replica, IndexShard primary,
BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier,
boolean markAsRecovering)
throws IOException {
final DiscoveryNode pNode = getFakeDiscoNode(primary.routingEntry().currentNodeId());
final DiscoveryNode rNode = getFakeDiscoNode(replica.routingEntry().currentNodeId());
if (markAsRecovering) {
replica.markAsRecovering("remote",
new RecoveryState(replica.routingEntry(), pNode, rNode));
} else {
assertEquals(replica.state(), IndexShardState.RECOVERING);
}
replica.prepareForIndexRecovery();
RecoveryTarget recoveryTarget = targetSupplier.apply(replica, pNode);
StartRecoveryRequest request = new StartRecoveryRequest(replica.shardId(), pNode, rNode,
getMetadataSnapshotOrEmpty(replica), false, 0);
RecoverySourceHandler recovery = new RecoverySourceHandler(primary, recoveryTarget, request, () -> 0L, e -> () -> {
},
(int) ByteSizeUnit.MB.toKB(1), logger);
recovery.recoverToTarget();
recoveryTarget.markAsDone();
replica.updateRoutingEntry(ShardRoutingHelper.moveToStarted(replica.routingEntry()));
}
private Store.MetadataSnapshot getMetadataSnapshotOrEmpty(IndexShard replica) throws IOException {
Store.MetadataSnapshot result;
try {
result = replica.snapshotStoreMetadata();
} catch (IndexNotFoundException e) {
// OK!
result = Store.MetadataSnapshot.EMPTY;
} catch (IOException e) {
logger.warn("failed read store, treating as empty", e);
result = Store.MetadataSnapshot.EMPTY;
}
return result;
}
protected Set<Uid> getShardDocUIDs(final IndexShard shard) throws IOException {
shard.refresh("get_uids");
try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
Set<Uid> ids = new HashSet<>();
for (LeafReaderContext leafContext : searcher.reader().leaves()) {
LeafReader reader = leafContext.reader();
Bits liveDocs = reader.getLiveDocs();
for (int i = 0; i < reader.maxDoc(); i++) {
if (liveDocs == null || liveDocs.get(i)) {
Document uuid = reader.document(i, Collections.singleton(UidFieldMapper.NAME));
ids.add(Uid.createUid(uuid.get(UidFieldMapper.NAME)));
}
}
}
return ids;
}
}
protected void assertDocCount(IndexShard shard, int docDount) throws IOException {
assertThat(getShardDocUIDs(shard), hasSize(docDount));
}
protected void assertDocs(IndexShard shard, Uid... uids) throws IOException {
final Set<Uid> shardDocUIDs = getShardDocUIDs(shard);
assertThat(shardDocUIDs, contains(uids));
assertThat(shardDocUIDs, hasSize(uids.length));
}
protected Engine.Index indexDoc(IndexShard shard, String type, String id) {
return indexDoc(shard, type, id, "{}");
}
protected Engine.Index indexDoc(IndexShard shard, String type, String id, String source) {
final Engine.Index index;
if (shard.routingEntry().primary()) {
index = shard.prepareIndexOnPrimary(
SourceToParse.source(SourceToParse.Origin.PRIMARY, shard.shardId().getIndexName(), type, id, new BytesArray(source)),
Versions.MATCH_ANY, VersionType.INTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
} else {
index = shard.prepareIndexOnReplica(
SourceToParse.source(SourceToParse.Origin.PRIMARY, shard.shardId().getIndexName(), type, id, new BytesArray(source)),
1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
}
shard.index(index);
return index;
}
protected Engine.Delete deleteDoc(IndexShard shard, String type, String id) {
final Engine.Delete delete;
if (shard.routingEntry().primary()) {
delete = shard.prepareDeleteOnPrimary(type, id, Versions.MATCH_ANY, VersionType.INTERNAL);
} else {
delete = shard.prepareDeleteOnPrimary(type, id, 1, VersionType.EXTERNAL);
}
shard.delete(delete);
return delete;
}
protected void flushShard(IndexShard shard) {
flushShard(shard, false);
}
protected void flushShard(IndexShard shard, boolean force) {
shard.flush(new FlushRequest(shard.shardId().getIndexName()).force(force));
}
}