more work on cloud gateway

This commit is contained in:
kimchy 2010-07-17 15:20:13 +03:00
parent 6194b3ab0f
commit 08ca383fd5
4 changed files with 23 additions and 38 deletions

View File

@ -107,6 +107,7 @@
<w>startup</w>
<w>stopwords</w>
<w>streamable</w>
<w>substring</w>
<w>successul</w>
<w>tagline</w>
<w>threadpool</w>

View File

@ -86,6 +86,8 @@ public class ImmutableAppendableBlobContainer extends AbstractBlobContainer impl
part++;
if (container.blobExists(blobName + ".a" + part)) {
container.readBlob(blobName + ".a" + part, this);
} else {
listener.onCompleted();
}
}

View File

@ -22,10 +22,10 @@ package org.elasticsearch.cloud.blobstore;
import com.google.common.util.concurrent.ListenableFuture;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStoreException;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.Maps;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.domain.PageSet;
import org.jclouds.blobstore.domain.StorageMetadata;
@ -33,7 +33,6 @@ import org.jclouds.blobstore.options.ListContainerOptions;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.ExecutionException;
/**
@ -60,13 +59,17 @@ public class AbstractCloudBlobContainer extends AbstractBlobContainer {
return cloudBlobStore.sync().blobExists(cloudBlobStore.container(), buildBlobPath(blobName));
}
@Override public void readBlob(String blobName, final ReadBlobListener listener) {
@Override public void readBlob(final String blobName, final ReadBlobListener listener) {
final ListenableFuture<? extends Blob> future = cloudBlobStore.async().getBlob(cloudBlobStore.container(), buildBlobPath(blobName));
future.addListener(new Runnable() {
@Override public void run() {
Blob blob;
try {
blob = future.get();
if (blob == null) {
listener.onFailure(new BlobStoreException("No blob found for [" + buildBlobPath(blobName) + "]"));
return;
}
} catch (InterruptedException e) {
listener.onFailure(e);
return;
@ -94,20 +97,23 @@ public class AbstractCloudBlobContainer extends AbstractBlobContainer {
}, cloudBlobStore.executorService());
}
@Override public ImmutableMap<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) throws IOException {
PageSet<? extends StorageMetadata> list = cloudBlobStore.sync().list(cloudBlobStore.container(), ListContainerOptions.Builder.recursive().inDirectory(blobNamePrefix));
ImmutableMap.Builder<String, BlobMetaData> blobs = ImmutableMap.builder();
for (StorageMetadata storageMetadata : list) {
blobs.put(storageMetadata.getName(), new PlainBlobMetaData(storageMetadata.getName(), storageMetadata.getSize(), null));
}
return blobs.build();
}
// inDirectory expects a directory, not a blob prefix
// @Override public ImmutableMap<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) throws IOException {
// PageSet<? extends StorageMetadata> list = cloudBlobStore.sync().list(cloudBlobStore.container(), ListContainerOptions.Builder.recursive().inDirectory(buildBlobPath(blobNamePrefix)));
// ImmutableMap.Builder<String, BlobMetaData> blobs = ImmutableMap.builder();
// for (StorageMetadata storageMetadata : list) {
// String name = storageMetadata.getName().substring(cloudPath.length() + 1);
// blobs.put(name, new PlainBlobMetaData(name, storageMetadata.getSize(), null));
// }
// return blobs.build();
// }
@Override public ImmutableMap<String, BlobMetaData> listBlobs() throws IOException {
PageSet<? extends StorageMetadata> list = cloudBlobStore.sync().list(cloudBlobStore.container(), ListContainerOptions.Builder.recursive());
PageSet<? extends StorageMetadata> list = cloudBlobStore.sync().list(cloudBlobStore.container(), ListContainerOptions.Builder.recursive().inDirectory(cloudPath));
ImmutableMap.Builder<String, BlobMetaData> blobs = ImmutableMap.builder();
for (StorageMetadata storageMetadata : list) {
blobs.put(storageMetadata.getName(), new PlainBlobMetaData(storageMetadata.getName(), storageMetadata.getSize(), null));
String name = storageMetadata.getName().substring(cloudPath.length() + 1);
blobs.put(name, new PlainBlobMetaData(name, storageMetadata.getSize(), null));
}
return blobs.build();
}
@ -115,28 +121,4 @@ public class AbstractCloudBlobContainer extends AbstractBlobContainer {
protected String buildBlobPath(String blobName) {
return cloudPath + "/" + blobName;
}
private Map<String, StorageMetadata> list(String container, String prefix) {
final Map<String, StorageMetadata> allMetaDatas = Maps.newHashMap();
String nextMarker = null;
while (true) {
ListContainerOptions options = ListContainerOptions.Builder.recursive();
if (prefix != null) {
options = options.inDirectory(prefix);
}
if (nextMarker != null) {
options.afterMarker(nextMarker);
}
PageSet<? extends StorageMetadata> pageSet = cloudBlobStore.sync().list(container, options);
for (StorageMetadata metadata : pageSet) {
allMetaDatas.put(metadata.getName(), metadata);
}
nextMarker = pageSet.getNextMarker();
if (nextMarker == null) {
break;
}
}
return allMetaDatas;
}
}

View File

@ -39,7 +39,7 @@ public class CloudImmutableBlobContainer extends AbstractCloudBlobContainer impl
}
@Override public void writeBlob(String blobName, InputStream is, long sizeInBytes, final WriterListener listener) {
Blob blob = cloudBlobStore.sync().newBlob(blobName);
Blob blob = cloudBlobStore.sync().newBlob(buildBlobPath(blobName));
blob.setPayload(is);
blob.setContentLength(sizeInBytes);
final ListenableFuture<String> future = cloudBlobStore.async().putBlob(cloudBlobStore.container(), blob);