Refactor StoreRecoveryService to be a simple package private util class

StoreRecoveryService used to be a pretty heavy class with lots of dependencies.
This class was basically not testable in isolation and had an async API with a listener.
This commit refactors this class to be a simple utility class with a sync API hidden behind
the IndexShard interface. It includes single node tests and moves all the async properties to
the caller side.
Note, this change also removes the mapping update on master from the store recovery code since
it's not needed anymore in 3.0 because all stores have been subject to sync mapping updates such
that the master already has all the mappings for documents that made it into the transaction log.

Closes #13766
This commit is contained in:
Simon Willnauer 2015-09-23 23:07:17 +02:00
parent 882fe0784c
commit 3b5ed08d49
10 changed files with 486 additions and 392 deletions

View File

@ -53,7 +53,6 @@ import org.elasticsearch.index.shard.IndexShardModule;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.shard.StoreRecoveryService;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.Store;
@ -442,9 +441,6 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
// ignore
}
}
closeInjectorResource(sId, shardInjector,
StoreRecoveryService.class);
// call this before we close the store, so we can release resources for it
indicesLifecycle.afterIndexShardClosed(sId, indexShard, indexSettings);
}

View File

@ -87,6 +87,7 @@ import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.search.stats.ShardSearchStats;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.Store.MetadataSnapshot;
import org.elasticsearch.index.store.StoreFileMetaData;
@ -155,7 +156,6 @@ public class IndexShard extends AbstractIndexShardComponent {
private final TranslogConfig translogConfig;
private final MergePolicyConfig mergePolicyConfig;
private final IndicesQueryCache indicesQueryCache;
private final StoreRecoveryService storeRecoveryService;
private TimeValue refreshInterval;
@ -200,7 +200,7 @@ public class IndexShard extends AbstractIndexShardComponent {
private EnumSet<IndexShardState> readAllowedStates = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY);
@Inject
public IndexShard(ShardId shardId, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, StoreRecoveryService storeRecoveryService,
public IndexShard(ShardId shardId, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store,
ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService,
IndicesQueryCache indicesQueryCache, CodecService codecService,
TermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService,
@ -217,7 +217,6 @@ public class IndexShard extends AbstractIndexShardComponent {
this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle;
this.indexSettingsService = indexSettingsService;
this.store = store;
this.storeRecoveryService = storeRecoveryService;
this.mergeSchedulerConfig = new MergeSchedulerConfig(indexSettings);
this.threadPool = threadPool;
this.mapperService = mapperService;
@ -844,13 +843,12 @@ public class IndexShard extends AbstractIndexShardComponent {
/**
* After the store has been recovered, we need to start the engine in order to apply operations
*/
public Map<String, Mapping> performTranslogRecovery(boolean indexExists) {
final Map<String, Mapping> recoveredTypes = internalPerformTranslogRecovery(false, indexExists);
public void performTranslogRecovery(boolean indexExists) {
internalPerformTranslogRecovery(false, indexExists);
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
return recoveredTypes;
}
private Map<String, Mapping> internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists) {
private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists) {
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
@ -869,7 +867,7 @@ public class IndexShard extends AbstractIndexShardComponent {
engineConfig.setEnableGcDeletes(false);
engineConfig.setCreate(indexExists == false);
createNewEngine(skipTranslogRecovery, engineConfig);
return engineConfig.getTranslogRecoveryPerformer().getRecoveredTypes();
}
/**
@ -879,8 +877,7 @@ public class IndexShard extends AbstractIndexShardComponent {
*/
public void skipTranslogRecovery() throws IOException {
assert engineUnsafe() == null : "engine was already created";
Map<String, Mapping> recoveredTypes = internalPerformTranslogRecovery(true, true);
assert recoveredTypes.isEmpty();
internalPerformTranslogRecovery(true, true);
assert recoveryState.getTranslog().recoveredOperations() == 0;
}
@ -1063,12 +1060,19 @@ public class IndexShard extends AbstractIndexShardComponent {
return path;
}
public void recoverFromStore(ShardRouting shard, StoreRecoveryService.RecoveryListener recoveryListener) {
public boolean recoverFromStore(ShardRouting shard) {
// we are the first primary, recover from the gateway
// if its post api allocation, the index should exists
assert shard.primary() : "recover from store only makes sense if the shard is a primary shard";
final boolean shouldExist = shard.allocatedPostIndexCreate();
storeRecoveryService.recover(this, shouldExist, recoveryListener);
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
return storeRecovery.recoverFromStore(this, shouldExist, localNode);
}
/**
 * Restores this shard from a snapshot held by the given repository. The work is delegated to a fresh
 * {@link StoreRecovery} instance and executed synchronously on the calling thread.
 *
 * @param shard      the routing entry of the shard to restore; must be a primary
 * @param repository the repository to restore the shard from
 * @return the result of {@link StoreRecovery#recoverFromRepository(IndexShard, IndexShardRepository)}
 */
public boolean restoreFromRepository(ShardRouting shard, IndexShardRepository repository) {
    // the message used to be a copy of the one in recoverFromStore, which made a tripped assertion misleading
    assert shard.primary() : "restore from repository only makes sense if the shard is a primary shard";
    StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
    return storeRecovery.recoverFromRepository(this, repository);
}
/**

View File

@ -66,7 +66,6 @@ public class IndexShardModule extends AbstractModule {
}
bind(EngineFactory.class).to(engineFactoryImpl);
bind(StoreRecoveryService.class).asEagerSingleton();
bind(IndexSearcherWrappingService.class).asEagerSingleton();
// this injects an empty set in IndexSearcherWrappingService, otherwise guice can't construct IndexSearcherWrappingService
Multibinder<IndexSearcherWrapper> multibinder

View File

@ -55,7 +55,7 @@ public final class ShadowIndexShard extends IndexShard {
@Inject
public ShadowIndexShard(ShardId shardId, IndexSettingsService indexSettingsService,
IndicesLifecycle indicesLifecycle, Store store, StoreRecoveryService storeRecoveryService,
IndicesLifecycle indicesLifecycle, Store store,
ThreadPool threadPool, MapperService mapperService,
IndexQueryParserService queryParserService, IndexCache indexCache,
IndexAliasesService indexAliasesService, IndicesQueryCache indicesQueryCache,
@ -64,7 +64,7 @@ public final class ShadowIndexShard extends IndexShard {
SimilarityService similarityService,
EngineFactory factory, ClusterService clusterService,
ShardPath path, BigArrays bigArrays, IndexSearcherWrappingService wrappingService) throws IOException {
super(shardId, indexSettingsService, indicesLifecycle, store, storeRecoveryService,
super(shardId, indexSettingsService, indicesLifecycle, store,
threadPool, mapperService, queryParserService, indexCache, indexAliasesService,
indicesQueryCache, codecService,
termVectorsService, indexFieldDataService,

View File

@ -0,0 +1,278 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RestoreSource;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.RecoveryState;
import java.io.IOException;
import java.util.Arrays;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
/**
* This package private utility class encapsulates the logic to recover an index shard from either an existing index on
* disk or from a snapshot in a repository.
*/
final class StoreRecovery {
private final ESLogger logger;
private final ShardId shardId;
/**
 * Creates a recovery helper for a single shard.
 *
 * @param shardId the id of the shard being recovered; it is attached to the exceptions raised by this class
 * @param logger  the logger that receives the recovery progress messages
 */
StoreRecovery(ShardId shardId, ESLogger logger) {
    this.shardId = shardId;
    this.logger = logger;
}
/**
 * Recovers a shard from its local file system store. The caller has to know up-front whether the shard is expected
 * to exist on disk, i.e. whether it has been allocated before or is a brand new allocation without pre-existing
 * index files / transaction logs.
 *
 * @param indexShard the index shard instance to recover the shard into
 * @param indexShouldExists <code>true</code> iff the index should exist on disk, i.e. the shard has been allocated previously on the shard's store.
 * @param localNode the reference to the local node
 * @return <code>true</code> if the shard has been recovered successfully, <code>false</code> if the recovery
 * has been ignored, e.g. because the shard got closed or is already being recovered by a concurrent call.
 * @throws IllegalStateException if the routing entry of the shard carries a restore source
 * @see Store
 */
boolean recoverFromStore(final IndexShard indexShard, final boolean indexShouldExists, DiscoveryNode localNode) {
    if (canRecover(indexShard) == false) {
        return false;
    }
    if (indexShard.routingEntry().restoreSource() != null) {
        throw new IllegalStateException("can't recover - restore source is not null");
    }
    try {
        indexShard.recovering("from store", RecoveryState.Type.STORE, localNode);
    } catch (IllegalIndexShardStateException e) {
        // we might be called concurrently and the shard is already recovering - that's fine, just skip
        return false;
    }
    final Runnable storeRecovery = () -> {
        logger.debug("starting recovery from store ...");
        internalRecoverFromStore(indexShard, indexShouldExists);
    };
    return executeRecovery(indexShard, storeRecovery);
}
/**
 * Recovers an index from a given {@link IndexShardRepository}. This method restores a
 * previously created index snapshot into an existing initializing shard.
 *
 * @param indexShard the index shard instance to restore the snapshot into
 * @param repository the repository holding the physical files the shard should be recovered from
 * @return <code>true</code> if the shard has been recovered successfully, <code>false</code> if the recovery
 * has been ignored, e.g. because the shard got closed or is already being recovered by a concurrent call.
 * @throws IllegalStateException if the routing entry of the shard carries no restore source
 */
boolean recoverFromRepository(final IndexShard indexShard, IndexShardRepository repository) {
    if (canRecover(indexShard) == false) {
        return false;
    }
    if (indexShard.routingEntry().restoreSource() == null) {
        throw new IllegalStateException("can't restore - restore source is null");
    }
    try {
        indexShard.recovering("from snapshot", RecoveryState.Type.SNAPSHOT, indexShard.routingEntry().restoreSource());
    } catch (IllegalIndexShardStateException e) {
        // we might be called concurrently and the shard is already recovering - that's fine, just skip
        return false;
    }
    final Runnable snapshotRestore = () -> {
        logger.debug("restoring from {} ...", indexShard.routingEntry().restoreSource());
        restore(indexShard, repository);
    };
    return executeRecovery(indexShard, snapshotRestore);
}
/**
 * Checks the preconditions shared by both recovery flavours.
 *
 * @param indexShard the shard that is about to be recovered
 * @return <code>false</code> if the shard is already closed and the recovery should be skipped,
 * <code>true</code> if the recovery may proceed
 * @throws IndexShardRecoveryException if the routing entry of the shard is not a primary
 */
private boolean canRecover(IndexShard indexShard) {
    if (indexShard.state() != IndexShardState.CLOSED) {
        if (indexShard.routingEntry().primary()) {
            return true;
        }
        throw new IndexShardRecoveryException(shardId, "Trying to recover when the shard is in backup state", null);
    }
    // got closed on us, just ignore this recovery
    return false;
}
/**
 * Runs the given recovery logic synchronously and maps its outcome onto the boolean contract of this class.
 *
 * @param indexShard       the shard that is being recovered
 * @param recoveryRunnable the actual recovery logic; it is responsible for moving the shard to post recovery
 * @return <code>true</code> if the recovery ran to completion, <code>false</code> if it has been abandoned because
 * the shard got closed or was reported as closed / not started while recovering
 * @throws IndexShardRecoveryException if the recovery failed for any other reason
 */
private boolean executeRecovery(final IndexShard indexShard, Runnable recoveryRunnable) throws IndexShardRecoveryException {
    try {
        recoveryRunnable.run();
        // Check that the recovery logic didn't leave the shard in init or recovering stage. it is up to the
        // recovery logic to call post recovery.
        final IndexShardState shardState = indexShard.state();
        final RecoveryState recoveryState = indexShard.recoveryState();
        assert shardState != IndexShardState.CREATED && shardState != IndexShardState.RECOVERING : "recovery process of " + shardId + " didn't get to post_recovery. shardState [" + shardState + "]";
        logRecoverySummary(recoveryState);
        return true;
    } catch (IndexShardRecoveryException e) {
        if (indexShard.state() == IndexShardState.CLOSED) {
            // got closed on us, just ignore this recovery
            return false;
        }
        if ((e.getCause() instanceof IndexShardClosedException) || (e.getCause() instanceof IndexShardNotStartedException)) {
            // got closed on us, just ignore this recovery
            return false;
        }
        throw e;
    } catch (IndexShardClosedException | IndexShardNotStartedException ignored) {
        // deliberately not rethrown: the shard went away underneath us, so the recovery is reported as ignored.
        // This used to be an empty catch block that only reached the trailing "return false" by fall-through.
        return false;
    } catch (Exception e) {
        if (indexShard.state() == IndexShardState.CLOSED) {
            // got closed on us, just ignore this recovery
            return false;
        }
        throw new IndexShardRecoveryException(shardId, "failed recovery", e);
    }
}

/**
 * Logs a summary of a finished recovery: a multi-line breakdown on trace level, a one-liner on debug level.
 * NOTE(review): the messages always say "shard_store", also when the recovery was a snapshot restore.
 */
private void logRecoverySummary(RecoveryState recoveryState) {
    if (logger.isTraceEnabled()) {
        StringBuilder sb = new StringBuilder();
        sb.append("recovery completed from ").append("shard_store").append(", took [").append(timeValueMillis(recoveryState.getTimer().time())).append("]\n");
        RecoveryState.Index index = recoveryState.getIndex();
        sb.append(" index : files [").append(index.totalFileCount()).append("] with total_size [")
                .append(new ByteSizeValue(index.totalBytes())).append("], took[")
                .append(TimeValue.timeValueMillis(index.time())).append("]\n");
        sb.append(" : recovered_files [").append(index.recoveredFileCount()).append("] with total_size [")
                .append(new ByteSizeValue(index.recoveredBytes())).append("]\n");
        sb.append(" : reusing_files [").append(index.reusedFileCount()).append("] with total_size [")
                .append(new ByteSizeValue(index.reusedBytes())).append("]\n");
        sb.append(" verify_index : took [").append(TimeValue.timeValueMillis(recoveryState.getVerifyIndex().time())).append("], check_index [")
                .append(timeValueMillis(recoveryState.getVerifyIndex().checkIndexTime())).append("]\n");
        sb.append(" translog : number_of_operations [").append(recoveryState.getTranslog().recoveredOperations())
                .append("], took [").append(TimeValue.timeValueMillis(recoveryState.getTranslog().time())).append("]");
        logger.trace(sb.toString());
    } else if (logger.isDebugEnabled()) {
        logger.debug("recovery completed from [shard_store], took [{}]", timeValueMillis(recoveryState.getTimer().time()));
    }
}
/**
* Recovers the state of the shard from the store.
* <p>
* Reads the last committed segments info from the shard's store, records the committed files in the shard's
* {@link RecoveryState}, performs the translog recovery and finally moves the shard to post recovery.
*
* @param indexShard the shard to recover
* @param indexShouldExists <code>true</code> if a commit point is expected in the store; a missing one then fails
* the recovery. If <code>false</code>, an existing commit point is cleaned by opening an {@link IndexWriter} in
* CREATE mode.
* @throws IndexShardRecoveryException if reading the store fails or an {@link EngineException} is raised while the
* translog is recovered and the recovery is finalized
*/
private void internalRecoverFromStore(IndexShard indexShard, boolean indexShouldExists) throws IndexShardRecoveryException {
final RecoveryState recoveryState = indexShard.recoveryState();
indexShard.prepareForIndexRecovery();
// stays -1 unless an expected commit point is found below
long version = -1;
SegmentInfos si = null;
final Store store = indexShard.store();
// keep the store referenced while we work on it; the reference is released in the finally block below
store.incRef();
try {
try {
store.failIfCorrupted();
try {
si = store.readLastCommittedSegmentsInfo();
} catch (Throwable e) {
// no readable commit point - collect the directory listing so the failure message below is more useful
String files = "_unknown_";
try {
files = Arrays.toString(store.directory().listAll());
} catch (Throwable e1) {
files += " (failure=" + ExceptionsHelper.detailedMessage(e1) + ")";
}
// only a failure if the index was expected to exist, otherwise si simply stays null
if (indexShouldExists) {
throw new IndexShardRecoveryException(shardId, "shard allocated for local recovery (post api), should exist, but doesn't, current files: " + files, e);
}
}
if (si != null) {
if (indexShouldExists) {
version = si.getVersion();
} else {
// it exists on the directory, but shouldn't exist on the FS, its a leftover (possibly dangling)
// its a "new index create" API, we have to do something, so better to clean it than use same data
logger.trace("cleaning existing shard, shouldn't exists");
IndexWriter writer = new IndexWriter(store.directory(), new IndexWriterConfig(Lucene.STANDARD_ANALYZER).setOpenMode(IndexWriterConfig.OpenMode.CREATE));
writer.close();
recoveryState.getTranslog().totalOperations(0);
}
}
} catch (Throwable e) {
// NOTE(review): this also catches the IndexShardRecoveryException thrown a few lines up, so its more
// specific message ends up as the cause of this generic one
throw new IndexShardRecoveryException(shardId, "failed to fetch index version after copying it over", e);
}
recoveryState.getIndex().updateVersion(version);
// since we recover from local, just fill the files and size
try {
final RecoveryState.Index index = recoveryState.getIndex();
if (si != null) {
final Directory directory = store.directory();
for (String name : Lucene.files(si)) {
long length = directory.fileLength(name);
index.addFileDetail(name, length, true);
}
}
} catch (IOException e) {
// best effort: a failure to read the file lengths is only logged and does not abort the recovery
logger.debug("failed to list file details", e);
}
if (indexShouldExists == false) {
// the index is not expected to exist, so the translog statistics are reset to zero operations
recoveryState.getTranslog().totalOperations(0);
recoveryState.getTranslog().totalOperationsOnStart(0);
}
indexShard.performTranslogRecovery(indexShouldExists);
indexShard.finalizeRecovery();
indexShard.postRecovery("post recovery from shard_store");
} catch (EngineException e) {
throw new IndexShardRecoveryException(shardId, "failed to recovery from gateway", e);
} finally {
store.decRef();
}
}
/**
 * Restores the shard from the {@link RestoreSource} found on its routing entry, reading the files from the given
 * repository. No translog is replayed after a restore; the shard is moved straight to post recovery.
 *
 * @throws IndexShardRestoreFailedException if the routing entry has no restore source or any step of the restore fails
 */
private void restore(final IndexShard indexShard, final IndexShardRepository indexShardRepository) {
    final RestoreSource source = indexShard.routingEntry().restoreSource();
    final RecoveryState.Translog translog = indexShard.recoveryState().getTranslog();
    if (source == null) {
        throw new IndexShardRestoreFailedException(shardId, "empty restore source");
    }
    if (logger.isTraceEnabled()) {
        logger.trace("[{}] restoring shard [{}]", source.snapshotId(), shardId);
    }
    try {
        translog.totalOperations(0);
        translog.totalOperationsOnStart(0);
        indexShard.prepareForIndexRecovery();
        // the shard inside the snapshot is addressed by the restore source's index, which may differ from ours
        final ShardId snapshotShardId = shardId.getIndex().equals(source.index())
                ? shardId
                : new ShardId(source.index(), shardId.id());
        indexShardRepository.restore(source.snapshotId(), source.version(), shardId, snapshotShardId, indexShard.recoveryState());
        indexShard.skipTranslogRecovery();
        indexShard.finalizeRecovery();
        indexShard.postRecovery("restore done");
    } catch (Throwable failure) {
        throw new IndexShardRestoreFailedException(shardId, "restore failed", failure);
    }
}
}

View File

@ -1,337 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.routing.RestoreSource;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
/**
*
*/
public class StoreRecoveryService extends AbstractIndexShardComponent implements Closeable {
private final MappingUpdatedAction mappingUpdatedAction;
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final TimeValue waitForMappingUpdatePostRecovery;
private final CancellableThreads cancellableThreads = new CancellableThreads();
private static final String SETTING_MAPPING_UPDATE_WAIT_LEGACY = "index.gateway.wait_for_mapping_update_post_recovery";
private static final String SETTING_MAPPING_UPDATE_WAIT = "index.shard.wait_for_mapping_update_post_recovery";
private final RestoreService restoreService;
private final RepositoriesService repositoriesService;
@Inject
public StoreRecoveryService(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool,
MappingUpdatedAction mappingUpdatedAction, ClusterService clusterService, RepositoriesService repositoriesService, RestoreService restoreService) {
super(shardId, indexSettings);
this.threadPool = threadPool;
this.mappingUpdatedAction = mappingUpdatedAction;
this.restoreService = restoreService;
this.repositoriesService = repositoriesService;
this.clusterService = clusterService;
this.waitForMappingUpdatePostRecovery = indexSettings.getAsTime(SETTING_MAPPING_UPDATE_WAIT, indexSettings.getAsTime(SETTING_MAPPING_UPDATE_WAIT_LEGACY, TimeValue.timeValueSeconds(15)));
}
/**
* Callback through which {@link StoreRecoveryService#recover} reports the outcome of a recovery attempt.
*/
public interface RecoveryListener {
// invoked once the recovery ran to completion
void onRecoveryDone();
// invoked when the recovery is skipped; reason is a short explanation such as "shard closed"
void onIgnoreRecovery(String reason);
// invoked with the failure when the recovery could not be completed
void onRecoveryFailed(IndexShardRecoveryException e);
}
/**
 * Recovers the state of the shard from the gateway.
 * <p>
 * The shard state checks and the transition into the recovering state happen on the calling thread. The recovery
 * itself is handed to the generic thread pool and its outcome is reported through the {@link RecoveryListener}.
 *
 * @param indexShard        the shard to recover
 * @param indexShouldExists passed on to the store recovery; unused when the shard is restored from a snapshot
 * @param listener          receives the outcome of the recovery
 */
public void recover(final IndexShard indexShard, final boolean indexShouldExists, final RecoveryListener listener) throws IndexShardRecoveryException {
    if (indexShard.state() == IndexShardState.CLOSED) {
        // got closed on us, just ignore this recovery
        listener.onIgnoreRecovery("shard closed");
        return;
    }
    if (indexShard.routingEntry().primary() == false) {
        listener.onRecoveryFailed(new IndexShardRecoveryException(shardId, "Trying to recover when the shard is in backup state", null));
        return;
    }
    try {
        if (indexShard.routingEntry().restoreSource() == null) {
            indexShard.recovering("from store", RecoveryState.Type.STORE, clusterService.localNode());
        } else {
            indexShard.recovering("from snapshot", RecoveryState.Type.SNAPSHOT, indexShard.routingEntry().restoreSource());
        }
    } catch (IllegalIndexShardStateException e) {
        // that's fine, since we might be called concurrently, just ignore this, we are already recovering
        listener.onIgnoreRecovery("already in recovering process, " + e.getMessage());
        return;
    }

    threadPool.generic().execute(() -> {
        try {
            final RecoveryState state = indexShard.recoveryState();
            if (indexShard.routingEntry().restoreSource() == null) {
                logger.debug("starting recovery from shard_store ...");
                recoverFromStore(indexShard, indexShouldExists, state);
            } else {
                logger.debug("restoring from {} ...", indexShard.routingEntry().restoreSource());
                restore(indexShard, state);
            }

            // Check that the gateway didn't leave the shard in init or recovering stage. it is up to the gateway
            // to call post recovery.
            final IndexShardState shardState = indexShard.state();
            assert shardState != IndexShardState.CREATED && shardState != IndexShardState.RECOVERING : "recovery process of " + shardId + " didn't get to post_recovery. shardState [" + shardState + "]";

            if (logger.isTraceEnabled()) {
                final StringBuilder summary = new StringBuilder();
                summary.append("recovery completed from ").append("shard_store").append(", took [").append(timeValueMillis(state.getTimer().time())).append("]\n");
                final RecoveryState.Index indexStats = state.getIndex();
                summary.append(" index : files [").append(indexStats.totalFileCount()).append("] with total_size [")
                        .append(new ByteSizeValue(indexStats.totalBytes())).append("], took[")
                        .append(TimeValue.timeValueMillis(indexStats.time())).append("]\n");
                summary.append(" : recovered_files [").append(indexStats.recoveredFileCount()).append("] with total_size [")
                        .append(new ByteSizeValue(indexStats.recoveredBytes())).append("]\n");
                summary.append(" : reusing_files [").append(indexStats.reusedFileCount()).append("] with total_size [")
                        .append(new ByteSizeValue(indexStats.reusedBytes())).append("]\n");
                summary.append(" verify_index : took [").append(TimeValue.timeValueMillis(state.getVerifyIndex().time())).append("], check_index [")
                        .append(timeValueMillis(state.getVerifyIndex().checkIndexTime())).append("]\n");
                summary.append(" translog : number_of_operations [").append(state.getTranslog().recoveredOperations())
                        .append("], took [").append(TimeValue.timeValueMillis(state.getTranslog().time())).append("]");
                logger.trace(summary.toString());
            } else if (logger.isDebugEnabled()) {
                logger.debug("recovery completed from [shard_store], took [{}]", timeValueMillis(state.getTimer().time()));
            }
            listener.onRecoveryDone();
        } catch (IndexShardRecoveryException e) {
            // a closed shard, or a failure caused by the shard being closed / not started, is not an error
            final boolean closedOnUs = indexShard.state() == IndexShardState.CLOSED
                    || e.getCause() instanceof IndexShardClosedException
                    || e.getCause() instanceof IndexShardNotStartedException;
            if (closedOnUs) {
                listener.onIgnoreRecovery("shard closed");
            } else {
                listener.onRecoveryFailed(e);
            }
        } catch (IndexShardClosedException | IndexShardNotStartedException e) {
            listener.onIgnoreRecovery("shard closed");
        } catch (Exception e) {
            if (indexShard.state() == IndexShardState.CLOSED) {
                // got closed on us, just ignore this recovery
                listener.onIgnoreRecovery("shard closed");
            } else {
                listener.onRecoveryFailed(new IndexShardRecoveryException(shardId, "failed recovery", e));
            }
        }
    });
}
/**
* Recovers the state of the shard from the store.
* <p>
* Reads the last committed segments info from the shard's store, records the committed files in the given
* recovery state, performs the translog recovery, validates the mappings recovered from the translog through
* {@link #validateMappingUpdate} and finally moves the shard to post recovery.
*
* @param indexShard the shard to recover
* @param indexShouldExists <code>true</code> if a commit point is expected in the store; a missing one then fails
* the recovery. If <code>false</code>, an existing commit point is cleaned by opening an {@link IndexWriter} in
* CREATE mode.
* @param recoveryState the recovery state that receives the version, file details and translog statistics
* @throws IndexShardRecoveryException if reading the store fails or an {@link EngineException} is raised
*/
private void recoverFromStore(IndexShard indexShard, boolean indexShouldExists, RecoveryState recoveryState) throws IndexShardRecoveryException {
indexShard.prepareForIndexRecovery();
// stays -1 unless an expected commit point is found below
long version = -1;
final Map<String, Mapping> typesToUpdate;
SegmentInfos si = null;
final Store store = indexShard.store();
// keep the store referenced while we work on it; the reference is released in the finally block below
store.incRef();
try {
try {
store.failIfCorrupted();
try {
si = store.readLastCommittedSegmentsInfo();
} catch (Throwable e) {
// no readable commit point - collect the directory listing so the failure message below is more useful
String files = "_unknown_";
try {
files = Arrays.toString(store.directory().listAll());
} catch (Throwable e1) {
files += " (failure=" + ExceptionsHelper.detailedMessage(e1) + ")";
}
// only a failure if the index was expected to exist, otherwise si simply stays null
if (indexShouldExists) {
throw new IndexShardRecoveryException(shardId(), "shard allocated for local recovery (post api), should exist, but doesn't, current files: " + files, e);
}
}
if (si != null) {
if (indexShouldExists) {
version = si.getVersion();
} else {
// it exists on the directory, but shouldn't exist on the FS, its a leftover (possibly dangling)
// its a "new index create" API, we have to do something, so better to clean it than use same data
logger.trace("cleaning existing shard, shouldn't exists");
IndexWriter writer = new IndexWriter(store.directory(), new IndexWriterConfig(Lucene.STANDARD_ANALYZER).setOpenMode(IndexWriterConfig.OpenMode.CREATE));
writer.close();
recoveryState.getTranslog().totalOperations(0);
}
}
} catch (Throwable e) {
// NOTE(review): this also catches the IndexShardRecoveryException thrown a few lines up, so its more
// specific message ends up as the cause of this generic one
throw new IndexShardRecoveryException(shardId(), "failed to fetch index version after copying it over", e);
}
recoveryState.getIndex().updateVersion(version);
// since we recover from local, just fill the files and size
try {
final RecoveryState.Index index = recoveryState.getIndex();
if (si != null) {
final Directory directory = store.directory();
for (String name : Lucene.files(si)) {
long length = directory.fileLength(name);
index.addFileDetail(name, length, true);
}
}
} catch (IOException e) {
// best effort: a failure to read the file lengths is only logged and does not abort the recovery
logger.debug("failed to list file details", e);
}
if (indexShouldExists == false) {
// the index is not expected to exist, so the translog statistics are reset to zero operations
recoveryState.getTranslog().totalOperations(0);
recoveryState.getTranslog().totalOperationsOnStart(0);
}
typesToUpdate = indexShard.performTranslogRecovery(indexShouldExists);
indexShard.finalizeRecovery();
String indexName = indexShard.shardId().index().name();
// every mapping returned by the translog recovery is sent to the master before the shard moves to post recovery
for (Map.Entry<String, Mapping> entry : typesToUpdate.entrySet()) {
validateMappingUpdate(indexName, entry.getKey(), entry.getValue());
}
indexShard.postRecovery("post recovery from shard_store");
} catch (EngineException e) {
throw new IndexShardRecoveryException(shardId, "failed to recovery from gateway", e);
} finally {
store.decRef();
}
}
/**
 * Sends the given mapping update to the master and waits up to the configured
 * <code>waitForMappingUpdatePostRecovery</code> time for the acknowledgement. The wait runs through
 * <code>cancellableThreads</code>, so closing this service cancels it.
 *
 * @param indexName the name of the index the mapping belongs to
 * @param type      the mapping type that is updated
 * @param update    the mapping update to propagate
 * @throws IndexShardRecoveryException if the master reported a failure within the wait time. A timeout is only
 * logged on debug level and does not fail the recovery.
 */
private void validateMappingUpdate(final String indexName, final String type, Mapping update) {
    final CountDownLatch latch = new CountDownLatch(1);
    final AtomicReference<Throwable> error = new AtomicReference<>();
    mappingUpdatedAction.updateMappingOnMaster(indexName, type, update, waitForMappingUpdatePostRecovery, new MappingUpdatedAction.MappingUpdateListener() {
        @Override
        public void onMappingUpdate() {
            latch.countDown();
        }

        @Override
        public void onFailure(Throwable t) {
            // publish the failure BEFORE releasing the latch. With the previous order (countDown, then set) the
            // waiting thread could wake up, read a null error and treat the failed update as a success.
            error.set(t);
            latch.countDown();
        }
    });
    cancellableThreads.execute(new CancellableThreads.Interruptable() {
        @Override
        public void run() throws InterruptedException {
            try {
                if (latch.await(waitForMappingUpdatePostRecovery.millis(), TimeUnit.MILLISECONDS) == false) {
                    logger.debug("waited for mapping update on master for [{}], yet timed out", type);
                } else {
                    if (error.get() != null) {
                        throw new IndexShardRecoveryException(shardId, "Failed to propagate mappings on master post recovery", error.get());
                    }
                }
            } catch (InterruptedException e) {
                logger.debug("interrupted while waiting for mapping update");
                throw e;
            }
        }
    });
}
/**
 * Restores the shard from the {@link RestoreSource} found on its routing entry and notifies the restore service
 * about the outcome: a completed restore is always reported, a failed one only if it was caused by a corruption.
 *
 * @param recoveryState recovery state
 * @throws IndexShardRestoreFailedException if the routing entry has no restore source or any step of the restore fails
 */
private void restore(final IndexShard indexShard, final RecoveryState recoveryState) {
    final RestoreSource source = indexShard.routingEntry().restoreSource();
    if (source == null) {
        throw new IndexShardRestoreFailedException(shardId, "empty restore source");
    }
    if (logger.isTraceEnabled()) {
        logger.trace("[{}] restoring shard [{}]", source.snapshotId(), shardId);
    }
    try {
        recoveryState.getTranslog().totalOperations(0);
        recoveryState.getTranslog().totalOperationsOnStart(0);
        indexShard.prepareForIndexRecovery();
        final IndexShardRepository repository = repositoriesService.indexShardRepository(source.snapshotId().getRepository());
        // the shard inside the snapshot is addressed by the restore source's index, which may differ from ours
        final ShardId snapshotShardId = shardId.getIndex().equals(source.index())
                ? shardId
                : new ShardId(source.index(), shardId.id());
        repository.restore(source.snapshotId(), source.version(), shardId, snapshotShardId, recoveryState);
        indexShard.skipTranslogRecovery();
        indexShard.finalizeRecovery();
        indexShard.postRecovery("restore done");
        restoreService.indexShardRestoreCompleted(source.snapshotId(), shardId);
    } catch (Throwable failure) {
        if (Lucene.isCorruptionException(failure)) {
            restoreService.failRestore(source.snapshotId(), shardId());
        }
        throw new IndexShardRestoreFailedException(shardId, "restore failed", failure);
    }
}
/**
* Cancels {@code cancellableThreads}, passing {@code "closed"} as the argument.
* The blocking wait in {@code validateMappingUpdate} is executed through the same {@code cancellableThreads}.
* NOTE(review): presumably this call aborts such an in-flight wait - confirm against CancellableThreads.
*/
@Override
public void close() {
cancellableThreads.cancel("closed");
}
}

View File

@ -32,16 +32,13 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@ -52,17 +49,14 @@ import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardRecoveryException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.shard.StoreRecoveryService;
import org.elasticsearch.index.shard.*;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryStatus;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
@ -91,6 +85,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
// a list of shards that failed during recovery
// we keep track of these shards in order to prevent repeated recovery of these shards on each cluster state update
private final ConcurrentMap<ShardId, FailedShard> failedShards = ConcurrentCollections.newConcurrentMap();
private final RestoreService restoreService;
private final RepositoriesService repositoriesService;
static class FailedShard {
public final long version;
@ -112,7 +108,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
ThreadPool threadPool, RecoveryTarget recoveryTarget,
ShardStateAction shardStateAction,
NodeIndexDeletedAction nodeIndexDeletedAction,
NodeMappingRefreshAction nodeMappingRefreshAction) {
NodeMappingRefreshAction nodeMappingRefreshAction, RepositoriesService repositoriesService, RestoreService restoreService) {
super(settings);
this.indicesService = indicesService;
this.clusterService = clusterService;
@ -121,7 +117,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
this.shardStateAction = shardStateAction;
this.nodeIndexDeletedAction = nodeIndexDeletedAction;
this.nodeMappingRefreshAction = nodeMappingRefreshAction;
this.restoreService = restoreService;
this.repositoriesService = repositoriesService;
this.sendRefreshMapping = this.settings.getAsBoolean("indices.cluster.send_refresh_mapping", true);
}
@ -675,18 +672,33 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
handleRecoveryFailure(indexService, shardRouting, true, e);
}
} else {
indexService.shard(shardId).recoverFromStore(shardRouting, new StoreRecoveryService.RecoveryListener() {
@Override
public void onRecoveryDone() {
shardStateAction.shardStarted(shardRouting, indexMetaData.getIndexUUID(), "after recovery from store");
}
@Override
public void onIgnoreRecovery(String reason) {
}
@Override
public void onRecoveryFailed(IndexShardRecoveryException e) {
threadPool.generic().execute(() -> {
final RestoreSource restoreSource = shardRouting.restoreSource();
try {
final boolean success;
final IndexShard shard = indexService.shard(shardId);
if (restoreSource == null) {
// recover from filesystem store
success = shard.recoverFromStore(shardRouting);
} else {
// restore
final IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(restoreSource.snapshotId().getRepository());
try {
success = shard.restoreFromRepository(shardRouting, indexShardRepository);
} catch (Throwable t) {
if (Lucene.isCorruptionException(t)) {
restoreService.failRestore(restoreSource.snapshotId(), shard.shardId());
}
throw t;
}
if (success) {
restoreService.indexShardRestoreCompleted(restoreSource.snapshotId(), shard.shardId());
}
}
if (success) {
shardStateAction.shardStarted(shardRouting, indexMetaData.getIndexUUID(), "after recovery from store");
}
} catch (Throwable e) {
handleRecoveryFailure(indexService, shardRouting, true, e);
}
});

View File

@ -65,7 +65,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.StoreRecoveryService;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.threadpool.ThreadPool;
@ -115,11 +115,9 @@ import static org.elasticsearch.common.util.set.Sets.newHashSet;
* method.
* <p>
* Individual shards are getting restored as part of normal recovery process in
* {@link StoreRecoveryService#recover(IndexShard, boolean, StoreRecoveryService.RecoveryListener)}
* {@link IndexShard#restoreFromRepository(ShardRouting, IndexShardRepository)}
* method, which detects that shard should be restored from snapshot rather than recovered from gateway by looking
* at the {@link org.elasticsearch.cluster.routing.ShardRouting#restoreSource()} property. If this property is not null
* {@code recover} method uses {@link StoreRecoveryService#restore}
* method to start shard restore process.
* at the {@link org.elasticsearch.cluster.routing.ShardRouting#restoreSource()} property.
* <p>
* At the end of the successful restore process {@code IndexShardSnapshotAndRestoreService} calls {@link #indexShardRestoreCompleted(SnapshotId, ShardId)},
* which updates {@link RestoreInProgress} in cluster state or removes it when all shards are completed. In case of
@ -489,7 +487,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
}
/**
* This method is used by {@link StoreRecoveryService} to notify
* This method is used by {@link IndexShard} to notify
* {@code RestoreService} about shard restore completion.
*
* @param snapshotId snapshot id

View File

@ -43,4 +43,16 @@ public class ShardRoutingHelper {
/**
* Calls {@code routing.initialize(nodeId, expectedSize)} on the given routing entry.
* NOTE(review): presumably exists so tests outside the routing package can reach that method - confirm.
*
* @param routing      the routing entry to initialize
* @param nodeId       node id handed to {@code initialize}
* @param expectedSize expected shard size handed to {@code initialize}
*/
public static void initialize(ShardRouting routing, String nodeId, long expectedSize) {
routing.initialize(nodeId, expectedSize);
}
/**
* Calls {@code routing.reinitializeShard()} on the given routing entry.
*
* @param routing the routing entry to re-initialize
*/
public static void reinit(ShardRouting routing) {
routing.reinitializeShard();
}
/**
* Calls {@code routing.moveToUnassigned(info)} on the given routing entry.
*
* @param routing the routing entry to move to unassigned
* @param info    the unassigned info handed to {@code moveToUnassigned}
*/
public static void moveToUnassigned(ShardRouting routing, UnassignedInfo info) {
routing.moveToUnassigned(info);
}
/**
 * Creates a new {@link ShardRouting} from the attributes of {@code routing} listed below, using the given
 * {@code restoreSource} as its restore source. The passed-in routing is only read.
 *
 * @param routing       the routing entry whose attributes are reused
 * @param restoreSource the restore source for the new routing entry
 * @return the newly constructed routing entry
 */
public static ShardRouting newWithRestoreSource(ShardRouting routing, RestoreSource restoreSource) {
    return new ShardRouting(
            routing.index(),
            routing.shardId().id(),
            routing.currentNodeId(),
            routing.relocatingNodeId(),
            restoreSource,
            routing.primary(),
            routing.state(),
            routing.version(),
            routing.unassignedInfo(),
            routing.allocationId(),
            true, // NOTE(review): meaning of this flag is not visible here - confirm against the ShardRouting constructor
            routing.getExpectedShardSize());
}
}

View File

@ -21,7 +21,9 @@ package org.elasticsearch.index.shard;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.IOUtils;
@ -37,14 +39,14 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -64,10 +66,13 @@ import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.VersionUtils;
@ -90,7 +95,6 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
import static org.hamcrest.Matchers.equalTo;
@ -767,4 +771,132 @@ public class IndexShardTests extends ESSingleNodeTestCase {
assertEquals(total + 1, shard.flushStats().getTotal());
}
/**
 * Indexes a single document, removes the shard from the index service, re-creates it with a re-initialized
 * routing entry and checks that {@code recoverFromStore} succeeds and the document is found afterwards.
 */
public void testRecoverFromStore() {
    createIndex("test");
    ensureGreen();
    final IndicesService indices = getInstanceFromNode(IndicesService.class);
    final IndexService testIndex = indices.indexService("test");
    final IndexShard originalShard = testIndex.shard(0);

    // refresh and flush are randomized on purpose
    client().prepareIndex("test", "test", "0").setSource("{}").setRefresh(randomBoolean()).get();
    if (randomBoolean()) {
        client().admin().indices().prepareFlush().get();
    }

    // drop the shard and bring it back with a re-initialized copy of its routing entry
    final ShardRouting recoveringRouting = new ShardRouting(originalShard.routingEntry());
    testIndex.removeShard(0, "b/c simon says so");
    ShardRoutingHelper.reinit(recoveringRouting);
    final IndexShard recreatedShard = testIndex.createShard(0, recoveringRouting);
    recreatedShard.updateRoutingEntry(recoveringRouting, false);
    assertTrue(recreatedShard.recoverFromStore(recoveringRouting));

    // mark the shard as started before searching
    final ShardRouting startedRouting = new ShardRouting(recoveringRouting);
    ShardRoutingHelper.moveToStarted(startedRouting);
    recreatedShard.updateRoutingEntry(startedRouting, true);

    assertHitCount(client().prepareSearch().get(), 1);
}
/**
 * Wipes the Lucene index of a removed shard and checks three things in order: recovering the re-created
 * shard with a re-initialized routing entry fails, a second recovery attempt on the same shard instance
 * returns {@code false}, and a freshly created shard recovers successfully with zero documents and
 * accepts new documents afterwards.
 */
public void testFailIfIndexNotPresentInRecoverFromStore() throws IOException {
    createIndex("test");
    ensureGreen();
    final IndicesService indices = getInstanceFromNode(IndicesService.class);
    final IndexService testIndex = indices.indexService("test");
    final IndexShard originalShard = testIndex.shard(0);

    // refresh and flush are randomized on purpose
    client().prepareIndex("test", "test", "0").setSource("{}").setRefresh(randomBoolean()).get();
    if (randomBoolean()) {
        client().admin().indices().prepareFlush().get();
    }

    final ShardRouting initialRouting = originalShard.routingEntry();
    ShardRouting currentRouting = new ShardRouting(initialRouting);

    // keep a reference on the store while the shard is removed so that its directory can be wiped
    final Store shardStore = originalShard.store();
    shardStore.incRef();
    testIndex.removeShard(0, "b/c simon says so");
    Lucene.cleanLuceneIndex(shardStore.directory());
    shardStore.decRef();

    ShardRoutingHelper.reinit(currentRouting);
    IndexShard recreatedShard = testIndex.createShard(0, currentRouting);
    recreatedShard.updateRoutingEntry(currentRouting, false);
    try {
        recreatedShard.recoverFromStore(currentRouting);
        fail("index not there!");
    } catch (IndexShardRecoveryException expected) {
        assertTrue(expected.getMessage().contains("failed to fetch index version after copying it over"));
    }

    // same shard instance, routing now looks like a newly created index
    ShardRoutingHelper.moveToUnassigned(currentRouting, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "because I say so"));
    ShardRoutingHelper.initialize(currentRouting, initialRouting.currentNodeId());
    assertFalse("it's already recovering", recreatedShard.recoverFromStore(currentRouting));

    // a brand new shard instance recovers with the same routing
    testIndex.removeShard(0, "I broken it");
    recreatedShard = testIndex.createShard(0, currentRouting);
    recreatedShard.updateRoutingEntry(currentRouting, false);
    assertTrue("recover even if there is nothing to recover", recreatedShard.recoverFromStore(currentRouting));

    currentRouting = new ShardRouting(currentRouting);
    ShardRoutingHelper.moveToStarted(currentRouting);
    recreatedShard.updateRoutingEntry(currentRouting, true);

    assertHitCount(client().prepareSearch().get(), 0);
    client().prepareIndex("test", "test", "0").setSource("{}").setRefresh(true).get();
    assertHitCount(client().prepareSearch().get(), 1);
}
/**
 * Restores shard 0 of index {@code test_target} from a fake repository whose {@code restore} implementation
 * wipes the target store and copies over the files of shard 0 of index {@code test}. Afterwards
 * {@code test_target} must contain exactly the document with id {@code "0"} that was indexed into
 * {@code test}, instead of its own document with id {@code "1"}.
 */
public void testRestoreShard() throws IOException {
    createIndex("test");
    createIndex("test_target");
    ensureGreen();
    IndicesService indicesService = getInstanceFromNode(IndicesService.class);
    IndexService test = indicesService.indexService("test");
    IndexService test_target = indicesService.indexService("test_target");
    final IndexShard test_shard = test.shard(0);

    client().prepareIndex("test", "test", "0").setSource("{}").setRefresh(randomBoolean()).get();
    client().prepareIndex("test_target", "test", "1").setSource("{}").setRefresh(true).get();
    assertHitCount(client().prepareSearch("test_target").get(), 1);
    assertSearchHits(client().prepareSearch("test_target").get(), "1");
    client().admin().indices().prepareFlush("test").get(); // only flush test

    // re-create the target shard with a routing entry that carries a restore source pointing at "test"
    final ShardRouting origRouting = test_target.shard(0).routingEntry();
    ShardRouting routing = new ShardRouting(origRouting);
    ShardRoutingHelper.reinit(routing);
    routing = ShardRoutingHelper.newWithRestoreSource(routing, new RestoreSource(new SnapshotId("foo", "bar"), Version.CURRENT, "test"));
    test_target.removeShard(0, "just do it man!");
    final IndexShard test_target_shard = test_target.createShard(0, routing);
    final Store sourceStore = test_shard.store();
    final Store targetStore = test_target_shard.store();

    test_target_shard.updateRoutingEntry(routing, false);
    assertTrue(test_target_shard.restoreFromRepository(routing, new IndexShardRepository() {
        @Override
        public void snapshot(SnapshotId snapshotId, ShardId shardId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
        }

        @Override
        public void restore(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState) {
            try {
                Lucene.cleanLuceneIndex(targetStore.directory());
                for (String file : sourceStore.directory().listAll()) {
                    // The directory listing can include Lucene's lock file. It is not index data, and copying
                    // it over would clobber the lock file of the target directory, so leave it out.
                    if (file.equals("write.lock")) {
                        continue;
                    }
                    targetStore.directory().copyFrom(sourceStore.directory(), file, file, IOContext.DEFAULT);
                }
            } catch (Exception ex) {
                throw new RuntimeException(ex);
            }
        }

        @Override
        public IndexShardSnapshotStatus snapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId) {
            return null;
        }

        @Override
        public void verify(String verificationToken) {
        }
    }));

    routing = new ShardRouting(routing);
    ShardRoutingHelper.moveToStarted(routing);
    test_target_shard.updateRoutingEntry(routing, true);
    assertHitCount(client().prepareSearch("test_target").get(), 1);
    assertSearchHits(client().prepareSearch("test_target").get(), "0");
}
}