Use snapshot information to build searchable snapshot store MetadataSnapshot (#56289) (#56403)

While investigating possible optimizations to speed up searchable
snapshots shard restores, we noticed that Elasticsearch builds the
list of shard files on local disk in order to compare it with the
list of files contained in the snapshot to restore. This list of
files is materialized as a MetadataSnapshot object whose
construction involves reading the footer checksum of every file of
the shard using the Store.checksumFromLuceneFile() method.
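
For reference, this is roughly the per-file read involved (a
minimal sketch, not the exact Store code; "directory" and
"fileName" stand in for the real arguments):

    // Open the file and let Lucene validate the 16-byte footer,
    // returning the checksum stored in it. For searchable snapshots
    // this read can reach into the cache or the blob store.
    try (IndexInput in = directory.openInput(fileName, IOContext.READONCE)) {
        long checksum = CodecUtil.retrieveChecksum(in);
    }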

Further investigation showed that a MetadataSnapshot object is
also created for other types of operations, like building the list
of files to recover in a peer recovery (and primary shard
relocation) or assigning a shard to a node. These operations use
the Store.getMetadata(IndexCommit) method to build the list of
files and checksums.
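
A sketch of how such an operation consumes the result (assumed
usage, simplified; "store" and "indexCommit" are placeholders):

    // Build the file list and checksums for an index commit, then
    // inspect each file's metadata as a recovery would.
    Store.MetadataSnapshot metadata = store.getMetadata(indexCommit);
    for (StoreFileMetadata file : metadata) {
        logger.trace("file [{}] length [{}] checksum [{}]", file.name(), file.length(), file.checksum());
    }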

In the case of searchable snapshots, building the MetadataSnapshot
object can trigger cache misses, which in turn can cause the last
range of each file to be downloaded and written to the cache just
to check the 16-byte footer. This in turn can cause more
evictions.

Since searchable snapshots already contain the footer information
of every file in BlobStoreIndexShardSnapshot, the checksum can be
read directly from there, avoiding the cache entirely when
creating a MetadataSnapshot for the operations mentioned above.
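
In other words, the checksum can come straight from the snapshot
metadata (a minimal sketch, assuming a FileInfo obtained from
BlobStoreIndexShardSnapshot):

    // No cache lookup and no blob store read: the footer checksum is
    // already part of the snapshotted file metadata.
    static String checksumOf(BlobStoreIndexShardSnapshot.FileInfo fileInfo) {
        return fileInfo.metadata().checksum();
    }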

This commit adds a shortcut to the
SearchableSnapshotDirectory.openInput() method - similar to what
already exists for segment infos - so that it creates a specific
IndexInput for checksum reading operations.
Tanguy Leroux 2020-05-08 14:16:19 +02:00 committed by GitHub
parent 60b1c67409
commit 8e9b69bfd7
5 changed files with 334 additions and 11 deletions


@@ -144,6 +144,12 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
public static final Setting<TimeValue> INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING =
Setting.timeSetting("index.store.stats_refresh_interval", TimeValue.timeValueSeconds(10), Property.IndexScope);
/**
* Specific {@link IOContext} used to verify the footer checksums of Lucene files.
* See {@link MetadataSnapshot#checksumFromLuceneFile(Directory, String, Map, Logger, Version, boolean)}
*/
public static final IOContext READONCE_CHECKSUM = new IOContext(IOContext.READONCE.context);
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final StoreDirectory directory;
private final ReentrantReadWriteLock metadataLock = new ReentrantReadWriteLock();
@@ -875,7 +881,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
Logger logger, Version version, boolean readFileAsHash) throws IOException {
final String checksum;
final BytesRefBuilder fileHash = new BytesRefBuilder();
- try (IndexInput in = directory.openInput(file, IOContext.READONCE)) {
+ try (IndexInput in = directory.openInput(file, READONCE_CHECKSUM)) {
final long length;
try {
length = in.length();


@@ -38,6 +38,7 @@ import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.elasticsearch.index.store.cache.CacheFile;
import org.elasticsearch.index.store.cache.CacheKey;
import org.elasticsearch.index.store.cache.CachedBlobContainerIndexInput;
import org.elasticsearch.index.store.checksum.ChecksumBlobContainerIndexInput;
import org.elasticsearch.index.store.direct.DirectBlobContainerIndexInput;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
@@ -323,6 +324,9 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
final BytesRef content = fileInfo.metadata().hash();
return new ByteArrayIndexInput("ByteArrayIndexInput(" + name + ')', content.bytes, content.offset, content.length);
}
if (context == Store.READONCE_CHECKSUM) {
return ChecksumBlobContainerIndexInput.create(fileInfo.physicalName(), fileInfo.length(), fileInfo.checksum(), context);
}
final IndexInputStats inputStats = stats.computeIfAbsent(name, n -> createIndexInputStats(fileInfo.length()));
if (useCache && isExcludedFromCache(name) == false) {


@@ -0,0 +1,144 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.index.store.checksum;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.ByteBuffersDataOutput;
import org.apache.lucene.store.ByteBuffersIndexOutput;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.index.store.Store;
import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;
/**
* An {@link IndexInput} that can only be used to verify footer checksums.
*/
public class ChecksumBlobContainerIndexInput extends IndexInput {
private final byte[] checksum;
private final long length;
private long position;
private ChecksumBlobContainerIndexInput(String name, long length, byte[] checksum, IOContext context) {
super("ChecksumBlobContainerIndexInput(" + name + ')');
ensureReadOnceChecksumContext(context);
assert checksum.length == CodecUtil.footerLength();
this.checksum = Objects.requireNonNull(checksum);
assert length >= this.checksum.length;
this.length = length;
this.position = 0L;
}
@Override
public long length() {
return length;
}
@Override
public long getFilePointer() {
return position;
}
@Override
public byte readByte() throws IOException {
if (getFilePointer() >= length()) {
throw new EOFException("seek past EOF");
}
return checksum[checksumPositionOrThrow(position++)];
}
@Override
public void readBytes(final byte[] b, final int off, int len) throws IOException {
if (getFilePointer() + len > length()) {
throw new EOFException("seek past EOF");
}
System.arraycopy(checksum, checksumPositionOrThrow(position), b, off, len);
position += len;
}
@Override
public void seek(long pos) throws IOException {
if (pos < 0) {
throw new IllegalArgumentException("Seeking to negative position: " + pos);
} else if (pos > length()) {
throw new EOFException("seek past EOF");
}
checksumPositionOrThrow(pos);
position = pos;
}
@Override
public IndexInput slice(String sliceDescription, long offset, long length) {
assert false : "unexpected slicing (" + sliceDescription + ") for " + this;
throw new UnsupportedOperationException();
}
@Override
public IndexInput clone() {
assert false : "unexpected cloning for " + this;
throw new UnsupportedOperationException();
}
@Override
public void close() {}
@Override
public String toString() {
return "ChecksumBlobContainerIndexInput{"
+ "checksum="
+ Arrays.toString(checksum)
+ ", length="
+ length
+ ", position="
+ position
+ '}';
}
private int checksumPositionOrThrow(long pos) {
final long checksumPosition = length - checksum.length;
if (pos < checksumPosition) {
assert false : "unexpected read or seek at position [" + pos + "] but checksum starts at [" + checksumPosition + ']';
throw new IllegalArgumentException("Can't read or seek before footer checksum");
}
return Math.toIntExact(checksum.length - (length - pos));
}
private static void ensureReadOnceChecksumContext(IOContext context) {
if (context != Store.READONCE_CHECKSUM) {
assert false : "expected READONCE_CHECKSUM but got " + context;
throw new IllegalArgumentException("ChecksumBlobContainerIndexInput should only be used with READONCE_CHECKSUM context");
}
}
/**
* Creates a {@link ChecksumBlobContainerIndexInput} that can be used to verify a Lucene file's footer checksum without opening the
* file on disk. The checksum verification should be executed using {@link CodecUtil#retrieveChecksum(IndexInput)}.
*
* @param name the physical name of the file
* @param length the total length of the file
* @param checksum the footer checksum provided as a {@link String}
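* @param context the {@link IOContext} used to open the input, which must be {@link Store#READONCE_CHECKSUM}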
* @return a {@link ChecksumBlobContainerIndexInput}
* @throws IOException if something goes wrong when creating the {@link ChecksumBlobContainerIndexInput}
*/
public static ChecksumBlobContainerIndexInput create(String name, long length, String checksum, IOContext context) throws IOException {
final ByteBuffersDataOutput out = new ByteBuffersDataOutput();
try (IndexOutput output = new ByteBuffersIndexOutput(out, "tmp", name)) {
// reverse CodecUtil.writeFooter()
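// CodecUtil.writeFooter() emits a 16-byte footer: FOOTER_MAGIC (4 bytes), an algorithm ID of 0 (4 bytes), then the checksum (8 bytes)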
output.writeInt(CodecUtil.FOOTER_MAGIC);
output.writeInt(0);
output.writeLong(Long.parseLong(checksum, Character.MAX_RADIX));
output.close();
return new ChecksumBlobContainerIndexInput(name, length, out.toArrayCopy(), context);
}
}
}


@@ -5,6 +5,7 @@
*/
package org.elasticsearch.index.store;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
@@ -14,6 +15,7 @@ import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FieldInfos;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
@@ -47,6 +49,7 @@ import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.BytesRefs;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -59,6 +62,7 @@ import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.elasticsearch.index.store.checksum.ChecksumBlobContainerIndexInput;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
@@ -99,8 +103,10 @@ import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
public class SearchableSnapshotDirectoryTests extends ESTestCase {
@@ -309,6 +315,108 @@ public class SearchableSnapshotDirectoryTests extends ESTestCase {
});
}
public void testChecksumBlobContainerIndexInput() throws Exception {
testDirectories(
randomBoolean(),
false, // no prewarming in this test because we want to ensure that files are accessed on purpose
(directory, snapshotDirectory) -> {
for (String fileName : randomSubsetOf(Arrays.asList(snapshotDirectory.listAll()))) {
final long checksum;
try (IndexInput input = directory.openInput(fileName, Store.READONCE_CHECKSUM)) {
checksum = CodecUtil.checksumEntireFile(input);
}
final long snapshotChecksum;
try (IndexInput input = snapshotDirectory.openInput(fileName, Store.READONCE_CHECKSUM)) {
snapshotChecksum = CodecUtil.retrieveChecksum(input);
assertThat(
input,
"si".equals(IndexFileNames.getExtension(fileName)) || fileName.startsWith(IndexFileNames.SEGMENTS)
? instanceOf(ByteArrayIndexInput.class)
: instanceOf(ChecksumBlobContainerIndexInput.class)
);
}
assertThat(
"Expected checksum [" + checksum + "] but got [" + snapshotChecksum + ']',
snapshotChecksum,
equalTo(checksum)
);
assertThat(
"File [" + fileName + "] should have been read from heap",
snapshotDirectory.getStats(fileName),
nullValue()
);
}
}
);
}
public void testMetadataSnapshotsDoesNotAccessFilesOnDisk() throws Exception {
final ShardId shardId = new ShardId("_name", "_id", 0);
final IndexSettings indexSettings = newIndexSettings();
// sometimes load store's MetadataSnapshot using an IndexCommit
final boolean useIndexCommit = randomBoolean();
logger.info("--> loading Store.MetadataSnapshot using index commit is [{}]", useIndexCommit);
final CheckedFunction<Store, Store.MetadataSnapshot, IOException> loader = store -> {
if (useIndexCommit) {
return store.getMetadata(Lucene.getIndexCommit(Lucene.readSegmentInfos(store.directory()), store.directory()));
} else {
return store.getMetadata(null, true);
}
};
testDirectories(
randomBoolean(),
false, // no prewarming in this test because we want to ensure that files are accessed on purpose
((directory, snapshotDirectory) -> {
final Store.MetadataSnapshot metadata;
try (Store store = new Store(shardId, indexSettings, directory, new DummyShardLock(shardId))) {
metadata = loader.apply(store);
assertNotNull(metadata);
}
final Store.MetadataSnapshot snapshotMetadata;
try (Store store = new Store(shardId, indexSettings, snapshotDirectory, new DummyShardLock(shardId))) {
assertTrue("No files should have been read yet", snapshotDirectory.getStats().isEmpty());
snapshotMetadata = store.getMetadata(null);
assertTrue("No files should have been read to compute MetadataSnapshot", snapshotDirectory.getStats().isEmpty());
assertNotNull(snapshotMetadata);
}
final Store.RecoveryDiff diff = randomBoolean()
? metadata.recoveryDiff(snapshotMetadata)
: snapshotMetadata.recoveryDiff(metadata);
assertThat(
"List of different files should be empty but got [" + metadata.asMap() + "] and [" + snapshotMetadata.asMap() + ']',
diff.different.isEmpty(),
is(true)
);
assertThat(
"List of missing files should be empty but got [" + metadata.asMap() + "] and [" + snapshotMetadata.asMap() + ']',
diff.missing.isEmpty(),
is(true)
);
assertThat(
"List of files should be identical [" + metadata.asMap() + "] and [" + snapshotMetadata.asMap() + ']',
diff.identical.size(),
equalTo(metadata.size())
);
assertThat("Number of files should be identical", snapshotMetadata.size(), equalTo(metadata.size()));
for (StoreFileMetadata storeFileMetadata : metadata) {
final StoreFileMetadata snapshotFileMetadata = snapshotMetadata.get(storeFileMetadata.name());
assertTrue(
storeFileMetadata + " should be identical but got [" + snapshotFileMetadata + ']',
storeFileMetadata.isSame(snapshotFileMetadata)
);
}
})
);
}
/**
* This method:
* - sets up a default {@link Directory} and indexes random documents
@@ -316,14 +424,16 @@ public class SearchableSnapshotDirectoryTests extends ESTestCase {
* - creates a {@link SearchableSnapshotDirectory} instance based on the snapshotted files
* - consumes the default and the searchable snapshot directories using the {@link CheckedBiConsumer}.
*/
- private void testDirectories(final CheckedBiConsumer<Directory, Directory, Exception> consumer) throws Exception {
- final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(
- "_index",
- Settings.builder()
- .put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()))
- .put(IndexMetadata.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT)
- .build()
- );
+ private void testDirectories(final CheckedBiConsumer<Directory, SearchableSnapshotDirectory, Exception> consumer) throws Exception {
+ testDirectories(randomBoolean(), randomBoolean(), consumer);
+ }
+ private void testDirectories(
+ final boolean enableCache,
+ final boolean prewarmCache,
+ final CheckedBiConsumer<Directory, SearchableSnapshotDirectory, Exception> consumer
+ ) throws Exception {
+ final IndexSettings indexSettings = newIndexSettings();
final ShardId shardId = new ShardId(indexSettings.getIndex(), randomIntBetween(0, 10));
final List<Releasable> releasables = new ArrayList<>();
@@ -443,8 +553,8 @@ public class SearchableSnapshotDirectoryTests extends ESTestCase {
indexId,
shardId,
Settings.builder()
- .put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), randomBoolean())
- .put(SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey(), randomBoolean())
+ .put(SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), enableCache)
+ .put(SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey(), prewarmCache)
.build(),
() -> 0L,
cacheService,
@@ -607,4 +717,14 @@ public class SearchableSnapshotDirectoryTests extends ESTestCase {
assertThat("Number of files (" + files.size() + ") mismatch, got : " + files.keySet(), files.size(), matchNumberOfFiles);
assertThat("Sum of file sizes mismatch, got: " + files, files.values().stream().mapToLong(Long::longValue).sum(), matchSizeOfFiles);
}
private static IndexSettings newIndexSettings() {
return IndexSettingsModule.newIndexSettings(
"_index",
Settings.builder()
.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()))
.put(IndexMetadata.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT)
.build()
);
}
}


@@ -0,0 +1,49 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.index.store.checksum;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.ByteBuffersDataOutput;
import org.apache.lucene.store.ByteBuffersIndexOutput;
import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.test.ESTestCase;
import java.io.EOFException;
import static org.hamcrest.Matchers.equalTo;
public class ChecksumBlobContainerIndexInputTests extends ESTestCase {
public void testChecksumBlobContainerIndexInput() throws Exception {
final ByteBuffersDataOutput out = new ByteBuffersDataOutput();
try (IndexOutput output = new ByteBuffersIndexOutput(out, "tmp", getTestName())) {
CodecUtil.writeHeader(output, getTestName(), 5);
output.writeString(randomRealisticUnicodeOfLengthBetween(1, 50));
CodecUtil.writeFooter(output);
}
final byte[] bytes = out.toArrayCopy();
final long checksum = CodecUtil.checksumEntireFile(new ByteArrayIndexInput(getTestName(), bytes));
final ChecksumBlobContainerIndexInput indexInput = ChecksumBlobContainerIndexInput.create(
getTestName(),
bytes.length,
Store.digestToString(checksum),
Store.READONCE_CHECKSUM
);
assertThat(indexInput.length(), equalTo((long) bytes.length));
assertThat(indexInput.getFilePointer(), equalTo(0L));
assertThat(CodecUtil.retrieveChecksum(indexInput), equalTo(checksum));
assertThat(indexInput.getFilePointer(), equalTo((long) bytes.length));
expectThrows(EOFException.class, () -> indexInput.readByte());
expectThrows(EOFException.class, () -> indexInput.readBytes(new byte[0], 0, 1));
expectThrows(EOFException.class, () -> indexInput.seek(bytes.length + 1));
}
}