* Add eventually consistent mock repository for reproducing and testing AWS S3 blob store behavior * Relates #38941
This commit is contained in:
parent
8445c41004
commit
3b5038b837
|
@ -166,13 +166,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
|
||||
private static final int BUFFER_SIZE = 4096;
|
||||
|
||||
private static final String SNAPSHOT_PREFIX = "snap-";
|
||||
public static final String SNAPSHOT_PREFIX = "snap-";
|
||||
|
||||
private static final String SNAPSHOT_CODEC = "snapshot";
|
||||
public static final String SNAPSHOT_CODEC = "snapshot";
|
||||
|
||||
private static final String INDEX_FILE_PREFIX = "index-";
|
||||
|
||||
private static final String INDEX_LATEST_BLOB = "index.latest";
|
||||
public static final String INDEX_LATEST_BLOB = "index.latest";
|
||||
|
||||
private static final String TESTS_FILE = "tests-";
|
||||
|
||||
|
@ -210,7 +210,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
|
||||
private ChecksumBlobStoreFormat<IndexMetaData> indexMetaDataFormat;
|
||||
|
||||
private ChecksumBlobStoreFormat<SnapshotInfo> snapshotFormat;
|
||||
protected ChecksumBlobStoreFormat<SnapshotInfo> snapshotFormat;
|
||||
|
||||
private final boolean readOnly;
|
||||
|
||||
|
|
|
@ -108,6 +108,7 @@ import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimary
|
|||
import org.elasticsearch.cluster.service.ClusterApplierService;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.cluster.service.MasterService;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.network.NetworkModule;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
|
@ -152,6 +153,7 @@ import org.elasticsearch.script.ScriptService;
|
|||
import org.elasticsearch.search.SearchService;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.search.fetch.FetchPhase;
|
||||
import org.elasticsearch.snapshots.mockstore.MockEventuallyConsistentRepository;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.disruption.DisruptableMockTransport;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption;
|
||||
|
@ -203,9 +205,18 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|||
|
||||
private Path tempDir;
|
||||
|
||||
/**
|
||||
* Context shared by all the node's {@link Repository} instances if the eventually consistent blobstore is to be used.
|
||||
* {@code null} if not using the eventually consistent blobstore.
|
||||
*/
|
||||
@Nullable private MockEventuallyConsistentRepository.Context blobStoreContext;
|
||||
|
||||
@Before
|
||||
public void createServices() {
|
||||
tempDir = createTempDir();
|
||||
if (randomBoolean()) {
|
||||
blobStoreContext = new MockEventuallyConsistentRepository.Context();
|
||||
}
|
||||
deterministicTaskQueue =
|
||||
new DeterministicTaskQueue(Settings.builder().put(NODE_NAME_SETTING.getKey(), "shared").build(), random());
|
||||
}
|
||||
|
@ -213,6 +224,9 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|||
@After
|
||||
public void verifyReposThenStopServices() {
|
||||
try {
|
||||
if (blobStoreContext != null) {
|
||||
blobStoreContext.forceConsistent();
|
||||
}
|
||||
BlobStoreTestUtil.assertConsistency(
|
||||
(BlobStoreRepository) testClusterNodes.randomMasterNodeSafe().repositoriesService.repository("repo"),
|
||||
Runnable::run);
|
||||
|
@ -900,19 +914,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|||
final IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver();
|
||||
repositoriesService = new RepositoriesService(
|
||||
settings, clusterService, transportService,
|
||||
Collections.singletonMap(FsRepository.TYPE, metaData -> {
|
||||
final Repository repository = new FsRepository(metaData, environment, xContentRegistry(), threadPool) {
|
||||
@Override
|
||||
protected void assertSnapshotOrGenericThread() {
|
||||
// eliminate thread name check as we create repo in the test thread
|
||||
}
|
||||
};
|
||||
repository.start();
|
||||
return repository;
|
||||
}
|
||||
),
|
||||
emptyMap(),
|
||||
threadPool
|
||||
Collections.singletonMap(FsRepository.TYPE, getRepoFactory(environment)), emptyMap(), threadPool
|
||||
);
|
||||
snapshotsService =
|
||||
new SnapshotsService(settings, clusterService, indexNameExpressionResolver, repositoriesService, threadPool);
|
||||
|
@ -1093,6 +1095,28 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|||
client.initialize(actions, () -> clusterService.localNode().getId(), transportService.getRemoteClusterService());
|
||||
}
|
||||
|
||||
private Repository.Factory getRepoFactory(Environment environment) {
|
||||
// Run half the tests with the eventually consistent repository
|
||||
if (blobStoreContext == null) {
|
||||
return metaData -> {
|
||||
final Repository repository = new FsRepository(metaData, environment, xContentRegistry(), threadPool) {
|
||||
@Override
|
||||
protected void assertSnapshotOrGenericThread() {
|
||||
// eliminate thread name check as we create repo in the test thread
|
||||
}
|
||||
};
|
||||
repository.start();
|
||||
return repository;
|
||||
};
|
||||
} else {
|
||||
return metaData -> {
|
||||
final Repository repository = new MockEventuallyConsistentRepository(
|
||||
metaData, environment, xContentRegistry(), deterministicTaskQueue.getThreadPool(), blobStoreContext);
|
||||
repository.start();
|
||||
return repository;
|
||||
};
|
||||
}
|
||||
}
|
||||
public void restart() {
|
||||
testClusterNodes.disconnectNode(this);
|
||||
final ClusterState oldState = this.clusterService.state();
|
||||
|
|
|
@ -0,0 +1,338 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.snapshots.mockstore;
|
||||
|
||||
import org.apache.lucene.codecs.CodecUtil;
|
||||
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||
import org.elasticsearch.common.blobstore.BlobMetaData;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.blobstore.BlobStore;
|
||||
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
||||
import org.elasticsearch.snapshots.SnapshotInfo;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.file.NoSuchFileException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
/**
|
||||
* Mock Repository that allows testing the eventually consistent behaviour of AWS S3 as documented in the
|
||||
* <a href="https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel">AWS S3 docs</a>.
|
||||
* Currently, the repository asserts that no inconsistent reads are made.
|
||||
* TODO: Resolve todos on list and overwrite operation consistency to fully cover S3's behavior.
|
||||
*/
|
||||
public class MockEventuallyConsistentRepository extends BlobStoreRepository {
|
||||
|
||||
private final Context context;
|
||||
|
||||
private final NamedXContentRegistry namedXContentRegistry;
|
||||
|
||||
public MockEventuallyConsistentRepository(RepositoryMetaData metadata, Environment environment,
|
||||
NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool, Context context) {
|
||||
super(metadata, environment.settings(), false, namedXContentRegistry, threadPool);
|
||||
this.context = context;
|
||||
this.namedXContentRegistry = namedXContentRegistry;
|
||||
}
|
||||
|
||||
// Filters out all actions that are super-seeded by subsequent actions
|
||||
// TODO: Remove all usages of this method, snapshots should not depend on consistent list operations
|
||||
private static List<BlobStoreAction> consistentView(List<BlobStoreAction> actions) {
|
||||
final Map<String, BlobStoreAction> lastActions = new HashMap<>();
|
||||
for (BlobStoreAction action : actions) {
|
||||
if (action.operation == Operation.PUT) {
|
||||
lastActions.put(action.path, action);
|
||||
} else if (action.operation == Operation.DELETE) {
|
||||
lastActions.remove(action.path);
|
||||
}
|
||||
}
|
||||
return new ArrayList<>(lastActions.values());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void assertSnapshotOrGenericThread() {
|
||||
// eliminate thread name check as we create repo in the test thread
|
||||
}
|
||||
|
||||
@Override
|
||||
protected BlobStore createBlobStore() {
|
||||
return new MockBlobStore();
|
||||
}
|
||||
|
||||
@Override
|
||||
public BlobPath basePath() {
|
||||
return BlobPath.cleanPath();
|
||||
}
|
||||
|
||||
/**
|
||||
* Context that must be shared between all instances of {@link MockEventuallyConsistentRepository} in a test run.
|
||||
*/
|
||||
public static final class Context {
|
||||
|
||||
private final List<BlobStoreAction> actions = new ArrayList<>();
|
||||
|
||||
/**
|
||||
* Force the repository into a consistent end state so that its eventual state can be examined.
|
||||
*/
|
||||
public void forceConsistent() {
|
||||
synchronized (actions) {
|
||||
final List<BlobStoreAction> consistentActions = consistentView(actions);
|
||||
actions.clear();
|
||||
actions.addAll(consistentActions);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private enum Operation {
|
||||
PUT, GET, DELETE
|
||||
}
|
||||
|
||||
private static final class BlobStoreAction {
|
||||
|
||||
private final Operation operation;
|
||||
|
||||
@Nullable
|
||||
private final byte[] data;
|
||||
|
||||
private final String path;
|
||||
|
||||
private BlobStoreAction(Operation operation, String path, byte[] data) {
|
||||
this.operation = operation;
|
||||
this.path = path;
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
private BlobStoreAction(Operation operation, String path) {
|
||||
this(operation, path, null);
|
||||
}
|
||||
}
|
||||
|
||||
private class MockBlobStore implements BlobStore {
|
||||
|
||||
private AtomicBoolean closed = new AtomicBoolean(false);
|
||||
|
||||
@Override
|
||||
public BlobContainer blobContainer(BlobPath path) {
|
||||
return new MockBlobContainer(path);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
closed.set(true);
|
||||
}
|
||||
|
||||
private void ensureNotClosed() {
|
||||
if (closed.get()) {
|
||||
throw new AssertionError("Blobstore is closed already");
|
||||
}
|
||||
}
|
||||
|
||||
private class MockBlobContainer implements BlobContainer {
|
||||
|
||||
private final BlobPath path;
|
||||
|
||||
MockBlobContainer(BlobPath path) {
|
||||
this.path = path;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BlobPath path() {
|
||||
return path;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream readBlob(String name) throws NoSuchFileException {
|
||||
ensureNotClosed();
|
||||
final String blobPath = path.buildAsString() + name;
|
||||
synchronized (context.actions) {
|
||||
final List<BlobStoreAction> relevantActions = relevantActions(blobPath);
|
||||
context.actions.add(new BlobStoreAction(Operation.GET, blobPath));
|
||||
if (relevantActions.stream().noneMatch(a -> a.operation == Operation.PUT)) {
|
||||
throw new NoSuchFileException(blobPath);
|
||||
}
|
||||
if (relevantActions.size() == 1 && relevantActions.get(0).operation == Operation.PUT) {
|
||||
// Consistent read after write
|
||||
return new ByteArrayInputStream(relevantActions.get(0).data);
|
||||
}
|
||||
throw new AssertionError("Inconsistent read on [" + blobPath + ']');
|
||||
}
|
||||
}
|
||||
|
||||
private List<BlobStoreAction> relevantActions(String blobPath) {
|
||||
assert Thread.holdsLock(context.actions);
|
||||
final List<BlobStoreAction> relevantActions = new ArrayList<>(
|
||||
context.actions.stream().filter(action -> blobPath.equals(action.path)).collect(Collectors.toList()));
|
||||
for (int i = relevantActions.size() - 1; i > 0; i--) {
|
||||
if (relevantActions.get(i).operation == Operation.GET) {
|
||||
relevantActions.remove(i);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return relevantActions;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteBlob(String blobName) {
|
||||
ensureNotClosed();
|
||||
synchronized (context.actions) {
|
||||
context.actions.add(new BlobStoreAction(Operation.DELETE, path.buildAsString() + blobName));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void delete() {
|
||||
ensureNotClosed();
|
||||
final String thisPath = path.buildAsString();
|
||||
synchronized (context.actions) {
|
||||
consistentView(context.actions).stream().filter(action -> action.path.startsWith(thisPath))
|
||||
.forEach(a -> context.actions.add(new BlobStoreAction(Operation.DELETE, a.path)));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, BlobMetaData> listBlobs() {
|
||||
ensureNotClosed();
|
||||
final String thisPath = path.buildAsString();
|
||||
synchronized (context.actions) {
|
||||
return consistentView(context.actions).stream()
|
||||
.filter(
|
||||
action -> action.path.startsWith(thisPath) && action.path.substring(thisPath.length()).indexOf('/') == -1
|
||||
&& action.operation == Operation.PUT)
|
||||
.collect(
|
||||
Collectors.toMap(
|
||||
action -> action.path.substring(thisPath.length()),
|
||||
action -> new PlainBlobMetaData(action.path.substring(thisPath.length()), action.data.length)));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, BlobContainer> children() {
|
||||
ensureNotClosed();
|
||||
final String thisPath = path.buildAsString();
|
||||
synchronized (context.actions) {
|
||||
return consistentView(context.actions).stream()
|
||||
.filter(action ->
|
||||
action.operation == Operation.PUT
|
||||
&& action.path.startsWith(thisPath) && action.path.substring(thisPath.length()).indexOf('/') != -1)
|
||||
.map(action -> action.path.substring(thisPath.length()).split("/")[0])
|
||||
.distinct()
|
||||
.collect(Collectors.toMap(Function.identity(), name -> new MockBlobContainer(path.add(name))));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) {
|
||||
return
|
||||
listBlobs().entrySet().stream().filter(entry -> entry.getKey().startsWith(blobNamePrefix)).collect(
|
||||
Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
|
||||
throws IOException {
|
||||
ensureNotClosed();
|
||||
assert blobSize < Integer.MAX_VALUE;
|
||||
final byte[] data = new byte[(int) blobSize];
|
||||
final int read = inputStream.read(data);
|
||||
assert read == data.length;
|
||||
final String blobPath = path.buildAsString() + blobName;
|
||||
synchronized (context.actions) {
|
||||
final List<BlobStoreAction> relevantActions = relevantActions(blobPath);
|
||||
// We do some checks in case there is a consistent state for a blob to prevent turning it inconsistent.
|
||||
final boolean hasConsistentContent =
|
||||
relevantActions.size() == 1 && relevantActions.get(0).operation == Operation.PUT;
|
||||
if (BlobStoreRepository.INDEX_LATEST_BLOB.equals(blobName)) {
|
||||
// TODO: Ensure that it is impossible to ever decrement the generation id stored in index.latest then assert that
|
||||
// it never decrements here
|
||||
} else if (blobName.startsWith(BlobStoreRepository.SNAPSHOT_PREFIX)) {
|
||||
if (hasConsistentContent) {
|
||||
if (basePath().buildAsString().equals(path().buildAsString())) {
|
||||
try {
|
||||
// TODO: dry up the logic for reading SnapshotInfo here against the code in ChecksumBlobStoreFormat
|
||||
final int offset = CodecUtil.headerLength(BlobStoreRepository.SNAPSHOT_CODEC);
|
||||
final SnapshotInfo updatedInfo = SnapshotInfo.fromXContentInternal(
|
||||
XContentHelper.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE,
|
||||
new BytesArray(data, offset, data.length - offset - CodecUtil.footerLength()),
|
||||
XContentType.SMILE));
|
||||
// If the existing snapshotInfo differs only in the timestamps it stores, then the overwrite is not
|
||||
// a problem and could be the result of a correctly handled master failover.
|
||||
final SnapshotInfo existingInfo = snapshotFormat.readBlob(this, blobName);
|
||||
assertThat(existingInfo.snapshotId(), equalTo(updatedInfo.snapshotId()));
|
||||
assertThat(existingInfo.reason(), equalTo(updatedInfo.reason()));
|
||||
assertThat(existingInfo.state(), equalTo(updatedInfo.state()));
|
||||
assertThat(existingInfo.totalShards(), equalTo(updatedInfo.totalShards()));
|
||||
assertThat(existingInfo.successfulShards(), equalTo(updatedInfo.successfulShards()));
|
||||
assertThat(
|
||||
existingInfo.shardFailures(), containsInAnyOrder(updatedInfo.shardFailures().toArray()));
|
||||
assertThat(existingInfo.indices(), equalTo(updatedInfo.indices()));
|
||||
return; // No need to add a write for this since we didn't change content
|
||||
} catch (Exception e) {
|
||||
// Rethrow as AssertionError here since kind exception might otherwise be swallowed and logged by
|
||||
// the blob store repository.
|
||||
// Since we are not doing any actual IO we don't expect this to throw ever and an exception would
|
||||
// signal broken SnapshotInfo bytes or unexpected behavior of SnapshotInfo otherwise.
|
||||
throw new AssertionError("Failed to deserialize SnapshotInfo", e);
|
||||
}
|
||||
} else {
|
||||
// Primaries never retry so any shard level snap- blob retry/overwrite even with the same content is
|
||||
// not expected.
|
||||
throw new AssertionError("Shard level snap-{uuid} blobs should never be overwritten");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (hasConsistentContent) {
|
||||
ESTestCase.assertArrayEquals("Tried to overwrite blob [" + blobName + "]", relevantActions.get(0).data, data);
|
||||
return; // No need to add a write for this since we didn't change content
|
||||
}
|
||||
}
|
||||
context.actions.add(new BlobStoreAction(Operation.PUT, blobPath, data));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize,
|
||||
final boolean failIfAlreadyExists) throws IOException {
|
||||
writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,181 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.snapshots.mockstore;
|
||||
|
||||
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.env.TestEnvironment;
|
||||
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.file.NoSuchFileException;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
|
||||
import static org.elasticsearch.env.Environment.PATH_HOME_SETTING;
|
||||
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.startsWith;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
|
||||
|
||||
private Environment environment;
|
||||
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
final Path tempDir = createTempDir();
|
||||
final String nodeName = "testNode";
|
||||
environment = TestEnvironment.newEnvironment(Settings.builder()
|
||||
.put(NODE_NAME_SETTING.getKey(), nodeName)
|
||||
.put(PATH_HOME_SETTING.getKey(), tempDir.resolve(nodeName).toAbsolutePath())
|
||||
.put(Environment.PATH_REPO_SETTING.getKey(), tempDir.resolve("repo").toAbsolutePath())
|
||||
.build());
|
||||
}
|
||||
|
||||
public void testReadAfterWriteConsistently() throws IOException {
|
||||
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
|
||||
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
|
||||
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), environment,
|
||||
xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) {
|
||||
repository.start();
|
||||
final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath());
|
||||
final String blobName = randomAlphaOfLength(10);
|
||||
final int lengthWritten = randomIntBetween(1, 100);
|
||||
final byte[] blobData = randomByteArrayOfLength(lengthWritten);
|
||||
blobContainer.writeBlob(blobName, new ByteArrayInputStream(blobData), lengthWritten, true);
|
||||
try (InputStream in = blobContainer.readBlob(blobName)) {
|
||||
final byte[] readBytes = new byte[lengthWritten + 1];
|
||||
final int lengthSeen = in.read(readBytes);
|
||||
assertThat(lengthSeen, equalTo(lengthWritten));
|
||||
assertArrayEquals(blobData, Arrays.copyOf(readBytes, lengthWritten));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testReadAfterWriteAfterReadThrows() throws IOException {
|
||||
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
|
||||
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
|
||||
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), environment,
|
||||
xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) {
|
||||
repository.start();
|
||||
final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath());
|
||||
final String blobName = randomAlphaOfLength(10);
|
||||
final int lengthWritten = randomIntBetween(1, 100);
|
||||
final byte[] blobData = randomByteArrayOfLength(lengthWritten);
|
||||
expectThrows(NoSuchFileException.class, () -> blobContainer.readBlob(blobName));
|
||||
blobContainer.writeBlob(blobName, new ByteArrayInputStream(blobData), lengthWritten, true);
|
||||
assertThrowsOnInconsistentRead(blobContainer, blobName);
|
||||
}
|
||||
}
|
||||
|
||||
public void testReadAfterDeleteAfterWriteThrows() throws IOException {
|
||||
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
|
||||
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
|
||||
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), environment,
|
||||
xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) {
|
||||
repository.start();
|
||||
final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath());
|
||||
final String blobName = randomAlphaOfLength(10);
|
||||
final int lengthWritten = randomIntBetween(1, 100);
|
||||
final byte[] blobData = randomByteArrayOfLength(lengthWritten);
|
||||
blobContainer.writeBlob(blobName, new ByteArrayInputStream(blobData), lengthWritten, true);
|
||||
blobContainer.deleteBlob(blobName);
|
||||
assertThrowsOnInconsistentRead(blobContainer, blobName);
|
||||
blobStoreContext.forceConsistent();
|
||||
expectThrows(NoSuchFileException.class, () -> blobContainer.readBlob(blobName));
|
||||
}
|
||||
}
|
||||
|
||||
public void testOverwriteRandomBlobFails() throws IOException {
|
||||
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
|
||||
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
|
||||
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), environment,
|
||||
xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) {
|
||||
repository.start();
|
||||
final BlobContainer container = repository.blobStore().blobContainer(repository.basePath());
|
||||
final String blobName = randomAlphaOfLength(10);
|
||||
final int lengthWritten = randomIntBetween(1, 100);
|
||||
final byte[] blobData = randomByteArrayOfLength(lengthWritten);
|
||||
container.writeBlob(blobName, new ByteArrayInputStream(blobData), lengthWritten, false);
|
||||
final AssertionError assertionError = expectThrows(AssertionError.class,
|
||||
() -> container.writeBlob(blobName, new ByteArrayInputStream(blobData), lengthWritten - 1, false));
|
||||
assertThat(assertionError.getMessage(), startsWith("Tried to overwrite blob [" + blobName +"]"));
|
||||
}
|
||||
}
|
||||
|
||||
public void testOverwriteShardSnapBlobFails() throws IOException {
|
||||
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
|
||||
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
|
||||
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), environment,
|
||||
xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) {
|
||||
repository.start();
|
||||
final BlobContainer container =
|
||||
repository.blobStore().blobContainer(repository.basePath().add("indices").add("someindex").add("0"));
|
||||
final String blobName = BlobStoreRepository.SNAPSHOT_PREFIX + UUIDs.randomBase64UUID();
|
||||
final int lengthWritten = randomIntBetween(1, 100);
|
||||
final byte[] blobData = randomByteArrayOfLength(lengthWritten);
|
||||
container.writeBlob(blobName, new ByteArrayInputStream(blobData), lengthWritten, false);
|
||||
final AssertionError assertionError = expectThrows(AssertionError.class,
|
||||
() -> container.writeBlob(blobName, new ByteArrayInputStream(blobData), lengthWritten, false));
|
||||
assertThat(assertionError.getMessage(), equalTo("Shard level snap-{uuid} blobs should never be overwritten"));
|
||||
}
|
||||
}
|
||||
|
||||
public void testOverwriteSnapshotInfoBlob() {
|
||||
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
|
||||
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
|
||||
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), environment,
|
||||
xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) {
|
||||
repository.start();
|
||||
|
||||
// We create a snap- blob for snapshot "foo" in the first generation
|
||||
final SnapshotId snapshotId = new SnapshotId("foo", UUIDs.randomBase64UUID());
|
||||
repository.finalizeSnapshot(snapshotId, Collections.emptyList(), 1L, null, 5, Collections.emptyList(),
|
||||
-1L, false, Collections.emptyMap());
|
||||
|
||||
// We try to write another snap- blob for "foo" in the next generation. It fails because the content differs.
|
||||
final AssertionError assertionError = expectThrows(AssertionError.class,
|
||||
() -> repository.finalizeSnapshot(
|
||||
snapshotId, Collections.emptyList(), 1L, null, 6, Collections.emptyList(),
|
||||
0, false, Collections.emptyMap()));
|
||||
assertThat(assertionError.getMessage(), equalTo("\nExpected: <6>\n but: was <5>"));
|
||||
|
||||
// We try to write yet another snap- blob for "foo" in the next generation.
|
||||
// It passes cleanly because the content of the blob except for the timestamps.
|
||||
repository.finalizeSnapshot(snapshotId, Collections.emptyList(), 1L, null, 5, Collections.emptyList(),
|
||||
0, false, Collections.emptyMap());
|
||||
}
|
||||
}
|
||||
|
||||
private static void assertThrowsOnInconsistentRead(BlobContainer blobContainer, String blobName) {
|
||||
final AssertionError assertionError = expectThrows(AssertionError.class, () -> blobContainer.readBlob(blobName));
|
||||
assertThat(assertionError.getMessage(), equalTo("Inconsistent read on [" + blobName + ']'));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue