From d8169e5fdcfbc804659cdad119a783be87ff386a Mon Sep 17 00:00:00 2001
From: Armin Braun <me@obrown.io>
Date: Mon, 10 Feb 2020 16:50:09 +0100
Subject: [PATCH] Don't Upload Redundant Shard Files (#51729) (#52147)

Segment(s) info blobs are already stored with their full content
in the "hash" field of the shard snapshot metadata as long as they are
smaller than 1MB. We can make use of this fact and never upload them
physically to the repo.
This saves a non-trivial number of uploads, and of downloads when restoring,
and might also lower the latency of searchable snapshots since they can skip
physically loading this information as well.
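
As a minimal, self-contained sketch of the idea (the class and variable
names below are illustrative, not the actual Elasticsearch code): when
the metadata "hash" is as long as the file itself, the hash is the file,
so the snapshot side can skip the upload and the restore side can
rebuild the file from metadata alone.

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class VirtualBlobIdea {
        public static void main(String[] args) {
            byte[] contents = "segments_3".getBytes(StandardCharsets.UTF_8);
            // The shard snapshot metadata keeps a "hash" that, for files
            // smaller than 1MB, is simply the complete file content.
            byte[] metadataHash = contents.clone();

            // Snapshot side: if the hash covers the whole file there is
            // nothing to upload; the metadata already carries the bytes.
            boolean needsWrite = metadataHash.length != contents.length;
            System.out.println("needsWrite = " + needsWrite); // false -> skip upload

            // Restore side: rebuild the file directly from the metadata
            // hash, with no repository download at all.
            byte[] restored = metadataHash.clone();
            System.out.println("restored ok: " + Arrays.equals(restored, contents));
        }
    }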
---
 .../index/store/StoreFileMetaData.java        |  25 +++++
 .../blobstore/BlobStoreRepository.java        | 104 +++++++++++-------
 .../DedicatedClusterSnapshotRestoreIT.java    |  16 +--
 .../SharedClusterSnapshotRestoreIT.java       |   3 +-
 4 files changed, 102 insertions(+), 46 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/index/store/StoreFileMetaData.java b/server/src/main/java/org/elasticsearch/index/store/StoreFileMetaData.java
index 59ad749f638..abd0e593a9f 100644
--- a/server/src/main/java/org/elasticsearch/index/store/StoreFileMetaData.java
+++ b/server/src/main/java/org/elasticsearch/index/store/StoreFileMetaData.java
@@ -19,11 +19,13 @@
 
 package org.elasticsearch.index.store;
 
+import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.Version;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
 
 import java.io.IOException;
 import java.text.ParseException;
@@ -100,6 +102,29 @@ public class StoreFileMetaData implements Writeable {
         return this.checksum;
     }
 
+    /**
+     * Checks whether the bytes returned by {@link #hash()} are the contents of the file that this instance refers to.
+     *
+     * @return {@code true} iff {@link #hash()} will return the actual file contents
+     */
+    public boolean hashEqualsContents() {
+        if (hash.length == length) {
+            try {
+                final boolean checksumsMatch = Store.digestToString(CodecUtil.retrieveChecksum(
+                    new ByteArrayIndexInput("store_file", hash.bytes, hash.offset, hash.length))).equals(checksum);
+                assert checksumsMatch : "Checksums did not match for [" + this + "] which has a hash of [" + hash + "]";
+                return checksumsMatch;
+            } catch (Exception e) {
+                // The hash didn't contain any bytes that Lucene could extract a checksum from, so we can't verify it against the
+                // checksum of the original file. We should never see an exception here because Lucene files are assumed to always
+                // contain the checksum footer.
+                assert false : new AssertionError("Saw exception for hash [" + hash + "] but expected it to be a Lucene file", e);
+                return false;
+            }
+        }
+        return false;
+    }
+
     /**
      * Returns <code>true</code> iff the length and the checksums are the same, otherwise <code>false</code>
      */
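
The new hashEqualsContents() above relies on every complete Lucene file
ending in a checksum footer: if CodecUtil can retrieve a checksum from
the hash bytes and it matches the file's recorded checksum, the hash
must be the whole file. A standalone sketch of that mechanism using the
public Lucene API (the file name and codec name are placeholders of
ours, not values from this patch):

    import org.apache.lucene.codecs.CodecUtil;
    import org.apache.lucene.store.ByteBuffersDirectory;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.IOContext;
    import org.apache.lucene.store.IndexInput;
    import org.apache.lucene.store.IndexOutput;

    public class ChecksumFooterDemo {
        public static void main(String[] args) throws Exception {
            try (Directory dir = new ByteBuffersDirectory()) {
                // Write a tiny file framed like a Lucene file: codec header,
                // payload, and the standard checksum footer.
                try (IndexOutput out = dir.createOutput("demo.dat", IOContext.DEFAULT)) {
                    CodecUtil.writeHeader(out, "demo", 0);
                    out.writeString("payload");
                    CodecUtil.writeFooter(out); // appends the checksum footer
                }
                // retrieveChecksum reads the footer and returns the stored
                // value; it throws if the footer is missing or corrupt --
                // exactly the case hashEqualsContents() treats as "the hash
                // is not a complete Lucene file".
                try (IndexInput in = dir.openInput("demo.dat", IOContext.READONCE)) {
                    System.out.printf("footer checksum: %x%n", CodecUtil.retrieveChecksum(in));
                }
            }
        }
    }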
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index 19c5ee794f6..eef39aa0c75 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -30,6 +30,7 @@ import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.RateLimiter;
+import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
@@ -182,7 +183,15 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
     private static final String SNAPSHOT_INDEX_CODEC = "snapshots";
 
-    private static final String DATA_BLOB_PREFIX = "__";
+    private static final String UPLOADED_DATA_BLOB_PREFIX = "__";
+
+    /**
+     * Prefix used for the identifiers of data blobs that were not actually written to the repository physically because their contents are
+     * already stored in the metadata referencing them, i.e. in {@link BlobStoreIndexShardSnapshot} and
+     * {@link BlobStoreIndexShardSnapshots}. This is the case for files for which {@link StoreFileMetaData#hashEqualsContents()} is
+     * {@code true}.
+     */
+    private static final String VIRTUAL_DATA_BLOB_PREFIX = "v__";
 
     /**
      * When set to true metadata files are stored in compressed format. This setting doesn’t affect index
@@ -1529,6 +1538,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                     }
                 }
 
+                // We can skip writing blobs whose metadata hash is equal to their contents, because in that case the hash/contents
+                // are stored directly in the shard-level metadata
+                final boolean needsWrite = md.hashEqualsContents() == false;
                 indexTotalFileCount += md.length();
                 indexTotalNumberOfFiles++;
 
@@ -1537,9 +1549,14 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                     indexIncrementalSize += md.length();
                     // create a new FileInfo
                     BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo =
-                        new BlobStoreIndexShardSnapshot.FileInfo(DATA_BLOB_PREFIX + UUIDs.randomBase64UUID(), md, chunkSize());
+                        new BlobStoreIndexShardSnapshot.FileInfo(
+                            (needsWrite ? UPLOADED_DATA_BLOB_PREFIX : VIRTUAL_DATA_BLOB_PREFIX) + UUIDs.randomBase64UUID(),
+                            md, chunkSize());
                     indexCommitPointFiles.add(snapshotFileInfo);
-                    filesToSnapshot.add(snapshotFileInfo);
+                    if (needsWrite) {
+                        filesToSnapshot.add(snapshotFileInfo);
+                    }
+                    assert needsWrite || assertFileContentsMatchHash(snapshotFileInfo, store);
                 } else {
                     indexCommitPointFiles.add(existingFileInfo);
                 }
@@ -1548,8 +1565,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             snapshotStatus.moveToStarted(startTime, indexIncrementalFileCount,
                 indexTotalNumberOfFiles, indexIncrementalSize, indexTotalFileCount);
 
-            assert indexIncrementalFileCount == filesToSnapshot.size();
-
             final StepListener<Collection<Void>> allFilesUploadedListener = new StepListener<>();
             allFilesUploadedListener.whenComplete(v -> {
                 final IndexShardSnapshotStatus.Copy lastSnapshotStatus =
@@ -1638,6 +1653,17 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         }
     }
 
+    private static boolean assertFileContentsMatchHash(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Store store) {
+        try (IndexInput indexInput = store.openVerifyingInput(fileInfo.physicalName(), IOContext.READONCE, fileInfo.metadata())) {
+            final byte[] tmp = new byte[Math.toIntExact(fileInfo.metadata().length())];
+            indexInput.readBytes(tmp, 0, tmp.length);
+            assert fileInfo.metadata().hash().bytesEquals(new BytesRef(tmp));
+        } catch (IOException e) {
+            throw new AssertionError(e);
+        }
+        return true;
+    }
+
     @Override
     public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId,
                              RecoveryState recoveryState, ActionListener<Void> listener) {
@@ -1681,38 +1707,42 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
                 private void restoreFile(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Store store) throws IOException {
                     boolean success = false;
-
-                    try (InputStream stream = maybeRateLimit(new SlicedInputStream(fileInfo.numberOfParts()) {
-                                                                 @Override
-                                                                 protected InputStream openSlice(long slice) throws IOException {
-                                                                     return container.readBlob(fileInfo.partName(slice));
-                                                                 }
-                                                             },
-                        restoreRateLimiter, restoreRateLimitingTimeInNanos)) {
-                        try (IndexOutput indexOutput =
-                                 store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) {
-                            final byte[] buffer = new byte[BUFFER_SIZE];
-                            int length;
-                            while ((length = stream.read(buffer)) > 0) {
-                                indexOutput.writeBytes(buffer, 0, length);
-                                recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.physicalName(), length);
-                            }
-                            Store.verify(indexOutput);
-                            indexOutput.close();
-                            store.directory().sync(Collections.singleton(fileInfo.physicalName()));
-                            success = true;
-                        } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
-                            try {
-                                store.markStoreCorrupted(ex);
-                            } catch (IOException e) {
-                                logger.warn("store cannot be marked as corrupted", e);
-                            }
-                            throw ex;
-                        } finally {
-                            if (success == false) {
-                                store.deleteQuiet(fileInfo.physicalName());
+                    try (IndexOutput indexOutput =
+                             store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) {
+                        if (fileInfo.name().startsWith(VIRTUAL_DATA_BLOB_PREFIX)) {
+                            final BytesRef hash = fileInfo.metadata().hash();
+                            indexOutput.writeBytes(hash.bytes, hash.offset, hash.length);
+                            recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.physicalName(), hash.length);
+                        } else {
+                            try (InputStream stream = maybeRateLimit(new SlicedInputStream(fileInfo.numberOfParts()) {
+                                @Override
+                                protected InputStream openSlice(long slice) throws IOException {
+                                    return container.readBlob(fileInfo.partName(slice));
+                                }
+                            }, restoreRateLimiter, restoreRateLimitingTimeInNanos)) {
+                                final byte[] buffer = new byte[BUFFER_SIZE];
+                                int length;
+                                while ((length = stream.read(buffer)) > 0) {
+                                    indexOutput.writeBytes(buffer, 0, length);
+                                    recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.physicalName(), length);
+                                }
                             }
                         }
+                        Store.verify(indexOutput);
+                        indexOutput.close();
+                        store.directory().sync(Collections.singleton(fileInfo.physicalName()));
+                        success = true;
+                    } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
+                        try {
+                            store.markStoreCorrupted(ex);
+                        } catch (IOException e) {
+                            logger.warn("store cannot be marked as corrupted", e);
+                        }
+                        throw ex;
+                    } finally {
+                        if (success == false) {
+                            store.deleteQuiet(fileInfo.physicalName());
+                        }
                     }
                 }
             }.restore(snapshotFiles, store, l);
@@ -1843,7 +1873,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 || (blob.startsWith(SNAPSHOT_PREFIX) && blob.endsWith(".dat")
                     && survivingSnapshotUUIDs.contains(
                         blob.substring(SNAPSHOT_PREFIX.length(), blob.length() - ".dat".length())) == false)
-                || (blob.startsWith(DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blob)) == null)
+                || (blob.startsWith(UPLOADED_DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blob)) == null)
                 || FsBlobContainer.isTempBlobName(blob)).collect(Collectors.toList());
     }
 
@@ -1897,7 +1927,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             final BlobStoreIndexShardSnapshots shardSnapshots = indexShardSnapshotsFormat.read(shardContainer, Long.toString(latest));
             return new Tuple<>(shardSnapshots, latest);
         } else if (blobs.stream().anyMatch(b -> b.startsWith(SNAPSHOT_PREFIX) || b.startsWith(INDEX_FILE_PREFIX)
-                                                                              || b.startsWith(DATA_BLOB_PREFIX))) {
+                                                                              || b.startsWith(UPLOADED_DATA_BLOB_PREFIX))) {
             throw new IllegalStateException(
                 "Could not find a readable index-N file in a non-empty shard snapshot directory [" + shardContainer.path() + "]");
         }
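
The reworked restoreFile() above branches purely on the blob-name
prefix: "v__" blobs are materialized straight from the metadata hash,
everything else is streamed from the repository as before. A condensed
sketch of that dispatch (RepositoryReader and openRestoreStream are
illustrative stand-ins for the real BlobContainer plumbing):

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    public class RestoreDispatchSketch {
        private static final String VIRTUAL_DATA_BLOB_PREFIX = "v__";

        interface RepositoryReader {              // stand-in for BlobContainer
            InputStream readBlob(String name) throws IOException;
        }

        static InputStream openRestoreStream(String blobName, byte[] metadataHash,
                                             RepositoryReader repo) throws IOException {
            if (blobName.startsWith(VIRTUAL_DATA_BLOB_PREFIX)) {
                // Virtual blob: the shard metadata already carries the file
                // bytes, so no repository round-trip is needed.
                return new ByteArrayInputStream(metadataHash);
            }
            // Physical blob: stream it from the repository as before.
            return repo.readBlob(blobName);
        }
    }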
diff --git a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java
index 777363cfcf5..2320e4e0d36 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java
@@ -1132,11 +1132,11 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
 
         SnapshotStats stats = snapshots.get(0).getStats();
 
-        assertThat(stats.getTotalFileCount(), is(snapshot0FileCount));
-        assertThat(stats.getTotalSize(), is(snapshot0FileSize));
+        assertThat(stats.getTotalFileCount(), greaterThanOrEqualTo(snapshot0FileCount));
+        assertThat(stats.getTotalSize(), greaterThanOrEqualTo(snapshot0FileSize));
 
-        assertThat(stats.getIncrementalFileCount(), equalTo(snapshot0FileCount));
-        assertThat(stats.getIncrementalSize(), equalTo(snapshot0FileSize));
+        assertThat(stats.getIncrementalFileCount(), equalTo(stats.getTotalFileCount()));
+        assertThat(stats.getIncrementalSize(), equalTo(stats.getTotalSize()));
 
         assertThat(stats.getIncrementalFileCount(), equalTo(stats.getProcessedFileCount()));
         assertThat(stats.getIncrementalSize(), equalTo(stats.getProcessedSize()));
@@ -1175,8 +1175,8 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
         ArrayList<Path> snapshotFilesDiff = new ArrayList<>(snapshot1Files);
         snapshotFilesDiff.removeAll(snapshot0Files);
 
-        assertThat(anotherStats.getIncrementalFileCount(), equalTo(snapshotFilesDiff.size()));
-        assertThat(anotherStats.getIncrementalSize(), equalTo(calculateTotalFilesSize(snapshotFilesDiff)));
+        assertThat(anotherStats.getIncrementalFileCount(), greaterThanOrEqualTo(snapshotFilesDiff.size()));
+        assertThat(anotherStats.getIncrementalSize(), greaterThanOrEqualTo(calculateTotalFilesSize(snapshotFilesDiff)));
 
         assertThat(anotherStats.getIncrementalFileCount(), equalTo(anotherStats.getProcessedFileCount()));
         assertThat(anotherStats.getIncrementalSize(), equalTo(anotherStats.getProcessedSize()));
@@ -1184,8 +1184,8 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
         assertThat(stats.getTotalSize(), lessThan(anotherStats.getTotalSize()));
         assertThat(stats.getTotalFileCount(), lessThan(anotherStats.getTotalFileCount()));
 
-        assertThat(anotherStats.getTotalFileCount(), is(snapshot1FileCount));
-        assertThat(anotherStats.getTotalSize(), is(snapshot1FileSize));
+        assertThat(anotherStats.getTotalFileCount(), greaterThanOrEqualTo(snapshot1FileCount));
+        assertThat(anotherStats.getTotalSize(), greaterThanOrEqualTo(snapshot1FileSize));
     }
 
     public void testDataNodeRestartWithBusyMasterDuringSnapshot() throws Exception {
diff --git a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java
index a54eaf60baa..cd14a7f9f5d 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java
@@ -1036,7 +1036,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
         final String indexName = "unrestorable-files";
         final int maxRetries = randomIntBetween(1, 10);
 
-        Settings createIndexSettings = Settings.builder().put(SETTING_ALLOCATION_MAX_RETRY.getKey(), maxRetries).build();
+        Settings createIndexSettings = Settings.builder().put(SETTING_ALLOCATION_MAX_RETRY.getKey(), maxRetries)
+            .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1).build();
 
         Settings repositorySettings = Settings.builder()
                                                 .put("random", randomAlphaOfLength(10))