* Add Ability to List Child Containers to BlobContainer (#42653) * Add Ability to List Child Containers to BlobContainer * This is a prerequisite of #42189
This commit is contained in:
parent
9077c4402f
commit
455b12a4fb
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.common.blobstore.url;
|
||||
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||
import org.elasticsearch.common.blobstore.BlobMetaData;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
|
||||
|
@ -74,6 +75,11 @@ public class URLBlobContainer extends AbstractBlobContainer {
|
|||
throw new UnsupportedOperationException("URL repository doesn't support this operation");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, BlobContainer> children() throws IOException {
|
||||
throw new UnsupportedOperationException("URL repository doesn't support this operation");
|
||||
}
|
||||
|
||||
/**
|
||||
* This operation is not supported by URLBlobContainer
|
||||
*/
|
||||
|
|
|
@ -116,7 +116,7 @@ public class URLRepository extends BlobStoreRepository {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected BlobPath basePath() {
|
||||
public BlobPath basePath() {
|
||||
return basePath;
|
||||
}
|
||||
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.elasticsearch.action.ActionRunnable;
|
|||
import org.elasticsearch.action.support.GroupedActionListener;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||
import org.elasticsearch.common.blobstore.BlobMetaData;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
|
||||
|
@ -169,6 +170,16 @@ public class AzureBlobContainer extends AbstractBlobContainer {
|
|||
return listBlobsByPrefix(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, BlobContainer> children() throws IOException {
|
||||
final BlobPath path = path();
|
||||
try {
|
||||
return blobStore.children(path);
|
||||
} catch (URISyntaxException | StorageException e) {
|
||||
throw new IOException("Failed to list children in path [" + path.buildAsString() + "].", e);
|
||||
}
|
||||
}
|
||||
|
||||
protected String buildKey(String blobName) {
|
||||
return keyPath + (blobName == null ? "" : blobName);
|
||||
}
|
||||
|
|
|
@ -34,7 +34,10 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.file.FileAlreadyExistsException;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
|
||||
|
@ -97,6 +100,11 @@ public class AzureBlobStore implements BlobStore {
|
|||
return service.listBlobsByPrefix(clientName, container, keyPath, prefix);
|
||||
}
|
||||
|
||||
public Map<String, BlobContainer> children(BlobPath path) throws URISyntaxException, StorageException {
|
||||
return Collections.unmodifiableMap(service.children(clientName, container, path).stream().collect(
|
||||
Collectors.toMap(Function.identity(), name -> new AzureBlobContainer(path.add(name), this, threadPool))));
|
||||
}
|
||||
|
||||
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
|
||||
throws URISyntaxException, StorageException, FileAlreadyExistsException {
|
||||
service.writeBlob(this.clientName, container, blobName, inputStream, blobSize, failIfAlreadyExists);
|
||||
|
|
|
@ -122,7 +122,7 @@ public class AzureRepository extends BlobStoreRepository {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected BlobPath basePath() {
|
||||
public BlobPath basePath() {
|
||||
return basePath;
|
||||
}
|
||||
|
||||
|
|
|
@ -29,8 +29,10 @@ import com.microsoft.azure.storage.StorageException;
|
|||
import com.microsoft.azure.storage.blob.BlobInputStream;
|
||||
import com.microsoft.azure.storage.blob.BlobListingDetails;
|
||||
import com.microsoft.azure.storage.blob.BlobProperties;
|
||||
import com.microsoft.azure.storage.blob.CloudBlob;
|
||||
import com.microsoft.azure.storage.blob.CloudBlobClient;
|
||||
import com.microsoft.azure.storage.blob.CloudBlobContainer;
|
||||
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
|
||||
import com.microsoft.azure.storage.blob.CloudBlockBlob;
|
||||
import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
|
||||
import com.microsoft.azure.storage.blob.ListBlobItem;
|
||||
|
@ -39,6 +41,7 @@ import org.apache.logging.log4j.LogManager;
|
|||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.common.blobstore.BlobMetaData;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
|
@ -54,8 +57,11 @@ import java.net.URI;
|
|||
import java.net.URISyntaxException;
|
||||
import java.nio.file.FileAlreadyExistsException;
|
||||
import java.security.InvalidKeyException;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
|
@ -209,15 +215,40 @@ public class AzureStorageService {
|
|||
// uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
|
||||
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /
|
||||
final String blobPath = uri.getPath().substring(1 + container.length() + 1);
|
||||
final BlobProperties properties = ((CloudBlockBlob) blobItem).getProperties();
|
||||
if (blobItem instanceof CloudBlob) {
|
||||
final BlobProperties properties = ((CloudBlob) blobItem).getProperties();
|
||||
final String name = blobPath.substring(keyPath.length());
|
||||
logger.trace(() -> new ParameterizedMessage("blob url [{}], name [{}], size [{}]", uri, name, properties.getLength()));
|
||||
blobsBuilder.put(name, new PlainBlobMetaData(name, properties.getLength()));
|
||||
}
|
||||
}
|
||||
});
|
||||
return blobsBuilder.immutableMap();
|
||||
}
|
||||
|
||||
public Set<String> children(String account, String container, BlobPath path) throws URISyntaxException, StorageException {
|
||||
final Set<String> blobsBuilder = new HashSet<>();
|
||||
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
|
||||
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
|
||||
final String keyPath = path.buildAsString();
|
||||
final EnumSet<BlobListingDetails> enumBlobListingDetails = EnumSet.of(BlobListingDetails.METADATA);
|
||||
|
||||
SocketAccess.doPrivilegedVoidException(() -> {
|
||||
for (ListBlobItem blobItem : blobContainer.listBlobs(keyPath, false, enumBlobListingDetails, null, client.v2().get())) {
|
||||
if (blobItem instanceof CloudBlobDirectory) {
|
||||
final URI uri = blobItem.getUri();
|
||||
logger.trace(() -> new ParameterizedMessage("blob url [{}]", uri));
|
||||
// uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
|
||||
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /.
|
||||
// Lastly, we add the length of keyPath to the offset to strip this container's path.
|
||||
final String uriPath = uri.getPath();
|
||||
blobsBuilder.add(uriPath.substring(1 + container.length() + 1 + keyPath.length(), uriPath.length() - 1));
|
||||
}
|
||||
}
|
||||
});
|
||||
return Collections.unmodifiableSet(blobsBuilder);
|
||||
}
|
||||
|
||||
public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize,
|
||||
boolean failIfAlreadyExists)
|
||||
throws URISyntaxException, StorageException, FileAlreadyExistsException {
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.repositories.gcs;
|
||||
|
||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||
import org.elasticsearch.common.blobstore.BlobMetaData;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.blobstore.BlobStoreException;
|
||||
|
@ -55,6 +56,11 @@ class GoogleCloudStorageBlobContainer extends AbstractBlobContainer {
|
|||
return blobStore.listBlobs(path);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, BlobContainer> children() throws IOException {
|
||||
return blobStore.listChildren(path());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, BlobMetaData> listBlobsByPrefix(String prefix) throws IOException {
|
||||
return blobStore.listBlobsByPrefix(path, prefix);
|
||||
|
|
|
@ -142,6 +142,23 @@ class GoogleCloudStorageBlobStore implements BlobStore {
|
|||
return mapBuilder.immutableMap();
|
||||
}
|
||||
|
||||
Map<String, BlobContainer> listChildren(BlobPath path) throws IOException {
|
||||
final String pathStr = path.buildAsString();
|
||||
final MapBuilder<String, BlobContainer> mapBuilder = MapBuilder.newMapBuilder();
|
||||
SocketAccess.doPrivilegedVoidIOException
|
||||
(() -> client().get(bucketName).list(BlobListOption.currentDirectory(), BlobListOption.prefix(pathStr)).iterateAll().forEach(
|
||||
blob -> {
|
||||
if (blob.isDirectory()) {
|
||||
assert blob.getName().startsWith(pathStr);
|
||||
final String suffixName = blob.getName().substring(pathStr.length());
|
||||
if (suffixName.isEmpty() == false) {
|
||||
mapBuilder.put(suffixName, new GoogleCloudStorageBlobContainer(path.add(suffixName), this));
|
||||
}
|
||||
}
|
||||
}));
|
||||
return mapBuilder.immutableMap();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if the blob exists in the specific bucket
|
||||
*
|
||||
|
|
|
@ -94,7 +94,7 @@ class GoogleCloudStorageRepository extends BlobStoreRepository {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected BlobPath basePath() {
|
||||
public BlobPath basePath() {
|
||||
return basePath;
|
||||
}
|
||||
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Options;
|
|||
import org.apache.hadoop.fs.Options.CreateOpts;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||
import org.elasticsearch.common.blobstore.BlobMetaData;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.blobstore.fs.FsBlobContainer;
|
||||
|
@ -137,12 +138,14 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
|
|||
|
||||
@Override
|
||||
public Map<String, BlobMetaData> listBlobsByPrefix(@Nullable final String prefix) throws IOException {
|
||||
FileStatus[] files = store.execute(fileContext -> (fileContext.util().listStatus(path,
|
||||
path -> prefix == null || path.getName().startsWith(prefix))));
|
||||
Map<String, BlobMetaData> map = new LinkedHashMap<String, BlobMetaData>();
|
||||
FileStatus[] files = store.execute(fileContext -> fileContext.util().listStatus(path,
|
||||
path -> prefix == null || path.getName().startsWith(prefix)));
|
||||
Map<String, BlobMetaData> map = new LinkedHashMap<>();
|
||||
for (FileStatus file : files) {
|
||||
if (file.isFile()) {
|
||||
map.put(file.getPath().getName(), new PlainBlobMetaData(file.getPath().getName(), file.getLen()));
|
||||
}
|
||||
}
|
||||
return Collections.unmodifiableMap(map);
|
||||
}
|
||||
|
||||
|
@ -151,6 +154,19 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
|
|||
return listBlobsByPrefix(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, BlobContainer> children() throws IOException {
|
||||
FileStatus[] files = store.execute(fileContext -> fileContext.util().listStatus(path));
|
||||
Map<String, BlobContainer> map = new LinkedHashMap<>();
|
||||
for (FileStatus file : files) {
|
||||
if (file.isDirectory()) {
|
||||
final String name = file.getPath().getName();
|
||||
map.put(name, new HdfsBlobContainer(path().add(name), store, new Path(path, name), bufferSize, securityContext));
|
||||
}
|
||||
}
|
||||
return Collections.unmodifiableMap(map);
|
||||
}
|
||||
|
||||
/**
|
||||
* Exists to wrap underlying InputStream methods that might make socket connections in
|
||||
* doPrivileged blocks. This is due to the way that hdfs client libraries might open
|
||||
|
|
|
@ -234,7 +234,7 @@ public final class HdfsRepository extends BlobStoreRepository {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected BlobPath basePath() {
|
||||
public BlobPath basePath() {
|
||||
return basePath;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,61 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.repositories.hdfs;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.bootstrap.JavaVersion;
|
||||
import org.elasticsearch.common.settings.MockSecureSettings;
|
||||
import org.elasticsearch.common.settings.SecureSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
@ThreadLeakFilters(filters = HdfsClientThreadLeakFilter.class)
|
||||
public class HdfsRepositoryTests extends AbstractThirdPartyRepositoryTestCase {
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> getPlugins() {
|
||||
return pluginList(HdfsPlugin.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SecureSettings credentials() {
|
||||
return new MockSecureSettings();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void createRepository(String repoName) {
|
||||
assumeFalse("https://github.com/elastic/elasticsearch/issues/31498", JavaVersion.current().equals(JavaVersion.parse("11")));
|
||||
AcknowledgedResponse putRepositoryResponse = client().admin().cluster().preparePutRepository(repoName)
|
||||
.setType("hdfs")
|
||||
.setSettings(Settings.builder()
|
||||
.put("uri", "hdfs:///")
|
||||
.put("conf.fs.AbstractFileSystem.hdfs.impl", TestingFs.class.getName())
|
||||
.put("path", "foo")
|
||||
.put("chunk_size", randomIntBetween(100, 1000) + "k")
|
||||
.put("compress", randomBoolean())
|
||||
).get();
|
||||
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
|
||||
}
|
||||
}
|
|
@ -163,7 +163,7 @@ if (useFixture) {
|
|||
def minioAddress = {
|
||||
int minioPort = postProcessFixture.ext."test.fixtures.minio-fixture.tcp.9000"
|
||||
assert minioPort > 0
|
||||
return 'http://127.0.0.1:' + minioPort
|
||||
'http://127.0.0.1:' + minioPort
|
||||
}
|
||||
|
||||
File minioAddressFile = new File(project.buildDir, 'generated-resources/s3Fixture.address')
|
||||
|
|
|
@ -25,6 +25,7 @@ import com.amazonaws.services.s3.model.AmazonS3Exception;
|
|||
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
|
||||
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
|
||||
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
|
||||
import com.amazonaws.services.s3.model.ListObjectsRequest;
|
||||
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
|
||||
import com.amazonaws.services.s3.model.ObjectListing;
|
||||
import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||
|
@ -38,6 +39,7 @@ import org.apache.lucene.util.SetOnce;
|
|||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||
import org.elasticsearch.common.blobstore.BlobMetaData;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.blobstore.BlobStoreException;
|
||||
|
@ -50,6 +52,8 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.nio.file.NoSuchFileException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -202,12 +206,15 @@ class S3BlobContainer extends AbstractBlobContainer {
|
|||
final ObjectListing finalPrevListing = prevListing;
|
||||
list = SocketAccess.doPrivileged(() -> clientReference.client().listNextBatchOfObjects(finalPrevListing));
|
||||
} else {
|
||||
final ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
|
||||
listObjectsRequest.setBucketName(blobStore.bucket());
|
||||
listObjectsRequest.setDelimiter("/");
|
||||
if (blobNamePrefix != null) {
|
||||
list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(blobStore.bucket(),
|
||||
buildKey(blobNamePrefix)));
|
||||
listObjectsRequest.setPrefix(buildKey(blobNamePrefix));
|
||||
} else {
|
||||
list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(blobStore.bucket(), keyPath));
|
||||
listObjectsRequest.setPrefix(keyPath);
|
||||
}
|
||||
list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(listObjectsRequest));
|
||||
}
|
||||
for (final S3ObjectSummary summary : list.getObjectSummaries()) {
|
||||
final String name = summary.getKey().substring(keyPath.length());
|
||||
|
@ -230,6 +237,52 @@ class S3BlobContainer extends AbstractBlobContainer {
|
|||
return listBlobsByPrefix(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, BlobContainer> children() throws IOException {
|
||||
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
|
||||
ObjectListing prevListing = null;
|
||||
final Map<String, BlobContainer> entries = new HashMap<>();
|
||||
while (true) {
|
||||
ObjectListing list;
|
||||
if (prevListing != null) {
|
||||
final ObjectListing finalPrevListing = prevListing;
|
||||
list = SocketAccess.doPrivileged(() -> clientReference.client().listNextBatchOfObjects(finalPrevListing));
|
||||
} else {
|
||||
final ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
|
||||
listObjectsRequest.setBucketName(blobStore.bucket());
|
||||
listObjectsRequest.setPrefix(keyPath);
|
||||
listObjectsRequest.setDelimiter("/");
|
||||
list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(listObjectsRequest));
|
||||
}
|
||||
for (final String summary : list.getCommonPrefixes()) {
|
||||
final String name = summary.substring(keyPath.length());
|
||||
if (name.isEmpty() == false) {
|
||||
// Stripping the trailing slash off of the common prefix
|
||||
final String last = name.substring(0, name.length() - 1);
|
||||
final BlobPath path = path().add(last);
|
||||
entries.put(last, blobStore.blobContainer(path));
|
||||
}
|
||||
}
|
||||
assert list.getObjectSummaries().stream().noneMatch(s -> {
|
||||
for (String commonPrefix : list.getCommonPrefixes()) {
|
||||
if (s.getKey().substring(keyPath.length()).startsWith(commonPrefix)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}) : "Response contained children for listed common prefixes.";
|
||||
if (list.isTruncated()) {
|
||||
prevListing = list;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return Collections.unmodifiableMap(entries);
|
||||
} catch (final AmazonClientException e) {
|
||||
throw new IOException("Exception when listing children of [" + path().buildAsString() + ']', e);
|
||||
}
|
||||
}
|
||||
|
||||
private String buildKey(String blobName) {
|
||||
return keyPath + blobName;
|
||||
}
|
||||
|
|
|
@ -226,12 +226,6 @@ class S3Repository extends BlobStoreRepository {
|
|||
return new S3BlobStore(service, bucket, serverSideEncryption, bufferSize, cannedACL, storageClass, repositoryMetaData);
|
||||
}
|
||||
|
||||
// only use for testing
|
||||
@Override
|
||||
protected BlobStore blobStore() {
|
||||
return super.blobStore();
|
||||
}
|
||||
|
||||
// only use for testing
|
||||
@Override
|
||||
protected BlobStore getBlobStore() {
|
||||
|
@ -239,7 +233,7 @@ class S3Repository extends BlobStoreRepository {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected BlobPath basePath() {
|
||||
public BlobPath basePath() {
|
||||
return basePath;
|
||||
}
|
||||
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
package org.elasticsearch.repositories.s3;
|
||||
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.common.blobstore.BlobMetaData;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.settings.MockSecureSettings;
|
||||
import org.elasticsearch.common.settings.SecureSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -28,6 +30,8 @@ import org.elasticsearch.test.StreamsUtils;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.hamcrest.Matchers.blankOrNullString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
@ -56,7 +60,7 @@ public class S3RepositoryThirdPartyTests extends AbstractThirdPartyRepositoryTes
|
|||
protected void createRepository(String repoName) {
|
||||
Settings.Builder settings = Settings.builder()
|
||||
.put("bucket", System.getProperty("test.s3.bucket"))
|
||||
.put("base_path", System.getProperty("test.s3.base", "/"));
|
||||
.put("base_path", System.getProperty("test.s3.base", "testpath"));
|
||||
final String endpointPath = System.getProperty("test.s3.endpoint");
|
||||
if (endpointPath != null) {
|
||||
try {
|
||||
|
@ -70,4 +74,18 @@ public class S3RepositoryThirdPartyTests extends AbstractThirdPartyRepositoryTes
|
|||
.setSettings(settings).get();
|
||||
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void assertBlobsByPrefix(BlobPath path, String prefix, Map<String, BlobMetaData> blobs) throws Exception {
|
||||
// AWS S3 is eventually consistent so we retry for 10 minutes assuming a list operation will never take longer than that
|
||||
// to become consistent.
|
||||
assertBusy(() -> super.assertBlobsByPrefix(path, prefix, blobs), 10L, TimeUnit.MINUTES);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void assertChildren(BlobPath path, Collection<String> children) throws Exception {
|
||||
// AWS S3 is eventually consistent so we retry for 10 minutes assuming a list operation will never take longer than that
|
||||
// to become consistent.
|
||||
assertBusy(() -> super.assertChildren(path, children), 10L, TimeUnit.MINUTES);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -158,6 +158,16 @@ public interface BlobContainer {
|
|||
*/
|
||||
Map<String, BlobMetaData> listBlobs() throws IOException;
|
||||
|
||||
/**
|
||||
* Lists all child containers under this container. A child container is defined as a container whose {@link #path()} method returns
|
||||
* a path that has this containers {@link #path()} return as its prefix and has one more path element than the current
|
||||
* container's path.
|
||||
*
|
||||
* @return Map of name of the child container to child container
|
||||
* @throws IOException on failure to list child containers
|
||||
*/
|
||||
Map<String, BlobContainer> children() throws IOException;
|
||||
|
||||
/**
|
||||
* Lists all blobs in the container that match the specified prefix.
|
||||
*
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.common.blobstore.fs;
|
||||
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||
import org.elasticsearch.common.blobstore.BlobMetaData;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
|
||||
|
@ -73,9 +74,22 @@ public class FsBlobContainer extends AbstractBlobContainer {
|
|||
return listBlobsByPrefix(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, BlobContainer> children() throws IOException {
|
||||
Map<String, BlobContainer> builder = new HashMap<>();
|
||||
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path)) {
|
||||
for (Path file : stream) {
|
||||
if (Files.isDirectory(file)) {
|
||||
final String name = file.getFileName().toString();
|
||||
builder.put(name, new FsBlobContainer(blobStore, path().add(name), file));
|
||||
}
|
||||
}
|
||||
}
|
||||
return unmodifiableMap(builder);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) throws IOException {
|
||||
// If we get duplicate files we should just take the last entry
|
||||
Map<String, BlobMetaData> builder = new HashMap<>();
|
||||
|
||||
blobNamePrefix = blobNamePrefix == null ? "" : blobNamePrefix;
|
||||
|
|
|
@ -285,6 +285,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
}
|
||||
}
|
||||
|
||||
public ThreadPool threadPool() {
|
||||
return threadPool;
|
||||
}
|
||||
|
||||
// package private, only use for testing
|
||||
BlobContainer getBlobContainer() {
|
||||
return blobContainer.get();
|
||||
|
@ -316,9 +320,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
}
|
||||
|
||||
/**
|
||||
* maintains single lazy instance of {@link BlobStore}
|
||||
* Maintains single lazy instance of {@link BlobStore}.
|
||||
* Public for testing.
|
||||
*/
|
||||
protected BlobStore blobStore() {
|
||||
public BlobStore blobStore() {
|
||||
assertSnapshotOrGenericThread();
|
||||
|
||||
BlobStore store = blobStore.get();
|
||||
|
@ -351,7 +356,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
/**
|
||||
* Returns base path of the repository
|
||||
*/
|
||||
protected abstract BlobPath basePath();
|
||||
public abstract BlobPath basePath();
|
||||
|
||||
/**
|
||||
* Returns true if metadata and snapshot files should be compressed
|
||||
|
|
|
@ -125,7 +125,7 @@ public class FsRepository extends BlobStoreRepository {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected BlobPath basePath() {
|
||||
public BlobPath basePath() {
|
||||
return basePath;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -74,6 +74,11 @@ public class BlobContainerWrapper implements BlobContainer {
|
|||
return delegate.listBlobs();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, BlobContainer> children() throws IOException {
|
||||
return delegate.children();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) throws IOException {
|
||||
return delegate.listBlobsByPrefix(blobNamePrefix);
|
||||
|
|
|
@ -52,6 +52,7 @@ import java.security.MessageDigest;
|
|||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
|
@ -338,6 +339,15 @@ public class MockRepository extends FsRepository {
|
|||
return super.listBlobs();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, BlobContainer> children() throws IOException {
|
||||
final Map<String, BlobContainer> res = new HashMap<>();
|
||||
for (Map.Entry<String, BlobContainer> entry : super.children().entrySet()) {
|
||||
res.put(entry.getKey(), new MockBlobContainer(entry.getValue()));
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) throws IOException {
|
||||
maybeIOExceptionOrBlock(blobNamePrefix);
|
||||
|
|
|
@ -18,12 +18,34 @@
|
|||
*/
|
||||
package org.elasticsearch.repositories;
|
||||
|
||||
import org.elasticsearch.action.ActionRunnable;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||
import org.elasticsearch.common.blobstore.BlobMetaData;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.blobstore.BlobStore;
|
||||
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
|
||||
import org.elasticsearch.common.settings.SecureSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
||||
import org.elasticsearch.snapshots.SnapshotState;
|
||||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
|
||||
|
@ -41,10 +63,34 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeT
|
|||
|
||||
protected abstract void createRepository(String repoName);
|
||||
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
createRepository("test-repo");
|
||||
final BlobStoreRepository repo = getRepository();
|
||||
final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
|
||||
repo.threadPool().generic().execute(new ActionRunnable<Void>(future) {
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
deleteContents(repo.blobStore().blobContainer(repo.basePath()));
|
||||
future.onResponse(null);
|
||||
}
|
||||
});
|
||||
future.actionGet();
|
||||
assertChildren(repo.basePath(), Collections.emptyList());
|
||||
}
|
||||
|
||||
private static void deleteContents(BlobContainer container) throws IOException {
|
||||
final List<String> toDelete = new ArrayList<>();
|
||||
for (Map.Entry<String, BlobContainer> child : container.children().entrySet()) {
|
||||
deleteContents(child.getValue());
|
||||
toDelete.add(child.getKey());
|
||||
}
|
||||
toDelete.addAll(container.listBlobs().keySet());
|
||||
container.deleteBlobsIgnoringIfNotExists(toDelete);
|
||||
}
|
||||
|
||||
public void testCreateSnapshot() {
|
||||
createRepository("test-repo");
|
||||
|
||||
createIndex("test-idx-1");
|
||||
createIndex("test-idx-2");
|
||||
createIndex("test-idx-3");
|
||||
|
@ -86,6 +132,75 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeT
|
|||
.prepareDeleteSnapshot("test-repo", snapshotName)
|
||||
.get()
|
||||
.isAcknowledged());
|
||||
}
|
||||
|
||||
public void testListChildren() throws Exception {
|
||||
final BlobStoreRepository repo = getRepository();
|
||||
final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
|
||||
final Executor genericExec = repo.threadPool().generic();
|
||||
final int testBlobLen = randomIntBetween(1, 100);
|
||||
genericExec.execute(new ActionRunnable<Void>(future) {
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
final BlobStore blobStore = repo.blobStore();
|
||||
blobStore.blobContainer(repo.basePath().add("foo"))
|
||||
.writeBlob("nested-blob", new ByteArrayInputStream(randomByteArrayOfLength(testBlobLen)), testBlobLen, false);
|
||||
blobStore.blobContainer(repo.basePath().add("foo").add("nested"))
|
||||
.writeBlob("bar", new ByteArrayInputStream(randomByteArrayOfLength(testBlobLen)), testBlobLen, false);
|
||||
blobStore.blobContainer(repo.basePath().add("foo").add("nested2"))
|
||||
.writeBlob("blub", new ByteArrayInputStream(randomByteArrayOfLength(testBlobLen)), testBlobLen, false);
|
||||
future.onResponse(null);
|
||||
}
|
||||
});
|
||||
future.actionGet();
|
||||
assertChildren(repo.basePath(), Collections.singleton("foo"));
|
||||
assertBlobsByPrefix(repo.basePath(), "fo", Collections.emptyMap());
|
||||
assertChildren(repo.basePath().add("foo"), Arrays.asList("nested", "nested2"));
|
||||
assertBlobsByPrefix(repo.basePath().add("foo"), "nest",
|
||||
Collections.singletonMap("nested-blob", new PlainBlobMetaData("nested-blob", testBlobLen)));
|
||||
assertChildren(repo.basePath().add("foo").add("nested"), Collections.emptyList());
|
||||
}
|
||||
|
||||
protected void assertBlobsByPrefix(BlobPath path, String prefix, Map<String, BlobMetaData> blobs) throws Exception {
|
||||
final PlainActionFuture<Map<String, BlobMetaData>> future = PlainActionFuture.newFuture();
|
||||
final BlobStoreRepository repository = getRepository();
|
||||
repository.threadPool().generic().execute(new ActionRunnable<Map<String, BlobMetaData>>(future) {
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
final BlobStore blobStore = repository.blobStore();
|
||||
future.onResponse(blobStore.blobContainer(path).listBlobsByPrefix(prefix));
|
||||
}
|
||||
});
|
||||
Map<String, BlobMetaData> foundBlobs = future.actionGet();
|
||||
if (blobs.isEmpty()) {
|
||||
assertThat(foundBlobs.keySet(), empty());
|
||||
} else {
|
||||
assertThat(foundBlobs.keySet(), containsInAnyOrder(blobs.keySet().toArray(Strings.EMPTY_ARRAY)));
|
||||
for (Map.Entry<String, BlobMetaData> entry : foundBlobs.entrySet()) {
|
||||
assertEquals(entry.getValue().length(), blobs.get(entry.getKey()).length());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void assertChildren(BlobPath path, Collection<String> children) throws Exception {
|
||||
final PlainActionFuture<Set<String>> future = PlainActionFuture.newFuture();
|
||||
final BlobStoreRepository repository = getRepository();
|
||||
repository.threadPool().generic().execute(new ActionRunnable<Set<String>>(future) {
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
final BlobStore blobStore = repository.blobStore();
|
||||
future.onResponse(blobStore.blobContainer(path).children().keySet());
|
||||
}
|
||||
});
|
||||
Set<String> foundChildren = future.actionGet();
|
||||
if (children.isEmpty()) {
|
||||
assertThat(foundChildren, empty());
|
||||
} else {
|
||||
assertThat(foundChildren, containsInAnyOrder(children.toArray(Strings.EMPTY_ARRAY)));
|
||||
}
|
||||
}
|
||||
|
||||
private BlobStoreRepository getRepository() {
|
||||
return (BlobStoreRepository) getInstanceFromNode(RepositoriesService.class).repository("test-repo");
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue