cloud gateway implemenation working against s3

This commit is contained in:
kimchy 2010-05-04 14:02:36 +03:00
parent aae4e4ab61
commit 7d9f55309e
14 changed files with 1150 additions and 32 deletions

View File

@ -47,6 +47,7 @@
<w>lucene</w>
<w>memcached</w>
<w>metadata</w>
<w>metadatas</w>
<w>millis</w>
<w>mmap</w>
<w>multi</w>

View File

@ -17,15 +17,19 @@
<CLASSES>
<root url="jar://$GRADLE_REPOSITORY$/com.google.code.gson/gson/jars/gson-1.4.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/com.google.code.guice/guice/jars/guice-2.1-r1128.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/org.jclouds/jclouds-rackspace/jars/jclouds-rackspace-1.0-SNAPSHOT.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/org.jclouds/jclouds-compute/jars/jclouds-compute-1.0-SNAPSHOT.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/org.jclouds/jclouds-blobstore/jars/jclouds-blobstore-1.0-SNAPSHOT.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/org.jclouds/jclouds-scriptbuilder/jars/jclouds-scriptbuilder-1.0-SNAPSHOT.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/org.jclouds/jclouds-core/jars/jclouds-core-1.0-SNAPSHOT.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/org.jclouds/jclouds-aws/jars/jclouds-aws-1.0-SNAPSHOT.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/com.jamesmurty.utils/java-xmlbuilder/jars/java-xmlbuilder-0.3.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/org.jclouds/jclouds-compute/jars/jclouds-compute-1.0-beta-4.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/org.jclouds/jclouds-aws/jars/jclouds-aws-1.0-beta-4.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/org.jclouds/jclouds-scriptbuilder/jars/jclouds-scriptbuilder-1.0-beta-4.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/org.jclouds/jclouds-blobstore/jars/jclouds-blobstore-1.0-beta-4.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/org.jclouds/jclouds-core/jars/jclouds-core-1.0-beta-4.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/org.apache.httpcomponents/httpcore/jars/httpcore-4.1-alpha1.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/org.apache.httpcomponents/httpcore-nio/jars/httpcore-nio-4.1-alpha1.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/org.jclouds/jclouds-httpnio/jars/jclouds-httpnio-1.0-beta-4.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/com.google.guava/guava/jars/guava-r03.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/javax.annotation/jsr250-api/jars/jsr250-api-1.0.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/javax.inject/inject/jars/inject-1.0.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/aopalliance/aopalliance/jars/aopalliance-1.0.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/org.jboss.resteasy/resteasy-jaxrs-client/jars/resteasy-jaxrs-client-1.2.1.GA-SNAPSHOT.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/org.jboss.resteasy/jaxrs-api/jars/jaxrs-api-1.2.1.GA.jar!/" />
</CLASSES>
<JAVADOC>
<root url="http://jclouds.rimuhosting.com/apidocs/" />

View File

@ -131,8 +131,57 @@ public class Directories {
}
copyFile(copyFrom, destinationFile);
} else {
copyToDirectory(new FileInputStream(copyFrom), dir.createOutput(fileName));
FileInputStream is = null;
IndexOutput output = null;
try {
is = new FileInputStream(copyFrom);
output = dir.createOutput(fileName);
copyToDirectory(is, output);
} finally {
if (is != null) {
try {
is.close();
} catch (IOException e) {
// ignore
}
}
if (output != null) {
try {
output.close();
} catch (IOException e) {
// ignore
}
}
}
}
sync(dir, fileName);
}
public static void copyToDirectory(InputStream is, Directory dir, String fileName) throws IOException {
IndexOutput output = null;
try {
output = dir.createOutput(fileName);
copyToDirectory(is, output);
} finally {
if (is != null) {
try {
is.close();
} catch (IOException e) {
// ignore
}
}
if (output != null) {
try {
output.close();
} catch (IOException e) {
// ignore
}
}
}
sync(dir, fileName);
}
public static void sync(Directory dir, String fileName) throws IOException {
if (dir instanceof ForceSyncDirectory) {
((ForceSyncDirectory) dir).forceSync(fileName);
} else {

View File

@ -0,0 +1,83 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.util.lucene.store;
import org.apache.lucene.store.IndexInput;
import java.io.IOException;
import java.io.InputStream;
/**
* @author kimchy (shay.banon)
*/
public class InputStreamIndexInput extends InputStream {
private final IndexInput indexInput;
private final long limit;
private final long actualSizeToRead;
private long counter = 0;
public InputStreamIndexInput(IndexInput indexInput, long limit) {
this.indexInput = indexInput;
this.limit = limit;
if ((indexInput.length() - indexInput.getFilePointer()) > limit) {
actualSizeToRead = limit;
} else {
actualSizeToRead = indexInput.length() - indexInput.getFilePointer();
}
}
public long actualSizeToRead() {
return actualSizeToRead;
}
@Override public int read(byte[] b, int off, int len) throws IOException {
if (b == null) {
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
}
if (indexInput.getFilePointer() >= indexInput.length()) {
return -1;
}
if (indexInput.getFilePointer() + len > indexInput.length()) {
len = (int) (indexInput.length() - indexInput.getFilePointer());
}
if (counter + len > limit) {
len = (int) (limit - counter);
}
if (len <= 0) {
return -1;
}
indexInput.readBytes(b, off, len, false);
counter += len;
return len;
}
@Override public int read() throws IOException {
if (counter++ >= limit) {
return -1;
}
return (indexInput.getFilePointer() < indexInput.length()) ? (indexInput.readByte() & 0xff) : -1;
}
}

View File

@ -0,0 +1,232 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.util.lucene.store;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.RAMDirectory;
import org.testng.annotations.Test;
import java.io.IOException;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
* @author kimchy (shay.banon)
*/
public class InputStreamIndexInputTests {
@Test public void testSingleReadSingleByteLimit() throws IOException {
RAMDirectory dir = new RAMDirectory();
IndexOutput output = dir.createOutput("test");
for (int i = 0; i < 3; i++) {
output.writeByte((byte) 1);
}
for (int i = 0; i < 3; i++) {
output.writeByte((byte) 2);
}
output.close();
IndexInput input = dir.openInput("test");
for (int i = 0; i < 3; i++) {
InputStreamIndexInput is = new InputStreamIndexInput(input, 1);
assertThat(input.getFilePointer(), lessThan(input.length()));
assertThat(is.actualSizeToRead(), equalTo(1l));
assertThat(is.read(), equalTo(1));
assertThat(is.read(), equalTo(-1));
}
for (int i = 0; i < 3; i++) {
InputStreamIndexInput is = new InputStreamIndexInput(input, 1);
assertThat(input.getFilePointer(), lessThan(input.length()));
assertThat(is.actualSizeToRead(), equalTo(1l));
assertThat(is.read(), equalTo(2));
assertThat(is.read(), equalTo(-1));
}
assertThat(input.getFilePointer(), equalTo(input.length()));
InputStreamIndexInput is = new InputStreamIndexInput(input, 1);
assertThat(is.actualSizeToRead(), equalTo(0l));
assertThat(is.read(), equalTo(-1));
}
@Test public void testReadMultiSingleByteLimit1() throws IOException {
RAMDirectory dir = new RAMDirectory();
IndexOutput output = dir.createOutput("test");
for (int i = 0; i < 3; i++) {
output.writeByte((byte) 1);
}
for (int i = 0; i < 3; i++) {
output.writeByte((byte) 2);
}
output.close();
IndexInput input = dir.openInput("test");
byte[] read = new byte[2];
for (int i = 0; i < 3; i++) {
assertThat(input.getFilePointer(), lessThan(input.length()));
InputStreamIndexInput is = new InputStreamIndexInput(input, 1);
assertThat(is.actualSizeToRead(), equalTo(1l));
assertThat(is.read(read), equalTo(1));
assertThat(read[0], equalTo((byte) 1));
}
for (int i = 0; i < 3; i++) {
assertThat(input.getFilePointer(), lessThan(input.length()));
InputStreamIndexInput is = new InputStreamIndexInput(input, 1);
assertThat(is.actualSizeToRead(), equalTo(1l));
assertThat(is.read(read), equalTo(1));
assertThat(read[0], equalTo((byte) 2));
}
assertThat(input.getFilePointer(), equalTo(input.length()));
InputStreamIndexInput is = new InputStreamIndexInput(input, 1);
assertThat(is.actualSizeToRead(), equalTo(0l));
assertThat(is.read(read), equalTo(-1));
}
@Test public void testSingleReadTwoBytesLimit() throws IOException {
RAMDirectory dir = new RAMDirectory();
IndexOutput output = dir.createOutput("test");
for (int i = 0; i < 3; i++) {
output.writeByte((byte) 1);
}
for (int i = 0; i < 3; i++) {
output.writeByte((byte) 2);
}
output.close();
IndexInput input = dir.openInput("test");
assertThat(input.getFilePointer(), lessThan(input.length()));
InputStreamIndexInput is = new InputStreamIndexInput(input, 2);
assertThat(is.actualSizeToRead(), equalTo(2l));
assertThat(is.read(), equalTo(1));
assertThat(is.read(), equalTo(1));
assertThat(is.read(), equalTo(-1));
assertThat(input.getFilePointer(), lessThan(input.length()));
is = new InputStreamIndexInput(input, 2);
assertThat(is.actualSizeToRead(), equalTo(2l));
assertThat(is.read(), equalTo(1));
assertThat(is.read(), equalTo(2));
assertThat(is.read(), equalTo(-1));
assertThat(input.getFilePointer(), lessThan(input.length()));
is = new InputStreamIndexInput(input, 2);
assertThat(is.actualSizeToRead(), equalTo(2l));
assertThat(is.read(), equalTo(2));
assertThat(is.read(), equalTo(2));
assertThat(is.read(), equalTo(-1));
assertThat(input.getFilePointer(), equalTo(input.length()));
is = new InputStreamIndexInput(input, 2);
assertThat(is.actualSizeToRead(), equalTo(0l));
assertThat(is.read(), equalTo(-1));
}
@Test public void testReadMultiTwoBytesLimit1() throws IOException {
RAMDirectory dir = new RAMDirectory();
IndexOutput output = dir.createOutput("test");
for (int i = 0; i < 3; i++) {
output.writeByte((byte) 1);
}
for (int i = 0; i < 3; i++) {
output.writeByte((byte) 2);
}
output.close();
IndexInput input = dir.openInput("test");
byte[] read = new byte[2];
assertThat(input.getFilePointer(), lessThan(input.length()));
InputStreamIndexInput is = new InputStreamIndexInput(input, 2);
assertThat(is.actualSizeToRead(), equalTo(2l));
assertThat(is.read(read), equalTo(2));
assertThat(read[0], equalTo((byte) 1));
assertThat(read[1], equalTo((byte) 1));
assertThat(input.getFilePointer(), lessThan(input.length()));
is = new InputStreamIndexInput(input, 2);
assertThat(is.actualSizeToRead(), equalTo(2l));
assertThat(is.read(read), equalTo(2));
assertThat(read[0], equalTo((byte) 1));
assertThat(read[1], equalTo((byte) 2));
assertThat(input.getFilePointer(), lessThan(input.length()));
is = new InputStreamIndexInput(input, 2);
assertThat(is.actualSizeToRead(), equalTo(2l));
assertThat(is.read(read), equalTo(2));
assertThat(read[0], equalTo((byte) 2));
assertThat(read[1], equalTo((byte) 2));
assertThat(input.getFilePointer(), equalTo(input.length()));
is = new InputStreamIndexInput(input, 2);
assertThat(is.actualSizeToRead(), equalTo(0l));
assertThat(is.read(read), equalTo(-1));
}
@Test public void testReadMultiFourBytesLimit() throws IOException {
RAMDirectory dir = new RAMDirectory();
IndexOutput output = dir.createOutput("test");
for (int i = 0; i < 3; i++) {
output.writeByte((byte) 1);
}
for (int i = 0; i < 3; i++) {
output.writeByte((byte) 2);
}
output.close();
IndexInput input = dir.openInput("test");
byte[] read = new byte[4];
assertThat(input.getFilePointer(), lessThan(input.length()));
InputStreamIndexInput is = new InputStreamIndexInput(input, 4);
assertThat(is.actualSizeToRead(), equalTo(4l));
assertThat(is.read(read), equalTo(4));
assertThat(read[0], equalTo((byte) 1));
assertThat(read[1], equalTo((byte) 1));
assertThat(read[2], equalTo((byte) 1));
assertThat(read[3], equalTo((byte) 2));
assertThat(input.getFilePointer(), lessThan(input.length()));
is = new InputStreamIndexInput(input, 4);
assertThat(is.actualSizeToRead(), equalTo(2l));
assertThat(is.read(read), equalTo(2));
assertThat(read[0], equalTo((byte) 2));
assertThat(read[1], equalTo((byte) 2));
assertThat(input.getFilePointer(), equalTo(input.length()));
is = new InputStreamIndexInput(input, 4);
assertThat(is.actualSizeToRead(), equalTo(0l));
assertThat(is.read(read), equalTo(-1));
}
}

View File

@ -34,17 +34,17 @@ repositories {
mavenRepo urls: "http://java-xmlbuilder.googlecode.com/svn/repo"
}
jcloudsVersion = "1.0-beta-4"
jcloudsVersion = "1.0-SNAPSHOT"
dependencies {
compile project(':elasticsearch')
compile("org.jclouds:jclouds-httpnio:$jcloudsVersion")
compile("org.jclouds:jclouds-blobstore:$jcloudsVersion")
compile("org.jclouds:jclouds-aws:$jcloudsVersion")
compile("org.jclouds:jclouds-rackspace:$jcloudsVersion")
distLib("org.jclouds:jclouds-httpnio:$jcloudsVersion")
distLib("org.jclouds:jclouds-blobstore:$jcloudsVersion")
distLib("org.jclouds:jclouds-aws:$jcloudsVersion")
distLib("org.jclouds:jclouds-rackspace:$jcloudsVersion")
testCompile project(':test-testng')
testCompile('org.testng:testng:5.10:jdk15') { transitive = false }

View File

@ -60,7 +60,7 @@ public class CloudBlobStoreService extends AbstractLifecycleComponent<CloudBlobS
if (type != null) {
blobStoreContext = new BlobStoreContextFactory().createContext(type, account, key, JCloudsUtils.buildModules(settings));
logger.info("Connected to [{}] blob store service");
logger.info("Connected to {}/{} blob store service", type, account);
} else {
blobStoreContext = null;
}

View File

@ -60,7 +60,7 @@ public class CloudComputeService extends AbstractLifecycleComponent<CloudCompute
if (type != null) {
computeServiceContext = new ComputeServiceContextFactory().createContext(type, account, key, JCloudsUtils.buildModules(settings));
logger.info("Connected to [{}] compute service");
logger.info("Connected to {}/{} compute service", type, account);
} else {
computeServiceContext = null;
}

View File

@ -29,6 +29,8 @@ import org.elasticsearch.util.settings.Settings;
*/
public class JCloudsUtils {
public static final String BLOB_CONTAINER_SEP = "-";
public static Iterable<? extends Module> buildModules(Settings settings) {
return ImmutableList.of(new JCloudsLoggingModule(settings));
}

View File

@ -32,6 +32,8 @@ import org.jclouds.compute.ComputeService;
import org.jclouds.compute.domain.ComputeMetadata;
import org.jclouds.compute.domain.NodeMetadata;
import org.jclouds.compute.domain.NodeState;
import org.jclouds.compute.options.GetNodesOptions;
import org.jclouds.domain.Location;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@ -51,11 +53,14 @@ public class CloudZenPing extends UnicastZenPing {
private final String tag;
private final String location;
public CloudZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName,
CloudComputeService computeService) {
super(settings, threadPool, transportService, clusterName);
this.computeService = computeService.context().getComputeService();
this.tag = componentSettings.get("tag");
this.location = componentSettings.get("location");
this.ports = componentSettings.get("ports", "9300-9302");
// parse the ports just to see that they are valid
new PortsRange(ports).ports();
@ -63,10 +68,31 @@ public class CloudZenPing extends UnicastZenPing {
@Override protected List<DiscoveryNode> buildDynamicNodes() {
List<DiscoveryNode> discoNodes = newArrayList();
Map<String, ? extends ComputeMetadata> nodes = computeService.getNodes();
Map<String, ? extends ComputeMetadata> nodes = computeService.getNodes(GetNodesOptions.Builder.withDetails());
logger.trace("Processing Nodes {}", nodes);
for (Map.Entry<String, ? extends ComputeMetadata> node : nodes.entrySet()) {
NodeMetadata nodeMetadata = computeService.getNodeMetadata(node.getValue());
NodeMetadata nodeMetadata = (NodeMetadata) node.getValue();
if (tag != null && !nodeMetadata.getTag().equals(tag)) {
logger.trace("Filtering node {} with unmatched tag {}", nodeMetadata.getName(), nodeMetadata.getTag());
continue;
}
boolean filteredByLocation = true;
if (location != null) {
Location nodeLocation = nodeMetadata.getLocation();
if (location.equals(nodeLocation.getId())) {
filteredByLocation = false;
} else {
if (nodeLocation.getParent() != null) {
if (location.equals(nodeLocation.getParent().getId())) {
filteredByLocation = false;
}
}
}
} else {
filteredByLocation = false;
}
if (filteredByLocation) {
logger.trace("Filtering node {} with unmatched location {}", nodeMetadata.getName(), nodeMetadata.getLocation());
continue;
}
if (nodeMetadata.getState() == NodeState.PENDING || nodeMetadata.getState() == NodeState.RUNNING) {

View File

@ -22,11 +22,15 @@ package org.elasticsearch.gateway.cloud;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.cloud.blobstore.CloudBlobStoreService;
import org.elasticsearch.cloud.jclouds.JCloudsUtils;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.gateway.GatewayException;
import org.elasticsearch.index.gateway.cloud.CloudIndexGatewayModule;
import org.elasticsearch.util.SizeValue;
import org.elasticsearch.util.component.AbstractLifecycleComponent;
import org.elasticsearch.util.guice.inject.Inject;
import org.elasticsearch.util.guice.inject.Module;
import org.elasticsearch.util.io.FastByteArrayInputStream;
import org.elasticsearch.util.settings.Settings;
@ -39,6 +43,7 @@ import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.domain.PageSet;
import org.jclouds.blobstore.domain.StorageMetadata;
import org.jclouds.domain.Location;
import java.io.IOException;
@ -47,35 +52,63 @@ import java.io.IOException;
*/
public class CloudGateway extends AbstractLifecycleComponent<Gateway> implements Gateway {
private final ClusterName clusterName;
private final BlobStoreContext blobStoreContext;
private final String container;
private final String location;
private final Location location;
private final SizeValue chunkSize;
private final String metadataContainer;
private volatile int currentIndex;
public CloudGateway(Settings settings, ClusterName clusterName, CloudBlobStoreService blobStoreService) {
@Inject public CloudGateway(Settings settings, ClusterName clusterName, CloudBlobStoreService blobStoreService) {
super(settings);
this.clusterName = clusterName;
this.blobStoreContext = blobStoreService.context();
this.chunkSize = componentSettings.getAsSize("chunk_size", null);
String location = componentSettings.get("location");
if (location == null) {
this.location = null;
} else {
this.location = blobStoreContext.getBlobStore().getAssignableLocations().get(location);
if (this.location == null) {
throw new ElasticSearchIllegalArgumentException("Not a valid location [" + location + "], available locations " + blobStoreContext.getBlobStore().getAssignableLocations().keySet());
}
}
String container = componentSettings.get("container");
if (container == null) {
throw new ElasticSearchIllegalArgumentException("Cloud gateway requires 'container' setting");
}
this.location = componentSettings.get("location");
this.container = container + "." + clusterName.value();
blobStoreContext.getBlobStore().createContainerInLocation(location, container);
this.container = container + JCloudsUtils.BLOB_CONTAINER_SEP + clusterName.value();
this.metadataContainer = this.container + JCloudsUtils.BLOB_CONTAINER_SEP + "metadata";
logger.debug("Using location [{}], container [{}], metadata_container [{}]", this.location, this.container, metadataContainer);
blobStoreContext.getBlobStore().createContainerInLocation(this.location, metadataContainer);
this.currentIndex = findLatestIndex();
logger.debug("Latest metadata found at index [" + currentIndex + "]");
}
public String container() {
return this.container;
}
public Location location() {
return this.location;
}
public SizeValue chunkSize() {
return this.chunkSize;
}
@Override protected void doStart() throws ElasticSearchException {
}
@ -99,14 +132,14 @@ public class CloudGateway extends AbstractLifecycleComponent<Gateway> implements
blob.setPayload(new FastByteArrayInputStream(builder.unsafeBytes(), 0, builder.unsafeBytesLength()));
blob.setContentLength(builder.unsafeBytesLength());
blobStoreContext.getBlobStore().putBlob(container, blob);
blobStoreContext.getBlobStore().putBlob(metadataContainer, blob);
currentIndex++;
PageSet<? extends StorageMetadata> pageSet = blobStoreContext.getBlobStore().list(container);
PageSet<? extends StorageMetadata> pageSet = blobStoreContext.getBlobStore().list(metadataContainer);
for (StorageMetadata storageMetadata : pageSet) {
if (storageMetadata.getName().startsWith("metadata-") && !name.equals(storageMetadata.getName())) {
blobStoreContext.getBlobStore().removeBlob(container, storageMetadata.getName());
blobStoreContext.getAsyncBlobStore().removeBlob(metadataContainer, storageMetadata.getName());
}
}
} catch (IOException e) {
@ -128,14 +161,14 @@ public class CloudGateway extends AbstractLifecycleComponent<Gateway> implements
}
@Override public Class<? extends Module> suggestIndexGateway() {
return null; //To change body of implemented methods use File | Settings | File Templates.
return CloudIndexGatewayModule.class;
}
@Override public void reset() {
PageSet<? extends StorageMetadata> pageSet = blobStoreContext.getBlobStore().list(container);
PageSet<? extends StorageMetadata> pageSet = blobStoreContext.getBlobStore().list(metadataContainer);
for (StorageMetadata storageMetadata : pageSet) {
if (storageMetadata.getName().startsWith("metadata-")) {
blobStoreContext.getBlobStore().removeBlob(container, storageMetadata.getName());
blobStoreContext.getBlobStore().removeBlob(metadataContainer, storageMetadata.getName());
}
}
currentIndex = -1;
@ -143,7 +176,7 @@ public class CloudGateway extends AbstractLifecycleComponent<Gateway> implements
private int findLatestIndex() {
int index = -1;
PageSet<? extends StorageMetadata> pageSet = blobStoreContext.getBlobStore().list(container);
PageSet<? extends StorageMetadata> pageSet = blobStoreContext.getBlobStore().list(metadataContainer);
for (StorageMetadata storageMetadata : pageSet) {
if (logger.isTraceEnabled()) {
logger.trace("[findLatestMetadata]: Processing blob [" + storageMetadata.getName() + "]");
@ -168,7 +201,7 @@ public class CloudGateway extends AbstractLifecycleComponent<Gateway> implements
private MetaData readMetaData(String name) throws IOException {
XContentParser parser = null;
try {
Blob blob = blobStoreContext.getBlobStore().getBlob(container, name);
Blob blob = blobStoreContext.getBlobStore().getBlob(metadataContainer, name);
parser = XContentFactory.xContent(XContentType.JSON).createParser(blob.getContent());
return MetaData.Builder.fromXContent(parser, settings);
} finally {

View File

@ -0,0 +1,122 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.gateway.cloud;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.cloud.blobstore.CloudBlobStoreService;
import org.elasticsearch.cloud.jclouds.JCloudsUtils;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.gateway.cloud.CloudGateway;
import org.elasticsearch.index.AbstractIndexComponent;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.gateway.IndexGateway;
import org.elasticsearch.index.gateway.IndexShardGateway;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.util.SizeUnit;
import org.elasticsearch.util.SizeValue;
import org.elasticsearch.util.guice.inject.Inject;
import org.elasticsearch.util.settings.Settings;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.domain.Location;
/**
* @author kimchy (shay.banon)
*/
public class CloudIndexGateway extends AbstractIndexComponent implements IndexGateway {
private final Gateway gateway;
private final String indexContainer;
private final Location location;
private final SizeValue chunkSize;
private final BlobStoreContext blobStoreContext;
@Inject public CloudIndexGateway(Index index, @IndexSettings Settings indexSettings, CloudBlobStoreService blobStoreService, Gateway gateway) {
super(index, indexSettings);
this.blobStoreContext = blobStoreService.context();
this.gateway = gateway;
String location = componentSettings.get("location");
String container = componentSettings.get("container");
SizeValue chunkSize = componentSettings.getAsSize("chunk_size", null);
if (gateway instanceof CloudGateway) {
CloudGateway cloudGateway = (CloudGateway) gateway;
if (container == null) {
container = cloudGateway.container() + JCloudsUtils.BLOB_CONTAINER_SEP + index.name();
}
if (chunkSize == null) {
chunkSize = cloudGateway.chunkSize();
}
}
if (chunkSize == null) {
chunkSize = new SizeValue(4, SizeUnit.GB);
}
if (location == null) {
if (gateway instanceof CloudGateway) {
CloudGateway cloudGateway = (CloudGateway) gateway;
this.location = cloudGateway.location();
} else {
this.location = null;
}
} else {
this.location = blobStoreContext.getBlobStore().getAssignableLocations().get(location);
if (this.location == null) {
throw new ElasticSearchIllegalArgumentException("Not a valid location [" + location + "], available locations " + blobStoreContext.getBlobStore().getAssignableLocations().keySet());
}
}
this.indexContainer = container;
this.chunkSize = chunkSize;
logger.debug("Using location [{}], container [{}], chunk_size [{}]", this.location, this.indexContainer, this.chunkSize);
// blobStoreContext.getBlobStore().createContainerInLocation(this.location, this.indexContainer);
}
public Location indexLocation() {
return this.location;
}
public String indexContainer() {
return this.indexContainer;
}
public SizeValue chunkSize() {
return this.chunkSize;
}
@Override public Class<? extends IndexShardGateway> shardGatewayClass() {
return CloudIndexShardGateway.class;
}
@Override public void close(boolean delete) throws ElasticSearchException {
if (!delete) {
return;
}
// blobStoreContext.getBlobStore().deleteContainer(indexContainer);
}
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.gateway.cloud;
import org.elasticsearch.index.gateway.IndexGateway;
import org.elasticsearch.util.guice.inject.AbstractModule;
/**
* @author kimchy (shay.banon)
*/
public class CloudIndexGatewayModule extends AbstractModule {
@Override protected void configure() {
bind(IndexGateway.class).to(CloudIndexGateway.class).asEagerSingleton();
}
}

View File

@ -0,0 +1,533 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.gateway.cloud;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.cloud.blobstore.CloudBlobStoreService;
import org.elasticsearch.cloud.jclouds.JCloudsUtils;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.gateway.IndexShardGateway;
import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
import org.elasticsearch.index.gateway.IndexShardGatewaySnapshotFailedException;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.util.SizeUnit;
import org.elasticsearch.util.SizeValue;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.gcommon.collect.Lists;
import org.elasticsearch.util.gcommon.collect.Maps;
import org.elasticsearch.util.guice.inject.Inject;
import org.elasticsearch.util.io.FastByteArrayInputStream;
import org.elasticsearch.util.io.stream.BytesStreamOutput;
import org.elasticsearch.util.io.stream.InputStreamStreamInput;
import org.elasticsearch.util.lucene.Directories;
import org.elasticsearch.util.lucene.store.InputStreamIndexInput;
import org.elasticsearch.util.settings.Settings;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.domain.PageSet;
import org.jclouds.blobstore.domain.StorageMetadata;
import org.jclouds.blobstore.options.ListContainerOptions;
import org.jclouds.domain.Location;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.index.translog.TranslogStreams.*;
/**
* @author kimchy (shay.banon)
*/
public class CloudIndexShardGateway extends AbstractIndexShardComponent implements IndexShardGateway {
private final InternalIndexShard indexShard;
private final ThreadPool threadPool;
private final Store store;
private final Location shardLocation;
private final String shardContainer;
private final String shardIndexContainer;
private final String shardTranslogContainer;
private final BlobStoreContext blobStoreContext;
private final SizeValue chunkSize;
private volatile int currentTranslogPartToWrite = 1;
@Inject public CloudIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, IndexShard indexShard, ThreadPool threadPool,
Store store, CloudIndexGateway cloudIndexGateway, CloudBlobStoreService blobStoreService) {
super(shardId, indexSettings);
this.indexShard = (InternalIndexShard) indexShard;
this.threadPool = threadPool;
this.store = store;
this.blobStoreContext = blobStoreService.context();
this.chunkSize = cloudIndexGateway.chunkSize();
this.shardLocation = cloudIndexGateway.indexLocation();
this.shardContainer = cloudIndexGateway.indexContainer() + JCloudsUtils.BLOB_CONTAINER_SEP + shardId.id();
this.shardIndexContainer = shardContainer + JCloudsUtils.BLOB_CONTAINER_SEP + "index";
this.shardTranslogContainer = shardContainer + JCloudsUtils.BLOB_CONTAINER_SEP + "translog";
logger.trace("Using location [{}], container [{}]", this.shardLocation, this.shardContainer);
blobStoreContext.getBlobStore().createContainerInLocation(this.shardLocation, this.shardTranslogContainer);
blobStoreContext.getBlobStore().createContainerInLocation(this.shardLocation, this.shardIndexContainer);
}
@Override public boolean requiresSnapshotScheduling() {
return true;
}
@Override public String toString() {
StringBuilder sb = new StringBuilder("cloud[");
if (shardLocation != null) {
sb.append(shardLocation).append("/");
}
sb.append(shardContainer).append("]");
return sb.toString();
}
@Override public void close(boolean delete) {
if (!delete) {
return;
}
blobStoreContext.getBlobStore().deleteContainer(shardIndexContainer);
blobStoreContext.getBlobStore().deleteContainer(shardTranslogContainer);
}
@Override public RecoveryStatus recover() throws IndexShardGatewayRecoveryException {
RecoveryStatus.Index recoveryStatusIndex = recoverIndex();
RecoveryStatus.Translog recoveryStatusTranslog = recoverTranslog();
return new RecoveryStatus(recoveryStatusIndex, recoveryStatusTranslog);
}
@Override public SnapshotStatus snapshot(Snapshot snapshot) {
long totalTimeStart = System.currentTimeMillis();
boolean indexDirty = false;
final SnapshotIndexCommit snapshotIndexCommit = snapshot.indexCommit();
final Translog.Snapshot translogSnapshot = snapshot.translogSnapshot();
Map<String, StorageMetadata> allIndicesMetadata = null;
int indexNumberOfFiles = 0;
long indexTotalFilesSize = 0;
long indexTime = 0;
if (snapshot.indexChanged()) {
long time = System.currentTimeMillis();
indexDirty = true;
allIndicesMetadata = listAllMetadatas(shardIndexContainer);
final Map<String, StorageMetadata> allIndicesMetadataF = allIndicesMetadata;
// snapshot into the index
final CountDownLatch latch = new CountDownLatch(snapshotIndexCommit.getFiles().length);
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
for (final String fileName : snapshotIndexCommit.getFiles()) {
// don't copy over the segments file, it will be copied over later on as part of the
// final snapshot phase
if (fileName.equals(snapshotIndexCommit.getSegmentsFileName())) {
latch.countDown();
continue;
}
IndexInput indexInput = null;
try {
indexInput = snapshotIndexCommit.getDirectory().openInput(fileName);
StorageMetadata metadata = allIndicesMetadata.get(fileName);
if (metadata != null && (metadata.getSize() == indexInput.length())) {
// we assume its the same one, no need to copy
latch.countDown();
continue;
}
} catch (Exception e) {
logger.debug("Failed to verify file equality based on length, copying...", e);
} finally {
if (indexInput != null) {
try {
indexInput.close();
} catch (IOException e) {
// ignore
}
}
}
indexNumberOfFiles++;
try {
indexTotalFilesSize += snapshotIndexCommit.getDirectory().fileLength(fileName);
} catch (IOException e) {
// ignore...
}
deleteFile(fileName, allIndicesMetadata);
threadPool.execute(new Runnable() {
@Override public void run() {
try {
copyFromDirectory(snapshotIndexCommit.getDirectory(), fileName, allIndicesMetadataF);
} catch (Exception e) {
lastException.set(e);
} finally {
latch.countDown();
}
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
lastException.set(e);
}
if (lastException.get() != null) {
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to perform snapshot (index files)", lastException.get());
}
indexTime = System.currentTimeMillis() - time;
}
int translogNumberOfOperations = 0;
long translogTime = 0;
if (snapshot.newTranslogCreated()) {
currentTranslogPartToWrite = 1;
String translogBlobName = String.valueOf(translogSnapshot.translogId()) + "." + currentTranslogPartToWrite;
try {
long time = System.currentTimeMillis();
BytesStreamOutput streamOutput = BytesStreamOutput.Cached.cached();
streamOutput.writeInt(translogSnapshot.size());
for (Translog.Operation operation : translogSnapshot) {
translogNumberOfOperations++;
writeTranslogOperation(streamOutput, operation);
}
Blob blob = blobStoreContext.getBlobStore().newBlob(translogBlobName);
blob.setContentLength(streamOutput.size());
blob.setPayload(new FastByteArrayInputStream(streamOutput.unsafeByteArray(), 0, streamOutput.size()));
blobStoreContext.getBlobStore().putBlob(shardTranslogContainer, blob);
currentTranslogPartToWrite++;
translogTime = System.currentTimeMillis() - time;
} catch (Exception e) {
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to snapshot translog into [" + translogBlobName + "]", e);
}
} else if (snapshot.sameTranslogNewOperations()) {
String translogBlobName = String.valueOf(translogSnapshot.translogId()) + "." + currentTranslogPartToWrite;
try {
long time = System.currentTimeMillis();
BytesStreamOutput streamOutput = BytesStreamOutput.Cached.cached();
streamOutput.writeInt(translogSnapshot.size() - snapshot.lastTranslogSize());
for (Translog.Operation operation : translogSnapshot.skipTo(snapshot.lastTranslogSize())) {
translogNumberOfOperations++;
writeTranslogOperation(streamOutput, operation);
}
Blob blob = blobStoreContext.getBlobStore().newBlob(translogBlobName);
blob.setContentLength(streamOutput.size());
blob.setPayload(new FastByteArrayInputStream(streamOutput.unsafeByteArray(), 0, streamOutput.size()));
blobStoreContext.getBlobStore().putBlob(shardTranslogContainer, blob);
currentTranslogPartToWrite++;
translogTime = System.currentTimeMillis() - time;
} catch (Exception e) {
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to append snapshot translog into [" + translogBlobName + "]", e);
}
}
// now write the segments file
try {
if (indexDirty) {
indexNumberOfFiles++;
deleteFile(snapshotIndexCommit.getSegmentsFileName(), allIndicesMetadata);
indexTotalFilesSize += snapshotIndexCommit.getDirectory().fileLength(snapshotIndexCommit.getSegmentsFileName());
long time = System.currentTimeMillis();
IndexInput indexInput = snapshotIndexCommit.getDirectory().openInput(snapshotIndexCommit.getSegmentsFileName());
try {
Blob blob = blobStoreContext.getBlobStore().newBlob(snapshotIndexCommit.getSegmentsFileName());
InputStreamIndexInput is = new InputStreamIndexInput(indexInput, Long.MAX_VALUE);
blob.setPayload(is);
blob.setContentLength(is.actualSizeToRead());
blobStoreContext.getBlobStore().putBlob(shardIndexContainer, blob);
} finally {
try {
indexInput.close();
} catch (Exception e) {
// ignore
}
}
indexTime += (System.currentTimeMillis() - time);
}
} catch (Exception e) {
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to finalize index snapshot into [" + snapshotIndexCommit.getSegmentsFileName() + "]", e);
}
// delete the old translog
if (snapshot.newTranslogCreated()) {
String currentTranslogPrefix = String.valueOf(translogSnapshot.translogId()) + ".";
Map<String, StorageMetadata> allMetadatas = listAllMetadatas(shardTranslogContainer);
for (Map.Entry<String, StorageMetadata> entry : allMetadatas.entrySet()) {
if (!entry.getKey().startsWith(currentTranslogPrefix)) {
blobStoreContext.getAsyncBlobStore().removeBlob(shardTranslogContainer, entry.getKey());
}
}
}
if (indexDirty) {
for (Map.Entry<String, StorageMetadata> entry : allIndicesMetadata.entrySet()) {
String blobName = entry.getKey();
if (blobName.contains(".part")) {
blobName = blobName.substring(0, blobName.indexOf(".part"));
}
boolean found = false;
for (final String fileName : snapshotIndexCommit.getFiles()) {
if (blobName.equals(fileName)) {
found = true;
break;
}
}
if (!found) {
blobStoreContext.getAsyncBlobStore().removeBlob(shardIndexContainer, entry.getKey());
}
}
}
return new SnapshotStatus(new TimeValue(System.currentTimeMillis() - totalTimeStart),
new SnapshotStatus.Index(indexNumberOfFiles, new SizeValue(indexTotalFilesSize), new TimeValue(indexTime)),
new SnapshotStatus.Translog(translogNumberOfOperations, new TimeValue(translogTime)));
}
private RecoveryStatus.Index recoverIndex() throws IndexShardGatewayRecoveryException {
final Map<String, StorageMetadata> allMetaDatas = listAllMetadatas(shardIndexContainer);
// filter out to only have actual files
final Map<String, StorageMetadata> filesMetaDatas = Maps.newHashMap();
for (Map.Entry<String, StorageMetadata> entry : allMetaDatas.entrySet()) {
if (entry.getKey().contains(".part")) {
continue;
}
filesMetaDatas.put(entry.getKey(), entry.getValue());
}
final CountDownLatch latch = new CountDownLatch(filesMetaDatas.size());
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
for (final Map.Entry<String, StorageMetadata> entry : filesMetaDatas.entrySet()) {
threadPool.execute(new Runnable() {
@Override public void run() {
try {
copyToDirectory(entry.getValue(), allMetaDatas);
} catch (Exception e) {
logger.debug("Failed to read [" + entry.getKey() + "] into [" + store + "]", e);
lastException.set(e);
} finally {
latch.countDown();
}
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
lastException.set(e);
}
long totalSize = 0;
for (Map.Entry<String, StorageMetadata> entry : allMetaDatas.entrySet()) {
totalSize += entry.getValue().getSize();
}
long version = -1;
try {
if (IndexReader.indexExists(store.directory())) {
version = IndexReader.getCurrentVersion(store.directory());
}
} catch (IOException e) {
throw new IndexShardGatewayRecoveryException(shardId(), "Failed to fetch index version after copying it over", e);
}
return new RecoveryStatus.Index(version, filesMetaDatas.size(), new SizeValue(totalSize, SizeUnit.BYTES));
}
private RecoveryStatus.Translog recoverTranslog() throws IndexShardGatewayRecoveryException {
final Map<String, StorageMetadata> allMetaDatas = listAllMetadatas(shardTranslogContainer);
long latestTranslogId = -1;
for (String name : allMetaDatas.keySet()) {
long translogId = Long.parseLong(name.substring(0, name.indexOf('.')));
if (translogId > latestTranslogId) {
latestTranslogId = translogId;
}
}
if (latestTranslogId == -1) {
// no recovery file found, start the shard and bail
indexShard.start();
return new RecoveryStatus.Translog(-1, 0, new SizeValue(0, SizeUnit.BYTES));
}
try {
ArrayList<Translog.Operation> operations = Lists.newArrayList();
long size = 0;
int index = 1;
while (true) {
String translogPartName = String.valueOf(latestTranslogId) + "." + index;
if (!allMetaDatas.containsKey(translogPartName)) {
break;
}
Blob blob = blobStoreContext.getBlobStore().getBlob(shardTranslogContainer, translogPartName);
if (blob == null) {
break;
}
size += blob.getContentLength();
InputStreamStreamInput streamInput = new InputStreamStreamInput(blob.getContent());
int numberOfOperations = streamInput.readInt();
for (int i = 0; i < numberOfOperations; i++) {
operations.add(readTranslogOperation(streamInput));
}
index++;
}
currentTranslogPartToWrite = index;
indexShard.performRecovery(operations);
return new RecoveryStatus.Translog(latestTranslogId, operations.size(), new SizeValue(size, SizeUnit.BYTES));
} catch (Exception e) {
throw new IndexShardGatewayRecoveryException(shardId(), "Failed to perform recovery of translog", e);
}
}
private Map<String, StorageMetadata> listAllMetadatas(String container) {
final Map<String, StorageMetadata> allMetaDatas = Maps.newHashMap();
String nextMarker = null;
while (true) {
ListContainerOptions options = ListContainerOptions.Builder.maxResults(10000);
if (nextMarker != null) {
options.afterMarker(nextMarker);
}
PageSet<? extends StorageMetadata> pageSet = blobStoreContext.getBlobStore().list(container, options);
for (StorageMetadata metadata : pageSet) {
allMetaDatas.put(metadata.getName(), metadata);
}
nextMarker = pageSet.getNextMarker();
if (nextMarker == null) {
break;
}
}
return allMetaDatas;
}
private void deleteFile(String fileName, Map<String, StorageMetadata> allIndicesMetadata) {
// first, check and delete all files with this name
for (Map.Entry<String, StorageMetadata> entry : allIndicesMetadata.entrySet()) {
String blobName = entry.getKey();
if (blobName.contains(".part")) {
blobName = blobName.substring(0, blobName.indexOf(".part"));
}
if (blobName.equals(fileName)) {
blobStoreContext.getBlobStore().removeBlob(shardIndexContainer, blobName);
}
}
}
private void copyFromDirectory(Directory dir, String fileName, Map<String, StorageMetadata> allIndicesMetadata) throws IOException {
IndexInput indexInput = dir.openInput(fileName);
try {
Blob blob = blobStoreContext.getBlobStore().newBlob(fileName);
InputStreamIndexInput is = new InputStreamIndexInput(indexInput, chunkSize.bytes());
blob.setPayload(is);
blob.setContentLength(is.actualSizeToRead());
blobStoreContext.getBlobStore().putBlob(shardIndexContainer, blob);
int part = 1;
while (indexInput.getFilePointer() < indexInput.length()) {
is = new InputStreamIndexInput(indexInput, chunkSize.bytes());
if (is.actualSizeToRead() <= 0) {
break;
}
blob = blobStoreContext.getBlobStore().newBlob(fileName + ".part" + part);
blob.setPayload(is);
blob.setContentLength(is.actualSizeToRead());
blobStoreContext.getBlobStore().putBlob(shardIndexContainer, blob);
part++;
}
} finally {
try {
indexInput.close();
} catch (Exception e) {
// ignore
}
}
}
private void copyToDirectory(StorageMetadata metadata, Map<String, StorageMetadata> allMetadatas) throws IOException {
Blob blob = blobStoreContext.getBlobStore().getBlob(shardIndexContainer, metadata.getName());
byte[] buffer = new byte[16384];
IndexOutput indexOutput = store.directory().createOutput(metadata.getName());
copy(blob.getContent(), indexOutput, buffer);
blob.getContent().close();
// check the metadatas we have
int part = 1;
while (true) {
String partName = metadata.getName() + ".part" + part;
if (!allMetadatas.containsKey(partName)) {
break;
}
blob = blobStoreContext.getBlobStore().getBlob(shardIndexContainer, partName);
copy(blob.getContent(), indexOutput, buffer);
blob.getContent().close();
part++;
}
indexOutput.close();
Directories.sync(store.directory(), metadata.getName());
}
private void copy(InputStream is, IndexOutput indexOutput, byte[] buffer) throws IOException {
int len;
while ((len = is.read(buffer)) != -1) {
indexOutput.writeBytes(buffer, len);
}
}
}