Use Azure upload method instead of our own implementation (#26751)

* Use Azure upload method instead of our own implementation

We are not following the Azure documentation about uploading blobs to Azure storage. https://docs.microsoft.com/en-us/azure/storage/blobs/storage-java-how-to-use-blob-storage#upload-a-blob-into-a-container

Instead we are using our own implementation, which might cause some trouble: rarely, some blobs are not immediately committed just after we close the stream. Using the standard implementation provided by the Azure team should allow us to benefit from all the magic the Azure SDK team already wrote.

And well... Let's just read the doc!

* Adapt integration tests to secure settings

That was a missing part in #23405.

* Simplify all the integration tests and *extends ESBlobStoreRepositoryIntegTestCase tests

    * removes IT `testForbiddenContainerName()` as it is useless. The plugin no longer creates the container but expects that the user has created it before registering the repository
   * merges 2 IT classes so all IT tests are run from one single class
   * We no longer remove/create the container between individual tests but only once per test suite
This commit is contained in:
David Pilato 2017-09-28 13:15:37 +02:00 committed by GitHub
parent 25d6778d31
commit 1ccb497c0d
12 changed files with 175 additions and 593 deletions

View File

@ -26,13 +26,10 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.repositories.RepositoryException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.nio.file.FileAlreadyExistsException; import java.nio.file.FileAlreadyExistsException;
@ -96,24 +93,11 @@ public class AzureBlobContainer extends AbstractBlobContainer {
if (blobExists(blobName)) { if (blobExists(blobName)) {
throw new FileAlreadyExistsException("blob [" + blobName + "] already exists, cannot overwrite"); throw new FileAlreadyExistsException("blob [" + blobName + "] already exists, cannot overwrite");
} }
logger.trace("writeBlob({}, stream, {})", blobName, blobSize); logger.trace("writeBlob({}, stream, {})", buildKey(blobName), blobSize);
try (OutputStream stream = createOutput(blobName)) {
Streams.copy(inputStream, stream);
}
}
private OutputStream createOutput(String blobName) throws IOException {
try { try {
return new AzureOutputStream(blobStore.getOutputStream(blobStore.container(), buildKey(blobName))); blobStore.writeBlob(buildKey(blobName), inputStream, blobSize);
} catch (StorageException e) { } catch (URISyntaxException|StorageException e) {
if (e.getHttpStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { throw new IOException("Can not write blob " + blobName, e);
throw new NoSuchFileException(e.getMessage());
}
throw new IOException(e);
} catch (URISyntaxException e) {
throw new IOException(e);
} catch (IllegalArgumentException e) {
throw new RepositoryException(repositoryName, e.getMessage());
} }
} }

View File

@ -32,7 +32,6 @@ import org.elasticsearch.common.settings.Settings;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
@ -124,11 +123,6 @@ public class AzureBlobStore extends AbstractComponent implements BlobStore {
return this.client.getInputStream(this.clientName, this.locMode, container, blob); return this.client.getInputStream(this.clientName, this.locMode, container, blob);
} }
public OutputStream getOutputStream(String container, String blob) throws URISyntaxException, StorageException
{
return this.client.getOutputStream(this.clientName, this.locMode, container, blob);
}
public Map<String,BlobMetaData> listBlobsByPrefix(String container, String keyPath, String prefix) public Map<String,BlobMetaData> listBlobsByPrefix(String container, String keyPath, String prefix)
throws URISyntaxException, StorageException { throws URISyntaxException, StorageException {
return this.client.listBlobsByPrefix(this.clientName, this.locMode, container, keyPath, prefix); return this.client.listBlobsByPrefix(this.clientName, this.locMode, container, keyPath, prefix);
@ -138,4 +132,8 @@ public class AzureBlobStore extends AbstractComponent implements BlobStore {
{ {
this.client.moveBlob(this.clientName, this.locMode, container, sourceBlob, targetBlob); this.client.moveBlob(this.clientName, this.locMode, container, sourceBlob, targetBlob);
} }
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws URISyntaxException, StorageException {
this.client.writeBlob(this.clientName, this.locMode, container, blobName, inputStream, blobSize);
}
} }

View File

@ -1,46 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.azure;
import java.io.IOException;
import java.io.OutputStream;
public class AzureOutputStream extends OutputStream {
private final OutputStream blobOutputStream;
public AzureOutputStream(OutputStream blobOutputStream) {
this.blobOutputStream = blobOutputStream;
}
@Override
public void write(int b) throws IOException {
blobOutputStream.write(b);
}
@Override
public void close() throws IOException {
try {
blobOutputStream.close();
} catch (IOException e) {
// Azure is sending a "java.io.IOException: Stream is already closed."
}
}
}

View File

@ -27,7 +27,6 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.Map; import java.util.Map;
@ -55,12 +54,12 @@ public interface AzureStorageService {
InputStream getInputStream(String account, LocationMode mode, String container, String blob) InputStream getInputStream(String account, LocationMode mode, String container, String blob)
throws URISyntaxException, StorageException, IOException; throws URISyntaxException, StorageException, IOException;
OutputStream getOutputStream(String account, LocationMode mode, String container, String blob)
throws URISyntaxException, StorageException;
Map<String,BlobMetaData> listBlobsByPrefix(String account, LocationMode mode, String container, String keyPath, String prefix) Map<String,BlobMetaData> listBlobsByPrefix(String account, LocationMode mode, String container, String keyPath, String prefix)
throws URISyntaxException, StorageException; throws URISyntaxException, StorageException;
void moveBlob(String account, LocationMode mode, String container, String sourceBlob, String targetBlob) void moveBlob(String account, LocationMode mode, String container, String sourceBlob, String targetBlob)
throws URISyntaxException, StorageException; throws URISyntaxException, StorageException;
void writeBlob(String account, LocationMode mode, String container, String blobName, InputStream inputStream, long blobSize) throws
URISyntaxException, StorageException;
} }

View File

@ -34,9 +34,6 @@ import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
import com.microsoft.azure.storage.blob.ListBlobItem; import com.microsoft.azure.storage.blob.ListBlobItem;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier; import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.repositories.azure.AzureStorageService;
import org.elasticsearch.repositories.azure.AzureStorageSettings;
import org.elasticsearch.repositories.azure.SocketAccess;
import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.collect.MapBuilder;
@ -45,7 +42,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.RepositoryException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.EnumSet; import java.util.EnumSet;
@ -261,15 +257,6 @@ public class AzureStorageServiceImpl extends AbstractComponent implements AzureS
return SocketAccess.doPrivilegedException(() -> blockBlobReference.openInputStream(null, null, generateOperationContext(account))); return SocketAccess.doPrivilegedException(() -> blockBlobReference.openInputStream(null, null, generateOperationContext(account)));
} }
@Override
public OutputStream getOutputStream(String account, LocationMode mode, String container, String blob)
throws URISyntaxException, StorageException {
logger.trace("writing container [{}], blob [{}]", container, blob);
CloudBlobClient client = this.getSelectedClient(account, mode);
CloudBlockBlob blockBlobReference = client.getContainerReference(container).getBlockBlobReference(blob);
return SocketAccess.doPrivilegedException(() -> blockBlobReference.openOutputStream(null, null, generateOperationContext(account)));
}
@Override @Override
public Map<String, BlobMetaData> listBlobsByPrefix(String account, LocationMode mode, String container, String keyPath, String prefix) public Map<String, BlobMetaData> listBlobsByPrefix(String account, LocationMode mode, String container, String keyPath, String prefix)
throws URISyntaxException, StorageException { throws URISyntaxException, StorageException {
@ -319,4 +306,15 @@ public class AzureStorageServiceImpl extends AbstractComponent implements AzureS
logger.debug("moveBlob container [{}], sourceBlob [{}], targetBlob [{}] -> done", container, sourceBlob, targetBlob); logger.debug("moveBlob container [{}], sourceBlob [{}], targetBlob [{}] -> done", container, sourceBlob, targetBlob);
} }
} }
@Override
public void writeBlob(String account, LocationMode mode, String container, String blobName, InputStream inputStream, long blobSize)
throws URISyntaxException, StorageException {
logger.trace("writeBlob({}, stream, {})", blobName, blobSize);
CloudBlobClient client = this.getSelectedClient(account, mode);
CloudBlobContainer blobContainer = client.getContainerReference(container);
CloudBlockBlob blob = blobContainer.getBlockBlobReference(blobName);
SocketAccess.doPrivilegedVoidException(() -> blob.upload(inputStream, blobSize, null, null, generateOperationContext(account)));
logger.trace("writeBlob({}, stream, {}) - done", blobName, blobSize);
}
} }

View File

@ -22,6 +22,7 @@ package org.elasticsearch.repositories.azure;
import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.StorageException;
import org.elasticsearch.SpecialPermission; import org.elasticsearch.SpecialPermission;
import java.io.IOException;
import java.net.SocketPermission; import java.net.SocketPermission;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.security.AccessController; import java.security.AccessController;
@ -66,7 +67,7 @@ public final class SocketAccess {
@FunctionalInterface @FunctionalInterface
public interface StorageRunnable { public interface StorageRunnable {
void executeCouldThrow() throws StorageException, URISyntaxException; void executeCouldThrow() throws StorageException, URISyntaxException, IOException;
} }
} }

View File

@ -1,38 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.azure;
import org.elasticsearch.repositories.azure.AzureRepositoryPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import java.util.Arrays;
import java.util.Collection;
/**
* Base class for Azure tests.
*/
public abstract class AbstractAzureIntegTestCase extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(AzureRepositoryPlugin.class);
}
}

View File

@ -1,53 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.azure;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase.ThirdParty;
import java.util.Arrays;
import java.util.Collection;
import static org.elasticsearch.repositories.azure.AzureTestUtils.readSettingsFromFile;
/**
* Base class for Azure tests that require credentials.
* <p>
* You must specify {@code -Dtests.thirdparty=true -Dtests.config=/path/to/config}
* in order to run these tests.
*/
@ThirdParty
public abstract class AbstractAzureWithThirdPartyIntegTestCase extends AbstractAzureIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(readSettingsFromFile())
.build();
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(AzureRepositoryPlugin.class);
}
}

View File

@ -1,118 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.azure;
import com.microsoft.azure.storage.LocationMode;
import com.microsoft.azure.storage.StorageException;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.repositories.azure.AzureRepository.Repository;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.junit.After;
import org.junit.Before;
import java.net.URISyntaxException;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.repositories.azure.AzureTestUtils.readSettingsFromFile;
import static org.elasticsearch.repositories.azure.AzureSnapshotRestoreTests.getContainerName;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
/**
* This test needs Azure to run and -Dtests.thirdparty=true to be set
* and -Dtests.config=/path/to/elasticsearch.yml
*
* Note that this test requires an Azure storage account, with the account
* and credentials set in the elasticsearch.yml config file passed in to the
* test. The Azure storage account type must be a Read-access geo-redundant
* storage (RA-GRS) account.
*
* @see AbstractAzureWithThirdPartyIntegTestCase
*/
@ClusterScope(
scope = ESIntegTestCase.Scope.SUITE,
supportsDedicatedMasters = false, numDataNodes = 1,
transportClientRatio = 0.0)
public class AzureSnapshotRestoreListSnapshotsTests extends AbstractAzureWithThirdPartyIntegTestCase {
private final AzureStorageService azureStorageService = new AzureStorageServiceImpl(readSettingsFromFile(),
AzureStorageSettings.load(readSettingsFromFile()));
private final String containerName = getContainerName();
public AzureSnapshotRestoreListSnapshotsTests() {
}
public void testList() throws Exception {
Client client = client();
logger.info("--> creating azure primary repository");
PutRepositoryResponse putRepositoryResponsePrimary = client.admin().cluster().preparePutRepository("primary")
.setType("azure").setSettings(Settings.builder()
.put(Repository.CONTAINER_SETTING.getKey(), containerName)
).get();
assertThat(putRepositoryResponsePrimary.isAcknowledged(), equalTo(true));
logger.info("--> start get snapshots on primary");
long startWait = System.currentTimeMillis();
client.admin().cluster().prepareGetSnapshots("primary").get();
long endWait = System.currentTimeMillis();
// definitely should be done in 30s, and if its not working as expected, it takes over 1m
assertThat(endWait - startWait, lessThanOrEqualTo(30000L));
logger.info("--> creating azure secondary repository");
PutRepositoryResponse putRepositoryResponseSecondary = client.admin().cluster().preparePutRepository("secondary")
.setType("azure").setSettings(Settings.builder()
.put(Repository.CONTAINER_SETTING.getKey(), containerName)
.put(Repository.LOCATION_MODE_SETTING.getKey(), "secondary_only")
).get();
assertThat(putRepositoryResponseSecondary.isAcknowledged(), equalTo(true));
logger.info("--> start get snapshots on secondary");
startWait = System.currentTimeMillis();
client.admin().cluster().prepareGetSnapshots("secondary").get();
endWait = System.currentTimeMillis();
logger.info("--> end of get snapshots on secondary. Took {} ms", endWait - startWait);
assertThat(endWait - startWait, lessThanOrEqualTo(30000L));
}
@Before
public void createContainer() throws Exception {
// It could happen that we run this test really close to a previous one
// so we might need some time to be able to create the container
assertBusy(() -> {
try {
azureStorageService.createContainer(null, LocationMode.PRIMARY_ONLY, containerName);
} catch (URISyntaxException e) {
// Incorrect URL. This should never happen.
fail();
} catch (StorageException e) {
// It could happen. Let's wait for a while.
fail();
}
}, 30, TimeUnit.SECONDS);
}
@After
public void removeContainer() throws Exception {
azureStorageService.removeContainer(null, LocationMode.PRIMARY_ONLY, containerName);
}
}

View File

@ -36,37 +36,102 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoryMissingException; import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.repositories.RepositoryVerificationException; import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.repositories.azure.AzureRepository.Repository; import org.elasticsearch.repositories.azure.AzureRepository.Repository;
import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase;
import org.elasticsearch.snapshots.SnapshotMissingException; import org.elasticsearch.snapshots.SnapshotMissingException;
import org.elasticsearch.snapshots.SnapshotRestoreException; import org.elasticsearch.snapshots.SnapshotRestoreException;
import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.ThirdParty;
import org.elasticsearch.test.store.MockFSDirectoryService; import org.elasticsearch.test.store.MockFSDirectoryService;
import org.elasticsearch.test.store.MockFSIndexStore; import org.elasticsearch.test.store.MockFSIndexStore;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.AfterClass;
import org.junit.BeforeClass;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.net.UnknownHostException;
import java.util.Locale; import java.util.Locale;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.elasticsearch.repositories.azure.AzureTestUtils.readSettingsFromFile; import static org.elasticsearch.repositories.azure.AzureTestUtils.generateMockSecureSettings;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
/** /**
* This test needs Azure to run and -Dtests.thirdparty=true to be set * Those integration tests need an Azure access and must be run with
* and -Dtests.config=/path/to/elasticsearch.yml * {@code -Dtests.thirdparty=true -Dtests.azure.account=AzureStorageAccount -Dtests.azure.key=AzureStorageKey}
* @see AbstractAzureWithThirdPartyIntegTestCase * options
*/ */
@ClusterScope( @ClusterScope(
scope = ESIntegTestCase.Scope.SUITE, scope = ESIntegTestCase.Scope.SUITE,
supportsDedicatedMasters = false, numDataNodes = 1, supportsDedicatedMasters = false, numDataNodes = 1,
transportClientRatio = 0.0) transportClientRatio = 0.0)
public class AzureSnapshotRestoreTests extends AbstractAzureWithThirdPartyIntegTestCase { @ThirdParty
public class AzureSnapshotRestoreTests extends ESBlobStoreRepositoryIntegTestCase {
private static Settings.Builder generateMockSettings() {
return Settings.builder().setSecureSettings(generateMockSecureSettings());
}
private static final AzureStorageService azureStorageService = new AzureStorageServiceImpl(generateMockSettings().build(),
AzureStorageSettings.load(generateMockSettings().build()));
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return generateMockSettings()
.put(super.nodeSettings(nodeOrdinal))
.build();
}
private static String getContainerName() {
/* Have a different name per test so that there is no possible race condition. As the long can be negative,
* there mustn't be a hyphen between the 2 concatenated numbers
* (can't have 2 consecutives hyphens on Azure containers)
*/
String testName = "snapshot-itest-"
.concat(RandomizedTest.getContext().getRunnerSeedAsString().toLowerCase(Locale.ROOT));
return testName.contains(" ") ? Strings.split(testName, " ")[0] : testName;
}
@BeforeClass
public static void createTestContainers() throws Exception {
createTestContainer(getContainerName());
// This is needed for testMultipleRepositories() test case
createTestContainer(getContainerName() + "-1");
createTestContainer(getContainerName() + "-2");
}
@AfterClass
public static void removeContainer() throws Exception {
removeTestContainer(getContainerName());
// This is needed for testMultipleRepositories() test case
removeTestContainer(getContainerName() + "-1");
removeTestContainer(getContainerName() + "-2");
}
/**
* Create a test container in Azure
* @param containerName container name to use
*/
private static void createTestContainer(String containerName) throws Exception {
// It could happen that we run this test really close to a previous one
// so we might need some time to be able to create the container
assertBusy(() -> {
azureStorageService.createContainer("default", LocationMode.PRIMARY_ONLY, containerName);
}, 30, TimeUnit.SECONDS);
}
/**
* Remove a test container in Azure
* @param containerName container name to use
*/
private static void removeTestContainer(String containerName) throws URISyntaxException, StorageException {
azureStorageService.removeContainer("default", LocationMode.PRIMARY_ONLY, containerName);
}
@Override @Override
protected Collection<Class<? extends Plugin>> nodePlugins() { protected Collection<Class<? extends Plugin>> nodePlugins() {
@ -78,16 +143,6 @@ public class AzureSnapshotRestoreTests extends AbstractAzureWithThirdPartyIntegT
return testName.contains(" ") ? Strings.split(testName, " ")[0] : testName; return testName.contains(" ") ? Strings.split(testName, " ")[0] : testName;
} }
public static String getContainerName() {
/* Have a different name per test so that there is no possible race condition. As the long can be negative,
* there mustn't be a hyphen between the 2 concatenated numbers
* (can't have 2 consecutives hyphens on Azure containers)
*/
String testName = "snapshot-itest-"
.concat(RandomizedTest.getContext().getRunnerSeedAsString().toLowerCase(Locale.ROOT));
return testName.contains(" ") ? Strings.split(testName, " ")[0] : testName;
}
@Override @Override
public Settings indexSettings() { public Settings indexSettings() {
// During restore we frequently restore index to exactly the same state it was before, that might cause the same // During restore we frequently restore index to exactly the same state it was before, that might cause the same
@ -98,157 +153,12 @@ public class AzureSnapshotRestoreTests extends AbstractAzureWithThirdPartyIntegT
.build(); .build();
} }
@Before @After @After
public final void wipeAzureRepositories() throws StorageException, URISyntaxException, UnknownHostException { public final void wipeAzureRepositories() {
wipeRepositories(); try {
cleanRepositoryFiles( client().admin().cluster().prepareDeleteRepository("*").get();
getContainerName(), } catch (RepositoryMissingException ignored) {
getContainerName().concat("-1"),
getContainerName().concat("-2"));
} }
public void testSimpleWorkflow() {
String repo_name = "test-repo-simple";
Client client = client();
logger.info("--> creating azure repository with path [{}]", getRepositoryPath());
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository(repo_name)
.setType("azure").setSettings(Settings.builder()
.put(Repository.CONTAINER_SETTING.getKey(), getContainerName())
.put(Repository.BASE_PATH_SETTING.getKey(), getRepositoryPath())
.put(Repository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(1000, 10000), ByteSizeUnit.BYTES)
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
createIndex("test-idx-1", "test-idx-2", "test-idx-3");
ensureGreen();
logger.info("--> indexing some data");
for (int i = 0; i < 100; i++) {
index("test-idx-1", "doc", Integer.toString(i), "foo", "bar" + i);
index("test-idx-2", "doc", Integer.toString(i), "foo", "baz" + i);
index("test-idx-3", "doc", Integer.toString(i), "foo", "baz" + i);
}
refresh();
assertThat(client.prepareSearch("test-idx-1").setSize(0).get().getHits().getTotalHits(), equalTo(100L));
assertThat(client.prepareSearch("test-idx-2").setSize(0).get().getHits().getTotalHits(), equalTo(100L));
assertThat(client.prepareSearch("test-idx-3").setSize(0).get().getHits().getTotalHits(), equalTo(100L));
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot(repo_name, "test-snap")
.setWaitForCompletion(true).setIndices("test-idx-*", "-test-idx-3").get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
assertThat(client.admin().cluster().prepareGetSnapshots(repo_name).setSnapshots("test-snap").get().getSnapshots()
.get(0).state(), equalTo(SnapshotState.SUCCESS));
logger.info("--> delete some data");
for (int i = 0; i < 50; i++) {
client.prepareDelete("test-idx-1", "doc", Integer.toString(i)).get();
}
for (int i = 50; i < 100; i++) {
client.prepareDelete("test-idx-2", "doc", Integer.toString(i)).get();
}
for (int i = 0; i < 100; i += 2) {
client.prepareDelete("test-idx-3", "doc", Integer.toString(i)).get();
}
refresh();
assertThat(client.prepareSearch("test-idx-1").setSize(0).get().getHits().getTotalHits(), equalTo(50L));
assertThat(client.prepareSearch("test-idx-2").setSize(0).get().getHits().getTotalHits(), equalTo(50L));
assertThat(client.prepareSearch("test-idx-3").setSize(0).get().getHits().getTotalHits(), equalTo(50L));
logger.info("--> close indices");
client.admin().indices().prepareClose("test-idx-1", "test-idx-2").get();
logger.info("--> restore all indices from the snapshot");
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot(repo_name, "test-snap")
.setWaitForCompletion(true).get();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
assertThat(client.prepareSearch("test-idx-1").setSize(0).get().getHits().getTotalHits(), equalTo(100L));
assertThat(client.prepareSearch("test-idx-2").setSize(0).get().getHits().getTotalHits(), equalTo(100L));
assertThat(client.prepareSearch("test-idx-3").setSize(0).get().getHits().getTotalHits(), equalTo(50L));
// Test restore after index deletion
logger.info("--> delete indices");
cluster().wipeIndices("test-idx-1", "test-idx-2");
logger.info("--> restore one index after deletion");
restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot(repo_name, "test-snap").setWaitForCompletion(true)
.setIndices("test-idx-*", "-test-idx-2").get();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
assertThat(client.prepareSearch("test-idx-1").setSize(0).get().getHits().getTotalHits(), equalTo(100L));
ClusterState clusterState = client.admin().cluster().prepareState().get().getState();
assertThat(clusterState.getMetaData().hasIndex("test-idx-1"), equalTo(true));
assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(false));
}
/**
* For issue #51: https://github.com/elastic/elasticsearch-cloud-azure/issues/51
*/
public void testMultipleSnapshots() throws URISyntaxException, StorageException {
final String indexName = "test-idx-1";
final String typeName = "doc";
final String repositoryName = "test-repo-multiple-snapshot";
final String snapshot1Name = "test-snap-1";
final String snapshot2Name = "test-snap-2";
Client client = client();
logger.info("creating index [{}]", indexName);
createIndex(indexName);
ensureGreen();
logger.info("indexing first document");
index(indexName, typeName, Integer.toString(1), "foo", "bar " + Integer.toString(1));
refresh();
assertThat(client.prepareSearch(indexName).setSize(0).get().getHits().getTotalHits(), equalTo(1L));
logger.info("creating Azure repository with path [{}]", getRepositoryPath());
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository(repositoryName)
.setType("azure").setSettings(Settings.builder()
.put(Repository.CONTAINER_SETTING.getKey(), getContainerName())
.put(Repository.BASE_PATH_SETTING.getKey(), getRepositoryPath())
.put(Repository.BASE_PATH_SETTING.getKey(), randomIntBetween(1000, 10000), ByteSizeUnit.BYTES)
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
logger.info("creating snapshot [{}]", snapshot1Name);
CreateSnapshotResponse createSnapshotResponse1 = client.admin().cluster().prepareCreateSnapshot(repositoryName, snapshot1Name)
.setWaitForCompletion(true).setIndices(indexName).get();
assertThat(createSnapshotResponse1.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse1.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse1.getSnapshotInfo().totalShards()));
assertThat(client.admin().cluster().prepareGetSnapshots(repositoryName).setSnapshots(snapshot1Name).get().getSnapshots()
.get(0).state(), equalTo(SnapshotState.SUCCESS));
logger.info("indexing second document");
index(indexName, typeName, Integer.toString(2), "foo", "bar " + Integer.toString(2));
refresh();
assertThat(client.prepareSearch(indexName).setSize(0).get().getHits().getTotalHits(), equalTo(2L));
logger.info("creating snapshot [{}]", snapshot2Name);
CreateSnapshotResponse createSnapshotResponse2 = client.admin().cluster().prepareCreateSnapshot(repositoryName, snapshot2Name)
.setWaitForCompletion(true).setIndices(indexName).get();
assertThat(createSnapshotResponse2.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse2.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse2.getSnapshotInfo().totalShards()));
assertThat(client.admin().cluster().prepareGetSnapshots(repositoryName).setSnapshots(snapshot2Name).get().getSnapshots()
.get(0).state(), equalTo(SnapshotState.SUCCESS));
logger.info("closing index [{}]", indexName);
client.admin().indices().prepareClose(indexName).get();
logger.info("attempting restore from snapshot [{}]", snapshot1Name);
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot(repositoryName, snapshot1Name)
.setWaitForCompletion(true).get();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
assertThat(client.prepareSearch(indexName).setSize(0).get().getHits().getTotalHits(), equalTo(1L));
} }
public void testMultipleRepositories() { public void testMultipleRepositories() {
@ -381,8 +291,6 @@ public class AzureSnapshotRestoreTests extends AbstractAzureWithThirdPartyIntegT
// Get all snapshots - should have one // Get all snapshots - should have one
assertThat(client.prepareGetSnapshots(repositoryName).get().getSnapshots().size(), equalTo(1)); assertThat(client.prepareGetSnapshots(repositoryName).get().getSnapshots().size(), equalTo(1));
} }
/** /**
@ -413,57 +321,6 @@ public class AzureSnapshotRestoreTests extends AbstractAzureWithThirdPartyIntegT
} }
} }
/**
 * Verifies that repository registration rejects container names which violate the storage
 * service's naming constraints (as exercised by the cases below: empty or too-short names,
 * leading or doubled hyphens, underscores, uppercase letters, names over 63 characters) and
 * accepts well-formed ones.
 *
 * For issue #21: https://github.com/elastic/elasticsearch-cloud-azure/issues/21
 */
public void testForbiddenContainerName() throws Exception {
    // Names expected to be rejected, checked in the same order as before.
    final String[] forbidden = {
        "",
        "es",
        "-elasticsearch",
        "elasticsearch--integration",
        "elasticsearch_integration",
        "ElAsTicsearch_integration",
        "123456789-123456789-123456789-123456789-123456789-123456789-1234"
    };
    for (String name : forbidden) {
        checkContainerName(name, false);
    }

    // Names expected to be accepted (63 chars is the longest allowed).
    final String[] allowed = {
        "123456789-123456789-123456789-123456789-123456789-123456789-123",
        "elasticsearch-integration",
        "elasticsearch-integration-007"
    };
    for (String name : allowed) {
        checkContainerName(name, true);
    }
}
/**
 * Registers an Azure repository with the given container name and verifies whether the
 * registration is acknowledged as expected, then cleans up the repository and container.
 *
 * The whole check runs inside {@code assertBusy} because a previous test may still be
 * deleting the same container, in which case verification of a perfectly valid name fails
 * transiently; calling {@code fail()} inside the lambda makes {@code assertBusy} retry for
 * up to 5 minutes.
 *
 * @param container Container name we want to create
 * @param correct   Is this container name correct, i.e. should registration be acknowledged
 */
private void checkContainerName(final String container, final boolean correct) throws Exception {
    String repositoryName = "test-repo-checkContainerName";
    logger.info("--> creating azure repository with container name [{}]", container);
    // It could happen that we just removed from a previous test the same container so
    // we can not create it yet.
    assertBusy(() -> {
        try {
            PutRepositoryResponse putRepositoryResponse = client().admin().cluster().preparePutRepository(repositoryName)
                .setType("azure").setSettings(Settings.builder()
                    .put(Repository.CONTAINER_SETTING.getKey(), container)
                    .put(Repository.BASE_PATH_SETTING.getKey(), getRepositoryPath())
                    .put(Repository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(1000, 10000), ByteSizeUnit.BYTES)
                ).get();
            // Unregister the repository and remove the container right away so the next
            // iteration (or the next test) starts from a clean slate.
            client().admin().cluster().prepareDeleteRepository(repositoryName).get();
            try {
                logger.info("--> remove container [{}]", container);
                cleanRepositoryFiles(container);
            } catch (StorageException | URISyntaxException | UnknownHostException ignored) {
                // We can ignore that as we just try to clean after the test
            }
            // Fixed idiom: assertThat/equalTo reports actual vs expected on failure,
            // unlike the previous bare assertTrue(x == correct).
            assertThat(putRepositoryResponse.isAcknowledged(), equalTo(correct));
        } catch (RepositoryVerificationException e) {
            if (correct) {
                // A valid name failed verification: the container is most likely still
                // being deleted. fail() triggers an assertBusy retry.
                logger.debug(" -> container is being removed. Let's wait a bit...");
                fail();
            }
            // An invalid name is expected to fail verification: swallow and pass.
        }
    }, 5, TimeUnit.MINUTES);
}
/** /**
* Test case for issue #23: https://github.com/elastic/elasticsearch-cloud-azure/issues/23 * Test case for issue #23: https://github.com/elastic/elasticsearch-cloud-azure/issues/23
*/ */
@ -493,24 +350,9 @@ public class AzureSnapshotRestoreTests extends AbstractAzureWithThirdPartyIntegT
*/ */
public void testRemoveAndCreateContainer() throws Exception { public void testRemoveAndCreateContainer() throws Exception {
final String container = getContainerName().concat("-testremove"); final String container = getContainerName().concat("-testremove");
final AzureStorageService storageService = new AzureStorageServiceImpl(nodeSettings(0),AzureStorageSettings.load(nodeSettings(0)));
// It could happen that we run this test really close to a previous one createTestContainer(container);
// so we might need some time to be able to create the container removeTestContainer(container);
assertBusy(() -> {
try {
storageService.createContainer(null, LocationMode.PRIMARY_ONLY, container);
logger.debug(" -> container created...");
} catch (URISyntaxException e) {
// Incorrect URL. This should never happen.
fail();
} catch (StorageException e) {
// It could happen. Let's wait for a while.
logger.debug(" -> container is being removed. Let's wait a bit...");
fail();
}
}, 30, TimeUnit.SECONDS);
storageService.removeContainer(null, LocationMode.PRIMARY_ONLY, container);
ClusterAdminClient client = client().admin().cluster(); ClusterAdminClient client = client().admin().cluster();
logger.info("--> creating azure repository while container is being removed"); logger.info("--> creating azure repository while container is being removed");
@ -526,30 +368,52 @@ public class AzureSnapshotRestoreTests extends AbstractAzureWithThirdPartyIntegT
} }
/** /**
* Deletes repositories, supports wildcard notation. * Test that you can snapshot on the primary repository and list the available snapshots
* from the secondary repository.
*
* Note that this test requires an Azure storage account which must be a Read-access geo-redundant
* storage (RA-GRS) account type.
* @throws Exception If anything goes wrong
*/ */
public static void wipeRepositories(String... repositories) { public void testGeoRedundantStorage() throws Exception {
// if nothing is provided, delete all Client client = client();
if (repositories.length == 0) { logger.info("--> creating azure primary repository");
repositories = new String[]{"*"}; PutRepositoryResponse putRepositoryResponsePrimary = client.admin().cluster().preparePutRepository("primary")
} .setType("azure").setSettings(Settings.builder()
for (String repository : repositories) { .put(Repository.CONTAINER_SETTING.getKey(), getContainerName())
try { ).get();
client().admin().cluster().prepareDeleteRepository(repository).get(); assertThat(putRepositoryResponsePrimary.isAcknowledged(), equalTo(true));
} catch (RepositoryMissingException ex) {
// ignore logger.info("--> start get snapshots on primary");
} long startWait = System.currentTimeMillis();
} client.admin().cluster().prepareGetSnapshots("primary").get();
long endWait = System.currentTimeMillis();
// definitely should be done in 30s, and if its not working as expected, it takes over 1m
assertThat(endWait - startWait, lessThanOrEqualTo(30000L));
logger.info("--> creating azure secondary repository");
PutRepositoryResponse putRepositoryResponseSecondary = client.admin().cluster().preparePutRepository("secondary")
.setType("azure").setSettings(Settings.builder()
.put(Repository.CONTAINER_SETTING.getKey(), getContainerName())
.put(Repository.LOCATION_MODE_SETTING.getKey(), "secondary_only")
).get();
assertThat(putRepositoryResponseSecondary.isAcknowledged(), equalTo(true));
logger.info("--> start get snapshots on secondary");
startWait = System.currentTimeMillis();
client.admin().cluster().prepareGetSnapshots("secondary").get();
endWait = System.currentTimeMillis();
logger.info("--> end of get snapshots on secondary. Took {} ms", endWait - startWait);
assertThat(endWait - startWait, lessThanOrEqualTo(30000L));
} }
/** @Override
* Purge the test containers protected void createTestRepository(String name) {
*/ assertAcked(client().admin().cluster().preparePutRepository(name)
public void cleanRepositoryFiles(String... containers) throws StorageException, URISyntaxException, UnknownHostException { .setType(AzureRepository.TYPE)
Settings settings = readSettingsFromFile(); .setSettings(Settings.builder()
AzureStorageService client = new AzureStorageServiceImpl(settings, AzureStorageSettings.load(settings)); .put(Repository.CONTAINER_SETTING.getKey(), getContainerName())
for (String container : containers) { .put(Repository.BASE_PATH_SETTING.getKey(), getRepositoryPath())
client.removeContainer(null, LocationMode.PRIMARY_ONLY, container); .put(Repository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
}
} }
} }

View File

@ -25,14 +25,13 @@ import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.repositories.azure.AzureStorageService;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.nio.file.NoSuchFileException; import java.nio.file.NoSuchFileException;
import java.util.Locale; import java.util.Locale;
@ -85,14 +84,6 @@ public class AzureStorageServiceMock extends AbstractComponent implements AzureS
return new ByteArrayInputStream(blobs.get(blob).toByteArray()); return new ByteArrayInputStream(blobs.get(blob).toByteArray());
} }
@Override
public OutputStream getOutputStream(String account, LocationMode mode, String container, String blob)
throws URISyntaxException, StorageException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
blobs.put(blob, outputStream);
return outputStream;
}
@Override @Override
public Map<String, BlobMetaData> listBlobsByPrefix(String account, LocationMode mode, String container, String keyPath, String prefix) { public Map<String, BlobMetaData> listBlobsByPrefix(String account, LocationMode mode, String container, String keyPath, String prefix) {
MapBuilder<String, BlobMetaData> blobsBuilder = MapBuilder.newMapBuilder(); MapBuilder<String, BlobMetaData> blobsBuilder = MapBuilder.newMapBuilder();
@ -123,6 +114,17 @@ public class AzureStorageServiceMock extends AbstractComponent implements AzureS
} }
} }
@Override
public void writeBlob(String account, LocationMode mode, String container, String blobName, InputStream inputStream, long blobSize)
throws URISyntaxException, StorageException {
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
blobs.put(blobName, outputStream);
Streams.copy(inputStream, outputStream);
} catch (IOException e) {
throw new StorageException("MOCK", "Error while writing mock stream", e);
}
}
/** /**
* Test if the given String starts with the specified prefix, * Test if the given String starts with the specified prefix,
* ignoring upper/lower case. * ignoring upper/lower case.

View File

@ -20,36 +20,27 @@
package org.elasticsearch.repositories.azure; package org.elasticsearch.repositories.azure;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SecureSettings;
import org.elasticsearch.common.settings.SettingsException;
import java.io.IOException;
public class AzureTestUtils { public class AzureTestUtils {
/** /**
* Read settings from file when running integration tests with ThirdParty annotation. * Mock secure settings from sysprops when running integration tests with ThirdParty annotation.
* elasticsearch.yml file path has to be set with -Dtests.config=/path/to/elasticsearch.yml. * Start the tests with {@code -Dtests.azure.account=AzureStorageAccount and -Dtests.azure.key=AzureStorageKey}
* @return Settings from elasticsearch.yml integration test file (for 3rd party tests) * @return Mock Settings from sysprops
*/ */
public static Settings readSettingsFromFile() { public static SecureSettings generateMockSecureSettings() {
Settings.Builder settings = Settings.builder(); MockSecureSettings secureSettings = new MockSecureSettings();
// if explicit, just load it and don't load from env if (Strings.isEmpty(System.getProperty("tests.azure.account")) ||
try { Strings.isEmpty(System.getProperty("tests.azure.key"))) {
if (Strings.hasText(System.getProperty("tests.config"))) {
try {
settings.loadFromPath(PathUtils.get((System.getProperty("tests.config"))));
} catch (IOException e) {
throw new IllegalArgumentException("could not load azure tests config", e);
}
} else {
throw new IllegalStateException("to run integration tests, you need to set -Dtests.thirdparty=true and " + throw new IllegalStateException("to run integration tests, you need to set -Dtests.thirdparty=true and " +
"-Dtests.config=/path/to/elasticsearch.yml"); "-Dtests.azure.account=azure-account -Dtests.azure.key=azure-key");
} }
} catch (SettingsException exception) {
throw new IllegalStateException("your test configuration file is incorrect: " + System.getProperty("tests.config"), exception); secureSettings.setString("azure.client.default.account", System.getProperty("tests.azure.account"));
} secureSettings.setString("azure.client.default.key", System.getProperty("tests.azure.key"));
return settings.build();
return secureSettings;
} }
} }