From 77b6d1d8b8d304578fd97ea159b7109671eaacc6 Mon Sep 17 00:00:00 2001 From: kimchy Date: Sat, 24 Jul 2010 23:39:43 +0300 Subject: [PATCH] remove cloud plugin, reimplement an AWS specific cloud plugin with S3 gateway and EC2 discovery --- .idea/dictionaries/kimchy.xml | 1 + .idea/modules.xml | 2 +- .idea/modules/elasticsearch-root.iml | 2 +- .idea/modules/plugin-cloud-aws.iml | 33 +++++ config/logging.yml | 2 + .../ImmutableAppendableBlobContainer.java | 9 +- .../discovery/DiscoveryModule.java | 2 +- .../discovery/zen/ZenDiscoveryModule.java | 4 + .../elasticsearch/gateway/GatewayService.java | 2 + .../gateway/none/NoneGateway.java | 4 + .../gateway/IndexShardGatewayService.java | 2 +- .../blobstore/BlobStoreIndexShardGateway.java | 13 +- .../index/gateway/none/NoneIndexGateway.java | 2 +- .../gateway/none/NoneIndexShardGateway.java | 4 + plugins/cloud/{ => aws}/build.gradle | 62 +++----- .../aws/src/main/java/es-plugin.properties | 2 + .../cloud/aws/AwsEc2Service.java | 90 +++++++++++ .../elasticsearch/cloud/aws/AwsModule.java} | 10 +- .../elasticsearch/cloud/aws/AwsS3Service.java | 92 ++++++++++++ .../cloud/aws/AwsSettingsFilter.java | 36 +++++ .../blobstore/AbstarctS3BlobContainer.java | 128 ++++++++++++++++ .../cloud/aws/blobstore/S3BlobStore.java} | 93 ++++++------ .../blobstore/S3ImmutableBlobContainer.java} | 33 ++--- .../ec2/AwsEc2UnicastHostsProvider.java | 140 ++++++++++++++++++ .../discovery/ec2/Ec2Discovery.java} | 12 +- .../discovery/ec2/Ec2DiscoveryModule.java} | 14 +- .../elasticsearch/gateway/s3/S3Gateway.java} | 36 +++-- .../gateway/s3/S3GatewayModule.java} | 8 +- .../index/gateway/s3/S3IndexGateway.java} | 11 +- .../gateway/s3/S3IndexGatewayModule.java} | 8 +- .../gateway/s3/S3IndexShardGateway.java} | 11 +- .../plugin/cloud/aws/CloudAwsPlugin.java} | 29 ++-- .../cloud/src/main/java/es-plugin.properties | 1 - .../blobstore/AbstractCloudBlobContainer.java | 124 ---------------- .../blobstore/CloudBlobStoreService.java | 101 ------------- .../cloud/compute/CloudComputeService.java | 100 ------------- .../cloud/jclouds/JCloudsUtils.java | 42 ------ .../jclouds/logging/JCloudsLoggingModule.java | 108 -------------- .../cloud/CloudUnicastHostsProvider.java | 114 -------------- settings.gradle | 2 +- 40 files changed, 711 insertions(+), 778 deletions(-) create mode 100644 .idea/modules/plugin-cloud-aws.iml rename plugins/cloud/{ => aws}/build.gradle (66%) create mode 100644 plugins/cloud/aws/src/main/java/es-plugin.properties create mode 100644 plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/AwsEc2Service.java rename plugins/cloud/{src/main/java/org/elasticsearch/cloud/CloudModule.java => aws/src/main/java/org/elasticsearch/cloud/aws/AwsModule.java} (74%) create mode 100644 plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/AwsS3Service.java create mode 100644 plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/AwsSettingsFilter.java create mode 100644 plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/blobstore/AbstarctS3BlobContainer.java rename plugins/cloud/{src/main/java/org/elasticsearch/cloud/blobstore/CloudBlobStore.java => aws/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobStore.java} (52%) rename plugins/cloud/{src/main/java/org/elasticsearch/cloud/blobstore/CloudImmutableBlobContainer.java => aws/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3ImmutableBlobContainer.java} (55%) create mode 100644 plugins/cloud/aws/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2UnicastHostsProvider.java rename plugins/cloud/{src/main/java/org/elasticsearch/discovery/cloud/CloudDiscovery.java => aws/src/main/java/org/elasticsearch/discovery/ec2/Ec2Discovery.java} (80%) rename plugins/cloud/{src/main/java/org/elasticsearch/discovery/cloud/CloudDiscoveryModule.java => aws/src/main/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryModule.java} (69%) rename plugins/cloud/{src/main/java/org/elasticsearch/gateway/cloud/CloudGateway.java => aws/src/main/java/org/elasticsearch/gateway/s3/S3Gateway.java} (55%) rename plugins/cloud/{src/main/java/org/elasticsearch/gateway/cloud/CloudGatewayModule.java => aws/src/main/java/org/elasticsearch/gateway/s3/S3GatewayModule.java} (85%) rename plugins/cloud/{src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexGateway.java => aws/src/main/java/org/elasticsearch/index/gateway/s3/S3IndexGateway.java} (82%) rename plugins/cloud/{src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexGatewayModule.java => aws/src/main/java/org/elasticsearch/index/gateway/s3/S3IndexGatewayModule.java} (84%) rename plugins/cloud/{src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexShardGateway.java => aws/src/main/java/org/elasticsearch/index/gateway/s3/S3IndexShardGateway.java} (79%) rename plugins/cloud/{src/main/java/org/elasticsearch/plugin/cloud/CloudPlugin.java => aws/src/main/java/org/elasticsearch/plugin/cloud/aws/CloudAwsPlugin.java} (71%) delete mode 100644 plugins/cloud/src/main/java/es-plugin.properties delete mode 100644 plugins/cloud/src/main/java/org/elasticsearch/cloud/blobstore/AbstractCloudBlobContainer.java delete mode 100644 plugins/cloud/src/main/java/org/elasticsearch/cloud/blobstore/CloudBlobStoreService.java delete mode 100644 plugins/cloud/src/main/java/org/elasticsearch/cloud/compute/CloudComputeService.java delete mode 100644 plugins/cloud/src/main/java/org/elasticsearch/cloud/jclouds/JCloudsUtils.java delete mode 100644 plugins/cloud/src/main/java/org/elasticsearch/cloud/jclouds/logging/JCloudsLoggingModule.java delete mode 100644 plugins/cloud/src/main/java/org/elasticsearch/discovery/cloud/CloudUnicastHostsProvider.java diff --git a/.idea/dictionaries/kimchy.xml b/.idea/dictionaries/kimchy.xml index d475fd801a8..9d24bb5f29a 100644 --- a/.idea/dictionaries/kimchy.xml +++ b/.idea/dictionaries/kimchy.xml @@ -49,6 +49,7 @@ hdfs histo hpux + https hyperic ifconfig indices diff --git a/.idea/modules.xml b/.idea/modules.xml index 96a5c45f757..2667b1760d8 100644 --- a/.idea/modules.xml +++ b/.idea/modules.xml @@ -7,9 +7,9 @@ + - diff --git a/.idea/modules/elasticsearch-root.iml b/.idea/modules/elasticsearch-root.iml index 958921dfce5..ab33aa0ed8a 100644 --- a/.idea/modules/elasticsearch-root.iml +++ b/.idea/modules/elasticsearch-root.iml @@ -18,7 +18,7 @@ - + diff --git a/.idea/modules/plugin-cloud-aws.iml b/.idea/modules/plugin-cloud-aws.iml new file mode 100644 index 00000000000..e3f92348438 --- /dev/null +++ b/.idea/modules/plugin-cloud-aws.iml @@ -0,0 +1,33 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/config/logging.yml b/config/logging.yml index 7cc496f06cf..80bd2be1815 100644 --- a/config/logging.yml +++ b/config/logging.yml @@ -2,6 +2,8 @@ rootLogger: INFO, console, file logger: # log action execution errors for easier debugging action: DEBUG + # reduce the logging for aws, too much is logged under the default INFO + com.amazonaws: WARN # gateway #gateway: DEBUG diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/support/ImmutableAppendableBlobContainer.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/support/ImmutableAppendableBlobContainer.java index 55111e84401..4ff6e63d3ca 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/support/ImmutableAppendableBlobContainer.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/support/ImmutableAppendableBlobContainer.java @@ -50,13 +50,18 @@ public class ImmutableAppendableBlobContainer extends AbstractBlobContainer impl @Override public void append(final AppendBlobListener listener) { BytesStreamOutput out = new BytesStreamOutput(); - String partBlobName = blobName + ".a" + (part++); try { listener.withStream(out); } catch (Exception e) { listener.onFailure(e); return; } + if (out.size() == 0) { + // nothing to write, bail + listener.onCompleted(); + return; + } + String partBlobName = blobName + ".a" + (part++); // use teh sync one ByteArrayInputStream is = new ByteArrayInputStream(out.unsafeByteArray(), 0, out.size()); container.writeBlob(partBlobName, is, out.size(), new ImmutableBlobContainer.WriterListener() { @@ -120,7 +125,7 @@ public class ImmutableAppendableBlobContainer extends AbstractBlobContainer impl ImmutableMap blobs = buildVirtualBlobs(container.listBlobs()); for (String blobName : blobs.keySet()) { if (filter.accept(blobName)) { - container.deleteBlob(blobName); + deleteBlob(blobName); } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java index 5a0f1c5f024..6d5c1b45dfa 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java @@ -28,7 +28,7 @@ import org.elasticsearch.discovery.zen.ZenDiscoveryModule; import static org.elasticsearch.common.inject.ModulesFactory.*; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class DiscoveryModule extends AbstractModule { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscoveryModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscoveryModule.java index 521d9a2569d..3f5cca3838b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscoveryModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscoveryModule.java @@ -30,6 +30,10 @@ public class ZenDiscoveryModule extends AbstractModule { @Override protected void configure() { bind(ZenPingService.class).asEagerSingleton(); + bindDiscovery(); + } + + protected void bindDiscovery() { bind(Discovery.class).to(ZenDiscovery.class).asEagerSingleton(); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java index 88b855762b3..9b100a8572c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java @@ -180,9 +180,11 @@ public class GatewayService extends AbstractLifecycleComponent i */ private Boolean readFromGateway(@Nullable TimeValue waitTimeout) { logger.debug("reading state from gateway {} ...", gateway); + StopWatch stopWatch = new StopWatch().start(); MetaData metaData; try { metaData = gateway.read(); + logger.debug("read state from gateway {}, took {}", gateway, stopWatch.stop().totalTime()); } catch (Exception e) { logger.error("failed to read from gateway", e); markMetaDataAsReadFromGateway("failure"); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/none/NoneGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/none/NoneGateway.java index 07dab8842bb..d4640c9eef9 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/none/NoneGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/none/NoneGateway.java @@ -44,6 +44,10 @@ public class NoneGateway extends AbstractLifecycleComponent implements return TYPE; } + @Override public String toString() { + return "_none_"; + } + @Override protected void doStart() throws ElasticSearchException { } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java index 8252ce483a3..291f60373a3 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java @@ -140,7 +140,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem throttlingWaitTime.stop(); try { - logger.debug("starting recovery from {}", shardGateway); + logger.debug("starting recovery from {} ...", shardGateway); StopWatch stopWatch = new StopWatch().start(); IndexShardGateway.RecoveryStatus recoveryStatus = shardGateway.recover(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java index 6e4e638e882..20461e9541e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java @@ -323,10 +323,21 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo // delete the old translog if (snapshot.newTranslogCreated()) { try { - translogContainer.deleteBlob("translog-" + snapshot.lastTranslogId()); + translogContainer.deleteBlobsByFilter(new BlobContainer.BlobNameFilter() { + @Override public boolean accept(String blobName) { + // delete all the ones that are not this translog + return !blobName.equals("translog-" + translogSnapshot.translogId()); + } + }); } catch (Exception e) { // ignore } + // NOT doing this one, the above allows us to clean the translog properly +// try { +// translogContainer.deleteBlob("translog-" + snapshot.lastTranslogId()); +// } catch (Exception e) { +// // ignore +// } } // delete old index files diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexGateway.java index 90c454d5a40..4ce2c859600 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexGateway.java @@ -45,7 +45,7 @@ public class NoneIndexGateway extends AbstractIndexComponent implements IndexGat } @Override public String toString() { - return "none"; + return "_none_"; } @Override public void close(boolean delete) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java index d06e06ac432..b7af93be285 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java @@ -44,6 +44,10 @@ public class NoneIndexShardGateway extends AbstractIndexShardComponent implement this.indexShard = (InternalIndexShard) indexShard; } + @Override public String toString() { + return "_none_"; + } + @Override public RecoveryStatus recover() throws IndexShardGatewayRecoveryException { // in the none case, we simply start the shard // clean the store, there should be nothing there... diff --git a/plugins/cloud/build.gradle b/plugins/cloud/aws/build.gradle similarity index 66% rename from plugins/cloud/build.gradle rename to plugins/cloud/aws/build.gradle index 1f0a60d5312..3cbcccf883a 100644 --- a/plugins/cloud/build.gradle +++ b/plugins/cloud/aws/build.gradle @@ -3,7 +3,7 @@ dependsOn(':elasticsearch') apply plugin: 'java' apply plugin: 'maven' -archivesBaseName = "elasticsearch-cloud" +archivesBaseName = "elasticsearch-cloud-aws" explodedDistDir = new File(distsDir, 'exploded') @@ -14,51 +14,32 @@ configurations.testCompile.transitive = true sourceSets.main.resources.srcDirs 'src/main/java' sourceSets.test.resources.srcDirs 'src/test/java' -// add the source files to the dist jar -//jar { +jar { // from sourceSets.main.allJava -//} + manifest { + attributes("Implementation-Title": "ElasticSearch", "Implementation-Version": rootProject.version, "Implementation-Date": buildTimeStr) + } +} configurations { dists distLib { visible = false + transitive = false } } -repositories { - mavenRepo urls: "http://jclouds.googlecode.com/svn/repo" - mavenRepo urls: "http://jclouds.rimuhosting.com/maven2/snapshots" - mavenRepo urls: "http://java-xmlbuilder.googlecode.com/svn/repo" -} - -jcloudsVersion = "1.0-beta-6" - dependencies { compile project(':elasticsearch') - compile("org.jclouds:jclouds-blobstore:$jcloudsVersion") - compile("org.jclouds:jclouds-aws:$jcloudsVersion") - compile("org.jclouds:jclouds-rackspace:$jcloudsVersion") - compile("org.jclouds:jclouds-atmos:$jcloudsVersion") - compile("org.jclouds:jclouds-azure:$jcloudsVersion") - compile("org.jclouds:jclouds-gogrid:$jcloudsVersion") - compile("org.jclouds:jclouds-vcloud:$jcloudsVersion") - compile("org.jclouds:jclouds-rimuhosting:$jcloudsVersion") - compile("org.jclouds:jclouds-terremark:$jcloudsVersion") - compile("org.jclouds:jclouds-bluelock:$jcloudsVersion") - compile("org.jclouds:jclouds-hostingdotcom:$jcloudsVersion") + compile("com.amazonaws:aws-java-sdk:1.0.007") { transitive = false } + runtime("commons-logging:commons-logging:1.1.1") { transitive = false } + runtime("commons-codec:commons-codec:1.3") { transitive = false } + runtime("commons-httpclient:commons-httpclient:3.0.1") { transitive = false } - distLib("org.jclouds:jclouds-blobstore:$jcloudsVersion") - distLib("org.jclouds:jclouds-aws:$jcloudsVersion") - distLib("org.jclouds:jclouds-rackspace:$jcloudsVersion") - distLib("org.jclouds:jclouds-atmos:$jcloudsVersion") - distLib("org.jclouds:jclouds-azure:$jcloudsVersion") - distLib("org.jclouds:jclouds-gogrid:$jcloudsVersion") - distLib("org.jclouds:jclouds-vcloud:$jcloudsVersion") - distLib("org.jclouds:jclouds-rimuhosting:$jcloudsVersion") - distLib("org.jclouds:jclouds-terremark:$jcloudsVersion") - distLib("org.jclouds:jclouds-bluelock:$jcloudsVersion") - distLib("org.jclouds:jclouds-hostingdotcom:$jcloudsVersion") + distLib("com.amazonaws:aws-java-sdk:1.0.007") { transitive = false } + distLib("commons-codec:commons-codec:1.3") { transitive = false } + distLib("commons-logging:commons-logging:1.1.1") { transitive = false } + distLib("commons-httpclient:commons-httpclient:3.0.1") { transitive = false } testCompile project(':test-testng') testCompile('org.testng:testng:5.10:jdk15') { transitive = false } @@ -124,13 +105,6 @@ task javadocJar(type: Jar, dependsOn: javadoc) { from javadoc.destinationDir } -jar { -// from sourceSets.main.allJava - manifest { - attributes("Implementation-Title": "ElasticSearch", "Implementation-Version": rootProject.version, "Implementation-Date": buildTimeStr) - } -} - artifacts { archives sourcesJar archives javadocJar @@ -148,8 +122,8 @@ uploadArchives { pom.project { inceptionYear '2009' - name 'elasticsearch-plugins-cloud' - description 'Clouud Plugin for ElasticSearch' + name 'elasticsearch-plugins-cloud-aws' + description 'Cloud AWS Plugin for ElasticSearch' licenses { license { name 'The Apache Software License, Version 2.0' @@ -168,4 +142,4 @@ uploadArchives { pom.dependencies = pom.dependencies.findAll {dep -> dep.scope != 'test' } // removes the test scoped ones } } -} +} \ No newline at end of file diff --git a/plugins/cloud/aws/src/main/java/es-plugin.properties b/plugins/cloud/aws/src/main/java/es-plugin.properties new file mode 100644 index 00000000000..43b40e7fc79 --- /dev/null +++ b/plugins/cloud/aws/src/main/java/es-plugin.properties @@ -0,0 +1,2 @@ +plugin=org.elasticsearch.plugin.cloud.aws.CloudAwsPlugin + diff --git a/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/AwsEc2Service.java b/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/AwsEc2Service.java new file mode 100644 index 00000000000..2ebda552219 --- /dev/null +++ b/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/AwsEc2Service.java @@ -0,0 +1,90 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cloud.aws; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.Protocol; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.ec2.AmazonEC2; +import com.amazonaws.services.ec2.AmazonEC2Client; +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.ElasticSearchIllegalArgumentException; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.settings.SettingsFilter; + +/** + * @author kimchy (shay.banon) + */ +public class AwsEc2Service extends AbstractLifecycleComponent { + + private AmazonEC2Client client; + + @Inject public AwsEc2Service(Settings settings, SettingsFilter settingsFilter) { + super(settings); + + settingsFilter.addFilter(new AwsSettingsFilter()); + } + + public synchronized AmazonEC2 client() { + if (client != null) { + return client; + } + + ClientConfiguration clientConfiguration = new ClientConfiguration(); + String protocol = componentSettings.get("protocol", "http").toLowerCase(); + if ("http".equals(protocol)) { + clientConfiguration.setProtocol(Protocol.HTTP); + } else if ("https".equals(protocol)) { + clientConfiguration.setProtocol(Protocol.HTTPS); + } else { + throw new ElasticSearchIllegalArgumentException("No protocol supported [" + protocol + "], can either be [http] or [https]"); + } + String account = componentSettings.get("access_key", settings.get("cloud.account")); + String key = componentSettings.get("secret_key", settings.get("cloud.key")); + + if (account == null) { + throw new ElasticSearchIllegalArgumentException("No s3 access_key defined for s3 gateway"); + } + if (key == null) { + throw new ElasticSearchIllegalArgumentException("No s3 secret_key defined for s3 gateway"); + } + + this.client = new AmazonEC2Client(new BasicAWSCredentials(account, key), clientConfiguration); + + if (componentSettings.get("endpoint") != null) { + client.setEndpoint(componentSettings.get("endpoint")); + } + + return this.client; + + } + + @Override protected void doStart() throws ElasticSearchException { + } + + @Override protected void doStop() throws ElasticSearchException { + } + + @Override protected void doClose() throws ElasticSearchException { + client.shutdown(); + } +} diff --git a/plugins/cloud/src/main/java/org/elasticsearch/cloud/CloudModule.java b/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/AwsModule.java similarity index 74% rename from plugins/cloud/src/main/java/org/elasticsearch/cloud/CloudModule.java rename to plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/AwsModule.java index 80c6f7bd137..3d61eaacccc 100644 --- a/plugins/cloud/src/main/java/org/elasticsearch/cloud/CloudModule.java +++ b/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/AwsModule.java @@ -17,19 +17,17 @@ * under the License. */ -package org.elasticsearch.cloud; +package org.elasticsearch.cloud.aws; -import org.elasticsearch.cloud.blobstore.CloudBlobStoreService; -import org.elasticsearch.cloud.compute.CloudComputeService; import org.elasticsearch.common.inject.AbstractModule; /** * @author kimchy (shay.banon) */ -public class CloudModule extends AbstractModule { +public class AwsModule extends AbstractModule { @Override protected void configure() { - bind(CloudComputeService.class).asEagerSingleton(); - bind(CloudBlobStoreService.class).asEagerSingleton(); + bind(AwsS3Service.class).asEagerSingleton(); + bind(AwsEc2Service.class).asEagerSingleton(); } } diff --git a/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/AwsS3Service.java b/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/AwsS3Service.java new file mode 100644 index 00000000000..e3c563f051e --- /dev/null +++ b/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/AwsS3Service.java @@ -0,0 +1,92 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cloud.aws; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.Protocol; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.ElasticSearchIllegalArgumentException; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.settings.SettingsFilter; + +/** + * @author kimchy (shay.banon) + */ +public class AwsS3Service extends AbstractLifecycleComponent { + + private AmazonS3Client client; + + @Inject public AwsS3Service(Settings settings, SettingsFilter settingsFilter) { + super(settings); + + settingsFilter.addFilter(new AwsSettingsFilter()); + } + + public synchronized AmazonS3 client() { + if (client != null) { + return client; + } + + ClientConfiguration clientConfiguration = new ClientConfiguration(); + String protocol = componentSettings.get("protocol", "http").toLowerCase(); + if ("http".equals(protocol)) { + clientConfiguration.setProtocol(Protocol.HTTP); + } else if ("https".equals(protocol)) { + clientConfiguration.setProtocol(Protocol.HTTPS); + } else { + throw new ElasticSearchIllegalArgumentException("No protocol supported [" + protocol + "], can either be [http] or [https]"); + } + String account = componentSettings.get("access_key", settings.get("cloud.account")); + String key = componentSettings.get("secret_key", settings.get("cloud.key")); + + if (account == null) { + throw new ElasticSearchIllegalArgumentException("No s3 access_key defined for s3 gateway"); + } + if (key == null) { + throw new ElasticSearchIllegalArgumentException("No s3 secret_key defined for s3 gateway"); + } + + this.client = new AmazonS3Client(new BasicAWSCredentials(account, key), clientConfiguration); + + if (componentSettings.get("endpoint") != null) { + client.setEndpoint(componentSettings.get("endpoint")); + } + + return this.client; + } + + @Override protected void doStart() throws ElasticSearchException { + } + + @Override protected void doStop() throws ElasticSearchException { + } + + @Override protected void doClose() throws ElasticSearchException { + if (client != null) { + client.shutdown(); + } + } + +} diff --git a/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/AwsSettingsFilter.java b/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/AwsSettingsFilter.java new file mode 100644 index 00000000000..9e58b89ac48 --- /dev/null +++ b/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/AwsSettingsFilter.java @@ -0,0 +1,36 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cloud.aws; + +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.SettingsFilter; + +/** + * @author kimchy (shay.banon) + */ +public class AwsSettingsFilter implements SettingsFilter.Filter { + + @Override public void filter(ImmutableSettings.Builder settings) { + settings.remove("cloud.key"); + settings.remove("cloud.account"); + settings.remove("cloud.aws.access_key"); + settings.remove("cloud.aws.secret_key"); + } +} diff --git a/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/blobstore/AbstarctS3BlobContainer.java b/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/blobstore/AbstarctS3BlobContainer.java new file mode 100644 index 00000000000..66e7e94a0e4 --- /dev/null +++ b/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/blobstore/AbstarctS3BlobContainer.java @@ -0,0 +1,128 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cloud.aws.blobstore; + +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import org.elasticsearch.common.blobstore.BlobMetaData; +import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; +import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; +import org.elasticsearch.common.collect.ImmutableMap; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.io.InputStream; + +/** + * @author kimchy (shay.banon) + */ +public class AbstarctS3BlobContainer extends AbstractBlobContainer { + + protected final S3BlobStore blobStore; + + protected final String keyPath; + + public AbstarctS3BlobContainer(BlobPath path, S3BlobStore blobStore) { + super(path); + this.blobStore = blobStore; + this.keyPath = path.buildAsString("/") + "/"; + } + + @Override public boolean blobExists(String blobName) { + try { + blobStore.client().getObjectMetadata(blobStore.bucket(), buildKey(blobName)); + return true; + } catch (Exception e) { + return false; + } + } + + @Override public boolean deleteBlob(String blobName) throws IOException { + blobStore.client().deleteObject(blobStore.bucket(), buildKey(blobName)); + return true; + } + + @Override public void readBlob(final String blobName, final ReadBlobListener listener) { + blobStore.executor().execute(new Runnable() { + @Override public void run() { + InputStream is; + try { + S3Object object = blobStore.client().getObject(blobStore.bucket(), buildKey(blobName)); + is = object.getObjectContent(); + } catch (Exception e) { + listener.onFailure(e); + return; + } + byte[] buffer = new byte[blobStore.bufferSizeInBytes()]; + try { + int bytesRead; + while ((bytesRead = is.read(buffer)) != -1) { + listener.onPartial(buffer, 0, bytesRead); + } + listener.onCompleted(); + } catch (Exception e) { + try { + is.close(); + } catch (IOException e1) { + // ignore + } + listener.onFailure(e); + } + } + }); + } + + @Override public ImmutableMap listBlobsByPrefix(@Nullable String blobNamePrefix) throws IOException { + ImmutableMap.Builder blobsBuilder = ImmutableMap.builder(); + ObjectListing prevListing = null; + while (true) { + ObjectListing list; + if (prevListing != null) { + list = blobStore.client().listNextBatchOfObjects(prevListing); + } else { + if (blobNamePrefix != null) { + list = blobStore.client().listObjects(blobStore.bucket(), buildKey(blobNamePrefix)); + } else { + list = blobStore.client().listObjects(blobStore.bucket(), keyPath); + } + } + for (S3ObjectSummary summary : list.getObjectSummaries()) { + String name = summary.getKey().substring(keyPath.length()); + blobsBuilder.put(name, new PlainBlobMetaData(name, summary.getSize(), null)); + } + if (list.isTruncated()) { + prevListing = list; + } else { + break; + } + } + return blobsBuilder.build(); + } + + @Override public ImmutableMap listBlobs() throws IOException { + return listBlobsByPrefix(null); + } + + protected String buildKey(String blobName) { + return keyPath + blobName; + } +} diff --git a/plugins/cloud/src/main/java/org/elasticsearch/cloud/blobstore/CloudBlobStore.java b/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobStore.java similarity index 52% rename from plugins/cloud/src/main/java/org/elasticsearch/cloud/blobstore/CloudBlobStore.java rename to plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobStore.java index 57f9b9c9e5e..fd686ce592f 100644 --- a/plugins/cloud/src/main/java/org/elasticsearch/cloud/blobstore/CloudBlobStore.java +++ b/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobStore.java @@ -17,9 +17,11 @@ * under the License. */ -package org.elasticsearch.cloud.blobstore; +package org.elasticsearch.cloud.aws.blobstore; -import org.elasticsearch.ElasticSearchIllegalArgumentException; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.S3ObjectSummary; import org.elasticsearch.common.blobstore.AppendableBlobContainer; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; @@ -29,86 +31,65 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; -import org.jclouds.blobstore.AsyncBlobStore; -import org.jclouds.blobstore.BlobStoreContext; -import org.jclouds.domain.Location; -import java.util.Set; +import javax.annotation.Nullable; import java.util.concurrent.Executor; /** * @author kimchy (shay.banon) */ -public class CloudBlobStore extends AbstractComponent implements BlobStore { +public class S3BlobStore extends AbstractComponent implements BlobStore { - private final BlobStoreContext blobStoreContext; + private final AmazonS3 client; - private final String container; + private final String bucket; - private final Location location; + private final String region; private final Executor executor; private final int bufferSizeInBytes; - public CloudBlobStore(Settings settings, BlobStoreContext blobStoreContext, Executor executor, String container, String location) { + public S3BlobStore(Settings settings, AmazonS3 client, String bucket, @Nullable String region, Executor executor) { super(settings); - this.blobStoreContext = blobStoreContext; - this.container = container; + this.client = client; + this.bucket = bucket; + this.region = region; this.executor = executor; this.bufferSizeInBytes = (int) settings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes(); - if (location == null) { - this.location = null; - } else { - Location matchedLocation = null; - Set assignableLocations = blobStoreContext.getBlobStore().listAssignableLocations(); - for (Location oLocation : assignableLocations) { - if (oLocation.getId().equals(location)) { - matchedLocation = oLocation; - break; - } - } - this.location = matchedLocation; - if (this.location == null) { - throw new ElasticSearchIllegalArgumentException("Not a valid location [" + location + "], available locations " + assignableLocations); + if (!client.doesBucketExist(bucket)) { + if (region != null) { + client.createBucket(bucket, region); + } else { + client.createBucket(bucket); } } - logger.debug("Using location [{}], container [{}]", this.location, this.container); - sync().createContainerInLocation(this.location, container); } @Override public String toString() { - return container; + return (region == null ? "" : region + "/") + bucket; } - public int bufferSizeInBytes() { - return this.bufferSizeInBytes; + public AmazonS3 client() { + return client; + } + + public String bucket() { + return bucket; } public Executor executor() { return executor; } - public String container() { - return this.container; - } - - public Location location() { - return this.location; - } - - public AsyncBlobStore async() { - return blobStoreContext.getAsyncBlobStore(); - } - - public org.jclouds.blobstore.BlobStore sync() { - return blobStoreContext.getBlobStore(); + public int bufferSizeInBytes() { + return bufferSizeInBytes; } @Override public ImmutableBlobContainer immutableBlobContainer(BlobPath path) { - return new CloudImmutableBlobContainer(path, this); + return new S3ImmutableBlobContainer(path, this); } @Override public AppendableBlobContainer appendableBlobContainer(BlobPath path) { @@ -116,7 +97,23 @@ public class CloudBlobStore extends AbstractComponent implements BlobStore { } @Override public void delete(BlobPath path) { - sync().deleteDirectory(container, path.buildAsString("/")); + ObjectListing prevListing = null; + while (true) { + ObjectListing list; + if (prevListing != null) { + list = client.listNextBatchOfObjects(prevListing); + } else { + list = client.listObjects(bucket, path.buildAsString("/")); + } + for (S3ObjectSummary summary : list.getObjectSummaries()) { + client.deleteObject(summary.getBucketName(), summary.getKey()); + } + if (list.isTruncated()) { + prevListing = list; + } else { + break; + } + } } @Override public void close() { diff --git a/plugins/cloud/src/main/java/org/elasticsearch/cloud/blobstore/CloudImmutableBlobContainer.java b/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3ImmutableBlobContainer.java similarity index 55% rename from plugins/cloud/src/main/java/org/elasticsearch/cloud/blobstore/CloudImmutableBlobContainer.java rename to plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3ImmutableBlobContainer.java index f5ea4eba7ac..aacefec03fa 100644 --- a/plugins/cloud/src/main/java/org/elasticsearch/cloud/blobstore/CloudImmutableBlobContainer.java +++ b/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3ImmutableBlobContainer.java @@ -17,46 +17,39 @@ * under the License. */ -package org.elasticsearch.cloud.blobstore; +package org.elasticsearch.cloud.aws.blobstore; -import com.google.common.util.concurrent.ListenableFuture; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.PutObjectResult; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.ImmutableBlobContainer; import org.elasticsearch.common.blobstore.support.BlobStores; -import org.jclouds.blobstore.domain.Blob; import java.io.IOException; import java.io.InputStream; -import java.util.concurrent.ExecutionException; /** * @author kimchy (shay.banon) */ -public class CloudImmutableBlobContainer extends AbstractCloudBlobContainer implements ImmutableBlobContainer { +public class S3ImmutableBlobContainer extends AbstarctS3BlobContainer implements ImmutableBlobContainer { - public CloudImmutableBlobContainer(BlobPath path, CloudBlobStore cloudBlobStore) { - super(path, cloudBlobStore); + public S3ImmutableBlobContainer(BlobPath path, S3BlobStore blobStore) { + super(path, blobStore); } - @Override public void writeBlob(String blobName, InputStream is, long sizeInBytes, final WriterListener listener) { - Blob blob = cloudBlobStore.async().newBlob(buildBlobPath(blobName)); - blob.setPayload(is); - blob.setContentLength(sizeInBytes); - final ListenableFuture future = cloudBlobStore.async().putBlob(cloudBlobStore.container(), blob); - future.addListener(new Runnable() { + @Override public void writeBlob(final String blobName, final InputStream is, final long sizeInBytes, final WriterListener listener) { + blobStore.executor().execute(new Runnable() { @Override public void run() { try { - future.get(); + ObjectMetadata md = new ObjectMetadata(); + md.setContentLength(sizeInBytes); + PutObjectResult objectResult = blobStore.client().putObject(blobStore.bucket(), buildKey(blobName), is, md); listener.onCompleted(); - } catch (InterruptedException e) { + } catch (Exception e) { listener.onFailure(e); - } catch (ExecutionException e) { - listener.onFailure(e.getCause()); - } catch (Throwable t) { - listener.onFailure(t); } } - }, cloudBlobStore.executor()); + }); } @Override public void writeBlob(String blobName, InputStream is, long sizeInBytes) throws IOException { diff --git a/plugins/cloud/aws/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2UnicastHostsProvider.java b/plugins/cloud/aws/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2UnicastHostsProvider.java new file mode 100644 index 00000000000..fc77e6c624e --- /dev/null +++ b/plugins/cloud/aws/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2UnicastHostsProvider.java @@ -0,0 +1,140 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.discovery.ec2; + +import com.amazonaws.services.ec2.AmazonEC2; +import com.amazonaws.services.ec2.model.*; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.ImmutableSet; +import org.elasticsearch.common.collect.Lists; +import org.elasticsearch.common.collect.Sets; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.transport.PortsRange; +import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider; + +import java.util.List; +import java.util.Set; + +/** + * @author kimchy (shay.banon) + */ +public class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHostsProvider { + + private static enum HostType { + PRIVATE_IP, + PUBLIC_IP, + PRIVATE_DNS, + PUBLIC_DNS + } + + private final AmazonEC2 client; + + private final String ports; + + private final ImmutableSet groups; + + private final ImmutableSet availabilityZones; + + private final HostType hostType; + + @Inject public AwsEc2UnicastHostsProvider(Settings settings, AmazonEC2 client) { + super(settings); + this.client = client; + + this.hostType = HostType.valueOf(componentSettings.get("host_type", "private_ip").toUpperCase()); + this.ports = componentSettings.get("ports", "9300-9302"); + + Set groups = Sets.newHashSet(componentSettings.getAsArray("groups")); + if (componentSettings.get("groups") != null) { + groups.addAll(Strings.commaDelimitedListToSet(componentSettings.get("groups"))); + } + this.groups = ImmutableSet.copyOf(groups); + + Set availabilityZones = Sets.newHashSet(componentSettings.getAsArray("availability_zones")); + if (componentSettings.get("availability_zones") != null) { + availabilityZones.addAll(Strings.commaDelimitedListToSet(componentSettings.get("availability_zones"))); + } + this.availabilityZones = ImmutableSet.copyOf(availabilityZones); + } + + @Override public List buildDynamicNodes() { + List discoNodes = Lists.newArrayList(); + + DescribeInstancesResult descInstances = client.describeInstances(new DescribeInstancesRequest()); + + logger.trace("building dynamic unicast discovery nodes..."); + for (Reservation reservation : descInstances.getReservations()) { + if (!groups.isEmpty()) { + // lets see if we can filter based on groups + boolean filter = false; + for (String group : reservation.getGroupNames()) { + if (!groups.contains(group)) { + logger.trace("filtering out reservation {} based on group {}, not part of {}", reservation.getReservationId(), group, groups); + filter = true; + break; + } + } + if (filter) { + // if we are filtering, continue to the next reservation + continue; + } + } + + for (Instance instance : reservation.getInstances()) { + if (!availabilityZones.isEmpty()) { + if (!availabilityZones.contains(instance.getPlacement().getAvailabilityZone())) { + logger.trace("filtering out instance {} based on availability_zone {}, not part of {}", instance.getInstanceId(), instance.getPlacement().getAvailabilityZone(), availabilityZones); + continue; + } + } + InstanceState state = instance.getState(); + if (state.getName().equalsIgnoreCase("pending") || state.getName().equalsIgnoreCase("running")) { + String address = null; + switch (hostType) { + case PRIVATE_DNS: + address = instance.getPrivateDnsName(); + break; + case PRIVATE_IP: + address = instance.getPrivateIpAddress(); + break; + case PUBLIC_DNS: + address = instance.getPublicDnsName(); + break; + case PUBLIC_IP: + address = instance.getPublicDnsName(); + break; + } + for (int port : new PortsRange(ports).ports()) { + logger.trace("adding {}, address {}", instance.getInstanceId(), address); + discoNodes.add(new DiscoveryNode("#cloud-" + instance.getInstanceId() + "-" + port, new InetSocketTransportAddress(address, port))); + } + } + } + } + + logger.debug("using dynamic discovery nodes {}", discoNodes); + + return discoNodes; + } +} diff --git a/plugins/cloud/src/main/java/org/elasticsearch/discovery/cloud/CloudDiscovery.java b/plugins/cloud/aws/src/main/java/org/elasticsearch/discovery/ec2/Ec2Discovery.java similarity index 80% rename from plugins/cloud/src/main/java/org/elasticsearch/discovery/cloud/CloudDiscovery.java rename to plugins/cloud/aws/src/main/java/org/elasticsearch/discovery/ec2/Ec2Discovery.java index fcddbd7c666..fe2c519368a 100644 --- a/plugins/cloud/src/main/java/org/elasticsearch/discovery/cloud/CloudDiscovery.java +++ b/plugins/cloud/aws/src/main/java/org/elasticsearch/discovery/ec2/Ec2Discovery.java @@ -17,9 +17,9 @@ * under the License. */ -package org.elasticsearch.discovery.cloud; +package org.elasticsearch.discovery.ec2; -import org.elasticsearch.cloud.compute.CloudComputeService; +import org.elasticsearch.cloud.aws.AwsEc2Service; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.common.collect.ImmutableList; @@ -35,10 +35,10 @@ import org.elasticsearch.transport.TransportService; /** * @author kimchy (shay.banon) */ -public class CloudDiscovery extends ZenDiscovery { +public class Ec2Discovery extends ZenDiscovery { - @Inject public CloudDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool, TransportService transportService, - ClusterService clusterService, ZenPingService pingService, CloudComputeService computeService) { + @Inject public Ec2Discovery(Settings settings, ClusterName clusterName, ThreadPool threadPool, TransportService transportService, + ClusterService clusterService, ZenPingService pingService, AwsEc2Service ec2Service) { super(settings, clusterName, threadPool, transportService, clusterService, pingService); if (settings.getAsBoolean("cloud.enabled", true)) { ImmutableList zenPings = pingService.zenPings(); @@ -51,7 +51,7 @@ public class CloudDiscovery extends ZenDiscovery { } // update the unicast zen ping to add cloud hosts provider // and, while we are at it, use only it and not the multicast for example - unicastZenPing.addHostsProvider(new CloudUnicastHostsProvider(settings, computeService)); + unicastZenPing.addHostsProvider(new AwsEc2UnicastHostsProvider(settings, ec2Service.client())); pingService.zenPings(ImmutableList.of(unicastZenPing)); } } diff --git a/plugins/cloud/src/main/java/org/elasticsearch/discovery/cloud/CloudDiscoveryModule.java b/plugins/cloud/aws/src/main/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryModule.java similarity index 69% rename from plugins/cloud/src/main/java/org/elasticsearch/discovery/cloud/CloudDiscoveryModule.java rename to plugins/cloud/aws/src/main/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryModule.java index 954e27cf933..09ebce0a518 100644 --- a/plugins/cloud/src/main/java/org/elasticsearch/discovery/cloud/CloudDiscoveryModule.java +++ b/plugins/cloud/aws/src/main/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryModule.java @@ -17,19 +17,17 @@ * under the License. */ -package org.elasticsearch.discovery.cloud; +package org.elasticsearch.discovery.ec2; -import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.discovery.zen.ping.ZenPingService; +import org.elasticsearch.discovery.zen.ZenDiscoveryModule; /** * @author kimchy (shay.banon) */ -public class CloudDiscoveryModule extends AbstractModule { +public class Ec2DiscoveryModule extends ZenDiscoveryModule { - @Override protected void configure() { - bind(ZenPingService.class).asEagerSingleton(); - bind(Discovery.class).to(CloudDiscovery.class).asEagerSingleton(); + @Override protected void bindDiscovery() { + bind(Discovery.class).to(Ec2Discovery.class).asEagerSingleton(); } -} \ No newline at end of file +} diff --git a/plugins/cloud/src/main/java/org/elasticsearch/gateway/cloud/CloudGateway.java b/plugins/cloud/aws/src/main/java/org/elasticsearch/gateway/s3/S3Gateway.java similarity index 55% rename from plugins/cloud/src/main/java/org/elasticsearch/gateway/cloud/CloudGateway.java rename to plugins/cloud/aws/src/main/java/org/elasticsearch/gateway/s3/S3Gateway.java index 41db4244602..ebe5c180434 100644 --- a/plugins/cloud/src/main/java/org/elasticsearch/gateway/cloud/CloudGateway.java +++ b/plugins/cloud/aws/src/main/java/org/elasticsearch/gateway/s3/S3Gateway.java @@ -17,11 +17,12 @@ * under the License. */ -package org.elasticsearch.gateway.cloud; +package org.elasticsearch.gateway.s3; +import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchIllegalArgumentException; -import org.elasticsearch.cloud.blobstore.CloudBlobStore; -import org.elasticsearch.cloud.blobstore.CloudBlobStoreService; +import org.elasticsearch.cloud.aws.AwsS3Service; +import org.elasticsearch.cloud.aws.blobstore.S3BlobStore; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Module; @@ -29,7 +30,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.gateway.blobstore.BlobStoreGateway; -import org.elasticsearch.index.gateway.cloud.CloudIndexGatewayModule; +import org.elasticsearch.index.gateway.s3.S3IndexGatewayModule; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; @@ -37,26 +38,33 @@ import java.io.IOException; /** * @author kimchy (shay.banon) */ -public class CloudGateway extends BlobStoreGateway { +public class S3Gateway extends BlobStoreGateway { - @Inject public CloudGateway(Settings settings, ClusterName clusterName, ThreadPool threadPool, CloudBlobStoreService blobStoreService) throws IOException { + @Inject public S3Gateway(Settings settings, ClusterName clusterName, ThreadPool threadPool, AwsS3Service s3Service) throws IOException { super(settings); - String location = componentSettings.get("location"); - String container = componentSettings.get("container"); - if (container == null) { - throw new ElasticSearchIllegalArgumentException("Cloud gateway requires 'container' setting"); + String bucket = componentSettings.get("bucket"); + if (bucket == null) { + throw new ElasticSearchIllegalArgumentException("No bucket defined for s3 gateway"); } - initialize(new CloudBlobStore(settings, blobStoreService.context(), threadPool.cached(), container, location), clusterName, new ByteSizeValue(100, ByteSizeUnit.MB)); + String region = componentSettings.get("region"); + ByteSizeValue chunkSize = componentSettings.getAsBytesSize("chunk_size", new ByteSizeValue(100, ByteSizeUnit.MB)); + + logger.debug("using bucket [{}], region [{}], chunk_size [{}]", bucket, region, chunkSize); + + initialize(new S3BlobStore(settings, s3Service.client(), bucket, region, threadPool.cached()), clusterName, chunkSize); + } + + @Override public void close() throws ElasticSearchException { + super.close(); } @Override public String type() { - return "cloud"; + return "s3"; } @Override public Class suggestIndexGateway() { - return CloudIndexGatewayModule.class; + return S3IndexGatewayModule.class; } } - diff --git a/plugins/cloud/src/main/java/org/elasticsearch/gateway/cloud/CloudGatewayModule.java b/plugins/cloud/aws/src/main/java/org/elasticsearch/gateway/s3/S3GatewayModule.java similarity index 85% rename from plugins/cloud/src/main/java/org/elasticsearch/gateway/cloud/CloudGatewayModule.java rename to plugins/cloud/aws/src/main/java/org/elasticsearch/gateway/s3/S3GatewayModule.java index 211d9049b70..ca84c639887 100644 --- a/plugins/cloud/src/main/java/org/elasticsearch/gateway/cloud/CloudGatewayModule.java +++ b/plugins/cloud/aws/src/main/java/org/elasticsearch/gateway/s3/S3GatewayModule.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.gateway.cloud; +package org.elasticsearch.gateway.s3; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.gateway.Gateway; @@ -25,9 +25,9 @@ import org.elasticsearch.gateway.Gateway; /** * @author kimchy (shay.banon) */ -public class CloudGatewayModule extends AbstractModule { +public class S3GatewayModule extends AbstractModule { @Override protected void configure() { - bind(Gateway.class).to(CloudGateway.class).asEagerSingleton(); + bind(Gateway.class).to(S3Gateway.class).asEagerSingleton(); } -} \ No newline at end of file +} diff --git a/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexGateway.java b/plugins/cloud/aws/src/main/java/org/elasticsearch/index/gateway/s3/S3IndexGateway.java similarity index 82% rename from plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexGateway.java rename to plugins/cloud/aws/src/main/java/org/elasticsearch/index/gateway/s3/S3IndexGateway.java index 8ef5b86236d..a79989ff0ee 100644 --- a/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexGateway.java +++ b/plugins/cloud/aws/src/main/java/org/elasticsearch/index/gateway/s3/S3IndexGateway.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.index.gateway.cloud; +package org.elasticsearch.index.gateway.s3; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -30,17 +30,18 @@ import org.elasticsearch.index.settings.IndexSettings; /** * @author kimchy (shay.banon) */ -public class CloudIndexGateway extends BlobStoreIndexGateway { +public class S3IndexGateway extends BlobStoreIndexGateway { - @Inject public CloudIndexGateway(Index index, @IndexSettings Settings indexSettings, Gateway gateway) { + @Inject public S3IndexGateway(Index index, @IndexSettings Settings indexSettings, Gateway gateway) { super(index, indexSettings, gateway); } @Override public String type() { - return "cloud"; + return "s3"; } @Override public Class shardGatewayClass() { - return CloudIndexShardGateway.class; + return S3IndexShardGateway.class; } } + diff --git a/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexGatewayModule.java b/plugins/cloud/aws/src/main/java/org/elasticsearch/index/gateway/s3/S3IndexGatewayModule.java similarity index 84% rename from plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexGatewayModule.java rename to plugins/cloud/aws/src/main/java/org/elasticsearch/index/gateway/s3/S3IndexGatewayModule.java index f4232d457ce..005f141bf49 100644 --- a/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexGatewayModule.java +++ b/plugins/cloud/aws/src/main/java/org/elasticsearch/index/gateway/s3/S3IndexGatewayModule.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.index.gateway.cloud; +package org.elasticsearch.index.gateway.s3; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.index.gateway.IndexGateway; @@ -25,9 +25,9 @@ import org.elasticsearch.index.gateway.IndexGateway; /** * @author kimchy (shay.banon) */ -public class CloudIndexGatewayModule extends AbstractModule { +public class S3IndexGatewayModule extends AbstractModule { @Override protected void configure() { - bind(IndexGateway.class).to(CloudIndexGateway.class).asEagerSingleton(); + bind(IndexGateway.class).to(S3IndexGateway.class).asEagerSingleton(); } -} \ No newline at end of file +} diff --git a/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexShardGateway.java b/plugins/cloud/aws/src/main/java/org/elasticsearch/index/gateway/s3/S3IndexShardGateway.java similarity index 79% rename from plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexShardGateway.java rename to plugins/cloud/aws/src/main/java/org/elasticsearch/index/gateway/s3/S3IndexShardGateway.java index 97f3bc5d46b..502dbffdba9 100644 --- a/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexShardGateway.java +++ b/plugins/cloud/aws/src/main/java/org/elasticsearch/index/gateway/s3/S3IndexShardGateway.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.index.gateway.cloud; +package org.elasticsearch.index.gateway.s3; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -33,14 +33,15 @@ import org.elasticsearch.threadpool.ThreadPool; /** * @author kimchy (shay.banon) */ -public class CloudIndexShardGateway extends BlobStoreIndexShardGateway { +public class S3IndexShardGateway extends BlobStoreIndexShardGateway { - @Inject public CloudIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexGateway indexGateway, - IndexShard indexShard, Store store, RecoveryThrottler recoveryThrottler) { + @Inject public S3IndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexGateway indexGateway, + IndexShard indexShard, Store store, RecoveryThrottler recoveryThrottler) { super(shardId, indexSettings, threadPool, indexGateway, indexShard, store, recoveryThrottler); } @Override public String type() { - return "cloud"; + return "s3"; } } + diff --git a/plugins/cloud/src/main/java/org/elasticsearch/plugin/cloud/CloudPlugin.java b/plugins/cloud/aws/src/main/java/org/elasticsearch/plugin/cloud/aws/CloudAwsPlugin.java similarity index 71% rename from plugins/cloud/src/main/java/org/elasticsearch/plugin/cloud/CloudPlugin.java rename to plugins/cloud/aws/src/main/java/org/elasticsearch/plugin/cloud/aws/CloudAwsPlugin.java index f3c2cc78e03..af0bc2e8cc0 100644 --- a/plugins/cloud/src/main/java/org/elasticsearch/plugin/cloud/CloudPlugin.java +++ b/plugins/cloud/aws/src/main/java/org/elasticsearch/plugin/cloud/aws/CloudAwsPlugin.java @@ -17,11 +17,12 @@ * under the License. */ -package org.elasticsearch.plugin.cloud; +package org.elasticsearch.plugin.cloud.aws; -import org.elasticsearch.cloud.CloudModule; -import org.elasticsearch.cloud.blobstore.CloudBlobStoreService; -import org.elasticsearch.cloud.compute.CloudComputeService; +import org.elasticsearch.cloud.aws.AwsEc2Service; +import org.elasticsearch.cloud.aws.AwsModule; +import org.elasticsearch.cloud.aws.AwsS3Service; +import org.elasticsearch.common.collect.Lists; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.settings.Settings; @@ -29,40 +30,38 @@ import org.elasticsearch.plugins.AbstractPlugin; import java.util.Collection; -import static org.elasticsearch.common.collect.Lists.*; - /** * @author kimchy (shay.banon) */ -public class CloudPlugin extends AbstractPlugin { +public class CloudAwsPlugin extends AbstractPlugin { private final Settings settings; - public CloudPlugin(Settings settings) { + public CloudAwsPlugin(Settings settings) { this.settings = settings; } @Override public String name() { - return "cloud"; + return "cloud-aws"; } @Override public String description() { - return "Cloud plugin"; + return "Cloud AWS Plugin"; } @Override public Collection> modules() { - Collection> modules = newArrayList(); + Collection> modules = Lists.newArrayList(); if (settings.getAsBoolean("cloud.enabled", true)) { - modules.add(CloudModule.class); + modules.add(AwsModule.class); } return modules; } @Override public Collection> services() { - Collection> services = newArrayList(); + Collection> services = Lists.newArrayList(); if (settings.getAsBoolean("cloud.enabled", true)) { - services.add(CloudComputeService.class); - services.add(CloudBlobStoreService.class); + services.add(AwsS3Service.class); + services.add(AwsEc2Service.class); } return services; } diff --git a/plugins/cloud/src/main/java/es-plugin.properties b/plugins/cloud/src/main/java/es-plugin.properties deleted file mode 100644 index c856e3393c5..00000000000 --- a/plugins/cloud/src/main/java/es-plugin.properties +++ /dev/null @@ -1 +0,0 @@ -plugin=org.elasticsearch.plugin.cloud.CloudPlugin diff --git a/plugins/cloud/src/main/java/org/elasticsearch/cloud/blobstore/AbstractCloudBlobContainer.java b/plugins/cloud/src/main/java/org/elasticsearch/cloud/blobstore/AbstractCloudBlobContainer.java deleted file mode 100644 index 86017abd016..00000000000 --- a/plugins/cloud/src/main/java/org/elasticsearch/cloud/blobstore/AbstractCloudBlobContainer.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.cloud.blobstore; - -import com.google.common.util.concurrent.ListenableFuture; -import org.elasticsearch.common.blobstore.BlobMetaData; -import org.elasticsearch.common.blobstore.BlobPath; -import org.elasticsearch.common.blobstore.BlobStoreException; -import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; -import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; -import org.elasticsearch.common.collect.ImmutableMap; -import org.jclouds.blobstore.domain.Blob; -import org.jclouds.blobstore.domain.PageSet; -import org.jclouds.blobstore.domain.StorageMetadata; -import org.jclouds.blobstore.options.ListContainerOptions; - -import java.io.IOException; -import java.io.InputStream; -import java.util.concurrent.ExecutionException; - -/** - * @author kimchy (shay.banon) - */ -public class AbstractCloudBlobContainer extends AbstractBlobContainer { - - protected final CloudBlobStore cloudBlobStore; - - protected final String cloudPath; - - public AbstractCloudBlobContainer(BlobPath path, CloudBlobStore cloudBlobStore) { - super(path); - this.cloudBlobStore = cloudBlobStore; - this.cloudPath = path.buildAsString("/"); - } - - @Override public boolean deleteBlob(String blobName) throws IOException { - cloudBlobStore.sync().removeBlob(cloudBlobStore.container(), buildBlobPath(blobName)); - return true; - } - - @Override public boolean blobExists(String blobName) { - return cloudBlobStore.sync().blobExists(cloudBlobStore.container(), buildBlobPath(blobName)); - } - - @Override public void readBlob(final String blobName, final ReadBlobListener listener) { - final ListenableFuture future = cloudBlobStore.async().getBlob(cloudBlobStore.container(), buildBlobPath(blobName)); - future.addListener(new Runnable() { - @Override public void run() { - Blob blob; - try { - blob = future.get(); - if (blob == null) { - listener.onFailure(new BlobStoreException("No blob found for [" + buildBlobPath(blobName) + "]")); - return; - } - } catch (InterruptedException e) { - listener.onFailure(e); - return; - } catch (ExecutionException e) { - listener.onFailure(e.getCause()); - return; - } - byte[] buffer = new byte[cloudBlobStore.bufferSizeInBytes()]; - InputStream is = blob.getContent(); - try { - int bytesRead; - while ((bytesRead = is.read(buffer)) != -1) { - listener.onPartial(buffer, 0, bytesRead); - } - listener.onCompleted(); - } catch (Exception e) { - try { - is.close(); - } catch (IOException e1) { - // ignore - } - listener.onFailure(e); - } - } - }, cloudBlobStore.executor()); - } - - // inDirectory expects a directory, not a blob prefix -// @Override public ImmutableMap listBlobsByPrefix(String blobNamePrefix) throws IOException { -// PageSet list = cloudBlobStore.sync().list(cloudBlobStore.container(), ListContainerOptions.Builder.recursive().inDirectory(buildBlobPath(blobNamePrefix))); -// ImmutableMap.Builder blobs = ImmutableMap.builder(); -// for (StorageMetadata storageMetadata : list) { -// String name = storageMetadata.getName().substring(cloudPath.length() + 1); -// blobs.put(name, new PlainBlobMetaData(name, storageMetadata.getSize(), null)); -// } -// return blobs.build(); -// } - - @Override public ImmutableMap listBlobs() throws IOException { - PageSet list = cloudBlobStore.sync().list(cloudBlobStore.container(), ListContainerOptions.Builder.recursive().inDirectory(cloudPath)); - ImmutableMap.Builder blobs = ImmutableMap.builder(); - for (StorageMetadata storageMetadata : list) { - String name = storageMetadata.getName().substring(cloudPath.length() + 1); - blobs.put(name, new PlainBlobMetaData(name, storageMetadata.getSize(), null)); - } - return blobs.build(); - } - - protected String buildBlobPath(String blobName) { - return cloudPath + "/" + blobName; - } -} diff --git a/plugins/cloud/src/main/java/org/elasticsearch/cloud/blobstore/CloudBlobStoreService.java b/plugins/cloud/src/main/java/org/elasticsearch/cloud/blobstore/CloudBlobStoreService.java deleted file mode 100644 index 5d812e6996b..00000000000 --- a/plugins/cloud/src/main/java/org/elasticsearch/cloud/blobstore/CloudBlobStoreService.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.cloud.blobstore; - -import org.elasticsearch.ElasticSearchException; -import org.elasticsearch.ElasticSearchIllegalStateException; -import org.elasticsearch.cloud.jclouds.JCloudsUtils; -import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.ImmutableSettings; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.settings.SettingsFilter; -import org.jclouds.blobstore.BlobStoreContext; -import org.jclouds.blobstore.BlobStoreContextFactory; - -import java.io.IOException; - -/** - * @author kimchy (shay.banon) - */ -public class CloudBlobStoreService extends AbstractLifecycleComponent { - - private final String type; - - private final BlobStoreContext blobStoreContext; - - @Inject public CloudBlobStoreService(Settings settings, SettingsFilter settingsFilter) throws IOException { - super(settings); - - String type = componentSettings.get("type"); - if (type == null) { - // see if we can get a global type - type = settings.get("cloud.type"); - } - // consolidate names - if ("aws".equalsIgnoreCase(type) || "amazon".equalsIgnoreCase(type)) { - type = "s3"; - } else if ("rackspace".equalsIgnoreCase(type)) { - type = "cloudfiles"; - } - this.type = type; - - String account = componentSettings.get("account", settings.get("cloud.account")); - String key = componentSettings.get("key", settings.get("cloud.key")); - - if (type != null) { - blobStoreContext = new BlobStoreContextFactory().createContext(type, account, key, JCloudsUtils.buildModules(settings)); - logger.info("Connected to {}/{} blob store service", type, account); - } else { - blobStoreContext = null; - } - - settingsFilter.addFilter(new BlobStorSettingsFilter()); - } - - @Override protected void doStart() throws ElasticSearchException { - } - - @Override protected void doStop() throws ElasticSearchException { - } - - @Override protected void doClose() throws ElasticSearchException { - if (blobStoreContext != null) { - blobStoreContext.close(); - } - } - - public BlobStoreContext context() { - if (blobStoreContext == null) { - throw new ElasticSearchIllegalStateException("No cloud blobstore service started, have you configured the 'cloud.type' setting?"); - } - return blobStoreContext; - } - - - private static class BlobStorSettingsFilter implements SettingsFilter.Filter { - @Override public void filter(ImmutableSettings.Builder settings) { - settings.remove("cloud.key"); - settings.remove("cloud.account"); - settings.remove("cloud.blobstore.key"); - settings.remove("cloud.blobstore.account"); - } - } -} diff --git a/plugins/cloud/src/main/java/org/elasticsearch/cloud/compute/CloudComputeService.java b/plugins/cloud/src/main/java/org/elasticsearch/cloud/compute/CloudComputeService.java deleted file mode 100644 index 9fe002dfc4e..00000000000 --- a/plugins/cloud/src/main/java/org/elasticsearch/cloud/compute/CloudComputeService.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.cloud.compute; - -import org.elasticsearch.ElasticSearchException; -import org.elasticsearch.ElasticSearchIllegalStateException; -import org.elasticsearch.cloud.jclouds.JCloudsUtils; -import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.ImmutableSettings; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.settings.SettingsFilter; -import org.jclouds.compute.ComputeServiceContext; -import org.jclouds.compute.ComputeServiceContextFactory; - -import java.io.IOException; - -/** - * @author kimchy (shay.banon) - */ -public class CloudComputeService extends AbstractLifecycleComponent { - - private final String type; - - private final ComputeServiceContext computeServiceContext; - - @Inject public CloudComputeService(Settings settings, SettingsFilter filter) throws IOException { - super(settings); - - String type = componentSettings.get("type"); - if (type == null) { - // see if we can get a global type - type = settings.get("cloud.type"); - } - // consolidate names - if ("aws".equalsIgnoreCase(type) || "amazon".equalsIgnoreCase(type)) { - type = "ec2"; - } else if ("rackspace".equalsIgnoreCase(type)) { - type = "cloudservers"; - } - this.type = type; - - String account = componentSettings.get("account", settings.get("cloud.account")); - String key = componentSettings.get("key", settings.get("cloud.key")); - - if (type != null) { - computeServiceContext = new ComputeServiceContextFactory().createContext(type, account, key, JCloudsUtils.buildModules(settings)); - logger.info("Connected to {}/{} compute service", type, account); - } else { - computeServiceContext = null; - } - - filter.addFilter(new ComputeSettingsFilter()); - } - - @Override protected void doStart() throws ElasticSearchException { - } - - @Override protected void doStop() throws ElasticSearchException { - } - - @Override protected void doClose() throws ElasticSearchException { - if (computeServiceContext != null) { - computeServiceContext.close(); - } - } - - public ComputeServiceContext context() { - if (computeServiceContext == null) { - throw new ElasticSearchIllegalStateException("No cloud compute service started, have you configured the 'cloud.type' setting?"); - } - return this.computeServiceContext; - } - - private static class ComputeSettingsFilter implements SettingsFilter.Filter { - @Override public void filter(ImmutableSettings.Builder settings) { - settings.remove("cloud.key"); - settings.remove("cloud.account"); - settings.remove("cloud.compute.key"); - settings.remove("cloud.compute.account"); - } - } -} diff --git a/plugins/cloud/src/main/java/org/elasticsearch/cloud/jclouds/JCloudsUtils.java b/plugins/cloud/src/main/java/org/elasticsearch/cloud/jclouds/JCloudsUtils.java deleted file mode 100644 index b920ca57cf8..00000000000 --- a/plugins/cloud/src/main/java/org/elasticsearch/cloud/jclouds/JCloudsUtils.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.cloud.jclouds; - -import com.google.inject.Module; -import org.elasticsearch.cloud.jclouds.logging.JCloudsLoggingModule; -import org.elasticsearch.common.collect.ImmutableList; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.jclouds.concurrent.config.ExecutorServiceModule; - -import java.util.concurrent.Executors; - -/** - * @author kimchy (shay.banon) - */ -public class JCloudsUtils { - - public static Iterable buildModules(Settings settings) { - return ImmutableList.of(new JCloudsLoggingModule(settings), - new ExecutorServiceModule( - Executors.newCachedThreadPool(EsExecutors.daemonThreadFactory(settings, "jclouds-user")), - Executors.newCachedThreadPool(EsExecutors.daemonThreadFactory(settings, "jclouds-io")))); - } -} diff --git a/plugins/cloud/src/main/java/org/elasticsearch/cloud/jclouds/logging/JCloudsLoggingModule.java b/plugins/cloud/src/main/java/org/elasticsearch/cloud/jclouds/logging/JCloudsLoggingModule.java deleted file mode 100644 index f4e0760fe0c..00000000000 --- a/plugins/cloud/src/main/java/org/elasticsearch/cloud/jclouds/logging/JCloudsLoggingModule.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.cloud.jclouds.logging; - -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.common.settings.Settings; -import org.jclouds.logging.BaseLogger; -import org.jclouds.logging.Logger; -import org.jclouds.logging.config.LoggingModule; - -/** - * @author kimchy (shay.banon) - */ -public class JCloudsLoggingModule extends LoggingModule { - - private final Settings settings; - - public JCloudsLoggingModule(Settings settings) { - this.settings = settings; - } - - @Override public Logger.LoggerFactory createLoggerFactory() { - return new Logger.LoggerFactory() { - @Override public Logger getLogger(String s) { - return new JCloudsESLogger(Loggers.getLogger(s.replace("org.jclouds", "cloud.jclouds"), settings)); - } - }; - } - - private static class JCloudsESLogger extends BaseLogger { - - private final ESLogger logger; - - private JCloudsESLogger(ESLogger logger) { - this.logger = logger; - } - - @Override protected void logError(String s, Throwable throwable) { - logger.error(s, throwable); - } - - @Override protected void logError(String s) { - logger.error(s); - } - - @Override protected void logWarn(String s, Throwable throwable) { - logger.warn(s, throwable); - } - - @Override protected void logWarn(String s) { - logger.warn(s); - } - - @Override protected void logInfo(String s) { - logger.info(s); - } - - @Override protected void logDebug(String s) { - logger.debug(s); - } - - @Override protected void logTrace(String s) { - logger.trace(s); - } - - @Override public String getCategory() { - return logger.getName(); - } - - @Override public boolean isTraceEnabled() { - return logger.isTraceEnabled(); - } - - @Override public boolean isDebugEnabled() { - return logger.isDebugEnabled(); - } - - @Override public boolean isInfoEnabled() { - return logger.isInfoEnabled(); - } - - @Override public boolean isWarnEnabled() { - return logger.isWarnEnabled(); - } - - @Override public boolean isErrorEnabled() { - return logger.isErrorEnabled(); - } - } -} diff --git a/plugins/cloud/src/main/java/org/elasticsearch/discovery/cloud/CloudUnicastHostsProvider.java b/plugins/cloud/src/main/java/org/elasticsearch/discovery/cloud/CloudUnicastHostsProvider.java deleted file mode 100644 index 4443225ccd7..00000000000 --- a/plugins/cloud/src/main/java/org/elasticsearch/discovery/cloud/CloudUnicastHostsProvider.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.discovery.cloud; - -import org.elasticsearch.cloud.compute.CloudComputeService; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.InetSocketTransportAddress; -import org.elasticsearch.common.transport.PortsRange; -import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider; -import org.jclouds.compute.ComputeService; -import org.jclouds.compute.domain.ComputeMetadata; -import org.jclouds.compute.domain.NodeMetadata; -import org.jclouds.compute.domain.NodeState; -import org.jclouds.domain.Location; - -import java.util.List; -import java.util.Set; - -import static org.elasticsearch.common.collect.Lists.*; - -/** - * @author kimchy (shay.banon) - */ -public class CloudUnicastHostsProvider extends AbstractComponent implements UnicastHostsProvider { - - private final ComputeService computeService; - - private final String ports; - - private final String tag; - - private final String location; - - public CloudUnicastHostsProvider(Settings settings, CloudComputeService computeService) { - super(settings); - this.computeService = computeService.context().getComputeService(); - this.tag = componentSettings.get("tag"); - this.location = componentSettings.get("location"); - this.ports = componentSettings.get("ports", "9300-9302"); - // parse the ports just to see that they are valid - new PortsRange(ports).ports(); - } - - @Override public List buildDynamicNodes() { - List discoNodes = newArrayList(); - Set nodes = computeService.listNodes(); - if (logger.isTraceEnabled()) { - StringBuilder sb = new StringBuilder("Processing Nodes:"); - for (ComputeMetadata node : nodes) { - sb.append("\n -> ").append(node); - } - logger.trace(sb.toString()); - } - for (ComputeMetadata node : nodes) { - NodeMetadata nodeMetadata; - if (node instanceof NodeMetadata) { - nodeMetadata = (NodeMetadata) node; - } else { - nodeMetadata = computeService.getNodeMetadata(node.getId()); - } - if (tag != null && !nodeMetadata.getTag().equals(tag)) { - logger.trace("Filtering node {} with unmatched tag {}", nodeMetadata.getName(), nodeMetadata.getTag()); - continue; - } - boolean filteredByLocation = true; - if (location != null) { - Location nodeLocation = nodeMetadata.getLocation(); - if (location.equals(nodeLocation.getId())) { - filteredByLocation = false; - } else { - if (nodeLocation.getParent() != null) { - if (location.equals(nodeLocation.getParent().getId())) { - filteredByLocation = false; - } - } - } - } else { - filteredByLocation = false; - } - if (filteredByLocation) { - logger.trace("Filtering node {} with unmatched location {}", nodeMetadata.getName(), nodeMetadata.getLocation()); - continue; - } - if (nodeMetadata.getState() == NodeState.PENDING || nodeMetadata.getState() == NodeState.RUNNING) { - logger.debug("Adding {}, addresses {}", nodeMetadata.getName(), nodeMetadata.getPrivateAddresses()); - for (String inetAddress : nodeMetadata.getPrivateAddresses()) { - for (int port : new PortsRange(ports).ports()) { - discoNodes.add(new DiscoveryNode("#cloud-" + inetAddress + "-" + port, new InetSocketTransportAddress(inetAddress, port))); - } - } - } - } - return discoNodes; - } -} diff --git a/settings.gradle b/settings.gradle index 28ba01cb4c3..a88c55c07bf 100644 --- a/settings.gradle +++ b/settings.gradle @@ -8,7 +8,7 @@ include 'test-integration' include 'benchmark-micro' -include 'plugins-cloud' +include 'plugins-cloud-aws' include 'plugins-hadoop' include 'plugins-analysis-icu' include 'plugins-mapper-attachments'