Add support for secondary azure storage account

Follow up for #13228.

This commit adds support for a secondary storage account:

```yml
cloud:
    azure:
        storage:
            my_account1:
                account: your_azure_storage_account1
                key: your_azure_storage_key1
                default: true
            my_account2:
                account: your_azure_storage_account2
                key: your_azure_storage_key2
```

When creating a repository, you can choose which azure account you want to use for it:

```sh
curl -XPUT localhost:9200/_snapshot/my_backup1?pretty -d '{
  "type": "azure"
}'

curl -XPUT localhost:9200/_snapshot/my_backup2?pretty -d '{
  "type": "azure",
  "settings": {
    "account" : "my_account2",
    "location_mode": "secondary_only"
  }
}'
```

`location_mode` supports `primary_only` or `secondary_only`. Defaults to `primary_only`. Note that if you set it
to `secondary_only`, it will force `read_only` to true.
This commit is contained in:
craigwi 2015-08-31 12:44:48 -07:00 committed by David Pilato
parent b855e7d14e
commit 79a4d9ce36
16 changed files with 780 additions and 177 deletions

View File

@ -41,22 +41,37 @@ To enable Azure repositories, you have first to set your azure storage settings
cloud: cloud:
azure: azure:
storage: storage:
account: your_azure_storage_account my_account:
key: your_azure_storage_key account: your_azure_storage_account
key: your_azure_storage_key
---- ----
For information, in previous version of the azure plugin, settings were: Note that you can also define more than one account:
[source,yaml] [source,yaml]
---- ----
cloud: cloud:
azure: azure:
storage_account: your_azure_storage_account storage:
storage_key: your_azure_storage_key my_account1:
account: your_azure_storage_account1
key: your_azure_storage_key1
default: true
my_account2:
account: your_azure_storage_account2
key: your_azure_storage_key2
---- ----
`my_account1` is the default account which will be used by a repository unless you set an explicit one.
The Azure repository supports following settings: The Azure repository supports following settings:
`account`::
Azure account settings to use. Defaults to the only one if you set a single
account or to the one marked as `default` if you have more than one.
`container`:: `container`::
Container name. Defaults to `elasticsearch-snapshots` Container name. Defaults to `elasticsearch-snapshots`
@ -82,6 +97,11 @@ The Azure repository supports following settings:
Makes repository read-only. coming[2.1.0] Defaults to `false`. Makes repository read-only. coming[2.1.0] Defaults to `false`.
`location_mode`::
`primary_only` or `secondary_only`. Defaults to `primary_only`. Note that if you set it
to `secondary_only`, it will force `read_only` to true.
Some examples, using scripts: Some examples, using scripts:
[source,json] [source,json]
@ -103,6 +123,24 @@ PUT _snapshot/my_backup2
"compress": true "compress": true
} }
} }
# With two accounts defined in elasticsearch.yml (my_account1 and my_account2)
PUT _snapshot/my_backup3
{
"type": "azure",
"settings": {
"account": "my_account1"
}
}
PUT _snapshot/my_backup4
{
"type": "azure",
"settings": {
"account": "my_account2",
"location_mode": "primary_only"
}
}
---- ----
// AUTOSENSE // AUTOSENSE
@ -110,7 +148,7 @@ Example using Java:
[source,java] [source,java]
---- ----
client.admin().cluster().preparePutRepository("my_backup3") client.admin().cluster().preparePutRepository("my_backup_java1")
.setType("azure").setSettings(Settings.settingsBuilder() .setType("azure").setSettings(Settings.settingsBuilder()
.put(Storage.CONTAINER, "backup_container") .put(Storage.CONTAINER, "backup_container")
.put(Storage.CHUNK_SIZE, new ByteSizeValue(32, ByteSizeUnit.MB)) .put(Storage.CHUNK_SIZE, new ByteSizeValue(32, ByteSizeUnit.MB))
@ -129,27 +167,3 @@ permitted in container names.
* All letters in a container name must be lowercase. * All letters in a container name must be lowercase.
* Container names must be from 3 through 63 characters long. * Container names must be from 3 through 63 characters long.
[[repository-azure-testing]]
==== Testing Azure
Integrations tests in this plugin require working Azure configuration and therefore disabled by default.
To enable tests prepare a config file `elasticsearch.yml` with the following content:
[source,yaml]
----
cloud:
azure:
storage:
account: "YOUR-AZURE-STORAGE-NAME"
key: "YOUR-AZURE-STORAGE-KEY"
----
Replaces `account`, `key` with your settings. Please, note that the test will delete all snapshot/restore related
files in the specified bucket.
To run test:
[source,sh]
----
mvn -Dtests.azure=true -Dtests.config=/path/to/config/file/elasticsearch.yml clean test
----

View File

@ -107,5 +107,4 @@ public class AzureDiscoveryModule extends AbstractModule {
} }
return false; return false;
} }
} }

View File

@ -19,12 +19,9 @@
package org.elasticsearch.cloud.azure; package org.elasticsearch.cloud.azure;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cloud.azure.storage.AzureStorageService; import org.elasticsearch.cloud.azure.storage.AzureStorageService;
import org.elasticsearch.cloud.azure.storage.AzureStorageService.Storage;
import org.elasticsearch.cloud.azure.storage.AzureStorageServiceImpl; import org.elasticsearch.cloud.azure.storage.AzureStorageServiceImpl;
import org.elasticsearch.cloud.azure.storage.AzureStorageSettingsFilter; import org.elasticsearch.cloud.azure.storage.AzureStorageSettingsFilter;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
@ -43,18 +40,12 @@ import org.elasticsearch.common.settings.Settings;
*/ */
public class AzureRepositoryModule extends AbstractModule { public class AzureRepositoryModule extends AbstractModule {
protected final ESLogger logger; protected final ESLogger logger;
private Settings settings;
// pkg private so it is settable by tests // pkg private so it is settable by tests
static Class<? extends AzureStorageService> storageServiceImpl = AzureStorageServiceImpl.class; static Class<? extends AzureStorageService> storageServiceImpl = AzureStorageServiceImpl.class;
public static Class<? extends AzureStorageService> getStorageServiceImpl() {
return storageServiceImpl;
}
@Inject @Inject
public AzureRepositoryModule(Settings settings) { public AzureRepositoryModule(Settings settings) {
this.settings = settings;
this.logger = Loggers.getLogger(getClass(), settings); this.logger = Loggers.getLogger(getClass(), settings);
} }
@ -64,35 +55,7 @@ public class AzureRepositoryModule extends AbstractModule {
bind(AzureStorageSettingsFilter.class).asEagerSingleton(); bind(AzureStorageSettingsFilter.class).asEagerSingleton();
// If we have settings for azure repository, let's start the azure storage service // If we have settings for azure repository, let's start the azure storage service
if (isSnapshotReady(settings, logger)) { logger.debug("starting azure repository service");
logger.debug("starting azure repository service"); bind(AzureStorageService.class).to(storageServiceImpl).asEagerSingleton();
bind(AzureStorageService.class).to(storageServiceImpl).asEagerSingleton();
}
} }
/**
* Check if we have repository azure settings available
* @return true if we can use snapshot and restore
*/
public static boolean isSnapshotReady(Settings settings, ESLogger logger) {
if (isPropertyMissing(settings, Storage.ACCOUNT) ||
isPropertyMissing(settings, Storage.KEY)) {
logger.debug("azure repository is not set using [{}] and [{}] properties",
Storage.ACCOUNT,
Storage.KEY);
return false;
}
logger.trace("all required properties for azure repository are set!");
return true;
}
public static boolean isPropertyMissing(Settings settings, String name) throws ElasticsearchException {
if (!Strings.hasText(settings.get(name))) {
return true;
}
return false;
}
} }

View File

@ -63,7 +63,7 @@ public class AzureBlobContainer extends AbstractBlobContainer {
@Override @Override
public boolean blobExists(String blobName) { public boolean blobExists(String blobName) {
try { try {
return blobStore.client().blobExists(blobStore.container(), buildKey(blobName)); return blobStore.blobExists(blobStore.container(), buildKey(blobName));
} catch (URISyntaxException | StorageException e) { } catch (URISyntaxException | StorageException e) {
logger.warn("can not access [{}] in container {{}}: {}", blobName, blobStore.container(), e.getMessage()); logger.warn("can not access [{}] in container {{}}: {}", blobName, blobStore.container(), e.getMessage());
} }
@ -73,7 +73,7 @@ public class AzureBlobContainer extends AbstractBlobContainer {
@Override @Override
public InputStream readBlob(String blobName) throws IOException { public InputStream readBlob(String blobName) throws IOException {
try { try {
return blobStore.client().getInputStream(blobStore.container(), buildKey(blobName)); return blobStore.getInputStream(blobStore.container(), buildKey(blobName));
} catch (StorageException e) { } catch (StorageException e) {
if (e.getHttpStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { if (e.getHttpStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
throw new FileNotFoundException(e.getMessage()); throw new FileNotFoundException(e.getMessage());
@ -100,7 +100,7 @@ public class AzureBlobContainer extends AbstractBlobContainer {
private OutputStream createOutput(String blobName) throws IOException { private OutputStream createOutput(String blobName) throws IOException {
try { try {
return new AzureOutputStream(blobStore.client().getOutputStream(blobStore.container(), buildKey(blobName))); return new AzureOutputStream(blobStore.getOutputStream(blobStore.container(), buildKey(blobName)));
} catch (StorageException e) { } catch (StorageException e) {
if (e.getHttpStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { if (e.getHttpStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
throw new FileNotFoundException(e.getMessage()); throw new FileNotFoundException(e.getMessage());
@ -116,7 +116,7 @@ public class AzureBlobContainer extends AbstractBlobContainer {
@Override @Override
public void deleteBlob(String blobName) throws IOException { public void deleteBlob(String blobName) throws IOException {
try { try {
blobStore.client().deleteBlob(blobStore.container(), buildKey(blobName)); blobStore.deleteBlob(blobStore.container(), buildKey(blobName));
} catch (URISyntaxException | StorageException e) { } catch (URISyntaxException | StorageException e) {
logger.warn("can not access [{}] in container {{}}: {}", blobName, blobStore.container(), e.getMessage()); logger.warn("can not access [{}] in container {{}}: {}", blobName, blobStore.container(), e.getMessage());
throw new IOException(e); throw new IOException(e);
@ -127,7 +127,7 @@ public class AzureBlobContainer extends AbstractBlobContainer {
public Map<String, BlobMetaData> listBlobsByPrefix(@Nullable String prefix) throws IOException { public Map<String, BlobMetaData> listBlobsByPrefix(@Nullable String prefix) throws IOException {
try { try {
return blobStore.client().listBlobsByPrefix(blobStore.container(), keyPath, prefix); return blobStore.listBlobsByPrefix(blobStore.container(), keyPath, prefix);
} catch (URISyntaxException | StorageException e) { } catch (URISyntaxException | StorageException e) {
logger.warn("can not access [{}] in container {{}}: {}", prefix, blobStore.container(), e.getMessage()); logger.warn("can not access [{}] in container {{}}: {}", prefix, blobStore.container(), e.getMessage());
throw new IOException(e); throw new IOException(e);
@ -142,7 +142,7 @@ public class AzureBlobContainer extends AbstractBlobContainer {
logger.debug("moving blob [{}] to [{}] in container {{}}", source, target, blobStore.container()); logger.debug("moving blob [{}] to [{}] in container {{}}", source, target, blobStore.container());
blobStore.client().moveBlob(blobStore.container(), source, target); blobStore.moveBlob(blobStore.container(), source, target);
} catch (URISyntaxException e) { } catch (URISyntaxException e) {
logger.warn("can not move blob [{}] to [{}] in container {{}}: {}", sourceBlobName, targetBlobName, blobStore.container(), e.getMessage()); logger.warn("can not move blob [{}] to [{}] in container {{}}: {}", sourceBlobName, targetBlobName, blobStore.container(), e.getMessage());
throw new IOException(e); throw new IOException(e);

View File

@ -19,9 +19,11 @@
package org.elasticsearch.cloud.azure.blobstore; package org.elasticsearch.cloud.azure.blobstore;
import com.microsoft.azure.storage.LocationMode;
import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.StorageException;
import org.elasticsearch.cloud.azure.storage.AzureStorageService; import org.elasticsearch.cloud.azure.storage.AzureStorageService;
import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
@ -30,18 +32,22 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.repositories.RepositoryName; import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings; import org.elasticsearch.repositories.RepositorySettings;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.Locale;
import java.util.Map;
import static org.elasticsearch.cloud.azure.storage.AzureStorageService.Storage.CONTAINER; import static org.elasticsearch.cloud.azure.storage.AzureStorageService.Storage.CONTAINER;
import static org.elasticsearch.repositories.azure.AzureRepository.CONTAINER_DEFAULT; import static org.elasticsearch.repositories.azure.AzureRepository.CONTAINER_DEFAULT;
import static org.elasticsearch.repositories.azure.AzureRepository.Repository;
/**
*
*/
public class AzureBlobStore extends AbstractComponent implements BlobStore { public class AzureBlobStore extends AbstractComponent implements BlobStore {
private final AzureStorageService client; private final AzureStorageService client;
private final String accountName;
private final LocationMode locMode;
private final String container; private final String container;
private final String repositoryName; private final String repositoryName;
@ -49,9 +55,19 @@ public class AzureBlobStore extends AbstractComponent implements BlobStore {
public AzureBlobStore(RepositoryName name, Settings settings, RepositorySettings repositorySettings, public AzureBlobStore(RepositoryName name, Settings settings, RepositorySettings repositorySettings,
AzureStorageService client) throws URISyntaxException, StorageException { AzureStorageService client) throws URISyntaxException, StorageException {
super(settings); super(settings);
this.client = client; this.client = client.start();
this.container = repositorySettings.settings().get("container", settings.get(CONTAINER, CONTAINER_DEFAULT)); this.container = repositorySettings.settings().get("container", settings.get(CONTAINER, CONTAINER_DEFAULT));
this.repositoryName = name.getName(); this.repositoryName = name.getName();
// NOTE: null account means to use the first one specified in config
this.accountName = repositorySettings.settings().get(Repository.ACCOUNT, null);
String modeStr = repositorySettings.settings().get(Repository.LOCATION_MODE, null);
if (modeStr == null) {
this.locMode = LocationMode.PRIMARY_ONLY;
} else {
this.locMode = LocationMode.valueOf(modeStr.toUpperCase(Locale.ROOT));
}
} }
@Override @Override
@ -59,10 +75,6 @@ public class AzureBlobStore extends AbstractComponent implements BlobStore {
return container; return container;
} }
public AzureStorageService client() {
return client;
}
public String container() { public String container() {
return container; return container;
} }
@ -80,7 +92,7 @@ public class AzureBlobStore extends AbstractComponent implements BlobStore {
} }
try { try {
client.deleteFiles(container, keyPath); this.client.deleteFiles(this.accountName, this.locMode, container, keyPath);
} catch (URISyntaxException | StorageException e) { } catch (URISyntaxException | StorageException e) {
logger.warn("can not remove [{}] in container {{}}: {}", keyPath, container, e.getMessage()); logger.warn("can not remove [{}] in container {{}}: {}", keyPath, container, e.getMessage());
} }
@ -89,4 +101,54 @@ public class AzureBlobStore extends AbstractComponent implements BlobStore {
@Override @Override
public void close() { public void close() {
} }
public boolean doesContainerExist(String container)
{
return this.client.doesContainerExist(this.accountName, this.locMode, container);
}
public void removeContainer(String container) throws URISyntaxException, StorageException
{
this.client.removeContainer(this.accountName, this.locMode, container);
}
public void createContainer(String container) throws URISyntaxException, StorageException
{
this.client.createContainer(this.accountName, this.locMode, container);
}
public void deleteFiles(String container, String path) throws URISyntaxException, StorageException
{
this.client.deleteFiles(this.accountName, this.locMode, container, path);
}
public boolean blobExists(String container, String blob) throws URISyntaxException, StorageException
{
return this.client.blobExists(this.accountName, this.locMode, container, blob);
}
public void deleteBlob(String container, String blob) throws URISyntaxException, StorageException
{
this.client.deleteBlob(this.accountName, this.locMode, container, blob);
}
public InputStream getInputStream(String container, String blob) throws URISyntaxException, StorageException
{
return this.client.getInputStream(this.accountName, this.locMode, container, blob);
}
public OutputStream getOutputStream(String container, String blob) throws URISyntaxException, StorageException
{
return this.client.getOutputStream(this.accountName, this.locMode, container, blob);
}
public Map<String,BlobMetaData> listBlobsByPrefix(String container, String keyPath, String prefix) throws URISyntaxException, StorageException
{
return this.client.listBlobsByPrefix(this.accountName, this.locMode, container, keyPath, prefix);
}
public void moveBlob(String container, String sourceBlob, String targetBlob) throws URISyntaxException, StorageException
{
this.client.moveBlob(this.accountName, this.locMode, container, sourceBlob, targetBlob);
}
} }

View File

@ -20,6 +20,7 @@
package org.elasticsearch.cloud.azure.storage; package org.elasticsearch.cloud.azure.storage;
import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.LocationMode;
import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobMetaData;
import java.io.InputStream; import java.io.InputStream;
@ -32,9 +33,12 @@ import java.util.Map;
* @see AzureStorageServiceImpl for Azure REST API implementation * @see AzureStorageServiceImpl for Azure REST API implementation
*/ */
public interface AzureStorageService { public interface AzureStorageService {
static public final class Storage {
public static final String API_IMPLEMENTATION = "cloud.azure.storage.api.impl"; final class Storage {
public static final String PREFIX = "cloud.azure.storage.";
@Deprecated
public static final String ACCOUNT = "cloud.azure.storage.account"; public static final String ACCOUNT = "cloud.azure.storage.account";
@Deprecated
public static final String KEY = "cloud.azure.storage.key"; public static final String KEY = "cloud.azure.storage.key";
public static final String CONTAINER = "repositories.azure.container"; public static final String CONTAINER = "repositories.azure.container";
public static final String BASE_PATH = "repositories.azure.base_path"; public static final String BASE_PATH = "repositories.azure.base_path";
@ -42,23 +46,25 @@ public interface AzureStorageService {
public static final String COMPRESS = "repositories.azure.compress"; public static final String COMPRESS = "repositories.azure.compress";
} }
boolean doesContainerExist(String container); boolean doesContainerExist(String account, LocationMode mode, String container);
void removeContainer(String container) throws URISyntaxException, StorageException; void removeContainer(String account, LocationMode mode, String container) throws URISyntaxException, StorageException;
void createContainer(String container) throws URISyntaxException, StorageException; void createContainer(String account, LocationMode mode, String container) throws URISyntaxException, StorageException;
void deleteFiles(String container, String path) throws URISyntaxException, StorageException; void deleteFiles(String account, LocationMode mode, String container, String path) throws URISyntaxException, StorageException;
boolean blobExists(String container, String blob) throws URISyntaxException, StorageException; boolean blobExists(String account, LocationMode mode, String container, String blob) throws URISyntaxException, StorageException;
void deleteBlob(String container, String blob) throws URISyntaxException, StorageException; void deleteBlob(String account, LocationMode mode, String container, String blob) throws URISyntaxException, StorageException;
InputStream getInputStream(String container, String blob) throws URISyntaxException, StorageException; InputStream getInputStream(String account, LocationMode mode, String container, String blob) throws URISyntaxException, StorageException;
OutputStream getOutputStream(String container, String blob) throws URISyntaxException, StorageException; OutputStream getOutputStream(String account, LocationMode mode, String container, String blob) throws URISyntaxException, StorageException;
Map<String,BlobMetaData> listBlobsByPrefix(String container, String keyPath, String prefix) throws URISyntaxException, StorageException; Map<String,BlobMetaData> listBlobsByPrefix(String account, LocationMode mode, String container, String keyPath, String prefix) throws URISyntaxException, StorageException;
void moveBlob(String container, String sourceBlob, String targetBlob) throws URISyntaxException, StorageException; void moveBlob(String account, LocationMode mode, String container, String sourceBlob, String targetBlob) throws URISyntaxException, StorageException;
AzureStorageService start();
} }

View File

@ -20,12 +20,14 @@
package org.elasticsearch.cloud.azure.storage; package org.elasticsearch.cloud.azure.storage;
import com.microsoft.azure.storage.CloudStorageAccount; import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.LocationMode;
import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.*; import com.microsoft.azure.storage.blob.*;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -35,54 +37,91 @@ import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.Hashtable;
import java.util.Map; import java.util.Map;
import static org.elasticsearch.cloud.azure.storage.AzureStorageService.Storage.*;
/**
*
*/
public class AzureStorageServiceImpl extends AbstractLifecycleComponent<AzureStorageServiceImpl> public class AzureStorageServiceImpl extends AbstractLifecycleComponent<AzureStorageServiceImpl>
implements AzureStorageService { implements AzureStorageService {
private final String account; final AzureStorageSettings primaryStorageSettings;
private final String key; final Map<String, AzureStorageSettings> secondariesStorageSettings;
private final String blob;
private CloudBlobClient client;
final Map<String, CloudBlobClient> clients;
@Inject @Inject
public AzureStorageServiceImpl(Settings settings) { public AzureStorageServiceImpl(Settings settings) {
super(settings); super(settings);
// We try to load storage API settings from `cloud.azure.`
account = settings.get(ACCOUNT);
key = settings.get(KEY);
blob = "https://" + account + ".blob.core.windows.net/";
try { Tuple<AzureStorageSettings, Map<String, AzureStorageSettings>> storageSettings = AzureStorageSettings.parse(settings);
if (account != null) { this.primaryStorageSettings = storageSettings.v1();
logger.trace("creating new Azure storage client using account [{}], key [{}], blob [{}]", account, key, blob); this.secondariesStorageSettings = storageSettings.v2();
String storageConnectionString = this.clients = new Hashtable<>();
"DefaultEndpointsProtocol=https;"
+ "AccountName="+ account +";"
+ "AccountKey=" + key;
// Retrieve storage account from connection-string.
CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString);
// Create the blob client.
client = storageAccount.createCloudBlobClient();
}
} catch (Exception e) {
// Can not start Azure Storage Client
logger.error("can not start azure storage client: {}", e.getMessage());
}
} }
@Override void createClient(AzureStorageSettings azureStorageSettings) {
public boolean doesContainerExist(String container) {
try { try {
logger.trace("creating new Azure storage client using account [{}], key [{}]",
azureStorageSettings.getAccount(), azureStorageSettings.getKey());
String storageConnectionString =
"DefaultEndpointsProtocol=https;"
+ "AccountName="+ azureStorageSettings.getAccount() +";"
+ "AccountKey=" + azureStorageSettings.getKey();
// Retrieve storage account from connection-string.
CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString);
// Create the blob client.
CloudBlobClient client = storageAccount.createCloudBlobClient();
// Register the client
this.clients.put(azureStorageSettings.getAccount(), client);
} catch (Exception e) {
logger.error("can not create azure storage client: {}", e.getMessage());
}
}
CloudBlobClient getSelectedClient(String account, LocationMode mode) {
logger.trace("selecting a client for account [{}], mode [{}]", account, mode.name());
AzureStorageSettings azureStorageSettings = null;
if (this.primaryStorageSettings == null || this.secondariesStorageSettings.isEmpty()) {
throw new IllegalArgumentException("No azure storage can be found. Check your elasticsearch.yml.");
}
if (account != null) {
azureStorageSettings = this.secondariesStorageSettings.get(account);
}
// if account is not secondary, it's the primary
if (azureStorageSettings == null) {
if (account == null || primaryStorageSettings.getName() == null || account.equals(primaryStorageSettings.getName())) {
azureStorageSettings = primaryStorageSettings;
}
}
if (azureStorageSettings == null) {
// We did not get an account. That's bad.
throw new IllegalArgumentException("Can not find azure account [" + account + "]. Check your elasticsearch.yml.");
}
CloudBlobClient client = this.clients.get(azureStorageSettings.getAccount());
if (client == null) {
throw new IllegalArgumentException("Can not find an azure client for account [" + account + "]");
}
// NOTE: for now, just set the location mode in case it is different;
// only one mode per storage account can be active at a time
client.getDefaultRequestOptions().setLocationMode(mode);
return client;
}
@Override
public boolean doesContainerExist(String account, LocationMode mode, String container) {
try {
CloudBlobClient client = this.getSelectedClient(account, mode);
CloudBlobContainer blob_container = client.getContainerReference(container); CloudBlobContainer blob_container = client.getContainerReference(container);
return blob_container.exists(); return blob_container.exists();
} catch (Exception e) { } catch (Exception e) {
@ -92,7 +131,8 @@ public class AzureStorageServiceImpl extends AbstractLifecycleComponent<AzureSto
} }
@Override @Override
public void removeContainer(String container) throws URISyntaxException, StorageException { public void removeContainer(String account, LocationMode mode, String container) throws URISyntaxException, StorageException {
CloudBlobClient client = this.getSelectedClient(account, mode);
CloudBlobContainer blob_container = client.getContainerReference(container); CloudBlobContainer blob_container = client.getContainerReference(container);
// TODO Should we set some timeout and retry options? // TODO Should we set some timeout and retry options?
/* /*
@ -106,8 +146,9 @@ public class AzureStorageServiceImpl extends AbstractLifecycleComponent<AzureSto
} }
@Override @Override
public void createContainer(String container) throws URISyntaxException, StorageException { public void createContainer(String account, LocationMode mode, String container) throws URISyntaxException, StorageException {
try { try {
CloudBlobClient client = this.getSelectedClient(account, mode);
CloudBlobContainer blob_container = client.getContainerReference(container); CloudBlobContainer blob_container = client.getContainerReference(container);
logger.trace("creating container [{}]", container); logger.trace("creating container [{}]", container);
blob_container.createIfNotExists(); blob_container.createIfNotExists();
@ -118,22 +159,24 @@ public class AzureStorageServiceImpl extends AbstractLifecycleComponent<AzureSto
} }
@Override @Override
public void deleteFiles(String container, String path) throws URISyntaxException, StorageException { public void deleteFiles(String account, LocationMode mode, String container, String path) throws URISyntaxException, StorageException {
logger.trace("delete files container [{}], path [{}]", container, path); logger.trace("delete files container [{}], path [{}]", container, path);
// Container name must be lower case. // Container name must be lower case.
CloudBlobClient client = this.getSelectedClient(account, mode);
CloudBlobContainer blob_container = client.getContainerReference(container); CloudBlobContainer blob_container = client.getContainerReference(container);
if (blob_container.exists()) { if (blob_container.exists()) {
for (ListBlobItem blobItem : blob_container.listBlobs(path)) { for (ListBlobItem blobItem : blob_container.listBlobs(path)) {
logger.trace("removing blob [{}]", blobItem.getUri()); logger.trace("removing blob [{}]", blobItem.getUri());
deleteBlob(container, blobItem.getUri().toString()); deleteBlob(account, mode, container, blobItem.getUri().toString());
} }
} }
} }
@Override @Override
public boolean blobExists(String container, String blob) throws URISyntaxException, StorageException { public boolean blobExists(String account, LocationMode mode, String container, String blob) throws URISyntaxException, StorageException {
// Container name must be lower case. // Container name must be lower case.
CloudBlobClient client = this.getSelectedClient(account, mode);
CloudBlobContainer blob_container = client.getContainerReference(container); CloudBlobContainer blob_container = client.getContainerReference(container);
if (blob_container.exists()) { if (blob_container.exists()) {
CloudBlockBlob azureBlob = blob_container.getBlockBlobReference(blob); CloudBlockBlob azureBlob = blob_container.getBlockBlobReference(blob);
@ -144,10 +187,11 @@ public class AzureStorageServiceImpl extends AbstractLifecycleComponent<AzureSto
} }
@Override @Override
public void deleteBlob(String container, String blob) throws URISyntaxException, StorageException { public void deleteBlob(String account, LocationMode mode, String container, String blob) throws URISyntaxException, StorageException {
logger.trace("delete blob for container [{}], blob [{}]", container, blob); logger.trace("delete blob for container [{}], blob [{}]", container, blob);
// Container name must be lower case. // Container name must be lower case.
CloudBlobClient client = this.getSelectedClient(account, mode);
CloudBlobContainer blob_container = client.getContainerReference(container); CloudBlobContainer blob_container = client.getContainerReference(container);
if (blob_container.exists()) { if (blob_container.exists()) {
logger.trace("container [{}]: blob [{}] found. removing.", container, blob); logger.trace("container [{}]: blob [{}] found. removing.", container, blob);
@ -157,22 +201,29 @@ public class AzureStorageServiceImpl extends AbstractLifecycleComponent<AzureSto
} }
@Override @Override
public InputStream getInputStream(String container, String blob) throws URISyntaxException, StorageException { public InputStream getInputStream(String account, LocationMode mode, String container, String blob) throws URISyntaxException, StorageException {
logger.trace("reading container [{}], blob [{}]", container, blob); logger.trace("reading container [{}], blob [{}]", container, blob);
CloudBlobClient client = this.getSelectedClient(account, mode);
return client.getContainerReference(container).getBlockBlobReference(blob).openInputStream(); return client.getContainerReference(container).getBlockBlobReference(blob).openInputStream();
} }
@Override @Override
public OutputStream getOutputStream(String container, String blob) throws URISyntaxException, StorageException { public OutputStream getOutputStream(String account, LocationMode mode, String container, String blob) throws URISyntaxException, StorageException {
logger.trace("writing container [{}], blob [{}]", container, blob); logger.trace("writing container [{}], blob [{}]", container, blob);
CloudBlobClient client = this.getSelectedClient(account, mode);
return client.getContainerReference(container).getBlockBlobReference(blob).openOutputStream(); return client.getContainerReference(container).getBlockBlobReference(blob).openOutputStream();
} }
@Override @Override
public Map<String, BlobMetaData> listBlobsByPrefix(String container, String keyPath, String prefix) throws URISyntaxException, StorageException { public Map<String, BlobMetaData> listBlobsByPrefix(String account, LocationMode mode, String container, String keyPath, String prefix) throws URISyntaxException, StorageException {
// NOTE: this should be here: if (prefix == null) prefix = "";
// however, this is really inefficient since deleteBlobsByPrefix enumerates everything and
// then does a prefix match on the result; it should just call listBlobsByPrefix with the prefix!
logger.debug("listing container [{}], keyPath [{}], prefix [{}]", container, keyPath, prefix); logger.debug("listing container [{}], keyPath [{}], prefix [{}]", container, keyPath, prefix);
MapBuilder<String, BlobMetaData> blobsBuilder = MapBuilder.newMapBuilder(); MapBuilder<String, BlobMetaData> blobsBuilder = MapBuilder.newMapBuilder();
CloudBlobClient client = this.getSelectedClient(account, mode);
CloudBlobContainer blobContainer = client.getContainerReference(container); CloudBlobContainer blobContainer = client.getContainerReference(container);
if (blobContainer.exists()) { if (blobContainer.exists()) {
for (ListBlobItem blobItem : blobContainer.listBlobs(keyPath + (prefix == null ? "" : prefix))) { for (ListBlobItem blobItem : blobContainer.listBlobs(keyPath + (prefix == null ? "" : prefix))) {
@ -200,8 +251,10 @@ public class AzureStorageServiceImpl extends AbstractLifecycleComponent<AzureSto
} }
@Override @Override
public void moveBlob(String container, String sourceBlob, String targetBlob) throws URISyntaxException, StorageException { public void moveBlob(String account, LocationMode mode, String container, String sourceBlob, String targetBlob) throws URISyntaxException, StorageException {
logger.debug("moveBlob container [{}], sourceBlob [{}], targetBlob [{}]", container, sourceBlob, targetBlob); logger.debug("moveBlob container [{}], sourceBlob [{}], targetBlob [{}]", container, sourceBlob, targetBlob);
CloudBlobClient client = this.getSelectedClient(account, mode);
CloudBlobContainer blob_container = client.getContainerReference(container); CloudBlobContainer blob_container = client.getContainerReference(container);
CloudBlockBlob blobSource = blob_container.getBlockBlobReference(sourceBlob); CloudBlockBlob blobSource = blob_container.getBlockBlobReference(sourceBlob);
if (blobSource.exists()) { if (blobSource.exists()) {
@ -215,11 +268,25 @@ public class AzureStorageServiceImpl extends AbstractLifecycleComponent<AzureSto
@Override @Override
protected void doStart() throws ElasticsearchException { protected void doStart() throws ElasticsearchException {
logger.debug("starting azure storage client instance"); logger.debug("starting azure storage client instance");
// We register the primary client if any
if (primaryStorageSettings != null) {
logger.debug("registering primary client for account [{}]", primaryStorageSettings.getAccount());
createClient(primaryStorageSettings);
}
// We register all secondary clients
for (Map.Entry<String, AzureStorageSettings> azureStorageSettingsEntry : secondariesStorageSettings.entrySet()) {
logger.debug("registering secondary client for account [{}]", azureStorageSettingsEntry.getKey());
createClient(azureStorageSettingsEntry.getValue());
}
} }
@Override @Override
protected void doStop() throws ElasticsearchException { protected void doStop() throws ElasticsearchException {
logger.debug("stopping azure storage client instance"); logger.debug("stopping azure storage client instance");
// We should stop all clients but it does sound like CloudBlobClient has
// any shutdown method...
} }
@Override @Override

View File

@ -0,0 +1,122 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.azure.storage;
import org.elasticsearch.cloud.azure.storage.AzureStorageService.Storage;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings;
import java.util.HashMap;
import java.util.Map;
public class AzureStorageSettings {
private static ESLogger logger = ESLoggerFactory.getLogger(AzureStorageSettings.class.getName());
private String name;
private String account;
private String key;
public AzureStorageSettings(String name, String account, String key) {
this.name = name;
this.account = account;
this.key = key;
}
public String getName() {
return name;
}
public String getKey() {
return key;
}
public String getAccount() {
return account;
}
@Override
public String toString() {
final StringBuffer sb = new StringBuffer("AzureStorageSettings{");
sb.append("name='").append(name).append('\'');
sb.append(", account='").append(account).append('\'');
sb.append(", key='").append(key).append('\'');
sb.append('}');
return sb.toString();
}
/**
* Parses settings and read all settings available under cloud.azure.storage.*
* @param settings settings to parse
* @return A tuple with v1 = primary storage and v2 = secondary storage
*/
public static Tuple<AzureStorageSettings, Map<String, AzureStorageSettings>> parse(Settings settings) {
AzureStorageSettings primaryStorage = null;
Map<String, AzureStorageSettings> secondaryStorage = new HashMap<>();
// We check for deprecated settings
String account = settings.get(Storage.ACCOUNT);
String key = settings.get(Storage.KEY);
if (account != null) {
logger.warn("[{}] and [{}] have been deprecated. Use now [{}xxx.account] and [{}xxx.key] where xxx is any name",
Storage.ACCOUNT, Storage.KEY, Storage.PREFIX, Storage.PREFIX);
primaryStorage = new AzureStorageSettings(null, account, key);
} else {
Settings storageSettings = settings.getByPrefix(Storage.PREFIX);
if (storageSettings != null) {
Map<String, Object> asMap = storageSettings.getAsStructuredMap();
for (Map.Entry<String, Object> storage : asMap.entrySet()) {
if (storage.getValue() instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, String> map = (Map) storage.getValue();
AzureStorageSettings current = new AzureStorageSettings(storage.getKey(), map.get("account"), map.get("key"));
boolean activeByDefault = Boolean.parseBoolean(map.getOrDefault("default", "false"));
if (activeByDefault) {
if (primaryStorage == null) {
primaryStorage = current;
} else {
logger.warn("default storage settings has already been defined. You can not define it to [{}]", storage.getKey());
secondaryStorage.put(storage.getKey(), current);
}
} else {
secondaryStorage.put(storage.getKey(), current);
}
}
}
// If we did not set any default storage, we should complain and define it
if (primaryStorage == null && secondaryStorage.isEmpty() == false) {
Map.Entry<String, AzureStorageSettings> fallback = secondaryStorage.entrySet().iterator().next();
// We only warn if the number of secondary storage if > to 1
// If the user defined only one storage account, that's fine. We know it's the default one.
if (secondaryStorage.size() > 1) {
logger.warn("no default storage settings has been defined. " +
"Add \"default\": true to the settings you want to activate by default. " +
"Forcing default to [{}].", fallback.getKey());
}
primaryStorage = fallback.getValue();
secondaryStorage.remove(fallback.getKey());
}
}
}
return Tuple.tuple(primaryStorage, secondaryStorage);
}
}

View File

@ -32,8 +32,6 @@ import org.elasticsearch.repositories.azure.AzureRepository;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import static org.elasticsearch.cloud.azure.AzureRepositoryModule.isSnapshotReady;
/** /**
* *
*/ */
@ -63,8 +61,7 @@ public class AzureRepositoryPlugin extends Plugin {
} }
public void onModule(RepositoriesModule module) { public void onModule(RepositoriesModule module) {
if (isSnapshotReady(settings, logger)) { logger.debug("registering repository type [{}]", AzureRepository.TYPE);
module.registerRepository(AzureRepository.TYPE, AzureRepository.class, BlobStoreIndexShardRepository.class); module.registerRepository(AzureRepository.TYPE, AzureRepository.class, BlobStoreIndexShardRepository.class);
}
} }
} }

View File

@ -20,6 +20,7 @@
package org.elasticsearch.repositories.azure; package org.elasticsearch.repositories.azure;
import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.LocationMode;
import org.elasticsearch.cloud.azure.blobstore.AzureBlobStore; import org.elasticsearch.cloud.azure.blobstore.AzureBlobStore;
import org.elasticsearch.cloud.azure.storage.AzureStorageService.Storage; import org.elasticsearch.cloud.azure.storage.AzureStorageService.Storage;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
@ -40,6 +41,7 @@ import org.elasticsearch.snapshots.SnapshotCreationException;
import java.io.IOException; import java.io.IOException;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.List; import java.util.List;
import java.util.Locale;
/** /**
* Azure file system implementation of the BlobStoreRepository * Azure file system implementation of the BlobStoreRepository
@ -58,6 +60,8 @@ public class AzureRepository extends BlobStoreRepository {
public final static String CONTAINER_DEFAULT = "elasticsearch-snapshots"; public final static String CONTAINER_DEFAULT = "elasticsearch-snapshots";
static public final class Repository { static public final class Repository {
public static final String ACCOUNT = "account";
public static final String LOCATION_MODE = "location_mode";
public static final String CONTAINER = "container"; public static final String CONTAINER = "container";
public static final String CHUNK_SIZE = "chunk_size"; public static final String CHUNK_SIZE = "chunk_size";
public static final String COMPRESS = "compress"; public static final String COMPRESS = "compress";
@ -71,6 +75,7 @@ public class AzureRepository extends BlobStoreRepository {
private ByteSizeValue chunkSize; private ByteSizeValue chunkSize;
private boolean compress; private boolean compress;
private final boolean readonly;
@Inject @Inject
public AzureRepository(RepositoryName name, RepositorySettings repositorySettings, public AzureRepository(RepositoryName name, RepositorySettings repositorySettings,
@ -92,6 +97,18 @@ public class AzureRepository extends BlobStoreRepository {
this.compress = repositorySettings.settings().getAsBoolean(Repository.COMPRESS, this.compress = repositorySettings.settings().getAsBoolean(Repository.COMPRESS,
settings.getAsBoolean(Storage.COMPRESS, false)); settings.getAsBoolean(Storage.COMPRESS, false));
String modeStr = repositorySettings.settings().get(Repository.LOCATION_MODE, null);
if (modeStr != null) {
LocationMode locationMode = LocationMode.valueOf(modeStr.toUpperCase(Locale.ROOT));
if (locationMode == LocationMode.SECONDARY_ONLY) {
readonly = true;
} else {
readonly = false;
}
} else {
readonly = false;
}
String basePath = repositorySettings.settings().get(Repository.BASE_PATH, null); String basePath = repositorySettings.settings().get(Repository.BASE_PATH, null);
if (Strings.hasLength(basePath)) { if (Strings.hasLength(basePath)) {
@ -141,15 +158,12 @@ public class AzureRepository extends BlobStoreRepository {
@Override @Override
public void initializeSnapshot(SnapshotId snapshotId, List<String> indices, MetaData metaData) { public void initializeSnapshot(SnapshotId snapshotId, List<String> indices, MetaData metaData) {
try { try {
if (!blobStore.client().doesContainerExist(blobStore.container())) { if (!blobStore.doesContainerExist(blobStore.container())) {
logger.debug("container [{}] does not exist. Creating...", blobStore.container()); logger.debug("container [{}] does not exist. Creating...", blobStore.container());
blobStore.client().createContainer(blobStore.container()); blobStore.createContainer(blobStore.container());
} }
super.initializeSnapshot(snapshotId, indices, metaData); super.initializeSnapshot(snapshotId, indices, metaData);
} catch (StorageException e) { } catch (StorageException | URISyntaxException e) {
logger.warn("can not initialize container [{}]: [{}]", blobStore.container(), e.getMessage());
throw new SnapshotCreationException(snapshotId, e);
} catch (URISyntaxException e) {
logger.warn("can not initialize container [{}]: [{}]", blobStore.container(), e.getMessage()); logger.warn("can not initialize container [{}]: [{}]", blobStore.container(), e.getMessage());
throw new SnapshotCreationException(snapshotId, e); throw new SnapshotCreationException(snapshotId, e);
} }
@ -157,18 +171,22 @@ public class AzureRepository extends BlobStoreRepository {
@Override @Override
public String startVerification() { public String startVerification() {
try { if (readonly == false) {
if (!blobStore.client().doesContainerExist(blobStore.container())) { try {
logger.debug("container [{}] does not exist. Creating...", blobStore.container()); if (!blobStore.doesContainerExist(blobStore.container())) {
blobStore.client().createContainer(blobStore.container()); logger.debug("container [{}] does not exist. Creating...", blobStore.container());
blobStore.createContainer(blobStore.container());
}
} catch (StorageException | URISyntaxException e) {
logger.warn("can not initialize container [{}]: [{}]", blobStore.container(), e.getMessage());
throw new RepositoryVerificationException(repositoryName, "can not initialize container " + blobStore.container(), e);
} }
return super.startVerification();
} catch (StorageException e) {
logger.warn("can not initialize container [{}]: [{}]", blobStore.container(), e.getMessage());
throw new RepositoryVerificationException(repositoryName, "can not initialize container " + blobStore.container(), e);
} catch (URISyntaxException e) {
logger.warn("can not initialize container [{}]: [{}]", blobStore.container(), e.getMessage());
throw new RepositoryVerificationException(repositoryName, "can not initialize container " + blobStore.container(), e);
} }
return super.startVerification();
}
@Override
public boolean readOnly() {
return readonly;
} }
} }

View File

@ -20,6 +20,7 @@
package org.elasticsearch.cloud.azure; package org.elasticsearch.cloud.azure;
import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.LocationMode;
import org.elasticsearch.cloud.azure.storage.AzureStorageService; import org.elasticsearch.cloud.azure.storage.AzureStorageService;
import org.elasticsearch.cloud.azure.storage.AzureStorageService.Storage; import org.elasticsearch.cloud.azure.storage.AzureStorageService.Storage;
import org.elasticsearch.cloud.azure.storage.AzureStorageServiceMock; import org.elasticsearch.cloud.azure.storage.AzureStorageServiceMock;
@ -115,6 +116,6 @@ public abstract class AbstractAzureRepositoryServiceTestCase extends AbstractAzu
String container = internalCluster().getInstance(Settings.class).get("repositories.azure.container"); String container = internalCluster().getInstance(Settings.class).get("repositories.azure.container");
logger.info("--> remove blobs in container [{}]", container); logger.info("--> remove blobs in container [{}]", container);
AzureStorageService client = internalCluster().getInstance(AzureStorageService.class); AzureStorageService client = internalCluster().getInstance(AzureStorageService.class);
client.deleteFiles(container, path); client.deleteFiles(null, LocationMode.PRIMARY_ONLY, container, path);
} }
} }

View File

@ -20,6 +20,7 @@
package org.elasticsearch.cloud.azure.storage; package org.elasticsearch.cloud.azure.storage;
import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.LocationMode;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
@ -51,46 +52,46 @@ public class AzureStorageServiceMock extends AbstractLifecycleComponent<AzureSto
} }
@Override @Override
public boolean doesContainerExist(String container) { public boolean doesContainerExist(String account, LocationMode mode, String container) {
return true; return true;
} }
@Override @Override
public void removeContainer(String container) { public void removeContainer(String account, LocationMode mode, String container) {
} }
@Override @Override
public void createContainer(String container) { public void createContainer(String account, LocationMode mode, String container) {
} }
@Override @Override
public void deleteFiles(String container, String path) { public void deleteFiles(String account, LocationMode mode, String container, String path) {
} }
@Override @Override
public boolean blobExists(String container, String blob) { public boolean blobExists(String account, LocationMode mode, String container, String blob) {
return blobs.containsKey(blob); return blobs.containsKey(blob);
} }
@Override @Override
public void deleteBlob(String container, String blob) { public void deleteBlob(String account, LocationMode mode, String container, String blob) {
blobs.remove(blob); blobs.remove(blob);
} }
@Override @Override
public InputStream getInputStream(String container, String blob) { public InputStream getInputStream(String account, LocationMode mode, String container, String blob) {
return new ByteArrayInputStream(blobs.get(blob).toByteArray()); return new ByteArrayInputStream(blobs.get(blob).toByteArray());
} }
@Override @Override
public OutputStream getOutputStream(String container, String blob) throws URISyntaxException, StorageException { public OutputStream getOutputStream(String account, LocationMode mode, String container, String blob) throws URISyntaxException, StorageException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
blobs.put(blob, outputStream); blobs.put(blob, outputStream);
return outputStream; return outputStream;
} }
@Override @Override
public Map<String, BlobMetaData> listBlobsByPrefix(String container, String keyPath, String prefix) { public Map<String, BlobMetaData> listBlobsByPrefix(String account, LocationMode mode, String container, String keyPath, String prefix) {
MapBuilder<String, BlobMetaData> blobsBuilder = MapBuilder.newMapBuilder(); MapBuilder<String, BlobMetaData> blobsBuilder = MapBuilder.newMapBuilder();
for (String blobName : blobs.keySet()) { for (String blobName : blobs.keySet()) {
if (startsWithIgnoreCase(blobName, prefix)) { if (startsWithIgnoreCase(blobName, prefix)) {
@ -101,7 +102,7 @@ public class AzureStorageServiceMock extends AbstractLifecycleComponent<AzureSto
} }
@Override @Override
public void moveBlob(String container, String sourceBlob, String targetBlob) throws URISyntaxException, StorageException { public void moveBlob(String account, LocationMode mode, String container, String sourceBlob, String targetBlob) throws URISyntaxException, StorageException {
for (String blobName : blobs.keySet()) { for (String blobName : blobs.keySet()) {
if (endsWithIgnoreCase(blobName, sourceBlob)) { if (endsWithIgnoreCase(blobName, sourceBlob)) {
ByteArrayOutputStream outputStream = blobs.get(blobName); ByteArrayOutputStream outputStream = blobs.get(blobName);

View File

@ -0,0 +1,108 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.azure.storage;
import com.microsoft.azure.storage.LocationMode;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import java.net.URI;
import static org.hamcrest.Matchers.is;
public class AzureStorageServiceTest extends ESTestCase {
final static Settings settings = Settings.builder()
.put("cloud.azure.storage.azure1.account", "myaccount1")
.put("cloud.azure.storage.azure1.key", "mykey1")
.put("cloud.azure.storage.azure1.default", true)
.put("cloud.azure.storage.azure2.account", "myaccount2")
.put("cloud.azure.storage.azure2.key", "mykey2")
.put("cloud.azure.storage.azure3.account", "myaccount3")
.put("cloud.azure.storage.azure3.key", "mykey3")
.build();
public void testGetSelectedClientWithNoPrimaryAndSecondary() {
AzureStorageServiceImpl azureStorageService = new AzureStorageServiceMock(Settings.EMPTY);
azureStorageService.doStart();
try {
azureStorageService.getSelectedClient("whatever", LocationMode.PRIMARY_ONLY);
fail("we should have raised an IllegalArgumentException");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), is("No azure storage can be found. Check your elasticsearch.yml."));
}
}
public void testGetSelectedClientPrimary() {
AzureStorageServiceImpl azureStorageService = new AzureStorageServiceMock(settings);
azureStorageService.doStart();
CloudBlobClient client = azureStorageService.getSelectedClient("azure1", LocationMode.PRIMARY_ONLY);
assertThat(client.getEndpoint(), is(URI.create("https://azure1")));
}
public void testGetSelectedClientSecondary1() {
AzureStorageServiceImpl azureStorageService = new AzureStorageServiceMock(settings);
azureStorageService.doStart();
CloudBlobClient client = azureStorageService.getSelectedClient("azure2", LocationMode.PRIMARY_ONLY);
assertThat(client.getEndpoint(), is(URI.create("https://azure2")));
}
public void testGetSelectedClientSecondary2() {
AzureStorageServiceImpl azureStorageService = new AzureStorageServiceMock(settings);
azureStorageService.doStart();
CloudBlobClient client = azureStorageService.getSelectedClient("azure3", LocationMode.PRIMARY_ONLY);
assertThat(client.getEndpoint(), is(URI.create("https://azure3")));
}
public void testGetSelectedClientNonExisting() {
AzureStorageServiceImpl azureStorageService = new AzureStorageServiceMock(settings);
azureStorageService.doStart();
try {
azureStorageService.getSelectedClient("azure4", LocationMode.PRIMARY_ONLY);
fail("we should have raised an IllegalArgumentException");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), is("Can not find azure account [azure4]. Check your elasticsearch.yml."));
}
}
public void testGetSelectedClientDefault() {
AzureStorageServiceImpl azureStorageService = new AzureStorageServiceMock(settings);
azureStorageService.doStart();
CloudBlobClient client = azureStorageService.getSelectedClient(null, LocationMode.PRIMARY_ONLY);
assertThat(client.getEndpoint(), is(URI.create("https://azure1")));
}
/**
* This internal class just overload createClient method which is called by AzureStorageServiceImpl.doStart()
*/
class AzureStorageServiceMock extends AzureStorageServiceImpl {
public AzureStorageServiceMock(Settings settings) {
super(settings);
}
// We fake the client here
@Override
void createClient(AzureStorageSettings azureStorageSettings) {
this.clients.put(azureStorageSettings.getAccount(),
new CloudBlobClient(URI.create("https://" + azureStorageSettings.getName())));
}
}
}

View File

@ -0,0 +1,122 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.azure;
import org.elasticsearch.Version;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.MockNode;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugin.repository.azure.AzureRepositoryPlugin;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
/**
* Azure Repository
* Main class to easily run Azure from a IDE.
* It sets all the options to run the Azure plugin and access it from Sense.
*
* In order to run this class set configure the following:
* 1) Set `-Des.path.home=` to a directory containing an ES config directory
* 2) Set `-Dcloud.azure.storage.my_account.account=account_name`
* 3) Set `-Dcloud.azure.storage.my_account.key=account_key`
*
* Then you can run REST calls like:
* <pre>
# Clean test env
curl -XDELETE localhost:9200/foo?pretty
curl -XDELETE localhost:9200/_snapshot/my_backup1?pretty
curl -XDELETE localhost:9200/_snapshot/my_backup2?pretty
# Create data
curl -XPUT localhost:9200/foo/bar/1?pretty -d '{
"foo": "bar"
}'
curl -XPOST localhost:9200/foo/_refresh?pretty
curl -XGET localhost:9200/foo/_count?pretty
# Create repository using default account
curl -XPUT localhost:9200/_snapshot/my_backup1?pretty -d '{
"type": "azure"
}'
# Backup
curl -XPOST "localhost:9200/_snapshot/my_backup1/snap1?pretty&amp;wait_for_completion=true"
# Remove data
curl -XDELETE localhost:9200/foo?pretty
# Restore data
curl -XPOST "localhost:9200/_snapshot/my_backup1/snap1/_restore?pretty&amp;wait_for_completion=true"
curl -XGET localhost:9200/foo/_count?pretty
</pre>
*
* If you want to define a secondary repository:
*
* 4) Set `-Dcloud.azure.storage.my_account.default=true`
* 5) Set `-Dcloud.azure.storage.my_account2.account=account_name`
* 6) Set `-Dcloud.azure.storage.my_account2.key=account_key_secondary`
*
* Then you can run REST calls like:
* <pre>
# Remove data
curl -XDELETE localhost:9200/foo?pretty
# Create repository using account2 (secondary)
curl -XPUT localhost:9200/_snapshot/my_backup2?pretty -d '{
"type": "azure",
"settings": {
"account" : "my_account2",
"location_mode": "secondary_only"
}
}'
# Restore data from the secondary endpoint
curl -XPOST "localhost:9200/_snapshot/my_backup2/snap1/_restore?pretty&amp;wait_for_completion=true"
curl -XGET localhost:9200/foo/_count?pretty
</pre>
*/
public class AzureRepositoryF {
public static void main(String[] args) throws Throwable {
Settings.Builder settings = Settings.builder();
settings.put("http.cors.enabled", "true");
settings.put("http.cors.allow-origin", "*");
settings.put("cluster.name", AzureRepositoryF.class.getSimpleName());
// Example for azure repo settings
// settings.put("cloud.azure.storage.my_account1.account", "account_name");
// settings.put("cloud.azure.storage.my_account1.key", "account_key");
// settings.put("cloud.azure.storage.my_account1.default", true);
// settings.put("cloud.azure.storage.my_account2.account", "account_name");
// settings.put("cloud.azure.storage.my_account2.key", "account_key_secondary");
final CountDownLatch latch = new CountDownLatch(1);
final Node node = new MockNode(settings.build(), Version.CURRENT, Collections.singletonList(AzureRepositoryPlugin.class));
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
node.close();
latch.countDown();
}
});
node.start();
latch.await();
}
}

View File

@ -0,0 +1,122 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.azure;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.cloud.azure.storage.AzureStorageService.Storage;
import org.elasticsearch.cloud.azure.storage.AzureStorageSettings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import java.util.Map;
import static org.hamcrest.Matchers.*;
public class AzureSettingsParserTest extends LuceneTestCase {
public void testParseTwoSettingsExplicitDefault() {
Settings settings = Settings.builder()
.put("cloud.azure.storage.azure1.account", "myaccount1")
.put("cloud.azure.storage.azure1.key", "mykey1")
.put("cloud.azure.storage.azure1.default", true)
.put("cloud.azure.storage.azure2.account", "myaccount2")
.put("cloud.azure.storage.azure2.key", "mykey2")
.build();
Tuple<AzureStorageSettings, Map<String, AzureStorageSettings>> tuple = AzureStorageSettings.parse(settings);
assertThat(tuple.v1(), notNullValue());
assertThat(tuple.v1().getAccount(), is("myaccount1"));
assertThat(tuple.v1().getKey(), is("mykey1"));
assertThat(tuple.v2().keySet(), hasSize(1));
assertThat(tuple.v2().get("azure2"), notNullValue());
assertThat(tuple.v2().get("azure2").getAccount(), is("myaccount2"));
assertThat(tuple.v2().get("azure2").getKey(), is("mykey2"));
}
public void testParseUniqueSettings() {
Settings settings = Settings.builder()
.put("cloud.azure.storage.azure1.account", "myaccount1")
.put("cloud.azure.storage.azure1.key", "mykey1")
.build();
Tuple<AzureStorageSettings, Map<String, AzureStorageSettings>> tuple = AzureStorageSettings.parse(settings);
assertThat(tuple.v1(), notNullValue());
assertThat(tuple.v1().getAccount(), is("myaccount1"));
assertThat(tuple.v1().getKey(), is("mykey1"));
assertThat(tuple.v2().keySet(), hasSize(0));
}
public void testDeprecatedSettings() {
Settings settings = Settings.builder()
.put(Storage.ACCOUNT, "myaccount1")
.put(Storage.KEY, "mykey1")
.build();
Tuple<AzureStorageSettings, Map<String, AzureStorageSettings>> tuple = AzureStorageSettings.parse(settings);
assertThat(tuple.v1(), notNullValue());
assertThat(tuple.v1().getAccount(), is("myaccount1"));
assertThat(tuple.v1().getKey(), is("mykey1"));
assertThat(tuple.v2().keySet(), hasSize(0));
}
public void testParseTwoSettingsNoDefault() {
Settings settings = Settings.builder()
.put("cloud.azure.storage.azure1.account", "myaccount1")
.put("cloud.azure.storage.azure1.key", "mykey1")
.put("cloud.azure.storage.azure2.account", "myaccount2")
.put("cloud.azure.storage.azure2.key", "mykey2")
.build();
Tuple<AzureStorageSettings, Map<String, AzureStorageSettings>> tuple = AzureStorageSettings.parse(settings);
assertThat(tuple.v1(), notNullValue());
assertThat(tuple.v1().getAccount(), is("myaccount1"));
assertThat(tuple.v1().getKey(), is("mykey1"));
assertThat(tuple.v2().keySet(), hasSize(1));
assertThat(tuple.v2().get("azure2"), notNullValue());
assertThat(tuple.v2().get("azure2").getAccount(), is("myaccount2"));
assertThat(tuple.v2().get("azure2").getKey(), is("mykey2"));
}
public void testParseTwoSettingsTooManyDefaultSet() {
Settings settings = Settings.builder()
.put("cloud.azure.storage.azure1.account", "myaccount1")
.put("cloud.azure.storage.azure1.key", "mykey1")
.put("cloud.azure.storage.azure1.default", true)
.put("cloud.azure.storage.azure2.account", "myaccount2")
.put("cloud.azure.storage.azure2.key", "mykey2")
.put("cloud.azure.storage.azure2.default", true)
.build();
Tuple<AzureStorageSettings, Map<String, AzureStorageSettings>> tuple = AzureStorageSettings.parse(settings);
assertThat(tuple.v1(), notNullValue());
assertThat(tuple.v1().getAccount(), is("myaccount1"));
assertThat(tuple.v1().getKey(), is("mykey1"));
assertThat(tuple.v2().keySet(), hasSize(1));
assertThat(tuple.v2().get("azure2"), notNullValue());
assertThat(tuple.v2().get("azure2").getAccount(), is("myaccount2"));
assertThat(tuple.v2().get("azure2").getKey(), is("mykey2"));
}
public void testParseEmptySettings() {
Tuple<AzureStorageSettings, Map<String, AzureStorageSettings>> tuple = AzureStorageSettings.parse(Settings.EMPTY);
assertThat(tuple.v1(), nullValue());
assertThat(tuple.v2().keySet(), hasSize(0));
}
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.repositories.azure;
import com.carrotsearch.randomizedtesting.RandomizedTest; import com.carrotsearch.randomizedtesting.RandomizedTest;
import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.LocationMode;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
@ -472,7 +473,7 @@ public class AzureSnapshotRestoreTests extends AbstractAzureWithThirdPartyTestCa
@Override @Override
public void run() { public void run() {
try { try {
storageService.createContainer(container); storageService.createContainer(null, LocationMode.PRIMARY_ONLY, container);
logger.debug(" -> container created..."); logger.debug(" -> container created...");
} catch (URISyntaxException e) { } catch (URISyntaxException e) {
// Incorrect URL. This should never happen. // Incorrect URL. This should never happen.
@ -484,7 +485,7 @@ public class AzureSnapshotRestoreTests extends AbstractAzureWithThirdPartyTestCa
} }
} }
}, 30, TimeUnit.SECONDS); }, 30, TimeUnit.SECONDS);
storageService.removeContainer(container); storageService.removeContainer(null, LocationMode.PRIMARY_ONLY, container);
ClusterAdminClient client = client().admin().cluster(); ClusterAdminClient client = client().admin().cluster();
logger.info("--> creating azure repository while container is being removed"); logger.info("--> creating azure repository while container is being removed");
@ -523,7 +524,7 @@ public class AzureSnapshotRestoreTests extends AbstractAzureWithThirdPartyTestCa
Settings settings = readSettingsFromFile(); Settings settings = readSettingsFromFile();
AzureStorageService client = new AzureStorageServiceImpl(settings); AzureStorageService client = new AzureStorageServiceImpl(settings);
for (String container : containers) { for (String container : containers) {
client.removeContainer(container); client.removeContainer(null, LocationMode.PRIMARY_ONLY, container);
} }
} }
} }