[Tests] Use mock storage in repository-gcs unit tests (#29397)
The repository-gcs unit tests rely on the GoogleCloudStorageTestServer but it would be better if they rely on a mocked Storage client instead. That would also help to extract the GoogleCloudStorageFixture and the GoogleCloudStorageTestServer classes in a QA third party project. Closes #28960
This commit is contained in:
parent
451a328281
commit
7d29087442
|
@ -24,12 +24,13 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.repositories.ESBlobStoreContainerTestCase;
|
||||
|
||||
import java.util.Locale;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
public class GoogleCloudStorageBlobStoreContainerTests extends ESBlobStoreContainerTestCase {
|
||||
|
||||
@Override
|
||||
protected BlobStore newBlobStore() {
|
||||
String bucket = randomAlphaOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT);
|
||||
return new GoogleCloudStorageBlobStore(Settings.EMPTY, bucket, MockStorage.newStorageClient(bucket, getTestName()));
|
||||
return new GoogleCloudStorageBlobStore(Settings.EMPTY, bucket, new MockStorage(bucket, new ConcurrentHashMap<>()));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,14 +27,13 @@ import org.elasticsearch.common.unit.ByteSizeValue;
|
|||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.AfterClass;
|
||||
|
||||
import java.net.SocketPermission;
|
||||
import java.security.AccessController;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
|
||||
|
@ -42,9 +41,9 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESBlobStoreRepos
|
|||
|
||||
private static final String BUCKET = "gcs-repository-test";
|
||||
|
||||
// Static storage client shared among all nodes in order to act like a remote repository service:
|
||||
// Static list of blobs shared among all nodes in order to act like a remote repository service:
|
||||
// all nodes must see the same content
|
||||
private static final AtomicReference<Storage> storage = new AtomicReference<>();
|
||||
private static final ConcurrentMap<String, byte[]> blobs = new ConcurrentHashMap<>();
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
|
@ -62,15 +61,17 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESBlobStoreRepos
|
|||
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpStorage() {
|
||||
storage.set(MockStorage.newStorageClient(BUCKET, GoogleCloudStorageBlobStoreRepositoryTests.class.getName()));
|
||||
@AfterClass
|
||||
public static void wipeRepository() {
|
||||
blobs.clear();
|
||||
}
|
||||
|
||||
public static class MockGoogleCloudStoragePlugin extends GoogleCloudStoragePlugin {
|
||||
|
||||
public MockGoogleCloudStoragePlugin(final Settings settings) {
|
||||
super(settings);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected GoogleCloudStorageService createStorageService(Environment environment) {
|
||||
return new MockGoogleCloudStorageService(environment, getClientsSettings());
|
||||
|
@ -85,9 +86,7 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESBlobStoreRepos
|
|||
|
||||
@Override
|
||||
public Storage createClient(String clientName) {
|
||||
// The actual impl might open a connection. So check we have permission when this call is made.
|
||||
AccessController.checkPermission(new SocketPermission("*", "connect"));
|
||||
return storage.get();
|
||||
return new MockStorage(BUCKET, blobs);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -24,12 +24,13 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.repositories.ESBlobStoreTestCase;
|
||||
|
||||
import java.util.Locale;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
public class GoogleCloudStorageBlobStoreTests extends ESBlobStoreTestCase {
|
||||
|
||||
@Override
|
||||
protected BlobStore newBlobStore() {
|
||||
String bucket = randomAlphaOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT);
|
||||
return new GoogleCloudStorageBlobStore(Settings.EMPTY, bucket, MockStorage.newStorageClient(bucket, getTestName()));
|
||||
return new GoogleCloudStorageBlobStore(Settings.EMPTY, bucket, new MockStorage(bucket, new ConcurrentHashMap<>()));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,74 +19,289 @@
|
|||
|
||||
package org.elasticsearch.repositories.gcs;
|
||||
|
||||
import com.google.api.client.googleapis.json.GoogleJsonError;
|
||||
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
|
||||
import com.google.api.client.http.AbstractInputStreamContent;
|
||||
import com.google.api.client.http.HttpHeaders;
|
||||
import com.google.api.client.http.HttpMethods;
|
||||
import com.google.api.client.http.HttpRequest;
|
||||
import com.google.api.client.http.HttpRequestInitializer;
|
||||
import com.google.api.client.http.HttpResponseException;
|
||||
import com.google.api.client.http.LowLevelHttpRequest;
|
||||
import com.google.api.client.http.LowLevelHttpResponse;
|
||||
import com.google.api.client.json.jackson2.JacksonFactory;
|
||||
import com.google.api.client.http.MultipartContent;
|
||||
import com.google.api.client.json.JsonFactory;
|
||||
import com.google.api.client.testing.http.MockHttpTransport;
|
||||
import com.google.api.client.testing.http.MockLowLevelHttpRequest;
|
||||
import com.google.api.client.testing.http.MockLowLevelHttpResponse;
|
||||
import com.google.api.services.storage.Storage;
|
||||
import com.google.api.services.storage.model.Bucket;
|
||||
import com.google.api.services.storage.model.StorageObject;
|
||||
import org.elasticsearch.common.io.Streams;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.io.InputStream;
|
||||
import java.math.BigInteger;
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
/**
|
||||
* {@link MockStorage} is a utility class that provides {@link Storage} clients that works
|
||||
* against an embedded {@link GoogleCloudStorageTestServer}.
|
||||
* {@link MockStorage} mocks a {@link Storage} client by storing all the blobs
|
||||
* in a given concurrent map.
|
||||
*/
|
||||
class MockStorage extends com.google.api.client.testing.http.MockHttpTransport {
|
||||
class MockStorage extends Storage {
|
||||
|
||||
/**
|
||||
* Embedded test server that emulates a Google Cloud Storage service
|
||||
**/
|
||||
private final GoogleCloudStorageTestServer server = new GoogleCloudStorageTestServer();
|
||||
/* A custom HTTP header name used to propagate the name of the blobs to delete in batch requests */
|
||||
private static final String DELETION_HEADER = "x-blob-to-delete";
|
||||
|
||||
private MockStorage() {
|
||||
private final String bucketName;
|
||||
private final ConcurrentMap<String, byte[]> blobs;
|
||||
|
||||
MockStorage(final String bucket, final ConcurrentMap<String, byte[]> blobs) {
|
||||
super(new MockedHttpTransport(blobs), mock(JsonFactory.class), mock(HttpRequestInitializer.class));
|
||||
this.bucketName = bucket;
|
||||
this.blobs = blobs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public LowLevelHttpRequest buildRequest(String method, String url) throws IOException {
|
||||
return new MockLowLevelHttpRequest() {
|
||||
@Override
|
||||
public LowLevelHttpResponse execute() throws IOException {
|
||||
return convert(server.handle(method, url, getHeaders(), getContentAsBytes()));
|
||||
}
|
||||
|
||||
/** Returns the LowLevelHttpRequest body as an array of bytes **/
|
||||
byte[] getContentAsBytes() throws IOException {
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
if (getStreamingContent() != null) {
|
||||
getStreamingContent().writeTo(out);
|
||||
}
|
||||
return out.toByteArray();
|
||||
}
|
||||
};
|
||||
public Buckets buckets() {
|
||||
return new MockBuckets();
|
||||
}
|
||||
|
||||
private static MockLowLevelHttpResponse convert(final GoogleCloudStorageTestServer.Response response) {
|
||||
final MockLowLevelHttpResponse lowLevelHttpResponse = new MockLowLevelHttpResponse();
|
||||
for (Map.Entry<String, String> header : response.headers.entrySet()) {
|
||||
lowLevelHttpResponse.addHeader(header.getKey(), header.getValue());
|
||||
@Override
|
||||
public Objects objects() {
|
||||
return new MockObjects();
|
||||
}
|
||||
|
||||
class MockBuckets extends Buckets {
|
||||
|
||||
@Override
|
||||
public Get get(String getBucket) {
|
||||
return new Get(getBucket) {
|
||||
@Override
|
||||
public Bucket execute() {
|
||||
if (bucketName.equals(getBucket())) {
|
||||
Bucket bucket = new Bucket();
|
||||
bucket.setId(bucketName);
|
||||
return bucket;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
lowLevelHttpResponse.setContentType(response.contentType);
|
||||
lowLevelHttpResponse.setStatusCode(response.status.getStatus());
|
||||
lowLevelHttpResponse.setReasonPhrase(response.status.toString());
|
||||
if (response.body != null) {
|
||||
lowLevelHttpResponse.setContent(response.body);
|
||||
lowLevelHttpResponse.setContentLength(response.body.length);
|
||||
}
|
||||
|
||||
class MockObjects extends Objects {
|
||||
|
||||
@Override
|
||||
public Get get(String getBucket, String getObject) {
|
||||
return new Get(getBucket, getObject) {
|
||||
@Override
|
||||
public StorageObject execute() throws IOException {
|
||||
if (bucketName.equals(getBucket()) == false) {
|
||||
throw newBucketNotFoundException(getBucket());
|
||||
}
|
||||
if (blobs.containsKey(getObject()) == false) {
|
||||
throw newObjectNotFoundException(getObject());
|
||||
}
|
||||
|
||||
StorageObject storageObject = new StorageObject();
|
||||
storageObject.setId(getObject());
|
||||
return storageObject;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream executeMediaAsInputStream() throws IOException {
|
||||
if (bucketName.equals(getBucket()) == false) {
|
||||
throw newBucketNotFoundException(getBucket());
|
||||
}
|
||||
if (blobs.containsKey(getObject()) == false) {
|
||||
throw newObjectNotFoundException(getObject());
|
||||
}
|
||||
return new ByteArrayInputStream(blobs.get(getObject()));
|
||||
}
|
||||
};
|
||||
}
|
||||
return lowLevelHttpResponse;
|
||||
|
||||
@Override
|
||||
public Insert insert(String insertBucket, StorageObject insertObject, AbstractInputStreamContent insertStream) {
|
||||
return new Insert(insertBucket, insertObject) {
|
||||
@Override
|
||||
public StorageObject execute() throws IOException {
|
||||
if (bucketName.equals(getBucket()) == false) {
|
||||
throw newBucketNotFoundException(getBucket());
|
||||
}
|
||||
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
Streams.copy(insertStream.getInputStream(), out);
|
||||
blobs.put(getName(), out.toByteArray());
|
||||
return null;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public List list(String listBucket) {
|
||||
return new List(listBucket) {
|
||||
@Override
|
||||
public com.google.api.services.storage.model.Objects execute() throws IOException {
|
||||
if (bucketName.equals(getBucket()) == false) {
|
||||
throw newBucketNotFoundException(getBucket());
|
||||
}
|
||||
|
||||
final com.google.api.services.storage.model.Objects objects = new com.google.api.services.storage.model.Objects();
|
||||
|
||||
final java.util.List<StorageObject> storageObjects = new ArrayList<>();
|
||||
for (Entry<String, byte[]> blob : blobs.entrySet()) {
|
||||
if (getPrefix() == null || blob.getKey().startsWith(getPrefix())) {
|
||||
StorageObject storageObject = new StorageObject();
|
||||
storageObject.setId(blob.getKey());
|
||||
storageObject.setName(blob.getKey());
|
||||
storageObject.setSize(BigInteger.valueOf((long) blob.getValue().length));
|
||||
storageObjects.add(storageObject);
|
||||
}
|
||||
}
|
||||
|
||||
objects.setItems(storageObjects);
|
||||
return objects;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public Delete delete(String deleteBucket, String deleteObject) {
|
||||
return new Delete(deleteBucket, deleteObject) {
|
||||
@Override
|
||||
public Void execute() throws IOException {
|
||||
if (bucketName.equals(getBucket()) == false) {
|
||||
throw newBucketNotFoundException(getBucket());
|
||||
}
|
||||
|
||||
if (blobs.containsKey(getObject()) == false) {
|
||||
throw newObjectNotFoundException(getObject());
|
||||
}
|
||||
|
||||
blobs.remove(getObject());
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpRequest buildHttpRequest() throws IOException {
|
||||
HttpRequest httpRequest = super.buildHttpRequest();
|
||||
httpRequest.getHeaders().put(DELETION_HEADER, getObject());
|
||||
return httpRequest;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public Copy copy(String srcBucket, String srcObject, String destBucket, String destObject, StorageObject content) {
|
||||
return new Copy(srcBucket, srcObject, destBucket, destObject, content) {
|
||||
@Override
|
||||
public StorageObject execute() throws IOException {
|
||||
if (bucketName.equals(getSourceBucket()) == false) {
|
||||
throw newBucketNotFoundException(getSourceBucket());
|
||||
}
|
||||
if (bucketName.equals(getDestinationBucket()) == false) {
|
||||
throw newBucketNotFoundException(getDestinationBucket());
|
||||
}
|
||||
|
||||
final byte[] bytes = blobs.get(getSourceObject());
|
||||
if (bytes == null) {
|
||||
throw newObjectNotFoundException(getSourceObject());
|
||||
}
|
||||
blobs.put(getDestinationObject(), bytes);
|
||||
|
||||
StorageObject storageObject = new StorageObject();
|
||||
storageObject.setId(getDestinationObject());
|
||||
return storageObject;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
private static GoogleJsonResponseException newBucketNotFoundException(final String bucket) {
|
||||
HttpResponseException.Builder builder = new HttpResponseException.Builder(404, "Bucket not found: " + bucket, new HttpHeaders());
|
||||
return new GoogleJsonResponseException(builder, new GoogleJsonError());
|
||||
}
|
||||
|
||||
private static GoogleJsonResponseException newObjectNotFoundException(final String object) {
|
||||
HttpResponseException.Builder builder = new HttpResponseException.Builder(404, "Object not found: " + object, new HttpHeaders());
|
||||
return new GoogleJsonResponseException(builder, new GoogleJsonError());
|
||||
}
|
||||
|
||||
/**
|
||||
* Instanciates a mocked Storage client for tests.
|
||||
* {@link MockedHttpTransport} extends the existing testing transport to analyze the content
|
||||
* of {@link com.google.api.client.googleapis.batch.BatchRequest} and delete the appropriates
|
||||
* blobs. We use this because {@link Storage#batch()} is final and there is no other way to
|
||||
* extend batch requests for testing purposes.
|
||||
*/
|
||||
public static Storage newStorageClient(final String bucket, final String applicationName) {
|
||||
MockStorage mockStorage = new MockStorage();
|
||||
mockStorage.server.createBucket(bucket);
|
||||
static class MockedHttpTransport extends MockHttpTransport {
|
||||
|
||||
return new Storage.Builder(mockStorage, JacksonFactory.getDefaultInstance(), null)
|
||||
.setApplicationName(applicationName)
|
||||
.build();
|
||||
private final ConcurrentMap<String, byte[]> blobs;
|
||||
|
||||
MockedHttpTransport(final ConcurrentMap<String, byte[]> blobs) {
|
||||
this.blobs = blobs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public LowLevelHttpRequest buildRequest(final String method, final String url) throws IOException {
|
||||
// We analyze the content of the Batch request to detect our custom HTTP header,
|
||||
// and extract from it the name of the blob to delete. Then we reply a simple
|
||||
// batch response so that the client parser is happy.
|
||||
//
|
||||
// See https://cloud.google.com/storage/docs/json_api/v1/how-tos/batch for the
|
||||
// format of the batch request body.
|
||||
if (HttpMethods.POST.equals(method) && url.endsWith("/batch")) {
|
||||
return new MockLowLevelHttpRequest() {
|
||||
@Override
|
||||
public LowLevelHttpResponse execute() throws IOException {
|
||||
final String contentType = new MultipartContent().getType();
|
||||
|
||||
final StringBuilder builder = new StringBuilder();
|
||||
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
|
||||
getStreamingContent().writeTo(out);
|
||||
|
||||
Streams.readAllLines(new ByteArrayInputStream(out.toByteArray()), line -> {
|
||||
if (line != null && line.startsWith(DELETION_HEADER)) {
|
||||
builder.append("--__END_OF_PART__\r\n");
|
||||
builder.append("Content-Type: application/http").append("\r\n");
|
||||
builder.append("\r\n");
|
||||
builder.append("HTTP/1.1 ");
|
||||
|
||||
final String blobName = line.substring(line.indexOf(':') + 1).trim();
|
||||
if (blobs.containsKey(blobName)) {
|
||||
builder.append(RestStatus.OK.getStatus());
|
||||
blobs.remove(blobName);
|
||||
} else {
|
||||
builder.append(RestStatus.NOT_FOUND.getStatus());
|
||||
}
|
||||
builder.append("\r\n");
|
||||
builder.append("Content-Type: application/json; charset=UTF-8").append("\r\n");
|
||||
builder.append("Content-Length: 0").append("\r\n");
|
||||
builder.append("\r\n");
|
||||
}
|
||||
});
|
||||
builder.append("\r\n");
|
||||
builder.append("--__END_OF_PART__--");
|
||||
}
|
||||
|
||||
MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
|
||||
response.setStatusCode(200);
|
||||
response.setContent(builder.toString());
|
||||
response.setContentType(contentType);
|
||||
return response;
|
||||
}
|
||||
};
|
||||
} else {
|
||||
return super.buildRequest(method, url);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue