Simplify CheckSumBlobStoreFormat and make it more Reusable (#59888) (#59950)

Refactored `ChecksumBlobStoreFormat` so that it can more easily be reused in
other functionality (e.g. the upcoming repair logic).
Simplified away the constant `failIfAlreadyExists` parameter and removed the atomic
write method along with its tests.
The atomic write method was only used in a single spot, and that spot has now been
adjusted to work the same way that writing root-level metadata works.
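To make the shape of the change concrete, here is a minimal before/after sketch of the
API, using names taken from the diff below (SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT, etc.);
the exact call sites are in the per-file changes:

// Before: one format instance per repository, carrying the registry and the
// compression flag as instance state:
//   snapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT,
//       SnapshotInfo::fromXContentInternal, namedXContentRegistry, compress);
//   snapshotFormat.write(snapshotInfo, blobContainer(), uuid, failIfAlreadyExists);
//   SnapshotInfo info = snapshotFormat.read(blobContainer(), uuid);

// After: a stateless shared constant; compression and the registry move to the call sites:
public static final ChecksumBlobStoreFormat<SnapshotInfo> SNAPSHOT_FORMAT =
    new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT, SnapshotInfo::fromXContentInternal);

SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), uuid, compress);
SnapshotInfo info = SNAPSHOT_FORMAT.read(blobContainer(), uuid, namedXContentRegistry);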
Author: Armin Braun, 2020-07-21 11:20:56 +02:00 (committed by GitHub)
Parent: 5b92596fad
Commit: cefaa17c52
6 changed files with 93 additions and 254 deletions

View File: BlobStoreRepository.java

@ -160,16 +160,12 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
protected volatile RepositoryMetadata metadata;
protected final NamedXContentRegistry namedXContentRegistry;
protected final ThreadPool threadPool;
private static final int BUFFER_SIZE = 4096;
public static final String SNAPSHOT_PREFIX = "snap-";
public static final String SNAPSHOT_CODEC = "snapshot";
public static final String INDEX_FILE_PREFIX = "index-";
public static final String INDEX_LATEST_BLOB = "index.latest";
@ -180,18 +176,12 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
public static final String METADATA_NAME_FORMAT = METADATA_PREFIX + "%s.dat";
private static final String METADATA_CODEC = "metadata";
private static final String INDEX_METADATA_CODEC = "index-metadata";
public static final String SNAPSHOT_NAME_FORMAT = SNAPSHOT_PREFIX + "%s.dat";
private static final String SNAPSHOT_INDEX_PREFIX = "index-";
private static final String SNAPSHOT_INDEX_NAME_FORMAT = SNAPSHOT_INDEX_PREFIX + "%s";
private static final String SNAPSHOT_INDEX_CODEC = "snapshots";
private static final String UPLOADED_DATA_BLOB_PREFIX = "__";
/**
@ -202,13 +192,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
*/
private static final String VIRTUAL_DATA_BLOB_PREFIX = "v__";
/**
* When set to true, metadata files are stored in compressed format. This setting doesn't affect index
* files that are already compressed by default. Changing the setting does not invalidate existing files since reads
* do not observe the setting, instead they examine the file to see if it is compressed or not.
*/
public static final Setting<Boolean> COMPRESS_SETTING = Setting.boolSetting("compress", true, Setting.Property.NodeScope);
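// (Sketch, not code from this class: reads stay agnostic of this setting because the
// payload is self-describing. CompressorFactory sniffs the compression header and is a
// no-op for plain content, e.g.:
//   BytesReference plain = CompressorFactory.uncompressIfNeeded(maybeCompressedBytes);
// so previously written blobs remain readable after the setting changes.)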
/**
* When set to {@code true}, {@link #bestEffortConsistency} will be set to {@code true} and concurrent modifications of the repository
* contents will not result in the repository being marked as corrupted.
@ -235,18 +218,25 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
private final CounterMetric restoreRateLimitingTimeInNanos = new CounterMetric();
private ChecksumBlobStoreFormat<Metadata> globalMetadataFormat;
public static final ChecksumBlobStoreFormat<Metadata> GLOBAL_METADATA_FORMAT =
new ChecksumBlobStoreFormat<>("metadata", METADATA_NAME_FORMAT, Metadata::fromXContent);
private ChecksumBlobStoreFormat<IndexMetadata> indexMetadataFormat;
public static final ChecksumBlobStoreFormat<IndexMetadata> INDEX_METADATA_FORMAT =
new ChecksumBlobStoreFormat<>("index-metadata", METADATA_NAME_FORMAT, IndexMetadata::fromXContent);
protected ChecksumBlobStoreFormat<SnapshotInfo> snapshotFormat;
private static final String SNAPSHOT_CODEC = "snapshot";
public static final ChecksumBlobStoreFormat<SnapshotInfo> SNAPSHOT_FORMAT =
new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT, SnapshotInfo::fromXContentInternal);
public static final ChecksumBlobStoreFormat<BlobStoreIndexShardSnapshot> INDEX_SHARD_SNAPSHOT_FORMAT =
new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot::fromXContent);
public static final ChecksumBlobStoreFormat<BlobStoreIndexShardSnapshots> INDEX_SHARD_SNAPSHOTS_FORMAT =
new ChecksumBlobStoreFormat<>("snapshots", SNAPSHOT_INDEX_NAME_FORMAT, BlobStoreIndexShardSnapshots::fromXContent);
private final boolean readOnly;
private final ChecksumBlobStoreFormat<BlobStoreIndexShardSnapshot> indexShardSnapshotFormat;
private final ChecksumBlobStoreFormat<BlobStoreIndexShardSnapshots> indexShardSnapshotsFormat;
private final Object lock = new Object();
private final SetOnce<BlobContainer> blobContainer = new SetOnce<>();
@ -257,6 +247,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
private final RecoverySettings recoverySettings;
private final NamedXContentRegistry namedXContentRegistry;
/**
* Flag that is set to {@code true} if this instance is started with {@link #metadata} that has a higher value for
* {@link RepositoryMetadata#pendingGeneration()} than for {@link RepositoryMetadata#generation()} indicating a full cluster restart
@ -306,10 +298,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", ByteSizeValue.ZERO);
readOnly = metadata.settings().getAsBoolean("readonly", false);
cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings());
indexShardSnapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT,
BlobStoreIndexShardSnapshot::fromXContent, namedXContentRegistry, compress);
indexShardSnapshotsFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_INDEX_CODEC, SNAPSHOT_INDEX_NAME_FORMAT,
BlobStoreIndexShardSnapshots::fromXContent, namedXContentRegistry, compress);
}
@Override
@ -320,12 +308,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
if (chunkSize != null && chunkSize.getBytes() <= 0) {
throw new IllegalArgumentException("the chunk size cannot be negative: [" + chunkSize + "]");
}
globalMetadataFormat = new ChecksumBlobStoreFormat<>(METADATA_CODEC, METADATA_NAME_FORMAT,
Metadata::fromXContent, namedXContentRegistry, compress);
indexMetadataFormat = new ChecksumBlobStoreFormat<>(INDEX_METADATA_CODEC, METADATA_NAME_FORMAT,
IndexMetadata::fromXContent, namedXContentRegistry, compress);
snapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT,
SnapshotInfo::fromXContentInternal, namedXContentRegistry, compress);
}
@Override
@ -554,11 +536,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, Metadata clusterMetadata) {
try {
// Write Global Metadata
globalMetadataFormat.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), true);
GLOBAL_METADATA_FORMAT.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), compress);
// write the index metadata for each index in the snapshot
for (IndexId index : indices) {
indexMetadataFormat.write(clusterMetadata.index(index.getName()), indexContainer(index), snapshotId.getUUID(), true);
INDEX_METADATA_FORMAT.write(clusterMetadata.index(index.getName()), indexContainer(index), snapshotId.getUUID(), compress);
}
} catch (IOException ex) {
throw new SnapshotCreationException(metadata.name(), snapshotId, ex);
@ -745,7 +727,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
for (String indexMetaGeneration : indexMetaGenerations) {
executor.execute(ActionRunnable.supply(allShardCountsListener, () -> {
try {
return indexMetadataFormat.read(indexContainer, indexMetaGeneration).getNumberOfShards();
return INDEX_METADATA_FORMAT.read(indexContainer, indexMetaGeneration, namedXContentRegistry).getNumberOfShards();
} catch (Exception ex) {
logger.warn(() -> new ParameterizedMessage(
"[{}] [{}] failed to read metadata for index", indexMetaGeneration, indexId.getName()), ex);
@ -817,7 +799,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}),
indexMetaGenerations.entrySet().stream().flatMap(entry -> {
final String indexContainerPath = indexContainer(entry.getKey()).path().buildAsString();
return entry.getValue().stream().map(id -> indexContainerPath + indexMetadataFormat.blobName(id));
return entry.getValue().stream().map(id -> indexContainerPath + INDEX_METADATA_FORMAT.blobName(id));
})
).map(absolutePath -> {
assert absolutePath.startsWith(basePath);
@ -909,10 +891,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
final String foundUUID;
if (blob.startsWith(SNAPSHOT_PREFIX)) {
foundUUID = blob.substring(SNAPSHOT_PREFIX.length(), blob.length() - ".dat".length());
assert snapshotFormat.blobName(foundUUID).equals(blob);
assert SNAPSHOT_FORMAT.blobName(foundUUID).equals(blob);
} else if (blob.startsWith(METADATA_PREFIX)) {
foundUUID = blob.substring(METADATA_PREFIX.length(), blob.length() - ".dat".length());
assert globalMetadataFormat.blobName(foundUUID).equals(blob);
assert GLOBAL_METADATA_FORMAT.blobName(foundUUID).equals(blob);
} else {
return false;
}
@ -936,8 +918,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
// blobs associated with the just deleted snapshots, as they are expected to exist and not be stale. Otherwise every snapshot
// delete would also log a confusing INFO message about "stale blobs".
final Set<String> blobNamesToIgnore = deletedSnapshots.stream().flatMap(
snapshotId -> Stream.of(globalMetadataFormat.blobName(snapshotId.getUUID()),
snapshotFormat.blobName(snapshotId.getUUID()))).collect(Collectors.toSet());
snapshotId -> Stream.of(GLOBAL_METADATA_FORMAT.blobName(snapshotId.getUUID()),
SNAPSHOT_FORMAT.blobName(snapshotId.getUUID()))).collect(Collectors.toSet());
final List<String> blobsToLog = blobsToDelete.stream().filter(b -> blobNamesToIgnore.contains(b) == false)
.collect(Collectors.toList());
if (blobsToLog.isEmpty() == false) {
@ -1047,7 +1029,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
// Write Global MetaData
executor.execute(ActionRunnable.run(allMetaListener,
() -> globalMetadataFormat.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), false)));
() -> GLOBAL_METADATA_FORMAT.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), compress)));
// write the index metadata for each index in the snapshot
for (IndexId index : indices) {
@ -1059,19 +1041,19 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
if (metaUUID == null) {
// We don't yet have this version of the metadata so we write it
metaUUID = UUIDs.base64UUID();
indexMetadataFormat.write(indexMetaData, indexContainer(index), metaUUID, false);
INDEX_METADATA_FORMAT.write(indexMetaData, indexContainer(index), metaUUID, compress);
indexMetaIdentifiers.put(identifiers, metaUUID);
}
indexMetas.put(index, identifiers);
} else {
indexMetadataFormat.write(
clusterMetadata.index(index.getName()), indexContainer(index), snapshotId.getUUID(), false);
INDEX_METADATA_FORMAT.write(
clusterMetadata.index(index.getName()), indexContainer(index), snapshotId.getUUID(), compress);
}
}
));
}
executor.execute(ActionRunnable.run(allMetaListener,
() -> snapshotFormat.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), false)));
() -> SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compress)));
}, onUpdateFailure);
}
@ -1092,7 +1074,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
@Override
public SnapshotInfo getSnapshotInfo(final SnapshotId snapshotId) {
try {
return snapshotFormat.read(blobContainer(), snapshotId.getUUID());
return SNAPSHOT_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry);
} catch (NoSuchFileException ex) {
throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
} catch (IOException | NotXContentException ex) {
@ -1103,7 +1085,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
@Override
public Metadata getSnapshotGlobalMetadata(final SnapshotId snapshotId) {
try {
return globalMetadataFormat.read(blobContainer(), snapshotId.getUUID());
return GLOBAL_METADATA_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry);
} catch (NoSuchFileException ex) {
throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
} catch (IOException ex) {
@ -1114,8 +1096,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
@Override
public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) throws IOException {
try {
return indexMetadataFormat.read(indexContainer(index),
repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, index));
return INDEX_METADATA_FORMAT.read(indexContainer(index),
repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, index), namedXContentRegistry);
} catch (NoSuchFileException e) {
throw new SnapshotMissingException(metadata.name(), snapshotId, e);
}
@ -1566,9 +1548,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob);
writeAtomic(indexBlob,
BytesReference.bytes(
filteredRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), version)), true);
writeAtomic(blobContainer(), indexBlob,
BytesReference.bytes(filteredRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), version)), true);
// write the current generation to the index-latest file
final BytesReference genBytes;
try (BytesStreamOutput bStream = new BytesStreamOutput()) {
@ -1577,7 +1558,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
logger.debug("Repository [{}] updating index.latest with generation [{}]", metadata.name(), newGen);
writeAtomic(INDEX_LATEST_BLOB, genBytes, false);
writeAtomic(blobContainer(), INDEX_LATEST_BLOB, genBytes, false);
// Step 3: Update CS to reflect new repository generation.
clusterService.submitStateUpdateTask("set safe repository generation [" + metadata.name() + "][" + newGen + "]",
@ -1768,11 +1749,12 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
return latest;
}
private void writeAtomic(final String blobName, final BytesReference bytesRef, boolean failIfAlreadyExists) throws IOException {
private void writeAtomic(BlobContainer container, final String blobName, final BytesReference bytesRef,
boolean failIfAlreadyExists) throws IOException {
try (InputStream stream = bytesRef.streamInput()) {
logger.trace(() ->
new ParameterizedMessage("[{}] Writing [{}] to the base path atomically", metadata.name(), blobName));
blobContainer().writeBlobAtomic(blobName, stream, bytesRef.length(), failIfAlreadyExists);
new ParameterizedMessage("[{}] Writing [{}] to {} atomically", metadata.name(), blobName, container.path()));
container.writeBlobAtomic(blobName, stream, bytesRef.length(), failIfAlreadyExists);
}
}
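// Note: this parameterized helper is what replaces the removed
// ChecksumBlobStoreFormat#writeAtomic. Callers now serialize first and pass the
// resulting bytes plus the target container here, so the same code path covers the
// repository root (index-N and index.latest above) and shard containers (see
// writeShardIndexBlob below).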
@ -1910,7 +1892,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
try {
indexShardSnapshotFormat.write(snapshot, shardContainer, snapshotId.getUUID(), false);
INDEX_SHARD_SNAPSHOT_FORMAT.write(snapshot, shardContainer, snapshotId.getUUID(), compress);
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);
}
@ -1940,7 +1922,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId,
"Failed to finalize snapshot creation [" + snapshotId + "] with shard index ["
+ indexShardSnapshotsFormat.blobName(indexGeneration) + "]", e);
+ INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(indexGeneration) + "]", e);
}
if (writeShardGens == false) {
try {
@ -2209,7 +2191,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
} catch (IOException e) {
throw new RepositoryException(metadata.name(), "Failed to finalize snapshot deletion " + snapshotIds +
" with shard index [" + indexShardSnapshotsFormat.blobName(indexGeneration) + "]", e);
" with shard index [" + INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(indexGeneration) + "]", e);
}
}
@ -2219,7 +2201,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
assert ShardGenerations.DELETED_SHARD_GEN.equals(indexGeneration) == false;
logger.trace(() -> new ParameterizedMessage("[{}] Writing shard index [{}] to [{}]", metadata.name(),
indexGeneration, shardContainer.path()));
indexShardSnapshotsFormat.writeAtomic(updatedSnapshots, shardContainer, indexGeneration);
final String blobName = INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(indexGeneration);
writeAtomic(shardContainer, blobName, INDEX_SHARD_SNAPSHOTS_FORMAT.serialize(updatedSnapshots, blobName, compress), true);
}
// Unused blobs are all previous index-, data- and meta-blobs that are not referenced by the new index-, as well as all
@ -2240,7 +2223,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
*/
public BlobStoreIndexShardSnapshot loadShardSnapshot(BlobContainer shardContainer, SnapshotId snapshotId) {
try {
return indexShardSnapshotFormat.read(shardContainer, snapshotId.getUUID());
return INDEX_SHARD_SNAPSHOT_FORMAT.read(shardContainer, snapshotId.getUUID(), namedXContentRegistry);
} catch (NoSuchFileException ex) {
throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
} catch (IOException ex) {
@ -2266,7 +2249,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
if (generation.equals(ShardGenerations.NEW_SHARD_GEN)) {
return new Tuple<>(BlobStoreIndexShardSnapshots.EMPTY, ShardGenerations.NEW_SHARD_GEN);
}
return new Tuple<>(indexShardSnapshotsFormat.read(shardContainer, generation), generation);
return new Tuple<>(INDEX_SHARD_SNAPSHOTS_FORMAT.read(shardContainer, generation, namedXContentRegistry), generation);
}
final Tuple<BlobStoreIndexShardSnapshots, Long> legacyIndex = buildBlobStoreIndexShardSnapshots(blobs, shardContainer);
return new Tuple<>(legacyIndex.v1(), String.valueOf(legacyIndex.v2()));
@ -2282,7 +2265,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
throws IOException {
long latest = latestGeneration(blobs);
if (latest >= 0) {
final BlobStoreIndexShardSnapshots shardSnapshots = indexShardSnapshotsFormat.read(shardContainer, Long.toString(latest));
final BlobStoreIndexShardSnapshots shardSnapshots =
INDEX_SHARD_SNAPSHOTS_FORMAT.read(shardContainer, Long.toString(latest), namedXContentRegistry);
return new Tuple<>(shardSnapshots, latest);
} else if (blobs.stream().anyMatch(b -> b.startsWith(SNAPSHOT_PREFIX) || b.startsWith(INDEX_FILE_PREFIX)
|| b.startsWith(UPLOADED_DATA_BLOB_PREFIX))) {

View File: ChecksumBlobStoreFormat.java

@ -28,14 +28,12 @@ import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.OutputStreamIndexOutput;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
import org.elasticsearch.common.lucene.store.IndexOutputOutputStream;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
@ -50,7 +48,6 @@ import org.elasticsearch.gateway.CorruptStateException;
import org.elasticsearch.snapshots.SnapshotInfo;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.HashMap;
@ -80,28 +77,20 @@ public final class ChecksumBlobStoreFormat<T extends ToXContent> {
private static final int BUFFER_SIZE = 4096;
private final boolean compress;
private final String codec;
private final String blobNameFormat;
private final CheckedFunction<XContentParser, T, IOException> reader;
private final NamedXContentRegistry namedXContentRegistry;
/**
* @param codec codec name
* @param blobNameFormat format of the blobname in {@link String#format} format
* @param reader prototype object that can deserialize T from XContent
* @param compress true if the content should be compressed
*/
public ChecksumBlobStoreFormat(String codec, String blobNameFormat, CheckedFunction<XContentParser, T, IOException> reader,
NamedXContentRegistry namedXContentRegistry, boolean compress) {
public ChecksumBlobStoreFormat(String codec, String blobNameFormat, CheckedFunction<XContentParser, T, IOException> reader) {
this.reader = reader;
this.blobNameFormat = blobNameFormat;
this.namedXContentRegistry = namedXContentRegistry;
this.compress = compress;
this.codec = codec;
}
@ -112,23 +101,16 @@ public final class ChecksumBlobStoreFormat<T extends ToXContent> {
* @param name name to be translated into the blob name
* @return parsed blob object
*/
public T read(BlobContainer blobContainer, String name) throws IOException {
public T read(BlobContainer blobContainer, String name, NamedXContentRegistry namedXContentRegistry) throws IOException {
String blobName = blobName(name);
return readBlob(blobContainer, blobName);
return deserialize(blobName, namedXContentRegistry, Streams.readFully(blobContainer.readBlob(blobName)));
}
public String blobName(String name) {
return String.format(Locale.ROOT, blobNameFormat, name);
}
/**
* Reads the blob with the specified name without resolving it using the {@link #blobName} method.
*
* @param blobContainer blob container
* @param blobName blob name
*/
public T readBlob(BlobContainer blobContainer, String blobName) throws IOException {
final BytesReference bytes = Streams.readFully(blobContainer.readBlob(blobName));
public T deserialize(String blobName, NamedXContentRegistry namedXContentRegistry, BytesReference bytes) throws IOException {
final String resourceDesc = "ChecksumBlobStoreFormat.readBlob(blob=\"" + blobName + "\")";
try {
final IndexInput indexInput = bytes.length() > 0 ? new ByteBuffersIndexInput(
@ -148,50 +130,26 @@ public final class ChecksumBlobStoreFormat<T extends ToXContent> {
}
}
/**
* Writes blob in atomic manner with resolving the blob name using {@link #blobName} method.
* <p>
* The blob will be compressed and checksum will be written if required.
*
* Atomic move might be very inefficient on some repositories. It also cannot override existing files.
*
* @param obj object to be serialized
* @param blobContainer blob container
* @param name blob name
*/
public void writeAtomic(T obj, BlobContainer blobContainer, String name) throws IOException {
final String blobName = blobName(name);
writeTo(obj, blobName, bytesArray -> {
try (InputStream stream = bytesArray.streamInput()) {
blobContainer.writeBlobAtomic(blobName, stream, bytesArray.length(), true);
}
});
}
/**
* Writes blob with resolving the blob name using {@link #blobName} method.
* <p>
* The blob will be compressed and checksum will be written if required.
* The blob will optionally be compressed.
*
* @param obj object to be serialized
* @param blobContainer blob container
* @param name blob name
* @param failIfAlreadyExists Whether to fail if the blob already exists
* @param compress whether to use compression
*/
public void write(T obj, BlobContainer blobContainer, String name, boolean failIfAlreadyExists) throws IOException {
public void write(T obj, BlobContainer blobContainer, String name, boolean compress) throws IOException {
final String blobName = blobName(name);
writeTo(obj, blobName, bytesArray -> {
try (InputStream stream = bytesArray.streamInput()) {
blobContainer.writeBlob(blobName, stream, bytesArray.length(), failIfAlreadyExists);
}
});
final BytesReference bytes = serialize(obj, blobName, compress);
blobContainer.writeBlob(blobName, bytes.streamInput(), bytes.length(), false);
}
private void writeTo(final T obj, final String blobName,
final CheckedConsumer<BytesReference, IOException> consumer) throws IOException {
public BytesReference serialize(final T obj, final String blobName, final boolean compress) throws IOException {
try (BytesStreamOutput outputStream = new BytesStreamOutput()) {
final String resourceDesc = "ChecksumBlobStoreFormat.writeBlob(blob=\"" + blobName + "\")";
try (OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(resourceDesc, blobName, outputStream, BUFFER_SIZE)) {
try (OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(
"ChecksumBlobStoreFormat.writeBlob(blob=\"" + blobName + "\")", blobName, outputStream, BUFFER_SIZE)) {
CodecUtil.writeHeader(indexOutput, codec, VERSION);
try (OutputStream indexOutputOutputStream = new IndexOutputOutputStream(indexOutput) {
@Override
@ -199,26 +157,15 @@ public final class ChecksumBlobStoreFormat<T extends ToXContent> {
// this is important since some of the XContentBuilders write bytes on close.
// in order to write the footer we need to prevent closing the actual index input.
}
}) {
if (compress) {
try (StreamOutput compressedStreamOutput = CompressorFactory.COMPRESSOR.streamOutput(indexOutputOutputStream)) {
write(obj, compressedStreamOutput);
}
} else {
write(obj, indexOutputOutputStream);
}
}; XContentBuilder builder = XContentFactory.contentBuilder(XContentType.SMILE,
compress ? CompressorFactory.COMPRESSOR.streamOutput(indexOutputOutputStream) : indexOutputOutputStream)) {
builder.startObject();
obj.toXContent(builder, SNAPSHOT_ONLY_FORMAT_PARAMS);
builder.endObject();
}
CodecUtil.writeFooter(indexOutput);
}
consumer.accept(outputStream.bytes());
}
}
private void write(T obj, OutputStream streamOutput) throws IOException {
try (XContentBuilder builder = XContentFactory.contentBuilder(XContentType.SMILE, streamOutput)) {
builder.startObject();
obj.toXContent(builder, SNAPSHOT_ONLY_FORMAT_PARAMS);
builder.endObject();
return outputStream.bytes();
}
}
}
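Taken together, serialize and deserialize now form a public, container-free round trip,
which is what lets tests and the mock repository below reuse the checksum format without
going through a BlobContainer. A minimal sketch (uuid, snapshotInfo and registry are
assumed to be in scope; the format arguments mirror SNAPSHOT_FORMAT above):

ChecksumBlobStoreFormat<SnapshotInfo> fmt =
    new ChecksumBlobStoreFormat<>("snapshot", "snap-%s.dat", SnapshotInfo::fromXContentInternal);
String blobName = fmt.blobName(uuid);                               // resolves to "snap-<uuid>.dat"
BytesReference bytes = fmt.serialize(snapshotInfo, blobName, true); // codec header + SMILE body (compressed) + checksum footer
SnapshotInfo back = fmt.deserialize(blobName, registry, bytes);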

View File: BlobStoreFormatTests.java

@ -26,7 +26,6 @@ import org.elasticsearch.common.blobstore.BlobMetadata;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.fs.FsBlobStore;
import org.elasticsearch.common.blobstore.support.FilterBlobContainer;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@ -44,13 +43,6 @@ import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThan;
@ -110,18 +102,15 @@ public class BlobStoreFormatTests extends ESTestCase {
public void testBlobStoreOperations() throws IOException {
BlobStore blobStore = createTestBlobStore();
BlobContainer blobContainer = blobStore.blobContainer(BlobPath.cleanPath());
ChecksumBlobStoreFormat<BlobObj> checksumSMILE = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent,
xContentRegistry(), false);
ChecksumBlobStoreFormat<BlobObj> checksumSMILECompressed = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent,
xContentRegistry(), true);
ChecksumBlobStoreFormat<BlobObj> checksumSMILE = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent);
// Write blobs in different formats
checksumSMILE.write(new BlobObj("checksum smile"), blobContainer, "check-smile", true);
checksumSMILECompressed.write(new BlobObj("checksum smile compressed"), blobContainer, "check-smile-comp", true);
checksumSMILE.write(new BlobObj("checksum smile"), blobContainer, "check-smile", false);
checksumSMILE.write(new BlobObj("checksum smile compressed"), blobContainer, "check-smile-comp", true);
// Assert that all checksum blobs can be read by all formats
assertEquals(checksumSMILE.read(blobContainer, "check-smile").getText(), "checksum smile");
assertEquals(checksumSMILE.read(blobContainer, "check-smile-comp").getText(), "checksum smile compressed");
// Assert that all checksum blobs can be read
assertEquals(checksumSMILE.read(blobContainer, "check-smile", xContentRegistry()).getText(), "checksum smile");
assertEquals(checksumSMILE.read(blobContainer, "check-smile-comp", xContentRegistry()).getText(), "checksum smile compressed");
}
public void testCompressionIsApplied() throws IOException {
@ -131,13 +120,10 @@ public class BlobStoreFormatTests extends ESTestCase {
for (int i = 0; i < randomIntBetween(100, 300); i++) {
veryRedundantText.append("Blah ");
}
ChecksumBlobStoreFormat<BlobObj> checksumFormat = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent,
xContentRegistry(), false);
ChecksumBlobStoreFormat<BlobObj> checksumFormatComp = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent,
xContentRegistry(), true);
ChecksumBlobStoreFormat<BlobObj> checksumFormat = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent);
BlobObj blobObj = new BlobObj(veryRedundantText.toString());
checksumFormatComp.write(blobObj, blobContainer, "blob-comp", true);
checksumFormat.write(blobObj, blobContainer, "blob-not-comp", true);
checksumFormat.write(blobObj, blobContainer, "blob-comp", true);
checksumFormat.write(blobObj, blobContainer, "blob-not-comp", false);
Map<String, BlobMetadata> blobs = blobContainer.listBlobsByPrefix("blob-");
assertEquals(blobs.size(), 2);
assertThat(blobs.get("blob-not-comp").length(), greaterThan(blobs.get("blob-comp").length()));
@ -148,13 +134,12 @@ public class BlobStoreFormatTests extends ESTestCase {
BlobContainer blobContainer = blobStore.blobContainer(BlobPath.cleanPath());
String testString = randomAlphaOfLength(randomInt(10000));
BlobObj blobObj = new BlobObj(testString);
ChecksumBlobStoreFormat<BlobObj> checksumFormat = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent,
xContentRegistry(), randomBoolean());
checksumFormat.write(blobObj, blobContainer, "test-path", true);
assertEquals(checksumFormat.read(blobContainer, "test-path").getText(), testString);
ChecksumBlobStoreFormat<BlobObj> checksumFormat = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent);
checksumFormat.write(blobObj, blobContainer, "test-path", randomBoolean());
assertEquals(checksumFormat.read(blobContainer, "test-path", xContentRegistry()).getText(), testString);
randomCorruption(blobContainer, "test-path");
try {
checksumFormat.read(blobContainer, "test-path");
checksumFormat.read(blobContainer, "test-path", xContentRegistry());
fail("Should have failed due to corruption");
} catch (ElasticsearchCorruptionException ex) {
assertThat(ex.getMessage(), containsString("test-path"));
@ -163,79 +148,6 @@ public class BlobStoreFormatTests extends ESTestCase {
}
}
public void testAtomicWrite() throws Exception {
final BlobStore blobStore = createTestBlobStore();
final BlobContainer blobContainer = blobStore.blobContainer(BlobPath.cleanPath());
String testString = randomAlphaOfLength(randomInt(10000));
final CountDownLatch block = new CountDownLatch(1);
final CountDownLatch unblock = new CountDownLatch(1);
final BlobObj blobObj = new BlobObj(testString) {
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
super.toXContent(builder, params);
// Block before finishing writing
try {
block.countDown();
unblock.await(5, TimeUnit.SECONDS);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
return builder;
}
};
final ChecksumBlobStoreFormat<BlobObj> checksumFormat = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent,
xContentRegistry(), randomBoolean());
ExecutorService threadPool = Executors.newFixedThreadPool(1);
try {
Future<Void> future = threadPool.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
checksumFormat.writeAtomic(blobObj, blobContainer, "test-blob");
return null;
}
});
// signalling
block.await(5, TimeUnit.SECONDS);
assertFalse(blobContainer.blobExists("test-blob"));
unblock.countDown();
future.get();
assertTrue(blobContainer.blobExists("test-blob"));
} finally {
threadPool.shutdown();
}
}
public void testAtomicWriteFailures() throws Exception {
final String name = randomAlphaOfLength(10);
final BlobObj blobObj = new BlobObj("test");
final ChecksumBlobStoreFormat<BlobObj> checksumFormat = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent,
xContentRegistry(), randomBoolean());
final BlobStore blobStore = createTestBlobStore();
final BlobContainer blobContainer = blobStore.blobContainer(BlobPath.cleanPath());
{
IOException writeBlobException = expectThrows(IOException.class, () -> {
BlobContainer wrapper = new FilterBlobContainer(blobContainer) {
@Override
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
throws IOException {
throw new IOException("Exception thrown in writeBlobAtomic() for " + blobName);
}
@Override
protected BlobContainer wrapChild(BlobContainer child) {
return child;
}
};
checksumFormat.writeAtomic(blobObj, wrapper, name);
});
assertEquals("Exception thrown in writeBlobAtomic() for " + name, writeBlobException.getMessage());
assertEquals(0, writeBlobException.getSuppressed().length);
}
}
protected BlobStore createTestBlobStore() throws IOException {
return new FsBlobStore(Settings.EMPTY, createTempDir(), false);
}

View File: MockEventuallyConsistentRepository.java

@ -19,7 +19,6 @@
package org.elasticsearch.snapshots.mockstore;
import org.apache.lucene.codecs.CodecUtil;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
@ -31,10 +30,7 @@ import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.snapshots.SnapshotInfo;
@ -164,7 +160,7 @@ public class MockEventuallyConsistentRepository extends BlobStoreRepository {
private class MockBlobStore implements BlobStore {
private AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicBoolean closed = new AtomicBoolean(false);
@Override
public BlobContainer blobContainer(BlobPath path) {
@ -346,15 +342,12 @@ public class MockEventuallyConsistentRepository extends BlobStoreRepository {
if (hasConsistentContent) {
if (basePath().buildAsString().equals(path().buildAsString())) {
try {
// TODO: dry up the logic for reading SnapshotInfo here against the code in ChecksumBlobStoreFormat
final int offset = CodecUtil.headerLength(BlobStoreRepository.SNAPSHOT_CODEC);
final SnapshotInfo updatedInfo = SnapshotInfo.fromXContentInternal(
XContentHelper.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE,
new BytesArray(data, offset, data.length - offset - CodecUtil.footerLength()),
XContentType.SMILE));
final SnapshotInfo updatedInfo = BlobStoreRepository.SNAPSHOT_FORMAT.deserialize(
blobName, namedXContentRegistry, new BytesArray(data));
// If the existing snapshotInfo differs only in the timestamps it stores, then the overwrite is not
// a problem and could be the result of a correctly handled master failover.
final SnapshotInfo existingInfo = snapshotFormat.readBlob(this, blobName);
final SnapshotInfo existingInfo = SNAPSHOT_FORMAT.deserialize(
blobName, namedXContentRegistry, Streams.readFully(readBlob(blobName)));
assertThat(existingInfo.snapshotId(), equalTo(updatedInfo.snapshotId()));
assertThat(existingInfo.reason(), equalTo(updatedInfo.reason()));
assertThat(existingInfo.state(), equalTo(updatedInfo.state()));

View File: BlobStoreTestUtil.java

@ -118,7 +118,7 @@ public final class BlobStoreTestUtil {
}
assertIndexGenerations(blobContainer, latestGen);
final RepositoryData repositoryData;
try (InputStream blob = blobContainer.readBlob("index-" + latestGen);
try (InputStream blob = blobContainer.readBlob(BlobStoreRepository.INDEX_FILE_PREFIX + latestGen);
XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE, blob)) {
repositoryData = RepositoryData.snapshotsFromXContent(parser, latestGen, false);
@ -138,8 +138,8 @@ public final class BlobStoreTestUtil {
}
private static void assertIndexGenerations(BlobContainer repoRoot, long latestGen) throws IOException {
final long[] indexGenerations = repoRoot.listBlobsByPrefix("index-").keySet().stream()
.map(s -> s.replace("index-", ""))
final long[] indexGenerations = repoRoot.listBlobsByPrefix(BlobStoreRepository.INDEX_FILE_PREFIX).keySet().stream()
.map(s -> s.replace(BlobStoreRepository.INDEX_FILE_PREFIX, ""))
.mapToLong(Long::parseLong).sorted().toArray();
assertEquals(latestGen, indexGenerations[indexGenerations.length - 1]);
assertTrue(indexGenerations.length <= 2);
@ -200,7 +200,7 @@ public final class BlobStoreTestUtil {
final BlobContainer repoRoot = repository.blobContainer();
final Collection<SnapshotId> snapshotIds = repositoryData.getSnapshotIds();
final List<String> expectedSnapshotUUIDs = snapshotIds.stream().map(SnapshotId::getUUID).collect(Collectors.toList());
for (String prefix : new String[]{"snap-", "meta-"}) {
for (String prefix : new String[]{BlobStoreRepository.SNAPSHOT_PREFIX, BlobStoreRepository.METADATA_PREFIX}) {
final Collection<String> foundSnapshotUUIDs = repoRoot.listBlobs().keySet().stream().filter(p -> p.startsWith(prefix))
.map(p -> p.replace(prefix, "").replace(".dat", ""))
.collect(Collectors.toSet());

View File: ESTestCase.java

@ -1315,11 +1315,14 @@ public abstract class ESTestCase extends LuceneTestCase {
return xContent.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, data.streamInput());
}
private static final NamedXContentRegistry DEFAULT_NAMED_X_CONTENT_REGISTRY =
new NamedXContentRegistry(ClusterModule.getNamedXWriteables());
/**
* The {@link NamedXContentRegistry} to use for this test. Subclasses should override and use liberally.
*/
protected NamedXContentRegistry xContentRegistry() {
return new NamedXContentRegistry(ClusterModule.getNamedXWriteables());
return DEFAULT_NAMED_X_CONTENT_REGISTRY;
}
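// Sharing a single instance is safe here: NamedXContentRegistry is immutable once
// built, and ClusterModule.getNamedXWriteables() yields the same entries for every
// test, so the previous per-call allocation was pure overhead.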
/**