Some Cleanup in BlobStoreRepository (#43323) (#44043)

* Some Cleanup in BlobStoreRepository

* Extracted from #42833:
  * Dry up index and shard path handling
  * Shorten XContent handling
This commit is contained in:
Armin Braun 2019-07-07 19:50:46 +02:00 committed by GitHub
parent 9089820d8f
commit 990ac4ca83
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 30 additions and 50 deletions

View File

@ -308,7 +308,7 @@ public final class RepositoryData {
* Writes the snapshots metadata and the related indices metadata to x-content, omitting the * Writes the snapshots metadata and the related indices metadata to x-content, omitting the
* incompatible snapshots. * incompatible snapshots.
*/ */
public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final ToXContent.Params params) throws IOException { public XContentBuilder snapshotsToXContent(final XContentBuilder builder) throws IOException {
builder.startObject(); builder.startObject();
// write the snapshots list // write the snapshots list
builder.startArray(SNAPSHOTS); builder.startArray(SNAPSHOTS);
@ -453,14 +453,12 @@ public final class RepositoryData {
/** /**
* Writes the incompatible snapshot ids to x-content. * Writes the incompatible snapshot ids to x-content.
*/ */
public XContentBuilder incompatibleSnapshotsToXContent(final XContentBuilder builder, final ToXContent.Params params) public XContentBuilder incompatibleSnapshotsToXContent(XContentBuilder builder) throws IOException {
throws IOException {
builder.startObject(); builder.startObject();
// write the incompatible snapshots list // write the incompatible snapshots list
builder.startArray(INCOMPATIBLE_SNAPSHOTS); builder.startArray(INCOMPATIBLE_SNAPSHOTS);
for (final SnapshotId snapshot : getIncompatibleSnapshotIds()) { for (final SnapshotId snapshot : getIncompatibleSnapshotIds()) {
snapshot.toXContent(builder, params); snapshot.toXContent(builder, ToXContent.EMPTY_PARAMS);
} }
builder.endArray(); builder.endArray();
builder.endObject(); builder.endObject();

View File

@ -51,8 +51,6 @@ import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.compress.NotXContentException; import org.elasticsearch.common.compress.NotXContentException;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput; import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.metrics.CounterMetric;
@ -62,8 +60,6 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
@ -401,10 +397,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
// write the index metadata for each index in the snapshot // write the index metadata for each index in the snapshot
for (IndexId index : indices) { for (IndexId index : indices) {
final IndexMetaData indexMetaData = clusterMetaData.index(index.getName()); indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID());
final BlobPath indexPath = basePath().add("indices").add(index.getId());
final BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(indexPath);
indexMetaDataFormat.write(indexMetaData, indexMetaDataBlobContainer, snapshotId.getUUID());
} }
} catch (IOException ex) { } catch (IOException ex) {
throw new SnapshotCreationException(metadata.name(), snapshotId, ex); throw new SnapshotCreationException(metadata.name(), snapshotId, ex);
@ -452,7 +445,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
snapshotId, snapshotId,
ActionListener.map(listener, v -> { ActionListener.map(listener, v -> {
try { try {
blobStore().blobContainer(basePath().add("indices")).deleteBlobsIgnoringIfNotExists( blobStore().blobContainer(indicesPath()).deleteBlobsIgnoringIfNotExists(
unreferencedIndices.stream().map(IndexId::getId).collect(Collectors.toList())); unreferencedIndices.stream().map(IndexId::getId).collect(Collectors.toList()));
} catch (IOException e) { } catch (IOException e) {
logger.warn(() -> logger.warn(() ->
@ -504,9 +497,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
} }
private void deleteIndexMetaDataBlobIgnoringErrors(SnapshotId snapshotId, IndexId indexId) { private void deleteIndexMetaDataBlobIgnoringErrors(SnapshotId snapshotId, IndexId indexId) {
BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(basePath().add("indices").add(indexId.getId()));
try { try {
indexMetaDataFormat.delete(indexMetaDataBlobContainer, snapshotId.getUUID()); indexMetaDataFormat.delete(indexContainer(indexId), snapshotId.getUUID());
} catch (IOException ex) { } catch (IOException ex) {
logger.warn(() -> new ParameterizedMessage("[{}] failed to delete metadata for index [{}]", logger.warn(() -> new ParameterizedMessage("[{}] failed to delete metadata for index [{}]",
snapshotId, indexId.getName()), ex); snapshotId, indexId.getName()), ex);
@ -570,8 +562,19 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
@Override @Override
public IndexMetaData getSnapshotIndexMetaData(final SnapshotId snapshotId, final IndexId index) throws IOException { public IndexMetaData getSnapshotIndexMetaData(final SnapshotId snapshotId, final IndexId index) throws IOException {
final BlobPath indexPath = basePath().add("indices").add(index.getId()); return indexMetaDataFormat.read(indexContainer(index), snapshotId.getUUID());
return indexMetaDataFormat.read(blobStore().blobContainer(indexPath), snapshotId.getUUID()); }
private BlobPath indicesPath() {
return basePath().add("indices");
}
private BlobContainer indexContainer(IndexId indexId) {
return blobStore().blobContainer(indicesPath().add(indexId.getId()));
}
private BlobContainer shardContainer(IndexId indexId, ShardId shardId) {
return blobStore().blobContainer(indicesPath().add(indexId.getId()).add(Integer.toString(shardId.getId())));
} }
/** /**
@ -619,10 +622,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
String seed = UUIDs.randomBase64UUID(); String seed = UUIDs.randomBase64UUID();
byte[] testBytes = Strings.toUTF8Bytes(seed); byte[] testBytes = Strings.toUTF8Bytes(seed);
BlobContainer testContainer = blobStore().blobContainer(basePath().add(testBlobPrefix(seed))); BlobContainer testContainer = blobStore().blobContainer(basePath().add(testBlobPrefix(seed)));
String blobName = "master.dat";
BytesArray bytes = new BytesArray(testBytes); BytesArray bytes = new BytesArray(testBytes);
try (InputStream stream = bytes.streamInput()) { try (InputStream stream = bytes.streamInput()) {
testContainer.writeBlobAtomic(blobName, stream, bytes.length(), true); testContainer.writeBlobAtomic("master.dat", stream, bytes.length(), true);
} }
return seed; return seed;
} }
@ -695,7 +697,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
} }
} }
public static String testBlobPrefix(String seed) { private static String testBlobPrefix(String seed) {
return TESTS_FILE + seed; return TESTS_FILE + seed;
} }
@ -715,19 +717,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
"] - possibly due to simultaneous snapshot deletion requests"); "] - possibly due to simultaneous snapshot deletion requests");
} }
final long newGen = currentGen + 1; final long newGen = currentGen + 1;
final BytesReference snapshotsBytes;
try (BytesStreamOutput bStream = new BytesStreamOutput()) {
try (StreamOutput stream = new OutputStreamStreamOutput(bStream)) {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream);
repositoryData.snapshotsToXContent(builder, ToXContent.EMPTY_PARAMS);
builder.close();
}
snapshotsBytes = bStream.bytes();
}
// write the index file // write the index file
final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen); final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob); logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob);
writeAtomic(indexBlob, snapshotsBytes, true); writeAtomic(indexBlob, BytesReference.bytes(repositoryData.snapshotsToXContent(XContentFactory.jsonBuilder())), true);
// write the current generation to the index-latest file // write the current generation to the index-latest file
final BytesReference genBytes; final BytesReference genBytes;
try (BytesStreamOutput bStream = new BytesStreamOutput()) { try (BytesStreamOutput bStream = new BytesStreamOutput()) {
@ -754,17 +747,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
*/ */
void writeIncompatibleSnapshots(RepositoryData repositoryData) throws IOException { void writeIncompatibleSnapshots(RepositoryData repositoryData) throws IOException {
assert isReadOnly() == false; // can not write to a read only repository assert isReadOnly() == false; // can not write to a read only repository
final BytesReference bytes;
try (BytesStreamOutput bStream = new BytesStreamOutput()) {
try (StreamOutput stream = new OutputStreamStreamOutput(bStream)) {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream);
repositoryData.incompatibleSnapshotsToXContent(builder, ToXContent.EMPTY_PARAMS);
builder.close();
}
bytes = bStream.bytes();
}
// write the incompatible snapshots blob // write the incompatible snapshots blob
writeAtomic(INCOMPATIBLE_SNAPSHOTS_BLOB, bytes, false); writeAtomic(INCOMPATIBLE_SNAPSHOTS_BLOB,
BytesReference.bytes(repositoryData.incompatibleSnapshotsToXContent(XContentFactory.jsonBuilder())), false);
} }
/** /**
@ -857,9 +842,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) { Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
ShardId shardId = store.shardId(); ShardId shardId = store.shardId();
final Context context = new Context(snapshotId, indexId, shardId, snapshotShardId); final Context context = new Context(snapshotId, indexId, shardId, snapshotShardId);
BlobPath path = basePath().add("indices").add(indexId.getId()).add(Integer.toString(snapshotShardId.getId())); final RestoreContext snapshotContext =
BlobContainer blobContainer = blobStore().blobContainer(path); new RestoreContext(shardId, snapshotId, recoveryState, shardContainer(indexId, snapshotShardId));
final RestoreContext snapshotContext = new RestoreContext(shardId, snapshotId, recoveryState, blobContainer);
try { try {
BlobStoreIndexShardSnapshot snapshot = context.loadSnapshot(); BlobStoreIndexShardSnapshot snapshot = context.loadSnapshot();
SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()); SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles());
@ -935,8 +919,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
Context(SnapshotId snapshotId, IndexId indexId, ShardId shardId, ShardId snapshotShardId) { Context(SnapshotId snapshotId, IndexId indexId, ShardId shardId, ShardId snapshotShardId) {
this.snapshotId = snapshotId; this.snapshotId = snapshotId;
this.shardId = shardId; this.shardId = shardId;
blobContainer = blobStore().blobContainer(basePath().add("indices").add(indexId.getId()) blobContainer = shardContainer(indexId, snapshotShardId);
.add(Integer.toString(snapshotShardId.getId())));
} }
/** /**

View File

@ -22,7 +22,6 @@ package org.elasticsearch.repositories;
import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContent; import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
@ -61,7 +60,7 @@ public class RepositoryDataTests extends ESTestCase {
public void testXContent() throws IOException { public void testXContent() throws IOException {
RepositoryData repositoryData = generateRandomRepoData(); RepositoryData repositoryData = generateRandomRepoData();
XContentBuilder builder = JsonXContent.contentBuilder(); XContentBuilder builder = JsonXContent.contentBuilder();
repositoryData.snapshotsToXContent(builder, ToXContent.EMPTY_PARAMS); repositoryData.snapshotsToXContent(builder);
try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) { try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) {
long gen = (long) randomIntBetween(0, 500); long gen = (long) randomIntBetween(0, 500);
RepositoryData fromXContent = RepositoryData.snapshotsFromXContent(parser, gen); RepositoryData fromXContent = RepositoryData.snapshotsFromXContent(parser, gen);
@ -166,7 +165,7 @@ public class RepositoryDataTests extends ESTestCase {
final RepositoryData repositoryData = generateRandomRepoData(); final RepositoryData repositoryData = generateRandomRepoData();
XContentBuilder builder = XContentBuilder.builder(xContent); XContentBuilder builder = XContentBuilder.builder(xContent);
repositoryData.snapshotsToXContent(builder, ToXContent.EMPTY_PARAMS); repositoryData.snapshotsToXContent(builder);
RepositoryData parsedRepositoryData; RepositoryData parsedRepositoryData;
try (XContentParser xParser = createParser(builder)) { try (XContentParser xParser = createParser(builder)) {
parsedRepositoryData = RepositoryData.snapshotsFromXContent(xParser, repositoryData.getGenId()); parsedRepositoryData = RepositoryData.snapshotsFromXContent(xParser, repositoryData.getGenId());
@ -197,7 +196,7 @@ public class RepositoryDataTests extends ESTestCase {
indexSnapshots, new ArrayList<>(parsedRepositoryData.getIncompatibleSnapshotIds())); indexSnapshots, new ArrayList<>(parsedRepositoryData.getIncompatibleSnapshotIds()));
final XContentBuilder corruptedBuilder = XContentBuilder.builder(xContent); final XContentBuilder corruptedBuilder = XContentBuilder.builder(xContent);
corruptedRepositoryData.snapshotsToXContent(corruptedBuilder, ToXContent.EMPTY_PARAMS); corruptedRepositoryData.snapshotsToXContent(corruptedBuilder);
try (XContentParser xParser = createParser(corruptedBuilder)) { try (XContentParser xParser = createParser(corruptedBuilder)) {
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () ->