From a0a714e6a5e806638372ed2e963c09028c8ab0df Mon Sep 17 00:00:00 2001 From: kimchy Date: Tue, 11 Jan 2011 22:03:44 +0200 Subject: [PATCH] Shared Gateway: Allow to set the number of concurrent streams doing snapshot operations, closes #621. --- .../elasticsearch/gateway/fs/FsGateway.java | 18 +++++++++++++++++- .../elasticsearch/gateway/s3/S3Gateway.java | 18 ++++++++++++++---- .../gateway/hdfs/HdfsGateway.java | 14 ++++++++++++-- 3 files changed, 43 insertions(+), 7 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/fs/FsGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/fs/FsGateway.java index 133885f5c37..e0a28286c8a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/fs/FsGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/fs/FsGateway.java @@ -19,6 +19,7 @@ package org.elasticsearch.gateway.fs; +import org.elasticsearch.ElasticSearchException; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; @@ -26,6 +27,9 @@ import org.elasticsearch.common.blobstore.fs.FsBlobStore; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.DynamicExecutors; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.env.Environment; import org.elasticsearch.gateway.blobstore.BlobStoreGateway; import org.elasticsearch.index.gateway.fs.FsIndexGatewayModule; @@ -33,12 +37,15 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.File; import java.io.IOException; +import java.util.concurrent.ExecutorService; /** * @author kimchy (shay.banon) */ public class FsGateway extends BlobStoreGateway { + private final ExecutorService concurrentStreamPool; + @Inject public FsGateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService, Environment environment, ClusterName clusterName, ThreadPool threadPool) throws IOException { super(settings, clusterService, createIndexService); @@ -51,7 +58,11 @@ public class FsGateway extends BlobStoreGateway { } else { gatewayFile = new File(location); } - initialize(new FsBlobStore(componentSettings, threadPool.cached(), gatewayFile), clusterName, null); + + int concurrentStreams = componentSettings.getAsInt("concurrent_streams", 5); + this.concurrentStreamPool = DynamicExecutors.newScalingThreadPool(1, concurrentStreams, TimeValue.timeValueSeconds(5).millis(), EsExecutors.daemonThreadFactory(settings, "[s3_stream]")); + + initialize(new FsBlobStore(componentSettings, concurrentStreamPool, gatewayFile), clusterName, null); } @Override public String type() { @@ -61,4 +72,9 @@ public class FsGateway extends BlobStoreGateway { @Override public Class suggestIndexGateway() { return FsIndexGatewayModule.class; } + + @Override protected void doClose() throws ElasticSearchException { + super.doClose(); + concurrentStreamPool.shutdown(); + } } diff --git a/plugins/cloud/aws/src/main/java/org/elasticsearch/gateway/s3/S3Gateway.java b/plugins/cloud/aws/src/main/java/org/elasticsearch/gateway/s3/S3Gateway.java index 8a10e0280fa..332d5dd87c7 100644 --- a/plugins/cloud/aws/src/main/java/org/elasticsearch/gateway/s3/S3Gateway.java +++ b/plugins/cloud/aws/src/main/java/org/elasticsearch/gateway/s3/S3Gateway.java @@ -31,17 +31,23 @@ import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.DynamicExecutors; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.gateway.blobstore.BlobStoreGateway; import org.elasticsearch.index.gateway.s3.S3IndexGatewayModule; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; +import java.util.concurrent.ExecutorService; /** * @author kimchy (shay.banon) */ public class S3Gateway extends BlobStoreGateway { + private final ExecutorService concurrentStreamPool; + @Inject public S3Gateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService, ClusterName clusterName, ThreadPool threadPool, AwsS3Service s3Service) throws IOException { super(settings, clusterService, createIndexService); @@ -76,13 +82,17 @@ public class S3Gateway extends BlobStoreGateway { } ByteSizeValue chunkSize = componentSettings.getAsBytesSize("chunk_size", new ByteSizeValue(100, ByteSizeUnit.MB)); - logger.debug("using bucket [{}], region [{}], chunk_size [{}]", bucket, region, chunkSize); + int concurrentStreams = componentSettings.getAsInt("concurrent_streams", 5); + this.concurrentStreamPool = DynamicExecutors.newScalingThreadPool(1, concurrentStreams, TimeValue.timeValueSeconds(5).millis(), EsExecutors.daemonThreadFactory(settings, "[s3_stream]")); - initialize(new S3BlobStore(settings, s3Service.client(), bucket, region, threadPool.cached()), clusterName, chunkSize); + logger.debug("using bucket [{}], region [{}], chunk_size [{}], concurrent_streams [{}]", bucket, region, chunkSize, concurrentStreams); + + initialize(new S3BlobStore(settings, s3Service.client(), bucket, region, concurrentStreamPool), clusterName, chunkSize); } - @Override public void close() throws ElasticSearchException { - super.close(); + @Override protected void doClose() throws ElasticSearchException { + super.doClose(); + concurrentStreamPool.shutdown(); } @Override public String type() { diff --git a/plugins/hadoop/src/main/java/org/elasticsearch/gateway/hdfs/HdfsGateway.java b/plugins/hadoop/src/main/java/org/elasticsearch/gateway/hdfs/HdfsGateway.java index 6e1535dd529..75af81c166e 100644 --- a/plugins/hadoop/src/main/java/org/elasticsearch/gateway/hdfs/HdfsGateway.java +++ b/plugins/hadoop/src/main/java/org/elasticsearch/gateway/hdfs/HdfsGateway.java @@ -31,12 +31,16 @@ import org.elasticsearch.common.blobstore.hdfs.HdfsBlobStore; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.DynamicExecutors; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.gateway.blobstore.BlobStoreGateway; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.net.URI; import java.util.Map; +import java.util.concurrent.ExecutorService; /** * @author kimchy (shay.banon) @@ -47,6 +51,8 @@ public class HdfsGateway extends BlobStoreGateway { private final FileSystem fileSystem; + private final ExecutorService concurrentStreamPool; + @Inject public HdfsGateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService, ClusterName clusterName, ThreadPool threadPool) throws IOException { super(settings, clusterService, createIndexService); @@ -62,7 +68,10 @@ public class HdfsGateway extends BlobStoreGateway { } Path hPath = new Path(new Path(path), clusterName.value()); - logger.debug("Using uri [{}], path [{}]", uri, hPath); + int concurrentStreams = componentSettings.getAsInt("concurrent_streams", 5); + this.concurrentStreamPool = DynamicExecutors.newScalingThreadPool(1, concurrentStreams, TimeValue.timeValueSeconds(5).millis(), EsExecutors.daemonThreadFactory(settings, "[s3_stream]")); + + logger.debug("Using uri [{}], path [{}], concurrent_streams [{}]", uri, hPath, concurrentStreams); Configuration conf = new Configuration(); Settings hdfsSettings = settings.getByPrefix("hdfs.conf."); @@ -72,7 +81,7 @@ public class HdfsGateway extends BlobStoreGateway { fileSystem = FileSystem.get(URI.create(uri), conf); - initialize(new HdfsBlobStore(settings, fileSystem, threadPool.cached(), hPath), clusterName, null); + initialize(new HdfsBlobStore(settings, fileSystem, concurrentStreamPool, hPath), clusterName, null); } @Override public String type() { @@ -92,5 +101,6 @@ public class HdfsGateway extends BlobStoreGateway { // ignore } } + concurrentStreamPool.shutdown(); } }