diff --git a/.idea/dictionaries/kimchy.xml b/.idea/dictionaries/kimchy.xml
index 977350ca2e9..f34d210ca8f 100644
--- a/.idea/dictionaries/kimchy.xml
+++ b/.idea/dictionaries/kimchy.xml
@@ -47,6 +47,7 @@
lucene
memcached
metadata
+ metadatas
millis
mmap
multi
diff --git a/.idea/modules/plugins-cloud.iml b/.idea/modules/plugins-cloud.iml
index d3d504d4316..460fd5e5002 100644
--- a/.idea/modules/plugins-cloud.iml
+++ b/.idea/modules/plugins-cloud.iml
@@ -17,15 +17,19 @@
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/lucene/Directories.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/lucene/Directories.java
index cbfca61bb08..b7182ca4545 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/lucene/Directories.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/lucene/Directories.java
@@ -131,8 +131,57 @@ public class Directories {
}
copyFile(copyFrom, destinationFile);
} else {
- copyToDirectory(new FileInputStream(copyFrom), dir.createOutput(fileName));
+ FileInputStream is = null;
+ IndexOutput output = null;
+ try {
+ is = new FileInputStream(copyFrom);
+ output = dir.createOutput(fileName);
+ copyToDirectory(is, output);
+ } finally {
+ if (is != null) {
+ try {
+ is.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+ if (output != null) {
+ try {
+ output.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+ }
}
+ sync(dir, fileName);
+ }
+
+ public static void copyToDirectory(InputStream is, Directory dir, String fileName) throws IOException {
+ IndexOutput output = null;
+ try {
+ output = dir.createOutput(fileName);
+ copyToDirectory(is, output);
+ } finally {
+ if (is != null) {
+ try {
+ is.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+ if (output != null) {
+ try {
+ output.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+ }
+ sync(dir, fileName);
+ }
+
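+ /** Syncs a single file in the directory, using {@link ForceSyncDirectory#forceSync(String)} when the directory supports it. */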
+ public static void sync(Directory dir, String fileName) throws IOException {
if (dir instanceof ForceSyncDirectory) {
((ForceSyncDirectory) dir).forceSync(fileName);
} else {
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/lucene/store/InputStreamIndexInput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/lucene/store/InputStreamIndexInput.java
new file mode 100644
index 00000000000..25efc84bc79
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/lucene/store/InputStreamIndexInput.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.util.lucene.store;
+
+import org.apache.lucene.store.IndexInput;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Wraps a Lucene {@link IndexInput} as an {@link InputStream} that reads at most
+ * {@code limit} bytes before reporting end-of-stream, so index files can be streamed in chunks.
+ *
+ * @author kimchy (shay.banon)
+ */
+public class InputStreamIndexInput extends InputStream {
+
+ private final IndexInput indexInput;
+
+ private final long limit;
+
+ private final long actualSizeToRead;
+
+ private long counter = 0;
+
+ public InputStreamIndexInput(IndexInput indexInput, long limit) {
+ this.indexInput = indexInput;
+ this.limit = limit;
+ if ((indexInput.length() - indexInput.getFilePointer()) > limit) {
+ actualSizeToRead = limit;
+ } else {
+ actualSizeToRead = indexInput.length() - indexInput.getFilePointer();
+ }
+ }
+
+ public long actualSizeToRead() {
+ return actualSizeToRead;
+ }
+
+ @Override public int read(byte[] b, int off, int len) throws IOException {
+ if (b == null) {
+ throw new NullPointerException();
+ } else if (off < 0 || len < 0 || len > b.length - off) {
+ throw new IndexOutOfBoundsException();
+ }
+ if (indexInput.getFilePointer() >= indexInput.length()) {
+ return -1;
+ }
+ if (indexInput.getFilePointer() + len > indexInput.length()) {
+ len = (int) (indexInput.length() - indexInput.getFilePointer());
+ }
+ if (counter + len > limit) {
+ len = (int) (limit - counter);
+ }
+ if (len <= 0) {
+ return -1;
+ }
+ indexInput.readBytes(b, off, len, false);
+ counter += len;
+ return len;
+ }
+
+ @Override public int read() throws IOException {
+ if (counter++ >= limit) {
+ return -1;
+ }
+ return (indexInput.getFilePointer() < indexInput.length()) ? (indexInput.readByte() & 0xff) : -1;
+ }
+}
diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/util/lucene/store/InputStreamIndexInputTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/util/lucene/store/InputStreamIndexInputTests.java
new file mode 100644
index 00000000000..a9c82e82b57
--- /dev/null
+++ b/modules/elasticsearch/src/test/java/org/elasticsearch/util/lucene/store/InputStreamIndexInputTests.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.util.lucene.store;
+
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.RAMDirectory;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+
+import static org.hamcrest.MatcherAssert.*;
+import static org.hamcrest.Matchers.*;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class InputStreamIndexInputTests {
+
+ @Test public void testSingleReadSingleByteLimit() throws IOException {
+ RAMDirectory dir = new RAMDirectory();
+ IndexOutput output = dir.createOutput("test");
+ for (int i = 0; i < 3; i++) {
+ output.writeByte((byte) 1);
+ }
+ for (int i = 0; i < 3; i++) {
+ output.writeByte((byte) 2);
+ }
+
+ output.close();
+
+ IndexInput input = dir.openInput("test");
+
+ for (int i = 0; i < 3; i++) {
+ InputStreamIndexInput is = new InputStreamIndexInput(input, 1);
+ assertThat(input.getFilePointer(), lessThan(input.length()));
+ assertThat(is.actualSizeToRead(), equalTo(1l));
+ assertThat(is.read(), equalTo(1));
+ assertThat(is.read(), equalTo(-1));
+ }
+
+ for (int i = 0; i < 3; i++) {
+ InputStreamIndexInput is = new InputStreamIndexInput(input, 1);
+ assertThat(input.getFilePointer(), lessThan(input.length()));
+ assertThat(is.actualSizeToRead(), equalTo(1l));
+ assertThat(is.read(), equalTo(2));
+ assertThat(is.read(), equalTo(-1));
+ }
+
+ assertThat(input.getFilePointer(), equalTo(input.length()));
+ InputStreamIndexInput is = new InputStreamIndexInput(input, 1);
+ assertThat(is.actualSizeToRead(), equalTo(0l));
+ assertThat(is.read(), equalTo(-1));
+ }
+
+ @Test public void testReadMultiSingleByteLimit1() throws IOException {
+ RAMDirectory dir = new RAMDirectory();
+ IndexOutput output = dir.createOutput("test");
+ for (int i = 0; i < 3; i++) {
+ output.writeByte((byte) 1);
+ }
+ for (int i = 0; i < 3; i++) {
+ output.writeByte((byte) 2);
+ }
+
+ output.close();
+
+ IndexInput input = dir.openInput("test");
+
+ byte[] read = new byte[2];
+
+ for (int i = 0; i < 3; i++) {
+ assertThat(input.getFilePointer(), lessThan(input.length()));
+ InputStreamIndexInput is = new InputStreamIndexInput(input, 1);
+ assertThat(is.actualSizeToRead(), equalTo(1l));
+ assertThat(is.read(read), equalTo(1));
+ assertThat(read[0], equalTo((byte) 1));
+ }
+
+ for (int i = 0; i < 3; i++) {
+ assertThat(input.getFilePointer(), lessThan(input.length()));
+ InputStreamIndexInput is = new InputStreamIndexInput(input, 1);
+ assertThat(is.actualSizeToRead(), equalTo(1l));
+ assertThat(is.read(read), equalTo(1));
+ assertThat(read[0], equalTo((byte) 2));
+ }
+
+ assertThat(input.getFilePointer(), equalTo(input.length()));
+ InputStreamIndexInput is = new InputStreamIndexInput(input, 1);
+ assertThat(is.actualSizeToRead(), equalTo(0l));
+ assertThat(is.read(read), equalTo(-1));
+ }
+
+ @Test public void testSingleReadTwoBytesLimit() throws IOException {
+ RAMDirectory dir = new RAMDirectory();
+ IndexOutput output = dir.createOutput("test");
+ for (int i = 0; i < 3; i++) {
+ output.writeByte((byte) 1);
+ }
+ for (int i = 0; i < 3; i++) {
+ output.writeByte((byte) 2);
+ }
+
+ output.close();
+
+ IndexInput input = dir.openInput("test");
+
+ assertThat(input.getFilePointer(), lessThan(input.length()));
+ InputStreamIndexInput is = new InputStreamIndexInput(input, 2);
+ assertThat(is.actualSizeToRead(), equalTo(2l));
+ assertThat(is.read(), equalTo(1));
+ assertThat(is.read(), equalTo(1));
+ assertThat(is.read(), equalTo(-1));
+
+ assertThat(input.getFilePointer(), lessThan(input.length()));
+ is = new InputStreamIndexInput(input, 2);
+ assertThat(is.actualSizeToRead(), equalTo(2l));
+ assertThat(is.read(), equalTo(1));
+ assertThat(is.read(), equalTo(2));
+ assertThat(is.read(), equalTo(-1));
+
+ assertThat(input.getFilePointer(), lessThan(input.length()));
+ is = new InputStreamIndexInput(input, 2);
+ assertThat(is.actualSizeToRead(), equalTo(2l));
+ assertThat(is.read(), equalTo(2));
+ assertThat(is.read(), equalTo(2));
+ assertThat(is.read(), equalTo(-1));
+
+ assertThat(input.getFilePointer(), equalTo(input.length()));
+ is = new InputStreamIndexInput(input, 2);
+ assertThat(is.actualSizeToRead(), equalTo(0l));
+ assertThat(is.read(), equalTo(-1));
+ }
+
+ @Test public void testReadMultiTwoBytesLimit1() throws IOException {
+ RAMDirectory dir = new RAMDirectory();
+ IndexOutput output = dir.createOutput("test");
+ for (int i = 0; i < 3; i++) {
+ output.writeByte((byte) 1);
+ }
+ for (int i = 0; i < 3; i++) {
+ output.writeByte((byte) 2);
+ }
+
+ output.close();
+
+ IndexInput input = dir.openInput("test");
+
+ byte[] read = new byte[2];
+
+ assertThat(input.getFilePointer(), lessThan(input.length()));
+ InputStreamIndexInput is = new InputStreamIndexInput(input, 2);
+ assertThat(is.actualSizeToRead(), equalTo(2l));
+ assertThat(is.read(read), equalTo(2));
+ assertThat(read[0], equalTo((byte) 1));
+ assertThat(read[1], equalTo((byte) 1));
+
+ assertThat(input.getFilePointer(), lessThan(input.length()));
+ is = new InputStreamIndexInput(input, 2);
+ assertThat(is.actualSizeToRead(), equalTo(2l));
+ assertThat(is.read(read), equalTo(2));
+ assertThat(read[0], equalTo((byte) 1));
+ assertThat(read[1], equalTo((byte) 2));
+
+ assertThat(input.getFilePointer(), lessThan(input.length()));
+ is = new InputStreamIndexInput(input, 2);
+ assertThat(is.actualSizeToRead(), equalTo(2l));
+ assertThat(is.read(read), equalTo(2));
+ assertThat(read[0], equalTo((byte) 2));
+ assertThat(read[1], equalTo((byte) 2));
+
+ assertThat(input.getFilePointer(), equalTo(input.length()));
+ is = new InputStreamIndexInput(input, 2);
+ assertThat(is.actualSizeToRead(), equalTo(0l));
+ assertThat(is.read(read), equalTo(-1));
+ }
+
+ @Test public void testReadMultiFourBytesLimit() throws IOException {
+ RAMDirectory dir = new RAMDirectory();
+ IndexOutput output = dir.createOutput("test");
+ for (int i = 0; i < 3; i++) {
+ output.writeByte((byte) 1);
+ }
+ for (int i = 0; i < 3; i++) {
+ output.writeByte((byte) 2);
+ }
+
+ output.close();
+
+ IndexInput input = dir.openInput("test");
+
+ byte[] read = new byte[4];
+
+ assertThat(input.getFilePointer(), lessThan(input.length()));
+ InputStreamIndexInput is = new InputStreamIndexInput(input, 4);
+ assertThat(is.actualSizeToRead(), equalTo(4l));
+ assertThat(is.read(read), equalTo(4));
+ assertThat(read[0], equalTo((byte) 1));
+ assertThat(read[1], equalTo((byte) 1));
+ assertThat(read[2], equalTo((byte) 1));
+ assertThat(read[3], equalTo((byte) 2));
+
+ assertThat(input.getFilePointer(), lessThan(input.length()));
+ is = new InputStreamIndexInput(input, 4);
+ assertThat(is.actualSizeToRead(), equalTo(2l));
+ assertThat(is.read(read), equalTo(2));
+ assertThat(read[0], equalTo((byte) 2));
+ assertThat(read[1], equalTo((byte) 2));
+
+ assertThat(input.getFilePointer(), equalTo(input.length()));
+ is = new InputStreamIndexInput(input, 4);
+ assertThat(is.actualSizeToRead(), equalTo(0l));
+ assertThat(is.read(read), equalTo(-1));
+ }
+}
diff --git a/plugins/cloud/build.gradle b/plugins/cloud/build.gradle
index b939d20ce95..7ae679504d2 100644
--- a/plugins/cloud/build.gradle
+++ b/plugins/cloud/build.gradle
@@ -34,17 +34,17 @@ repositories {
mavenRepo urls: "http://java-xmlbuilder.googlecode.com/svn/repo"
}
-jcloudsVersion = "1.0-beta-4"
+jcloudsVersion = "1.0-SNAPSHOT"
dependencies {
compile project(':elasticsearch')
- compile("org.jclouds:jclouds-httpnio:$jcloudsVersion")
compile("org.jclouds:jclouds-blobstore:$jcloudsVersion")
compile("org.jclouds:jclouds-aws:$jcloudsVersion")
+ compile("org.jclouds:jclouds-rackspace:$jcloudsVersion")
- distLib("org.jclouds:jclouds-httpnio:$jcloudsVersion")
distLib("org.jclouds:jclouds-blobstore:$jcloudsVersion")
distLib("org.jclouds:jclouds-aws:$jcloudsVersion")
+ distLib("org.jclouds:jclouds-rackspace:$jcloudsVersion")
testCompile project(':test-testng')
testCompile('org.testng:testng:5.10:jdk15') { transitive = false }
diff --git a/plugins/cloud/src/main/java/org/elasticsearch/cloud/blobstore/CloudBlobStoreService.java b/plugins/cloud/src/main/java/org/elasticsearch/cloud/blobstore/CloudBlobStoreService.java
index ca2a95456b2..9899555c1fd 100644
--- a/plugins/cloud/src/main/java/org/elasticsearch/cloud/blobstore/CloudBlobStoreService.java
+++ b/plugins/cloud/src/main/java/org/elasticsearch/cloud/blobstore/CloudBlobStoreService.java
@@ -60,7 +60,7 @@ public class CloudBlobStoreService extends AbstractLifecycleComponent<CloudBlobStoreService> {
 private Iterable<? extends Module> buildModules(Settings settings) {
return ImmutableList.of(new JCloudsLoggingModule(settings));
}
diff --git a/plugins/cloud/src/main/java/org/elasticsearch/discovery/cloud/CloudZenPing.java b/plugins/cloud/src/main/java/org/elasticsearch/discovery/cloud/CloudZenPing.java
index 4cb76829c5c..7172a24acca 100644
--- a/plugins/cloud/src/main/java/org/elasticsearch/discovery/cloud/CloudZenPing.java
+++ b/plugins/cloud/src/main/java/org/elasticsearch/discovery/cloud/CloudZenPing.java
@@ -32,6 +32,8 @@ import org.jclouds.compute.ComputeService;
import org.jclouds.compute.domain.ComputeMetadata;
import org.jclouds.compute.domain.NodeMetadata;
import org.jclouds.compute.domain.NodeState;
+import org.jclouds.compute.options.GetNodesOptions;
+import org.jclouds.domain.Location;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -51,11 +53,14 @@ public class CloudZenPing extends UnicastZenPing {
private final String tag;
+ private final String location;
+
public CloudZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName,
CloudComputeService computeService) {
super(settings, threadPool, transportService, clusterName);
this.computeService = computeService.context().getComputeService();
this.tag = componentSettings.get("tag");
+ this.location = componentSettings.get("location");
this.ports = componentSettings.get("ports", "9300-9302");
// parse the ports just to see that they are valid
new PortsRange(ports).ports();
@@ -63,10 +68,31 @@ public class CloudZenPing extends UnicastZenPing {
@Override protected List buildDynamicNodes() {
List discoNodes = newArrayList();
- Map<String, ? extends ComputeMetadata> nodes = computeService.getNodes();
+ Map<String, ? extends ComputeMetadata> nodes = computeService.getNodes(GetNodesOptions.Builder.withDetails());
+ logger.trace("Processing Nodes {}", nodes);
for (Map.Entry<String, ? extends ComputeMetadata> node : nodes.entrySet()) {
- NodeMetadata nodeMetadata = computeService.getNodeMetadata(node.getValue());
+ NodeMetadata nodeMetadata = (NodeMetadata) node.getValue();
if (tag != null && !nodeMetadata.getTag().equals(tag)) {
+ logger.trace("Filtering node {} with unmatched tag {}", nodeMetadata.getName(), nodeMetadata.getTag());
+ continue;
+ }
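+ // a node matches when the configured location equals its own location id or its parent's id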
+ boolean filteredByLocation = true;
+ if (location != null) {
+ Location nodeLocation = nodeMetadata.getLocation();
+ if (location.equals(nodeLocation.getId())) {
+ filteredByLocation = false;
+ } else {
+ if (nodeLocation.getParent() != null) {
+ if (location.equals(nodeLocation.getParent().getId())) {
+ filteredByLocation = false;
+ }
+ }
+ }
+ } else {
+ filteredByLocation = false;
+ }
+ if (filteredByLocation) {
+ logger.trace("Filtering node {} with unmatched location {}", nodeMetadata.getName(), nodeMetadata.getLocation());
continue;
}
if (nodeMetadata.getState() == NodeState.PENDING || nodeMetadata.getState() == NodeState.RUNNING) {
diff --git a/plugins/cloud/src/main/java/org/elasticsearch/gateway/cloud/CloudGateway.java b/plugins/cloud/src/main/java/org/elasticsearch/gateway/cloud/CloudGateway.java
index 2394d5b0be2..b39b7248499 100644
--- a/plugins/cloud/src/main/java/org/elasticsearch/gateway/cloud/CloudGateway.java
+++ b/plugins/cloud/src/main/java/org/elasticsearch/gateway/cloud/CloudGateway.java
@@ -22,11 +22,15 @@ package org.elasticsearch.gateway.cloud;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.cloud.blobstore.CloudBlobStoreService;
+import org.elasticsearch.cloud.jclouds.JCloudsUtils;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.gateway.GatewayException;
+import org.elasticsearch.index.gateway.cloud.CloudIndexGatewayModule;
+import org.elasticsearch.util.SizeValue;
import org.elasticsearch.util.component.AbstractLifecycleComponent;
+import org.elasticsearch.util.guice.inject.Inject;
import org.elasticsearch.util.guice.inject.Module;
import org.elasticsearch.util.io.FastByteArrayInputStream;
import org.elasticsearch.util.settings.Settings;
@@ -39,6 +43,7 @@ import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.domain.PageSet;
import org.jclouds.blobstore.domain.StorageMetadata;
+import org.jclouds.domain.Location;
import java.io.IOException;
@@ -47,35 +52,63 @@ import java.io.IOException;
*/
public class CloudGateway extends AbstractLifecycleComponent<Gateway> implements Gateway {
- private final ClusterName clusterName;
-
private final BlobStoreContext blobStoreContext;
private final String container;
- private final String location;
+ private final Location location;
+ private final SizeValue chunkSize;
+
+ private final String metadataContainer;
private volatile int currentIndex;
- public CloudGateway(Settings settings, ClusterName clusterName, CloudBlobStoreService blobStoreService) {
+ @Inject public CloudGateway(Settings settings, ClusterName clusterName, CloudBlobStoreService blobStoreService) {
super(settings);
- this.clusterName = clusterName;
this.blobStoreContext = blobStoreService.context();
+ this.chunkSize = componentSettings.getAsSize("chunk_size", null);
+
+ String location = componentSettings.get("location");
+ if (location == null) {
+ this.location = null;
+ } else {
+ this.location = blobStoreContext.getBlobStore().getAssignableLocations().get(location);
+ if (this.location == null) {
+ throw new ElasticSearchIllegalArgumentException("Not a valid location [" + location + "], available locations " + blobStoreContext.getBlobStore().getAssignableLocations().keySet());
+ }
+ }
+
String container = componentSettings.get("container");
if (container == null) {
throw new ElasticSearchIllegalArgumentException("Cloud gateway requires 'container' setting");
}
- this.location = componentSettings.get("location");
- this.container = container + "." + clusterName.value();
- blobStoreContext.getBlobStore().createContainerInLocation(location, container);
+ this.container = container + JCloudsUtils.BLOB_CONTAINER_SEP + clusterName.value();
+
+ this.metadataContainer = this.container + JCloudsUtils.BLOB_CONTAINER_SEP + "metadata";
+
+ logger.debug("Using location [{}], container [{}], metadata_container [{}]", this.location, this.container, metadataContainer);
+
+ blobStoreContext.getBlobStore().createContainerInLocation(this.location, metadataContainer);
this.currentIndex = findLatestIndex();
logger.debug("Latest metadata found at index [" + currentIndex + "]");
}
+ public String container() {
+ return this.container;
+ }
+
+ public Location location() {
+ return this.location;
+ }
+
+ public SizeValue chunkSize() {
+ return this.chunkSize;
+ }
+
@Override protected void doStart() throws ElasticSearchException {
}
@@ -99,14 +132,14 @@ public class CloudGateway extends AbstractLifecycleComponent<Gateway> implements Gateway {
blob.setPayload(new FastByteArrayInputStream(builder.unsafeBytes(), 0, builder.unsafeBytesLength()));
blob.setContentLength(builder.unsafeBytesLength());
- blobStoreContext.getBlobStore().putBlob(container, blob);
+ blobStoreContext.getBlobStore().putBlob(metadataContainer, blob);
currentIndex++;
- PageSet<? extends StorageMetadata> pageSet = blobStoreContext.getBlobStore().list(container);
+ PageSet<? extends StorageMetadata> pageSet = blobStoreContext.getBlobStore().list(metadataContainer);
for (StorageMetadata storageMetadata : pageSet) {
if (storageMetadata.getName().startsWith("metadata-") && !name.equals(storageMetadata.getName())) {
- blobStoreContext.getBlobStore().removeBlob(container, storageMetadata.getName());
+ blobStoreContext.getAsyncBlobStore().removeBlob(metadataContainer, storageMetadata.getName());
}
}
} catch (IOException e) {
@@ -128,14 +161,14 @@ public class CloudGateway extends AbstractLifecycleComponent<Gateway> implements Gateway {
}
@Override public Class<? extends Module> suggestIndexGateway() {
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return CloudIndexGatewayModule.class;
}
@Override public void reset() {
- PageSet<? extends StorageMetadata> pageSet = blobStoreContext.getBlobStore().list(container);
+ PageSet<? extends StorageMetadata> pageSet = blobStoreContext.getBlobStore().list(metadataContainer);
for (StorageMetadata storageMetadata : pageSet) {
if (storageMetadata.getName().startsWith("metadata-")) {
- blobStoreContext.getBlobStore().removeBlob(container, storageMetadata.getName());
+ blobStoreContext.getBlobStore().removeBlob(metadataContainer, storageMetadata.getName());
}
}
currentIndex = -1;
@@ -143,7 +176,7 @@ public class CloudGateway extends AbstractLifecycleComponent<Gateway> implements Gateway {
private int findLatestIndex() {
int index = -1;
- PageSet<? extends StorageMetadata> pageSet = blobStoreContext.getBlobStore().list(container);
+ PageSet<? extends StorageMetadata> pageSet = blobStoreContext.getBlobStore().list(metadataContainer);
for (StorageMetadata storageMetadata : pageSet) {
if (logger.isTraceEnabled()) {
logger.trace("[findLatestMetadata]: Processing blob [" + storageMetadata.getName() + "]");
@@ -168,7 +201,7 @@ public class CloudGateway extends AbstractLifecycleComponent<Gateway> implements Gateway {
private MetaData readMetaData(String name) throws IOException {
XContentParser parser = null;
try {
- Blob blob = blobStoreContext.getBlobStore().getBlob(container, name);
+ Blob blob = blobStoreContext.getBlobStore().getBlob(metadataContainer, name);
parser = XContentFactory.xContent(XContentType.JSON).createParser(blob.getContent());
return MetaData.Builder.fromXContent(parser, settings);
} finally {
diff --git a/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexGateway.java b/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexGateway.java
new file mode 100644
index 00000000000..a89dac69b98
--- /dev/null
+++ b/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexGateway.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.gateway.cloud;
+
+import org.elasticsearch.ElasticSearchException;
+import org.elasticsearch.ElasticSearchIllegalArgumentException;
+import org.elasticsearch.cloud.blobstore.CloudBlobStoreService;
+import org.elasticsearch.cloud.jclouds.JCloudsUtils;
+import org.elasticsearch.gateway.Gateway;
+import org.elasticsearch.gateway.cloud.CloudGateway;
+import org.elasticsearch.index.AbstractIndexComponent;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.gateway.IndexGateway;
+import org.elasticsearch.index.gateway.IndexShardGateway;
+import org.elasticsearch.index.settings.IndexSettings;
+import org.elasticsearch.util.SizeUnit;
+import org.elasticsearch.util.SizeValue;
+import org.elasticsearch.util.guice.inject.Inject;
+import org.elasticsearch.util.settings.Settings;
+import org.jclouds.blobstore.BlobStoreContext;
+import org.jclouds.domain.Location;
+
+/**
+ * Index-level gateway that resolves the blob store container, location and chunk size
+ * used by the per-shard {@link CloudIndexShardGateway} instances.
+ *
+ * @author kimchy (shay.banon)
+ */
+public class CloudIndexGateway extends AbstractIndexComponent implements IndexGateway {
+
+ private final Gateway gateway;
+
+ private final String indexContainer;
+
+ private final Location location;
+
+ private final SizeValue chunkSize;
+
+ private final BlobStoreContext blobStoreContext;
+
+
+ @Inject public CloudIndexGateway(Index index, @IndexSettings Settings indexSettings, CloudBlobStoreService blobStoreService, Gateway gateway) {
+ super(index, indexSettings);
+ this.blobStoreContext = blobStoreService.context();
+ this.gateway = gateway;
+
+ String location = componentSettings.get("location");
+ String container = componentSettings.get("container");
+ SizeValue chunkSize = componentSettings.getAsSize("chunk_size", null);
+
+ if (gateway instanceof CloudGateway) {
+ CloudGateway cloudGateway = (CloudGateway) gateway;
+ if (container == null) {
+ container = cloudGateway.container() + JCloudsUtils.BLOB_CONTAINER_SEP + index.name();
+ }
+ if (chunkSize == null) {
+ chunkSize = cloudGateway.chunkSize();
+ }
+ }
+
+ if (chunkSize == null) {
+ chunkSize = new SizeValue(4, SizeUnit.GB);
+ }
+
+ if (location == null) {
+ if (gateway instanceof CloudGateway) {
+ CloudGateway cloudGateway = (CloudGateway) gateway;
+ this.location = cloudGateway.location();
+ } else {
+ this.location = null;
+ }
+ } else {
+ this.location = blobStoreContext.getBlobStore().getAssignableLocations().get(location);
+ if (this.location == null) {
+ throw new ElasticSearchIllegalArgumentException("Not a valid location [" + location + "], available locations " + blobStoreContext.getBlobStore().getAssignableLocations().keySet());
+ }
+ }
+ this.indexContainer = container;
+ this.chunkSize = chunkSize;
+
+ logger.debug("Using location [{}], container [{}], chunk_size [{}]", this.location, this.indexContainer, this.chunkSize);
+
+// blobStoreContext.getBlobStore().createContainerInLocation(this.location, this.indexContainer);
+ }
+
+ public Location indexLocation() {
+ return this.location;
+ }
+
+ public String indexContainer() {
+ return this.indexContainer;
+ }
+
+ public SizeValue chunkSize() {
+ return this.chunkSize;
+ }
+
+ @Override public Class<? extends IndexShardGateway> shardGatewayClass() {
+ return CloudIndexShardGateway.class;
+ }
+
+ @Override public void close(boolean delete) throws ElasticSearchException {
+ if (!delete) {
+ return;
+ }
+// blobStoreContext.getBlobStore().deleteContainer(indexContainer);
+ }
+}
diff --git a/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexGatewayModule.java b/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexGatewayModule.java
new file mode 100644
index 00000000000..77b3a732120
--- /dev/null
+++ b/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexGatewayModule.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.gateway.cloud;
+
+import org.elasticsearch.index.gateway.IndexGateway;
+import org.elasticsearch.util.guice.inject.AbstractModule;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class CloudIndexGatewayModule extends AbstractModule {
+
+ @Override protected void configure() {
+ bind(IndexGateway.class).to(CloudIndexGateway.class).asEagerSingleton();
+ }
+}
\ No newline at end of file
diff --git a/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexShardGateway.java b/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexShardGateway.java
new file mode 100644
index 00000000000..e9cc3c30b5a
--- /dev/null
+++ b/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexShardGateway.java
@@ -0,0 +1,533 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.gateway.cloud;
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.elasticsearch.cloud.blobstore.CloudBlobStoreService;
+import org.elasticsearch.cloud.jclouds.JCloudsUtils;
+import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
+import org.elasticsearch.index.gateway.IndexShardGateway;
+import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
+import org.elasticsearch.index.gateway.IndexShardGatewaySnapshotFailedException;
+import org.elasticsearch.index.settings.IndexSettings;
+import org.elasticsearch.index.shard.AbstractIndexShardComponent;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.shard.service.IndexShard;
+import org.elasticsearch.index.shard.service.InternalIndexShard;
+import org.elasticsearch.index.store.Store;
+import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.util.SizeUnit;
+import org.elasticsearch.util.SizeValue;
+import org.elasticsearch.util.TimeValue;
+import org.elasticsearch.util.gcommon.collect.Lists;
+import org.elasticsearch.util.gcommon.collect.Maps;
+import org.elasticsearch.util.guice.inject.Inject;
+import org.elasticsearch.util.io.FastByteArrayInputStream;
+import org.elasticsearch.util.io.stream.BytesStreamOutput;
+import org.elasticsearch.util.io.stream.InputStreamStreamInput;
+import org.elasticsearch.util.lucene.Directories;
+import org.elasticsearch.util.lucene.store.InputStreamIndexInput;
+import org.elasticsearch.util.settings.Settings;
+import org.jclouds.blobstore.BlobStoreContext;
+import org.jclouds.blobstore.domain.Blob;
+import org.jclouds.blobstore.domain.PageSet;
+import org.jclouds.blobstore.domain.StorageMetadata;
+import org.jclouds.blobstore.options.ListContainerOptions;
+import org.jclouds.domain.Location;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.elasticsearch.index.translog.TranslogStreams.*;
+
+/**
+ * Shard gateway that snapshots the shard's Lucene files and translog into a jclouds blob
+ * store container (chunking large files into part blobs) and recovers the shard from it.
+ *
+ * @author kimchy (shay.banon)
+ */
+public class CloudIndexShardGateway extends AbstractIndexShardComponent implements IndexShardGateway {
+
+ private final InternalIndexShard indexShard;
+
+ private final ThreadPool threadPool;
+
+ private final Store store;
+
+ private final Location shardLocation;
+
+ private final String shardContainer;
+
+ private final String shardIndexContainer;
+
+ private final String shardTranslogContainer;
+
+ private final BlobStoreContext blobStoreContext;
+
+ private final SizeValue chunkSize;
+
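+ // part number of the next translog blob to write; translog blobs are named "{translogId}.{part}"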
+ private volatile int currentTranslogPartToWrite = 1;
+
+ @Inject public CloudIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, IndexShard indexShard, ThreadPool threadPool,
+ Store store, CloudIndexGateway cloudIndexGateway, CloudBlobStoreService blobStoreService) {
+ super(shardId, indexSettings);
+ this.indexShard = (InternalIndexShard) indexShard;
+ this.threadPool = threadPool;
+ this.store = store;
+ this.blobStoreContext = blobStoreService.context();
+
+ this.chunkSize = cloudIndexGateway.chunkSize();
+ this.shardLocation = cloudIndexGateway.indexLocation();
+ this.shardContainer = cloudIndexGateway.indexContainer() + JCloudsUtils.BLOB_CONTAINER_SEP + shardId.id();
+
+ this.shardIndexContainer = shardContainer + JCloudsUtils.BLOB_CONTAINER_SEP + "index";
+ this.shardTranslogContainer = shardContainer + JCloudsUtils.BLOB_CONTAINER_SEP + "translog";
+
+ logger.trace("Using location [{}], container [{}]", this.shardLocation, this.shardContainer);
+
+ blobStoreContext.getBlobStore().createContainerInLocation(this.shardLocation, this.shardTranslogContainer);
+ blobStoreContext.getBlobStore().createContainerInLocation(this.shardLocation, this.shardIndexContainer);
+ }
+
+ @Override public boolean requiresSnapshotScheduling() {
+ return true;
+ }
+
+ @Override public String toString() {
+ StringBuilder sb = new StringBuilder("cloud[");
+ if (shardLocation != null) {
+ sb.append(shardLocation).append("/");
+ }
+ sb.append(shardContainer).append("]");
+ return sb.toString();
+ }
+
+ @Override public void close(boolean delete) {
+ if (!delete) {
+ return;
+ }
+ blobStoreContext.getBlobStore().deleteContainer(shardIndexContainer);
+ blobStoreContext.getBlobStore().deleteContainer(shardTranslogContainer);
+ }
+
+ @Override public RecoveryStatus recover() throws IndexShardGatewayRecoveryException {
+ RecoveryStatus.Index recoveryStatusIndex = recoverIndex();
+ RecoveryStatus.Translog recoveryStatusTranslog = recoverTranslog();
+ return new RecoveryStatus(recoveryStatusIndex, recoveryStatusTranslog);
+ }
+
+ @Override public SnapshotStatus snapshot(Snapshot snapshot) {
+ long totalTimeStart = System.currentTimeMillis();
+ boolean indexDirty = false;
+
+ final SnapshotIndexCommit snapshotIndexCommit = snapshot.indexCommit();
+ final Translog.Snapshot translogSnapshot = snapshot.translogSnapshot();
+
+ Map<String, StorageMetadata> allIndicesMetadata = null;
+
+ int indexNumberOfFiles = 0;
+ long indexTotalFilesSize = 0;
+ long indexTime = 0;
+ if (snapshot.indexChanged()) {
+ long time = System.currentTimeMillis();
+ indexDirty = true;
+ allIndicesMetadata = listAllMetadatas(shardIndexContainer);
+ final Map<String, StorageMetadata> allIndicesMetadataF = allIndicesMetadata;
+ // snapshot into the index
+ final CountDownLatch latch = new CountDownLatch(snapshotIndexCommit.getFiles().length);
+ final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
+ for (final String fileName : snapshotIndexCommit.getFiles()) {
+ // don't copy over the segments file, it will be copied over later on as part of the
+ // final snapshot phase
+ if (fileName.equals(snapshotIndexCommit.getSegmentsFileName())) {
+ latch.countDown();
+ continue;
+ }
+ IndexInput indexInput = null;
+ try {
+ indexInput = snapshotIndexCommit.getDirectory().openInput(fileName);
+ StorageMetadata metadata = allIndicesMetadata.get(fileName);
+ if (metadata != null && (metadata.getSize() == indexInput.length())) {
+ // we assume its the same one, no need to copy
+ latch.countDown();
+ continue;
+ }
+ } catch (Exception e) {
+ logger.debug("Failed to verify file equality based on length, copying...", e);
+ } finally {
+ if (indexInput != null) {
+ try {
+ indexInput.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+ }
+ indexNumberOfFiles++;
+ try {
+ indexTotalFilesSize += snapshotIndexCommit.getDirectory().fileLength(fileName);
+ } catch (IOException e) {
+ // ignore...
+ }
+ deleteFile(fileName, allIndicesMetadata);
+ threadPool.execute(new Runnable() {
+ @Override public void run() {
+ try {
+ copyFromDirectory(snapshotIndexCommit.getDirectory(), fileName, allIndicesMetadataF);
+ } catch (Exception e) {
+ lastException.set(e);
+ } finally {
+ latch.countDown();
+ }
+ }
+ });
+ }
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ lastException.set(e);
+ }
+ if (lastException.get() != null) {
+ throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to perform snapshot (index files)", lastException.get());
+ }
+ indexTime = System.currentTimeMillis() - time;
+ }
+
+ int translogNumberOfOperations = 0;
+ long translogTime = 0;
+ if (snapshot.newTranslogCreated()) {
+ currentTranslogPartToWrite = 1;
+ String translogBlobName = String.valueOf(translogSnapshot.translogId()) + "." + currentTranslogPartToWrite;
+
+ try {
+ long time = System.currentTimeMillis();
+
+ BytesStreamOutput streamOutput = BytesStreamOutput.Cached.cached();
+ streamOutput.writeInt(translogSnapshot.size());
+ for (Translog.Operation operation : translogSnapshot) {
+ translogNumberOfOperations++;
+ writeTranslogOperation(streamOutput, operation);
+ }
+
+ Blob blob = blobStoreContext.getBlobStore().newBlob(translogBlobName);
+ blob.setContentLength(streamOutput.size());
+ blob.setPayload(new FastByteArrayInputStream(streamOutput.unsafeByteArray(), 0, streamOutput.size()));
+ blobStoreContext.getBlobStore().putBlob(shardTranslogContainer, blob);
+
+ currentTranslogPartToWrite++;
+
+ translogTime = System.currentTimeMillis() - time;
+ } catch (Exception e) {
+ throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to snapshot translog into [" + translogBlobName + "]", e);
+ }
+ } else if (snapshot.sameTranslogNewOperations()) {
+ String translogBlobName = String.valueOf(translogSnapshot.translogId()) + "." + currentTranslogPartToWrite;
+ try {
+ long time = System.currentTimeMillis();
+
+ BytesStreamOutput streamOutput = BytesStreamOutput.Cached.cached();
+ streamOutput.writeInt(translogSnapshot.size() - snapshot.lastTranslogSize());
+ for (Translog.Operation operation : translogSnapshot.skipTo(snapshot.lastTranslogSize())) {
+ translogNumberOfOperations++;
+ writeTranslogOperation(streamOutput, operation);
+ }
+
+ Blob blob = blobStoreContext.getBlobStore().newBlob(translogBlobName);
+ blob.setContentLength(streamOutput.size());
+ blob.setPayload(new FastByteArrayInputStream(streamOutput.unsafeByteArray(), 0, streamOutput.size()));
+ blobStoreContext.getBlobStore().putBlob(shardTranslogContainer, blob);
+
+ currentTranslogPartToWrite++;
+
+ translogTime = System.currentTimeMillis() - time;
+ } catch (Exception e) {
+ throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to append snapshot translog into [" + translogBlobName + "]", e);
+ }
+ }
+
+ // now write the segments file
+ try {
+ if (indexDirty) {
+ indexNumberOfFiles++;
+ deleteFile(snapshotIndexCommit.getSegmentsFileName(), allIndicesMetadata);
+ indexTotalFilesSize += snapshotIndexCommit.getDirectory().fileLength(snapshotIndexCommit.getSegmentsFileName());
+ long time = System.currentTimeMillis();
+
+
+ IndexInput indexInput = snapshotIndexCommit.getDirectory().openInput(snapshotIndexCommit.getSegmentsFileName());
+ try {
+ Blob blob = blobStoreContext.getBlobStore().newBlob(snapshotIndexCommit.getSegmentsFileName());
+ InputStreamIndexInput is = new InputStreamIndexInput(indexInput, Long.MAX_VALUE);
+ blob.setPayload(is);
+ blob.setContentLength(is.actualSizeToRead());
+ blobStoreContext.getBlobStore().putBlob(shardIndexContainer, blob);
+ } finally {
+ try {
+ indexInput.close();
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ indexTime += (System.currentTimeMillis() - time);
+ }
+ } catch (Exception e) {
+ throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to finalize index snapshot into [" + snapshotIndexCommit.getSegmentsFileName() + "]", e);
+ }
+
+ // delete the old translog
+ if (snapshot.newTranslogCreated()) {
+ String currentTranslogPrefix = String.valueOf(translogSnapshot.translogId()) + ".";
+ Map<String, StorageMetadata> allMetadatas = listAllMetadatas(shardTranslogContainer);
+ for (Map.Entry<String, StorageMetadata> entry : allMetadatas.entrySet()) {
+ if (!entry.getKey().startsWith(currentTranslogPrefix)) {
+ blobStoreContext.getAsyncBlobStore().removeBlob(shardTranslogContainer, entry.getKey());
+ }
+ }
+ }
+
+ if (indexDirty) {
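+ // remove blobs that no longer correspond to a file in the current index commit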
+ for (Map.Entry<String, StorageMetadata> entry : allIndicesMetadata.entrySet()) {
+ String blobName = entry.getKey();
+ if (blobName.contains(".part")) {
+ blobName = blobName.substring(0, blobName.indexOf(".part"));
+ }
+ boolean found = false;
+ for (final String fileName : snapshotIndexCommit.getFiles()) {
+ if (blobName.equals(fileName)) {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ blobStoreContext.getAsyncBlobStore().removeBlob(shardIndexContainer, entry.getKey());
+ }
+ }
+ }
+
+ return new SnapshotStatus(new TimeValue(System.currentTimeMillis() - totalTimeStart),
+ new SnapshotStatus.Index(indexNumberOfFiles, new SizeValue(indexTotalFilesSize), new TimeValue(indexTime)),
+ new SnapshotStatus.Translog(translogNumberOfOperations, new TimeValue(translogTime)));
+ }
+
+ private RecoveryStatus.Index recoverIndex() throws IndexShardGatewayRecoveryException {
+ final Map<String, StorageMetadata> allMetaDatas = listAllMetadatas(shardIndexContainer);
+
+ // filter out to only have actual files
+ final Map<String, StorageMetadata> filesMetaDatas = Maps.newHashMap();
+ for (Map.Entry<String, StorageMetadata> entry : allMetaDatas.entrySet()) {
+ if (entry.getKey().contains(".part")) {
+ continue;
+ }
+ filesMetaDatas.put(entry.getKey(), entry.getValue());
+ }
+
+ final CountDownLatch latch = new CountDownLatch(filesMetaDatas.size());
+ final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
+ for (final Map.Entry<String, StorageMetadata> entry : filesMetaDatas.entrySet()) {
+ threadPool.execute(new Runnable() {
+ @Override public void run() {
+ try {
+ copyToDirectory(entry.getValue(), allMetaDatas);
+ } catch (Exception e) {
+ logger.debug("Failed to read [" + entry.getKey() + "] into [" + store + "]", e);
+ lastException.set(e);
+ } finally {
+ latch.countDown();
+ }
+ }
+ });
+ }
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ lastException.set(e);
+ }
+
+ long totalSize = 0;
+ for (Map.Entry<String, StorageMetadata> entry : allMetaDatas.entrySet()) {
+ totalSize += entry.getValue().getSize();
+ }
+
+ long version = -1;
+ try {
+ if (IndexReader.indexExists(store.directory())) {
+ version = IndexReader.getCurrentVersion(store.directory());
+ }
+ } catch (IOException e) {
+ throw new IndexShardGatewayRecoveryException(shardId(), "Failed to fetch index version after copying it over", e);
+ }
+
+ return new RecoveryStatus.Index(version, filesMetaDatas.size(), new SizeValue(totalSize, SizeUnit.BYTES));
+ }
+
+ private RecoveryStatus.Translog recoverTranslog() throws IndexShardGatewayRecoveryException {
+ final Map<String, StorageMetadata> allMetaDatas = listAllMetadatas(shardTranslogContainer);
+
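+ // translog blobs are named "{translogId}.{part}"; find the highest translog id present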
+ long latestTranslogId = -1;
+ for (String name : allMetaDatas.keySet()) {
+ long translogId = Long.parseLong(name.substring(0, name.indexOf('.')));
+ if (translogId > latestTranslogId) {
+ latestTranslogId = translogId;
+ }
+ }
+
+ if (latestTranslogId == -1) {
+ // no recovery file found, start the shard and bail
+ indexShard.start();
+ return new RecoveryStatus.Translog(-1, 0, new SizeValue(0, SizeUnit.BYTES));
+ }
+
+
+ try {
+ ArrayList<Translog.Operation> operations = Lists.newArrayList();
+
+ long size = 0;
+ int index = 1;
+ while (true) {
+ String translogPartName = String.valueOf(latestTranslogId) + "." + index;
+ if (!allMetaDatas.containsKey(translogPartName)) {
+ break;
+ }
+ Blob blob = blobStoreContext.getBlobStore().getBlob(shardTranslogContainer, translogPartName);
+ if (blob == null) {
+ break;
+ }
+ size += blob.getContentLength();
+ InputStreamStreamInput streamInput = new InputStreamStreamInput(blob.getContent());
+ int numberOfOperations = streamInput.readInt();
+ for (int i = 0; i < numberOfOperations; i++) {
+ operations.add(readTranslogOperation(streamInput));
+ }
+ index++;
+ }
+ currentTranslogPartToWrite = index;
+
+ indexShard.performRecovery(operations);
+ return new RecoveryStatus.Translog(latestTranslogId, operations.size(), new SizeValue(size, SizeUnit.BYTES));
+ } catch (Exception e) {
+ throw new IndexShardGatewayRecoveryException(shardId(), "Failed to perform recovery of translog", e);
+ }
+ }
+
+ private Map<String, StorageMetadata> listAllMetadatas(String container) {
+ final Map<String, StorageMetadata> allMetaDatas = Maps.newHashMap();
+
+ String nextMarker = null;
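+ // page through the container listing, following the marker until all blobs have been collected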
+ while (true) {
+ ListContainerOptions options = ListContainerOptions.Builder.maxResults(10000);
+ if (nextMarker != null) {
+ options.afterMarker(nextMarker);
+ }
+ PageSet<? extends StorageMetadata> pageSet = blobStoreContext.getBlobStore().list(container, options);
+ for (StorageMetadata metadata : pageSet) {
+ allMetaDatas.put(metadata.getName(), metadata);
+ }
+ nextMarker = pageSet.getNextMarker();
+ if (nextMarker == null) {
+ break;
+ }
+ }
+ return allMetaDatas;
+ }
+
+ private void deleteFile(String fileName, Map<String, StorageMetadata> allIndicesMetadata) {
+ // first, check and delete all files with this name
+ for (Map.Entry<String, StorageMetadata> entry : allIndicesMetadata.entrySet()) {
+ String blobName = entry.getKey();
+ if (blobName.contains(".part")) {
+ blobName = blobName.substring(0, blobName.indexOf(".part"));
+ }
+ if (blobName.equals(fileName)) {
+ blobStoreContext.getBlobStore().removeBlob(shardIndexContainer, blobName);
+ }
+ }
+ }
+
+
+ private void copyFromDirectory(Directory dir, String fileName, Map<String, StorageMetadata> allIndicesMetadata) throws IOException {
+ IndexInput indexInput = dir.openInput(fileName);
+
+ try {
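+ // the first chunk is stored under the file name itself; any remaining data goes into "{fileName}.part1", "{fileName}.part2", ... blobs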
+ Blob blob = blobStoreContext.getBlobStore().newBlob(fileName);
+ InputStreamIndexInput is = new InputStreamIndexInput(indexInput, chunkSize.bytes());
+ blob.setPayload(is);
+ blob.setContentLength(is.actualSizeToRead());
+ blobStoreContext.getBlobStore().putBlob(shardIndexContainer, blob);
+
+ int part = 1;
+ while (indexInput.getFilePointer() < indexInput.length()) {
+ is = new InputStreamIndexInput(indexInput, chunkSize.bytes());
+ if (is.actualSizeToRead() <= 0) {
+ break;
+ }
+ blob = blobStoreContext.getBlobStore().newBlob(fileName + ".part" + part);
+ blob.setPayload(is);
+ blob.setContentLength(is.actualSizeToRead());
+ blobStoreContext.getBlobStore().putBlob(shardIndexContainer, blob);
+ part++;
+ }
+ } finally {
+ try {
+ indexInput.close();
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ }
+
+ private void copyToDirectory(StorageMetadata metadata, Map<String, StorageMetadata> allMetadatas) throws IOException {
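+ // fetch the base blob, then append any "{name}.partN" chunk blobs in order to rebuild the original index file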
+ Blob blob = blobStoreContext.getBlobStore().getBlob(shardIndexContainer, metadata.getName());
+
+ byte[] buffer = new byte[16384];
+ IndexOutput indexOutput = store.directory().createOutput(metadata.getName());
+
+ copy(blob.getContent(), indexOutput, buffer);
+ blob.getContent().close();
+
+ // check the metadatas we have
+ int part = 1;
+ while (true) {
+ String partName = metadata.getName() + ".part" + part;
+ if (!allMetadatas.containsKey(partName)) {
+ break;
+ }
+ blob = blobStoreContext.getBlobStore().getBlob(shardIndexContainer, partName);
+ copy(blob.getContent(), indexOutput, buffer);
+ blob.getContent().close();
+ part++;
+ }
+
+ indexOutput.close();
+
+ Directories.sync(store.directory(), metadata.getName());
+ }
+
+ private void copy(InputStream is, IndexOutput indexOutput, byte[] buffer) throws IOException {
+ int len;
+ while ((len = is.read(buffer)) != -1) {
+ indexOutput.writeBytes(buffer, len);
+ }
+ }
+}