more refactoring and proper exception when using cloud plugin

This commit is contained in:
kimchy 2010-05-02 23:33:57 +03:00
parent b81e3de85e
commit bac0ef98c4
6 changed files with 63 additions and 22 deletions

View File

@ -20,6 +20,7 @@
package org.elasticsearch.cloud.blobstore;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cloud.jclouds.JCloudsUtils;
import org.elasticsearch.util.component.AbstractLifecycleComponent;
import org.elasticsearch.util.guice.inject.Inject;
@ -78,6 +79,9 @@ public class CloudBlobStoreService extends AbstractLifecycleComponent<CloudBlobS
}
public BlobStoreContext context() {
if (blobStoreContext == null) {
throw new ElasticSearchIllegalStateException("No cloud blobstore service started, have you configured the 'cloud.type' setting?");
}
return blobStoreContext;
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.cloud.compute;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cloud.jclouds.JCloudsUtils;
import org.elasticsearch.util.component.AbstractLifecycleComponent;
import org.elasticsearch.util.guice.inject.Inject;
@ -78,6 +79,9 @@ public class CloudComputeService extends AbstractLifecycleComponent<CloudCompute
}
public ComputeServiceContext context() {
if (computeServiceContext == null) {
throw new ElasticSearchIllegalStateException("No cloud compute service started, have you configured the 'cloud.type' setting?");
}
return this.computeServiceContext;
}
}

View File

@ -27,6 +27,7 @@ import org.elasticsearch.discovery.zen.ping.ZenPingService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.util.gcommon.collect.ImmutableList;
import org.elasticsearch.util.guice.inject.Inject;
import org.elasticsearch.util.settings.Settings;
/**
@ -34,8 +35,8 @@ import org.elasticsearch.util.settings.Settings;
*/
public class CloudDiscovery extends ZenDiscovery {
public CloudDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, ZenPingService pingService, CloudComputeService computeService) {
@Inject public CloudDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, ZenPingService pingService, CloudComputeService computeService) {
super(settings, clusterName, threadPool, transportService, clusterService, pingService);
if (settings.getAsBoolean("cloud.enabled", true)) {
pingService.zenPings(ImmutableList.of(new CloudZenPing(settings, threadPool, transportService, clusterName, computeService)));

View File

@ -0,0 +1,35 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.cloud;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.zen.ping.ZenPingService;
import org.elasticsearch.util.guice.inject.AbstractModule;
/**
* @author kimchy (shay.banon)
*/
public class CloudDiscoveryModule extends AbstractModule {
@Override protected void configure() {
bind(ZenPingService.class).asEagerSingleton();
bind(Discovery.class).to(CloudDiscovery.class).asEagerSingleton();
}
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.util.settings.Settings;
import org.elasticsearch.util.transport.InetSocketTransportAddress;
import org.elasticsearch.util.transport.PortsRange;
import org.jclouds.compute.ComputeService;
import org.jclouds.compute.domain.ComputeMetadata;
import org.jclouds.compute.domain.NodeMetadata;
import org.jclouds.compute.domain.NodeState;
@ -44,7 +45,7 @@ import static org.elasticsearch.util.gcommon.collect.Lists.*;
*/
public class CloudZenPing extends UnicastZenPing {
private final CloudComputeService computeService;
private final ComputeService computeService;
private final String ports;
@ -53,7 +54,7 @@ public class CloudZenPing extends UnicastZenPing {
public CloudZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName,
CloudComputeService computeService) {
super(settings, threadPool, transportService, clusterName);
this.computeService = computeService;
this.computeService = computeService.context().getComputeService();
this.tag = componentSettings.get("tag");
this.ports = componentSettings.get("ports", "9300-9302");
// parse the ports just to see that they are valid
@ -62,9 +63,9 @@ public class CloudZenPing extends UnicastZenPing {
@Override protected List<DiscoveryNode> buildDynamicNodes() {
List<DiscoveryNode> discoNodes = newArrayList();
Map<String, ? extends ComputeMetadata> nodes = computeService.context().getComputeService().getNodes();
Map<String, ? extends ComputeMetadata> nodes = computeService.getNodes();
for (Map.Entry<String, ? extends ComputeMetadata> node : nodes.entrySet()) {
NodeMetadata nodeMetadata = computeService.context().getComputeService().getNodeMetadata(node.getValue());
NodeMetadata nodeMetadata = computeService.getNodeMetadata(node.getValue());
if (tag != null && !nodeMetadata.getTag().equals(tag)) {
continue;
}

View File

@ -21,7 +21,6 @@ package org.elasticsearch.gateway.cloud;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cloud.blobstore.CloudBlobStoreService;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.metadata.MetaData;
@ -36,6 +35,7 @@ import org.elasticsearch.util.xcontent.XContentFactory;
import org.elasticsearch.util.xcontent.XContentParser;
import org.elasticsearch.util.xcontent.XContentType;
import org.elasticsearch.util.xcontent.builder.BinaryXContentBuilder;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.domain.PageSet;
import org.jclouds.blobstore.domain.StorageMetadata;
@ -49,7 +49,7 @@ public class CloudGateway extends AbstractLifecycleComponent<Gateway> implements
private final ClusterName clusterName;
private final CloudBlobStoreService blobStoreService;
private final BlobStoreContext blobStoreContext;
private final String container;
@ -62,7 +62,7 @@ public class CloudGateway extends AbstractLifecycleComponent<Gateway> implements
public CloudGateway(Settings settings, ClusterName clusterName, CloudBlobStoreService blobStoreService) {
super(settings);
this.clusterName = clusterName;
this.blobStoreService = blobStoreService;
this.blobStoreContext = blobStoreService.context();
String container = componentSettings.get("container");
if (container == null) {
@ -70,11 +70,7 @@ public class CloudGateway extends AbstractLifecycleComponent<Gateway> implements
}
this.location = componentSettings.get("location");
this.container = container + "." + clusterName.value();
blobStoreService.context().getBlobStore().createContainerInLocation(location, container);
if (blobStoreService.context() == null) {
throw new ElasticSearchIllegalStateException("No cloud setting is configure");
}
blobStoreContext.getBlobStore().createContainerInLocation(location, container);
this.currentIndex = findLatestIndex();
logger.debug("Latest metadata found at index [" + currentIndex + "]");
@ -99,18 +95,18 @@ public class CloudGateway extends AbstractLifecycleComponent<Gateway> implements
MetaData.Builder.toXContent(metaData, builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
Blob blob = blobStoreService.context().getBlobStore().newBlob(name);
Blob blob = blobStoreContext.getBlobStore().newBlob(name);
blob.setPayload(new FastByteArrayInputStream(builder.unsafeBytes(), 0, builder.unsafeBytesLength()));
blob.setContentLength(builder.unsafeBytesLength());
blobStoreService.context().getBlobStore().putBlob(container, blob);
blobStoreContext.getBlobStore().putBlob(container, blob);
currentIndex++;
PageSet<? extends StorageMetadata> pageSet = blobStoreService.context().getBlobStore().list(container);
PageSet<? extends StorageMetadata> pageSet = blobStoreContext.getBlobStore().list(container);
for (StorageMetadata storageMetadata : pageSet) {
if (storageMetadata.getName().startsWith("metadata-") && !name.equals(storageMetadata.getName())) {
blobStoreService.context().getBlobStore().removeBlob(container, storageMetadata.getName());
blobStoreContext.getBlobStore().removeBlob(container, storageMetadata.getName());
}
}
} catch (IOException e) {
@ -136,10 +132,10 @@ public class CloudGateway extends AbstractLifecycleComponent<Gateway> implements
}
@Override public void reset() {
PageSet<? extends StorageMetadata> pageSet = blobStoreService.context().getBlobStore().list(container);
PageSet<? extends StorageMetadata> pageSet = blobStoreContext.getBlobStore().list(container);
for (StorageMetadata storageMetadata : pageSet) {
if (storageMetadata.getName().startsWith("metadata-")) {
blobStoreService.context().getBlobStore().removeBlob(container, storageMetadata.getName());
blobStoreContext.getBlobStore().removeBlob(container, storageMetadata.getName());
}
}
currentIndex = -1;
@ -147,7 +143,7 @@ public class CloudGateway extends AbstractLifecycleComponent<Gateway> implements
private int findLatestIndex() {
int index = -1;
PageSet<? extends StorageMetadata> pageSet = blobStoreService.context().getBlobStore().list(container);
PageSet<? extends StorageMetadata> pageSet = blobStoreContext.getBlobStore().list(container);
for (StorageMetadata storageMetadata : pageSet) {
if (logger.isTraceEnabled()) {
logger.trace("[findLatestMetadata]: Processing blob [" + storageMetadata.getName() + "]");
@ -172,7 +168,7 @@ public class CloudGateway extends AbstractLifecycleComponent<Gateway> implements
private MetaData readMetaData(String name) throws IOException {
XContentParser parser = null;
try {
Blob blob = blobStoreService.context().getBlobStore().getBlob(container, name);
Blob blob = blobStoreContext.getBlobStore().getBlob(container, name);
parser = XContentFactory.xContent(XContentType.JSON).createParser(blob.getContent());
return MetaData.Builder.fromXContent(parser, settings);
} finally {