Merge AzureStorageService and AzureStorageServiceImpl and clean up tests (#31607)
This pull request merges the AzureStorageService interface and the AzureStorageServiceImpl class into a single AzureStorageService class. It also removes some tests in the repository-azure plugin that have not been executed for more than two years.
commit 7a0a0f2a9e
parent 7a76e3a4fa
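Before the diffs, a minimal sketch of the structural change using hypothetical stand-in names (not the real Elasticsearch classes): the interface-plus-implementation pair collapses into one concrete class, and test doubles switch from implementing the interface to subclassing the class and overriding the protected seams they need.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class StorageService {                                // was: interface StorageService + class StorageServiceImpl
    protected String buildClient(String name) {       // protected seam, overridable in tests
        return "client-for-" + name;
    }

    public String client(String name) {
        return buildClient(name);
    }
}

class StorageServiceMock extends StorageService {     // was: implements StorageService
    private final Map<String, String> canned = new ConcurrentHashMap<>();

    @Override
    protected String buildClient(String name) {       // override only the seam under test
        return canned.getOrDefault(name, "in-memory-client");
    }
}

public class MergeSketch {
    public static void main(String[] args) {
        StorageService service = new StorageServiceMock();
        System.out.println(service.client("default")); // prints: in-memory-client
    }
}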
AzureRepositoryPlugin.java:

@@ -43,7 +43,7 @@ public class AzureRepositoryPlugin extends Plugin implements RepositoryPlugin, R

     public AzureRepositoryPlugin(Settings settings) {
         // eagerly load client settings so that secure settings are read
-        this.azureStoreService = new AzureStorageServiceImpl(settings);
+        this.azureStoreService = new AzureStorageService(settings);
     }

     @Override
AzureStorageService.java:

@@ -19,27 +19,59 @@

 package org.elasticsearch.repositories.azure;

+import com.microsoft.azure.storage.AccessCondition;
+import com.microsoft.azure.storage.CloudStorageAccount;
 import com.microsoft.azure.storage.OperationContext;
+import com.microsoft.azure.storage.RetryExponentialRetry;
+import com.microsoft.azure.storage.RetryPolicy;
+import com.microsoft.azure.storage.StorageErrorCodeStrings;
 import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.BlobInputStream;
+import com.microsoft.azure.storage.blob.BlobListingDetails;
+import com.microsoft.azure.storage.blob.BlobProperties;
 import com.microsoft.azure.storage.blob.CloudBlobClient;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
+import com.microsoft.azure.storage.blob.ListBlobItem;
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.common.blobstore.BlobMetaData;
+import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
+import org.elasticsearch.common.collect.MapBuilder;
 import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.component.AbstractComponent;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.settings.SettingsException;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;

 import java.io.IOException;
 import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.file.FileAlreadyExistsException;
+import java.security.InvalidKeyException;
+import java.util.EnumSet;
 import java.util.Map;
 import java.util.function.Supplier;

-/**
- * Azure Storage Service interface
- * @see AzureStorageServiceImpl for Azure REST API implementation
- */
-public interface AzureStorageService {
+import static java.util.Collections.emptyMap;
+
+public class AzureStorageService extends AbstractComponent {
+
+    public static final ByteSizeValue MIN_CHUNK_SIZE = new ByteSizeValue(1, ByteSizeUnit.BYTES);
+    public static final ByteSizeValue MAX_CHUNK_SIZE = new ByteSizeValue(64, ByteSizeUnit.MB);
+
+    // 'package' for testing
+    volatile Map<String, AzureStorageSettings> storageSettings = emptyMap();
+
+    public AzureStorageService(Settings settings) {
+        super(settings);
+        // eagerly load client settings so that secure settings are read
+        final Map<String, AzureStorageSettings> clientsSettings = AzureStorageSettings.load(settings);
+        refreshAndClearCache(clientsSettings);
+    }

     /**
      * Creates a {@code CloudBlobClient} on each invocation using the current client
@@ -48,7 +80,46 @@ public interface AzureStorageService {
      * thread for logically coupled ops. The {@code OperationContext} is used to
      * specify the proxy, but a new context is *required* for each call.
      */
-    Tuple<CloudBlobClient, Supplier<OperationContext>> client(String clientName);
+    public Tuple<CloudBlobClient, Supplier<OperationContext>> client(String clientName) {
+        final AzureStorageSettings azureStorageSettings = this.storageSettings.get(clientName);
+        if (azureStorageSettings == null) {
+            throw new SettingsException("Unable to find client with name [" + clientName + "]");
+        }
+        try {
+            return new Tuple<>(buildClient(azureStorageSettings), () -> buildOperationContext(azureStorageSettings));
+        } catch (InvalidKeyException | URISyntaxException | IllegalArgumentException e) {
+            throw new SettingsException("Invalid azure client settings with name [" + clientName + "]", e);
+        }
+    }
+
+    protected CloudBlobClient buildClient(AzureStorageSettings azureStorageSettings) throws InvalidKeyException, URISyntaxException {
+        final CloudBlobClient client = createClient(azureStorageSettings);
+        // Set timeout option if the user sets cloud.azure.storage.timeout or
+        // cloud.azure.storage.xxx.timeout (it's negative by default)
+        final long timeout = azureStorageSettings.getTimeout().getMillis();
+        if (timeout > 0) {
+            if (timeout > Integer.MAX_VALUE) {
+                throw new IllegalArgumentException("Timeout [" + azureStorageSettings.getTimeout() + "] exceeds 2,147,483,647ms.");
+            }
+            client.getDefaultRequestOptions().setTimeoutIntervalInMs((int) timeout);
+        }
+        // We define a default exponential retry policy
+        client.getDefaultRequestOptions()
+            .setRetryPolicyFactory(new RetryExponentialRetry(RetryPolicy.DEFAULT_CLIENT_BACKOFF, azureStorageSettings.getMaxRetries()));
+        client.getDefaultRequestOptions().setLocationMode(azureStorageSettings.getLocationMode());
+        return client;
+    }
+
+    protected CloudBlobClient createClient(AzureStorageSettings azureStorageSettings) throws InvalidKeyException, URISyntaxException {
+        final String connectionString = azureStorageSettings.buildConnectionString();
+        return CloudStorageAccount.parse(connectionString).createCloudBlobClient();
+    }
+
+    protected OperationContext buildOperationContext(AzureStorageSettings azureStorageSettings) {
+        final OperationContext context = new OperationContext();
+        context.setProxy(azureStorageSettings.getProxy());
+        return context;
+    }

     /**
      * Updates settings for building clients. Any client cache is cleared. Future
@@ -57,32 +128,134 @@ public interface AzureStorageService {
      * @param clientsSettings the settings for new clients
      * @return the old settings
      */
-    Map<String, AzureStorageSettings> refreshAndClearCache(Map<String, AzureStorageSettings> clientsSettings);
-
-    ByteSizeValue MIN_CHUNK_SIZE = new ByteSizeValue(1, ByteSizeUnit.BYTES);
-    ByteSizeValue MAX_CHUNK_SIZE = new ByteSizeValue(64, ByteSizeUnit.MB);
-
-    boolean doesContainerExist(String account, String container) throws URISyntaxException, StorageException;
-
-    void removeContainer(String account, String container) throws URISyntaxException, StorageException;
-
-    void createContainer(String account, String container) throws URISyntaxException, StorageException;
-
-    void deleteFiles(String account, String container, String path) throws URISyntaxException, StorageException;
-
-    boolean blobExists(String account, String container, String blob) throws URISyntaxException, StorageException;
-
-    void deleteBlob(String account, String container, String blob) throws URISyntaxException, StorageException;
-
-    InputStream getInputStream(String account, String container, String blob) throws URISyntaxException, StorageException, IOException;
-
-    Map<String, BlobMetaData> listBlobsByPrefix(String account, String container, String keyPath, String prefix)
-        throws URISyntaxException, StorageException;
-
-    void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize)
-        throws URISyntaxException, StorageException, FileAlreadyExistsException;
-
-    static InputStream giveSocketPermissionsToStream(InputStream stream) {
+    public Map<String, AzureStorageSettings> refreshAndClearCache(Map<String, AzureStorageSettings> clientsSettings) {
+        final Map<String, AzureStorageSettings> prevSettings = this.storageSettings;
+        this.storageSettings = MapBuilder.newMapBuilder(clientsSettings).immutableMap();
+        // clients are built lazily by {@link client(String)}
+        return prevSettings;
+    }
+
+    public boolean doesContainerExist(String account, String container) throws URISyntaxException, StorageException {
+        final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
+        final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
+        return SocketAccess.doPrivilegedException(() -> blobContainer.exists(null, null, client.v2().get()));
+    }
+
+    public void deleteFiles(String account, String container, String path) throws URISyntaxException, StorageException {
+        final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
+        // container name must be lower case.
+        logger.trace(() -> new ParameterizedMessage("delete files container [{}], path [{}]", container, path));
+        SocketAccess.doPrivilegedVoidException(() -> {
+            // list the blobs using a flat blob listing mode
+            final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
+            for (final ListBlobItem blobItem : blobContainer.listBlobs(path, true, EnumSet.noneOf(BlobListingDetails.class), null,
+                    client.v2().get())) {
+                final String blobName = blobNameFromUri(blobItem.getUri());
+                logger.trace(() -> new ParameterizedMessage("removing blob [{}] full URI was [{}]", blobName, blobItem.getUri()));
+                // don't call {@code #deleteBlob}, use the same client
+                final CloudBlockBlob azureBlob = blobContainer.getBlockBlobReference(blobName);
+                azureBlob.delete(DeleteSnapshotsOption.NONE, null, null, client.v2().get());
+            }
+        });
+    }
+
+    /**
+     * Extract the blob name from a URI like https://myservice.azure.net/container/path/to/myfile
+     * It should remove the container part (first part of the path) and gives path/to/myfile
+     * @param uri URI to parse
+     * @return The blob name relative to the container
+     */
+    static String blobNameFromUri(URI uri) {
+        final String path = uri.getPath();
+        // We remove the container name from the path
+        // The 3 magic number cames from the fact if path is /container/path/to/myfile
+        // First occurrence is empty "/"
+        // Second occurrence is "container
+        // Last part contains "path/to/myfile" which is what we want to get
+        final String[] splits = path.split("/", 3);
+        // We return the remaining end of the string
+        return splits[2];
+    }
+
+    public boolean blobExists(String account, String container, String blob) throws URISyntaxException, StorageException {
+        // Container name must be lower case.
+        final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
+        final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
+        return SocketAccess.doPrivilegedException(() -> {
+            final CloudBlockBlob azureBlob = blobContainer.getBlockBlobReference(blob);
+            return azureBlob.exists(null, null, client.v2().get());
+        });
+    }
+
+    public void deleteBlob(String account, String container, String blob) throws URISyntaxException, StorageException {
+        final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
+        // Container name must be lower case.
+        final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
+        logger.trace(() -> new ParameterizedMessage("delete blob for container [{}], blob [{}]", container, blob));
+        SocketAccess.doPrivilegedVoidException(() -> {
+            final CloudBlockBlob azureBlob = blobContainer.getBlockBlobReference(blob);
+            logger.trace(() -> new ParameterizedMessage("container [{}]: blob [{}] found. removing.", container, blob));
+            azureBlob.delete(DeleteSnapshotsOption.NONE, null, null, client.v2().get());
+        });
+    }
+
+    public InputStream getInputStream(String account, String container, String blob)
+            throws URISyntaxException, StorageException, IOException {
+        final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
+        final CloudBlockBlob blockBlobReference = client.v1().getContainerReference(container).getBlockBlobReference(blob);
+        logger.trace(() -> new ParameterizedMessage("reading container [{}], blob [{}]", container, blob));
+        final BlobInputStream is = SocketAccess.doPrivilegedException(() ->
+            blockBlobReference.openInputStream(null, null, client.v2().get()));
+        return giveSocketPermissionsToStream(is);
+    }
+
+    public Map<String, BlobMetaData> listBlobsByPrefix(String account, String container, String keyPath, String prefix)
+            throws URISyntaxException, StorageException {
+        // NOTE: this should be here: if (prefix == null) prefix = "";
+        // however, this is really inefficient since deleteBlobsByPrefix enumerates everything and
+        // then does a prefix match on the result; it should just call listBlobsByPrefix with the prefix!
+        final MapBuilder<String, BlobMetaData> blobsBuilder = MapBuilder.newMapBuilder();
+        final EnumSet<BlobListingDetails> enumBlobListingDetails = EnumSet.of(BlobListingDetails.METADATA);
+        final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
+        final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
+        logger.trace(() -> new ParameterizedMessage("listing container [{}], keyPath [{}], prefix [{}]", container, keyPath, prefix));
+        SocketAccess.doPrivilegedVoidException(() -> {
+            for (final ListBlobItem blobItem : blobContainer.listBlobs(keyPath + (prefix == null ? "" : prefix), false,
+                    enumBlobListingDetails, null, client.v2().get())) {
+                final URI uri = blobItem.getUri();
+                logger.trace(() -> new ParameterizedMessage("blob url [{}]", uri));
+                // uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
+                // this requires 1 + container.length() + 1, with each 1 corresponding to one of the /
+                final String blobPath = uri.getPath().substring(1 + container.length() + 1);
+                final BlobProperties properties = ((CloudBlockBlob) blobItem).getProperties();
+                final String name = blobPath.substring(keyPath.length());
+                logger.trace(() -> new ParameterizedMessage("blob url [{}], name [{}], size [{}]", uri, name, properties.getLength()));
+                blobsBuilder.put(name, new PlainBlobMetaData(name, properties.getLength()));
+            }
+        });
+        return blobsBuilder.immutableMap();
+    }
+
+    public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize)
+            throws URISyntaxException, StorageException, FileAlreadyExistsException {
+        logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {})", blobName, blobSize));
+        final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
+        final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
+        final CloudBlockBlob blob = blobContainer.getBlockBlobReference(blobName);
+        try {
+            SocketAccess.doPrivilegedVoidException(() ->
+                blob.upload(inputStream, blobSize, AccessCondition.generateIfNotExistsCondition(), null, client.v2().get()));
+        } catch (final StorageException se) {
+            if (se.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT &&
+                    StorageErrorCodeStrings.BLOB_ALREADY_EXISTS.equals(se.getErrorCode())) {
+                throw new FileAlreadyExistsException(blobName, null, se.getMessage());
+            }
+            throw se;
+        }
+        logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {}) - done", blobName, blobSize));
+    }
+
+    static InputStream giveSocketPermissionsToStream(final InputStream stream) {
         return new InputStream() {
             @Override
             public int read() throws IOException {
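A hedged usage sketch of the merged service from a caller's point of view, mirroring the doesContainerExist implementation above. It assumes the class lives in the plugin's package so the package-private SocketAccess helper is visible; "default" and "my-container" are placeholder names, not values from this PR. Each call to client(name) builds a fresh CloudBlobClient from the current settings and pairs it with a supplier that produces a new OperationContext per SDK call, as the javadoc above requires.

package org.elasticsearch.repositories.azure;

import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import org.elasticsearch.common.collect.Tuple;

import java.net.URISyntaxException;
import java.util.function.Supplier;

class ClientUsageSketch {

    static boolean containerExists(AzureStorageService service) throws URISyntaxException, StorageException {
        // v1() is the freshly built CloudBlobClient, v2() supplies a new OperationContext per call
        final Tuple<CloudBlobClient, Supplier<OperationContext>> client = service.client("default");
        final CloudBlobContainer container = client.v1().getContainerReference("my-container");
        return SocketAccess.doPrivilegedException(() -> container.exists(null, null, client.v2().get()));
    }
}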
AzureStorageServiceImpl.java (entire file removed):

@@ -1,270 +0,0 @@
/* ... standard Elasticsearch Apache-2.0 license header ... */

package org.elasticsearch.repositories.azure;

import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.RetryExponentialRetry;
import com.microsoft.azure.storage.RetryPolicy;
import com.microsoft.azure.storage.StorageErrorCodeStrings;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.BlobInputStream;
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.BlobProperties;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
import com.microsoft.azure.storage.blob.ListBlobItem;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.repositories.RepositoryException;

import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import java.nio.file.FileAlreadyExistsException;
import java.util.EnumSet;
import java.util.Map;
import java.util.function.Supplier;

import static java.util.Collections.emptyMap;

public class AzureStorageServiceImpl extends AbstractComponent implements AzureStorageService {

    // 'package' for testing
    volatile Map<String, AzureStorageSettings> storageSettings = emptyMap();

    public AzureStorageServiceImpl(Settings settings) {
        super(settings);
        // eagerly load client settings so that secure settings are read
        final Map<String, AzureStorageSettings> clientsSettings = AzureStorageSettings.load(settings);
        refreshAndClearCache(clientsSettings);
    }

    @Override
    public Tuple<CloudBlobClient, Supplier<OperationContext>> client(String clientName) {
        final AzureStorageSettings azureStorageSettings = this.storageSettings.get(clientName);
        if (azureStorageSettings == null) {
            throw new SettingsException("Unable to find client with name [" + clientName + "]");
        }
        try {
            return new Tuple<>(buildClient(azureStorageSettings), () -> buildOperationContext(azureStorageSettings));
        } catch (InvalidKeyException | URISyntaxException | IllegalArgumentException e) {
            throw new SettingsException("Invalid azure client settings with name [" + clientName + "]", e);
        }
    }

    protected CloudBlobClient buildClient(AzureStorageSettings azureStorageSettings) throws InvalidKeyException, URISyntaxException {
        final CloudBlobClient client = createClient(azureStorageSettings);
        // Set timeout option if the user sets cloud.azure.storage.timeout or
        // cloud.azure.storage.xxx.timeout (it's negative by default)
        final long timeout = azureStorageSettings.getTimeout().getMillis();
        if (timeout > 0) {
            if (timeout > Integer.MAX_VALUE) {
                throw new IllegalArgumentException("Timeout [" + azureStorageSettings.getTimeout() + "] exceeds 2,147,483,647ms.");
            }
            client.getDefaultRequestOptions().setTimeoutIntervalInMs((int) timeout);
        }
        // We define a default exponential retry policy
        client.getDefaultRequestOptions()
            .setRetryPolicyFactory(new RetryExponentialRetry(RetryPolicy.DEFAULT_CLIENT_BACKOFF, azureStorageSettings.getMaxRetries()));
        client.getDefaultRequestOptions().setLocationMode(azureStorageSettings.getLocationMode());
        return client;
    }

    protected CloudBlobClient createClient(AzureStorageSettings azureStorageSettings) throws InvalidKeyException, URISyntaxException {
        final String connectionString = azureStorageSettings.buildConnectionString();
        return CloudStorageAccount.parse(connectionString).createCloudBlobClient();
    }

    protected OperationContext buildOperationContext(AzureStorageSettings azureStorageSettings) {
        final OperationContext context = new OperationContext();
        context.setProxy(azureStorageSettings.getProxy());
        return context;
    }

    @Override
    public Map<String, AzureStorageSettings> refreshAndClearCache(Map<String, AzureStorageSettings> clientsSettings) {
        final Map<String, AzureStorageSettings> prevSettings = this.storageSettings;
        this.storageSettings = MapBuilder.newMapBuilder(clientsSettings).immutableMap();
        // clients are built lazily by {@link client(String)}
        return prevSettings;
    }

    @Override
    public boolean doesContainerExist(String account, String container) throws URISyntaxException, StorageException {
        final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
        final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
        return SocketAccess.doPrivilegedException(() -> blobContainer.exists(null, null, client.v2().get()));
    }

    @Override
    public void removeContainer(String account, String container) throws URISyntaxException, StorageException {
        final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
        final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
        logger.trace(() -> new ParameterizedMessage("removing container [{}]", container));
        SocketAccess.doPrivilegedException(() -> blobContainer.deleteIfExists(null, null, client.v2().get()));
    }

    @Override
    public void createContainer(String account, String container) throws URISyntaxException, StorageException {
        try {
            final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
            final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
            logger.trace(() -> new ParameterizedMessage("creating container [{}]", container));
            SocketAccess.doPrivilegedException(() -> blobContainer.createIfNotExists(null, null, client.v2().get()));
        } catch (final IllegalArgumentException e) {
            logger.trace(() -> new ParameterizedMessage("failed creating container [{}]", container), e);
            throw new RepositoryException(container, e.getMessage(), e);
        }
    }

    @Override
    public void deleteFiles(String account, String container, String path) throws URISyntaxException, StorageException {
        final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
        // container name must be lower case.
        logger.trace(() -> new ParameterizedMessage("delete files container [{}], path [{}]", container, path));
        SocketAccess.doPrivilegedVoidException(() -> {
            // list the blobs using a flat blob listing mode
            final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
            for (final ListBlobItem blobItem : blobContainer.listBlobs(path, true, EnumSet.noneOf(BlobListingDetails.class), null,
                    client.v2().get())) {
                final String blobName = blobNameFromUri(blobItem.getUri());
                logger.trace(() -> new ParameterizedMessage("removing blob [{}] full URI was [{}]", blobName, blobItem.getUri()));
                // don't call {@code #deleteBlob}, use the same client
                final CloudBlockBlob azureBlob = blobContainer.getBlockBlobReference(blobName);
                azureBlob.delete(DeleteSnapshotsOption.NONE, null, null, client.v2().get());
            }
        });
    }

    /**
     * Extract the blob name from a URI like https://myservice.azure.net/container/path/to/myfile
     * It should remove the container part (first part of the path) and gives path/to/myfile
     * @param uri URI to parse
     * @return The blob name relative to the container
     */
    static String blobNameFromUri(URI uri) {
        final String path = uri.getPath();
        // We remove the container name from the path
        // The 3 magic number cames from the fact if path is /container/path/to/myfile
        // First occurrence is empty "/"
        // Second occurrence is "container
        // Last part contains "path/to/myfile" which is what we want to get
        final String[] splits = path.split("/", 3);
        // We return the remaining end of the string
        return splits[2];
    }

    @Override
    public boolean blobExists(String account, String container, String blob)
        throws URISyntaxException, StorageException {
        // Container name must be lower case.
        final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
        final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
        return SocketAccess.doPrivilegedException(() -> {
            final CloudBlockBlob azureBlob = blobContainer.getBlockBlobReference(blob);
            return azureBlob.exists(null, null, client.v2().get());
        });
    }

    @Override
    public void deleteBlob(String account, String container, String blob) throws URISyntaxException, StorageException {
        final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
        // Container name must be lower case.
        final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
        logger.trace(() -> new ParameterizedMessage("delete blob for container [{}], blob [{}]", container, blob));
        SocketAccess.doPrivilegedVoidException(() -> {
            final CloudBlockBlob azureBlob = blobContainer.getBlockBlobReference(blob);
            logger.trace(() -> new ParameterizedMessage("container [{}]: blob [{}] found. removing.", container, blob));
            azureBlob.delete(DeleteSnapshotsOption.NONE, null, null, client.v2().get());
        });
    }

    @Override
    public InputStream getInputStream(String account, String container, String blob) throws URISyntaxException,
        StorageException {
        final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
        final CloudBlockBlob blockBlobReference = client.v1().getContainerReference(container).getBlockBlobReference(blob);
        logger.trace(() -> new ParameterizedMessage("reading container [{}], blob [{}]", container, blob));
        final BlobInputStream is = SocketAccess.doPrivilegedException(() ->
            blockBlobReference.openInputStream(null, null, client.v2().get()));
        return AzureStorageService.giveSocketPermissionsToStream(is);
    }

    @Override
    public Map<String, BlobMetaData> listBlobsByPrefix(String account, String container, String keyPath, String prefix)
        throws URISyntaxException, StorageException {
        // NOTE: this should be here: if (prefix == null) prefix = "";
        // however, this is really inefficient since deleteBlobsByPrefix enumerates everything and
        // then does a prefix match on the result; it should just call listBlobsByPrefix with the prefix!
        final MapBuilder<String, BlobMetaData> blobsBuilder = MapBuilder.newMapBuilder();
        final EnumSet<BlobListingDetails> enumBlobListingDetails = EnumSet.of(BlobListingDetails.METADATA);
        final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
        final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
        logger.trace(() -> new ParameterizedMessage("listing container [{}], keyPath [{}], prefix [{}]", container, keyPath, prefix));
        SocketAccess.doPrivilegedVoidException(() -> {
            for (final ListBlobItem blobItem : blobContainer.listBlobs(keyPath + (prefix == null ? "" : prefix), false,
                    enumBlobListingDetails, null, client.v2().get())) {
                final URI uri = blobItem.getUri();
                logger.trace(() -> new ParameterizedMessage("blob url [{}]", uri));
                // uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
                // this requires 1 + container.length() + 1, with each 1 corresponding to one of the /
                final String blobPath = uri.getPath().substring(1 + container.length() + 1);
                final BlobProperties properties = ((CloudBlockBlob) blobItem).getProperties();
                final String name = blobPath.substring(keyPath.length());
                logger.trace(() -> new ParameterizedMessage("blob url [{}], name [{}], size [{}]", uri, name, properties.getLength()));
                blobsBuilder.put(name, new PlainBlobMetaData(name, properties.getLength()));
            }
        });
        return blobsBuilder.immutableMap();
    }

    @Override
    public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize)
        throws URISyntaxException, StorageException, FileAlreadyExistsException {
        logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {})", blobName, blobSize));
        final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
        final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
        final CloudBlockBlob blob = blobContainer.getBlockBlobReference(blobName);
        try {
            SocketAccess.doPrivilegedVoidException(() ->
                blob.upload(inputStream, blobSize, AccessCondition.generateIfNotExistsCondition(), null, client.v2().get()));
        } catch (final StorageException se) {
            if (se.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT &&
                    StorageErrorCodeStrings.BLOB_ALREADY_EXISTS.equals(se.getErrorCode())) {
                throw new FileAlreadyExistsException(blobName, null, se.getMessage());
            }
            throw se;
        }
        logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {}) - done", blobName, blobSize));
    }

}
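The blobNameFromUri helper, present in both the deleted implementation and the merged class, documents how a blob URI is turned into a container-relative name. A standalone snippet that mirrors its split logic (the URI value is illustrative; this does not call the package-private method itself):

import java.net.URI;

public class BlobNameFromUriExample {
    public static void main(String[] args) {
        final URI uri = URI.create("https://myservice.azure.net/container/path/to/myfile");
        // getPath() is "/container/path/to/myfile"; splitting into 3 parts yields
        // ["", "container", "path/to/myfile"], and the last element is the blob name
        final String[] splits = uri.getPath().split("/", 3);
        System.out.println(splits[2]);   // prints: path/to/myfile
    }
}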
AzureSnapshotRestoreTests.java (entire file removed):

@@ -1,419 +0,0 @@
/* ... standard Elasticsearch Apache-2.0 license header ... */

package org.elasticsearch.repositories.azure;

import com.carrotsearch.randomizedtesting.RandomizedTest;
import com.microsoft.azure.storage.StorageException;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ClusterAdminClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.repositories.azure.AzureRepository.Repository;
import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase;
import org.elasticsearch.snapshots.SnapshotMissingException;
import org.elasticsearch.snapshots.SnapshotRestoreException;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.ThirdParty;
import org.elasticsearch.test.store.MockFSDirectoryService;
import org.elasticsearch.test.store.MockFSIndexStore;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;

import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Locale;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.repositories.azure.AzureTestUtils.generateMockSecureSettings;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

/**
 * Those integration tests need an Azure access and must be run with
 * {@code -Dtests.thirdparty=true -Dtests.azure.account=AzureStorageAccount -Dtests.azure.key=AzureStorageKey}
 * options
 */
@ClusterScope(
        scope = ESIntegTestCase.Scope.SUITE,
        supportsDedicatedMasters = false, numDataNodes = 1,
        transportClientRatio = 0.0)
@ThirdParty
public class AzureSnapshotRestoreTests extends ESBlobStoreRepositoryIntegTestCase {

    private static Settings.Builder generateMockSettings() {
        return Settings.builder().setSecureSettings(generateMockSecureSettings());
    }

    @SuppressWarnings("resource")
    private static AzureStorageService getAzureStorageService() {
        return new AzureRepositoryPlugin(generateMockSettings().build()).azureStoreService;
    }

    @Override
    protected Settings nodeSettings(int nodeOrdinal) {
        return generateMockSettings()
            .put(super.nodeSettings(nodeOrdinal))
            .build();
    }

    private static String getContainerName() {
        /* Have a different name per test so that there is no possible race condition. As the long can be negative,
         * there mustn't be a hyphen between the 2 concatenated numbers
         * (can't have 2 consecutives hyphens on Azure containers)
         */
        final String testName = "snapshot-itest-"
            .concat(RandomizedTest.getContext().getRunnerSeedAsString().toLowerCase(Locale.ROOT));
        return testName.contains(" ") ? Strings.split(testName, " ")[0] : testName;
    }

    @BeforeClass
    public static void createTestContainers() throws Exception {
        createTestContainer(getContainerName());
        // This is needed for testMultipleRepositories() test case
        createTestContainer(getContainerName() + "-1");
        createTestContainer(getContainerName() + "-2");
    }

    @AfterClass
    public static void removeContainer() throws Exception {
        removeTestContainer(getContainerName());
        // This is needed for testMultipleRepositories() test case
        removeTestContainer(getContainerName() + "-1");
        removeTestContainer(getContainerName() + "-2");
    }

    /**
     * Create a test container in Azure
     * @param containerName container name to use
     */
    private static void createTestContainer(String containerName) throws Exception {
        // It could happen that we run this test really close to a previous one
        // so we might need some time to be able to create the container
        assertBusy(() -> {
            getAzureStorageService().createContainer("default", containerName);
        }, 30, TimeUnit.SECONDS);
    }

    /**
     * Remove a test container in Azure
     * @param containerName container name to use
     */
    private static void removeTestContainer(String containerName) throws URISyntaxException, StorageException {
        getAzureStorageService().removeContainer("default", containerName);
    }

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        return Arrays.asList(AzureRepositoryPlugin.class, MockFSIndexStore.TestPlugin.class);
    }

    private String getRepositoryPath() {
        final String testName = "it-" + getTestName();
        return testName.contains(" ") ? Strings.split(testName, " ")[0] : testName;
    }

    @Override
    public Settings indexSettings() {
        // During restore we frequently restore index to exactly the same state it was before, that might cause the same
        // checksum file to be written twice during restore operation
        return Settings.builder().put(super.indexSettings())
            .put(MockFSDirectoryService.RANDOM_PREVENT_DOUBLE_WRITE_SETTING.getKey(), false)
            .put(MockFSDirectoryService.RANDOM_NO_DELETE_OPEN_FILE_SETTING.getKey(), false)
            .build();
    }

    @After
    public final void wipeAzureRepositories() {
        try {
            client().admin().cluster().prepareDeleteRepository("*").get();
        } catch (final RepositoryMissingException ignored) {
        }
    }

    public void testMultipleRepositories() {
        final Client client = client();
        logger.info("--> creating azure repository with path [{}]", getRepositoryPath());
        final PutRepositoryResponse putRepositoryResponse1 = client.admin().cluster().preparePutRepository("test-repo1")
            .setType("azure").setSettings(Settings.builder()
                .put(Repository.CONTAINER_SETTING.getKey(), getContainerName().concat("-1"))
                .put(Repository.BASE_PATH_SETTING.getKey(), getRepositoryPath())
                .put(Repository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(1000, 10000), ByteSizeUnit.BYTES)
            ).get();
        assertThat(putRepositoryResponse1.isAcknowledged(), equalTo(true));
        final PutRepositoryResponse putRepositoryResponse2 = client.admin().cluster().preparePutRepository("test-repo2")
            .setType("azure").setSettings(Settings.builder()
                .put(Repository.CONTAINER_SETTING.getKey(), getContainerName().concat("-2"))
                .put(Repository.BASE_PATH_SETTING.getKey(), getRepositoryPath())
                .put(Repository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(1000, 10000), ByteSizeUnit.BYTES)
            ).get();
        assertThat(putRepositoryResponse2.isAcknowledged(), equalTo(true));

        createIndex("test-idx-1", "test-idx-2");
        ensureGreen();

        logger.info("--> indexing some data");
        for (int i = 0; i < 100; i++) {
            index("test-idx-1", "doc", Integer.toString(i), "foo", "bar" + i);
            index("test-idx-2", "doc", Integer.toString(i), "foo", "baz" + i);
        }
        refresh();
        assertThat(client.prepareSearch("test-idx-1").setSize(0).get().getHits().getTotalHits(), equalTo(100L));
        assertThat(client.prepareSearch("test-idx-2").setSize(0).get().getHits().getTotalHits(), equalTo(100L));

        logger.info("--> snapshot 1");
        final CreateSnapshotResponse createSnapshotResponse1 = client.admin().cluster().prepareCreateSnapshot("test-repo1", "test-snap")
            .setWaitForCompletion(true).setIndices("test-idx-1").get();
        assertThat(createSnapshotResponse1.getSnapshotInfo().successfulShards(), greaterThan(0));
        assertThat(createSnapshotResponse1.getSnapshotInfo().successfulShards(),
            equalTo(createSnapshotResponse1.getSnapshotInfo().totalShards()));

        logger.info("--> snapshot 2");
        final CreateSnapshotResponse createSnapshotResponse2 = client.admin().cluster().prepareCreateSnapshot("test-repo2", "test-snap")
            .setWaitForCompletion(true).setIndices("test-idx-2").get();
        assertThat(createSnapshotResponse2.getSnapshotInfo().successfulShards(), greaterThan(0));
        assertThat(createSnapshotResponse2.getSnapshotInfo().successfulShards(),
            equalTo(createSnapshotResponse2.getSnapshotInfo().totalShards()));

        assertThat(client.admin().cluster().prepareGetSnapshots("test-repo1").setSnapshots("test-snap").get().getSnapshots().get(0).state(),
            equalTo(SnapshotState.SUCCESS));
        assertThat(client.admin().cluster().prepareGetSnapshots("test-repo2").setSnapshots("test-snap").get().getSnapshots().get(0).state(),
            equalTo(SnapshotState.SUCCESS));

        // Test restore after index deletion
        logger.info("--> delete indices");
        cluster().wipeIndices("test-idx-1", "test-idx-2");
        logger.info("--> restore one index after deletion from snapshot 1");
        final RestoreSnapshotResponse restoreSnapshotResponse1 = client.admin().cluster().prepareRestoreSnapshot("test-repo1", "test-snap")
            .setWaitForCompletion(true).setIndices("test-idx-1").get();
        assertThat(restoreSnapshotResponse1.getRestoreInfo().totalShards(), greaterThan(0));
        ensureGreen();
        assertThat(client.prepareSearch("test-idx-1").setSize(0).get().getHits().getTotalHits(), equalTo(100L));
        ClusterState clusterState = client.admin().cluster().prepareState().get().getState();
        assertThat(clusterState.getMetaData().hasIndex("test-idx-1"), equalTo(true));
        assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(false));

        logger.info("--> restore other index after deletion from snapshot 2");
        final RestoreSnapshotResponse restoreSnapshotResponse2 = client.admin().cluster().prepareRestoreSnapshot("test-repo2", "test-snap")
            .setWaitForCompletion(true).setIndices("test-idx-2").get();
        assertThat(restoreSnapshotResponse2.getRestoreInfo().totalShards(), greaterThan(0));
        ensureGreen();
        assertThat(client.prepareSearch("test-idx-2").setSize(0).get().getHits().getTotalHits(), equalTo(100L));
        clusterState = client.admin().cluster().prepareState().get().getState();
        assertThat(clusterState.getMetaData().hasIndex("test-idx-1"), equalTo(true));
        assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(true));
    }

    /**
     * For issue #26: https://github.com/elastic/elasticsearch-cloud-azure/issues/26
     */
    public void testListBlobs_26() throws StorageException, URISyntaxException {
        final String repositoryName="test-repo-26";
        createIndex("test-idx-1", "test-idx-2", "test-idx-3");
        ensureGreen();

        logger.info("--> indexing some data");
        for (int i = 0; i < 100; i++) {
            index("test-idx-1", "doc", Integer.toString(i), "foo", "bar" + i);
            index("test-idx-2", "doc", Integer.toString(i), "foo", "baz" + i);
            index("test-idx-3", "doc", Integer.toString(i), "foo", "baz" + i);
        }
        refresh();

        final ClusterAdminClient client = client().admin().cluster();
        logger.info("--> creating azure repository without any path");
        PutRepositoryResponse putRepositoryResponse = client.preparePutRepository(repositoryName).setType("azure")
            .setSettings(Settings.builder()
                .put(Repository.CONTAINER_SETTING.getKey(), getContainerName())
            ).get();
        assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));

        // Get all snapshots - should be empty
        assertThat(client.prepareGetSnapshots(repositoryName).get().getSnapshots().size(), equalTo(0));

        logger.info("--> snapshot");
        CreateSnapshotResponse createSnapshotResponse = client.prepareCreateSnapshot(repositoryName, "test-snap-26")
            .setWaitForCompletion(true).setIndices("test-idx-*").get();
        assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));

        // Get all snapshots - should have one
        assertThat(client.prepareGetSnapshots(repositoryName).get().getSnapshots().size(), equalTo(1));

        // Clean the snapshot
        client.prepareDeleteSnapshot(repositoryName, "test-snap-26").get();
        client.prepareDeleteRepository(repositoryName).get();

        logger.info("--> creating azure repository path [{}]", getRepositoryPath());
        putRepositoryResponse = client.preparePutRepository(repositoryName).setType("azure")
            .setSettings(Settings.builder()
                .put(Repository.CONTAINER_SETTING.getKey(), getContainerName())
                .put(Repository.BASE_PATH_SETTING.getKey(), getRepositoryPath())
            ).get();
        assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));

        // Get all snapshots - should be empty
        assertThat(client.prepareGetSnapshots(repositoryName).get().getSnapshots().size(), equalTo(0));

        logger.info("--> snapshot");
        createSnapshotResponse = client.prepareCreateSnapshot(repositoryName, "test-snap-26").setWaitForCompletion(true)
            .setIndices("test-idx-*").get();
        assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));

        // Get all snapshots - should have one
        assertThat(client.prepareGetSnapshots(repositoryName).get().getSnapshots().size(), equalTo(1));
    }

    /**
     * For issue #28: https://github.com/elastic/elasticsearch-cloud-azure/issues/28
     */
    public void testGetDeleteNonExistingSnapshot_28() throws StorageException, URISyntaxException {
        final String repositoryName="test-repo-28";
        final ClusterAdminClient client = client().admin().cluster();
        logger.info("--> creating azure repository without any path");
        final PutRepositoryResponse putRepositoryResponse = client.preparePutRepository(repositoryName).setType("azure")
            .setSettings(Settings.builder()
                .put(Repository.CONTAINER_SETTING.getKey(), getContainerName())
            ).get();
        assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));

        try {
            client.prepareGetSnapshots(repositoryName).addSnapshots("nonexistingsnapshotname").get();
            fail("Shouldn't be here");
        } catch (final SnapshotMissingException ex) {
            // Expected
        }

        try {
            client.prepareDeleteSnapshot(repositoryName, "nonexistingsnapshotname").get();
            fail("Shouldn't be here");
        } catch (final SnapshotMissingException ex) {
            // Expected
        }
    }

    /**
     * Test case for issue #23: https://github.com/elastic/elasticsearch-cloud-azure/issues/23
     */
    public void testNonExistingRepo_23() {
        final String repositoryName = "test-repo-test23";
        final Client client = client();
        logger.info("--> creating azure repository with path [{}]", getRepositoryPath());
        final PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository(repositoryName)
            .setType("azure").setSettings(Settings.builder()
                .put(Repository.CONTAINER_SETTING.getKey(), getContainerName())
                .put(Repository.BASE_PATH_SETTING.getKey(), getRepositoryPath())
                .put(Repository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(1000, 10000), ByteSizeUnit.BYTES)
            ).get();
        assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));

        logger.info("--> restore non existing snapshot");
        try {
            client.admin().cluster().prepareRestoreSnapshot(repositoryName, "no-existing-snapshot").setWaitForCompletion(true).get();
            fail("Shouldn't be here");
        } catch (final SnapshotRestoreException ex) {
            // Expected
        }
    }

    /**
     * When a user remove a container you can not immediately create it again.
     */
    public void testRemoveAndCreateContainer() throws Exception {
        final String container = getContainerName().concat("-testremove");

        createTestContainer(container);
        removeTestContainer(container);

        final ClusterAdminClient client = client().admin().cluster();
        logger.info("--> creating azure repository while container is being removed");
        try {
            client.preparePutRepository("test-repo").setType("azure")
                .setSettings(Settings.builder()
                    .put(Repository.CONTAINER_SETTING.getKey(), container)
                ).get();
            fail("we should get a RepositoryVerificationException");
        } catch (final RepositoryVerificationException e) {
            // Fine we expect that
        }
    }

    /**
     * Test that you can snapshot on the primary repository and list the available snapshots
     * from the secondary repository.
     *
     * Note that this test requires an Azure storage account which must be a Read-access geo-redundant
     * storage (RA-GRS) account type.
     * @throws Exception If anything goes wrong
     */
    public void testGeoRedundantStorage() throws Exception {
        final Client client = client();
        logger.info("--> creating azure primary repository");
        final PutRepositoryResponse putRepositoryResponsePrimary = client.admin().cluster().preparePutRepository("primary")
            .setType("azure").setSettings(Settings.builder()
                .put(Repository.CONTAINER_SETTING.getKey(), getContainerName())
            ).get();
        assertThat(putRepositoryResponsePrimary.isAcknowledged(), equalTo(true));

        logger.info("--> start get snapshots on primary");
        long startWait = System.currentTimeMillis();
        client.admin().cluster().prepareGetSnapshots("primary").get();
        long endWait = System.currentTimeMillis();
        // definitely should be done in 30s, and if its not working as expected, it takes over 1m
        assertThat(endWait - startWait, lessThanOrEqualTo(30000L));

        logger.info("--> creating azure secondary repository");
        final PutRepositoryResponse putRepositoryResponseSecondary = client.admin().cluster().preparePutRepository("secondary")
            .setType("azure").setSettings(Settings.builder()
                .put(Repository.CONTAINER_SETTING.getKey(), getContainerName())
                .put(Repository.LOCATION_MODE_SETTING.getKey(), "secondary_only")
            ).get();
        assertThat(putRepositoryResponseSecondary.isAcknowledged(), equalTo(true));

        logger.info("--> start get snapshots on secondary");
        startWait = System.currentTimeMillis();
        client.admin().cluster().prepareGetSnapshots("secondary").get();
        endWait = System.currentTimeMillis();
        logger.info("--> end of get snapshots on secondary. Took {} ms", endWait - startWait);
        assertThat(endWait - startWait, lessThanOrEqualTo(30000L));
    }

    @Override
    protected void createTestRepository(String name) {
        assertAcked(client().admin().cluster().preparePutRepository(name)
            .setType(AzureRepository.TYPE)
            .setSettings(Settings.builder()
                .put(Repository.CONTAINER_SETTING.getKey(), getContainerName())
                .put(Repository.BASE_PATH_SETTING.getKey(), getRepositoryPath())
                .put(Repository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
    }
}
|
@@ -26,7 +26,6 @@ import org.elasticsearch.common.blobstore.BlobMetaData;
 import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
 import org.elasticsearch.common.collect.MapBuilder;
 import org.elasticsearch.common.collect.Tuple;
-import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.internal.io.Streams;

@@ -49,7 +48,7 @@ import static java.util.Collections.emptyMap;
 /**
  * In memory storage for unit tests
  */
-public class AzureStorageServiceMock extends AbstractComponent implements AzureStorageService {
+public class AzureStorageServiceMock extends AzureStorageService {

     protected final Map<String, ByteArrayOutputStream> blobs = new ConcurrentHashMap<>();

@@ -62,14 +61,6 @@ public class AzureStorageServiceMock extends AbstractComponent implements AzureStorageService {
         return true;
     }

-    @Override
-    public void removeContainer(String account, String container) {
-    }
-
-    @Override
-    public void createContainer(String account, String container) {
-    }
-
     @Override
     public void deleteFiles(String account, String container, String path) throws URISyntaxException, StorageException {
         final Map<String, BlobMetaData> blobs = listBlobsByPrefix(account, container, path, null);
@@ -37,7 +37,7 @@ import java.net.UnknownHostException;
 import java.nio.charset.StandardCharsets;
 import java.util.Map;

-import static org.elasticsearch.repositories.azure.AzureStorageServiceImpl.blobNameFromUri;
+import static org.elasticsearch.repositories.azure.AzureStorageService.blobNameFromUri;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
@@ -64,7 +64,7 @@ public class AzureStorageServiceTests extends ESTestCase {
         final Settings settings = Settings.builder().setSecureSettings(buildSecureSettings())
             .put("azure.client.azure1.endpoint_suffix", "my_endpoint_suffix").build();
         try (AzureRepositoryPlugin plugin = new AzureRepositoryPlugin(settings)) {
-            final AzureStorageServiceImpl azureStorageService = (AzureStorageServiceImpl) plugin.azureStoreService;
+            final AzureStorageService azureStorageService = plugin.azureStoreService;
             final CloudBlobClient client1 = azureStorageService.client("azure1").v1();
             assertThat(client1.getEndpoint().toString(), equalTo("https://myaccount1.blob.my_endpoint_suffix"));
             final CloudBlobClient client2 = azureStorageService.client("azure2").v1();
@@ -86,7 +86,7 @@ public class AzureStorageServiceTests extends ESTestCase {
         secureSettings2.setString("azure.client.azure3.key", encodeKey("mykey23"));
         final Settings settings2 = Settings.builder().setSecureSettings(secureSettings2).build();
         try (AzureRepositoryPlugin plugin = new AzureRepositoryPlugin(settings1)) {
-            final AzureStorageServiceImpl azureStorageService = (AzureStorageServiceImpl) plugin.azureStoreService;
+            final AzureStorageService azureStorageService = plugin.azureStoreService;
             final CloudBlobClient client11 = azureStorageService.client("azure1").v1();
             assertThat(client11.getEndpoint().toString(), equalTo("https://myaccount11.blob.core.windows.net"));
             final CloudBlobClient client12 = azureStorageService.client("azure2").v1();
@@ -118,7 +118,7 @@ public class AzureStorageServiceTests extends ESTestCase {
         secureSettings.setString("azure.client.azure1.key", encodeKey("mykey11"));
         final Settings settings = Settings.builder().setSecureSettings(secureSettings).build();
         try (AzureRepositoryPlugin plugin = new AzureRepositoryPlugin(settings)) {
-            final AzureStorageServiceImpl azureStorageService = (AzureStorageServiceImpl) plugin.azureStoreService;
+            final AzureStorageService azureStorageService = plugin.azureStoreService;
             final CloudBlobClient client11 = azureStorageService.client("azure1").v1();
             assertThat(client11.getEndpoint().toString(), equalTo("https://myaccount1.blob.core.windows.net"));
             // reinit with empty settings
@@ -142,7 +142,7 @@ public class AzureStorageServiceTests extends ESTestCase {
         // missing key
         final Settings settings2 = Settings.builder().setSecureSettings(secureSettings2).build();
         try (AzureRepositoryPlugin plugin = new AzureRepositoryPlugin(settings1)) {
-            final AzureStorageServiceImpl azureStorageService = (AzureStorageServiceImpl) plugin.azureStoreService;
+            final AzureStorageService azureStorageService = plugin.azureStoreService;
             final CloudBlobClient client11 = azureStorageService.client("azure1").v1();
             assertThat(client11.getEndpoint().toString(), equalTo("https://myaccount1.blob.core.windows.net"));
             plugin.reload(settings2);
@@ -154,7 +154,7 @@ public class AzureStorageServiceTests extends ESTestCase {
     }

     public void testGetSelectedClientNonExisting() {
-        final AzureStorageServiceImpl azureStorageService = new AzureStorageServiceImpl(buildSettings());
+        final AzureStorageService azureStorageService = new AzureStorageService(buildSettings());
         final SettingsException e = expectThrows(SettingsException.class, () -> azureStorageService.client("azure4"));
         assertThat(e.getMessage(), is("Unable to find client with name [azure4]"));
     }
@@ -164,7 +164,7 @@ public class AzureStorageServiceTests extends ESTestCase {
             .setSecureSettings(buildSecureSettings())
             .put("azure.client.azure3.timeout", "30s")
             .build();
-        final AzureStorageServiceImpl azureStorageService = new AzureStorageServiceImpl(timeoutSettings);
+        final AzureStorageService azureStorageService = new AzureStorageService(timeoutSettings);
         final CloudBlobClient client1 = azureStorageService.client("azure1").v1();
         assertThat(client1.getDefaultRequestOptions().getTimeoutIntervalInMs(), nullValue());
         final CloudBlobClient client3 = azureStorageService.client("azure3").v1();
@@ -172,13 +172,13 @@ public class AzureStorageServiceTests extends ESTestCase {
     }

     public void testGetSelectedClientNoTimeout() {
-        final AzureStorageServiceImpl azureStorageService = new AzureStorageServiceImpl(buildSettings());
+        final AzureStorageService azureStorageService = new AzureStorageService(buildSettings());
         final CloudBlobClient client1 = azureStorageService.client("azure1").v1();
         assertThat(client1.getDefaultRequestOptions().getTimeoutIntervalInMs(), is(nullValue()));
     }

     public void testGetSelectedClientBackoffPolicy() {
-        final AzureStorageServiceImpl azureStorageService = new AzureStorageServiceImpl(buildSettings());
+        final AzureStorageService azureStorageService = new AzureStorageService(buildSettings());
         final CloudBlobClient client1 = azureStorageService.client("azure1").v1();
         assertThat(client1.getDefaultRequestOptions().getRetryPolicyFactory(), is(notNullValue()));
         assertThat(client1.getDefaultRequestOptions().getRetryPolicyFactory(), instanceOf(RetryExponentialRetry.class));
@@ -190,7 +190,7 @@ public class AzureStorageServiceTests extends ESTestCase {
             .put("azure.client.azure1.max_retries", 7)
             .build();

-        final AzureStorageServiceImpl azureStorageService = new AzureStorageServiceImpl(timeoutSettings);
+        final AzureStorageService azureStorageService = new AzureStorageService(timeoutSettings);
         final CloudBlobClient client1 = azureStorageService.client("azure1").v1();
         assertThat(client1.getDefaultRequestOptions().getRetryPolicyFactory(), is(notNullValue()));
         assertThat(client1.getDefaultRequestOptions().getRetryPolicyFactory(), instanceOf(RetryExponentialRetry.class));
@@ -200,7 +200,7 @@ public class AzureStorageServiceTests extends ESTestCase {
         final Settings settings = Settings.builder()
             .setSecureSettings(buildSecureSettings())
             .build();
-        final AzureStorageServiceImpl mock = new AzureStorageServiceImpl(settings);
+        final AzureStorageService mock = new AzureStorageService(settings);
         assertThat(mock.storageSettings.get("azure1").getProxy(), nullValue());
         assertThat(mock.storageSettings.get("azure2").getProxy(), nullValue());
         assertThat(mock.storageSettings.get("azure3").getProxy(), nullValue());
@@ -213,7 +213,7 @@ public class AzureStorageServiceTests extends ESTestCase {
             .put("azure.client.azure1.proxy.port", 8080)
             .put("azure.client.azure1.proxy.type", "http")
             .build();
-        final AzureStorageServiceImpl mock = new AzureStorageServiceImpl(settings);
+        final AzureStorageService mock = new AzureStorageService(settings);
         final Proxy azure1Proxy = mock.storageSettings.get("azure1").getProxy();

         assertThat(azure1Proxy, notNullValue());
@@ -233,7 +233,7 @@ public class AzureStorageServiceTests extends ESTestCase {
             .put("azure.client.azure2.proxy.port", 8081)
             .put("azure.client.azure2.proxy.type", "http")
             .build();
-        final AzureStorageServiceImpl mock = new AzureStorageServiceImpl(settings);
+        final AzureStorageService mock = new AzureStorageService(settings);
         final Proxy azure1Proxy = mock.storageSettings.get("azure1").getProxy();
         assertThat(azure1Proxy, notNullValue());
         assertThat(azure1Proxy.type(), is(Proxy.Type.HTTP));
@@ -252,7 +252,7 @@ public class AzureStorageServiceTests extends ESTestCase {
             .put("azure.client.azure1.proxy.port", 8080)
             .put("azure.client.azure1.proxy.type", "socks")
             .build();
-        final AzureStorageServiceImpl mock = new AzureStorageServiceImpl(settings);
+        final AzureStorageService mock = new AzureStorageService(settings);
         final Proxy azure1Proxy = mock.storageSettings.get("azure1").getProxy();
         assertThat(azure1Proxy, notNullValue());
         assertThat(azure1Proxy.type(), is(Proxy.Type.SOCKS));
@@ -267,7 +267,7 @@ public class AzureStorageServiceTests extends ESTestCase {
             .put("azure.client.azure1.proxy.port", 8080)
             .put("azure.client.azure1.proxy.type", randomFrom("socks", "http"))
             .build();
-        final SettingsException e = expectThrows(SettingsException.class, () -> new AzureStorageServiceImpl(settings));
+        final SettingsException e = expectThrows(SettingsException.class, () -> new AzureStorageService(settings));
         assertEquals("Azure Proxy type has been set but proxy host or port is not defined.", e.getMessage());
     }

@@ -278,7 +278,7 @@ public class AzureStorageServiceTests extends ESTestCase {
             .put("azure.client.azure1.proxy.type", randomFrom("socks", "http"))
             .build();

-        final SettingsException e = expectThrows(SettingsException.class, () -> new AzureStorageServiceImpl(settings));
+        final SettingsException e = expectThrows(SettingsException.class, () -> new AzureStorageService(settings));
         assertEquals("Azure Proxy type has been set but proxy host or port is not defined.", e.getMessage());
     }

@@ -289,7 +289,7 @@ public class AzureStorageServiceTests extends ESTestCase {
             .put("azure.client.azure1.proxy.port", 8080)
             .build();

-        final SettingsException e = expectThrows(SettingsException.class, () -> new AzureStorageServiceImpl(settings));
+        final SettingsException e = expectThrows(SettingsException.class, () -> new AzureStorageService(settings));
         assertEquals("Azure Proxy port or host have been set but proxy type is not defined.", e.getMessage());
     }

@@ -301,7 +301,7 @@ public class AzureStorageServiceTests extends ESTestCase {
             .put("azure.client.azure1.proxy.port", 8080)
             .build();

-        final SettingsException e = expectThrows(SettingsException.class, () -> new AzureStorageServiceImpl(settings));
+        final SettingsException e = expectThrows(SettingsException.class, () -> new AzureStorageService(settings));
         assertEquals("Azure proxy host is unknown.", e.getMessage());
     }

@@ -1,46 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.repositories.azure;
-
-import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.settings.MockSecureSettings;
-import org.elasticsearch.common.settings.SecureSettings;
-
-public class AzureTestUtils {
-    /**
-     * Mock secure settings from sysprops when running integration tests with ThirdParty annotation.
-     * Start the tests with {@code -Dtests.azure.account=AzureStorageAccount and -Dtests.azure.key=AzureStorageKey}
-     * @return Mock Settings from sysprops
-     */
-    public static SecureSettings generateMockSecureSettings() {
-        MockSecureSettings secureSettings = new MockSecureSettings();
-
-        if (Strings.isEmpty(System.getProperty("tests.azure.account")) ||
-            Strings.isEmpty(System.getProperty("tests.azure.key"))) {
-            throw new IllegalStateException("to run integration tests, you need to set -Dtests.thirdparty=true and " +
-                "-Dtests.azure.account=azure-account -Dtests.azure.key=azure-key");
-        }
-
-        secureSettings.setString("azure.client.default.account", System.getProperty("tests.azure.account"));
-        secureSettings.setString("azure.client.default.key", System.getProperty("tests.azure.key"));
-
-        return secureSettings;
-    }
-}