Cleanup FileRestoreContext Abstractions (#48173) (#48300)

This class is only used by the blob store repository
and CCR and the abstractions didn't really make sense
with CCR ignoring the concrete `restoreFiles` method
completely and having a method used only by the blobstore
overriden as unsupported.
=> Moved to a more fitting set of abstractions
=> Dried up the stream wrapping in `BlobStoreRepository` a little
now that the `restoreFile` method could be simplified

Relates #48110 as it makes changing the API of `FileRestoreContext`
to what is needed for async restores simpler
This commit is contained in:
Armin Braun 2019-10-21 17:30:35 +02:00 committed by GitHub
parent cc0c876a8d
commit e65c60915a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 63 additions and 79 deletions

View File

@ -22,9 +22,13 @@ package org.elasticsearch.repositories.blobstore;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ExceptionsHelper;
@ -36,6 +40,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Numbers;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
@ -1192,17 +1197,51 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
final BlobContainer container = shardContainer(indexId, snapshotShardId);
BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId);
SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles());
new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState, BUFFER_SIZE) {
new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState) {
@Override
protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) {
final InputStream dataBlobCompositeStream = new SlicedInputStream(fileInfo.numberOfParts()) {
@Override
protected InputStream openSlice(long slice) throws IOException {
return container.readBlob(fileInfo.partName(slice));
protected void restoreFiles(List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover, Store store) throws IOException {
// restore the files from the snapshot to the Lucene store
for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) {
logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name());
restoreFile(fileToRecover, store);
}
}
private void restoreFile(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Store store) throws IOException {
boolean success = false;
try (InputStream stream = maybeRateLimit(new SlicedInputStream(fileInfo.numberOfParts()) {
@Override
protected InputStream openSlice(long slice) throws IOException {
return container.readBlob(fileInfo.partName(slice));
}
},
restoreRateLimiter, restoreRateLimitingTimeInNanos)) {
try (IndexOutput indexOutput =
store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) {
final byte[] buffer = new byte[BUFFER_SIZE];
int length;
while ((length = stream.read(buffer)) > 0) {
indexOutput.writeBytes(buffer, 0, length);
recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.physicalName(), length);
}
Store.verify(indexOutput);
indexOutput.close();
store.directory().sync(Collections.singleton(fileInfo.physicalName()));
success = true;
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
try {
store.markStoreCorrupted(ex);
} catch (IOException e) {
logger.warn("store cannot be marked as corrupted", e);
}
throw ex;
} finally {
if (success == false) {
store.deleteQuiet(fileInfo.physicalName());
}
}
};
return restoreRateLimiter == null ? dataBlobCompositeStream
: new RateLimitingInputStream(dataBlobCompositeStream, restoreRateLimiter, restoreRateLimitingTimeInNanos::inc);
}
}
}.restore(snapshotFiles, store);
} catch (Exception e) {
@ -1210,6 +1249,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
}
private static InputStream maybeRateLimit(InputStream stream, @Nullable RateLimiter rateLimiter, CounterMetric metric) {
return rateLimiter == null ? stream : new RateLimitingInputStream(stream, rateLimiter, metric::inc);
}
@Override
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, IndexId indexId, ShardId shardId) {
BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(shardContainer(indexId, shardId), snapshotId);
@ -1371,13 +1414,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
for (int i = 0; i < fileInfo.numberOfParts(); i++) {
final long partBytes = fileInfo.partBytes(i);
InputStream inputStream = new InputStreamIndexInput(indexInput, partBytes);
if (snapshotRateLimiter != null) {
inputStream = new RateLimitingInputStream(inputStream, snapshotRateLimiter,
snapshotRateLimitingTimeInNanos::inc);
}
// Make reads abortable by mutating the snapshotStatus object
inputStream = new FilterInputStream(inputStream) {
final InputStream inputStream = new FilterInputStream(maybeRateLimit(
new InputStreamIndexInput(indexInput, partBytes), snapshotRateLimiter, snapshotRateLimitingTimeInNanos)) {
@Override
public int read() throws IOException {
checkAborted();

View File

@ -21,11 +21,6 @@ package org.elasticsearch.repositories.blobstore;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.index.shard.ShardId;
@ -38,10 +33,8 @@ import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.snapshots.SnapshotId;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -63,7 +56,6 @@ public abstract class FileRestoreContext {
protected final RecoveryState recoveryState;
protected final SnapshotId snapshotId;
protected final ShardId shardId;
protected final int bufferSize;
/**
* Constructs new restore context
@ -71,15 +63,12 @@ public abstract class FileRestoreContext {
* @param shardId shard id to restore into
* @param snapshotId snapshot id
* @param recoveryState recovery state to report progress
* @param bufferSize buffer size for restore
*/
protected FileRestoreContext(String repositoryName, ShardId shardId, SnapshotId snapshotId, RecoveryState recoveryState,
int bufferSize) {
protected FileRestoreContext(String repositoryName, ShardId shardId, SnapshotId snapshotId, RecoveryState recoveryState) {
this.repositoryName = repositoryName;
this.recoveryState = recoveryState;
this.snapshotId = snapshotId;
this.shardId = shardId;
this.bufferSize = bufferSize;
}
/**
@ -194,54 +183,16 @@ public abstract class FileRestoreContext {
}
}
protected void restoreFiles(List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover, Store store) throws IOException {
// restore the files from the snapshot to the Lucene store
for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) {
logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name());
restoreFile(fileToRecover, store);
}
}
protected abstract InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo);
/**
* Restores given list of {@link BlobStoreIndexShardSnapshot.FileInfo} to the given {@link Store}.
*
* @param filesToRecover List of files to restore
* @param store Store to restore into
*/
protected abstract void restoreFiles(List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover, Store store) throws IOException;
@SuppressWarnings("unchecked")
private static Iterable<StoreFileMetaData> concat(Store.RecoveryDiff diff) {
return Iterables.concat(diff.different, diff.missing);
}
/**
* Restores a file
*
* @param fileInfo file to be restored
*/
private void restoreFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo, final Store store) throws IOException {
boolean success = false;
try (InputStream stream = fileInputStream(fileInfo)) {
try (IndexOutput indexOutput = store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) {
final byte[] buffer = new byte[bufferSize];
int length;
while ((length = stream.read(buffer)) > 0) {
indexOutput.writeBytes(buffer, 0, length);
recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.physicalName(), length);
}
Store.verify(indexOutput);
indexOutput.close();
store.directory().sync(Collections.singleton(fileInfo.physicalName()));
success = true;
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
try {
store.markStoreCorrupted(ex);
} catch (IOException e) {
logger.warn("store cannot be marked as corrupted", e);
}
throw ex;
} finally {
if (success == false) {
store.deleteQuiet(fileInfo.physicalName());
}
}
}
}
}

View File

@ -80,7 +80,6 @@ import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionReque
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -456,7 +455,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
RestoreSession(String repositoryName, Client remoteClient, String sessionUUID, DiscoveryNode node, ShardId shardId,
RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, long mappingVersion,
ThreadPool threadPool, CcrSettings ccrSettings, LongConsumer throttleListener) {
super(repositoryName, shardId, SNAPSHOT_ID, recoveryState, Math.toIntExact(ccrSettings.getChunkSize().getBytes()));
super(repositoryName, shardId, SNAPSHOT_ID, recoveryState);
this.remoteClient = remoteClient;
this.sessionUUID = sessionUUID;
this.node = node;
@ -571,11 +570,6 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
}
}
@Override
protected InputStream fileInputStream(FileInfo fileInfo) {
throw new UnsupportedOperationException();
}
@Override
public void close() {
ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(sessionUUID, node);