Segment(s) info blobs are already stored with their full content in the "hash" field of the shard snapshot metadata as long as they are smaller than 1MB. We can make use of this fact and never upload them physically to the repo. This saves a non-trivial number of uploads when snapshotting and downloads when restoring, and might also lower the latency of searchable snapshots, since they can skip physically loading this information as well.
Parent: c7cc383d33
Commit: d8169e5fdc
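To make the mechanics of the change easier to follow, here is a minimal, self-contained sketch of the idea in plain Java. It is not Elasticsearch code: the class and helper names (VirtualBlobSketch, FileMeta, planBlobName, checksumOf) are made up for illustration, and a JDK CRC32 value stands in for the Lucene CodecUtil footer checksum that the real hashEqualsContents() verifies. The sketch only shows the decision being introduced: a file whose metadata hash already holds its full, checksum-verified contents gets a "v__" (virtual) blob name and is never uploaded, while everything else keeps the "__" prefix and is uploaded as before.

import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

/**
 * Illustrative sketch (not Elasticsearch code) of the "virtual blob" idea in this commit.
 * A CRC32 value stands in for the Lucene CodecUtil footer checksum verified by the real code.
 */
public class VirtualBlobSketch {

    static final String UPLOADED_DATA_BLOB_PREFIX = "__";
    static final String VIRTUAL_DATA_BLOB_PREFIX = "v__";

    /** Simplified stand-in for StoreFileMetaData: a length, a checksum and a (possibly full-content) hash. */
    static final class FileMeta {
        final long length;
        final String checksum;
        final byte[] hash;

        FileMeta(long length, String checksum, byte[] hash) {
            this.length = length;
            this.checksum = checksum;
            this.hash = hash;
        }

        /** Mirrors the intent of hashEqualsContents(): the hash holds the whole file and verifies against the checksum. */
        boolean hashEqualsContents() {
            return hash.length == length && checksumOf(hash).equals(checksum);
        }
    }

    /** Stand-in for the Lucene footer checksum used by the real implementation. */
    static String checksumOf(byte[] bytes) {
        CRC32 crc = new CRC32();
        crc.update(bytes, 0, bytes.length);
        return Long.toString(crc.getValue(), Character.MAX_RADIX);
    }

    /** Pick the blob name: files whose hash already carries their contents get the "virtual" prefix and are never uploaded. */
    static String planBlobName(FileMeta meta, String uuid) {
        final boolean needsWrite = meta.hashEqualsContents() == false;
        return (needsWrite ? UPLOADED_DATA_BLOB_PREFIX : VIRTUAL_DATA_BLOB_PREFIX) + uuid;
    }

    public static void main(String[] args) {
        byte[] segmentInfo = "tiny segments_N contents".getBytes(StandardCharsets.UTF_8);

        // Small file: the metadata hash carries the full contents -> "v__" name, nothing is uploaded,
        // and a restore can rewrite the file directly from the hash bytes.
        FileMeta small = new FileMeta(segmentInfo.length, checksumOf(segmentInfo), segmentInfo);

        // Large file: the hash does not contain the contents -> "__" name, the blob is uploaded as before.
        FileMeta large = new FileMeta(5L * 1024 * 1024, "someChecksum", new byte[0]);

        System.out.println(planBlobName(small, "uuid1")); // v__uuid1
        System.out.println(planBlobName(large, "uuid2")); // __uuid2
    }
}

On restore, the actual change mirrors this decision: blob names starting with the virtual prefix are rewritten straight from the metadata hash, while uploaded blobs are streamed from the repository as before.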
@@ -19,11 +19,13 @@
 
 package org.elasticsearch.index.store;
 
+import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.Version;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
 
 import java.io.IOException;
 import java.text.ParseException;

@@ -100,6 +102,29 @@ public class StoreFileMetaData implements Writeable {
         return this.checksum;
     }
 
+    /**
+     * Checks if the bytes returned by {@link #hash()} are the contents of the file that this instances refers to.
+     *
+     * @return {@code true} iff {@link #hash()} will return the actual file contents
+     */
+    public boolean hashEqualsContents() {
+        if (hash.length == length) {
+            try {
+                final boolean checksumsMatch = Store.digestToString(CodecUtil.retrieveChecksum(
+                    new ByteArrayIndexInput("store_file", hash.bytes, hash.offset, hash.length))).equals(checksum);
+                assert checksumsMatch : "Checksums did not match for [" + this + "] which has a hash of [" + hash + "]";
+                return checksumsMatch;
+            } catch (Exception e) {
+                // Hash didn't contain any bytes that Lucene could extract a checksum from so we can't verify against the checksum of the
+                // original file. We should never see an exception here because lucene files are assumed to always contain the checksum
+                // footer.
+                assert false : new AssertionError("Saw exception for hash [" + hash + "] but expected it to be Lucene file", e);
+                return false;
+            }
+        }
+        return false;
+    }
+
     /**
      * Returns <code>true</code> iff the length and the checksums are the same. otherwise <code>false</code>
      */

@@ -30,6 +30,7 @@ import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.RateLimiter;
+import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;

@@ -182,7 +183,15 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
     private static final String SNAPSHOT_INDEX_CODEC = "snapshots";
 
-    private static final String DATA_BLOB_PREFIX = "__";
+    private static final String UPLOADED_DATA_BLOB_PREFIX = "__";
+
+    /**
+     * Prefix used for the identifiers of data blobs that were not actually written to the repository physically because their contents are
+     * already stored in the metadata referencing them, i.e. in {@link BlobStoreIndexShardSnapshot} and
+     * {@link BlobStoreIndexShardSnapshots}. This is the case for files for which {@link StoreFileMetaData#hashEqualsContents()} is
+     * {@code true}.
+     */
+    private static final String VIRTUAL_DATA_BLOB_PREFIX = "v__";
 
     /**
      * When set to true metadata files are stored in compressed format. This setting doesn't affect index

@@ -1529,6 +1538,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                     }
                 }
 
+                // We can skip writing blobs where the metadata hash is equal to the blob's contents because we store the hash/contents
+                // directly in the shard level metadata in this case
+                final boolean needsWrite = md.hashEqualsContents() == false;
                 indexTotalFileCount += md.length();
                 indexTotalNumberOfFiles++;
 
@@ -1537,9 +1549,14 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                     indexIncrementalSize += md.length();
                     // create a new FileInfo
                     BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo =
-                        new BlobStoreIndexShardSnapshot.FileInfo(DATA_BLOB_PREFIX + UUIDs.randomBase64UUID(), md, chunkSize());
+                        new BlobStoreIndexShardSnapshot.FileInfo(
+                            (needsWrite ? UPLOADED_DATA_BLOB_PREFIX : VIRTUAL_DATA_BLOB_PREFIX) + UUIDs.randomBase64UUID(),
+                            md, chunkSize());
                     indexCommitPointFiles.add(snapshotFileInfo);
-                    filesToSnapshot.add(snapshotFileInfo);
+                    if (needsWrite) {
+                        filesToSnapshot.add(snapshotFileInfo);
+                    }
+                    assert needsWrite || assertFileContentsMatchHash(snapshotFileInfo, store);
                 } else {
                     indexCommitPointFiles.add(existingFileInfo);
                 }

@@ -1548,8 +1565,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             snapshotStatus.moveToStarted(startTime, indexIncrementalFileCount,
                 indexTotalNumberOfFiles, indexIncrementalSize, indexTotalFileCount);
 
-            assert indexIncrementalFileCount == filesToSnapshot.size();
-
             final StepListener<Collection<Void>> allFilesUploadedListener = new StepListener<>();
             allFilesUploadedListener.whenComplete(v -> {
                 final IndexShardSnapshotStatus.Copy lastSnapshotStatus =

@@ -1638,6 +1653,17 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         }
     }
 
+    private static boolean assertFileContentsMatchHash(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Store store) {
+        try (IndexInput indexInput = store.openVerifyingInput(fileInfo.physicalName(), IOContext.READONCE, fileInfo.metadata())) {
+            final byte[] tmp = new byte[Math.toIntExact(fileInfo.metadata().length())];
+            indexInput.readBytes(tmp, 0, tmp.length);
+            assert fileInfo.metadata().hash().bytesEquals(new BytesRef(tmp));
+        } catch (IOException e) {
+            throw new AssertionError(e);
+        }
+        return true;
+    }
+
     @Override
     public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId,
                              RecoveryState recoveryState, ActionListener<Void> listener) {

@@ -1681,38 +1707,42 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
                 private void restoreFile(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Store store) throws IOException {
                     boolean success = false;
-
-                    try (InputStream stream = maybeRateLimit(new SlicedInputStream(fileInfo.numberOfParts()) {
-                        @Override
-                        protected InputStream openSlice(long slice) throws IOException {
-                            return container.readBlob(fileInfo.partName(slice));
-                        }
-                    },
-                        restoreRateLimiter, restoreRateLimitingTimeInNanos)) {
-                        try (IndexOutput indexOutput =
-                                 store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) {
-                            final byte[] buffer = new byte[BUFFER_SIZE];
-                            int length;
-                            while ((length = stream.read(buffer)) > 0) {
-                                indexOutput.writeBytes(buffer, 0, length);
-                                recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.physicalName(), length);
-                            }
-                            Store.verify(indexOutput);
-                            indexOutput.close();
-                            store.directory().sync(Collections.singleton(fileInfo.physicalName()));
-                            success = true;
-                        } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
-                            try {
-                                store.markStoreCorrupted(ex);
-                            } catch (IOException e) {
-                                logger.warn("store cannot be marked as corrupted", e);
-                            }
-                            throw ex;
-                        } finally {
-                            if (success == false) {
-                                store.deleteQuiet(fileInfo.physicalName());
-                            }
-                        }
-                    }
-                }
+                    try (IndexOutput indexOutput =
+                             store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) {
+                        if (fileInfo.name().startsWith(VIRTUAL_DATA_BLOB_PREFIX)) {
+                            final BytesRef hash = fileInfo.metadata().hash();
+                            indexOutput.writeBytes(hash.bytes, hash.offset, hash.length);
+                            recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.physicalName(), hash.length);
+                        } else {
+                            try (InputStream stream = maybeRateLimit(new SlicedInputStream(fileInfo.numberOfParts()) {
+                                @Override
+                                protected InputStream openSlice(long slice) throws IOException {
+                                    return container.readBlob(fileInfo.partName(slice));
+                                }
+                            }, restoreRateLimiter, restoreRateLimitingTimeInNanos)) {
+                                final byte[] buffer = new byte[BUFFER_SIZE];
+                                int length;
+                                while ((length = stream.read(buffer)) > 0) {
+                                    indexOutput.writeBytes(buffer, 0, length);
+                                    recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.physicalName(), length);
+                                }
+                            }
+                        }
+                        Store.verify(indexOutput);
+                        indexOutput.close();
+                        store.directory().sync(Collections.singleton(fileInfo.physicalName()));
+                        success = true;
+                    } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
+                        try {
+                            store.markStoreCorrupted(ex);
+                        } catch (IOException e) {
+                            logger.warn("store cannot be marked as corrupted", e);
+                        }
+                        throw ex;
+                    } finally {
+                        if (success == false) {
+                            store.deleteQuiet(fileInfo.physicalName());
+                        }
+                    }
+                }
             }.restore(snapshotFiles, store, l);

@@ -1843,7 +1873,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 || (blob.startsWith(SNAPSHOT_PREFIX) && blob.endsWith(".dat")
                     && survivingSnapshotUUIDs.contains(
                         blob.substring(SNAPSHOT_PREFIX.length(), blob.length() - ".dat".length())) == false)
-                || (blob.startsWith(DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blob)) == null)
+                || (blob.startsWith(UPLOADED_DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blob)) == null)
                 || FsBlobContainer.isTempBlobName(blob)).collect(Collectors.toList());
     }
 
@@ -1897,7 +1927,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             final BlobStoreIndexShardSnapshots shardSnapshots = indexShardSnapshotsFormat.read(shardContainer, Long.toString(latest));
             return new Tuple<>(shardSnapshots, latest);
         } else if (blobs.stream().anyMatch(b -> b.startsWith(SNAPSHOT_PREFIX) || b.startsWith(INDEX_FILE_PREFIX)
-            || b.startsWith(DATA_BLOB_PREFIX))) {
+            || b.startsWith(UPLOADED_DATA_BLOB_PREFIX))) {
             throw new IllegalStateException(
                 "Could not find a readable index-N file in a non-empty shard snapshot directory [" + shardContainer.path() + "]");
         }

@@ -1132,11 +1132,11 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
 
         SnapshotStats stats = snapshots.get(0).getStats();
 
-        assertThat(stats.getTotalFileCount(), is(snapshot0FileCount));
-        assertThat(stats.getTotalSize(), is(snapshot0FileSize));
+        assertThat(stats.getTotalFileCount(), greaterThanOrEqualTo(snapshot0FileCount));
+        assertThat(stats.getTotalSize(), greaterThanOrEqualTo(snapshot0FileSize));
 
-        assertThat(stats.getIncrementalFileCount(), equalTo(snapshot0FileCount));
-        assertThat(stats.getIncrementalSize(), equalTo(snapshot0FileSize));
+        assertThat(stats.getIncrementalFileCount(), equalTo(stats.getTotalFileCount()));
+        assertThat(stats.getIncrementalSize(), equalTo(stats.getTotalSize()));
 
         assertThat(stats.getIncrementalFileCount(), equalTo(stats.getProcessedFileCount()));
         assertThat(stats.getIncrementalSize(), equalTo(stats.getProcessedSize()));

@@ -1175,8 +1175,8 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
         ArrayList<Path> snapshotFilesDiff = new ArrayList<>(snapshot1Files);
         snapshotFilesDiff.removeAll(snapshot0Files);
 
-        assertThat(anotherStats.getIncrementalFileCount(), equalTo(snapshotFilesDiff.size()));
-        assertThat(anotherStats.getIncrementalSize(), equalTo(calculateTotalFilesSize(snapshotFilesDiff)));
+        assertThat(anotherStats.getIncrementalFileCount(), greaterThanOrEqualTo(snapshotFilesDiff.size()));
+        assertThat(anotherStats.getIncrementalSize(), greaterThanOrEqualTo(calculateTotalFilesSize(snapshotFilesDiff)));
 
         assertThat(anotherStats.getIncrementalFileCount(), equalTo(anotherStats.getProcessedFileCount()));
         assertThat(anotherStats.getIncrementalSize(), equalTo(anotherStats.getProcessedSize()));

@@ -1184,8 +1184,8 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
         assertThat(stats.getTotalSize(), lessThan(anotherStats.getTotalSize()));
         assertThat(stats.getTotalFileCount(), lessThan(anotherStats.getTotalFileCount()));
 
-        assertThat(anotherStats.getTotalFileCount(), is(snapshot1FileCount));
-        assertThat(anotherStats.getTotalSize(), is(snapshot1FileSize));
+        assertThat(anotherStats.getTotalFileCount(), greaterThanOrEqualTo(snapshot1FileCount));
+        assertThat(anotherStats.getTotalSize(), greaterThanOrEqualTo(snapshot1FileSize));
     }
 
     public void testDataNodeRestartWithBusyMasterDuringSnapshot() throws Exception {

@@ -1036,7 +1036,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
         final String indexName = "unrestorable-files";
         final int maxRetries = randomIntBetween(1, 10);
 
-        Settings createIndexSettings = Settings.builder().put(SETTING_ALLOCATION_MAX_RETRY.getKey(), maxRetries).build();
+        Settings createIndexSettings = Settings.builder().put(SETTING_ALLOCATION_MAX_RETRY.getKey(), maxRetries)
+            .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1).build();
 
         Settings repositorySettings = Settings.builder()
             .put("random", randomAlphaOfLength(10))