[RECOVERY] Ensure shards are identical after recovery

Today we don't check if the recovery target has all the
files that we expect there after the recovery. This commit
adds aditional safety to ensure all files are present with the
correct checksums on recovery finalization.

Closes #8723
This commit is contained in:
Simon Willnauer 2014-12-01 14:26:07 +01:00
parent c1edcaf388
commit 8736543c71
6 changed files with 248 additions and 40 deletions

View File

@ -25,16 +25,17 @@ import com.google.common.collect.Iterables;
import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.*; import org.apache.lucene.index.*;
import org.apache.lucene.store.*; import org.apache.lucene.store.*;
import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.*;
import org.apache.lucene.util.BytesRefBuilder; import org.elasticsearch.ElasticsearchIllegalStateException;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.Version;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.Directories; import org.elasticsearch.common.lucene.Directories;
@ -49,6 +50,7 @@ import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.distributor.Distributor; import org.elasticsearch.index.store.distributor.Distributor;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import java.io.*; import java.io.*;
import java.nio.file.NoSuchFileException; import java.nio.file.NoSuchFileException;
@ -530,6 +532,59 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
} }
} }
/**
* This method deletes every file in this store that is not contained in the given source meta data or is a
* legacy checksum file. After the delete it pulls the latest metadata snapshot from the store and compares it
* to the given snapshot. If the snapshots are inconsistent an illegal state exception is thrown
*
* @param reason the reason for this cleanup operation logged for each deleted file
* @param sourceMetaData the metadata used for cleanup. all files in this metadata should be kept around.
* @throws IOException if an IOException occurs
* @throws ElasticsearchIllegalStateException if the latest snapshot in this store differs from the given one after the cleanup.
*/
public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetaData) throws IOException {
failIfCorrupted();
metadataLock.writeLock().lock();
try {
final Directory dir = directory();
for (String existingFile : dir.listAll()) {
// don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete checksum)
if (!sourceMetaData.contains(existingFile) && !Store.isChecksum(existingFile)) {
try {
logDeleteFile(reason, existingFile);
dir.deleteFile(existingFile);
} catch (Exception e) {
// ignore, we don't really care, will get deleted later on
}
}
}
final Store.MetadataSnapshot metadataOrEmpty = getMetadata();
final Store.RecoveryDiff recoveryDiff = metadataOrEmpty.recoveryDiff(sourceMetaData);
if (recoveryDiff.identical.size() != recoveryDiff.size()) {
if (recoveryDiff.missing.isEmpty()) {
for (StoreFileMetaData meta : recoveryDiff.different) {
StoreFileMetaData local = metadataOrEmpty.get(meta.name());
StoreFileMetaData remote = sourceMetaData.get(meta.name());
// if we have different files the they must have no checksums otherwise something went wrong during recovery.
// we have that problem when we have an empty index is only a segments_1 file then we can't tell if it's a Lucene 4.8 file
// and therefore no checksum. That isn't much of a problem since we simply copy it over anyway but those files come out as
// different in the diff. That's why we have to double check here again if the rest of it matches.
boolean consistent = (local.checksum() == null && remote.checksum() == null && local.hash().equals(remote.hash()) && local.length() == remote.length());
if (consistent == false) {
throw new ElasticsearchIllegalStateException("local version: " + local + " is different from remote version after recovery: " + remote, null);
}
}
} else {
logger.debug("Files are missing on the recovery target: {} ", recoveryDiff);
throw new ElasticsearchIllegalStateException("Files are missing on the recovery target: [different="
+ recoveryDiff.different + ", missing=" + recoveryDiff.missing +']', null);
}
}
} finally {
metadataLock.writeLock().unlock();
}
}
/** /**
* This exists so {@link org.elasticsearch.index.codec.postingsformat.BloomFilterPostingsFormat} can load its boolean setting; can we find a more straightforward way? * This exists so {@link org.elasticsearch.index.codec.postingsformat.BloomFilterPostingsFormat} can load its boolean setting; can we find a more straightforward way?
*/ */
@ -593,13 +648,13 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
* *
* @see StoreFileMetaData * @see StoreFileMetaData
*/ */
public final static class MetadataSnapshot implements Iterable<StoreFileMetaData> { public final static class MetadataSnapshot implements Iterable<StoreFileMetaData>, Streamable {
private static final ESLogger logger = Loggers.getLogger(MetadataSnapshot.class); private static final ESLogger logger = Loggers.getLogger(MetadataSnapshot.class);
private static final Version FIRST_LUCENE_CHECKSUM_VERSION = Version.LUCENE_4_8; private static final Version FIRST_LUCENE_CHECKSUM_VERSION = Version.LUCENE_4_8;
// we stopped writing legacy checksums in 1.3.0 so all segments here must use the new CRC32 version // we stopped writing legacy checksums in 1.3.0 so all segments here must use the new CRC32 version
private static final Version FIRST_ES_CRC32_VERSION = org.elasticsearch.Version.V_1_3_0.luceneVersion; private static final Version FIRST_ES_CRC32_VERSION = org.elasticsearch.Version.V_1_3_0.luceneVersion;
private final Map<String, StoreFileMetaData> metadata; private Map<String, StoreFileMetaData> metadata;
public static final MetadataSnapshot EMPTY = new MetadataSnapshot(); public static final MetadataSnapshot EMPTY = new MetadataSnapshot();
@ -645,7 +700,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
if (useLuceneChecksum(maxVersion, legacyChecksum != null)) { if (useLuceneChecksum(maxVersion, legacyChecksum != null)) {
checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true); checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true);
} else { } else {
builder.put(segmentsFile, new StoreFileMetaData(segmentsFile, directory.fileLength(segmentsFile), legacyChecksum, null)); builder.put(segmentsFile, new StoreFileMetaData(segmentsFile, directory.fileLength(segmentsFile), legacyChecksum, null, hashFile(directory, segmentsFile)));
} }
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
throw ex; throw ex;
@ -733,9 +788,12 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
throw new CorruptIndexException("Can't retrieve checksum from file: " + file + " file length must be >= " + CodecUtil.footerLength() + " but was: " + in.length(), in); throw new CorruptIndexException("Can't retrieve checksum from file: " + file + " file length must be >= " + CodecUtil.footerLength() + " but was: " + in.length(), in);
} }
if (readFileAsHash) { if (readFileAsHash) {
hashFile(fileHash, new InputStreamIndexInput(in, in.length()), in.length()); final VerifyingIndexInput verifyingIndexInput = new VerifyingIndexInput(in); // additional safety we checksum the entire file we read the hash for...
hashFile(fileHash, new InputStreamIndexInput(verifyingIndexInput, in.length()), in.length());
checksum = digestToString(verifyingIndexInput.verify());
} else {
checksum = digestToString(CodecUtil.retrieveChecksum(in));
} }
checksum = digestToString(CodecUtil.retrieveChecksum(in));
} catch (Throwable ex) { } catch (Throwable ex) {
logger.debug("Can retrieve checksum from file [{}]", ex, file); logger.debug("Can retrieve checksum from file [{}]", ex, file);
@ -745,6 +803,18 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
} }
} }
/**
* Computes a strong hash value for small files. Note that this method should only be used for files < 1MB
*/
public static BytesRef hashFile(Directory directory, String file) throws IOException {
final BytesRefBuilder fileHash = new BytesRefBuilder();
try (final IndexInput in = directory.openInput(file, IOContext.READONCE)) {
hashFile(fileHash, new InputStreamIndexInput(in, in.length()), in.length());
}
return fileHash.get();
}
/** /**
* Computes a strong hash value for small files. Note that this method should only be used for files < 1MB * Computes a strong hash value for small files. Note that this method should only be used for files < 1MB
*/ */
@ -867,6 +937,38 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
public int size() { public int size() {
return metadata.size(); return metadata.size();
} }
public static MetadataSnapshot read(StreamInput in) throws IOException {
MetadataSnapshot storeFileMetaDatas = new MetadataSnapshot();
storeFileMetaDatas.readFrom(in);
return storeFileMetaDatas;
}
@Override
public void readFrom(StreamInput in) throws IOException {
int size = in.readVInt();
ImmutableMap.Builder<String, StoreFileMetaData> builder = ImmutableMap.builder();
for (int i = 0; i < size; i++) {
StoreFileMetaData meta = StoreFileMetaData.readStoreFileMetaData(in);
builder.put(meta.name(), meta);
}
this.metadata = builder.build();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(this.metadata.size());
for (StoreFileMetaData meta : this) {
meta.writeTo(out);
}
}
/**
* Returns true iff this metadata contains the given file.
*/
public boolean contains(String existingFile) {
return metadata.containsKey(existingFile);
}
} }
/** /**
@ -899,6 +1001,15 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
public int size() { public int size() {
return identical.size() + different.size() + missing.size(); return identical.size() + different.size() + missing.size();
} }
@Override
public String toString() {
return "RecoveryDiff{" +
"identical=" + identical +
", different=" + different +
", missing=" + missing +
'}';
}
} }
public final static class LegacyChecksums { public final static class LegacyChecksums {
@ -1140,10 +1251,10 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
return new ByteArrayDataInput(checksum).readLong(); return new ByteArrayDataInput(checksum).readLong();
} }
public void verify() throws CorruptIndexException { public long verify() throws CorruptIndexException {
long storedChecksum = getStoredChecksum(); long storedChecksum = getStoredChecksum();
if (getChecksum() == storedChecksum) { if (getChecksum() == storedChecksum) {
return; return storedChecksum;
} }
throw new CorruptIndexException("verification failed : calculated=" + Store.digestToString(getChecksum()) + throw new CorruptIndexException("verification failed : calculated=" + Store.digestToString(getChecksum()) +
" stored=" + Store.digestToString(storedChecksum), this); " stored=" + Store.digestToString(storedChecksum), this);

View File

@ -99,6 +99,7 @@ public class StoreFileMetaData implements Streamable {
*/ */
public boolean isSame(StoreFileMetaData other) { public boolean isSame(StoreFileMetaData other) {
if (checksum == null || other.checksum == null) { if (checksum == null || other.checksum == null) {
// we can't tell if either or is null so we return false in this case! this is why we don't use equals for this!
return false; return false;
} }
return length == other.length && checksum.equals(other.checksum) && hash.equals(other.hash); return length == other.length && checksum.equals(other.checksum) && hash.equals(other.hash);

View File

@ -23,6 +23,7 @@ import com.google.common.collect.Sets;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequest;
import java.io.IOException; import java.io.IOException;
@ -36,12 +37,12 @@ class RecoveryCleanFilesRequest extends TransportRequest {
private long recoveryId; private long recoveryId;
private ShardId shardId; private ShardId shardId;
private Set<String> snapshotFiles; private Store.MetadataSnapshot snapshotFiles;
RecoveryCleanFilesRequest() { RecoveryCleanFilesRequest() {
} }
RecoveryCleanFilesRequest(long recoveryId, ShardId shardId, Set<String> snapshotFiles) { RecoveryCleanFilesRequest(long recoveryId, ShardId shardId, Store.MetadataSnapshot snapshotFiles) {
this.recoveryId = recoveryId; this.recoveryId = recoveryId;
this.shardId = shardId; this.shardId = shardId;
this.snapshotFiles = snapshotFiles; this.snapshotFiles = snapshotFiles;
@ -55,20 +56,12 @@ class RecoveryCleanFilesRequest extends TransportRequest {
return shardId; return shardId;
} }
public Set<String> snapshotFiles() {
return snapshotFiles;
}
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
recoveryId = in.readLong(); recoveryId = in.readLong();
shardId = ShardId.readShardId(in); shardId = ShardId.readShardId(in);
int size = in.readVInt(); snapshotFiles = Store.MetadataSnapshot.read(in);
snapshotFiles = Sets.newHashSetWithExpectedSize(size);
for (int i = 0; i < size; i++) {
snapshotFiles.add(in.readString());
}
} }
@Override @Override
@ -76,9 +69,10 @@ class RecoveryCleanFilesRequest extends TransportRequest {
super.writeTo(out); super.writeTo(out);
out.writeLong(recoveryId); out.writeLong(recoveryId);
shardId.writeTo(out); shardId.writeTo(out);
out.writeVInt(snapshotFiles.size()); snapshotFiles.writeTo(out);
for (String snapshotFile : snapshotFiles) { }
out.writeString(snapshotFile);
} public Store.MetadataSnapshot sourceMetaSnapshot() {
return snapshotFiles;
} }
} }

View File

@ -380,17 +380,11 @@ public class RecoveryTarget extends AbstractComponent {
final Store store = recoveryStatus.store(); final Store store = recoveryStatus.store();
// now write checksums // now write checksums
recoveryStatus.legacyChecksums().write(store); recoveryStatus.legacyChecksums().write(store);
Store.MetadataSnapshot sourceMetaData = request.sourceMetaSnapshot();
for (String existingFile : store.directory().listAll()) { try {
// don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete checksum) store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetaData);
if (!request.snapshotFiles().contains(existingFile) && !Store.isChecksum(existingFile)) { } catch (Exception ex) {
try { throw new RecoveryFailedException(recoveryStatus.state(), "failed to clean after recovery", ex);
store.logDeleteFile("recovery CleanFilesRequestHandler", existingFile);
store.directory().deleteFile(existingFile);
} catch (Exception e) {
// ignore, we don't really care, will get deleted later on
}
}
} }
channel.sendResponse(TransportResponse.Empty.INSTANCE); channel.sendResponse(TransportResponse.Empty.INSTANCE);
} }

View File

@ -100,6 +100,7 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
} }
}; };
public ShardRecoveryHandler(final InternalIndexShard shard, final StartRecoveryRequest request, final RecoverySettings recoverySettings, public ShardRecoveryHandler(final InternalIndexShard shard, final StartRecoveryRequest request, final RecoverySettings recoverySettings,
final TransportService transportService, final TimeValue internalActionTimeout, final TransportService transportService, final TimeValue internalActionTimeout,
final TimeValue internalActionLongTimeout, final ClusterService clusterService, final TimeValue internalActionLongTimeout, final ClusterService clusterService,
@ -336,7 +337,6 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
cancelableThreads.run(new Interruptable() { cancelableThreads.run(new Interruptable() {
@Override @Override
public void run() throws InterruptedException { public void run() throws InterruptedException {
final Set<String> snapshotFiles = Sets.newHashSet(snapshot.getFiles());
// Send the CLEAN_FILES request, which takes all of the files that // Send the CLEAN_FILES request, which takes all of the files that
// were transferred and renames them from their temporary file // were transferred and renames them from their temporary file
// names to the actual file names. It also writes checksums for // names to the actual file names. It also writes checksums for
@ -346,7 +346,7 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
// related to this recovery (out of date segments, for example) // related to this recovery (out of date segments, for example)
// are deleted // are deleted
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES, transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES,
new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), snapshotFiles), new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), recoverySourceMetadata),
TransportRequestOptions.options().withTimeout(internalActionTimeout), TransportRequestOptions.options().withTimeout(internalActionTimeout),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
} }

View File

@ -292,7 +292,11 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
if (file.equals("write.lock") || file.equals(IndexFileNames.OLD_SEGMENTS_GEN)) { if (file.equals("write.lock") || file.equals(IndexFileNames.OLD_SEGMENTS_GEN)) {
continue; continue;
} }
StoreFileMetaData storeFileMetaData = new StoreFileMetaData(file, store.directory().fileLength(file), file + "checksum", null); BytesRef hash = new BytesRef();
if (file.startsWith("segments")) {
hash = Store.MetadataSnapshot.hashFile(store.directory(), file);
}
StoreFileMetaData storeFileMetaData = new StoreFileMetaData(file, store.directory().fileLength(file), file + "checksum", null, hash);
legacyMeta.put(file, storeFileMetaData); legacyMeta.put(file, storeFileMetaData);
checksums.add(storeFileMetaData); checksums.add(storeFileMetaData);
} }
@ -897,4 +901,108 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
store.deleteContent(); store.deleteContent();
IOUtils.close(store); IOUtils.close(store);
} }
@Test
public void testCleanupFromSnapshot() throws IOException {
final ShardId shardId = new ShardId(new Index("index"), 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
Store store = new Store(shardId, ImmutableSettings.EMPTY, directoryService, randomDistributor(directoryService), new DummyShardLock(shardId));
// this time random codec....
IndexWriterConfig indexWriterConfig = newIndexWriterConfig(random(), new MockAnalyzer(random())).setCodec(actualDefaultCodec());
// we keep all commits and that allows us clean based on multiple snapshots
indexWriterConfig.setIndexDeletionPolicy(NoDeletionPolicy.INSTANCE);
IndexWriter writer = new IndexWriter(store.directory(), indexWriterConfig);
int docs = 1 + random().nextInt(100);
int numCommits = 0;
for (int i = 0; i < docs; i++) {
if (i > 0 && randomIntBetween(0, 10 ) == 0) {
writer.commit();
numCommits++;
}
Document doc = new Document();
doc.add(new TextField("id", "" + i, random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
doc.add(new TextField("body", TestUtil.randomRealisticUnicodeString(random()), random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
doc.add(new SortedDocValuesField("dv", new BytesRef(TestUtil.randomRealisticUnicodeString(random()))));
writer.addDocument(doc);
}
if (numCommits < 1) {
writer.commit();
Document doc = new Document();
doc.add(new TextField("id", "" + docs++, random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
doc.add(new TextField("body", TestUtil.randomRealisticUnicodeString(random()), random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
doc.add(new SortedDocValuesField("dv", new BytesRef(TestUtil.randomRealisticUnicodeString(random()))));
writer.addDocument(doc);
}
Store.MetadataSnapshot firstMeta = store.getMetadata();
if (random().nextBoolean()) {
for (int i = 0; i < docs; i++) {
if (random().nextBoolean()) {
Document doc = new Document();
doc.add(new TextField("id", "" + i, random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
doc.add(new TextField("body", TestUtil.randomRealisticUnicodeString(random()), random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
writer.updateDocument(new Term("id", "" + i), doc);
}
}
}
writer.commit();
writer.close();
Store.MetadataSnapshot secondMeta = store.getMetadata();
Store.LegacyChecksums checksums = new Store.LegacyChecksums();
Map<String, StoreFileMetaData> legacyMeta = new HashMap<>();
for (String file : store.directory().listAll()) {
if (file.equals("write.lock") || file.equals(IndexFileNames.OLD_SEGMENTS_GEN)) {
continue;
}
BytesRef hash = new BytesRef();
if (file.startsWith("segments")) {
hash = Store.MetadataSnapshot.hashFile(store.directory(), file);
}
StoreFileMetaData storeFileMetaData = new StoreFileMetaData(file, store.directory().fileLength(file), file + "checksum", null, hash);
legacyMeta.put(file, storeFileMetaData);
checksums.add(storeFileMetaData);
}
checksums.write(store); // write one checksum file here - we expect it to survive all the cleanups
if (randomBoolean()) {
store.cleanupAndVerify("test", firstMeta);
String[] strings = store.directory().listAll();
int numChecksums = 0;
int numNotFound = 0;
for (String file : strings) {
assertTrue(firstMeta.contains(file) || Store.isChecksum(file));
if (Store.isChecksum(file)) {
numChecksums++;
} else if (secondMeta.contains(file) == false) {
numNotFound++;
}
}
assertTrue("at least one file must not be in here since we have two commits?", numNotFound > 0);
assertEquals("we wrote one checksum but it's gone now? - checksums are supposed to be kept", numChecksums, 1);
} else {
store.cleanupAndVerify("test", secondMeta);
String[] strings = store.directory().listAll();
int numChecksums = 0;
int numNotFound = 0;
for (String file : strings) {
assertTrue(secondMeta.contains(file) || Store.isChecksum(file));
if (Store.isChecksum(file)) {
numChecksums++;
} else if (firstMeta.contains(file) == false) {
numNotFound++;
}
}
assertTrue("at least one file must not be in here since we have two commits?", numNotFound > 0);
assertEquals("we wrote one checksum but it's gone now? - checksums are supposed to be kept", numChecksums, 1);
}
store.deleteContent();
IOUtils.close(store);
}
} }