Cloud Plugin: Gateway should store meta data and indices under the same container, closes #180.

This commit is contained in:
kimchy 2010-05-18 23:38:56 +03:00
parent 6185e439c7
commit c7075c1600
6 changed files with 92 additions and 80 deletions

View File

@ -23,6 +23,7 @@
<w>coord</w> <w>coord</w>
<w>cpus</w> <w>cpus</w>
<w>datagram</w> <w>datagram</w>
<w>datas</w>
<w>desc</w> <w>desc</w>
<w>deserialize</w> <w>deserialize</w>
<w>elasticsearch</w> <w>elasticsearch</w>

View File

@ -20,7 +20,6 @@
<root url="jar://$GRADLE_REPOSITORY$/javax.annotation/jsr250-api/jars/jsr250-api-1.0.jar!/" /> <root url="jar://$GRADLE_REPOSITORY$/javax.annotation/jsr250-api/jars/jsr250-api-1.0.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/javax.inject/inject/jars/inject-1.0.jar!/" /> <root url="jar://$GRADLE_REPOSITORY$/javax.inject/inject/jars/inject-1.0.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/aopalliance/aopalliance/jars/aopalliance-1.0.jar!/" /> <root url="jar://$GRADLE_REPOSITORY$/aopalliance/aopalliance/jars/aopalliance-1.0.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/org.jboss.resteasy/resteasy-jaxrs-client/jars/resteasy-jaxrs-client-1.2.1.GA-SNAPSHOT.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/org.jboss.resteasy/jaxrs-api/jars/jaxrs-api-1.2.1.GA.jar!/" /> <root url="jar://$GRADLE_REPOSITORY$/org.jboss.resteasy/jaxrs-api/jars/jaxrs-api-1.2.1.GA.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/com.google.guava/guava-io/jars/guava-io-r03.jar!/" /> <root url="jar://$GRADLE_REPOSITORY$/com.google.guava/guava-io/jars/guava-io-r03.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/com.google.guava/guava-primitives/jars/guava-primitives-r03.jar!/" /> <root url="jar://$GRADLE_REPOSITORY$/com.google.guava/guava-primitives/jars/guava-primitives-r03.jar!/" />
@ -44,6 +43,8 @@
<root url="jar://$GRADLE_REPOSITORY$/org.jclouds/jclouds-scriptbuilder/jars/jclouds-scriptbuilder-1.0-beta-5.jar!/" /> <root url="jar://$GRADLE_REPOSITORY$/org.jclouds/jclouds-scriptbuilder/jars/jclouds-scriptbuilder-1.0-beta-5.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/org.jclouds/jclouds-vcloud/jars/jclouds-vcloud-1.0-beta-5.jar!/" /> <root url="jar://$GRADLE_REPOSITORY$/org.jclouds/jclouds-vcloud/jars/jclouds-vcloud-1.0-beta-5.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/org.jclouds/jclouds-rackspace/jars/jclouds-rackspace-1.0-beta-5.jar!/" /> <root url="jar://$GRADLE_REPOSITORY$/org.jclouds/jclouds-rackspace/jars/jclouds-rackspace-1.0-beta-5.jar!/" />
<root url="file://$GRADLE_REPOSITORY$/org.jboss.resteasy/resteasy-jaxrs-all" />
<root url="jar://$GRADLE_REPOSITORY$/org.jboss.resteasy/resteasy-jaxrs-client/jars/resteasy-jaxrs-client-1.2.1.GA.jar!/" />
</CLASSES> </CLASSES>
<JAVADOC> <JAVADOC>
<root url="http://jclouds.rimuhosting.com/apidocs/" /> <root url="http://jclouds.rimuhosting.com/apidocs/" />

View File

@ -29,8 +29,6 @@ import org.elasticsearch.util.settings.Settings;
*/ */
public class JCloudsUtils { public class JCloudsUtils {
public static final String BLOB_CONTAINER_SEP = "-";
public static Iterable<? extends Module> buildModules(Settings settings) { public static Iterable<? extends Module> buildModules(Settings settings) {
return ImmutableList.of(new JCloudsLoggingModule(settings)); return ImmutableList.of(new JCloudsLoggingModule(settings));
} }

View File

@ -22,7 +22,6 @@ package org.elasticsearch.gateway.cloud;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.cloud.blobstore.CloudBlobStoreService; import org.elasticsearch.cloud.blobstore.CloudBlobStoreService;
import org.elasticsearch.cloud.jclouds.JCloudsUtils;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.gateway.Gateway; import org.elasticsearch.gateway.Gateway;
@ -48,6 +47,8 @@ import org.jclouds.domain.Location;
import java.io.IOException; import java.io.IOException;
import java.util.Set; import java.util.Set;
import static org.jclouds.blobstore.options.ListContainerOptions.Builder.*;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
@ -60,9 +61,9 @@ public class CloudGateway extends AbstractLifecycleComponent<Gateway> implements
private final Location location; private final Location location;
private final SizeValue chunkSize; private final String metaDataDirectory;
private final String metadataContainer; private final SizeValue chunkSize;
private volatile int currentIndex; private volatile int currentIndex;
@ -90,17 +91,13 @@ public class CloudGateway extends AbstractLifecycleComponent<Gateway> implements
} }
} }
String container = componentSettings.get("container"); this.container = componentSettings.get("container");
if (container == null) { if (container == null) {
throw new ElasticSearchIllegalArgumentException("Cloud gateway requires 'container' setting"); throw new ElasticSearchIllegalArgumentException("Cloud gateway requires 'container' setting");
} }
this.container = container + JCloudsUtils.BLOB_CONTAINER_SEP + clusterName.value(); this.metaDataDirectory = clusterName.value() + "/metadata";
logger.debug("Using location [{}], container [{}], metadata_directory [{}]", this.location, this.container, metaDataDirectory);
this.metadataContainer = this.container + JCloudsUtils.BLOB_CONTAINER_SEP + "metadata"; blobStoreContext.getBlobStore().createContainerInLocation(this.location, container);
logger.debug("Using location [{}], container [{}], metadata_container [{}]", this.location, this.container, metadataContainer);
blobStoreContext.getBlobStore().createContainerInLocation(this.location, metadataContainer);
this.currentIndex = findLatestIndex(); this.currentIndex = findLatestIndex();
logger.debug("Latest metadata found at index [" + currentIndex + "]"); logger.debug("Latest metadata found at index [" + currentIndex + "]");
@ -129,7 +126,7 @@ public class CloudGateway extends AbstractLifecycleComponent<Gateway> implements
@Override public void write(MetaData metaData) throws GatewayException { @Override public void write(MetaData metaData) throws GatewayException {
try { try {
String name = "metadata-" + (currentIndex + 1); String name = metaDataDirectory + "/metadata-" + (currentIndex + 1);
BinaryXContentBuilder builder = XContentFactory.contentBinaryBuilder(XContentType.JSON); BinaryXContentBuilder builder = XContentFactory.contentBinaryBuilder(XContentType.JSON);
builder.prettyPrint(); builder.prettyPrint();
@ -141,14 +138,14 @@ public class CloudGateway extends AbstractLifecycleComponent<Gateway> implements
blob.setPayload(new FastByteArrayInputStream(builder.unsafeBytes(), 0, builder.unsafeBytesLength())); blob.setPayload(new FastByteArrayInputStream(builder.unsafeBytes(), 0, builder.unsafeBytesLength()));
blob.setContentLength(builder.unsafeBytesLength()); blob.setContentLength(builder.unsafeBytesLength());
blobStoreContext.getBlobStore().putBlob(metadataContainer, blob); blobStoreContext.getBlobStore().putBlob(container, blob);
currentIndex++; currentIndex++;
PageSet<? extends StorageMetadata> pageSet = blobStoreContext.getBlobStore().list(metadataContainer); PageSet<? extends StorageMetadata> pageSet = blobStoreContext.getBlobStore().list(container, inDirectory(metaDataDirectory));
for (StorageMetadata storageMetadata : pageSet) { for (StorageMetadata storageMetadata : pageSet) {
if (storageMetadata.getName().startsWith("metadata-") && !name.equals(storageMetadata.getName())) { if (storageMetadata.getName().contains("metadata-") && !name.equals(storageMetadata.getName())) {
blobStoreContext.getAsyncBlobStore().removeBlob(metadataContainer, storageMetadata.getName()); blobStoreContext.getAsyncBlobStore().removeBlob(container, storageMetadata.getName());
} }
} }
} catch (IOException e) { } catch (IOException e) {
@ -161,7 +158,7 @@ public class CloudGateway extends AbstractLifecycleComponent<Gateway> implements
if (currentIndex == -1) if (currentIndex == -1)
return null; return null;
return readMetaData("metadata-" + currentIndex); return readMetaData(metaDataDirectory + "/metadata-" + currentIndex);
} catch (GatewayException e) { } catch (GatewayException e) {
throw e; throw e;
} catch (Exception e) { } catch (Exception e) {
@ -174,10 +171,10 @@ public class CloudGateway extends AbstractLifecycleComponent<Gateway> implements
} }
@Override public void reset() { @Override public void reset() {
PageSet<? extends StorageMetadata> pageSet = blobStoreContext.getBlobStore().list(metadataContainer); PageSet<? extends StorageMetadata> pageSet = blobStoreContext.getBlobStore().list(container, inDirectory(metaDataDirectory));
for (StorageMetadata storageMetadata : pageSet) { for (StorageMetadata storageMetadata : pageSet) {
if (storageMetadata.getName().startsWith("metadata-")) { if (storageMetadata.getName().contains("metadata-")) {
blobStoreContext.getBlobStore().removeBlob(metadataContainer, storageMetadata.getName()); blobStoreContext.getBlobStore().removeBlob(container, storageMetadata.getName());
} }
} }
currentIndex = -1; currentIndex = -1;
@ -185,15 +182,15 @@ public class CloudGateway extends AbstractLifecycleComponent<Gateway> implements
private int findLatestIndex() { private int findLatestIndex() {
int index = -1; int index = -1;
PageSet<? extends StorageMetadata> pageSet = blobStoreContext.getBlobStore().list(metadataContainer); PageSet<? extends StorageMetadata> pageSet = blobStoreContext.getBlobStore().list(container, inDirectory(metaDataDirectory).maxResults(1000));
for (StorageMetadata storageMetadata : pageSet) { for (StorageMetadata storageMetadata : pageSet) {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("[findLatestMetadata]: Processing blob [" + storageMetadata.getName() + "]"); logger.trace("[findLatestMetadata]: Processing blob [" + storageMetadata.getName() + "]");
} }
if (!storageMetadata.getName().startsWith("metadata-")) { if (!storageMetadata.getName().contains("metadata-")) {
continue; continue;
} }
int fileIndex = Integer.parseInt(storageMetadata.getName().substring(storageMetadata.getName().indexOf('-') + 1)); int fileIndex = Integer.parseInt(storageMetadata.getName().substring(storageMetadata.getName().lastIndexOf('-') + 1));
if (fileIndex >= index) { if (fileIndex >= index) {
// try and read the meta data // try and read the meta data
try { try {
@ -210,7 +207,7 @@ public class CloudGateway extends AbstractLifecycleComponent<Gateway> implements
private MetaData readMetaData(String name) throws IOException { private MetaData readMetaData(String name) throws IOException {
XContentParser parser = null; XContentParser parser = null;
try { try {
Blob blob = blobStoreContext.getBlobStore().getBlob(metadataContainer, name); Blob blob = blobStoreContext.getBlobStore().getBlob(container, name);
parser = XContentFactory.xContent(XContentType.JSON).createParser(blob.getContent()); parser = XContentFactory.xContent(XContentType.JSON).createParser(blob.getContent());
return MetaData.Builder.fromXContent(parser, settings); return MetaData.Builder.fromXContent(parser, settings);
} finally { } finally {

View File

@ -22,7 +22,7 @@ package org.elasticsearch.index.gateway.cloud;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.cloud.blobstore.CloudBlobStoreService; import org.elasticsearch.cloud.blobstore.CloudBlobStoreService;
import org.elasticsearch.cloud.jclouds.JCloudsUtils; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.gateway.Gateway; import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.gateway.cloud.CloudGateway; import org.elasticsearch.gateway.cloud.CloudGateway;
import org.elasticsearch.index.AbstractIndexComponent; import org.elasticsearch.index.AbstractIndexComponent;
@ -48,13 +48,15 @@ public class CloudIndexGateway extends AbstractIndexComponent implements IndexGa
private final String indexContainer; private final String indexContainer;
private final String indexDirectory;
private final Location location; private final Location location;
private final SizeValue chunkSize; private final SizeValue chunkSize;
private final BlobStoreContext blobStoreContext; private final BlobStoreContext blobStoreContext;
@Inject public CloudIndexGateway(Index index, @IndexSettings Settings indexSettings, CloudBlobStoreService blobStoreService, Gateway gateway) { @Inject public CloudIndexGateway(Index index, @IndexSettings Settings indexSettings, ClusterName clusterName, CloudBlobStoreService blobStoreService, Gateway gateway) {
super(index, indexSettings); super(index, indexSettings);
this.blobStoreContext = blobStoreService.context(); this.blobStoreContext = blobStoreService.context();
this.gateway = gateway; this.gateway = gateway;
@ -66,7 +68,7 @@ public class CloudIndexGateway extends AbstractIndexComponent implements IndexGa
if (gateway instanceof CloudGateway) { if (gateway instanceof CloudGateway) {
CloudGateway cloudGateway = (CloudGateway) gateway; CloudGateway cloudGateway = (CloudGateway) gateway;
if (container == null) { if (container == null) {
container = cloudGateway.container() + JCloudsUtils.BLOB_CONTAINER_SEP + index.name(); container = cloudGateway.container();
} }
if (chunkSize == null) { if (chunkSize == null) {
chunkSize = cloudGateway.chunkSize(); chunkSize = cloudGateway.chunkSize();
@ -99,11 +101,10 @@ public class CloudIndexGateway extends AbstractIndexComponent implements IndexGa
} }
} }
this.indexContainer = container; this.indexContainer = container;
this.indexDirectory = clusterName.value() + "/" + index.name();
this.chunkSize = chunkSize; this.chunkSize = chunkSize;
logger.debug("Using location [{}], container [{}], chunk_size [{}]", this.location, this.indexContainer, this.chunkSize); logger.debug("Using location [{}], container [{}], index_directory [{}], chunk_size [{}]", this.location, this.indexContainer, this.indexDirectory, this.chunkSize);
// blobStoreContext.getBlobStore().createContainerInLocation(this.location, this.indexContainer);
} }
public Location indexLocation() { public Location indexLocation() {
@ -114,6 +115,10 @@ public class CloudIndexGateway extends AbstractIndexComponent implements IndexGa
return this.indexContainer; return this.indexContainer;
} }
public String indexDirectory() {
return this.indexDirectory;
}
public SizeValue chunkSize() { public SizeValue chunkSize() {
return this.chunkSize; return this.chunkSize;
} }
@ -126,6 +131,5 @@ public class CloudIndexGateway extends AbstractIndexComponent implements IndexGa
if (!delete) { if (!delete) {
return; return;
} }
// blobStoreContext.getBlobStore().deleteContainer(indexContainer);
} }
} }

View File

@ -24,7 +24,6 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.cloud.blobstore.CloudBlobStoreService; import org.elasticsearch.cloud.blobstore.CloudBlobStoreService;
import org.elasticsearch.cloud.jclouds.JCloudsUtils;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.gateway.IndexShardGateway; import org.elasticsearch.index.gateway.IndexShardGateway;
import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException; import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
@ -82,11 +81,13 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
private final Location shardLocation; private final Location shardLocation;
private final String shardContainer; private final String container;
private final String shardIndexContainer; private final String shardDirectory;
private final String shardTranslogContainer; private final String shardIndexDirectory;
private final String shardTranslogDirectory;
private final BlobStoreContext blobStoreContext; private final BlobStoreContext blobStoreContext;
@ -105,15 +106,13 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
this.chunkSize = cloudIndexGateway.chunkSize(); this.chunkSize = cloudIndexGateway.chunkSize();
this.shardLocation = cloudIndexGateway.indexLocation(); this.shardLocation = cloudIndexGateway.indexLocation();
this.shardContainer = cloudIndexGateway.indexContainer() + JCloudsUtils.BLOB_CONTAINER_SEP + shardId.id(); this.container = cloudIndexGateway.indexContainer();
this.shardIndexContainer = shardContainer + JCloudsUtils.BLOB_CONTAINER_SEP + "index"; this.shardDirectory = cloudIndexGateway.indexDirectory() + "/" + shardId.id();
this.shardTranslogContainer = shardContainer + JCloudsUtils.BLOB_CONTAINER_SEP + "translog"; this.shardIndexDirectory = shardDirectory + "/index";
this.shardTranslogDirectory = shardDirectory + "/translog";
logger.trace("Using location [{}], container [{}]", this.shardLocation, this.shardContainer); logger.trace("Using location [{}], container [{}], shard_directory [{}]", this.shardLocation, this.container, this.shardDirectory);
blobStoreContext.getBlobStore().createContainerInLocation(this.shardLocation, this.shardTranslogContainer);
blobStoreContext.getBlobStore().createContainerInLocation(this.shardLocation, this.shardIndexContainer);
} }
@Override public boolean requiresSnapshotScheduling() { @Override public boolean requiresSnapshotScheduling() {
@ -125,7 +124,7 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
if (shardLocation != null) { if (shardLocation != null) {
sb.append(shardLocation).append("/"); sb.append(shardLocation).append("/");
} }
sb.append(shardContainer).append("]"); sb.append(container).append("]");
return sb.toString(); return sb.toString();
} }
@ -133,8 +132,15 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
if (!delete) { if (!delete) {
return; return;
} }
blobStoreContext.getBlobStore().deleteContainer(shardIndexContainer);
blobStoreContext.getBlobStore().deleteContainer(shardTranslogContainer); Map<String, StorageMetadata> metaDatas = listAllMetadatas(container, shardIndexDirectory);
for (Map.Entry<String, StorageMetadata> entry : metaDatas.entrySet()) {
blobStoreContext.getAsyncBlobStore().removeBlob(container, entry.getKey());
}
metaDatas = listAllMetadatas(container, shardTranslogDirectory);
for (Map.Entry<String, StorageMetadata> entry : metaDatas.entrySet()) {
blobStoreContext.getAsyncBlobStore().removeBlob(container, entry.getKey());
}
} }
@Override public RecoveryStatus recover() throws IndexShardGatewayRecoveryException { @Override public RecoveryStatus recover() throws IndexShardGatewayRecoveryException {
@ -158,7 +164,7 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
if (snapshot.indexChanged()) { if (snapshot.indexChanged()) {
long time = System.currentTimeMillis(); long time = System.currentTimeMillis();
indexDirty = true; indexDirty = true;
allIndicesMetadata = listAllMetadatas(shardIndexContainer); allIndicesMetadata = listAllMetadatas(container, shardIndexDirectory);
final Map<String, StorageMetadata> allIndicesMetadataF = allIndicesMetadata; final Map<String, StorageMetadata> allIndicesMetadataF = allIndicesMetadata;
// snapshot into the index // snapshot into the index
final CountDownLatch latch = new CountDownLatch(snapshotIndexCommit.getFiles().length); final CountDownLatch latch = new CountDownLatch(snapshotIndexCommit.getFiles().length);
@ -196,7 +202,7 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
} catch (IOException e) { } catch (IOException e) {
// ignore... // ignore...
} }
deleteFile(fileName, allIndicesMetadata); deleteFile(shardIndexDirectory + "/" + fileName, allIndicesMetadata);
threadPool.execute(new Runnable() { threadPool.execute(new Runnable() {
@Override public void run() { @Override public void run() {
try { try {
@ -224,7 +230,7 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
long translogTime = 0; long translogTime = 0;
if (snapshot.newTranslogCreated()) { if (snapshot.newTranslogCreated()) {
currentTranslogPartToWrite = 1; currentTranslogPartToWrite = 1;
String translogBlobName = String.valueOf(translogSnapshot.translogId()) + "." + currentTranslogPartToWrite; String translogBlobName = shardTranslogDirectory + "/" + String.valueOf(translogSnapshot.translogId()) + "." + currentTranslogPartToWrite;
try { try {
long time = System.currentTimeMillis(); long time = System.currentTimeMillis();
@ -239,7 +245,7 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
Blob blob = blobStoreContext.getBlobStore().newBlob(translogBlobName); Blob blob = blobStoreContext.getBlobStore().newBlob(translogBlobName);
blob.setContentLength(streamOutput.size()); blob.setContentLength(streamOutput.size());
blob.setPayload(new FastByteArrayInputStream(streamOutput.unsafeByteArray(), 0, streamOutput.size())); blob.setPayload(new FastByteArrayInputStream(streamOutput.unsafeByteArray(), 0, streamOutput.size()));
blobStoreContext.getBlobStore().putBlob(shardTranslogContainer, blob); blobStoreContext.getBlobStore().putBlob(container, blob);
currentTranslogPartToWrite++; currentTranslogPartToWrite++;
@ -248,7 +254,7 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to snapshot translog into [" + translogBlobName + "]", e); throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to snapshot translog into [" + translogBlobName + "]", e);
} }
} else if (snapshot.sameTranslogNewOperations()) { } else if (snapshot.sameTranslogNewOperations()) {
String translogBlobName = String.valueOf(translogSnapshot.translogId()) + "." + currentTranslogPartToWrite; String translogBlobName = shardTranslogDirectory + "/" + String.valueOf(translogSnapshot.translogId()) + "." + currentTranslogPartToWrite;
try { try {
long time = System.currentTimeMillis(); long time = System.currentTimeMillis();
@ -262,7 +268,7 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
Blob blob = blobStoreContext.getBlobStore().newBlob(translogBlobName); Blob blob = blobStoreContext.getBlobStore().newBlob(translogBlobName);
blob.setContentLength(streamOutput.size()); blob.setContentLength(streamOutput.size());
blob.setPayload(new FastByteArrayInputStream(streamOutput.unsafeByteArray(), 0, streamOutput.size())); blob.setPayload(new FastByteArrayInputStream(streamOutput.unsafeByteArray(), 0, streamOutput.size()));
blobStoreContext.getBlobStore().putBlob(shardTranslogContainer, blob); blobStoreContext.getBlobStore().putBlob(container, blob);
currentTranslogPartToWrite++; currentTranslogPartToWrite++;
@ -283,11 +289,11 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
IndexInput indexInput = snapshotIndexCommit.getDirectory().openInput(snapshotIndexCommit.getSegmentsFileName()); IndexInput indexInput = snapshotIndexCommit.getDirectory().openInput(snapshotIndexCommit.getSegmentsFileName());
try { try {
Blob blob = blobStoreContext.getBlobStore().newBlob(snapshotIndexCommit.getSegmentsFileName()); Blob blob = blobStoreContext.getBlobStore().newBlob(shardIndexDirectory + "/" + snapshotIndexCommit.getSegmentsFileName());
InputStreamIndexInput is = new InputStreamIndexInput(indexInput, Long.MAX_VALUE); InputStreamIndexInput is = new InputStreamIndexInput(indexInput, Long.MAX_VALUE);
blob.setPayload(is); blob.setPayload(is);
blob.setContentLength(is.actualSizeToRead()); blob.setContentLength(is.actualSizeToRead());
blobStoreContext.getBlobStore().putBlob(shardIndexContainer, blob); blobStoreContext.getBlobStore().putBlob(container, blob);
} finally { } finally {
try { try {
indexInput.close(); indexInput.close();
@ -303,30 +309,32 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
// delete the old translog // delete the old translog
if (snapshot.newTranslogCreated()) { if (snapshot.newTranslogCreated()) {
String currentTranslogPrefix = String.valueOf(translogSnapshot.translogId()) + "."; String currentTranslogPrefix = shardTranslogDirectory + "/" + String.valueOf(translogSnapshot.translogId()) + ".";
Map<String, StorageMetadata> allMetadatas = listAllMetadatas(shardTranslogContainer); Map<String, StorageMetadata> allMetadatas = listAllMetadatas(container, shardTranslogDirectory);
for (Map.Entry<String, StorageMetadata> entry : allMetadatas.entrySet()) { for (Map.Entry<String, StorageMetadata> entry : allMetadatas.entrySet()) {
if (!entry.getKey().startsWith(currentTranslogPrefix)) { if (!entry.getKey().startsWith(currentTranslogPrefix)) {
blobStoreContext.getAsyncBlobStore().removeBlob(shardTranslogContainer, entry.getKey()); blobStoreContext.getAsyncBlobStore().removeBlob(container, entry.getKey());
} }
} }
} }
if (indexDirty) { if (indexDirty) {
for (Map.Entry<String, StorageMetadata> entry : allIndicesMetadata.entrySet()) { for (Map.Entry<String, StorageMetadata> entry : allIndicesMetadata.entrySet()) {
String blobName = entry.getKey(); String blobNameToMatch = entry.getKey();
if (blobName.contains(".part")) { if (blobNameToMatch.contains(".part")) {
blobName = blobName.substring(0, blobName.indexOf(".part")); blobNameToMatch = blobNameToMatch.substring(0, blobNameToMatch.indexOf(".part"));
} }
// remove the directory prefix
blobNameToMatch = blobNameToMatch.substring(shardIndexDirectory.length() + 1);
boolean found = false; boolean found = false;
for (final String fileName : snapshotIndexCommit.getFiles()) { for (final String fileName : snapshotIndexCommit.getFiles()) {
if (blobName.equals(fileName)) { if (blobNameToMatch.equals(fileName)) {
found = true; found = true;
break; break;
} }
} }
if (!found) { if (!found) {
blobStoreContext.getAsyncBlobStore().removeBlob(shardIndexContainer, entry.getKey()); blobStoreContext.getAsyncBlobStore().removeBlob(container, entry.getKey());
} }
} }
} }
@ -337,7 +345,7 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
} }
private RecoveryStatus.Index recoverIndex() throws IndexShardGatewayRecoveryException { private RecoveryStatus.Index recoverIndex() throws IndexShardGatewayRecoveryException {
final Map<String, StorageMetadata> allMetaDatas = listAllMetadatas(shardIndexContainer); final Map<String, StorageMetadata> allMetaDatas = listAllMetadatas(container, shardIndexDirectory);
// filter out to only have actual files // filter out to only have actual files
final Map<String, StorageMetadata> filesMetaDatas = Maps.newHashMap(); final Map<String, StorageMetadata> filesMetaDatas = Maps.newHashMap();
@ -395,11 +403,12 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
} }
private RecoveryStatus.Translog recoverTranslog() throws IndexShardGatewayRecoveryException { private RecoveryStatus.Translog recoverTranslog() throws IndexShardGatewayRecoveryException {
final Map<String, StorageMetadata> allMetaDatas = listAllMetadatas(shardTranslogContainer); final Map<String, StorageMetadata> allMetaDatas = listAllMetadatas(container, shardTranslogDirectory);
long latestTranslogId = -1; long latestTranslogId = -1;
for (String name : allMetaDatas.keySet()) { for (String name : allMetaDatas.keySet()) {
long translogId = Long.parseLong(name.substring(0, name.indexOf('.'))); String translogName = name.substring(shardTranslogDirectory.length() + 1);
long translogId = Long.parseLong(translogName.substring(0, translogName.lastIndexOf('.')));
if (translogId > latestTranslogId) { if (translogId > latestTranslogId) {
latestTranslogId = translogId; latestTranslogId = translogId;
} }
@ -418,11 +427,11 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
long size = 0; long size = 0;
int index = 1; int index = 1;
while (true) { while (true) {
String translogPartName = String.valueOf(latestTranslogId) + "." + index; String translogPartName = shardTranslogDirectory + "/" + String.valueOf(latestTranslogId) + "." + index;
if (!allMetaDatas.containsKey(translogPartName)) { if (!allMetaDatas.containsKey(translogPartName)) {
break; break;
} }
Blob blob = blobStoreContext.getBlobStore().getBlob(shardTranslogContainer, translogPartName); Blob blob = blobStoreContext.getBlobStore().getBlob(container, translogPartName);
if (blob == null) { if (blob == null) {
break; break;
} }
@ -443,12 +452,12 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
} }
} }
private Map<String, StorageMetadata> listAllMetadatas(String container) { private Map<String, StorageMetadata> listAllMetadatas(String container, String directory) {
final Map<String, StorageMetadata> allMetaDatas = Maps.newHashMap(); final Map<String, StorageMetadata> allMetaDatas = Maps.newHashMap();
String nextMarker = null; String nextMarker = null;
while (true) { while (true) {
ListContainerOptions options = ListContainerOptions.Builder.maxResults(10000); ListContainerOptions options = ListContainerOptions.Builder.inDirectory(directory).maxResults(10000);
if (nextMarker != null) { if (nextMarker != null) {
options.afterMarker(nextMarker); options.afterMarker(nextMarker);
} }
@ -472,7 +481,7 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
blobName = blobName.substring(0, blobName.indexOf(".part")); blobName = blobName.substring(0, blobName.indexOf(".part"));
} }
if (blobName.equals(fileName)) { if (blobName.equals(fileName)) {
blobStoreContext.getBlobStore().removeBlob(shardIndexContainer, blobName); blobStoreContext.getBlobStore().removeBlob(container, blobName);
} }
} }
} }
@ -482,11 +491,11 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
IndexInput indexInput = dir.openInput(fileName); IndexInput indexInput = dir.openInput(fileName);
try { try {
Blob blob = blobStoreContext.getBlobStore().newBlob(fileName); Blob blob = blobStoreContext.getBlobStore().newBlob(shardIndexDirectory + "/" + fileName);
InputStreamIndexInput is = new InputStreamIndexInput(indexInput, chunkSize.bytes()); InputStreamIndexInput is = new InputStreamIndexInput(indexInput, chunkSize.bytes());
blob.setPayload(is); blob.setPayload(is);
blob.setContentLength(is.actualSizeToRead()); blob.setContentLength(is.actualSizeToRead());
blobStoreContext.getBlobStore().putBlob(shardIndexContainer, blob); blobStoreContext.getBlobStore().putBlob(container, blob);
int part = 1; int part = 1;
while (indexInput.getFilePointer() < indexInput.length()) { while (indexInput.getFilePointer() < indexInput.length()) {
@ -494,10 +503,10 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
if (is.actualSizeToRead() <= 0) { if (is.actualSizeToRead() <= 0) {
break; break;
} }
blob = blobStoreContext.getBlobStore().newBlob(fileName + ".part" + part); blob = blobStoreContext.getBlobStore().newBlob(shardIndexDirectory + "/" + fileName + ".part" + part);
blob.setPayload(is); blob.setPayload(is);
blob.setContentLength(is.actualSizeToRead()); blob.setContentLength(is.actualSizeToRead());
blobStoreContext.getBlobStore().putBlob(shardIndexContainer, blob); blobStoreContext.getBlobStore().putBlob(container, blob);
part++; part++;
} }
} finally { } finally {
@ -510,10 +519,12 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
} }
private void copyToDirectory(StorageMetadata metadata, Map<String, StorageMetadata> allMetadatas) throws IOException { private void copyToDirectory(StorageMetadata metadata, Map<String, StorageMetadata> allMetadatas) throws IOException {
Blob blob = blobStoreContext.getBlobStore().getBlob(shardIndexContainer, metadata.getName()); String fileName = metadata.getName().substring(shardIndexDirectory.length() + 1);
Blob blob = blobStoreContext.getBlobStore().getBlob(container, metadata.getName());
byte[] buffer = new byte[16384]; byte[] buffer = new byte[16384];
IndexOutput indexOutput = store.directory().createOutput(metadata.getName()); IndexOutput indexOutput = store.directory().createOutput(fileName);
copy(blob.getContent(), indexOutput, buffer); copy(blob.getContent(), indexOutput, buffer);
blob.getContent().close(); blob.getContent().close();
@ -525,7 +536,7 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
if (!allMetadatas.containsKey(partName)) { if (!allMetadatas.containsKey(partName)) {
break; break;
} }
blob = blobStoreContext.getBlobStore().getBlob(shardIndexContainer, partName); blob = blobStoreContext.getBlobStore().getBlob(container, partName);
copy(blob.getContent(), indexOutput, buffer); copy(blob.getContent(), indexOutput, buffer);
blob.getContent().close(); blob.getContent().close();
part++; part++;
@ -533,7 +544,7 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
indexOutput.close(); indexOutput.close();
Directories.sync(store.directory(), metadata.getName()); Directories.sync(store.directory(), fileName);
} }
private void copy(InputStream is, IndexOutput indexOutput, byte[] buffer) throws IOException { private void copy(InputStream is, IndexOutput indexOutput, byte[] buffer) throws IOException {