diff --git a/.idea/dictionaries/kimchy.xml b/.idea/dictionaries/kimchy.xml
index 483234f4acd..8fb390aa125 100644
--- a/.idea/dictionaries/kimchy.xml
+++ b/.idea/dictionaries/kimchy.xml
@@ -23,6 +23,7 @@
coord
cpus
datagram
+ datas
desc
deserialize
elasticsearch
diff --git a/.idea/modules/plugins-cloud.iml b/.idea/modules/plugins-cloud.iml
index d49330881c1..89cf607380b 100644
--- a/.idea/modules/plugins-cloud.iml
+++ b/.idea/modules/plugins-cloud.iml
@@ -20,7 +20,6 @@
-
@@ -44,6 +43,8 @@
+
+
diff --git a/plugins/cloud/src/main/java/org/elasticsearch/cloud/jclouds/JCloudsUtils.java b/plugins/cloud/src/main/java/org/elasticsearch/cloud/jclouds/JCloudsUtils.java
index 817d4d9eecb..9dca4bdf219 100644
--- a/plugins/cloud/src/main/java/org/elasticsearch/cloud/jclouds/JCloudsUtils.java
+++ b/plugins/cloud/src/main/java/org/elasticsearch/cloud/jclouds/JCloudsUtils.java
@@ -29,8 +29,6 @@ import org.elasticsearch.util.settings.Settings;
*/
public class JCloudsUtils {
- public static final String BLOB_CONTAINER_SEP = "-";
-
public static Iterable extends Module> buildModules(Settings settings) {
return ImmutableList.of(new JCloudsLoggingModule(settings));
}
diff --git a/plugins/cloud/src/main/java/org/elasticsearch/gateway/cloud/CloudGateway.java b/plugins/cloud/src/main/java/org/elasticsearch/gateway/cloud/CloudGateway.java
index 084f21e2e8a..c7a3fc30d73 100644
--- a/plugins/cloud/src/main/java/org/elasticsearch/gateway/cloud/CloudGateway.java
+++ b/plugins/cloud/src/main/java/org/elasticsearch/gateway/cloud/CloudGateway.java
@@ -22,7 +22,6 @@ package org.elasticsearch.gateway.cloud;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.cloud.blobstore.CloudBlobStoreService;
-import org.elasticsearch.cloud.jclouds.JCloudsUtils;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.gateway.Gateway;
@@ -48,6 +47,8 @@ import org.jclouds.domain.Location;
import java.io.IOException;
import java.util.Set;
+import static org.jclouds.blobstore.options.ListContainerOptions.Builder.*;
+
/**
* @author kimchy (shay.banon)
*/
@@ -60,9 +61,9 @@ public class CloudGateway extends AbstractLifecycleComponent implements
private final Location location;
- private final SizeValue chunkSize;
+ private final String metaDataDirectory;
- private final String metadataContainer;
+ private final SizeValue chunkSize;
private volatile int currentIndex;
@@ -90,17 +91,13 @@ public class CloudGateway extends AbstractLifecycleComponent implements
}
}
- String container = componentSettings.get("container");
+ this.container = componentSettings.get("container");
if (container == null) {
throw new ElasticSearchIllegalArgumentException("Cloud gateway requires 'container' setting");
}
- this.container = container + JCloudsUtils.BLOB_CONTAINER_SEP + clusterName.value();
-
- this.metadataContainer = this.container + JCloudsUtils.BLOB_CONTAINER_SEP + "metadata";
-
- logger.debug("Using location [{}], container [{}], metadata_container [{}]", this.location, this.container, metadataContainer);
-
- blobStoreContext.getBlobStore().createContainerInLocation(this.location, metadataContainer);
+ this.metaDataDirectory = clusterName.value() + "/metadata";
+ logger.debug("Using location [{}], container [{}], metadata_directory [{}]", this.location, this.container, metaDataDirectory);
+ blobStoreContext.getBlobStore().createContainerInLocation(this.location, container);
this.currentIndex = findLatestIndex();
logger.debug("Latest metadata found at index [" + currentIndex + "]");
@@ -129,7 +126,7 @@ public class CloudGateway extends AbstractLifecycleComponent implements
@Override public void write(MetaData metaData) throws GatewayException {
try {
- String name = "metadata-" + (currentIndex + 1);
+ String name = metaDataDirectory + "/metadata-" + (currentIndex + 1);
BinaryXContentBuilder builder = XContentFactory.contentBinaryBuilder(XContentType.JSON);
builder.prettyPrint();
@@ -141,14 +138,14 @@ public class CloudGateway extends AbstractLifecycleComponent implements
blob.setPayload(new FastByteArrayInputStream(builder.unsafeBytes(), 0, builder.unsafeBytesLength()));
blob.setContentLength(builder.unsafeBytesLength());
- blobStoreContext.getBlobStore().putBlob(metadataContainer, blob);
+ blobStoreContext.getBlobStore().putBlob(container, blob);
currentIndex++;
- PageSet extends StorageMetadata> pageSet = blobStoreContext.getBlobStore().list(metadataContainer);
+ PageSet extends StorageMetadata> pageSet = blobStoreContext.getBlobStore().list(container, inDirectory(metaDataDirectory));
for (StorageMetadata storageMetadata : pageSet) {
- if (storageMetadata.getName().startsWith("metadata-") && !name.equals(storageMetadata.getName())) {
- blobStoreContext.getAsyncBlobStore().removeBlob(metadataContainer, storageMetadata.getName());
+ if (storageMetadata.getName().contains("metadata-") && !name.equals(storageMetadata.getName())) {
+ blobStoreContext.getAsyncBlobStore().removeBlob(container, storageMetadata.getName());
}
}
} catch (IOException e) {
@@ -161,7 +158,7 @@ public class CloudGateway extends AbstractLifecycleComponent implements
if (currentIndex == -1)
return null;
- return readMetaData("metadata-" + currentIndex);
+ return readMetaData(metaDataDirectory + "/metadata-" + currentIndex);
} catch (GatewayException e) {
throw e;
} catch (Exception e) {
@@ -174,10 +171,10 @@ public class CloudGateway extends AbstractLifecycleComponent implements
}
@Override public void reset() {
- PageSet extends StorageMetadata> pageSet = blobStoreContext.getBlobStore().list(metadataContainer);
+ PageSet extends StorageMetadata> pageSet = blobStoreContext.getBlobStore().list(container, inDirectory(metaDataDirectory));
for (StorageMetadata storageMetadata : pageSet) {
- if (storageMetadata.getName().startsWith("metadata-")) {
- blobStoreContext.getBlobStore().removeBlob(metadataContainer, storageMetadata.getName());
+ if (storageMetadata.getName().contains("metadata-")) {
+ blobStoreContext.getBlobStore().removeBlob(container, storageMetadata.getName());
}
}
currentIndex = -1;
@@ -185,15 +182,15 @@ public class CloudGateway extends AbstractLifecycleComponent implements
private int findLatestIndex() {
int index = -1;
- PageSet extends StorageMetadata> pageSet = blobStoreContext.getBlobStore().list(metadataContainer);
+ PageSet extends StorageMetadata> pageSet = blobStoreContext.getBlobStore().list(container, inDirectory(metaDataDirectory).maxResults(1000));
for (StorageMetadata storageMetadata : pageSet) {
if (logger.isTraceEnabled()) {
logger.trace("[findLatestMetadata]: Processing blob [" + storageMetadata.getName() + "]");
}
- if (!storageMetadata.getName().startsWith("metadata-")) {
+ if (!storageMetadata.getName().contains("metadata-")) {
continue;
}
- int fileIndex = Integer.parseInt(storageMetadata.getName().substring(storageMetadata.getName().indexOf('-') + 1));
+ int fileIndex = Integer.parseInt(storageMetadata.getName().substring(storageMetadata.getName().lastIndexOf('-') + 1));
if (fileIndex >= index) {
// try and read the meta data
try {
@@ -210,7 +207,7 @@ public class CloudGateway extends AbstractLifecycleComponent implements
private MetaData readMetaData(String name) throws IOException {
XContentParser parser = null;
try {
- Blob blob = blobStoreContext.getBlobStore().getBlob(metadataContainer, name);
+ Blob blob = blobStoreContext.getBlobStore().getBlob(container, name);
parser = XContentFactory.xContent(XContentType.JSON).createParser(blob.getContent());
return MetaData.Builder.fromXContent(parser, settings);
} finally {
diff --git a/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexGateway.java b/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexGateway.java
index 2a84d9a33a6..6208b2e795c 100644
--- a/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexGateway.java
+++ b/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexGateway.java
@@ -22,7 +22,7 @@ package org.elasticsearch.index.gateway.cloud;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.cloud.blobstore.CloudBlobStoreService;
-import org.elasticsearch.cloud.jclouds.JCloudsUtils;
+import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.gateway.cloud.CloudGateway;
import org.elasticsearch.index.AbstractIndexComponent;
@@ -48,13 +48,15 @@ public class CloudIndexGateway extends AbstractIndexComponent implements IndexGa
private final String indexContainer;
+ private final String indexDirectory;
+
private final Location location;
private final SizeValue chunkSize;
private final BlobStoreContext blobStoreContext;
- @Inject public CloudIndexGateway(Index index, @IndexSettings Settings indexSettings, CloudBlobStoreService blobStoreService, Gateway gateway) {
+ @Inject public CloudIndexGateway(Index index, @IndexSettings Settings indexSettings, ClusterName clusterName, CloudBlobStoreService blobStoreService, Gateway gateway) {
super(index, indexSettings);
this.blobStoreContext = blobStoreService.context();
this.gateway = gateway;
@@ -66,7 +68,7 @@ public class CloudIndexGateway extends AbstractIndexComponent implements IndexGa
if (gateway instanceof CloudGateway) {
CloudGateway cloudGateway = (CloudGateway) gateway;
if (container == null) {
- container = cloudGateway.container() + JCloudsUtils.BLOB_CONTAINER_SEP + index.name();
+ container = cloudGateway.container();
}
if (chunkSize == null) {
chunkSize = cloudGateway.chunkSize();
@@ -99,11 +101,10 @@ public class CloudIndexGateway extends AbstractIndexComponent implements IndexGa
}
}
this.indexContainer = container;
+ this.indexDirectory = clusterName.value() + "/" + index.name();
this.chunkSize = chunkSize;
- logger.debug("Using location [{}], container [{}], chunk_size [{}]", this.location, this.indexContainer, this.chunkSize);
-
-// blobStoreContext.getBlobStore().createContainerInLocation(this.location, this.indexContainer);
+ logger.debug("Using location [{}], container [{}], index_directory [{}], chunk_size [{}]", this.location, this.indexContainer, this.indexDirectory, this.chunkSize);
}
public Location indexLocation() {
@@ -114,6 +115,10 @@ public class CloudIndexGateway extends AbstractIndexComponent implements IndexGa
return this.indexContainer;
}
+ public String indexDirectory() {
+ return this.indexDirectory;
+ }
+
public SizeValue chunkSize() {
return this.chunkSize;
}
@@ -126,6 +131,5 @@ public class CloudIndexGateway extends AbstractIndexComponent implements IndexGa
if (!delete) {
return;
}
-// blobStoreContext.getBlobStore().deleteContainer(indexContainer);
}
}
diff --git a/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexShardGateway.java b/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexShardGateway.java
index e63176c8337..2d886674a78 100644
--- a/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexShardGateway.java
+++ b/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexShardGateway.java
@@ -24,7 +24,6 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.cloud.blobstore.CloudBlobStoreService;
-import org.elasticsearch.cloud.jclouds.JCloudsUtils;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.gateway.IndexShardGateway;
import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
@@ -82,11 +81,13 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
private final Location shardLocation;
- private final String shardContainer;
+ private final String container;
- private final String shardIndexContainer;
+ private final String shardDirectory;
- private final String shardTranslogContainer;
+ private final String shardIndexDirectory;
+
+ private final String shardTranslogDirectory;
private final BlobStoreContext blobStoreContext;
@@ -105,15 +106,13 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
this.chunkSize = cloudIndexGateway.chunkSize();
this.shardLocation = cloudIndexGateway.indexLocation();
- this.shardContainer = cloudIndexGateway.indexContainer() + JCloudsUtils.BLOB_CONTAINER_SEP + shardId.id();
+ this.container = cloudIndexGateway.indexContainer();
- this.shardIndexContainer = shardContainer + JCloudsUtils.BLOB_CONTAINER_SEP + "index";
- this.shardTranslogContainer = shardContainer + JCloudsUtils.BLOB_CONTAINER_SEP + "translog";
+ this.shardDirectory = cloudIndexGateway.indexDirectory() + "/" + shardId.id();
+ this.shardIndexDirectory = shardDirectory + "/index";
+ this.shardTranslogDirectory = shardDirectory + "/translog";
- logger.trace("Using location [{}], container [{}]", this.shardLocation, this.shardContainer);
-
- blobStoreContext.getBlobStore().createContainerInLocation(this.shardLocation, this.shardTranslogContainer);
- blobStoreContext.getBlobStore().createContainerInLocation(this.shardLocation, this.shardIndexContainer);
+ logger.trace("Using location [{}], container [{}], shard_directory [{}]", this.shardLocation, this.container, this.shardDirectory);
}
@Override public boolean requiresSnapshotScheduling() {
@@ -125,7 +124,7 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
if (shardLocation != null) {
sb.append(shardLocation).append("/");
}
- sb.append(shardContainer).append("]");
+ sb.append(container).append("]");
return sb.toString();
}
@@ -133,8 +132,15 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
if (!delete) {
return;
}
- blobStoreContext.getBlobStore().deleteContainer(shardIndexContainer);
- blobStoreContext.getBlobStore().deleteContainer(shardTranslogContainer);
+
+ Map metaDatas = listAllMetadatas(container, shardIndexDirectory);
+ for (Map.Entry entry : metaDatas.entrySet()) {
+ blobStoreContext.getAsyncBlobStore().removeBlob(container, entry.getKey());
+ }
+ metaDatas = listAllMetadatas(container, shardTranslogDirectory);
+ for (Map.Entry entry : metaDatas.entrySet()) {
+ blobStoreContext.getAsyncBlobStore().removeBlob(container, entry.getKey());
+ }
}
@Override public RecoveryStatus recover() throws IndexShardGatewayRecoveryException {
@@ -158,7 +164,7 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
if (snapshot.indexChanged()) {
long time = System.currentTimeMillis();
indexDirty = true;
- allIndicesMetadata = listAllMetadatas(shardIndexContainer);
+ allIndicesMetadata = listAllMetadatas(container, shardIndexDirectory);
final Map allIndicesMetadataF = allIndicesMetadata;
// snapshot into the index
final CountDownLatch latch = new CountDownLatch(snapshotIndexCommit.getFiles().length);
@@ -196,7 +202,7 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
} catch (IOException e) {
// ignore...
}
- deleteFile(fileName, allIndicesMetadata);
+ deleteFile(shardIndexDirectory + "/" + fileName, allIndicesMetadata);
threadPool.execute(new Runnable() {
@Override public void run() {
try {
@@ -224,7 +230,7 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
long translogTime = 0;
if (snapshot.newTranslogCreated()) {
currentTranslogPartToWrite = 1;
- String translogBlobName = String.valueOf(translogSnapshot.translogId()) + "." + currentTranslogPartToWrite;
+ String translogBlobName = shardTranslogDirectory + "/" + String.valueOf(translogSnapshot.translogId()) + "." + currentTranslogPartToWrite;
try {
long time = System.currentTimeMillis();
@@ -239,7 +245,7 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
Blob blob = blobStoreContext.getBlobStore().newBlob(translogBlobName);
blob.setContentLength(streamOutput.size());
blob.setPayload(new FastByteArrayInputStream(streamOutput.unsafeByteArray(), 0, streamOutput.size()));
- blobStoreContext.getBlobStore().putBlob(shardTranslogContainer, blob);
+ blobStoreContext.getBlobStore().putBlob(container, blob);
currentTranslogPartToWrite++;
@@ -248,7 +254,7 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to snapshot translog into [" + translogBlobName + "]", e);
}
} else if (snapshot.sameTranslogNewOperations()) {
- String translogBlobName = String.valueOf(translogSnapshot.translogId()) + "." + currentTranslogPartToWrite;
+ String translogBlobName = shardTranslogDirectory + "/" + String.valueOf(translogSnapshot.translogId()) + "." + currentTranslogPartToWrite;
try {
long time = System.currentTimeMillis();
@@ -262,7 +268,7 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
Blob blob = blobStoreContext.getBlobStore().newBlob(translogBlobName);
blob.setContentLength(streamOutput.size());
blob.setPayload(new FastByteArrayInputStream(streamOutput.unsafeByteArray(), 0, streamOutput.size()));
- blobStoreContext.getBlobStore().putBlob(shardTranslogContainer, blob);
+ blobStoreContext.getBlobStore().putBlob(container, blob);
currentTranslogPartToWrite++;
@@ -283,11 +289,11 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
IndexInput indexInput = snapshotIndexCommit.getDirectory().openInput(snapshotIndexCommit.getSegmentsFileName());
try {
- Blob blob = blobStoreContext.getBlobStore().newBlob(snapshotIndexCommit.getSegmentsFileName());
+ Blob blob = blobStoreContext.getBlobStore().newBlob(shardIndexDirectory + "/" + snapshotIndexCommit.getSegmentsFileName());
InputStreamIndexInput is = new InputStreamIndexInput(indexInput, Long.MAX_VALUE);
blob.setPayload(is);
blob.setContentLength(is.actualSizeToRead());
- blobStoreContext.getBlobStore().putBlob(shardIndexContainer, blob);
+ blobStoreContext.getBlobStore().putBlob(container, blob);
} finally {
try {
indexInput.close();
@@ -303,30 +309,32 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
// delete the old translog
if (snapshot.newTranslogCreated()) {
- String currentTranslogPrefix = String.valueOf(translogSnapshot.translogId()) + ".";
- Map allMetadatas = listAllMetadatas(shardTranslogContainer);
+ String currentTranslogPrefix = shardTranslogDirectory + "/" + String.valueOf(translogSnapshot.translogId()) + ".";
+ Map allMetadatas = listAllMetadatas(container, shardTranslogDirectory);
for (Map.Entry entry : allMetadatas.entrySet()) {
if (!entry.getKey().startsWith(currentTranslogPrefix)) {
- blobStoreContext.getAsyncBlobStore().removeBlob(shardTranslogContainer, entry.getKey());
+ blobStoreContext.getAsyncBlobStore().removeBlob(container, entry.getKey());
}
}
}
if (indexDirty) {
for (Map.Entry entry : allIndicesMetadata.entrySet()) {
- String blobName = entry.getKey();
- if (blobName.contains(".part")) {
- blobName = blobName.substring(0, blobName.indexOf(".part"));
+ String blobNameToMatch = entry.getKey();
+ if (blobNameToMatch.contains(".part")) {
+ blobNameToMatch = blobNameToMatch.substring(0, blobNameToMatch.indexOf(".part"));
}
+ // remove the directory prefix
+ blobNameToMatch = blobNameToMatch.substring(shardIndexDirectory.length() + 1);
boolean found = false;
for (final String fileName : snapshotIndexCommit.getFiles()) {
- if (blobName.equals(fileName)) {
+ if (blobNameToMatch.equals(fileName)) {
found = true;
break;
}
}
if (!found) {
- blobStoreContext.getAsyncBlobStore().removeBlob(shardIndexContainer, entry.getKey());
+ blobStoreContext.getAsyncBlobStore().removeBlob(container, entry.getKey());
}
}
}
@@ -337,7 +345,7 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
}
private RecoveryStatus.Index recoverIndex() throws IndexShardGatewayRecoveryException {
- final Map allMetaDatas = listAllMetadatas(shardIndexContainer);
+ final Map allMetaDatas = listAllMetadatas(container, shardIndexDirectory);
// filter out to only have actual files
final Map filesMetaDatas = Maps.newHashMap();
@@ -395,11 +403,12 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
}
private RecoveryStatus.Translog recoverTranslog() throws IndexShardGatewayRecoveryException {
- final Map allMetaDatas = listAllMetadatas(shardTranslogContainer);
+ final Map allMetaDatas = listAllMetadatas(container, shardTranslogDirectory);
long latestTranslogId = -1;
for (String name : allMetaDatas.keySet()) {
- long translogId = Long.parseLong(name.substring(0, name.indexOf('.')));
+ String translogName = name.substring(shardTranslogDirectory.length() + 1);
+ long translogId = Long.parseLong(translogName.substring(0, translogName.lastIndexOf('.')));
if (translogId > latestTranslogId) {
latestTranslogId = translogId;
}
@@ -418,11 +427,11 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
long size = 0;
int index = 1;
while (true) {
- String translogPartName = String.valueOf(latestTranslogId) + "." + index;
+ String translogPartName = shardTranslogDirectory + "/" + String.valueOf(latestTranslogId) + "." + index;
if (!allMetaDatas.containsKey(translogPartName)) {
break;
}
- Blob blob = blobStoreContext.getBlobStore().getBlob(shardTranslogContainer, translogPartName);
+ Blob blob = blobStoreContext.getBlobStore().getBlob(container, translogPartName);
if (blob == null) {
break;
}
@@ -443,12 +452,12 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
}
}
- private Map listAllMetadatas(String container) {
+ private Map listAllMetadatas(String container, String directory) {
final Map allMetaDatas = Maps.newHashMap();
String nextMarker = null;
while (true) {
- ListContainerOptions options = ListContainerOptions.Builder.maxResults(10000);
+ ListContainerOptions options = ListContainerOptions.Builder.inDirectory(directory).maxResults(10000);
if (nextMarker != null) {
options.afterMarker(nextMarker);
}
@@ -472,7 +481,7 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
blobName = blobName.substring(0, blobName.indexOf(".part"));
}
if (blobName.equals(fileName)) {
- blobStoreContext.getBlobStore().removeBlob(shardIndexContainer, blobName);
+ blobStoreContext.getBlobStore().removeBlob(container, blobName);
}
}
}
@@ -482,11 +491,11 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
IndexInput indexInput = dir.openInput(fileName);
try {
- Blob blob = blobStoreContext.getBlobStore().newBlob(fileName);
+ Blob blob = blobStoreContext.getBlobStore().newBlob(shardIndexDirectory + "/" + fileName);
InputStreamIndexInput is = new InputStreamIndexInput(indexInput, chunkSize.bytes());
blob.setPayload(is);
blob.setContentLength(is.actualSizeToRead());
- blobStoreContext.getBlobStore().putBlob(shardIndexContainer, blob);
+ blobStoreContext.getBlobStore().putBlob(container, blob);
int part = 1;
while (indexInput.getFilePointer() < indexInput.length()) {
@@ -494,10 +503,10 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
if (is.actualSizeToRead() <= 0) {
break;
}
- blob = blobStoreContext.getBlobStore().newBlob(fileName + ".part" + part);
+ blob = blobStoreContext.getBlobStore().newBlob(shardIndexDirectory + "/" + fileName + ".part" + part);
blob.setPayload(is);
blob.setContentLength(is.actualSizeToRead());
- blobStoreContext.getBlobStore().putBlob(shardIndexContainer, blob);
+ blobStoreContext.getBlobStore().putBlob(container, blob);
part++;
}
} finally {
@@ -510,10 +519,12 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
}
private void copyToDirectory(StorageMetadata metadata, Map allMetadatas) throws IOException {
- Blob blob = blobStoreContext.getBlobStore().getBlob(shardIndexContainer, metadata.getName());
+ String fileName = metadata.getName().substring(shardIndexDirectory.length() + 1);
+
+ Blob blob = blobStoreContext.getBlobStore().getBlob(container, metadata.getName());
byte[] buffer = new byte[16384];
- IndexOutput indexOutput = store.directory().createOutput(metadata.getName());
+ IndexOutput indexOutput = store.directory().createOutput(fileName);
copy(blob.getContent(), indexOutput, buffer);
blob.getContent().close();
@@ -525,7 +536,7 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
if (!allMetadatas.containsKey(partName)) {
break;
}
- blob = blobStoreContext.getBlobStore().getBlob(shardIndexContainer, partName);
+ blob = blobStoreContext.getBlobStore().getBlob(container, partName);
copy(blob.getContent(), indexOutput, buffer);
blob.getContent().close();
part++;
@@ -533,7 +544,7 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
indexOutput.close();
- Directories.sync(store.directory(), metadata.getName());
+ Directories.sync(store.directory(), fileName);
}
private void copy(InputStream is, IndexOutput indexOutput, byte[] buffer) throws IOException {