From 7d9f55309e4296001bd90ef381f8091d07381b84 Mon Sep 17 00:00:00 2001 From: kimchy Date: Tue, 4 May 2010 14:02:36 +0300 Subject: [PATCH] cloud gateway implemenation working against s3 --- .idea/dictionaries/kimchy.xml | 1 + .idea/modules/plugins-cloud.iml | 20 +- .../util/lucene/Directories.java | 51 +- .../lucene/store/InputStreamIndexInput.java | 83 +++ .../store/InputStreamIndexInputTests.java | 232 ++++++++ plugins/cloud/build.gradle | 6 +- .../blobstore/CloudBlobStoreService.java | 2 +- .../cloud/compute/CloudComputeService.java | 2 +- .../cloud/jclouds/JCloudsUtils.java | 2 + .../discovery/cloud/CloudZenPing.java | 30 +- .../gateway/cloud/CloudGateway.java | 65 ++- .../gateway/cloud/CloudIndexGateway.java | 122 ++++ .../cloud/CloudIndexGatewayModule.java | 33 ++ .../gateway/cloud/CloudIndexShardGateway.java | 533 ++++++++++++++++++ 14 files changed, 1150 insertions(+), 32 deletions(-) create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/util/lucene/store/InputStreamIndexInput.java create mode 100644 modules/elasticsearch/src/test/java/org/elasticsearch/util/lucene/store/InputStreamIndexInputTests.java create mode 100644 plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexGateway.java create mode 100644 plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexGatewayModule.java create mode 100644 plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexShardGateway.java diff --git a/.idea/dictionaries/kimchy.xml b/.idea/dictionaries/kimchy.xml index 977350ca2e9..f34d210ca8f 100644 --- a/.idea/dictionaries/kimchy.xml +++ b/.idea/dictionaries/kimchy.xml @@ -47,6 +47,7 @@ lucene memcached metadata + metadatas millis mmap multi diff --git a/.idea/modules/plugins-cloud.iml b/.idea/modules/plugins-cloud.iml index d3d504d4316..460fd5e5002 100644 --- a/.idea/modules/plugins-cloud.iml +++ b/.idea/modules/plugins-cloud.iml @@ -17,15 +17,19 @@ + + + + + + - - - - - - - - + + + + + + diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/lucene/Directories.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/lucene/Directories.java index cbfca61bb08..b7182ca4545 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/lucene/Directories.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/lucene/Directories.java @@ -131,8 +131,57 @@ public class Directories { } copyFile(copyFrom, destinationFile); } else { - copyToDirectory(new FileInputStream(copyFrom), dir.createOutput(fileName)); + FileInputStream is = null; + IndexOutput output = null; + try { + is = new FileInputStream(copyFrom); + output = dir.createOutput(fileName); + copyToDirectory(is, output); + } finally { + if (is != null) { + try { + is.close(); + } catch (IOException e) { + // ignore + } + } + if (output != null) { + try { + output.close(); + } catch (IOException e) { + // ignore + } + } + } } + sync(dir, fileName); + } + + public static void copyToDirectory(InputStream is, Directory dir, String fileName) throws IOException { + IndexOutput output = null; + try { + output = dir.createOutput(fileName); + copyToDirectory(is, output); + } finally { + if (is != null) { + try { + is.close(); + } catch (IOException e) { + // ignore + } + } + if (output != null) { + try { + output.close(); + } catch (IOException e) { + // ignore + } + } + } + sync(dir, fileName); + } + + public static void sync(Directory dir, String fileName) throws IOException { if (dir instanceof ForceSyncDirectory) { ((ForceSyncDirectory) dir).forceSync(fileName); } else { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/lucene/store/InputStreamIndexInput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/lucene/store/InputStreamIndexInput.java new file mode 100644 index 00000000000..25efc84bc79 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/lucene/store/InputStreamIndexInput.java @@ -0,0 +1,83 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.util.lucene.store; + +import org.apache.lucene.store.IndexInput; + +import java.io.IOException; +import java.io.InputStream; + +/** + * @author kimchy (shay.banon) + */ +public class InputStreamIndexInput extends InputStream { + + private final IndexInput indexInput; + + private final long limit; + + private final long actualSizeToRead; + + private long counter = 0; + + public InputStreamIndexInput(IndexInput indexInput, long limit) { + this.indexInput = indexInput; + this.limit = limit; + if ((indexInput.length() - indexInput.getFilePointer()) > limit) { + actualSizeToRead = limit; + } else { + actualSizeToRead = indexInput.length() - indexInput.getFilePointer(); + } + } + + public long actualSizeToRead() { + return actualSizeToRead; + } + + @Override public int read(byte[] b, int off, int len) throws IOException { + if (b == null) { + throw new NullPointerException(); + } else if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } + if (indexInput.getFilePointer() >= indexInput.length()) { + return -1; + } + if (indexInput.getFilePointer() + len > indexInput.length()) { + len = (int) (indexInput.length() - indexInput.getFilePointer()); + } + if (counter + len > limit) { + len = (int) (limit - counter); + } + if (len <= 0) { + return -1; + } + indexInput.readBytes(b, off, len, false); + counter += len; + return len; + } + + @Override public int read() throws IOException { + if (counter++ >= limit) { + return -1; + } + return (indexInput.getFilePointer() < indexInput.length()) ? (indexInput.readByte() & 0xff) : -1; + } +} diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/util/lucene/store/InputStreamIndexInputTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/util/lucene/store/InputStreamIndexInputTests.java new file mode 100644 index 00000000000..a9c82e82b57 --- /dev/null +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/util/lucene/store/InputStreamIndexInputTests.java @@ -0,0 +1,232 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.util.lucene.store; + +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.RAMDirectory; +import org.testng.annotations.Test; + +import java.io.IOException; + +import static org.hamcrest.MatcherAssert.*; +import static org.hamcrest.Matchers.*; + +/** + * @author kimchy (shay.banon) + */ +public class InputStreamIndexInputTests { + + @Test public void testSingleReadSingleByteLimit() throws IOException { + RAMDirectory dir = new RAMDirectory(); + IndexOutput output = dir.createOutput("test"); + for (int i = 0; i < 3; i++) { + output.writeByte((byte) 1); + } + for (int i = 0; i < 3; i++) { + output.writeByte((byte) 2); + } + + output.close(); + + IndexInput input = dir.openInput("test"); + + for (int i = 0; i < 3; i++) { + InputStreamIndexInput is = new InputStreamIndexInput(input, 1); + assertThat(input.getFilePointer(), lessThan(input.length())); + assertThat(is.actualSizeToRead(), equalTo(1l)); + assertThat(is.read(), equalTo(1)); + assertThat(is.read(), equalTo(-1)); + } + + for (int i = 0; i < 3; i++) { + InputStreamIndexInput is = new InputStreamIndexInput(input, 1); + assertThat(input.getFilePointer(), lessThan(input.length())); + assertThat(is.actualSizeToRead(), equalTo(1l)); + assertThat(is.read(), equalTo(2)); + assertThat(is.read(), equalTo(-1)); + } + + assertThat(input.getFilePointer(), equalTo(input.length())); + InputStreamIndexInput is = new InputStreamIndexInput(input, 1); + assertThat(is.actualSizeToRead(), equalTo(0l)); + assertThat(is.read(), equalTo(-1)); + } + + @Test public void testReadMultiSingleByteLimit1() throws IOException { + RAMDirectory dir = new RAMDirectory(); + IndexOutput output = dir.createOutput("test"); + for (int i = 0; i < 3; i++) { + output.writeByte((byte) 1); + } + for (int i = 0; i < 3; i++) { + output.writeByte((byte) 2); + } + + output.close(); + + IndexInput input = dir.openInput("test"); + + byte[] read = new byte[2]; + + for (int i = 0; i < 3; i++) { + assertThat(input.getFilePointer(), lessThan(input.length())); + InputStreamIndexInput is = new InputStreamIndexInput(input, 1); + assertThat(is.actualSizeToRead(), equalTo(1l)); + assertThat(is.read(read), equalTo(1)); + assertThat(read[0], equalTo((byte) 1)); + } + + for (int i = 0; i < 3; i++) { + assertThat(input.getFilePointer(), lessThan(input.length())); + InputStreamIndexInput is = new InputStreamIndexInput(input, 1); + assertThat(is.actualSizeToRead(), equalTo(1l)); + assertThat(is.read(read), equalTo(1)); + assertThat(read[0], equalTo((byte) 2)); + } + + assertThat(input.getFilePointer(), equalTo(input.length())); + InputStreamIndexInput is = new InputStreamIndexInput(input, 1); + assertThat(is.actualSizeToRead(), equalTo(0l)); + assertThat(is.read(read), equalTo(-1)); + } + + @Test public void testSingleReadTwoBytesLimit() throws IOException { + RAMDirectory dir = new RAMDirectory(); + IndexOutput output = dir.createOutput("test"); + for (int i = 0; i < 3; i++) { + output.writeByte((byte) 1); + } + for (int i = 0; i < 3; i++) { + output.writeByte((byte) 2); + } + + output.close(); + + IndexInput input = dir.openInput("test"); + + assertThat(input.getFilePointer(), lessThan(input.length())); + InputStreamIndexInput is = new InputStreamIndexInput(input, 2); + assertThat(is.actualSizeToRead(), equalTo(2l)); + assertThat(is.read(), equalTo(1)); + assertThat(is.read(), equalTo(1)); + assertThat(is.read(), equalTo(-1)); + + assertThat(input.getFilePointer(), lessThan(input.length())); + is = new InputStreamIndexInput(input, 2); + assertThat(is.actualSizeToRead(), equalTo(2l)); + assertThat(is.read(), equalTo(1)); + assertThat(is.read(), equalTo(2)); + assertThat(is.read(), equalTo(-1)); + + assertThat(input.getFilePointer(), lessThan(input.length())); + is = new InputStreamIndexInput(input, 2); + assertThat(is.actualSizeToRead(), equalTo(2l)); + assertThat(is.read(), equalTo(2)); + assertThat(is.read(), equalTo(2)); + assertThat(is.read(), equalTo(-1)); + + assertThat(input.getFilePointer(), equalTo(input.length())); + is = new InputStreamIndexInput(input, 2); + assertThat(is.actualSizeToRead(), equalTo(0l)); + assertThat(is.read(), equalTo(-1)); + } + + @Test public void testReadMultiTwoBytesLimit1() throws IOException { + RAMDirectory dir = new RAMDirectory(); + IndexOutput output = dir.createOutput("test"); + for (int i = 0; i < 3; i++) { + output.writeByte((byte) 1); + } + for (int i = 0; i < 3; i++) { + output.writeByte((byte) 2); + } + + output.close(); + + IndexInput input = dir.openInput("test"); + + byte[] read = new byte[2]; + + assertThat(input.getFilePointer(), lessThan(input.length())); + InputStreamIndexInput is = new InputStreamIndexInput(input, 2); + assertThat(is.actualSizeToRead(), equalTo(2l)); + assertThat(is.read(read), equalTo(2)); + assertThat(read[0], equalTo((byte) 1)); + assertThat(read[1], equalTo((byte) 1)); + + assertThat(input.getFilePointer(), lessThan(input.length())); + is = new InputStreamIndexInput(input, 2); + assertThat(is.actualSizeToRead(), equalTo(2l)); + assertThat(is.read(read), equalTo(2)); + assertThat(read[0], equalTo((byte) 1)); + assertThat(read[1], equalTo((byte) 2)); + + assertThat(input.getFilePointer(), lessThan(input.length())); + is = new InputStreamIndexInput(input, 2); + assertThat(is.actualSizeToRead(), equalTo(2l)); + assertThat(is.read(read), equalTo(2)); + assertThat(read[0], equalTo((byte) 2)); + assertThat(read[1], equalTo((byte) 2)); + + assertThat(input.getFilePointer(), equalTo(input.length())); + is = new InputStreamIndexInput(input, 2); + assertThat(is.actualSizeToRead(), equalTo(0l)); + assertThat(is.read(read), equalTo(-1)); + } + + @Test public void testReadMultiFourBytesLimit() throws IOException { + RAMDirectory dir = new RAMDirectory(); + IndexOutput output = dir.createOutput("test"); + for (int i = 0; i < 3; i++) { + output.writeByte((byte) 1); + } + for (int i = 0; i < 3; i++) { + output.writeByte((byte) 2); + } + + output.close(); + + IndexInput input = dir.openInput("test"); + + byte[] read = new byte[4]; + + assertThat(input.getFilePointer(), lessThan(input.length())); + InputStreamIndexInput is = new InputStreamIndexInput(input, 4); + assertThat(is.actualSizeToRead(), equalTo(4l)); + assertThat(is.read(read), equalTo(4)); + assertThat(read[0], equalTo((byte) 1)); + assertThat(read[1], equalTo((byte) 1)); + assertThat(read[2], equalTo((byte) 1)); + assertThat(read[3], equalTo((byte) 2)); + + assertThat(input.getFilePointer(), lessThan(input.length())); + is = new InputStreamIndexInput(input, 4); + assertThat(is.actualSizeToRead(), equalTo(2l)); + assertThat(is.read(read), equalTo(2)); + assertThat(read[0], equalTo((byte) 2)); + assertThat(read[1], equalTo((byte) 2)); + + assertThat(input.getFilePointer(), equalTo(input.length())); + is = new InputStreamIndexInput(input, 4); + assertThat(is.actualSizeToRead(), equalTo(0l)); + assertThat(is.read(read), equalTo(-1)); + } +} diff --git a/plugins/cloud/build.gradle b/plugins/cloud/build.gradle index b939d20ce95..7ae679504d2 100644 --- a/plugins/cloud/build.gradle +++ b/plugins/cloud/build.gradle @@ -34,17 +34,17 @@ repositories { mavenRepo urls: "http://java-xmlbuilder.googlecode.com/svn/repo" } -jcloudsVersion = "1.0-beta-4" +jcloudsVersion = "1.0-SNAPSHOT" dependencies { compile project(':elasticsearch') - compile("org.jclouds:jclouds-httpnio:$jcloudsVersion") compile("org.jclouds:jclouds-blobstore:$jcloudsVersion") compile("org.jclouds:jclouds-aws:$jcloudsVersion") + compile("org.jclouds:jclouds-rackspace:$jcloudsVersion") - distLib("org.jclouds:jclouds-httpnio:$jcloudsVersion") distLib("org.jclouds:jclouds-blobstore:$jcloudsVersion") distLib("org.jclouds:jclouds-aws:$jcloudsVersion") + distLib("org.jclouds:jclouds-rackspace:$jcloudsVersion") testCompile project(':test-testng') testCompile('org.testng:testng:5.10:jdk15') { transitive = false } diff --git a/plugins/cloud/src/main/java/org/elasticsearch/cloud/blobstore/CloudBlobStoreService.java b/plugins/cloud/src/main/java/org/elasticsearch/cloud/blobstore/CloudBlobStoreService.java index ca2a95456b2..9899555c1fd 100644 --- a/plugins/cloud/src/main/java/org/elasticsearch/cloud/blobstore/CloudBlobStoreService.java +++ b/plugins/cloud/src/main/java/org/elasticsearch/cloud/blobstore/CloudBlobStoreService.java @@ -60,7 +60,7 @@ public class CloudBlobStoreService extends AbstractLifecycleComponent buildModules(Settings settings) { return ImmutableList.of(new JCloudsLoggingModule(settings)); } diff --git a/plugins/cloud/src/main/java/org/elasticsearch/discovery/cloud/CloudZenPing.java b/plugins/cloud/src/main/java/org/elasticsearch/discovery/cloud/CloudZenPing.java index 4cb76829c5c..7172a24acca 100644 --- a/plugins/cloud/src/main/java/org/elasticsearch/discovery/cloud/CloudZenPing.java +++ b/plugins/cloud/src/main/java/org/elasticsearch/discovery/cloud/CloudZenPing.java @@ -32,6 +32,8 @@ import org.jclouds.compute.ComputeService; import org.jclouds.compute.domain.ComputeMetadata; import org.jclouds.compute.domain.NodeMetadata; import org.jclouds.compute.domain.NodeState; +import org.jclouds.compute.options.GetNodesOptions; +import org.jclouds.domain.Location; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -51,11 +53,14 @@ public class CloudZenPing extends UnicastZenPing { private final String tag; + private final String location; + public CloudZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, CloudComputeService computeService) { super(settings, threadPool, transportService, clusterName); this.computeService = computeService.context().getComputeService(); this.tag = componentSettings.get("tag"); + this.location = componentSettings.get("location"); this.ports = componentSettings.get("ports", "9300-9302"); // parse the ports just to see that they are valid new PortsRange(ports).ports(); @@ -63,10 +68,31 @@ public class CloudZenPing extends UnicastZenPing { @Override protected List buildDynamicNodes() { List discoNodes = newArrayList(); - Map nodes = computeService.getNodes(); + Map nodes = computeService.getNodes(GetNodesOptions.Builder.withDetails()); + logger.trace("Processing Nodes {}", nodes); for (Map.Entry node : nodes.entrySet()) { - NodeMetadata nodeMetadata = computeService.getNodeMetadata(node.getValue()); + NodeMetadata nodeMetadata = (NodeMetadata) node.getValue(); if (tag != null && !nodeMetadata.getTag().equals(tag)) { + logger.trace("Filtering node {} with unmatched tag {}", nodeMetadata.getName(), nodeMetadata.getTag()); + continue; + } + boolean filteredByLocation = true; + if (location != null) { + Location nodeLocation = nodeMetadata.getLocation(); + if (location.equals(nodeLocation.getId())) { + filteredByLocation = false; + } else { + if (nodeLocation.getParent() != null) { + if (location.equals(nodeLocation.getParent().getId())) { + filteredByLocation = false; + } + } + } + } else { + filteredByLocation = false; + } + if (filteredByLocation) { + logger.trace("Filtering node {} with unmatched location {}", nodeMetadata.getName(), nodeMetadata.getLocation()); continue; } if (nodeMetadata.getState() == NodeState.PENDING || nodeMetadata.getState() == NodeState.RUNNING) { diff --git a/plugins/cloud/src/main/java/org/elasticsearch/gateway/cloud/CloudGateway.java b/plugins/cloud/src/main/java/org/elasticsearch/gateway/cloud/CloudGateway.java index 2394d5b0be2..b39b7248499 100644 --- a/plugins/cloud/src/main/java/org/elasticsearch/gateway/cloud/CloudGateway.java +++ b/plugins/cloud/src/main/java/org/elasticsearch/gateway/cloud/CloudGateway.java @@ -22,11 +22,15 @@ package org.elasticsearch.gateway.cloud; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.cloud.blobstore.CloudBlobStoreService; +import org.elasticsearch.cloud.jclouds.JCloudsUtils; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.gateway.Gateway; import org.elasticsearch.gateway.GatewayException; +import org.elasticsearch.index.gateway.cloud.CloudIndexGatewayModule; +import org.elasticsearch.util.SizeValue; import org.elasticsearch.util.component.AbstractLifecycleComponent; +import org.elasticsearch.util.guice.inject.Inject; import org.elasticsearch.util.guice.inject.Module; import org.elasticsearch.util.io.FastByteArrayInputStream; import org.elasticsearch.util.settings.Settings; @@ -39,6 +43,7 @@ import org.jclouds.blobstore.BlobStoreContext; import org.jclouds.blobstore.domain.Blob; import org.jclouds.blobstore.domain.PageSet; import org.jclouds.blobstore.domain.StorageMetadata; +import org.jclouds.domain.Location; import java.io.IOException; @@ -47,35 +52,63 @@ import java.io.IOException; */ public class CloudGateway extends AbstractLifecycleComponent implements Gateway { - private final ClusterName clusterName; - private final BlobStoreContext blobStoreContext; private final String container; - private final String location; + private final Location location; + private final SizeValue chunkSize; + + private final String metadataContainer; private volatile int currentIndex; - public CloudGateway(Settings settings, ClusterName clusterName, CloudBlobStoreService blobStoreService) { + @Inject public CloudGateway(Settings settings, ClusterName clusterName, CloudBlobStoreService blobStoreService) { super(settings); - this.clusterName = clusterName; this.blobStoreContext = blobStoreService.context(); + this.chunkSize = componentSettings.getAsSize("chunk_size", null); + + String location = componentSettings.get("location"); + if (location == null) { + this.location = null; + } else { + this.location = blobStoreContext.getBlobStore().getAssignableLocations().get(location); + if (this.location == null) { + throw new ElasticSearchIllegalArgumentException("Not a valid location [" + location + "], available locations " + blobStoreContext.getBlobStore().getAssignableLocations().keySet()); + } + } + String container = componentSettings.get("container"); if (container == null) { throw new ElasticSearchIllegalArgumentException("Cloud gateway requires 'container' setting"); } - this.location = componentSettings.get("location"); - this.container = container + "." + clusterName.value(); - blobStoreContext.getBlobStore().createContainerInLocation(location, container); + this.container = container + JCloudsUtils.BLOB_CONTAINER_SEP + clusterName.value(); + + this.metadataContainer = this.container + JCloudsUtils.BLOB_CONTAINER_SEP + "metadata"; + + logger.debug("Using location [{}], container [{}], metadata_container [{}]", this.location, this.container, metadataContainer); + + blobStoreContext.getBlobStore().createContainerInLocation(this.location, metadataContainer); this.currentIndex = findLatestIndex(); logger.debug("Latest metadata found at index [" + currentIndex + "]"); } + public String container() { + return this.container; + } + + public Location location() { + return this.location; + } + + public SizeValue chunkSize() { + return this.chunkSize; + } + @Override protected void doStart() throws ElasticSearchException { } @@ -99,14 +132,14 @@ public class CloudGateway extends AbstractLifecycleComponent implements blob.setPayload(new FastByteArrayInputStream(builder.unsafeBytes(), 0, builder.unsafeBytesLength())); blob.setContentLength(builder.unsafeBytesLength()); - blobStoreContext.getBlobStore().putBlob(container, blob); + blobStoreContext.getBlobStore().putBlob(metadataContainer, blob); currentIndex++; - PageSet pageSet = blobStoreContext.getBlobStore().list(container); + PageSet pageSet = blobStoreContext.getBlobStore().list(metadataContainer); for (StorageMetadata storageMetadata : pageSet) { if (storageMetadata.getName().startsWith("metadata-") && !name.equals(storageMetadata.getName())) { - blobStoreContext.getBlobStore().removeBlob(container, storageMetadata.getName()); + blobStoreContext.getAsyncBlobStore().removeBlob(metadataContainer, storageMetadata.getName()); } } } catch (IOException e) { @@ -128,14 +161,14 @@ public class CloudGateway extends AbstractLifecycleComponent implements } @Override public Class suggestIndexGateway() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return CloudIndexGatewayModule.class; } @Override public void reset() { - PageSet pageSet = blobStoreContext.getBlobStore().list(container); + PageSet pageSet = blobStoreContext.getBlobStore().list(metadataContainer); for (StorageMetadata storageMetadata : pageSet) { if (storageMetadata.getName().startsWith("metadata-")) { - blobStoreContext.getBlobStore().removeBlob(container, storageMetadata.getName()); + blobStoreContext.getBlobStore().removeBlob(metadataContainer, storageMetadata.getName()); } } currentIndex = -1; @@ -143,7 +176,7 @@ public class CloudGateway extends AbstractLifecycleComponent implements private int findLatestIndex() { int index = -1; - PageSet pageSet = blobStoreContext.getBlobStore().list(container); + PageSet pageSet = blobStoreContext.getBlobStore().list(metadataContainer); for (StorageMetadata storageMetadata : pageSet) { if (logger.isTraceEnabled()) { logger.trace("[findLatestMetadata]: Processing blob [" + storageMetadata.getName() + "]"); @@ -168,7 +201,7 @@ public class CloudGateway extends AbstractLifecycleComponent implements private MetaData readMetaData(String name) throws IOException { XContentParser parser = null; try { - Blob blob = blobStoreContext.getBlobStore().getBlob(container, name); + Blob blob = blobStoreContext.getBlobStore().getBlob(metadataContainer, name); parser = XContentFactory.xContent(XContentType.JSON).createParser(blob.getContent()); return MetaData.Builder.fromXContent(parser, settings); } finally { diff --git a/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexGateway.java b/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexGateway.java new file mode 100644 index 00000000000..a89dac69b98 --- /dev/null +++ b/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexGateway.java @@ -0,0 +1,122 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.gateway.cloud; + +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.ElasticSearchIllegalArgumentException; +import org.elasticsearch.cloud.blobstore.CloudBlobStoreService; +import org.elasticsearch.cloud.jclouds.JCloudsUtils; +import org.elasticsearch.gateway.Gateway; +import org.elasticsearch.gateway.cloud.CloudGateway; +import org.elasticsearch.index.AbstractIndexComponent; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.gateway.IndexGateway; +import org.elasticsearch.index.gateway.IndexShardGateway; +import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.util.SizeUnit; +import org.elasticsearch.util.SizeValue; +import org.elasticsearch.util.guice.inject.Inject; +import org.elasticsearch.util.settings.Settings; +import org.jclouds.blobstore.BlobStoreContext; +import org.jclouds.domain.Location; + +/** + * @author kimchy (shay.banon) + */ +public class CloudIndexGateway extends AbstractIndexComponent implements IndexGateway { + + private final Gateway gateway; + + private final String indexContainer; + + private final Location location; + + private final SizeValue chunkSize; + + private final BlobStoreContext blobStoreContext; + + + @Inject public CloudIndexGateway(Index index, @IndexSettings Settings indexSettings, CloudBlobStoreService blobStoreService, Gateway gateway) { + super(index, indexSettings); + this.blobStoreContext = blobStoreService.context(); + this.gateway = gateway; + + String location = componentSettings.get("location"); + String container = componentSettings.get("container"); + SizeValue chunkSize = componentSettings.getAsSize("chunk_size", null); + + if (gateway instanceof CloudGateway) { + CloudGateway cloudGateway = (CloudGateway) gateway; + if (container == null) { + container = cloudGateway.container() + JCloudsUtils.BLOB_CONTAINER_SEP + index.name(); + } + if (chunkSize == null) { + chunkSize = cloudGateway.chunkSize(); + } + } + + if (chunkSize == null) { + chunkSize = new SizeValue(4, SizeUnit.GB); + } + + if (location == null) { + if (gateway instanceof CloudGateway) { + CloudGateway cloudGateway = (CloudGateway) gateway; + this.location = cloudGateway.location(); + } else { + this.location = null; + } + } else { + this.location = blobStoreContext.getBlobStore().getAssignableLocations().get(location); + if (this.location == null) { + throw new ElasticSearchIllegalArgumentException("Not a valid location [" + location + "], available locations " + blobStoreContext.getBlobStore().getAssignableLocations().keySet()); + } + } + this.indexContainer = container; + this.chunkSize = chunkSize; + + logger.debug("Using location [{}], container [{}], chunk_size [{}]", this.location, this.indexContainer, this.chunkSize); + +// blobStoreContext.getBlobStore().createContainerInLocation(this.location, this.indexContainer); + } + + public Location indexLocation() { + return this.location; + } + + public String indexContainer() { + return this.indexContainer; + } + + public SizeValue chunkSize() { + return this.chunkSize; + } + + @Override public Class shardGatewayClass() { + return CloudIndexShardGateway.class; + } + + @Override public void close(boolean delete) throws ElasticSearchException { + if (!delete) { + return; + } +// blobStoreContext.getBlobStore().deleteContainer(indexContainer); + } +} diff --git a/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexGatewayModule.java b/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexGatewayModule.java new file mode 100644 index 00000000000..77b3a732120 --- /dev/null +++ b/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexGatewayModule.java @@ -0,0 +1,33 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.gateway.cloud; + +import org.elasticsearch.index.gateway.IndexGateway; +import org.elasticsearch.util.guice.inject.AbstractModule; + +/** + * @author kimchy (shay.banon) + */ +public class CloudIndexGatewayModule extends AbstractModule { + + @Override protected void configure() { + bind(IndexGateway.class).to(CloudIndexGateway.class).asEagerSingleton(); + } +} \ No newline at end of file diff --git a/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexShardGateway.java b/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexShardGateway.java new file mode 100644 index 00000000000..e9cc3c30b5a --- /dev/null +++ b/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexShardGateway.java @@ -0,0 +1,533 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.gateway.cloud; + +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.elasticsearch.cloud.blobstore.CloudBlobStoreService; +import org.elasticsearch.cloud.jclouds.JCloudsUtils; +import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; +import org.elasticsearch.index.gateway.IndexShardGateway; +import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException; +import org.elasticsearch.index.gateway.IndexShardGatewaySnapshotFailedException; +import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.index.shard.AbstractIndexShardComponent; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.service.IndexShard; +import org.elasticsearch.index.shard.service.InternalIndexShard; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.util.SizeUnit; +import org.elasticsearch.util.SizeValue; +import org.elasticsearch.util.TimeValue; +import org.elasticsearch.util.gcommon.collect.Lists; +import org.elasticsearch.util.gcommon.collect.Maps; +import org.elasticsearch.util.guice.inject.Inject; +import org.elasticsearch.util.io.FastByteArrayInputStream; +import org.elasticsearch.util.io.stream.BytesStreamOutput; +import org.elasticsearch.util.io.stream.InputStreamStreamInput; +import org.elasticsearch.util.lucene.Directories; +import org.elasticsearch.util.lucene.store.InputStreamIndexInput; +import org.elasticsearch.util.settings.Settings; +import org.jclouds.blobstore.BlobStoreContext; +import org.jclouds.blobstore.domain.Blob; +import org.jclouds.blobstore.domain.PageSet; +import org.jclouds.blobstore.domain.StorageMetadata; +import org.jclouds.blobstore.options.ListContainerOptions; +import org.jclouds.domain.Location; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +import static org.elasticsearch.index.translog.TranslogStreams.*; + +/** + * @author kimchy (shay.banon) + */ +public class CloudIndexShardGateway extends AbstractIndexShardComponent implements IndexShardGateway { + + private final InternalIndexShard indexShard; + + private final ThreadPool threadPool; + + private final Store store; + + private final Location shardLocation; + + private final String shardContainer; + + private final String shardIndexContainer; + + private final String shardTranslogContainer; + + private final BlobStoreContext blobStoreContext; + + private final SizeValue chunkSize; + + private volatile int currentTranslogPartToWrite = 1; + + @Inject public CloudIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, IndexShard indexShard, ThreadPool threadPool, + Store store, CloudIndexGateway cloudIndexGateway, CloudBlobStoreService blobStoreService) { + super(shardId, indexSettings); + this.indexShard = (InternalIndexShard) indexShard; + this.threadPool = threadPool; + this.store = store; + this.blobStoreContext = blobStoreService.context(); + + this.chunkSize = cloudIndexGateway.chunkSize(); + this.shardLocation = cloudIndexGateway.indexLocation(); + this.shardContainer = cloudIndexGateway.indexContainer() + JCloudsUtils.BLOB_CONTAINER_SEP + shardId.id(); + + this.shardIndexContainer = shardContainer + JCloudsUtils.BLOB_CONTAINER_SEP + "index"; + this.shardTranslogContainer = shardContainer + JCloudsUtils.BLOB_CONTAINER_SEP + "translog"; + + logger.trace("Using location [{}], container [{}]", this.shardLocation, this.shardContainer); + + blobStoreContext.getBlobStore().createContainerInLocation(this.shardLocation, this.shardTranslogContainer); + blobStoreContext.getBlobStore().createContainerInLocation(this.shardLocation, this.shardIndexContainer); + } + + @Override public boolean requiresSnapshotScheduling() { + return true; + } + + @Override public String toString() { + StringBuilder sb = new StringBuilder("cloud["); + if (shardLocation != null) { + sb.append(shardLocation).append("/"); + } + sb.append(shardContainer).append("]"); + return sb.toString(); + } + + @Override public void close(boolean delete) { + if (!delete) { + return; + } + blobStoreContext.getBlobStore().deleteContainer(shardIndexContainer); + blobStoreContext.getBlobStore().deleteContainer(shardTranslogContainer); + } + + @Override public RecoveryStatus recover() throws IndexShardGatewayRecoveryException { + RecoveryStatus.Index recoveryStatusIndex = recoverIndex(); + RecoveryStatus.Translog recoveryStatusTranslog = recoverTranslog(); + return new RecoveryStatus(recoveryStatusIndex, recoveryStatusTranslog); + } + + @Override public SnapshotStatus snapshot(Snapshot snapshot) { + long totalTimeStart = System.currentTimeMillis(); + boolean indexDirty = false; + + final SnapshotIndexCommit snapshotIndexCommit = snapshot.indexCommit(); + final Translog.Snapshot translogSnapshot = snapshot.translogSnapshot(); + + Map allIndicesMetadata = null; + + int indexNumberOfFiles = 0; + long indexTotalFilesSize = 0; + long indexTime = 0; + if (snapshot.indexChanged()) { + long time = System.currentTimeMillis(); + indexDirty = true; + allIndicesMetadata = listAllMetadatas(shardIndexContainer); + final Map allIndicesMetadataF = allIndicesMetadata; + // snapshot into the index + final CountDownLatch latch = new CountDownLatch(snapshotIndexCommit.getFiles().length); + final AtomicReference lastException = new AtomicReference(); + for (final String fileName : snapshotIndexCommit.getFiles()) { + // don't copy over the segments file, it will be copied over later on as part of the + // final snapshot phase + if (fileName.equals(snapshotIndexCommit.getSegmentsFileName())) { + latch.countDown(); + continue; + } + IndexInput indexInput = null; + try { + indexInput = snapshotIndexCommit.getDirectory().openInput(fileName); + StorageMetadata metadata = allIndicesMetadata.get(fileName); + if (metadata != null && (metadata.getSize() == indexInput.length())) { + // we assume its the same one, no need to copy + latch.countDown(); + continue; + } + } catch (Exception e) { + logger.debug("Failed to verify file equality based on length, copying...", e); + } finally { + if (indexInput != null) { + try { + indexInput.close(); + } catch (IOException e) { + // ignore + } + } + } + indexNumberOfFiles++; + try { + indexTotalFilesSize += snapshotIndexCommit.getDirectory().fileLength(fileName); + } catch (IOException e) { + // ignore... + } + deleteFile(fileName, allIndicesMetadata); + threadPool.execute(new Runnable() { + @Override public void run() { + try { + copyFromDirectory(snapshotIndexCommit.getDirectory(), fileName, allIndicesMetadataF); + } catch (Exception e) { + lastException.set(e); + } finally { + latch.countDown(); + } + } + }); + } + try { + latch.await(); + } catch (InterruptedException e) { + lastException.set(e); + } + if (lastException.get() != null) { + throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to perform snapshot (index files)", lastException.get()); + } + indexTime = System.currentTimeMillis() - time; + } + + int translogNumberOfOperations = 0; + long translogTime = 0; + if (snapshot.newTranslogCreated()) { + currentTranslogPartToWrite = 1; + String translogBlobName = String.valueOf(translogSnapshot.translogId()) + "." + currentTranslogPartToWrite; + + try { + long time = System.currentTimeMillis(); + + BytesStreamOutput streamOutput = BytesStreamOutput.Cached.cached(); + streamOutput.writeInt(translogSnapshot.size()); + for (Translog.Operation operation : translogSnapshot) { + translogNumberOfOperations++; + writeTranslogOperation(streamOutput, operation); + } + + Blob blob = blobStoreContext.getBlobStore().newBlob(translogBlobName); + blob.setContentLength(streamOutput.size()); + blob.setPayload(new FastByteArrayInputStream(streamOutput.unsafeByteArray(), 0, streamOutput.size())); + blobStoreContext.getBlobStore().putBlob(shardTranslogContainer, blob); + + currentTranslogPartToWrite++; + + translogTime = System.currentTimeMillis() - time; + } catch (Exception e) { + throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to snapshot translog into [" + translogBlobName + "]", e); + } + } else if (snapshot.sameTranslogNewOperations()) { + String translogBlobName = String.valueOf(translogSnapshot.translogId()) + "." + currentTranslogPartToWrite; + try { + long time = System.currentTimeMillis(); + + BytesStreamOutput streamOutput = BytesStreamOutput.Cached.cached(); + streamOutput.writeInt(translogSnapshot.size() - snapshot.lastTranslogSize()); + for (Translog.Operation operation : translogSnapshot.skipTo(snapshot.lastTranslogSize())) { + translogNumberOfOperations++; + writeTranslogOperation(streamOutput, operation); + } + + Blob blob = blobStoreContext.getBlobStore().newBlob(translogBlobName); + blob.setContentLength(streamOutput.size()); + blob.setPayload(new FastByteArrayInputStream(streamOutput.unsafeByteArray(), 0, streamOutput.size())); + blobStoreContext.getBlobStore().putBlob(shardTranslogContainer, blob); + + currentTranslogPartToWrite++; + + translogTime = System.currentTimeMillis() - time; + } catch (Exception e) { + throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to append snapshot translog into [" + translogBlobName + "]", e); + } + } + + // now write the segments file + try { + if (indexDirty) { + indexNumberOfFiles++; + deleteFile(snapshotIndexCommit.getSegmentsFileName(), allIndicesMetadata); + indexTotalFilesSize += snapshotIndexCommit.getDirectory().fileLength(snapshotIndexCommit.getSegmentsFileName()); + long time = System.currentTimeMillis(); + + + IndexInput indexInput = snapshotIndexCommit.getDirectory().openInput(snapshotIndexCommit.getSegmentsFileName()); + try { + Blob blob = blobStoreContext.getBlobStore().newBlob(snapshotIndexCommit.getSegmentsFileName()); + InputStreamIndexInput is = new InputStreamIndexInput(indexInput, Long.MAX_VALUE); + blob.setPayload(is); + blob.setContentLength(is.actualSizeToRead()); + blobStoreContext.getBlobStore().putBlob(shardIndexContainer, blob); + } finally { + try { + indexInput.close(); + } catch (Exception e) { + // ignore + } + } + indexTime += (System.currentTimeMillis() - time); + } + } catch (Exception e) { + throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to finalize index snapshot into [" + snapshotIndexCommit.getSegmentsFileName() + "]", e); + } + + // delete the old translog + if (snapshot.newTranslogCreated()) { + String currentTranslogPrefix = String.valueOf(translogSnapshot.translogId()) + "."; + Map allMetadatas = listAllMetadatas(shardTranslogContainer); + for (Map.Entry entry : allMetadatas.entrySet()) { + if (!entry.getKey().startsWith(currentTranslogPrefix)) { + blobStoreContext.getAsyncBlobStore().removeBlob(shardTranslogContainer, entry.getKey()); + } + } + } + + if (indexDirty) { + for (Map.Entry entry : allIndicesMetadata.entrySet()) { + String blobName = entry.getKey(); + if (blobName.contains(".part")) { + blobName = blobName.substring(0, blobName.indexOf(".part")); + } + boolean found = false; + for (final String fileName : snapshotIndexCommit.getFiles()) { + if (blobName.equals(fileName)) { + found = true; + break; + } + } + if (!found) { + blobStoreContext.getAsyncBlobStore().removeBlob(shardIndexContainer, entry.getKey()); + } + } + } + + return new SnapshotStatus(new TimeValue(System.currentTimeMillis() - totalTimeStart), + new SnapshotStatus.Index(indexNumberOfFiles, new SizeValue(indexTotalFilesSize), new TimeValue(indexTime)), + new SnapshotStatus.Translog(translogNumberOfOperations, new TimeValue(translogTime))); + } + + private RecoveryStatus.Index recoverIndex() throws IndexShardGatewayRecoveryException { + final Map allMetaDatas = listAllMetadatas(shardIndexContainer); + + // filter out to only have actual files + final Map filesMetaDatas = Maps.newHashMap(); + for (Map.Entry entry : allMetaDatas.entrySet()) { + if (entry.getKey().contains(".part")) { + continue; + } + filesMetaDatas.put(entry.getKey(), entry.getValue()); + } + + final CountDownLatch latch = new CountDownLatch(filesMetaDatas.size()); + final AtomicReference lastException = new AtomicReference(); + for (final Map.Entry entry : filesMetaDatas.entrySet()) { + threadPool.execute(new Runnable() { + @Override public void run() { + try { + copyToDirectory(entry.getValue(), allMetaDatas); + } catch (Exception e) { + logger.debug("Failed to read [" + entry.getKey() + "] into [" + store + "]", e); + lastException.set(e); + } finally { + latch.countDown(); + } + } + }); + } + try { + latch.await(); + } catch (InterruptedException e) { + lastException.set(e); + } + + long totalSize = 0; + for (Map.Entry entry : allMetaDatas.entrySet()) { + totalSize += entry.getValue().getSize(); + } + + long version = -1; + try { + if (IndexReader.indexExists(store.directory())) { + version = IndexReader.getCurrentVersion(store.directory()); + } + } catch (IOException e) { + throw new IndexShardGatewayRecoveryException(shardId(), "Failed to fetch index version after copying it over", e); + } + + return new RecoveryStatus.Index(version, filesMetaDatas.size(), new SizeValue(totalSize, SizeUnit.BYTES)); + } + + private RecoveryStatus.Translog recoverTranslog() throws IndexShardGatewayRecoveryException { + final Map allMetaDatas = listAllMetadatas(shardTranslogContainer); + + long latestTranslogId = -1; + for (String name : allMetaDatas.keySet()) { + long translogId = Long.parseLong(name.substring(0, name.indexOf('.'))); + if (translogId > latestTranslogId) { + latestTranslogId = translogId; + } + } + + if (latestTranslogId == -1) { + // no recovery file found, start the shard and bail + indexShard.start(); + return new RecoveryStatus.Translog(-1, 0, new SizeValue(0, SizeUnit.BYTES)); + } + + + try { + ArrayList operations = Lists.newArrayList(); + + long size = 0; + int index = 1; + while (true) { + String translogPartName = String.valueOf(latestTranslogId) + "." + index; + if (!allMetaDatas.containsKey(translogPartName)) { + break; + } + Blob blob = blobStoreContext.getBlobStore().getBlob(shardTranslogContainer, translogPartName); + if (blob == null) { + break; + } + size += blob.getContentLength(); + InputStreamStreamInput streamInput = new InputStreamStreamInput(blob.getContent()); + int numberOfOperations = streamInput.readInt(); + for (int i = 0; i < numberOfOperations; i++) { + operations.add(readTranslogOperation(streamInput)); + } + index++; + } + currentTranslogPartToWrite = index; + + indexShard.performRecovery(operations); + return new RecoveryStatus.Translog(latestTranslogId, operations.size(), new SizeValue(size, SizeUnit.BYTES)); + } catch (Exception e) { + throw new IndexShardGatewayRecoveryException(shardId(), "Failed to perform recovery of translog", e); + } + } + + private Map listAllMetadatas(String container) { + final Map allMetaDatas = Maps.newHashMap(); + + String nextMarker = null; + while (true) { + ListContainerOptions options = ListContainerOptions.Builder.maxResults(10000); + if (nextMarker != null) { + options.afterMarker(nextMarker); + } + PageSet pageSet = blobStoreContext.getBlobStore().list(container, options); + for (StorageMetadata metadata : pageSet) { + allMetaDatas.put(metadata.getName(), metadata); + } + nextMarker = pageSet.getNextMarker(); + if (nextMarker == null) { + break; + } + } + return allMetaDatas; + } + + private void deleteFile(String fileName, Map allIndicesMetadata) { + // first, check and delete all files with this name + for (Map.Entry entry : allIndicesMetadata.entrySet()) { + String blobName = entry.getKey(); + if (blobName.contains(".part")) { + blobName = blobName.substring(0, blobName.indexOf(".part")); + } + if (blobName.equals(fileName)) { + blobStoreContext.getBlobStore().removeBlob(shardIndexContainer, blobName); + } + } + } + + + private void copyFromDirectory(Directory dir, String fileName, Map allIndicesMetadata) throws IOException { + IndexInput indexInput = dir.openInput(fileName); + + try { + Blob blob = blobStoreContext.getBlobStore().newBlob(fileName); + InputStreamIndexInput is = new InputStreamIndexInput(indexInput, chunkSize.bytes()); + blob.setPayload(is); + blob.setContentLength(is.actualSizeToRead()); + blobStoreContext.getBlobStore().putBlob(shardIndexContainer, blob); + + int part = 1; + while (indexInput.getFilePointer() < indexInput.length()) { + is = new InputStreamIndexInput(indexInput, chunkSize.bytes()); + if (is.actualSizeToRead() <= 0) { + break; + } + blob = blobStoreContext.getBlobStore().newBlob(fileName + ".part" + part); + blob.setPayload(is); + blob.setContentLength(is.actualSizeToRead()); + blobStoreContext.getBlobStore().putBlob(shardIndexContainer, blob); + part++; + } + } finally { + try { + indexInput.close(); + } catch (Exception e) { + // ignore + } + } + } + + private void copyToDirectory(StorageMetadata metadata, Map allMetadatas) throws IOException { + Blob blob = blobStoreContext.getBlobStore().getBlob(shardIndexContainer, metadata.getName()); + + byte[] buffer = new byte[16384]; + IndexOutput indexOutput = store.directory().createOutput(metadata.getName()); + + copy(blob.getContent(), indexOutput, buffer); + blob.getContent().close(); + + // check the metadatas we have + int part = 1; + while (true) { + String partName = metadata.getName() + ".part" + part; + if (!allMetadatas.containsKey(partName)) { + break; + } + blob = blobStoreContext.getBlobStore().getBlob(shardIndexContainer, partName); + copy(blob.getContent(), indexOutput, buffer); + blob.getContent().close(); + part++; + } + + indexOutput.close(); + + Directories.sync(store.directory(), metadata.getName()); + } + + private void copy(InputStream is, IndexOutput indexOutput, byte[] buffer) throws IOException { + int len; + while ((len = is.read(buffer)) != -1) { + indexOutput.writeBytes(buffer, len); + } + } +}