Add Azure Storage repository

Elasticsearch 1.0 will provide a new feature named `Snapshot & Restore`.

We want to add support for [Azure Storage](http://www.windowsazure.com/en-us/documentation/services/storage/).

To enable Azure repositories, you first have to set your Azure storage settings:

```yaml
    cloud:
        azure:
            storage_account: your_azure_storage_account
            storage_key: your_azure_storage_key
```

The Azure repository supports the following settings:

* `container`: Container name. Defaults to `elasticsearch-snapshots`.
* `base_path`: Specifies the path within the container to the repository data. Defaults to empty (root directory).
* `concurrent_streams`: Throttles the number of streams (per node) performing the snapshot operation. Defaults to `5`.
* `chunk_size`: Big files can be broken down into chunks during snapshotting if needed. The chunk size can be specified
in bytes or by using size value notation, e.g. `1g`, `10m`, `5k`. Defaults to `64m` (64m max).
* `compress`: When set to `true`, metadata files are stored in compressed format. This setting doesn't affect index
files that are already compressed by default. Defaults to `false`.

Some examples using scripts:

```sh
$ curl -XPUT 'http://localhost:9200/_snapshot/my_backup1' -d '{
    "type": "azure"
}'

$ curl -XPUT 'http://localhost:9200/_snapshot/my_backup2' -d '{
    "type": "azure",
    "settings": {
        "container": "backup_container",
        "base_path": "backups",
        "concurrent_streams": 2,
        "chunk_size": "32m",
        "compress": true
    }
}'
```

Example using Java:

```java
client.admin().cluster().preparePutRepository("my_backup3")
        .setType("azure").setSettings(ImmutableSettings.settingsBuilder()
                .put(AzureStorageService.Fields.CONTAINER, "backup_container")
                .put(AzureStorageService.Fields.CHUNK_SIZE, new ByteSizeValue(32, ByteSizeUnit.MB))
        ).get();
```
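
Once the repository is registered, snapshots can be created in it and restored from it with the regular snapshot and restore API. A minimal sketch, reusing the `client` and the `my_backup3` repository from the example above (the snapshot name is illustrative):

```java
// Create a snapshot of all indices in the repository and wait for completion
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster()
        .prepareCreateSnapshot("my_backup3", "snapshot_1")
        .setWaitForCompletion(true)
        .get();

// Restore the snapshot (target indices must be closed or deleted first),
// again waiting for completion
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster()
        .prepareRestoreSnapshot("my_backup3", "snapshot_1")
        .setWaitForCompletion(true)
        .get();
```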

Closes #2.
David Pilato 2014-01-06 23:14:34 +01:00
parent 696cad1f67
commit 11dfdfb189
23 changed files with 1639 additions and 29 deletions

View File

@@ -335,6 +335,82 @@ If you want to remove your running instances:
azure vm delete myesnode1
```
Azure Repository
================
To enable Azure repositories, you first have to set your Azure storage settings:
```
cloud:
azure:
storage_account: your_azure_storage_account
storage_key: your_azure_storage_key
```
The Azure repository supports the following settings:
* `container`: Container name. Defaults to `elasticsearch-snapshots`.
* `base_path`: Specifies the path within the container to the repository data. Defaults to empty (root directory).
* `concurrent_streams`: Throttles the number of streams (per node) performing the snapshot operation. Defaults to `5`.
* `chunk_size`: Big files can be broken down into chunks during snapshotting if needed. The chunk size can be specified
in bytes or by using size value notation, e.g. `1g`, `10m`, `5k`. Defaults to `64m` (64m max).
* `compress`: When set to `true`, metadata files are stored in compressed format. This setting doesn't affect index
files that are already compressed by default. Defaults to `false`.
Some examples using scripts:
```sh
# The simplest one
$ curl -XPUT 'http://localhost:9200/_snapshot/my_backup1' -d '{
"type": "azure"
}'
# With some settings
$ curl -XPUT 'http://localhost:9200/_snapshot/my_backup2' -d '{
"type": "azure",
"settings": {
"container": "backup_container",
"base_path": "backups",
"concurrent_streams": 2,
"chunk_size": "32m",
"compress": true
}
}'
```
Example using Java:
```java
client.admin().cluster().preparePutRepository("my_backup3")
.setType("azure").setSettings(ImmutableSettings.settingsBuilder()
.put(AzureStorageService.Fields.CONTAINER, "backup_container")
.put(AzureStorageService.Fields.CHUNK_SIZE, new ByteSizeValue(32, ByteSizeUnit.MB))
).get();
```
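A repository registered this way can then be driven with the standard snapshot and restore API. A minimal sketch with the Java client, assuming the `my_backup3` repository from the example above (the snapshot name is illustrative):
```java
// Snapshot all indices into the repository and wait for completion
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster()
        .prepareCreateSnapshot("my_backup3", "snapshot_1")
        .setWaitForCompletion(true)
        .get();

// Restore the snapshot (close or delete the target indices first)
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster()
        .prepareRestoreSnapshot("my_backup3", "snapshot_1")
        .setWaitForCompletion(true)
        .get();
```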
Testing
-------
Integration tests in this plugin require a working Azure configuration and are therefore disabled by default.
To enable tests, prepare a config file `elasticsearch.yml` with the following content:
```
repositories:
azure:
account: "YOUR-AZURE-STORAGE-NAME"
key: "YOUR-AZURE-STORAGE-KEY"
```
Replace `account` and `key` with your settings. Please note that the tests will delete all snapshot/restore related files in the specified container.
To run the tests:
```sh
mvn -Dtests.azure=true -Des.config=/path/to/config/file/elasticsearch.yml clean test
```
License
-------

View File

@@ -68,6 +68,11 @@ governing permissions and limitations under the License. -->
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>com.microsoft.windowsazure</groupId>
<artifactId>microsoft-windowsazure-api</artifactId>
<version>0.4.5</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>

View File

@@ -1,5 +1,5 @@
<?xml version="1.0"?>
<!-- Licensed to ElasticSearch under one or more contributor
<!-- Licensed to Elasticsearch under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. ElasticSearch licenses this file to you
under the Apache License, Version 2.0 (the "License"); you may not use this
@@ -25,5 +25,13 @@ governing permissions and limitations under the License. -->
<exclude>org.elasticsearch:elasticsearch</exclude>
</excludes>
</dependencySet>
<dependencySet>
<outputDirectory>/</outputDirectory>
<useProjectArtifact>true</useProjectArtifact>
<useTransitiveFiltering>true</useTransitiveFiltering>
<includes>
<include>com.microsoft.windowsazure:microsoft-windowsazure-api</include>
</includes>
</dependencySet>
</dependencySets>
</assembly>

View File

@@ -20,11 +20,9 @@
package org.elasticsearch.cloud.azure;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.settings.SettingsFilter;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
@@ -84,12 +82,7 @@ public class AzureComputeServiceImpl extends AbstractLifecycleComponent<AzureCom
// Check that we have all needed properties
try {
checkProperty(Fields.SUBSCRIPTION_ID, subscription_id);
checkProperty(Fields.SERVICE_NAME, service_name);
checkProperty(Fields.KEYSTORE, keystore);
checkProperty(Fields.PASSWORD, password);
socketFactory = getSocketFactory(keystore, password);
if (logger.isTraceEnabled()) logger.trace("creating new Azure client for [{}], [{}], [{}]",
subscription_id, service_name, port_name);
} catch (Exception e) {
@@ -210,11 +203,4 @@ public class AzureComputeServiceImpl extends AbstractLifecycleComponent<AzureCom
@Override
protected void doClose() throws ElasticsearchException {
}
private void checkProperty(String name, String value) throws ElasticsearchException {
if (!Strings.hasText(value)) {
throw new SettingsException("cloud.azure." + name +" is not set or is incorrect.");
}
}
}

View File

@@ -19,6 +19,8 @@
package org.elasticsearch.cloud.azure;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
@@ -32,9 +34,12 @@ import org.elasticsearch.discovery.azure.AzureDiscovery;
* <ul>
* <li>If needed this module will bind azure discovery service by default
* to AzureComputeServiceImpl.</li>
* <li>If needed this module will bind azure repository service by default
* to AzureStorageServiceImpl.</li>
* </ul>
*
* @see org.elasticsearch.cloud.azure.AzureComputeServiceImpl
* @see org.elasticsearch.cloud.azure.AzureStorageServiceImpl
*/
public class AzureModule extends AbstractModule {
protected final ESLogger logger;
@@ -51,10 +56,18 @@ public class AzureModule extends AbstractModule {
logger.debug("starting azure services");
// If we have set discovery to azure, let's start the azure compute service
if (isDiscoveryReady(settings)) {
if (isDiscoveryReady(settings, logger)) {
logger.debug("starting azure discovery service");
bind(AzureComputeService.class)
.to(settings.getAsClass("cloud.azure.api.impl", AzureComputeServiceImpl.class))
bind(AzureComputeService.class)
.to(settings.getAsClass("cloud.azure.api.impl", AzureComputeServiceImpl.class))
.asEagerSingleton();
}
// If we have settings for azure repository, let's start the azure storage service
if (isSnapshotReady(settings, logger)) {
logger.debug("starting azure repository service");
bind(AzureStorageService.class)
.to(settings.getAsClass("repositories.azure.api.impl", AzureStorageServiceImpl.class))
.asEagerSingleton();
}
}
@@ -63,7 +76,72 @@ public class AzureModule extends AbstractModule {
* Check if discovery is meant to start
* @return true if we can start discovery features
*/
public static boolean isDiscoveryReady(Settings settings) {
return (AzureDiscovery.AZURE.equalsIgnoreCase(settings.get("discovery.type")));
public static boolean isCloudReady(Settings settings) {
return (settings.getAsBoolean("cloud.enabled", true));
}
/**
* Check if discovery is meant to start
* @return true if we can start discovery features
*/
public static boolean isDiscoveryReady(Settings settings, ESLogger logger) {
// Cloud services are disabled
if (!isCloudReady(settings)) {
logger.trace("cloud settings are disabled");
return false;
}
// User set discovery.type: azure
if (!AzureDiscovery.AZURE.equalsIgnoreCase(settings.get("discovery.type"))) {
logger.trace("discovery.type not set to {}", AzureDiscovery.AZURE);
return false;
}
if (isPropertyMissing(settings, "cloud.azure." + AzureComputeService.Fields.SUBSCRIPTION_ID, logger) ||
isPropertyMissing(settings, "cloud.azure." + AzureComputeService.Fields.SERVICE_NAME, logger) ||
isPropertyMissing(settings, "cloud.azure." + AzureComputeService.Fields.KEYSTORE, logger) ||
isPropertyMissing(settings, "cloud.azure." + AzureComputeService.Fields.PASSWORD, logger)
) {
return false;
}
logger.trace("all required properties for azure discovery are set!");
return true;
}
/**
* Check if we have repository azure settings available
* @return true if we can use snapshot and restore
*/
public static boolean isSnapshotReady(Settings settings, ESLogger logger) {
// Cloud services are disabled
if (!isCloudReady(settings)) {
logger.trace("cloud settings are disabled");
return false;
}
if (isPropertyMissing(settings, "cloud.azure." + AzureStorageService.Fields.ACCOUNT, null) ||
isPropertyMissing(settings, "cloud.azure." + AzureStorageService.Fields.KEY, null)) {
logger.trace("azure repository is not set using {} and {} properties",
AzureStorageService.Fields.ACCOUNT,
AzureStorageService.Fields.KEY);
return false;
}
logger.trace("all required properties for azure repository are set!");
return true;
}
public static boolean isPropertyMissing(Settings settings, String name, ESLogger logger) throws ElasticsearchException {
if (!Strings.hasText(settings.get(name))) {
if (logger != null) {
logger.warn("{} is not set or is incorrect.", name);
}
return true;
}
return false;
}
}

View File

@@ -23,16 +23,22 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.SettingsFilter;
/**
* Filtering cloud.azure.* settings
* Filtering cloud.azure.* and repositories.azure.* settings
*/
public class AzureSettingsFilter implements SettingsFilter.Filter {
@Override
public void filter(ImmutableSettings.Builder settings) {
// Cloud settings
settings.remove("cloud.certificate");
settings.remove("cloud.azure.keystore");
settings.remove("cloud.azure.password");
settings.remove("cloud.azure.subscription_id");
settings.remove("cloud.azure.service_name");
// Repositories settings
settings.remove("repositories.azure.account");
settings.remove("repositories.azure.key");
settings.remove("repositories.azure.container");
settings.remove("repositories.azure.base_path");
}
}

View File

@@ -0,0 +1,65 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.azure;
import com.microsoft.windowsazure.services.core.ServiceException;
import com.microsoft.windowsazure.services.core.storage.StorageException;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.collect.ImmutableMap;
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
/**
* Azure Storage Service interface
* @see org.elasticsearch.cloud.azure.AzureStorageServiceImpl for Azure REST API implementation
*/
public interface AzureStorageService {
static public final class Fields {
public static final String ACCOUNT = "storage_account";
public static final String KEY = "storage_key";
public static final String CONTAINER = "container";
public static final String BASE_PATH = "base_path";
public static final String CONCURRENT_STREAMS = "concurrent_streams";
public static final String CHUNK_SIZE = "chunk_size";
public static final String COMPRESS = "compress";
}
boolean doesContainerExist(String container);
void removeContainer(String container) throws URISyntaxException, StorageException;
void createContainer(String container) throws URISyntaxException, StorageException;
void deleteFiles(String container, String path) throws URISyntaxException, StorageException, ServiceException;
boolean blobExists(String container, String blob) throws URISyntaxException, StorageException;
void deleteBlob(String container, String blob) throws URISyntaxException, StorageException;
InputStream getInputStream(String container, String blob) throws ServiceException;
ImmutableMap<String,BlobMetaData> listBlobsByPrefix(String container, String prefix) throws URISyntaxException, StorageException, ServiceException;
void putObject(String container, String blob, InputStream is, long length) throws URISyntaxException, StorageException, IOException;
}

View File

@@ -0,0 +1,239 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.azure;
import com.microsoft.windowsazure.services.blob.BlobConfiguration;
import com.microsoft.windowsazure.services.blob.BlobContract;
import com.microsoft.windowsazure.services.blob.BlobService;
import com.microsoft.windowsazure.services.blob.client.CloudBlobClient;
import com.microsoft.windowsazure.services.blob.client.CloudBlobContainer;
import com.microsoft.windowsazure.services.blob.client.CloudBlockBlob;
import com.microsoft.windowsazure.services.blob.client.ListBlobItem;
import com.microsoft.windowsazure.services.blob.models.BlobProperties;
import com.microsoft.windowsazure.services.blob.models.GetBlobResult;
import com.microsoft.windowsazure.services.blob.models.ListBlobsOptions;
import com.microsoft.windowsazure.services.blob.models.ListBlobsResult;
import com.microsoft.windowsazure.services.core.Configuration;
import com.microsoft.windowsazure.services.core.ServiceException;
import com.microsoft.windowsazure.services.core.storage.CloudStorageAccount;
import com.microsoft.windowsazure.services.core.storage.StorageException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
/**
*
*/
public class AzureStorageServiceImpl extends AbstractLifecycleComponent<AzureStorageServiceImpl>
implements AzureStorageService {
private final String account;
private final String key;
private final String blob;
private CloudStorageAccount storage_account;
private CloudBlobClient client;
private BlobContract service;
@Inject
public AzureStorageServiceImpl(Settings settings, SettingsFilter settingsFilter) {
super(settings);
settingsFilter.addFilter(new AzureSettingsFilter());
// We try to load storage API settings from `cloud.azure.`
account = settings.get("cloud.azure." + Fields.ACCOUNT);
key = settings.get("cloud.azure." + Fields.KEY);
blob = "http://" + account + ".blob.core.windows.net/";
try {
if (account != null) {
if (logger.isTraceEnabled()) logger.trace("creating new Azure storage client using account [{}], key [{}], blob [{}]",
account, key, blob);
String storageConnectionString =
"DefaultEndpointsProtocol=http;"
+ "AccountName="+ account +";"
+ "AccountKey=" + key;
Configuration configuration = Configuration.getInstance();
configuration.setProperty(BlobConfiguration.ACCOUNT_NAME, account);
configuration.setProperty(BlobConfiguration.ACCOUNT_KEY, key);
configuration.setProperty(BlobConfiguration.URI, blob);
service = BlobService.create(configuration);
storage_account = CloudStorageAccount.parse(storageConnectionString);
client = storage_account.createCloudBlobClient();
}
} catch (Exception e) {
// Can not start Azure Storage Client
logger.error("can not start azure storage client: {}", e.getMessage());
}
}
@Override
public boolean doesContainerExist(String container) {
try {
CloudBlobContainer blob_container = client.getContainerReference(container);
return blob_container.exists();
} catch (Exception e) {
logger.error("can not access container [{}]", container);
}
return false;
}
@Override
public void removeContainer(String container) throws URISyntaxException, StorageException {
CloudBlobContainer blob_container = client.getContainerReference(container);
blob_container.delete();
}
@Override
public void createContainer(String container) throws URISyntaxException, StorageException {
CloudBlobContainer blob_container = client.getContainerReference(container);
if (logger.isTraceEnabled()) {
logger.trace("creating container [{}]", container);
}
blob_container.createIfNotExist();
}
@Override
public void deleteFiles(String container, String path) throws URISyntaxException, StorageException, ServiceException {
if (logger.isTraceEnabled()) {
logger.trace("delete files container [{}], path [{}]",
container, path);
}
// Container name must be lower case.
CloudBlobContainer blob_container = client.getContainerReference(container);
if (blob_container.exists()) {
ListBlobsOptions options = new ListBlobsOptions();
options.setPrefix(path);
List<ListBlobsResult.BlobEntry> blobs = service.listBlobs(container, options).getBlobs();
for (ListBlobsResult.BlobEntry blob : blobs) {
if (logger.isTraceEnabled()) {
logger.trace("removing in container [{}], path [{}], blob [{}]",
container, path, blob.getName());
}
service.deleteBlob(container, blob.getName());
}
}
}
@Override
public boolean blobExists(String container, String blob) throws URISyntaxException, StorageException {
// Container name must be lower case.
CloudBlobContainer blob_container = client.getContainerReference(container);
if (blob_container.exists()) {
CloudBlockBlob azureBlob = blob_container.getBlockBlobReference(blob);
return azureBlob.exists();
}
return false;
}
@Override
public void deleteBlob(String container, String blob) throws URISyntaxException, StorageException {
if (logger.isTraceEnabled()) {
logger.trace("delete blob for container [{}], blob [{}]",
container, blob);
}
// Container name must be lower case.
CloudBlobContainer blob_container = client.getContainerReference(container);
if (blob_container.exists()) {
if (logger.isTraceEnabled()) {
logger.trace("blob found. removing.",
container, blob);
}
// TODO: to be reviewed
CloudBlockBlob azureBlob = blob_container.getBlockBlobReference(blob);
azureBlob.delete();
}
}
@Override
public InputStream getInputStream(String container, String blob) throws ServiceException {
GetBlobResult blobResult = service.getBlob(container, blob);
return blobResult.getContentStream();
}
@Override
public ImmutableMap<String, BlobMetaData> listBlobsByPrefix(String container, String prefix) throws URISyntaxException, StorageException, ServiceException {
logger.debug("listBlobsByPrefix container [{}], prefix [{}]", container, prefix);
ImmutableMap.Builder<String, BlobMetaData> blobsBuilder = ImmutableMap.builder();
CloudBlobContainer blob_container = client.getContainerReference(container);
if (blob_container.exists()) {
Iterable<ListBlobItem> blobs = blob_container.listBlobs(prefix);
for (ListBlobItem blob : blobs) {
URI uri = blob.getUri();
if (logger.isTraceEnabled()) {
logger.trace("blob url [{}]", uri);
}
String blobpath = uri.getPath().substring(container.length() + 1);
BlobProperties properties = service.getBlobProperties(container, blobpath).getProperties();
String name = uri.getPath().substring(prefix.length());
if (logger.isTraceEnabled()) {
logger.trace("blob url [{}], name [{}], size [{}]", uri, name, properties.getContentLength());
}
blobsBuilder.put(name, new PlainBlobMetaData(name, properties.getContentLength()));
}
}
return blobsBuilder.build();
}
@Override
public void putObject(String container, String blobname, InputStream is, long length) throws URISyntaxException, StorageException, IOException {
if (logger.isTraceEnabled()) {
logger.trace("creating blob in container [{}], blob [{}], length [{}]",
container, blobname, length);
}
CloudBlockBlob blob = client.getContainerReference(container).getBlockBlobReference(blobname);
blob.upload(is, length);
}
@Override
protected void doStart() throws ElasticsearchException {
logger.debug("starting azure storage client instance");
}
@Override
protected void doStop() throws ElasticsearchException {
logger.debug("stopping azure storage client instance");
}
@Override
protected void doClose() throws ElasticsearchException {
}
}

View File

@@ -0,0 +1,137 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.azure.blobstore;
import com.microsoft.windowsazure.services.core.ServiceException;
import com.microsoft.windowsazure.services.core.storage.StorageException;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
/**
*
*/
public class AbstractAzureBlobContainer extends AbstractBlobContainer {
protected final ESLogger logger = ESLoggerFactory.getLogger(AbstractAzureBlobContainer.class.getName());
protected final AzureBlobStore blobStore;
protected final String keyPath;
public AbstractAzureBlobContainer(BlobPath path, AzureBlobStore blobStore) {
super(path);
this.blobStore = blobStore;
String keyPath = path.buildAsString("/");
if (!keyPath.isEmpty()) {
keyPath = keyPath + "/";
}
this.keyPath = keyPath;
}
@Override
public boolean blobExists(String blobName) {
try {
return blobStore.client().blobExists(blobStore.container(), buildKey(blobName));
} catch (URISyntaxException e) {
logger.warn("can not access [{}] in container {{}}: {}", blobName, blobStore.container(), e.getMessage());
} catch (StorageException e) {
logger.warn("can not access [{}] in container {{}}: {}", blobName, blobStore.container(), e.getMessage());
}
return false;
}
@Override
public boolean deleteBlob(String blobName) throws IOException {
try {
blobStore.client().deleteBlob(blobStore.container(), buildKey(blobName));
return true;
} catch (URISyntaxException e) {
logger.warn("can not access [{}] in container {{}}: {}", blobName, blobStore.container(), e.getMessage());
throw new IOException(e);
} catch (StorageException e) {
logger.warn("can not access [{}] in container {{}}: {}", blobName, blobStore.container(), e.getMessage());
throw new IOException(e);
}
}
@Override
public void readBlob(final String blobName, final ReadBlobListener listener) {
blobStore.executor().execute(new Runnable() {
@Override
public void run() {
InputStream is = null;
try {
is = blobStore.client().getInputStream(blobStore.container(), buildKey(blobName));
byte[] buffer = new byte[blobStore.bufferSizeInBytes()];
int bytesRead;
while ((bytesRead = is.read(buffer)) != -1) {
listener.onPartial(buffer, 0, bytesRead);
}
is.close();
listener.onCompleted();
} catch (Throwable e) {
IOUtils.closeWhileHandlingException(is);
listener.onFailure(e);
}
}
});
}
@Override
public ImmutableMap<String, BlobMetaData> listBlobsByPrefix(@Nullable String blobNamePrefix) throws IOException {
final String prefix;
if (blobNamePrefix != null) {
prefix = buildKey(blobNamePrefix);
} else {
prefix = keyPath;
}
try {
return blobStore.client().listBlobsByPrefix(blobStore.container(), prefix);
} catch (URISyntaxException e) {
logger.warn("can not access [{}] in container {{}}: {}", blobNamePrefix, blobStore.container(), e.getMessage());
throw new IOException(e);
} catch (StorageException e) {
logger.warn("can not access [{}] in container {{}}: {}", blobNamePrefix, blobStore.container(), e.getMessage());
throw new IOException(e);
} catch (ServiceException e) {
logger.warn("can not access [{}] in container {{}}: {}", blobNamePrefix, blobStore.container(), e.getMessage());
throw new IOException(e);
}
}
@Override
public ImmutableMap<String, BlobMetaData> listBlobs() throws IOException {
return listBlobsByPrefix(null);
}
protected String buildKey(String blobName) {
return keyPath + blobName;
}
}

View File

@@ -0,0 +1,109 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.azure.blobstore;
import com.microsoft.windowsazure.services.core.ServiceException;
import com.microsoft.windowsazure.services.core.storage.StorageException;
import org.elasticsearch.cloud.azure.AzureStorageService;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import java.net.URISyntaxException;
import java.util.concurrent.Executor;
/**
*
*/
public class AzureBlobStore extends AbstractComponent implements BlobStore {
private final AzureStorageService client;
private final String container;
private final Executor executor;
private final int bufferSizeInBytes;
public AzureBlobStore(Settings settings, AzureStorageService client, String container, Executor executor) throws URISyntaxException, StorageException {
super(settings);
this.client = client;
this.container = container;
this.executor = executor;
this.bufferSizeInBytes = (int) settings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes();
if (!client.doesContainerExist(container)) {
client.createContainer(container);
}
}
@Override
public String toString() {
return container;
}
public AzureStorageService client() {
return client;
}
public String container() {
return container;
}
public Executor executor() {
return executor;
}
public int bufferSizeInBytes() {
return bufferSizeInBytes;
}
@Override
public ImmutableBlobContainer immutableBlobContainer(BlobPath path) {
return new AzureImmutableBlobContainer(path, this);
}
@Override
public void delete(BlobPath path) {
String keyPath = path.buildAsString("/");
if (!keyPath.isEmpty()) {
keyPath = keyPath + "/";
}
try {
client.deleteFiles(container, keyPath);
} catch (URISyntaxException e) {
logger.warn("can not remove [{}] in container {{}}: {}", keyPath, container, e.getMessage());
} catch (StorageException e) {
logger.warn("can not remove [{}] in container {{}}: {}", keyPath, container, e.getMessage());
} catch (ServiceException e) {
logger.warn("can not remove [{}] in container {{}}: {}", keyPath, container, e.getMessage());
}
}
@Override
public void close() {
}
}

View File

@@ -0,0 +1,57 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.azure.blobstore;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
import org.elasticsearch.common.blobstore.support.BlobStores;
import java.io.IOException;
import java.io.InputStream;
/**
*
*/
public class AzureImmutableBlobContainer extends AbstractAzureBlobContainer implements ImmutableBlobContainer {
public AzureImmutableBlobContainer(BlobPath path, AzureBlobStore blobStore) {
super(path, blobStore);
}
@Override
public void writeBlob(final String blobName, final InputStream is, final long sizeInBytes, final WriterListener listener) {
blobStore.executor().execute(new Runnable() {
@Override
public void run() {
try {
blobStore.client().putObject(blobStore.container(), buildKey(blobName), is, sizeInBytes);
listener.onCompleted();
} catch (Throwable e) {
listener.onFailure(e);
}
}
});
}
@Override
public void writeBlob(String blobName, InputStream is, long sizeInBytes) throws IOException {
BlobStores.syncWriteBlob(this, blobName, is, sizeInBytes);
}
}

View File

@@ -43,8 +43,8 @@ public class AzureDiscoveryModule extends ZenDiscoveryModule {
}
@Override
protected void bindDiscovery() {
if (AzureModule.isDiscoveryReady(settings)) {
bind(Discovery.class).to(AzureDiscovery.class).asEagerSingleton();
if (AzureModule.isDiscoveryReady(settings, logger)) {
bind(Discovery.class).to(AzureDiscovery.class).asEagerSingleton();
} else {
logger.debug("disabling azure discovery features");
}

View File

@@ -22,8 +22,13 @@ package org.elasticsearch.plugin.cloud.azure;
import org.elasticsearch.cloud.azure.AzureModule;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.AbstractPlugin;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.repositories.azure.AzureRepository;
import org.elasticsearch.repositories.azure.AzureRepositoryModule;
import java.util.Collection;
@@ -33,6 +38,7 @@ import java.util.Collection;
public class CloudAzurePlugin extends AbstractPlugin {
private final Settings settings;
private final ESLogger logger = ESLoggerFactory.getLogger(CloudAzurePlugin.class.getName());
public CloudAzurePlugin(Settings settings) {
this.settings = settings;
@@ -51,9 +57,17 @@ public class CloudAzurePlugin extends AbstractPlugin {
@Override
public Collection<Class<? extends Module>> modules() {
Collection<Class<? extends Module>> modules = Lists.newArrayList();
if (settings.getAsBoolean("cloud.enabled", true)) {
if (AzureModule.isCloudReady(settings)) {
modules.add(AzureModule.class);
}
return modules;
}
@Override
public void processModule(Module module) {
if (AzureModule.isSnapshotReady(settings, logger)
&& module instanceof RepositoriesModule) {
((RepositoriesModule)module).registerRepository(AzureRepository.TYPE, AzureRepositoryModule.class);
}
}
}

View File

@@ -0,0 +1,146 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.azure;
import com.microsoft.windowsazure.services.core.storage.StorageException;
import org.elasticsearch.cloud.azure.AzureStorageService;
import org.elasticsearch.cloud.azure.blobstore.AzureBlobStore;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Azure Storage implementation of the BlobStoreRepository
* <p/>
* The Azure repository supports the following settings:
* <dl>
* <dt>{@code container}</dt><dd>Azure container name. Defaults to elasticsearch-snapshots</dd>
* <dt>{@code base_path}</dt><dd>Specifies the path within the container to the repository data. Defaults to the root directory.</dd>
* <dt>{@code concurrent_streams}</dt><dd>Number of concurrent read/write streams (per repository on each node). Defaults to 5.</dd>
* <dt>{@code chunk_size}</dt><dd>Large files can be divided into chunks. This parameter specifies the chunk size. Defaults to 64mb.</dd>
* <dt>{@code compress}</dt><dd>If set to true, metadata files will be stored compressed. Defaults to false.</dd>
* </dl>
*/
public class AzureRepository extends BlobStoreRepository {
public final static String TYPE = "azure";
public final static String CONTAINER_DEFAULT = "elasticsearch-snapshots";
private final AzureBlobStore blobStore;
private final BlobPath basePath;
private ByteSizeValue chunkSize;
private boolean compress;
/**
* Constructs a new Azure repository
*
* @param name repository name
* @param repositorySettings repository settings
* @param indexShardRepository index shard repository
* @param azureStorageService Azure Storage service
* @throws java.io.IOException
*/
@Inject
public AzureRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, AzureStorageService azureStorageService) throws IOException, URISyntaxException, StorageException {
super(name.getName(), repositorySettings, indexShardRepository);
String container = repositorySettings.settings().get(AzureStorageService.Fields.CONTAINER,
componentSettings.get(AzureStorageService.Fields.CONTAINER, CONTAINER_DEFAULT));
int concurrentStreams = repositorySettings.settings().getAsInt(AzureStorageService.Fields.CONCURRENT_STREAMS,
componentSettings.getAsInt(AzureStorageService.Fields.CONCURRENT_STREAMS, 5));
ExecutorService concurrentStreamPool = EsExecutors.newScaling(1, concurrentStreams, 5, TimeUnit.SECONDS,
EsExecutors.daemonThreadFactory(settings, "[azure_stream]"));
this.blobStore = new AzureBlobStore(settings, azureStorageService, container, concurrentStreamPool);
this.chunkSize = repositorySettings.settings().getAsBytesSize(AzureStorageService.Fields.CHUNK_SIZE,
componentSettings.getAsBytesSize(AzureStorageService.Fields.CHUNK_SIZE, new ByteSizeValue(64, ByteSizeUnit.MB)));
if (this.chunkSize.getMb() > 64) {
logger.warn("azure repository does not support yet size > 64mb. Fall back to 64mb.");
this.chunkSize = new ByteSizeValue(64, ByteSizeUnit.MB);
}
this.compress = repositorySettings.settings().getAsBoolean(AzureStorageService.Fields.COMPRESS,
componentSettings.getAsBoolean(AzureStorageService.Fields.COMPRESS, false));
String basePath = repositorySettings.settings().get(AzureStorageService.Fields.BASE_PATH, null);
if (Strings.hasLength(basePath)) {
// Remove starting / if any
basePath = Strings.trimLeadingCharacter(basePath, '/');
BlobPath path = new BlobPath();
for(String elem : Strings.splitStringToArray(basePath, '/')) {
path = path.add(elem);
}
this.basePath = path;
} else {
this.basePath = BlobPath.cleanPath();
}
logger.debug("using container [{}], chunk_size [{}], concurrent_streams [{}], compress [{}], base_path [{}]",
container, chunkSize, concurrentStreams, compress, basePath);
}
/**
* {@inheritDoc}
*/
@Override
protected BlobStore blobStore() {
return blobStore;
}
@Override
protected BlobPath basePath() {
return basePath;
}
/**
* {@inheritDoc}
*/
@Override
protected boolean isCompress() {
return compress;
}
/**
* {@inheritDoc}
*/
@Override
protected ByteSizeValue chunkSize() {
return chunkSize;
}
}

View File

@@ -0,0 +1,61 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.azure;
import org.elasticsearch.cloud.azure.AzureModule;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository;
import org.elasticsearch.repositories.Repository;
/**
* Azure repository module
*/
public class AzureRepositoryModule extends AbstractModule {
protected final ESLogger logger;
private Settings settings;
@Inject
public AzureRepositoryModule(Settings settings) {
super();
this.logger = Loggers.getLogger(getClass(), settings);
this.settings = settings;
}
/**
* {@inheritDoc}
*/
@Override
protected void configure() {
if (AzureModule.isSnapshotReady(settings, logger)) {
bind(Repository.class).to(AzureRepository.class).asEagerSingleton();
bind(IndexShardRepository.class).to(BlobStoreIndexShardRepository.class).asEagerSingleton();
} else {
logger.debug("disabling azure snapshot and restore features");
}
}
}

View File

@@ -1,4 +1,4 @@
# Licensed to ElasticSearch under one or more contributor
# Licensed to Elasticsearch under one or more contributor
# license agreements. See the NOTICE file distributed with this work for additional
# information regarding copyright ownership. ElasticSearch licenses this file to you
# under the Apache License, Version 2.0 (the "License"); you may not use this

View File

@@ -48,6 +48,12 @@ public abstract class AbstractAzureTest extends ElasticsearchIntegrationTest {
discovery:
type: azure
repositories:
azure:
account: "yourstorageaccount"
key: "storage key"
container: "container name"
* </pre>
*/
@Documented

View File

@@ -0,0 +1,102 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.azure;
import com.microsoft.windowsazure.services.core.ServiceException;
import com.microsoft.windowsazure.services.core.storage.StorageException;
import org.elasticsearch.cloud.azure.AbstractAzureTest;
import org.elasticsearch.cloud.azure.AzureStorageService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.test.store.MockDirectoryHelper;
import org.junit.After;
import org.junit.Before;
import java.net.URISyntaxException;
public abstract class AbstractAzureRepositoryServiceTest extends AbstractAzureTest {
protected String basePath;
private Class<? extends AzureStorageService> mock;
public AbstractAzureRepositoryServiceTest(Class<? extends AzureStorageService> mock,
String basePath) {
// We want to inject the Azure API Mock
this.mock = mock;
this.basePath = basePath;
}
/**
* Deletes repositories, supports wildcard notation.
*/
public static void wipeRepositories(String... repositories) {
// if nothing is provided, delete all
if (repositories.length == 0) {
repositories = new String[]{"*"};
}
for (String repository : repositories) {
try {
client().admin().cluster().prepareDeleteRepository(repository).execute().actionGet();
} catch (RepositoryMissingException ex) {
// ignore
}
}
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
ImmutableSettings.Builder builder = ImmutableSettings.settingsBuilder()
.put("cloud.azure." + AzureStorageService.Fields.ACCOUNT, "mock_azure_account")
.put("cloud.azure." + AzureStorageService.Fields.KEY, "mock_azure_key")
.put("repositories.azure.api.impl", mock)
.put("repositories.azure.container", "snapshots");
return builder.build();
}
@Override
public Settings indexSettings() {
// During restore we frequently restore an index to exactly the same state it was in before, which might cause the same
// checksum file to be written twice during the restore operation
return ImmutableSettings.builder().put(super.indexSettings())
.put(MockDirectoryHelper.RANDOM_PREVENT_DOUBLE_WRITE, false)
.put(MockDirectoryHelper.RANDOM_NO_DELETE_OPEN_FILE, false)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build();
}
@Before @After
public final void wipe() throws StorageException, ServiceException, URISyntaxException {
wipeRepositories();
cleanRepositoryFiles(basePath);
}
/**
* Purge the test container
*/
public void cleanRepositoryFiles(String path) throws StorageException, ServiceException, URISyntaxException {
String container = cluster().getInstance(Settings.class).get("repositories.azure.container");
logger.info("--> remove blobs in container [{}]", container);
AzureStorageService client = cluster().getInstance(AzureStorageService.class);
client.deleteFiles(container, path);
}
}

View File

@@ -0,0 +1,258 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.azure;
import com.microsoft.windowsazure.services.core.ServiceException;
import com.microsoft.windowsazure.services.core.storage.StorageException;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cloud.azure.AbstractAzureTest;
import org.elasticsearch.cloud.azure.AzureStorageService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.store.MockDirectoryHelper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.net.URISyntaxException;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
/**
* This test needs Azure to run; -Dtests.azure=true must be set
* and -Des.config=/path/to/elasticsearch.yml must point to your Azure configuration
* @see org.elasticsearch.cloud.azure.AbstractAzureTest
*/
@AbstractAzureTest.AzureTest
@ElasticsearchIntegrationTest.ClusterScope(
scope = ElasticsearchIntegrationTest.Scope.SUITE,
numNodes = 2,
transportClientRatio = 0.0)
public class AzureSnapshotRestoreITest extends AbstractAzureTest {
private final String basePath;
public AzureSnapshotRestoreITest() {
basePath = "/snapshot-itest/repo-" + randomInt();
}
@Override
public Settings indexSettings() {
// During restore we frequently restore an index to exactly the same state it was in before, which might cause the same
// checksum file to be written twice during the restore operation
return ImmutableSettings.builder().put(super.indexSettings())
.put(MockDirectoryHelper.RANDOM_PREVENT_DOUBLE_WRITE, false)
.put(MockDirectoryHelper.RANDOM_NO_DELETE_OPEN_FILE, false)
.build();
}
@Before
public final void wipeBefore() throws StorageException, ServiceException, URISyntaxException {
wipeRepositories();
cleanRepositoryFiles(basePath);
}
@After
public final void wipeAfter() throws StorageException, ServiceException, URISyntaxException {
wipeRepositories();
cleanRepositoryFiles(basePath);
}
@Test
public void testSimpleWorkflow() {
Client client = client();
logger.info("--> creating azure repository with path [{}]", basePath);
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("azure").setSettings(ImmutableSettings.settingsBuilder()
.put(AzureStorageService.Fields.CONTAINER, "elasticsearch-integration")
.put(AzureStorageService.Fields.BASE_PATH, basePath)
.put(AzureStorageService.Fields.CHUNK_SIZE, randomIntBetween(1000, 10000))
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
createIndex("test-idx-1", "test-idx-2", "test-idx-3");
ensureGreen();
logger.info("--> indexing some data");
for (int i = 0; i < 100; i++) {
index("test-idx-1", "doc", Integer.toString(i), "foo", "bar" + i);
index("test-idx-2", "doc", Integer.toString(i), "foo", "baz" + i);
index("test-idx-3", "doc", Integer.toString(i), "foo", "baz" + i);
}
refresh();
assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx-2").get().getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx-3").get().getCount(), equalTo(100L));
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-*", "-test-idx-3").get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
logger.info("--> delete some data");
for (int i = 0; i < 50; i++) {
client.prepareDelete("test-idx-1", "doc", Integer.toString(i)).get();
}
for (int i = 50; i < 100; i++) {
client.prepareDelete("test-idx-2", "doc", Integer.toString(i)).get();
}
for (int i = 0; i < 100; i += 2) {
client.prepareDelete("test-idx-3", "doc", Integer.toString(i)).get();
}
refresh();
assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(50L));
assertThat(client.prepareCount("test-idx-2").get().getCount(), equalTo(50L));
assertThat(client.prepareCount("test-idx-3").get().getCount(), equalTo(50L));
logger.info("--> close indices");
client.admin().indices().prepareClose("test-idx-1", "test-idx-2").get();
logger.info("--> restore all indices from the snapshot");
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx-2").get().getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx-3").get().getCount(), equalTo(50L));
// Test restore after index deletion
logger.info("--> delete indices");
wipeIndices("test-idx-1", "test-idx-2");
logger.info("--> restore one index after deletion");
restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-*", "-test-idx-2").execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L));
ClusterState clusterState = client.admin().cluster().prepareState().get().getState();
assertThat(clusterState.getMetaData().hasIndex("test-idx-1"), equalTo(true));
assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(false));
}
@Test
public void testMultipleRepositories() {
Client client = client();
logger.info("--> creating azure repository with path [{}]", basePath);
PutRepositoryResponse putRepositoryResponse1 = client.admin().cluster().preparePutRepository("test-repo1")
.setType("azure").setSettings(ImmutableSettings.settingsBuilder()
.put(AzureStorageService.Fields.CONTAINER, "elasticsearch-integration1")
.put(AzureStorageService.Fields.BASE_PATH, basePath)
.put(AzureStorageService.Fields.CHUNK_SIZE, randomIntBetween(1000, 10000))
).get();
assertThat(putRepositoryResponse1.isAcknowledged(), equalTo(true));
PutRepositoryResponse putRepositoryResponse2 = client.admin().cluster().preparePutRepository("test-repo2")
.setType("azure").setSettings(ImmutableSettings.settingsBuilder()
.put(AzureStorageService.Fields.CONTAINER, "elasticsearch-integration2")
.put(AzureStorageService.Fields.BASE_PATH, basePath)
.put(AzureStorageService.Fields.CHUNK_SIZE, randomIntBetween(1000, 10000))
).get();
assertThat(putRepositoryResponse2.isAcknowledged(), equalTo(true));
createIndex("test-idx-1", "test-idx-2");
ensureGreen();
logger.info("--> indexing some data");
for (int i = 0; i < 100; i++) {
index("test-idx-1", "doc", Integer.toString(i), "foo", "bar" + i);
index("test-idx-2", "doc", Integer.toString(i), "foo", "baz" + i);
}
refresh();
assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx-2").get().getCount(), equalTo(100L));
logger.info("--> snapshot 1");
CreateSnapshotResponse createSnapshotResponse1 = client.admin().cluster().prepareCreateSnapshot("test-repo1", "test-snap").setWaitForCompletion(true).setIndices("test-idx-1").get();
assertThat(createSnapshotResponse1.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse1.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse1.getSnapshotInfo().totalShards()));
logger.info("--> snapshot 2");
CreateSnapshotResponse createSnapshotResponse2 = client.admin().cluster().prepareCreateSnapshot("test-repo2", "test-snap").setWaitForCompletion(true).setIndices("test-idx-2").get();
assertThat(createSnapshotResponse2.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse2.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse2.getSnapshotInfo().totalShards()));
assertThat(client.admin().cluster().prepareGetSnapshots("test-repo1").setSnapshots("test-snap").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
assertThat(client.admin().cluster().prepareGetSnapshots("test-repo2").setSnapshots("test-snap").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
// Test restore after index deletion
logger.info("--> delete indices");
wipeIndices("test-idx-1", "test-idx-2");
logger.info("--> restore one index after deletion from snapshot 1");
RestoreSnapshotResponse restoreSnapshotResponse1 = client.admin().cluster().prepareRestoreSnapshot("test-repo1", "test-snap").setWaitForCompletion(true).setIndices("test-idx-1").execute().actionGet();
assertThat(restoreSnapshotResponse1.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L));
ClusterState clusterState = client.admin().cluster().prepareState().get().getState();
assertThat(clusterState.getMetaData().hasIndex("test-idx-1"), equalTo(true));
assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(false));
logger.info("--> restore other index after deletion from snapshot 2");
RestoreSnapshotResponse restoreSnapshotResponse2 = client.admin().cluster().prepareRestoreSnapshot("test-repo2", "test-snap").setWaitForCompletion(true).setIndices("test-idx-2").execute().actionGet();
assertThat(restoreSnapshotResponse2.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
assertThat(client.prepareCount("test-idx-2").get().getCount(), equalTo(100L));
clusterState = client.admin().cluster().prepareState().get().getState();
assertThat(clusterState.getMetaData().hasIndex("test-idx-1"), equalTo(true));
assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(true));
    }

/**
* Deletes repositories, supports wildcard notation.
*/
public static void wipeRepositories(String... repositories) {
// if nothing is provided, delete all
if (repositories.length == 0) {
repositories = new String[]{"*"};
}
for (String repository : repositories) {
try {
client().admin().cluster().prepareDeleteRepository(repository).execute().actionGet();
} catch (RepositoryMissingException ex) {
// ignore
}
}
    }

/**
* Purge the test container
*/
public void cleanRepositoryFiles(String path) throws StorageException, ServiceException, URISyntaxException {
String container = cluster().getInstance(Settings.class).get("repositories.azure.container",
AzureRepository.CONTAINER_DEFAULT);
logger.info("--> remove blobs in container [{}], path [{}]", container, path);
AzureStorageService client = cluster().getInstance(AzureStorageService.class);
// Remove starting / if any
path = Strings.trimLeadingCharacter(path, '/');
client.deleteFiles(container, path);
}
}
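
A hedged sketch of how the two helpers above are meant to be combined in a test teardown; the `@After` hook, its name, and the `org.junit.After` import are illustrative, not part of this commit:

```java
// Illustrative teardown only -- hook name and placement are hypothetical:
@After
public final void cleanupAzureArtifacts() throws Exception {
    wipeRepositories();             // "*" wildcard: drop every registered repository
    cleanRepositoryFiles(basePath); // purge the blobs this test wrote to the container
}
```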

View File

@ -0,0 +1,118 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.repositories.azure;

import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

@ElasticsearchIntegrationTest.ClusterScope(
scope = ElasticsearchIntegrationTest.Scope.SUITE,
numNodes = 1,
transportClientRatio = 0.0)
public class AzureSnapshotRestoreTest extends AbstractAzureRepositoryServiceTest {
public AzureSnapshotRestoreTest() {
super(AzureStorageServiceMock.class, "/snapshot-test/repo-" + randomInt());
    }

@Test
public void testSimpleWorkflow() {
Client client = client();
logger.info("--> creating azure repository with path [{}]", basePath);
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("azure").setSettings(ImmutableSettings.settingsBuilder()
.put("base_path", basePath)
.put("chunk_size", randomIntBetween(1000, 10000))
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
createIndex("test-idx-1", "test-idx-2", "test-idx-3");
ensureGreen();
logger.info("--> indexing some data");
for (int i = 0; i < 100; i++) {
index("test-idx-1", "doc", Integer.toString(i), "foo", "bar" + i);
index("test-idx-2", "doc", Integer.toString(i), "foo", "baz" + i);
index("test-idx-3", "doc", Integer.toString(i), "foo", "baz" + i);
}
refresh();
assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx-2").get().getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx-3").get().getCount(), equalTo(100L));
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-*", "-test-idx-3").get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
logger.info("--> delete some data");
for (int i = 0; i < 50; i++) {
client.prepareDelete("test-idx-1", "doc", Integer.toString(i)).get();
}
for (int i = 50; i < 100; i++) {
client.prepareDelete("test-idx-2", "doc", Integer.toString(i)).get();
}
for (int i = 0; i < 100; i += 2) {
client.prepareDelete("test-idx-3", "doc", Integer.toString(i)).get();
}
refresh();
assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(50L));
assertThat(client.prepareCount("test-idx-2").get().getCount(), equalTo(50L));
assertThat(client.prepareCount("test-idx-3").get().getCount(), equalTo(50L));
logger.info("--> close indices");
client.admin().indices().prepareClose("test-idx-1", "test-idx-2").get();
logger.info("--> restore all indices from the snapshot");
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx-2").get().getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx-3").get().getCount(), equalTo(50L));
// Test restore after index deletion
logger.info("--> delete indices");
wipeIndices("test-idx-1", "test-idx-2");
logger.info("--> restore one index after deletion");
restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-*", "-test-idx-2").execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L));
ClusterState clusterState = client.admin().cluster().prepareState().get().getState();
assertThat(clusterState.getMetaData().hasIndex("test-idx-1"), equalTo(true));
assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(false));
}
}
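
For reference, the snapshot step in `testSimpleWorkflow` corresponds roughly to this REST call (a sketch of the equivalent request using the test's own index names; not part of the commit):

```sh
curl -XPUT 'http://localhost:9200/_snapshot/test-repo/test-snap?wait_for_completion=true' -d '{
    "indices": "test-idx-*,-test-idx-3"
}'
```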

View File

@ -0,0 +1,126 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.repositories.azure;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cloud.azure.AzureStorageService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * In-memory storage for unit tests.
*/
public class AzureStorageServiceMock extends AbstractLifecycleComponent<AzureStorageServiceMock>
implements AzureStorageService {
protected Map<String, byte[]> blobs = new ConcurrentHashMap<String, byte[]>();
@Inject
protected AzureStorageServiceMock(Settings settings) {
super(settings);
}
@Override
public boolean doesContainerExist(String container) {
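        // Pretend every container exists so repository verification always succeeds.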
return true;
}
@Override
public void removeContainer(String container) {
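        // No-op: the in-memory mock does not track containers.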
}
@Override
public void createContainer(String container) {
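        // No-op: containers are implicit in this mock.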
}
@Override
public void deleteFiles(String container, String path) {
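        // No-op: deletion by path prefix is not simulated in this mock.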
}
@Override
public boolean blobExists(String container, String blob) {
return blobs.containsKey(blob);
}
@Override
public void deleteBlob(String container, String blob) {
blobs.remove(blob);
}
@Override
public InputStream getInputStream(String container, String blob) {
return new ByteArrayInputStream(blobs.get(blob));
}
@Override
public ImmutableMap<String, BlobMetaData> listBlobsByPrefix(String container, String prefix) {
ImmutableMap.Builder<String, BlobMetaData> blobsBuilder = ImmutableMap.builder();
for (String blobName : blobs.keySet()) {
if (Strings.startsWithIgnoreCase(blobName, prefix)) {
blobsBuilder.put(blobName, new PlainBlobMetaData(blobName, blobs.get(blobName).length));
}
}
        return blobsBuilder.build();
}
@Override
    public void putObject(String container, String blob, InputStream is, long length) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            byte[] data = new byte[65535];
            int nRead;
            // Copy the stream fully into memory; the mock keys blobs by name only
            // and ignores the container argument.
            while ((nRead = is.read(data, 0, data.length)) != -1) {
                buffer.write(data, 0, nRead);
            }
            buffer.flush();
            blobs.put(blob, buffer.toByteArray());
        } catch (IOException e) {
            // Fail loudly instead of swallowing the error, so a broken test surfaces.
            throw new ElasticsearchException("failed to read blob [" + blob + "]", e);
        }
    }
@Override
protected void doStart() throws ElasticsearchException {
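        // No-op lifecycle: the in-memory mock holds no external resources.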
}
@Override
protected void doStop() throws ElasticsearchException {
}
@Override
protected void doClose() throws ElasticsearchException {
}
}
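
A minimal round-trip sketch of the mock (callable from a test in the same package, since the constructor is protected; the settings value and blob names are illustrative):

```java
AzureStorageServiceMock mock =
        new AzureStorageServiceMock(ImmutableSettings.settingsBuilder().build());
byte[] payload = "snapshot-bytes".getBytes();
// Store a blob, then read it back through the same interface the repository uses.
mock.putObject("any-container", "base/path/blob", new ByteArrayInputStream(payload), payload.length);
assert mock.blobExists("any-container", "base/path/blob");
assert mock.listBlobsByPrefix("any-container", "base/path/").size() == 1;
```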

View File

@ -1,4 +1,4 @@
-# Licensed to ElasticSearch under one or more contributor
+# Licensed to Elasticsearch under one or more contributor
# license agreements. See the NOTICE file distributed with this work for additional
# information regarding copyright ownership. ElasticSearch licenses this file to you
# under the Apache License, Version 2.0 (the "License"); you may not use this
@ -25,6 +25,16 @@
# password: YOUR-PASSWORD
# subscription_id: YOUR-AZURE-SUBSCRIPTION-ID
# service_name: YOUR-AZURE-SERVICE-NAME
# storage_account: "YOUR-AZURE-STORAGE-NAME"
# storage_key: "YOUR-AZURE-STORAGE-KEY"
#
# discovery:
# type: azure
#
# repositories:
# azure:
# container: "NAME-OF-CONTAINER-USED-FOR-SNAPSHOTS" # optional, defaults to "elasticsearch-snapshots"
# base_path: "path/to/snapshots" # optional, defaults to empty (root directory)
# concurrent_streams: 5 # optional, defaults to 5
# chunk_size: 64mb # optional, defaults to "64mb"
# compress: false # optional, defaults to false
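
Once these node-level settings are in place, a repository registered without explicit settings inherits them; the get-repository API is a quick way to verify what was stored (the repository name below is hypothetical):

```sh
curl -XGET 'http://localhost:9200/_snapshot/my_azure_repo?pretty'
```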

View File

@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8" ?>
-<!-- Licensed to ElasticSearch under one or more contributor
+<!-- Licensed to Elasticsearch under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. ElasticSearch licenses this file to you
under the Apache License, Version 2.0 (the "License"); you may not use this
@ -30,9 +30,12 @@ governing permissions and limitations under the License. -->
<logger name="org.elasticsearch.discovery.azure">
<level value="trace" />
</logger>
<logger name="org.elasticsearch.repositories.azure">
<level value="trace" />
</logger>
<root>
<priority value="trace" />
<priority value="info" />
<appender-ref ref="console" />
</root>
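
The change above keeps the two azure loggers at `trace` while dialing the root logger back from `trace` to `info`. For nodes configured through the default `logging.yml` rather than log4j XML, an equivalent stanza would look roughly like this (hypothetical, not part of the commit):

```yaml
rootLogger: INFO, console
logger:
  org.elasticsearch.discovery.azure: TRACE
  org.elasticsearch.repositories.azure: TRACE
```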