remove cloud plugin, reimplement an AWS specific cloud plugin with S3 gateway and EC2 discovery

This commit is contained in:
kimchy 2010-07-24 23:39:43 +03:00
parent a6a993ff08
commit 77b6d1d8b8
40 changed files with 711 additions and 778 deletions

View File

@ -49,6 +49,7 @@
<w>hdfs</w> <w>hdfs</w>
<w>histo</w> <w>histo</w>
<w>hpux</w> <w>hpux</w>
<w>https</w>
<w>hyperic</w> <w>hyperic</w>
<w>ifconfig</w> <w>ifconfig</w>
<w>indices</w> <w>indices</w>

View File

@ -7,9 +7,9 @@
<module fileurl="file://$PROJECT_DIR$/.idea/modules/elasticsearch-root.iml" filepath="$PROJECT_DIR$/.idea/modules/elasticsearch-root.iml" /> <module fileurl="file://$PROJECT_DIR$/.idea/modules/elasticsearch-root.iml" filepath="$PROJECT_DIR$/.idea/modules/elasticsearch-root.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules/plugin-analysis-icu.iml" filepath="$PROJECT_DIR$/.idea/modules/plugin-analysis-icu.iml" /> <module fileurl="file://$PROJECT_DIR$/.idea/modules/plugin-analysis-icu.iml" filepath="$PROJECT_DIR$/.idea/modules/plugin-analysis-icu.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-client-groovy.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-client-groovy.iml" /> <module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-client-groovy.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-client-groovy.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-cloud-aws.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-cloud-aws.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-mapper-attachments.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-mapper-attachments.iml" /> <module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-mapper-attachments.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-mapper-attachments.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-transport-memcached.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-transport-memcached.iml" /> <module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-transport-memcached.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-transport-memcached.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugins-cloud.iml" filepath="$PROJECT_DIR$/.idea/modules//plugins-cloud.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugins-hadoop.iml" filepath="$PROJECT_DIR$/.idea/modules//plugins-hadoop.iml" /> <module fileurl="file://$PROJECT_DIR$/.idea/modules//plugins-hadoop.iml" filepath="$PROJECT_DIR$/.idea/modules//plugins-hadoop.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//test-integration.iml" filepath="$PROJECT_DIR$/.idea/modules//test-integration.iml" /> <module fileurl="file://$PROJECT_DIR$/.idea/modules//test-integration.iml" filepath="$PROJECT_DIR$/.idea/modules//test-integration.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//test-testng.iml" filepath="$PROJECT_DIR$/.idea/modules//test-testng.iml" /> <module fileurl="file://$PROJECT_DIR$/.idea/modules//test-testng.iml" filepath="$PROJECT_DIR$/.idea/modules//test-testng.iml" />

View File

@ -18,7 +18,7 @@
<orderEntry type="module" module-name="plugin-transport-memcached" /> <orderEntry type="module" module-name="plugin-transport-memcached" />
<orderEntry type="module" module-name="plugin-analysis-icu" /> <orderEntry type="module" module-name="plugin-analysis-icu" />
<orderEntry type="module" module-name="plugins-hadoop" /> <orderEntry type="module" module-name="plugins-hadoop" />
<orderEntry type="module" module-name="plugins-cloud" /> <orderEntry type="module" module-name="plugin-cloud-aws" />
<orderEntry type="module" module-name="test-integration" /> <orderEntry type="module" module-name="test-integration" />
</component> </component>
</module> </module>

View File

@ -0,0 +1,33 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="false">
<output url="file://$MODULE_DIR$/../../plugins/cloud/aws/build/classes/main" />
<output-test url="file://$MODULE_DIR$/../../plugins/cloud/aws/build/classes/test" />
<exclude-output />
<content url="file://$MODULE_DIR$/../../plugins/cloud/aws">
<sourceFolder url="file://$MODULE_DIR$/../../plugins/cloud/aws/src/main/java" isTestSource="false" />
<excludeFolder url="file://$MODULE_DIR$/../../plugins/cloud/aws/build" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="module" module-name="elasticsearch" />
<orderEntry type="module-library">
<library name="aws-java-sdk">
<CLASSES>
<root url="jar://$GRADLE_REPOSITORY$/com.amazonaws/aws-java-sdk/jars/aws-java-sdk-1.0.007.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/commons-httpclient/commons-httpclient/jars/commons-httpclient-3.0.1.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/commons-codec/commons-codec/jars/commons-codec-1.3.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/commons-logging/commons-logging/jars/commons-logging-1.1.1.jar!/" />
</CLASSES>
<JAVADOC />
<SOURCES>
<root url="jar://$MODULE_DIR$/../../../../../opt/aws-java-sdk/1.0.007/lib/aws-java-sdk-1.0.007-sources.jar!/" />
</SOURCES>
</library>
</orderEntry>
<orderEntry type="module" module-name="test-testng" scope="TEST" />
<orderEntry type="library" scope="TEST" name="hamcrest" level="project" />
<orderEntry type="library" scope="TEST" name="testng" level="project" />
</component>
</module>

View File

@ -2,6 +2,8 @@ rootLogger: INFO, console, file
logger: logger:
# log action execution errors for easier debugging # log action execution errors for easier debugging
action: DEBUG action: DEBUG
# reduce the logging for aws, too much is logged under the default INFO
com.amazonaws: WARN
# gateway # gateway
#gateway: DEBUG #gateway: DEBUG

View File

@ -50,13 +50,18 @@ public class ImmutableAppendableBlobContainer extends AbstractBlobContainer impl
@Override public void append(final AppendBlobListener listener) { @Override public void append(final AppendBlobListener listener) {
BytesStreamOutput out = new BytesStreamOutput(); BytesStreamOutput out = new BytesStreamOutput();
String partBlobName = blobName + ".a" + (part++);
try { try {
listener.withStream(out); listener.withStream(out);
} catch (Exception e) { } catch (Exception e) {
listener.onFailure(e); listener.onFailure(e);
return; return;
} }
if (out.size() == 0) {
// nothing to write, bail
listener.onCompleted();
return;
}
String partBlobName = blobName + ".a" + (part++);
// use teh sync one // use teh sync one
ByteArrayInputStream is = new ByteArrayInputStream(out.unsafeByteArray(), 0, out.size()); ByteArrayInputStream is = new ByteArrayInputStream(out.unsafeByteArray(), 0, out.size());
container.writeBlob(partBlobName, is, out.size(), new ImmutableBlobContainer.WriterListener() { container.writeBlob(partBlobName, is, out.size(), new ImmutableBlobContainer.WriterListener() {
@ -120,7 +125,7 @@ public class ImmutableAppendableBlobContainer extends AbstractBlobContainer impl
ImmutableMap<String, BlobMetaData> blobs = buildVirtualBlobs(container.listBlobs()); ImmutableMap<String, BlobMetaData> blobs = buildVirtualBlobs(container.listBlobs());
for (String blobName : blobs.keySet()) { for (String blobName : blobs.keySet()) {
if (filter.accept(blobName)) { if (filter.accept(blobName)) {
container.deleteBlob(blobName); deleteBlob(blobName);
} }
} }
} }

View File

@ -28,7 +28,7 @@ import org.elasticsearch.discovery.zen.ZenDiscoveryModule;
import static org.elasticsearch.common.inject.ModulesFactory.*; import static org.elasticsearch.common.inject.ModulesFactory.*;
/** /**
* @author kimchy (Shay Banon) * @author kimchy (shay.banon)
*/ */
public class DiscoveryModule extends AbstractModule { public class DiscoveryModule extends AbstractModule {

View File

@ -30,6 +30,10 @@ public class ZenDiscoveryModule extends AbstractModule {
@Override protected void configure() { @Override protected void configure() {
bind(ZenPingService.class).asEagerSingleton(); bind(ZenPingService.class).asEagerSingleton();
bindDiscovery();
}
protected void bindDiscovery() {
bind(Discovery.class).to(ZenDiscovery.class).asEagerSingleton(); bind(Discovery.class).to(ZenDiscovery.class).asEagerSingleton();
} }
} }

View File

@ -180,9 +180,11 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
*/ */
private Boolean readFromGateway(@Nullable TimeValue waitTimeout) { private Boolean readFromGateway(@Nullable TimeValue waitTimeout) {
logger.debug("reading state from gateway {} ...", gateway); logger.debug("reading state from gateway {} ...", gateway);
StopWatch stopWatch = new StopWatch().start();
MetaData metaData; MetaData metaData;
try { try {
metaData = gateway.read(); metaData = gateway.read();
logger.debug("read state from gateway {}, took {}", gateway, stopWatch.stop().totalTime());
} catch (Exception e) { } catch (Exception e) {
logger.error("failed to read from gateway", e); logger.error("failed to read from gateway", e);
markMetaDataAsReadFromGateway("failure"); markMetaDataAsReadFromGateway("failure");

View File

@ -44,6 +44,10 @@ public class NoneGateway extends AbstractLifecycleComponent<Gateway> implements
return TYPE; return TYPE;
} }
@Override public String toString() {
return "_none_";
}
@Override protected void doStart() throws ElasticSearchException { @Override protected void doStart() throws ElasticSearchException {
} }

View File

@ -140,7 +140,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
throttlingWaitTime.stop(); throttlingWaitTime.stop();
try { try {
logger.debug("starting recovery from {}", shardGateway); logger.debug("starting recovery from {} ...", shardGateway);
StopWatch stopWatch = new StopWatch().start(); StopWatch stopWatch = new StopWatch().start();
IndexShardGateway.RecoveryStatus recoveryStatus = shardGateway.recover(); IndexShardGateway.RecoveryStatus recoveryStatus = shardGateway.recover();

View File

@ -323,10 +323,21 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
// delete the old translog // delete the old translog
if (snapshot.newTranslogCreated()) { if (snapshot.newTranslogCreated()) {
try { try {
translogContainer.deleteBlob("translog-" + snapshot.lastTranslogId()); translogContainer.deleteBlobsByFilter(new BlobContainer.BlobNameFilter() {
@Override public boolean accept(String blobName) {
// delete all the ones that are not this translog
return !blobName.equals("translog-" + translogSnapshot.translogId());
}
});
} catch (Exception e) { } catch (Exception e) {
// ignore // ignore
} }
// NOT doing this one, the above allows us to clean the translog properly
// try {
// translogContainer.deleteBlob("translog-" + snapshot.lastTranslogId());
// } catch (Exception e) {
// // ignore
// }
} }
// delete old index files // delete old index files

View File

@ -45,7 +45,7 @@ public class NoneIndexGateway extends AbstractIndexComponent implements IndexGat
} }
@Override public String toString() { @Override public String toString() {
return "none"; return "_none_";
} }
@Override public void close(boolean delete) { @Override public void close(boolean delete) {

View File

@ -44,6 +44,10 @@ public class NoneIndexShardGateway extends AbstractIndexShardComponent implement
this.indexShard = (InternalIndexShard) indexShard; this.indexShard = (InternalIndexShard) indexShard;
} }
@Override public String toString() {
return "_none_";
}
@Override public RecoveryStatus recover() throws IndexShardGatewayRecoveryException { @Override public RecoveryStatus recover() throws IndexShardGatewayRecoveryException {
// in the none case, we simply start the shard // in the none case, we simply start the shard
// clean the store, there should be nothing there... // clean the store, there should be nothing there...

View File

@ -3,7 +3,7 @@ dependsOn(':elasticsearch')
apply plugin: 'java' apply plugin: 'java'
apply plugin: 'maven' apply plugin: 'maven'
archivesBaseName = "elasticsearch-cloud" archivesBaseName = "elasticsearch-cloud-aws"
explodedDistDir = new File(distsDir, 'exploded') explodedDistDir = new File(distsDir, 'exploded')
@ -14,51 +14,32 @@ configurations.testCompile.transitive = true
sourceSets.main.resources.srcDirs 'src/main/java' sourceSets.main.resources.srcDirs 'src/main/java'
sourceSets.test.resources.srcDirs 'src/test/java' sourceSets.test.resources.srcDirs 'src/test/java'
// add the source files to the dist jar jar {
//jar {
// from sourceSets.main.allJava // from sourceSets.main.allJava
//} manifest {
attributes("Implementation-Title": "ElasticSearch", "Implementation-Version": rootProject.version, "Implementation-Date": buildTimeStr)
}
}
configurations { configurations {
dists dists
distLib { distLib {
visible = false visible = false
transitive = false
} }
} }
repositories {
mavenRepo urls: "http://jclouds.googlecode.com/svn/repo"
mavenRepo urls: "http://jclouds.rimuhosting.com/maven2/snapshots"
mavenRepo urls: "http://java-xmlbuilder.googlecode.com/svn/repo"
}
jcloudsVersion = "1.0-beta-6"
dependencies { dependencies {
compile project(':elasticsearch') compile project(':elasticsearch')
compile("org.jclouds:jclouds-blobstore:$jcloudsVersion") compile("com.amazonaws:aws-java-sdk:1.0.007") { transitive = false }
compile("org.jclouds:jclouds-aws:$jcloudsVersion") runtime("commons-logging:commons-logging:1.1.1") { transitive = false }
compile("org.jclouds:jclouds-rackspace:$jcloudsVersion") runtime("commons-codec:commons-codec:1.3") { transitive = false }
compile("org.jclouds:jclouds-atmos:$jcloudsVersion") runtime("commons-httpclient:commons-httpclient:3.0.1") { transitive = false }
compile("org.jclouds:jclouds-azure:$jcloudsVersion")
compile("org.jclouds:jclouds-gogrid:$jcloudsVersion")
compile("org.jclouds:jclouds-vcloud:$jcloudsVersion")
compile("org.jclouds:jclouds-rimuhosting:$jcloudsVersion")
compile("org.jclouds:jclouds-terremark:$jcloudsVersion")
compile("org.jclouds:jclouds-bluelock:$jcloudsVersion")
compile("org.jclouds:jclouds-hostingdotcom:$jcloudsVersion")
distLib("org.jclouds:jclouds-blobstore:$jcloudsVersion") distLib("com.amazonaws:aws-java-sdk:1.0.007") { transitive = false }
distLib("org.jclouds:jclouds-aws:$jcloudsVersion") distLib("commons-codec:commons-codec:1.3") { transitive = false }
distLib("org.jclouds:jclouds-rackspace:$jcloudsVersion") distLib("commons-logging:commons-logging:1.1.1") { transitive = false }
distLib("org.jclouds:jclouds-atmos:$jcloudsVersion") distLib("commons-httpclient:commons-httpclient:3.0.1") { transitive = false }
distLib("org.jclouds:jclouds-azure:$jcloudsVersion")
distLib("org.jclouds:jclouds-gogrid:$jcloudsVersion")
distLib("org.jclouds:jclouds-vcloud:$jcloudsVersion")
distLib("org.jclouds:jclouds-rimuhosting:$jcloudsVersion")
distLib("org.jclouds:jclouds-terremark:$jcloudsVersion")
distLib("org.jclouds:jclouds-bluelock:$jcloudsVersion")
distLib("org.jclouds:jclouds-hostingdotcom:$jcloudsVersion")
testCompile project(':test-testng') testCompile project(':test-testng')
testCompile('org.testng:testng:5.10:jdk15') { transitive = false } testCompile('org.testng:testng:5.10:jdk15') { transitive = false }
@ -124,13 +105,6 @@ task javadocJar(type: Jar, dependsOn: javadoc) {
from javadoc.destinationDir from javadoc.destinationDir
} }
jar {
// from sourceSets.main.allJava
manifest {
attributes("Implementation-Title": "ElasticSearch", "Implementation-Version": rootProject.version, "Implementation-Date": buildTimeStr)
}
}
artifacts { artifacts {
archives sourcesJar archives sourcesJar
archives javadocJar archives javadocJar
@ -148,8 +122,8 @@ uploadArchives {
pom.project { pom.project {
inceptionYear '2009' inceptionYear '2009'
name 'elasticsearch-plugins-cloud' name 'elasticsearch-plugins-cloud-aws'
description 'Clouud Plugin for ElasticSearch' description 'Cloud AWS Plugin for ElasticSearch'
licenses { licenses {
license { license {
name 'The Apache Software License, Version 2.0' name 'The Apache Software License, Version 2.0'

View File

@ -0,0 +1,2 @@
plugin=org.elasticsearch.plugin.cloud.aws.CloudAwsPlugin

View File

@ -0,0 +1,90 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.ec2.AmazonEC2;
import com.amazonaws.services.ec2.AmazonEC2Client;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
/**
* @author kimchy (shay.banon)
*/
public class AwsEc2Service extends AbstractLifecycleComponent<AwsEc2Service> {
private AmazonEC2Client client;
@Inject public AwsEc2Service(Settings settings, SettingsFilter settingsFilter) {
super(settings);
settingsFilter.addFilter(new AwsSettingsFilter());
}
public synchronized AmazonEC2 client() {
if (client != null) {
return client;
}
ClientConfiguration clientConfiguration = new ClientConfiguration();
String protocol = componentSettings.get("protocol", "http").toLowerCase();
if ("http".equals(protocol)) {
clientConfiguration.setProtocol(Protocol.HTTP);
} else if ("https".equals(protocol)) {
clientConfiguration.setProtocol(Protocol.HTTPS);
} else {
throw new ElasticSearchIllegalArgumentException("No protocol supported [" + protocol + "], can either be [http] or [https]");
}
String account = componentSettings.get("access_key", settings.get("cloud.account"));
String key = componentSettings.get("secret_key", settings.get("cloud.key"));
if (account == null) {
throw new ElasticSearchIllegalArgumentException("No s3 access_key defined for s3 gateway");
}
if (key == null) {
throw new ElasticSearchIllegalArgumentException("No s3 secret_key defined for s3 gateway");
}
this.client = new AmazonEC2Client(new BasicAWSCredentials(account, key), clientConfiguration);
if (componentSettings.get("endpoint") != null) {
client.setEndpoint(componentSettings.get("endpoint"));
}
return this.client;
}
@Override protected void doStart() throws ElasticSearchException {
}
@Override protected void doStop() throws ElasticSearchException {
}
@Override protected void doClose() throws ElasticSearchException {
client.shutdown();
}
}

View File

@ -17,19 +17,17 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.cloud; package org.elasticsearch.cloud.aws;
import org.elasticsearch.cloud.blobstore.CloudBlobStoreService;
import org.elasticsearch.cloud.compute.CloudComputeService;
import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.AbstractModule;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class CloudModule extends AbstractModule { public class AwsModule extends AbstractModule {
@Override protected void configure() { @Override protected void configure() {
bind(CloudComputeService.class).asEagerSingleton(); bind(AwsS3Service.class).asEagerSingleton();
bind(CloudBlobStoreService.class).asEagerSingleton(); bind(AwsEc2Service.class).asEagerSingleton();
} }
} }

View File

@ -0,0 +1,92 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
/**
* @author kimchy (shay.banon)
*/
public class AwsS3Service extends AbstractLifecycleComponent<AwsS3Service> {
private AmazonS3Client client;
@Inject public AwsS3Service(Settings settings, SettingsFilter settingsFilter) {
super(settings);
settingsFilter.addFilter(new AwsSettingsFilter());
}
public synchronized AmazonS3 client() {
if (client != null) {
return client;
}
ClientConfiguration clientConfiguration = new ClientConfiguration();
String protocol = componentSettings.get("protocol", "http").toLowerCase();
if ("http".equals(protocol)) {
clientConfiguration.setProtocol(Protocol.HTTP);
} else if ("https".equals(protocol)) {
clientConfiguration.setProtocol(Protocol.HTTPS);
} else {
throw new ElasticSearchIllegalArgumentException("No protocol supported [" + protocol + "], can either be [http] or [https]");
}
String account = componentSettings.get("access_key", settings.get("cloud.account"));
String key = componentSettings.get("secret_key", settings.get("cloud.key"));
if (account == null) {
throw new ElasticSearchIllegalArgumentException("No s3 access_key defined for s3 gateway");
}
if (key == null) {
throw new ElasticSearchIllegalArgumentException("No s3 secret_key defined for s3 gateway");
}
this.client = new AmazonS3Client(new BasicAWSCredentials(account, key), clientConfiguration);
if (componentSettings.get("endpoint") != null) {
client.setEndpoint(componentSettings.get("endpoint"));
}
return this.client;
}
@Override protected void doStart() throws ElasticSearchException {
}
@Override protected void doStop() throws ElasticSearchException {
}
@Override protected void doClose() throws ElasticSearchException {
if (client != null) {
client.shutdown();
}
}
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.SettingsFilter;
/**
* @author kimchy (shay.banon)
*/
public class AwsSettingsFilter implements SettingsFilter.Filter {
@Override public void filter(ImmutableSettings.Builder settings) {
settings.remove("cloud.key");
settings.remove("cloud.account");
settings.remove("cloud.aws.access_key");
settings.remove("cloud.aws.secret_key");
}
}

View File

@ -0,0 +1,128 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws.blobstore;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.ImmutableMap;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
/**
* @author kimchy (shay.banon)
*/
public class AbstarctS3BlobContainer extends AbstractBlobContainer {
protected final S3BlobStore blobStore;
protected final String keyPath;
public AbstarctS3BlobContainer(BlobPath path, S3BlobStore blobStore) {
super(path);
this.blobStore = blobStore;
this.keyPath = path.buildAsString("/") + "/";
}
@Override public boolean blobExists(String blobName) {
try {
blobStore.client().getObjectMetadata(blobStore.bucket(), buildKey(blobName));
return true;
} catch (Exception e) {
return false;
}
}
@Override public boolean deleteBlob(String blobName) throws IOException {
blobStore.client().deleteObject(blobStore.bucket(), buildKey(blobName));
return true;
}
@Override public void readBlob(final String blobName, final ReadBlobListener listener) {
blobStore.executor().execute(new Runnable() {
@Override public void run() {
InputStream is;
try {
S3Object object = blobStore.client().getObject(blobStore.bucket(), buildKey(blobName));
is = object.getObjectContent();
} catch (Exception e) {
listener.onFailure(e);
return;
}
byte[] buffer = new byte[blobStore.bufferSizeInBytes()];
try {
int bytesRead;
while ((bytesRead = is.read(buffer)) != -1) {
listener.onPartial(buffer, 0, bytesRead);
}
listener.onCompleted();
} catch (Exception e) {
try {
is.close();
} catch (IOException e1) {
// ignore
}
listener.onFailure(e);
}
}
});
}
@Override public ImmutableMap<String, BlobMetaData> listBlobsByPrefix(@Nullable String blobNamePrefix) throws IOException {
ImmutableMap.Builder<String, BlobMetaData> blobsBuilder = ImmutableMap.builder();
ObjectListing prevListing = null;
while (true) {
ObjectListing list;
if (prevListing != null) {
list = blobStore.client().listNextBatchOfObjects(prevListing);
} else {
if (blobNamePrefix != null) {
list = blobStore.client().listObjects(blobStore.bucket(), buildKey(blobNamePrefix));
} else {
list = blobStore.client().listObjects(blobStore.bucket(), keyPath);
}
}
for (S3ObjectSummary summary : list.getObjectSummaries()) {
String name = summary.getKey().substring(keyPath.length());
blobsBuilder.put(name, new PlainBlobMetaData(name, summary.getSize(), null));
}
if (list.isTruncated()) {
prevListing = list;
} else {
break;
}
}
return blobsBuilder.build();
}
@Override public ImmutableMap<String, BlobMetaData> listBlobs() throws IOException {
return listBlobsByPrefix(null);
}
protected String buildKey(String blobName) {
return keyPath + blobName;
}
}

View File

@ -17,9 +17,11 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.cloud.blobstore; package org.elasticsearch.cloud.aws.blobstore;
import org.elasticsearch.ElasticSearchIllegalArgumentException; import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.elasticsearch.common.blobstore.AppendableBlobContainer; import org.elasticsearch.common.blobstore.AppendableBlobContainer;
import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.blobstore.BlobStore;
@ -29,86 +31,65 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.jclouds.blobstore.AsyncBlobStore;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.domain.Location;
import java.util.Set; import javax.annotation.Nullable;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class CloudBlobStore extends AbstractComponent implements BlobStore { public class S3BlobStore extends AbstractComponent implements BlobStore {
private final BlobStoreContext blobStoreContext; private final AmazonS3 client;
private final String container; private final String bucket;
private final Location location; private final String region;
private final Executor executor; private final Executor executor;
private final int bufferSizeInBytes; private final int bufferSizeInBytes;
public CloudBlobStore(Settings settings, BlobStoreContext blobStoreContext, Executor executor, String container, String location) { public S3BlobStore(Settings settings, AmazonS3 client, String bucket, @Nullable String region, Executor executor) {
super(settings); super(settings);
this.blobStoreContext = blobStoreContext; this.client = client;
this.container = container; this.bucket = bucket;
this.region = region;
this.executor = executor; this.executor = executor;
this.bufferSizeInBytes = (int) settings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes(); this.bufferSizeInBytes = (int) settings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes();
if (location == null) { if (!client.doesBucketExist(bucket)) {
this.location = null; if (region != null) {
} else { client.createBucket(bucket, region);
Location matchedLocation = null; } else {
Set<? extends Location> assignableLocations = blobStoreContext.getBlobStore().listAssignableLocations(); client.createBucket(bucket);
for (Location oLocation : assignableLocations) {
if (oLocation.getId().equals(location)) {
matchedLocation = oLocation;
break;
}
}
this.location = matchedLocation;
if (this.location == null) {
throw new ElasticSearchIllegalArgumentException("Not a valid location [" + location + "], available locations " + assignableLocations);
} }
} }
logger.debug("Using location [{}], container [{}]", this.location, this.container);
sync().createContainerInLocation(this.location, container);
} }
@Override public String toString() { @Override public String toString() {
return container; return (region == null ? "" : region + "/") + bucket;
} }
public int bufferSizeInBytes() { public AmazonS3 client() {
return this.bufferSizeInBytes; return client;
}
public String bucket() {
return bucket;
} }
public Executor executor() { public Executor executor() {
return executor; return executor;
} }
public String container() { public int bufferSizeInBytes() {
return this.container; return bufferSizeInBytes;
}
public Location location() {
return this.location;
}
public AsyncBlobStore async() {
return blobStoreContext.getAsyncBlobStore();
}
public org.jclouds.blobstore.BlobStore sync() {
return blobStoreContext.getBlobStore();
} }
@Override public ImmutableBlobContainer immutableBlobContainer(BlobPath path) { @Override public ImmutableBlobContainer immutableBlobContainer(BlobPath path) {
return new CloudImmutableBlobContainer(path, this); return new S3ImmutableBlobContainer(path, this);
} }
@Override public AppendableBlobContainer appendableBlobContainer(BlobPath path) { @Override public AppendableBlobContainer appendableBlobContainer(BlobPath path) {
@ -116,7 +97,23 @@ public class CloudBlobStore extends AbstractComponent implements BlobStore {
} }
@Override public void delete(BlobPath path) { @Override public void delete(BlobPath path) {
sync().deleteDirectory(container, path.buildAsString("/")); ObjectListing prevListing = null;
while (true) {
ObjectListing list;
if (prevListing != null) {
list = client.listNextBatchOfObjects(prevListing);
} else {
list = client.listObjects(bucket, path.buildAsString("/"));
}
for (S3ObjectSummary summary : list.getObjectSummaries()) {
client.deleteObject(summary.getBucketName(), summary.getKey());
}
if (list.isTruncated()) {
prevListing = list;
} else {
break;
}
}
} }
@Override public void close() { @Override public void close() {

View File

@ -17,46 +17,39 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.cloud.blobstore; package org.elasticsearch.cloud.aws.blobstore;
import com.google.common.util.concurrent.ListenableFuture; import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectResult;
import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.ImmutableBlobContainer; import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
import org.elasticsearch.common.blobstore.support.BlobStores; import org.elasticsearch.common.blobstore.support.BlobStores;
import org.jclouds.blobstore.domain.Blob;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.concurrent.ExecutionException;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class CloudImmutableBlobContainer extends AbstractCloudBlobContainer implements ImmutableBlobContainer { public class S3ImmutableBlobContainer extends AbstarctS3BlobContainer implements ImmutableBlobContainer {
public CloudImmutableBlobContainer(BlobPath path, CloudBlobStore cloudBlobStore) { public S3ImmutableBlobContainer(BlobPath path, S3BlobStore blobStore) {
super(path, cloudBlobStore); super(path, blobStore);
} }
@Override public void writeBlob(String blobName, InputStream is, long sizeInBytes, final WriterListener listener) { @Override public void writeBlob(final String blobName, final InputStream is, final long sizeInBytes, final WriterListener listener) {
Blob blob = cloudBlobStore.async().newBlob(buildBlobPath(blobName)); blobStore.executor().execute(new Runnable() {
blob.setPayload(is);
blob.setContentLength(sizeInBytes);
final ListenableFuture<String> future = cloudBlobStore.async().putBlob(cloudBlobStore.container(), blob);
future.addListener(new Runnable() {
@Override public void run() { @Override public void run() {
try { try {
future.get(); ObjectMetadata md = new ObjectMetadata();
md.setContentLength(sizeInBytes);
PutObjectResult objectResult = blobStore.client().putObject(blobStore.bucket(), buildKey(blobName), is, md);
listener.onCompleted(); listener.onCompleted();
} catch (InterruptedException e) { } catch (Exception e) {
listener.onFailure(e); listener.onFailure(e);
} catch (ExecutionException e) {
listener.onFailure(e.getCause());
} catch (Throwable t) {
listener.onFailure(t);
} }
} }
}, cloudBlobStore.executor()); });
} }
@Override public void writeBlob(String blobName, InputStream is, long sizeInBytes) throws IOException { @Override public void writeBlob(String blobName, InputStream is, long sizeInBytes) throws IOException {

View File

@ -0,0 +1,140 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.ec2;
import com.amazonaws.services.ec2.AmazonEC2;
import com.amazonaws.services.ec2.model.*;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.PortsRange;
import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider;
import java.util.List;
import java.util.Set;
/**
* @author kimchy (shay.banon)
*/
public class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHostsProvider {
private static enum HostType {
PRIVATE_IP,
PUBLIC_IP,
PRIVATE_DNS,
PUBLIC_DNS
}
private final AmazonEC2 client;
private final String ports;
private final ImmutableSet<String> groups;
private final ImmutableSet<String> availabilityZones;
private final HostType hostType;
@Inject public AwsEc2UnicastHostsProvider(Settings settings, AmazonEC2 client) {
super(settings);
this.client = client;
this.hostType = HostType.valueOf(componentSettings.get("host_type", "private_ip").toUpperCase());
this.ports = componentSettings.get("ports", "9300-9302");
Set<String> groups = Sets.newHashSet(componentSettings.getAsArray("groups"));
if (componentSettings.get("groups") != null) {
groups.addAll(Strings.commaDelimitedListToSet(componentSettings.get("groups")));
}
this.groups = ImmutableSet.copyOf(groups);
Set<String> availabilityZones = Sets.newHashSet(componentSettings.getAsArray("availability_zones"));
if (componentSettings.get("availability_zones") != null) {
availabilityZones.addAll(Strings.commaDelimitedListToSet(componentSettings.get("availability_zones")));
}
this.availabilityZones = ImmutableSet.copyOf(availabilityZones);
}
@Override public List<DiscoveryNode> buildDynamicNodes() {
List<DiscoveryNode> discoNodes = Lists.newArrayList();
DescribeInstancesResult descInstances = client.describeInstances(new DescribeInstancesRequest());
logger.trace("building dynamic unicast discovery nodes...");
for (Reservation reservation : descInstances.getReservations()) {
if (!groups.isEmpty()) {
// lets see if we can filter based on groups
boolean filter = false;
for (String group : reservation.getGroupNames()) {
if (!groups.contains(group)) {
logger.trace("filtering out reservation {} based on group {}, not part of {}", reservation.getReservationId(), group, groups);
filter = true;
break;
}
}
if (filter) {
// if we are filtering, continue to the next reservation
continue;
}
}
for (Instance instance : reservation.getInstances()) {
if (!availabilityZones.isEmpty()) {
if (!availabilityZones.contains(instance.getPlacement().getAvailabilityZone())) {
logger.trace("filtering out instance {} based on availability_zone {}, not part of {}", instance.getInstanceId(), instance.getPlacement().getAvailabilityZone(), availabilityZones);
continue;
}
}
InstanceState state = instance.getState();
if (state.getName().equalsIgnoreCase("pending") || state.getName().equalsIgnoreCase("running")) {
String address = null;
switch (hostType) {
case PRIVATE_DNS:
address = instance.getPrivateDnsName();
break;
case PRIVATE_IP:
address = instance.getPrivateIpAddress();
break;
case PUBLIC_DNS:
address = instance.getPublicDnsName();
break;
case PUBLIC_IP:
address = instance.getPublicDnsName();
break;
}
for (int port : new PortsRange(ports).ports()) {
logger.trace("adding {}, address {}", instance.getInstanceId(), address);
discoNodes.add(new DiscoveryNode("#cloud-" + instance.getInstanceId() + "-" + port, new InetSocketTransportAddress(address, port)));
}
}
}
}
logger.debug("using dynamic discovery nodes {}", discoNodes);
return discoNodes;
}
}

View File

@ -17,9 +17,9 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.discovery.cloud; package org.elasticsearch.discovery.ec2;
import org.elasticsearch.cloud.compute.CloudComputeService; import org.elasticsearch.cloud.aws.AwsEc2Service;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.collect.ImmutableList; import org.elasticsearch.common.collect.ImmutableList;
@ -35,10 +35,10 @@ import org.elasticsearch.transport.TransportService;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class CloudDiscovery extends ZenDiscovery { public class Ec2Discovery extends ZenDiscovery {
@Inject public CloudDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool, TransportService transportService, @Inject public Ec2Discovery(Settings settings, ClusterName clusterName, ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, ZenPingService pingService, CloudComputeService computeService) { ClusterService clusterService, ZenPingService pingService, AwsEc2Service ec2Service) {
super(settings, clusterName, threadPool, transportService, clusterService, pingService); super(settings, clusterName, threadPool, transportService, clusterService, pingService);
if (settings.getAsBoolean("cloud.enabled", true)) { if (settings.getAsBoolean("cloud.enabled", true)) {
ImmutableList<? extends ZenPing> zenPings = pingService.zenPings(); ImmutableList<? extends ZenPing> zenPings = pingService.zenPings();
@ -51,7 +51,7 @@ public class CloudDiscovery extends ZenDiscovery {
} }
// update the unicast zen ping to add cloud hosts provider // update the unicast zen ping to add cloud hosts provider
// and, while we are at it, use only it and not the multicast for example // and, while we are at it, use only it and not the multicast for example
unicastZenPing.addHostsProvider(new CloudUnicastHostsProvider(settings, computeService)); unicastZenPing.addHostsProvider(new AwsEc2UnicastHostsProvider(settings, ec2Service.client()));
pingService.zenPings(ImmutableList.of(unicastZenPing)); pingService.zenPings(ImmutableList.of(unicastZenPing));
} }
} }

View File

@ -17,19 +17,17 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.discovery.cloud; package org.elasticsearch.discovery.ec2;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.zen.ping.ZenPingService; import org.elasticsearch.discovery.zen.ZenDiscoveryModule;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class CloudDiscoveryModule extends AbstractModule { public class Ec2DiscoveryModule extends ZenDiscoveryModule {
@Override protected void configure() { @Override protected void bindDiscovery() {
bind(ZenPingService.class).asEagerSingleton(); bind(Discovery.class).to(Ec2Discovery.class).asEagerSingleton();
bind(Discovery.class).to(CloudDiscovery.class).asEagerSingleton();
} }
} }

View File

@ -17,11 +17,12 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.gateway.cloud; package org.elasticsearch.gateway.s3;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.cloud.blobstore.CloudBlobStore; import org.elasticsearch.cloud.aws.AwsS3Service;
import org.elasticsearch.cloud.blobstore.CloudBlobStoreService; import org.elasticsearch.cloud.aws.blobstore.S3BlobStore;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.inject.Module;
@ -29,7 +30,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.gateway.blobstore.BlobStoreGateway; import org.elasticsearch.gateway.blobstore.BlobStoreGateway;
import org.elasticsearch.index.gateway.cloud.CloudIndexGatewayModule; import org.elasticsearch.index.gateway.s3.S3IndexGatewayModule;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException; import java.io.IOException;
@ -37,26 +38,33 @@ import java.io.IOException;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class CloudGateway extends BlobStoreGateway { public class S3Gateway extends BlobStoreGateway {
@Inject public CloudGateway(Settings settings, ClusterName clusterName, ThreadPool threadPool, CloudBlobStoreService blobStoreService) throws IOException { @Inject public S3Gateway(Settings settings, ClusterName clusterName, ThreadPool threadPool, AwsS3Service s3Service) throws IOException {
super(settings); super(settings);
String location = componentSettings.get("location"); String bucket = componentSettings.get("bucket");
String container = componentSettings.get("container"); if (bucket == null) {
if (container == null) { throw new ElasticSearchIllegalArgumentException("No bucket defined for s3 gateway");
throw new ElasticSearchIllegalArgumentException("Cloud gateway requires 'container' setting");
} }
initialize(new CloudBlobStore(settings, blobStoreService.context(), threadPool.cached(), container, location), clusterName, new ByteSizeValue(100, ByteSizeUnit.MB)); String region = componentSettings.get("region");
ByteSizeValue chunkSize = componentSettings.getAsBytesSize("chunk_size", new ByteSizeValue(100, ByteSizeUnit.MB));
logger.debug("using bucket [{}], region [{}], chunk_size [{}]", bucket, region, chunkSize);
initialize(new S3BlobStore(settings, s3Service.client(), bucket, region, threadPool.cached()), clusterName, chunkSize);
}
@Override public void close() throws ElasticSearchException {
super.close();
} }
@Override public String type() { @Override public String type() {
return "cloud"; return "s3";
} }
@Override public Class<? extends Module> suggestIndexGateway() { @Override public Class<? extends Module> suggestIndexGateway() {
return CloudIndexGatewayModule.class; return S3IndexGatewayModule.class;
} }
} }

View File

@ -17,7 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.gateway.cloud; package org.elasticsearch.gateway.s3;
import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.gateway.Gateway; import org.elasticsearch.gateway.Gateway;
@ -25,9 +25,9 @@ import org.elasticsearch.gateway.Gateway;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class CloudGatewayModule extends AbstractModule { public class S3GatewayModule extends AbstractModule {
@Override protected void configure() { @Override protected void configure() {
bind(Gateway.class).to(CloudGateway.class).asEagerSingleton(); bind(Gateway.class).to(S3Gateway.class).asEagerSingleton();
} }
} }

View File

@ -17,7 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.index.gateway.cloud; package org.elasticsearch.index.gateway.s3;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -30,17 +30,18 @@ import org.elasticsearch.index.settings.IndexSettings;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class CloudIndexGateway extends BlobStoreIndexGateway { public class S3IndexGateway extends BlobStoreIndexGateway {
@Inject public CloudIndexGateway(Index index, @IndexSettings Settings indexSettings, Gateway gateway) { @Inject public S3IndexGateway(Index index, @IndexSettings Settings indexSettings, Gateway gateway) {
super(index, indexSettings, gateway); super(index, indexSettings, gateway);
} }
@Override public String type() { @Override public String type() {
return "cloud"; return "s3";
} }
@Override public Class<? extends IndexShardGateway> shardGatewayClass() { @Override public Class<? extends IndexShardGateway> shardGatewayClass() {
return CloudIndexShardGateway.class; return S3IndexShardGateway.class;
} }
} }

View File

@ -17,7 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.index.gateway.cloud; package org.elasticsearch.index.gateway.s3;
import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.index.gateway.IndexGateway; import org.elasticsearch.index.gateway.IndexGateway;
@ -25,9 +25,9 @@ import org.elasticsearch.index.gateway.IndexGateway;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class CloudIndexGatewayModule extends AbstractModule { public class S3IndexGatewayModule extends AbstractModule {
@Override protected void configure() { @Override protected void configure() {
bind(IndexGateway.class).to(CloudIndexGateway.class).asEagerSingleton(); bind(IndexGateway.class).to(S3IndexGateway.class).asEagerSingleton();
} }
} }

View File

@ -17,7 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.index.gateway.cloud; package org.elasticsearch.index.gateway.s3;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -33,14 +33,15 @@ import org.elasticsearch.threadpool.ThreadPool;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class CloudIndexShardGateway extends BlobStoreIndexShardGateway { public class S3IndexShardGateway extends BlobStoreIndexShardGateway {
@Inject public CloudIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexGateway indexGateway, @Inject public S3IndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexGateway indexGateway,
IndexShard indexShard, Store store, RecoveryThrottler recoveryThrottler) { IndexShard indexShard, Store store, RecoveryThrottler recoveryThrottler) {
super(shardId, indexSettings, threadPool, indexGateway, indexShard, store, recoveryThrottler); super(shardId, indexSettings, threadPool, indexGateway, indexShard, store, recoveryThrottler);
} }
@Override public String type() { @Override public String type() {
return "cloud"; return "s3";
} }
} }

View File

@ -17,11 +17,12 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.plugin.cloud; package org.elasticsearch.plugin.cloud.aws;
import org.elasticsearch.cloud.CloudModule; import org.elasticsearch.cloud.aws.AwsEc2Service;
import org.elasticsearch.cloud.blobstore.CloudBlobStoreService; import org.elasticsearch.cloud.aws.AwsModule;
import org.elasticsearch.cloud.compute.CloudComputeService; import org.elasticsearch.cloud.aws.AwsS3Service;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -29,40 +30,38 @@ import org.elasticsearch.plugins.AbstractPlugin;
import java.util.Collection; import java.util.Collection;
import static org.elasticsearch.common.collect.Lists.*;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class CloudPlugin extends AbstractPlugin { public class CloudAwsPlugin extends AbstractPlugin {
private final Settings settings; private final Settings settings;
public CloudPlugin(Settings settings) { public CloudAwsPlugin(Settings settings) {
this.settings = settings; this.settings = settings;
} }
@Override public String name() { @Override public String name() {
return "cloud"; return "cloud-aws";
} }
@Override public String description() { @Override public String description() {
return "Cloud plugin"; return "Cloud AWS Plugin";
} }
@Override public Collection<Class<? extends Module>> modules() { @Override public Collection<Class<? extends Module>> modules() {
Collection<Class<? extends Module>> modules = newArrayList(); Collection<Class<? extends Module>> modules = Lists.newArrayList();
if (settings.getAsBoolean("cloud.enabled", true)) { if (settings.getAsBoolean("cloud.enabled", true)) {
modules.add(CloudModule.class); modules.add(AwsModule.class);
} }
return modules; return modules;
} }
@Override public Collection<Class<? extends LifecycleComponent>> services() { @Override public Collection<Class<? extends LifecycleComponent>> services() {
Collection<Class<? extends LifecycleComponent>> services = newArrayList(); Collection<Class<? extends LifecycleComponent>> services = Lists.newArrayList();
if (settings.getAsBoolean("cloud.enabled", true)) { if (settings.getAsBoolean("cloud.enabled", true)) {
services.add(CloudComputeService.class); services.add(AwsS3Service.class);
services.add(CloudBlobStoreService.class); services.add(AwsEc2Service.class);
} }
return services; return services;
} }

View File

@ -1 +0,0 @@
plugin=org.elasticsearch.plugin.cloud.CloudPlugin

View File

@ -1,124 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.blobstore;
import com.google.common.util.concurrent.ListenableFuture;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStoreException;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.ImmutableMap;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.domain.PageSet;
import org.jclouds.blobstore.domain.StorageMetadata;
import org.jclouds.blobstore.options.ListContainerOptions;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutionException;
/**
* @author kimchy (shay.banon)
*/
public class AbstractCloudBlobContainer extends AbstractBlobContainer {
protected final CloudBlobStore cloudBlobStore;
protected final String cloudPath;
public AbstractCloudBlobContainer(BlobPath path, CloudBlobStore cloudBlobStore) {
super(path);
this.cloudBlobStore = cloudBlobStore;
this.cloudPath = path.buildAsString("/");
}
@Override public boolean deleteBlob(String blobName) throws IOException {
cloudBlobStore.sync().removeBlob(cloudBlobStore.container(), buildBlobPath(blobName));
return true;
}
@Override public boolean blobExists(String blobName) {
return cloudBlobStore.sync().blobExists(cloudBlobStore.container(), buildBlobPath(blobName));
}
@Override public void readBlob(final String blobName, final ReadBlobListener listener) {
final ListenableFuture<? extends Blob> future = cloudBlobStore.async().getBlob(cloudBlobStore.container(), buildBlobPath(blobName));
future.addListener(new Runnable() {
@Override public void run() {
Blob blob;
try {
blob = future.get();
if (blob == null) {
listener.onFailure(new BlobStoreException("No blob found for [" + buildBlobPath(blobName) + "]"));
return;
}
} catch (InterruptedException e) {
listener.onFailure(e);
return;
} catch (ExecutionException e) {
listener.onFailure(e.getCause());
return;
}
byte[] buffer = new byte[cloudBlobStore.bufferSizeInBytes()];
InputStream is = blob.getContent();
try {
int bytesRead;
while ((bytesRead = is.read(buffer)) != -1) {
listener.onPartial(buffer, 0, bytesRead);
}
listener.onCompleted();
} catch (Exception e) {
try {
is.close();
} catch (IOException e1) {
// ignore
}
listener.onFailure(e);
}
}
}, cloudBlobStore.executor());
}
// inDirectory expects a directory, not a blob prefix
// @Override public ImmutableMap<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) throws IOException {
// PageSet<? extends StorageMetadata> list = cloudBlobStore.sync().list(cloudBlobStore.container(), ListContainerOptions.Builder.recursive().inDirectory(buildBlobPath(blobNamePrefix)));
// ImmutableMap.Builder<String, BlobMetaData> blobs = ImmutableMap.builder();
// for (StorageMetadata storageMetadata : list) {
// String name = storageMetadata.getName().substring(cloudPath.length() + 1);
// blobs.put(name, new PlainBlobMetaData(name, storageMetadata.getSize(), null));
// }
// return blobs.build();
// }
@Override public ImmutableMap<String, BlobMetaData> listBlobs() throws IOException {
PageSet<? extends StorageMetadata> list = cloudBlobStore.sync().list(cloudBlobStore.container(), ListContainerOptions.Builder.recursive().inDirectory(cloudPath));
ImmutableMap.Builder<String, BlobMetaData> blobs = ImmutableMap.builder();
for (StorageMetadata storageMetadata : list) {
String name = storageMetadata.getName().substring(cloudPath.length() + 1);
blobs.put(name, new PlainBlobMetaData(name, storageMetadata.getSize(), null));
}
return blobs.build();
}
protected String buildBlobPath(String blobName) {
return cloudPath + "/" + blobName;
}
}

View File

@ -1,101 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.blobstore;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cloud.jclouds.JCloudsUtils;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.BlobStoreContextFactory;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
public class CloudBlobStoreService extends AbstractLifecycleComponent<CloudBlobStoreService> {
private final String type;
private final BlobStoreContext blobStoreContext;
@Inject public CloudBlobStoreService(Settings settings, SettingsFilter settingsFilter) throws IOException {
super(settings);
String type = componentSettings.get("type");
if (type == null) {
// see if we can get a global type
type = settings.get("cloud.type");
}
// consolidate names
if ("aws".equalsIgnoreCase(type) || "amazon".equalsIgnoreCase(type)) {
type = "s3";
} else if ("rackspace".equalsIgnoreCase(type)) {
type = "cloudfiles";
}
this.type = type;
String account = componentSettings.get("account", settings.get("cloud.account"));
String key = componentSettings.get("key", settings.get("cloud.key"));
if (type != null) {
blobStoreContext = new BlobStoreContextFactory().createContext(type, account, key, JCloudsUtils.buildModules(settings));
logger.info("Connected to {}/{} blob store service", type, account);
} else {
blobStoreContext = null;
}
settingsFilter.addFilter(new BlobStorSettingsFilter());
}
@Override protected void doStart() throws ElasticSearchException {
}
@Override protected void doStop() throws ElasticSearchException {
}
@Override protected void doClose() throws ElasticSearchException {
if (blobStoreContext != null) {
blobStoreContext.close();
}
}
public BlobStoreContext context() {
if (blobStoreContext == null) {
throw new ElasticSearchIllegalStateException("No cloud blobstore service started, have you configured the 'cloud.type' setting?");
}
return blobStoreContext;
}
private static class BlobStorSettingsFilter implements SettingsFilter.Filter {
@Override public void filter(ImmutableSettings.Builder settings) {
settings.remove("cloud.key");
settings.remove("cloud.account");
settings.remove("cloud.blobstore.key");
settings.remove("cloud.blobstore.account");
}
}
}

View File

@ -1,100 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.compute;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cloud.jclouds.JCloudsUtils;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.jclouds.compute.ComputeServiceContext;
import org.jclouds.compute.ComputeServiceContextFactory;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
public class CloudComputeService extends AbstractLifecycleComponent<CloudComputeService> {
private final String type;
private final ComputeServiceContext computeServiceContext;
@Inject public CloudComputeService(Settings settings, SettingsFilter filter) throws IOException {
super(settings);
String type = componentSettings.get("type");
if (type == null) {
// see if we can get a global type
type = settings.get("cloud.type");
}
// consolidate names
if ("aws".equalsIgnoreCase(type) || "amazon".equalsIgnoreCase(type)) {
type = "ec2";
} else if ("rackspace".equalsIgnoreCase(type)) {
type = "cloudservers";
}
this.type = type;
String account = componentSettings.get("account", settings.get("cloud.account"));
String key = componentSettings.get("key", settings.get("cloud.key"));
if (type != null) {
computeServiceContext = new ComputeServiceContextFactory().createContext(type, account, key, JCloudsUtils.buildModules(settings));
logger.info("Connected to {}/{} compute service", type, account);
} else {
computeServiceContext = null;
}
filter.addFilter(new ComputeSettingsFilter());
}
@Override protected void doStart() throws ElasticSearchException {
}
@Override protected void doStop() throws ElasticSearchException {
}
@Override protected void doClose() throws ElasticSearchException {
if (computeServiceContext != null) {
computeServiceContext.close();
}
}
public ComputeServiceContext context() {
if (computeServiceContext == null) {
throw new ElasticSearchIllegalStateException("No cloud compute service started, have you configured the 'cloud.type' setting?");
}
return this.computeServiceContext;
}
private static class ComputeSettingsFilter implements SettingsFilter.Filter {
@Override public void filter(ImmutableSettings.Builder settings) {
settings.remove("cloud.key");
settings.remove("cloud.account");
settings.remove("cloud.compute.key");
settings.remove("cloud.compute.account");
}
}
}

View File

@ -1,42 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.jclouds;
import com.google.inject.Module;
import org.elasticsearch.cloud.jclouds.logging.JCloudsLoggingModule;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.jclouds.concurrent.config.ExecutorServiceModule;
import java.util.concurrent.Executors;
/**
* @author kimchy (shay.banon)
*/
public class JCloudsUtils {
public static Iterable<? extends Module> buildModules(Settings settings) {
return ImmutableList.of(new JCloudsLoggingModule(settings),
new ExecutorServiceModule(
Executors.newCachedThreadPool(EsExecutors.daemonThreadFactory(settings, "jclouds-user")),
Executors.newCachedThreadPool(EsExecutors.daemonThreadFactory(settings, "jclouds-io"))));
}
}

View File

@ -1,108 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.jclouds.logging;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.jclouds.logging.BaseLogger;
import org.jclouds.logging.Logger;
import org.jclouds.logging.config.LoggingModule;
/**
* @author kimchy (shay.banon)
*/
public class JCloudsLoggingModule extends LoggingModule {
private final Settings settings;
public JCloudsLoggingModule(Settings settings) {
this.settings = settings;
}
@Override public Logger.LoggerFactory createLoggerFactory() {
return new Logger.LoggerFactory() {
@Override public Logger getLogger(String s) {
return new JCloudsESLogger(Loggers.getLogger(s.replace("org.jclouds", "cloud.jclouds"), settings));
}
};
}
private static class JCloudsESLogger extends BaseLogger {
private final ESLogger logger;
private JCloudsESLogger(ESLogger logger) {
this.logger = logger;
}
@Override protected void logError(String s, Throwable throwable) {
logger.error(s, throwable);
}
@Override protected void logError(String s) {
logger.error(s);
}
@Override protected void logWarn(String s, Throwable throwable) {
logger.warn(s, throwable);
}
@Override protected void logWarn(String s) {
logger.warn(s);
}
@Override protected void logInfo(String s) {
logger.info(s);
}
@Override protected void logDebug(String s) {
logger.debug(s);
}
@Override protected void logTrace(String s) {
logger.trace(s);
}
@Override public String getCategory() {
return logger.getName();
}
@Override public boolean isTraceEnabled() {
return logger.isTraceEnabled();
}
@Override public boolean isDebugEnabled() {
return logger.isDebugEnabled();
}
@Override public boolean isInfoEnabled() {
return logger.isInfoEnabled();
}
@Override public boolean isWarnEnabled() {
return logger.isWarnEnabled();
}
@Override public boolean isErrorEnabled() {
return logger.isErrorEnabled();
}
}
}

View File

@ -1,114 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.cloud;
import org.elasticsearch.cloud.compute.CloudComputeService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.PortsRange;
import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider;
import org.jclouds.compute.ComputeService;
import org.jclouds.compute.domain.ComputeMetadata;
import org.jclouds.compute.domain.NodeMetadata;
import org.jclouds.compute.domain.NodeState;
import org.jclouds.domain.Location;
import java.util.List;
import java.util.Set;
import static org.elasticsearch.common.collect.Lists.*;
/**
* @author kimchy (shay.banon)
*/
public class CloudUnicastHostsProvider extends AbstractComponent implements UnicastHostsProvider {
private final ComputeService computeService;
private final String ports;
private final String tag;
private final String location;
public CloudUnicastHostsProvider(Settings settings, CloudComputeService computeService) {
super(settings);
this.computeService = computeService.context().getComputeService();
this.tag = componentSettings.get("tag");
this.location = componentSettings.get("location");
this.ports = componentSettings.get("ports", "9300-9302");
// parse the ports just to see that they are valid
new PortsRange(ports).ports();
}
@Override public List<DiscoveryNode> buildDynamicNodes() {
List<DiscoveryNode> discoNodes = newArrayList();
Set<? extends ComputeMetadata> nodes = computeService.listNodes();
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder("Processing Nodes:");
for (ComputeMetadata node : nodes) {
sb.append("\n -> ").append(node);
}
logger.trace(sb.toString());
}
for (ComputeMetadata node : nodes) {
NodeMetadata nodeMetadata;
if (node instanceof NodeMetadata) {
nodeMetadata = (NodeMetadata) node;
} else {
nodeMetadata = computeService.getNodeMetadata(node.getId());
}
if (tag != null && !nodeMetadata.getTag().equals(tag)) {
logger.trace("Filtering node {} with unmatched tag {}", nodeMetadata.getName(), nodeMetadata.getTag());
continue;
}
boolean filteredByLocation = true;
if (location != null) {
Location nodeLocation = nodeMetadata.getLocation();
if (location.equals(nodeLocation.getId())) {
filteredByLocation = false;
} else {
if (nodeLocation.getParent() != null) {
if (location.equals(nodeLocation.getParent().getId())) {
filteredByLocation = false;
}
}
}
} else {
filteredByLocation = false;
}
if (filteredByLocation) {
logger.trace("Filtering node {} with unmatched location {}", nodeMetadata.getName(), nodeMetadata.getLocation());
continue;
}
if (nodeMetadata.getState() == NodeState.PENDING || nodeMetadata.getState() == NodeState.RUNNING) {
logger.debug("Adding {}, addresses {}", nodeMetadata.getName(), nodeMetadata.getPrivateAddresses());
for (String inetAddress : nodeMetadata.getPrivateAddresses()) {
for (int port : new PortsRange(ports).ports()) {
discoNodes.add(new DiscoveryNode("#cloud-" + inetAddress + "-" + port, new InetSocketTransportAddress(inetAddress, port)));
}
}
}
}
return discoNodes;
}
}

View File

@ -8,7 +8,7 @@ include 'test-integration'
include 'benchmark-micro' include 'benchmark-micro'
include 'plugins-cloud' include 'plugins-cloud-aws'
include 'plugins-hadoop' include 'plugins-hadoop'
include 'plugins-analysis-icu' include 'plugins-analysis-icu'
include 'plugins-mapper-attachments' include 'plugins-mapper-attachments'