diff --git a/src/main/java/org/elasticsearch/index/store/Store.java b/src/main/java/org/elasticsearch/index/store/Store.java
index 0f53e7e7118..c4c4b7d5d8d 100644
--- a/src/main/java/org/elasticsearch/index/store/Store.java
+++ b/src/main/java/org/elasticsearch/index/store/Store.java
@@ -25,16 +25,17 @@ import com.google.common.collect.Iterables;
 import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.index.*;
 import org.apache.lucene.store.*;
-import org.apache.lucene.util.ArrayUtil;
-import org.apache.lucene.util.BytesRefBuilder;
-import org.apache.lucene.util.IOUtils;
-import org.apache.lucene.util.Version;
+import org.apache.lucene.util.*;
+import org.elasticsearch.ElasticsearchIllegalStateException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.Streams;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Streamable;
 import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.lucene.Directories;
@@ -49,6 +50,7 @@ import org.elasticsearch.index.settings.IndexSettings;
 import org.elasticsearch.index.shard.AbstractIndexShardComponent;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.store.distributor.Distributor;
+import org.elasticsearch.indices.recovery.RecoveryFailedException;
 
 import java.io.*;
 import java.nio.file.NoSuchFileException;
@@ -530,6 +532,59 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
         }
     }
 
+    /**
+     * This method deletes every file in this store that is not contained in the given source metadata and is not a
+     * legacy checksum file. After the delete it pulls the latest metadata snapshot from the store and compares it
+     * to the given snapshot. If the snapshots are inconsistent an illegal state exception is thrown.
+     *
+     * @param reason         the reason for this cleanup operation, logged for each deleted file
+     * @param sourceMetaData the metadata used for cleanup. All files in this metadata should be kept around.
+     * @throws IOException if an IOException occurs
+     * @throws ElasticsearchIllegalStateException if the latest snapshot in this store differs from the given one after the cleanup.
+     */
+    public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetaData) throws IOException {
+        failIfCorrupted();
+        metadataLock.writeLock().lock();
+        try {
+            final Directory dir = directory();
+            for (String existingFile : dir.listAll()) {
+                // don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete checksums)
+                if (!sourceMetaData.contains(existingFile) && !Store.isChecksum(existingFile)) {
+                    try {
+                        logDeleteFile(reason, existingFile);
+                        dir.deleteFile(existingFile);
+                    } catch (Exception e) {
+                        // ignore, we don't really care, will get deleted later on
+                    }
+                }
+            }
+            final Store.MetadataSnapshot metadataOrEmpty = getMetadata();
+            final Store.RecoveryDiff recoveryDiff = metadataOrEmpty.recoveryDiff(sourceMetaData);
+            if (recoveryDiff.identical.size() != recoveryDiff.size()) {
+                if (recoveryDiff.missing.isEmpty()) {
+                    for (StoreFileMetaData meta : recoveryDiff.different) {
+                        StoreFileMetaData local = metadataOrEmpty.get(meta.name());
+                        StoreFileMetaData remote = sourceMetaData.get(meta.name());
+                        // if we have different files then they must have no checksums, otherwise something went wrong during recovery.
+                        // we run into this when an empty index has only a segments_1 file: we can't tell whether it's a Lucene 4.8 file,
+                        // so no checksum is recorded. That isn't much of a problem since we simply copy it over anyway, but those files come out as
+                        // different in the diff. That's why we have to double check here again whether the rest of it matches.
+                        boolean consistent = (local.checksum() == null && remote.checksum() == null && local.hash().equals(remote.hash()) && local.length() == remote.length());
+                        if (consistent == false) {
+                            throw new ElasticsearchIllegalStateException("local version: " + local + " is different from remote version after recovery: " + remote, null);
+                        }
+                    }
+                } else {
+                    logger.debug("Files are missing on the recovery target: {}", recoveryDiff);
+                    throw new ElasticsearchIllegalStateException("Files are missing on the recovery target: [different=" +
+                            recoveryDiff.different + ", missing=" + recoveryDiff.missing + ']', null);
+                }
+            }
+        } finally {
+            metadataLock.writeLock().unlock();
+        }
+    }
+
     /**
      * This exists so {@link org.elasticsearch.index.codec.postingsformat.BloomFilterPostingsFormat} can load its boolean setting; can we find a more straightforward way?
      */
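
The verification half of cleanupAndVerify tolerates exactly one kind of "different" file: a legacy file (typically the bare segments_1 of an empty index) that carries no checksum on either side but still agrees on the strong hash and the length. A condensed restatement of that predicate, using only the StoreFileMetaData accessors that appear above; the helper name isConsistentLegacyFile is illustrative and not part of the patch:

    // mirrors the consistency check inside cleanupAndVerify: with no checksum on either side,
    // fall back to comparing the strong hash and the file length
    static boolean isConsistentLegacyFile(StoreFileMetaData local, StoreFileMetaData remote) {
        return local.checksum() == null && remote.checksum() == null
                && local.hash().equals(remote.hash())
                && local.length() == remote.length();
    }

Any other mismatch, and any non-empty missing set in the diff, fails the cleanup with an ElasticsearchIllegalStateException.
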
@@ -593,13 +648,13 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
      *
      * @see StoreFileMetaData
      */
-    public final static class MetadataSnapshot implements Iterable<StoreFileMetaData> {
+    public final static class MetadataSnapshot implements Iterable<StoreFileMetaData>, Streamable {
         private static final ESLogger logger = Loggers.getLogger(MetadataSnapshot.class);
         private static final Version FIRST_LUCENE_CHECKSUM_VERSION = Version.LUCENE_4_8;
         // we stopped writing legacy checksums in 1.3.0 so all segments here must use the new CRC32 version
         private static final Version FIRST_ES_CRC32_VERSION = org.elasticsearch.Version.V_1_3_0.luceneVersion;
 
-        private final Map<String, StoreFileMetaData> metadata;
+        private Map<String, StoreFileMetaData> metadata;
 
         public static final MetadataSnapshot EMPTY = new MetadataSnapshot();
 
@@ -645,7 +700,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
                 if (useLuceneChecksum(maxVersion, legacyChecksum != null)) {
                     checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true);
                 } else {
-                    builder.put(segmentsFile, new StoreFileMetaData(segmentsFile, directory.fileLength(segmentsFile), legacyChecksum, null));
+                    builder.put(segmentsFile, new StoreFileMetaData(segmentsFile, directory.fileLength(segmentsFile), legacyChecksum, null, hashFile(directory, segmentsFile)));
                 }
             } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
                 throw ex;
@@ -733,9 +788,12 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
                     throw new CorruptIndexException("Can't retrieve checksum from file: " + file + " file length must be >= " + CodecUtil.footerLength() + " but was: " + in.length(), in);
                 }
                 if (readFileAsHash) {
-                    hashFile(fileHash, new InputStreamIndexInput(in, in.length()), in.length());
+                    final VerifyingIndexInput verifyingIndexInput = new VerifyingIndexInput(in); // additional safety: we checksum the entire file we read the hash for...
+                    hashFile(fileHash, new InputStreamIndexInput(verifyingIndexInput, in.length()), in.length());
+                    checksum = digestToString(verifyingIndexInput.verify());
+                } else {
+                    checksum = digestToString(CodecUtil.retrieveChecksum(in));
                 }
-                checksum = digestToString(CodecUtil.retrieveChecksum(in));
 
             } catch (Throwable ex) {
                 logger.debug("Can retrieve checksum from file [{}]", ex, file);
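
The hashing path above now doubles as a verification pass: the raw input is wrapped in a VerifyingIndexInput, the strong hash is computed through that wrapper, and the CRC accumulated along the way is checked against the file footer by verify(), whose return value becomes the reported checksum (see the change to verify() further down, which makes it return the stored checksum instead of void). The three added lines, spelled out with comments; `in` is an open IndexInput and `fileHash` a BytesRefBuilder, as in the surrounding method:

    // single pass over the file: hash the bytes while the wrapper accumulates the CRC
    final VerifyingIndexInput verifyingIndexInput = new VerifyingIndexInput(in);
    hashFile(fileHash, new InputStreamIndexInput(verifyingIndexInput, in.length()), in.length());
    // verify() throws CorruptIndexException on a mismatch, otherwise it hands back the stored checksum
    final String checksum = digestToString(verifyingIndexInput.verify());
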
@@ -745,6 +803,18 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
             }
         }
 
+        /**
+         * Computes a strong hash value for small files. Note that this method should only be used for files < 1MB
+         */
+        public static BytesRef hashFile(Directory directory, String file) throws IOException {
+            final BytesRefBuilder fileHash = new BytesRefBuilder();
+            try (final IndexInput in = directory.openInput(file, IOContext.READONCE)) {
+                hashFile(fileHash, new InputStreamIndexInput(in, in.length()), in.length());
+            }
+            return fileHash.get();
+        }
+
+
         /**
          * Computes a strong hash value for small files. Note that this method should only be used for files < 1MB
         */
@@ -867,6 +937,38 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
         public int size() {
             return metadata.size();
         }
+
+        public static MetadataSnapshot read(StreamInput in) throws IOException {
+            MetadataSnapshot storeFileMetaDatas = new MetadataSnapshot();
+            storeFileMetaDatas.readFrom(in);
+            return storeFileMetaDatas;
+        }
+
+        @Override
+        public void readFrom(StreamInput in) throws IOException {
+            int size = in.readVInt();
+            ImmutableMap.Builder<String, StoreFileMetaData> builder = ImmutableMap.builder();
+            for (int i = 0; i < size; i++) {
+                StoreFileMetaData meta = StoreFileMetaData.readStoreFileMetaData(in);
+                builder.put(meta.name(), meta);
+            }
+            this.metadata = builder.build();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            out.writeVInt(this.metadata.size());
+            for (StoreFileMetaData meta : this) {
+                meta.writeTo(out);
+            }
+        }
+
+        /**
+         * Returns true iff this metadata contains the given file.
+         */
+        public boolean contains(String existingFile) {
+            return metadata.containsKey(existingFile);
+        }
     }
 
     /**
@@ -899,6 +1001,15 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
         public int size() {
             return identical.size() + different.size() + missing.size();
         }
+
+        @Override
+        public String toString() {
+            return "RecoveryDiff{" +
+                    "identical=" + identical +
+                    ", different=" + different +
+                    ", missing=" + missing +
+                    '}';
+        }
     }
 
     public final static class LegacyChecksums {
@@ -1140,10 +1251,10 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
             return new ByteArrayDataInput(checksum).readLong();
         }
 
-        public void verify() throws CorruptIndexException {
+        public long verify() throws CorruptIndexException {
             long storedChecksum = getStoredChecksum();
             if (getChecksum() == storedChecksum) {
-                return;
+                return storedChecksum;
             }
             throw new CorruptIndexException("verification failed : calculated=" + Store.digestToString(getChecksum()) +
                     " stored=" + Store.digestToString(storedChecksum), this);
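
With MetadataSnapshot now Streamable (readFrom/writeTo above), a complete snapshot - file names, lengths, legacy checksums and strong hashes - can travel inside a transport request and be compared on the receiving side via recoveryDiff(). A rough sketch of how the three buckets of a RecoveryDiff are read; the variable names are illustrative, and the call direction follows the cleanupAndVerify code earlier in this file:

    Store.RecoveryDiff diff = localSnapshot.recoveryDiff(sourceSnapshot);
    if (diff.identical.size() == diff.size()) {
        // every file matches by checksum (or hash/length for legacy files) - nothing to reconcile
    } else {
        // diff.different: files present in both snapshots that do not match; for legacy files
        //                 without checksums, cleanupAndVerify falls back to hash and length
        // diff.missing:   files present in one snapshot but absent from the other - after a
        //                 recovery this is treated as a hard failure
    }
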
diff --git a/src/main/java/org/elasticsearch/index/store/StoreFileMetaData.java b/src/main/java/org/elasticsearch/index/store/StoreFileMetaData.java
index 31d194315d7..f483b149f30 100644
--- a/src/main/java/org/elasticsearch/index/store/StoreFileMetaData.java
+++ b/src/main/java/org/elasticsearch/index/store/StoreFileMetaData.java
@@ -99,6 +99,7 @@ public class StoreFileMetaData implements Streamable {
      */
     public boolean isSame(StoreFileMetaData other) {
         if (checksum == null || other.checksum == null) {
+            // if either checksum is null we can't tell whether the files are the same, so we return false - this is why we don't use equals() for this!
             return false;
         }
         return length == other.length && checksum.equals(other.checksum) && hash.equals(other.hash);
diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryCleanFilesRequest.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryCleanFilesRequest.java
index 5d506df040e..819ae984ae9 100644
--- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryCleanFilesRequest.java
+++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryCleanFilesRequest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Sets;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.store.Store;
 import org.elasticsearch.transport.TransportRequest;
 
 import java.io.IOException;
@@ -36,12 +37,12 @@ class RecoveryCleanFilesRequest extends TransportRequest {
     private long recoveryId;
     private ShardId shardId;
 
-    private Set<String> snapshotFiles;
+    private Store.MetadataSnapshot snapshotFiles;
 
     RecoveryCleanFilesRequest() {
     }
 
-    RecoveryCleanFilesRequest(long recoveryId, ShardId shardId, Set<String> snapshotFiles) {
+    RecoveryCleanFilesRequest(long recoveryId, ShardId shardId, Store.MetadataSnapshot snapshotFiles) {
         this.recoveryId = recoveryId;
         this.shardId = shardId;
         this.snapshotFiles = snapshotFiles;
@@ -55,20 +56,12 @@ class RecoveryCleanFilesRequest extends TransportRequest {
         return shardId;
     }
 
-    public Set<String> snapshotFiles() {
-        return snapshotFiles;
-    }
-
     @Override
     public void readFrom(StreamInput in) throws IOException {
         super.readFrom(in);
         recoveryId = in.readLong();
         shardId = ShardId.readShardId(in);
-        int size = in.readVInt();
-        snapshotFiles = Sets.newHashSetWithExpectedSize(size);
-        for (int i = 0; i < size; i++) {
-            snapshotFiles.add(in.readString());
-        }
+        snapshotFiles = Store.MetadataSnapshot.read(in);
     }
 
     @Override
@@ -76,9 +69,10 @@ class RecoveryCleanFilesRequest extends TransportRequest {
         super.writeTo(out);
         out.writeLong(recoveryId);
         shardId.writeTo(out);
-        out.writeVInt(snapshotFiles.size());
-        for (String snapshotFile : snapshotFiles) {
-            out.writeString(snapshotFile);
-        }
+        snapshotFiles.writeTo(out);
+    }
+
+    public Store.MetadataSnapshot sourceMetaSnapshot() {
+        return snapshotFiles;
     }
 }
diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
index e886523632f..f588450741e 100644
--- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
+++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
@@ -380,17 +380,11 @@ public class RecoveryTarget extends AbstractComponent {
             final Store store = recoveryStatus.store();
             // now write checksums
             recoveryStatus.legacyChecksums().write(store);
-
-            for (String existingFile : store.directory().listAll()) {
-                // don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete checksum)
-                if (!request.snapshotFiles().contains(existingFile) && !Store.isChecksum(existingFile)) {
-                    try {
-                        store.logDeleteFile("recovery CleanFilesRequestHandler", existingFile);
-                        store.directory().deleteFile(existingFile);
-                    } catch (Exception e) {
-                        // ignore, we don't really care, will get deleted later on
-                    }
-                }
+            Store.MetadataSnapshot sourceMetaData = request.sourceMetaSnapshot();
+            try {
+                store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetaData);
+            } catch (Exception ex) {
+                throw new RecoveryFailedException(recoveryStatus.state(), "failed to clean after recovery", ex);
             }
             channel.sendResponse(TransportResponse.Empty.INSTANCE);
         }
diff --git a/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java b/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java
index caa0a00e457..051eb2c32ff 100644
--- a/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java
+++ b/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java
@@ -100,6 +100,7 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
         }
     };
 
+
     public ShardRecoveryHandler(final InternalIndexShard shard, final StartRecoveryRequest request, final RecoverySettings recoverySettings,
                                 final TransportService transportService, final TimeValue internalActionTimeout,
                                 final TimeValue internalActionLongTimeout, final ClusterService clusterService,
@@ -336,7 +337,6 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
         cancelableThreads.run(new Interruptable() {
             @Override
             public void run() throws InterruptedException {
-                final Set<String> snapshotFiles = Sets.newHashSet(snapshot.getFiles());
                 // Send the CLEAN_FILES request, which takes all of the files that
                 // were transferred and renames them from their temporary file
                 // names to the actual file names. It also writes checksums for
@@ -346,7 +346,7 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
                 // related to this recovery (out of date segments, for example)
                 // are deleted
                 transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES,
-                        new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), snapshotFiles),
+                        new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), recoverySourceMetadata),
                        TransportRequestOptions.options().withTimeout(internalActionTimeout),
                        EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
             }
diff --git a/src/test/java/org/elasticsearch/index/store/StoreTest.java b/src/test/java/org/elasticsearch/index/store/StoreTest.java
index 5db2fa03f00..626b581d1f0 100644
--- a/src/test/java/org/elasticsearch/index/store/StoreTest.java
+++ b/src/test/java/org/elasticsearch/index/store/StoreTest.java
@@ -292,7 +292,11 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
             if (file.equals("write.lock") || file.equals(IndexFileNames.OLD_SEGMENTS_GEN)) {
                 continue;
             }
-            StoreFileMetaData storeFileMetaData = new StoreFileMetaData(file, store.directory().fileLength(file), file + "checksum", null);
+            BytesRef hash = new BytesRef();
+            if (file.startsWith("segments")) {
+                hash = Store.MetadataSnapshot.hashFile(store.directory(), file);
+            }
+            StoreFileMetaData storeFileMetaData = new StoreFileMetaData(file, store.directory().fileLength(file), file + "checksum", null, hash);
             legacyMeta.put(file, storeFileMetaData);
             checksums.add(storeFileMetaData);
         }
@@ -897,4 +901,108 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
         store.deleteContent();
         IOUtils.close(store);
     }
+
+    @Test
+    public void testCleanupFromSnapshot() throws IOException {
+        final ShardId shardId = new ShardId(new Index("index"), 1);
+        DirectoryService directoryService = new LuceneManagedDirectoryService(random());
+        Store store = new Store(shardId, ImmutableSettings.EMPTY, directoryService, randomDistributor(directoryService), new DummyShardLock(shardId));
+        // this time random codec....
+        IndexWriterConfig indexWriterConfig = newIndexWriterConfig(random(), new MockAnalyzer(random())).setCodec(actualDefaultCodec());
+        // we keep all commits and that allows us to clean up based on multiple snapshots
+        indexWriterConfig.setIndexDeletionPolicy(NoDeletionPolicy.INSTANCE);
+        IndexWriter writer = new IndexWriter(store.directory(), indexWriterConfig);
+        int docs = 1 + random().nextInt(100);
+        int numCommits = 0;
+        for (int i = 0; i < docs; i++) {
+            if (i > 0 && randomIntBetween(0, 10) == 0) {
+                writer.commit();
+                numCommits++;
+            }
+            Document doc = new Document();
+            doc.add(new TextField("id", "" + i, random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
+            doc.add(new TextField("body", TestUtil.randomRealisticUnicodeString(random()), random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
+            doc.add(new SortedDocValuesField("dv", new BytesRef(TestUtil.randomRealisticUnicodeString(random()))));
+            writer.addDocument(doc);
+
+        }
+        if (numCommits < 1) {
+            writer.commit();
+            Document doc = new Document();
+            doc.add(new TextField("id", "" + docs++, random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
+            doc.add(new TextField("body", TestUtil.randomRealisticUnicodeString(random()), random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
+            doc.add(new SortedDocValuesField("dv", new BytesRef(TestUtil.randomRealisticUnicodeString(random()))));
+            writer.addDocument(doc);
+        }
+
+        Store.MetadataSnapshot firstMeta = store.getMetadata();
+
+        if (random().nextBoolean()) {
+            for (int i = 0; i < docs; i++) {
+                if (random().nextBoolean()) {
+                    Document doc = new Document();
+                    doc.add(new TextField("id", "" + i, random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
+                    doc.add(new TextField("body", TestUtil.randomRealisticUnicodeString(random()), random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
+                    writer.updateDocument(new Term("id", "" + i), doc);
+                }
+            }
+        }
+        writer.commit();
+        writer.close();
+
+        Store.MetadataSnapshot secondMeta = store.getMetadata();
+
+        Store.LegacyChecksums checksums = new Store.LegacyChecksums();
+        Map<String, StoreFileMetaData> legacyMeta = new HashMap<>();
+        for (String file : store.directory().listAll()) {
+            if (file.equals("write.lock") || file.equals(IndexFileNames.OLD_SEGMENTS_GEN)) {
+                continue;
+            }
+            BytesRef hash = new BytesRef();
+            if (file.startsWith("segments")) {
+                hash = Store.MetadataSnapshot.hashFile(store.directory(), file);
+            }
+            StoreFileMetaData storeFileMetaData = new StoreFileMetaData(file, store.directory().fileLength(file), file + "checksum", null, hash);
+            legacyMeta.put(file, storeFileMetaData);
+            checksums.add(storeFileMetaData);
+        }
+        checksums.write(store); // write one checksum file here - we expect it to survive all the cleanups
+
+        if (randomBoolean()) {
+            store.cleanupAndVerify("test", firstMeta);
+            String[] strings = store.directory().listAll();
+            int numChecksums = 0;
+            int numNotFound = 0;
+            for (String file : strings) {
+                assertTrue(firstMeta.contains(file) || Store.isChecksum(file));
+                if (Store.isChecksum(file)) {
+                    numChecksums++;
+                } else if (secondMeta.contains(file) == false) {
+                    numNotFound++;
+                }
+
+            }
+            assertTrue("at least one file must not be in here since we have two commits?", numNotFound > 0);
+            assertEquals("we wrote one checksum but it's gone now? - checksums are supposed to be kept", numChecksums, 1);
+        } else {
+            store.cleanupAndVerify("test", secondMeta);
+            String[] strings = store.directory().listAll();
+            int numChecksums = 0;
+            int numNotFound = 0;
+            for (String file : strings) {
+                assertTrue(secondMeta.contains(file) || Store.isChecksum(file));
+                if (Store.isChecksum(file)) {
+                    numChecksums++;
+                } else if (firstMeta.contains(file) == false) {
+                    numNotFound++;
+                }
+
+            }
+            assertTrue("at least one file must not be in here since we have two commits?", numNotFound > 0);
+            assertEquals("we wrote one checksum but it's gone now? - checksums are supposed to be kept", numChecksums, 1);
+        }
+
+        store.deleteContent();
+        IOUtils.close(store);
+    }
 }
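
The new test reduces to one invariant: after cleanupAndVerify(reason, snapshot) the directory may only contain files named in that snapshot plus legacy checksum files, and the single checksum file written before the cleanup must survive. In sketch form (assertion only; `snapshot` stands for whichever of firstMeta/secondMeta was passed in):

    for (String file : store.directory().listAll()) {
        assertTrue(snapshot.contains(file) || Store.isChecksum(file));
    }
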