diff --git a/README.md b/README.md index 46a81081b5a..3f5528ae431 100644 --- a/README.md +++ b/README.md @@ -335,6 +335,82 @@ If you want to remove your running instances: azure vm delete myesnode1 ``` +Azure Repository +================ + +To enable Azure repositories, you have first to set your azure storage settings: + +``` + cloud: + azure: + storage_account: your_azure_storage_account + storage_key: your_azure_storage_key +``` + +The Azure repository supports following settings: + +* `container`: Container name. Defaults to `elasticsearch-snapshots` +* `base_path`: Specifies the path within container to repository data. Defaults to empty (root directory). +* `concurrent_streams`: Throttles the number of streams (per node) preforming snapshot operation. Defaults to `5`. +* `chunk_size`: Big files can be broken down into chunks during snapshotting if needed. The chunk size can be specified +in bytes or by using size value notation, i.e. `1g`, `10m`, `5k`. Defaults to `64m` (64m max) +* `compress`: When set to `true` metadata files are stored in compressed format. This setting doesn't affect index +files that are already compressed by default. Defaults to `false`. + +Some examples, using scripts: + +```sh +# The simpliest one +$ curl -XPUT 'http://localhost:9200/_snapshot/my_backup1' -d '{ + "type": "azure" +}' + +# With some settings +$ curl -XPUT 'http://localhost:9200/_snapshot/my_backup2' -d '{ + "type": "azure", + "settings": { + "container": "backup_container", + "base_path": "backups", + "concurrent_streams": 2, + "chunk_size": "32m", + "compress": true + } +}' +``` + +Example using Java: + +```java +client.admin().cluster().preparePutRepository("my_backup3") + .setType("azure").setSettings(ImmutableSettings.settingsBuilder() + .put(AzureStorageService.Fields.CONTAINER, "backup_container") + .put(AzureStorageService.Fields.CHUNK_SIZE, new ByteSizeValue(32, ByteSizeUnit.MB)) + ).get(); +``` + + +Testing +------- + +Integrations tests in this plugin require working Azure configuration and therefore disabled by default. +To enable tests prepare a config file elasticsearch.yml with the following content: + +``` + repositories: + azure: + account: "YOUR-AZURE-STORAGE-NAME" + key: "YOUR-AZURE-STORAGE-KEY" +``` + +Replaces `account`, `key` with your settings. Please, note that the test will delete all snapshot/restore related files in the specified bucket. + +To run test: + +```sh +mvn -Dtests.azure=true -Des.config=/path/to/config/file/elasticsearch.yml clean test +``` + + License ------- diff --git a/pom.xml b/pom.xml index fcda58e69e3..cf31508e264 100644 --- a/pom.xml +++ b/pom.xml @@ -68,6 +68,11 @@ governing permissions and limitations under the License. --> elasticsearch ${elasticsearch.version} + + com.microsoft.windowsazure + microsoft-windowsazure-api + 0.4.5 + log4j log4j diff --git a/src/main/assemblies/plugin.xml b/src/main/assemblies/plugin.xml index 634831c235c..b18ecaff3a0 100644 --- a/src/main/assemblies/plugin.xml +++ b/src/main/assemblies/plugin.xml @@ -1,5 +1,5 @@ - org.elasticsearch:elasticsearch + + / + true + true + + com.microsoft.windowsazure:microsoft-windowsazure-api + + diff --git a/src/main/java/org/elasticsearch/cloud/azure/AzureComputeServiceImpl.java b/src/main/java/org/elasticsearch/cloud/azure/AzureComputeServiceImpl.java index 60428c35873..cb74062723e 100644 --- a/src/main/java/org/elasticsearch/cloud/azure/AzureComputeServiceImpl.java +++ b/src/main/java/org/elasticsearch/cloud/azure/AzureComputeServiceImpl.java @@ -20,11 +20,9 @@ package org.elasticsearch.cloud.azure; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.settings.SettingsException; import org.elasticsearch.common.settings.SettingsFilter; import org.w3c.dom.Document; import org.w3c.dom.Node; @@ -84,12 +82,7 @@ public class AzureComputeServiceImpl extends AbstractLifecycleComponent *
  • If needed this module will bind azure discovery service by default * to AzureComputeServiceImpl.
  • + *
  • If needed this module will bind azure repository service by default + * to AzureStorageServiceImpl.
  • * * * @see org.elasticsearch.cloud.azure.AzureComputeServiceImpl + * @see org.elasticsearch.cloud.azure.AzureStorageServiceImpl */ public class AzureModule extends AbstractModule { protected final ESLogger logger; @@ -51,10 +56,18 @@ public class AzureModule extends AbstractModule { logger.debug("starting azure services"); // If we have set discovery to azure, let's start the azure compute service - if (isDiscoveryReady(settings)) { + if (isDiscoveryReady(settings, logger)) { logger.debug("starting azure discovery service"); - bind(AzureComputeService.class) - .to(settings.getAsClass("cloud.azure.api.impl", AzureComputeServiceImpl.class)) + bind(AzureComputeService.class) + .to(settings.getAsClass("cloud.azure.api.impl", AzureComputeServiceImpl.class)) + .asEagerSingleton(); + } + + // If we have settings for azure repository, let's start the azure storage service + if (isSnapshotReady(settings, logger)) { + logger.debug("starting azure repository service"); + bind(AzureStorageService.class) + .to(settings.getAsClass("repositories.azure.api.impl", AzureStorageServiceImpl.class)) .asEagerSingleton(); } } @@ -63,7 +76,72 @@ public class AzureModule extends AbstractModule { * Check if discovery is meant to start * @return true if we can start discovery features */ - public static boolean isDiscoveryReady(Settings settings) { - return (AzureDiscovery.AZURE.equalsIgnoreCase(settings.get("discovery.type"))); + public static boolean isCloudReady(Settings settings) { + return (settings.getAsBoolean("cloud.enabled", true)); } + + /** + * Check if discovery is meant to start + * @return true if we can start discovery features + */ + public static boolean isDiscoveryReady(Settings settings, ESLogger logger) { + // Cloud services are disabled + if (!isCloudReady(settings)) { + logger.trace("cloud settings are disabled"); + return false; + } + + // User set discovery.type: azure + if (!AzureDiscovery.AZURE.equalsIgnoreCase(settings.get("discovery.type"))) { + logger.trace("discovery.type not set to {}", AzureDiscovery.AZURE); + return false; + } + + if (isPropertyMissing(settings, "cloud.azure." + AzureComputeService.Fields.SUBSCRIPTION_ID, logger) || + isPropertyMissing(settings, "cloud.azure." + AzureComputeService.Fields.SERVICE_NAME, logger) || + isPropertyMissing(settings, "cloud.azure." + AzureComputeService.Fields.KEYSTORE, logger) || + isPropertyMissing(settings, "cloud.azure." + AzureComputeService.Fields.PASSWORD, logger) + ) { + return false; + } + + logger.trace("all required properties for azure discovery are set!"); + + return true; + } + + /** + * Check if we have repository azure settings available + * @return true if we can use snapshot and restore + */ + public static boolean isSnapshotReady(Settings settings, ESLogger logger) { + // Cloud services are disabled + if (!isCloudReady(settings)) { + logger.trace("cloud settings are disabled"); + return false; + } + + if (isPropertyMissing(settings, "cloud.azure." + AzureStorageService.Fields.ACCOUNT, null) || + isPropertyMissing(settings, "cloud.azure." + AzureStorageService.Fields.KEY, null)) { + logger.trace("azure repository is not set using {} and {} properties", + AzureStorageService.Fields.ACCOUNT, + AzureStorageService.Fields.KEY); + return false; + } + + logger.trace("all required properties for azure repository are set!"); + + return true; + } + + public static boolean isPropertyMissing(Settings settings, String name, ESLogger logger) throws ElasticsearchException { + if (!Strings.hasText(settings.get(name))) { + if (logger != null) { + logger.warn("{} is not set or is incorrect.", name); + } + return true; + } + return false; + } + } diff --git a/src/main/java/org/elasticsearch/cloud/azure/AzureSettingsFilter.java b/src/main/java/org/elasticsearch/cloud/azure/AzureSettingsFilter.java index e56bb03c993..f8911ae8830 100644 --- a/src/main/java/org/elasticsearch/cloud/azure/AzureSettingsFilter.java +++ b/src/main/java/org/elasticsearch/cloud/azure/AzureSettingsFilter.java @@ -23,16 +23,22 @@ import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.SettingsFilter; /** - * Filtering cloud.azure.* settings + * Filtering cloud.azure.* and repositories.azure.* settings */ public class AzureSettingsFilter implements SettingsFilter.Filter { @Override public void filter(ImmutableSettings.Builder settings) { // Cloud settings - settings.remove("cloud.certificate"); + settings.remove("cloud.azure.keystore"); settings.remove("cloud.azure.password"); settings.remove("cloud.azure.subscription_id"); settings.remove("cloud.azure.service_name"); + + // Repositories settings + settings.remove("repositories.azure.account"); + settings.remove("repositories.azure.key"); + settings.remove("repositories.azure.container"); + settings.remove("repositories.azure.base_path"); } } diff --git a/src/main/java/org/elasticsearch/cloud/azure/AzureStorageService.java b/src/main/java/org/elasticsearch/cloud/azure/AzureStorageService.java new file mode 100644 index 00000000000..f346348ed88 --- /dev/null +++ b/src/main/java/org/elasticsearch/cloud/azure/AzureStorageService.java @@ -0,0 +1,65 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cloud.azure; + +import com.microsoft.windowsazure.services.core.ServiceException; +import com.microsoft.windowsazure.services.core.storage.StorageException; +import org.elasticsearch.common.blobstore.BlobMetaData; +import org.elasticsearch.common.collect.ImmutableMap; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URISyntaxException; + +/** + * Azure Storage Service interface + * @see org.elasticsearch.cloud.azure.AzureStorageServiceImpl for Azure REST API implementation + */ +public interface AzureStorageService { + static public final class Fields { + public static final String ACCOUNT = "storage_account"; + public static final String KEY = "storage_key"; + public static final String CONTAINER = "container"; + public static final String BASE_PATH = "base_path"; + public static final String CONCURRENT_STREAMS = "concurrent_streams"; + public static final String CHUNK_SIZE = "chunk_size"; + public static final String COMPRESS = "compress"; + } + + boolean doesContainerExist(String container); + + void removeContainer(String container) throws URISyntaxException, StorageException; + + void createContainer(String container) throws URISyntaxException, StorageException; + + void deleteFiles(String container, String path) throws URISyntaxException, StorageException, ServiceException; + + boolean blobExists(String container, String blob) throws URISyntaxException, StorageException; + + void deleteBlob(String container, String blob) throws URISyntaxException, StorageException; + + InputStream getInputStream(String container, String blob) throws ServiceException; + + ImmutableMap listBlobsByPrefix(String container, String prefix) throws URISyntaxException, StorageException, ServiceException; + + void putObject(String container, String blob, InputStream is, long length) throws URISyntaxException, StorageException, IOException; + + +} diff --git a/src/main/java/org/elasticsearch/cloud/azure/AzureStorageServiceImpl.java b/src/main/java/org/elasticsearch/cloud/azure/AzureStorageServiceImpl.java new file mode 100644 index 00000000000..c396aaae679 --- /dev/null +++ b/src/main/java/org/elasticsearch/cloud/azure/AzureStorageServiceImpl.java @@ -0,0 +1,239 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cloud.azure; + +import com.microsoft.windowsazure.services.blob.BlobConfiguration; +import com.microsoft.windowsazure.services.blob.BlobContract; +import com.microsoft.windowsazure.services.blob.BlobService; +import com.microsoft.windowsazure.services.blob.client.CloudBlobClient; +import com.microsoft.windowsazure.services.blob.client.CloudBlobContainer; +import com.microsoft.windowsazure.services.blob.client.CloudBlockBlob; +import com.microsoft.windowsazure.services.blob.client.ListBlobItem; +import com.microsoft.windowsazure.services.blob.models.BlobProperties; +import com.microsoft.windowsazure.services.blob.models.GetBlobResult; +import com.microsoft.windowsazure.services.blob.models.ListBlobsOptions; +import com.microsoft.windowsazure.services.blob.models.ListBlobsResult; +import com.microsoft.windowsazure.services.core.Configuration; +import com.microsoft.windowsazure.services.core.ServiceException; +import com.microsoft.windowsazure.services.core.storage.CloudStorageAccount; +import com.microsoft.windowsazure.services.core.storage.StorageException; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.blobstore.BlobMetaData; +import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; +import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.settings.SettingsFilter; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.List; + +/** + * + */ +public class AzureStorageServiceImpl extends AbstractLifecycleComponent + implements AzureStorageService { + + private final String account; + private final String key; + private final String blob; + + private CloudStorageAccount storage_account; + private CloudBlobClient client; + private BlobContract service; + + + @Inject + public AzureStorageServiceImpl(Settings settings, SettingsFilter settingsFilter) { + super(settings); + settingsFilter.addFilter(new AzureSettingsFilter()); + + // We try to load storage API settings from `cloud.azure.` + account = settings.get("cloud.azure." + Fields.ACCOUNT); + key = settings.get("cloud.azure." + Fields.KEY); + blob = "http://" + account + ".blob.core.windows.net/"; + + try { + if (account != null) { + if (logger.isTraceEnabled()) logger.trace("creating new Azure storage client using account [{}], key [{}], blob [{}]", + account, key, blob); + + String storageConnectionString = + "DefaultEndpointsProtocol=http;" + + "AccountName="+ account +";" + + "AccountKey=" + key; + + Configuration configuration = Configuration.getInstance(); + configuration.setProperty(BlobConfiguration.ACCOUNT_NAME, account); + configuration.setProperty(BlobConfiguration.ACCOUNT_KEY, key); + configuration.setProperty(BlobConfiguration.URI, blob); + service = BlobService.create(configuration); + + storage_account = CloudStorageAccount.parse(storageConnectionString); + client = storage_account.createCloudBlobClient(); + } + } catch (Exception e) { + // Can not start Azure Storage Client + logger.error("can not start azure storage client: {}", e.getMessage()); + } + } + + @Override + public boolean doesContainerExist(String container) { + try { + CloudBlobContainer blob_container = client.getContainerReference(container); + return blob_container.exists(); + } catch (Exception e) { + logger.error("can not access container [{}]", container); + } + return false; + } + + @Override + public void removeContainer(String container) throws URISyntaxException, StorageException { + CloudBlobContainer blob_container = client.getContainerReference(container); + blob_container.delete(); + } + + @Override + public void createContainer(String container) throws URISyntaxException, StorageException { + CloudBlobContainer blob_container = client.getContainerReference(container); + if (logger.isTraceEnabled()) { + logger.trace("creating container [{}]", container); + } + blob_container.createIfNotExist(); + } + + @Override + public void deleteFiles(String container, String path) throws URISyntaxException, StorageException, ServiceException { + if (logger.isTraceEnabled()) { + logger.trace("delete files container [{}], path [{}]", + container, path); + } + + // Container name must be lower case. + CloudBlobContainer blob_container = client.getContainerReference(container); + if (blob_container.exists()) { + ListBlobsOptions options = new ListBlobsOptions(); + options.setPrefix(path); + + List blobs = service.listBlobs(container, options).getBlobs(); + for (ListBlobsResult.BlobEntry blob : blobs) { + if (logger.isTraceEnabled()) { + logger.trace("removing in container [{}], path [{}], blob [{}]", + container, path, blob.getName()); + } + service.deleteBlob(container, blob.getName()); + } + } + } + + @Override + public boolean blobExists(String container, String blob) throws URISyntaxException, StorageException { + // Container name must be lower case. + CloudBlobContainer blob_container = client.getContainerReference(container); + if (blob_container.exists()) { + CloudBlockBlob azureBlob = blob_container.getBlockBlobReference(blob); + return azureBlob.exists(); + } + + return false; + } + + @Override + public void deleteBlob(String container, String blob) throws URISyntaxException, StorageException { + if (logger.isTraceEnabled()) { + logger.trace("delete blob for container [{}], blob [{}]", + container, blob); + } + + // Container name must be lower case. + CloudBlobContainer blob_container = client.getContainerReference(container); + if (blob_container.exists()) { + if (logger.isTraceEnabled()) { + logger.trace("blob found. removing.", + container, blob); + } + // TODO A REVOIR + CloudBlockBlob azureBlob = blob_container.getBlockBlobReference(blob); + azureBlob.delete(); + } + } + + @Override + public InputStream getInputStream(String container, String blob) throws ServiceException { + GetBlobResult blobResult = service.getBlob(container, blob); + return blobResult.getContentStream(); + } + + @Override + public ImmutableMap listBlobsByPrefix(String container, String prefix) throws URISyntaxException, StorageException, ServiceException { + logger.debug("listBlobsByPrefix container [{}], prefix [{}]", container, prefix); + ImmutableMap.Builder blobsBuilder = ImmutableMap.builder(); + + CloudBlobContainer blob_container = client.getContainerReference(container); + if (blob_container.exists()) { + Iterable blobs = blob_container.listBlobs(prefix); + for (ListBlobItem blob : blobs) { + URI uri = blob.getUri(); + if (logger.isTraceEnabled()) { + logger.trace("blob url [{}]", uri); + } + String blobpath = uri.getPath().substring(container.length() + 1); + BlobProperties properties = service.getBlobProperties(container, blobpath).getProperties(); + String name = uri.getPath().substring(prefix.length()); + if (logger.isTraceEnabled()) { + logger.trace("blob url [{}], name [{}], size [{}]", uri, name, properties.getContentLength()); + } + blobsBuilder.put(name, new PlainBlobMetaData(name, properties.getContentLength())); + } + } + + return blobsBuilder.build(); + } + + @Override + public void putObject(String container, String blobname, InputStream is, long length) throws URISyntaxException, StorageException, IOException { + if (logger.isTraceEnabled()) { + logger.trace("creating blob in container [{}], blob [{}], length [{}]", + container, blobname, length); + } + CloudBlockBlob blob = client.getContainerReference(container).getBlockBlobReference(blobname); + blob.upload(is, length); + } + + @Override + protected void doStart() throws ElasticsearchException { + logger.debug("starting azure storage client instance"); + } + + @Override + protected void doStop() throws ElasticsearchException { + logger.debug("stopping azure storage client instance"); + } + + @Override + protected void doClose() throws ElasticsearchException { + } +} diff --git a/src/main/java/org/elasticsearch/cloud/azure/blobstore/AbstractAzureBlobContainer.java b/src/main/java/org/elasticsearch/cloud/azure/blobstore/AbstractAzureBlobContainer.java new file mode 100644 index 00000000000..2d6bac608b3 --- /dev/null +++ b/src/main/java/org/elasticsearch/cloud/azure/blobstore/AbstractAzureBlobContainer.java @@ -0,0 +1,137 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cloud.azure.blobstore; + +import com.microsoft.windowsazure.services.core.ServiceException; +import com.microsoft.windowsazure.services.core.storage.StorageException; +import org.apache.lucene.util.IOUtils; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.blobstore.BlobMetaData; +import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; +import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.ESLoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URISyntaxException; + +/** + * + */ +public class AbstractAzureBlobContainer extends AbstractBlobContainer { + + protected final ESLogger logger = ESLoggerFactory.getLogger(AbstractAzureBlobContainer.class.getName()); + protected final AzureBlobStore blobStore; + + protected final String keyPath; + + public AbstractAzureBlobContainer(BlobPath path, AzureBlobStore blobStore) { + super(path); + this.blobStore = blobStore; + String keyPath = path.buildAsString("/"); + if (!keyPath.isEmpty()) { + keyPath = keyPath + "/"; + } + this.keyPath = keyPath; + } + + @Override + public boolean blobExists(String blobName) { + try { + return blobStore.client().blobExists(blobStore.container(), buildKey(blobName)); + } catch (URISyntaxException e) { + logger.warn("can not access [{}] in container {{}}: {}", blobName, blobStore.container(), e.getMessage()); + } catch (StorageException e) { + logger.warn("can not access [{}] in container {{}}: {}", blobName, blobStore.container(), e.getMessage()); + } + return false; + } + + @Override + public boolean deleteBlob(String blobName) throws IOException { + try { + blobStore.client().deleteBlob(blobStore.container(), buildKey(blobName)); + return true; + } catch (URISyntaxException e) { + logger.warn("can not access [{}] in container {{}}: {}", blobName, blobStore.container(), e.getMessage()); + throw new IOException(e); + } catch (StorageException e) { + logger.warn("can not access [{}] in container {{}}: {}", blobName, blobStore.container(), e.getMessage()); + throw new IOException(e); + } + } + + @Override + public void readBlob(final String blobName, final ReadBlobListener listener) { + blobStore.executor().execute(new Runnable() { + @Override + public void run() { + InputStream is = null; + try { + is = blobStore.client().getInputStream(blobStore.container(), buildKey(blobName)); + byte[] buffer = new byte[blobStore.bufferSizeInBytes()]; + int bytesRead; + while ((bytesRead = is.read(buffer)) != -1) { + listener.onPartial(buffer, 0, bytesRead); + } + is.close(); + listener.onCompleted(); + } catch (Throwable e) { + IOUtils.closeWhileHandlingException(is); + listener.onFailure(e); + } + } + }); + } + + @Override + public ImmutableMap listBlobsByPrefix(@Nullable String blobNamePrefix) throws IOException { + final String prefix; + if (blobNamePrefix != null) { + prefix = buildKey(blobNamePrefix); + } else { + prefix = keyPath; + } + + try { + return blobStore.client().listBlobsByPrefix(blobStore.container(), prefix); + } catch (URISyntaxException e) { + logger.warn("can not access [{}] in container {{}}: {}", blobNamePrefix, blobStore.container(), e.getMessage()); + throw new IOException(e); + } catch (StorageException e) { + logger.warn("can not access [{}] in container {{}}: {}", blobNamePrefix, blobStore.container(), e.getMessage()); + throw new IOException(e); + } catch (ServiceException e) { + logger.warn("can not access [{}] in container {{}}: {}", blobNamePrefix, blobStore.container(), e.getMessage()); + throw new IOException(e); + } + } + + @Override + public ImmutableMap listBlobs() throws IOException { + return listBlobsByPrefix(null); + } + + protected String buildKey(String blobName) { + return keyPath + blobName; + } +} diff --git a/src/main/java/org/elasticsearch/cloud/azure/blobstore/AzureBlobStore.java b/src/main/java/org/elasticsearch/cloud/azure/blobstore/AzureBlobStore.java new file mode 100644 index 00000000000..3958e94a956 --- /dev/null +++ b/src/main/java/org/elasticsearch/cloud/azure/blobstore/AzureBlobStore.java @@ -0,0 +1,109 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cloud.azure.blobstore; + +import com.microsoft.windowsazure.services.core.ServiceException; +import com.microsoft.windowsazure.services.core.storage.StorageException; +import org.elasticsearch.cloud.azure.AzureStorageService; +import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.blobstore.BlobStore; +import org.elasticsearch.common.blobstore.ImmutableBlobContainer; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; + +import java.net.URISyntaxException; +import java.util.concurrent.Executor; + +/** + * + */ +public class AzureBlobStore extends AbstractComponent implements BlobStore { + + private final AzureStorageService client; + + private final String container; + + private final Executor executor; + + private final int bufferSizeInBytes; + + public AzureBlobStore(Settings settings, AzureStorageService client, String container, Executor executor) throws URISyntaxException, StorageException { + super(settings); + this.client = client; + this.container = container; + this.executor = executor; + + this.bufferSizeInBytes = (int) settings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes(); + + if (!client.doesContainerExist(container)) { + client.createContainer(container); + } + } + + @Override + public String toString() { + return container; + } + + public AzureStorageService client() { + return client; + } + + public String container() { + return container; + } + + public Executor executor() { + return executor; + } + + public int bufferSizeInBytes() { + return bufferSizeInBytes; + } + + @Override + public ImmutableBlobContainer immutableBlobContainer(BlobPath path) { + return new AzureImmutableBlobContainer(path, this); + } + + @Override + public void delete(BlobPath path) { + String keyPath = path.buildAsString("/"); + if (!keyPath.isEmpty()) { + keyPath = keyPath + "/"; + } + + try { + client.deleteFiles(container, keyPath); + } catch (URISyntaxException e) { + logger.warn("can not remove [{}] in container {{}}: {}", keyPath, container, e.getMessage()); + } catch (StorageException e) { + logger.warn("can not remove [{}] in container {{}}: {}", keyPath, container, e.getMessage()); + } catch (ServiceException e) { + logger.warn("can not remove [{}] in container {{}}: {}", keyPath, container, e.getMessage()); + } + } + + @Override + public void close() { + } +} diff --git a/src/main/java/org/elasticsearch/cloud/azure/blobstore/AzureImmutableBlobContainer.java b/src/main/java/org/elasticsearch/cloud/azure/blobstore/AzureImmutableBlobContainer.java new file mode 100644 index 00000000000..2b4f80ba88b --- /dev/null +++ b/src/main/java/org/elasticsearch/cloud/azure/blobstore/AzureImmutableBlobContainer.java @@ -0,0 +1,57 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cloud.azure.blobstore; + +import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.blobstore.ImmutableBlobContainer; +import org.elasticsearch.common.blobstore.support.BlobStores; + +import java.io.IOException; +import java.io.InputStream; + +/** + * + */ +public class AzureImmutableBlobContainer extends AbstractAzureBlobContainer implements ImmutableBlobContainer { + + public AzureImmutableBlobContainer(BlobPath path, AzureBlobStore blobStore) { + super(path, blobStore); + } + + @Override + public void writeBlob(final String blobName, final InputStream is, final long sizeInBytes, final WriterListener listener) { + blobStore.executor().execute(new Runnable() { + @Override + public void run() { + try { + blobStore.client().putObject(blobStore.container(), buildKey(blobName), is, sizeInBytes); + listener.onCompleted(); + } catch (Throwable e) { + listener.onFailure(e); + } + } + }); + } + + @Override + public void writeBlob(String blobName, InputStream is, long sizeInBytes) throws IOException { + BlobStores.syncWriteBlob(this, blobName, is, sizeInBytes); + } +} diff --git a/src/main/java/org/elasticsearch/discovery/azure/AzureDiscoveryModule.java b/src/main/java/org/elasticsearch/discovery/azure/AzureDiscoveryModule.java index 8ebb674f432..490ee61719f 100644 --- a/src/main/java/org/elasticsearch/discovery/azure/AzureDiscoveryModule.java +++ b/src/main/java/org/elasticsearch/discovery/azure/AzureDiscoveryModule.java @@ -43,8 +43,8 @@ public class AzureDiscoveryModule extends ZenDiscoveryModule { } @Override protected void bindDiscovery() { - if (AzureModule.isDiscoveryReady(settings)) { - bind(Discovery.class).to(AzureDiscovery.class).asEagerSingleton(); + if (AzureModule.isDiscoveryReady(settings, logger)) { + bind(Discovery.class).to(AzureDiscovery.class).asEagerSingleton(); } else { logger.debug("disabling azure discovery features"); } diff --git a/src/main/java/org/elasticsearch/plugin/cloud/azure/CloudAzurePlugin.java b/src/main/java/org/elasticsearch/plugin/cloud/azure/CloudAzurePlugin.java index 0caedbccf54..469ce6a0ddb 100644 --- a/src/main/java/org/elasticsearch/plugin/cloud/azure/CloudAzurePlugin.java +++ b/src/main/java/org/elasticsearch/plugin/cloud/azure/CloudAzurePlugin.java @@ -22,8 +22,13 @@ package org.elasticsearch.plugin.cloud.azure; import org.elasticsearch.cloud.azure.AzureModule; import org.elasticsearch.common.collect.Lists; import org.elasticsearch.common.inject.Module; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.plugins.AbstractPlugin; +import org.elasticsearch.repositories.RepositoriesModule; +import org.elasticsearch.repositories.azure.AzureRepository; +import org.elasticsearch.repositories.azure.AzureRepositoryModule; import java.util.Collection; @@ -33,6 +38,7 @@ import java.util.Collection; public class CloudAzurePlugin extends AbstractPlugin { private final Settings settings; + private final ESLogger logger = ESLoggerFactory.getLogger(CloudAzurePlugin.class.getName()); public CloudAzurePlugin(Settings settings) { this.settings = settings; @@ -51,9 +57,17 @@ public class CloudAzurePlugin extends AbstractPlugin { @Override public Collection> modules() { Collection> modules = Lists.newArrayList(); - if (settings.getAsBoolean("cloud.enabled", true)) { + if (AzureModule.isCloudReady(settings)) { modules.add(AzureModule.class); } return modules; } + + @Override + public void processModule(Module module) { + if (AzureModule.isSnapshotReady(settings, logger) + && module instanceof RepositoriesModule) { + ((RepositoriesModule)module).registerRepository(AzureRepository.TYPE, AzureRepositoryModule.class); + } + } } diff --git a/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java b/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java new file mode 100644 index 00000000000..795ff123982 --- /dev/null +++ b/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java @@ -0,0 +1,146 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.repositories.azure; + +import com.microsoft.windowsazure.services.core.storage.StorageException; +import org.elasticsearch.cloud.azure.AzureStorageService; +import org.elasticsearch.cloud.azure.blobstore.AzureBlobStore; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.blobstore.BlobStore; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.index.snapshots.IndexShardRepository; +import org.elasticsearch.repositories.RepositoryName; +import org.elasticsearch.repositories.RepositorySettings; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Azure file system implementation of the BlobStoreRepository + *

    + * Azure file system repository supports the following settings: + *

    + *
    {@code container}
    Azure container name. Defaults to elasticsearch-snapshots
    + *
    {@code base_path}
    Specifies the path within bucket to repository data. Defaults to root directory.
    + *
    {@code concurrent_streams}
    Number of concurrent read/write stream (per repository on each node). Defaults to 5.
    + *
    {@code chunk_size}
    Large file can be divided into chunks. This parameter specifies the chunk size. Defaults to 64mb.
    + *
    {@code compress}
    If set to true metadata files will be stored compressed. Defaults to false.
    + *
    + */ +public class AzureRepository extends BlobStoreRepository { + + public final static String TYPE = "azure"; + public final static String CONTAINER_DEFAULT = "elasticsearch-snapshots"; + + private final AzureBlobStore blobStore; + + private final BlobPath basePath; + + private ByteSizeValue chunkSize; + + private boolean compress; + + /** + * Constructs new shared file system repository + * + * @param name repository name + * @param repositorySettings repository settings + * @param indexShardRepository index shard repository + * @param azureStorageService Azure Storage service + * @throws java.io.IOException + */ + @Inject + public AzureRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, AzureStorageService azureStorageService) throws IOException, URISyntaxException, StorageException { + super(name.getName(), repositorySettings, indexShardRepository); + + String container = repositorySettings.settings().get(AzureStorageService.Fields.CONTAINER, + componentSettings.get(AzureStorageService.Fields.CONTAINER, CONTAINER_DEFAULT)); + + int concurrentStreams = repositorySettings.settings().getAsInt(AzureStorageService.Fields.CONCURRENT_STREAMS, + componentSettings.getAsInt(AzureStorageService.Fields.CONCURRENT_STREAMS, 5)); + ExecutorService concurrentStreamPool = EsExecutors.newScaling(1, concurrentStreams, 5, TimeUnit.SECONDS, + EsExecutors.daemonThreadFactory(settings, "[azure_stream]")); + + this.blobStore = new AzureBlobStore(settings, azureStorageService, container, concurrentStreamPool); + this.chunkSize = repositorySettings.settings().getAsBytesSize(AzureStorageService.Fields.CHUNK_SIZE, + componentSettings.getAsBytesSize(AzureStorageService.Fields.CHUNK_SIZE, new ByteSizeValue(64, ByteSizeUnit.MB))); + + if (this.chunkSize.getMb() > 64) { + logger.warn("azure repository does not support yet size > 64mb. Fall back to 64mb."); + this.chunkSize = new ByteSizeValue(64, ByteSizeUnit.MB); + } + + this.compress = repositorySettings.settings().getAsBoolean(AzureStorageService.Fields.COMPRESS, + componentSettings.getAsBoolean(AzureStorageService.Fields.COMPRESS, false)); + String basePath = repositorySettings.settings().get(AzureStorageService.Fields.BASE_PATH, null); + + if (Strings.hasLength(basePath)) { + // Remove starting / if any + basePath = Strings.trimLeadingCharacter(basePath, '/'); + BlobPath path = new BlobPath(); + for(String elem : Strings.splitStringToArray(basePath, '/')) { + path = path.add(elem); + } + this.basePath = path; + } else { + this.basePath = BlobPath.cleanPath(); + } + logger.debug("using container [{}], chunk_size [{}], concurrent_streams [{}], compress [{}], base_path [{}]", + container, chunkSize, concurrentStreams, compress, basePath); + } + + /** + * {@inheritDoc} + */ + @Override + protected BlobStore blobStore() { + return blobStore; + } + + @Override + protected BlobPath basePath() { + return basePath; + } + + /** + * {@inheritDoc} + */ + @Override + protected boolean isCompress() { + return compress; + } + + /** + * {@inheritDoc} + */ + @Override + protected ByteSizeValue chunkSize() { + return chunkSize; + } + + +} diff --git a/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryModule.java b/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryModule.java new file mode 100644 index 00000000000..9197d114003 --- /dev/null +++ b/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryModule.java @@ -0,0 +1,61 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.repositories.azure; + +import org.elasticsearch.cloud.azure.AzureModule; +import org.elasticsearch.common.inject.AbstractModule; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.snapshots.IndexShardRepository; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository; +import org.elasticsearch.repositories.Repository; + +/** + * Azure repository module + */ +public class AzureRepositoryModule extends AbstractModule { + + protected final ESLogger logger; + private Settings settings; + + @Inject + public AzureRepositoryModule(Settings settings) { + super(); + this.logger = Loggers.getLogger(getClass(), settings); + this.settings = settings; + } + + /** + * {@inheritDoc} + */ + @Override + protected void configure() { + if (AzureModule.isSnapshotReady(settings, logger)) { + bind(Repository.class).to(AzureRepository.class).asEagerSingleton(); + bind(IndexShardRepository.class).to(BlobStoreIndexShardRepository.class).asEagerSingleton(); + } else { + logger.debug("disabling azure snapshot and restore features"); + } + } + +} + diff --git a/src/main/resources/es-plugin.properties b/src/main/resources/es-plugin.properties index 21cdee38637..3bcf1e294a7 100644 --- a/src/main/resources/es-plugin.properties +++ b/src/main/resources/es-plugin.properties @@ -1,4 +1,4 @@ -# Licensed to ElasticSearch under one or more contributor +# Licensed to Elasticsearch under one or more contributor # license agreements. See the NOTICE file distributed with this work for additional # information regarding copyright ownership. ElasticSearch licenses this file to you # under the Apache License, Version 2.0 (the "License"); you may not use this diff --git a/src/test/java/org/elasticsearch/cloud/azure/AbstractAzureTest.java b/src/test/java/org/elasticsearch/cloud/azure/AbstractAzureTest.java index 8f34054e7f3..3aeb1e2866f 100644 --- a/src/test/java/org/elasticsearch/cloud/azure/AbstractAzureTest.java +++ b/src/test/java/org/elasticsearch/cloud/azure/AbstractAzureTest.java @@ -48,6 +48,12 @@ public abstract class AbstractAzureTest extends ElasticsearchIntegrationTest { discovery: type: azure + + repositories: + azure: + account: "yourstorageaccount" + key: "storage key" + container: "container name" * */ @Documented diff --git a/src/test/java/org/elasticsearch/repositories/azure/AbstractAzureRepositoryServiceTest.java b/src/test/java/org/elasticsearch/repositories/azure/AbstractAzureRepositoryServiceTest.java new file mode 100644 index 00000000000..2cc457ad85e --- /dev/null +++ b/src/test/java/org/elasticsearch/repositories/azure/AbstractAzureRepositoryServiceTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.repositories.azure; + +import com.microsoft.windowsazure.services.core.ServiceException; +import com.microsoft.windowsazure.services.core.storage.StorageException; +import org.elasticsearch.cloud.azure.AbstractAzureTest; +import org.elasticsearch.cloud.azure.AzureStorageService; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.repositories.RepositoryMissingException; +import org.elasticsearch.test.store.MockDirectoryHelper; +import org.junit.After; +import org.junit.Before; + +import java.net.URISyntaxException; + +public abstract class AbstractAzureRepositoryServiceTest extends AbstractAzureTest { + + protected String basePath; + private Class mock; + + public AbstractAzureRepositoryServiceTest(Class mock, + String basePath) { + // We want to inject the Azure API Mock + this.mock = mock; + this.basePath = basePath; + } + + /** + * Deletes repositories, supports wildcard notation. + */ + public static void wipeRepositories(String... repositories) { + // if nothing is provided, delete all + if (repositories.length == 0) { + repositories = new String[]{"*"}; + } + for (String repository : repositories) { + try { + client().admin().cluster().prepareDeleteRepository(repository).execute().actionGet(); + } catch (RepositoryMissingException ex) { + // ignore + } + } + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + ImmutableSettings.Builder builder = ImmutableSettings.settingsBuilder() + .put("cloud.azure." + AzureStorageService.Fields.ACCOUNT, "mock_azure_account") + .put("cloud.azure." + AzureStorageService.Fields.KEY, "mock_azure_key") + .put("repositories.azure.api.impl", mock) + .put("repositories.azure.container", "snapshots"); + + return builder.build(); + } + + @Override + public Settings indexSettings() { + // During restore we frequently restore index to exactly the same state it was before, that might cause the same + // checksum file to be written twice during restore operation + return ImmutableSettings.builder().put(super.indexSettings()) + .put(MockDirectoryHelper.RANDOM_PREVENT_DOUBLE_WRITE, false) + .put(MockDirectoryHelper.RANDOM_NO_DELETE_OPEN_FILE, false) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .build(); + } + + @Before @After + public final void wipe() throws StorageException, ServiceException, URISyntaxException { + wipeRepositories(); + cleanRepositoryFiles(basePath); + } + + /** + * Purge the test container + */ + public void cleanRepositoryFiles(String path) throws StorageException, ServiceException, URISyntaxException { + String container = cluster().getInstance(Settings.class).get("repositories.azure.container"); + logger.info("--> remove blobs in container [{}]", container); + AzureStorageService client = cluster().getInstance(AzureStorageService.class); + client.deleteFiles(container, path); + } +} diff --git a/src/test/java/org/elasticsearch/repositories/azure/AzureSnapshotRestoreITest.java b/src/test/java/org/elasticsearch/repositories/azure/AzureSnapshotRestoreITest.java new file mode 100644 index 00000000000..33220cad250 --- /dev/null +++ b/src/test/java/org/elasticsearch/repositories/azure/AzureSnapshotRestoreITest.java @@ -0,0 +1,258 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.repositories.azure; + + +import com.microsoft.windowsazure.services.core.ServiceException; +import com.microsoft.windowsazure.services.core.storage.StorageException; +import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse; +import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; +import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cloud.azure.AbstractAzureTest; +import org.elasticsearch.cloud.azure.AzureStorageService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.repositories.RepositoryMissingException; +import org.elasticsearch.snapshots.SnapshotState; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.test.store.MockDirectoryHelper; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.net.URISyntaxException; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; + +/** + * This test needs Azure to run and -Dtests.azure=true to be set + * and -Des.config=/path/to/elasticsearch.yml + * @see org.elasticsearch.cloud.azure.AbstractAzureTest + */ +@AbstractAzureTest.AzureTest +@ElasticsearchIntegrationTest.ClusterScope( + scope = ElasticsearchIntegrationTest.Scope.SUITE, + numNodes = 2, + transportClientRatio = 0.0) +public class AzureSnapshotRestoreITest extends AbstractAzureTest { + + private final String basePath; + + public AzureSnapshotRestoreITest() { + basePath = "/snapshot-itest/repo-" + randomInt(); + } + + @Override + public Settings indexSettings() { + // During restore we frequently restore index to exactly the same state it was before, that might cause the same + // checksum file to be written twice during restore operation + return ImmutableSettings.builder().put(super.indexSettings()) + .put(MockDirectoryHelper.RANDOM_PREVENT_DOUBLE_WRITE, false) + .put(MockDirectoryHelper.RANDOM_NO_DELETE_OPEN_FILE, false) + .build(); + } + + @Before + public final void wipeBefore() throws StorageException, ServiceException, URISyntaxException { + wipeRepositories(); + cleanRepositoryFiles(basePath); + } + + @After + public final void wipeAfter() throws StorageException, ServiceException, URISyntaxException { + wipeRepositories(); + cleanRepositoryFiles(basePath); + } + + @Test + public void testSimpleWorkflow() { + Client client = client(); + logger.info("--> creating azure repository with path [{}]", basePath); + PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo") + .setType("azure").setSettings(ImmutableSettings.settingsBuilder() + .put(AzureStorageService.Fields.CONTAINER, "elasticsearch-integration") + .put(AzureStorageService.Fields.BASE_PATH, basePath) + .put(AzureStorageService.Fields.CHUNK_SIZE, randomIntBetween(1000, 10000)) + ).get(); + assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); + + createIndex("test-idx-1", "test-idx-2", "test-idx-3"); + ensureGreen(); + + logger.info("--> indexing some data"); + for (int i = 0; i < 100; i++) { + index("test-idx-1", "doc", Integer.toString(i), "foo", "bar" + i); + index("test-idx-2", "doc", Integer.toString(i), "foo", "baz" + i); + index("test-idx-3", "doc", Integer.toString(i), "foo", "baz" + i); + } + refresh(); + assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L)); + assertThat(client.prepareCount("test-idx-2").get().getCount(), equalTo(100L)); + assertThat(client.prepareCount("test-idx-3").get().getCount(), equalTo(100L)); + + logger.info("--> snapshot"); + CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-*", "-test-idx-3").get(); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); + + assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS)); + + logger.info("--> delete some data"); + for (int i = 0; i < 50; i++) { + client.prepareDelete("test-idx-1", "doc", Integer.toString(i)).get(); + } + for (int i = 50; i < 100; i++) { + client.prepareDelete("test-idx-2", "doc", Integer.toString(i)).get(); + } + for (int i = 0; i < 100; i += 2) { + client.prepareDelete("test-idx-3", "doc", Integer.toString(i)).get(); + } + refresh(); + assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(50L)); + assertThat(client.prepareCount("test-idx-2").get().getCount(), equalTo(50L)); + assertThat(client.prepareCount("test-idx-3").get().getCount(), equalTo(50L)); + + logger.info("--> close indices"); + client.admin().indices().prepareClose("test-idx-1", "test-idx-2").get(); + + logger.info("--> restore all indices from the snapshot"); + RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute().actionGet(); + assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); + + ensureGreen(); + assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L)); + assertThat(client.prepareCount("test-idx-2").get().getCount(), equalTo(100L)); + assertThat(client.prepareCount("test-idx-3").get().getCount(), equalTo(50L)); + + // Test restore after index deletion + logger.info("--> delete indices"); + wipeIndices("test-idx-1", "test-idx-2"); + logger.info("--> restore one index after deletion"); + restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-*", "-test-idx-2").execute().actionGet(); + assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); + ensureGreen(); + assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L)); + ClusterState clusterState = client.admin().cluster().prepareState().get().getState(); + assertThat(clusterState.getMetaData().hasIndex("test-idx-1"), equalTo(true)); + assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(false)); + } + + @Test + public void testMultipleRepositories() { + Client client = client(); + logger.info("--> creating azure repository with path [{}]", basePath); + PutRepositoryResponse putRepositoryResponse1 = client.admin().cluster().preparePutRepository("test-repo1") + .setType("azure").setSettings(ImmutableSettings.settingsBuilder() + .put(AzureStorageService.Fields.CONTAINER, "elasticsearch-integration1") + .put(AzureStorageService.Fields.BASE_PATH, basePath) + .put(AzureStorageService.Fields.CHUNK_SIZE, randomIntBetween(1000, 10000)) + ).get(); + assertThat(putRepositoryResponse1.isAcknowledged(), equalTo(true)); + PutRepositoryResponse putRepositoryResponse2 = client.admin().cluster().preparePutRepository("test-repo2") + .setType("azure").setSettings(ImmutableSettings.settingsBuilder() + .put(AzureStorageService.Fields.CONTAINER, "elasticsearch-integration2") + .put(AzureStorageService.Fields.BASE_PATH, basePath) + .put(AzureStorageService.Fields.CHUNK_SIZE, randomIntBetween(1000, 10000)) + ).get(); + assertThat(putRepositoryResponse2.isAcknowledged(), equalTo(true)); + + createIndex("test-idx-1", "test-idx-2"); + ensureGreen(); + + logger.info("--> indexing some data"); + for (int i = 0; i < 100; i++) { + index("test-idx-1", "doc", Integer.toString(i), "foo", "bar" + i); + index("test-idx-2", "doc", Integer.toString(i), "foo", "baz" + i); + } + refresh(); + assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L)); + assertThat(client.prepareCount("test-idx-2").get().getCount(), equalTo(100L)); + + logger.info("--> snapshot 1"); + CreateSnapshotResponse createSnapshotResponse1 = client.admin().cluster().prepareCreateSnapshot("test-repo1", "test-snap").setWaitForCompletion(true).setIndices("test-idx-1").get(); + assertThat(createSnapshotResponse1.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponse1.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse1.getSnapshotInfo().totalShards())); + + logger.info("--> snapshot 2"); + CreateSnapshotResponse createSnapshotResponse2 = client.admin().cluster().prepareCreateSnapshot("test-repo2", "test-snap").setWaitForCompletion(true).setIndices("test-idx-2").get(); + assertThat(createSnapshotResponse2.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponse2.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse2.getSnapshotInfo().totalShards())); + + assertThat(client.admin().cluster().prepareGetSnapshots("test-repo1").setSnapshots("test-snap").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS)); + assertThat(client.admin().cluster().prepareGetSnapshots("test-repo2").setSnapshots("test-snap").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS)); + + // Test restore after index deletion + logger.info("--> delete indices"); + wipeIndices("test-idx-1", "test-idx-2"); + logger.info("--> restore one index after deletion from snapshot 1"); + RestoreSnapshotResponse restoreSnapshotResponse1 = client.admin().cluster().prepareRestoreSnapshot("test-repo1", "test-snap").setWaitForCompletion(true).setIndices("test-idx-1").execute().actionGet(); + assertThat(restoreSnapshotResponse1.getRestoreInfo().totalShards(), greaterThan(0)); + ensureGreen(); + assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L)); + ClusterState clusterState = client.admin().cluster().prepareState().get().getState(); + assertThat(clusterState.getMetaData().hasIndex("test-idx-1"), equalTo(true)); + assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(false)); + + logger.info("--> restore other index after deletion from snapshot 2"); + RestoreSnapshotResponse restoreSnapshotResponse2 = client.admin().cluster().prepareRestoreSnapshot("test-repo2", "test-snap").setWaitForCompletion(true).setIndices("test-idx-2").execute().actionGet(); + assertThat(restoreSnapshotResponse2.getRestoreInfo().totalShards(), greaterThan(0)); + ensureGreen(); + assertThat(client.prepareCount("test-idx-2").get().getCount(), equalTo(100L)); + clusterState = client.admin().cluster().prepareState().get().getState(); + assertThat(clusterState.getMetaData().hasIndex("test-idx-1"), equalTo(true)); + assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(true)); + } + + /** + * Deletes repositories, supports wildcard notation. + */ + public static void wipeRepositories(String... repositories) { + // if nothing is provided, delete all + if (repositories.length == 0) { + repositories = new String[]{"*"}; + } + for (String repository : repositories) { + try { + client().admin().cluster().prepareDeleteRepository(repository).execute().actionGet(); + } catch (RepositoryMissingException ex) { + // ignore + } + } + } + + /** + * Purge the test container + */ + public void cleanRepositoryFiles(String path) throws StorageException, ServiceException, URISyntaxException { + String container = cluster().getInstance(Settings.class).get("repositories.azure.container", + AzureRepository.CONTAINER_DEFAULT); + logger.info("--> remove blobs in container [{}], path [{}]", container, path); + AzureStorageService client = cluster().getInstance(AzureStorageService.class); + + // Remove starting / if any + path = Strings.trimLeadingCharacter(path, '/'); + + client.deleteFiles(container, path); + } +} diff --git a/src/test/java/org/elasticsearch/repositories/azure/AzureSnapshotRestoreTest.java b/src/test/java/org/elasticsearch/repositories/azure/AzureSnapshotRestoreTest.java new file mode 100644 index 00000000000..03b808cd654 --- /dev/null +++ b/src/test/java/org/elasticsearch/repositories/azure/AzureSnapshotRestoreTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.repositories.azure; + + +import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse; +import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; +import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.snapshots.SnapshotState; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.junit.Test; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; + +@ElasticsearchIntegrationTest.ClusterScope( + scope = ElasticsearchIntegrationTest.Scope.SUITE, + numNodes = 1, + transportClientRatio = 0.0) +public class AzureSnapshotRestoreTest extends AbstractAzureRepositoryServiceTest { + + public AzureSnapshotRestoreTest() { + super(AzureStorageServiceMock.class, "/snapshot-test/repo-" + randomInt()); + } + + @Test + public void testSimpleWorkflow() { + Client client = client(); + logger.info("--> creating azure repository with path [{}]", basePath); + PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo") + .setType("azure").setSettings(ImmutableSettings.settingsBuilder() + .put("base_path", basePath) + .put("chunk_size", randomIntBetween(1000, 10000)) + ).get(); + assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); + + createIndex("test-idx-1", "test-idx-2", "test-idx-3"); + ensureGreen(); + + logger.info("--> indexing some data"); + for (int i = 0; i < 100; i++) { + index("test-idx-1", "doc", Integer.toString(i), "foo", "bar" + i); + index("test-idx-2", "doc", Integer.toString(i), "foo", "baz" + i); + index("test-idx-3", "doc", Integer.toString(i), "foo", "baz" + i); + } + refresh(); + assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L)); + assertThat(client.prepareCount("test-idx-2").get().getCount(), equalTo(100L)); + assertThat(client.prepareCount("test-idx-3").get().getCount(), equalTo(100L)); + + logger.info("--> snapshot"); + CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-*", "-test-idx-3").get(); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); + + assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS)); + + logger.info("--> delete some data"); + for (int i = 0; i < 50; i++) { + client.prepareDelete("test-idx-1", "doc", Integer.toString(i)).get(); + } + for (int i = 50; i < 100; i++) { + client.prepareDelete("test-idx-2", "doc", Integer.toString(i)).get(); + } + for (int i = 0; i < 100; i += 2) { + client.prepareDelete("test-idx-3", "doc", Integer.toString(i)).get(); + } + refresh(); + assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(50L)); + assertThat(client.prepareCount("test-idx-2").get().getCount(), equalTo(50L)); + assertThat(client.prepareCount("test-idx-3").get().getCount(), equalTo(50L)); + + logger.info("--> close indices"); + client.admin().indices().prepareClose("test-idx-1", "test-idx-2").get(); + + logger.info("--> restore all indices from the snapshot"); + RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute().actionGet(); + assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); + + ensureGreen(); + assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L)); + assertThat(client.prepareCount("test-idx-2").get().getCount(), equalTo(100L)); + assertThat(client.prepareCount("test-idx-3").get().getCount(), equalTo(50L)); + + // Test restore after index deletion + logger.info("--> delete indices"); + wipeIndices("test-idx-1", "test-idx-2"); + logger.info("--> restore one index after deletion"); + restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-*", "-test-idx-2").execute().actionGet(); + assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); + ensureGreen(); + assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L)); + ClusterState clusterState = client.admin().cluster().prepareState().get().getState(); + assertThat(clusterState.getMetaData().hasIndex("test-idx-1"), equalTo(true)); + assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(false)); + } + +} diff --git a/src/test/java/org/elasticsearch/repositories/azure/AzureStorageServiceMock.java b/src/test/java/org/elasticsearch/repositories/azure/AzureStorageServiceMock.java new file mode 100644 index 00000000000..689bee26b21 --- /dev/null +++ b/src/test/java/org/elasticsearch/repositories/azure/AzureStorageServiceMock.java @@ -0,0 +1,126 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.repositories.azure; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.cloud.azure.AzureStorageService; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.blobstore.BlobMetaData; +import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; +import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * In memory storage for unit tests + */ +public class AzureStorageServiceMock extends AbstractLifecycleComponent + implements AzureStorageService { + + protected Map blobs = new ConcurrentHashMap(); + + @Inject + protected AzureStorageServiceMock(Settings settings) { + super(settings); + } + + @Override + public boolean doesContainerExist(String container) { + return true; + } + + @Override + public void removeContainer(String container) { + } + + @Override + public void createContainer(String container) { + } + + @Override + public void deleteFiles(String container, String path) { + } + + @Override + public boolean blobExists(String container, String blob) { + return blobs.containsKey(blob); + } + + @Override + public void deleteBlob(String container, String blob) { + blobs.remove(blob); + } + + @Override + public InputStream getInputStream(String container, String blob) { + return new ByteArrayInputStream(blobs.get(blob)); + } + + @Override + public ImmutableMap listBlobsByPrefix(String container, String prefix) { + ImmutableMap.Builder blobsBuilder = ImmutableMap.builder(); + for (String blobName : blobs.keySet()) { + if (Strings.startsWithIgnoreCase(blobName, prefix)) { + blobsBuilder.put(blobName, new PlainBlobMetaData(blobName, blobs.get(blobName).length)); + } + } + ImmutableMap map = blobsBuilder.build(); + return map; + } + + @Override + public void putObject(String container, String blob, InputStream is, long length) { + try { + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + + int nRead; + byte[] data = new byte[65535]; + + while ((nRead = is.read(data, 0, data.length)) != -1) { + buffer.write(data, 0, nRead); + } + + buffer.flush(); + + blobs.put(blob, buffer.toByteArray()); + } catch (IOException e) { + } + } + + @Override + protected void doStart() throws ElasticsearchException { + } + + @Override + protected void doStop() throws ElasticsearchException { + } + + @Override + protected void doClose() throws ElasticsearchException { + } +} diff --git a/src/test/resources/elasticsearch.yml b/src/test/resources/elasticsearch.yml index 07536b8c77b..d84f690e5a8 100644 --- a/src/test/resources/elasticsearch.yml +++ b/src/test/resources/elasticsearch.yml @@ -1,4 +1,4 @@ -# Licensed to ElasticSearch under one or more contributor +# Licensed to Elasticsearch under one or more contributor # license agreements. See the NOTICE file distributed with this work for additional # information regarding copyright ownership. ElasticSearch licenses this file to you # under the Apache License, Version 2.0 (the "License"); you may not use this @@ -25,6 +25,16 @@ # password: YOUR-PASSWORD # subscription_id: YOUR-AZURE-SUBSCRIPTION-ID # service_name: YOUR-AZURE-SERVICE-NAME +# storage_account: "YOUR-AZURE-STORAGE-NAME" +# storage_key: "YOUR-AZURE-STORAGE-KEY" # # discovery: # type: azure +# +# repositories: +# azure: +# container: "NAME-OF-CONTAINER-USED-FOR-SNAPSHOTS" #optional default to "elasticsearch-snapshots" +# base_path: "path/to/snapshots" #optional default to empty +# concurrent_streams: 5 #optional default to 5 +# chunk_size: 64mb #optional default to "64mb" +# compress: false #optional default to false diff --git a/src/test/resources/log4j.xml b/src/test/resources/log4j.xml index 95b3bf51c77..be428fd906e 100644 --- a/src/test/resources/log4j.xml +++ b/src/test/resources/log4j.xml @@ -1,5 +1,5 @@ - + + + - +