listBlobsByPrefix uses a wrong path
When listing existing blobs for an azure repository, `path` to look at is incorrectly computed which leads to 404 errors. Closes #26. (cherry picked from commit 656fadc)
This commit is contained in:
parent
d613f5f097
commit
6046f172c8
|
@ -57,7 +57,7 @@ public interface AzureStorageService {
|
|||
|
||||
InputStream getInputStream(String container, String blob) throws ServiceException;
|
||||
|
||||
ImmutableMap<String,BlobMetaData> listBlobsByPrefix(String container, String prefix) throws URISyntaxException, StorageException, ServiceException;
|
||||
ImmutableMap<String,BlobMetaData> listBlobsByPrefix(String container, String keyPath, String prefix) throws URISyntaxException, StorageException, ServiceException;
|
||||
|
||||
void putObject(String container, String blob, InputStream is, long length) throws URISyntaxException, StorageException, IOException;
|
||||
|
||||
|
|
|
@ -113,7 +113,14 @@ public class AzureStorageServiceImpl extends AbstractLifecycleComponent<AzureSto
|
|||
@Override
|
||||
public void removeContainer(String container) throws URISyntaxException, StorageException {
|
||||
CloudBlobContainer blob_container = client.getContainerReference(container);
|
||||
blob_container.delete();
|
||||
// TODO Should we set some timeout and retry options?
|
||||
/*
|
||||
BlobRequestOptions options = new BlobRequestOptions();
|
||||
options.setTimeoutIntervalInMs(1000);
|
||||
options.setRetryPolicyFactory(new RetryNoRetry());
|
||||
blob_container.deleteIfExists(options, null);
|
||||
*/
|
||||
blob_container.deleteIfExists();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -188,13 +195,13 @@ public class AzureStorageServiceImpl extends AbstractLifecycleComponent<AzureSto
|
|||
}
|
||||
|
||||
@Override
|
||||
public ImmutableMap<String, BlobMetaData> listBlobsByPrefix(String container, String prefix) throws URISyntaxException, StorageException, ServiceException {
|
||||
logger.debug("listBlobsByPrefix container [{}], prefix [{}]", container, prefix);
|
||||
public ImmutableMap<String, BlobMetaData> listBlobsByPrefix(String container, String keyPath, String prefix) throws URISyntaxException, StorageException, ServiceException {
|
||||
logger.debug("listBlobsByPrefix container [{}], keyPath [{}], prefix [{}]", container, keyPath, prefix);
|
||||
ImmutableMap.Builder<String, BlobMetaData> blobsBuilder = ImmutableMap.builder();
|
||||
|
||||
CloudBlobContainer blob_container = client.getContainerReference(container);
|
||||
if (blob_container.exists()) {
|
||||
Iterable<ListBlobItem> blobs = blob_container.listBlobs(prefix);
|
||||
Iterable<ListBlobItem> blobs = blob_container.listBlobs(keyPath + prefix);
|
||||
for (ListBlobItem blob : blobs) {
|
||||
URI uri = blob.getUri();
|
||||
if (logger.isTraceEnabled()) {
|
||||
|
@ -202,7 +209,7 @@ public class AzureStorageServiceImpl extends AbstractLifecycleComponent<AzureSto
|
|||
}
|
||||
String blobpath = uri.getPath().substring(container.length() + 1);
|
||||
BlobProperties properties = service.getBlobProperties(container, blobpath).getProperties();
|
||||
String name = uri.getPath().substring(prefix.length());
|
||||
String name = blobpath.substring(keyPath.length() + 1);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("blob url [{}], name [{}], size [{}]", uri, name, properties.getContentLength());
|
||||
}
|
||||
|
|
|
@ -111,24 +111,18 @@ public class AbstractAzureBlobContainer extends AbstractBlobContainer {
|
|||
}
|
||||
|
||||
@Override
|
||||
public ImmutableMap<String, BlobMetaData> listBlobsByPrefix(@Nullable String blobNamePrefix) throws IOException {
|
||||
final String prefix;
|
||||
if (blobNamePrefix != null) {
|
||||
prefix = buildKey(blobNamePrefix);
|
||||
} else {
|
||||
prefix = keyPath;
|
||||
}
|
||||
public ImmutableMap<String, BlobMetaData> listBlobsByPrefix(@Nullable String prefix) throws IOException {
|
||||
|
||||
try {
|
||||
return blobStore.client().listBlobsByPrefix(blobStore.container(), prefix);
|
||||
return blobStore.client().listBlobsByPrefix(blobStore.container(), keyPath, prefix);
|
||||
} catch (URISyntaxException e) {
|
||||
logger.warn("can not access [{}] in container {{}}: {}", blobNamePrefix, blobStore.container(), e.getMessage());
|
||||
logger.warn("can not access [{}] in container {{}}: {}", prefix, blobStore.container(), e.getMessage());
|
||||
throw new IOException(e);
|
||||
} catch (StorageException e) {
|
||||
logger.warn("can not access [{}] in container {{}}: {}", blobNamePrefix, blobStore.container(), e.getMessage());
|
||||
logger.warn("can not access [{}] in container {{}}: {}", prefix, blobStore.container(), e.getMessage());
|
||||
throw new IOException(e);
|
||||
} catch (ServiceException e) {
|
||||
logger.warn("can not access [{}] in container {{}}: {}", blobNamePrefix, blobStore.container(), e.getMessage());
|
||||
logger.warn("can not access [{}] in container {{}}: {}", prefix, blobStore.container(), e.getMessage());
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResp
|
|||
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.ClusterAdminClient;
|
||||
import org.elasticsearch.cloud.azure.AbstractAzureTest;
|
||||
import org.elasticsearch.cloud.azure.AzureStorageService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
|
@ -55,14 +56,18 @@ import static org.hamcrest.Matchers.*;
|
|||
@AbstractAzureTest.AzureTest
|
||||
@ElasticsearchIntegrationTest.ClusterScope(
|
||||
scope = ElasticsearchIntegrationTest.Scope.SUITE,
|
||||
numDataNodes = 2,
|
||||
numDataNodes = 1,
|
||||
transportClientRatio = 0.0)
|
||||
public class AzureSnapshotRestoreITest extends AbstractAzureTest {
|
||||
|
||||
private final String basePath;
|
||||
private String getRepositoryPath() {
|
||||
String testName = "/snapshot-itest/repo-".concat("" + randomIntBetween(1, 1000));
|
||||
return testName.contains(" ") ? Strings.split(testName, " ")[0] : testName;
|
||||
}
|
||||
|
||||
public AzureSnapshotRestoreITest() {
|
||||
basePath = "/snapshot-itest/repo-" + randomInt();
|
||||
private String getContainerName() {
|
||||
String testName = "it-".concat(Strings.toUnderscoreCase(getTestName()).replaceAll("_", "-"));
|
||||
return testName.contains(" ") ? Strings.split(testName, " ")[0] : testName;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -78,23 +83,29 @@ public class AzureSnapshotRestoreITest extends AbstractAzureTest {
|
|||
@Before
|
||||
public final void wipeBefore() throws StorageException, ServiceException, URISyntaxException {
|
||||
wipeRepositories();
|
||||
cleanRepositoryFiles(basePath);
|
||||
cleanRepositoryFiles(
|
||||
getContainerName(),
|
||||
getContainerName().concat("-1"),
|
||||
getContainerName().concat("-2"));
|
||||
}
|
||||
|
||||
@After
|
||||
public final void wipeAfter() throws StorageException, ServiceException, URISyntaxException {
|
||||
wipeRepositories();
|
||||
cleanRepositoryFiles(basePath);
|
||||
cleanRepositoryFiles(
|
||||
getContainerName(),
|
||||
getContainerName().concat("-1"),
|
||||
getContainerName().concat("-2"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleWorkflow() {
|
||||
Client client = client();
|
||||
logger.info("--> creating azure repository with path [{}]", basePath);
|
||||
logger.info("--> creating azure repository with path [{}]", getRepositoryPath());
|
||||
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
|
||||
.setType("azure").setSettings(ImmutableSettings.settingsBuilder()
|
||||
.put(AzureStorageService.Fields.CONTAINER, "elasticsearch-integration")
|
||||
.put(AzureStorageService.Fields.BASE_PATH, basePath)
|
||||
.put(AzureStorageService.Fields.CONTAINER, getContainerName())
|
||||
.put(AzureStorageService.Fields.BASE_PATH, getRepositoryPath())
|
||||
.put(AzureStorageService.Fields.CHUNK_SIZE, randomIntBetween(1000, 10000))
|
||||
).get();
|
||||
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
|
||||
|
@ -163,18 +174,18 @@ public class AzureSnapshotRestoreITest extends AbstractAzureTest {
|
|||
@Test
|
||||
public void testMultipleRepositories() {
|
||||
Client client = client();
|
||||
logger.info("--> creating azure repository with path [{}]", basePath);
|
||||
logger.info("--> creating azure repository with path [{}]", getRepositoryPath());
|
||||
PutRepositoryResponse putRepositoryResponse1 = client.admin().cluster().preparePutRepository("test-repo1")
|
||||
.setType("azure").setSettings(ImmutableSettings.settingsBuilder()
|
||||
.put(AzureStorageService.Fields.CONTAINER, "elasticsearch-integration1")
|
||||
.put(AzureStorageService.Fields.BASE_PATH, basePath)
|
||||
.put(AzureStorageService.Fields.CONTAINER, getContainerName().concat("-1"))
|
||||
.put(AzureStorageService.Fields.BASE_PATH, getRepositoryPath())
|
||||
.put(AzureStorageService.Fields.CHUNK_SIZE, randomIntBetween(1000, 10000))
|
||||
).get();
|
||||
assertThat(putRepositoryResponse1.isAcknowledged(), equalTo(true));
|
||||
PutRepositoryResponse putRepositoryResponse2 = client.admin().cluster().preparePutRepository("test-repo2")
|
||||
.setType("azure").setSettings(ImmutableSettings.settingsBuilder()
|
||||
.put(AzureStorageService.Fields.CONTAINER, "elasticsearch-integration2")
|
||||
.put(AzureStorageService.Fields.BASE_PATH, basePath)
|
||||
.put(AzureStorageService.Fields.CONTAINER, getContainerName().concat("-2"))
|
||||
.put(AzureStorageService.Fields.BASE_PATH, getRepositoryPath())
|
||||
.put(AzureStorageService.Fields.CHUNK_SIZE, randomIntBetween(1000, 10000))
|
||||
).get();
|
||||
assertThat(putRepositoryResponse2.isAcknowledged(), equalTo(true));
|
||||
|
@ -226,6 +237,65 @@ public class AzureSnapshotRestoreITest extends AbstractAzureTest {
|
|||
assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(true));
|
||||
}
|
||||
|
||||
/**
|
||||
* For issue #26: https://github.com/elasticsearch/elasticsearch-cloud-azure/issues/26
|
||||
*/
|
||||
@Test
|
||||
public void testListBlobs_26() throws StorageException, ServiceException, URISyntaxException {
|
||||
createIndex("test-idx-1", "test-idx-2", "test-idx-3");
|
||||
ensureGreen();
|
||||
|
||||
logger.info("--> indexing some data");
|
||||
for (int i = 0; i < 100; i++) {
|
||||
index("test-idx-1", "doc", Integer.toString(i), "foo", "bar" + i);
|
||||
index("test-idx-2", "doc", Integer.toString(i), "foo", "baz" + i);
|
||||
index("test-idx-3", "doc", Integer.toString(i), "foo", "baz" + i);
|
||||
}
|
||||
refresh();
|
||||
|
||||
ClusterAdminClient client = client().admin().cluster();
|
||||
logger.info("--> creating azure repository without any path");
|
||||
PutRepositoryResponse putRepositoryResponse = client.preparePutRepository("test-repo").setType("azure")
|
||||
.setSettings(ImmutableSettings.settingsBuilder()
|
||||
.put(AzureStorageService.Fields.CONTAINER, getContainerName())
|
||||
).get();
|
||||
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
|
||||
|
||||
// Get all snapshots - should be empty
|
||||
assertThat(client.prepareGetSnapshots("test-repo").get().getSnapshots().size(), equalTo(0));
|
||||
|
||||
logger.info("--> snapshot");
|
||||
CreateSnapshotResponse createSnapshotResponse = client.prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-*").get();
|
||||
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
|
||||
|
||||
// Get all snapshots - should have one
|
||||
assertThat(client.prepareGetSnapshots("test-repo").get().getSnapshots().size(), equalTo(1));
|
||||
|
||||
// Clean the snapshot
|
||||
client.prepareDeleteSnapshot("test-repo", "test-snap").get();
|
||||
client.prepareDeleteRepository("test-repo").get();
|
||||
|
||||
logger.info("--> creating azure repository path [{}]", getRepositoryPath());
|
||||
putRepositoryResponse = client.preparePutRepository("test-repo").setType("azure")
|
||||
.setSettings(ImmutableSettings.settingsBuilder()
|
||||
.put(AzureStorageService.Fields.CONTAINER, getContainerName())
|
||||
.put(AzureStorageService.Fields.BASE_PATH, getRepositoryPath())
|
||||
).get();
|
||||
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
|
||||
|
||||
// Get all snapshots - should be empty
|
||||
assertThat(client.prepareGetSnapshots("test-repo").get().getSnapshots().size(), equalTo(0));
|
||||
|
||||
logger.info("--> snapshot");
|
||||
createSnapshotResponse = client.prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-*").get();
|
||||
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
|
||||
|
||||
// Get all snapshots - should have one
|
||||
assertThat(client.prepareGetSnapshots("test-repo").get().getSnapshots().size(), equalTo(1));
|
||||
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* For issue #21: https://github.com/elasticsearch/elasticsearch-cloud-azure/issues/21
|
||||
*/
|
||||
|
@ -254,7 +324,7 @@ public class AzureSnapshotRestoreITest extends AbstractAzureTest {
|
|||
PutRepositoryResponse putRepositoryResponse = client().admin().cluster().preparePutRepository("test-repo")
|
||||
.setType("azure").setSettings(ImmutableSettings.settingsBuilder()
|
||||
.put(AzureStorageService.Fields.CONTAINER, container)
|
||||
.put(AzureStorageService.Fields.BASE_PATH, basePath)
|
||||
.put(AzureStorageService.Fields.BASE_PATH, getRepositoryPath())
|
||||
.put(AzureStorageService.Fields.CHUNK_SIZE, randomIntBetween(1000, 10000))
|
||||
).get();
|
||||
assertThat(putRepositoryResponse.isAcknowledged(), is(correct));
|
||||
|
@ -273,11 +343,11 @@ public class AzureSnapshotRestoreITest extends AbstractAzureTest {
|
|||
@Test
|
||||
public void testNonExistingRepo_23() {
|
||||
Client client = client();
|
||||
logger.info("--> creating azure repository with path [{}]", basePath);
|
||||
logger.info("--> creating azure repository with path [{}]", getRepositoryPath());
|
||||
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
|
||||
.setType("azure").setSettings(ImmutableSettings.settingsBuilder()
|
||||
.put(AzureStorageService.Fields.CONTAINER, "elasticsearch-integration")
|
||||
.put(AzureStorageService.Fields.BASE_PATH, basePath)
|
||||
.put(AzureStorageService.Fields.CONTAINER, getContainerName())
|
||||
.put(AzureStorageService.Fields.BASE_PATH, getRepositoryPath())
|
||||
.put(AzureStorageService.Fields.CHUNK_SIZE, randomIntBetween(1000, 10000))
|
||||
).get();
|
||||
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
|
||||
|
@ -309,17 +379,13 @@ public class AzureSnapshotRestoreITest extends AbstractAzureTest {
|
|||
}
|
||||
|
||||
/**
|
||||
* Purge the test container
|
||||
* Purge the test containers
|
||||
*/
|
||||
public void cleanRepositoryFiles(String path) throws StorageException, ServiceException, URISyntaxException {
|
||||
String container = internalCluster().getInstance(Settings.class).get("repositories.azure.container",
|
||||
AzureRepository.CONTAINER_DEFAULT);
|
||||
logger.info("--> remove blobs in container [{}], path [{}]", container, path);
|
||||
public void cleanRepositoryFiles(String... containers) throws StorageException, ServiceException, URISyntaxException {
|
||||
AzureStorageService client = internalCluster().getInstance(AzureStorageService.class);
|
||||
|
||||
// Remove starting / if any
|
||||
path = Strings.trimLeadingCharacter(path, '/');
|
||||
|
||||
client.deleteFiles(container, path);
|
||||
for (String container : containers) {
|
||||
logger.info("--> remove container [{}]", container);
|
||||
client.removeContainer(container);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -82,7 +82,7 @@ public class AzureStorageServiceMock extends AbstractLifecycleComponent<AzureSto
|
|||
}
|
||||
|
||||
@Override
|
||||
public ImmutableMap<String, BlobMetaData> listBlobsByPrefix(String container, String prefix) {
|
||||
public ImmutableMap<String, BlobMetaData> listBlobsByPrefix(String container, String keyPath, String prefix) {
|
||||
ImmutableMap.Builder<String, BlobMetaData> blobsBuilder = ImmutableMap.builder();
|
||||
for (String blobName : blobs.keySet()) {
|
||||
if (Strings.startsWithIgnoreCase(blobName, prefix)) {
|
||||
|
|
Loading…
Reference in New Issue