[RESTORE] Fail restore if snapshot is corrupted
Today, if a snapshot is corrupted, the restore operation never terminates. Yet a corrupted snapshot cannot be restored anyway. Currently the only way to cancel such a restore is to delete the entire index, which might cause data loss. This commit also fixes an issue in InternalEngine where a deadlock can occur if a corruption is detected during flush, since InternalEngine#snapshotIndex acquires a top-level read lock which prevents closing the engine.

Closes #6938
parent d65a9d63a2
commit e730c76be7
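The deadlock described in the commit message follows from read/write lock ordering: failing the engine needs the write lock, while snapshotIndex used to trigger the flush while already holding the read lock, and a read lock cannot be upgraded to a write lock. Below is a minimal, illustrative sketch of that pattern, not the actual Engine API; the lock, flush and failEngine names are stand-ins.

import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative sketch only -- flushAndMaybeFail(), failEngine() and the lock are
// placeholders, not the actual InternalEngine members.
public class LockOrderingSketch {

    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

    // Problematic shape: the flush runs while the read lock is held. If it detects
    // corruption, the failure path below needs the write lock, and a thread that
    // holds the read lock can never upgrade to the write lock, so it blocks forever.
    void snapshotIndexDeadlockProne() {
        rwl.readLock().lock();
        try {
            flushAndMaybeFail();
        } finally {
            rwl.readLock().unlock();
        }
    }

    // Shape of the fix: flush (and any engine failure it triggers) happens before
    // the read lock is taken, so failEngine() is free to acquire the write lock.
    void snapshotIndexSafe() {
        flushAndMaybeFail();
        rwl.readLock().lock();
        try {
            // take the snapshot under the read lock
        } finally {
            rwl.readLock().unlock();
        }
    }

    private void flushAndMaybeFail() {
        boolean corruptionDetected = false; // pretend the flush found nothing
        if (corruptionDetected) {
            failEngine();
        }
    }

    private void failEngine() {
        rwl.writeLock().lock(); // closing the engine needs exclusive access
        try {
            // mark the engine failed, release resources
        } finally {
            rwl.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        new LockOrderingSketch().snapshotIndexSafe();
        System.out.println("snapshot taken without holding the read lock during flush");
    }
}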
InternalEngine.java:

@@ -315,9 +315,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
     }
 
     private void readLastCommittedSegmentsInfo() throws IOException {
-        SegmentInfos infos = new SegmentInfos();
-        infos.read(store.directory());
-        lastCommittedSegmentInfos = infos;
+        lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
     }
 
     @Override
@@ -907,11 +905,13 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
             } catch (Throwable e) {
                 if (!closed) {
                     logger.warn("failed to read latest segment infos on flush", e);
+                    if (Lucene.isCorruptionException(e)) {
+                        throw new FlushFailedEngineException(shardId, e);
+                    }
                 }
             }
 
         } catch (FlushFailedEngineException ex) {
-            maybeFailEngine(ex.getCause(), "flush");
+            maybeFailEngine(ex, "flush");
             throw ex;
         } finally {
             flushLock.unlock();
@@ -1033,8 +1033,10 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
 
     @Override
     public SnapshotIndexCommit snapshotIndex() throws EngineException {
+        // we have to flush outside of the readlock otherwise we might have a problem upgrading
+        // the to a write lock when we fail the engine in this operation
+        flush(new Flush().type(Flush.Type.COMMIT).waitIfOngoing(true));
         try (InternalLock _ = readLock.acquire()) {
-            flush(new Flush().type(Flush.Type.COMMIT).waitIfOngoing(true));
             ensureOpen();
             return deletionPolicy.snapshot();
         } catch (IOException e) {
@@ -1102,16 +1104,19 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
             }
         }
 
-    private void maybeFailEngine(Throwable t, String source) {
+    private boolean maybeFailEngine(Throwable t, String source) {
         if (Lucene.isCorruptionException(t)) {
             if (this.failEngineOnCorruption) {
                 failEngine("corrupt file detected source: [" + source + "]", t);
+                return true;
             } else {
                 logger.warn("corrupt file detected source: [{}] but [{}] is set to [{}]", t, source, INDEX_FAIL_ON_CORRUPTION, this.failEngineOnCorruption);
             }
         }else if (ExceptionsHelper.isOOM(t)) {
             failEngine("out of memory", t);
+            return true;
         }
+        return false;
     }
 
     private Throwable wrapIfClosed(Throwable t) {
@@ -1611,11 +1616,11 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
     }
 
     private static final class InternalLock implements Releasable {
-        private final ThreadLocal<Boolean> lockIsHeld;
+        private final ThreadLocal<AtomicInteger> lockIsHeld;
         private final Lock lock;
 
         InternalLock(Lock lock) {
-            ThreadLocal<Boolean> tl = null;
+            ThreadLocal<AtomicInteger> tl = null;
             assert (tl = new ThreadLocal<>()) != null;
             lockIsHeld = tl;
             this.lock = lock;
@@ -1635,18 +1640,26 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
 
 
         protected boolean onAssertRelease() {
-            lockIsHeld.set(Boolean.FALSE);
+            AtomicInteger count = lockIsHeld.get();
+            if (count.decrementAndGet() == 0) {
+                lockIsHeld.remove();
+            }
             return true;
         }
 
        protected boolean onAssertLock() {
-            lockIsHeld.remove();
+            AtomicInteger count = lockIsHeld.get();
+            if (count == null) {
+                count = new AtomicInteger(0);
+                lockIsHeld.set(count);
+            }
+            count.incrementAndGet();
             return true;
        }
 
        boolean assertLockIsHeld() {
-            Boolean aBoolean = lockIsHeld.get();
-            return aBoolean != null && aBoolean.booleanValue();
+            AtomicInteger count = lockIsHeld.get();
+            return count != null && count.get() > 0;
        }
     }
 
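The InternalLock change above replaces a ThreadLocal<Boolean> with a ThreadLocal<AtomicInteger> so the held-lock assertion survives reentrant acquisition: with a plain flag, an inner release would clear the flag while an outer acquisition is still active. A standalone sketch of the counting idea follows; class and method names here are placeholders, not the actual InternalEngine types.

import java.util.concurrent.atomic.AtomicInteger;

// Sketch of per-thread reentrancy counting for assertions. A plain Boolean flag
// breaks on nested acquire/release: the inner release would clear the flag while
// the outer acquisition is still held. A counter survives nesting.
public class LockHeldCounterSketch {

    private final ThreadLocal<AtomicInteger> lockIsHeld = new ThreadLocal<>();

    void onLock() {
        AtomicInteger count = lockIsHeld.get();
        if (count == null) {
            count = new AtomicInteger(0);
            lockIsHeld.set(count);
        }
        count.incrementAndGet();
    }

    void onRelease() {
        AtomicInteger count = lockIsHeld.get();
        if (count.decrementAndGet() == 0) {
            lockIsHeld.remove(); // last release on this thread clears the slot
        }
    }

    boolean lockIsHeldByCurrentThread() {
        AtomicInteger count = lockIsHeld.get();
        return count != null && count.get() > 0;
    }

    public static void main(String[] args) {
        LockHeldCounterSketch sketch = new LockHeldCounterSketch();
        sketch.onLock();
        sketch.onLock();                                        // reentrant acquire
        sketch.onRelease();
        System.out.println(sketch.lockIsHeldByCurrentThread()); // true: still held once
        sketch.onRelease();
        System.out.println(sketch.lockIsHeldByCurrentThread()); // false
    }
}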
IndexShardSnapshotAndRestoreService.java:

@@ -22,6 +22,7 @@ package org.elasticsearch.index.snapshots;
 import org.elasticsearch.cluster.metadata.SnapshotId;
 import org.elasticsearch.cluster.routing.RestoreSource;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
@@ -123,6 +124,9 @@ public class IndexShardSnapshotAndRestoreService extends AbstractIndexShardCompo
             indexShardRepository.restore(restoreSource.snapshotId(), shardId, snapshotShardId, recoveryState);
             restoreService.indexShardRestoreCompleted(restoreSource.snapshotId(), shardId);
         } catch (Throwable t) {
+            if (Lucene.isCorruptionException(t)) {
+                restoreService.failRestore(restoreSource.snapshotId(), shardId());
+            }
             throw new IndexShardRestoreFailedException(shardId, "restore failed", t);
         }
     }
BlobStoreIndexShardRepository.java:

@@ -21,6 +21,7 @@ package org.elasticsearch.index.snapshots.blobstore;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
@@ -674,6 +675,9 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
             Map<String, StoreFileMetaData> metadata = Collections.emptyMap();
             try {
                 metadata = store.getMetadata().asMap();
+            } catch (CorruptIndexException e) {
+                logger.warn("{} Can't read metadata from store", e, shardId);
+                throw new IndexShardRestoreFailedException(shardId, "Can't restore corrupted shard", e);
             } catch (Throwable e) {
                 // if the index is broken we might not be able to read it
                 logger.warn("{} Can't read metadata from store", e, shardId);
@@ -684,8 +688,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
                 String fileName = fileInfo.physicalName();
                 final StoreFileMetaData md = metadata.get(fileName);
                 numberOfFiles++;
-                // we don't compute checksum for segments, so always recover them
-                if (!fileName.startsWith("segments") && md != null && fileInfo.isSame(md)) {
+                if (md != null && fileInfo.isSame(md)) {
                     totalSize += md.length();
                     numberOfReusedFiles++;
                     reusedTotalSize += md.length();
@@ -845,6 +848,13 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
                 try {
                     failures.add(t);
                     IOUtils.closeWhileHandlingException(indexOutput);
+                    if (t instanceof CorruptIndexException) {
+                        try {
+                            store.markStoreCorrupted((CorruptIndexException)t);
+                        } catch (IOException e) {
+                            //
+                        }
+                    }
                     store.deleteQuiet(fileInfo.physicalName());
                 } finally {
                     latch.countDown();
Store.java:

@@ -107,6 +107,27 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
         return directory;
     }
 
+    /**
+     * Returns the last committed segments info for this store
+     * @throws IOException if the index is corrupted or the segments file is not present
+     */
+    public SegmentInfos readLastCommittedSegmentsInfo() throws IOException {
+        return readLastCommittedSegmentsInfo(directory());
+    }
+
+    /**
+     * Returns the last committed segments info for the given directory
+     * @throws IOException if the index is corrupted or the segments file is not present
+     */
+    private static SegmentInfos readLastCommittedSegmentsInfo(Directory directory) throws IOException {
+        try {
+            return Lucene.readSegmentInfos(directory);
+        } catch (EOFException eof) {
+            // TODO this should be caught by lucene - EOF is almost certainly an index corruption
+            throw new CorruptIndexException("Read past EOF while reading segment infos", eof);
+        }
+    }
+
     private final void ensureOpen() {
         if (this.refCount.get() <= 0) {
             throw new AlreadyClosedException("Store is already closed");
@@ -119,7 +140,12 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
     public MetadataSnapshot getMetadata() throws IOException {
         ensureOpen();
         failIfCorrupted();
-        return new MetadataSnapshot(distributorDirectory, logger);
+        try {
+            return new MetadataSnapshot(distributorDirectory, logger);
+        } catch (CorruptIndexException ex) {
+            markStoreCorrupted(ex);
+            throw ex;
+        }
     }
 
     /**
@@ -241,6 +267,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
                 dirs[i] = new SimpleFSDirectory(indexLocations[i]);
             }
             DistributorDirectory dir = new DistributorDirectory(dirs);
+            failIfCorrupted(dir, new ShardId("", 1));
             return new MetadataSnapshot(dir, logger);
         } finally {
             IOUtils.close(dirs);
@@ -298,14 +325,18 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
 
     public void failIfCorrupted() throws IOException {
         ensureOpen();
-        final String[] files = directory().listAll();
+        failIfCorrupted(directory, shardId);
+    }
+
+    private static final void failIfCorrupted(Directory directory, ShardId shardId) throws IOException {
+        final String[] files = directory.listAll();
         List<CorruptIndexException> ex = new ArrayList<>();
         for (String file : files) {
             if (file.startsWith(CORRUPTED)) {
-                try(ChecksumIndexInput input = directory().openChecksumInput(file, IOContext.READONCE)) {
+                try(ChecksumIndexInput input = directory.openChecksumInput(file, IOContext.READONCE)) {
                     CodecUtil.checkHeader(input, CODEC, VERSION, VERSION);
                     String msg = input.readString();
-                    StringBuilder builder = new StringBuilder(this.shardId.toString());
+                    StringBuilder builder = new StringBuilder(shardId.toString());
                     builder.append(" Corrupted index [");
                     builder.append(file).append("] caused by: ");
                     builder.append(msg);
@@ -408,14 +439,11 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
         try {
             final SegmentInfos segmentCommitInfos;
             try {
-                segmentCommitInfos = Lucene.readSegmentInfos(directory);
+                segmentCommitInfos = Store.readLastCommittedSegmentsInfo(directory);
             } catch (FileNotFoundException | NoSuchFileException ex) {
                 // no segments file -- can't read metadata
                 logger.trace("Can't read segment infos", ex);
                 return ImmutableMap.of();
-            } catch (EOFException eof) {
-                // TODO this should be caught by lucene - EOF is almost certainly an index corruption
-                throw new CorruptIndexException("Read past EOF while reading segment infos", eof);
             }
             Version maxVersion = Version.LUCENE_3_0; // we don't know which version was used to write so we take the max version.
             Set<String> added = new HashSet<>();
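The Store changes above follow a small "corruption marker" convention: when a CorruptIndexException is caught, a file whose name starts with the corrupted_ prefix is written next to the index files, and later opens scan for such markers and fail fast instead of retrying a broken shard. The following self-contained sketch imitates that convention with plain java.nio.file; the prefix, method names and exception type are placeholders rather than the actual Store API.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch of the corruption-marker idea; not the Lucene Directory based implementation.
public class CorruptionMarkerSketch {

    private static final String CORRUPTED_PREFIX = "corrupted_";

    // Persist the failure so every later open fails fast instead of hanging or retrying.
    static void markStoreCorrupted(Path dir, String reason) throws IOException {
        Path marker = dir.resolve(CORRUPTED_PREFIX + System.nanoTime());
        Files.write(marker, reason.getBytes(StandardCharsets.UTF_8));
    }

    // Throw before any expensive work if a previous corruption was recorded.
    static void failIfCorrupted(Path dir) throws IOException {
        try (DirectoryStream<Path> files = Files.newDirectoryStream(dir)) {
            for (Path file : files) {
                if (file.getFileName().toString().startsWith(CORRUPTED_PREFIX)) {
                    String msg = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
                    throw new IOException("corrupted store: " + msg);
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("store-sketch");
        failIfCorrupted(dir);                              // passes: no marker yet
        markStoreCorrupted(dir, "checksum mismatch in a segment file");
        try {
            failIfCorrupted(dir);                          // now fails fast
        } catch (IOException expected) {
            System.out.println(expected.getMessage());
        }
    }
}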
RestoreService.java:

@@ -477,6 +477,21 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
         }
     }
 
+    /**
+     * Fails the given snapshot restore operation for the given shard
+     */
+    public void failRestore(SnapshotId snapshotId, ShardId shardId) {
+        logger.debug("[{}] failed to restore shard [{}]", snapshotId, shardId);
+        UpdateIndexShardRestoreStatusRequest request = new UpdateIndexShardRestoreStatusRequest(snapshotId, shardId,
+                new ShardRestoreStatus(clusterService.state().nodes().localNodeId(), RestoreMetaData.State.FAILURE));
+        if (clusterService.state().nodes().localNodeMaster()) {
+            innerUpdateRestoreState(request);
+        } else {
+            transportService.sendRequest(clusterService.state().nodes().masterNode(),
+                    UpdateRestoreStateRequestHandler.ACTION, request, EmptyTransportResponseHandler.INSTANCE_SAME);
+        }
+    }
+
     private boolean failed(Snapshot snapshot, String index) {
         for (SnapshotShardFailure failure : snapshot.shardFailures()) {
             if (index.equals(failure.index())) {
CorruptedFileTest.java:

@@ -18,6 +18,7 @@
  */
 package org.elasticsearch.index.store;
 
+import com.carrotsearch.randomizedtesting.LifecycleScope;
 import com.carrotsearch.randomizedtesting.generators.RandomPicks;
 import com.google.common.base.Charsets;
 import com.google.common.base.Predicate;
@@ -32,6 +33,8 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
 import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
+import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
+import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
 import org.elasticsearch.action.count.CountResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
@@ -62,6 +65,7 @@ import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
 import org.elasticsearch.indices.recovery.RecoveryTarget;
 import org.elasticsearch.monitor.fs.FsStats;
+import org.elasticsearch.snapshots.SnapshotState;
 import org.elasticsearch.test.ElasticsearchIntegrationTest;
 import org.elasticsearch.test.store.MockFSDirectoryService;
 import org.elasticsearch.test.transport.MockTransportService;
@@ -376,6 +380,72 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
 
     }
 
+
+    /**
+     * Tests that restoring of a corrupted shard fails and we get a partial snapshot.
+     * TODO once checksum verification on snapshotting is implemented this test needs to be fixed or split into several
+     * parts... We should also corrupt files on the actual snapshot and check that we don't restore the corrupted shard.
+     */
+    @Test
+    public void testCorruptFileThenSnapshotAndRestore() throws ExecutionException, InterruptedException, IOException {
+        int numDocs = scaledRandomIntBetween(100, 1000);
+        assertThat(cluster().numDataNodes(), greaterThanOrEqualTo(2));
+
+        assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder()
+                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0") // no replicas for this test
+                .put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, NoMergePolicyProvider.class)
+                .put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
+                .put(InternalEngine.INDEX_FAIL_ON_CORRUPTION, true)
+                .put("indices.recovery.concurrent_streams", 10)
+        ));
+        ensureGreen();
+        IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
+        for (int i = 0; i < builders.length; i++) {
+            builders[i] = client().prepareIndex("test", "type").setSource("field", "value");
+        }
+        indexRandom(true, builders);
+        ensureGreen();
+        assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).execute().actionGet());
+        // we have to flush at least once here since we don't corrupt the translog
+        CountResponse countResponse = client().prepareCount().get();
+        assertHitCount(countResponse, numDocs);
+
+        ShardRouting shardRouting = corruptRandomFile(false);
+        // we don't corrupt segments.gen since S/R doesn't snapshot this file
+        // the other problem here why we can't corrupt segments.X files is that the snapshot flushes again before
+        // it snapshots and that will write a new segments.X+1 file
+        logger.info("--> creating repository");
+        assertAcked(client().admin().cluster().preparePutRepository("test-repo")
+                .setType("fs").setSettings(ImmutableSettings.settingsBuilder()
+                        .put("location", newTempDir(LifecycleScope.SUITE).getAbsolutePath())
+                        .put("compress", randomBoolean())
+                        .put("chunk_size", randomIntBetween(100, 1000))));
+        logger.info("--> snapshot");
+        CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test").get();
+        if (createSnapshotResponse.getSnapshotInfo().state() == SnapshotState.PARTIAL) {
+            logger.info("failed during snapshot -- maybe SI file got corrupted");
+            final List<File> files = listShardFiles(shardRouting);
+            File corruptedFile = null;
+            for (File file : files) {
+                if (file.getName().startsWith("corrupted_")) {
+                    corruptedFile = file;
+                    break;
+                }
+            }
+            assertThat(corruptedFile, notNullValue());
+        } else {
+            assertThat(""+createSnapshotResponse.getSnapshotInfo().state(), createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
+            assertThat(""+createSnapshotResponse.getSnapshotInfo().state(), createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
+
+            assertThat(client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
+
+            cluster().wipeIndices("test");
+            RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute().actionGet();
+            assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
+            assertThat(restoreSnapshotResponse.getRestoreInfo().successfulShards(), equalTo(restoreSnapshotResponse.getRestoreInfo().totalShards()-1));
+        }
+    }
+
     private int numShards(String... index) {
         ClusterState state = client().admin().cluster().prepareState().get().getState();
         GroupShardsIterator shardIterators = state.getRoutingNodes().getRoutingTable().activePrimaryShardsGrouped(index, false);
@@ -384,6 +454,10 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
 
 
     private ShardRouting corruptRandomFile() throws IOException {
+        return corruptRandomFile(true);
+    }
+
+    private ShardRouting corruptRandomFile(final boolean includeSegmentsFiles) throws IOException {
         ClusterState state = client().admin().cluster().prepareState().get().getState();
         GroupShardsIterator shardIterators = state.getRoutingNodes().getRoutingTable().activePrimaryShardsGrouped(new String[]{"test"}, false);
         ShardIterator shardIterator = RandomPicks.randomFrom(getRandom(), shardIterators.iterators());
@@ -401,7 +475,8 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
             files.addAll(Arrays.asList(file.listFiles(new FileFilter() {
                 @Override
                 public boolean accept(File pathname) {
-                    return pathname.isFile() && !"write.lock".equals(pathname.getName());
+                    return pathname.isFile() && !"write.lock".equals(pathname.getName()) &&
+                            (includeSegmentsFiles == true || pathname.getName().startsWith("segments") == false);
                 }
             })));
         }