parent
749d39061a
commit
b5f05f676c
|
@ -82,11 +82,6 @@ public class URLBlobContainer extends AbstractBlobContainer {
|
|||
throw new UnsupportedOperationException("URL repository doesn't support this operation");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void move(String from, String to) throws IOException {
|
||||
throw new UnsupportedOperationException("URL repository doesn't support this operation");
|
||||
}
|
||||
|
||||
/**
|
||||
* This operation is not supported by URLBlobContainer
|
||||
*/
|
||||
|
|
|
@ -33,7 +33,6 @@ import java.util.Objects;
|
|||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
import static java.util.Collections.emptyList;
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.singletonMap;
|
||||
|
||||
|
@ -159,44 +158,13 @@ public class AzureStorageTestServer {
|
|||
objectsPaths("PUT " + endpoint + "/{container}").forEach(path ->
|
||||
handlers.insert(path, (params, headers, body, requestId) -> {
|
||||
final String destContainerName = params.get("container");
|
||||
final String destBlobName = objectName(params);
|
||||
|
||||
final Container destContainer =containers.get(destContainerName);
|
||||
if (destContainer == null) {
|
||||
return newContainerNotFoundError(requestId);
|
||||
}
|
||||
|
||||
final String destBlobName = objectName(params);
|
||||
|
||||
// Request is a copy request
|
||||
List<String> headerCopySource = headers.getOrDefault("x-ms-copy-source", emptyList());
|
||||
if (headerCopySource.isEmpty() == false) {
|
||||
String srcBlobName = headerCopySource.get(0);
|
||||
|
||||
Container srcContainer = null;
|
||||
for (Container container : containers.values()) {
|
||||
String prefix = endpoint + "/" + container.name + "/";
|
||||
if (srcBlobName.startsWith(prefix)) {
|
||||
srcBlobName = srcBlobName.replaceFirst(prefix, "");
|
||||
srcContainer = container;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (srcContainer == null || srcContainer.objects.containsKey(srcBlobName) == false) {
|
||||
return newBlobNotFoundError(requestId);
|
||||
}
|
||||
|
||||
byte[] bytes = srcContainer.objects.get(srcBlobName);
|
||||
if (bytes != null) {
|
||||
destContainer.objects.put(destBlobName, bytes);
|
||||
return new Response(RestStatus.ACCEPTED, singletonMap("x-ms-copy-status", "success"), "text/plain", EMPTY_BYTE);
|
||||
} else {
|
||||
return newBlobNotFoundError(requestId);
|
||||
}
|
||||
} else {
|
||||
destContainer.objects.put(destBlobName, body);
|
||||
}
|
||||
|
||||
destContainer.objects.put(destBlobName, body);
|
||||
return new Response(RestStatus.CREATED, emptyMap(), "text/plain", EMPTY_BYTE);
|
||||
})
|
||||
);
|
||||
|
|
|
@ -127,22 +127,6 @@ public class AzureBlobContainer extends AbstractBlobContainer {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void move(String sourceBlobName, String targetBlobName) throws IOException {
|
||||
logger.trace("move({}, {})", sourceBlobName, targetBlobName);
|
||||
try {
|
||||
String source = keyPath + sourceBlobName;
|
||||
String target = keyPath + targetBlobName;
|
||||
|
||||
logger.debug("moving blob [{}] to [{}] in container {{}}", source, target, blobStore);
|
||||
|
||||
blobStore.moveBlob(source, target);
|
||||
} catch (URISyntaxException | StorageException e) {
|
||||
logger.warn("can not move blob [{}] to [{}] in container {{}}: {}", sourceBlobName, targetBlobName, blobStore, e.getMessage());
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, BlobMetaData> listBlobs() throws IOException {
|
||||
logger.trace("listBlobs()");
|
||||
|
|
|
@ -97,31 +97,23 @@ public class AzureBlobStore extends AbstractComponent implements BlobStore {
|
|||
return this.client.doesContainerExist(this.clientName, this.locMode, container);
|
||||
}
|
||||
|
||||
public boolean blobExists(String blob) throws URISyntaxException, StorageException
|
||||
{
|
||||
public boolean blobExists(String blob) throws URISyntaxException, StorageException {
|
||||
return this.client.blobExists(this.clientName, this.locMode, container, blob);
|
||||
}
|
||||
|
||||
public void deleteBlob(String blob) throws URISyntaxException, StorageException
|
||||
{
|
||||
public void deleteBlob(String blob) throws URISyntaxException, StorageException {
|
||||
this.client.deleteBlob(this.clientName, this.locMode, container, blob);
|
||||
}
|
||||
|
||||
public InputStream getInputStream(String blob) throws URISyntaxException, StorageException, IOException
|
||||
{
|
||||
public InputStream getInputStream(String blob) throws URISyntaxException, StorageException, IOException {
|
||||
return this.client.getInputStream(this.clientName, this.locMode, container, blob);
|
||||
}
|
||||
|
||||
public Map<String,BlobMetaData> listBlobsByPrefix(String keyPath, String prefix)
|
||||
public Map<String, BlobMetaData> listBlobsByPrefix(String keyPath, String prefix)
|
||||
throws URISyntaxException, StorageException {
|
||||
return this.client.listBlobsByPrefix(this.clientName, this.locMode, container, keyPath, prefix);
|
||||
}
|
||||
|
||||
public void moveBlob(String sourceBlob, String targetBlob) throws URISyntaxException, StorageException
|
||||
{
|
||||
this.client.moveBlob(this.clientName, this.locMode, container, sourceBlob, targetBlob);
|
||||
}
|
||||
|
||||
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws URISyntaxException, StorageException {
|
||||
this.client.writeBlob(this.clientName, this.locMode, container, blobName, inputStream, blobSize);
|
||||
}
|
||||
|
|
|
@ -57,9 +57,6 @@ public interface AzureStorageService {
|
|||
Map<String,BlobMetaData> listBlobsByPrefix(String account, LocationMode mode, String container, String keyPath, String prefix)
|
||||
throws URISyntaxException, StorageException;
|
||||
|
||||
void moveBlob(String account, LocationMode mode, String container, String sourceBlob, String targetBlob)
|
||||
throws URISyntaxException, StorageException;
|
||||
|
||||
void writeBlob(String account, LocationMode mode, String container, String blobName, InputStream inputStream, long blobSize) throws
|
||||
URISyntaxException, StorageException;
|
||||
|
||||
|
|
|
@ -287,24 +287,6 @@ public class AzureStorageServiceImpl extends AbstractComponent implements AzureS
|
|||
return blobsBuilder.immutableMap();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void moveBlob(String account, LocationMode mode, String container, String sourceBlob, String targetBlob)
|
||||
throws URISyntaxException, StorageException {
|
||||
logger.debug("moveBlob container [{}], sourceBlob [{}], targetBlob [{}]", container, sourceBlob, targetBlob);
|
||||
|
||||
CloudBlobClient client = this.getSelectedClient(account, mode);
|
||||
CloudBlobContainer blobContainer = client.getContainerReference(container);
|
||||
CloudBlockBlob blobSource = blobContainer.getBlockBlobReference(sourceBlob);
|
||||
if (SocketAccess.doPrivilegedException(() -> blobSource.exists(null, null, generateOperationContext(account)))) {
|
||||
CloudBlockBlob blobTarget = blobContainer.getBlockBlobReference(targetBlob);
|
||||
SocketAccess.doPrivilegedVoidException(() -> {
|
||||
blobTarget.startCopy(blobSource, null, null, null, generateOperationContext(account));
|
||||
blobSource.delete(DeleteSnapshotsOption.NONE, null, null, generateOperationContext(account));
|
||||
});
|
||||
logger.debug("moveBlob container [{}], sourceBlob [{}], targetBlob [{}] -> done", container, sourceBlob, targetBlob);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeBlob(String account, LocationMode mode, String container, String blobName, InputStream inputStream, long blobSize)
|
||||
throws URISyntaxException, StorageException {
|
||||
|
|
|
@ -106,18 +106,6 @@ public class AzureStorageServiceMock extends AbstractComponent implements AzureS
|
|||
return blobsBuilder.immutableMap();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void moveBlob(String account, LocationMode mode, String container, String sourceBlob, String targetBlob)
|
||||
throws URISyntaxException, StorageException {
|
||||
for (String blobName : blobs.keySet()) {
|
||||
if (endsWithIgnoreCase(blobName, sourceBlob)) {
|
||||
ByteArrayOutputStream outputStream = blobs.get(blobName);
|
||||
blobs.put(blobName.replace(sourceBlob, targetBlob), outputStream);
|
||||
blobs.remove(blobName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeBlob(String account, LocationMode mode, String container, String blobName, InputStream inputStream, long blobSize)
|
||||
throws URISyntaxException, StorageException {
|
||||
|
@ -137,7 +125,7 @@ public class AzureStorageServiceMock extends AbstractComponent implements AzureS
|
|||
* @param prefix the prefix to look for
|
||||
* @see java.lang.String#startsWith
|
||||
*/
|
||||
public static boolean startsWithIgnoreCase(String str, String prefix) {
|
||||
private static boolean startsWithIgnoreCase(String str, String prefix) {
|
||||
if (str == null || prefix == null) {
|
||||
return false;
|
||||
}
|
||||
|
@ -152,29 +140,6 @@ public class AzureStorageServiceMock extends AbstractComponent implements AzureS
|
|||
return lcStr.equals(lcPrefix);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test if the given String ends with the specified suffix,
|
||||
* ignoring upper/lower case.
|
||||
*
|
||||
* @param str the String to check
|
||||
* @param suffix the suffix to look for
|
||||
* @see java.lang.String#startsWith
|
||||
*/
|
||||
public static boolean endsWithIgnoreCase(String str, String suffix) {
|
||||
if (str == null || suffix == null) {
|
||||
return false;
|
||||
}
|
||||
if (str.endsWith(suffix)) {
|
||||
return true;
|
||||
}
|
||||
if (str.length() < suffix.length()) {
|
||||
return false;
|
||||
}
|
||||
String lcStr = str.substring(0, suffix.length()).toLowerCase(Locale.ROOT);
|
||||
String lcPrefix = suffix.toLowerCase(Locale.ROOT);
|
||||
return lcStr.equals(lcPrefix);
|
||||
}
|
||||
|
||||
private static class PermissionRequiringInputStream extends ByteArrayInputStream {
|
||||
|
||||
private PermissionRequiringInputStream(byte[] buf) {
|
||||
|
|
|
@ -367,47 +367,6 @@ public class GoogleCloudStorageTestServer {
|
|||
return newResponse(RestStatus.OK, emptyMap(), buildObjectResource(bucket.name, objectId, body));
|
||||
});
|
||||
|
||||
// Rewrite or Copy Object
|
||||
//
|
||||
// https://cloud.google.com/storage/docs/json_api/v1/objects/rewrite
|
||||
// https://cloud.google.com/storage/docs/json_api/v1/objects/copy
|
||||
handlers.insert("POST " + endpoint + "/storage/v1/b/{srcBucket}/o/{src}/{action}/b/{destBucket}/o/{dest}",
|
||||
(params, headers, body) -> {
|
||||
final String action = params.get("action");
|
||||
if ((action.equals("rewriteTo") == false) && (action.equals("copyTo") == false)) {
|
||||
return newError(RestStatus.INTERNAL_SERVER_ERROR, "Action not implemented. None of \"rewriteTo\" or \"copyTo\".");
|
||||
}
|
||||
final String source = params.get("src");
|
||||
if (Strings.hasText(source) == false) {
|
||||
return newError(RestStatus.INTERNAL_SERVER_ERROR, "source object name is missing");
|
||||
}
|
||||
final Bucket srcBucket = buckets.get(params.get("srcBucket"));
|
||||
if (srcBucket == null) {
|
||||
return newError(RestStatus.NOT_FOUND, "source bucket not found");
|
||||
}
|
||||
final String dest = params.get("dest");
|
||||
if (Strings.hasText(dest) == false) {
|
||||
return newError(RestStatus.INTERNAL_SERVER_ERROR, "destination object name is missing");
|
||||
}
|
||||
final Bucket destBucket = buckets.get(params.get("destBucket"));
|
||||
if (destBucket == null) {
|
||||
return newError(RestStatus.NOT_FOUND, "destination bucket not found");
|
||||
}
|
||||
final byte[] sourceBytes = srcBucket.objects.get(source);
|
||||
if (sourceBytes == null) {
|
||||
return newError(RestStatus.NOT_FOUND, "source object not found");
|
||||
}
|
||||
destBucket.objects.put(dest, sourceBytes);
|
||||
if (action.equals("rewriteTo")) {
|
||||
final XContentBuilder respBuilder = jsonBuilder();
|
||||
buildRewriteResponse(respBuilder, destBucket.name, dest, sourceBytes.length);
|
||||
return newResponse(RestStatus.OK, emptyMap(), respBuilder);
|
||||
} else {
|
||||
assert action.equals("copyTo");
|
||||
return newResponse(RestStatus.OK, emptyMap(), buildObjectResource(destBucket.name, dest, sourceBytes));
|
||||
}
|
||||
});
|
||||
|
||||
// List Objects
|
||||
//
|
||||
// https://cloud.google.com/storage/docs/json_api/v1/objects/list
|
||||
|
@ -701,28 +660,4 @@ public class GoogleCloudStorageTestServer {
|
|||
.field("size", String.valueOf(bytes.length))
|
||||
.endObject();
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds the rewrite response as defined by
|
||||
* https://cloud.google.com/storage/docs/json_api/v1/objects/rewrite
|
||||
*/
|
||||
private static XContentBuilder buildRewriteResponse(final XContentBuilder builder,
|
||||
final String destBucket,
|
||||
final String dest,
|
||||
final int byteSize) throws IOException {
|
||||
builder.startObject()
|
||||
.field("kind", "storage#rewriteResponse")
|
||||
.field("totalBytesRewritten", String.valueOf(byteSize))
|
||||
.field("objectSize", String.valueOf(byteSize))
|
||||
.field("done", true)
|
||||
.startObject("resource")
|
||||
.field("kind", "storage#object")
|
||||
.field("id", String.join("/", destBucket, dest))
|
||||
.field("name", dest)
|
||||
.field("bucket", destBucket)
|
||||
.field("size", String.valueOf(byteSize))
|
||||
.endObject()
|
||||
.endObject();
|
||||
return builder;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,7 +26,6 @@ import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.file.FileAlreadyExistsException;
|
||||
import java.util.Map;
|
||||
|
||||
class GoogleCloudStorageBlobContainer extends AbstractBlobContainer {
|
||||
|
@ -74,11 +73,6 @@ class GoogleCloudStorageBlobContainer extends AbstractBlobContainer {
|
|||
blobStore.deleteBlob(buildKey(blobName));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void move(String sourceBlobName, String targetBlobName) throws IOException {
|
||||
blobStore.moveBlob(buildKey(sourceBlobName), buildKey(targetBlobName));
|
||||
}
|
||||
|
||||
protected String buildKey(String blobName) {
|
||||
assert blobName != null;
|
||||
return path + blobName;
|
||||
|
|
|
@ -27,7 +27,6 @@ import com.google.cloud.storage.BlobInfo;
|
|||
import com.google.cloud.storage.Bucket;
|
||||
import com.google.cloud.storage.Storage;
|
||||
import com.google.cloud.storage.Storage.BlobListOption;
|
||||
import com.google.cloud.storage.Storage.CopyRequest;
|
||||
import com.google.cloud.storage.StorageException;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||
|
@ -314,29 +313,6 @@ class GoogleCloudStorageBlobStore extends AbstractComponent implements BlobStore
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Moves a blob within the same bucket
|
||||
*
|
||||
* @param sourceBlobName name of the blob to move
|
||||
* @param targetBlobName new name of the blob in the same bucket
|
||||
*/
|
||||
void moveBlob(String sourceBlobName, String targetBlobName) throws IOException {
|
||||
final BlobId sourceBlobId = BlobId.of(bucket, sourceBlobName);
|
||||
final BlobId targetBlobId = BlobId.of(bucket, targetBlobName);
|
||||
final CopyRequest request = CopyRequest.newBuilder()
|
||||
.setSource(sourceBlobId)
|
||||
.setTarget(targetBlobId)
|
||||
.build();
|
||||
SocketAccess.doPrivilegedVoidIOException(() -> {
|
||||
// There's no atomic "move" in GCS so we need to copy and delete
|
||||
storage.copy(request).getResult();
|
||||
final boolean deleted = storage.delete(sourceBlobId);
|
||||
if (deleted == false) {
|
||||
throw new IOException("Failed to move source [" + sourceBlobName + "] to target [" + targetBlobName + "]");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private static String buildKey(String keyPath, String s) {
|
||||
assert s != null;
|
||||
return keyPath + s;
|
||||
|
|
|
@ -21,8 +21,6 @@ package com.google.cloud.storage;
|
|||
|
||||
import com.google.cloud.storage.spi.v1.StorageRpc;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
/**
|
||||
* Utility class that exposed Google SDK package protected methods to
|
||||
* create specific StorageRpc objects in unit tests.
|
||||
|
@ -42,13 +40,4 @@ public class StorageRpcOptionUtils {
|
|||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public static CopyWriter createCopyWriter(final Blob result) {
|
||||
return new CopyWriter(mock(StorageOptions.class), mock(StorageRpc.RewriteResponse.class)) {
|
||||
@Override
|
||||
public Blob getResult() {
|
||||
return result;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -38,7 +38,6 @@ import com.google.cloud.storage.StorageException;
|
|||
import com.google.cloud.storage.StorageOptions;
|
||||
import com.google.cloud.storage.StorageRpcOptionUtils;
|
||||
import com.google.cloud.storage.StorageTestUtils;
|
||||
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
|
@ -125,24 +124,6 @@ class MockStorage implements Storage {
|
|||
return get(BlobId.of(blobInfo.getBucket(), blobInfo.getName()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CopyWriter copy(CopyRequest copyRequest) {
|
||||
if (bucketName.equals(copyRequest.getSource().getBucket()) == false) {
|
||||
throw new StorageException(404, "Source bucket not found");
|
||||
}
|
||||
if (bucketName.equals(copyRequest.getTarget().getBucket()) == false) {
|
||||
throw new StorageException(404, "Target bucket not found");
|
||||
}
|
||||
|
||||
final byte[] bytes = blobs.get(copyRequest.getSource().getName());
|
||||
if (bytes == null) {
|
||||
throw new StorageException(404, "Source blob does not exist");
|
||||
}
|
||||
blobs.put(copyRequest.getTarget().getName(), bytes);
|
||||
return StorageRpcOptionUtils
|
||||
.createCopyWriter(get(BlobId.of(copyRequest.getTarget().getBucket(), copyRequest.getTarget().getName())));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Page<Blob> list(String bucket, BlobListOption... options) {
|
||||
if (bucketName.equals(bucket) == false) {
|
||||
|
@ -269,6 +250,11 @@ class MockStorage implements Storage {
|
|||
|
||||
// Everything below this line is not implemented.
|
||||
|
||||
@Override
|
||||
public CopyWriter copy(CopyRequest copyRequest) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Bucket create(BucketInfo bucketInfo, BucketTargetOption... options) {
|
||||
return null;
|
||||
|
|
|
@ -23,7 +23,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
|
|||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Options.CreateOpts;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.elasticsearch.SpecialPermission;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.blobstore.BlobMetaData;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
|
@ -37,9 +36,6 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.nio.file.FileAlreadyExistsException;
|
||||
import java.nio.file.NoSuchFileException;
|
||||
import java.security.AccessController;
|
||||
import java.security.PrivilegedActionException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.LinkedHashMap;
|
||||
|
@ -79,14 +75,6 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void move(String sourceBlobName, String targetBlobName) throws IOException {
|
||||
store.execute((Operation<Void>) fileContext -> {
|
||||
fileContext.rename(new Path(path, sourceBlobName), new Path(path, targetBlobName));
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream readBlob(String blobName) throws IOException {
|
||||
// FSDataInputStream does buffering internally
|
||||
|
|
|
@ -162,10 +162,9 @@ public class AmazonS3TestServer {
|
|||
})
|
||||
);
|
||||
|
||||
// PUT Object & PUT Object Copy
|
||||
// PUT Object
|
||||
//
|
||||
// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPUT.html
|
||||
// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectCOPY.html
|
||||
objectsPaths("PUT " + endpoint + "/{bucket}").forEach(path ->
|
||||
handlers.insert(path, (params, headers, body, id) -> {
|
||||
final String destBucketName = params.get("bucket");
|
||||
|
@ -177,65 +176,38 @@ public class AmazonS3TestServer {
|
|||
|
||||
final String destObjectName = objectName(params);
|
||||
|
||||
// Request is a copy request
|
||||
List<String> headerCopySource = headers.getOrDefault("x-amz-copy-source", emptyList());
|
||||
if (headerCopySource.isEmpty() == false) {
|
||||
String srcObjectName = headerCopySource.get(0);
|
||||
// This is a chunked upload request. We should have the header "Content-Encoding : aws-chunked,gzip"
|
||||
// to detect it but it seems that the AWS SDK does not follow the S3 guidelines here.
|
||||
//
|
||||
// See https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html
|
||||
//
|
||||
List<String> headerDecodedContentLength = headers.getOrDefault("X-amz-decoded-content-length", emptyList());
|
||||
if (headerDecodedContentLength.size() == 1) {
|
||||
int contentLength = Integer.valueOf(headerDecodedContentLength.get(0));
|
||||
|
||||
Bucket srcBucket = null;
|
||||
for (Bucket bucket : buckets.values()) {
|
||||
String prefix = "/" + bucket.name + "/";
|
||||
if (srcObjectName.startsWith(prefix)) {
|
||||
srcObjectName = srcObjectName.replaceFirst(prefix, "");
|
||||
srcBucket = bucket;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (srcBucket == null || srcBucket.objects.containsKey(srcObjectName) == false) {
|
||||
return newObjectNotFoundError(id, srcObjectName);
|
||||
}
|
||||
|
||||
byte[] bytes = srcBucket.objects.get(srcObjectName);
|
||||
if (bytes != null) {
|
||||
destBucket.objects.put(destObjectName, bytes);
|
||||
return newCopyResultResponse(id);
|
||||
} else {
|
||||
return newObjectNotFoundError(id, srcObjectName);
|
||||
}
|
||||
} else {
|
||||
// This is a chunked upload request. We should have the header "Content-Encoding : aws-chunked,gzip"
|
||||
// to detect it but it seems that the AWS SDK does not follow the S3 guidelines here.
|
||||
// Chunked requests have a payload like this:
|
||||
//
|
||||
// See https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html
|
||||
// 105;chunk-signature=01d0de6be013115a7f4794db8c4b9414e6ec71262cc33ae562a71f2eaed1efe8
|
||||
// ... bytes of data ....
|
||||
// 0;chunk-signature=f890420b1974c5469aaf2112e9e6f2e0334929fd45909e03c0eff7a84124f6a4
|
||||
//
|
||||
List<String> headerDecodedContentLength = headers.getOrDefault("X-amz-decoded-content-length", emptyList());
|
||||
if (headerDecodedContentLength.size() == 1) {
|
||||
int contentLength = Integer.valueOf(headerDecodedContentLength.get(0));
|
||||
|
||||
// Chunked requests have a payload like this:
|
||||
//
|
||||
// 105;chunk-signature=01d0de6be013115a7f4794db8c4b9414e6ec71262cc33ae562a71f2eaed1efe8
|
||||
// ... bytes of data ....
|
||||
// 0;chunk-signature=f890420b1974c5469aaf2112e9e6f2e0334929fd45909e03c0eff7a84124f6a4
|
||||
//
|
||||
try (BufferedInputStream inputStream = new BufferedInputStream(new ByteArrayInputStream(body))) {
|
||||
int b;
|
||||
// Moves to the end of the first signature line
|
||||
while ((b = inputStream.read()) != -1) {
|
||||
if (b == '\n') {
|
||||
break;
|
||||
}
|
||||
try (BufferedInputStream inputStream = new BufferedInputStream(new ByteArrayInputStream(body))) {
|
||||
int b;
|
||||
// Moves to the end of the first signature line
|
||||
while ((b = inputStream.read()) != -1) {
|
||||
if (b == '\n') {
|
||||
break;
|
||||
}
|
||||
|
||||
final byte[] bytes = new byte[contentLength];
|
||||
inputStream.read(bytes, 0, contentLength);
|
||||
|
||||
destBucket.objects.put(destObjectName, bytes);
|
||||
return new Response(RestStatus.OK, emptyMap(), "text/plain", EMPTY_BYTE);
|
||||
}
|
||||
|
||||
final byte[] bytes = new byte[contentLength];
|
||||
inputStream.read(bytes, 0, contentLength);
|
||||
|
||||
destBucket.objects.put(destObjectName, bytes);
|
||||
return new Response(RestStatus.OK, emptyMap(), "text/plain", EMPTY_BYTE);
|
||||
}
|
||||
}
|
||||
|
||||
return newInternalError(id, "Something is wrong with this PUT request");
|
||||
})
|
||||
);
|
||||
|
@ -466,20 +438,6 @@ public class AmazonS3TestServer {
|
|||
return new Response(RestStatus.OK, singletonMap("x-amz-request-id", id), "application/xml", response.toString().getBytes(UTF_8));
|
||||
}
|
||||
|
||||
/**
|
||||
* S3 Copy Result Response
|
||||
*/
|
||||
private static Response newCopyResultResponse(final long requestId) {
|
||||
final String id = Long.toString(requestId);
|
||||
final StringBuilder response = new StringBuilder();
|
||||
response.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
|
||||
response.append("<CopyObjectResult>");
|
||||
response.append("<LastModified>").append(DateUtils.formatISO8601Date(new Date())).append("</LastModified>");
|
||||
response.append("<ETag>").append(requestId).append("</ETag>");
|
||||
response.append("</CopyObjectResult>");
|
||||
return new Response(RestStatus.OK, singletonMap("x-amz-request-id", id), "application/xml", response.toString().getBytes(UTF_8));
|
||||
}
|
||||
|
||||
/**
|
||||
* S3 DeleteResult Response
|
||||
*/
|
||||
|
|
|
@ -24,7 +24,6 @@ import com.amazonaws.services.s3.AmazonS3;
|
|||
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
|
||||
import com.amazonaws.services.s3.model.AmazonS3Exception;
|
||||
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
|
||||
import com.amazonaws.services.s3.model.CopyObjectRequest;
|
||||
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
|
||||
import com.amazonaws.services.s3.model.ObjectListing;
|
||||
import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||
|
@ -152,28 +151,6 @@ class S3BlobContainer extends AbstractBlobContainer {
|
|||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void move(String sourceBlobName, String targetBlobName) throws IOException {
|
||||
try {
|
||||
CopyObjectRequest request = new CopyObjectRequest(blobStore.bucket(), buildKey(sourceBlobName),
|
||||
blobStore.bucket(), buildKey(targetBlobName));
|
||||
|
||||
if (blobStore.serverSideEncryption()) {
|
||||
ObjectMetadata objectMetadata = new ObjectMetadata();
|
||||
objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
|
||||
request.setNewObjectMetadata(objectMetadata);
|
||||
}
|
||||
|
||||
SocketAccess.doPrivilegedVoid(() -> {
|
||||
blobStore.client().copyObject(request);
|
||||
blobStore.client().deleteObject(blobStore.bucket(), buildKey(sourceBlobName));
|
||||
});
|
||||
|
||||
} catch (AmazonS3Exception e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, BlobMetaData> listBlobs() throws IOException {
|
||||
return listBlobsByPrefix(null);
|
||||
|
|
|
@ -23,8 +23,6 @@ import com.amazonaws.AmazonClientException;
|
|||
import com.amazonaws.SdkClientException;
|
||||
import com.amazonaws.services.s3.AbstractAmazonS3;
|
||||
import com.amazonaws.services.s3.model.AmazonS3Exception;
|
||||
import com.amazonaws.services.s3.model.CopyObjectRequest;
|
||||
import com.amazonaws.services.s3.model.CopyObjectResult;
|
||||
import com.amazonaws.services.s3.model.DeleteObjectRequest;
|
||||
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
|
||||
import com.amazonaws.services.s3.model.DeleteObjectsResult;
|
||||
|
@ -148,24 +146,6 @@ class MockAmazonS3 extends AbstractAmazonS3 {
|
|||
return listing;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CopyObjectResult copyObject(final CopyObjectRequest request) throws AmazonClientException {
|
||||
assertThat(request.getSourceBucketName(), equalTo(bucket));
|
||||
assertThat(request.getDestinationBucketName(), equalTo(bucket));
|
||||
|
||||
final String sourceBlobName = request.getSourceKey();
|
||||
|
||||
final byte[] content = blobs.get(sourceBlobName);
|
||||
if (content == null) {
|
||||
AmazonS3Exception exception = new AmazonS3Exception("[" + sourceBlobName + "] does not exist.");
|
||||
exception.setStatusCode(404);
|
||||
throw exception;
|
||||
}
|
||||
|
||||
blobs.put(request.getDestinationKey(), content);
|
||||
return new CopyObjectResult();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteObject(final DeleteObjectRequest request) throws AmazonClientException {
|
||||
assertThat(request.getBucketName(), equalTo(bucket));
|
||||
|
|
|
@ -142,20 +142,4 @@ public interface BlobContainer {
|
|||
* @throws IOException if there were any failures in reading from the blob container.
|
||||
*/
|
||||
Map<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) throws IOException;
|
||||
|
||||
/**
|
||||
* Renames the source blob into the target blob. If the source blob does not exist or the
|
||||
* target blob already exists, an exception is thrown. Atomicity of the move operation
|
||||
* can only be guaranteed on an implementation-by-implementation basis. The only current
|
||||
* implementation of {@link BlobContainer} for which atomicity can be guaranteed is the
|
||||
* {@link org.elasticsearch.common.blobstore.fs.FsBlobContainer}.
|
||||
*
|
||||
* @param sourceBlobName
|
||||
* The blob to rename.
|
||||
* @param targetBlobName
|
||||
* The name of the blob after the renaming.
|
||||
* @throws IOException if the source blob does not exist, the target blob already exists,
|
||||
* or there were any failures in reading from the blob container.
|
||||
*/
|
||||
void move(String sourceBlobName, String targetBlobName) throws IOException;
|
||||
}
|
||||
|
|
|
@ -142,14 +142,7 @@ public class FsBlobContainer extends AbstractBlobContainer {
|
|||
Streams.copy(inputStream, outputStream);
|
||||
}
|
||||
IOUtils.fsync(tempBlobPath, false);
|
||||
|
||||
final Path blobPath = path.resolve(blobName);
|
||||
// If the target file exists then Files.move() behaviour is implementation specific
|
||||
// the existing file might be replaced or this method fails by throwing an IOException.
|
||||
if (Files.exists(blobPath)) {
|
||||
throw new FileAlreadyExistsException("blob [" + blobPath + "] already exists, cannot overwrite");
|
||||
}
|
||||
Files.move(tempBlobPath, blobPath, StandardCopyOption.ATOMIC_MOVE);
|
||||
moveBlobAtomic(tempBlob, blobName);
|
||||
} catch (IOException ex) {
|
||||
try {
|
||||
deleteBlobIgnoringIfNotExists(tempBlob);
|
||||
|
@ -162,6 +155,17 @@ public class FsBlobContainer extends AbstractBlobContainer {
|
|||
}
|
||||
}
|
||||
|
||||
public void moveBlobAtomic(final String sourceBlobName, final String targetBlobName) throws IOException {
|
||||
final Path sourceBlobPath = path.resolve(sourceBlobName);
|
||||
final Path targetBlobPath = path.resolve(targetBlobName);
|
||||
// If the target file exists then Files.move() behaviour is implementation specific
|
||||
// the existing file might be replaced or this method fails by throwing an IOException.
|
||||
if (Files.exists(targetBlobPath)) {
|
||||
throw new FileAlreadyExistsException("blob [" + targetBlobPath + "] already exists, cannot overwrite");
|
||||
}
|
||||
Files.move(sourceBlobPath, targetBlobPath, StandardCopyOption.ATOMIC_MOVE);
|
||||
}
|
||||
|
||||
public static String tempBlobName(final String blobName) {
|
||||
return "pending-" + blobName + "-" + UUIDs.randomBase64UUID();
|
||||
}
|
||||
|
@ -174,17 +178,4 @@ public class FsBlobContainer extends AbstractBlobContainer {
|
|||
public static boolean isTempBlobName(final String blobName) {
|
||||
return blobName.startsWith(TEMP_FILE_PREFIX);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void move(String source, String target) throws IOException {
|
||||
Path sourcePath = path.resolve(source);
|
||||
Path targetPath = path.resolve(target);
|
||||
// If the target file exists then Files.move() behaviour is implementation specific
|
||||
// the existing file might be replaced or this method fails by throwing an IOException.
|
||||
if (Files.exists(targetPath)) {
|
||||
throw new FileAlreadyExistsException("blob [" + targetPath + "] already exists, cannot overwrite");
|
||||
}
|
||||
Files.move(sourcePath, targetPath, StandardCopyOption.ATOMIC_MOVE);
|
||||
IOUtils.fsync(path, true);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3103,7 +3103,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||
.put("location", repoPath)
|
||||
.put("random_control_io_exception_rate", randomIntBetween(5, 20) / 100f)
|
||||
// test that we can take a snapshot after a failed one, even if a partial index-N was written
|
||||
.put("atomic_move", false)
|
||||
.put("allow_atomic_operations", false)
|
||||
.put("random", randomAlphaOfLength(10))));
|
||||
|
||||
logger.info("--> indexing some data");
|
||||
|
|
|
@ -77,9 +77,4 @@ public class BlobContainerWrapper implements BlobContainer {
|
|||
public Map<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) throws IOException {
|
||||
return delegate.listBlobsByPrefix(blobNamePrefix);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void move(String sourceBlobName, String targetBlobName) throws IOException {
|
||||
delegate.move(sourceBlobName, targetBlobName);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -109,7 +109,7 @@ public class MockRepository extends FsRepository {
|
|||
/** Allows blocking on writing the snapshot file at the end of snapshot creation to simulate a died master node */
|
||||
private volatile boolean blockAndFailOnWriteSnapFile;
|
||||
|
||||
private volatile boolean atomicMove;
|
||||
private volatile boolean allowAtomicOperations;
|
||||
|
||||
private volatile boolean blocked = false;
|
||||
|
||||
|
@ -126,7 +126,7 @@ public class MockRepository extends FsRepository {
|
|||
blockAndFailOnWriteSnapFile = metadata.settings().getAsBoolean("block_on_snap", false);
|
||||
randomPrefix = metadata.settings().get("random", "default");
|
||||
waitAfterUnblock = metadata.settings().getAsLong("wait_after_unblock", 0L);
|
||||
atomicMove = metadata.settings().getAsBoolean("atomic_move", true);
|
||||
allowAtomicOperations = metadata.settings().getAsBoolean("allow_atomic_operations", true);
|
||||
logger.info("starting mock repository with random prefix {}", randomPrefix);
|
||||
mockBlobStore = new MockBlobStore(super.blobStore());
|
||||
}
|
||||
|
@ -345,24 +345,6 @@ public class MockRepository extends FsRepository {
|
|||
return super.listBlobsByPrefix(blobNamePrefix);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void move(String sourceBlob, String targetBlob) throws IOException {
|
||||
if (blockOnWriteIndexFile && targetBlob.startsWith("index-")) {
|
||||
blockExecutionAndMaybeWait(targetBlob);
|
||||
}
|
||||
if (atomicMove) {
|
||||
// atomic move since this inherits from FsBlobContainer which provides atomic moves
|
||||
maybeIOExceptionOrBlock(targetBlob);
|
||||
super.move(sourceBlob, targetBlob);
|
||||
} else {
|
||||
// simulate a non-atomic move, since many blob container implementations
|
||||
// will not have an atomic move, and we should be able to handle that
|
||||
maybeIOExceptionOrBlock(targetBlob);
|
||||
super.writeBlob(targetBlob, super.readBlob(sourceBlob), 0L);
|
||||
super.deleteBlob(sourceBlob);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
|
||||
maybeIOExceptionOrBlock(blobName);
|
||||
|
@ -377,14 +359,14 @@ public class MockRepository extends FsRepository {
|
|||
@Override
|
||||
public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize) throws IOException {
|
||||
final Random random = RandomizedContext.current().getRandom();
|
||||
if (random.nextBoolean()) {
|
||||
if (allowAtomicOperations && random.nextBoolean()) {
|
||||
if ((delegate() instanceof FsBlobContainer) && (random.nextBoolean())) {
|
||||
// Simulate a failure between the write and move operation in FsBlobContainer
|
||||
final String tempBlobName = FsBlobContainer.tempBlobName(blobName);
|
||||
super.writeBlob(tempBlobName, inputStream, blobSize);
|
||||
maybeIOExceptionOrBlock(blobName);
|
||||
final FsBlobContainer fsBlobContainer = (FsBlobContainer) delegate();
|
||||
fsBlobContainer.move(tempBlobName, blobName);
|
||||
fsBlobContainer.moveBlobAtomic(tempBlobName, blobName);
|
||||
} else {
|
||||
// Atomic write since it is potentially supported
|
||||
// by the delegating blob container
|
||||
|
|
|
@ -36,7 +36,6 @@ import java.util.HashMap;
|
|||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.repositories.ESBlobStoreTestCase.randomBytes;
|
||||
import static org.elasticsearch.repositories.ESBlobStoreTestCase.readBlobFully;
|
||||
import static org.elasticsearch.repositories.ESBlobStoreTestCase.writeRandomBlob;
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.CoreMatchers.notNullValue;
|
||||
|
@ -73,7 +72,7 @@ public abstract class ESBlobStoreContainerTestCase extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testMoveAndList() throws IOException {
|
||||
public void testList() throws IOException {
|
||||
try(BlobStore store = newBlobStore()) {
|
||||
final BlobContainer container = store.blobContainer(new BlobPath());
|
||||
assertThat(container.listBlobs().size(), equalTo(0));
|
||||
|
@ -109,15 +108,6 @@ public abstract class ESBlobStoreContainerTestCase extends ESTestCase {
|
|||
assertThat(container.listBlobsByPrefix("foo-").size(), equalTo(numberOfFooBlobs));
|
||||
assertThat(container.listBlobsByPrefix("bar-").size(), equalTo(numberOfBarBlobs));
|
||||
assertThat(container.listBlobsByPrefix("baz-").size(), equalTo(0));
|
||||
|
||||
String newName = "bar-new";
|
||||
// Move to a new location
|
||||
container.move(name, newName);
|
||||
assertThat(container.listBlobsByPrefix(name).size(), equalTo(0));
|
||||
blobs = container.listBlobsByPrefix(newName);
|
||||
assertThat(blobs.size(), equalTo(1));
|
||||
assertThat(blobs.get(newName).length(), equalTo(generatedBlobs.get(name)));
|
||||
assertThat(data, equalTo(readBlobFully(container, newName, length)));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue