* Remove BlobContainer Tests against Mocks Removing all these weird mocks as asked for by #30424. All these tests are now part of real repository ITs and otherwise left unchanged if they had independent tests that didn't call the `createBlobStore` method previously. The HDFS tests also get added coverage as a side-effect because they did not have an implementation of the abstract repository ITs. Closes #30424
This commit is contained in:
parent
3717c733ff
commit
761d6e8e4b
|
@ -72,13 +72,13 @@ import java.util.regex.Pattern;
|
|||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
import static org.elasticsearch.repositories.ESBlobStoreContainerTestCase.randomBytes;
|
||||
import static org.elasticsearch.repositories.azure.AzureRepository.Repository.CONTAINER_SETTING;
|
||||
import static org.elasticsearch.repositories.azure.AzureStorageSettings.ACCOUNT_SETTING;
|
||||
import static org.elasticsearch.repositories.azure.AzureStorageSettings.ENDPOINT_SUFFIX_SETTING;
|
||||
import static org.elasticsearch.repositories.azure.AzureStorageSettings.KEY_SETTING;
|
||||
import static org.elasticsearch.repositories.azure.AzureStorageSettings.MAX_RETRIES_SETTING;
|
||||
import static org.elasticsearch.repositories.azure.AzureStorageSettings.TIMEOUT_SETTING;
|
||||
import static org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase.randomBytes;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
|
|
@ -1,53 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.repositories.azure;
|
||||
|
||||
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
|
||||
import org.elasticsearch.common.blobstore.BlobStore;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.repositories.ESBlobStoreContainerTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class AzureBlobStoreContainerTests extends ESBlobStoreContainerTestCase {
|
||||
|
||||
private ThreadPool threadPool;
|
||||
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
threadPool = new TestThreadPool("AzureBlobStoreTests", AzureRepositoryPlugin.executorBuilder());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
ThreadPool.terminate(threadPool, 10L, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected BlobStore newBlobStore() {
|
||||
RepositoryMetaData repositoryMetaData = new RepositoryMetaData("azure", "ittest", Settings.EMPTY);
|
||||
AzureStorageServiceMock client = new AzureStorageServiceMock();
|
||||
return new AzureBlobStore(repositoryMetaData, client, threadPool);
|
||||
}
|
||||
}
|
|
@ -1,167 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.repositories.azure;
|
||||
|
||||
import com.microsoft.azure.storage.OperationContext;
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
import com.microsoft.azure.storage.blob.CloudBlobClient;
|
||||
import org.elasticsearch.common.blobstore.BlobMetaData;
|
||||
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.core.internal.io.Streams;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.SocketPermission;
|
||||
import java.nio.file.FileAlreadyExistsException;
|
||||
import java.nio.file.NoSuchFileException;
|
||||
import java.security.AccessController;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
|
||||
/**
|
||||
* In memory storage for unit tests
|
||||
*/
|
||||
public class AzureStorageServiceMock extends AzureStorageService {
|
||||
|
||||
protected final Map<String, ByteArrayOutputStream> blobs = new ConcurrentHashMap<>();
|
||||
|
||||
public AzureStorageServiceMock() {
|
||||
super(Settings.EMPTY);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean blobExists(String account, String container, String blob) {
|
||||
return blobs.containsKey(blob);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteBlob(String account, String container, String blob) throws StorageException {
|
||||
if (blobs.remove(blob) == null) {
|
||||
throw new StorageException("BlobNotFound", "[" + blob + "] does not exist.", 404, null, null);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream getInputStream(String account, String container, String blob) throws IOException {
|
||||
if (!blobExists(account, container, blob)) {
|
||||
throw new NoSuchFileException("missing blob [" + blob + "]");
|
||||
}
|
||||
return AzureStorageService.giveSocketPermissionsToStream(new PermissionRequiringInputStream(blobs.get(blob).toByteArray()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, BlobMetaData> listBlobsByPrefix(String account, String container, String keyPath, String prefix) {
|
||||
MapBuilder<String, BlobMetaData> blobsBuilder = MapBuilder.newMapBuilder();
|
||||
blobs.forEach((String blobName, ByteArrayOutputStream bos) -> {
|
||||
final String checkBlob;
|
||||
if (keyPath != null && !keyPath.isEmpty()) {
|
||||
// strip off key path from the beginning of the blob name
|
||||
checkBlob = blobName.replace(keyPath, "");
|
||||
} else {
|
||||
checkBlob = blobName;
|
||||
}
|
||||
if (prefix == null || startsWithIgnoreCase(checkBlob, prefix)) {
|
||||
blobsBuilder.put(blobName, new PlainBlobMetaData(checkBlob, bos.size()));
|
||||
}
|
||||
});
|
||||
return blobsBuilder.immutableMap();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize,
|
||||
boolean failIfAlreadyExists) throws StorageException, FileAlreadyExistsException {
|
||||
if (failIfAlreadyExists && blobs.containsKey(blobName)) {
|
||||
throw new FileAlreadyExistsException(blobName);
|
||||
}
|
||||
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
|
||||
blobs.put(blobName, outputStream);
|
||||
Streams.copy(inputStream, outputStream);
|
||||
} catch (IOException e) {
|
||||
throw new StorageException("MOCK", "Error while writing mock stream", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test if the given String starts with the specified prefix,
|
||||
* ignoring upper/lower case.
|
||||
*
|
||||
* @param str the String to check
|
||||
* @param prefix the prefix to look for
|
||||
* @see java.lang.String#startsWith
|
||||
*/
|
||||
private static boolean startsWithIgnoreCase(String str, String prefix) {
|
||||
if (str == null || prefix == null) {
|
||||
return false;
|
||||
}
|
||||
if (str.startsWith(prefix)) {
|
||||
return true;
|
||||
}
|
||||
if (str.length() < prefix.length()) {
|
||||
return false;
|
||||
}
|
||||
String lcStr = str.substring(0, prefix.length()).toLowerCase(Locale.ROOT);
|
||||
String lcPrefix = prefix.toLowerCase(Locale.ROOT);
|
||||
return lcStr.equals(lcPrefix);
|
||||
}
|
||||
|
||||
private static class PermissionRequiringInputStream extends ByteArrayInputStream {
|
||||
|
||||
private PermissionRequiringInputStream(byte[] buf) {
|
||||
super(buf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized int read() {
|
||||
AccessController.checkPermission(new SocketPermission("*", "connect"));
|
||||
return super.read();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(byte[] b) throws IOException {
|
||||
AccessController.checkPermission(new SocketPermission("*", "connect"));
|
||||
return super.read(b);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized int read(byte[] b, int off, int len) {
|
||||
AccessController.checkPermission(new SocketPermission("*", "connect"));
|
||||
return super.read(b, off, len);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Tuple<CloudBlobClient, Supplier<OperationContext>> client(String clientName) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, AzureStorageSettings> refreshAndClearCache(Map<String, AzureStorageSettings> clientsSettings) {
|
||||
return emptyMap();
|
||||
}
|
||||
}
|
|
@ -1,43 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package com.google.cloud.storage;
|
||||
|
||||
import com.google.cloud.storage.spi.v1.StorageRpc;
|
||||
|
||||
/**
|
||||
* Utility class that exposed Google SDK package protected methods to
|
||||
* create specific StorageRpc objects in unit tests.
|
||||
*/
|
||||
public class StorageRpcOptionUtils {
|
||||
|
||||
private StorageRpcOptionUtils(){}
|
||||
|
||||
public static String getPrefix(final Storage.BlobListOption... options) {
|
||||
if (options != null) {
|
||||
for (final Option option : options) {
|
||||
final StorageRpc.Option rpcOption = option.getRpcOption();
|
||||
if (StorageRpc.Option.PREFIX.equals(rpcOption)) {
|
||||
return (String) option.getValue();
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -1,37 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package com.google.cloud.storage;
|
||||
|
||||
/**
|
||||
* Utility class that exposed Google SDK package protected methods to
|
||||
* create buckets and blobs objects in unit tests.
|
||||
*/
|
||||
public class StorageTestUtils {
|
||||
|
||||
private StorageTestUtils(){}
|
||||
|
||||
public static Bucket createBucket(final Storage storage, final String bucketName) {
|
||||
return new Bucket(storage, (BucketInfo.BuilderImpl) BucketInfo.newBuilder(bucketName));
|
||||
}
|
||||
|
||||
public static Blob createBlob(final Storage storage, final String bucketName, final String blobName, final long blobSize) {
|
||||
return new Blob(storage, (BlobInfo.BuilderImpl) BlobInfo.newBuilder(bucketName, blobName).setSize(blobSize));
|
||||
}
|
||||
}
|
|
@ -74,7 +74,7 @@ import static fixture.gcs.GoogleCloudStorageHttpHandler.getContentRangeLimit;
|
|||
import static fixture.gcs.GoogleCloudStorageHttpHandler.getContentRangeStart;
|
||||
import static fixture.gcs.GoogleCloudStorageHttpHandler.parseMultipartRequestBody;
|
||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
import static org.elasticsearch.repositories.ESBlobStoreContainerTestCase.randomBytes;
|
||||
import static org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase.randomBytes;
|
||||
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CREDENTIALS_FILE_SETTING;
|
||||
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.ENDPOINT_SETTING;
|
||||
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.READ_TIMEOUT_SETTING;
|
||||
|
|
|
@ -26,20 +26,14 @@ import com.google.cloud.storage.Storage;
|
|||
import com.google.cloud.storage.StorageBatch;
|
||||
import com.google.cloud.storage.StorageBatchResult;
|
||||
import com.google.cloud.storage.StorageException;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.BytesRefBuilder;
|
||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.blobstore.BlobStore;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.repositories.ESBlobStoreContainerTestCase;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.mockito.Matchers.any;
|
||||
|
@ -51,44 +45,7 @@ import static org.mockito.Mockito.doThrow;
|
|||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class GoogleCloudStorageBlobStoreContainerTests extends ESBlobStoreContainerTestCase {
|
||||
|
||||
@Override
|
||||
protected BlobStore newBlobStore() {
|
||||
final String bucketName = randomAlphaOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT);
|
||||
final String clientName = randomAlphaOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT);
|
||||
final GoogleCloudStorageService storageService = mock(GoogleCloudStorageService.class);
|
||||
try {
|
||||
when(storageService.client(any(String.class))).thenReturn(new MockStorage(bucketName, new ConcurrentHashMap<>(), random()));
|
||||
} catch (final Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return new GoogleCloudStorageBlobStore(bucketName, clientName, storageService);
|
||||
}
|
||||
|
||||
public void testWriteReadLarge() throws IOException {
|
||||
try(BlobStore store = newBlobStore()) {
|
||||
final BlobContainer container = store.blobContainer(new BlobPath());
|
||||
byte[] data = randomBytes(GoogleCloudStorageBlobStore.LARGE_BLOB_THRESHOLD_BYTE_SIZE + 1);
|
||||
writeBlob(container, "foobar", new BytesArray(data), randomBoolean());
|
||||
if (randomBoolean()) {
|
||||
// override file, to check if we get latest contents
|
||||
random().nextBytes(data);
|
||||
writeBlob(container, "foobar", new BytesArray(data), false);
|
||||
}
|
||||
try (InputStream stream = container.readBlob("foobar")) {
|
||||
BytesRefBuilder target = new BytesRefBuilder();
|
||||
while (target.length() < data.length) {
|
||||
byte[] buffer = new byte[scaledRandomIntBetween(1, data.length - target.length())];
|
||||
int offset = scaledRandomIntBetween(0, buffer.length - 1);
|
||||
int read = stream.read(buffer, offset, buffer.length - offset);
|
||||
target.append(new BytesRef(buffer, offset, read));
|
||||
}
|
||||
assertEquals(data.length, target.length());
|
||||
assertArrayEquals(data, Arrays.copyOfRange(target.bytes(), 0, target.length()));
|
||||
}
|
||||
}
|
||||
}
|
||||
public class GoogleCloudStorageBlobStoreContainerTests extends ESTestCase {
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testDeleteBlobsIgnoringIfNotExistsThrowsIOException() throws Exception {
|
||||
|
|
|
@ -26,11 +26,17 @@ import com.sun.net.httpserver.HttpExchange;
|
|||
import com.sun.net.httpserver.HttpHandler;
|
||||
import fixture.gcs.FakeOAuth2HttpHandler;
|
||||
import fixture.gcs.GoogleCloudStorageHttpHandler;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.BytesRefBuilder;
|
||||
import org.elasticsearch.action.ActionRunnable;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.blobstore.BlobStore;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.io.Streams;
|
||||
import org.elasticsearch.common.settings.MockSecureSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -46,6 +52,8 @@ import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTes
|
|||
import org.threeten.bp.Duration;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
|
@ -153,6 +161,31 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
|
|||
assertEquals("failed to parse value [101mb] for setting [chunk_size], must be <= [100mb]", e.getMessage());
|
||||
}
|
||||
|
||||
public void testWriteReadLarge() throws IOException {
|
||||
try (BlobStore store = newBlobStore()) {
|
||||
final BlobContainer container = store.blobContainer(new BlobPath());
|
||||
byte[] data = randomBytes(GoogleCloudStorageBlobStore.LARGE_BLOB_THRESHOLD_BYTE_SIZE + 1);
|
||||
writeBlob(container, "foobar", new BytesArray(data), randomBoolean());
|
||||
if (randomBoolean()) {
|
||||
// override file, to check if we get latest contents
|
||||
random().nextBytes(data);
|
||||
writeBlob(container, "foobar", new BytesArray(data), false);
|
||||
}
|
||||
try (InputStream stream = container.readBlob("foobar")) {
|
||||
BytesRefBuilder target = new BytesRefBuilder();
|
||||
while (target.length() < data.length) {
|
||||
byte[] buffer = new byte[scaledRandomIntBetween(1, data.length - target.length())];
|
||||
int offset = scaledRandomIntBetween(0, buffer.length - 1);
|
||||
int read = stream.read(buffer, offset, buffer.length - offset);
|
||||
target.append(new BytesRef(buffer, offset, read));
|
||||
}
|
||||
assertEquals(data.length, target.length());
|
||||
assertArrayEquals(data, Arrays.copyOfRange(target.bytes(), 0, target.length()));
|
||||
}
|
||||
container.delete();
|
||||
}
|
||||
}
|
||||
|
||||
public static class TestGoogleCloudStoragePlugin extends GoogleCloudStoragePlugin {
|
||||
|
||||
public TestGoogleCloudStoragePlugin(Settings settings) {
|
||||
|
|
|
@ -1,574 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.repositories.gcs;
|
||||
|
||||
import com.google.api.gax.paging.Page;
|
||||
import com.google.cloud.BatchResult;
|
||||
import com.google.cloud.Policy;
|
||||
import com.google.cloud.ReadChannel;
|
||||
import com.google.cloud.RestorableState;
|
||||
import com.google.cloud.WriteChannel;
|
||||
import com.google.cloud.storage.Acl;
|
||||
import com.google.cloud.storage.Blob;
|
||||
import com.google.cloud.storage.BlobId;
|
||||
import com.google.cloud.storage.BlobInfo;
|
||||
import com.google.cloud.storage.Bucket;
|
||||
import com.google.cloud.storage.BucketInfo;
|
||||
import com.google.cloud.storage.CopyWriter;
|
||||
import com.google.cloud.storage.ServiceAccount;
|
||||
import com.google.cloud.storage.Storage;
|
||||
import com.google.cloud.storage.StorageBatch;
|
||||
import com.google.cloud.storage.StorageBatchResult;
|
||||
import com.google.cloud.storage.StorageException;
|
||||
import com.google.cloud.storage.StorageOptions;
|
||||
import com.google.cloud.storage.StorageRpcOptionUtils;
|
||||
import com.google.cloud.storage.StorageTestUtils;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URL;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.Channels;
|
||||
import java.nio.channels.ReadableByteChannel;
|
||||
import java.nio.channels.WritableByteChannel;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyVararg;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
/**
|
||||
* {@link MockStorage} mocks a {@link Storage} client by storing all the blobs
|
||||
* in a given concurrent map.
|
||||
*/
|
||||
class MockStorage implements Storage {
|
||||
|
||||
private final Random random;
|
||||
private final String bucketName;
|
||||
private final ConcurrentMap<String, byte[]> blobs;
|
||||
|
||||
MockStorage(final String bucket, final ConcurrentMap<String, byte[]> blobs, final Random random) {
|
||||
this.random = random;
|
||||
this.bucketName = Objects.requireNonNull(bucket);
|
||||
this.blobs = Objects.requireNonNull(blobs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Bucket get(String bucket, BucketGetOption... options) {
|
||||
if (bucketName.equals(bucket)) {
|
||||
return StorageTestUtils.createBucket(this, bucketName);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Bucket lockRetentionPolicy(final BucketInfo bucket, final BucketTargetOption... options) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Blob get(BlobId blob) {
|
||||
if (bucketName.equals(blob.getBucket())) {
|
||||
final byte[] bytes = blobs.get(blob.getName());
|
||||
if (bytes != null) {
|
||||
return StorageTestUtils.createBlob(this, bucketName, blob.getName(), bytes.length);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean delete(BlobId blob) {
|
||||
if (bucketName.equals(blob.getBucket()) && blobs.containsKey(blob.getName())) {
|
||||
return blobs.remove(blob.getName()) != null;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Boolean> delete(Iterable<BlobId> blobIds) {
|
||||
final List<Boolean> ans = new ArrayList<>();
|
||||
for (final BlobId blobId : blobIds) {
|
||||
ans.add(delete(blobId));
|
||||
}
|
||||
return ans;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Blob create(BlobInfo blobInfo, byte[] content, BlobTargetOption... options) {
|
||||
if (bucketName.equals(blobInfo.getBucket()) == false) {
|
||||
throw new StorageException(404, "Bucket not found");
|
||||
}
|
||||
if (Stream.of(options).anyMatch(option -> option.equals(BlobTargetOption.doesNotExist()))) {
|
||||
byte[] existingBytes = blobs.putIfAbsent(blobInfo.getName(), content);
|
||||
if (existingBytes != null) {
|
||||
throw new StorageException(412, "Blob already exists");
|
||||
}
|
||||
} else {
|
||||
blobs.put(blobInfo.getName(), content);
|
||||
}
|
||||
return get(BlobId.of(blobInfo.getBucket(), blobInfo.getName()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Page<Blob> list(String bucket, BlobListOption... options) {
|
||||
if (bucketName.equals(bucket) == false) {
|
||||
throw new StorageException(404, "Bucket not found");
|
||||
}
|
||||
final Storage storage = this;
|
||||
final String prefix = StorageRpcOptionUtils.getPrefix(options);
|
||||
|
||||
return new Page<Blob>() {
|
||||
@Override
|
||||
public boolean hasNextPage() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getNextPageToken() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Page<Blob> getNextPage() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterable<Blob> iterateAll() {
|
||||
return blobs.entrySet().stream()
|
||||
.filter(blob -> ((prefix == null) || blob.getKey().startsWith(prefix)))
|
||||
.map(blob -> StorageTestUtils.createBlob(storage, bucketName, blob.getKey(), blob.getValue().length))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterable<Blob> getValues() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReadChannel reader(BlobId blob, BlobSourceOption... options) {
|
||||
if (bucketName.equals(blob.getBucket())) {
|
||||
final byte[] bytes = blobs.get(blob.getName());
|
||||
|
||||
final ReadableByteChannel readableByteChannel;
|
||||
if (bytes != null) {
|
||||
readableByteChannel = Channels.newChannel(new ByteArrayInputStream(bytes));
|
||||
} else {
|
||||
readableByteChannel = new ReadableByteChannel() {
|
||||
@Override
|
||||
public int read(ByteBuffer dst) throws IOException {
|
||||
throw new StorageException(404, "Object not found");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOpen() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
}
|
||||
};
|
||||
}
|
||||
return new ReadChannel() {
|
||||
@Override
|
||||
public void close() {
|
||||
IOUtils.closeWhileHandlingException(readableByteChannel);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void seek(long position) throws IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setChunkSize(int chunkSize) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public RestorableState<ReadChannel> capture() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(ByteBuffer dst) throws IOException {
|
||||
return readableByteChannel.read(dst);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOpen() {
|
||||
return readableByteChannel.isOpen();
|
||||
}
|
||||
};
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private final Set<BlobInfo> simulated410s = ConcurrentCollections.newConcurrentSet();
|
||||
|
||||
@Override
|
||||
public WriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options) {
|
||||
if (bucketName.equals(blobInfo.getBucket())) {
|
||||
final ByteArrayOutputStream output = new ByteArrayOutputStream();
|
||||
return new WriteChannel() {
|
||||
|
||||
private volatile boolean failed;
|
||||
|
||||
final WritableByteChannel writableByteChannel = Channels.newChannel(output);
|
||||
|
||||
@Override
|
||||
public void setChunkSize(int chunkSize) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public RestorableState<WriteChannel> capture() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int write(ByteBuffer src) throws IOException {
|
||||
// Only fail a blob once on a 410 error since the error is so unlikely in practice
|
||||
if (simulated410s.add(blobInfo) && random.nextBoolean()) {
|
||||
failed = true;
|
||||
throw new StorageException(HttpURLConnection.HTTP_GONE, "Simulated lost resumeable upload session");
|
||||
}
|
||||
return writableByteChannel.write(src);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOpen() {
|
||||
return writableByteChannel.isOpen();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
IOUtils.closeWhileHandlingException(writableByteChannel);
|
||||
if (failed == false) {
|
||||
if (Stream.of(options).anyMatch(option -> option.equals(BlobWriteOption.doesNotExist()))) {
|
||||
byte[] existingBytes = blobs.putIfAbsent(blobInfo.getName(), output.toByteArray());
|
||||
if (existingBytes != null) {
|
||||
throw new StorageException(412, "Blob already exists");
|
||||
}
|
||||
} else {
|
||||
blobs.put(blobInfo.getName(), output.toByteArray());
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public WriteChannel writer(URL signedURL) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Everything below this line is not implemented.
|
||||
|
||||
@Override
|
||||
public CopyWriter copy(CopyRequest copyRequest) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Blob create(BlobInfo blobInfo, byte[] content, int offset, int length, BlobTargetOption... options) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Bucket create(BucketInfo bucketInfo, BucketTargetOption... options) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Blob create(BlobInfo blobInfo, BlobTargetOption... options) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Blob create(BlobInfo blobInfo, InputStream content, BlobWriteOption... options) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Blob get(String bucket, String blob, BlobGetOption... options) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Blob get(BlobId blob, BlobGetOption... options) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Page<Bucket> list(BucketListOption... options) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Bucket update(BucketInfo bucketInfo, BucketTargetOption... options) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Blob update(BlobInfo blobInfo, BlobTargetOption... options) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Blob update(BlobInfo blobInfo) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean delete(String bucket, BucketSourceOption... options) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean delete(String bucket, String blob, BlobSourceOption... options) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean delete(BlobId blob, BlobSourceOption... options) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Blob compose(ComposeRequest composeRequest) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] readAllBytes(String bucket, String blob, BlobSourceOption... options) {
|
||||
return new byte[0];
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] readAllBytes(BlobId blob, BlobSourceOption... options) {
|
||||
return new byte[0];
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public StorageBatch batch() {
|
||||
final Answer<?> throwOnMissingMock = invocationOnMock -> {
|
||||
throw new AssertionError("Did not expect call to method [" + invocationOnMock.getMethod().getName() + ']');
|
||||
};
|
||||
final StorageBatch batch = mock(StorageBatch.class, throwOnMissingMock);
|
||||
StorageBatchResult<Boolean> result = mock(StorageBatchResult.class, throwOnMissingMock);
|
||||
doAnswer(answer -> {
|
||||
BatchResult.Callback<Boolean, Exception> callback = (BatchResult.Callback<Boolean, Exception>) answer.getArguments()[0];
|
||||
callback.success(true);
|
||||
return null;
|
||||
}).when(result).notify(any(BatchResult.Callback.class));
|
||||
doAnswer(invocation -> {
|
||||
final BlobId blobId = (BlobId) invocation.getArguments()[0];
|
||||
delete(blobId);
|
||||
return result;
|
||||
}).when(batch).delete(any(BlobId.class), anyVararg());
|
||||
doAnswer(invocation -> null).when(batch).submit();
|
||||
return batch;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReadChannel reader(String bucket, String blob, BlobSourceOption... options) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public URL signUrl(BlobInfo blobInfo, long duration, TimeUnit unit, SignUrlOption... options) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Blob> get(BlobId... blobIds) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Blob> get(Iterable<BlobId> blobIds) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Blob> update(BlobInfo... blobInfos) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Blob> update(Iterable<BlobInfo> blobInfos) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Boolean> delete(BlobId... blobIds) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Acl getAcl(String bucket, Acl.Entity entity, BucketSourceOption... options) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Acl getAcl(String bucket, Acl.Entity entity) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean deleteAcl(String bucket, Acl.Entity entity, BucketSourceOption... options) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean deleteAcl(String bucket, Acl.Entity entity) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Acl createAcl(String bucket, Acl acl, BucketSourceOption... options) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Acl createAcl(String bucket, Acl acl) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Acl updateAcl(String bucket, Acl acl, BucketSourceOption... options) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Acl updateAcl(String bucket, Acl acl) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Acl> listAcls(String bucket, BucketSourceOption... options) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Acl> listAcls(String bucket) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Acl getDefaultAcl(String bucket, Acl.Entity entity) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean deleteDefaultAcl(String bucket, Acl.Entity entity) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Acl createDefaultAcl(String bucket, Acl acl) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Acl updateDefaultAcl(String bucket, Acl acl) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Acl> listDefaultAcls(String bucket) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Acl getAcl(BlobId blob, Acl.Entity entity) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean deleteAcl(BlobId blob, Acl.Entity entity) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Acl createAcl(BlobId blob, Acl acl) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Acl updateAcl(BlobId blob, Acl acl) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Acl> listAcls(BlobId blob) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Policy getIamPolicy(String bucket, BucketSourceOption... options) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Policy setIamPolicy(String bucket, Policy policy, BucketSourceOption... options) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Boolean> testIamPermissions(String bucket, List<String> permissions, BucketSourceOption... options) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServiceAccount getServiceAccount(String projectId) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public StorageOptions getOptions() {
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -28,13 +28,12 @@ import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
|||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.blobstore.BlobStore;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.repositories.ESBlobStoreContainerTestCase;
|
||||
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import javax.security.auth.Subject;
|
||||
import java.io.IOException;
|
||||
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.net.URI;
|
||||
|
@ -45,13 +44,12 @@ import java.security.PrivilegedActionException;
|
|||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.Collections;
|
||||
|
||||
@ThreadLeakFilters(filters = {HdfsClientThreadLeakFilter.class})
|
||||
public class HdfsBlobStoreContainerTests extends ESBlobStoreContainerTestCase {
|
||||
import static org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase.randomBytes;
|
||||
import static org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase.readBlobFully;
|
||||
import static org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase.writeBlob;
|
||||
|
||||
@Override
|
||||
protected BlobStore newBlobStore() throws IOException {
|
||||
return new HdfsBlobStore(createTestContext(), "temp", 1024, false);
|
||||
}
|
||||
@ThreadLeakFilters(filters = {HdfsClientThreadLeakFilter.class})
|
||||
public class HdfsBlobStoreContainerTests extends ESTestCase {
|
||||
|
||||
private FileContext createTestContext() {
|
||||
FileContext fileContext;
|
||||
|
|
|
@ -0,0 +1,55 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.repositories.hdfs;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
||||
@ThreadLeakFilters(filters = HdfsClientThreadLeakFilter.class)
|
||||
// Ony using a single node here since the TestingFs only supports the single-node case
|
||||
@ESIntegTestCase.ClusterScope(numDataNodes = 1, supportsDedicatedMasters = false)
|
||||
public class HdfsBlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCase {
|
||||
|
||||
@Override
|
||||
protected String repositoryType() {
|
||||
return "hdfs";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Settings repositorySettings() {
|
||||
assumeFalse("https://github.com/elastic/elasticsearch/issues/31498", HdfsRepositoryTests.isJava11());
|
||||
return Settings.builder()
|
||||
.put("uri", "hdfs:///")
|
||||
.put("conf.fs.AbstractFileSystem.hdfs.impl", TestingFs.class.getName())
|
||||
.put("path", "foo")
|
||||
.put("chunk_size", randomIntBetween(100, 1000) + "k")
|
||||
.put("compress", randomBoolean()).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return Collections.singletonList(HdfsPlugin.class);
|
||||
}
|
||||
}
|
|
@ -78,7 +78,7 @@ public class HdfsRepositoryTests extends AbstractThirdPartyRepositoryTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
private static boolean isJava11() {
|
||||
public static boolean isJava11() {
|
||||
return JavaVersion.current().equals(JavaVersion.parse("11"));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,169 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.repositories.s3;
|
||||
|
||||
import com.amazonaws.AmazonClientException;
|
||||
import com.amazonaws.SdkClientException;
|
||||
import com.amazonaws.services.s3.AbstractAmazonS3;
|
||||
import com.amazonaws.services.s3.model.AmazonS3Exception;
|
||||
import com.amazonaws.services.s3.model.DeleteObjectRequest;
|
||||
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
|
||||
import com.amazonaws.services.s3.model.DeleteObjectsResult;
|
||||
import com.amazonaws.services.s3.model.GetObjectRequest;
|
||||
import com.amazonaws.services.s3.model.ListObjectsRequest;
|
||||
import com.amazonaws.services.s3.model.ObjectListing;
|
||||
import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||
import com.amazonaws.services.s3.model.PutObjectRequest;
|
||||
import com.amazonaws.services.s3.model.PutObjectResult;
|
||||
import com.amazonaws.services.s3.model.S3Object;
|
||||
import com.amazonaws.services.s3.model.S3ObjectInputStream;
|
||||
import com.amazonaws.services.s3.model.S3ObjectSummary;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.io.Streams;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
class MockAmazonS3 extends AbstractAmazonS3 {
|
||||
|
||||
private final ConcurrentMap<String, byte[]> blobs;
|
||||
private final String bucket;
|
||||
private final boolean serverSideEncryption;
|
||||
private final String cannedACL;
|
||||
private final String storageClass;
|
||||
|
||||
MockAmazonS3(final ConcurrentMap<String, byte[]> blobs,
|
||||
final String bucket,
|
||||
final boolean serverSideEncryption,
|
||||
final String cannedACL,
|
||||
final String storageClass) {
|
||||
this.blobs = Objects.requireNonNull(blobs);
|
||||
this.bucket = Objects.requireNonNull(bucket);
|
||||
this.serverSideEncryption = serverSideEncryption;
|
||||
this.cannedACL = cannedACL;
|
||||
this.storageClass = storageClass;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean doesObjectExist(final String bucketName, final String objectName) throws SdkClientException {
|
||||
assertThat(bucketName, equalTo(bucket));
|
||||
return blobs.containsKey(objectName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PutObjectResult putObject(final PutObjectRequest request) throws AmazonClientException {
|
||||
assertThat(request.getBucketName(), equalTo(bucket));
|
||||
assertThat(request.getMetadata().getSSEAlgorithm(), serverSideEncryption ? equalTo("AES256") : nullValue());
|
||||
assertThat(request.getCannedAcl(), notNullValue());
|
||||
assertThat(request.getCannedAcl().toString(), cannedACL != null ? equalTo(cannedACL) : equalTo("private"));
|
||||
assertThat(request.getStorageClass(), storageClass != null ? equalTo(storageClass) : equalTo("STANDARD"));
|
||||
|
||||
|
||||
final String blobName = request.getKey();
|
||||
final ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
try {
|
||||
Streams.copy(request.getInputStream(), out);
|
||||
blobs.put(blobName, out.toByteArray());
|
||||
} catch (IOException e) {
|
||||
throw new AmazonClientException(e);
|
||||
}
|
||||
return new PutObjectResult();
|
||||
}
|
||||
|
||||
@Override
|
||||
public S3Object getObject(final GetObjectRequest request) throws AmazonClientException {
|
||||
assertThat(request.getBucketName(), equalTo(bucket));
|
||||
|
||||
final String blobName = request.getKey();
|
||||
final byte[] content = blobs.get(blobName);
|
||||
if (content == null) {
|
||||
AmazonS3Exception exception = new AmazonS3Exception("[" + blobName + "] does not exist.");
|
||||
exception.setStatusCode(404);
|
||||
throw exception;
|
||||
}
|
||||
|
||||
ObjectMetadata metadata = new ObjectMetadata();
|
||||
metadata.setContentLength(content.length);
|
||||
|
||||
S3Object s3Object = new S3Object();
|
||||
s3Object.setObjectContent(new S3ObjectInputStream(new ByteArrayInputStream(content), null, false));
|
||||
s3Object.setKey(blobName);
|
||||
s3Object.setObjectMetadata(metadata);
|
||||
|
||||
return s3Object;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ObjectListing listObjects(final ListObjectsRequest request) throws AmazonClientException {
|
||||
assertThat(request.getBucketName(), equalTo(bucket));
|
||||
|
||||
final ObjectListing listing = new ObjectListing();
|
||||
listing.setBucketName(request.getBucketName());
|
||||
listing.setPrefix(request.getPrefix());
|
||||
|
||||
for (Map.Entry<String, byte[]> blob : blobs.entrySet()) {
|
||||
if (Strings.isEmpty(request.getPrefix()) || blob.getKey().startsWith(request.getPrefix())) {
|
||||
S3ObjectSummary summary = new S3ObjectSummary();
|
||||
summary.setBucketName(request.getBucketName());
|
||||
summary.setKey(blob.getKey());
|
||||
summary.setSize(blob.getValue().length);
|
||||
listing.getObjectSummaries().add(summary);
|
||||
}
|
||||
}
|
||||
return listing;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteObject(final DeleteObjectRequest request) throws AmazonClientException {
|
||||
assertThat(request.getBucketName(), equalTo(bucket));
|
||||
blobs.remove(request.getKey());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
// TODO check close
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeleteObjectsResult deleteObjects(DeleteObjectsRequest request) throws SdkClientException {
|
||||
assertThat(request.getBucketName(), equalTo(bucket));
|
||||
|
||||
final List<DeleteObjectsResult.DeletedObject> deletions = new ArrayList<>();
|
||||
for (DeleteObjectsRequest.KeyVersion key : request.getKeys()) {
|
||||
if (blobs.remove(key.getKey()) != null) {
|
||||
DeleteObjectsResult.DeletedObject deletion = new DeleteObjectsResult.DeletedObject();
|
||||
deletion.setKey(key.getKey());
|
||||
deletions.add(deletion);
|
||||
}
|
||||
}
|
||||
return new DeleteObjectsResult(deletions);
|
||||
}
|
||||
}
|
|
@ -34,15 +34,11 @@ import com.amazonaws.services.s3.model.PutObjectResult;
|
|||
import com.amazonaws.services.s3.model.StorageClass;
|
||||
import com.amazonaws.services.s3.model.UploadPartRequest;
|
||||
import com.amazonaws.services.s3.model.UploadPartResult;
|
||||
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.blobstore.BlobStore;
|
||||
import org.elasticsearch.common.blobstore.BlobStoreException;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.repositories.ESBlobStoreContainerTestCase;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
|
@ -50,8 +46,6 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
|
@ -65,16 +59,7 @@ import static org.mockito.Mockito.verify;
|
|||
import static org.mockito.Mockito.when;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
|
||||
public class S3BlobStoreContainerTests extends ESBlobStoreContainerTestCase {
|
||||
|
||||
protected BlobStore newBlobStore() {
|
||||
return randomMockS3BlobStore();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void testVerifyOverwriteFails() {
|
||||
assumeFalse("not implemented because of S3's weak consistency model", true);
|
||||
}
|
||||
public class S3BlobStoreContainerTests extends ESTestCase {
|
||||
|
||||
public void testExecuteSingleUploadBlobSizeTooLarge() {
|
||||
final long blobSize = ByteSizeUnit.GB.toBytes(randomIntBetween(6, 10));
|
||||
|
@ -462,35 +447,4 @@ public class S3BlobStoreContainerTests extends ESBlobStoreContainerTestCase {
|
|||
assertEquals("Expected number of parts [" + expectedParts + "] but got [" + result.v1() + "]", expectedParts, (long) result.v1());
|
||||
assertEquals("Expected remaining [" + expectedRemaining + "] but got [" + result.v2() + "]", expectedRemaining, (long) result.v2());
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@link S3BlobStore} with random settings.
|
||||
* <p>
|
||||
* The blobstore uses a {@link MockAmazonS3} client.
|
||||
*/
|
||||
public static S3BlobStore randomMockS3BlobStore() {
|
||||
String bucket = randomAlphaOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT);
|
||||
ByteSizeValue bufferSize = new ByteSizeValue(randomIntBetween(5, 100), ByteSizeUnit.MB);
|
||||
boolean serverSideEncryption = randomBoolean();
|
||||
|
||||
String cannedACL = null;
|
||||
if (randomBoolean()) {
|
||||
cannedACL = randomFrom(CannedAccessControlList.values()).toString();
|
||||
}
|
||||
|
||||
String storageClass = null;
|
||||
if (randomBoolean()) {
|
||||
storageClass = randomValueOtherThan(StorageClass.Glacier, () -> randomFrom(StorageClass.values())).toString();
|
||||
}
|
||||
|
||||
final AmazonS3 client = new MockAmazonS3(new ConcurrentHashMap<>(), bucket, serverSideEncryption, cannedACL, storageClass);
|
||||
final S3Service service = new S3Service() {
|
||||
@Override
|
||||
public synchronized AmazonS3Reference client(RepositoryMetaData repositoryMetaData) {
|
||||
return new AmazonS3Reference(client);
|
||||
}
|
||||
};
|
||||
return new S3BlobStore(service, bucket, serverSideEncryption, bufferSize, cannedACL, storageClass,
|
||||
new RepositoryMetaData(bucket, "s3", Settings.EMPTY));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,81 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.common.blobstore.fs;
|
||||
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.blobstore.BlobStore;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.repositories.ESBlobStoreContainerTestCase;
|
||||
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
|
||||
@LuceneTestCase.SuppressFileSystems("ExtrasFS")
|
||||
public class FsBlobStoreContainerTests extends ESBlobStoreContainerTestCase {
|
||||
|
||||
protected BlobStore newBlobStore() throws IOException {
|
||||
final Settings settings;
|
||||
if (randomBoolean()) {
|
||||
settings = Settings.builder().put("buffer_size", new ByteSizeValue(randomIntBetween(1, 100), ByteSizeUnit.KB)).build();
|
||||
} else {
|
||||
settings = Settings.EMPTY;
|
||||
}
|
||||
return new FsBlobStore(settings, createTempDir(), false);
|
||||
}
|
||||
|
||||
public void testReadOnly() throws Exception {
|
||||
Path tempDir = createTempDir();
|
||||
Path path = tempDir.resolve("bar");
|
||||
|
||||
try (FsBlobStore store = new FsBlobStore(Settings.EMPTY, path, true)) {
|
||||
assertFalse(Files.exists(path));
|
||||
BlobPath blobPath = BlobPath.cleanPath().add("foo");
|
||||
store.blobContainer(blobPath);
|
||||
Path storePath = store.path();
|
||||
for (String d : blobPath) {
|
||||
storePath = storePath.resolve(d);
|
||||
}
|
||||
assertFalse(Files.exists(storePath));
|
||||
}
|
||||
|
||||
try (FsBlobStore store = new FsBlobStore(Settings.EMPTY, path, false)) {
|
||||
assertTrue(Files.exists(path));
|
||||
BlobPath blobPath = BlobPath.cleanPath().add("foo");
|
||||
BlobContainer container = store.blobContainer(blobPath);
|
||||
Path storePath = store.path();
|
||||
for (String d : blobPath) {
|
||||
storePath = storePath.resolve(d);
|
||||
}
|
||||
assertTrue(Files.exists(storePath));
|
||||
assertTrue(Files.isDirectory(storePath));
|
||||
|
||||
byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
|
||||
writeBlob(container, "test", new BytesArray(data));
|
||||
assertArrayEquals(readBlobFully(container, "test", data.length), data);
|
||||
assertTrue(BlobStoreTestUtil.blobExists(container, "test"));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -19,17 +19,21 @@
|
|||
package org.elasticsearch.repositories.fs;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.blobstore.fs.FsBlobStore;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
|
||||
import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.NoSuchFileException;
|
||||
import java.nio.file.Path;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
|
@ -55,7 +59,7 @@ public class FsBlobStoreRepositoryIT extends ESBlobStoreRepositoryIntegTestCase
|
|||
return settings.build();
|
||||
}
|
||||
|
||||
public void testMissingDirectoriesNotCreatedInReadonlyRepository() throws IOException, ExecutionException, InterruptedException {
|
||||
public void testMissingDirectoriesNotCreatedInReadonlyRepository() throws IOException, InterruptedException {
|
||||
final String repoName = randomName();
|
||||
final Path repoPath = randomRepoPath();
|
||||
|
||||
|
@ -97,4 +101,37 @@ public class FsBlobStoreRepositoryIT extends ESBlobStoreRepositoryIntegTestCase
|
|||
|
||||
assertFalse("deleted path is not recreated in readonly repository", Files.exists(deletedPath));
|
||||
}
|
||||
|
||||
public void testReadOnly() throws Exception {
|
||||
Path tempDir = createTempDir();
|
||||
Path path = tempDir.resolve("bar");
|
||||
|
||||
try (FsBlobStore store = new FsBlobStore(Settings.EMPTY, path, true)) {
|
||||
assertFalse(Files.exists(path));
|
||||
BlobPath blobPath = BlobPath.cleanPath().add("foo");
|
||||
store.blobContainer(blobPath);
|
||||
Path storePath = store.path();
|
||||
for (String d : blobPath) {
|
||||
storePath = storePath.resolve(d);
|
||||
}
|
||||
assertFalse(Files.exists(storePath));
|
||||
}
|
||||
|
||||
try (FsBlobStore store = new FsBlobStore(Settings.EMPTY, path, false)) {
|
||||
assertTrue(Files.exists(path));
|
||||
BlobPath blobPath = BlobPath.cleanPath().add("foo");
|
||||
BlobContainer container = store.blobContainer(blobPath);
|
||||
Path storePath = store.path();
|
||||
for (String d : blobPath) {
|
||||
storePath = storePath.resolve(d);
|
||||
}
|
||||
assertTrue(Files.exists(storePath));
|
||||
assertTrue(Files.isDirectory(storePath));
|
||||
|
||||
byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
|
||||
writeBlob(container, "test", new BytesArray(data));
|
||||
assertArrayEquals(readBlobFully(container, "test", data.length), data);
|
||||
assertTrue(BlobStoreTestUtil.blobExists(container, "test"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,213 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.repositories;
|
||||
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.BytesRefBuilder;
|
||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||
import org.elasticsearch.common.blobstore.BlobMetaData;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.blobstore.BlobStore;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.file.FileAlreadyExistsException;
|
||||
import java.nio.file.NoSuchFileException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.CoreMatchers.notNullValue;
|
||||
|
||||
/**
|
||||
* Generic test case for blob store container implementation.
|
||||
* These tests check basic blob store functionality.
|
||||
*/
|
||||
public abstract class ESBlobStoreContainerTestCase extends ESTestCase {
|
||||
|
||||
public void testReadNonExistingPath() throws IOException {
|
||||
try(BlobStore store = newBlobStore()) {
|
||||
final BlobContainer container = store.blobContainer(new BlobPath());
|
||||
expectThrows(NoSuchFileException.class, () -> {
|
||||
try (InputStream is = container.readBlob("non-existing")) {
|
||||
is.read();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public void testWriteRead() throws IOException {
|
||||
try(BlobStore store = newBlobStore()) {
|
||||
final BlobContainer container = store.blobContainer(new BlobPath());
|
||||
byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
|
||||
writeBlob(container, "foobar", new BytesArray(data), randomBoolean());
|
||||
if (randomBoolean()) {
|
||||
// override file, to check if we get latest contents
|
||||
data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
|
||||
writeBlob(container, "foobar", new BytesArray(data), false);
|
||||
}
|
||||
try (InputStream stream = container.readBlob("foobar")) {
|
||||
BytesRefBuilder target = new BytesRefBuilder();
|
||||
while (target.length() < data.length) {
|
||||
byte[] buffer = new byte[scaledRandomIntBetween(1, data.length - target.length())];
|
||||
int offset = scaledRandomIntBetween(0, buffer.length - 1);
|
||||
int read = stream.read(buffer, offset, buffer.length - offset);
|
||||
target.append(new BytesRef(buffer, offset, read));
|
||||
}
|
||||
assertEquals(data.length, target.length());
|
||||
assertArrayEquals(data, Arrays.copyOfRange(target.bytes(), 0, target.length()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testList() throws IOException {
|
||||
try(BlobStore store = newBlobStore()) {
|
||||
final BlobContainer container = store.blobContainer(new BlobPath());
|
||||
assertThat(container.listBlobs().size(), equalTo(0));
|
||||
int numberOfFooBlobs = randomIntBetween(0, 10);
|
||||
int numberOfBarBlobs = randomIntBetween(3, 20);
|
||||
Map<String, Long> generatedBlobs = new HashMap<>();
|
||||
for (int i = 0; i < numberOfFooBlobs; i++) {
|
||||
int length = randomIntBetween(10, 100);
|
||||
String name = "foo-" + i + "-";
|
||||
generatedBlobs.put(name, (long) length);
|
||||
writeRandomBlob(container, name, length);
|
||||
}
|
||||
for (int i = 1; i < numberOfBarBlobs; i++) {
|
||||
int length = randomIntBetween(10, 100);
|
||||
String name = "bar-" + i + "-";
|
||||
generatedBlobs.put(name, (long) length);
|
||||
writeRandomBlob(container, name, length);
|
||||
}
|
||||
int length = randomIntBetween(10, 100);
|
||||
String name = "bar-0-";
|
||||
generatedBlobs.put(name, (long) length);
|
||||
writeRandomBlob(container, name, length);
|
||||
|
||||
Map<String, BlobMetaData> blobs = container.listBlobs();
|
||||
assertThat(blobs.size(), equalTo(numberOfFooBlobs + numberOfBarBlobs));
|
||||
for (Map.Entry<String, Long> generated : generatedBlobs.entrySet()) {
|
||||
BlobMetaData blobMetaData = blobs.get(generated.getKey());
|
||||
assertThat(generated.getKey(), blobMetaData, notNullValue());
|
||||
assertThat(blobMetaData.name(), equalTo(generated.getKey()));
|
||||
assertThat(blobMetaData.length(), equalTo(generated.getValue()));
|
||||
}
|
||||
|
||||
assertThat(container.listBlobsByPrefix("foo-").size(), equalTo(numberOfFooBlobs));
|
||||
assertThat(container.listBlobsByPrefix("bar-").size(), equalTo(numberOfBarBlobs));
|
||||
assertThat(container.listBlobsByPrefix("baz-").size(), equalTo(0));
|
||||
}
|
||||
}
|
||||
|
||||
public void testDeleteBlobs() throws IOException {
|
||||
try (BlobStore store = newBlobStore()) {
|
||||
final List<String> blobNames = Arrays.asList("foobar", "barfoo");
|
||||
final BlobContainer container = store.blobContainer(new BlobPath());
|
||||
container.deleteBlobsIgnoringIfNotExists(blobNames); // does not raise when blobs don't exist
|
||||
byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
|
||||
final BytesArray bytesArray = new BytesArray(data);
|
||||
for (String blobName : blobNames) {
|
||||
writeBlob(container, blobName, bytesArray, randomBoolean());
|
||||
}
|
||||
assertEquals(container.listBlobs().size(), 2);
|
||||
container.deleteBlobsIgnoringIfNotExists(blobNames);
|
||||
assertTrue(container.listBlobs().isEmpty());
|
||||
container.deleteBlobsIgnoringIfNotExists(blobNames); // does not raise when blobs don't exist
|
||||
}
|
||||
}
|
||||
|
||||
public void testVerifyOverwriteFails() throws IOException {
|
||||
try (BlobStore store = newBlobStore()) {
|
||||
final String blobName = "foobar";
|
||||
final BlobContainer container = store.blobContainer(new BlobPath());
|
||||
byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
|
||||
final BytesArray bytesArray = new BytesArray(data);
|
||||
writeBlob(container, blobName, bytesArray, true);
|
||||
// should not be able to overwrite existing blob
|
||||
expectThrows(FileAlreadyExistsException.class, () -> writeBlob(container, blobName, bytesArray, true));
|
||||
container.deleteBlobsIgnoringIfNotExists(Collections.singletonList(blobName));
|
||||
writeBlob(container, blobName, bytesArray, true); // after deleting the previous blob, we should be able to write to it again
|
||||
}
|
||||
}
|
||||
|
||||
protected void writeBlob(final BlobContainer container, final String blobName, final BytesArray bytesArray,
|
||||
boolean failIfAlreadyExists) throws IOException {
|
||||
try (InputStream stream = bytesArray.streamInput()) {
|
||||
if (randomBoolean()) {
|
||||
container.writeBlob(blobName, stream, bytesArray.length(), failIfAlreadyExists);
|
||||
} else {
|
||||
container.writeBlobAtomic(blobName, stream, bytesArray.length(), failIfAlreadyExists);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testContainerCreationAndDeletion() throws IOException {
|
||||
try(BlobStore store = newBlobStore()) {
|
||||
final BlobContainer containerFoo = store.blobContainer(new BlobPath().add("foo"));
|
||||
final BlobContainer containerBar = store.blobContainer(new BlobPath().add("bar"));
|
||||
byte[] data1 = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
|
||||
byte[] data2 = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
|
||||
writeBlob(containerFoo, "test", new BytesArray(data1));
|
||||
writeBlob(containerBar, "test", new BytesArray(data2));
|
||||
|
||||
assertArrayEquals(readBlobFully(containerFoo, "test", data1.length), data1);
|
||||
assertArrayEquals(readBlobFully(containerBar, "test", data2.length), data2);
|
||||
|
||||
assertTrue(BlobStoreTestUtil.blobExists(containerFoo, "test"));
|
||||
assertTrue(BlobStoreTestUtil.blobExists(containerBar, "test"));
|
||||
}
|
||||
}
|
||||
|
||||
public static byte[] writeRandomBlob(BlobContainer container, String name, int length) throws IOException {
|
||||
byte[] data = randomBytes(length);
|
||||
writeBlob(container, name, new BytesArray(data));
|
||||
return data;
|
||||
}
|
||||
|
||||
public static byte[] readBlobFully(BlobContainer container, String name, int length) throws IOException {
|
||||
byte[] data = new byte[length];
|
||||
try (InputStream inputStream = container.readBlob(name)) {
|
||||
assertThat(inputStream.read(data), equalTo(length));
|
||||
assertThat(inputStream.read(), equalTo(-1));
|
||||
}
|
||||
return data;
|
||||
}
|
||||
|
||||
public static byte[] randomBytes(int length) {
|
||||
byte[] data = new byte[length];
|
||||
for (int i = 0; i < data.length; i++) {
|
||||
data[i] = (byte) randomInt();
|
||||
}
|
||||
return data;
|
||||
}
|
||||
|
||||
protected static void writeBlob(BlobContainer container, String blobName, BytesArray bytesArray) throws IOException {
|
||||
try (InputStream stream = bytesArray.streamInput()) {
|
||||
container.writeBlob(blobName, stream, bytesArray.length(), true);
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract BlobStore newBlobStore() throws IOException;
|
||||
}
|
|
@ -18,7 +18,10 @@
|
|||
*/
|
||||
package org.elasticsearch.repositories.blobstore;
|
||||
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.BytesRefBuilder;
|
||||
import org.apache.lucene.util.SetOnce;
|
||||
import org.elasticsearch.action.ActionRunnable;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequestBuilder;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequestBuilder;
|
||||
|
@ -27,7 +30,10 @@ import org.elasticsearch.action.index.IndexRequestBuilder;
|
|||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||
import org.elasticsearch.common.blobstore.BlobMetaData;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.blobstore.BlobStore;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.repositories.IndexId;
|
||||
import org.elasticsearch.repositories.RepositoriesService;
|
||||
|
@ -37,11 +43,17 @@ import org.elasticsearch.snapshots.SnapshotMissingException;
|
|||
import org.elasticsearch.snapshots.SnapshotRestoreException;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.hamcrest.CoreMatchers;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.file.NoSuchFileException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
|
@ -92,6 +104,166 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
|
|||
return name;
|
||||
}
|
||||
|
||||
public void testReadNonExistingPath() throws IOException {
|
||||
try (BlobStore store = newBlobStore()) {
|
||||
final BlobContainer container = store.blobContainer(new BlobPath());
|
||||
expectThrows(NoSuchFileException.class, () -> {
|
||||
try (InputStream is = container.readBlob("non-existing")) {
|
||||
is.read();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public void testWriteRead() throws IOException {
|
||||
try (BlobStore store = newBlobStore()) {
|
||||
final BlobContainer container = store.blobContainer(new BlobPath());
|
||||
byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
|
||||
writeBlob(container, "foobar", new BytesArray(data), randomBoolean());
|
||||
if (randomBoolean()) {
|
||||
// override file, to check if we get latest contents
|
||||
data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
|
||||
writeBlob(container, "foobar", new BytesArray(data), false);
|
||||
}
|
||||
try (InputStream stream = container.readBlob("foobar")) {
|
||||
BytesRefBuilder target = new BytesRefBuilder();
|
||||
while (target.length() < data.length) {
|
||||
byte[] buffer = new byte[scaledRandomIntBetween(1, data.length - target.length())];
|
||||
int offset = scaledRandomIntBetween(0, buffer.length - 1);
|
||||
int read = stream.read(buffer, offset, buffer.length - offset);
|
||||
target.append(new BytesRef(buffer, offset, read));
|
||||
}
|
||||
assertEquals(data.length, target.length());
|
||||
assertArrayEquals(data, Arrays.copyOfRange(target.bytes(), 0, target.length()));
|
||||
}
|
||||
container.delete();
|
||||
}
|
||||
}
|
||||
|
||||
public void testList() throws IOException {
|
||||
try (BlobStore store = newBlobStore()) {
|
||||
final BlobContainer container = store.blobContainer(new BlobPath());
|
||||
assertThat(container.listBlobs().size(), CoreMatchers.equalTo(0));
|
||||
int numberOfFooBlobs = randomIntBetween(0, 10);
|
||||
int numberOfBarBlobs = randomIntBetween(3, 20);
|
||||
Map<String, Long> generatedBlobs = new HashMap<>();
|
||||
for (int i = 0; i < numberOfFooBlobs; i++) {
|
||||
int length = randomIntBetween(10, 100);
|
||||
String name = "foo-" + i + "-";
|
||||
generatedBlobs.put(name, (long) length);
|
||||
writeRandomBlob(container, name, length);
|
||||
}
|
||||
for (int i = 1; i < numberOfBarBlobs; i++) {
|
||||
int length = randomIntBetween(10, 100);
|
||||
String name = "bar-" + i + "-";
|
||||
generatedBlobs.put(name, (long) length);
|
||||
writeRandomBlob(container, name, length);
|
||||
}
|
||||
int length = randomIntBetween(10, 100);
|
||||
String name = "bar-0-";
|
||||
generatedBlobs.put(name, (long) length);
|
||||
writeRandomBlob(container, name, length);
|
||||
|
||||
Map<String, BlobMetaData> blobs = container.listBlobs();
|
||||
assertThat(blobs.size(), CoreMatchers.equalTo(numberOfFooBlobs + numberOfBarBlobs));
|
||||
for (Map.Entry<String, Long> generated : generatedBlobs.entrySet()) {
|
||||
BlobMetaData blobMetaData = blobs.get(generated.getKey());
|
||||
assertThat(generated.getKey(), blobMetaData, CoreMatchers.notNullValue());
|
||||
assertThat(blobMetaData.name(), CoreMatchers.equalTo(generated.getKey()));
|
||||
assertThat(blobMetaData.length(), CoreMatchers.equalTo(generated.getValue()));
|
||||
}
|
||||
|
||||
assertThat(container.listBlobsByPrefix("foo-").size(), CoreMatchers.equalTo(numberOfFooBlobs));
|
||||
assertThat(container.listBlobsByPrefix("bar-").size(), CoreMatchers.equalTo(numberOfBarBlobs));
|
||||
assertThat(container.listBlobsByPrefix("baz-").size(), CoreMatchers.equalTo(0));
|
||||
container.delete();
|
||||
}
|
||||
}
|
||||
|
||||
public void testDeleteBlobs() throws IOException {
|
||||
try (BlobStore store = newBlobStore()) {
|
||||
final List<String> blobNames = Arrays.asList("foobar", "barfoo");
|
||||
final BlobContainer container = store.blobContainer(new BlobPath());
|
||||
container.deleteBlobsIgnoringIfNotExists(blobNames); // does not raise when blobs don't exist
|
||||
byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
|
||||
final BytesArray bytesArray = new BytesArray(data);
|
||||
for (String blobName : blobNames) {
|
||||
writeBlob(container, blobName, bytesArray, randomBoolean());
|
||||
}
|
||||
assertEquals(container.listBlobs().size(), 2);
|
||||
container.deleteBlobsIgnoringIfNotExists(blobNames);
|
||||
assertTrue(container.listBlobs().isEmpty());
|
||||
container.deleteBlobsIgnoringIfNotExists(blobNames); // does not raise when blobs don't exist
|
||||
}
|
||||
}
|
||||
|
||||
public static void writeBlob(final BlobContainer container, final String blobName, final BytesArray bytesArray,
|
||||
boolean failIfAlreadyExists) throws IOException {
|
||||
try (InputStream stream = bytesArray.streamInput()) {
|
||||
if (randomBoolean()) {
|
||||
container.writeBlob(blobName, stream, bytesArray.length(), failIfAlreadyExists);
|
||||
} else {
|
||||
container.writeBlobAtomic(blobName, stream, bytesArray.length(), failIfAlreadyExists);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testContainerCreationAndDeletion() throws IOException {
|
||||
try (BlobStore store = newBlobStore()) {
|
||||
final BlobContainer containerFoo = store.blobContainer(new BlobPath().add("foo"));
|
||||
final BlobContainer containerBar = store.blobContainer(new BlobPath().add("bar"));
|
||||
byte[] data1 = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
|
||||
byte[] data2 = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
|
||||
writeBlob(containerFoo, "test", new BytesArray(data1));
|
||||
writeBlob(containerBar, "test", new BytesArray(data2));
|
||||
|
||||
assertArrayEquals(readBlobFully(containerFoo, "test", data1.length), data1);
|
||||
assertArrayEquals(readBlobFully(containerBar, "test", data2.length), data2);
|
||||
|
||||
assertTrue(BlobStoreTestUtil.blobExists(containerFoo, "test"));
|
||||
assertTrue(BlobStoreTestUtil.blobExists(containerBar, "test"));
|
||||
containerBar.delete();
|
||||
containerFoo.delete();
|
||||
}
|
||||
}
|
||||
|
||||
public static byte[] writeRandomBlob(BlobContainer container, String name, int length) throws IOException {
|
||||
byte[] data = randomBytes(length);
|
||||
writeBlob(container, name, new BytesArray(data));
|
||||
return data;
|
||||
}
|
||||
|
||||
public static byte[] readBlobFully(BlobContainer container, String name, int length) throws IOException {
|
||||
byte[] data = new byte[length];
|
||||
try (InputStream inputStream = container.readBlob(name)) {
|
||||
assertThat(inputStream.read(data), CoreMatchers.equalTo(length));
|
||||
assertThat(inputStream.read(), CoreMatchers.equalTo(-1));
|
||||
}
|
||||
return data;
|
||||
}
|
||||
|
||||
public static byte[] randomBytes(int length) {
|
||||
byte[] data = new byte[length];
|
||||
for (int i = 0; i < data.length; i++) {
|
||||
data[i] = (byte) randomInt();
|
||||
}
|
||||
return data;
|
||||
}
|
||||
|
||||
protected static void writeBlob(BlobContainer container, String blobName, BytesArray bytesArray) throws IOException {
|
||||
try (InputStream stream = bytesArray.streamInput()) {
|
||||
container.writeBlob(blobName, stream, bytesArray.length(), true);
|
||||
}
|
||||
}
|
||||
|
||||
protected BlobStore newBlobStore() {
|
||||
final String repository = createRepository(randomName());
|
||||
final BlobStoreRepository blobStoreRepository =
|
||||
(BlobStoreRepository) internalCluster().getMasterNodeInstance(RepositoriesService.class).repository(repository);
|
||||
return PlainActionFuture.get(
|
||||
f -> blobStoreRepository.threadPool().generic().execute(ActionRunnable.supply(f, blobStoreRepository::blobStore)));
|
||||
}
|
||||
|
||||
public void testSnapshotAndRestore() throws Exception {
|
||||
final String repoName = createRepository(randomName());
|
||||
int indexCount = randomIntBetween(1, 5);
|
||||
|
|
|
@ -76,7 +76,7 @@ import java.util.HashMap;
|
|||
import java.util.Map;
|
||||
|
||||
import static java.time.Clock.systemUTC;
|
||||
import static org.elasticsearch.repositories.ESBlobStoreContainerTestCase.randomBytes;
|
||||
import static org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase.randomBytes;
|
||||
import static org.elasticsearch.test.ClusterServiceUtils.setState;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
|
Loading…
Reference in New Issue