diff --git a/README.md b/README.md index e9720a94ca6..f5970e9b236 100644 --- a/README.md +++ b/README.md @@ -378,7 +378,6 @@ The Azure repository supports following settings: * `container`: Container name. Defaults to `elasticsearch-snapshots` * `base_path`: Specifies the path within container to repository data. Defaults to empty (root directory). -* `concurrent_streams`: Throttles the number of streams (per node) preforming snapshot operation. Defaults to `5`. * `chunk_size`: Big files can be broken down into chunks during snapshotting if needed. The chunk size can be specified in bytes or by using size value notation, i.e. `1g`, `10m`, `5k`. Defaults to `64m` (64m max) * `compress`: When set to `true` metadata files are stored in compressed format. This setting doesn't affect index @@ -398,7 +397,6 @@ $ curl -XPUT 'http://localhost:9200/_snapshot/my_backup2' -d '{ "settings": { "container": "backup_container", "base_path": "backups", - "concurrent_streams": 2, "chunk_size": "32m", "compress": true } diff --git a/pom.xml b/pom.xml index e0baedb018c..1e0fa3aa07d 100644 --- a/pom.xml +++ b/pom.xml @@ -44,7 +44,7 @@ governing permissions and limitations under the License. --> 2.0.0-SNAPSHOT - 4.9.0 + 4.10.1 onerror true onerror diff --git a/src/main/java/org/elasticsearch/cloud/azure/AzureStorageService.java b/src/main/java/org/elasticsearch/cloud/azure/AzureStorageService.java index 700fc6ab4ac..27a0ec04bf5 100644 --- a/src/main/java/org/elasticsearch/cloud/azure/AzureStorageService.java +++ b/src/main/java/org/elasticsearch/cloud/azure/AzureStorageService.java @@ -24,8 +24,8 @@ import com.microsoft.windowsazure.services.core.storage.StorageException; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.collect.ImmutableMap; -import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.net.URISyntaxException; /** @@ -38,7 +38,6 @@ public interface AzureStorageService { public static final String KEY = "storage_key"; public static final String CONTAINER = "container"; public static final String BASE_PATH = "base_path"; - public static final String CONCURRENT_STREAMS = "concurrent_streams"; public static final String CHUNK_SIZE = "chunk_size"; public static final String COMPRESS = "compress"; } @@ -57,9 +56,7 @@ public interface AzureStorageService { InputStream getInputStream(String container, String blob) throws ServiceException; + OutputStream getOutputStream(String container, String blob) throws URISyntaxException, StorageException; + ImmutableMap listBlobsByPrefix(String container, String keyPath, String prefix) throws URISyntaxException, StorageException, ServiceException; - - void putObject(String container, String blob, InputStream is, long length) throws URISyntaxException, StorageException, IOException; - - } diff --git a/src/main/java/org/elasticsearch/cloud/azure/AzureStorageServiceImpl.java b/src/main/java/org/elasticsearch/cloud/azure/AzureStorageServiceImpl.java index e968ec7a4f5..df0b8f20aaf 100644 --- a/src/main/java/org/elasticsearch/cloud/azure/AzureStorageServiceImpl.java +++ b/src/main/java/org/elasticsearch/cloud/azure/AzureStorageServiceImpl.java @@ -43,8 +43,8 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; -import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.net.URI; import java.net.URISyntaxException; import java.util.List; @@ -197,6 +197,11 @@ public class AzureStorageServiceImpl extends AbstractLifecycleComponent listBlobsByPrefix(String container, String keyPath, String prefix) throws URISyntaxException, StorageException, ServiceException { logger.debug("listBlobsByPrefix container [{}], keyPath [{}], prefix [{}]", container, keyPath, prefix); @@ -223,16 +228,6 @@ public class AzureStorageServiceImpl extends AbstractLifecycleComponent listBlobsByPrefix(@Nullable String prefix) throws IOException { diff --git a/src/main/java/org/elasticsearch/cloud/azure/blobstore/AzureBlobStore.java b/src/main/java/org/elasticsearch/cloud/azure/blobstore/AzureBlobStore.java index 3958e94a956..4c4d21f757c 100644 --- a/src/main/java/org/elasticsearch/cloud/azure/blobstore/AzureBlobStore.java +++ b/src/main/java/org/elasticsearch/cloud/azure/blobstore/AzureBlobStore.java @@ -22,16 +22,13 @@ package org.elasticsearch.cloud.azure.blobstore; import com.microsoft.windowsazure.services.core.ServiceException; import com.microsoft.windowsazure.services.core.storage.StorageException; import org.elasticsearch.cloud.azure.AzureStorageService; +import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; -import org.elasticsearch.common.blobstore.ImmutableBlobContainer; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; import java.net.URISyntaxException; -import java.util.concurrent.Executor; /** * @@ -42,17 +39,10 @@ public class AzureBlobStore extends AbstractComponent implements BlobStore { private final String container; - private final Executor executor; - - private final int bufferSizeInBytes; - - public AzureBlobStore(Settings settings, AzureStorageService client, String container, Executor executor) throws URISyntaxException, StorageException { + public AzureBlobStore(Settings settings, AzureStorageService client, String container) throws URISyntaxException, StorageException { super(settings); this.client = client; this.container = container; - this.executor = executor; - - this.bufferSizeInBytes = (int) settings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes(); if (!client.doesContainerExist(container)) { client.createContainer(container); @@ -72,17 +62,9 @@ public class AzureBlobStore extends AbstractComponent implements BlobStore { return container; } - public Executor executor() { - return executor; - } - - public int bufferSizeInBytes() { - return bufferSizeInBytes; - } - @Override - public ImmutableBlobContainer immutableBlobContainer(BlobPath path) { - return new AzureImmutableBlobContainer(path, this); + public BlobContainer blobContainer(BlobPath path) { + return new AzureBlobContainer(path, this); } @Override diff --git a/src/main/java/org/elasticsearch/cloud/azure/blobstore/AzureImmutableBlobContainer.java b/src/main/java/org/elasticsearch/cloud/azure/blobstore/AzureImmutableBlobContainer.java deleted file mode 100644 index 2b4f80ba88b..00000000000 --- a/src/main/java/org/elasticsearch/cloud/azure/blobstore/AzureImmutableBlobContainer.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.cloud.azure.blobstore; - -import org.elasticsearch.common.blobstore.BlobPath; -import org.elasticsearch.common.blobstore.ImmutableBlobContainer; -import org.elasticsearch.common.blobstore.support.BlobStores; - -import java.io.IOException; -import java.io.InputStream; - -/** - * - */ -public class AzureImmutableBlobContainer extends AbstractAzureBlobContainer implements ImmutableBlobContainer { - - public AzureImmutableBlobContainer(BlobPath path, AzureBlobStore blobStore) { - super(path, blobStore); - } - - @Override - public void writeBlob(final String blobName, final InputStream is, final long sizeInBytes, final WriterListener listener) { - blobStore.executor().execute(new Runnable() { - @Override - public void run() { - try { - blobStore.client().putObject(blobStore.container(), buildKey(blobName), is, sizeInBytes); - listener.onCompleted(); - } catch (Throwable e) { - listener.onFailure(e); - } - } - }); - } - - @Override - public void writeBlob(String blobName, InputStream is, long sizeInBytes) throws IOException { - BlobStores.syncWriteBlob(this, blobName, is, sizeInBytes); - } -} diff --git a/src/main/java/org/elasticsearch/cloud/azure/blobstore/AzureOutputStream.java b/src/main/java/org/elasticsearch/cloud/azure/blobstore/AzureOutputStream.java new file mode 100644 index 00000000000..6a95eeba778 --- /dev/null +++ b/src/main/java/org/elasticsearch/cloud/azure/blobstore/AzureOutputStream.java @@ -0,0 +1,46 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cloud.azure.blobstore; + +import java.io.IOException; +import java.io.OutputStream; + +public class AzureOutputStream extends OutputStream { + + private final OutputStream blobOutputStream; + + public AzureOutputStream(OutputStream blobOutputStream) { + this.blobOutputStream = blobOutputStream; + } + + @Override + public void write(int b) throws IOException { + blobOutputStream.write(b); + } + + @Override + public void close() throws IOException { + try { + blobOutputStream.close(); + } catch (IOException e) { + // Azure is sending a "java.io.IOException: Stream is already closed." + } + } +} diff --git a/src/main/java/org/elasticsearch/discovery/azure/AzureDiscovery.java b/src/main/java/org/elasticsearch/discovery/azure/AzureDiscovery.java index b890b327e56..7843880c719 100755 --- a/src/main/java/org/elasticsearch/discovery/azure/AzureDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/azure/AzureDiscovery.java @@ -19,7 +19,6 @@ package org.elasticsearch.discovery.azure; -import org.elasticsearch.Version; import org.elasticsearch.cloud.azure.AzureComputeService; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterService; @@ -51,7 +50,7 @@ public class AzureDiscovery extends ZenDiscovery { DiscoveryNodeService discoveryNodeService, AzureComputeService azureService, NetworkService networkService, DiscoverySettings discoverySettings, ElectMasterService electMasterService) { super(settings, clusterName, threadPool, transportService, clusterService, nodeSettingsService, - discoveryNodeService, pingService, electMasterService, Version.CURRENT, discoverySettings); + discoveryNodeService, pingService, electMasterService, discoverySettings); if (settings.getAsBoolean("cloud.enabled", true)) { ImmutableList zenPings = pingService.zenPings(); UnicastZenPing unicastZenPing = null; diff --git a/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java b/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java index 795ff123982..f8eb8442e32 100644 --- a/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java +++ b/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java @@ -28,7 +28,6 @@ import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.repositories.RepositoryName; import org.elasticsearch.repositories.RepositorySettings; @@ -36,8 +35,6 @@ import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import java.io.IOException; import java.net.URISyntaxException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; /** * Azure file system implementation of the BlobStoreRepository @@ -46,7 +43,6 @@ import java.util.concurrent.TimeUnit; *
*
{@code container}
Azure container name. Defaults to elasticsearch-snapshots
*
{@code base_path}
Specifies the path within bucket to repository data. Defaults to root directory.
- *
{@code concurrent_streams}
Number of concurrent read/write stream (per repository on each node). Defaults to 5.
*
{@code chunk_size}
Large file can be divided into chunks. This parameter specifies the chunk size. Defaults to 64mb.
*
{@code compress}
If set to true metadata files will be stored compressed. Defaults to false.
*
@@ -80,12 +76,7 @@ public class AzureRepository extends BlobStoreRepository { String container = repositorySettings.settings().get(AzureStorageService.Fields.CONTAINER, componentSettings.get(AzureStorageService.Fields.CONTAINER, CONTAINER_DEFAULT)); - int concurrentStreams = repositorySettings.settings().getAsInt(AzureStorageService.Fields.CONCURRENT_STREAMS, - componentSettings.getAsInt(AzureStorageService.Fields.CONCURRENT_STREAMS, 5)); - ExecutorService concurrentStreamPool = EsExecutors.newScaling(1, concurrentStreams, 5, TimeUnit.SECONDS, - EsExecutors.daemonThreadFactory(settings, "[azure_stream]")); - - this.blobStore = new AzureBlobStore(settings, azureStorageService, container, concurrentStreamPool); + this.blobStore = new AzureBlobStore(settings, azureStorageService, container); this.chunkSize = repositorySettings.settings().getAsBytesSize(AzureStorageService.Fields.CHUNK_SIZE, componentSettings.getAsBytesSize(AzureStorageService.Fields.CHUNK_SIZE, new ByteSizeValue(64, ByteSizeUnit.MB))); @@ -109,8 +100,8 @@ public class AzureRepository extends BlobStoreRepository { } else { this.basePath = BlobPath.cleanPath(); } - logger.debug("using container [{}], chunk_size [{}], concurrent_streams [{}], compress [{}], base_path [{}]", - container, chunkSize, concurrentStreams, compress, basePath); + logger.debug("using container [{}], chunk_size [{}], compress [{}], base_path [{}]", + container, chunkSize, compress, basePath); } /** diff --git a/src/test/java/org/elasticsearch/repositories/azure/AzureStorageServiceMock.java b/src/test/java/org/elasticsearch/repositories/azure/AzureStorageServiceMock.java index 064823ee530..2c1aa616894 100644 --- a/src/test/java/org/elasticsearch/repositories/azure/AzureStorageServiceMock.java +++ b/src/test/java/org/elasticsearch/repositories/azure/AzureStorageServiceMock.java @@ -19,6 +19,7 @@ package org.elasticsearch.repositories.azure; +import com.microsoft.windowsazure.services.core.storage.StorageException; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cloud.azure.AzureStorageService; import org.elasticsearch.common.blobstore.BlobMetaData; @@ -28,10 +29,8 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; +import java.io.*; +import java.net.URISyntaxException; import java.util.Locale; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -42,7 +41,7 @@ import java.util.concurrent.ConcurrentHashMap; public class AzureStorageServiceMock extends AbstractLifecycleComponent implements AzureStorageService { - protected Map blobs = new ConcurrentHashMap(); + protected Map blobs = new ConcurrentHashMap(); @Inject protected AzureStorageServiceMock(Settings settings) { @@ -78,7 +77,14 @@ public class AzureStorageServiceMock extends AbstractLifecycleComponent blobsBuilder = ImmutableMap.builder(); for (String blobName : blobs.keySet()) { if (startsWithIgnoreCase(blobName, prefix)) { - blobsBuilder.put(blobName, new PlainBlobMetaData(blobName, blobs.get(blobName).length)); + blobsBuilder.put(blobName, new PlainBlobMetaData(blobName, blobs.get(blobName).size())); } } ImmutableMap map = blobsBuilder.build(); return map; } - @Override - public void putObject(String container, String blob, InputStream is, long length) { - try { - ByteArrayOutputStream buffer = new ByteArrayOutputStream(); - - int nRead; - byte[] data = new byte[65535]; - - while ((nRead = is.read(data, 0, data.length)) != -1) { - buffer.write(data, 0, nRead); - } - - buffer.flush(); - - blobs.put(blob, buffer.toByteArray()); - } catch (IOException e) { - } - } - @Override protected void doStart() throws ElasticsearchException { }